From c8d7d8a4a6bd9e93b5a97d94f7f8d844dbfa3c06 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Fri, 15 May 2026 02:05:48 +0800 Subject: [PATCH] auto-sync: 2026-05-15 02:05:48 --- docs/design/architecture-v2.6.md | 895 +++++++++++++++++++++++++++ docs/design/deployment-v2.6.md | 212 +++++++ docs/design/technical-design-v2.6.md | 714 +++++++++++++++++++++ 3 files changed, 1821 insertions(+) create mode 100644 docs/design/architecture-v2.6.md create mode 100644 docs/design/deployment-v2.6.md create mode 100644 docs/design/technical-design-v2.6.md diff --git a/docs/design/architecture-v2.6.md b/docs/design/architecture-v2.6.md new file mode 100644 index 0000000..94149fa --- /dev/null +++ b/docs/design/architecture-v2.6.md @@ -0,0 +1,895 @@ +# AI原生DevOps Platform 架构设计 v2.6 + +**版本**: v2.6(Shared Workspace + Blackboard 架构) +**基于**: architecture-v2.md + v2.0 AI Native 调研 + 技术验证 +**作者**: 庞统(副军师) +**日期**: 2026-05-15 + +--- + +## 变更历史 + +| 版本 | 日期 | 变更内容 | +|------|------|---------| +| v2.0 | 2026-05-04 | 初始版本:SQLite 4表 + 状态机 + DAG 引擎 | +| v2.6 | 2026-05-15 | **架构重构**:Shared Workspace(Blackboard)取代 DAG 引擎为编排核心 | +| v2.6.1 | 2026-05-15 | 司马懿评审反馈 + Mail 退役决策 + 质量门控 + 决策记录 + 工程修正 | + +--- + +## 1. v2.6 核心变革:从 DAG 状态机到 Shared Workspace + +### 1.1 为什么变? + +v2.0 的核心是 **DAG 引擎 + 状态机 + 邮件通信**,本质是给 AI 团队做了一套 ERP: +- 编排是确定性状态机(固定流程) +- 交互是点按钮(Dashboard) +- Agent 间靠邮件异步通信(信息分散在 mail 目录) +- 人的参与密度不变(全程驾驶) + +v2.6 的核心是 **Shared Workspace(Blackboard)+ Agent 自主决策 + Daemon 投递**: +- 编排是 AI agent 在黑板上自主领活(动态协作) +- 交互是自然语言对话 +- Agent 间通过黑板共享一切(信息集中在任务空间) +- 人只做方向决策和验收 + +### 1.2 核心原则 + +> **黑板是唯一真相源,所有 agent 读它、想、行动,写回结果。Daemon 是投递员,不是决策者。** + +1. **Agent 决策,Daemon 执行** — 庞统做 plan、张飞领任务、关羽发现风险,都写在黑板上。Daemon 读黑板,执行 spawn/通知。 +2. **产出在黑板,不在邮件** — 所有任务产出、讨论、观察都在任务的黑板空间里,Sanguo Mail 不介入任务协作。 +3. **Daemon 不阻塞 Agent** — Daemon 是常驻管家,定期 tick 检查黑板,spawn agent 执行,不占用任何 agent 的主 session。 +4. **Session 用完即清** — Agent 通过 `openclaw agent --agent --session-id ` spawn 隔离 session,执行完 daemon 存档 jsonl 并清理 sessions.json。 + +--- + +## 2. 架构总览 + +``` +┌─────────────────────────────────────────────────────────────┐ +│ 用户 / 触发器 │ +│ (Web / CLI / Cron) │ +└──────────────────────────┬──────────────────────────────────┘ + │ 写入黑板或触发 daemon +┌──────────────────────────▼──────────────────────────────────┐ +│ Shared Workspace(黑板) │ +│ │ +│ ┌────────────────────────────────────────────────────────┐ │ +│ │ SQLite (blackboard.db) │ │ +│ │ tasks / comments / outputs / agents / events │ │ +│ │ 原子读写(propose→validate→commit 或 SQLite 事务) │ │ +│ └────────────────────────────────────────────────────────┘ │ +│ │ +│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ +│ │ 任务列表 │ │ 评论线程 │ │ 产出空间 │ │ 讨论区域 │ │ +│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ +└──────────────────────────┬──────────────────────────────────┘ + │ daemon tick 读写 +┌──────────────────────────▼──────────────────────────────────┐ +│ Daemon(管家) │ +│ │ +│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │ +│ │ Tick 循环 │ │ Session 管理 │ │ 健康检查 │ │ +│ │ (60s 轮询) │ │ spawn/archive │ │ zombie/reclaim │ │ +│ │ 读黑板→决策 │ │ /cleanup │ │ /stale 任务 │ │ +│ └──────┬───────┘ └──────┬───────┘ └──────────────────┘ │ +│ │ │ │ +│ Daemon 只做三件事: │ │ +│ 1. 读黑板,发现需要介入的 │ │ +│ 2. Spawn 对应 agent │ │ +│ 3. 清理完成的 session │ │ +└──────────────────────────┬──────────────────────────────────┘ + │ openclaw agent --agent --session-id + │ 执行完 → 存档 jsonl → 清理 sessions.json +┌──────────────────────────▼──────────────────────────────────┐ +│ Agent 层(将军们) │ +│ │ +│ Agent 不常驻。被 spawn 时: │ +│ 1. 读黑板 → 了解全局状态 │ +│ 2. 想和做 → 根据职责自主决策 │ +│ 3. 写回黑板 → 产出、评论、领任务 │ +│ 4. 退出 → session 被 daemon 清理 │ +│ │ +│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │ +│ │庞统 │ │司马懿│ │姜维 │ │关羽 │ │张飞 │ │赵云 │ │ +│ │策划 │ │质量 │ │平台 │ │风控 │ │编码 │ │数据 │ │ +│ └──────┘ └──────┘ └──────┘ └──────┘ └──────┘ └──────┘ │ +│ │ +│ 每个 Agent: SOUL.md + IDENTITY.md + Skills + Workspace │ +│ Agent 主 session 不参与任务执行(不被污染) │ +└─────────────────────────────────────────────────────────────┘ +``` + +### 关键区别:v2.0 vs v2.6 + +| 维度 | v2.0 | v2.6 | +|------|------|------| +| 编排核心 | DAG 引擎 + 状态机 | Blackboard(Shared Workspace) | +| 决策者 | Daemon(状态机驱动) | Agent(在黑板上自主决策) | +| Daemon 角色 | 调度器(决定谁干什么) | 投递员(执行黑板上的决策) | +| Agent 通信 | Sanguo Mail(异步邮件) | 黑板 Comment 线程(共享空间) | +| 信息位置 | 分散(mail + task目录 + session) | 集中(黑板 SQLite) | +| Agent 生命周期 | 固定节点执行 | Spawn 隔离 session,用完即清 | +| 通知机制 | Mail 轮询 | Daemon tick + spawn | +| 协作模式 | 指令式(庞统分配→将军执行) | 自主式(看黑板→领活→写回) | + +--- + +## 3. Shared Workspace(黑板)设计 + +### 3.1 参考系统对比 + +| 系统 | 存储 | 原子性 | 讨论 | 状态机 | 发现 | +|------|------|--------|------|--------|------| +| Claude Code Agent Teams | JSON 文件 | 无(last-write-wins) | inbox 点对点 | pending/in_progress/completed | Agent 轮询 | +| Hermes Kanban v0.13 | SQLite | SQLite 事务 | Comment 线程 | 7 状态完整机 | Dispatcher 60s tick | +| Network-AI | Markdown 文件 | flock 三阶段提交 | signal key | 无 | Agent 主动读 | +| agent-blackboard | SQLite + Ontology | SQLite 事务 | 本体条目 | 无 | Coordinator 分发 | +| **我们的方案** | **SQLite** | **SQLite 事务** | **Comment 线程** | **简化状态机** | **Daemon tick** | + +### 3.2 SQLite Schema + +```sql +-- ===== 任务表 ===== +CREATE TABLE IF NOT EXISTS tasks ( + id TEXT PRIMARY KEY, -- task-001 + title TEXT NOT NULL, + description TEXT, + status TEXT NOT NULL DEFAULT 'pending', + CHECK (status IN ('pending','claimed','working','review','done','failed','blocked','cancelled')), + + -- 分配(谁领了或被指派) + assignee TEXT, -- agent id: zhangfei-dev + assigned_by TEXT, -- 谁分配的:pangtong-fujunshi / user + + -- 依赖 + depends_on TEXT, -- JSON array of task IDs + parent_task TEXT, -- 父任务(子任务分解时) + + -- 优先级和类型 + priority INTEGER NOT NULL DEFAULT 5, -- 1(最高)-10(最低) + task_type TEXT, -- coding/review/data/deploy/research/discuss + + -- 时间 + created_at TEXT NOT NULL DEFAULT (datetime('now')), + updated_at TEXT NOT NULL DEFAULT (datetime('now')), + claimed_at TEXT, + started_at TEXT, + completed_at TEXT, + deadline TEXT, + + -- 重试 + retry_count INTEGER NOT NULL DEFAULT 0, + max_retries INTEGER NOT NULL DEFAULT 2 +); + +CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status); +CREATE INDEX IF NOT EXISTS idx_tasks_assignee ON tasks(assignee); +CREATE INDEX IF NOT EXISTS idx_tasks_parent ON tasks(parent_task); + +-- ===== 评论线程表 ===== +-- 参考 Hermes kanban_comment:追加写入,所有参与者可见 +CREATE TABLE IF NOT EXISTS comments ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + task_id TEXT NOT NULL, + author TEXT NOT NULL, -- agent id 或 'user' + body TEXT NOT NULL, + mentions TEXT, -- JSON array: ["zhangfei-dev", "guanyu-dev"] + created_at TEXT NOT NULL DEFAULT (datetime('now')), + + FOREIGN KEY (task_id) REFERENCES tasks(id) +); + +CREATE INDEX IF NOT EXISTS idx_comments_task ON comments(task_id); +CREATE INDEX IF NOT EXISTS idx_comments_author ON comments(author); +-- 注意:mentions 是 JSON 数组,无法直接建索引。daemon tick 查询用 json_each(mentions)。 +-- 数据量小时够用,后续可拆 comment_mentions 关联表优化。 + +-- ===== 产出表 ===== +CREATE TABLE IF NOT EXISTS outputs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + task_id TEXT NOT NULL, + agent TEXT NOT NULL, -- 谁写的 + output_type TEXT NOT NULL, -- code/document/data/config/other + title TEXT NOT NULL, + content_path TEXT, -- 文件路径(产出物在 task 目录下) + summary TEXT, -- 一句话摘要 + metadata TEXT, -- JSON: {files_changed, lines_added, ...} + created_at TEXT NOT NULL DEFAULT (datetime('now')), + + FOREIGN KEY (task_id) REFERENCES tasks(id) +); + +CREATE INDEX IF NOT EXISTS idx_outputs_task ON outputs(task_id); + +-- ===== 决策记录表 ===== +-- Agent 执行过程中的关键决策必须记录。哪怕是自己做的决策也要填一条。 +CREATE TABLE IF NOT EXISTS decisions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + task_id TEXT NOT NULL, + decider TEXT NOT NULL, -- 谁做的决策 + decision TEXT NOT NULL, -- 决策内容:"选 A 方案" + rationale TEXT NOT NULL, -- 为什么:"B 方案内存开销更大" + alternatives TEXT, -- JSON array: 被排除的选项 + created_at TEXT NOT NULL DEFAULT (datetime('now')), + + FOREIGN KEY (task_id) REFERENCES tasks(id) +); + +CREATE INDEX IF NOT EXISTS idx_decisions_task ON decisions(task_id); + +-- ===== 观察表 ===== +-- Agent 执行过程中发现的问题、风险、建议 +CREATE TABLE IF NOT EXISTS observations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + task_id TEXT NOT NULL, + observer TEXT NOT NULL, -- 谁观察到的 + severity TEXT NOT NULL DEFAULT 'info', + CHECK (severity IN ('blocking','warning','info','audit')), + body TEXT NOT NULL, + resolved_by TEXT, -- 谁处理的 + resolved_at TEXT, -- 何时处理的 + created_at TEXT NOT NULL DEFAULT (datetime('now')), + + FOREIGN KEY (task_id) REFERENCES tasks(id) +); + +-- ===== 事件日志(审计追踪)===== +CREATE TABLE IF NOT EXISTS events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + task_id TEXT, + agent TEXT, + event_type TEXT NOT NULL, + detail TEXT, -- JSON + created_at TEXT NOT NULL DEFAULT (datetime('now')) +); + +CREATE INDEX IF NOT EXISTS idx_events_task ON events(task_id); +CREATE INDEX IF NOT EXISTS idx_events_time ON events(created_at); + +-- 合法 event_type 清单: +-- 任务:task_created, task_claimed, task_started, task_completed, task_failed, +-- task_blocked, task_unblocked, task_reviewed, task_cancelled, task_retried +-- 协作:comment_added, output_written, observation_added, decision_recorded +-- Agent:agent_spawned, agent_completed, agent_zombie_detected +-- Session:session_spawned, session_archived, session_cleanup +-- 系统:daemon_tick, daemon_manual_tick + +-- ===== Agent 注册表 ===== +CREATE TABLE IF NOT EXISTS agents ( + agent_id TEXT PRIMARY KEY, + role TEXT, + current_status TEXT DEFAULT 'idle', -- idle/working/offline + current_task TEXT, + last_active TEXT, + capabilities TEXT -- JSON array: ["coding", "review", "deploy"] +); + +-- agents 表更新规则: +-- Agent claim 任务时:自己更新 current_status='working', current_task=task_id +-- Agent 完成退出时:daemon 更新 current_status='idle', current_task=NULL +-- Daemon tick 检测到 zombie:daemon 更新 current_status='offline' +``` + +**连接配置:** + +```python +def get_connection(): + conn = sqlite3.connect(str(DB_PATH)) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA foreign_keys=ON") + conn.execute("PRAGMA busy_timeout=5000") + return conn +``` + +### 3.3 简化状态机 + +``` +pending → claimed → working → review → done + ↑ │ ├→ blocked ──┘ ├→ failed + │ │ └→ failed └→ cancelled + └─────────┘ + (review→pending: 审核不通过,打回重做) + (blocked→pending: 阻塞解除) + (failed→pending: 重试) +``` + +**与 v2.0 的区别:** v2.0 有 9 个状态(spawning, ready, reporting 等),v2.6 简化为 8 个。原因是 spawn 逻辑从状态机移到了 daemon——daemon tick 发现黑板需要某人介入就 spawn,不需要 spawning/ready 这些中间状态。 + +| 状态 | 含义 | 谁触发 | +|------|------|--------| +| pending | 待领取 | 任何 Agent 或用户创建 | +| claimed | 已认领 | Agent 自己或被指派 | +| working | 执行中 | Agent | +| review | 待审核 | Agent 完成产出 | +| blocked | 需要帮助 | Agent | +| done | 完成 | **审核通过且所有问题达成一致** | +| failed | 失败 | Agent 或 daemon | +| cancelled | 取消 | 用户 | + +**完整合法流转矩阵:** + +```python +VALID_TRANSITIONS = { + "pending": {"claimed", "cancelled"}, + "claimed": {"working", "pending", "cancelled"}, # pending: 放弃认领 + "working": {"review", "blocked", "failed", "cancelled"}, + "review": {"done", "pending", "failed", "cancelled"}, # pending: 审核不通过打回 + "blocked": {"pending", "cancelled"}, # pending: 阻塞解除 + "done": set(), # 终态 + "failed": {"pending"}, # pending: 重试 + "cancelled": set(), # 终态 +} +``` + +### 3.4 原子操作 + +**任务认领(claim)** — 原子 CAS,防止两个人同时领: + +```python +def claim_task(task_id: str, agent_id: str) -> bool: + conn = get_connection() + try: + cursor = conn.execute( + "UPDATE tasks SET status='claimed', assignee=?, claimed_at=datetime('now') " + "WHERE id=? AND status='pending' AND (assignee IS NULL OR assignee=?)", + (agent_id, task_id, agent_id) + ) + conn.commit() + return cursor.rowcount > 0 # 0 表示被别人抢了或不是指定分配给自己的人 + finally: + conn.close() +``` + +**产出写入** — SQLite 事务保证原子: + +```python +def write_output(task_id: str, agent_id: str, output: dict): + conn = get_connection() + try: + conn.execute("BEGIN IMMEDIATE") # 立即获取写锁 + conn.execute( + "INSERT INTO outputs (task_id, agent, output_type, title, content_path, summary, metadata) " + "VALUES (?, ?, ?, ?, ?, ?, ?)", + (task_id, agent_id, output['type'], output['title'], + output['path'], output['summary'], json.dumps(output.get('metadata', {}))) + ) + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'output_written', ?)", + (task_id, agent_id, json.dumps({'output_id': output['title']})) + ) + conn.commit() + finally: + conn.close() +``` + +### 3.5 评论线程(讨论机制) + +参考 Hermes 的 `kanban_comment` 模式: + +```python +def add_comment(task_id: str, author: str, body: str, mentions: list = None): + conn = get_connection() + try: + conn.execute( + "INSERT INTO comments (task_id, author, body, mentions) VALUES (?, ?, ?, ?)", + (task_id, author, body, json.dumps(mentions or [])) + ) + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'commented', ?)", + (task_id, author, json.dumps({'body_preview': body[:100], 'mentions': mentions})) + ) + conn.commit() + finally: + conn.close() +``` + +**讨论示例:** + +``` +[16:30 庞统] 张飞,你的实现方案我看了,回测数据量大时内存会爆。 + 关羽,从风控角度也看看? @关羽 @张飞 +[16:35 关羽] 同意。建议加分批加载机制,单批不超过 50 万条。 +[16:40 张飞] 收到,改成分批加载。预计 30 分钟。 +[16:55 庞统] @张飞 注意止损逻辑也需要同步改,分批后止损触发时机变了。 +[17:10 张飞] 完成。产出在 output-zhangfei-v2.md。 +``` + +**核心原则:评论都在黑板上,不在任何 agent 的 session 里。Agent 的 session 是临时的。** + +### 3.6 竞态解决 + +任务认领的竞态通过 SQLite 原子 CAS 解决(先到先得)。 + +职责冲突的解决(张飞和关羽都认为自己该做某个任务): +1. **默认:先到先得** — SQLite CAS,谁先 claim 谁做 +2. **升级:庞统仲裁** — 如果争议,评论中 @庞统 请求仲裁 +3. **最终:用户拍板** — @user 请求用户决定 + +不需要复杂的分布式共识——职责分工已经自然避免了大部分冲突。 + +--- + +## 4. Daemon(管家)设计 + +### 4.1 Daemon 的角色定位 + +> **Daemon 是投递员,不是决策者。所有决策发生在黑板上,daemon 只执行。** + +Daemon 做三件事: +1. **读黑板** — 定期 tick,检查黑板状态 +2. **Spawn Agent** — 根据黑板上的指示,spawn 对应的 agent +3. **清理 Session** — agent 执行完后,存档 jsonl + 清理 sessions.json + +Daemon **不做**: +- ❌ 不决定谁做什么(agent 自己决定或庞统在黑板上分配) +- ❌ 不维护状态机(黑板就是状态) +- ❌ 不做业务逻辑(不解析产出、不做评审) + +### 4.2 Daemon Tick 循环 + +参考 Hermes Dispatcher,但更轻量: + +**Tick 频率:60 秒(默认),可通过 CLI 手动触发立即执行。** + +```bash +# 手动触发一次 tick(用于需要立即响应的场景) +python3 ~/.sanguo_projects/sanguo_moziplus/cli/daemon.py tick +``` + +```python +async def daemon_tick(): + """每 60 秒执行一次""" + + # 1. 健康检查 + reclaim_stale_tasks() # 超时的 working 任务回收 + detect_zombie_sessions() # 进程死了但 session 还在的 + + # 2. 读黑板 + board = read_blackboard() # SQLite 查询 + + # 3. 处理评论中的 @mention + # Agent A 在评论中 @AgentB → daemon spawn AgentB 来看评论 + for comment in board.get_unprocessed_mentions(): + target_agent = comment.mentioned_agent + if not is_agent_active(target_agent): + async_spawn_agent(target_agent, + message=f"黑板上有给你的新评论(task-{comment.task_id}),请查看。") + mark_comment_processed(comment.id) + + # 4. 处理待领取的任务(庞统在黑板上分配了但 agent 还没领) + for task in board.get_assigned_unclaimed(): + if not is_agent_active(task.assignee): + async_spawn_agent(task.assignee, + message=f"黑板上有分配给你的任务({task.title}),请查看并认领。") + + # 5. 处理 blocked 任务(agent 请求帮助) + for task in board.get_blocked_tasks(): + mentions = get_latest_comment_mentions(task.id) + if mentions: + # 评论中 @ 了某人 → spawn 那个人 + for agent_id in mentions: + if not is_agent_active(agent_id): + async_spawn_agent(agent_id, + message=f"任务 {task.id} 被 block,需要你的协助。") + else: + # 没有 @ → spawn 庞统来决定找谁帮忙 + async_spawn_agent('pangtong-fujunshi', + message=f"任务 {task.id} 被 block,没有指定协助者,请决定如何处理。") + + # 6. 清理完成的 session + for session in get_completed_sessions(): + archive_session(session) # mv jsonl → task 目录 + cleanup_sessions_json(session) # 编辑 sessions.json 删除记录 +``` + +### 4.3 Session 生命周期 + +``` +1. Daemon spawn + openclaw agent --agent zhangfei-dev --session-id \ + --message "请检查黑板 task-001..." + ↓ +2. Agent 执行 + - 读黑板(SQLite 查询) + - 做任务(编码/审核/数据分析) + - 写回黑板(产出、评论、状态更新) + ↓ +3. Agent 退出(自然结束) + ↓ +4. Daemon 清理 + - mv .jsonl → task-001/archive/ + - mv .trajectory.jsonl → task-001/archive/ + - 编辑 sessions.json 删除该 session 记录 +``` + +**技术验证结论:** +- `openclaw agent --agent --session-id ` 可创建完全隔离的 session ✅ +- 直接编辑 `sessions.json` 可安全删除 session 记录 ✅(已验证) +- Gateway WS `sessions.delete` 需要 `operator.admin` scope(token 模式不授予,不可用)❌ +- 回退方案:直接编辑 `sessions.json` 是安全可靠的 ✅ + +### 4.4 Agent Spawn 后的消息内容 + +Agent 被 spawn 时,daemon 传递的消息应包含足够的上下文让 agent 知道该做什么: + +```python +def build_spawn_message(task_id: str, trigger_reason: str, comments_since: str = None): + task = get_task(task_id) + + msg = f"黑板任务通知:\n" + msg += f"- 任务:{task.title}({task.id})\n" + msg += f"- 状态:{task.status}\n" + msg += f"- 触发原因:{trigger_reason}\n" + + if comments_since: + recent = get_comments_since(task_id, comments_since) + if recent: + msg += f"\n最近评论:\n" + for c in recent: + msg += f" [{c.created_at} {c.author}] {c.body[:200]}\n" + + msg += f"\n请读取黑板获取完整信息。" + return msg +``` + +--- + +## 5. Agent 与黑板的交互 + +### 5.1 Agent 被_spawn_后的工作流程 + +``` +Agent 被 spawn + ↓ +1. 读黑板 → 了解任务全局状态 + - 读 tasks 表:当前任务的状态、描述、依赖 + - 读 comments 表:讨论历史 + - 读 outputs 表:已有产出 + - 读 observations 表:已知风险 + ↓ +2. 想 → 根据自己的职责自主决策 + - 我是编码先锋,这个 pending 任务适合我 → claim + - 我是风控守将,这个 comment @ 我 → 回复 + - 我是副军师,这个任务需要分解 → 创建子任务 + ↓ +3. 做 → 执行任务 + - 编码、审核、数据分析等 + - 过程中发现风险 → 写 observation + - 需要其他人协助 → 写 comment @mention + ↓ +4. 写回黑板 → 产出、评论、状态更新、决策记录 + - 写 outputs 表:产出文件路径 + 摘要 + - 写 comments 表:完成说明 + - 写 decisions 表:关键决策(哪怕自己的决策也要填一条) + - 更新 tasks 表:status → done/review + ↓ +5. 退出 → daemon 自动清理 session +``` + +### 5.2 Agent 工具集 + +Agent 通过 `exec` 工具调用 CLI 命令操作黑板: + +```bash +# 读黑板(全部) +python3 ~/.sanguo_projects/sanguo_moziplus/cli/blackboard.py read --task task-001 + +# 读黑板(过滤:只读和自己相关的) +python3 ~/.sanguo_projects/sanguo_moziplus/cli/blackboard.py read --task task-001 --agent zhangfei-dev + +# 读黑板(过滤:只读最近 20 条) +python3 ~/.sanguo_projects/sanguo_moziplus/cli/blackboard.py read --task task-001 --last 20 + +# 读黑板(过滤:只读特定类型) +python3 ~/.sanguo_projects/sanguo_moziplus/cli/blackboard.py read --task task-001 --type comments + +# 认领任务 +python3 ~/.sanguo_projects/sanguo_moziplus/cli/blackboard.py claim --task task-001 --agent zhangfei-dev + +# 写产出 +python3 ~/.sanguo_projects/sanguo_moziplus/cli/blackboard.py output --task task-001 --agent zhangfei-dev \ + --type code --title "分批加载实现" --path task-001/output-zhangfei.md \ + --summary "实现分批加载,单批50万条" + +# 写评论 +python3 ~/.sanguo_projects/sanguo_moziplus/cli/blackboard.py comment --task task-001 --author zhangfei-dev \ + --body "完成分批加载实现" --mentions "[]" + +# 写观察 +python3 ~/.sanguo_projects/sanguo_moziplus/cli/blackboard.py observe --task task-001 --observer guanyu-dev \ + --severity warning --body "止损逻辑需适配分批模式" + +# 记录决策 +python3 ~/.sanguo_projects/sanguo_moziplus/cli/blackboard.py decide --task task-001 --decider zhangfei-dev \ + --decision "使用分批加载而非流式" --rationale "流式需要改底层框架,分批只需改回测模块" + +# 创建任务(任何 Agent 都可以创建) +python3 ~/.sanguo_projects/sanguo_moziplus/cli/blackboard.py create --title "分钟线数据下载" \ + --creator zhaoyun-data --task-type data +``` + +--- + +## 6. 关键场景流程 + +### 6.1 庞统规划 + Agent 领任务 + +``` +用户 → 庞统(主session):"设计一个动量因子策略" + ↓ +庞统在黑板上写: + - 创建 task-001(数据准备,pending) + - 创建 task-002(因子计算,pending,depends_on: [task-001]) + - 创建 task-003(回测验证,pending,depends_on: [task-002]) + - 评论:"建议赵云领 001,张飞领 002 和 003" + ↓ +Daemon tick 发现 task-001 pending + 庞统评论建议赵云 + ↓ +Daemon spawn 赵云 → 赵云读黑板 → claim task-001 → 执行 → 写产出 → 退出 + ↓ +Daemon tick 发现 task-001 done → task-002 depends_on 满足 + ↓ +Daemon spawn 张飞 → 张飞读黑板 → claim task-002 → 执行 → 写产出 → 退出 + ↓ +(同理 task-003) +``` + +### 6.2 Agent 间协作讨论 + +``` +张飞执行 task-002 时发现需要分钟线数据 + ↓ +张飞写评论:"@赵云 task-002 需要分钟线数据,能帮忙下载吗?" +张飞更新任务状态 → blocked + ↓ +Daemon tick 发现 task-002 blocked + 评论 @ 赵云 + ↓ +Daemon spawn 赵云 → 赵云读黑板 → 看到评论 → 下载数据 +赵云写评论:"分钟线数据已下载到 /path/to/data" + 写产出 +赵云写评论:"@张飞 数据就绪,可以继续" + ↓ +Daemon tick 发现评论 @ 张飞 + ↓ +Daemon spawn 张飞 → 张飞读黑板 → 看到数据就绪 → 继续 task-002 +``` + +### 6.3 Agent 发现风险 + +``` +张飞在 task-002 中发现止损逻辑有 bug + ↓ +张飞写 observation(severity: warning): + "止损逻辑在分批模式下可能漏触发" +张飞写评论:"@关羽 止损逻辑需要你从风控角度确认" + ↓ +Daemon tick 发现 observation + 评论 @ 关羽 + ↓ +Daemon spawn 关羽 → 关羽读黑板 → 审查 → 写评论 + observation +``` + +### 6.4 用户直接参与 + +``` +用户读黑板 → 发现 task-002 进度慢 + ↓ +用户在黑板上写评论:"task-002 优先级提高,需要今天完成" + ↓ +Daemon tick 发现用户评论 → 如果张飞未 active → spawn 张飞通知 +``` + +--- + +## 7. Session 隔离与清理 + +### 7.1 技术实现 + +```python +class SessionManager: + def async_spawn_agent(self, agent_id: str, message: str) -> str: + """异步 spawn 隔离 session,不等待完成。返回 session_id。""" + session_id = str(uuid.uuid4()) + cmd = [ + "openclaw", "agent", + "--agent", agent_id, + "--session-id", session_id, + "--message", message, + "--json" + ] + # Popen 异步启动,不阻塞 daemon tick + subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + log_event(agent=agent_id, event_type='agent_spawned', detail={'session_id': session_id}) + return session_id + + def cleanup_session(self, agent_id: str, session_id: str, archive_dir: str): + """存档 jsonl + 文件锁保护下清理 sessions.json""" + sessions_dir = f"/Users/chufeng/.openclaw/agents/{agent_id}/sessions" + store_path = f"{sessions_dir}/sessions.json" + lock_path = f"{sessions_dir}/.cleanup.lock" + + # 1. 存档 jsonl 文件 + os.makedirs(archive_dir, exist_ok=True) + for ext in ['.jsonl', '.trajectory.jsonl', '.trajectory-path.json']: + src = f"{sessions_dir}/{session_id}{ext}" + if os.path.exists(src): + shutil.move(src, f"{archive_dir}/{session_id}{ext}") + + # 2. 文件锁保护下编辑 sessions.json(防止和 Gateway 并发写入冲突) + with open(lock_path, 'w') as lock_file: + fcntl.flock(lock_file, fcntl.LOCK_EX) + try: + with open(store_path) as f: + store = json.load(f) + + keys_to_remove = [k for k in store if session_id in k] + for k in keys_to_remove: + del store[k] + + with open(store_path, 'w') as f: + json.dump(store, f, indent=2) + finally: + fcntl.flock(lock_file, fcntl.LOCK_UN) + os.unlink(lock_path) +``` + +### 7.2 验证结论 + +| 验证项 | 结果 | +|--------|------| +| `openclaw agent --session-id ` 创建隔离 session | ✅ 通过 | +| 连续 spawn 多个 session 互不干扰 | ✅ 通过 | +| 并行 spawn 成功 | ✅ 通过 | +| 直接编辑 sessions.json 删除记录安全 | ✅ 通过 | +| jsonl 存档后从原目录删除 | ✅ 通过 | +| Gateway WS sessions.delete(需 admin scope) | ❌ 不可用 | +| `openclaw sessions cleanup --fix-missing --enforce` | ❌ 对 agent main session 报错 | +| Agent 主 session 对 CLI spawn 的 sub 完全无感 | ✅ 确认(设计如此)| + +--- + +## 8. Sanguo Mail:退役 + +v2.6 中 Mail 完全退役。黑板的两个操作替代了 Mail 的所有功能: + +| Mail 功能 | 黑板替代 | +|----------|--------| +| 庞统分配任务 | 庞统在黑板创建 task + 评论 @指定 agent | +| Agent 间通信 | 评论 @mention | +| 结果回传 | 产出写入 outputs 表 + 评论通知 | +| 讨论 | 评论线程 | + +黑板比 Mail 更可靠:信息集中在 SQLite(不分散在 mail 目录)、有状态追踪、评论线程保持上下文完整、SQLite 读写比 Mail poller 更可靠。 + +如果需要系统级通知(daemon 异常、Gateway 状态),在黑板上创建 `system` 类型任务处理。 + +--- + +## 9. 质量门控(任务完成标准) + +### 9.1 任务完成 = 司马懿评审通过 + 所有问题达成一致 + 修改完成 + +一个任务要标记为 `done`,必须满足: + +1. **产出已提交** — Agent 写入 outputs 表 +2. **决策已记录** — 关键决策写入 decisions 表(哪怕是自己的决策也要填一条) +3. **司马懿审核通过** — spawn 司马懿审核产出,评论中明确写“通过” +4. **所有问题达成一致** — 审核中提出的问题全部解决,讨论达成共识 +5. **修改已完成** — 审核问题对应的修改已写入产出 + +### 9.2 达成一致的原则 + +- **以用户意图为导向** — 任何人(包括司马懿、庞统)说的都不一定对,最终以用户的原始意图为准 +- **讨论达成共识** — 不同意见通过黑板评论讨论解决,不是谁职位高谁说了算 +- **用户有最终裁量权** — 讨论无法达成一致时,@user 请用户裁定 + +### 9.3 流程 + +``` +Agent 完成产出 → status: review + ↓ +Daemon tick → spawn 司马懿审核 + ↓ +司马懿读黑板(产出 + 决策 + 观察) + ↓ +司马懿写评论: + - “通过” → status: done + - “不通过,原因:XXX” → status: pending(打回重做)+ 评论说明问题 + ↓ +如果有争议: + - 评论中讨论 + - 讨论不清 → @user 请求裁定 +``` + +### 9.4 决策记录 + +Agent 执行过程中的每个关键决策都必须记录在黑板的 decisions 表中: + +| 字段 | 含义 | +|------|------| +| decider | 谁做的决策 | +| decision | 决策内容(选了什么) | +| rationale | 为什么这样选 | +| alternatives | 被排除的选项 | + +**哪怕是自己做的决策也要填一条。** 目的: +- 后续复盘时能追溯“当时为什么这样选” +- 审核时司马懿能理解决策背后的思考 +- 经验沉淀的原始素材 + +--- + +## 10. 产出物目录约定 + +``` +~/.sanguo_projects/sanguo_moziplus/artifacts/ +└── {task-id}/ + ├── outputs/ # Agent 产出物(代码、文档、数据) + ├── archive/ # session jsonl 存档 + └── data/ # 数据文件 +``` + +Agent 写产出时,`content_path` 指向此目录。Daemon 存档 session jsonl 时也写入 `archive/` 子目录。 + +--- + +## 11. 保留 v2.0 的设计 + +以下 v2.0 的设计在 v2.6 中保留: + +1. **SQLite WAL 模式** — 黑板数据库同样使用 WAL +2. **结构化产出规范** — output.md frontmatter + 结论 JSON(写在黑板 outputs 表中) +3. **观察机制** — v2.0 Report Watcher 的思路升级为 observations 表 +4. **证据原则** — 结论必须有证据(代码行号、日志、文件内容) +5. **审核流程** — 可通过黑板评论 + 状态机实现 + +--- + +## 12. Phase 规划(v2.6) + +### Phase 1: 黑板基础设施 +1. SQLite blackboard.db(5 表 + WAL) +2. blackboard.py CLI(读写操作) +3. Daemon tick 循环(读黑板 + spawn + 清理) +4. Session 管理(spawn + 存档 + 清理) + +### Phase 2: Agent 交互 +5. Agent 黑板操作 Skill +6. 评论 + @mention 通知链路 +7. 任务依赖自动推进 +8. 健康检查(stale reclaim + zombie 检测) + +### Phase 3: 智能化 +9. 庞统 AI 规划(读需求 → 创建任务 + 分配建议) +10. Agent 自主领活(读黑板 → 匹配职责 → claim) +11. 产出验证门禁 +12. 经验沉淀(observation → knowledge base) + +--- + +## 13. 技术选型 + +| 需求 | 参考系统 | 我们的方案 | 理由 | +|------|---------|-----------|------| +| 共享状态 | Hermes SQLite + Network-AI flock | SQLite WAL + 事务 CAS | 原子性 + 无外部依赖 | +| 讨论 | Hermes kanban_comment | comments 表 + @mention | 简单追加写入,所有人可见 | +| 调度 | Hermes Dispatcher 60s tick | Daemon 60s tick | 同设计,更轻量 | +| 通知 | Claude Code idle notification | Daemon spawn + message | OpenClaw 原生能力 | +| 通信 | Hermes kanban_comment + Claude Code inbox | 黑板 comments + @mention | 替代 Sanguo Mail | +| 竞态 | Network-AI propose→validate→commit | SQLite CAS(first-commit-wins) | SQLite 事务足够 | +| Session | Hermes process-per-worker | openclaw agent --session-id | OpenClaw 原生隔离 | +| 清理 | 无参考 | 编辑 sessions.json | 已验证可行 | + +--- + +## 14. 风险和缓解 + +| 风险 | 概率 | 缓解 | +|------|------|------| +| Agent 上下文不足(隔离 session 没有历史)| 中 | spawn 时传递黑板关键信息 + agent 可主动读黑板 | +| Daemon 单点故障 | 低 | PM2 自动重启 + tick 无状态 | +| SQLite 并发写入 | 中 | WAL + busy_timeout + BEGIN IMMEDIATE | +| 黑板膨胀(大量评论/产出)| 低 | 定期 archive + agent 只读最近 N 条 | +| Agent 不知道该做什么 | 中 | Skill 指导 + 庞统 plan 评论 + daemon 消息含上下文 | +| Sanguo Mail 退役后的系统通知 | 低 | 黑板 system 类型任务替代 | diff --git a/docs/design/deployment-v2.6.md b/docs/design/deployment-v2.6.md new file mode 100644 index 0000000..0f7be82 --- /dev/null +++ b/docs/design/deployment-v2.6.md @@ -0,0 +1,212 @@ +# v2.6 部署方案设计 + +**版本**: v2.6.1-deploy (含评审修正 + 独立部署) +**基于**: technical-design-v2.6.md +**作者**: 庞统(副军师) +**日期**: 2026-05-15 + +--- + +## 1. 部署环境 + +| 项目 | 当前状态 | +|------|---------| +| 主机 | 楚锋的 Mac mini (ARM64, macOS) | +| Python | 3.9.6(系统)+ 3.11(moziplus venv)| +| Node.js | v22.22.1 | +| PM2 | 6.0.14(管理 10 个进程)| +| 运行目录 | `~/.sanguo_projects/sanguo_moziplus_v2/` | +| 开发目录 | `~/.openclaw/sanguo_projects/sanguo_moziplus_v2/` | +| Git 远程 | gitee (origin) + gitea (内网 NAS) | +| 前端 | port 8083(v2 独立端口) | +| Gateway | 127.0.0.1:18789 | + +--- + +## 2. 部署架构 + +``` +PM2 (进程管理器) +├── sanguo-moziplus-v2 (port 8083) ← 🆕 v2 独立进程 +├── sanguo-moziplus (port 8082) ← v1 保留(v2 验证后下线) +├── sanguo-git-sync ← 不变 +├── sanguo-mail-* (8 个) ← 不变(v2 验证后下线) +└── sanguo-mozi ← 不变 + +新增文件: +~/.sanguo_projects/sanguo_moziplus_v2/ +├── blackboard.db ← 🆕 黑板数据库(自动创建) +├── artifacts/ ← 🆕 产出物目录(自动创建) +├── src/blackboard/ ← 🆕 黑板模块 +├── src/daemon/ ← 🆕 Daemon 模块 +├── src/api/ ← 🆕 API +├── src/cli/ ← 🆕 CLI +└── ecosystem.config.cjs ← 🆕 PM2 配置 +``` + +**关键:不新增 PM2 进程。** Daemon ticker 作为 asyncio background task 运行在 moziplus 进程内。 + +--- + +## 3. 部署流程 + +### 3.1 开发→安装同步 + +按照 AGENTS.md 的"代码变更标准流程",但简化(非代码变更,是文档确认后的首次实现): + +``` +步骤 1: 创建项目 + mkdir -p ~/.openclaw/sanguo_projects/sanguo_moziplus_v2 + cd ~/.openclaw/sanguo_projects/sanguo_moziplus_v2 + git init + git remote add origin git@gitee.com:cfdaily:sanguo_moziplus_v2.git + git remote add gitea http://192.168.2.154:3000/cfdaily/sanguo_moziplus_v2.git + +步骤 2: 开发编码 + ~/.openclaw/sanguo_projects/sanguo_moziplus_v2/src/blackboard/ + ~/.openclaw/sanguo_projects/sanguo_moziplus_v2/src/daemon/ + ~/.openclaw/sanguo_projects/sanguo_moziplus_v2/src/api/ + ~/.openclaw/sanguo_projects/sanguo_moziplus_v2/src/cli/ + +步骤 3: 单元测试通过 + cd ~/.openclaw/sanguo_projects/sanguo_moziplus_v2 + python3 -m pytest tests/unit/test_blackboard.py -v + python3 -m pytest tests/unit/test_daemon_tick.py -v + +步骤 4: 发司马懿评审(代码级) + +步骤 5: 评审通过后,同步到安装目录 + rsync -av --exclude='.venv' --exclude='node_modules' \ + ~/.openclaw/sanguo_projects/sanguo_moziplus_v2/ ~/.sanguo_projects/sanguo_moziplus_v2/ + +步骤 6: PM2 启动 v2 + cd ~/.sanguo_projects/sanguo_moziplus_v2 + pm2 start ecosystem.config.cjs + +步骤 7: 验证 + curl http://127.0.0.1:8083/api/daemon/status + python3 ~/.sanguo_projects/sanguo_moziplus_v2/src/cli/blackboard.py create --title "test" --creator test +``` + pm2 restart sanguo-moziplus + +步骤 6: 验证 + curl http://127.0.0.1:8082/api/daemon/status + python3 ~/.sanguo_projects/sanguo_moziplus_v2/src/cli/blackboard.py create --title "test" --creator test +``` + +### 3.2 数据库初始化 + +blackboard.db 在 moziplus 启动时自动创建和初始化(`init_blackboard_db()`)。不需要手动建表。 + +首次部署时 `blackboard.db` 文件不存在,启动后自动创建。后续启动检测到已存在则跳过 schema 初始化(`CREATE TABLE IF NOT EXISTS` 天然幂等)。 + +--- + +## 4. 回滚方案 + +| 场景 | 回滚方式 | +|------|---------| +| v2 启动失败 | `pm2 stop sanguo-moziplus-v2` → v1 不受影响 | +| Daemon tick 异常 | API 调用 `POST /api/daemon/stop` 停 ticker 不停服务 | +| SQLite 损坏 | 删除 `blackboard.db` 重启自动重建 | +| v2 整体不可用 | v1 仍在 8082 运行,切换回 v1 即可 | + +**关键:v1 和 v2 完全独立,互不影响。** v2 出问题直接停掉,v1 继续服务。 + +--- + +## 5. 监控 + +### 5.1 日志 + +所有黑板操作和 daemon tick 通过 Python logging 输出到 PM2 日志: + +```bash +# 查看 daemon tick 日志 +pm2 logs sanguo-moziplus --lines 100 | grep "Daemon\|Blackboard\|Tick" + +# 查看 spawn 日志 +pm2 logs sanguo-moziplus --lines 100 | grep "Spawning\|Cleaned up" +``` + +### 5.2 健康端点 + +``` +GET /api/daemon/status +返回: +{ + "ticker_running": true, + "last_tick": "2026-05-15T01:30:00", + "tick_interval_seconds": 60, + "active_sessions": 2, + "blackboard_db_size_bytes": 45056, + "tasks_summary": { + "pending": 3, + "claimed": 1, + "working": 2, + "review": 0, + "done": 15, + "failed": 1, + "blocked": 0 + } +} +``` + +### 5.3 PM2 告警 + +复用现有 PM2 配置,sanguo-moziplus 如果异常退出会自动重启(已配置 `max_restarts: 10`)。 + +--- + +## 6. 安全考虑 + +| 项目 | 措施 | +|------|------| +| SQLite 文件权限 | `blackboard.db` 只有 owner 可读写(0600)| +| API 认证 | 复用 moziplus 现有认证(如有)| +| CLI 路径 | blackboard.py 只能本地 exec 调用,不暴露网络 | +| Session 清理锁 | fcntl 文件锁防止并发编辑 sessions.json | +| Agent 权限 | SOUL.md 约束 + blackboard.py claim 时的 assignee 检查 | + +--- + +## 7. 性能预估 + +| 指标 | 预估值 | 理由 | +|------|--------|------| +| Tick 循环耗时 | <100ms | 6 张表、几十条记录的 SQLite 查询 | +| Agent spawn 延迟 | 2-5s | openclaw agent 冷启动时间 | +| 并发 Agent 数 | ≤6 | 6 个将军,通常不会同时活跃 | +| SQLite 并发写入 | WAL 模式支持读写并发 | 已验证 | +| blackboard.db 大小 | 初期 <1MB | 每个任务几 KB,100 个任务 < 1MB | + +--- + +## 8. 交付检查清单 + +Phase 1 部署前逐项验证: + +- [ ] 项目创建、git init、远程配置完成 +- [ ] `blackboard.db` 自动创建且 schema 正确 +- [ ] CLI `blackboard.py read/claim/output/comment/decide/observe/create` 全部可用 +- [ ] Daemon tick 60s 循环正常运行 +- [ ] Agent spawn 成功创建隔离 session +- [ ] Session 完成后自动存档 + sessions.json 清理 +- [ ] API `/api/blackboard/tasks` 可创建/查询任务 +- [ ] API `/api/daemon/status` 返回正确状态 +- [ ] API `/api/daemon/tick` 可手动触发 +- [ ] PM2 启动后 daemon ticker 自动恢复 +- [ ] `pm2 logs` 中能看到 tick 和 spawn 日志 +- [ ] v1 (8082) 不受 v2 (8083) 影响 + +--- + +## 9. 后续 Phase 规划 + +| Phase | 内容 | 依赖 | +|-------|------|------| +| **Phase 1** | 黑板 + Daemon + CLI + API(本方案) | 无 | +| Phase 2 | 前端黑板视图 + Agent 黑板 Skill | Phase 1 | +| Phase 3 | 依赖自动推进 + 产出验证门禁 | Phase 2 | +| Phase 4 | 庞统 AI 规划 + Agent 自主领活 | Phase 3 | +| Phase 5 | 经验沉淀 + Mail 完全移除 | Phase 4 | diff --git a/docs/design/technical-design-v2.6.md b/docs/design/technical-design-v2.6.md new file mode 100644 index 0000000..f1d35a3 --- /dev/null +++ b/docs/design/technical-design-v2.6.md @@ -0,0 +1,714 @@ +# v2.6 技术方案设计 + +**版本**: v2.6.1-tech (含评审修正 + 独立部署) +**基于**: architecture-v2.6.md +**作者**: 庞统(副军师) +**日期**: 2026-05-15 + +--- + +## 1. 技术栈 + +| 组件 | 选型 | 版本 | 理由 | +|------|------|------|------| +| Daemon 进程管理 | PM2 | 6.0.14 | 独立新进程 sanguo-moziplus-v2 | +| Daemon 框架 | Python + asyncio | 3.9+ | 独立项目,独立目录 | +| 黑板存储 | SQLite | 3.51 | WAL 模式,单 host 足够 | +| HTTP API | FastAPI | 独立安装 | 独立 uvicorn 实例,端口 8083 | +| Agent 调度 | openclaw CLI | 2026.5.7 | 已验证 `openclaw agent --session-id` | +| Session 清理 | fcntl + JSON 编辑 | 系统原生 | 已验证可行 | +| Git | gitee + gitea | 双远程 | 开发目录 → 安装目录 | + +**不加新依赖。** 所有组件都是已有环境中的。 + +--- + +## 2. 项目结构 + +**独立项目,不嵌入 moziplus。** 完全独立开发、独立部署、独立运行。v2 上线后 v1 整体下线。 + +``` +sanguo_moziplus_v2/ # 🆕 全新项目目录 +├── src/ +│ ├── main.py # FastAPI 入口(独立) +│ ├── blackboard/ # 黑板核心模块 +│ │ ├── __init__.py +│ │ ├── db.py # SQLite 连接管理、schema 初始化 +│ │ ├── models.py # 数据模型 +│ │ ├── operations.py # 读写操作 +│ │ └── queries.py # 查询操作 +│ │ +│ ├── daemon/ # Daemon 核心模块 +│ │ ├── __init__.py +│ │ ├── ticker.py # Tick 循环主逻辑 +│ │ ├── spawner.py # Agent spawn + session 管理 +│ │ ├── health.py # 健康检查 +│ │ └── notifier.py # @mention 解析 +│ │ +│ ├── api/ # HTTP API +│ │ ├── blackboard_routes.py # 黑板 API +│ │ └── daemon_routes.py # Daemon 控制 API +│ │ +│ └── cli/ # CLI 工具 +│ ├── blackboard.py # Agent 黑板操作 CLI +│ └── daemon.py # Daemon 控制 CLI +│ +├── config/ +│ └── default.yaml +│ +├── artifacts/ # 产出物目录 +│ +├── tests/ +│ ├── unit/ +│ └── e2e/ +│ +├── pyproject.toml +└── ecosystem.config.cjs # PM2 配置 +``` + +### 开发 vs 安装目录 + +| | 路径 | 说明 | +|------|------|------| +| 开发 | `~/.openclaw/sanguo_projects/sanguo_moziplus_v2/` | Git 管理 | +| 安装 | `~/.sanguo_projects/sanguo_moziplus_v2/` | PM2 运行 | +| Git 远程 | gitee + gitea | 同现有项目 | + +--- + +## 3. 模块详细设计 + +### 3.1 blackboard/db.py — 数据库管理 + +```python +"""黑板 SQLite 数据库管理""" +import sqlite3 +import os +from pathlib import Path + +DB_PATH = Path(os.environ.get( + 'BLACKBOARD_DB', + os.path.expanduser('~/.sanguo_projects/sanguo_moziplus_v2/blackboard.db') +)) + +SCHEMA_SQL = """ +-- 完整 schema 见 architecture-v2.6.md §3.2 +-- 此处运行 CREATE TABLE IF NOT EXISTS +""" + +def get_connection() -> sqlite3.Connection: + conn = sqlite3.connect(str(DB_PATH)) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA foreign_keys=ON") + conn.execute("PRAGMA busy_timeout=5000") + return conn + +def init_db(): + """初始化数据库(启动时调用)""" + conn = get_connection() + try: + conn.executescript(SCHEMA_SQL) + conn.commit() + finally: + conn.close() +``` + +### 3.2 blackboard/operations.py — 核心操作 + +```python +"""黑板读写操作""" +import json +import sqlite3 +from datetime import datetime +from .db import get_connection + +def create_task(title: str, creator: str, description: str = None, + task_type: str = None, assignee: str = None, + depends_on: list = None, priority: int = 5, + parent_task: str = None) -> dict: + """创建任务(任何 Agent 都可以)""" + task_id = f"task-{uuid.uuid4().hex[:8]}" + conn = get_connection() + try: + conn.execute( + """INSERT INTO tasks (id, title, description, status, assigned_by, + task_type, priority, depends_on, parent_task) + VALUES (?, ?, ?, 'pending', ?, ?, ?, ?, ?)""", + (task_id, title, description, creator, task_type, priority, + json.dumps(depends_on or []), parent_task) + ) + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'task_created', ?)", + (task_id, creator, json.dumps({'title': title})) + ) + conn.commit() + return {'id': task_id, 'status': 'pending'} + finally: + conn.close() + +def claim_task(task_id: str, agent_id: str) -> bool: + """认领任务(原子 CAS,支持指定分配和自由认领)""" + conn = get_connection() + try: + cursor = conn.execute( + """UPDATE tasks SET status='claimed', assignee=?, claimed_at=datetime('now') + WHERE id=? AND status='pending' + AND (assignee IS NULL OR assignee=?)""", + (agent_id, task_id, agent_id) + ) + if cursor.rowcount > 0: + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'task_claimed', NULL)", + (task_id, agent_id) + ) + conn.commit() + return True + conn.rollback() + return False + finally: + conn.close() + +def update_task_status(task_id: str, new_status: str, agent_id: str, reason: str = None): + """更新任务状态(校验合法流转)""" + VALID = { + "pending": {"claimed", "cancelled"}, + "claimed": {"working", "pending", "cancelled"}, + "working": {"review", "blocked", "failed", "cancelled"}, + "review": {"done", "pending", "failed", "cancelled"}, + "blocked": {"pending", "cancelled"}, + "failed": {"pending"}, + "done": set(), + "cancelled": set(), + } + conn = get_connection() + try: + row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone() + if not row: + raise ValueError(f"Task {task_id} not found") + current = row['status'] + if new_status not in VALID.get(current, set()): + raise ValueError(f"Invalid transition: {current} → {new_status}") + + conn.execute( + "UPDATE tasks SET status=?, updated_at=datetime('now') WHERE id=?", + (new_status, task_id) + ) + # 更新时间戳 + if new_status == 'working': + conn.execute("UPDATE tasks SET started_at=datetime('now') WHERE id=?", (task_id,)) + elif new_status in ('done', 'failed', 'cancelled'): + conn.execute("UPDATE tasks SET completed_at=datetime('now') WHERE id=?", (task_id,)) + + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, ?, ?)", + (task_id, agent_id, f'task_{new_status}', json.dumps({'from': current, 'reason': reason})) + ) + conn.commit() + finally: + conn.close() + +def add_comment(task_id: str, author: str, body: str, mentions: list = None): + """添加评论""" + conn = get_connection() + try: + conn.execute( + "INSERT INTO comments (task_id, author, body, mentions) VALUES (?, ?, ?, ?)", + (task_id, author, body, json.dumps(mentions or [])) + ) + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'comment_added', ?)", + (task_id, author, json.dumps({'body_preview': body[:100], 'mentions': mentions})) + ) + conn.commit() + finally: + conn.close() + +def write_output(task_id: str, agent: str, output_type: str, title: str, + content_path: str, summary: str, metadata: dict = None): + """写入产出""" + conn = get_connection() + try: + conn.execute("BEGIN IMMEDIATE") + conn.execute( + """INSERT INTO outputs (task_id, agent, output_type, title, content_path, summary, metadata) + VALUES (?, ?, ?, ?, ?, ?, ?)""", + (task_id, agent, output_type, title, content_path, summary, + json.dumps(metadata or {})) + ) + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'output_written', ?)", + (task_id, agent, json.dumps({'title': title})) + ) + conn.commit() + finally: + conn.close() + +def record_decision(task_id: str, decider: str, decision: str, + rationale: str, alternatives: list = None): + """记录决策""" + conn = get_connection() + try: + conn.execute( + """INSERT INTO decisions (task_id, decider, decision, rationale, alternatives) + VALUES (?, ?, ?, ?, ?)""", + (task_id, decider, decision, rationale, json.dumps(alternatives or [])) + ) + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'decision_recorded', ?)", + (task_id, decider, json.dumps({'decision': decision})) + ) + conn.commit() + finally: + conn.close() + +def add_observation(task_id: str, observer: str, severity: str, body: str): + """添加观察""" + conn = get_connection() + try: + conn.execute( + "INSERT INTO observations (task_id, observer, severity, body) VALUES (?, ?, ?, ?)", + (task_id, observer, severity, body) + ) + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'observation_added', ?)", + (task_id, observer, json.dumps({'severity': severity})) + ) + conn.commit() + finally: + conn.close() +``` + +### 3.3 daemon/ticker.py — Tick 循环 + +```python +"""Daemon tick 循环""" +import asyncio +import logging +from datetime import datetime, timedelta + +from blackboard.db import get_connection +from blackboard.queries import get_board_snapshot +from daemon.spawner import async_spawn_agent, cleanup_session, get_active_sessions +from daemon.health import reclaim_stale, detect_zombies + +logger = logging.getLogger(__name__) + +TICK_INTERVAL = 60 # 秒 + +class DaemonTicker: + def __init__(self): + self.running = False + self.last_tick = None + self._conn = None # tick 内共享的 SQLite 连接 + + async def start(self): + """启动 tick 循环""" + self.running = True + logger.info("Daemon ticker started") + while self.running: + try: + await self.tick() + except Exception as e: + logger.error(f"Tick error: {e}", exc_info=True) + self.last_tick = datetime.now() + await asyncio.sleep(TICK_INTERVAL) + + async def stop(self): + self.running = False + + async def manual_tick(self): + """手动触发(CLI 或 API 调用)""" + await self.tick() + + async def tick(self): + """单次 tick""" + logger.debug("Tick start") + + # tick 内共享一个 SQLite 连接(避免频繁 open/close 导致 WAL checkpoint 抖动) + self._conn = get_connection() + try: + # 1. 健康检查 + reclaimed = reclaim_stale(self._conn) + zombies = detect_zombies(self._conn) + if reclaimed: + logger.info(f"Reclaimed {len(reclaimed)} stale tasks") + if zombies: + logger.warning(f"Detected {len(zombies)} zombie sessions") + + # 2. 读黑板 + board = get_board_snapshot() + + # 3. 处理 @mention + await self._process_mentions(board) + + # 4. 处理待领取任务 + await self._process_unclaimed(board) + + # 5. 处理 blocked 任务 + await self._process_blocked(board) + + # 6. 清理完成的 session + await self._cleanup_sessions() + + finally: + self._conn.close() + self._conn = None + logger.debug("Tick done") + + async def _process_mentions(self, board): + """处理评论中的 @mention""" + for comment in board.get_unprocessed_mentions(): + for agent_id in comment.mentions: + if not self._is_agent_active(agent_id): + await async_spawn_agent( + agent_id, + f"黑板上有给你的新评论({comment.task_id}),请查看。" + ) + board.mark_comment_processed(comment.id) + + async def _process_unclaimed(self, board): + """处理有 assignee 但未 claim 的任务""" + for task in board.get_assigned_unclaimed(): + if not self._is_agent_active(task.assignee): + await async_spawn_agent( + task.assignee, + f"黑板上有分配给你的任务({task.title}),请查看并认领。" + ) + + async def _process_blocked(self, board): + """处理 blocked 任务""" + for task in board.get_blocked_tasks(): + mentions = board.get_latest_mentions(task.id) + if mentions: + for agent_id in mentions: + if not self._is_agent_active(agent_id): + await async_spawn_agent( + agent_id, + f"任务 {task.id} 被 block,需要你的协助。" + ) + else: + # 没有 @ → spawn 庞统决定 + await async_spawn_agent( + 'pangtong-fujunshi', + f"任务 {task.id} 被 block,没有指定协助者,请决定如何处理。" + ) + + async def _cleanup_sessions(self): + """清理完成的 session""" + for session in get_active_sessions(): + if session.is_completed(): + archive_dir = f"artifacts/{session.task_id}/archive/" + cleanup_session(session.agent_id, session.session_id, archive_dir) + + def _is_agent_active(self, agent_id: str) -> bool: + """检查 agent 是否有正在运行的 session""" + conn = get_connection() + try: + row = conn.execute( + "SELECT current_status FROM agents WHERE agent_id=?", (agent_id,) + ).fetchone() + return row and row['current_status'] == 'working' + finally: + conn.close() +``` + +### 3.4 daemon/spawner.py — Agent 调度 + +```python +"""Agent spawn 和 session 管理""" +import json +import os +import shutil +import fcntl +import subprocess +import uuid +import logging +from pathlib import Path + +logger = logging.getLogger(__name__) + +AGENTS_DIR = os.path.expanduser("~/.openclaw/agents") + +def spawn_agent(agent_id: str, message: str) -> str: + """spawn 隔离 session,不等待完成。 + + 注意:当前同步实现(Popen + SQLite 写入)。 + 6 Agent 规模下可接受(SQLite 写入 <1ms)。 + 后续 scale up 时改 asyncio.create_subprocess_exec + async SQLite。 + """ + session_id = str(uuid.uuid4()) + cmd = [ + "openclaw", "agent", + "--agent", agent_id, + "--session-id", session_id, + "--message", message, + "--json" + ] + logger.info(f"Spawning {agent_id} session={session_id[:8]}") + proc = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + + # 检查是否立即失败(100ms 内退出 = spawn 失败) + import time + time.sleep(0.1) + if proc.poll() is not None: + stdout, stderr = proc.communicate() + logger.error(f"Spawn failed immediately: {stderr.decode()[:500]}") + raise RuntimeError(f"Spawn failed: {stderr.decode()[:200]}") + + # 更新 agents 表 + _update_agent_status(agent_id, 'working') + + # 记录 session 元数据(用于后续清理) + _register_session(agent_id, session_id) + + return session_id + +def cleanup_session(agent_id: str, session_id: str, archive_dir: str): + """存档 jsonl + 文件锁保护下清理 sessions.json""" + sessions_dir = f"{AGENTS_DIR}/{agent_id}/sessions" + store_path = f"{sessions_dir}/sessions.json" + lock_path = f"{sessions_dir}/.cleanup.lock" + + # 1. 存档 jsonl + os.makedirs(archive_dir, exist_ok=True) + for ext in ['.jsonl', '.trajectory.jsonl', '.trajectory-path.json']: + src = f"{sessions_dir}/{session_id}{ext}" + if os.path.exists(src): + shutil.move(src, f"{archive_dir}/{session_id}{ext}") + + # 2. 文件锁保护下编辑 sessions.json + with open(lock_path, 'w') as lock_file: + fcntl.flock(lock_file, fcntl.LOCK_EX) + try: + with open(store_path) as f: + store = json.load(f) + keys_to_remove = [k for k in store if session_id in k] + for k in keys_to_remove: + del store[k] + with open(store_path, 'w') as f: + json.dump(store, f, indent=2) + finally: + fcntl.flock(lock_file, fcntl.LOCK_UN) + try: + os.unlink(lock_path) + except OSError: + pass + + # 3. 更新 agents 表 + _update_agent_status(agent_id, 'idle') + + logger.info(f"Cleaned up {agent_id} session={session_id[:8]}") +``` + +### 3.5 CLI 工具 + +**blackboard.py** — Agent 通过 exec 调用: + +```python +#!/usr/bin/env python3 +"""黑板操作 CLI — Agent 通过 exec 调用""" +import argparse +import sys +import json + +# 绝对路径导入 +import os +sys.path.insert(0, os.path.expanduser("~/.sanguo_projects/sanguo_moziplus_v2")) + +from src.blackboard.operations import ( + create_task, claim_task, update_task_status, + add_comment, write_output, record_decision, add_observation +) +from src.blackboard.queries import read_board + +def main(): + parser = argparse.ArgumentParser(description='Blackboard CLI') + sub = parser.add_subparsers(dest='command') + + # read + r = sub.add_parser('read') + r.add_argument('--task', required=True) + r.add_argument('--agent', default=None) + r.add_argument('--last', type=int, default=None) + r.add_argument('--type', choices=['comments', 'outputs', 'decisions', 'observations', 'all'], default='all') + + # claim + c = sub.add_parser('claim') + c.add_argument('--task', required=True) + c.add_argument('--agent', required=True) + + # output + o = sub.add_parser('output') + o.add_argument('--task', required=True) + o.add_argument('--agent', required=True) + o.add_argument('--type', required=True) + o.add_argument('--title', required=True) + o.add_argument('--path', required=True) + o.add_argument('--summary', required=True) + + # comment + cm = sub.add_parser('comment') + cm.add_argument('--task', required=True) + cm.add_argument('--author', required=True) + cm.add_argument('--body', required=True) + cm.add_argument('--mentions', default='[]') + + # decide + d = sub.add_parser('decide') + d.add_argument('--task', required=True) + d.add_argument('--decider', required=True) + d.add_argument('--decision', required=True) + d.add_argument('--rationale', required=True) + + # observe + ob = sub.add_parser('observe') + ob.add_argument('--task', required=True) + ob.add_argument('--observer', required=True) + ob.add_argument('--severity', required=True, choices=['blocking', 'warning', 'info', 'audit']) + ob.add_argument('--body', required=True) + + # create + cr = sub.add_parser('create') + cr.add_argument('--title', required=True) + cr.add_argument('--creator', required=True) + cr.add_argument('--description', default=None) + cr.add_argument('--task-type', default=None) + cr.add_argument('--assignee', default=None) + + args = parser.parse_args() + # ... 路由到对应操作函数,输出 JSON 结果 + +if __name__ == '__main__': + main() +``` + +**daemon.py** — Daemon 控制 CLI: + +```python +#!/usr/bin/env python3 +"""Daemon 控制 CLI""" +# 命令: +# daemon.py tick — 手动触发一次 tick +# daemon.py status — 查看 daemon 状态 +# daemon.py board — 查看黑板摘要 +``` + +--- + +## 4. API 路由设计 + +### 4.1 黑板 API(blackboard_routes.py) + +| 方法 | 路径 | 说明 | +|------|------|------| +| GET | /api/blackboard/tasks | 任务列表(支持 ?status=&assignee= 过滤)| +| GET | /api/blackboard/tasks/{id} | 任务详情(含评论、产出、决策、观察)| +| POST | /api/blackboard/tasks | 创建任务 | +| POST | /api/blackboard/tasks/{id}/claim | 认领任务 | +| PATCH | /api/blackboard/tasks/{id}/status | 更新状态 | +| POST | /api/blackboard/tasks/{id}/comments | 添加评论 | +| POST | /api/blackboard/tasks/{id}/outputs | 写入产出 | +| POST | /api/blackboard/tasks/{id}/decisions | 记录决策 | +| POST | /api/blackboard/tasks/{id}/observations | 添加观察 | + +### 4.2 Daemon 控制 API(daemon_routes.py) + +| 方法 | 路径 | 说明 | +|------|------|------| +| POST | /api/daemon/tick | 手动触发 tick | +| GET | /api/daemon/status | Daemon 状态(上次 tick、活跃 session 数)| +| GET | /api/daemon/sessions | 活跃 session 列表 | + +--- + +## 5. 与 v1.0 的关系 + +**完全独立,不共享代码。** + +| 维度 | v1.0 (sanguo_moziplus) | v2.0 (sanguo_moziplus_v2) | +|------|----------------------|--------------------------| +| 进程 | sanguo-moziplus (PM2) | sanguo-moziplus-v2 (PM2) | +| 端口 | 8082 | 8083 | +| 数据库 | moziplus.db | blackboard.db | +| 代码 | 独立 | 独立 | +| 编排 | DAG 状态机 | Shared Workspace | +| 通信 | Sanguo Mail | 黑板评论 | + +**v2 上线流程:** +1. v2 部署、测试、验证 +2. 确认 v2 功能覆盖 v1 +3. `pm2 stop sanguo-moziplus` — v1 下线 +4. `pm2 delete sanguo-moziplus` — 删除 v1 进程 +5. v2 接管端口 8082(可选) +6. v1 代码和数据库保留只读(历史归档) + +--- + +## 6. 测试策略 + +### 6.1 单元测试 + +| 测试文件 | 覆盖内容 | +|---------|---------| +| test_blackboard.py | 所有 operations 函数(claim CAS、状态机校验、评论、产出、决策)| +| test_daemon_tick.py | tick 循环逻辑(mention 处理、blocked 处理、cleanup)| +| test_spawner.py | spawn + cleanup(mock openclaw CLI)| +| test_health.py | stale reclaim + zombie detection | + +### 6.2 集成测试 + +- 启动 daemon → 创建任务 → spawn agent → agent 操作黑板 → 完成闭环 +- 竞态测试:两个 "agent" 同时 claim 同一任务 + +### 6.3 测试数据库 + +测试用 `:memory:` SQLite,不污染生产数据。 + +```python +# tests/conftest.py +import pytest +from src.blackboard.db import get_connection + +@pytest.fixture +def blackboard_db(): + """测试用内存数据库""" + conn = sqlite3.connect(":memory:") + conn.row_factory = sqlite3.Row + conn.executescript(SCHEMA_SQL) + yield conn + conn.close() +``` + +--- + +## 7. 实现顺序(Phase 1 具体步骤) + +| 步骤 | 内容 | 依赖 | 估计时间 | +|------|------|------|---------| +| 1 | `blackboard/db.py` — schema + 连接管理 | 无 | 0.5h | +| 2 | `blackboard/operations.py` — 核心操作 | 步骤 1 | 2h | +| 3 | `blackboard/queries.py` — 查询+过滤 | 步骤 1 | 1h | +| 4 | `cli/blackboard.py` — CLI 工具 | 步骤 2,3 | 1h | +| 5 | 单元测试 test_blackboard.py | 步骤 2,3 | 1h | +| 6 | `daemon/spawner.py` — spawn+cleanup | 无 | 1h | +| 7 | `daemon/health.py` — 健康检查 | 步骤 1 | 0.5h | +| 8 | `daemon/ticker.py` — tick 循环 | 步骤 1,6,7 | 1.5h | +| 9 | `api/blackboard_routes.py` — HTTP API | 步骤 2,3 | 1h | +| 10 | `api/daemon_routes.py` — 控制API | 步骤 8 | 0.5h | +| 11 | main.py 集成 | 步骤 1-10 | 0.5h | +| 12 | E2E 测试 | 步骤 11 | 1h | +| **总计** | | | **~11h** | + +--- + +## 8. 风险 + +| 风险 | 缓解 | +|------|------| +| asyncio tick 和 FastAPI 请求并发访问 SQLite | WAL 模式 + busy_timeout + BEGIN IMMEDIATE | +| Popen spawn 的子进程无人回收 | health.py 检测 + reclaim | +| 测试需要 mock openclaw CLI | 用 subprocess mock 或 test fixture | +| 前端需要适配黑板 API | Phase 1 先只做后端 + CLI,前端 Phase 2 |