Files
sanguo_moziplus_v2/src/daemon/review.py
T
cfdaily d58e38d58f
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
fix(lint): 修复 PR #14 引入的 lint 回退 (119→0)
PR #14 从旧分支复制文件导致回退了 PR #10 的 lint 修复。
修复内容:
- autoflake 移除未使用导入/变量
- autopep8 修复缩进/空格
- 手动修复 F821(pathlib→Path), F541(f-string), F841(未使用变量)
- 所有修复均通过 flake8 --max-line-length=120 --extend-ignore=E501 检查 (0 errors)
2026-06-09 23:53:29 +08:00

339 lines
10 KiB
Python

"""Review Pipeline — 产出验证流水线
验证链:产出物存在性 → 格式合规 → 内容质量 → 评分
Guardrail 门控:低风险免审、高风险强制审、极高风险双审
"""
from __future__ import annotations
import json
import logging
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
from src.blackboard.models import Task
from src.blackboard.operations import Blackboard
logger = logging.getLogger("moziplus-v2.review")
class ReviewVerdict(str, Enum):
    """Verdict attached to a single review step or to the whole review.

    The enum mixes in ``str``, so every member compares equal to its plain
    string value.
    """

    # The step (or the whole review) succeeded.
    PASS = "pass"
    # The step (or at least one step of the review) failed.
    FAIL = "fail"
    # Nothing failed, but a reviewer still has to look at the result.
    NEEDS_REVIEW = "needs_review"
