"""Unit tests for the F8 health check.

Follows test-plan-v2.6.md §F8:
- T1: normal scenario (P0)
- T2: zombie detection (P0)
- T3: recovery scenario (P0)
- T4: independent detection across multiple projects (P1)
"""

import asyncio
import json
from pathlib import Path

import pytest

from src.blackboard.operations import Blackboard
from src.blackboard.models import Task
from src.daemon.health import HealthChecker
from src.daemon.ticker import Ticker
from src.blackboard.registry import ProjectRegistry

pytestmark = pytest.mark.unit

# Project name shared by the single-project tests and the ``db_path`` fixture.
_PROJECT = "test-proj"


def _pending_task(task_id, title):
    """Build the pending ``Task`` the tests use as a "real" (non-tick) change."""
    return Task(id=task_id, title=title, status="pending", assigned_by="d")


def _run_ticks(checker, db_path, ticks, project=_PROJECT):
    """Call ``checker.check`` once per tick number and return the last result.

    Returns ``None`` when ``ticks`` is empty.
    """
    outcome = None
    for tick in ticks:
        outcome = checker.check(project, db_path, tick_num=tick)
    return outcome


def _fetch_rows(db_path, sql):
    """Run ``sql`` on a connection obtained from ``Queries`` and return all rows.

    The connection is closed even if the query raises.
    """
    from src.blackboard.queries import Queries

    queries = Queries(db_path)
    conn = queries._conn()
    try:
        return conn.execute(sql).fetchall()
    finally:
        conn.close()


@pytest.fixture
def db_path(tmp_path):
    """Path of the blackboard database under the per-test temp directory."""
    return tmp_path / _PROJECT / "blackboard.db"


@pytest.fixture
def bb(db_path):
    """Blackboard bound to ``db_path``.

    NOTE(review): tests that never touch ``bb`` still request it, presumably
    so the database exists before ``check`` runs -- confirm.
    """
    return Blackboard(db_path)


@pytest.fixture
def checker():
    """HealthChecker with a zombie threshold of 3 ticks."""
    return HealthChecker(zombie_threshold=3)


# ---------------------------------------------------------------------------
# T1: normal scenario
# ---------------------------------------------------------------------------


class TestNormal:
    """T1: checks that must report a healthy project."""

    def test_healthy_project(self, checker, db_path, bb):
        """A real change is present -> healthy."""
        # Write one real event.
        bb.create_task(_pending_task("t1", "T"))

        outcome = checker.check(_PROJECT, db_path, tick_num=1)

        assert outcome["healthy"] is True
        assert outcome["zombie"] is False

    def test_no_zombie_alert_under_threshold(self, checker, db_path, bb):
        """stale < threshold -> no alert."""
        outcome = _run_ticks(checker, db_path, range(1, 3))

        assert outcome["healthy"] is True
        assert outcome["zombie"] is False

    def test_stale_ticks_increment(self, checker, db_path, bb):
        """Every tick without a change increments stale_ticks."""
        first = checker.check(_PROJECT, db_path, tick_num=1)
        assert first["stale_ticks"] == 1

        second = checker.check(_PROJECT, db_path, tick_num=2)
        assert second["stale_ticks"] == 2

    def test_no_db_is_healthy(self, checker, tmp_path):
        """No DB file -> healthy (no error raised)."""
        missing_db = tmp_path / "nonexistent.db"

        outcome = checker.check("no-db", missing_db, tick_num=1)

        assert outcome["healthy"] is True


# ---------------------------------------------------------------------------
# T2: zombie detection
# ---------------------------------------------------------------------------


class TestZombieDetection:
    """T2: alerts raised once the stale-tick threshold is reached."""

    def test_zombie_after_threshold(self, checker, db_path, bb):
        """Reaching the threshold -> zombie alert."""
        # `threshold` consecutive ticks with no real change.
        outcome = _run_ticks(checker, db_path, range(1, 4))

        assert outcome["zombie"] is True
        assert outcome["healthy"] is False
        assert outcome["alert_written"] is True

    def test_zombie_writes_observation(self, checker, db_path, bb):
        """A zombie alert is written to the observations table."""
        _run_ticks(checker, db_path, range(1, 4))

        # Query observations (excluding the daemon_tick entries in events).
        rows = _fetch_rows(
            db_path,
            "SELECT * FROM observations WHERE observer='daemon'",
        )

        assert len(rows) >= 1
        payload = json.loads(rows[0]["body"])
        assert payload["type"] == "zombie_detected"
        assert payload["stale_ticks"] == 3

    def test_zombie_writes_event(self, checker, db_path, bb):
        """A zombie alert is written to the events table."""
        _run_ticks(checker, db_path, range(1, 4))

        from src.blackboard.queries import Queries

        queries = Queries(db_path)
        recent = queries.recent_events(limit=20)
        zombie_events = [
            event
            for event in recent
            if event["event_type"] == "agent_zombie_detected"
        ]

        assert len(zombie_events) >= 1

    def test_no_duplicate_alert(self, checker, db_path, bb):
        """Once alerted, the alert is not repeated."""
        outcome = _run_ticks(checker, db_path, range(1, 6))

        # The alert fires only when the threshold is first reached;
        # later ticks do not repeat it.
        assert outcome["alert_written"] is False
        assert outcome["zombie"] is True

    def test_custom_threshold(self, db_path, bb):
        """Custom threshold."""
        checker = HealthChecker(zombie_threshold=5)

        below = _run_ticks(checker, db_path, range(1, 5))
        assert below["healthy"] is True

        at_threshold = checker.check(_PROJECT, db_path, tick_num=5)
        assert at_threshold["zombie"] is True


