# v2.6 技术方案设计 **版本**: v2.6.1-tech (含评审修正 + 独立部署) **基于**: architecture-v2.6.md **作者**: 庞统(副军师) **日期**: 2026-05-15 --- ## 1. 技术栈 | 组件 | 选型 | 版本 | 理由 | |------|------|------|------| | Daemon 进程管理 | PM2 | 6.0.14 | 独立新进程 sanguo-moziplus-v2 | | Daemon 框架 | Python + asyncio | 3.9+ | 独立项目,独立目录 | | 黑板存储 | SQLite | 3.51 | WAL 模式,单 host 足够 | | HTTP API | FastAPI | 独立安装 | 独立 uvicorn 实例,端口 8083 | | Agent 调度 | openclaw CLI | 2026.5.7 | 已验证 `openclaw agent --session-id` | | Session 清理 | fcntl + JSON 编辑 | 系统原生 | 已验证可行 | | Git | gitee + gitea | 双远程 | 开发目录 → 安装目录 | **不加新依赖。** 所有组件都是已有环境中的。 --- ## 2. 项目结构 **独立项目,不嵌入 moziplus。** 完全独立开发、独立部署、独立运行。v2 上线后 v1 整体下线。 ``` sanguo_moziplus_v2/ # 🆕 全新项目目录 ├── src/ │ ├── main.py # FastAPI 入口(独立) │ ├── blackboard/ # 黑板核心模块 │ │ ├── __init__.py │ │ ├── db.py # SQLite 连接管理、schema 初始化 │ │ ├── models.py # 数据模型 │ │ ├── operations.py # 读写操作 │ │ └── queries.py # 查询操作 │ │ │ ├── daemon/ # Daemon 核心模块 │ │ ├── __init__.py │ │ ├── ticker.py # Tick 循环主逻辑 │ │ ├── spawner.py # Agent spawn + session 管理 │ │ ├── health.py # 健康检查 │ │ └── notifier.py # @mention 解析 │ │ │ ├── api/ # HTTP API │ │ ├── blackboard_routes.py # 黑板 API │ │ └── daemon_routes.py # Daemon 控制 API │ │ │ └── cli/ # CLI 工具 │ ├── blackboard.py # Agent 黑板操作 CLI │ └── daemon.py # Daemon 控制 CLI │ ├── config/ │ └── default.yaml │ ├── artifacts/ # 产出物目录 │ ├── tests/ │ ├── unit/ │ └── e2e/ │ ├── pyproject.toml └── ecosystem.config.cjs # PM2 配置 ``` ### 开发 vs 安装目录 | | 路径 | 说明 | |------|------|------| | 开发 | `~/.openclaw/sanguo_projects/sanguo_moziplus_v2/` | Git 管理 | | 安装 | `~/.sanguo_projects/sanguo_moziplus_v2/` | PM2 运行 | | Git 远程 | gitee + gitea | 同现有项目 | --- ## 3. 
模块详细设计 ### 3.1 blackboard/db.py — 数据库管理 ```python """黑板 SQLite 数据库管理""" import sqlite3 import os from pathlib import Path DB_PATH = Path(os.environ.get( 'BLACKBOARD_DB', os.path.expanduser('~/.sanguo_projects/sanguo_moziplus_v2/blackboard.db') )) SCHEMA_SQL = """ -- 完整 schema 见 architecture-v2.6.md §3.2 -- 此处运行 CREATE TABLE IF NOT EXISTS """ def get_connection() -> sqlite3.Connection: conn = sqlite3.connect(str(DB_PATH)) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA foreign_keys=ON") conn.execute("PRAGMA busy_timeout=5000") return conn def init_db(): """初始化数据库(启动时调用)""" conn = get_connection() try: conn.executescript(SCHEMA_SQL) conn.commit() finally: conn.close() ``` ### 3.2 blackboard/operations.py — 核心操作 ```python """黑板读写操作""" import json import sqlite3 import uuid from datetime import datetime from .db import get_connection def create_task(title: str, creator: str, description: str = None, task_type: str = None, assignee: str = None, depends_on: list = None, priority: int = 5, parent_task: str = None) -> dict: """创建任务(任何 Agent 都可以)""" task_id = f"task-{uuid.uuid4().hex[:8]}" conn = get_connection() try: conn.execute( """INSERT INTO tasks (id, title, description, status, assigned_by, assignee, task_type, priority, depends_on, parent_task) VALUES (?, ?, ?, 'pending', ?, ?, ?, ?, ?, ?)""", (task_id, title, description, creator, assignee, task_type, priority, json.dumps(depends_on or []), parent_task) ) conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'task_created', ?)", (task_id, creator, json.dumps({'title': title})) ) conn.commit() return {'id': task_id, 'status': 'pending'} finally: conn.close() def claim_task(task_id: str, agent_id: str) -> bool: """认领任务(原子 CAS,支持指定分配和自由认领)""" conn = get_connection() try: cursor = conn.execute( """UPDATE tasks SET status='claimed', assignee=?, claimed_at=datetime('now') WHERE id=? 
AND status='pending' AND (assignee IS NULL OR assignee=?)""", (agent_id, task_id, agent_id) ) if cursor.rowcount > 0: conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'task_claimed', NULL)", (task_id, agent_id) ) conn.commit() return True conn.rollback() return False finally: conn.close() def update_task_status(task_id: str, new_status: str, agent_id: str, reason: str = None): """更新任务状态(校验合法流转;先取写锁,保证"读状态→校验→写状态"原子执行)""" VALID = { "pending": {"claimed", "cancelled"}, "claimed": {"working", "pending", "cancelled"}, "working": {"review", "blocked", "failed", "cancelled"}, "review": {"done", "pending", "failed", "cancelled"}, "blocked": {"pending", "cancelled"}, "failed": {"pending"}, "done": set(), "cancelled": set(), } conn = get_connection() try: conn.execute("BEGIN IMMEDIATE") row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone() if not row: raise ValueError(f"Task {task_id} not found") current = row['status'] if new_status not in VALID.get(current, set()): raise ValueError(f"Invalid transition: {current} → {new_status}") conn.execute( "UPDATE tasks SET status=?, updated_at=datetime('now') WHERE id=?", (new_status, task_id) ) # 更新时间戳 if new_status == 'working': conn.execute("UPDATE tasks SET started_at=datetime('now') WHERE id=?", (task_id,)) elif new_status in ('done', 'failed', 'cancelled'): conn.execute("UPDATE tasks SET completed_at=datetime('now') WHERE id=?", (task_id,)) conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, ?, ?)", (task_id, agent_id, f'task_{new_status}', json.dumps({'from': current, 'reason': reason})) ) conn.commit() finally: conn.close() def add_comment(task_id: str, author: str, body: str, mentions: list = None): """添加评论""" conn = get_connection() try: conn.execute( "INSERT INTO comments (task_id, author, body, mentions) VALUES (?, ?, ?, ?)", (task_id, author, body, json.dumps(mentions or [])) ) conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'comment_added', 
?)", (task_id, author, json.dumps({'body_preview': body[:100], 'mentions': mentions})) ) conn.commit() finally: conn.close() def write_output(task_id: str, agent: str, output_type: str, title: str, content_path: str, summary: str, metadata: dict = None): """写入产出""" conn = get_connection() try: conn.execute("BEGIN IMMEDIATE") conn.execute( """INSERT INTO outputs (task_id, agent, output_type, title, content_path, summary, metadata) VALUES (?, ?, ?, ?, ?, ?, ?)""", (task_id, agent, output_type, title, content_path, summary, json.dumps(metadata or {})) ) conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'output_written', ?)", (task_id, agent, json.dumps({'title': title})) ) conn.commit() finally: conn.close() def record_decision(task_id: str, decider: str, decision: str, rationale: str, alternatives: list = None): """记录决策""" conn = get_connection() try: conn.execute( """INSERT INTO decisions (task_id, decider, decision, rationale, alternatives) VALUES (?, ?, ?, ?, ?)""", (task_id, decider, decision, rationale, json.dumps(alternatives or [])) ) conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'decision_recorded', ?)", (task_id, decider, json.dumps({'decision': decision})) ) conn.commit() finally: conn.close() def add_observation(task_id: str, observer: str, severity: str, body: str): """添加观察""" conn = get_connection() try: conn.execute( "INSERT INTO observations (task_id, observer, severity, body) VALUES (?, ?, ?, ?)", (task_id, observer, severity, body) ) conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'observation_added', ?)", (task_id, observer, json.dumps({'severity': severity})) ) conn.commit() finally: conn.close() ``` ### 3.3 daemon/ticker.py — Tick 循环 ```python """Daemon tick 循环""" import asyncio import logging from datetime import datetime, timedelta from blackboard.db import get_connection from blackboard.queries import get_board_snapshot from 
daemon.spawner import async_spawn_agent, cleanup_session, get_active_sessions from daemon.health import reclaim_stale, detect_zombies logger = logging.getLogger(__name__) TICK_INTERVAL = 60 # 秒 class DaemonTicker: def __init__(self): self.running = False self.last_tick = None self._conn = None # tick 内共享的 SQLite 连接 async def start(self): """启动 tick 循环""" self.running = True logger.info("Daemon ticker started") while self.running: try: await self.tick() except Exception as e: logger.error(f"Tick error: {e}", exc_info=True) self.last_tick = datetime.now() await asyncio.sleep(TICK_INTERVAL) async def stop(self): self.running = False async def manual_tick(self): """手动触发(CLI 或 API 调用)""" await self.tick() async def tick(self): """单次 tick""" logger.debug("Tick start") # tick 内共享一个 SQLite 连接(避免频繁 open/close 导致 WAL checkpoint 抖动) self._conn = get_connection() try: # 1. 健康检查 reclaimed = reclaim_stale(self._conn) zombies = detect_zombies(self._conn) if reclaimed: logger.info(f"Reclaimed {len(reclaimed)} stale tasks") if zombies: logger.warning(f"Detected {len(zombies)} zombie sessions") # 2. 读黑板 board = get_board_snapshot() # 3. 处理 @mention await self._process_mentions(board) # 4. 处理待领取任务 await self._process_unclaimed(board) # 5. 处理 blocked 任务 await self._process_blocked(board) # 6. 
清理完成的 session await self._cleanup_sessions() finally: self._conn.close() self._conn = None logger.debug("Tick done") async def _process_mentions(self, board): """处理评论中的 @mention""" for comment in board.get_unprocessed_mentions(): for agent_id in comment.mentions: if not self._is_agent_active(agent_id): await async_spawn_agent( agent_id, f"黑板上有给你的新评论({comment.task_id}),请查看。" ) board.mark_comment_processed(comment.id) async def _process_unclaimed(self, board): """处理有 assignee 但未 claim 的任务""" for task in board.get_assigned_unclaimed(): if not self._is_agent_active(task.assignee): await async_spawn_agent( task.assignee, f"黑板上有分配给你的任务({task.title}),请查看并认领。" ) async def _process_blocked(self, board): """处理 blocked 任务""" for task in board.get_blocked_tasks(): mentions = board.get_latest_mentions(task.id) if mentions: for agent_id in mentions: if not self._is_agent_active(agent_id): await async_spawn_agent( agent_id, f"任务 {task.id} 被 block,需要你的协助。" ) else: # 没有 @ → spawn 庞统决定(庞统已在运行时不重复 spawn,避免每个 tick 都新开 session) if not self._is_agent_active('pangtong-fujunshi'): await async_spawn_agent( 'pangtong-fujunshi', f"任务 {task.id} 被 block,没有指定协助者,请决定如何处理。" ) async def _cleanup_sessions(self): """清理完成的 session""" for session in get_active_sessions(): if session.is_completed(): archive_dir = f"artifacts/{session.task_id}/archive/" cleanup_session(session.agent_id, session.session_id, archive_dir) def _is_agent_active(self, agent_id: str) -> bool: """检查 agent 是否有正在运行的 session""" conn = get_connection() try: row = conn.execute( "SELECT current_status FROM agents WHERE agent_id=?", (agent_id,) ).fetchone() return row is not None and row['current_status'] == 'working' finally: conn.close() ``` ### 3.4 daemon/spawner.py — Agent 调度 ```python """Agent spawn 和 session 管理""" import json import os import shutil import fcntl import subprocess import uuid import logging from pathlib import Path logger = logging.getLogger(__name__) AGENTS_DIR = os.path.expanduser("~/.openclaw/agents") def spawn_agent(agent_id: str, message: str) -> str: """spawn 隔离 session,不等待完成。 注意:当前同步实现(Popen + SQLite 写入)。 6 
Agent 规模下可接受(SQLite 写入 <1ms)。 后续 scale up 时改 asyncio.create_subprocess_exec + async SQLite。 """ session_id = str(uuid.uuid4()) cmd = [ "openclaw", "agent", "--agent", agent_id, "--session-id", session_id, "--message", message, "--json" ] logger.info(f"Spawning {agent_id} session={session_id[:8]}") # stdout 无人读取:用 DEVNULL 而不是 PIPE,避免管道写满后子进程阻塞 proc = subprocess.Popen( cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE ) # 检查是否立即失败(100ms 内退出 = spawn 失败) import time time.sleep(0.1) if proc.poll() is not None: _, stderr = proc.communicate() logger.error(f"Spawn failed immediately: {stderr.decode()[:500]}") raise RuntimeError(f"Spawn failed: {stderr.decode()[:200]}") # 更新 agents 表 _update_agent_status(agent_id, 'working') # 记录 session 元数据(用于后续清理) _register_session(agent_id, session_id) return session_id async def async_spawn_agent(agent_id: str, message: str) -> str: """spawn_agent 的异步包装(供 ticker.py 调用):在工作线程中执行,避免 time.sleep 阻塞事件循环""" import asyncio return await asyncio.to_thread(spawn_agent, agent_id, message) def cleanup_session(agent_id: str, session_id: str, archive_dir: str): """存档 jsonl + 文件锁保护下清理 sessions.json""" sessions_dir = f"{AGENTS_DIR}/{agent_id}/sessions" store_path = f"{sessions_dir}/sessions.json" lock_path = f"{sessions_dir}/.cleanup.lock" # 1. 存档 jsonl os.makedirs(archive_dir, exist_ok=True) for ext in ['.jsonl', '.trajectory.jsonl', '.trajectory-path.json']: src = f"{sessions_dir}/{session_id}{ext}" if os.path.exists(src): shutil.move(src, f"{archive_dir}/{session_id}{ext}") # 2. 文件锁保护下编辑 sessions.json with open(lock_path, 'w') as lock_file: fcntl.flock(lock_file, fcntl.LOCK_EX) try: with open(store_path) as f: store = json.load(f) keys_to_remove = [k for k in store if session_id in k] for k in keys_to_remove: del store[k] with open(store_path, 'w') as f: json.dump(store, f, indent=2) finally: fcntl.flock(lock_file, fcntl.LOCK_UN) try: os.unlink(lock_path) except OSError: pass # 3. 
更新 agents 表 _update_agent_status(agent_id, 'idle') logger.info(f"Cleaned up {agent_id} session={session_id[:8]}") ``` ### 3.5 CLI 工具 **blackboard.py** — Agent 通过 exec 调用: ```python #!/usr/bin/env python3 """黑板操作 CLI — Agent 通过 exec 调用""" import argparse import sys import json # 绝对路径导入 import os sys.path.insert(0, os.path.expanduser("~/.sanguo_projects/sanguo_moziplus_v2")) from src.blackboard.operations import ( create_task, claim_task, update_task_status, add_comment, write_output, record_decision, add_observation ) from src.blackboard.queries import read_board def main(): parser = argparse.ArgumentParser(description='Blackboard CLI') sub = parser.add_subparsers(dest='command') # read r = sub.add_parser('read') r.add_argument('--task', required=True) r.add_argument('--agent', default=None) r.add_argument('--last', type=int, default=None) r.add_argument('--type', choices=['comments', 'outputs', 'decisions', 'observations', 'all'], default='all') # claim c = sub.add_parser('claim') c.add_argument('--task', required=True) c.add_argument('--agent', required=True) # output o = sub.add_parser('output') o.add_argument('--task', required=True) o.add_argument('--agent', required=True) o.add_argument('--type', required=True) o.add_argument('--title', required=True) o.add_argument('--path', required=True) o.add_argument('--summary', required=True) # comment cm = sub.add_parser('comment') cm.add_argument('--task', required=True) cm.add_argument('--author', required=True) cm.add_argument('--body', required=True) cm.add_argument('--mentions', default='[]') # decide d = sub.add_parser('decide') d.add_argument('--task', required=True) d.add_argument('--decider', required=True) d.add_argument('--decision', required=True) d.add_argument('--rationale', required=True) # observe ob = sub.add_parser('observe') ob.add_argument('--task', required=True) ob.add_argument('--observer', required=True) ob.add_argument('--severity', required=True, choices=['blocking', 'warning', 'info', 
'audit']) ob.add_argument('--body', required=True) # create cr = sub.add_parser('create') cr.add_argument('--title', required=True) cr.add_argument('--creator', required=True) cr.add_argument('--description', default=None) cr.add_argument('--task-type', default=None) cr.add_argument('--assignee', default=None) args = parser.parse_args() # ... 路由到对应操作函数,输出 JSON 结果 if __name__ == '__main__': main() ``` **daemon.py** — Daemon 控制 CLI: ```python #!/usr/bin/env python3 """Daemon 控制 CLI""" # 命令: # daemon.py tick — 手动触发一次 tick # daemon.py status — 查看 daemon 状态 # daemon.py board — 查看黑板摘要 ``` --- ## 4. API 路由设计 ### 4.1 黑板 API(blackboard_routes.py) | 方法 | 路径 | 说明 | |------|------|------| | GET | /api/blackboard/tasks | 任务列表(支持 ?status=&assignee= 过滤)| | GET | /api/blackboard/tasks/{id} | 任务详情(含评论、产出、决策、观察)| | POST | /api/blackboard/tasks | 创建任务 | | POST | /api/blackboard/tasks/{id}/claim | 认领任务 | | PATCH | /api/blackboard/tasks/{id}/status | 更新状态 | | POST | /api/blackboard/tasks/{id}/comments | 添加评论 | | POST | /api/blackboard/tasks/{id}/outputs | 写入产出 | | POST | /api/blackboard/tasks/{id}/decisions | 记录决策 | | POST | /api/blackboard/tasks/{id}/observations | 添加观察 | ### 4.2 Daemon 控制 API(daemon_routes.py) | 方法 | 路径 | 说明 | |------|------|------| | POST | /api/daemon/tick | 手动触发 tick | | GET | /api/daemon/status | Daemon 状态(上次 tick、活跃 session 数)| | GET | /api/daemon/sessions | 活跃 session 列表 | --- ## 5. 与 v1.0 的关系 **完全独立,不共享代码。** | 维度 | v1.0 (sanguo_moziplus) | v2.0 (sanguo_moziplus_v2) | |------|----------------------|--------------------------| | 进程 | sanguo-moziplus (PM2) | sanguo-moziplus-v2 (PM2) | | 端口 | 8082 | 8083 | | 数据库 | moziplus.db | blackboard.db | | 代码 | 独立 | 独立 | | 编排 | DAG 状态机 | Shared Workspace | | 通信 | Sanguo Mail | 黑板评论 | **v2 上线流程:** 1. v2 部署、测试、验证 2. 确认 v2 功能覆盖 v1 3. `pm2 stop sanguo-moziplus` — v1 下线 4. `pm2 delete sanguo-moziplus` — 删除 v1 进程 5. v2 接管端口 8082(可选) 6. v1 代码和数据库保留只读(历史归档) --- ## 6. 
测试策略 ### 6.1 单元测试 | 测试文件 | 覆盖内容 | |---------|---------| | test_blackboard.py | 所有 operations 函数(claim CAS、状态机校验、评论、产出、决策、观察)| | test_daemon_tick.py | tick 循环逻辑(mention 处理、blocked 处理、cleanup)| | test_spawner.py | spawn + cleanup(mock openclaw CLI)| | test_health.py | stale reclaim + zombie detection | ### 6.2 集成测试 - 启动 daemon → 创建任务 → spawn agent → agent 操作黑板 → 完成闭环 - 竞态测试:两个 "agent" 同时 claim 同一任务 ### 6.3 测试数据库 测试用 `:memory:` SQLite,不污染生产数据。 ```python # tests/conftest.py import sqlite3 import pytest from src.blackboard.db import SCHEMA_SQL @pytest.fixture def blackboard_db(): """测试用内存数据库""" conn = sqlite3.connect(":memory:") conn.row_factory = sqlite3.Row conn.executescript(SCHEMA_SQL) yield conn conn.close() ``` --- ## 7. 实现顺序(Phase 1 具体步骤) | 步骤 | 内容 | 依赖 | 估计时间 | |------|------|------|---------| | 1 | `blackboard/db.py` — schema + 连接管理 | 无 | 0.5h | | 2 | `blackboard/operations.py` — 核心操作 | 步骤 1 | 2h | | 3 | `blackboard/queries.py` — 查询+过滤 | 步骤 1 | 1h | | 4 | `cli/blackboard.py` — CLI 工具 | 步骤 2,3 | 1h | | 5 | 单元测试 test_blackboard.py | 步骤 2,3 | 1h | | 6 | `daemon/spawner.py` — spawn+cleanup | 无 | 1h | | 7 | `daemon/health.py` — 健康检查 | 步骤 1 | 0.5h | | 8 | `daemon/ticker.py` — tick 循环 | 步骤 1,6,7 | 1.5h | | 9 | `api/blackboard_routes.py` — HTTP API | 步骤 2,3 | 1h | | 10 | `api/daemon_routes.py` — 控制API | 步骤 8 | 0.5h | | 11 | main.py 集成 | 步骤 1-10 | 0.5h | | 12 | E2E 测试 | 步骤 11 | 1h | | **总计** | | | **~11.5h** | --- ## 8. 风险 | 风险 | 缓解 | |------|------| | asyncio tick 和 FastAPI 请求并发访问 SQLite | WAL 模式 + busy_timeout + BEGIN IMMEDIATE | | Popen spawn 的子进程无人回收 | health.py 检测 + reclaim | | 测试需要 mock openclaw CLI | 用 subprocess mock 或 test fixture | | 前端需要适配黑板 API | Phase 1 先只做后端 + CLI,前端 Phase 2 |