# src/daemon/experience.py
"""Experience distillation.

Extracts reusable experience from the outputs of completed tasks:

1. Pattern recognition (success / failure patterns)
2. Experience classification (pitfall / best_practice / environment)
3. Vector index + Markdown persistence
4. Automatic recommendation to later, similar tasks

NOTE: the code in this module implements keyword-based classification,
an in-memory store with optional JSONL persistence, and tag/substring based
recommendation.
"""

from __future__ import annotations

import json
import logging
import re
import zlib
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

logger = logging.getLogger("moziplus-v2.experience")


def _utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Replaces the deprecated ``datetime.utcnow()``. The tzinfo is dropped so
    generated ids and ``created_at`` strings keep the same format as before.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _read_text_utf8_first(path: Path) -> str:
    """Read *path* as UTF-8, falling back to the locale default encoding.

    The fallback keeps files readable that were written before an explicit
    encoding was used. I/O errors propagate to the caller.
    """
    try:
        return path.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        return path.read_text()


class ExperienceCategory(str):
    """Namespace holding the known experience category names as plain strings."""

    PITFALL = "pitfall"
    BEST_PRACTICE = "best_practice"
    ENVIRONMENT = "environment"
    PATTERN = "pattern"
    DECISION = "decision"


class Experience:
    """A single experience record."""

    # Keys of ``to_dict()`` that map 1:1 onto ``__init__`` keyword arguments
    # ("id" is handled separately because the parameter is ``experience_id``).
    _INIT_FIELDS = (
        "category",
        "summary",
        "source_task_id",
        "agent_id",
        "evidence",
        "tags",
        "confidence",
        "created_at",
    )

    def __init__(
        self,
        category: str,
        summary: str,
        source_task_id: Optional[str] = None,
        agent_id: Optional[str] = None,
        evidence: Optional[str] = None,
        tags: Optional[List[str]] = None,
        confidence: float = 0.8,
        experience_id: Optional[str] = None,
        created_at: Optional[str] = None,
    ):
        """Create an experience.

        Args:
            category: Category name (see ``ExperienceCategory``).
            summary: Short description of the experience.
            source_task_id: Id of the task the experience came from.
            agent_id: Id of the agent that produced the task output.
            evidence: Supporting text.
            tags: Free-form tags; defaults to an empty list.
            confidence: Confidence score used for ranking recommendations.
            experience_id: Explicit id; generated when missing or empty.
            created_at: ISO timestamp; defaults to the current UTC time.
        """
        if not experience_id:
            # crc32 instead of hash(): str hashes are salted per process, so
            # the suffix would otherwise differ between runs for equal text.
            digest = zlib.crc32(str(summary).encode("utf-8", "replace")) % 10000
            stamp = _utcnow().strftime("%Y%m%d%H%M%S")
            experience_id = f"exp-{stamp}-{digest:04d}"
        self.id = experience_id
        self.category = category
        self.summary = summary
        self.source_task_id = source_task_id
        self.agent_id = agent_id
        self.evidence = evidence
        self.tags = tags or []
        self.confidence = confidence
        self.created_at = created_at or _utcnow().isoformat()

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(id={self.id!r}, "
            f"category={self.category!r}, summary={self.summary!r})"
        )

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serialisable dict of all fields."""
        return {
            "id": self.id,
            "category": self.category,
            "summary": self.summary,
            "source_task_id": self.source_task_id,
            "agent_id": self.agent_id,
            "evidence": self.evidence,
            "tags": self.tags,
            "confidence": self.confidence,
            "created_at": self.created_at,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> Experience:
        """Build an experience from a dict produced by ``to_dict()``.

        Unknown keys are ignored so that records carrying extra fields still
        load. Missing required fields (``category``, ``summary``) raise
        ``TypeError`` from the constructor.
        """
        kwargs = {key: data[key] for key in cls._INIT_FIELDS if key in data}
        experience_id = data.get("id") or data.get("experience_id")
        return cls(experience_id=experience_id, **kwargs)


class ExperienceStore:
    """Experience store with optional JSONL persistence."""

    def __init__(self, store_path: Optional[Path] = None):
        """Create a store.

        Args:
            store_path: JSONL file to load from / save to. When omitted the
                store is purely in-memory. A plain string path is accepted.
        """
        self.store_path: Optional[Path] = Path(store_path) if store_path else None
        self._experiences: Dict[str, Experience] = {}

        if self.store_path and self.store_path.exists():
            self._load()

    def _load(self) -> None:
        """Load every parseable line of the JSONL file into memory."""
        if not self.store_path:
            return
        text = _read_text_utf8_first(self.store_path)
        # Split on "\n" only: with ensure_ascii=False a JSON string may contain
        # characters such as U+2028 that str.splitlines() treats as line breaks.
        for raw_line in text.strip().split("\n"):
            line = raw_line.strip()
            if not line:
                continue
            try:
                exp = Experience.from_dict(json.loads(line))
            except Exception:
                # Best effort: a corrupt line must not prevent loading the rest.
                logger.warning("Failed to parse experience: %s", line[:80])
                continue
            self._experiences[exp.id] = exp

    def _save(self) -> None:
        """Rewrite the whole JSONL file from memory (no-op without a path)."""
        if not self.store_path:
            return
        lines = [
            json.dumps(exp.to_dict(), ensure_ascii=False)
            for exp in self._experiences.values()
        ]
        self.store_path.parent.mkdir(parents=True, exist_ok=True)
        # Explicit UTF-8: the JSON is written with ensure_ascii=False, so the
        # locale default encoding may be unable to represent the text.
        self.store_path.write_text("\n".join(lines) + "\n", encoding="utf-8")

    def add(self, experience: Experience) -> str:
        """Insert or overwrite an experience, persist, and return its id."""
        self._experiences[experience.id] = experience
        self._save()
        return experience.id

    def add_many(self, experiences: List[Experience]) -> List[str]:
        """Insert several experiences with a single write to disk.

        Returns:
            The ids of the given experiences, in input order.
        """
        ids = []
        for experience in experiences:
            self._experiences[experience.id] = experience
            ids.append(experience.id)
        if ids:
            self._save()
        return ids

    def get(self, experience_id: str) -> Optional[Experience]:
        """Return the experience with the given id, or ``None``."""
        return self._experiences.get(experience_id)

    def list_all(self) -> List[Experience]:
        """Return all experiences in insertion order."""
        return list(self._experiences.values())

    def search(
        self,
        category: Optional[str] = None,
        tags: Optional[List[str]] = None,
        query: Optional[str] = None,
        limit: Optional[int] = 10,
    ) -> List[Experience]:
        """Filter experiences; all given criteria must match.

        Args:
            category: Exact category to match.
            tags: Matches when the experience has at least one of these tags.
            query: Case-insensitive substring looked up in summary and evidence.
            limit: Maximum number of results; ``None`` means no limit.

        Returns:
            Matching experiences in insertion order.
        """
        results = list(self._experiences.values())

        if category:
            results = [e for e in results if e.category == category]

        if tags:
            results = [e for e in results if any(t in e.tags for t in tags)]

        if query:
            needle = query.lower()
            results = [
                e
                for e in results
                if needle in (e.summary or "").lower()
                or needle in (e.evidence or "").lower()
            ]

        if limit is None:
            return results
        # A negative limit would otherwise silently drop items from the end.
        return results[: max(limit, 0)]

    def delete(self, experience_id: str) -> bool:
        """Remove an experience; return whether it existed."""
        if experience_id in self._experiences:
            del self._experiences[experience_id]
            self._save()
            return True
        return False

    def count(self) -> int:
        """Return the number of stored experiences."""
        return len(self._experiences)


class ExperienceDistiller:
    """Distills experiences from task results and recommends them."""

    # Keyword patterns per category. Categories are tried in this order and
    # the first pattern that matches decides the category.
    PATTERNS = {
        "pitfall": [
            r"bug|error|fail|broken|crash|wrong|incorrect",
            r"doesn'?t work|not working|issue",
            r"forgot to|missed|overlooked",
            r"陷阱|踩坑|错误|失败",
        ],
        "best_practice": [
            r"should|recommend|best practice|always|never",
            r"tip|trick|pro tip|remember",
            r"最佳实践|建议|推荐|必须",
        ],
        "environment": [
            r"install|configure|setup|deploy|version|compatibility",
            r"环境|配置|安装|部署|版本",
        ],
    }

    def __init__(self, store: Optional[ExperienceStore] = None):
        """Use *store* for persistence, or a fresh in-memory store."""
        self.store = store or ExperienceStore()

    def distill_from_task(
        self,
        task_id: str,
        task_title: str,
        task_type: Optional[str] = None,
        outputs: Optional[List[Dict[str, Any]]] = None,
        review_result: Optional[Dict[str, Any]] = None,
        agent_id: Optional[str] = None,
    ) -> List[Experience]:
        """Distill experiences from a task's review result and outputs.

        Args:
            task_id: Id recorded as the source of every experience.
            task_title: Title used as a prefix in review-failure summaries.
            task_type: Used as a tag; ``"unknown"`` when missing.
            outputs: Dicts with a ``"content"`` string or a ``"path"`` to read.
            review_result: Dict with ``"verdict"`` and per-step ``"results"``.
            agent_id: Id of the agent that executed the task.

        Returns:
            The experiences created, all of which were added to the store.
        """
        experiences: List[Experience] = []
        type_tag = task_type or "unknown"

        if review_result:
            step_results = review_result.get("results", []) or []

            # Failed review steps become pitfalls.
            if review_result.get("verdict") == "fail":
                for step in step_results:
                    if step.get("verdict") != "fail":
                        continue
                    experiences.append(
                        Experience(
                            category="pitfall",
                            summary=f"[{task_title}] {step.get('step', '?')}: {step.get('details', '')}",
                            source_task_id=task_id,
                            agent_id=agent_id,
                            tags=[type_tag, "review-failure"],
                        )
                    )

            # Reviewer suggestions become experiences of their own.
            for step in step_results:
                for suggestion in step.get("suggestions", []) or []:
                    text = suggestion if isinstance(suggestion, str) else str(suggestion)
                    if not text.strip():
                        continue
                    # A suggestion is advice by nature, so one that matches no
                    # keyword pattern is filed as a best practice, not None.
                    category = (
                        self._classify_text(text) or ExperienceCategory.BEST_PRACTICE
                    )
                    experiences.append(
                        Experience(
                            category=category,
                            summary=text,
                            source_task_id=task_id,
                            agent_id=agent_id,
                            tags=[type_tag],
                        )
                    )

        # Extract from the output text itself.
        for out in outputs or []:
            content = out.get("content", "")
            if not content and out.get("path"):
                try:
                    content = _read_text_utf8_first(Path(out["path"]))
                except Exception as exc:
                    # Best effort: an unreadable output is skipped, but logged.
                    logger.warning(
                        "Failed to read task output %s: %s", out["path"], exc
                    )
                    content = ""

            if content and isinstance(content, str):
                experiences.extend(
                    self._extract_from_text(
                        content, task_id, task_title, task_type, agent_id
                    )
                )

        # One write for the whole batch instead of one full rewrite per item.
        self.store.add_many(experiences)
        return experiences

    def _extract_from_text(
        self,
        text: str,
        task_id: str,
        task_title: str,
        task_type: Optional[str],
        agent_id: Optional[str],
    ) -> List[Experience]:
        """Extract experiences from text by classifying each paragraph.

        Paragraphs are separated by blank lines; only paragraphs matching a
        keyword pattern produce an experience. Nothing is added to the store.
        """
        experiences = []
        paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]

        for para in paragraphs:
            category = self._classify_text(para)
            if not category:
                continue
            experiences.append(
                Experience(
                    category=category,
                    summary=para[:200],
                    source_task_id=task_id,
                    agent_id=agent_id,
                    tags=[task_type or "unknown", category],
                    evidence=para[:500],
                )
            )

        return experiences

    def _classify_text(self, text: str) -> Optional[str]:
        """Return the first category whose pattern matches, else ``None``."""
        text_lower = text.lower()
        for category, patterns in self.PATTERNS.items():
            for pattern in patterns:
                if re.search(pattern, text_lower):
                    return category
        return None

    def recommend(
        self,
        task_type: Optional[str] = None,
        tags: Optional[List[str]] = None,
        query: Optional[str] = None,
        limit: int = 5,
    ) -> List[Experience]:
        """Recommend relevant experiences, highest confidence first.

        Args:
            task_type: Added to *tags* when given.
            tags: An experience matches when it carries any of these tags.
            query: Case-insensitive substring filter on summary / evidence.
            limit: Maximum number of experiences to return.
        """
        all_tags = list(tags or [])
        if task_type:
            all_tags.append(task_type)

        # Fetch every match first: truncating before sorting would return the
        # oldest matches rather than the most confident ones.
        results = self.store.search(tags=all_tags or None, query=query, limit=None)

        results.sort(key=lambda e: e.confidence, reverse=True)
        return results[: max(limit, 0)]