auto-sync: 2026-05-17 05:52:28

This commit is contained in:
cfdaily
2026-05-17 05:52:28 +08:00
parent a69110c834
commit e92b25b6a5
+187
View File
@@ -0,0 +1,187 @@
"""Inbox JSONL Watcher — 秒级事件推送
Agent 完成任务后写 JSONL 到 inbox/daemon.jsonlDaemon 秒级轮询消费。
设计要点:
- JSONL 格式(每行一个 JSON 对象)
- truncate 清空(不删除文件),避免并发写入时文件不存在
- 损坏行跳过不崩溃
- 空文件不触发处理
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
from pathlib import Path
from typing import Any, Callable, Coroutine, Dict, List, Optional
logger = logging.getLogger("moziplus-v2.inbox")
class InboxWatcher:
"""Inbox JSONL 文件监控器"""
def __init__(
self,
inbox_path: Path,
watch_interval: float = 1.0,
on_events: Optional[Callable[[List[Dict[str, Any]]], Coroutine[Any, Any, None]]] = None,
):
"""
Args:
inbox_path: JSONL 文件路径(如 inbox/daemon.jsonl
watch_interval: 轮询间隔秒数
on_events: 消费到事件后的回调
"""
self.inbox_path = inbox_path
self.watch_interval = watch_interval
self.on_events = on_events
self._running: bool = False
self._task: Optional[asyncio.Task] = None
self._total_consumed: int = 0
# ------------------------------------------------------------------
# 生命周期
# ------------------------------------------------------------------
async def start(self) -> None:
"""启动 watcher"""
if self._running:
return
self._running = True
import asyncio
self._task = asyncio.create_task(self._loop())
logger.info("Inbox watcher started (path=%s, interval=%.1fs)",
self.inbox_path, self.watch_interval)
async def stop(self) -> None:
"""停止 watcher"""
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
logger.info("Inbox watcher stopped (consumed=%d)", self._total_consumed)
@property
def total_consumed(self) -> int:
return self._total_consumed
@property
def is_running(self) -> bool:
return self._running
# ------------------------------------------------------------------
# 主循环
# ------------------------------------------------------------------
async def _loop(self) -> None:
import asyncio
while self._running:
try:
events = self.consume()
if events and self.on_events:
await self.on_events(events)
except asyncio.CancelledError:
raise
except Exception:
logger.exception("Inbox watcher error")
try:
await asyncio.sleep(self.watch_interval)
except asyncio.CancelledError:
return
# ------------------------------------------------------------------
# 消费逻辑
# ------------------------------------------------------------------
def consume(self) -> List[Dict[str, Any]]:
"""消费 JSONL 文件中的所有事件,然后 truncate
Returns:
解析成功的事件列表
"""
if not self.inbox_path.exists():
return []
try:
file_size = self.inbox_path.stat().st_size
except OSError:
return []
if file_size == 0:
return []
# 读取全部内容
try:
raw = self.inbox_path.read_text(encoding="utf-8")
except OSError:
return []
if not raw.strip():
return []
# 解析每一行
events: List[Dict[str, Any]] = []
bad_lines: int = 0
for line_num, line in enumerate(raw.splitlines(), 1):
line = line.strip()
if not line:
continue
try:
event = json.loads(line)
if isinstance(event, dict):
events.append(event)
else:
bad_lines += 1
logger.warning("Inbox line %d: not a JSON object, skipping", line_num)
except json.JSONDecodeError as e:
bad_lines += 1
logger.warning("Inbox line %d: invalid JSON (%s), skipping", line_num, e)
# Truncate(清空不删除)
try:
with open(self.inbox_path, "w", encoding="utf-8") as f:
f.truncate(0)
except OSError as e:
logger.error("Failed to truncate inbox: %s", e)
if bad_lines > 0:
logger.warning("Inbox: %d bad lines skipped", bad_lines)
self._total_consumed += len(events)
if events:
logger.debug("Inbox consumed %d events", len(events))
return events
# ------------------------------------------------------------------
# 写入辅助(Agent 端使用)
# ------------------------------------------------------------------
def write_event(self, event: Dict[str, Any]) -> None:
"""追加一条事件到 JSONL 文件
Agent 端调用此方法写入事件。
使用 O_APPEND 模式确保并发写入安全。
"""
self.inbox_path.parent.mkdir(parents=True, exist_ok=True)
line = json.dumps(event, ensure_ascii=False) + "\n"
# O_APPEND 保证并发写入时各行不交错
with open(self.inbox_path, "a", encoding="utf-8") as f:
f.write(line)
@classmethod
def write_event_to(cls, inbox_path: Path, event: Dict[str, Any]) -> None:
"""静态方法:写入事件到指定路径"""
inbox_path.parent.mkdir(parents=True, exist_ok=True)
line = json.dumps(event, ensure_ascii=False) + "\n"
with open(inbox_path, "a", encoding="utf-8") as f:
f.write(line)