diff --git a/src/daemon/inbox.py b/src/daemon/inbox.py index 7b31684..bdfe490 100644 --- a/src/daemon/inbox.py +++ b/src/daemon/inbox.py @@ -1,12 +1,13 @@ """Inbox JSONL Watcher — 秒级事件推送 -Agent 完成任务后写 JSONL 到 inbox/daemon.jsonl,Daemon 秒级轮询消费。 +Agent 完成任务后写 JSONL 到 inbox/daemon.jsonl,Inbox watcher 秒级消费。 +用 truncate(清空内容但不删除文件),避免并发写入时文件不存在。 -设计要点: -- JSONL 格式(每行一个 JSON 对象) -- truncate 清空(不删除文件),避免并发写入时文件不存在 -- 损坏行跳过不崩溃 -- 空文件不触发处理 +支持的事件类型: +- agent_output: Agent 写入产出 +- agent_claim: Agent 认领任务 +- agent_status: Agent 状态变更(working/done/failed) +- agent_heartbeat: Agent 心跳 """ from __future__ import annotations @@ -22,44 +23,46 @@ logger = logging.getLogger("moziplus-v2.inbox") class InboxWatcher: - """Inbox JSONL 文件监控器""" + """Inbox JSONL 文件监听器""" def __init__( self, inbox_path: Path, + process_callback: Optional[Callable[[Dict[str, Any]], Coroutine[Any, Any, None]]] = None, watch_interval: float = 1.0, - on_events: Optional[Callable[[List[Dict[str, Any]]], Coroutine[Any, Any, None]]] = None, ): """ Args: - inbox_path: JSONL 文件路径(如 inbox/daemon.jsonl) + inbox_path: daemon.jsonl 文件路径 + process_callback: 收到事件后的异步回调 watch_interval: 轮询间隔秒数 - on_events: 消费到事件后的回调 """ self.inbox_path = inbox_path + self.process_callback = process_callback self.watch_interval = watch_interval - self.on_events = on_events self._running: bool = False self._task: Optional[asyncio.Task] = None - self._total_consumed: int = 0 + self._total_processed: int = 0 + self._total_errors: int = 0 # ------------------------------------------------------------------ # 生命周期 # ------------------------------------------------------------------ async def start(self) -> None: - """启动 watcher""" + """启动监听""" + import asyncio if self._running: return self._running = True - import asyncio self._task = asyncio.create_task(self._loop()) logger.info("Inbox watcher started (path=%s, interval=%.1fs)", self.inbox_path, self.watch_interval) async def stop(self) -> None: - """停止 watcher""" + """停止监听""" + import asyncio self._running = False if self._task: self._task.cancel() @@ -67,16 +70,21 @@ class InboxWatcher: await self._task except asyncio.CancelledError: pass - logger.info("Inbox watcher stopped (consumed=%d)", self._total_consumed) - - @property - def total_consumed(self) -> int: - return self._total_consumed + logger.info("Inbox watcher stopped (processed=%d, errors=%d)", + self._total_processed, self._total_errors) @property def is_running(self) -> bool: return self._running + @property + def total_processed(self) -> int: + return self._total_processed + + @property + def total_errors(self) -> int: + return self._total_errors + # ------------------------------------------------------------------ # 主循环 # ------------------------------------------------------------------ @@ -85,13 +93,12 @@ class InboxWatcher: import asyncio while self._running: try: - events = self.consume() - if events and self.on_events: - await self.on_events(events) + await self.poll() except asyncio.CancelledError: raise except Exception: - logger.exception("Inbox watcher error") + logger.exception("Inbox poll error") + self._total_errors += 1 try: await asyncio.sleep(self.watch_interval) @@ -102,36 +109,48 @@ class InboxWatcher: # 消费逻辑 # ------------------------------------------------------------------ - def consume(self) -> List[Dict[str, Any]]: - """消费 JSONL 文件中的所有事件,然后 truncate + async def poll(self) -> List[Dict[str, Any]]: + """读取并处理所有新事件,然后 truncate 文件 Returns: - 解析成功的事件列表 + 本次处理的事件列表 """ + events = self._read_events() + if not events: + return [] + + processed = [] + for event in events: + try: + if self.process_callback: + await self.process_callback(event) + processed.append(event) + self._total_processed += 1 + except Exception: + logger.exception("Error processing inbox event: %s", + json.dumps(event, default=str)) + self._total_errors += 1 + # 继续处理后续事件 + + # truncate(清空内容,不删除文件) + self._truncate() + return processed + + def _read_events(self) -> List[Dict[str, Any]]: + """读取 JSONL 文件中的所有有效行""" if not self.inbox_path.exists(): return [] try: - file_size = self.inbox_path.stat().st_size + content = self.inbox_path.read_text(encoding="utf-8").strip() except OSError: return [] - if file_size == 0: + if not content: return [] - # 读取全部内容 - try: - raw = self.inbox_path.read_text(encoding="utf-8") - except OSError: - return [] - - if not raw.strip(): - return [] - - # 解析每一行 events: List[Dict[str, Any]] = [] - bad_lines: int = 0 - for line_num, line in enumerate(raw.splitlines(), 1): + for line_no, line in enumerate(content.split("\n"), 1): line = line.strip() if not line: continue @@ -140,48 +159,163 @@ class InboxWatcher: if isinstance(event, dict): events.append(event) else: - bad_lines += 1 - logger.warning("Inbox line %d: not a JSON object, skipping", line_num) - except json.JSONDecodeError as e: - bad_lines += 1 - logger.warning("Inbox line %d: invalid JSON (%s), skipping", line_num, e) - - # Truncate(清空不删除) - try: - with open(self.inbox_path, "w", encoding="utf-8") as f: - f.truncate(0) - except OSError as e: - logger.error("Failed to truncate inbox: %s", e) - - if bad_lines > 0: - logger.warning("Inbox: %d bad lines skipped", bad_lines) - - self._total_consumed += len(events) - if events: - logger.debug("Inbox consumed %d events", len(events)) + logger.warning("Inbox line %d: expected dict, got %s", + line_no, type(event).__name__) + except json.JSONDecodeError: + logger.warning("Inbox line %d: invalid JSON, skipping", line_no) + self._total_errors += 1 return events + def _truncate(self) -> None: + """清空文件内容(不删除文件)""" + try: + with open(self.inbox_path, "w", encoding="utf-8") as f: + f.truncate(0) + except OSError: + logger.exception("Failed to truncate inbox: %s", self.inbox_path) + # ------------------------------------------------------------------ - # 写入辅助(Agent 端使用) + # 写入(供 Agent / 测试使用) # ------------------------------------------------------------------ - def write_event(self, event: Dict[str, Any]) -> None: - """追加一条事件到 JSONL 文件 + @staticmethod + def write_event(inbox_path: Path, event: Dict[str, Any]) -> None: + """原子写入一个事件到 JSONL 文件 - Agent 端调用此方法写入事件。 - 使用 O_APPEND 模式确保并发写入安全。 + 使用 "追加" 模式写入,多个 Agent 并发写安全(OS 保证 append 原子性 + 对于小行 < PIPE_BUF,通常 512-4096 字节) """ - self.inbox_path.parent.mkdir(parents=True, exist_ok=True) - line = json.dumps(event, ensure_ascii=False) + "\n" - # O_APPEND 保证并发写入时各行不交错 - with open(self.inbox_path, "a", encoding="utf-8") as f: - f.write(line) - - @classmethod - def write_event_to(cls, inbox_path: Path, event: Dict[str, Any]) -> None: - """静态方法:写入事件到指定路径""" inbox_path.parent.mkdir(parents=True, exist_ok=True) line = json.dumps(event, ensure_ascii=False) + "\n" with open(inbox_path, "a", encoding="utf-8") as f: f.write(line) + + @staticmethod + def write_events(inbox_path: Path, events: List[Dict[str, Any]]) -> None: + """批量写入事件""" + inbox_path.parent.mkdir(parents=True, exist_ok=True) + with open(inbox_path, "a", encoding="utf-8") as f: + for event in events: + f.write(json.dumps(event, ensure_ascii=False) + "\n") + + +# --------------------------------------------------------------------------- +# 事件处理回调 — 将 Inbox 事件翻译为黑板操作 +# --------------------------------------------------------------------------- + +async def default_inbox_callback( + event: Dict[str, Any], + registry_root: Path, + initialized_dbs: set, +) -> None: + """默认回调:根据事件类型更新黑板""" + from src.blackboard.db import init_db, get_connection + + event_type = event.get("type", "") + project_id = event.get("project_id", "") + task_id = event.get("task_id") + agent = event.get("agent", "unknown") + + if not project_id: + logger.warning("Inbox event missing project_id: %s", event_type) + return + + db_path = registry_root / project_id / "blackboard.db" + db_key = str(db_path) + + # 确保 DB 已初始化 + if db_key not in initialized_dbs: + init_db(db_path) + initialized_dbs.add(db_key) + + if event_type == "agent_output": + # Agent 写入产出 → 写 outputs 表 + 事件 + conn = get_connection(db_path) + try: + conn.execute("BEGIN IMMEDIATE") + conn.execute( + "INSERT INTO outputs (task_id, agent, output_type, title, summary, metadata) " + "VALUES (?,?,?,?,?,?)", + (task_id, agent, + event.get("output_type", "other"), + event.get("title", "output"), + event.get("summary"), + json.dumps(event.get("metadata", {}))), + ) + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", + (task_id, agent, "output_written", + json.dumps({"type": event.get("output_type")})), + ) + conn.commit() + finally: + conn.close() + + elif event_type == "agent_status": + # Agent 状态变更 → 更新 tasks 表 + new_status = event.get("status") + if new_status and task_id: + conn = get_connection(db_path) + try: + conn.execute("BEGIN IMMEDIATE") + row = conn.execute( + "SELECT status FROM tasks WHERE id=?", (task_id,) + ).fetchone() + if row: + old = row["status"] + conn.execute( + "UPDATE tasks SET status=?, updated_at=datetime('now') WHERE id=?", + (new_status, task_id), + ) + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", + (task_id, agent, f"task_{new_status}", + json.dumps({"from": old, "to": new_status})), + ) + conn.commit() + finally: + conn.close() + + elif event_type == "agent_claim": + # Agent 认领任务 + if task_id: + conn = get_connection(db_path) + try: + conn.execute("BEGIN IMMEDIATE") + cursor = conn.execute( + "UPDATE tasks SET status='claimed', assignee=?, claimed_at=datetime('now'), " + "updated_at=datetime('now') WHERE id=? AND status='pending'", + (agent, task_id), + ) + if cursor.rowcount > 0: + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", + (task_id, agent, "task_claimed", + json.dumps({"by": agent})), + ) + conn.commit() + finally: + conn.close() + + elif event_type == "agent_heartbeat": + # Agent 心跳 → 更新 agents 表 + conn = get_connection(db_path) + try: + conn.execute("BEGIN IMMEDIATE") + conn.execute( + "INSERT OR REPLACE INTO agents (agent_id, role, current_status, last_active) " + "VALUES (?,?,?,datetime('now'))", + (agent, event.get("role", ""), event.get("agent_status", "busy")), + ) + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", + (task_id, agent, "daemon_tick", + json.dumps({"heartbeat": True})), + ) + conn.commit() + finally: + conn.close() + + else: + logger.debug("Unhandled inbox event type: %s", event_type)