"""F6 Daemon Ticker 单元测试 按司马懿测试计划 test-plan-v2.6.md §F6: - T1: tick 循环正常运行(P0) - T2: scan_tasks 检测 pending(P0) - T3: 依赖推进(P0) - T4: events 写入(P0) - T5: 多项目轮询(P0) - T6: tick 异常不中断(P1) - T7: 手动 tick 端点(P1) """ import asyncio import json import pytest from pathlib import Path from src.blackboard.operations import Blackboard from src.blackboard.models import Task from src.blackboard.registry import ProjectRegistry from src.daemon.ticker import Ticker # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def data_root(tmp_path): return tmp_path / "projects" @pytest.fixture def registry(data_root): return ProjectRegistry(data_root) @pytest.fixture def project_with_tasks(registry, data_root): """创建一个项目并添加几个任务""" registry.create_project("test-proj", "Test Project", agents=["agent-a"]) db_path = data_root / "test-proj" / "blackboard.db" bb = Blackboard(db_path) # pending 任务 bb.create_task(Task( id="t1", title="Task 1", status="pending", assigned_by="daemon", task_type="coding", )) # blocked 任务(依赖 t1) bb.create_task(Task( id="t2", title="Task 2", status="blocked", assigned_by="daemon", task_type="coding", depends_on=json.dumps(["t1"]), )) return registry, db_path, bb # --------------------------------------------------------------------------- # T1: tick 循环正常运行 # --------------------------------------------------------------------------- class TestTickLoop: def test_ticker_runs(self, registry): """Ticker 可以启动和停止""" ticker = Ticker(registry, tick_interval=0.1, max_ticks=3) assert not ticker.is_running async def run(): await ticker.start() # 等待几个 tick await asyncio.sleep(0.5) await ticker.stop() asyncio.run(run()) assert ticker.tick_count >= 1 def test_max_ticks_respected(self, registry): """max_ticks 限制 tick 次数""" ticker = Ticker(registry, tick_interval=0.05, max_ticks=3) async def run(): await ticker.start() await asyncio.sleep(1.0) # max_ticks 达到后应自动停止 asyncio.run(run()) assert ticker.tick_count <= 4 # 允许少量误差 def test_tick_count_increments(self, registry): """tick_count 随 tick 递增""" ticker = Ticker(registry, tick_interval=0.05, max_ticks=2) async def run(): await ticker.start() await asyncio.sleep(0.3) asyncio.run(run()) assert ticker.tick_count >= 2 def test_ticker_no_projects(self, registry): """无项目时 tick 正常运行不报错""" ticker = Ticker(registry, tick_interval=0.05, max_ticks=1) async def run(): result = await ticker.tick() assert result["tick"] == 1 assert result["projects"] == {} asyncio.run(run()) # --------------------------------------------------------------------------- # T2: scan_tasks 检测 pending # --------------------------------------------------------------------------- class TestScanTasks: def test_scan_finds_pending(self, project_with_tasks): registry, db_path, bb = project_with_tasks ticker = Ticker(registry, tick_interval=30) async def run(): result = await ticker.tick() proj_result = result["projects"]["test-proj"] assert proj_result["status"] == "ok" assert proj_result["summary_before"]["pending"] == 1 assert proj_result["summary_before"]["blocked"] == 1 asyncio.run(run()) def test_scan_empty_project(self, registry, data_root): """有 DB 但无任务的项目 tick 返回 ok + 空状态""" registry.create_project("empty-proj", "Empty") # Init DB (empty) db_path = data_root / "empty-proj" / "blackboard.db" Blackboard(db_path) # creates tables ticker = Ticker(registry, tick_interval=30) async def run(): result = await ticker.tick() proj_result = result["projects"]["empty-proj"] assert proj_result["status"] == "ok" assert proj_result["summary_before"] == {} asyncio.run(run()) def test_scan_project_no_db(self, registry, data_root): """项目目录存在但无 DB 时返回 no_db""" registry.create_project("no-db-proj", "No DB") ticker = Ticker(registry, tick_interval=30) async def run(): result = await ticker.tick() proj_result = result["projects"]["no-db-proj"] assert proj_result["status"] == "no_db" asyncio.run(run()) # --------------------------------------------------------------------------- # T3: 依赖推进 # --------------------------------------------------------------------------- class TestDependencyAdvance: def test_blocked_advances_when_deps_done(self, project_with_tasks): """依赖完成后 blocked → pending""" registry, db_path, bb = project_with_tasks # 把 t1 标记为 done bb.update_task_status("t1", "claimed", agent="agent-a") bb.update_task_status("t1", "working", agent="agent-a") bb.update_task_status("t1", "review", agent="agent-a") bb.update_task_status("t1", "done", agent="agent-a") ticker = Ticker(registry, tick_interval=30) async def run(): result = await ticker.tick() proj_result = result["projects"]["test-proj"] assert "t2" in proj_result["advanced"] # 验证 t2 状态变为 pending from src.blackboard.queries import Queries queries = Queries(db_path) summary = queries.task_summary() assert summary.get("pending", 0) >= 1 # t2 现在 pending assert summary.get("blocked", 0) == 0 asyncio.run(run()) def test_blocked_stays_when_deps_not_done(self, project_with_tasks): """依赖未完成时 blocked 不推进""" registry, db_path, bb = project_with_tasks ticker = Ticker(registry, tick_interval=30) async def run(): result = await ticker.tick() proj_result = result["projects"]["test-proj"] assert len(proj_result["advanced"]) == 0 from src.blackboard.queries import Queries queries = Queries(db_path) summary = queries.task_summary() assert summary.get("blocked", 0) == 1 asyncio.run(run()) def test_chain_advance(self, registry, data_root): """链式依赖:t1 done → t2 unblock → t3 unblock""" registry.create_project("chain", "Chain") db_path = data_root / "chain" / "blackboard.db" bb = Blackboard(db_path) bb.create_task(Task(id="t1", title="T1", status="pending", assigned_by="d")) bb.create_task(Task( id="t2", title="T2", status="blocked", assigned_by="d", depends_on=json.dumps(["t1"]), )) bb.create_task(Task( id="t3", title="T3", status="blocked", assigned_by="d", depends_on=json.dumps(["t2"]), )) # t1 done for s in ("claimed", "working", "review", "done"): bb.update_task_status("t1", s, agent="a") ticker = Ticker(registry, tick_interval=30) async def run(): # First tick: t1 done → t2 unblock r1 = await ticker.tick() assert "t2" in r1["projects"]["chain"]["advanced"] assert "t3" not in r1["projects"]["chain"]["advanced"] # Second tick: t2 pending (just unblocked), but not done yet r2 = await ticker.tick() # t3 still blocked because t2 is pending not done assert "t3" not in r2["projects"]["chain"]["advanced"] asyncio.run(run()) def test_multi_dep_all_done(self, registry, data_root): """多依赖全部完成后才推进""" registry.create_project("multi", "Multi") db_path = data_root / "multi" / "blackboard.db" bb = Blackboard(db_path) bb.create_task(Task(id="t1", title="T1", status="pending", assigned_by="d")) bb.create_task(Task(id="t2", title="T2", status="pending", assigned_by="d")) bb.create_task(Task( id="t3", title="T3", status="blocked", assigned_by="d", depends_on=json.dumps(["t1", "t2"]), )) # Only t1 done for s in ("claimed", "working", "review", "done"): bb.update_task_status("t1", s, agent="a") ticker = Ticker(registry, tick_interval=30) async def run(): r1 = await ticker.tick() assert "t3" not in r1["projects"]["multi"]["advanced"] # Now t2 also done for s in ("claimed", "working", "review", "done"): bb.update_task_status("t2", s, agent="a") r2 = await ticker.tick() assert "t3" in r2["projects"]["multi"]["advanced"] asyncio.run(run()) # --------------------------------------------------------------------------- # T4: events 写入 # --------------------------------------------------------------------------- class TestEvents: def test_daemon_tick_event_written(self, project_with_tasks): registry, db_path, bb = project_with_tasks ticker = Ticker(registry, tick_interval=30) async def run(): await ticker.tick() from src.blackboard.queries import Queries queries = Queries(db_path) events = queries.recent_events(limit=5) tick_events = [e for e in events if e["event_type"] == "daemon_tick"] assert len(tick_events) >= 1 detail = json.loads(tick_events[0]["detail"]) assert detail["tick"] == 1 asyncio.run(run()) def test_advance_event_in_detail(self, project_with_tasks): registry, db_path, bb = project_with_tasks # Make t1 done so t2 can advance for s in ("claimed", "working", "review", "done"): bb.update_task_status("t1", s, agent="a") ticker = Ticker(registry, tick_interval=30) async def run(): await ticker.tick() from src.blackboard.queries import Queries queries = Queries(db_path) events = queries.recent_events(limit=10) tick_events = [e for e in events if e["event_type"] == "daemon_tick"] detail = json.loads(tick_events[0]["detail"]) assert detail["advanced_count"] == 1 asyncio.run(run()) # --------------------------------------------------------------------------- # T5: 多项目轮询 # --------------------------------------------------------------------------- class TestMultiProject: def test_ticks_all_active_projects(self, registry, data_root): """tick 遍历所有 active 项目""" for pid in ("proj-a", "proj-b", "proj-c"): registry.create_project(pid, f"Project {pid}") db_path = data_root / pid / "blackboard.db" bb = Blackboard(db_path) bb.create_task(Task( id=f"{pid}-t1", title=f"Task {pid}", status="pending", assigned_by="d", )) ticker = Ticker(registry, tick_interval=30) async def run(): result = await ticker.tick() assert len(result["projects"]) == 3 for pid in ("proj-a", "proj-b", "proj-c"): assert pid in result["projects"] assert result["projects"][pid]["status"] == "ok" asyncio.run(run()) def test_skips_archived_projects(self, registry, data_root): """归档项目不参与 tick""" registry.create_project("active", "Active") registry.create_project("archived", "Archived") registry.archive_project("archived") # Add DB to both for pid in ("active", "archived"): db_path = data_root / pid / "blackboard.db" Blackboard(db_path) ticker = Ticker(registry, tick_interval=30) async def run(): result = await ticker.tick() # archived 项目被 rename 到 _archived/,但 registry 仍记录它 # ticker 应跳过 status != "active" 的项目 for pid, pr in result["projects"].items(): assert pid != "archived" asyncio.run(run()) # --------------------------------------------------------------------------- # T6: tick 异常不中断(P1) # --------------------------------------------------------------------------- class TestTickResilience: def test_tick_continues_after_error(self, registry, data_root): """单次 tick 异常不影响后续 tick""" registry.create_project("good", "Good") db_path = data_root / "good" / "blackboard.db" Blackboard(db_path) ticker = Ticker(registry, tick_interval=0.05, max_ticks=3) tick_count = [0] original_tick_project = ticker._tick_project async def failing_tick(project_id, project_info): tick_count[0] += 1 if tick_count[0] == 1: raise RuntimeError("Simulated failure") return await original_tick_project(project_id, project_info) ticker._tick_project = failing_tick async def run(): await ticker.start() await asyncio.sleep(0.3) asyncio.run(run()) assert ticker.tick_count >= 2 # First tick had error but continued # --------------------------------------------------------------------------- # T7: 手动 tick 端点(P1) # --------------------------------------------------------------------------- class TestManualTick: def test_manual_tick(self, project_with_tasks): registry, db_path, bb = project_with_tasks ticker = Ticker(registry, tick_interval=30) async def run(): result = await ticker.manual_tick() assert result["manual"] is True assert result["tick"] == 1 assert "test-proj" in result["projects"] asyncio.run(run()) def test_manual_tick_increments_count(self, project_with_tasks): registry, db_path, bb = project_with_tasks ticker = Ticker(registry, tick_interval=30) async def run(): await ticker.manual_tick() await ticker.manual_tick() assert ticker.tick_count == 2 asyncio.run(run())