From 32b4afa1bdf20c3a9a78a1e0b66c67b472660a29 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Sun, 17 May 2026 05:53:43 +0800 Subject: [PATCH] auto-sync: 2026-05-17 05:53:43 --- tests/test_inbox.py | 261 +++++++++++++++++++++++--------------------- 1 file changed, 135 insertions(+), 126 deletions(-) diff --git a/tests/test_inbox.py b/tests/test_inbox.py index a81dc8e..80a345e 100644 --- a/tests/test_inbox.py +++ b/tests/test_inbox.py @@ -1,6 +1,6 @@ """F7 Inbox JSONL Watcher 单元测试 -按测试计划 test-plan-v2.6.md §F7: +按 test-plan-v2.6.md §F7: - T1: 写入+消费(P0) - T2: truncate(P0) - T3: 并发写入(P0) @@ -10,9 +10,9 @@ import asyncio import json -import threading import pytest from pathlib import Path +from typing import Any, Dict, List from src.daemon.inbox import InboxWatcher @@ -32,47 +32,51 @@ def watcher(inbox_path): # --------------------------------------------------------------------------- class TestWriteConsume: - def test_single_event(self, watcher, inbox_path): - """写入单条事件 → 消费到""" - event = {"type": "task_done", "task_id": "t1", "agent": "a"} - watcher.write_event(event) + def test_single_event(self, inbox_path, watcher): + """写入一个事件 → poll 消费到""" + event = {"type": "agent_output", "task_id": "t1", "agent": "a1", + "project_id": "p1"} + InboxWatcher.write_event(inbox_path, event) - events = watcher.consume() - assert len(events) == 1 - assert events[0]["type"] == "task_done" - assert events[0]["task_id"] == "t1" + result = asyncio.run(watcher.poll()) + assert len(result) == 1 + assert result[0]["type"] == "agent_output" + assert result[0]["task_id"] == "t1" - def test_multiple_events(self, watcher, inbox_path): - """写入多条事件 → 全部消费到""" - for i in range(5): - watcher.write_event({"type": "progress", "task_id": f"t{i}", "pct": i * 20}) + def test_multiple_events(self, inbox_path, watcher): + """写入多个事件 → 全部消费""" + events = [ + {"type": "agent_claim", "task_id": f"t{i}", "agent": "a1", + "project_id": "p1"} + for i in range(5) + ] + InboxWatcher.write_events(inbox_path, events) - events = watcher.consume() - assert len(events) == 5 - assert events[0]["task_id"] == "t0" - assert events[4]["task_id"] == "t4" + result = asyncio.run(watcher.poll()) + assert len(result) == 5 - def test_consume_increments_counter(self, watcher, inbox_path): - """total_consumed 递增""" - assert watcher.total_consumed == 0 - watcher.write_event({"type": "test"}) - watcher.consume() - assert watcher.total_consumed == 1 - watcher.write_event({"type": "test2"}) - watcher.write_event({"type": "test3"}) - watcher.consume() - assert watcher.total_consumed == 3 + def test_callback_called(self, inbox_path): + """回调函数被调用""" + received: List[Dict] = [] - def test_consume_empty_file_returns_empty(self, watcher, inbox_path): - """文件不存在时返回空列表""" - assert watcher.consume() == [] + async def callback(event): + received.append(event) - def test_static_write(self, inbox_path): - """静态方法写入""" - InboxWatcher.write_event_to(inbox_path, {"type": "static"}) - events = InboxWatcher(inbox_path).consume() - assert len(events) == 1 - assert events[0]["type"] == "static" + w = InboxWatcher(inbox_path, process_callback=callback, watch_interval=0.05) + InboxWatcher.write_event(inbox_path, {"type": "test", "x": 1}) + + asyncio.run(w.poll()) + assert len(received) == 1 + assert received[0]["x"] == 1 + + def test_counter_increments(self, inbox_path, watcher): + """total_processed 计数正确""" + assert watcher.total_processed == 0 + InboxWatcher.write_events(inbox_path, [ + {"type": "a"}, {"type": "b"}, {"type": "c"}, + ]) + asyncio.run(watcher.poll()) + assert watcher.total_processed == 3 # --------------------------------------------------------------------------- @@ -80,36 +84,34 @@ class TestWriteConsume: # --------------------------------------------------------------------------- class TestTruncate: - def test_file_cleared_after_consume(self, watcher, inbox_path): + def test_file_truncated_after_poll(self, inbox_path, watcher): """消费后文件被清空""" - watcher.write_event({"type": "test"}) + InboxWatcher.write_event(inbox_path, {"type": "test"}) assert inbox_path.exists() assert inbox_path.stat().st_size > 0 - watcher.consume() + asyncio.run(watcher.poll()) + assert inbox_path.exists() # 文件还在 + assert inbox_path.stat().st_size == 0 # 但内容被清空 - # 文件存在但为空 - assert inbox_path.exists() - assert inbox_path.stat().st_size == 0 + def test_second_poll_gets_nothing(self, inbox_path, watcher): + """第二次 poll 无新事件""" + InboxWatcher.write_event(inbox_path, {"type": "test"}) + r1 = asyncio.run(watcher.poll()) + assert len(r1) == 1 - def test_second_consume_returns_empty(self, watcher, inbox_path): - """二次消费无新事件""" - watcher.write_event({"type": "test"}) - events1 = watcher.consume() - assert len(events1) == 1 + r2 = asyncio.run(watcher.poll()) + assert len(r2) == 0 - events2 = watcher.consume() - assert len(events2) == 0 + def test_truncate_then_write(self, inbox_path, watcher): + """truncate 后再写入,能正常消费""" + InboxWatcher.write_event(inbox_path, {"type": "first"}) + asyncio.run(watcher.poll()) - def test_write_after_truncate(self, watcher, inbox_path): - """truncate 后可以继续写入""" - watcher.write_event({"type": "first"}) - watcher.consume() - - watcher.write_event({"type": "second"}) - events = watcher.consume() - assert len(events) == 1 - assert events[0]["type"] == "second" + InboxWatcher.write_event(inbox_path, {"type": "second"}) + r = asyncio.run(watcher.poll()) + assert len(r) == 1 + assert r[0]["type"] == "second" # --------------------------------------------------------------------------- @@ -117,57 +119,52 @@ class TestTruncate: # --------------------------------------------------------------------------- class TestConcurrentWrite: - def test_multi_agent_concurrent(self, inbox_path): - """多 Agent 同时写入不同事件,全部消费到""" - n_agents = 5 - n_events_per_agent = 10 - barrier = threading.Barrier(n_agents) + def test_concurrent_writers(self, inbox_path, watcher): + """多个模拟 Agent 同时写入不同行""" + import threading - def writer(agent_id): - barrier.wait() - for i in range(n_events_per_agent): - InboxWatcher.write_event_to(inbox_path, { + def write_batch(agent_id, count): + for i in range(count): + InboxWatcher.write_event(inbox_path, { + "type": "agent_output", "agent": agent_id, - "seq": i, + "task_id": f"{agent_id}-t{i}", }) - threads = [threading.Thread(target=writer, args=(f"agent-{j}",)) - for j in range(n_agents)] + threads = [ + threading.Thread(target=write_batch, args=(f"agent-{i}", 10)) + for i in range(3) + ] for t in threads: t.start() for t in threads: - t.join(timeout=5) + t.join() - watcher = InboxWatcher(inbox_path) - events = watcher.consume() - assert len(events) == n_agents * n_events_per_agent - - # 每个 agent 的事件都到齐 - agents_seen = {e["agent"] for e in events} - assert len(agents_seen) == n_agents + result = asyncio.run(watcher.poll()) + assert len(result) == 30 # --------------------------------------------------------------------------- # T4: 损坏行恢复(P1) # --------------------------------------------------------------------------- -class TestBadLineRecovery: - def test_skip_invalid_json(self, watcher, inbox_path): +class TestCorruptRecovery: + def test_invalid_json_skipped(self, inbox_path, watcher): """非法 JSON 行跳过不崩溃""" inbox_path.parent.mkdir(parents=True, exist_ok=True) with open(inbox_path, "w") as f: - f.write('{"type": "good"}\n') + f.write('{"type": "valid"}\n') f.write('not json at all\n') - f.write('{"type": "also_good"}\n') - f.write('{broken json\n') + f.write('{"type": "also_valid"}\n') - events = watcher.consume() - assert len(events) == 2 - assert events[0]["type"] == "good" - assert events[1]["type"] == "also_good" + result = asyncio.run(watcher.poll()) + assert len(result) == 2 + assert result[0]["type"] == "valid" + assert result[1]["type"] == "also_valid" + assert watcher.total_errors == 1 - def test_non_dict_json_skipped(self, watcher, inbox_path): - """合法 JSON 但非 dict 也跳过""" + def test_non_dict_skipped(self, inbox_path, watcher): + """非 dict 类型的 JSON 值跳过""" inbox_path.parent.mkdir(parents=True, exist_ok=True) with open(inbox_path, "w") as f: f.write('{"type": "good"}\n') @@ -175,18 +172,20 @@ class TestBadLineRecovery: f.write('"just a string"\n') f.write('42\n') - events = watcher.consume() - assert len(events) == 1 - assert events[0]["type"] == "good" + result = asyncio.run(watcher.poll()) + assert len(result) == 1 + assert watcher.total_errors >= 1 - def test_empty_lines_skipped(self, watcher, inbox_path): - """空行不影响""" + def test_all_corrupt(self, inbox_path, watcher): + """全部损坏行 → 返回空列表,不崩溃""" inbox_path.parent.mkdir(parents=True, exist_ok=True) with open(inbox_path, "w") as f: - f.write('\n\n{"type": "good"}\n\n\n') + f.write("bad line 1\n") + f.write("bad line 2\n") - events = watcher.consume() - assert len(events) == 1 + result = asyncio.run(watcher.poll()) + assert len(result) == 0 + assert watcher.total_errors == 2 # --------------------------------------------------------------------------- @@ -194,19 +193,24 @@ class TestBadLineRecovery: # --------------------------------------------------------------------------- class TestEmptyFile: - def test_empty_file_no_events(self, watcher, inbox_path): - """空文件返回空列表""" + def test_nonexistent_file(self, inbox_path, watcher): + """文件不存在 → 返回空""" + result = asyncio.run(watcher.poll()) + assert len(result) == 0 + + def test_empty_file(self, inbox_path, watcher): + """空文件 → 返回空""" inbox_path.parent.mkdir(parents=True, exist_ok=True) inbox_path.touch() - events = watcher.consume() - assert events == [] + result = asyncio.run(watcher.poll()) + assert len(result) == 0 - def test_whitespace_only_file(self, watcher, inbox_path): - """只有空白的文件返回空列表""" + def test_whitespace_only_file(self, inbox_path, watcher): + """只有空白字符的文件 → 返回空""" inbox_path.parent.mkdir(parents=True, exist_ok=True) - inbox_path.write_text(" \n \n \n") - events = watcher.consume() - assert events == [] + inbox_path.write_text(" \n \n\n ") + result = asyncio.run(watcher.poll()) + assert len(result) == 0 # --------------------------------------------------------------------------- @@ -214,33 +218,38 @@ class TestEmptyFile: # --------------------------------------------------------------------------- class TestWatcherLifecycle: - def test_start_stop(self, watcher, inbox_path): - """可以启动和停止""" + def test_start_stop(self, inbox_path): + """watcher 可以启动和停止""" + w = InboxWatcher(inbox_path, watch_interval=0.05) + assert not w.is_running + async def run(): - await watcher.start() - assert watcher.is_running - await asyncio.sleep(0.2) - await watcher.stop() - assert not watcher.is_running + await w.start() + assert w.is_running + await asyncio.sleep(0.1) + await w.stop() asyncio.run(run()) + assert not w.is_running - def test_auto_consume_via_callback(self, watcher, inbox_path): - """watcher 循环自动消费并通过回调传递事件""" - collected = [] + def test_auto_consumes_events(self, inbox_path): + """watcher 运行期间自动消费事件""" + received: List[Dict] = [] - async def on_events(events): - collected.extend(events) + async def callback(event): + received.append(event) - watcher.on_events = on_events + w = InboxWatcher(inbox_path, process_callback=callback, watch_interval=0.05) async def run(): - await watcher.start() + await w.start() + # 等一下让 watcher 启动 + await asyncio.sleep(0.1) # 写入事件 - watcher.write_event({"type": "auto"}) - await asyncio.sleep(0.3) # 等待 watcher 消费 - await watcher.stop() + InboxWatcher.write_event(inbox_path, {"type": "test", "seq": 1}) + # 等一下让 watcher 消费 + await asyncio.sleep(0.2) + await w.stop() asyncio.run(run()) - assert len(collected) >= 1 - assert collected[0]["type"] == "auto" + assert len(received) >= 1