Files
sanguo_moziplus_v2/tests/test_ticker.py
T
2026-06-01 22:44:43 +08:00

573 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""F6 Daemon Ticker 单元测试
按司马懿测试计划 test-plan-v2.6.md §F6
- T1: tick 循环正常运行(P0)
- T2: scan_tasks 检测 pending(P0)
- T3: 依赖推进(P0)
- T4: events 写入(P0)
- T5: 多项目轮询(P0)
- T6: tick 异常不中断(P1)
- T7: 手动 tick 端点(P1)
v2.8 新增(#07.2 _check_timeouts 统一 + #07.3 ACT-1 updated_at fallback):
- E12: _check_timeouts 统一超时(4 个测试)
"""
import asyncio
import json
import pytest
from pathlib import Path
from src.blackboard.operations import Blackboard
from src.blackboard.models import Task
from src.blackboard.registry import ProjectRegistry
from src.blackboard.queries import Queries
from src.daemon.ticker import Ticker
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def data_root(tmp_path):
    """Return the ``projects`` path located under pytest's ``tmp_path``.

    The path is only computed here; nothing is created on disk by this fixture.
    """
    return tmp_path.joinpath("projects")
@pytest.fixture
def registry(data_root):
    """Build a ``ProjectRegistry`` rooted at the ``data_root`` fixture path."""
    project_registry = ProjectRegistry(data_root)
    return project_registry
@pytest.fixture
def project_with_tasks(registry, data_root):
    """Create project ``test-proj`` seeded with one pending and one blocked task.

    Task ``t1`` is pending; task ``t2`` is blocked and lists ``t1`` in its
    ``depends_on`` JSON array.

    Returns:
        A ``(registry, db_path, bb)`` tuple: the registry fixture, the path of
        the project's ``blackboard.db``, and the ``Blackboard`` opened on it.
    """
    registry.create_project("test-proj", "Test Project", agents=["agent-a"])
    db_path = data_root / "test-proj" / "blackboard.db"
    bb = Blackboard(db_path)

    task_specs = (
        # Pending task with no dependencies.
        {
            "id": "t1", "title": "Task 1", "status": "pending",
            "assigned_by": "daemon", "task_type": "coding",
        },
        # Blocked task that depends on t1.
        {
            "id": "t2", "title": "Task 2", "status": "blocked",
            "assigned_by": "daemon", "task_type": "coding",
            "depends_on": json.dumps(["t1"]),
        },
    )
    # Build and insert one task at a time so creation order stays t1, then t2.
    for spec in task_specs:
        bb.create_task(Task(**spec))

    return registry, db_path, bb
# ---------------------------------------------------------------------------
# T1: tick 循环正常运行
# ---------------------------------------------------------------------------
class TestTickLoop:
    """T1: the background tick loop starts, counts ticks and can be stopped."""

    @staticmethod
    def _start_and_wait(ticker, seconds, *, stop_after=False):
        """Start ``ticker`` in a fresh event loop and sleep for ``seconds``.

        When ``stop_after`` is true the ticker is explicitly stopped before
        the event loop is torn down; otherwise it is left to finish by itself.
        """
        async def scenario():
            await ticker.start()
            await asyncio.sleep(seconds)
            if stop_after:
                await ticker.stop()

        asyncio.run(scenario())

    def test_ticker_runs(self, registry):
        """A Ticker can be started and stopped."""
        ticker = Ticker(registry, tick_interval=0.1, max_ticks=3)
        assert not ticker.is_running

        # Sleep long enough for a few ticks before stopping.
        self._start_and_wait(ticker, 0.5, stop_after=True)

        assert ticker.tick_count >= 1

    def test_max_ticks_respected(self, registry):
        """``max_ticks`` bounds the number of ticks performed."""
        ticker = Ticker(registry, tick_interval=0.05, max_ticks=3)

        # No explicit stop: the ticker is expected to stop on its own once
        # max_ticks has been reached.
        self._start_and_wait(ticker, 1.0)

        assert ticker.tick_count <= 4  # allow a small margin of error

    def test_tick_count_increments(self, registry):
        """``tick_count`` grows as ticks are executed."""
        ticker = Ticker(registry, tick_interval=0.05, max_ticks=2)

        self._start_and_wait(ticker, 0.3)

        assert ticker.tick_count >= 2

    def test_ticker_no_projects(self, registry):
        """A tick with no registered projects completes without error."""
        ticker = Ticker(registry, tick_interval=0.05, max_ticks=1)

        async def single_tick():
            return await ticker.tick()

        result = asyncio.run(single_tick())
        assert result["tick"] == 1
        assert result["projects"] == {}
# ---------------------------------------------------------------------------
# T2: scan_tasks 检测 pending
# ---------------------------------------------------------------------------
class TestScanTasks:
    """T2: a tick reports per-project status and the task summary it scanned."""

    @staticmethod
    def _project_result(ticker, project_id):
        """Run a single tick and return the result entry for ``project_id``."""
        async def single_tick():
            return await ticker.tick()

        tick_result = asyncio.run(single_tick())
        return tick_result["projects"][project_id]

    def test_scan_finds_pending(self, project_with_tasks):
        """The pre-tick summary counts the seeded pending and blocked tasks."""
        registry, db_path, bb = project_with_tasks
        ticker = Ticker(registry, tick_interval=30)

        proj_result = self._project_result(ticker, "test-proj")

        assert proj_result["status"] == "ok"
        summary_before = proj_result["summary_before"]
        assert summary_before["pending"] == 1
        assert summary_before["blocked"] == 1

    def test_scan_empty_project(self, registry, data_root):
        """A project with a DB but no tasks yields ``ok`` and an empty summary."""
        registry.create_project("empty-proj", "Empty")
        # Opening a Blackboard initialises the (empty) DB for the project.
        db_path = data_root / "empty-proj" / "blackboard.db"
        Blackboard(db_path)
        ticker = Ticker(registry, tick_interval=30)

        proj_result = self._project_result(ticker, "empty-proj")

        assert proj_result["status"] == "ok"
        assert proj_result["summary_before"] == {}

    def test_scan_project_no_db(self, registry, data_root):
        """A project that has no DB file is reported as ``no_db``."""
        registry.create_project("no-db-proj", "No DB")
        ticker = Ticker(registry, tick_interval=30)

        proj_result = self._project_result(ticker, "no-db-proj")

        assert proj_result["status"] == "no_db"
# ---------------------------------------------------------------------------
# T3: 依赖推进
# ---------------------------------------------------------------------------
class TestDependencyAdvance:
    """T3: blocked tasks are advanced once every dependency is done."""

    # Status path used by these tests to drive a task from pending to done.
    _DONE_PATH = ("claimed", "working", "review", "done")

    def test_blocked_advances_when_deps_done(self, project_with_tasks):
        """Once its dependency is done, a blocked task becomes pending."""
        registry, db_path, bb = project_with_tasks
        # Drive t1 all the way to done.
        for status in self._DONE_PATH:
            bb.update_task_status("t1", status, agent="agent-a")
        ticker = Ticker(registry, tick_interval=30)

        async def single_tick():
            return await ticker.tick()

        result = asyncio.run(single_tick())
        assert "t2" in result["projects"]["test-proj"]["advanced"]

        # t2 should now be counted as pending and nothing should stay blocked.
        summary = Queries(db_path).task_summary()
        assert summary.get("pending", 0) >= 1
        assert summary.get("blocked", 0) == 0

    def test_blocked_stays_when_deps_not_done(self, project_with_tasks):
        """A blocked task is not advanced while its dependency is unfinished."""
        registry, db_path, bb = project_with_tasks
        ticker = Ticker(registry, tick_interval=30)

        async def single_tick():
            return await ticker.tick()

        result = asyncio.run(single_tick())
        assert len(result["projects"]["test-proj"]["advanced"]) == 0

        summary = Queries(db_path).task_summary()
        assert summary.get("blocked", 0) == 1

    def test_chain_advance(self, registry, data_root):
        """Chained dependencies (t1 -> t2 -> t3) unblock one link at a time."""
        registry.create_project("chain", "Chain")
        db_path = data_root / "chain" / "blackboard.db"
        bb = Blackboard(db_path)

        root_task = Task(id="t1", title="T1", status="pending", assigned_by="d")
        bb.create_task(root_task)
        middle_task = Task(
            id="t2", title="T2", status="blocked",
            assigned_by="d", depends_on=json.dumps(["t1"]),
        )
        bb.create_task(middle_task)
        leaf_task = Task(
            id="t3", title="T3", status="blocked",
            assigned_by="d", depends_on=json.dumps(["t2"]),
        )
        bb.create_task(leaf_task)

        # Complete t1.
        for status in self._DONE_PATH:
            bb.update_task_status("t1", status, agent="a")
        ticker = Ticker(registry, tick_interval=30)

        async def two_ticks():
            # Both ticks share one event loop, as a running daemon would.
            first = await ticker.tick()
            second = await ticker.tick()
            return first, second

        first, second = asyncio.run(two_ticks())

        # First tick: t1 is done, so only t2 is unblocked.
        first_advanced = first["projects"]["chain"]["advanced"]
        assert "t2" in first_advanced
        assert "t3" not in first_advanced
        # Second tick: t2 is merely pending (not done), so t3 stays blocked.
        assert "t3" not in second["projects"]["chain"]["advanced"]

    def test_multi_dep_all_done(self, registry, data_root):
        """A task with several dependencies advances only when all are done."""
        registry.create_project("multi", "Multi")
        db_path = data_root / "multi" / "blackboard.db"
        bb = Blackboard(db_path)

        bb.create_task(Task(id="t1", title="T1", status="pending", assigned_by="d"))
        bb.create_task(Task(id="t2", title="T2", status="pending", assigned_by="d"))
        bb.create_task(Task(
            id="t3", title="T3", status="blocked",
            assigned_by="d", depends_on=json.dumps(["t1", "t2"]),
        ))

        # Only t1 is completed before the first tick.
        for status in self._DONE_PATH:
            bb.update_task_status("t1", status, agent="a")
        ticker = Ticker(registry, tick_interval=30)
        done_path = self._DONE_PATH

        async def scenario():
            before = await ticker.tick()
            assert "t3" not in before["projects"]["multi"]["advanced"]

            # Complete t2 as well, then tick again inside the same loop.
            for status in done_path:
                bb.update_task_status("t2", status, agent="a")
            after = await ticker.tick()
            assert "t3" in after["projects"]["multi"]["advanced"]

        asyncio.run(scenario())
# ---------------------------------------------------------------------------
# T4: events 写入
# ---------------------------------------------------------------------------
class TestEvents:
    """T4: each tick records a ``daemon_tick`` event with a JSON detail."""

    @staticmethod
    def _tick_events_after_one_tick(ticker, db_path, limit):
        """Run one tick, then return the ``daemon_tick`` rows among recent events."""
        async def single_tick():
            await ticker.tick()

        asyncio.run(single_tick())
        recent = Queries(db_path).recent_events(limit=limit)
        return [event for event in recent if event["event_type"] == "daemon_tick"]

    def test_daemon_tick_event_written(self, project_with_tasks):
        """A tick writes at least one ``daemon_tick`` event carrying the tick number."""
        registry, db_path, bb = project_with_tasks
        ticker = Ticker(registry, tick_interval=30)

        tick_events = self._tick_events_after_one_tick(ticker, db_path, 5)

        assert len(tick_events) >= 1
        detail = json.loads(tick_events[0]["detail"])
        assert detail["tick"] == 1

    def test_advance_event_in_detail(self, project_with_tasks):
        """The event detail reports how many tasks were advanced by the tick."""
        registry, db_path, bb = project_with_tasks
        # Finish t1 so that t2 becomes eligible to advance.
        for status in ("claimed", "working", "review", "done"):
            bb.update_task_status("t1", status, agent="a")
        ticker = Ticker(registry, tick_interval=30)

        tick_events = self._tick_events_after_one_tick(ticker, db_path, 10)

        detail = json.loads(tick_events[0]["detail"])
        assert detail["advanced_count"] == 1
# ---------------------------------------------------------------------------
# T5: 多项目轮询
# ---------------------------------------------------------------------------
class TestMultiProject:
    """T5: one tick polls every active project and leaves archived ones out."""

    def test_ticks_all_active_projects(self, registry, data_root):
        """A tick visits all active projects."""
        project_ids = ("proj-a", "proj-b", "proj-c")
        for pid in project_ids:
            registry.create_project(pid, f"Project {pid}")
            bb = Blackboard(data_root / pid / "blackboard.db")
            seed_task = Task(
                id=f"{pid}-t1", title=f"Task {pid}",
                status="pending", assigned_by="d",
            )
            bb.create_task(seed_task)
        ticker = Ticker(registry, tick_interval=30)

        async def single_tick():
            return await ticker.tick()

        projects = asyncio.run(single_tick())["projects"]
        assert len(projects) == 3
        for pid in project_ids:
            assert pid in projects
            assert projects[pid]["status"] == "ok"

    def test_skips_archived_projects(self, registry, data_root):
        """Archived projects do not take part in a tick."""
        registry.create_project("active", "Active")
        registry.create_project("archived", "Archived")
        registry.archive_project("archived")
        # Give both projects a DB.
        for pid in ("active", "archived"):
            Blackboard(data_root / pid / "blackboard.db")
        ticker = Ticker(registry, tick_interval=30)

        async def single_tick():
            return await ticker.tick()

        result = asyncio.run(single_tick())
        # Per the original note: archiving renames the project into _archived/
        # while the registry still records it, so the ticker is expected to
        # skip any project whose status is not "active".
        assert "archived" not in result["projects"]
# ---------------------------------------------------------------------------
# T6: tick 异常不中断(P1)
# ---------------------------------------------------------------------------
class TestTickResilience:
    """T6: an exception raised while ticking a project does not stop the loop."""

    def test_tick_continues_after_error(self, registry, data_root):
        """A failure in one tick does not prevent later ticks from running."""
        registry.create_project("good", "Good")
        Blackboard(data_root / "good" / "blackboard.db")
        ticker = Ticker(registry, tick_interval=0.05, max_ticks=3)

        calls = 0
        real_tick_project = ticker._tick_project

        async def failing_tick(project_id, project_info):
            """Raise on the first call only, then defer to the real method."""
            nonlocal calls
            calls += 1
            if calls == 1:
                raise RuntimeError("Simulated failure")
            return await real_tick_project(project_id, project_info)

        # Swap the per-project tick on this instance for the flaky wrapper.
        ticker._tick_project = failing_tick

        async def start_and_wait():
            await ticker.start()
            await asyncio.sleep(0.3)

        asyncio.run(start_and_wait())
        # The first tick hit the error, yet the loop carried on ticking.
        assert ticker.tick_count >= 2
# ---------------------------------------------------------------------------
# T7: 手动 tick 端点(P1)
# ---------------------------------------------------------------------------
class TestManualTick:
    """T7: ``manual_tick`` runs a tick on demand and is counted like any other."""

    def test_manual_tick(self, project_with_tasks):
        """A manual tick is flagged as manual and covers the seeded project."""
        registry, db_path, bb = project_with_tasks
        ticker = Ticker(registry, tick_interval=30)

        async def one_manual_tick():
            return await ticker.manual_tick()

        result = asyncio.run(one_manual_tick())
        assert result["manual"] is True
        assert result["tick"] == 1
        assert "test-proj" in result["projects"]

    def test_manual_tick_increments_count(self, project_with_tasks):
        """Every manual tick increases ``tick_count`` by one."""
        registry, db_path, bb = project_with_tasks
        ticker = Ticker(registry, tick_interval=30)

        async def two_manual_ticks():
            # Run both ticks inside the same event loop.
            for _ in range(2):
                await ticker.manual_tick()

        asyncio.run(two_manual_ticks())
        assert ticker.tick_count == 2
# ---------------------------------------------------------------------------
# E12: _check_timeouts 统一超时(v2.8 #07.2/#07.3 新增)
# ---------------------------------------------------------------------------
class TestCheckTimeoutsUnified:
    """E12: #07.2 unified ``_check_timeouts`` + #07.3 ACT-1 ``updated_at`` fallback."""

    @staticmethod
    def _naive_utc_now():
        """Return the current UTC time as a naive ``datetime``.

        ``datetime.utcnow()`` is deprecated since Python 3.12. Stripping the
        tzinfo keeps ``isoformat()`` output in the same offset-free shape the
        tests previously wrote into the database.
        """
        from datetime import datetime, timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _run_sql(bb, statements):
        """Execute ``(sql, params)`` pairs on one connection, commit, then close.

        The connection comes from ``bb._conn()`` and is closed even when a
        statement raises.
        """
        conn = bb._conn()
        try:
            for sql, params in statements:
                conn.execute(sql, params)
            conn.commit()
        finally:
            conn.close()

    @pytest.fixture
    def timeout_project(self, tmp_path):
        """Create project ``timeout-proj`` with an open, task-free blackboard.

        Returns:
            A ``(registry, db_path, bb)`` tuple.
        """
        data_root = tmp_path / "projects"
        registry = ProjectRegistry(data_root)
        registry.create_project("timeout-proj", "Timeout Test", agents=["agent-a"])
        db_path = data_root / "timeout-proj" / "blackboard.db"
        bb = Blackboard(db_path)
        return registry, db_path, bb

    def test_crash_limit_working(self, timeout_project):
        """E12.1: executor crashed 3 times within 30 min -> ``_check_timeouts`` marks failed.

        #07.2 moved the crash limit from ``_dispatch_reviews`` into
        ``_check_timeouts`` so it covers both working and review states.
        This test only verifies that the call completes and returns a list.
        """
        from datetime import timedelta

        registry, db_path, bb = timeout_project
        # A task currently in the working state.
        bb.create_task(Task(
            id="t-crash", title="Crash Task", status="working",
            assigned_by="daemon", current_agent="agent-a",
        ))

        # Simulate three crashed attempts, 25, 20 and 15 minutes ago.
        insert_attempt = (
            "INSERT INTO task_attempts (task_id, attempt_number, agent, outcome, started_at) "
            "VALUES (?, ?, ?, ?, ?)"
        )
        statements = []
        for i in range(3):
            attempt_time = self._naive_utc_now() - timedelta(minutes=25 - i * 5)
            statements.append((
                insert_attempt,
                ("t-crash", i + 1, "agent-a", "crashed", attempt_time.isoformat()),
            ))
        self._run_sql(bb, statements)

        ticker = Ticker(registry, tick_interval=30)
        # With a dispatcher providing _check_crash_limit the limit would fire
        # inside _check_timeouts; here only the basic structure is exercised.
        result = ticker._check_timeouts(db_path)
        # Even without a dispatcher (which _check_crash_limit needs), the
        # timeout check itself must not crash.
        assert isinstance(result, list)

    def test_crash_limit_review(self, timeout_project):
        """E12.2: reviewer crashed 3 times within 30 min -> ``_check_timeouts`` marks failed.

        After the #07.2 unification the crash limit for review-state tasks
        also goes through ``_check_timeouts``. This test only verifies that
        the call completes and returns a list.
        """
        registry, db_path, bb = timeout_project
        # A task in the review state.
        bb.create_task(Task(
            id="t-review-crash", title="Review Crash Task", status="review",
            assigned_by="daemon", current_agent="simayi-challenger",
        ))

        ticker = Ticker(registry, tick_interval=30)
        result = ticker._check_timeouts(db_path)
        # _check_timeouts must not crash when it meets a review-state task.
        assert isinstance(result, list)

    def test_updated_at_fallback(self, timeout_project):
        """E12.3: mail auto-working task without started_at/claimed_at uses updated_at.

        #07.3 ACT-1: ``_check_timeouts`` uses ``updated_at`` as the last
        fallback, so orphaned mail tasks can be reclaimed after a PM2 restart.
        """
        from datetime import timedelta

        registry, db_path, bb = timeout_project
        # Timestamp well beyond the 30-minute timeout configured below.
        old_time = (self._naive_utc_now() - timedelta(minutes=60)).isoformat()
        # A working task that will carry only updated_at (mail auto-working).
        bb.create_task(Task(
            id="t-mail-orphan", title="Mail Orphan", status="working",
            assigned_by="daemon", current_agent="pangtong-fujunshi",
        ))

        self._run_sql(bb, [
            # Backdate updated_at to mimic a timestamp from before the restart.
            (
                "UPDATE tasks SET updated_at = ? WHERE id = ?",
                (old_time, "t-mail-orphan"),
            ),
            # Make sure started_at and claimed_at are NULL.
            (
                "UPDATE tasks SET started_at = NULL, claimed_at = NULL WHERE id = ?",
                ("t-mail-orphan",),
            ),
        ])

        ticker = Ticker(registry, tick_interval=30, default_task_timeout_minutes=30)
        reclaimed = ticker._check_timeouts(db_path)
        # The updated_at fallback should get this task reclaimed.
        assert "t-mail-orphan" in reclaimed, \
            "Mail orphan with only updated_at should be reclaimed via fallback"

    def test_process_dead_keeps_review_status(self, timeout_project):
        """E12.4: review agent process died -> task stays in review (not pushed to pending).

        #07.2: for a review-state task, process_dead handling keeps it in
        review and lets ``_dispatch_reviews`` dispatch it on a later tick.
        Only the pure timeout path is exercised here.
        """
        registry, db_path, bb = timeout_project
        # A task in the review state.
        bb.create_task(Task(
            id="t-review-dead", title="Review Dead Process", status="review",
            assigned_by="daemon", current_agent="simayi-challenger",
        ))

        # Give it a fresh timestamp so it must not be reclaimed for timing out.
        self._run_sql(bb, [(
            "UPDATE tasks SET updated_at = ? WHERE id = ?",
            (self._naive_utc_now().isoformat(), "t-review-dead"),
        )])

        ticker = Ticker(registry, tick_interval=30, default_task_timeout_minutes=30)
        reclaimed = ticker._check_timeouts(db_path)
        # process_dead is not simulated (no counter/spawner): with a recent
        # timestamp the review task must not be reclaimed by the timeout path.
        assert "t-review-dead" not in reclaimed