diff --git a/docs/design/technical-design-v2.6.md b/docs/design/technical-design-v2.6.md index f1d35a3..51ebaad 100644 --- a/docs/design/technical-design-v2.6.md +++ b/docs/design/technical-design-v2.6.md @@ -1,9 +1,9 @@ # v2.6 技术方案设计 -**版本**: v2.6.1-tech (含评审修正 + 独立部署) -**基于**: architecture-v2.6.md -**作者**: 庞统(副军师) -**日期**: 2026-05-15 +**版本**: v2.6.2-tech +**基于**: architecture-v2.6.md (v2.6.11) +**作者**: 庞统(副军师)🐦 +**日期**: 2026-05-16 --- @@ -11,704 +11,481 @@ | 组件 | 选型 | 版本 | 理由 | |------|------|------|------| -| Daemon 进程管理 | PM2 | 6.0.14 | 独立新进程 sanguo-moziplus-v2 | -| Daemon 框架 | Python + asyncio | 3.9+ | 独立项目,独立目录 | -| 黑板存储 | SQLite | 3.51 | WAL 模式,单 host 足够 | -| HTTP API | FastAPI | 独立安装 | 独立 uvicorn 实例,端口 8083 | -| Agent 调度 | openclaw CLI | 2026.5.7 | 已验证 `openclaw agent --session-id` | -| Session 清理 | fcntl + JSON 编辑 | 系统原生 | 已验证可行 | -| Git | gitee + gitea | 双远程 | 开发目录 → 安装目录 | +| Daemon 进程管理 | PM2 | 6.0.14 | 独立进程 sanguo-moziplus-v2 | +| Daemon 框架 | Python + asyncio | 3.9+ | tick 循环 + FastAPI 共存 | +| 黑板存储 | SQLite(per-project) | 3.51+ | WAL 模式,物理隔离 | +| HTTP API | FastAPI + uvicorn | 独立安装 | 端口 8083 | +| Agent 调度 | openclaw CLI / sessions_spawn | 2026.5.7 | Full Agent / Subagent 双路径 | +| 前端 | React + Vite + TypeScript | 复用 v1.0 | 5页面 Dashboard | +| 实时推送 | SSE (Server-Sent Events) | FastAPI 内置 | 降级轮询兜底 | +| Git | gitee + gitea | 双远程 | 开发→安装同步 | +| 配置 | YAML (PyYAML) | — | guardrails / review_protocols / template_components | -**不加新依赖。** 所有组件都是已有环境中的。 +**不加重型新依赖。** 所有组件基于已有环境。 --- ## 2. 项目结构 -**独立项目,不嵌入 moziplus。** 完全独立开发、独立部署、独立运行。v2 上线后 v1 整体下线。 - ``` -sanguo_moziplus_v2/ # 🆕 全新项目目录 +sanguo_moziplus_v2/ # 项目根目录 ├── src/ -│ ├── main.py # FastAPI 入口(独立) +│ ├── main.py # FastAPI 入口 + Daemon ticker 启动 +│ │ │ ├── blackboard/ # 黑板核心模块 │ │ ├── __init__.py -│ │ ├── db.py # SQLite 连接管理、schema 初始化 +│ │ ├── db.py # SQLite 连接管理(per-project) │ │ ├── models.py # 数据模型 -│ │ ├── operations.py # 读写操作 -│ │ └── queries.py # 查询操作 +│ │ ├── operations.py # 写操作(task/comment/output/decision/observation/review) +│ │ └── queries.py # 读操作(L1/L2/L3 分层) │ │ │ ├── daemon/ # Daemon 核心模块 │ │ ├── __init__.py -│ │ ├── ticker.py # Tick 循环主逻辑 +│ │ ├── ticker.py # Tick 循环主逻辑(30s) +│ │ ├── dispatcher.py # Agent 调度判据(Full/Subagent/Daemon 三级) │ │ ├── spawner.py # Agent spawn + session 管理 -│ │ ├── health.py # 健康检查 -│ │ └── notifier.py # @mention 解析 +│ │ ├── inbox.py # Inbox JSONL watcher + truncate +│ │ ├── guardrail.py # L1 assert 执行 + L2 subagent 触发 +│ │ ├── review_flow.py # 审查流水线(单审/反驳权/辩论) +│ │ ├── experience.py # 一级蒸馏触发 + 经验注入 +│ │ ├── bootstrap.py # build_bootstrap() L1/L2/L3 消息拼装 +│ │ ├── health.py # 健康检查 + 逻辑死循环检测 +│ │ ├── counter.py # ActiveAgentCounter 异步计数器 +│ │ └── notifier.py # @mention 解析 + Inbox 事件注入 │ │ │ ├── api/ # HTTP API -│ │ ├── blackboard_routes.py # 黑板 API -│ │ └── daemon_routes.py # Daemon 控制 API +│ │ ├── blackboard_routes.py # 黑板 CRUD API +│ │ ├── daemon_routes.py # Daemon 控制 API +│ │ ├── sse_routes.py # SSE 推送端点 +│ │ └── project_routes.py # 多项目管理 API │ │ -│ └── cli/ # CLI 工具 -│ ├── blackboard.py # Agent 黑板操作 CLI -│ └── daemon.py # Daemon 控制 CLI +│ ├── cli/ # CLI 工具 +│ │ ├── blackboard.py # Agent 黑板操作 CLI +│ │ └── admin.py # 管理员控制 CLI +│ │ +│ └── frontend/ # 前端(React + Vite) +│ ├── src/ +│ │ ├── pages/ # 5 页面 +│ │ │ ├── TaskBoard.tsx # 任务看板(核心) +│ │ │ ├── GlobalMonitor.tsx # 全局监控 +│ │ │ ├── ArtifactVault.tsx # 产出档案 +│ │ │ ├── SystemConfig.tsx # 系统配置 +│ │ │ └── AIBriefing.tsx # AI Briefing +│ │ ├── components/ +│ │ │ ├── CheckpointPanel.tsx # Checkpoint 交互 +│ │ │ ├── NotificationCenter.tsx # 推送通知中心 +│ │ │ └── ProjectSwitcher.tsx # 项目切换器 +│ │ ├── store.ts # Zustand 状态管理 +│ │ └── api.ts # API 层 +│ └── dist/ # 构建产物(8083 直接服务) │ ├── config/ -│ └── default.yaml +│ ├── default.yaml # 全局默认配置 +│ ├── guardrails.yaml # Guardrail 规则(per task_type) +│ ├── template_components.yaml # 模板组件库(7组件 + custom) +│ └── review_protocols/ # 审查协议 +│ ├── plan_review.yaml +│ ├── output_review.yaml +│ ├── analysis_review.yaml +│ └── final_review.yaml │ -├── artifacts/ # 产出物目录 +├── prompt_templates/ # L2 引擎注入模板 +│ ├── executor.md # 执行者操作规范 +│ ├── reviewer.md # 审查者操作规范 +│ ├── planner.md # 庞统规划操作规范 +│ ├── adjudicator.md # 庞统裁决操作规范 +│ ├── rebuttal.md # 反驳权操作规范 +│ └── plan_checker.md # Plan Checker 验证规范 +│ +├── schemas/ # L1 Schema 校验 +│ ├── handoff.schema.json +│ ├── output.schema.json +│ ├── decide.schema.json +│ └── observe.schema.json +│ +├── projects/ # 多项目目录(课题11) +│ └── {project_id}/ +│ ├── blackboard.db # per-project SQLite +│ ├── config/ # per-project 配置覆盖 +│ │ └── project.yaml +│ ├── artifacts/ # per-project 产出物 +│ └── experiences/ # per-project 经验库 +│ +├── inbox/ # Inbox JSONL +│ └── daemon.jsonl # 跨进程事件推送 │ ├── tests/ │ ├── unit/ │ └── e2e/ │ ├── pyproject.toml -└── ecosystem.config.cjs # PM2 配置 -``` - -### 开发 vs 安装目录 - -| | 路径 | 说明 | -|------|------|------| -| 开发 | `~/.openclaw/sanguo_projects/sanguo_moziplus_v2/` | Git 管理 | -| 安装 | `~/.sanguo_projects/sanguo_moziplus_v2/` | PM2 运行 | -| Git 远程 | gitee + gitea | 同现有项目 | - ---- - -## 3. 模块详细设计 - -### 3.1 blackboard/db.py — 数据库管理 - -```python -"""黑板 SQLite 数据库管理""" -import sqlite3 -import os -from pathlib import Path - -DB_PATH = Path(os.environ.get( - 'BLACKBOARD_DB', - os.path.expanduser('~/.sanguo_projects/sanguo_moziplus_v2/blackboard.db') -)) - -SCHEMA_SQL = """ --- 完整 schema 见 architecture-v2.6.md §3.2 --- 此处运行 CREATE TABLE IF NOT EXISTS -""" - -def get_connection() -> sqlite3.Connection: - conn = sqlite3.connect(str(DB_PATH)) - conn.row_factory = sqlite3.Row - conn.execute("PRAGMA journal_mode=WAL") - conn.execute("PRAGMA foreign_keys=ON") - conn.execute("PRAGMA busy_timeout=5000") - return conn - -def init_db(): - """初始化数据库(启动时调用)""" - conn = get_connection() - try: - conn.executescript(SCHEMA_SQL) - conn.commit() - finally: - conn.close() -``` - -### 3.2 blackboard/operations.py — 核心操作 - -```python -"""黑板读写操作""" -import json -import sqlite3 -from datetime import datetime -from .db import get_connection - -def create_task(title: str, creator: str, description: str = None, - task_type: str = None, assignee: str = None, - depends_on: list = None, priority: int = 5, - parent_task: str = None) -> dict: - """创建任务(任何 Agent 都可以)""" - task_id = f"task-{uuid.uuid4().hex[:8]}" - conn = get_connection() - try: - conn.execute( - """INSERT INTO tasks (id, title, description, status, assigned_by, - task_type, priority, depends_on, parent_task) - VALUES (?, ?, ?, 'pending', ?, ?, ?, ?, ?)""", - (task_id, title, description, creator, task_type, priority, - json.dumps(depends_on or []), parent_task) - ) - conn.execute( - "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'task_created', ?)", - (task_id, creator, json.dumps({'title': title})) - ) - conn.commit() - return {'id': task_id, 'status': 'pending'} - finally: - conn.close() - -def claim_task(task_id: str, agent_id: str) -> bool: - """认领任务(原子 CAS,支持指定分配和自由认领)""" - conn = get_connection() - try: - cursor = conn.execute( - """UPDATE tasks SET status='claimed', assignee=?, claimed_at=datetime('now') - WHERE id=? AND status='pending' - AND (assignee IS NULL OR assignee=?)""", - (agent_id, task_id, agent_id) - ) - if cursor.rowcount > 0: - conn.execute( - "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'task_claimed', NULL)", - (task_id, agent_id) - ) - conn.commit() - return True - conn.rollback() - return False - finally: - conn.close() - -def update_task_status(task_id: str, new_status: str, agent_id: str, reason: str = None): - """更新任务状态(校验合法流转)""" - VALID = { - "pending": {"claimed", "cancelled"}, - "claimed": {"working", "pending", "cancelled"}, - "working": {"review", "blocked", "failed", "cancelled"}, - "review": {"done", "pending", "failed", "cancelled"}, - "blocked": {"pending", "cancelled"}, - "failed": {"pending"}, - "done": set(), - "cancelled": set(), - } - conn = get_connection() - try: - row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone() - if not row: - raise ValueError(f"Task {task_id} not found") - current = row['status'] - if new_status not in VALID.get(current, set()): - raise ValueError(f"Invalid transition: {current} → {new_status}") - - conn.execute( - "UPDATE tasks SET status=?, updated_at=datetime('now') WHERE id=?", - (new_status, task_id) - ) - # 更新时间戳 - if new_status == 'working': - conn.execute("UPDATE tasks SET started_at=datetime('now') WHERE id=?", (task_id,)) - elif new_status in ('done', 'failed', 'cancelled'): - conn.execute("UPDATE tasks SET completed_at=datetime('now') WHERE id=?", (task_id,)) - - conn.execute( - "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, ?, ?)", - (task_id, agent_id, f'task_{new_status}', json.dumps({'from': current, 'reason': reason})) - ) - conn.commit() - finally: - conn.close() - -def add_comment(task_id: str, author: str, body: str, mentions: list = None): - """添加评论""" - conn = get_connection() - try: - conn.execute( - "INSERT INTO comments (task_id, author, body, mentions) VALUES (?, ?, ?, ?)", - (task_id, author, body, json.dumps(mentions or [])) - ) - conn.execute( - "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'comment_added', ?)", - (task_id, author, json.dumps({'body_preview': body[:100], 'mentions': mentions})) - ) - conn.commit() - finally: - conn.close() - -def write_output(task_id: str, agent: str, output_type: str, title: str, - content_path: str, summary: str, metadata: dict = None): - """写入产出""" - conn = get_connection() - try: - conn.execute("BEGIN IMMEDIATE") - conn.execute( - """INSERT INTO outputs (task_id, agent, output_type, title, content_path, summary, metadata) - VALUES (?, ?, ?, ?, ?, ?, ?)""", - (task_id, agent, output_type, title, content_path, summary, - json.dumps(metadata or {})) - ) - conn.execute( - "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'output_written', ?)", - (task_id, agent, json.dumps({'title': title})) - ) - conn.commit() - finally: - conn.close() - -def record_decision(task_id: str, decider: str, decision: str, - rationale: str, alternatives: list = None): - """记录决策""" - conn = get_connection() - try: - conn.execute( - """INSERT INTO decisions (task_id, decider, decision, rationale, alternatives) - VALUES (?, ?, ?, ?, ?)""", - (task_id, decider, decision, rationale, json.dumps(alternatives or [])) - ) - conn.execute( - "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'decision_recorded', ?)", - (task_id, decider, json.dumps({'decision': decision})) - ) - conn.commit() - finally: - conn.close() - -def add_observation(task_id: str, observer: str, severity: str, body: str): - """添加观察""" - conn = get_connection() - try: - conn.execute( - "INSERT INTO observations (task_id, observer, severity, body) VALUES (?, ?, ?, ?)", - (task_id, observer, severity, body) - ) - conn.execute( - "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'observation_added', ?)", - (task_id, observer, json.dumps({'severity': severity})) - ) - conn.commit() - finally: - conn.close() -``` - -### 3.3 daemon/ticker.py — Tick 循环 - -```python -"""Daemon tick 循环""" -import asyncio -import logging -from datetime import datetime, timedelta - -from blackboard.db import get_connection -from blackboard.queries import get_board_snapshot -from daemon.spawner import async_spawn_agent, cleanup_session, get_active_sessions -from daemon.health import reclaim_stale, detect_zombies - -logger = logging.getLogger(__name__) - -TICK_INTERVAL = 60 # 秒 - -class DaemonTicker: - def __init__(self): - self.running = False - self.last_tick = None - self._conn = None # tick 内共享的 SQLite 连接 - - async def start(self): - """启动 tick 循环""" - self.running = True - logger.info("Daemon ticker started") - while self.running: - try: - await self.tick() - except Exception as e: - logger.error(f"Tick error: {e}", exc_info=True) - self.last_tick = datetime.now() - await asyncio.sleep(TICK_INTERVAL) - - async def stop(self): - self.running = False - - async def manual_tick(self): - """手动触发(CLI 或 API 调用)""" - await self.tick() - - async def tick(self): - """单次 tick""" - logger.debug("Tick start") - - # tick 内共享一个 SQLite 连接(避免频繁 open/close 导致 WAL checkpoint 抖动) - self._conn = get_connection() - try: - # 1. 健康检查 - reclaimed = reclaim_stale(self._conn) - zombies = detect_zombies(self._conn) - if reclaimed: - logger.info(f"Reclaimed {len(reclaimed)} stale tasks") - if zombies: - logger.warning(f"Detected {len(zombies)} zombie sessions") - - # 2. 读黑板 - board = get_board_snapshot() - - # 3. 处理 @mention - await self._process_mentions(board) - - # 4. 处理待领取任务 - await self._process_unclaimed(board) - - # 5. 处理 blocked 任务 - await self._process_blocked(board) - - # 6. 清理完成的 session - await self._cleanup_sessions() - - finally: - self._conn.close() - self._conn = None - logger.debug("Tick done") - - async def _process_mentions(self, board): - """处理评论中的 @mention""" - for comment in board.get_unprocessed_mentions(): - for agent_id in comment.mentions: - if not self._is_agent_active(agent_id): - await async_spawn_agent( - agent_id, - f"黑板上有给你的新评论({comment.task_id}),请查看。" - ) - board.mark_comment_processed(comment.id) - - async def _process_unclaimed(self, board): - """处理有 assignee 但未 claim 的任务""" - for task in board.get_assigned_unclaimed(): - if not self._is_agent_active(task.assignee): - await async_spawn_agent( - task.assignee, - f"黑板上有分配给你的任务({task.title}),请查看并认领。" - ) - - async def _process_blocked(self, board): - """处理 blocked 任务""" - for task in board.get_blocked_tasks(): - mentions = board.get_latest_mentions(task.id) - if mentions: - for agent_id in mentions: - if not self._is_agent_active(agent_id): - await async_spawn_agent( - agent_id, - f"任务 {task.id} 被 block,需要你的协助。" - ) - else: - # 没有 @ → spawn 庞统决定 - await async_spawn_agent( - 'pangtong-fujunshi', - f"任务 {task.id} 被 block,没有指定协助者,请决定如何处理。" - ) - - async def _cleanup_sessions(self): - """清理完成的 session""" - for session in get_active_sessions(): - if session.is_completed(): - archive_dir = f"artifacts/{session.task_id}/archive/" - cleanup_session(session.agent_id, session.session_id, archive_dir) - - def _is_agent_active(self, agent_id: str) -> bool: - """检查 agent 是否有正在运行的 session""" - conn = get_connection() - try: - row = conn.execute( - "SELECT current_status FROM agents WHERE agent_id=?", (agent_id,) - ).fetchone() - return row and row['current_status'] == 'working' - finally: - conn.close() -``` - -### 3.4 daemon/spawner.py — Agent 调度 - -```python -"""Agent spawn 和 session 管理""" -import json -import os -import shutil -import fcntl -import subprocess -import uuid -import logging -from pathlib import Path - -logger = logging.getLogger(__name__) - -AGENTS_DIR = os.path.expanduser("~/.openclaw/agents") - -def spawn_agent(agent_id: str, message: str) -> str: - """spawn 隔离 session,不等待完成。 - - 注意:当前同步实现(Popen + SQLite 写入)。 - 6 Agent 规模下可接受(SQLite 写入 <1ms)。 - 后续 scale up 时改 asyncio.create_subprocess_exec + async SQLite。 - """ - session_id = str(uuid.uuid4()) - cmd = [ - "openclaw", "agent", - "--agent", agent_id, - "--session-id", session_id, - "--message", message, - "--json" - ] - logger.info(f"Spawning {agent_id} session={session_id[:8]}") - proc = subprocess.Popen( - cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) - - # 检查是否立即失败(100ms 内退出 = spawn 失败) - import time - time.sleep(0.1) - if proc.poll() is not None: - stdout, stderr = proc.communicate() - logger.error(f"Spawn failed immediately: {stderr.decode()[:500]}") - raise RuntimeError(f"Spawn failed: {stderr.decode()[:200]}") - - # 更新 agents 表 - _update_agent_status(agent_id, 'working') - - # 记录 session 元数据(用于后续清理) - _register_session(agent_id, session_id) - - return session_id - -def cleanup_session(agent_id: str, session_id: str, archive_dir: str): - """存档 jsonl + 文件锁保护下清理 sessions.json""" - sessions_dir = f"{AGENTS_DIR}/{agent_id}/sessions" - store_path = f"{sessions_dir}/sessions.json" - lock_path = f"{sessions_dir}/.cleanup.lock" - - # 1. 存档 jsonl - os.makedirs(archive_dir, exist_ok=True) - for ext in ['.jsonl', '.trajectory.jsonl', '.trajectory-path.json']: - src = f"{sessions_dir}/{session_id}{ext}" - if os.path.exists(src): - shutil.move(src, f"{archive_dir}/{session_id}{ext}") - - # 2. 文件锁保护下编辑 sessions.json - with open(lock_path, 'w') as lock_file: - fcntl.flock(lock_file, fcntl.LOCK_EX) - try: - with open(store_path) as f: - store = json.load(f) - keys_to_remove = [k for k in store if session_id in k] - for k in keys_to_remove: - del store[k] - with open(store_path, 'w') as f: - json.dump(store, f, indent=2) - finally: - fcntl.flock(lock_file, fcntl.LOCK_UN) - try: - os.unlink(lock_path) - except OSError: - pass - - # 3. 更新 agents 表 - _update_agent_status(agent_id, 'idle') - - logger.info(f"Cleaned up {agent_id} session={session_id[:8]}") -``` - -### 3.5 CLI 工具 - -**blackboard.py** — Agent 通过 exec 调用: - -```python -#!/usr/bin/env python3 -"""黑板操作 CLI — Agent 通过 exec 调用""" -import argparse -import sys -import json - -# 绝对路径导入 -import os -sys.path.insert(0, os.path.expanduser("~/.sanguo_projects/sanguo_moziplus_v2")) - -from src.blackboard.operations import ( - create_task, claim_task, update_task_status, - add_comment, write_output, record_decision, add_observation -) -from src.blackboard.queries import read_board - -def main(): - parser = argparse.ArgumentParser(description='Blackboard CLI') - sub = parser.add_subparsers(dest='command') - - # read - r = sub.add_parser('read') - r.add_argument('--task', required=True) - r.add_argument('--agent', default=None) - r.add_argument('--last', type=int, default=None) - r.add_argument('--type', choices=['comments', 'outputs', 'decisions', 'observations', 'all'], default='all') - - # claim - c = sub.add_parser('claim') - c.add_argument('--task', required=True) - c.add_argument('--agent', required=True) - - # output - o = sub.add_parser('output') - o.add_argument('--task', required=True) - o.add_argument('--agent', required=True) - o.add_argument('--type', required=True) - o.add_argument('--title', required=True) - o.add_argument('--path', required=True) - o.add_argument('--summary', required=True) - - # comment - cm = sub.add_parser('comment') - cm.add_argument('--task', required=True) - cm.add_argument('--author', required=True) - cm.add_argument('--body', required=True) - cm.add_argument('--mentions', default='[]') - - # decide - d = sub.add_parser('decide') - d.add_argument('--task', required=True) - d.add_argument('--decider', required=True) - d.add_argument('--decision', required=True) - d.add_argument('--rationale', required=True) - - # observe - ob = sub.add_parser('observe') - ob.add_argument('--task', required=True) - ob.add_argument('--observer', required=True) - ob.add_argument('--severity', required=True, choices=['blocking', 'warning', 'info', 'audit']) - ob.add_argument('--body', required=True) - - # create - cr = sub.add_parser('create') - cr.add_argument('--title', required=True) - cr.add_argument('--creator', required=True) - cr.add_argument('--description', default=None) - cr.add_argument('--task-type', default=None) - cr.add_argument('--assignee', default=None) - - args = parser.parse_args() - # ... 路由到对应操作函数,输出 JSON 结果 - -if __name__ == '__main__': - main() -``` - -**daemon.py** — Daemon 控制 CLI: - -```python -#!/usr/bin/env python3 -"""Daemon 控制 CLI""" -# 命令: -# daemon.py tick — 手动触发一次 tick -# daemon.py status — 查看 daemon 状态 -# daemon.py board — 查看黑板摘要 +├── ecosystem.config.cjs # PM2 配置 +└── docs/ # 设计文档 ``` --- -## 4. API 路由设计 +## 3. 数据库设计 -### 4.1 黑板 API(blackboard_routes.py) +### 3.1 数据库架构(per-project) + +每个项目独立 SQLite 数据库,物理隔离。路径:`projects/{project_id}/blackboard.db`。 + +全局注册表:`projects/registry.db`(存储项目列表和元数据)。 + +### 3.2 表结构 + +完整 Schema 见 `architecture-v2.6.md §3.2`,此处列出核心表: + +| 表 | 用途 | 来源课题 | +|----|------|---------| +| `tasks` | 任务主表(状态机) | 课题1 | +| `comments` | 评论/Handoff/辩论论点(含 comment_type) | 课题2/3 | +| `outputs` | 产出物(含 attempt_number) | 课题2 | +| `decisions` | 决策记录 | 课题1 | +| `observations` | 观察/吹哨(severity 分级) | 课题1 | +| `events` | 事件流(审计追踪) | 课题2 | +| `reviews` | 评审结果(含 confidence/rebuttal_status/debate_round) | 课题3 | +| `experiences` | 经验沉淀(含 tags) | 课题6 | +| `experience_tags` | 经验标签关联表 | 课题6 | + +### 3.3 关键约束 + +- WAL 模式 + `busy_timeout=5000` + `PRAGMA foreign_keys=ON` +- 所有写操作走 `BEGIN IMMEDIATE` 串行化 +- comments 表 `comment_type` CHECK 约束区分 8 种类型 + +--- + +## 4. Daemon 核心架构 + +### 4.1 Tick 循环(30s) + +```python +async def tick(project_id: str): + conn = get_connection(project_id) + try: + # 1. 逻辑健康自检(连续 N tick 无变更 → 告警) + health_check(conn) + + # 2. 处理 Inbox JSONL(即时事件) + process_inbox(conn) + + # 3. 扫描黑板状态 + tasks = scan_tasks(conn) + + # 4. 审查流水线处理 + review_flow(conn, tasks) + + # 5. 依赖推进(已完成任务解锁下游) + advance_dependencies(conn, tasks) + + # 6. 反驳权进度检测(催促通知) + check_rebuttal_progress(conn) + + # 7. Agent 调度 + dispatch_pending(conn, tasks) + + # 8. 一级蒸馏触发 + check_distillation(conn) + + finally: + conn.close() +``` + +**Tick + Inbox 双层事件架构**: +- Tick 30s 兜底扫描(Pull) +- Inbox JSONL 秒级推送加速(Push) +- Inbox 文件用 truncate(清空不删除),避免并发写入时文件不存在 + +### 4.2 Agent 调度判据(三级决策树) + +详见 `topic3-challenge-review-proposal.md §5.4`。 + +```python +def dispatch(task, action_type, project_config): + registered_agents = project_config.get("agents", []) + + # Level 1: 纯机械检查 → Daemon 直接执行 + if action_type in ("L1_guardrail", "format_check", "file_exists_check"): + return execute_locally(task) + + # Level 2: 有名字的角色 → Full Agent (Popen 非阻塞) + if task.assignee in registered_agents: + if action_type == "adjudication": + return spawn_full_agent(task.assignee, new_session=True) + return spawn_full_agent(task.assignee) + + # Level 3: 无名字的一次性任务 → Subagent (sessions_spawn) + if DISPATCH_RULES.get(action_type) == "subagent": + return spawn_subagent(task_description=action_type) + + # Level 4: 未知 action_type → 庞统裁决 + return spawn_full_agent("pangtong-fujunshi", new_session=True) +``` + +**spawn 方式**: +- Full Agent:`subprocess.Popen`(非阻塞),不等返回,下次 tick 检查产出 +- Subagent:`sessions_spawn`(Gateway API),等返回 + +### 4.3 ActiveAgentCounter(课题11) + +```python +class ActiveAgentCounter: + """异步计数器,控制并发""" + def __init__(self, max_global=5, max_per_agent=1): + self._global = asyncio.Semaphore(max_global) + self._per_agent = {} # agent_id → Semaphore(max_per_agent) + + async def acquire(self, agent_id: str) -> bool: + if self._global.locked(): + return False + agent_sem = self._per_agent.get(agent_id) + if agent_sem and agent_sem.locked(): + return False + await self._global.acquire() + if agent_id not in self._per_agent: + self._per_agent[agent_id] = asyncio.Semaphore(self._max_per_agent) + await self._per_agent[agent_id].acquire() + return True + + def release(self, agent_id: str): + self._per_agent[agent_id].release() + self._global.release() +``` + +### 4.4 build_bootstrap() 四层上下文拼装 + +详见 `topic4-decomposition-skill-proposal.md D4-7`。 + +``` +L0 铁律层(~500 tokens) → Hook 注入,不占 bootstrap +L1 角色层(~2000 tokens) → SOUL.md / IDENTITY.md(Agent 自带) +L2 引擎注入层(~1500 tokens)→ prompt_templates 按 role 拼装 + ① 操作规范(executor.md / reviewer.md / planner.md) + ② 项目背景(project_context.yaml) + ③ 任务上下文(黑板数据) + ④ 前序信息(depends_on 产出摘要) + ⑤ Guardrail 规则(guardrails.yaml,仅执行者) + ⑥ 审查协议(review_protocols/,仅审查者) + ⑦ 经验注入(experiences 表按 tag 匹配) +L3 被动参考层(按需加载)→ Skill description 四要素 +``` + +L2 按角色精确注入:执行者注入⑤⑥,审查者注入⑥,庞统注入⑥。 + +--- + +## 5. 审查流水线 + +### 5.1 分级审查 + +| 风险等级 | 流水线 | 方案审查 | 产出审查 | 模式 | max_rounds | +|---------|--------|---------|---------|------|-----------| +| **high** | 三阶段 | ✅ 对抗辩论 | ✅ 对抗辩论 + Guardrail | debate | 5 | +| **standard** | 二阶段 | ✅ 单审 | ✅ 单审 + Guardrail | single_reviewer | 3 | +| **low** | 一阶段 | ❌ | ⚡ Guardrail 自动 | auto | 0 | +| **research** | 一阶段 | ❌ | ✅ 庞统确认 | single_reviewer | 2 | + +### 5.2 Guardrail 执行 + +```yaml +# config/guardrails.yaml 示例 +task_types: + coding: + output_review: + required: true + layers: + L1: # Daemon 直接执行 + - assert: "len(glob('tests/test_*.py')) > 0" + message: "缺少测试文件" + - assert: "import subprocess; subprocess.run(['python', '-m', 'py_compile', output_path], check=True)" + message: "Python 语法检查失败" + L2: # Subagent 执行 + prompt: "检查代码是否遵循项目规范,重点关注:命名、异常处理、资源释放" + deploy: + output_review: + required: true + layers: + L1: + - assert: "os.path.exists('deploy_log.md')" + message: "缺少部署日志" + data: + output_review: + required: false # low 风险,Guardrail 自动 +``` + +### 5.3 反驳权流控 + +审查者 verdict=needs_revision(有 critical/major)→ spawn 原执行者反驳 → 协商轮次 ≤ max_rounds → 不设超时,有催促通知 → 超轮次升级庞统。 + +详见 `topic3-challenge-review-proposal.md §5.3`。 + +### 5.4 评审详情 Schema + +评审结果写入 `{task_id}/reviews/{review_id}.json`,包含: +- `issues[]`(status/severity/category/location+context/suggestion) +- `evidence[]`(file_content/command_output) +- `positives[]`、`meta` +- category 含:correctness/security/performance/style/scope_deviation/**architecture/robustness** + +详见 `topic3-challenge-review-proposal.md §5.2`。 + +--- + +## 6. 经验沉淀 + +### 6.1 两级蒸馏 + +| 级别 | 触发时机 | 产出 | 存储 | +|------|---------|------|------| +| 一级(实时) | 任务完成后 | 经验条目(key findings/lessons/tags) | experiences 表 | +| 二级(周期) | 同 tag 积累 N 条 | Skill 草稿(draft→active→deprecated) | skills/ 目录 | + +### 6.2 经验注入 + +build_bootstrap() 按 tag 检索 experiences 表,格式化后注入 L2 上下文。 + +详见 `topic6-experience-loop-proposal.md`。 + +--- + +## 7. API 设计 + +### 7.1 黑板 API | 方法 | 路径 | 说明 | |------|------|------| -| GET | /api/blackboard/tasks | 任务列表(支持 ?status=&assignee= 过滤)| -| GET | /api/blackboard/tasks/{id} | 任务详情(含评论、产出、决策、观察)| -| POST | /api/blackboard/tasks | 创建任务 | -| POST | /api/blackboard/tasks/{id}/claim | 认领任务 | -| PATCH | /api/blackboard/tasks/{id}/status | 更新状态 | -| POST | /api/blackboard/tasks/{id}/comments | 添加评论 | -| POST | /api/blackboard/tasks/{id}/outputs | 写入产出 | -| POST | /api/blackboard/tasks/{id}/decisions | 记录决策 | -| POST | /api/blackboard/tasks/{id}/observations | 添加观察 | +| GET | `/api/projects/{pid}/tasks` | 任务列表(支持过滤) | +| GET | `/api/projects/{pid}/tasks/{id}` | 任务详情 | +| POST | `/api/projects/{pid}/tasks` | 创建任务 | +| POST | `/api/projects/{pid}/tasks/{id}/claim` | 认领 | +| PATCH | `/api/projects/{pid}/tasks/{id}/status` | 更新状态 | +| POST | `/api/projects/{pid}/tasks/{id}/comments` | 添加评论 | +| POST | `/api/projects/{pid}/tasks/{id}/outputs` | 写入产出 | +| POST | `/api/projects/{pid}/tasks/{id}/decisions` | 记录决策 | +| POST | `/api/projects/{pid}/tasks/{id}/observations` | 添加观察 | +| POST | `/api/projects/{pid}/tasks/{id}/reviews` | 提交评审 | -### 4.2 Daemon 控制 API(daemon_routes.py) +### 7.2 Daemon 控制 API | 方法 | 路径 | 说明 | |------|------|------| -| POST | /api/daemon/tick | 手动触发 tick | -| GET | /api/daemon/status | Daemon 状态(上次 tick、活跃 session 数)| -| GET | /api/daemon/sessions | 活跃 session 列表 | +| POST | `/api/daemon/tick` | 手动触发 tick | +| GET | `/api/daemon/status` | Daemon 状态 | +| GET | `/api/daemon/sessions` | 活跃 session | ---- +### 7.3 SSE 推送 -## 5. 与 v1.0 的关系 - -**完全独立,不共享代码。** - -| 维度 | v1.0 (sanguo_moziplus) | v2.0 (sanguo_moziplus_v2) | -|------|----------------------|--------------------------| -| 进程 | sanguo-moziplus (PM2) | sanguo-moziplus-v2 (PM2) | -| 端口 | 8082 | 8083 | -| 数据库 | moziplus.db | blackboard.db | -| 代码 | 独立 | 独立 | -| 编排 | DAG 状态机 | Shared Workspace | -| 通信 | Sanguo Mail | 黑板评论 | - -**v2 上线流程:** -1. v2 部署、测试、验证 -2. 确认 v2 功能覆盖 v1 -3. `pm2 stop sanguo-moziplus` — v1 下线 -4. `pm2 delete sanguo-moziplus` — 删除 v1 进程 -5. v2 接管端口 8082(可选) -6. v1 代码和数据库保留只读(历史归档) - ---- - -## 6. 测试策略 - -### 6.1 单元测试 - -| 测试文件 | 覆盖内容 | -|---------|---------| -| test_blackboard.py | 所有 operations 函数(claim CAS、状态机校验、评论、产出、决策)| -| test_daemon_tick.py | tick 循环逻辑(mention 处理、blocked 处理、cleanup)| -| test_spawner.py | spawn + cleanup(mock openclaw CLI)| -| test_health.py | stale reclaim + zombie detection | - -### 6.2 集成测试 - -- 启动 daemon → 创建任务 → spawn agent → agent 操作黑板 → 完成闭环 -- 竞态测试:两个 "agent" 同时 claim 同一任务 - -### 6.3 测试数据库 - -测试用 `:memory:` SQLite,不污染生产数据。 - -```python -# tests/conftest.py -import pytest -from src.blackboard.db import get_connection - -@pytest.fixture -def blackboard_db(): - """测试用内存数据库""" - conn = sqlite3.connect(":memory:") - conn.row_factory = sqlite3.Row - conn.executescript(SCHEMA_SQL) - yield conn - conn.close() +``` +GET /api/events?project={pid} +→ SSE 流,事件类型: + task_created / task_status_changed / review_submitted / rebuttal_added + notification(4级:🔴🟡🟢🔵) ``` ---- +### 7.4 多项目 API -## 7. 实现顺序(Phase 1 具体步骤) - -| 步骤 | 内容 | 依赖 | 估计时间 | -|------|------|------|---------| -| 1 | `blackboard/db.py` — schema + 连接管理 | 无 | 0.5h | -| 2 | `blackboard/operations.py` — 核心操作 | 步骤 1 | 2h | -| 3 | `blackboard/queries.py` — 查询+过滤 | 步骤 1 | 1h | -| 4 | `cli/blackboard.py` — CLI 工具 | 步骤 2,3 | 1h | -| 5 | 单元测试 test_blackboard.py | 步骤 2,3 | 1h | -| 6 | `daemon/spawner.py` — spawn+cleanup | 无 | 1h | -| 7 | `daemon/health.py` — 健康检查 | 步骤 1 | 0.5h | -| 8 | `daemon/ticker.py` — tick 循环 | 步骤 1,6,7 | 1.5h | -| 9 | `api/blackboard_routes.py` — HTTP API | 步骤 2,3 | 1h | -| 10 | `api/daemon_routes.py` — 控制API | 步骤 8 | 0.5h | -| 11 | main.py 集成 | 步骤 1-10 | 0.5h | -| 12 | E2E 测试 | 步骤 11 | 1h | -| **总计** | | | **~11h** | +| 方法 | 路径 | 说明 | +|------|------|------| +| GET | `/api/projects` | 项目列表 | +| POST | `/api/projects` | 创建项目 | +| GET | `/api/projects/{pid}` | 项目详情 | +| DELETE | `/api/projects/{pid}` | 归档项目(安全流程) | --- -## 8. 风险 +## 8. Hook 系统 + +双重 Hook 架构: + +| Hook | 所属 | 触发时机 | 用途 | +|------|------|---------|------| +| `agent_turn_prepare` | OpenClaw Plugin | Agent 接收消息前 | 注入铁律(L0) | +| `agent_turn_complete` | OpenClaw Plugin | Agent 完成后 | 触发 lint/test(工具链集成点) | +| `pre_task_claim` | moziplus HookRegistry | Agent claim 前 | 权限检查 | +| `post_output_write` | moziplus HookRegistry | 产出写入后 | 触发 Guardrail 检查 | +| `post_review_submit` | moziplus HookRegistry | 评审提交后 | 触发反驳权流程 | + +--- + +## 9. 前端架构 + +### 9.1 技术选型 + +复用 v1.0 前端框架(React + Vite + TypeScript),重设计页面结构。 + +### 9.2 页面结构(5页) + +| 页面 | 内容 | 来源 | +|------|------|------| +| **任务看板** | 任务卡片、Checkpoint 交互、状态流转 | 课题7+9 | +| **全局监控** | Agent 状态、系统健康、资源使用 | 课题9 | +| **产出档案** | 产出物预览/下载、评审详情 | 课题9 | +| **系统配置** | 项目管理、Agent 配置、Guardrail 编辑 | 课题9 | +| **AI Briefing** | 日报/周报自动生成 | 课题9 | + +### 9.3 实时推送 + +SSE 端点 `/api/events`,前端 EventSource 监听。4 级推送(🔴🟡🟢🔵)。降级:SSE 不可用时 30s 轮询。 + +### 9.4 构建部署 + +```bash +cd src/frontend && npm run build +# 产物在 src/frontend/dist/ +# FastAPI 直接托管静态文件(port 8083) +``` + +详见 `topic7-9-interaction-dashboard-proposal.md`。 + +--- + +## 10. 测试策略 + +### 10.1 单元测试 + +| 测试文件 | 覆盖 | +|---------|------| +| test_blackboard.py | operations 全函数(claim CAS、状态机、comment_type、review CRUD) | +| test_daemon_tick.py | tick 流程(Inbox 处理、依赖推进、反驳权进度检测) | +| test_dispatcher.py | 三级调度判据 + fallback | +| test_guardrail.py | L1 assert 执行 + L2 prompt 触发 | +| test_review_flow.py | 审查流水线(分级审查、反驳权、升级) | +| test_experience.py | 一级蒸馏 + tag 检索 | +| test_bootstrap.py | 四层上下文拼装(按 role 精确注入) | +| test_counter.py | ActiveAgentCounter 并发控制 | + +### 10.2 集成测试 + +- 创建任务 → 调度执行 → Guardrail → 审查 → 反驳 → 完成 闭环 +- 多项目并发:两个项目同时 tick,互不影响 +- 竞态测试:两个 Agent 同时 claim + 同时写 inbox + +### 10.3 测试数据库 + +per-project 测试用 `:memory:` SQLite。全局注册表测试用临时文件。 + +--- + +## 11. 实现顺序 + +| Phase | 内容 | 估计时间 | +|-------|------|---------| +| **P1** | 黑板 + Daemon + CLI + API + 多项目 | ~15h | +| **P2** | 审查流水线 + Guardrail + 反驳权 + build_bootstrap | ~12h | +| **P3** | 前端重设计 + SSE 推送 + AI Briefing | ~10h | +| **P4** | 经验沉淀 + Skill 进化 + 工具链集成 | ~8h | + +--- + +## 12. 风险 | 风险 | 缓解 | |------|------| -| asyncio tick 和 FastAPI 请求并发访问 SQLite | WAL 模式 + busy_timeout + BEGIN IMMEDIATE | -| Popen spawn 的子进程无人回收 | health.py 检测 + reclaim | -| 测试需要 mock openclaw CLI | 用 subprocess mock 或 test fixture | -| 前端需要适配黑板 API | Phase 1 先只做后端 + CLI,前端 Phase 2 | +| per-project SQLite 并发 tick | WAL + busy_timeout + per-project asyncio.Lock | +| Agent spawn 子进程回收 | health.py 僵尸检测 + reclaim | +| Inbox JSONL 并发写入 | truncate(不删除),实测 200 并发 0 丢失 | +| 前端 v1→v2 迁移 | v1 Tab 全保留,渐进替换 | +| Daemon 逻辑死循环 | 连续 N tick(默认 20)无变更 → observation 告警 + 通知用户 | +| 反驳权无限协商 | max_rounds 兜底 + 超轮次升级庞统 |