d58e38d58f
PR #14 从旧分支复制文件导致回退了 PR #10 的 lint 修复。 修复内容: - autoflake 移除未使用导入/变量 - autopep8 修复缩进/空格 - 手动修复 F821(pathlib→Path), F541(f-string), F841(未使用变量) - 所有修复均通过 flake8 --max-line-length=120 --extend-ignore=E501 检查 (0 errors)
307 lines
10 KiB
Python
307 lines
10 KiB
Python
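"""Inbox JSONL Watcher — second-level event push.

After finishing a task, an agent writes JSONL to inbox/daemon.jsonl and the
inbox watcher consumes it within seconds.
The file is truncated (emptied but not deleted) so that it never goes missing
while agents are writing concurrently.

Supported event types:
- agent_output: an agent wrote an output
- agent_claim: an agent claimed a task
- agent_status: an agent's status changed (working/done/failed)
- agent_heartbeat: agent heartbeat
"""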
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
from pathlib import Path
|
||
from typing import Any, Callable, Coroutine, Dict, List, Optional
|
||
|
||
# Module-level logger used by InboxWatcher and default_inbox_callback.
logger = logging.getLogger("moziplus-v2.inbox")
|
||
|
||
|
||
class InboxWatcher:
    """Poll an inbox JSONL file and hand every event to an async callback.

    Each poll reads the file, parses every line as a JSON object, awaits
    ``process_callback`` once per event and finally removes the bytes it has
    read from the file.  The file itself is kept (emptied, never deleted).

    Delivery is not retried: an event whose callback raises is logged,
    counted in ``total_errors`` and dropped together with the rest of the
    consumed content.
    """

    def __init__(
        self,
        inbox_path: Path,
        process_callback: Optional[
            Callable[[Dict[str, Any]], Coroutine[Any, Any, None]]
        ] = None,
        watch_interval: float = 1.0,
    ):
        """
        Args:
            inbox_path: path of the daemon.jsonl file to watch.
            process_callback: async callable awaited with each parsed event;
                when ``None`` events are only counted and returned.
            watch_interval: seconds to sleep between two polls.
        """
        self.inbox_path = inbox_path
        self.process_callback = process_callback
        self.watch_interval = watch_interval

        self._running: bool = False
        self._task: Optional[asyncio.Task] = None
        self._total_processed: int = 0
        self._total_errors: int = 0
        # Size in bytes of the content returned by the latest _read_events()
        # call; _truncate() removes exactly that many bytes from the head of
        # the file so that anything appended afterwards is preserved.
        self._consumed_bytes: int = 0

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def start(self) -> None:
        """Start the background polling task (no-op when already running)."""
        if self._running:
            return
        self._running = True
        self._task = asyncio.create_task(self._loop())
        logger.info("Inbox watcher started (path=%s, interval=%.1fs)",
                    self.inbox_path, self.watch_interval)

    async def stop(self) -> None:
        """Cancel the polling task, wait for it to finish and log the totals."""
        self._running = False
        # Drop our reference first so a finished task is not kept around.
        task, self._task = self._task, None
        if task:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        logger.info("Inbox watcher stopped (processed=%d, errors=%d)",
                    self._total_processed, self._total_errors)

    @property
    def is_running(self) -> bool:
        """Whether ``start()`` has been called without a later ``stop()``."""
        return self._running

    @property
    def total_processed(self) -> int:
        """Number of events processed without error so far."""
        return self._total_processed

    @property
    def total_errors(self) -> int:
        """Number of malformed lines, failed callbacks and failed polls so far."""
        return self._total_errors

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------

    async def _loop(self) -> None:
        """Poll repeatedly until stopped; a failing poll never ends the loop."""
        while self._running:
            try:
                await self.poll()
            except asyncio.CancelledError:
                raise
            except Exception:
                logger.exception("Inbox poll error")
                self._total_errors += 1

            try:
                await asyncio.sleep(self.watch_interval)
            except asyncio.CancelledError:
                # Cancelled while idle: finish quietly.
                return

    # ------------------------------------------------------------------
    # Consumption
    # ------------------------------------------------------------------

    async def poll(self) -> List[Dict[str, Any]]:
        """Read and process all pending events, then drop them from the file.

        Returns:
            The events whose callback completed without raising (all parsed
            events when no callback is configured).
        """
        events = self._read_events()
        # Nothing was read at all: leave the file alone.  Content made only
        # of unusable lines still falls through so that it is removed once
        # instead of being re-reported on every poll.
        if not events and not self._consumed_bytes:
            return []

        processed: List[Dict[str, Any]] = []
        for event in events:
            try:
                if self.process_callback:
                    await self.process_callback(event)
                processed.append(event)
                self._total_processed += 1
            except Exception:
                logger.exception("Error processing inbox event: %s",
                                 json.dumps(event, default=str))
                self._total_errors += 1
                # Keep going with the remaining events.

        # Remove what was consumed (the file is emptied, not deleted).
        self._truncate()
        return processed

    def _read_events(self) -> List[Dict[str, Any]]:
        """Return every line of the inbox that parses to a JSON object.

        The file is read as bytes and decoded line by line, so a line that is
        not valid UTF-8 or not valid JSON is logged, counted as an error and
        skipped without affecting the other lines.  The number of bytes read
        is remembered for the next ``_truncate()`` call.
        """
        self._consumed_bytes = 0
        try:
            raw = self.inbox_path.read_bytes()
        except FileNotFoundError:
            return []
        except OSError as exc:
            logger.warning("Failed to read inbox %s: %s", self.inbox_path, exc)
            return []

        self._consumed_bytes = len(raw)

        events: List[Dict[str, Any]] = []
        for line_no, raw_line in enumerate(raw.splitlines(), 1):
            try:
                line = raw_line.decode("utf-8").strip()
            except UnicodeDecodeError:
                logger.warning(
                    "Inbox line %d: invalid UTF-8, skipping", line_no)
                self._total_errors += 1
                continue
            if not line:
                continue

            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                logger.warning(
                    "Inbox line %d: invalid JSON, skipping", line_no)
                self._total_errors += 1
                continue

            if isinstance(event, dict):
                events.append(event)
            else:
                logger.warning("Inbox line %d: expected dict, got %s",
                               line_no, type(event).__name__)
                self._total_errors += 1

        return events

    def _truncate(self) -> None:
        """Remove the consumed content from the inbox without deleting it.

        After a ``_read_events()`` call only the bytes that call has read are
        removed; data appended since then is moved to the start of the file.
        Without a preceding read the whole file is emptied.  A small window
        remains between reading the tail and writing it back; closing it
        would need a lock shared with the writers.
        """
        consumed, self._consumed_bytes = self._consumed_bytes, 0
        try:
            if not consumed:
                with open(self.inbox_path, "w", encoding="utf-8"):
                    pass  # "w" mode already truncates
                return

            # "r+b" does not create the file, so make sure it exists first.
            self.inbox_path.touch(exist_ok=True)
            with open(self.inbox_path, "r+b") as fh:
                fh.seek(consumed)
                tail = fh.read()
                fh.seek(0)
                fh.write(tail)
                fh.truncate()
        except OSError:
            logger.exception("Failed to truncate inbox: %s", self.inbox_path)

    # ------------------------------------------------------------------
    # Writing (for agents / tests)
    # ------------------------------------------------------------------

    @staticmethod
    def write_event(inbox_path: Path, event: Dict[str, Any]) -> None:
        """Append one event to the JSONL file as a single line.

        Missing parent directories are created.  The file is opened in append
        mode and the line is written with one ``write`` call; no lock is
        taken, so concurrent writers rely on the operating system's append
        behaviour.
        """
        inbox_path.parent.mkdir(parents=True, exist_ok=True)
        line = json.dumps(event, ensure_ascii=False) + "\n"
        with open(inbox_path, "a", encoding="utf-8") as f:
            f.write(line)

    @staticmethod
    def write_events(inbox_path: Path, events: List[Dict[str, Any]]) -> None:
        """Append several events to the JSONL file, one line per event.

        All events are serialized before the file is opened, so an event
        that cannot be serialized never leaves a partial batch behind.
        """
        payload = "".join(
            json.dumps(event, ensure_ascii=False) + "\n" for event in events
        )
        inbox_path.parent.mkdir(parents=True, exist_ok=True)
        with open(inbox_path, "a", encoding="utf-8") as f:
            f.write(payload)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# Event-handling callback — translates inbox events into blackboard operations
# ---------------------------------------------------------------------------
|
||
|
||
async def default_inbox_callback(
    event: Dict[str, Any],
    registry_root: Path,
    initialized_dbs: set,
) -> None:
    """Default callback: apply one inbox event to the project's blackboard DB.

    The database lives at ``registry_root / project_id / "blackboard.db"``
    and is initialized the first time it is seen.

    Handled event types:
        - ``agent_output``: insert a row into ``outputs`` plus an
          ``output_written`` event.
        - ``agent_status``: delegate to ``Blackboard.update_task_status`` and
          warn when it reports failure.
        - ``agent_claim``: mark a pending task as claimed and record a
          ``task_claimed`` event; warn when no row was updated.
        - ``agent_heartbeat``: upsert the agent row and record a
          ``daemon_tick`` event.
    Any other type is logged at debug level and ignored.

    Args:
        event: parsed inbox event.
        registry_root: directory that contains one sub-directory per project.
        initialized_dbs: set of database paths (as strings) already
            initialized; updated in place.
    """
    event_type = event.get("type", "")
    project_id = event.get("project_id", "")
    task_id = event.get("task_id")
    agent = event.get("agent", "unknown")

    if not project_id:
        logger.warning("Inbox event missing project_id: %s", event_type)
        return

    # project_id comes from the inbox file and is joined into a filesystem
    # path: refuse anything that could leave registry_root (absolute paths or
    # ".." components) as well as non-string values.
    relative = Path(project_id) if isinstance(project_id, str) else None
    if relative is None or relative.anchor or ".." in relative.parts:
        logger.warning("Inbox event has invalid project_id %r: %s",
                       project_id, event_type)
        return

    # Imported lazily, and only once the event is known to be usable.
    from src.blackboard.db import init_db, get_connection

    db_path = registry_root / project_id / "blackboard.db"
    db_key = str(db_path)

    # Make sure the DB has been initialized.
    if db_key not in initialized_dbs:
        init_db(db_path)
        initialized_dbs.add(db_key)

    if event_type == "agent_output":
        # Agent wrote an output -> outputs table + event.
        conn = get_connection(db_path)
        try:
            conn.execute("BEGIN IMMEDIATE")
            conn.execute(
                "INSERT INTO outputs (task_id, agent, output_type, title, summary, metadata) "
                "VALUES (?,?,?,?,?,?)",
                (task_id, agent,
                 event.get("output_type", "other"),
                 event.get("title", "output"),
                 event.get("summary"),
                 json.dumps(event.get("metadata", {}))),
            )
            conn.execute(
                "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
                (task_id, agent, "output_written",
                 json.dumps({"type": event.get("output_type")})),
            )
            conn.commit()
        finally:
            conn.close()

    elif event_type == "agent_status":
        # Agent status change -> validated by the Blackboard state machine.
        new_status = event.get("status")
        if new_status and task_id:
            from src.blackboard.operations import Blackboard

            bb = Blackboard(db_path)
            ok = bb.update_task_status(task_id, new_status, agent=agent)
            if not ok:
                logger.warning("Inbox: invalid status transition to '%s' for task %s",
                               new_status, task_id)

    elif event_type == "agent_claim":
        # Agent claims a task.
        if task_id:
            conn = get_connection(db_path)
            try:
                conn.execute("BEGIN IMMEDIATE")
                cursor = conn.execute(
                    "UPDATE tasks SET status='claimed', assignee=?, claimed_at=datetime('now'), "
                    "updated_at=datetime('now') WHERE id=? AND status='pending'",
                    (agent, task_id),
                )
                claimed = cursor.rowcount > 0
                if claimed:
                    conn.execute(
                        "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
                        (task_id, agent, "task_claimed",
                         json.dumps({"by": agent})),
                    )
                conn.commit()
            finally:
                conn.close()
            if not claimed:
                # The UPDATE matched no row: unknown task or not pending.
                logger.warning("Inbox: claim by '%s' ignored, task %s is missing or not pending",
                               agent, task_id)

    elif event_type == "agent_heartbeat":
        # Agent heartbeat -> agents table.
        conn = get_connection(db_path)
        try:
            conn.execute("BEGIN IMMEDIATE")
            conn.execute(
                "INSERT OR REPLACE INTO agents (agent_id, role, current_status, last_active) "
                "VALUES (?,?,?,datetime('now'))",
                (agent, event.get("role", ""), event.get("agent_status", "busy")),
            )
            conn.execute(
                "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
                (task_id, agent, "daemon_tick",
                 json.dumps({"heartbeat": True})),
            )
            conn.commit()
        finally:
            conn.close()

    else:
        logger.debug("Unhandled inbox event type: %s", event_type)
|