# ---------------------------------------------------------------------------
# T3: recovery scenario
# ---------------------------------------------------------------------------


class TestRecovery:
    """T3: behaviour after a real change follows a zombie alert."""

    def test_recovery_after_real_event(self, checker, db_path, bb):
        """A real change after the zombie state -> alert is cleared."""
        # Trigger the zombie state.
        _run_ticks(checker, db_path, range(1, 4))
        # Write a real event (not a daemon_tick).
        bb.create_task(_pending_task("t1", "T"))

        # The next check.
        outcome = checker.check(_PROJECT, db_path, tick_num=4)

        assert outcome["healthy"] is True
        assert outcome["resolved"] is True

    def test_recovery_resets_stale(self, checker, db_path, bb):
        """stale_ticks returns to zero after recovery."""
        _run_ticks(checker, db_path, range(1, 4))
        bb.create_task(_pending_task("t1", "T"))

        outcome = checker.check(_PROJECT, db_path, tick_num=4)

        assert outcome["stale_ticks"] == 0

    def test_recovery_then_zombie_again(self, checker, db_path, bb):
        """The project becomes a zombie again after recovering."""
        # First zombie state.
        _run_ticks(checker, db_path, range(1, 4))

        # Recovery.
        bb.create_task(_pending_task("t1", "T"))
        checker.check(_PROJECT, db_path, tick_num=4)

        # Zombie again.
        outcome = _run_ticks(checker, db_path, range(5, 8))
        assert outcome["zombie"] is True

    def test_recovery_writes_info_observation(self, checker, db_path, bb):
        """Recovery writes an info observation."""
        _run_ticks(checker, db_path, range(1, 4))
        bb.create_task(_pending_task("t1", "T"))
        checker.check(_PROJECT, db_path, tick_num=4)

        rows = _fetch_rows(
            db_path,
            "SELECT * FROM observations WHERE observer='daemon' "
            "ORDER BY id DESC",
        )

        # The newest row should be the "resolved" one.
        newest = rows[0]
        payload = json.loads(newest["body"])
        assert payload["type"] == "zombie_resolved"
        assert newest["severity"] == "info"


# ---------------------------------------------------------------------------
# T4: independent detection across multiple projects (P1)
# ---------------------------------------------------------------------------


class TestMultiProject:
    """T4: one checker tracks each project separately."""

    def test_independent_detection(self, tmp_path):
        """Project A being a zombie does not affect project B."""
        checker = HealthChecker(zombie_threshold=3)

        # Project A.
        db_a = tmp_path / "a" / "blackboard.db"
        bb_a = Blackboard(db_a)

        # Project B (has a real event).
        db_b = tmp_path / "b" / "blackboard.db"
        bb_b = Blackboard(db_b)
        bb_b.create_task(_pending_task("b1", "B"))

        for tick in range(1, 4):
            checker.check("a", db_a, tick_num=tick)
            checker.check("b", db_b, tick_num=tick)

        status_a = checker.get_status("a")
        status_b = checker.get_status("b")
        assert status_a["is_zombie"] is True
        assert status_b["is_zombie"] is False

    def test_independent_recovery(self, tmp_path):
        """Project A recovering does not affect project B."""
        checker = HealthChecker(zombie_threshold=2)

        db_a = tmp_path / "a" / "blackboard.db"
        bb_a = Blackboard(db_a)
        db_b = tmp_path / "b" / "blackboard.db"
        bb_b = Blackboard(db_b)

        # Both projects become zombies.
        for tick in range(1, 3):
            checker.check("a", db_a, tick_num=tick)
            checker.check("b", db_b, tick_num=tick)

        # Only A recovers.
        bb_a.create_task(_pending_task("a1", "A"))
        checker.check("a", db_a, tick_num=3)
        checker.check("b", db_b, tick_num=3)

        assert checker.get_status("a")["is_zombie"] is False
        assert checker.get_status("b")["is_zombie"] is True