"""F7 Inbox JSONL Watcher 单元测试 按 test-plan-v2.6.md §F7: - T1: 写入+消费(P0) - T2: truncate(P0) - T3: 并发写入(P0) - T4: 损坏行恢复(P1) - T5: 空文件处理(P1) """ import asyncio import json import pytest from pathlib import Path from typing import Any, Dict, List from src.daemon.inbox import InboxWatcher @pytest.fixture def inbox_path(tmp_path): return tmp_path / "inbox" / "daemon.jsonl" @pytest.fixture def watcher(inbox_path): return InboxWatcher(inbox_path, watch_interval=0.05) # --------------------------------------------------------------------------- # T1: 写入+消费 # --------------------------------------------------------------------------- class TestWriteConsume: def test_single_event(self, inbox_path, watcher): """写入一个事件 → poll 消费到""" event = {"type": "agent_output", "task_id": "t1", "agent": "a1", "project_id": "p1"} InboxWatcher.write_event(inbox_path, event) result = asyncio.run(watcher.poll()) assert len(result) == 1 assert result[0]["type"] == "agent_output" assert result[0]["task_id"] == "t1" def test_multiple_events(self, inbox_path, watcher): """写入多个事件 → 全部消费""" events = [ {"type": "agent_claim", "task_id": f"t{i}", "agent": "a1", "project_id": "p1"} for i in range(5) ] InboxWatcher.write_events(inbox_path, events) result = asyncio.run(watcher.poll()) assert len(result) == 5 def test_callback_called(self, inbox_path): """回调函数被调用""" received: List[Dict] = [] async def callback(event): received.append(event) w = InboxWatcher(inbox_path, process_callback=callback, watch_interval=0.05) InboxWatcher.write_event(inbox_path, {"type": "test", "x": 1}) asyncio.run(w.poll()) assert len(received) == 1 assert received[0]["x"] == 1 def test_counter_increments(self, inbox_path, watcher): """total_processed 计数正确""" assert watcher.total_processed == 0 InboxWatcher.write_events(inbox_path, [ {"type": "a"}, {"type": "b"}, {"type": "c"}, ]) asyncio.run(watcher.poll()) assert watcher.total_processed == 3 # --------------------------------------------------------------------------- # T2: truncate # --------------------------------------------------------------------------- class TestTruncate: def test_file_truncated_after_poll(self, inbox_path, watcher): """消费后文件被清空""" InboxWatcher.write_event(inbox_path, {"type": "test"}) assert inbox_path.exists() assert inbox_path.stat().st_size > 0 asyncio.run(watcher.poll()) assert inbox_path.exists() # 文件还在 assert inbox_path.stat().st_size == 0 # 但内容被清空 def test_second_poll_gets_nothing(self, inbox_path, watcher): """第二次 poll 无新事件""" InboxWatcher.write_event(inbox_path, {"type": "test"}) r1 = asyncio.run(watcher.poll()) assert len(r1) == 1 r2 = asyncio.run(watcher.poll()) assert len(r2) == 0 def test_truncate_then_write(self, inbox_path, watcher): """truncate 后再写入,能正常消费""" InboxWatcher.write_event(inbox_path, {"type": "first"}) asyncio.run(watcher.poll()) InboxWatcher.write_event(inbox_path, {"type": "second"}) r = asyncio.run(watcher.poll()) assert len(r) == 1 assert r[0]["type"] == "second" # --------------------------------------------------------------------------- # T3: 并发写入 # --------------------------------------------------------------------------- class TestConcurrentWrite: def test_concurrent_writers(self, inbox_path, watcher): """多个模拟 Agent 同时写入不同行""" import threading def write_batch(agent_id, count): for i in range(count): InboxWatcher.write_event(inbox_path, { "type": "agent_output", "agent": agent_id, "task_id": f"{agent_id}-t{i}", }) threads = [ threading.Thread(target=write_batch, args=(f"agent-{i}", 10)) for i in range(3) ] for t in threads: t.start() for t in threads: t.join() result = asyncio.run(watcher.poll()) assert len(result) == 30 # --------------------------------------------------------------------------- # T4: 损坏行恢复(P1) # --------------------------------------------------------------------------- class TestCorruptRecovery: def test_invalid_json_skipped(self, inbox_path, watcher): """非法 JSON 行跳过不崩溃""" inbox_path.parent.mkdir(parents=True, exist_ok=True) with open(inbox_path, "w") as f: f.write('{"type": "valid"}\n') f.write('not json at all\n') f.write('{"type": "also_valid"}\n') result = asyncio.run(watcher.poll()) assert len(result) == 2 assert result[0]["type"] == "valid" assert result[1]["type"] == "also_valid" assert watcher.total_errors == 1 def test_non_dict_skipped(self, inbox_path, watcher): """非 dict 类型的 JSON 值跳过""" inbox_path.parent.mkdir(parents=True, exist_ok=True) with open(inbox_path, "w") as f: f.write('{"type": "good"}\n') f.write('[1, 2, 3]\n') f.write('"just a string"\n') f.write('42\n') result = asyncio.run(watcher.poll()) assert len(result) == 1 assert watcher.total_errors >= 1 def test_all_corrupt(self, inbox_path, watcher): """全部损坏行 → 返回空列表,不崩溃""" inbox_path.parent.mkdir(parents=True, exist_ok=True) with open(inbox_path, "w") as f: f.write("bad line 1\n") f.write("bad line 2\n") result = asyncio.run(watcher.poll()) assert len(result) == 0 assert watcher.total_errors == 2 # --------------------------------------------------------------------------- # T5: 空文件处理(P1) # --------------------------------------------------------------------------- class TestEmptyFile: def test_nonexistent_file(self, inbox_path, watcher): """文件不存在 → 返回空""" result = asyncio.run(watcher.poll()) assert len(result) == 0 def test_empty_file(self, inbox_path, watcher): """空文件 → 返回空""" inbox_path.parent.mkdir(parents=True, exist_ok=True) inbox_path.touch() result = asyncio.run(watcher.poll()) assert len(result) == 0 def test_whitespace_only_file(self, inbox_path, watcher): """只有空白字符的文件 → 返回空""" inbox_path.parent.mkdir(parents=True, exist_ok=True) inbox_path.write_text(" \n \n\n ") result = asyncio.run(watcher.poll()) assert len(result) == 0 # --------------------------------------------------------------------------- # Watcher 生命周期 # --------------------------------------------------------------------------- class TestWatcherLifecycle: def test_start_stop(self, inbox_path): """watcher 可以启动和停止""" w = InboxWatcher(inbox_path, watch_interval=0.05) assert not w.is_running async def run(): await w.start() assert w.is_running await asyncio.sleep(0.1) await w.stop() asyncio.run(run()) assert not w.is_running def test_auto_consumes_events(self, inbox_path): """watcher 运行期间自动消费事件""" received: List[Dict] = [] async def callback(event): received.append(event) w = InboxWatcher(inbox_path, process_callback=callback, watch_interval=0.05) async def run(): await w.start() # 等一下让 watcher 启动 await asyncio.sleep(0.1) # 写入事件 InboxWatcher.write_event(inbox_path, {"type": "test", "seq": 1}) # 等一下让 watcher 消费 await asyncio.sleep(0.2) await w.stop() asyncio.run(run()) assert len(received) >= 1