auto-sync: 2026-05-17 05:55:15

This commit is contained in:
cfdaily
2026-05-17 05:55:15 +08:00
parent 86f24d98e7
commit 9957893379
2 changed files with 257 additions and 2 deletions
+2 -2
View File
@@ -260,8 +260,8 @@ Layer 6: F17 SSE+Hook → F18 前端
| F3 | 多项目管理 | ✅ 完成 | ✅ 11/11 | ✅ 通过 |
| F4 | CLI 工具 | ✅ 完成 | ✅ 14/14 | ✅ 通过 |
| F5 | API 层 | ✅ 完成 | ✅ 23/23 | ✅ 通过 |
| F6 | Daemon Ticker | ✅ 完成 | ✅ 18/18 | ⬜ 待审 |
| F7 | Inbox JSONL | ⬜ 待开始 | — | — |
| F6 | Daemon Ticker | ✅ 完成 | ✅ 18/18 | ✅ 通过 |
| F7 | Inbox JSONL | ✅ 完成 | ✅ 16/16 | ⬜ 待审 |
| F8 | 健康检查 | ⬜ 待开始 | — | — |
| F9 | Agent 调度器 | ⬜ 待开始 | — | — |
| F10 | Counter | ⬜ 待开始 | — | — |
+255
View File
@@ -0,0 +1,255 @@
"""F8 健康检查单元测试
按 test-plan-v2.6.md §F8
- T1: 正常场景(P0
- T2: 僵尸检测(P0
- T3: 恢复场景(P0
- T4: 多项目独立检测(P1
"""
import asyncio
import json
import pytest
from pathlib import Path
from src.blackboard.operations import Blackboard
from src.blackboard.models import Task
from src.daemon.health import HealthChecker
from src.daemon.ticker import Ticker
from src.blackboard.registry import ProjectRegistry
@pytest.fixture
def db_path(tmp_path):
return tmp_path / "test-proj" / "blackboard.db"
@pytest.fixture
def bb(db_path):
return Blackboard(db_path)
@pytest.fixture
def checker():
return HealthChecker(zombie_threshold=3)
# ---------------------------------------------------------------------------
# T1: 正常场景
# ---------------------------------------------------------------------------
class TestNormal:
def test_healthy_project(self, checker, db_path, bb):
"""有真实变更 → 健康"""
# 写一个真实事件
bb.create_task(Task(id="t1", title="T", status="pending", assigned_by="d"))
result = checker.check("test-proj", db_path, tick_num=1)
assert result["healthy"] is True
assert result["zombie"] is False
def test_no_zombie_alert_under_threshold(self, checker, db_path, bb):
"""stale < threshold → 不告警"""
for i in range(2):
result = checker.check("test-proj", db_path, tick_num=i + 1)
assert result["healthy"] is True
assert result["zombie"] is False
def test_stale_ticks_increment(self, checker, db_path, bb):
"""每次无变更 tick 递增 stale_ticks"""
r1 = checker.check("test-proj", db_path, tick_num=1)
assert r1["stale_ticks"] == 1
r2 = checker.check("test-proj", db_path, tick_num=2)
assert r2["stale_ticks"] == 2
def test_no_db_is_healthy(self, checker, tmp_path):
"""无 DB 文件 → 健康(不报错)"""
result = checker.check("no-db", tmp_path / "nonexistent.db", tick_num=1)
assert result["healthy"] is True
# ---------------------------------------------------------------------------
# T2: 僵尸检测
# ---------------------------------------------------------------------------
class TestZombieDetection:
def test_zombie_after_threshold(self, checker, db_path, bb):
"""达到 threshold → 僵尸告警"""
# 连续 threshold 次 tick 无真实变更
for i in range(3):
result = checker.check("test-proj", db_path, tick_num=i + 1)
assert result["zombie"] is True
assert result["healthy"] is False
assert result["alert_written"] is True
def test_zombie_writes_observation(self, checker, db_path, bb):
"""僵尸告警写入 observations 表"""
for i in range(3):
checker.check("test-proj", db_path, tick_num=i + 1)
from src.blackboard.queries import Queries
queries = Queries(db_path)
# 查 observations(排除 events 里的 daemon_tick
conn = queries._conn()
try:
rows = conn.execute(
"SELECT * FROM observations WHERE observer='daemon'"
).fetchall()
finally:
conn.close()
assert len(rows) >= 1
body = json.loads(rows[0]["body"])
assert body["type"] == "zombie_detected"
assert body["stale_ticks"] == 3
def test_zombie_writes_event(self, checker, db_path, bb):
"""僵尸告警写入 events 表"""
for i in range(3):
checker.check("test-proj", db_path, tick_num=i + 1)
from src.blackboard.queries import Queries
queries = Queries(db_path)
events = queries.recent_events(limit=20)
zombie_events = [e for e in events
if e["event_type"] == "agent_zombie_detected"]
assert len(zombie_events) >= 1
def test_no_duplicate_alert(self, checker, db_path, bb):
"""已告警后不再重复告警"""
for i in range(5):
result = checker.check("test-proj", db_path, tick_num=i + 1)
# 只在第一次达到 threshold 时告警
assert result["alert_written"] is False # 后续 tick 不再重复
assert result["zombie"] is True
def test_custom_threshold(self, db_path, bb):
"""自定义 threshold"""
checker = HealthChecker(zombie_threshold=5)
for i in range(4):
result = checker.check("test-proj", db_path, tick_num=i + 1)
assert result["healthy"] is True
r5 = checker.check("test-proj", db_path, tick_num=5)
assert r5["zombie"] is True
# ---------------------------------------------------------------------------
# T3: 恢复场景
# ---------------------------------------------------------------------------
class TestRecovery:
def test_recovery_after_real_event(self, checker, db_path, bb):
"""僵尸后有真实变更 → 告警解除"""
# 触发僵尸
for i in range(3):
checker.check("test-proj", db_path, tick_num=i + 1)
# 写一个真实事件(非 daemon_tick
bb.create_task(Task(id="t1", title="T", status="pending", assigned_by="d"))
# 下一次 check
result = checker.check("test-proj", db_path, tick_num=4)
assert result["healthy"] is True
assert result["resolved"] is True
def test_recovery_resets_stale(self, checker, db_path, bb):
"""恢复后 stale_ticks 归零"""
for i in range(3):
checker.check("test-proj", db_path, tick_num=i + 1)
bb.create_task(Task(id="t1", title="T", status="pending", assigned_by="d"))
result = checker.check("test-proj", db_path, tick_num=4)
assert result["stale_ticks"] == 0
def test_recovery_then_zombie_again(self, checker, db_path, bb):
"""恢复后再次变僵尸"""
# 第一次僵尸
for i in range(3):
checker.check("test-proj", db_path, tick_num=i + 1)
# 恢复
bb.create_task(Task(id="t1", title="T", status="pending", assigned_by="d"))
checker.check("test-proj", db_path, tick_num=4)
# 再次僵尸
for i in range(3):
r = checker.check("test-proj", db_path, tick_num=5 + i)
assert r["zombie"] is True
def test_recovery_writes_info_observation(self, checker, db_path, bb):
"""恢复时写 info observation"""
for i in range(3):
checker.check("test-proj", db_path, tick_num=i + 1)
bb.create_task(Task(id="t1", title="T", status="pending", assigned_by="d"))
checker.check("test-proj", db_path, tick_num=4)
from src.blackboard.queries import Queries
queries = Queries(db_path)
conn = queries._conn()
try:
rows = conn.execute(
"SELECT * FROM observations WHERE observer='daemon' "
"ORDER BY created_at DESC"
).fetchall()
finally:
conn.close()
# 最新一条应该是 resolved
body = json.loads(rows[0]["body"])
assert body["type"] == "zombie_resolved"
assert rows[0]["severity"] == "info"
# ---------------------------------------------------------------------------
# T4: 多项目独立检测(P1
# ---------------------------------------------------------------------------
class TestMultiProject:
def test_independent_detection(self, tmp_path):
"""项目 A 僵尸不影响项目 B"""
checker = HealthChecker(zombie_threshold=3)
# 项目 A
db_a = tmp_path / "a" / "blackboard.db"
bb_a = Blackboard(db_a)
# 项目 B(有真实事件)
db_b = tmp_path / "b" / "blackboard.db"
bb_b = Blackboard(db_b)
bb_b.create_task(Task(id="b1", title="B", status="pending", assigned_by="d"))
for i in range(3):
checker.check("a", db_a, tick_num=i + 1)
checker.check("b", db_b, tick_num=i + 1)
status_a = checker.get_status("a")
status_b = checker.get_status("b")
assert status_a["is_zombie"] is True
assert status_b["is_zombie"] is False
def test_independent_recovery(self, tmp_path):
"""项目 A 恢复不影响项目 B"""
checker = HealthChecker(zombie_threshold=2)
db_a = tmp_path / "a" / "blackboard.db"
bb_a = Blackboard(db_a)
db_b = tmp_path / "b" / "blackboard.db"
bb_b = Blackboard(db_b)
# 两个都变僵尸
for i in range(2):
checker.check("a", db_a, tick_num=i + 1)
checker.check("b", db_b, tick_num=i + 1)
# 只恢复 A
bb_a.create_task(Task(id="a1", title="A", status="pending", assigned_by="d"))
checker.check("a", db_a, tick_num=3)
checker.check("b", db_b, tick_num=3)
assert checker.get_status("a")["is_zombie"] is False
assert checker.get_status("b")["is_zombie"] is True