auto-sync: 2026-05-17 06:09:30

This commit is contained in:
cfdaily
2026-05-17 06:09:30 +08:00
parent e868b0b437
commit c98ef6b5a0
+291
View File
@@ -0,0 +1,291 @@
"""Experience Distillation — 经验蒸馏
从已完成的任务产出中提取经验:
1. 模式识别(成功/失败 pattern)
2. 经验分类(pitfall / best_practice / environment)
3. 向量索引 + Markdown 持久化
4. 自动推荐给后续相似任务
"""
from __future__ import annotations

import json
import logging
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger("moziplus-v2.experience")
class ExperienceCategory(str):
    """Namespace of the category strings an experience can be filed under.

    The attributes are plain ``str`` values (not instances of this class),
    so they compare equal to, and serialise as, the literal strings below.
    """

    # Matches the "pitfall" keyword group in ExperienceDistiller.PATTERNS
    # (bug / error / fail ...).
    PITFALL = "pitfall"
    # Matches the "best_practice" keyword group (should / recommend ...).
    BEST_PRACTICE = "best_practice"
    # Matches the "environment" keyword group (install / configure ...).
    ENVIRONMENT = "environment"
    # NOTE(review): no code in this file produces the two categories below;
    # presumably they are assigned by callers -- confirm.
    PATTERN = "pattern"
    DECISION = "decision"
