diff --git a/docs/design/architecture-v2.6.md b/docs/design/architecture-v2.6.md index ec56607..22d9f05 100644 --- a/docs/design/architecture-v2.6.md +++ b/docs/design/architecture-v2.6.md @@ -542,120 +542,151 @@ Daemon **不做**: 这个区分决定了 spawn 时的消息内容--L2 传数据,L3 传指针。 -### 4.2 双层事件架构(课题 2 设计决策) +### 4.2 事件驱动架构(课题 2 设计决策) -#### D2-1:架构总览 +#### 设计推导过程 -> **设计推导**:v2.6 原设计为 60s polling tick,但 @mention 响应延迟、依赖解锁延迟、用户操作无法即时响应等痛点要求事件驱动。open-multi-agent 证明纯 EventEmitter 零基础设施即可实现,Network-AI 证明 file-backed 信号可实现跨进程通知。用户确认"事件驱动这块需要设计完一起实施"。 +**三个参考系统的做法**: -**双层架构**: +| 系统 | 架构 | 事件通知方式 | 启示 | +|------|------|------------|------| +| **open-multi-agent** | 单进程 TypeScript | 纯 EventEmitter——`queue.on('task:ready', handler)`。TaskQueue 内部维护 listeners Map,complete() 时同步触发 emit。零基础设施 | 和我们的黑板同构:TaskQueue.complete() = 我们的任务完成,unblockDependents() = 我们的依赖解锁 | +| **agent-chorus** | 多进程(Claude/Codex/Gemini 各自独立) | 本地 JSONL 文件队列——`chorus send` 写入 `.agent-chorus/messages/.jsonl`,`chorus messages --agent --clear` 读并清空。纯文件系统,无网络 | Standup+Conclude 模式:Agent 开始时读 inbox,结束时广播状态。JSONL inbox 做跨进程通信 | +| **Edict** | 分布式(API Gateway + Orchestrator + Agent Pool) | Redis Streams Event Bus——`task.created` 等主题 + WebSocket 推送 Dashboard | 我们是单机单进程,不需要 Redis | + +**推导结论**: +1. open-multi-agent 证明:单进程内 EventEmitter 完全够用,但它是单进程,我们是跨进程 +2. agent-chorus 证明:跨进程通信用 JSONL 文件就行,不需要 HTTP/Redis/MQ +3. Edict 的 Redis Streams 是分布式场景所需,我们不需要 +4. **真正需要即时响应的场景只有 4 个**:task_completed / task_failed / @mention / user_action。其他 ≤30s 延迟完全可接受 +5. **60s Tick 本身不是问题,问题是 Tick 的效率**——应该 Tick 是核心,加速器可选 + +**用户反馈与设计迭代**: +- 初始设计:Signal File 跨进程通知 → 用户质疑"Signal File 存在的意义是什么" +- 第二版:HTTP 端点 → 用户要求"基于优秀实践推导,不是拍脑袋换方案" +- 最终版:Tick 核心 + Inbox JSONL 加速(agent-chorus 模式)——基于三个参考系统的实际代码推导 + +#### D2-1:Tick 核心 + Inbox 加速(最终方案) ``` ┌──────────────────────────────────────────────────────────────────┐ │ Daemon (Python asyncio) │ │ │ -│ Layer 1: EventBus(asyncio.Queue) │ +│ 核心:Tick Loop(30s 主循环) │ │ ┌─────────────────────────────────────────────────────────────┐ │ -│ │ 进程内事件总线,~0ms 延迟 │ │ -│ │ • task_completed → 解锁下游依赖 + spawn 对应 Agent │ │ -│ │ • task_failed → 触发 retry 链(spawn 庞统) │ │ -│ │ • comment_added → @mention 检测 → spawn 被提及者 │ │ -│ │ • user_action → 即时响应 │ │ -│ │ • task_ready → 依赖满足 → spawn 对应 Agent │ │ +│ │ 读黑板全量状态(SQLite 查询) │ │ +│ │ 发现需要处理的(mention / blocked / done → pending) │ │ +│ │ 执行对应操作(spawn / 通知 / 清理) │ │ +│ │ 健康检查(zombie / stale / heartbeat) │ │ +│ │ │ │ +│ │ 设计推导:Hermes 60s tick 证明 polling 可靠稳定。 │ │ +│ │ 我们从 60s 降到 30s,因为黑板查询比 Hermes 轻量。 │ │ │ └─────────────────────────────────────────────────────────────┘ │ │ ↑ │ -│ Layer 2: Signal File Watcher(~500ms) │ +│ 加速:Inbox JSONL(agent-chorus 模式) │ │ ┌─────────────────────────────────────────────────────────────┐ │ -│ │ Agent(外部进程)写 SQLite 后,同时写 signal file │ │ -│ │ Daemon 每 500ms 扫描 signal 目录 │ │ -│ │ 读取信号 → emit 到 EventBus → 即时处理 │ │ +│ │ Agent 写黑板后,可选:追加一行 JSON 到 daemon inbox │ │ +│ │ Daemon 主循环每 1s 检查 inbox 是否有新内容 │ │ +│ │ 有新内容 → 立即执行一次 mini-tick(只处理触发的事件) │ │ +│ │ 处理完清空 inbox │ │ +│ │ │ │ +│ │ 设计推导:agent-chorus 用 JSONL inbox 做跨 Agent 通信, │ │ +│ │ 我们用 JSONL inbox 做 Agent→Daemon 通知。同理同构。 │ │ +│ │ inbox 是加速器,不是核心。Tick 兜底所有场景。 │ │ │ └─────────────────────────────────────────────────────────────┘ │ │ ↑ │ -│ Layer 3: Tick(30s 简化版,兜底 + 健康检查) │ +│ 恢复:启动时全量扫描 │ │ ┌─────────────────────────────────────────────────────────────┐ │ -│ │ • 僵尸检测、stale 任务回收 │ │ -│ │ • Signal 遗漏先底(万一 signal file 处理失败) │ │ -│ │ • 低优先级事件批量处理(task_created, output_written) │ │ +│ │ Daemon 重启后立即做一次完整 Tick(PM2 自动重启) │ │ +│ │ 消除重启后的 30s 空窗 │ │ +│ │ 不需要 EventBus 持久化——黑板(SQLite)是唯一真相源 │ │ │ └─────────────────────────────────────────────────────────────┘ │ └──────────────────────────────────────────────────────────────────┘ ``` -**不选的替代方案**: -- SQLite update_hook:~0ms 但需要 C API 绑定,且只能在 SQLite 操作时触发 -- Redis pub/sub:引入新依赖,v2.6 已去掉 Redis -- fswatch/watchdog:跨平台兼容性差,signal file 更简单 +**为什么不选的替代方案**: +- EventBus + Signal File(初始设计):Signal File 需要额外的扫描/读/删循环,增加了耦合链 +- HTTP 端点(第二版):引入网络依赖,Daemon 需要跑 HTTP 服务,不够简单 +- Redis pub/sub(Edict 方案):引入新依赖,v2.6 已去掉 Redis;我们不需要分布式 +- SQLite update-hook:需要 C API 绑定 +- fswatch/watchdog:跨平台兼容性差 -#### D2-2:事件类型与优先级 - -| 事件类型 | 触发方式 | 延迟要求 | 处理方式 | -|---------|---------|---------|----------| -| `task_completed` | Signal File + EventBus | ~0ms | EventBus 即时:解锁下游依赖 | -| `task_failed` | Signal File + EventBus | ~0ms | EventBus 即时:触发 retry 链 | -| `comment_added` | Signal File + EventBus | ~0ms | EventBus 即时:@mention 检测 → spawn | -| `user_action` | 直接 API 调用 | ~0ms | EventBus 即时处理 | -| `task_ready` | EventBus(内部事件) | ~0ms | EventBus 即时:spawn 对应 Agent | -| `task_created` | Signal File | ≤30s | Tick 批量处理 | -| `task_claimed` | EventBus(Daemon 内部) | ≤30s | Tick 批量处理 | -| `output_written` | Signal File | ≤30s | Tick 批量处理 | - -**关键洞察**:真正需要即时响应的场景只有 4 个(task_completed / task_failed / @mention / user_action),其他 60s 延迟完全可接受。 - -#### D2-3:依赖声明的并行/串行自动决策 - -> **设计推导**:open-multi-agent 的核心模式--complete→auto-unlock,纯依赖声明驱动。不需要额外的冲突检测或 AI 判断并行性。 - -**串行触发链**(事件驱动版): -``` -Agent A 完成 task-001 - → 写黑板 outputs + 更新 status → done - → 写 signal file: task_completed - → Daemon EventBus 即时处理: - 查询所有 depends_on 包含 task-001 的 pending 任务 - → task-002 depends_on: [task-001],检查 task-001 done ✅ - → 触发 task_ready 事件 - → spawn Agent B 执行 task-002 -``` - -**并行**:`depends_on` 为空且 assignee 不同的任务,自然并行(Daemon 分别 spawn)。不需要额外逻辑。 - -**不做 files_modified 冲突检测**(D2-4):Agent 通过黑板评论自然协调("我在改 main.py,你别碰"),不需要系统强制。Scope Guard(课题 1)作为兜底。 - -#### D2-5:Signal File 规范 +**Inbox JSONL 具体实现**(参考 agent-chorus `chorus send` 模式): ```python -# Agent 操作黑板后写 signal file(CLI 自动完成) -SIGNAL_DIR = Path("~/.sanguo_projects/sanguo_moziplus_v2/signals") +# blackboard.py 写完 SQLite 后,可选追加一行 JSON +INBOX_PATH = Path("~/.sanguo_projects/sanguo_moziplus_v2/inbox/daemon.jsonl") -# Signal file 格式: {event_type}.signal -# 内容: {task_id}:{agent}:{timestamp} -# 示例: task_completed.signal → "task-001:zhangfei-dev:1715750400" +# 写入格式(参考 agent-chorus message schema: from/to/timestamp/content/cwd) +async def notify_daemon(event_type: str, task_id: str, agent: str): + line = json.dumps({ + "from": agent, + "to": "daemon", + "timestamp": datetime.now(timezone.utc).isoformat(), + "event": event_type, # comment_added / task_completed / task_failed + "task_id": task_id, + }) + async with aiofiles.open(INBOX_PATH, mode='a') as f: + await f.write(line + '\n') -# Daemon Watcher: 每 500ms 扫描 SIGNAL_DIR -async def watch_signals(): - while True: - for signal_file in SIGNAL_DIR.glob("*.signal"): - payload = signal_file.read_text() - parts = payload.split(":") - event_type = signal_file.stem - await event_bus.emit(Event( - type=EventType(event_type), - task_id=parts[0] or None, - agent=parts[1] or None, - )) - signal_file.unlink() # 处理完删除 - await asyncio.sleep(0.5) +# Daemon 主循环中检查 inbox +async def check_inbox(): + if not INBOX_PATH.exists(): + return + lines = INBOX_PATH.read_text().strip().split('\n') + INBOX_PATH.unlink() # 原子读取+删除(参考 agent-chorus --clear 模式) + for line in lines: + msg = json.loads(line) + await handle_event(msg['event'], msg['task_id'], msg['from']) ``` -**关键**:Signal file 写入由 `blackboard.py` CLI 自动完成,Agent 无需额外操作。任何 `blackboard.py` 的写操作(comment/output/claim/status update)都会同步写 signal file。 +**Daemon 主循环**: + +```python +async def daemon_main_loop(): + # 启动时全量扫描 + await full_tick() + + while True: + # 1. 检查 inbox(每 1s) + await check_inbox() # 有内容则立即执行 mini-tick + + # 2. 定期 Tick(每 30s) + if time.time() - last_tick > 30: + await full_tick() + last_tick = time.time() + + await asyncio.sleep(1) +``` + +#### D2-2:依赖声明的并行/串行自动决策 + +> **设计推导**:open-multi-agent 的 TaskQueue.complete() → unblockDependents() 是核心模式——complete→auto-unlock,纯依赖声明驱动。其 scheduler.ts 还提供了 4 种调度策略(round-robin / least-busy / capability-match / dependency-first)。 + +**串行触发链**(Tick + Inbox 加速版): +``` +Agent A 完成 task-001 + → 写黑板 outputs + 更新 status → done + 写 handoff comment + → 通知 Daemon(inbox JSONL) + → Daemon 下次循环(~1s 内)收到通知 → mini-tick: + 查询所有 depends_on 包含 task-001 的 pending 任务 + → task-002 depends_on: [task-001],检查 task-001 done ✅ + → spawn Agent B 执行 task-002 + (如果 inbox 通知丢失 → 30s Tick 兜底补上) +``` + +**并行**:`depends_on` 为空且 assignee 不同的任务,自然并行(Daemon 分别 spawn)。不需要额外逻辑。 + +**不做 files_modified 冲突检测**(D2-4):Agent 通过黑板评论自然协调("我在改 main.py,你别碰"),不需要系统强制。Scope Guard(课题 1)作为兜底。实际覆盖:depends_on 覆盖 80%+ 的显式依赖场景,边角场景通过黑板评论 + 庞统仲裁补充。 #### 与课题 1 的兼容性 | 课题 1 设计 | 事件驱动后变化 | 改善 | |-----------|--------------|------| -| 续杯机制 | task_completed 事件即时触发依赖解锁 | @mention 从 ≤60s 降到 ≤1s | -| retry 由 AI 决策 | task_failed 事件即时触发 retry 链 | 庞统更快介入 | -| Guardrail 吹哨人 | observation 写入触发 signal file | Daemon 即时感知问题 | -| 三层执行模型 | 不变,事件处理仍按 L1/L2/L3 分层 | ✅ 一致 | +| 续杯机制 | task_completed 通知加速依赖解锁 | @mention 从 ≤60s 降到 ≤1s | +| retry 由 AI 决策 | task_failed 通知加速 retry 链 | 庞统更快介入 | +| Guardrail 吹哨人 | observation 写入后通知 Daemon | Daemon 即时感知问题 | +| 三层执行模型 | 不变,Tick/inbox 处理仍按 L1/L2/L3 分层 | ✅ 一致 | ### 4.3 Session 生命周期