Files
sanguo_moziplus_v2/tests/test_ticker.py
T
2026-05-17 05:47:59 +08:00

425 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""F6 Daemon Ticker 单元测试
按司马懿测试计划 test-plan-v2.6.md §F6
- T1: tick 循环正常运行 (P0)
- T2: scan_tasks 检测 pending (P0)
- T3: 依赖推进 (P0)
- T4: events 写入 (P0)
- T5: 多项目轮询 (P0)
- T6: tick 异常不中断 (P1)
- T7: 手动 tick 端点 (P1)
"""
import asyncio
import json
import pytest
from pathlib import Path
from src.blackboard.operations import Blackboard
from src.blackboard.models import Task
from src.blackboard.registry import ProjectRegistry
from src.daemon.ticker import Ticker
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def data_root(tmp_path):
    """Return the path of the ``projects`` directory inside ``tmp_path``.

    The directory itself is not created here; only the path is computed.
    """
    projects_dir = tmp_path / "projects"
    return projects_dir
@pytest.fixture
def registry(data_root):
    """Build a ``ProjectRegistry`` rooted at the temporary ``data_root``."""
    project_registry = ProjectRegistry(data_root)
    return project_registry
@pytest.fixture
def project_with_tasks(registry, data_root):
    """Create project ``test-proj`` with one pending and one blocked task.

    Task ``t1`` is created as ``pending``; task ``t2`` is created as
    ``blocked`` with ``t1`` listed in its ``depends_on`` JSON array.

    Returns:
        A ``(registry, db_path, blackboard)`` tuple.
    """
    project_id = "test-proj"
    registry.create_project(project_id, "Test Project", agents=["agent-a"])
    db_path = data_root / project_id / "blackboard.db"
    board = Blackboard(db_path)

    # Fields shared by both tasks; only id/title/status/dependencies differ.
    common = {"assigned_by": "daemon", "task_type": "coding"}

    # Pending task.
    pending_task = Task(id="t1", title="Task 1", status="pending", **common)
    board.create_task(pending_task)

    # Blocked task (depends on t1).
    blocked_task = Task(
        id="t2",
        title="Task 2",
        status="blocked",
        depends_on=json.dumps(["t1"]),
        **common,
    )
    board.create_task(blocked_task)

    return registry, db_path, board
# ---------------------------------------------------------------------------
# T1: tick 循环正常运行
# ---------------------------------------------------------------------------
class TestTickLoop:
    """T1: the background tick loop starts, counts ticks and stops."""

    def test_ticker_runs(self, registry):
        """The ticker can be started and then stopped."""
        ticker = Ticker(registry, tick_interval=0.1, max_ticks=3)
        assert not ticker.is_running

        async def start_wait_stop():
            await ticker.start()
            # Give the loop time to perform a few ticks.
            await asyncio.sleep(0.5)
            await ticker.stop()

        asyncio.run(start_wait_stop())
        assert ticker.tick_count >= 1

    def test_max_ticks_respected(self, registry):
        """``max_ticks`` caps the number of ticks performed."""
        ticker = Ticker(registry, tick_interval=0.05, max_ticks=3)

        async def start_and_wait():
            await ticker.start()
            # No explicit stop: the loop is expected to end by itself once
            # max_ticks has been reached.
            await asyncio.sleep(1.0)

        asyncio.run(start_and_wait())
        # Allow a small margin over the configured limit.
        assert ticker.tick_count <= 4

    def test_tick_count_increments(self, registry):
        """``tick_count`` grows as ticks are performed."""
        ticker = Ticker(registry, tick_interval=0.05, max_ticks=2)

        async def start_and_wait():
            await ticker.start()
            await asyncio.sleep(0.3)

        asyncio.run(start_and_wait())
        assert ticker.tick_count >= 2

    def test_ticker_no_projects(self, registry):
        """A tick with no registered projects completes without error."""
        ticker = Ticker(registry, tick_interval=0.05, max_ticks=1)

        async def single_tick():
            return await ticker.tick()

        outcome = asyncio.run(single_tick())
        assert outcome["tick"] == 1
        assert outcome["projects"] == {}
# ---------------------------------------------------------------------------
# T2: scan_tasks 检测 pending
# ---------------------------------------------------------------------------
class TestScanTasks:
    """T2: a tick reports the per-project task summary and DB status."""

    def test_scan_finds_pending(self, project_with_tasks):
        """The pre-tick summary counts one pending and one blocked task."""
        registry, _db_path, _board = project_with_tasks
        ticker = Ticker(registry, tick_interval=30)

        async def single_tick():
            return await ticker.tick()

        report = asyncio.run(single_tick())["projects"]["test-proj"]
        assert report["status"] == "ok"
        counts = report["summary_before"]
        assert counts["pending"] == 1
        assert counts["blocked"] == 1

    def test_scan_empty_project(self, registry, data_root):
        """A project with a DB but no tasks yields ``ok`` and an empty summary."""
        registry.create_project("empty-proj", "Empty")
        # Instantiating the blackboard initialises an empty database.
        Blackboard(data_root / "empty-proj" / "blackboard.db")
        ticker = Ticker(registry, tick_interval=30)

        async def single_tick():
            return await ticker.tick()

        report = asyncio.run(single_tick())["projects"]["empty-proj"]
        assert report["status"] == "ok"
        assert report["summary_before"] == {}

    def test_scan_project_no_db(self, registry, data_root):
        """A project without a database file is reported as ``no_db``."""
        registry.create_project("no-db-proj", "No DB")
        ticker = Ticker(registry, tick_interval=30)

        async def single_tick():
            return await ticker.tick()

        report = asyncio.run(single_tick())["projects"]["no-db-proj"]
        assert report["status"] == "no_db"
# ---------------------------------------------------------------------------
# T3: 依赖推进
# ---------------------------------------------------------------------------
class TestDependencyAdvance:
    """T3: blocked tasks are advanced once their dependencies are done."""

    def test_blocked_advances_when_deps_done(self, project_with_tasks):
        """A blocked task becomes pending after its dependency is done."""
        registry, db_path, board = project_with_tasks
        # Walk t1 through its lifecycle until it is done.
        for status in ("claimed", "working", "review", "done"):
            board.update_task_status("t1", status, agent="agent-a")
        ticker = Ticker(registry, tick_interval=30)

        async def single_tick():
            return await ticker.tick()

        report = asyncio.run(single_tick())["projects"]["test-proj"]
        assert "t2" in report["advanced"]

        # t2 should now be pending, leaving nothing blocked.
        from src.blackboard.queries import Queries

        summary = Queries(db_path).task_summary()
        assert summary.get("pending", 0) >= 1
        assert summary.get("blocked", 0) == 0

    def test_blocked_stays_when_deps_not_done(self, project_with_tasks):
        """A blocked task is not advanced while its dependency is unfinished."""
        registry, db_path, _board = project_with_tasks
        ticker = Ticker(registry, tick_interval=30)

        async def single_tick():
            return await ticker.tick()

        report = asyncio.run(single_tick())["projects"]["test-proj"]
        assert len(report["advanced"]) == 0

        from src.blackboard.queries import Queries

        summary = Queries(db_path).task_summary()
        assert summary.get("blocked", 0) == 1

    def test_chain_advance(self, registry, data_root):
        """Chained dependencies t1 <- t2 <- t3 are released one link at a time.

        With t1 done, a tick advances t2 only; t3 is not advanced by either
        tick because t2 is merely pending, not done.
        """
        registry.create_project("chain", "Chain")
        db_path = data_root / "chain" / "blackboard.db"
        board = Blackboard(db_path)
        board.create_task(
            Task(id="t1", title="T1", status="pending", assigned_by="d")
        )
        # Each remaining task is blocked on the one before it.
        for task_id, title, parent in (("t2", "T2", "t1"), ("t3", "T3", "t2")):
            board.create_task(Task(
                id=task_id,
                title=title,
                status="blocked",
                assigned_by="d",
                depends_on=json.dumps([parent]),
            ))
        # Mark t1 as done.
        for status in ("claimed", "working", "review", "done"):
            board.update_task_status("t1", status, agent="a")
        ticker = Ticker(registry, tick_interval=30)

        async def two_ticks():
            # First tick: t1 is done, so t2 is unblocked but t3 is not.
            first = await ticker.tick()
            advanced_first = first["projects"]["chain"]["advanced"]
            assert "t2" in advanced_first
            assert "t3" not in advanced_first
            # Second tick: t2 was only just unblocked (pending, not done),
            # so t3 remains blocked.
            second = await ticker.tick()
            assert "t3" not in second["projects"]["chain"]["advanced"]

        asyncio.run(two_ticks())

    def test_multi_dep_all_done(self, registry, data_root):
        """A task with several dependencies advances only when all are done."""
        lifecycle = ("claimed", "working", "review", "done")
        registry.create_project("multi", "Multi")
        db_path = data_root / "multi" / "blackboard.db"
        board = Blackboard(db_path)
        board.create_task(
            Task(id="t1", title="T1", status="pending", assigned_by="d")
        )
        board.create_task(
            Task(id="t2", title="T2", status="pending", assigned_by="d")
        )
        board.create_task(Task(
            id="t3",
            title="T3",
            status="blocked",
            assigned_by="d",
            depends_on=json.dumps(["t1", "t2"]),
        ))
        # Only t1 is done at first.
        for status in lifecycle:
            board.update_task_status("t1", status, agent="a")
        ticker = Ticker(registry, tick_interval=30)

        async def two_ticks():
            first = await ticker.tick()
            assert "t3" not in first["projects"]["multi"]["advanced"]
            # Now finish t2 as well.
            for status in lifecycle:
                board.update_task_status("t2", status, agent="a")
            second = await ticker.tick()
            assert "t3" in second["projects"]["multi"]["advanced"]

        asyncio.run(two_ticks())
# ---------------------------------------------------------------------------
# T4: events 写入
# ---------------------------------------------------------------------------
class TestEvents:
    """T4: each tick writes a ``daemon_tick`` event for the project."""

    def test_daemon_tick_event_written(self, project_with_tasks):
        """A tick records a ``daemon_tick`` event whose detail has the tick number."""
        registry, db_path, bb = project_with_tasks
        ticker = Ticker(registry, tick_interval=30)

        async def run():
            await ticker.tick()
            from src.blackboard.queries import Queries

            queries = Queries(db_path)
            events = queries.recent_events(limit=5)
            tick_events = [
                e for e in events if e["event_type"] == "daemon_tick"
            ]
            assert len(tick_events) >= 1
            # The event detail is stored as a JSON string.
            detail = json.loads(tick_events[0]["detail"])
            assert detail["tick"] == 1

        asyncio.run(run())

    def test_advance_event_in_detail(self, project_with_tasks):
        """The ``daemon_tick`` detail reports how many tasks were advanced."""
        registry, db_path, bb = project_with_tasks
        # Make t1 done so that t2 can be advanced by the tick.
        for s in ("claimed", "working", "review", "done"):
            bb.update_task_status("t1", s, agent="a")
        ticker = Ticker(registry, tick_interval=30)

        async def run():
            await ticker.tick()
            from src.blackboard.queries import Queries

            queries = Queries(db_path)
            events = queries.recent_events(limit=10)
            tick_events = [
                e for e in events if e["event_type"] == "daemon_tick"
            ]
            # Fail with a clear assertion (as the sibling test does) rather
            # than an IndexError when no daemon_tick event was recorded.
            assert len(tick_events) >= 1
            detail = json.loads(tick_events[0]["detail"])
            assert detail["advanced_count"] == 1

        asyncio.run(run())
# ---------------------------------------------------------------------------
# T5: 多项目轮询
# ---------------------------------------------------------------------------
class TestMultiProject:
    """T5: one tick polls every active project and skips archived ones."""

    def test_ticks_all_active_projects(self, registry, data_root):
        """A tick visits every active project."""
        project_ids = ("proj-a", "proj-b", "proj-c")
        for pid in project_ids:
            registry.create_project(pid, f"Project {pid}")
            db_path = data_root / pid / "blackboard.db"
            bb = Blackboard(db_path)
            bb.create_task(Task(
                id=f"{pid}-t1", title=f"Task {pid}",
                status="pending", assigned_by="d",
            ))
        ticker = Ticker(registry, tick_interval=30)

        async def run():
            result = await ticker.tick()
            assert len(result["projects"]) == 3
            for pid in project_ids:
                assert pid in result["projects"]
                assert result["projects"][pid]["status"] == "ok"

        asyncio.run(run())

    def test_skips_archived_projects(self, registry, data_root):
        """Archived projects take no part in a tick; active ones still do."""
        registry.create_project("active", "Active")
        registry.create_project("archived", "Archived")
        registry.archive_project("archived")
        # Add a DB to both projects.
        for pid in ("active", "archived"):
            db_path = data_root / pid / "blackboard.db"
            Blackboard(db_path)
        ticker = Ticker(registry, tick_interval=30)

        async def run():
            result = await ticker.tick()
            projects = result["projects"]
            # Per the original author's note: an archived project is renamed
            # into _archived/ while the registry still records it, so the
            # ticker is expected to skip any project whose status is not
            # "active".
            assert "archived" not in projects
            # Guard against a vacuous pass: an empty result would satisfy
            # the check above even if the ticker processed nothing at all.
            assert "active" in projects

        asyncio.run(run())
# ---------------------------------------------------------------------------
# T6: tick 异常不中断 (P1)
# ---------------------------------------------------------------------------
class TestTickResilience:
    """T6: an exception raised during one tick does not stop the loop."""

    def test_tick_continues_after_error(self, registry, data_root):
        """A failure in a single tick does not prevent later ticks."""
        registry.create_project("good", "Good")
        Blackboard(data_root / "good" / "blackboard.db")
        ticker = Ticker(registry, tick_interval=0.05, max_ticks=3)

        real_tick_project = ticker._tick_project
        calls = 0

        async def fail_first_call(project_id, project_info):
            # Raise on the first invocation only, then delegate to the
            # ticker's real per-project handler.
            nonlocal calls
            calls += 1
            if calls == 1:
                raise RuntimeError("Simulated failure")
            return await real_tick_project(project_id, project_info)

        ticker._tick_project = fail_first_call

        async def start_and_wait():
            await ticker.start()
            await asyncio.sleep(0.3)

        asyncio.run(start_and_wait())
        # The first tick hit the error, yet ticking carried on.
        assert ticker.tick_count >= 2
# ---------------------------------------------------------------------------
# T7: 手动 tick 端点 (P1)
# ---------------------------------------------------------------------------
class TestManualTick:
    """T7: ``manual_tick`` runs a tick on demand."""

    def test_manual_tick(self, project_with_tasks):
        """A manual tick is flagged as manual and covers the project."""
        registry, _db_path, _board = project_with_tasks
        ticker = Ticker(registry, tick_interval=30)

        async def one_manual_tick():
            return await ticker.manual_tick()

        outcome = asyncio.run(one_manual_tick())
        assert outcome["manual"] is True
        assert outcome["tick"] == 1
        assert "test-proj" in outcome["projects"]

    def test_manual_tick_increments_count(self, project_with_tasks):
        """Each manual tick increases ``tick_count`` by one."""
        registry, _db_path, _board = project_with_tasks
        ticker = Ticker(registry, tick_interval=30)

        async def two_manual_ticks():
            # Both ticks run on the same event loop.
            for _ in range(2):
                await ticker.manual_tick()

        asyncio.run(two_manual_ticks())
        assert ticker.tick_count == 2