auto-sync: 2026-05-17 05:52:58

This commit is contained in:
cfdaily
2026-05-17 05:52:58 +08:00
parent e92b25b6a5
commit 32c9fbd874
+208 -74
View File
@@ -1,12 +1,13 @@
"""Inbox JSONL Watcher — 秒级事件推送
Agent 完成任务后写 JSONL 到 inbox/daemon.jsonlDaemon 秒级轮询消费。
Agent 完成任务后写 JSONL 到 inbox/daemon.jsonlInbox watcher 秒级消费。
用 truncate(清空内容但不删除文件),避免并发写入时文件不存在。
设计要点
- JSONL 格式(每行一个 JSON 对象)
- truncate 清空(不删除文件),避免并发写入时文件不存在
- 损坏行跳过不崩溃
- 空文件不触发处理
支持的事件类型
- agent_output: Agent 写入产出
- agent_claim: Agent 认领任务
- agent_status: Agent 状态变更(working/done/failed
- agent_heartbeat: Agent 心跳
"""
from __future__ import annotations
@@ -22,44 +23,46 @@ logger = logging.getLogger("moziplus-v2.inbox")
class InboxWatcher:
"""Inbox JSONL 文件监"""
"""Inbox JSONL 文件监"""
def __init__(
self,
inbox_path: Path,
process_callback: Optional[Callable[[Dict[str, Any]], Coroutine[Any, Any, None]]] = None,
watch_interval: float = 1.0,
on_events: Optional[Callable[[List[Dict[str, Any]]], Coroutine[Any, Any, None]]] = None,
):
"""
Args:
inbox_path: JSONL 文件路径(如 inbox/daemon.jsonl
inbox_path: daemon.jsonl 文件路径
process_callback: 收到事件后的异步回调
watch_interval: 轮询间隔秒数
on_events: 消费到事件后的回调
"""
self.inbox_path = inbox_path
self.process_callback = process_callback
self.watch_interval = watch_interval
self.on_events = on_events
self._running: bool = False
self._task: Optional[asyncio.Task] = None
self._total_consumed: int = 0
self._total_processed: int = 0
self._total_errors: int = 0
# ------------------------------------------------------------------
# 生命周期
# ------------------------------------------------------------------
async def start(self) -> None:
"""启动 watcher"""
"""启动监听"""
import asyncio
if self._running:
return
self._running = True
import asyncio
self._task = asyncio.create_task(self._loop())
logger.info("Inbox watcher started (path=%s, interval=%.1fs)",
self.inbox_path, self.watch_interval)
async def stop(self) -> None:
"""停止 watcher"""
"""停止监听"""
import asyncio
self._running = False
if self._task:
self._task.cancel()
@@ -67,16 +70,21 @@ class InboxWatcher:
await self._task
except asyncio.CancelledError:
pass
logger.info("Inbox watcher stopped (consumed=%d)", self._total_consumed)
@property
def total_consumed(self) -> int:
return self._total_consumed
logger.info("Inbox watcher stopped (processed=%d, errors=%d)",
self._total_processed, self._total_errors)
@property
def is_running(self) -> bool:
return self._running
@property
def total_processed(self) -> int:
return self._total_processed
@property
def total_errors(self) -> int:
return self._total_errors
# ------------------------------------------------------------------
# 主循环
# ------------------------------------------------------------------
@@ -85,13 +93,12 @@ class InboxWatcher:
import asyncio
while self._running:
try:
events = self.consume()
if events and self.on_events:
await self.on_events(events)
await self.poll()
except asyncio.CancelledError:
raise
except Exception:
logger.exception("Inbox watcher error")
logger.exception("Inbox poll error")
self._total_errors += 1
try:
await asyncio.sleep(self.watch_interval)
@@ -102,36 +109,48 @@ class InboxWatcher:
# 消费逻辑
# ------------------------------------------------------------------
def consume(self) -> List[Dict[str, Any]]:
"""消费 JSONL 文件中的所有事件,然后 truncate
async def poll(self) -> List[Dict[str, Any]]:
"""读取并处理所有事件,然后 truncate 文件
Returns:
解析成功的事件列表
本次处理的事件列表
"""
events = self._read_events()
if not events:
return []
processed = []
for event in events:
try:
if self.process_callback:
await self.process_callback(event)
processed.append(event)
self._total_processed += 1
except Exception:
logger.exception("Error processing inbox event: %s",
json.dumps(event, default=str))
self._total_errors += 1
# 继续处理后续事件
# truncate(清空内容,不删除文件)
self._truncate()
return processed
def _read_events(self) -> List[Dict[str, Any]]:
"""读取 JSONL 文件中的所有有效行"""
if not self.inbox_path.exists():
return []
try:
file_size = self.inbox_path.stat().st_size
content = self.inbox_path.read_text(encoding="utf-8").strip()
except OSError:
return []
if file_size == 0:
if not content:
return []
# 读取全部内容
try:
raw = self.inbox_path.read_text(encoding="utf-8")
except OSError:
return []
if not raw.strip():
return []
# 解析每一行
events: List[Dict[str, Any]] = []
bad_lines: int = 0
for line_num, line in enumerate(raw.splitlines(), 1):
for line_no, line in enumerate(content.split("\n"), 1):
line = line.strip()
if not line:
continue
@@ -140,48 +159,163 @@ class InboxWatcher:
if isinstance(event, dict):
events.append(event)
else:
bad_lines += 1
logger.warning("Inbox line %d: not a JSON object, skipping", line_num)
except json.JSONDecodeError as e:
bad_lines += 1
logger.warning("Inbox line %d: invalid JSON (%s), skipping", line_num, e)
# Truncate(清空不删除)
try:
with open(self.inbox_path, "w", encoding="utf-8") as f:
f.truncate(0)
except OSError as e:
logger.error("Failed to truncate inbox: %s", e)
if bad_lines > 0:
logger.warning("Inbox: %d bad lines skipped", bad_lines)
self._total_consumed += len(events)
if events:
logger.debug("Inbox consumed %d events", len(events))
logger.warning("Inbox line %d: expected dict, got %s",
line_no, type(event).__name__)
except json.JSONDecodeError:
logger.warning("Inbox line %d: invalid JSON, skipping", line_no)
self._total_errors += 1
return events
def _truncate(self) -> None:
"""清空文件内容(不删除文件)"""
try:
with open(self.inbox_path, "w", encoding="utf-8") as f:
f.truncate(0)
except OSError:
logger.exception("Failed to truncate inbox: %s", self.inbox_path)
# ------------------------------------------------------------------
# 写入辅助Agent 使用)
# 写入(Agent / 测试使用)
# ------------------------------------------------------------------
def write_event(self, event: Dict[str, Any]) -> None:
"""追加一条事件到 JSONL 文件
@staticmethod
def write_event(inbox_path: Path, event: Dict[str, Any]) -> None:
"""原子写入一个事件到 JSONL 文件
Agent 端调用此方法写入事件。
使用 O_APPEND 模式确保并发写入安全。
使用 "追加" 模式写入,多个 Agent 并发写安全(OS 保证 append 原子性
对于小行 < PIPE_BUF,通常 512-4096 字节)
"""
self.inbox_path.parent.mkdir(parents=True, exist_ok=True)
line = json.dumps(event, ensure_ascii=False) + "\n"
# O_APPEND 保证并发写入时各行不交错
with open(self.inbox_path, "a", encoding="utf-8") as f:
f.write(line)
@classmethod
def write_event_to(cls, inbox_path: Path, event: Dict[str, Any]) -> None:
"""静态方法:写入事件到指定路径"""
inbox_path.parent.mkdir(parents=True, exist_ok=True)
line = json.dumps(event, ensure_ascii=False) + "\n"
with open(inbox_path, "a", encoding="utf-8") as f:
f.write(line)
@staticmethod
def write_events(inbox_path: Path, events: List[Dict[str, Any]]) -> None:
"""批量写入事件"""
inbox_path.parent.mkdir(parents=True, exist_ok=True)
with open(inbox_path, "a", encoding="utf-8") as f:
for event in events:
f.write(json.dumps(event, ensure_ascii=False) + "\n")
# ---------------------------------------------------------------------------
# 事件处理回调 — 将 Inbox 事件翻译为黑板操作
# ---------------------------------------------------------------------------
async def default_inbox_callback(
event: Dict[str, Any],
registry_root: Path,
initialized_dbs: set,
) -> None:
"""默认回调:根据事件类型更新黑板"""
from src.blackboard.db import init_db, get_connection
event_type = event.get("type", "")
project_id = event.get("project_id", "")
task_id = event.get("task_id")
agent = event.get("agent", "unknown")
if not project_id:
logger.warning("Inbox event missing project_id: %s", event_type)
return
db_path = registry_root / project_id / "blackboard.db"
db_key = str(db_path)
# 确保 DB 已初始化
if db_key not in initialized_dbs:
init_db(db_path)
initialized_dbs.add(db_key)
if event_type == "agent_output":
# Agent 写入产出 → 写 outputs 表 + 事件
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
conn.execute(
"INSERT INTO outputs (task_id, agent, output_type, title, summary, metadata) "
"VALUES (?,?,?,?,?,?)",
(task_id, agent,
event.get("output_type", "other"),
event.get("title", "output"),
event.get("summary"),
json.dumps(event.get("metadata", {}))),
)
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task_id, agent, "output_written",
json.dumps({"type": event.get("output_type")})),
)
conn.commit()
finally:
conn.close()
elif event_type == "agent_status":
# Agent 状态变更 → 更新 tasks 表
new_status = event.get("status")
if new_status and task_id:
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task_id,)
).fetchone()
if row:
old = row["status"]
conn.execute(
"UPDATE tasks SET status=?, updated_at=datetime('now') WHERE id=?",
(new_status, task_id),
)
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task_id, agent, f"task_{new_status}",
json.dumps({"from": old, "to": new_status})),
)
conn.commit()
finally:
conn.close()
elif event_type == "agent_claim":
# Agent 认领任务
if task_id:
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
cursor = conn.execute(
"UPDATE tasks SET status='claimed', assignee=?, claimed_at=datetime('now'), "
"updated_at=datetime('now') WHERE id=? AND status='pending'",
(agent, task_id),
)
if cursor.rowcount > 0:
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task_id, agent, "task_claimed",
json.dumps({"by": agent})),
)
conn.commit()
finally:
conn.close()
elif event_type == "agent_heartbeat":
# Agent 心跳 → 更新 agents 表
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
conn.execute(
"INSERT OR REPLACE INTO agents (agent_id, role, current_status, last_active) "
"VALUES (?,?,?,datetime('now'))",
(agent, event.get("role", ""), event.get("agent_status", "busy")),
)
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task_id, agent, "daemon_tick",
json.dumps({"heartbeat": True})),
)
conn.commit()
finally:
conn.close()
else:
logger.debug("Unhandled inbox event type: %s", event_type)