auto-sync: 2026-05-17 05:47:11

This commit is contained in:
cfdaily
2026-05-17 05:47:11 +08:00
parent 11252e0f3d
commit 538cc9c0af
2 changed files with 432 additions and 4 deletions
+11 -4
View File
@@ -9,11 +9,13 @@ router = APIRouter(prefix="/api/daemon", tags=["daemon"])
@router.get("/status")
async def daemon_status():
from src.main import _ticker_task, config
from src.main import get_ticker, config
t = get_ticker()
return {
"status": "running",
"version": "2.6.0",
"ticker_running": _ticker_task is not None and not _ticker_task.done(),
"ticker_running": t is not None and t.is_running,
"tick_count": t.tick_count if t else 0,
"config": {
"tick_interval": config.get("daemon", {}).get("tick_interval", 30),
"max_global_agents": config.get("daemon", {}).get("max_global_agents", 5),
@@ -23,5 +25,10 @@ async def daemon_status():
@router.post("/tick")
async def manual_tick():
"""触发手动 tick(占位,F6 实现)"""
return {"ok": True, "message": "Manual tick triggered (placeholder)"}
"""触发手动 tick"""
from src.main import get_ticker
t = get_ticker()
if t is None:
return {"ok": False, "error": "Ticker not initialized"}
result = await t.manual_tick()
return {"ok": True, "result": result}
+421
View File
@@ -0,0 +1,421 @@
"""F6 Daemon Ticker 单元测试
按司马懿测试计划 test-plan-v2.6.md §F6
- T1: tick 循环正常运行(P0
- T2: scan_tasks 检测 pendingP0
- T3: 依赖推进(P0
- T4: events 写入(P0
- T5: 多项目轮询(P0
- T6: tick 异常不中断(P1
- T7: 手动 tick 端点(P1
"""
import asyncio
import json
import pytest
from pathlib import Path
from src.blackboard.blackboard import Blackboard
from src.blackboard.models import Task
from src.blackboard.registry import ProjectRegistry
from src.daemon.ticker import Ticker
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def data_root(tmp_path):
return tmp_path / "projects"
@pytest.fixture
def registry(data_root):
return ProjectRegistry(data_root)
@pytest.fixture
def project_with_tasks(registry, data_root):
"""创建一个项目并添加几个任务"""
registry.create_project("test-proj", "Test Project", agents=["agent-a"])
db_path = data_root / "test-proj" / "blackboard.db"
bb = Blackboard(db_path)
# pending 任务
bb.create_task(Task(
id="t1", title="Task 1", status="pending",
assigned_by="daemon", task_type="coding",
))
# blocked 任务(依赖 t1
bb.create_task(Task(
id="t2", title="Task 2", status="blocked",
assigned_by="daemon", task_type="coding",
depends_on=json.dumps(["t1"]),
))
return registry, db_path, bb
# ---------------------------------------------------------------------------
# T1: tick 循环正常运行
# ---------------------------------------------------------------------------
class TestTickLoop:
def test_ticker_runs(self, registry):
"""Ticker 可以启动和停止"""
ticker = Ticker(registry, tick_interval=0.1, max_ticks=3)
assert not ticker.is_running
async def run():
await ticker.start()
# 等待几个 tick
await asyncio.sleep(0.5)
await ticker.stop()
asyncio.run(run())
assert ticker.tick_count >= 1
def test_max_ticks_respected(self, registry):
"""max_ticks 限制 tick 次数"""
ticker = Ticker(registry, tick_interval=0.05, max_ticks=3)
async def run():
await ticker.start()
await asyncio.sleep(1.0)
# max_ticks 达到后应自动停止
asyncio.run(run())
assert ticker.tick_count <= 4 # 允许少量误差
def test_tick_count_increments(self, registry):
"""tick_count 随 tick 递增"""
ticker = Ticker(registry, tick_interval=0.05, max_ticks=2)
async def run():
await ticker.start()
await asyncio.sleep(0.3)
asyncio.run(run())
assert ticker.tick_count >= 2
def test_ticker_no_projects(self, registry):
"""无项目时 tick 正常运行不报错"""
ticker = Ticker(registry, tick_interval=0.05, max_ticks=1)
async def run():
result = await ticker.tick()
assert result["tick"] == 1
assert result["projects"] == {}
asyncio.run(run())
# ---------------------------------------------------------------------------
# T2: scan_tasks 检测 pending
# ---------------------------------------------------------------------------
class TestScanTasks:
def test_scan_finds_pending(self, project_with_tasks):
registry, db_path, bb = project_with_tasks
ticker = Ticker(registry, tick_interval=30)
async def run():
result = await ticker.tick()
proj_result = result["projects"]["test-proj"]
assert proj_result["status"] == "ok"
assert proj_result["summary_before"]["pending"] == 1
assert proj_result["summary_before"]["blocked"] == 1
asyncio.run(run())
def test_scan_empty_project(self, registry, data_root):
"""空项目的 tick 返回 ok + 空状态"""
registry.create_project("empty-proj", "Empty")
ticker = Ticker(registry, tick_interval=30)
async def run():
result = await ticker.tick()
proj_result = result["projects"]["empty-proj"]
assert proj_result["status"] == "ok"
assert proj_result["summary_before"] == {}
asyncio.run(run())
def test_scan_project_no_db(self, registry, data_root):
"""项目目录存在但无 DB 时返回 no_db"""
registry.create_project("no-db-proj", "No DB")
ticker = Ticker(registry, tick_interval=30)
async def run():
result = await ticker.tick()
proj_result = result["projects"]["no-db-proj"]
assert proj_result["status"] == "no_db"
asyncio.run(run())
# ---------------------------------------------------------------------------
# T3: 依赖推进
# ---------------------------------------------------------------------------
class TestDependencyAdvance:
def test_blocked_advances_when_deps_done(self, project_with_tasks):
"""依赖完成后 blocked → pending"""
registry, db_path, bb = project_with_tasks
# 把 t1 标记为 done
bb.update_task_status("t1", "claimed", agent="agent-a")
bb.update_task_status("t1", "working", agent="agent-a")
bb.update_task_status("t1", "review", agent="agent-a")
bb.update_task_status("t1", "done", agent="agent-a")
ticker = Ticker(registry, tick_interval=30)
async def run():
result = await ticker.tick()
proj_result = result["projects"]["test-proj"]
assert "t2" in proj_result["advanced"]
# 验证 t2 状态变为 pending
from src.blackboard.queries import Queries
queries = Queries(db_path)
summary = queries.task_summary()
assert summary.get("pending", 0) >= 1 # t2 现在 pending
assert summary.get("blocked", 0) == 0
asyncio.run(run())
def test_blocked_stays_when_deps_not_done(self, project_with_tasks):
"""依赖未完成时 blocked 不推进"""
registry, db_path, bb = project_with_tasks
ticker = Ticker(registry, tick_interval=30)
async def run():
result = await ticker.tick()
proj_result = result["projects"]["test-proj"]
assert len(proj_result["advanced"]) == 0
from src.blackboard.queries import Queries
queries = Queries(db_path)
summary = queries.task_summary()
assert summary.get("blocked", 0) == 1
asyncio.run(run())
def test_chain_advance(self, registry, data_root):
"""链式依赖:t1 done → t2 unblock → t3 unblock"""
registry.create_project("chain", "Chain")
db_path = data_root / "chain" / "blackboard.db"
bb = Blackboard(db_path)
bb.create_task(Task(id="t1", title="T1", status="pending", assigned_by="d"))
bb.create_task(Task(
id="t2", title="T2", status="blocked",
assigned_by="d", depends_on=json.dumps(["t1"]),
))
bb.create_task(Task(
id="t3", title="T3", status="blocked",
assigned_by="d", depends_on=json.dumps(["t2"]),
))
# t1 done
for s in ("claimed", "working", "review", "done"):
bb.update_task_status("t1", s, agent="a")
ticker = Ticker(registry, tick_interval=30)
async def run():
# First tick: t1 done → t2 unblock
r1 = await ticker.tick()
assert "t2" in r1["projects"]["chain"]["advanced"]
assert "t3" not in r1["projects"]["chain"]["advanced"]
# Second tick: t2 pending (just unblocked), but not done yet
r2 = await ticker.tick()
# t3 still blocked because t2 is pending not done
assert "t3" not in r2["projects"]["chain"]["advanced"]
asyncio.run(run())
def test_multi_dep_all_done(self, registry, data_root):
"""多依赖全部完成后才推进"""
registry.create_project("multi", "Multi")
db_path = data_root / "multi" / "blackboard.db"
bb = Blackboard(db_path)
bb.create_task(Task(id="t1", title="T1", status="pending", assigned_by="d"))
bb.create_task(Task(id="t2", title="T2", status="pending", assigned_by="d"))
bb.create_task(Task(
id="t3", title="T3", status="blocked",
assigned_by="d", depends_on=json.dumps(["t1", "t2"]),
))
# Only t1 done
for s in ("claimed", "working", "review", "done"):
bb.update_task_status("t1", s, agent="a")
ticker = Ticker(registry, tick_interval=30)
async def run():
r1 = await ticker.tick()
assert "t3" not in r1["projects"]["multi"]["advanced"]
# Now t2 also done
for s in ("claimed", "working", "review", "done"):
bb.update_task_status("t2", s, agent="a")
r2 = await ticker.tick()
assert "t3" in r2["projects"]["multi"]["advanced"]
asyncio.run(run())
# ---------------------------------------------------------------------------
# T4: events 写入
# ---------------------------------------------------------------------------
class TestEvents:
def test_daemon_tick_event_written(self, project_with_tasks):
registry, db_path, bb = project_with_tasks
ticker = Ticker(registry, tick_interval=30)
async def run():
await ticker.tick()
from src.blackboard.queries import Queries
queries = Queries(db_path)
events = queries.recent_events(limit=5)
tick_events = [e for e in events if e["event_type"] == "daemon_tick"]
assert len(tick_events) >= 1
detail = json.loads(tick_events[0]["detail"])
assert detail["tick"] == 1
asyncio.run(run())
def test_advance_event_in_detail(self, project_with_tasks):
registry, db_path, bb = project_with_tasks
# Make t1 done so t2 can advance
for s in ("claimed", "working", "review", "done"):
bb.update_task_status("t1", s, agent="a")
ticker = Ticker(registry, tick_interval=30)
async def run():
await ticker.tick()
from src.blackboard.queries import Queries
queries = Queries(db_path)
events = queries.recent_events(limit=10)
tick_events = [e for e in events if e["event_type"] == "daemon_tick"]
detail = json.loads(tick_events[0]["detail"])
assert detail["advanced_count"] == 1
asyncio.run(run())
# ---------------------------------------------------------------------------
# T5: 多项目轮询
# ---------------------------------------------------------------------------
class TestMultiProject:
def test_ticks_all_active_projects(self, registry, data_root):
"""tick 遍历所有 active 项目"""
for pid in ("proj-a", "proj-b", "proj-c"):
registry.create_project(pid, f"Project {pid}")
db_path = data_root / pid / "blackboard.db"
bb = Blackboard(db_path)
bb.create_task(Task(
id=f"{pid}-t1", title=f"Task {pid}",
status="pending", assigned_by="d",
))
ticker = Ticker(registry, tick_interval=30)
async def run():
result = await ticker.tick()
assert len(result["projects"]) == 3
for pid in ("proj-a", "proj-b", "proj-c"):
assert pid in result["projects"]
assert result["projects"][pid]["status"] == "ok"
asyncio.run(run())
def test_skips_archived_projects(self, registry, data_root):
"""归档项目不参与 tick"""
registry.create_project("active", "Active")
registry.create_project("archived", "Archived")
registry.archive_project("archived")
# Add DB to both
for pid in ("active", "archived"):
db_path = data_root / pid / "blackboard.db"
Blackboard(db_path)
ticker = Ticker(registry, tick_interval=30)
async def run():
result = await ticker.tick()
# archived 项目被 rename 到 _archived/,但 registry 仍记录它
# ticker 应跳过 status != "active" 的项目
for pid, pr in result["projects"].items():
assert pid != "archived"
asyncio.run(run())
# ---------------------------------------------------------------------------
# T6: tick 异常不中断(P1
# ---------------------------------------------------------------------------
class TestTickResilience:
def test_tick_continues_after_error(self, registry, data_root):
"""单次 tick 异常不影响后续 tick"""
registry.create_project("good", "Good")
db_path = data_root / "good" / "blackboard.db"
Blackboard(db_path)
ticker = Ticker(registry, tick_interval=0.05, max_ticks=3)
tick_count = [0]
original_tick_project = ticker._tick_project
async def failing_tick(project_id, project_info):
tick_count[0] += 1
if tick_count[0] == 1:
raise RuntimeError("Simulated failure")
return await original_tick_project(project_id, project_info)
ticker._tick_project = failing_tick
async def run():
await ticker.start()
await asyncio.sleep(0.3)
asyncio.run(run())
assert ticker.tick_count >= 2 # First tick had error but continued
# ---------------------------------------------------------------------------
# T7: 手动 tick 端点(P1
# ---------------------------------------------------------------------------
class TestManualTick:
def test_manual_tick(self, project_with_tasks):
registry, db_path, bb = project_with_tasks
ticker = Ticker(registry, tick_interval=30)
async def run():
result = await ticker.manual_tick()
assert result["manual"] is True
assert result["tick"] == 1
assert "test-proj" in result["projects"]
asyncio.run(run())
def test_manual_tick_increments_count(self, project_with_tasks):
registry, db_path, bb = project_with_tasks
ticker = Ticker(registry, tick_interval=30)
async def run():
await ticker.manual_tick()
await ticker.manual_tick()
assert ticker.tick_count == 2
asyncio.run(run())