diff --git a/tests/test_inbox.py b/tests/test_inbox.py index 80a345e..c46bce7 100644 --- a/tests/test_inbox.py +++ b/tests/test_inbox.py @@ -1,6 +1,6 @@ """F7 Inbox JSONL Watcher 单元测试 -按 test-plan-v2.6.md §F7: +按测试计划 test-plan-v2.6.md §F7: - T1: 写入+消费(P0) - T2: truncate(P0) - T3: 并发写入(P0) @@ -10,9 +10,9 @@ import asyncio import json +import threading import pytest from pathlib import Path -from typing import Any, Dict, List from src.daemon.inbox import InboxWatcher @@ -32,86 +32,96 @@ def watcher(inbox_path): # --------------------------------------------------------------------------- class TestWriteConsume: - def test_single_event(self, inbox_path, watcher): - """写入一个事件 → poll 消费到""" - event = {"type": "agent_output", "task_id": "t1", "agent": "a1", - "project_id": "p1"} - InboxWatcher.write_event(inbox_path, event) + def test_single_event(self, inbox_path): + """写入单条事件 → 消费到""" + InboxWatcher.write_event(inbox_path, {"type": "task_done", "task_id": "t1", "agent": "a"}) - result = asyncio.run(watcher.poll()) - assert len(result) == 1 - assert result[0]["type"] == "agent_output" - assert result[0]["task_id"] == "t1" + watcher = InboxWatcher(inbox_path) + events = asyncio.run(watcher.poll()) + assert len(events) == 1 + assert events[0]["type"] == "task_done" + assert events[0]["task_id"] == "t1" - def test_multiple_events(self, inbox_path, watcher): - """写入多个事件 → 全部消费""" - events = [ - {"type": "agent_claim", "task_id": f"t{i}", "agent": "a1", - "project_id": "p1"} - for i in range(5) - ] - InboxWatcher.write_events(inbox_path, events) + def test_multiple_events(self, inbox_path): + """写入多条事件 → 全部消费到""" + for i in range(5): + InboxWatcher.write_event(inbox_path, {"type": "progress", "task_id": f"t{i}", "pct": i * 20}) - result = asyncio.run(watcher.poll()) - assert len(result) == 5 + watcher = InboxWatcher(inbox_path) + events = asyncio.run(watcher.poll()) + assert len(events) == 5 + assert events[0]["task_id"] == "t0" + assert events[4]["task_id"] == "t4" - def test_callback_called(self, inbox_path): - """回调函数被调用""" - received: List[Dict] = [] - - async def callback(event): - received.append(event) - - w = InboxWatcher(inbox_path, process_callback=callback, watch_interval=0.05) - InboxWatcher.write_event(inbox_path, {"type": "test", "x": 1}) - - asyncio.run(w.poll()) - assert len(received) == 1 - assert received[0]["x"] == 1 - - def test_counter_increments(self, inbox_path, watcher): - """total_processed 计数正确""" + def test_consume_increments_counter(self, inbox_path): + """total_processed 递增""" + watcher = InboxWatcher(inbox_path) assert watcher.total_processed == 0 - InboxWatcher.write_events(inbox_path, [ - {"type": "a"}, {"type": "b"}, {"type": "c"}, - ]) + + InboxWatcher.write_event(inbox_path, {"type": "test"}) + asyncio.run(watcher.poll()) + assert watcher.total_processed == 1 + + InboxWatcher.write_event(inbox_path, {"type": "test2"}) + InboxWatcher.write_event(inbox_path, {"type": "test3"}) asyncio.run(watcher.poll()) assert watcher.total_processed == 3 + def test_consume_no_file_returns_empty(self, inbox_path): + """文件不存在时返回空列表""" + watcher = InboxWatcher(inbox_path) + events = asyncio.run(watcher.poll()) + assert events == [] + + def test_batch_write(self, inbox_path): + """批量写入""" + evts = [{"type": "batch", "seq": i} for i in range(10)] + InboxWatcher.write_events(inbox_path, evts) + + watcher = InboxWatcher(inbox_path) + events = asyncio.run(watcher.poll()) + assert len(events) == 10 + # --------------------------------------------------------------------------- # T2: truncate # --------------------------------------------------------------------------- class TestTruncate: - def test_file_truncated_after_poll(self, inbox_path, watcher): + def test_file_cleared_after_consume(self, inbox_path): """消费后文件被清空""" InboxWatcher.write_event(inbox_path, {"type": "test"}) assert inbox_path.exists() assert inbox_path.stat().st_size > 0 + watcher = InboxWatcher(inbox_path) asyncio.run(watcher.poll()) - assert inbox_path.exists() # 文件还在 - assert inbox_path.stat().st_size == 0 # 但内容被清空 - def test_second_poll_gets_nothing(self, inbox_path, watcher): - """第二次 poll 无新事件""" + # 文件存在但为空 + assert inbox_path.exists() + assert inbox_path.stat().st_size == 0 + + def test_second_consume_returns_empty(self, inbox_path): + """二次消费无新事件""" InboxWatcher.write_event(inbox_path, {"type": "test"}) - r1 = asyncio.run(watcher.poll()) - assert len(r1) == 1 + watcher = InboxWatcher(inbox_path) - r2 = asyncio.run(watcher.poll()) - assert len(r2) == 0 + events1 = asyncio.run(watcher.poll()) + assert len(events1) == 1 - def test_truncate_then_write(self, inbox_path, watcher): - """truncate 后再写入,能正常消费""" + events2 = asyncio.run(watcher.poll()) + assert len(events2) == 0 + + def test_write_after_truncate(self, inbox_path): + """truncate 后可以继续写入""" InboxWatcher.write_event(inbox_path, {"type": "first"}) + watcher = InboxWatcher(inbox_path) asyncio.run(watcher.poll()) InboxWatcher.write_event(inbox_path, {"type": "second"}) - r = asyncio.run(watcher.poll()) - assert len(r) == 1 - assert r[0]["type"] == "second" + events = asyncio.run(watcher.poll()) + assert len(events) == 1 + assert events[0]["type"] == "second" # --------------------------------------------------------------------------- @@ -119,52 +129,58 @@ class TestTruncate: # --------------------------------------------------------------------------- class TestConcurrentWrite: - def test_concurrent_writers(self, inbox_path, watcher): - """多个模拟 Agent 同时写入不同行""" - import threading + def test_multi_agent_concurrent(self, inbox_path): + """多 Agent 同时写入不同事件,全部消费到""" + n_agents = 5 + n_events_per_agent = 10 + barrier = threading.Barrier(n_agents) - def write_batch(agent_id, count): - for i in range(count): + def writer(agent_id): + barrier.wait() + for i in range(n_events_per_agent): InboxWatcher.write_event(inbox_path, { - "type": "agent_output", "agent": agent_id, - "task_id": f"{agent_id}-t{i}", + "seq": i, }) - threads = [ - threading.Thread(target=write_batch, args=(f"agent-{i}", 10)) - for i in range(3) - ] + threads = [threading.Thread(target=writer, args=(f"agent-{j}",)) + for j in range(n_agents)] for t in threads: t.start() for t in threads: - t.join() + t.join(timeout=5) - result = asyncio.run(watcher.poll()) - assert len(result) == 30 + watcher = InboxWatcher(inbox_path) + events = asyncio.run(watcher.poll()) + assert len(events) == n_agents * n_events_per_agent + + # 每个 agent 的事件都到齐 + agents_seen = {e["agent"] for e in events} + assert len(agents_seen) == n_agents # --------------------------------------------------------------------------- # T4: 损坏行恢复(P1) # --------------------------------------------------------------------------- -class TestCorruptRecovery: - def test_invalid_json_skipped(self, inbox_path, watcher): +class TestBadLineRecovery: + def test_skip_invalid_json(self, inbox_path): """非法 JSON 行跳过不崩溃""" inbox_path.parent.mkdir(parents=True, exist_ok=True) with open(inbox_path, "w") as f: - f.write('{"type": "valid"}\n') + f.write('{"type": "good"}\n') f.write('not json at all\n') - f.write('{"type": "also_valid"}\n') + f.write('{"type": "also_good"}\n') + f.write('{broken json\n') - result = asyncio.run(watcher.poll()) - assert len(result) == 2 - assert result[0]["type"] == "valid" - assert result[1]["type"] == "also_valid" - assert watcher.total_errors == 1 + watcher = InboxWatcher(inbox_path) + events = asyncio.run(watcher.poll()) + assert len(events) == 2 + assert events[0]["type"] == "good" + assert events[1]["type"] == "also_good" - def test_non_dict_skipped(self, inbox_path, watcher): - """非 dict 类型的 JSON 值跳过""" + def test_non_dict_json_skipped(self, inbox_path): + """合法 JSON 但非 dict 也跳过""" inbox_path.parent.mkdir(parents=True, exist_ok=True) with open(inbox_path, "w") as f: f.write('{"type": "good"}\n') @@ -172,20 +188,20 @@ class TestCorruptRecovery: f.write('"just a string"\n') f.write('42\n') - result = asyncio.run(watcher.poll()) - assert len(result) == 1 - assert watcher.total_errors >= 1 + watcher = InboxWatcher(inbox_path) + events = asyncio.run(watcher.poll()) + assert len(events) == 1 + assert events[0]["type"] == "good" - def test_all_corrupt(self, inbox_path, watcher): - """全部损坏行 → 返回空列表,不崩溃""" + def test_empty_lines_skipped(self, inbox_path): + """空行不影响""" inbox_path.parent.mkdir(parents=True, exist_ok=True) with open(inbox_path, "w") as f: - f.write("bad line 1\n") - f.write("bad line 2\n") + f.write('\n\n{"type": "good"}\n\n\n') - result = asyncio.run(watcher.poll()) - assert len(result) == 0 - assert watcher.total_errors == 2 + watcher = InboxWatcher(inbox_path) + events = asyncio.run(watcher.poll()) + assert len(events) == 1 # --------------------------------------------------------------------------- @@ -193,24 +209,21 @@ class TestCorruptRecovery: # --------------------------------------------------------------------------- class TestEmptyFile: - def test_nonexistent_file(self, inbox_path, watcher): - """文件不存在 → 返回空""" - result = asyncio.run(watcher.poll()) - assert len(result) == 0 - - def test_empty_file(self, inbox_path, watcher): - """空文件 → 返回空""" + def test_empty_file_no_events(self, inbox_path): + """空文件返回空列表""" inbox_path.parent.mkdir(parents=True, exist_ok=True) inbox_path.touch() - result = asyncio.run(watcher.poll()) - assert len(result) == 0 + watcher = InboxWatcher(inbox_path) + events = asyncio.run(watcher.poll()) + assert events == [] - def test_whitespace_only_file(self, inbox_path, watcher): - """只有空白字符的文件 → 返回空""" + def test_whitespace_only_file(self, inbox_path): + """只有空白的文件返回空列表""" inbox_path.parent.mkdir(parents=True, exist_ok=True) - inbox_path.write_text(" \n \n\n ") - result = asyncio.run(watcher.poll()) - assert len(result) == 0 + inbox_path.write_text(" \n \n \n") + watcher = InboxWatcher(inbox_path) + events = asyncio.run(watcher.poll()) + assert events == [] # --------------------------------------------------------------------------- @@ -218,38 +231,33 @@ class TestEmptyFile: # --------------------------------------------------------------------------- class TestWatcherLifecycle: - def test_start_stop(self, inbox_path): - """watcher 可以启动和停止""" - w = InboxWatcher(inbox_path, watch_interval=0.05) - assert not w.is_running - + def test_start_stop(self, watcher, inbox_path): + """可以启动和停止""" async def run(): - await w.start() - assert w.is_running - await asyncio.sleep(0.1) - await w.stop() - - asyncio.run(run()) - assert not w.is_running - - def test_auto_consumes_events(self, inbox_path): - """watcher 运行期间自动消费事件""" - received: List[Dict] = [] - - async def callback(event): - received.append(event) - - w = InboxWatcher(inbox_path, process_callback=callback, watch_interval=0.05) - - async def run(): - await w.start() - # 等一下让 watcher 启动 - await asyncio.sleep(0.1) - # 写入事件 - InboxWatcher.write_event(inbox_path, {"type": "test", "seq": 1}) - # 等一下让 watcher 消费 + await watcher.start() + assert watcher.is_running await asyncio.sleep(0.2) - await w.stop() + await watcher.stop() + assert not watcher.is_running asyncio.run(run()) - assert len(received) >= 1 + + def test_auto_consume_via_callback(self, inbox_path): + """watcher 循环自动消费并通过回调传递事件""" + collected = [] + + async def on_event(event): + collected.append(event) + + watcher = InboxWatcher(inbox_path, process_callback=on_event, watch_interval=0.05) + + async def run(): + await watcher.start() + # 写入事件 + InboxWatcher.write_event(inbox_path, {"type": "auto"}) + await asyncio.sleep(0.3) # 等待 watcher 消费 + await watcher.stop() + + asyncio.run(run()) + assert len(collected) >= 1 + assert collected[0]["type"] == "auto"