class Experience:
    """A single distilled experience record.

    Attributes:
        id: Unique identifier. Generated as ``exp-<UTC timestamp>-<4 digits>``
            when no ``experience_id`` is supplied.
        category: Category string such as ``"pitfall"``.
        summary: Short human-readable description of the experience.
        source_task_id: Id of the task the experience was derived from.
        agent_id: Id of the agent that produced the source task output.
        evidence: Optional supporting text.
        tags: Free-form tags used for filtering.
        confidence: Score used to rank recommendations.
        created_at: ISO-8601 creation time (naive UTC when auto-generated).
    """

    # Keys of a serialised record that map 1:1 onto ``__init__`` keyword
    # arguments ("id" is handled separately because the argument is named
    # ``experience_id``).
    _INIT_FIELDS = (
        "category",
        "summary",
        "source_task_id",
        "agent_id",
        "evidence",
        "tags",
        "confidence",
        "created_at",
    )

    def __init__(
        self,
        category: str,
        summary: str,
        source_task_id: Optional[str] = None,
        agent_id: Optional[str] = None,
        evidence: Optional[str] = None,
        tags: Optional[List[str]] = None,
        confidence: float = 0.8,
        experience_id: Optional[str] = None,
        created_at: Optional[str] = None,
    ):
        """Create an experience, generating ``id``/``created_at`` if omitted."""
        # Take a single timestamp so the generated id and created_at always
        # agree. ``datetime.utcnow()`` is deprecated; dropping tzinfo keeps
        # the previously emitted naive-UTC string format unchanged.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        if not experience_id:
            stamp = now.strftime("%Y%m%d%H%M%S")
            # NOTE: hash() of a str is salted per process and only 4 digits
            # are kept, so ids are neither reproducible nor collision-free
            # for records created within the same second.
            experience_id = f"exp-{stamp}-{hash(summary) % 10000:04d}"
        self.id = experience_id
        self.category = category
        self.summary = summary
        self.source_task_id = source_task_id
        self.agent_id = agent_id
        self.evidence = evidence
        # Copy so later mutation of the caller's list cannot leak in.
        self.tags = list(tags) if tags else []
        self.confidence = confidence
        self.created_at = created_at or now.isoformat()

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serialisable dict; inverse of :meth:`from_dict`."""
        return {
            "id": self.id,
            "category": self.category,
            "summary": self.summary,
            "source_task_id": self.source_task_id,
            "agent_id": self.agent_id,
            "evidence": self.evidence,
            # Hand out a copy so callers cannot mutate internal state.
            "tags": list(self.tags),
            "confidence": self.confidence,
            "created_at": self.created_at,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> Experience:
        """Build an experience from a dict produced by :meth:`to_dict`.

        Unknown keys are ignored so that records carrying extra fields still
        load instead of raising ``TypeError``.

        Raises:
            TypeError: if ``category`` or ``summary`` is missing.
        """
        kwargs = {key: data[key] for key in cls._INIT_FIELDS if key in data}
        return cls(experience_id=data.get("id"), **kwargs)
class ExperienceStore:
    """In-memory experience index with optional JSONL persistence.

    When ``store_path`` is given, every mutation rewrites the whole file,
    one JSON object per line, encoded as UTF-8. Without a path the store is
    purely in-memory.
    """

    def __init__(self, store_path: Optional[Path] = None):
        """Create the store and load existing records if the file exists."""
        self.store_path = store_path
        self._experiences: Dict[str, Experience] = {}
        if store_path and store_path.exists():
            self._load()

    def _load(self):
        """Populate the index from ``store_path``, skipping bad lines."""
        if not self.store_path:
            return
        # Explicit UTF-8: records are written with ensure_ascii=False, so
        # relying on the locale encoding breaks on non-UTF-8 platforms.
        text = self.store_path.read_text(encoding="utf-8")
        # Split on "\n" only. str.splitlines() would also split on
        # characters such as U+2028, which may appear unescaped inside a
        # JSON string written with ensure_ascii=False.
        for raw_line in text.split("\n"):
            line = raw_line.strip()
            if not line:
                continue
            try:
                exp = Experience.from_dict(json.loads(line))
            except Exception:
                # Best effort: one corrupt record must not block the rest.
                # NOTE: skipped lines are not kept, so the next _save()
                # rewrites the file without them.
                logger.warning("Failed to parse experience: %s", line[:80])
                continue
            self._experiences[exp.id] = exp

    def _save(self):
        """Rewrite ``store_path`` with the current index (no-op if unset)."""
        if not self.store_path:
            return
        # A fresh store may point into a directory that does not exist yet.
        self.store_path.parent.mkdir(parents=True, exist_ok=True)
        payload = "".join(
            json.dumps(exp.to_dict(), ensure_ascii=False) + "\n"
            for exp in self._experiences.values()
        )
        self.store_path.write_text(payload, encoding="utf-8")

    def add(self, experience: Experience) -> str:
        """Insert or overwrite ``experience`` by id, persist, return its id."""
        self._experiences[experience.id] = experience
        self._save()
        return experience.id

    def get(self, experience_id: str) -> Optional[Experience]:
        """Return the experience with this id, or ``None``."""
        return self._experiences.get(experience_id)

    def list_all(self) -> List[Experience]:
        """Return all experiences in insertion order."""
        return list(self._experiences.values())

    def search(
        self,
        category: Optional[str] = None,
        tags: Optional[List[str]] = None,
        query: Optional[str] = None,
        limit: int = 10,
    ) -> List[Experience]:
        """Return up to ``limit`` experiences matching every given filter.

        Args:
            category: Exact category to match; falsy disables the filter.
            tags: Matches when the experience carries at least one of these
                tags; falsy disables the filter.
            query: Case-insensitive substring looked up in ``summary`` and
                ``evidence``; falsy disables the filter.
            limit: Maximum number of results, taken in insertion order.
        """
        needle = query.lower() if query else None

        def matches(exp: Experience) -> bool:
            if category and exp.category != category:
                return False
            if tags and not any(tag in exp.tags for tag in tags):
                return False
            if needle is not None:
                in_summary = needle in exp.summary.lower()
                in_evidence = needle in (exp.evidence or "").lower()
                if not (in_summary or in_evidence):
                    return False
            return True

        hits = [exp for exp in self._experiences.values() if matches(exp)]
        return hits[:limit]

    def delete(self, experience_id: str) -> bool:
        """Remove an experience; return ``True`` if it existed."""
        if experience_id not in self._experiences:
            return False
        del self._experiences[experience_id]
        self._save()
        return True

    def count(self) -> int:
        """Return the number of stored experiences."""
        return len(self._experiences)
class ExperienceDistiller:
    """Extract experiences from finished tasks and recommend them later.

    Experiences come from two sources: a structured review result (failed
    steps and suggestions) and free-text task outputs, which are classified
    paragraph by paragraph with the keyword patterns in ``PATTERNS``.
    """

    # Category -> regexes searched in the lower-cased text. Categories are
    # tried in this order and the first one with a matching regex wins.
    PATTERNS = {
        "pitfall": [
            r"bug|error|fail|broken|crash|wrong|incorrect",
            r"doesn'?t work|not working|issue",
            r"forgot to|missed|overlooked",
            r"陷阱|踩坑|错误|失败",
        ],
        "best_practice": [
            r"should|recommend|best practice|always|never",
            r"tip|trick|pro tip|remember",
            r"最佳实践|建议|推荐|必须",
        ],
        "environment": [
            r"install|configure|setup|deploy|version|compatibility",
            r"环境|配置|安装|部署|版本",
        ],
    }

    # Category given to a review suggestion that matches no pattern, so that
    # it is never stored with ``category=None``. A suggestion is advice,
    # hence "best_practice".
    DEFAULT_SUGGESTION_CATEGORY = "best_practice"

    def __init__(self, store: Optional[ExperienceStore] = None):
        """Use ``store`` if given, otherwise a fresh in-memory store."""
        self.store = store if store is not None else ExperienceStore()

    def distill_from_task(
        self,
        task_id: str,
        task_title: str,
        task_type: Optional[str] = None,
        outputs: Optional[List[Dict[str, Any]]] = None,
        review_result: Optional[Dict[str, Any]] = None,
        agent_id: Optional[str] = None,
    ) -> List[Experience]:
        """Distil experiences from a task's review result and outputs.

        Args:
            task_id: Id recorded as ``source_task_id`` on each experience.
            task_title: Title used as a prefix for review-failure summaries.
            task_type: Added as a tag; ``"unknown"`` when omitted.
            outputs: Dicts with a ``content`` string, or a ``path`` to read
                the content from when ``content`` is empty.
            review_result: Dict with an overall ``verdict`` and a ``results``
                list of per-step dicts (``verdict``, ``step``, ``details``,
                ``suggestions``).
            agent_id: Id recorded as ``agent_id`` on each experience.

        Returns:
            Every experience created, in creation order. Each one has
            already been added to the store.
        """
        experiences: List[Experience] = []
        if review_result:
            experiences.extend(
                self._distill_from_review(
                    review_result, task_id, task_title, task_type, agent_id
                )
            )
        if outputs:
            experiences.extend(
                self._distill_from_outputs(
                    outputs, task_id, task_title, task_type, agent_id
                )
            )
        return experiences

    def _distill_from_review(
        self,
        review_result: Dict[str, Any],
        task_id: str,
        task_title: str,
        task_type: Optional[str],
        agent_id: Optional[str],
    ) -> List[Experience]:
        """Turn failed review steps and review suggestions into experiences."""
        found: List[Experience] = []
        # ``or []`` tolerates an explicit ``"results": None``.
        steps = review_result.get("results") or []
        type_tag = task_type or "unknown"

        # Failed steps of a failed review become pitfalls.
        if review_result.get("verdict") == "fail":
            for step in steps:
                if step.get("verdict") != "fail":
                    continue
                exp = Experience(
                    category="pitfall",
                    summary=f"[{task_title}] {step.get('step', '?')}: {step.get('details', '')}",
                    source_task_id=task_id,
                    agent_id=agent_id,
                    tags=[type_tag, "review-failure"],
                )
                self.store.add(exp)
                found.append(exp)

        # Suggestions are collected whatever the overall verdict was.
        for step in steps:
            for suggestion in step.get("suggestions") or []:
                category = (
                    self._classify_text(suggestion)
                    or self.DEFAULT_SUGGESTION_CATEGORY
                )
                exp = Experience(
                    category=category,
                    summary=suggestion,
                    source_task_id=task_id,
                    agent_id=agent_id,
                    tags=[type_tag],
                )
                self.store.add(exp)
                found.append(exp)
        return found

    def _distill_from_outputs(
        self,
        outputs: List[Dict[str, Any]],
        task_id: str,
        task_title: str,
        task_type: Optional[str],
        agent_id: Optional[str],
    ) -> List[Experience]:
        """Extract experiences from each output's text (inline or on disk)."""
        found: List[Experience] = []
        for out in outputs:
            content = out.get("content", "")
            if not content and out.get("path"):
                try:
                    content = Path(out["path"]).read_text(encoding="utf-8")
                except (OSError, ValueError, TypeError) as exc:
                    # Still best effort, but leave a trace instead of
                    # silently ignoring an unreadable output file.
                    logger.warning(
                        "Failed to read task output %s: %s", out["path"], exc
                    )
                    continue
            if not content:
                continue
            extracted = self._extract_from_text(
                content, task_id, task_title, task_type, agent_id
            )
            for exp in extracted:
                self.store.add(exp)
            found.extend(extracted)
        return found

    @staticmethod
    def _split_paragraphs(text: str) -> List[str]:
        """Split ``text`` on blank lines and drop empty paragraphs.

        A separator line may contain whitespace, which also makes CRLF
        line endings split correctly.
        """
        return [
            chunk.strip()
            for chunk in re.split(r"\n\s*\n", text)
            if chunk.strip()
        ]

    def _extract_from_text(
        self,
        text: str,
        task_id: str,
        task_title: str,
        task_type: Optional[str],
        agent_id: Optional[str],
    ) -> List[Experience]:
        """Build one experience per paragraph that matches a pattern.

        Paragraphs matching no pattern are skipped. The summary keeps the
        first 200 characters of the paragraph and the evidence the first
        500. The returned experiences are NOT added to the store here.
        """
        experiences: List[Experience] = []
        for para in self._split_paragraphs(text):
            category = self._classify_text(para)
            if not category:
                continue
            experiences.append(
                Experience(
                    category=category,
                    summary=para[:200],
                    source_task_id=task_id,
                    agent_id=agent_id,
                    tags=[task_type or "unknown", category],
                    evidence=para[:500],
                )
            )
        return experiences

    def _classify_text(self, text: str) -> Optional[str]:
        """Return the first category whose patterns match, else ``None``."""
        if not text:
            return None
        lowered = text.lower()
        for category, patterns in self.PATTERNS.items():
            if any(re.search(pattern, lowered) for pattern in patterns):
                return category
        return None

    def recommend(
        self,
        task_type: Optional[str] = None,
        tags: Optional[List[str]] = None,
        query: Optional[str] = None,
        limit: int = 5,
    ) -> List[Experience]:
        """Return the ``limit`` most confident experiences that match.

        Args:
            task_type: Treated as one more tag to match.
            tags: An experience matches when it carries any of these tags.
            query: Substring filter forwarded to the store.
            limit: Maximum number of recommendations.
        """
        wanted_tags = list(tags or [])
        if task_type:
            wanted_tags.append(task_type)
        # Fetch every match before ranking. Passing ``limit`` to the store
        # would cut the list in insertion order first and could drop the
        # most confident entries.
        candidates = self.store.search(
            tags=wanted_tags or None,
            query=query,
            limit=self.store.count(),
        )
        candidates.sort(key=lambda exp: exp.confidence, reverse=True)
        return candidates[:limit]