Files
sanguo_moziplus_v2/src/daemon/review.py
T
2026-05-17 06:07:01 +08:00

343 lines
10 KiB
Python

"""Review Pipeline — 产出验证流水线
验证链:产出物存在性 → 格式合规 → 内容质量 → 评分
Guardrail 门控:低风险免审、高风险强制审、极高风险双审
"""
from __future__ import annotations
import json
import logging
import re
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
from src.blackboard.models import ObservationType, Task
from src.blackboard.operations import Blackboard
from src.blackboard.queries import Queries
logger = logging.getLogger("moziplus-v2.review")
class ReviewVerdict(str, Enum):
PASS = "pass"
FAIL = "fail"
NEEDS_REVIEW = "needs_review"
class ReviewResult:
"""单步验证结果"""
def __init__(
self,
step: str,
verdict: ReviewVerdict,
score: float = 0.0,
details: str = "",
suggestions: List[str] = None,
):
self.step = step
self.verdict = verdict
self.score = score
self.details = details
self.suggestions = suggestions or []
def to_dict(self) -> Dict[str, Any]:
return {
"step": self.step,
"verdict": self.verdict.value,
"score": self.score,
"details": self.details,
"suggestions": self.suggestions,
}
class ReviewPipeline:
"""产出验证流水线"""
def __init__(
self,
bb: Optional[Blackboard] = None,
custom_checks: Optional[Dict[str, Callable]] = None,
output_dir: Optional[Path] = None,
):
self.bb = bb
self.custom_checks = custom_checks or {}
self.output_dir = output_dir
def run_review(
self,
task: Task,
outputs: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
"""运行完整验证流水线
Returns:
{"verdict": str, "score": float, "results": [...],
"gate": str, "needs_human": bool}
"""
results: List[ReviewResult] = []
# Step 1: 产出物存在性
r1 = self._check_existence(task, outputs)
results.append(r1)
# Step 2: 格式合规
r2 = self._check_format(task, outputs)
results.append(r2)
# Step 3: 内容质量(自定义检查)
r3 = self._check_quality(task, outputs)
results.append(r3)
# Step 4: Guardrail 门控
gate = self._determine_gate(task, results)
needs_human = gate in ("mandatory", "dual")
# 综合评分
scores = [r.score for r in results if r.score > 0]
avg_score = sum(scores) / len(scores) if scores else 0.0
# 综合判定
any_fail = any(r.verdict == ReviewVerdict.FAIL for r in results)
final_verdict = ReviewVerdict.FAIL if any_fail else (
ReviewVerdict.NEEDS_REVIEW if needs_human else ReviewVerdict.PASS
)
# 记录 observation
if self.bb:
self._record_observation(task, final_verdict, avg_score, results)
return {
"verdict": final_verdict.value,
"score": round(avg_score, 2),
"results": [r.to_dict() for r in results],
"gate": gate,
"needs_human": needs_human,
}
def _check_existence(
self, task: Task, outputs: Optional[List[Dict]]
) -> ReviewResult:
"""Step 1: 产出物存在性"""
if not outputs:
return ReviewResult(
"existence", ReviewVerdict.FAIL, 0.0,
"No outputs provided",
)
missing = []
for out in outputs:
if out.get("path"):
p = Path(out["path"])
if not p.exists():
missing.append(str(p))
if missing:
return ReviewResult(
"existence", ReviewVerdict.FAIL, 0.0,
f"Missing files: {', '.join(missing)}",
)
return ReviewResult(
"existence", ReviewVerdict.PASS, 1.0,
f"All {len(outputs)} output(s) verified",
)
def _check_format(
self, task: Task, outputs: Optional[List[Dict]]
) -> ReviewResult:
"""Step 2: 格式合规"""
if not outputs:
return ReviewResult("format", ReviewVerdict.FAIL, 0.0, "No outputs")
issues = []
for out in outputs:
# output.md 必须存在且非空
if out.get("type") == "markdown" or out.get("path", "").endswith(".md"):
content = out.get("content", "")
if not content and out.get("path"):
try:
content = Path(out["path"]).read_text()
except Exception:
pass
if not content or len(content.strip()) < 10:
issues.append(f"Output too short: {out.get('path', '?')}")
# 结论 JSON 必须有效
if out.get("type") == "json" or out.get("path", "").endswith(".json"):
content = out.get("content", "")
if not content and out.get("path"):
try:
content = Path(out["path"]).read_text()
except Exception:
pass
try:
data = json.loads(content)
if not isinstance(data, dict):
issues.append(f"JSON not a dict: {out.get('path', '?')}")
except (json.JSONDecodeError, TypeError):
issues.append(f"Invalid JSON: {out.get('path', '?')}")
if issues:
return ReviewResult(
"format", ReviewVerdict.FAIL, 0.5,
"; ".join(issues),
)
return ReviewResult("format", ReviewVerdict.PASS, 1.0, "Format OK")
def _check_quality(
self, task: Task, outputs: Optional[List[Dict]]
) -> ReviewResult:
"""Step 3: 内容质量(自定义检查)"""
if not outputs:
return ReviewResult("quality", ReviewVerdict.FAIL, 0.0, "No outputs")
suggestions = []
total_score = 0.0
for check_name, check_fn in self.custom_checks.items():
try:
result = check_fn(task, outputs)
if isinstance(result, dict):
total_score += result.get("score", 0.0)
if result.get("suggestions"):
suggestions.extend(result["suggestions"])
except Exception as e:
suggestions.append(f"Check '{check_name}' error: {e}")
if self.custom_checks:
avg = total_score / len(self.custom_checks)
else:
avg = 1.0 # 无自定义检查默认通过
verdict = ReviewVerdict.PASS if avg >= 0.6 else ReviewVerdict.FAIL
return ReviewResult("quality", verdict, round(avg, 2), suggestions=suggestions)
def _determine_gate(
self, task: Task, results: List[ReviewResult]
) -> str:
"""Step 4: Guardrail 门控
Returns:
"auto" / "optional" / "mandatory" / "dual"
"""
risk = (task.risk_level or "").lower()
# 任何步骤 FAIL → mandatory
any_fail = any(r.verdict == ReviewVerdict.FAIL for r in results)
if any_fail:
return "mandatory"
# 极高风险 → dual(双审)
if risk == "critical":
return "dual"
# 高风险 → mandatory
if risk == "high":
return "mandatory"
# 中风险 → optional
if risk == "medium":
return "optional"
# 低风险 → auto(免审)
return "auto"
def _record_observation(
self,
task: Task,
verdict: ReviewVerdict,
score: float,
results: List[ReviewResult],
) -> None:
"""记录审查 observation"""
try:
detail = json.dumps({
"verdict": verdict.value,
"score": score,
"steps": [r.to_dict() for r in results],
})
self.bb.add_observation(
task_id=task.id,
agent="review-pipeline",
obs_type=ObservationType.VERIFICATION.value,
content=f"Review: {verdict.value} (score={score})",
detail=detail,
)
except Exception:
logger.exception("Failed to record review observation")
class RebuttalManager:
"""反驳权管理
Agent 对审查结论不服时,可发起反驳:
1. 提交反驳理由
2. 升级给更高权限审查者
3. 最多 2 轮
"""
MAX_ROUNDS = 2
def __init__(self, bb: Optional[Blackboard] = None):
self.bb = bb
def submit_rebuttal(
self,
task_id: str,
agent_id: str,
reason: str,
evidence: Optional[str] = None,
) -> Dict[str, Any]:
"""提交反驳"""
# 检查轮次
existing = self._get_rebuttal_count(task_id)
if existing >= self.MAX_ROUNDS:
return {
"status": "rejected",
"reason": f"Max rebuttal rounds ({self.MAX_ROUNDS}) reached",
}
round_num = existing + 1
# 记录 observation
if self.bb:
try:
self.bb.add_observation(
task_id=task_id,
agent=agent_id,
obs_type="rebuttal",
content=f"Rebuttal round {round_num}: {reason[:200]}",
detail=json.dumps({
"round": round_num,
"reason": reason,
"evidence": evidence,
}),
)
except Exception:
logger.exception("Failed to record rebuttal")
# 决定升级目标
escalation_target = "simayi-challenger" if round_num == 1 else "pangtong-fujunshi"
return {
"status": "accepted",
"round": round_num,
"escalation_target": escalation_target,
"max_rounds": self.MAX_ROUNDS,
}
def _get_rebuttal_count(self, task_id: str) -> int:
"""获取已有反驳轮次"""
if not self.bb:
return 0
try:
observations = self.bb.list_observations(task_id=task_id)
return sum(1 for o in observations if o.get("obs_type") == "rebuttal")
except Exception:
return 0