auto-sync: 2026-05-15 13:37:35
This commit is contained in:
@@ -542,120 +542,151 @@ Daemon **不做**:
|
||||
|
||||
这个区分决定了 spawn 时的消息内容--L2 传数据,L3 传指针。
|
||||
|
||||
### 4.2 双层事件架构(课题 2 设计决策)
|
||||
### 4.2 事件驱动架构(课题 2 设计决策)
|
||||
|
||||
#### D2-1:架构总览
|
||||
#### 设计推导过程
|
||||
|
||||
> **设计推导**:v2.6 原设计为 60s polling tick,但 @mention 响应延迟、依赖解锁延迟、用户操作无法即时响应等痛点要求事件驱动。open-multi-agent 证明纯 EventEmitter 零基础设施即可实现,Network-AI 证明 file-backed 信号可实现跨进程通知。用户确认"事件驱动这块需要设计完一起实施"。
|
||||
**三个参考系统的做法**:
|
||||
|
||||
**双层架构**:
|
||||
| 系统 | 架构 | 事件通知方式 | 启示 |
|
||||
|------|------|------------|------|
|
||||
| **open-multi-agent** | 单进程 TypeScript | 纯 EventEmitter——`queue.on('task:ready', handler)`。TaskQueue 内部维护 listeners Map,complete() 时同步触发 emit。零基础设施 | 和我们的黑板同构:TaskQueue.complete() = 我们的任务完成,unblockDependents() = 我们的依赖解锁 |
|
||||
| **agent-chorus** | 多进程(Claude/Codex/Gemini 各自独立) | 本地 JSONL 文件队列——`chorus send` 写入 `.agent-chorus/messages/<target>.jsonl`,`chorus messages --agent <self> --clear` 读并清空。纯文件系统,无网络 | Standup+Conclude 模式:Agent 开始时读 inbox,结束时广播状态。JSONL inbox 做跨进程通信 |
|
||||
| **Edict** | 分布式(API Gateway + Orchestrator + Agent Pool) | Redis Streams Event Bus——`task.created` 等主题 + WebSocket 推送 Dashboard | 我们是单机单进程,不需要 Redis |
|
||||
|
||||
**推导结论**:
|
||||
1. open-multi-agent 证明:单进程内 EventEmitter 完全够用,但它是单进程,我们是跨进程
|
||||
2. agent-chorus 证明:跨进程通信用 JSONL 文件就行,不需要 HTTP/Redis/MQ
|
||||
3. Edict 的 Redis Streams 是分布式场景所需,我们不需要
|
||||
4. **真正需要即时响应的场景只有 4 个**:task_completed / task_failed / @mention / user_action。其他 ≤30s 延迟完全可接受
|
||||
5. **60s Tick 本身不是问题,问题是 Tick 的效率**——应该 Tick 是核心,加速器可选
|
||||
|
||||
**用户反馈与设计迭代**:
|
||||
- 初始设计:Signal File 跨进程通知 → 用户质疑"Signal File 存在的意义是什么"
|
||||
- 第二版:HTTP 端点 → 用户要求"基于优秀实践推导,不是拍脑袋换方案"
|
||||
- 最终版:Tick 核心 + Inbox JSONL 加速(agent-chorus 模式)——基于三个参考系统的实际代码推导
|
||||
|
||||
#### D2-1:Tick 核心 + Inbox 加速(最终方案)
|
||||
|
||||
```
|
||||
┌──────────────────────────────────────────────────────────────────┐
|
||||
│ Daemon (Python asyncio) │
|
||||
│ │
|
||||
│ Layer 1: EventBus(asyncio.Queue) │
|
||||
│ 核心:Tick Loop(30s 主循环) │
|
||||
│ ┌─────────────────────────────────────────────────────────────┐ │
|
||||
│ │ 进程内事件总线,~0ms 延迟 │ │
|
||||
│ │ • task_completed → 解锁下游依赖 + spawn 对应 Agent │ │
|
||||
│ │ • task_failed → 触发 retry 链(spawn 庞统) │ │
|
||||
│ │ • comment_added → @mention 检测 → spawn 被提及者 │ │
|
||||
│ │ • user_action → 即时响应 │ │
|
||||
│ │ • task_ready → 依赖满足 → spawn 对应 Agent │ │
|
||||
│ │ 读黑板全量状态(SQLite 查询) │ │
|
||||
│ │ 发现需要处理的(mention / blocked / done → pending) │ │
|
||||
│ │ 执行对应操作(spawn / 通知 / 清理) │ │
|
||||
│ │ 健康检查(zombie / stale / heartbeat) │ │
|
||||
│ │ │ │
|
||||
│ │ 设计推导:Hermes 60s tick 证明 polling 可靠稳定。 │ │
|
||||
│ │ 我们从 60s 降到 30s,因为黑板查询比 Hermes 轻量。 │ │
|
||||
│ └─────────────────────────────────────────────────────────────┘ │
|
||||
│ ↑ │
|
||||
│ Layer 2: Signal File Watcher(~500ms) │
|
||||
│ 加速:Inbox JSONL(agent-chorus 模式) │
|
||||
│ ┌─────────────────────────────────────────────────────────────┐ │
|
||||
│ │ Agent(外部进程)写 SQLite 后,同时写 signal file │ │
|
||||
│ │ Daemon 每 500ms 扫描 signal 目录 │ │
|
||||
│ │ 读取信号 → emit 到 EventBus → 即时处理 │ │
|
||||
│ │ Agent 写黑板后,可选:追加一行 JSON 到 daemon inbox │ │
|
||||
│ │ Daemon 主循环每 1s 检查 inbox 是否有新内容 │ │
|
||||
│ │ 有新内容 → 立即执行一次 mini-tick(只处理触发的事件) │ │
|
||||
│ │ 处理完清空 inbox │ │
|
||||
│ │ │ │
|
||||
│ │ 设计推导:agent-chorus 用 JSONL inbox 做跨 Agent 通信, │ │
|
||||
│ │ 我们用 JSONL inbox 做 Agent→Daemon 通知。同理同构。 │ │
|
||||
│ │ inbox 是加速器,不是核心。Tick 兜底所有场景。 │ │
|
||||
│ └─────────────────────────────────────────────────────────────┘ │
|
||||
│ ↑ │
|
||||
│ Layer 3: Tick(30s 简化版,兜底 + 健康检查) │
|
||||
│ 恢复:启动时全量扫描 │
|
||||
│ ┌─────────────────────────────────────────────────────────────┐ │
|
||||
│ │ • 僵尸检测、stale 任务回收 │ │
|
||||
│ │ • Signal 遗漏先底(万一 signal file 处理失败) │ │
|
||||
│ │ • 低优先级事件批量处理(task_created, output_written) │ │
|
||||
│ │ Daemon 重启后立即做一次完整 Tick(PM2 自动重启) │ │
|
||||
│ │ 消除重启后的 30s 空窗 │ │
|
||||
│ │ 不需要 EventBus 持久化——黑板(SQLite)是唯一真相源 │ │
|
||||
│ └─────────────────────────────────────────────────────────────┘ │
|
||||
└──────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
**不选的替代方案**:
|
||||
- SQLite update_hook:~0ms 但需要 C API 绑定,且只能在 SQLite 操作时触发
|
||||
- Redis pub/sub:引入新依赖,v2.6 已去掉 Redis
|
||||
- fswatch/watchdog:跨平台兼容性差,signal file 更简单
|
||||
**为什么不选的替代方案**:
|
||||
- EventBus + Signal File(初始设计):Signal File 需要额外的扫描/读/删循环,增加了耦合链
|
||||
- HTTP 端点(第二版):引入网络依赖,Daemon 需要跑 HTTP 服务,不够简单
|
||||
- Redis pub/sub(Edict 方案):引入新依赖,v2.6 已去掉 Redis;我们不需要分布式
|
||||
- SQLite update-hook:需要 C API 绑定
|
||||
- fswatch/watchdog:跨平台兼容性差
|
||||
|
||||
#### D2-2:事件类型与优先级
|
||||
|
||||
| 事件类型 | 触发方式 | 延迟要求 | 处理方式 |
|
||||
|---------|---------|---------|----------|
|
||||
| `task_completed` | Signal File + EventBus | ~0ms | EventBus 即时:解锁下游依赖 |
|
||||
| `task_failed` | Signal File + EventBus | ~0ms | EventBus 即时:触发 retry 链 |
|
||||
| `comment_added` | Signal File + EventBus | ~0ms | EventBus 即时:@mention 检测 → spawn |
|
||||
| `user_action` | 直接 API 调用 | ~0ms | EventBus 即时处理 |
|
||||
| `task_ready` | EventBus(内部事件) | ~0ms | EventBus 即时:spawn 对应 Agent |
|
||||
| `task_created` | Signal File | ≤30s | Tick 批量处理 |
|
||||
| `task_claimed` | EventBus(Daemon 内部) | ≤30s | Tick 批量处理 |
|
||||
| `output_written` | Signal File | ≤30s | Tick 批量处理 |
|
||||
|
||||
**关键洞察**:真正需要即时响应的场景只有 4 个(task_completed / task_failed / @mention / user_action),其他 60s 延迟完全可接受。
|
||||
|
||||
#### D2-3:依赖声明的并行/串行自动决策
|
||||
|
||||
> **设计推导**:open-multi-agent 的核心模式--complete→auto-unlock,纯依赖声明驱动。不需要额外的冲突检测或 AI 判断并行性。
|
||||
|
||||
**串行触发链**(事件驱动版):
|
||||
```
|
||||
Agent A 完成 task-001
|
||||
→ 写黑板 outputs + 更新 status → done
|
||||
→ 写 signal file: task_completed
|
||||
→ Daemon EventBus 即时处理:
|
||||
查询所有 depends_on 包含 task-001 的 pending 任务
|
||||
→ task-002 depends_on: [task-001],检查 task-001 done ✅
|
||||
→ 触发 task_ready 事件
|
||||
→ spawn Agent B 执行 task-002
|
||||
```
|
||||
|
||||
**并行**:`depends_on` 为空且 assignee 不同的任务,自然并行(Daemon 分别 spawn)。不需要额外逻辑。
|
||||
|
||||
**不做 files_modified 冲突检测**(D2-4):Agent 通过黑板评论自然协调("我在改 main.py,你别碰"),不需要系统强制。Scope Guard(课题 1)作为兜底。
|
||||
|
||||
#### D2-5:Signal File 规范
|
||||
**Inbox JSONL 具体实现**(参考 agent-chorus `chorus send` 模式):
|
||||
|
||||
```python
|
||||
# Agent 操作黑板后写 signal file(CLI 自动完成)
|
||||
SIGNAL_DIR = Path("~/.sanguo_projects/sanguo_moziplus_v2/signals")
|
||||
# blackboard.py 写完 SQLite 后,可选追加一行 JSON
|
||||
INBOX_PATH = Path("~/.sanguo_projects/sanguo_moziplus_v2/inbox/daemon.jsonl")
|
||||
|
||||
# Signal file 格式: {event_type}.signal
|
||||
# 内容: {task_id}:{agent}:{timestamp}
|
||||
# 示例: task_completed.signal → "task-001:zhangfei-dev:1715750400"
|
||||
# 写入格式(参考 agent-chorus message schema: from/to/timestamp/content/cwd)
|
||||
async def notify_daemon(event_type: str, task_id: str, agent: str):
|
||||
line = json.dumps({
|
||||
"from": agent,
|
||||
"to": "daemon",
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"event": event_type, # comment_added / task_completed / task_failed
|
||||
"task_id": task_id,
|
||||
})
|
||||
async with aiofiles.open(INBOX_PATH, mode='a') as f:
|
||||
await f.write(line + '\n')
|
||||
|
||||
# Daemon Watcher: 每 500ms 扫描 SIGNAL_DIR
|
||||
async def watch_signals():
|
||||
while True:
|
||||
for signal_file in SIGNAL_DIR.glob("*.signal"):
|
||||
payload = signal_file.read_text()
|
||||
parts = payload.split(":")
|
||||
event_type = signal_file.stem
|
||||
await event_bus.emit(Event(
|
||||
type=EventType(event_type),
|
||||
task_id=parts[0] or None,
|
||||
agent=parts[1] or None,
|
||||
))
|
||||
signal_file.unlink() # 处理完删除
|
||||
await asyncio.sleep(0.5)
|
||||
# Daemon 主循环中检查 inbox
|
||||
async def check_inbox():
|
||||
if not INBOX_PATH.exists():
|
||||
return
|
||||
lines = INBOX_PATH.read_text().strip().split('\n')
|
||||
INBOX_PATH.unlink() # 原子读取+删除(参考 agent-chorus --clear 模式)
|
||||
for line in lines:
|
||||
msg = json.loads(line)
|
||||
await handle_event(msg['event'], msg['task_id'], msg['from'])
|
||||
```
|
||||
|
||||
**关键**:Signal file 写入由 `blackboard.py` CLI 自动完成,Agent 无需额外操作。任何 `blackboard.py` 的写操作(comment/output/claim/status update)都会同步写 signal file。
|
||||
**Daemon 主循环**:
|
||||
|
||||
```python
|
||||
async def daemon_main_loop():
|
||||
# 启动时全量扫描
|
||||
await full_tick()
|
||||
|
||||
while True:
|
||||
# 1. 检查 inbox(每 1s)
|
||||
await check_inbox() # 有内容则立即执行 mini-tick
|
||||
|
||||
# 2. 定期 Tick(每 30s)
|
||||
if time.time() - last_tick > 30:
|
||||
await full_tick()
|
||||
last_tick = time.time()
|
||||
|
||||
await asyncio.sleep(1)
|
||||
```
|
||||
|
||||
#### D2-2:依赖声明的并行/串行自动决策
|
||||
|
||||
> **设计推导**:open-multi-agent 的 TaskQueue.complete() → unblockDependents() 是核心模式——complete→auto-unlock,纯依赖声明驱动。其 scheduler.ts 还提供了 4 种调度策略(round-robin / least-busy / capability-match / dependency-first)。
|
||||
|
||||
**串行触发链**(Tick + Inbox 加速版):
|
||||
```
|
||||
Agent A 完成 task-001
|
||||
→ 写黑板 outputs + 更新 status → done + 写 handoff comment
|
||||
→ 通知 Daemon(inbox JSONL)
|
||||
→ Daemon 下次循环(~1s 内)收到通知 → mini-tick:
|
||||
查询所有 depends_on 包含 task-001 的 pending 任务
|
||||
→ task-002 depends_on: [task-001],检查 task-001 done ✅
|
||||
→ spawn Agent B 执行 task-002
|
||||
(如果 inbox 通知丢失 → 30s Tick 兜底补上)
|
||||
```
|
||||
|
||||
**并行**:`depends_on` 为空且 assignee 不同的任务,自然并行(Daemon 分别 spawn)。不需要额外逻辑。
|
||||
|
||||
**不做 files_modified 冲突检测**(D2-4):Agent 通过黑板评论自然协调("我在改 main.py,你别碰"),不需要系统强制。Scope Guard(课题 1)作为兜底。实际覆盖:depends_on 覆盖 80%+ 的显式依赖场景,边角场景通过黑板评论 + 庞统仲裁补充。
|
||||
|
||||
#### 与课题 1 的兼容性
|
||||
|
||||
| 课题 1 设计 | 事件驱动后变化 | 改善 |
|
||||
|-----------|--------------|------|
|
||||
| 续杯机制 | task_completed 事件即时触发依赖解锁 | @mention 从 ≤60s 降到 ≤1s |
|
||||
| retry 由 AI 决策 | task_failed 事件即时触发 retry 链 | 庞统更快介入 |
|
||||
| Guardrail 吹哨人 | observation 写入触发 signal file | Daemon 即时感知问题 |
|
||||
| 三层执行模型 | 不变,事件处理仍按 L1/L2/L3 分层 | ✅ 一致 |
|
||||
| 续杯机制 | task_completed 通知加速依赖解锁 | @mention 从 ≤60s 降到 ≤1s |
|
||||
| retry 由 AI 决策 | task_failed 通知加速 retry 链 | 庞统更快介入 |
|
||||
| Guardrail 吹哨人 | observation 写入后通知 Daemon | Daemon 即时感知问题 |
|
||||
| 三层执行模型 | 不变,Tick/inbox 处理仍按 L1/L2/L3 分层 | ✅ 一致 |
|
||||
|
||||
### 4.3 Session 生命周期
|
||||
|
||||
|
||||
Reference in New Issue
Block a user