diff --git a/src/daemon/inbox.py b/src/daemon/inbox.py index bdfe490..5d9e596 100644 --- a/src/daemon/inbox.py +++ b/src/daemon/inbox.py @@ -52,7 +52,6 @@ class InboxWatcher: async def start(self) -> None: """启动监听""" - import asyncio if self._running: return self._running = True @@ -62,7 +61,6 @@ class InboxWatcher: async def stop(self) -> None: """停止监听""" - import asyncio self._running = False if self._task: self._task.cancel() @@ -90,7 +88,6 @@ class InboxWatcher: # ------------------------------------------------------------------ async def _loop(self) -> None: - import asyncio while self._running: try: await self.poll() diff --git a/tests/test_inbox.py b/tests/test_inbox.py new file mode 100644 index 0000000..a81dc8e --- /dev/null +++ b/tests/test_inbox.py @@ -0,0 +1,246 @@ +"""F7 Inbox JSONL Watcher 单元测试 + +按测试计划 test-plan-v2.6.md §F7: +- T1: 写入+消费(P0) +- T2: truncate(P0) +- T3: 并发写入(P0) +- T4: 损坏行恢复(P1) +- T5: 空文件处理(P1) +""" + +import asyncio +import json +import threading +import pytest +from pathlib import Path + +from src.daemon.inbox import InboxWatcher + + +@pytest.fixture +def inbox_path(tmp_path): + return tmp_path / "inbox" / "daemon.jsonl" + + +@pytest.fixture +def watcher(inbox_path): + return InboxWatcher(inbox_path, watch_interval=0.05) + + +# --------------------------------------------------------------------------- +# T1: 写入+消费 +# --------------------------------------------------------------------------- + +class TestWriteConsume: + def test_single_event(self, watcher, inbox_path): + """写入单条事件 → 消费到""" + event = {"type": "task_done", "task_id": "t1", "agent": "a"} + watcher.write_event(event) + + events = watcher.consume() + assert len(events) == 1 + assert events[0]["type"] == "task_done" + assert events[0]["task_id"] == "t1" + + def test_multiple_events(self, watcher, inbox_path): + """写入多条事件 → 全部消费到""" + for i in range(5): + watcher.write_event({"type": "progress", "task_id": f"t{i}", "pct": i * 20}) + + events = watcher.consume() + assert len(events) == 5 + assert events[0]["task_id"] == "t0" + assert events[4]["task_id"] == "t4" + + def test_consume_increments_counter(self, watcher, inbox_path): + """total_consumed 递增""" + assert watcher.total_consumed == 0 + watcher.write_event({"type": "test"}) + watcher.consume() + assert watcher.total_consumed == 1 + watcher.write_event({"type": "test2"}) + watcher.write_event({"type": "test3"}) + watcher.consume() + assert watcher.total_consumed == 3 + + def test_consume_empty_file_returns_empty(self, watcher, inbox_path): + """文件不存在时返回空列表""" + assert watcher.consume() == [] + + def test_static_write(self, inbox_path): + """静态方法写入""" + InboxWatcher.write_event_to(inbox_path, {"type": "static"}) + events = InboxWatcher(inbox_path).consume() + assert len(events) == 1 + assert events[0]["type"] == "static" + + +# --------------------------------------------------------------------------- +# T2: truncate +# --------------------------------------------------------------------------- + +class TestTruncate: + def test_file_cleared_after_consume(self, watcher, inbox_path): + """消费后文件被清空""" + watcher.write_event({"type": "test"}) + assert inbox_path.exists() + assert inbox_path.stat().st_size > 0 + + watcher.consume() + + # 文件存在但为空 + assert inbox_path.exists() + assert inbox_path.stat().st_size == 0 + + def test_second_consume_returns_empty(self, watcher, inbox_path): + """二次消费无新事件""" + watcher.write_event({"type": "test"}) + events1 = watcher.consume() + assert len(events1) == 1 + + events2 = watcher.consume() + assert len(events2) == 0 + + def test_write_after_truncate(self, watcher, inbox_path): + """truncate 后可以继续写入""" + watcher.write_event({"type": "first"}) + watcher.consume() + + watcher.write_event({"type": "second"}) + events = watcher.consume() + assert len(events) == 1 + assert events[0]["type"] == "second" + + +# --------------------------------------------------------------------------- +# T3: 并发写入 +# --------------------------------------------------------------------------- + +class TestConcurrentWrite: + def test_multi_agent_concurrent(self, inbox_path): + """多 Agent 同时写入不同事件,全部消费到""" + n_agents = 5 + n_events_per_agent = 10 + barrier = threading.Barrier(n_agents) + + def writer(agent_id): + barrier.wait() + for i in range(n_events_per_agent): + InboxWatcher.write_event_to(inbox_path, { + "agent": agent_id, + "seq": i, + }) + + threads = [threading.Thread(target=writer, args=(f"agent-{j}",)) + for j in range(n_agents)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=5) + + watcher = InboxWatcher(inbox_path) + events = watcher.consume() + assert len(events) == n_agents * n_events_per_agent + + # 每个 agent 的事件都到齐 + agents_seen = {e["agent"] for e in events} + assert len(agents_seen) == n_agents + + +# --------------------------------------------------------------------------- +# T4: 损坏行恢复(P1) +# --------------------------------------------------------------------------- + +class TestBadLineRecovery: + def test_skip_invalid_json(self, watcher, inbox_path): + """非法 JSON 行跳过不崩溃""" + inbox_path.parent.mkdir(parents=True, exist_ok=True) + with open(inbox_path, "w") as f: + f.write('{"type": "good"}\n') + f.write('not json at all\n') + f.write('{"type": "also_good"}\n') + f.write('{broken json\n') + + events = watcher.consume() + assert len(events) == 2 + assert events[0]["type"] == "good" + assert events[1]["type"] == "also_good" + + def test_non_dict_json_skipped(self, watcher, inbox_path): + """合法 JSON 但非 dict 也跳过""" + inbox_path.parent.mkdir(parents=True, exist_ok=True) + with open(inbox_path, "w") as f: + f.write('{"type": "good"}\n') + f.write('[1, 2, 3]\n') + f.write('"just a string"\n') + f.write('42\n') + + events = watcher.consume() + assert len(events) == 1 + assert events[0]["type"] == "good" + + def test_empty_lines_skipped(self, watcher, inbox_path): + """空行不影响""" + inbox_path.parent.mkdir(parents=True, exist_ok=True) + with open(inbox_path, "w") as f: + f.write('\n\n{"type": "good"}\n\n\n') + + events = watcher.consume() + assert len(events) == 1 + + +# --------------------------------------------------------------------------- +# T5: 空文件处理(P1) +# --------------------------------------------------------------------------- + +class TestEmptyFile: + def test_empty_file_no_events(self, watcher, inbox_path): + """空文件返回空列表""" + inbox_path.parent.mkdir(parents=True, exist_ok=True) + inbox_path.touch() + events = watcher.consume() + assert events == [] + + def test_whitespace_only_file(self, watcher, inbox_path): + """只有空白的文件返回空列表""" + inbox_path.parent.mkdir(parents=True, exist_ok=True) + inbox_path.write_text(" \n \n \n") + events = watcher.consume() + assert events == [] + + +# --------------------------------------------------------------------------- +# Watcher 生命周期 +# --------------------------------------------------------------------------- + +class TestWatcherLifecycle: + def test_start_stop(self, watcher, inbox_path): + """可以启动和停止""" + async def run(): + await watcher.start() + assert watcher.is_running + await asyncio.sleep(0.2) + await watcher.stop() + assert not watcher.is_running + + asyncio.run(run()) + + def test_auto_consume_via_callback(self, watcher, inbox_path): + """watcher 循环自动消费并通过回调传递事件""" + collected = [] + + async def on_events(events): + collected.extend(events) + + watcher.on_events = on_events + + async def run(): + await watcher.start() + # 写入事件 + watcher.write_event({"type": "auto"}) + await asyncio.sleep(0.3) # 等待 watcher 消费 + await watcher.stop() + + asyncio.run(run()) + assert len(collected) >= 1 + assert collected[0]["type"] == "auto"