"""F12 Review Pipeline + F13 Guardrail + F14 Rebuttal 单元测试 按 test-plan-v2.6.md §F12-F14: - F12 T1: 验证流水线四步(P0) - F12 T2: 评分计算(P0) - F13 T1: Guardrail 门控(P0) - F14 T1: 反驳权流程(P0) - F14 T2: 最大轮次限制(P0) """ import json import pytest from pathlib import Path from unittest.mock import MagicMock from src.blackboard.models import Task from src.blackboard.operations import Blackboard from src.daemon.review import ( RebuttalManager, ReviewPipeline, ReviewResult, ReviewVerdict, ) @pytest.fixture def db_path(tmp_path): return tmp_path / "blackboard.db" @pytest.fixture def bb(db_path): return Blackboard(db_path) @pytest.fixture def pipeline(): return ReviewPipeline() @pytest.fixture def pipeline_with_bb(bb): return ReviewPipeline(bb=bb) @pytest.fixture def low_risk_task(): return Task(id="t1", title="Low Risk", status="pending", assigned_by="d", risk_level="low") @pytest.fixture def high_risk_task(): return Task(id="t2", title="High Risk", status="pending", assigned_by="d", risk_level="high") @pytest.fixture def critical_task(): return Task(id="t3", title="Critical", status="pending", assigned_by="d", risk_level="critical") @pytest.fixture def task_with_outputs(tmp_path): """有真实产出的 task""" out_md = tmp_path / "output.md" out_md.write_text("# Result\n\nThis is a valid output with enough content.") out_json = tmp_path / "result.json" out_json.write_text(json.dumps({"status": "ok", "value": 42})) return [ {"path": str(out_md), "type": "markdown"}, {"path": str(out_json), "type": "json"}, ] # --------------------------------------------------------------------------- # F12 T1: 验证流水线四步 # --------------------------------------------------------------------------- class TestReviewPipeline: def test_no_outputs_fails(self, pipeline, low_risk_task): result = pipeline.run_review(low_risk_task, outputs=None) assert result["verdict"] == "fail" assert result["score"] == 0.0 def test_empty_outputs_fails(self, pipeline, low_risk_task): result = pipeline.run_review(low_risk_task, outputs=[]) assert result["verdict"] == "fail" def test_existing_outputs_pass(self, pipeline, low_risk_task, task_with_outputs): result = pipeline.run_review(low_risk_task, outputs=task_with_outputs) assert result["verdict"] == "pass" def test_missing_file_fails(self, pipeline, low_risk_task): outputs = [{"path": "/nonexistent/file.md", "type": "markdown"}] result = pipeline.run_review(low_risk_task, outputs=outputs) assert result["verdict"] == "fail" def test_results_have_all_steps(self, pipeline, low_risk_task, task_with_outputs): result = pipeline.run_review(low_risk_task, outputs=task_with_outputs) steps = {r["step"] for r in result["results"]} assert "existence" in steps assert "format" in steps assert "quality" in steps def test_observation_recorded(self, pipeline_with_bb, low_risk_task, task_with_outputs): pipeline_with_bb.bb.create_task(low_risk_task) pipeline_with_bb.run_review(low_risk_task, outputs=task_with_outputs) obs = pipeline_with_bb.bb.get_observations(task_id=low_risk_task.id) assert any("review-pipeline" in (o.observer or "") for o in obs) # --------------------------------------------------------------------------- # F12 T2: 评分计算 # --------------------------------------------------------------------------- class TestScoring: def test_perfect_score(self, pipeline, low_risk_task, task_with_outputs): result = pipeline.run_review(low_risk_task, outputs=task_with_outputs) assert result["score"] >= 0.9 def test_custom_check_boosts_score(self, low_risk_task, task_with_outputs): def good_check(task, outputs): return {"score": 1.0, "suggestions": []} p = ReviewPipeline(custom_checks={"custom": good_check}) result = p.run_review(low_risk_task, outputs=task_with_outputs) assert result["score"] >= 0.9 def test_custom_check_lowers_score(self, low_risk_task, task_with_outputs): def bad_check(task, outputs): return {"score": 0.2, "suggestions": ["Improve X"]} p = ReviewPipeline(custom_checks={"custom": bad_check}) result = p.run_review(low_risk_task, outputs=task_with_outputs) assert result["score"] < 1.0 def test_custom_check_exception_handled(self, low_risk_task, task_with_outputs): def error_check(task, outputs): raise ValueError("test error") p = ReviewPipeline(custom_checks={"bad": error_check}) result = p.run_review(low_risk_task, outputs=task_with_outputs) assert result["verdict"] in ("pass", "fail") # --------------------------------------------------------------------------- # F13 T1: Guardrail 门控 # --------------------------------------------------------------------------- class TestGuardrail: def test_low_risk_auto(self, pipeline, low_risk_task, task_with_outputs): result = pipeline.run_review(low_risk_task, outputs=task_with_outputs) assert result["gate"] == "auto" assert result["needs_human"] is False def test_high_risk_mandatory(self, pipeline, high_risk_task, task_with_outputs): result = pipeline.run_review(high_risk_task, outputs=task_with_outputs) assert result["gate"] == "mandatory" assert result["needs_human"] is True def test_critical_dual(self, pipeline, critical_task, task_with_outputs): result = pipeline.run_review(critical_task, outputs=task_with_outputs) assert result["gate"] == "dual" assert result["needs_human"] is True def test_fail_makes_mandatory(self, pipeline, low_risk_task): """低风险但验证失败 → mandatory""" result = pipeline.run_review(low_risk_task, outputs=None) assert result["gate"] == "mandatory" def test_medium_risk_optional(self): task = Task(id="t", title="T", status="pending", assigned_by="d", risk_level="medium") p = ReviewPipeline() outputs = [{"content": "valid output here", "type": "text"}] result = p.run_review(task, outputs=outputs) assert result["gate"] == "optional" # --------------------------------------------------------------------------- # F14 T1: 反驳权流程 # --------------------------------------------------------------------------- class TestRebuttal: def test_submit_rebuttal_accepted(self, bb): task = Task(id="t1", title="T", status="pending", assigned_by="d") bb.create_task(task) rm = RebuttalManager(bb=bb) result = rm.submit_rebuttal("t1", "agent-1", "I disagree with the review") assert result["status"] == "accepted" assert result["round"] == 1 assert result["escalation_target"] == "simayi-challenger" def test_second_round_escalates_to_pangtong(self, bb): task = Task(id="t1", title="T", status="pending", assigned_by="d") bb.create_task(task) rm = RebuttalManager(bb=bb) rm.submit_rebuttal("t1", "agent-1", "Round 1") result = rm.submit_rebuttal("t1", "agent-1", "Round 2") assert result["status"] == "accepted" assert result["round"] == 2 assert result["escalation_target"] == "pangtong-fujunshi" # --------------------------------------------------------------------------- # F14 T2: 最大轮次限制 # --------------------------------------------------------------------------- class TestRebuttalLimits: def test_max_rounds_rejected(self, bb): task = Task(id="t1", title="T", status="pending", assigned_by="d") bb.create_task(task) rm = RebuttalManager(bb=bb) rm.submit_rebuttal("t1", "a", "R1") rm.submit_rebuttal("t1", "a", "R2") result = rm.submit_rebuttal("t1", "a", "R3") assert result["status"] == "rejected" assert "Max" in result["reason"] def test_rebuttal_without_bb(self): rm = RebuttalManager(bb=None) result = rm.submit_rebuttal("t1", "a", "reason") assert result["status"] == "accepted" assert result["round"] == 1 def test_rebuttal_observation_recorded(self, bb): task = Task(id="t1", title="T", status="pending", assigned_by="d") bb.create_task(task) rm = RebuttalManager(bb=bb) rm.submit_rebuttal("t1", "agent-1", "test reason", evidence="file.txt") obs = bb.get_observations(task_id="t1") rebuttals = [o for o in obs if "Rebuttal round" in (o.body or "")] assert len(rebuttals) == 1