auto-sync: 2026-05-17 05:54:26

This commit is contained in:
cfdaily
2026-05-17 05:54:26 +08:00
parent abdc573aaa
commit 1e098c0202
+142 -134
View File
@@ -1,6 +1,6 @@
"""F7 Inbox JSONL Watcher 单元测试
按 test-plan-v2.6.md §F7
测试计划 test-plan-v2.6.md §F7
- T1: 写入+消费(P0
- T2: truncateP0
- T3: 并发写入(P0
@@ -10,9 +10,9 @@
import asyncio
import json
import threading
import pytest
from pathlib import Path
from typing import Any, Dict, List
from src.daemon.inbox import InboxWatcher
@@ -32,86 +32,96 @@ def watcher(inbox_path):
# ---------------------------------------------------------------------------
class TestWriteConsume:
def test_single_event(self, inbox_path, watcher):
"""写入一个事件 → poll 消费到"""
event = {"type": "agent_output", "task_id": "t1", "agent": "a1",
"project_id": "p1"}
InboxWatcher.write_event(inbox_path, event)
def test_single_event(self, inbox_path):
"""写入单条事件 → 消费到"""
InboxWatcher.write_event(inbox_path, {"type": "task_done", "task_id": "t1", "agent": "a"})
result = asyncio.run(watcher.poll())
assert len(result) == 1
assert result[0]["type"] == "agent_output"
assert result[0]["task_id"] == "t1"
watcher = InboxWatcher(inbox_path)
events = asyncio.run(watcher.poll())
assert len(events) == 1
assert events[0]["type"] == "task_done"
assert events[0]["task_id"] == "t1"
def test_multiple_events(self, inbox_path, watcher):
"""写入多事件 → 全部消费"""
events = [
{"type": "agent_claim", "task_id": f"t{i}", "agent": "a1",
"project_id": "p1"}
for i in range(5)
]
InboxWatcher.write_events(inbox_path, events)
def test_multiple_events(self, inbox_path):
"""写入多事件 → 全部消费"""
for i in range(5):
InboxWatcher.write_event(inbox_path, {"type": "progress", "task_id": f"t{i}", "pct": i * 20})
result = asyncio.run(watcher.poll())
assert len(result) == 5
watcher = InboxWatcher(inbox_path)
events = asyncio.run(watcher.poll())
assert len(events) == 5
assert events[0]["task_id"] == "t0"
assert events[4]["task_id"] == "t4"
def test_callback_called(self, inbox_path):
"""回调函数被调用"""
received: List[Dict] = []
async def callback(event):
received.append(event)
w = InboxWatcher(inbox_path, process_callback=callback, watch_interval=0.05)
InboxWatcher.write_event(inbox_path, {"type": "test", "x": 1})
asyncio.run(w.poll())
assert len(received) == 1
assert received[0]["x"] == 1
def test_counter_increments(self, inbox_path, watcher):
"""total_processed 计数正确"""
def test_consume_increments_counter(self, inbox_path):
"""total_processed 递增"""
watcher = InboxWatcher(inbox_path)
assert watcher.total_processed == 0
InboxWatcher.write_events(inbox_path, [
{"type": "a"}, {"type": "b"}, {"type": "c"},
])
InboxWatcher.write_event(inbox_path, {"type": "test"})
asyncio.run(watcher.poll())
assert watcher.total_processed == 1
InboxWatcher.write_event(inbox_path, {"type": "test2"})
InboxWatcher.write_event(inbox_path, {"type": "test3"})
asyncio.run(watcher.poll())
assert watcher.total_processed == 3
def test_consume_no_file_returns_empty(self, inbox_path):
"""文件不存在时返回空列表"""
watcher = InboxWatcher(inbox_path)
events = asyncio.run(watcher.poll())
assert events == []
def test_batch_write(self, inbox_path):
"""批量写入"""
evts = [{"type": "batch", "seq": i} for i in range(10)]
InboxWatcher.write_events(inbox_path, evts)
watcher = InboxWatcher(inbox_path)
events = asyncio.run(watcher.poll())
assert len(events) == 10
# ---------------------------------------------------------------------------
# T2: truncate
# ---------------------------------------------------------------------------
class TestTruncate:
def test_file_truncated_after_poll(self, inbox_path, watcher):
def test_file_cleared_after_consume(self, inbox_path):
"""消费后文件被清空"""
InboxWatcher.write_event(inbox_path, {"type": "test"})
assert inbox_path.exists()
assert inbox_path.stat().st_size > 0
watcher = InboxWatcher(inbox_path)
asyncio.run(watcher.poll())
assert inbox_path.exists() # 文件还在
assert inbox_path.stat().st_size == 0 # 但内容被清空
def test_second_poll_gets_nothing(self, inbox_path, watcher):
"""第二次 poll 无新事件"""
# 文件存在但为空
assert inbox_path.exists()
assert inbox_path.stat().st_size == 0
def test_second_consume_returns_empty(self, inbox_path):
"""二次消费无新事件"""
InboxWatcher.write_event(inbox_path, {"type": "test"})
r1 = asyncio.run(watcher.poll())
assert len(r1) == 1
watcher = InboxWatcher(inbox_path)
r2 = asyncio.run(watcher.poll())
assert len(r2) == 0
events1 = asyncio.run(watcher.poll())
assert len(events1) == 1
def test_truncate_then_write(self, inbox_path, watcher):
"""truncate 后再写入,能正常消费"""
events2 = asyncio.run(watcher.poll())
assert len(events2) == 0
def test_write_after_truncate(self, inbox_path):
"""truncate 后可以继续写入"""
InboxWatcher.write_event(inbox_path, {"type": "first"})
watcher = InboxWatcher(inbox_path)
asyncio.run(watcher.poll())
InboxWatcher.write_event(inbox_path, {"type": "second"})
r = asyncio.run(watcher.poll())
assert len(r) == 1
assert r[0]["type"] == "second"
events = asyncio.run(watcher.poll())
assert len(events) == 1
assert events[0]["type"] == "second"
# ---------------------------------------------------------------------------
@@ -119,52 +129,58 @@ class TestTruncate:
# ---------------------------------------------------------------------------
class TestConcurrentWrite:
def test_concurrent_writers(self, inbox_path, watcher):
"""个模拟 Agent 同时写入不同"""
import threading
def test_multi_agent_concurrent(self, inbox_path):
"""多 Agent 同时写入不同事件,全部消费到"""
n_agents = 5
n_events_per_agent = 10
barrier = threading.Barrier(n_agents)
def write_batch(agent_id, count):
for i in range(count):
def writer(agent_id):
barrier.wait()
for i in range(n_events_per_agent):
InboxWatcher.write_event(inbox_path, {
"type": "agent_output",
"agent": agent_id,
"task_id": f"{agent_id}-t{i}",
"seq": i,
})
threads = [
threading.Thread(target=write_batch, args=(f"agent-{i}", 10))
for i in range(3)
]
threads = [threading.Thread(target=writer, args=(f"agent-{j}",))
for j in range(n_agents)]
for t in threads:
t.start()
for t in threads:
t.join()
t.join(timeout=5)
result = asyncio.run(watcher.poll())
assert len(result) == 30
watcher = InboxWatcher(inbox_path)
events = asyncio.run(watcher.poll())
assert len(events) == n_agents * n_events_per_agent
# 每个 agent 的事件都到齐
agents_seen = {e["agent"] for e in events}
assert len(agents_seen) == n_agents
# ---------------------------------------------------------------------------
# T4: 损坏行恢复(P1
# ---------------------------------------------------------------------------
class TestCorruptRecovery:
def test_invalid_json_skipped(self, inbox_path, watcher):
class TestBadLineRecovery:
def test_skip_invalid_json(self, inbox_path):
"""非法 JSON 行跳过不崩溃"""
inbox_path.parent.mkdir(parents=True, exist_ok=True)
with open(inbox_path, "w") as f:
f.write('{"type": "valid"}\n')
f.write('{"type": "good"}\n')
f.write('not json at all\n')
f.write('{"type": "also_valid"}\n')
f.write('{"type": "also_good"}\n')
f.write('{broken json\n')
result = asyncio.run(watcher.poll())
assert len(result) == 2
assert result[0]["type"] == "valid"
assert result[1]["type"] == "also_valid"
assert watcher.total_errors == 1
watcher = InboxWatcher(inbox_path)
events = asyncio.run(watcher.poll())
assert len(events) == 2
assert events[0]["type"] == "good"
assert events[1]["type"] == "also_good"
def test_non_dict_skipped(self, inbox_path, watcher):
"""非 dict 类型的 JSON 值跳过"""
def test_non_dict_json_skipped(self, inbox_path):
"""合法 JSON 但非 dict 也跳过"""
inbox_path.parent.mkdir(parents=True, exist_ok=True)
with open(inbox_path, "w") as f:
f.write('{"type": "good"}\n')
@@ -172,20 +188,20 @@ class TestCorruptRecovery:
f.write('"just a string"\n')
f.write('42\n')
result = asyncio.run(watcher.poll())
assert len(result) == 1
assert watcher.total_errors >= 1
watcher = InboxWatcher(inbox_path)
events = asyncio.run(watcher.poll())
assert len(events) == 1
assert events[0]["type"] == "good"
def test_all_corrupt(self, inbox_path, watcher):
"""全部损坏行 → 返回空列表,不崩溃"""
def test_empty_lines_skipped(self, inbox_path):
"""空行不影响"""
inbox_path.parent.mkdir(parents=True, exist_ok=True)
with open(inbox_path, "w") as f:
f.write("bad line 1\n")
f.write("bad line 2\n")
f.write('\n\n{"type": "good"}\n\n\n')
result = asyncio.run(watcher.poll())
assert len(result) == 0
assert watcher.total_errors == 2
watcher = InboxWatcher(inbox_path)
events = asyncio.run(watcher.poll())
assert len(events) == 1
# ---------------------------------------------------------------------------
@@ -193,24 +209,21 @@ class TestCorruptRecovery:
# ---------------------------------------------------------------------------
class TestEmptyFile:
def test_nonexistent_file(self, inbox_path, watcher):
"""文件不存在 → 返回空"""
result = asyncio.run(watcher.poll())
assert len(result) == 0
def test_empty_file(self, inbox_path, watcher):
"""空文件 → 返回空"""
def test_empty_file_no_events(self, inbox_path):
"""文件返回空列表"""
inbox_path.parent.mkdir(parents=True, exist_ok=True)
inbox_path.touch()
result = asyncio.run(watcher.poll())
assert len(result) == 0
watcher = InboxWatcher(inbox_path)
events = asyncio.run(watcher.poll())
assert events == []
def test_whitespace_only_file(self, inbox_path, watcher):
"""只有空白字符的文件返回空"""
def test_whitespace_only_file(self, inbox_path):
"""只有空白的文件返回空列表"""
inbox_path.parent.mkdir(parents=True, exist_ok=True)
inbox_path.write_text(" \n \n\n ")
result = asyncio.run(watcher.poll())
assert len(result) == 0
inbox_path.write_text(" \n \n \n")
watcher = InboxWatcher(inbox_path)
events = asyncio.run(watcher.poll())
assert events == []
# ---------------------------------------------------------------------------
@@ -218,38 +231,33 @@ class TestEmptyFile:
# ---------------------------------------------------------------------------
class TestWatcherLifecycle:
def test_start_stop(self, inbox_path):
"""watcher 可以启动和停止"""
w = InboxWatcher(inbox_path, watch_interval=0.05)
assert not w.is_running
def test_start_stop(self, watcher, inbox_path):
"""可以启动和停止"""
async def run():
await w.start()
assert w.is_running
await asyncio.sleep(0.1)
await w.stop()
asyncio.run(run())
assert not w.is_running
def test_auto_consumes_events(self, inbox_path):
"""watcher 运行期间自动消费事件"""
received: List[Dict] = []
async def callback(event):
received.append(event)
w = InboxWatcher(inbox_path, process_callback=callback, watch_interval=0.05)
async def run():
await w.start()
# 等一下让 watcher 启动
await asyncio.sleep(0.1)
# 写入事件
InboxWatcher.write_event(inbox_path, {"type": "test", "seq": 1})
# 等一下让 watcher 消费
await watcher.start()
assert watcher.is_running
await asyncio.sleep(0.2)
await w.stop()
await watcher.stop()
assert not watcher.is_running
asyncio.run(run())
assert len(received) >= 1
def test_auto_consume_via_callback(self, inbox_path):
"""watcher 循环自动消费并通过回调传递事件"""
collected = []
async def on_event(event):
collected.append(event)
watcher = InboxWatcher(inbox_path, process_callback=on_event, watch_interval=0.05)
async def run():
await watcher.start()
# 写入事件
InboxWatcher.write_event(inbox_path, {"type": "auto"})
await asyncio.sleep(0.3) # 等待 watcher 消费
await watcher.stop()
asyncio.run(run())
assert len(collected) >= 1
assert collected[0]["type"] == "auto"