auto-sync: 2026-06-05 13:28:23

This commit is contained in:
cfdaily
2026-06-05 13:28:23 +08:00
parent 3e8e9e6d9c
commit 204b3a6640
+241
View File
@@ -0,0 +1,241 @@
"""Broadcast claim 流程单元测试
覆盖 ticker.py 中的 record_broadcast_response 和 BroadcastRound 轮次逻辑。
场景 B1-B8。
"""
import pytest
from unittest.mock import MagicMock, patch
pytestmark = pytest.mark.unit
from src.daemon.ticker import Ticker, BroadcastRound
# ---------------------------------------------------------------------------
# 辅助:构造最小 Ticker(只初始化 _broadcast_tracker
# ---------------------------------------------------------------------------
def _make_minimal_ticker() -> MagicMock:
"""创建只包含 _broadcast_tracker 的 Ticker mock,绑定真实 record_broadcast_response"""
ticker = MagicMock(spec=Ticker)
ticker._broadcast_tracker = {}
# 绑定真实实例方法
ticker.record_broadcast_response = Ticker.record_broadcast_response.__get__(ticker, Ticker)
return ticker
def _make_round(task_id: str = "task-1",
notified: set | None = None,
responded: set | None = None,
round_number: int = 0) -> BroadcastRound:
"""快速构造 BroadcastRound"""
br = BroadcastRound(task_id=task_id)
br.notified_agents = notified if notified is not None else set()
br.responded_agents = responded if responded is not None else set()
br.round_number = round_number
return br
# ===================================================================
# B1: agent claimed → tracker 被清理
# ===================================================================
class TestClaimedClearsTracker:
def test_claimed_removes_tracker(self):
"""B1: outcome='claimed' 时 tracker 从 dict 中移除"""
ticker = _make_minimal_ticker()
ticker._broadcast_tracker["task-1"] = _make_round("task-1")
ticker.record_broadcast_response("task-1", "agent-a", "claimed")
assert "task-1" not in ticker._broadcast_tracker
def test_claimed_ignores_responded(self):
"""B1 补充:claimed 不往 responded_agents 添加"""
ticker = _make_minimal_ticker()
tracker = _make_round("task-1", notified={"agent-a", "agent-b"})
ticker._broadcast_tracker["task-1"] = tracker
ticker.record_broadcast_response("task-1", "agent-a", "claimed")
assert "task-1" not in ticker._broadcast_tracker
# ===================================================================
# B2: agent no_reply → responded 增加
# ===================================================================
class TestNoReplyAddsResponded:
def test_no_reply_adds_agent(self):
"""B2: outcome != 'claimed' 时 agent 加入 responded_agents"""
ticker = _make_minimal_ticker()
tracker = _make_round("task-1", notified={"agent-a", "agent-b"})
ticker._broadcast_tracker["task-1"] = tracker
ticker.record_broadcast_response("task-1", "agent-a", "no_reply")
assert "agent-a" in tracker.responded_agents
assert "task-1" in ticker._broadcast_tracker # 未清理
def test_rejected_adds_agent(self):
"""B2 补充:'rejected' 也加入 responded"""
ticker = _make_minimal_ticker()
tracker = _make_round("task-1", notified={"agent-a"})
ticker._broadcast_tracker["task-1"] = tracker
ticker.record_broadcast_response("task-1", "agent-a", "rejected")
assert "agent-a" in tracker.responded_agents
# ===================================================================
# B3: 所有 notified 都 responded → round_number +1
# ===================================================================
class TestAllRespondedRoundCompletes:
def test_all_responded_advances_round(self):
"""B3: notified_agents ⊆ responded_agents 时 round_number += 1,清空 sets"""
ticker = _make_minimal_ticker()
tracker = _make_round("task-1", notified={"agent-a", "agent-b"}, round_number=0)
ticker._broadcast_tracker["task-1"] = tracker
ticker.record_broadcast_response("task-1", "agent-a", "no_reply")
ticker.record_broadcast_response("task-1", "agent-b", "no_reply")
assert tracker.round_number == 1
assert len(tracker.notified_agents) == 0
assert len(tracker.responded_agents) == 0
def test_superset_responded_also_completes(self):
"""B3 补充:responded 超过 notified 也完成轮次"""
ticker = _make_minimal_ticker()
tracker = _make_round("task-1", notified={"agent-a"}, round_number=0)
ticker._broadcast_tracker["task-1"] = tracker
# agent-a responded → matched → round completes
ticker.record_broadcast_response("task-1", "agent-a", "no_reply")
assert tracker.round_number == 1
# ===================================================================
# B4: 部分响应 → round 不结束
# ===================================================================
class TestPartialResponseNoRoundAdvance:
def test_partial_response_keeps_round(self):
"""B4: 只有一个 agent 响应,另一个未响应 → round 不变"""
ticker = _make_minimal_ticker()
tracker = _make_round("task-1", notified={"agent-a", "agent-b", "agent-c"}, round_number=0)
ticker._broadcast_tracker["task-1"] = tracker
ticker.record_broadcast_response("task-1", "agent-a", "no_reply")
assert tracker.round_number == 0
assert "agent-a" in tracker.responded_agents
assert "task-1" in ticker._broadcast_tracker
# ===================================================================
# B5: 不存在的 task_id → 静默返回
# ===================================================================
class TestMissingTaskIdSilent:
def test_missing_task_no_error(self):
"""B5: task_id 不在 tracker 中 → 不抛异常"""
ticker = _make_minimal_ticker()
# 不应抛异常
ticker.record_broadcast_response("nonexistent-task", "agent-a", "claimed")
ticker.record_broadcast_response("nonexistent-task", "agent-a", "no_reply")
assert len(ticker._broadcast_tracker) == 0
# ===================================================================
# B6: round 2 正常完成
# ===================================================================
class TestRoundTwoCompletes:
def test_second_round_completes(self):
"""B6: 完成 round 1 后再完成 round 2"""
ticker = _make_minimal_ticker()
tracker = _make_round("task-1", notified={"agent-a"}, round_number=0)
ticker._broadcast_tracker["task-1"] = tracker
# Round 1
ticker.record_broadcast_response("task-1", "agent-a", "no_reply")
assert tracker.round_number == 1
# Simulate: 下一轮 notified 被重新填充
tracker.notified_agents = {"agent-b"}
# Round 2
ticker.record_broadcast_response("task-1", "agent-b", "no_reply")
assert tracker.round_number == 2
def test_three_rounds_sequentially(self):
"""B6 补充:连续三轮"""
ticker = _make_minimal_ticker()
tracker = _make_round("task-1", notified={"agent-a"}, round_number=0)
ticker._broadcast_tracker["task-1"] = tracker
for expected_round in range(1, 4):
ticker.record_broadcast_response("task-1", "agent-a", "no_reply")
assert tracker.round_number == expected_round
# 重新填充 notified(模拟下一轮广播)
tracker.notified_agents = {"agent-a"}
# ===================================================================
# B7: 已 claimed 后其他 agent 仍回调 → 不影响
# ===================================================================
class TestClaimedThenOtherCallback:
def test_other_agent_callback_after_claimed(self):
"""B7: agent-a 认领后 agent-b 回调 → tracker 已不存在,静默返回"""
ticker = _make_minimal_ticker()
tracker = _make_round("task-1", notified={"agent-a", "agent-b"})
ticker._broadcast_tracker["task-1"] = tracker
# agent-a claimed → tracker removed
ticker.record_broadcast_response("task-1", "agent-a", "claimed")
assert "task-1" not in ticker._broadcast_tracker
# agent-b late callback → no crash
ticker.record_broadcast_response("task-1", "agent-b", "no_reply")
assert "task-1" not in ticker._broadcast_tracker
# ===================================================================
# B8: 3 轮后 _broadcast_claim 标记 escalated
# ===================================================================
class TestEscalationAfterThreeRounds:
def test_round_3_triggers_escalation(self):
"""B8: round_number >= 3 时 _broadcast_claim 将任务放入 escalated 列表
验证 tracker.round_number=3 时,任务在 _broadcast_claim 内部
被识别为 escalatedtracker 被清理)。
由于 _broadcast_claim 依赖大量基础设施,这里只验证判断条件。
"""
ticker = _make_minimal_ticker()
# 模拟 round_number = 3 的 tracker
tracker = _make_round("task-1", round_number=3)
ticker._broadcast_tracker["task-1"] = tracker
# 验证条件:round_number >= 3 应被视为 escalated
assert tracker.round_number >= 3
# 模拟 _broadcast_claim 的分支逻辑
escalated_ids = []
broadcastable_ids = []
for tid, tr in ticker._broadcast_tracker.items():
if tr.round_number >= 3:
escalated_ids.append(tid)
else:
broadcastable_ids.append(tid)
assert "task-1" in escalated_ids
assert "task-1" not in broadcastable_ids