Files
sanguo_moziplus_v2/tests/integration/test_dispatcher_integration.py
2026-06-05 11:03:30 +08:00

129 lines
4.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""F9 Agent 调度器集成测试 — rollback/on_complete DB 交互"""
import asyncio
import json
import sqlite3
from contextlib import closing
from pathlib import Path

import pytest

from src.blackboard.models import Task
from src.blackboard.operations import Blackboard
from src.daemon.dispatcher import Dispatcher
# Apply the ``integration`` marker to every test collected from this module.
pytestmark = pytest.mark.integration
# ---------------------------------------------------------------------------
# Sima Yi review follow-up: _rollback_current_agent + unified on_complete (v2.8 #07.2)
# ---------------------------------------------------------------------------
class TestRollbackAndOnComplete:
    """Sima Yi review gaps #3 + #4: current_agent rollback after a crash + unified on_complete path.

    Every test seeds one task ``t1`` (assignee ``zhangfei-dev``) in a fresh
    SQLite file under ``tmp_path``, forces its ``current_agent`` column,
    calls ``Dispatcher._rollback_current_agent`` and then reads the row back
    through an independent ``sqlite3`` connection.
    """

    @staticmethod
    def _seed_task(db_path, status, current_agent):
        """Create task ``t1`` and overwrite its ``current_agent`` column.

        Args:
            db_path: Location of the blackboard SQLite file to create/use.
            status: Value stored in the task's ``status`` field.
            current_agent: Value written to ``tasks.current_agent`` for ``t1``.
        """
        bb = Blackboard(db_path)
        bb.create_task(Task(
            id="t1", title="T", status=status,
            assigned_by="daemon", assignee="zhangfei-dev",
        ))
        # closing() releases the handle even when the UPDATE/commit raises.
        with closing(bb._conn()) as conn:
            conn.execute(
                "UPDATE tasks SET current_agent=? WHERE id=?",
                (current_agent, "t1"),
            )
            conn.commit()

    @staticmethod
    def _fetch_row(db_path, query):
        """Run ``query`` (one ``?`` placeholder, bound to ``"t1"``) and return the first row.

        A separate ``sqlite3`` connection is used so the assertion observes
        what is actually stored in the file. The returned ``sqlite3.Row``
        supports access by column name.
        """
        # closing() guarantees the connection is released if the query fails;
        # the original closed it unconditionally only on the success path.
        with closing(sqlite3.connect(str(db_path))) as conn:
            conn.row_factory = sqlite3.Row
            return conn.execute(query, ("t1",)).fetchone()

    def test_rollback_current_agent_on_crash(self, tmp_path):
        """executor crash → ``_rollback_current_agent`` resets current_agent to the assignee."""
        db_path = tmp_path / "blackboard.db"
        # NOTE(review): current_agent is seeded equal to the assignee, so the
        # assertion below would also hold if the rollback were a no-op —
        # consider seeding a different agent to make the check discriminating.
        self._seed_task(db_path, status="working", current_agent="zhangfei-dev")

        dispatcher = Dispatcher(registered_agents=["zhangfei-dev"])
        dispatcher._rollback_current_agent(db_path, "t1", "zhangfei-dev")

        row = self._fetch_row(
            db_path, "SELECT current_agent, assignee FROM tasks WHERE id=?"
        )
        assert row["current_agent"] == row["assignee"] == "zhangfei-dev"

    def test_rollback_different_agent(self, tmp_path):
        """current_agent ≠ agent_id → no rollback (safety check)."""
        db_path = tmp_path / "blackboard.db"
        self._seed_task(db_path, status="working", current_agent="simayi-challenger")

        dispatcher = Dispatcher(registered_agents=["zhangfei-dev"])
        # The agent id passed here does not match the stored current_agent.
        dispatcher._rollback_current_agent(db_path, "t1", "wrong-agent")

        row = self._fetch_row(db_path, "SELECT current_agent FROM tasks WHERE id=?")
        assert row["current_agent"] == "simayi-challenger"

    def test_on_complete_crash_rollback_executor(self, tmp_path):
        """executor crash → rollback current_agent (+ ``_task_auto_complete`` marking review).

        NOTE(review): only the rollback half is exercised here; the test never
        invokes the on_complete path nor asserts the resulting status. As in
        ``test_rollback_current_agent_on_crash``, the seeded current_agent
        already equals the assignee.
        """
        db_path = tmp_path / "blackboard.db"
        self._seed_task(db_path, status="working", current_agent="zhangfei-dev")

        dispatcher = Dispatcher(registered_agents=["zhangfei-dev"])
        dispatcher._rollback_current_agent(db_path, "t1", "zhangfei-dev")

        row = self._fetch_row(db_path, "SELECT current_agent FROM tasks WHERE id=?")
        assert row["current_agent"] == "zhangfei-dev"

    def test_on_complete_crash_rollback_review(self, tmp_path):
        """review crash → rollback current_agent + status stays ``review``."""
        db_path = tmp_path / "blackboard.db"
        # The reviewer currently holds the task; the assignee is zhangfei-dev.
        self._seed_task(db_path, status="review", current_agent="simayi-challenger")

        dispatcher = Dispatcher(registered_agents=["simayi-challenger"])
        dispatcher._rollback_current_agent(db_path, "t1", "simayi-challenger")

        row = self._fetch_row(
            db_path, "SELECT current_agent, status FROM tasks WHERE id=?"
        )
        assert row["current_agent"] == "zhangfei-dev"
        assert row["status"] == "review"