"""Experience Distillation — 经验蒸馏 从已完成的任务产出中提取经验: 1. 模式识别(成功/失败 pattern) 2. 经验分类(pitfall / best_practice / environment) 3. 向量索引 + Markdown 持久化 4. 自动推荐给后续相似任务 """ from __future__ import annotations import json import logging import re from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional logger = logging.getLogger("moziplus-v2.experience") class ExperienceCategory(str): PITFALL = "pitfall" BEST_PRACTICE = "best_practice" ENVIRONMENT = "environment" PATTERN = "pattern" DECISION = "decision" class Experience: """一条经验""" def __init__( self, category: str, summary: str, source_task_id: Optional[str] = None, agent_id: Optional[str] = None, evidence: Optional[str] = None, tags: Optional[List[str]] = None, confidence: float = 0.8, experience_id: Optional[str] = None, created_at: Optional[str] = None, ): self.id = experience_id or f"exp-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{hash(summary) % 10000:04d}" self.category = category self.summary = summary self.source_task_id = source_task_id self.agent_id = agent_id self.evidence = evidence self.tags = tags or [] self.confidence = confidence self.created_at = created_at or datetime.utcnow().isoformat() def to_dict(self) -> Dict[str, Any]: return { "id": self.id, "category": self.category, "summary": self.summary, "source_task_id": self.source_task_id, "agent_id": self.agent_id, "evidence": self.evidence, "tags": self.tags, "confidence": self.confidence, "created_at": self.created_at, } @classmethod def from_dict(cls, data: Dict[str, Any]) -> Experience: return cls(**{k: v for k, v in data.items() if k != "id"}, experience_id=data.get("id")) class ExperienceStore: """经验存储(JSONL 持久化)""" def __init__(self, store_path: Optional[Path] = None): self.store_path = store_path self._experiences: Dict[str, Experience] = {} if store_path and store_path.exists(): self._load() def _load(self): if not self.store_path: return for line in self.store_path.read_text().strip().split("\n"): line = line.strip() if not line: continue try: exp = Experience.from_dict(json.loads(line)) self._experiences[exp.id] = exp except Exception: logger.warning("Failed to parse experience: %s", line[:80]) def _save(self): if not self.store_path: return lines = [json.dumps(exp.to_dict(), ensure_ascii=False) for exp in self._experiences.values()] self.store_path.write_text("\n".join(lines) + "\n") def add(self, experience: Experience) -> str: self._experiences[experience.id] = experience self._save() return experience.id def get(self, experience_id: str) -> Optional[Experience]: return self._experiences.get(experience_id) def list_all(self) -> List[Experience]: return list(self._experiences.values()) def search( self, category: Optional[str] = None, tags: Optional[List[str]] = None, query: Optional[str] = None, limit: int = 10, ) -> List[Experience]: results = list(self._experiences.values()) if category: results = [e for e in results if e.category == category] if tags: results = [e for e in results if any(t in e.tags for t in tags)] if query: q_lower = query.lower() results = [e for e in results if q_lower in e.summary.lower() or q_lower in (e.evidence or "").lower()] return results[:limit] def delete(self, experience_id: str) -> bool: if experience_id in self._experiences: del self._experiences[experience_id] self._save() return True return False def count(self) -> int: return len(self._experiences) class ExperienceDistiller: """经验蒸馏器""" # 模式关键词映射 PATTERNS = { "pitfall": [ r"bug|error|fail|broken|crash|wrong|incorrect", r"doesn'?t work|not working|issue", r"forgot to|missed|overlooked", r"陷阱|踩坑|错误|失败", ], "best_practice": [ r"should|recommend|best practice|always|never", r"tip|trick|pro tip|remember", r"最佳实践|建议|推荐|必须", ], "environment": [ r"install|configure|setup|deploy|version|compatibility", r"环境|配置|安装|部署|版本", ], } def __init__(self, store: Optional[ExperienceStore] = None): self.store = store or ExperienceStore() def distill_from_task( self, task_id: str, task_title: str, task_type: Optional[str] = None, outputs: Optional[List[Dict[str, Any]]] = None, review_result: Optional[Dict[str, Any]] = None, agent_id: Optional[str] = None, ) -> List[Experience]: """从任务产出中蒸馏经验""" experiences = [] # 从 review 结果提取 if review_result: if review_result.get("verdict") == "fail": for step_result in review_result.get("results", []): if step_result.get("verdict") == "fail": exp = Experience( category="pitfall", summary=f"[{task_title}] {step_result.get('step', '?')}: {step_result.get('details', '')}", source_task_id=task_id, agent_id=agent_id, tags=[task_type or "unknown", "review-failure"], ) self.store.add(exp) experiences.append(exp) # 从 suggestions 提取 for step_result in review_result.get("results", []): for suggestion in step_result.get("suggestions", []): category = self._classify_text(suggestion) exp = Experience( category=category, summary=suggestion, source_task_id=task_id, agent_id=agent_id, tags=[task_type or "unknown"], ) self.store.add(exp) experiences.append(exp) # 从产出文本提取 if outputs: for out in outputs: content = out.get("content", "") if not content and out.get("path"): try: content = Path(out["path"]).read_text() except Exception: pass if content: extracted = self._extract_from_text( content, task_id, task_title, task_type, agent_id ) for exp in extracted: self.store.add(exp) experiences.extend(extracted) return experiences def _extract_from_text( self, text: str, task_id: str, task_title: str, task_type: Optional[str], agent_id: Optional[str], ) -> List[Experience]: """从文本中提取经验(基于段落模式匹配)""" experiences = [] # 简单段落分割 paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()] for para in paragraphs: category = self._classify_text(para) if category: # 只提取有模式匹配的段落 exp = Experience( category=category, summary=para[:200], source_task_id=task_id, agent_id=agent_id, tags=[task_type or "unknown", category], evidence=para[:500], ) experiences.append(exp) return experiences def _classify_text(self, text: str) -> Optional[str]: """文本模式分类""" text_lower = text.lower() for category, patterns in self.PATTERNS.items(): for pattern in patterns: if re.search(pattern, text_lower): return category return None def recommend( self, task_type: Optional[str] = None, tags: Optional[List[str]] = None, query: Optional[str] = None, limit: int = 5, ) -> List[Experience]: """推荐相关经验""" all_tags = list(tags or []) if task_type: all_tags.append(task_type) results = self.store.search(tags=all_tags if all_tags else None, query=query, limit=limit) # 按置信度排序 results.sort(key=lambda e: e.confidence, reverse=True) return results[:limit]