"""Inbox JSONL watcher — second-level event delivery.

Agents append JSONL records to ``inbox/daemon.jsonl`` when they finish a piece
of work, and the inbox watcher consumes them on a short polling interval.
Consumed content is removed by truncating the file in place (the file itself
is never deleted) so that concurrent writers never find the file missing.

Supported event types:
- agent_output: an agent wrote an output
- agent_claim: an agent claimed a task
- agent_status: an agent status change (working/done/failed)
- agent_heartbeat: an agent heartbeat
"""
from __future__ import annotations

import asyncio
import json
import logging
import os
from pathlib import Path
from typing import Any, Callable, Coroutine, Dict, List, Optional

logger = logging.getLogger("moziplus-v2.inbox")

# Sentinel returned by the line decoder for whitespace-only lines.
_BLANK = object()


class InboxWatcher:
    """Polls an inbox JSONL file and dispatches each event to a callback."""

    def __init__(
        self,
        inbox_path: Path,
        process_callback: Optional[
            Callable[[Dict[str, Any]], Coroutine[Any, Any, None]]
        ] = None,
        watch_interval: float = 1.0,
    ):
        """
        Args:
            inbox_path: path of the daemon.jsonl file
            process_callback: async callback awaited once per received event
            watch_interval: polling interval in seconds
        """
        self.inbox_path = inbox_path
        self.process_callback = process_callback
        self.watch_interval = watch_interval
        self._running: bool = False
        self._task: Optional[asyncio.Task] = None
        self._total_processed: int = 0
        self._total_errors: int = 0
        # Number of leading file bytes the most recent _read_events() call
        # consumed; None means "unknown" and makes poll() clear the whole file.
        self._consumed_bytes: Optional[int] = None

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def start(self) -> None:
        """Start the background polling task (no-op if already running)."""
        if self._running:
            return
        self._running = True
        self._task = asyncio.create_task(self._loop())
        logger.info("Inbox watcher started (path=%s, interval=%.1fs)",
                    self.inbox_path, self.watch_interval)

    async def stop(self) -> None:
        """Stop the polling task and wait for it to finish."""
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            # Drop the finished task so a later stop() does not re-await it.
            self._task = None
        logger.info("Inbox watcher stopped (processed=%d, errors=%d)",
                    self._total_processed, self._total_errors)

    @property
    def is_running(self) -> bool:
        return self._running

    @property
    def total_processed(self) -> int:
        return self._total_processed

    @property
    def total_errors(self) -> int:
        return self._total_errors

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------

    async def _loop(self) -> None:
        """Poll repeatedly until stopped; a failing poll is logged and counted."""
        while self._running:
            try:
                await self.poll()
            except asyncio.CancelledError:
                raise
            except Exception:
                logger.exception("Inbox poll error")
                self._total_errors += 1
            try:
                await asyncio.sleep(self.watch_interval)
            except asyncio.CancelledError:
                return

    # ------------------------------------------------------------------
    # Consumption
    # ------------------------------------------------------------------

    async def poll(self) -> List[Dict[str, Any]]:
        """Read and process all new events, then remove them from the file.

        Only the bytes that were actually read are removed. Anything appended
        while the callbacks were being awaited stays in the file and is
        picked up by the next poll instead of being wiped.

        Returns:
            The events whose callback completed without raising.
        """
        # Reset first so an overridden _read_events() that does not report a
        # byte count falls back to clearing the whole file.
        self._consumed_bytes = None
        events = self._read_events()
        consumed = self._consumed_bytes

        if not events:
            # Drop consumed blank/invalid lines too; otherwise they would be
            # re-parsed and re-counted as errors on every single poll.
            if consumed:
                self._truncate(consumed)
            return []

        processed: List[Dict[str, Any]] = []
        for event in events:
            try:
                if self.process_callback:
                    await self.process_callback(event)
                processed.append(event)
                self._total_processed += 1
            except Exception:
                logger.exception("Error processing inbox event: %s",
                                 json.dumps(event, default=str))
                self._total_errors += 1
                # Keep going with the remaining events.

        if consumed is None:
            self._truncate()
        else:
            self._truncate(consumed)
        return processed

    @staticmethod
    def _decode_line(raw_line: bytes) -> Any:
        """Decode one raw line as UTF-8 JSON.

        Returns ``_BLANK`` for whitespace-only lines. Raises ``ValueError``
        (``UnicodeDecodeError`` or ``json.JSONDecodeError``) for bad input.
        """
        text = raw_line.decode("utf-8").strip()
        if not text:
            return _BLANK
        return json.loads(text)

    def _collect(self, value: Any, line_no: int,
                 events: List[Dict[str, Any]]) -> None:
        """Append *value* to *events* if it is a dict, else log and count it."""
        if isinstance(value, dict):
            events.append(value)
        else:
            logger.warning("Inbox line %d: expected dict, got %s",
                           line_no, type(value).__name__)
            self._total_errors += 1

    def _read_events(self) -> List[Dict[str, Any]]:
        """Read every valid event currently in the JSONL file.

        Records in ``self._consumed_bytes`` how many leading bytes of the file
        were consumed, so that ``poll()`` can remove exactly those bytes.
        A trailing fragment without a newline that does not parse is left
        alone: it may be a line a writer has not finished yet.
        """
        self._consumed_bytes = 0
        try:
            raw = self.inbox_path.read_bytes()
        except OSError:
            # Missing or unreadable file: nothing to consume this round.
            return []
        if not raw:
            return []

        # Everything up to and including the last newline is complete lines.
        complete_len = raw.rfind(b"\n") + 1
        events: List[Dict[str, Any]] = []
        line_no = 0
        for line_no, raw_line in enumerate(
                raw[:complete_len].splitlines(), 1):
            try:
                value = self._decode_line(raw_line)
            except ValueError:
                logger.warning(
                    "Inbox line %d: invalid JSON, skipping", line_no)
                self._total_errors += 1
                continue
            if value is not _BLANK:
                self._collect(value, line_no, events)

        consumed = complete_len
        fragment = raw[complete_len:]
        if fragment:
            try:
                value = self._decode_line(fragment)
            except ValueError:
                # Possibly a half-written line; retry it on the next poll.
                logger.debug(
                    "Inbox line %d: incomplete trailing data, deferring",
                    line_no + 1)
            else:
                # The last line simply lacked a trailing newline.
                if value is not _BLANK:
                    self._collect(value, line_no + 1, events)
                consumed = len(raw)

        self._consumed_bytes = consumed
        return events

    def _truncate(self, consumed: Optional[int] = None) -> None:
        """Remove consumed content from the file without deleting the file.

        Args:
            consumed: number of leading bytes to drop. Bytes after that offset
                are kept. ``None`` clears the whole file.

        A writer in another process appending during the short rewrite below
        can still lose its line; closing that window would need a lock that
        writers share.
        """
        try:
            if consumed is None:
                with open(self.inbox_path, "w", encoding="utf-8"):
                    pass  # "w" mode already truncates
                return
            try:
                fh = open(self.inbox_path, "r+b")
            except FileNotFoundError:
                # The file vanished since it was read; recreate it empty so
                # writers keep finding it.
                with open(self.inbox_path, "w", encoding="utf-8"):
                    pass
                return
            with fh:
                fh.seek(consumed)
                tail = fh.read()
                fh.seek(0)
                fh.write(tail)
                fh.truncate(len(tail))
        except OSError:
            logger.exception("Failed to truncate inbox: %s", self.inbox_path)

    # ------------------------------------------------------------------
    # Writing (for agents / tests)
    # ------------------------------------------------------------------

    @staticmethod
    def write_event(inbox_path: Path, event: Dict[str, Any]) -> None:
        """Append one event to the JSONL file as a single line.

        The file is opened in append mode and the line is written with one
        ``write`` call; parent directories are created if needed.
        NOTE(review): the original comment claimed the OS guarantees atomic
        appends for lines shorter than PIPE_BUF. PIPE_BUF is a limit for
        pipes, so treat non-interleaving of concurrent writers as an
        assumption to confirm for the target filesystem.
        """
        inbox_path.parent.mkdir(parents=True, exist_ok=True)
        line = json.dumps(event, ensure_ascii=False) + "\n"
        with open(inbox_path, "a", encoding="utf-8") as f:
            f.write(line)

    @staticmethod
    def write_events(inbox_path: Path, events: List[Dict[str, Any]]) -> None:
        """Append several events, one JSON line each, in a single write call."""
        inbox_path.parent.mkdir(parents=True, exist_ok=True)
        payload = "".join(
            json.dumps(event, ensure_ascii=False) + "\n" for event in events
        )
        with open(inbox_path, "a", encoding="utf-8") as f:
            f.write(payload)


