Files
sanguo_moziplus_v2/tests/unit/test_broadcast_claim.py
T
2026-06-05 13:29:53 +08:00

243 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Broadcast claim 流程单元测试
覆盖 ticker.py 中的 record_broadcast_response 和 BroadcastRound 轮次逻辑。
场景 B1-B8。
"""
import pytest
from unittest.mock import MagicMock, patch
pytestmark = pytest.mark.unit
from src.daemon.ticker import Ticker, BroadcastRound
# ---------------------------------------------------------------------------
# Helper: build a minimal Ticker (only _broadcast_tracker is initialized)
# ---------------------------------------------------------------------------
def _make_minimal_ticker() -> MagicMock:
    """Build a Ticker stand-in that carries only the broadcast tracker.

    The returned object is a ``MagicMock`` specced against ``Ticker`` whose
    ``_broadcast_tracker`` is an empty dict. The real
    ``Ticker.record_broadcast_response`` is bound to it, so tests run the
    production method against the mock's state.
    """
    stub = MagicMock(spec=Ticker)
    stub._broadcast_tracker = {}
    # Bind the genuine (unmocked) method to the stub via the descriptor
    # protocol, so ``self`` inside the method is the stub.
    real_method = Ticker.record_broadcast_response
    stub.record_broadcast_response = real_method.__get__(stub, Ticker)
    return stub
def _make_round(task_id: str = "task-1",
                notified: set = None,
                responded: set = None,
                round_number: int = 0) -> BroadcastRound:
    """Construct a BroadcastRound pre-populated for a test.

    Args:
        task_id: Passed to the ``BroadcastRound`` constructor.
        notified: Assigned to ``notified_agents``; a fresh empty set when
            ``None``. The given set object is stored as-is (not copied).
        responded: Assigned to ``responded_agents``; a fresh empty set when
            ``None``. The given set object is stored as-is (not copied).
        round_number: Assigned to ``round_number``.

    Returns:
        The configured ``BroadcastRound`` instance.
    """
    if notified is None:
        notified = set()
    if responded is None:
        responded = set()

    round_state = BroadcastRound(task_id=task_id)
    round_state.notified_agents = notified
    round_state.responded_agents = responded
    round_state.round_number = round_number
    return round_state
# ===================================================================
# B1: agent claimed → tracker 被清理
# ===================================================================
class TestClaimedClearsTracker:
    """B1: a 'claimed' response removes the task's broadcast tracker."""

    def test_claimed_removes_tracker(self):
        """B1: with outcome='claimed' the tracker is removed from the dict."""
        ticker = _make_minimal_ticker()
        ticker._broadcast_tracker["task-1"] = _make_round("task-1")

        ticker.record_broadcast_response("task-1", "agent-a", "claimed")

        assert "task-1" not in ticker._broadcast_tracker

    def test_claimed_ignores_responded(self):
        """B1 (extra): 'claimed' does not add the agent to responded_agents."""
        ticker = _make_minimal_ticker()
        tracker = _make_round("task-1", notified={"agent-a", "agent-b"})
        ticker._broadcast_tracker["task-1"] = tracker

        ticker.record_broadcast_response("task-1", "agent-a", "claimed")

        assert "task-1" not in ticker._broadcast_tracker
        # The round object is still referenced locally even though it left
        # the dict, so the property named by this test can be checked
        # directly rather than only re-asserting the removal.
        assert "agent-a" not in tracker.responded_agents
# ===================================================================
# B2: agent no_reply → responded 增加
# ===================================================================
class TestNoReplyAddsResponded:
    """B2: a non-'claimed' response records the agent as responded."""

    def test_no_reply_adds_agent(self):
        """B2: when outcome != 'claimed' the agent joins responded_agents."""
        ticker = _make_minimal_ticker()
        round_state = _make_round(task_id="task-1",
                                  notified={"agent-a", "agent-b"})
        ticker._broadcast_tracker["task-1"] = round_state

        ticker.record_broadcast_response("task-1", "agent-a", "no_reply")

        assert "agent-a" in round_state.responded_agents
        # The tracker must still be present (not cleaned up).
        assert "task-1" in ticker._broadcast_tracker

    def test_rejected_adds_agent(self):
        """B2 (extra): 'rejected' is also recorded in responded_agents.

        Two agents are notified so that a single response does not finish
        the round and clear the sets.
        """
        ticker = _make_minimal_ticker()
        round_state = _make_round(task_id="task-1",
                                  notified={"agent-a", "agent-b"})
        ticker._broadcast_tracker["task-1"] = round_state

        ticker.record_broadcast_response("task-1", "agent-a", "rejected")

        assert "agent-a" in round_state.responded_agents
        # The round has not completed yet.
        assert round_state.round_number == 0
# ===================================================================
# B3: 所有 notified 都 responded → round_number +1
# ===================================================================
class TestAllRespondedRoundCompletes:
    """B3: once every notified agent has responded, the round advances."""

    def test_all_responded_advances_round(self):
        """B3: notified_agents ⊆ responded_agents bumps round_number and clears both sets."""
        ticker = _make_minimal_ticker()
        tracker = _make_round("task-1", notified={"agent-a", "agent-b"}, round_number=0)
        ticker._broadcast_tracker["task-1"] = tracker

        ticker.record_broadcast_response("task-1", "agent-a", "no_reply")
        ticker.record_broadcast_response("task-1", "agent-b", "no_reply")

        assert tracker.round_number == 1
        assert len(tracker.notified_agents) == 0
        assert len(tracker.responded_agents) == 0

    def test_superset_responded_also_completes(self):
        """B3 (extra): the round also completes when responded strictly exceeds notified."""
        ticker = _make_minimal_ticker()
        # Pre-seed responded with an agent that was never notified, so that
        # after agent-a answers, responded is a strict superset of notified
        # rather than an exact match.
        tracker = _make_round(
            "task-1",
            notified={"agent-a"},
            responded={"agent-x"},
            round_number=0,
        )
        ticker._broadcast_tracker["task-1"] = tracker

        ticker.record_broadcast_response("task-1", "agent-a", "no_reply")

        assert tracker.round_number == 1