class ReviewResult:
    """Result of a single verification step.

    Attributes:
        step: Name of the step that produced this result.
        verdict: Verdict of the step.
        score: Numeric score of the step (defaults to 0.0).
        details: Free-form description of what was found.
        suggestions: Improvement hints collected by the step.
    """

    def __init__(
        self,
        step: str,
        verdict: ReviewVerdict,
        score: float = 0.0,
        details: str = "",
        suggestions: Optional[List[str]] = None,
    ):
        """Store the step outcome.

        Args:
            step: Name of the verification step.
            verdict: Verdict of the step.
            score: Numeric score of the step.
            details: Human-readable explanation.
            suggestions: Optional hints; ``None`` or an empty sequence
                yields an empty list.
        """
        self.step = step
        self.verdict = verdict
        self.score = score
        self.details = details
        # Copy the sequence so that later mutation of the caller's list
        # cannot silently change an already-built result.
        self.suggestions = list(suggestions) if suggestions else []

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(step={self.step!r}, "
            f"verdict={self.verdict!r}, score={self.score!r}, "
            f"details={self.details!r}, suggestions={self.suggestions!r})"
        )

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view of the result.

        The verdict is emitted as its ``.value`` and the suggestions as a
        fresh list, so the returned dict shares no mutable state with the
        instance.
        """
        return {
            "step": self.step,
            "verdict": self.verdict.value,
            "score": self.score,
            "details": self.details,
            "suggestions": list(self.suggestions),
        }
class ReviewPipeline:
    """Output verification pipeline.

    Runs three checks over a task's outputs (existence, format, quality),
    derives a guardrail gate from the task's risk level and the step
    verdicts, and, when a blackboard is configured, records the outcome as
    an observation.
    """

    # Markdown content shorter than this (after stripping) is "too short".
    _MIN_MARKDOWN_CHARS = 10
    # Minimum average custom-check score for the quality step to pass.
    _QUALITY_PASS_THRESHOLD = 0.6

    def __init__(
        self,
        bb: Optional[Blackboard] = None,
        custom_checks: Optional[Dict[str, Callable]] = None,
        output_dir: Optional[Path] = None,
    ):
        """Configure the pipeline.

        Args:
            bb: Blackboard used to record review observations; when falsy,
                nothing is recorded.
            custom_checks: Mapping of check name to a callable invoked as
                ``check(task, outputs)`` by the quality step.
            output_dir: Stored on the instance. NOTE(review): no method of
                this class reads it; confirm whether relative output paths
                were meant to be resolved against it.
        """
        self.bb = bb
        self.custom_checks = custom_checks or {}
        self.output_dir = output_dir

    def run_review(
        self,
        task: Task,
        outputs: Optional[List[Dict[str, Any]]] = None,
    ) -> Dict[str, Any]:
        """Run the complete verification pipeline.

        Args:
            task: Task under review; ``risk_level`` is read for gating and
                ``id`` when an observation is recorded.
            outputs: Output descriptors (dicts with optional ``path``,
                ``type`` and ``content`` keys).

        Returns:
            ``{"verdict": str, "score": float, "results": [...],
            "gate": str, "needs_human": bool}``
        """
        # Steps 1-3: existence, format compliance, content quality.
        results: List[ReviewResult] = [
            self._check_existence(task, outputs),
            self._check_format(task, outputs),
            self._check_quality(task, outputs),
        ]

        # Step 4: guardrail gate.
        gate = self._determine_gate(task, results)
        needs_human = gate in ("mandatory", "dual")

        # Only strictly positive step scores enter the average, so a step
        # that scored 0.0 does not pull it down.
        # NOTE(review): confirm that excluding zero scores is intended.
        scores = [r.score for r in results if r.score > 0]
        avg_score = sum(scores) / len(scores) if scores else 0.0

        # A failed step always wins; otherwise the gate decides.
        if any(r.verdict == ReviewVerdict.FAIL for r in results):
            final_verdict = ReviewVerdict.FAIL
        elif needs_human:
            final_verdict = ReviewVerdict.NEEDS_REVIEW
        else:
            final_verdict = ReviewVerdict.PASS

        if self.bb:
            self._record_observation(task, final_verdict, avg_score, results)

        return {
            "verdict": final_verdict.value,
            "score": round(avg_score, 2),
            "results": [r.to_dict() for r in results],
            "gate": gate,
            "needs_human": needs_human,
        }

    def _check_existence(
        self, task: Task, outputs: Optional[List[Dict]]
    ) -> ReviewResult:
        """Step 1: verify that every output with a path exists on disk.

        Outputs without a (truthy) ``path`` are not checked. An empty or
        missing ``outputs`` list fails the step.
        """
        if not outputs:
            return ReviewResult(
                "existence", ReviewVerdict.FAIL, 0.0,
                "No outputs provided",
            )

        candidates = (Path(out["path"]) for out in outputs if out.get("path"))
        missing = [str(p) for p in candidates if not p.exists()]
        if missing:
            return ReviewResult(
                "existence", ReviewVerdict.FAIL, 0.0,
                f"Missing files: {', '.join(missing)}",
            )
        return ReviewResult(
            "existence", ReviewVerdict.PASS, 1.0,
            f"All {len(outputs)} output(s) verified",
        )

    @staticmethod
    def _load_content(out: Dict) -> Any:
        """Return the output's inline content, falling back to its file.

        The file is only read when there is no inline content and a path is
        given. A read failure is logged at debug level and the (falsy)
        inline content is returned, so the caller reports the output as
        empty or invalid instead of crashing.
        """
        content = out.get("content", "")
        path = out.get("path")
        if content or not path:
            return content
        try:
            # Explicit UTF-8 so the result does not depend on the locale.
            return Path(path).read_text(encoding="utf-8")
        except (OSError, ValueError, TypeError):
            logger.debug("Could not read output file %s", path, exc_info=True)
            return content

    def _check_format(
        self, task: Task, outputs: Optional[List[Dict]]
    ) -> ReviewResult:
        """Step 2: format compliance.

        Markdown outputs (``type == "markdown"`` or a ``.md`` path) must
        have at least ``_MIN_MARKDOWN_CHARS`` non-whitespace-trimmed
        characters. JSON outputs (``type == "json"`` or a ``.json`` path)
        must parse to a dict. Other outputs are not inspected.
        """
        if not outputs:
            return ReviewResult(
                "format", ReviewVerdict.FAIL, 0.0, "No outputs")

        issues: List[str] = []
        for out in outputs:
            raw_path = out.get("path")
            # ``dict.get`` only applies its default to a *missing* key, so a
            # present-but-None path (or a Path object) must be normalised
            # before calling str methods on it.
            path_text = str(raw_path) if raw_path else ""
            label = "?" if raw_path is None else raw_path
            kind = out.get("type")

            if kind == "markdown" or path_text.endswith(".md"):
                content = self._load_content(out)
                if not content or len(content.strip()) < self._MIN_MARKDOWN_CHARS:
                    issues.append(f"Output too short: {label}")

            if kind == "json" or path_text.endswith(".json"):
                content = self._load_content(out)
                try:
                    data = json.loads(content)
                except (ValueError, TypeError):
                    # ValueError covers JSONDecodeError and undecodable bytes.
                    issues.append(f"Invalid JSON: {label}")
                else:
                    if not isinstance(data, dict):
                        issues.append(f"JSON not a dict: {label}")

        if issues:
            return ReviewResult(
                "format", ReviewVerdict.FAIL, 0.5,
                "; ".join(issues),
            )
        return ReviewResult("format", ReviewVerdict.PASS, 1.0, "Format OK")

    def _check_quality(
        self, task: Task, outputs: Optional[List[Dict]]
    ) -> ReviewResult:
        """Step 3: content quality via the configured custom checks.

        Each check is called as ``check(task, outputs)``. A dict result
        contributes its ``score`` (default 0.0) and any ``suggestions``;
        a non-dict result contributes nothing. A check that raises adds an
        error note to the suggestions. The step score is the total divided
        by the number of configured checks, or 1.0 when there are none.
        """
        if not outputs:
            return ReviewResult(
                "quality", ReviewVerdict.FAIL, 0.0, "No outputs")

        suggestions: List[str] = []
        total_score = 0.0
        for check_name, check_fn in self.custom_checks.items():
            try:
                result = check_fn(task, outputs)
                if isinstance(result, dict):
                    total_score += result.get("score", 0.0)
                    if result.get("suggestions"):
                        suggestions.extend(result["suggestions"])
            except Exception as e:
                # Custom checks are caller-supplied; one failing check must
                # not abort the whole review.
                suggestions.append(f"Check '{check_name}' error: {e}")

        if self.custom_checks:
            avg = total_score / len(self.custom_checks)
        else:
            avg = 1.0  # No custom checks configured: pass by default.

        if avg >= self._QUALITY_PASS_THRESHOLD:
            verdict = ReviewVerdict.PASS
        else:
            verdict = ReviewVerdict.FAIL
        return ReviewResult(
            "quality", verdict, round(avg, 2), suggestions=suggestions)

    def _determine_gate(
        self, task: Task, results: List[ReviewResult]
    ) -> str:
        """Step 4: guardrail gate.

        Returns:
            "auto" / "optional" / "mandatory" / "dual"
        """
        risk = (task.risk_level or "").lower()

        # A failed step takes precedence over the risk level, so even a
        # critical task with a failure is gated "mandatory", not "dual".
        if any(r.verdict == ReviewVerdict.FAIL for r in results):
            return "mandatory"
        # Critical risk: dual review.
        if risk == "critical":
            return "dual"
        # High risk: mandatory review.
        if risk == "high":
            return "mandatory"
        # Medium or standard risk: optional review.
        if risk in ("medium", "standard"):
            return "optional"
        # Anything else (including low or unset): no review required.
        return "auto"

    def _record_observation(
        self,
        task: Task,
        verdict: ReviewVerdict,
        score: float,
        results: List[ReviewResult],
    ) -> None:
        """Record the review outcome as a blackboard observation.

        Failures are logged and swallowed so that a blackboard problem
        never prevents ``run_review`` from returning its result.
        """
        try:
            detail = json.dumps({
                "verdict": verdict.value,
                "score": score,
                "steps": [r.to_dict() for r in results],
            })
            self.bb.add_observation(
                task_id=task.id,
                observer="review-pipeline",
                body=f"Review: {verdict.value} (score={score})\n{detail}",
            )
        except Exception:
            logger.exception("Failed to record review observation")
class RebuttalManager:
    """Manage an agent's right to rebut a review conclusion.

    An agent that disagrees with a review can:
    1. submit a rebuttal with its reasons,
    2. have it escalated to a higher-authority reviewer,
    3. for at most ``MAX_ROUNDS`` rounds.

    Rounds are counted from blackboard observations, so without a
    blackboard every submission is treated as round 1.
    """

    MAX_ROUNDS = 2
    # Prefix written into every rebuttal observation and searched for when
    # counting rounds; kept in one place so writer and counter cannot drift.
    _ROUND_MARKER = "Rebuttal round"

    def __init__(self, bb: Optional[Blackboard] = None):
        """Store the optional blackboard used to persist and count rounds."""
        self.bb = bb

    def submit_rebuttal(
        self,
        task_id: str,
        agent_id: str,
        reason: str,
        evidence: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Submit a rebuttal for a task.

        Args:
            task_id: Task whose review is being contested.
            agent_id: Agent submitting the rebuttal; recorded as observer.
            reason: Rebuttal text; only its first 200 characters are stored.
            evidence: Accepted for interface compatibility; this method
                does not use it.

        Returns:
            ``{"status": "rejected", "reason": ...}`` once ``MAX_ROUNDS``
            is reached, otherwise ``{"status": "accepted", "round": int,
            "escalation_target": str, "max_rounds": int}``.
        """
        existing = self._get_rebuttal_count(task_id)
        if existing >= self.MAX_ROUNDS:
            return {
                "status": "rejected",
                "reason": f"Max rebuttal rounds ({self.MAX_ROUNDS}) reached",
            }

        round_num = existing + 1

        if self.bb:
            try:
                self.bb.add_observation(
                    task_id=task_id,
                    observer=agent_id,
                    body=f"{self._ROUND_MARKER} {round_num}: {reason[:200]}",
                )
            except Exception:
                # Best effort: the rebuttal is still accepted even when it
                # could not be persisted.
                logger.exception("Failed to record rebuttal")

        # First round goes to the challenger, any later round one level up.
        if round_num == 1:
            escalation_target = "simayi-challenger"
        else:
            escalation_target = "pangtong-fujunshi"

        return {
            "status": "accepted",
            "round": round_num,
            "escalation_target": escalation_target,
            "max_rounds": self.MAX_ROUNDS,
        }

    def _get_rebuttal_count(self, task_id: str) -> int:
        """Return how many rebuttal rounds are already recorded for a task.

        Counts the task's observations whose body contains the round
        marker. Returns 0 when there is no blackboard or when the lookup
        fails; a failure is logged because it effectively resets the round
        limit for this call.
        """
        if not self.bb:
            return 0
        try:
            observations = self.bb.get_observations(task_id=task_id)
            return sum(
                1 for o in observations
                if self._ROUND_MARKER in (o.body or "")
            )
        except Exception:
            logger.exception(
                "Failed to count rebuttal rounds for task %s", task_id)
            return 0