diff --git a/src/daemon/inbox.py b/src/daemon/inbox.py new file mode 100644 index 0000000..7b31684 --- /dev/null +++ b/src/daemon/inbox.py @@ -0,0 +1,187 @@ +"""Inbox JSONL Watcher — 秒级事件推送 + +Agent 完成任务后写 JSONL 到 inbox/daemon.jsonl,Daemon 秒级轮询消费。 + +设计要点: +- JSONL 格式(每行一个 JSON 对象) +- truncate 清空(不删除文件),避免并发写入时文件不存在 +- 损坏行跳过不崩溃 +- 空文件不触发处理 +""" + +from __future__ import annotations + +import asyncio +import json +import logging +import os +from pathlib import Path +from typing import Any, Callable, Coroutine, Dict, List, Optional + +logger = logging.getLogger("moziplus-v2.inbox") + + +class InboxWatcher: + """Inbox JSONL 文件监控器""" + + def __init__( + self, + inbox_path: Path, + watch_interval: float = 1.0, + on_events: Optional[Callable[[List[Dict[str, Any]]], Coroutine[Any, Any, None]]] = None, + ): + """ + Args: + inbox_path: JSONL 文件路径(如 inbox/daemon.jsonl) + watch_interval: 轮询间隔秒数 + on_events: 消费到事件后的回调 + """ + self.inbox_path = inbox_path + self.watch_interval = watch_interval + self.on_events = on_events + + self._running: bool = False + self._task: Optional[asyncio.Task] = None + self._total_consumed: int = 0 + + # ------------------------------------------------------------------ + # 生命周期 + # ------------------------------------------------------------------ + + async def start(self) -> None: + """启动 watcher""" + if self._running: + return + self._running = True + import asyncio + self._task = asyncio.create_task(self._loop()) + logger.info("Inbox watcher started (path=%s, interval=%.1fs)", + self.inbox_path, self.watch_interval) + + async def stop(self) -> None: + """停止 watcher""" + self._running = False + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + logger.info("Inbox watcher stopped (consumed=%d)", self._total_consumed) + + @property + def total_consumed(self) -> int: + return self._total_consumed + + @property + def is_running(self) -> bool: + return self._running + + # ------------------------------------------------------------------ + # 主循环 + # ------------------------------------------------------------------ + + async def _loop(self) -> None: + import asyncio + while self._running: + try: + events = self.consume() + if events and self.on_events: + await self.on_events(events) + except asyncio.CancelledError: + raise + except Exception: + logger.exception("Inbox watcher error") + + try: + await asyncio.sleep(self.watch_interval) + except asyncio.CancelledError: + return + + # ------------------------------------------------------------------ + # 消费逻辑 + # ------------------------------------------------------------------ + + def consume(self) -> List[Dict[str, Any]]: + """消费 JSONL 文件中的所有事件,然后 truncate + + Returns: + 解析成功的事件列表 + """ + if not self.inbox_path.exists(): + return [] + + try: + file_size = self.inbox_path.stat().st_size + except OSError: + return [] + + if file_size == 0: + return [] + + # 读取全部内容 + try: + raw = self.inbox_path.read_text(encoding="utf-8") + except OSError: + return [] + + if not raw.strip(): + return [] + + # 解析每一行 + events: List[Dict[str, Any]] = [] + bad_lines: int = 0 + for line_num, line in enumerate(raw.splitlines(), 1): + line = line.strip() + if not line: + continue + try: + event = json.loads(line) + if isinstance(event, dict): + events.append(event) + else: + bad_lines += 1 + logger.warning("Inbox line %d: not a JSON object, skipping", line_num) + except json.JSONDecodeError as e: + bad_lines += 1 + logger.warning("Inbox line %d: invalid JSON (%s), skipping", line_num, e) + + # Truncate(清空不删除) + try: + with open(self.inbox_path, "w", encoding="utf-8") as f: + f.truncate(0) + except OSError as e: + logger.error("Failed to truncate inbox: %s", e) + + if bad_lines > 0: + logger.warning("Inbox: %d bad lines skipped", bad_lines) + + self._total_consumed += len(events) + if events: + logger.debug("Inbox consumed %d events", len(events)) + + return events + + # ------------------------------------------------------------------ + # 写入辅助(Agent 端使用) + # ------------------------------------------------------------------ + + def write_event(self, event: Dict[str, Any]) -> None: + """追加一条事件到 JSONL 文件 + + Agent 端调用此方法写入事件。 + 使用 O_APPEND 模式确保并发写入安全。 + """ + self.inbox_path.parent.mkdir(parents=True, exist_ok=True) + line = json.dumps(event, ensure_ascii=False) + "\n" + # O_APPEND 保证并发写入时各行不交错 + with open(self.inbox_path, "a", encoding="utf-8") as f: + f.write(line) + + @classmethod + def write_event_to(cls, inbox_path: Path, event: Dict[str, Any]) -> None: + """静态方法:写入事件到指定路径""" + inbox_path.parent.mkdir(parents=True, exist_ok=True) + line = json.dumps(event, ensure_ascii=False) + "\n" + with open(inbox_path, "a", encoding="utf-8") as f: + f.write(line)