# ===================================================================
# B4: 部分响应 → round 不结束
# ===================================================================
class TestPartialResponseNoRoundAdvance:
    """B4: a partial set of responses leaves the round open."""

    def test_partial_response_keeps_round(self):
        """B4: one of three notified agents responds; round_number is unchanged."""
        notified_agents = {"agent-a", "agent-b", "agent-c"}
        ticker = _make_minimal_ticker()
        round_state = _make_round(task_id="task-1",
                                  notified=notified_agents,
                                  round_number=0)
        ticker._broadcast_tracker["task-1"] = round_state

        ticker.record_broadcast_response("task-1", "agent-a", "no_reply")

        assert round_state.round_number == 0
        assert "agent-a" in round_state.responded_agents
        assert "task-1" in ticker._broadcast_tracker
# ===================================================================
# B5: 不存在的 task_id → 静默返回
# ===================================================================
class TestMissingTaskIdSilent:
    """B5: responses for an unknown task_id are ignored."""

    def test_missing_task_no_error(self):
        """B5: a task_id absent from the tracker raises nothing."""
        ticker = _make_minimal_ticker()

        # Neither outcome may raise for a task that was never tracked.
        for outcome in ("claimed", "no_reply"):
            ticker.record_broadcast_response("nonexistent-task", "agent-a", outcome)

        assert len(ticker._broadcast_tracker) == 0
# ===================================================================
# B6: round 2 正常完成
# ===================================================================
class TestRoundTwoCompletes:
    """B6: later rounds complete just like the first one."""

    def test_second_round_completes(self):
        """B6: finish round 1, then finish round 2."""
        ticker = _make_minimal_ticker()
        round_state = _make_round(task_id="task-1",
                                  notified={"agent-a"},
                                  round_number=0)
        ticker._broadcast_tracker["task-1"] = round_state

        # Round 1
        ticker.record_broadcast_response("task-1", "agent-a", "no_reply")
        assert round_state.round_number == 1

        # Simulate the next broadcast repopulating the notified set.
        round_state.notified_agents = {"agent-b"}

        # Round 2
        ticker.record_broadcast_response("task-1", "agent-b", "no_reply")
        assert round_state.round_number == 2

    def test_three_rounds_sequentially(self):
        """B6 (extra): three consecutive rounds."""
        ticker = _make_minimal_ticker()
        round_state = _make_round(task_id="task-1",
                                  notified={"agent-a"},
                                  round_number=0)
        ticker._broadcast_tracker["task-1"] = round_state

        for expected_round in (1, 2, 3):
            ticker.record_broadcast_response("task-1", "agent-a", "no_reply")
            assert round_state.round_number == expected_round
            # Repopulate notified to simulate the next round's broadcast.
            round_state.notified_agents = {"agent-a"}
# ===================================================================
# B7: 已 claimed 后其他 agent 仍回调 → 不影响
# ===================================================================
class TestClaimedThenOtherCallback:
    """B7: callbacks arriving after a claim have no effect."""

    def test_other_agent_callback_after_claimed(self):
        """B7: agent-a claims, then agent-b calls back; the tracker is already gone."""
        ticker = _make_minimal_ticker()
        ticker._broadcast_tracker["task-1"] = _make_round(
            task_id="task-1",
            notified={"agent-a", "agent-b"},
        )

        # agent-a claims, which removes the tracker.
        ticker.record_broadcast_response("task-1", "agent-a", "claimed")
        assert "task-1" not in ticker._broadcast_tracker

        # agent-b's late callback must not crash or recreate the tracker.
        ticker.record_broadcast_response("task-1", "agent-b", "no_reply")
        assert "task-1" not in ticker._broadcast_tracker
# ===================================================================
# B8: 3 轮后 _broadcast_claim 标记 escalated
# ===================================================================
class TestEscalationAfterThreeRounds:
    """B8: a tracker at round 3 falls on the escalated side of the split."""

    def test_round_3_triggers_escalation(self):
        """B8: round_number >= 3 puts the task in the escalated list.

        With tracker.round_number == 3 the task is expected to be treated as
        escalated inside _broadcast_claim (and its tracker cleaned up).
        Because _broadcast_claim depends on a lot of infrastructure, this
        test does not call it; it only re-creates the branch condition
        locally and checks that.
        """
        ticker = _make_minimal_ticker()
        # A tracker that has already reached round 3.
        round_state = _make_round(task_id="task-1", round_number=3)
        ticker._broadcast_tracker["task-1"] = round_state

        # Precondition: round_number >= 3 counts as escalated.
        assert round_state.round_number >= 3

        # Local mirror of the branch logic in _broadcast_claim.
        tracked = ticker._broadcast_tracker.items()
        escalated_ids = [tid for tid, tr in tracked if tr.round_number >= 3]
        broadcastable_ids = [tid for tid, tr in tracked if not tr.round_number >= 3]

        assert "task-1" in escalated_ids
        assert "task-1" not in broadcastable_ids