# ---------------------------------------------------------------------------
# Event callback — translates inbox events into blackboard operations
# ---------------------------------------------------------------------------

def _is_inside(root: Path, candidate: Path) -> bool:
    """Return True if *candidate* lies lexically under *root*.

    Purely lexical (symlinks are not resolved), so a project directory that
    is itself a symlink is still accepted.
    """
    root_norm = os.path.abspath(root)
    cand_norm = os.path.abspath(candidate)
    try:
        return os.path.commonpath([root_norm, cand_norm]) == root_norm
    except ValueError:
        # e.g. paths on different drives on Windows
        return False


async def default_inbox_callback(
    event: Dict[str, Any],
    registry_root: Path,
    initialized_dbs: set,
) -> None:
    """Default callback: update the project's blackboard from one event.

    Args:
        event: the inbox event; ``type`` and ``project_id`` select the action
            and the database, ``task_id`` and ``agent`` identify the subject.
        registry_root: directory holding one sub-directory per project.
        initialized_dbs: set of database path strings already passed to
            ``init_db``; updated in place.
    """
    event_type = event.get("type", "")
    project_id = event.get("project_id", "")
    task_id = event.get("task_id")
    agent = event.get("agent", "unknown")

    if not project_id:
        logger.warning("Inbox event missing project_id: %s", event_type)
        return

    db_path = registry_root / project_id / "blackboard.db"
    # project_id comes from the event; refuse values such as "../x" or an
    # absolute path that would place the database outside the registry.
    if not _is_inside(registry_root, db_path):
        logger.warning("Inbox event project_id escapes registry root: %r",
                       project_id)
        return

    # Imported lazily, and only once the event has passed validation.
    from src.blackboard.db import init_db, get_connection

    db_key = str(db_path)

    # Make sure the DB has been initialized.
    if db_key not in initialized_dbs:
        init_db(db_path)
        initialized_dbs.add(db_key)

    if event_type == "agent_output":
        # Agent wrote an output -> insert into outputs plus an event row.
        conn = get_connection(db_path)
        try:
            conn.execute("BEGIN IMMEDIATE")
            conn.execute(
                "INSERT INTO outputs (task_id, agent, output_type, title, summary, metadata) "
                "VALUES (?,?,?,?,?,?)",
                (task_id, agent, event.get("output_type", "other"),
                 event.get("title", "output"), event.get("summary"),
                 json.dumps(event.get("metadata", {}))),
            )
            conn.execute(
                "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
                (task_id, agent, "output_written",
                 json.dumps({"type": event.get("output_type")})),
            )
            conn.commit()
        finally:
            conn.close()

    elif event_type == "agent_status":
        # Agent status change -> delegated to Blackboard.update_task_status.
        new_status = event.get("status")
        if new_status and task_id:
            from src.blackboard.operations import Blackboard
            bb = Blackboard(db_path)
            ok = bb.update_task_status(task_id, new_status, agent=agent)
            if not ok:
                logger.warning("Inbox: invalid status transition to '%s' for task %s",
                               new_status, task_id)

    elif event_type == "agent_claim":
        # Agent claims a task; only rows still in 'pending' are updated.
        if task_id:
            conn = get_connection(db_path)
            try:
                conn.execute("BEGIN IMMEDIATE")
                cursor = conn.execute(
                    "UPDATE tasks SET status='claimed', assignee=?, claimed_at=datetime('now'), "
                    "updated_at=datetime('now') WHERE id=? AND status='pending'",
                    (agent, task_id),
                )
                # Record the claim event only if the update matched a row.
                if cursor.rowcount > 0:
                    conn.execute(
                        "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
                        (task_id, agent, "task_claimed",
                         json.dumps({"by": agent})),
                    )
                conn.commit()
            finally:
                conn.close()

    elif event_type == "agent_heartbeat":
        # Agent heartbeat -> upsert the agents row and log an event.
        conn = get_connection(db_path)
        try:
            conn.execute("BEGIN IMMEDIATE")
            conn.execute(
                "INSERT OR REPLACE INTO agents (agent_id, role, current_status, last_active) "
                "VALUES (?,?,?,datetime('now'))",
                (agent, event.get("role", ""),
                 event.get("agent_status", "busy")),
            )
            conn.execute(
                "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
                (task_id, agent, "daemon_tick",
                 json.dumps({"heartbeat": True})),
            )
            conn.commit()
        finally:
            conn.close()

    else:
        logger.debug("Unhandled inbox event type: %s", event_type)