sanguo_moziplus_v2/docs/design/technical-design-v2.6.md
2026-05-15 02:05:48 +08:00


v2.6 Technical Design

Version: v2.6.1-tech (incorporates review fixes + standalone deployment) | Based on: architecture-v2.6.md | Author: 庞统 (Pang Tong, deputy strategist) | Date: 2026-05-15


1. Tech Stack

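| Component | Choice | Version | Rationale |
|---|---|---|---|
| Daemon process manager | PM2 | 6.0.14 | New standalone process `sanguo-moziplus-v2` |
| Daemon framework | Python + asyncio | 3.9+ | Standalone project in its own directory |
| Blackboard storage | SQLite | 3.51 | WAL mode; sufficient for a single host |
| HTTP API | FastAPI | separate install | Its own uvicorn instance on port 8083 |
| Agent scheduling | openclaw CLI | 2026.5.7 | `openclaw agent --session-id` already verified |
| Session cleanup | fcntl + JSON editing | OS-native | Verified to work |
| Git | gitee + gitea | dual remotes | Development directory → installation directory |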

No new dependencies. Every component is already present in the existing environment.


2. Project Structure

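This is a standalone project and is not embedded in moziplus. It is developed, deployed, and run entirely on its own. Once v2 goes live, v1 is retired as a whole.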

```text
sanguo_moziplus_v2/                    # 🆕 brand-new project directory
├── src/
│   ├── main.py                         # FastAPI entry point (standalone)
│   ├── blackboard/                     # Blackboard core module
│   │   ├── __init__.py
│   │   ├── db.py                       # SQLite connection management, schema init
│   │   ├── models.py                   # Data models
│   │   ├── operations.py               # Read/write operations
│   │   └── queries.py                  # Query operations
│   │
│   ├── daemon/                         # Daemon core module
│   │   ├── __init__.py
│   │   ├── ticker.py                   # Main tick-loop logic
│   │   ├── spawner.py                  # Agent spawning + session management
│   │   ├── health.py                   # Health checks
│   │   └── notifier.py                 # @mention parsing
│   │
│   ├── api/                            # HTTP API
│   │   ├── blackboard_routes.py        # Blackboard API
│   │   └── daemon_routes.py            # Daemon control API
│   │
│   └── cli/                            # CLI tools
│       ├── blackboard.py               # Blackboard CLI used by agents
│       └── daemon.py                   # Daemon control CLI
│
├── config/
│   └── default.yaml
│
├── artifacts/                          # Output artifacts
│
├── tests/
│   ├── unit/
│   └── e2e/
│
├── pyproject.toml
└── ecosystem.config.cjs                # PM2 configuration
```

Development vs. installation directories

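| | Path | Notes |
|---|---|---|
| Development | `~/.openclaw/sanguo_projects/sanguo_moziplus_v2/` | Managed by Git |
| Installation | `~/.sanguo_projects/sanguo_moziplus_v2/` | Run by PM2 |
| Git remotes | gitee + gitea | Same as existing projects |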

3. Detailed Module Design

3.1 blackboard/db.py: Database Management

```python
"""Blackboard SQLite database management"""
import os
import sqlite3
from pathlib import Path

DB_PATH = Path(os.environ.get(
    'BLACKBOARD_DB',
    os.path.expanduser('~/.sanguo_projects/sanguo_moziplus_v2/blackboard.db')
))

SCHEMA_SQL = """
-- Full schema: see architecture-v2.6.md §3.2
-- Executed here as CREATE TABLE IF NOT EXISTS statements
"""

def get_connection() -> sqlite3.Connection:
    """Open a connection with the settings every blackboard caller relies on."""
    conn = sqlite3.connect(str(DB_PATH))
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")    # persistent: readers no longer block on the writer
    conn.execute("PRAGMA foreign_keys=ON")     # per-connection: SQLite leaves it off by default
    conn.execute("PRAGMA busy_timeout=5000")   # per-connection: wait up to 5 s for a lock
    return conn

def init_db():
    """Initialize the database (called at startup)."""
    conn = get_connection()
    try:
        conn.executescript(SCHEMA_SQL)
        conn.commit()
    finally:
        conn.close()
```
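You can confirm that the connection settings take effect by opening a connection the same way `get_connection()` does and reading the PRAGMAs back. The sketch below assumes a throwaway database file in a temporary directory. `open_blackboard` is a hypothetical stand-in for `get_connection()` that takes the path as a parameter. `journal_mode=WAL` is stored in the database file. `foreign_keys` and `busy_timeout` are per-connection, which is why they have to be set on every connect.

```python
import os
import sqlite3
import tempfile

def open_blackboard(path: str) -> sqlite3.Connection:
    """Hypothetical stand-in for get_connection() with an explicit path."""
    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")    # persistent: stored in the DB file
    conn.execute("PRAGMA foreign_keys=ON")     # per-connection setting
    conn.execute("PRAGMA busy_timeout=5000")   # per-connection, in milliseconds
    return conn

def read_pragmas(conn: sqlite3.Connection) -> dict:
    """Read back the three settings the blackboard relies on."""
    return {
        "journal_mode": conn.execute("PRAGMA journal_mode").fetchone()[0],
        "foreign_keys": conn.execute("PRAGMA foreign_keys").fetchone()[0],
        "busy_timeout": conn.execute("PRAGMA busy_timeout").fetchone()[0],
    }

with tempfile.TemporaryDirectory() as tmp:
    conn = open_blackboard(os.path.join(tmp, "blackboard.db"))
    settings = read_pragmas(conn)
    conn.close()  # close before the temp directory is removed

print(settings)
```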

3.2 blackboard/operations.py: Core Operations

```python
"""Blackboard read/write operations"""
import json
import uuid

from .db import get_connection

def create_task(title: str, creator: str, description: str = None,
                task_type: str = None, assignee: str = None,
                depends_on: list = None, priority: int = 5,
                parent_task: str = None) -> dict:
    """Create a task (any agent may do this)."""
    task_id = f"task-{uuid.uuid4().hex[:8]}"
    conn = get_connection()
    try:
        # assignee is stored at creation so claim_task() can enforce pre-assignment
        conn.execute(
            """INSERT INTO tasks (id, title, description, status, assigned_by,
               assignee, task_type, priority, depends_on, parent_task)
               VALUES (?, ?, ?, 'pending', ?, ?, ?, ?, ?, ?)""",
            (task_id, title, description, creator, assignee, task_type, priority,
             json.dumps(depends_on or []), parent_task)
        )
        conn.execute(
            "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'task_created', ?)",
            (task_id, creator, json.dumps({'title': title}))
        )
        conn.commit()
        return {'id': task_id, 'status': 'pending'}
    finally:
        conn.close()

def claim_task(task_id: str, agent_id: str) -> bool:
    """Claim a task (atomic CAS; supports both pre-assigned and open claiming)."""
    conn = get_connection()
    try:
        # The conditional UPDATE is the CAS: only one concurrent caller can match status='pending'
        cursor = conn.execute(
            """UPDATE tasks SET status='claimed', assignee=?, claimed_at=datetime('now')
               WHERE id=? AND status='pending'
               AND (assignee IS NULL OR assignee=?)""",
            (agent_id, task_id, agent_id)
        )
        if cursor.rowcount > 0:
            conn.execute(
                "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'task_claimed', NULL)",
                (task_id, agent_id)
            )
            conn.commit()
            return True
        conn.rollback()
        return False
    finally:
        conn.close()

def update_task_status(task_id: str, new_status: str, agent_id: str, reason: str = None):
    """Update a task's status (validates that the transition is legal)."""
    VALID = {
        "pending": {"claimed", "cancelled"},
        "claimed": {"working", "pending", "cancelled"},
        "working": {"review", "blocked", "failed", "cancelled"},
        "review": {"done", "pending", "failed", "cancelled"},
        "blocked": {"pending", "cancelled"},
        "failed": {"pending"},
        "done": set(),
        "cancelled": set(),
    }
    conn = get_connection()
    try:
        # Take the write lock before the SELECT so the status we validate cannot
        # change before our UPDATE (read-then-write race under deferred transactions)
        conn.execute("BEGIN IMMEDIATE")
        row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
        if not row:
            raise ValueError(f"Task {task_id} not found")
        current = row['status']
        if new_status not in VALID.get(current, set()):
            raise ValueError(f"Invalid transition: {current} → {new_status}")

        conn.execute(
            "UPDATE tasks SET status=?, updated_at=datetime('now') WHERE id=?",
            (new_status, task_id)
        )
        # Lifecycle timestamps
        if new_status == 'working':
            conn.execute("UPDATE tasks SET started_at=datetime('now') WHERE id=?", (task_id,))
        elif new_status in ('done', 'failed', 'cancelled'):
            conn.execute("UPDATE tasks SET completed_at=datetime('now') WHERE id=?", (task_id,))

        conn.execute(
            "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, ?, ?)",
            (task_id, agent_id, f'task_{new_status}', json.dumps({'from': current, 'reason': reason}))
        )
        conn.commit()
    finally:
        conn.close()  # closing without commit rolls back, e.g. after a ValueError

def add_comment(task_id: str, author: str, body: str, mentions: list = None):
    """Add a comment."""
    conn = get_connection()
    try:
        conn.execute(
            "INSERT INTO comments (task_id, author, body, mentions) VALUES (?, ?, ?, ?)",
            (task_id, author, body, json.dumps(mentions or []))
        )
        conn.execute(
            "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'comment_added', ?)",
            (task_id, author, json.dumps({'body_preview': body[:100], 'mentions': mentions}))
        )
        conn.commit()
    finally:
        conn.close()

def write_output(task_id: str, agent: str, output_type: str, title: str,
                 content_path: str, summary: str, metadata: dict = None):
    """Record an output."""
    conn = get_connection()
    try:
        conn.execute("BEGIN IMMEDIATE")
        conn.execute(
            """INSERT INTO outputs (task_id, agent, output_type, title, content_path, summary, metadata)
               VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (task_id, agent, output_type, title, content_path, summary,
             json.dumps(metadata or {}))
        )
        conn.execute(
            "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'output_written', ?)",
            (task_id, agent, json.dumps({'title': title}))
        )
        conn.commit()
    finally:
        conn.close()

def record_decision(task_id: str, decider: str, decision: str,
                    rationale: str, alternatives: list = None):
    """Record a decision."""
    conn = get_connection()
    try:
        conn.execute(
            """INSERT INTO decisions (task_id, decider, decision, rationale, alternatives)
               VALUES (?, ?, ?, ?, ?)""",
            (task_id, decider, decision, rationale, json.dumps(alternatives or []))
        )
        conn.execute(
            "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'decision_recorded', ?)",
            (task_id, decider, json.dumps({'decision': decision}))
        )
        conn.commit()
    finally:
        conn.close()

def add_observation(task_id: str, observer: str, severity: str, body: str):
    """Add an observation."""
    conn = get_connection()
    try:
        conn.execute(
            "INSERT INTO observations (task_id, observer, severity, body) VALUES (?, ?, ?, ?)",
            (task_id, observer, severity, body)
        )
        conn.execute(
            "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, 'observation_added', ?)",
            (task_id, observer, json.dumps({'severity': severity}))
        )
        conn.commit()
    finally:
        conn.close()
```
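The transition table in `update_task_status()` is easy to get subtly wrong. For example, a state could end up with no path to a terminal state. The sketch below is a sanity check that could live in `test_blackboard.py`. It copies the `VALID` map from above and runs a breadth-first search to check three things: every target is a known state, every non-terminal state can still reach `done` or `cancelled`, and terminal states have no outgoing transitions. The helper name `reachable_from` is hypothetical.

```python
from collections import deque

# Copied from update_task_status(); the two must be kept in sync.
VALID = {
    "pending": {"claimed", "cancelled"},
    "claimed": {"working", "pending", "cancelled"},
    "working": {"review", "blocked", "failed", "cancelled"},
    "review": {"done", "pending", "failed", "cancelled"},
    "blocked": {"pending", "cancelled"},
    "failed": {"pending"},
    "done": set(),
    "cancelled": set(),
}
TERMINAL = {"done", "cancelled"}

def reachable_from(start: str) -> set:
    """Breadth-first search over VALID; returns every state reachable from start."""
    seen = {start}
    queue = deque([start])
    while queue:
        state = queue.popleft()
        for nxt in VALID.get(state, set()):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen

# Targets that are not themselves declared states (typos, renamed states).
unknown = {t for targets in VALID.values() for t in targets} - set(VALID)
# Non-terminal states that can never finish (tasks would be stuck forever).
stuck = [s for s in VALID if s not in TERMINAL and not (reachable_from(s) & TERMINAL)]
# Terminal states must not allow further transitions.
leaky = [s for s in TERMINAL if VALID[s]]

print(unknown, stuck, leaky)
```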

3.3 daemon/ticker.py: Tick Loop

```python
"""Daemon tick loop"""
import asyncio
import logging
from datetime import datetime
from typing import Optional

from blackboard.db import get_connection
from blackboard.queries import get_board_snapshot
from daemon.spawner import async_spawn_agent, cleanup_session, get_active_sessions
from daemon.health import reclaim_stale, detect_zombies

logger = logging.getLogger(__name__)

TICK_INTERVAL = 60  # seconds

class DaemonTicker:
    def __init__(self):
        self.running = False
        self.last_tick = None
        self._conn = None  # SQLite connection shared within one tick
        self._lock: Optional[asyncio.Lock] = None  # serializes scheduled and manual ticks

    async def start(self):
        """Start the tick loop."""
        self.running = True
        logger.info("Daemon ticker started")
        while self.running:
            try:
                await self.tick()
            except Exception as e:
                logger.error(f"Tick error: {e}", exc_info=True)
            self.last_tick = datetime.now()
            await asyncio.sleep(TICK_INTERVAL)

    async def stop(self):
        self.running = False

    async def manual_tick(self):
        """Manual trigger (called from the CLI or the API)."""
        await self.tick()

    async def tick(self):
        """Run a single tick. The lock stops a manual tick from overlapping a
        scheduled one; otherwise they would overwrite and close each other's self._conn."""
        if self._lock is None:
            self._lock = asyncio.Lock()  # created lazily inside the running event loop
        async with self._lock:
            logger.debug("Tick start")
            # Share one SQLite connection per tick (avoids WAL checkpoint jitter
            # caused by opening and closing connections repeatedly)
            self._conn = get_connection()
            try:
                # 1. Health checks
                reclaimed = reclaim_stale(self._conn)
                zombies = detect_zombies(self._conn)
                if reclaimed:
                    logger.info(f"Reclaimed {len(reclaimed)} stale tasks")
                if zombies:
                    logger.warning(f"Detected {len(zombies)} zombie sessions")

                # 2. Read the blackboard
                board = get_board_snapshot()

                # 3. Handle @mentions
                await self._process_mentions(board)

                # 4. Handle assigned-but-unclaimed tasks
                await self._process_unclaimed(board)

                # 5. Handle blocked tasks
                await self._process_blocked(board)

                # 6. Clean up finished sessions
                await self._cleanup_sessions()
            finally:
                self._conn.close()
                self._conn = None
            logger.debug("Tick done")

    async def _process_mentions(self, board):
        """Handle @mentions in comments."""
        for comment in board.get_unprocessed_mentions():
            for agent_id in comment.mentions:
                if not self._is_agent_active(agent_id):
                    await async_spawn_agent(
                        agent_id,
                        f"There is a new comment for you on the blackboard ({comment.task_id}). Please take a look."
                    )
            board.mark_comment_processed(comment.id)

    async def _process_unclaimed(self, board):
        """Handle tasks that have an assignee but have not been claimed."""
        for task in board.get_assigned_unclaimed():
            if not self._is_agent_active(task.assignee):
                await async_spawn_agent(
                    task.assignee,
                    f"A task has been assigned to you on the blackboard ({task.title}). Please review and claim it."
                )

    async def _process_blocked(self, board):
        """Handle blocked tasks."""
        for task in board.get_blocked_tasks():
            mentions = board.get_latest_mentions(task.id)
            if mentions:
                for agent_id in mentions:
                    if not self._is_agent_active(agent_id):
                        await async_spawn_agent(
                            agent_id,
                            f"Task {task.id} is blocked and needs your help."
                        )
            elif not self._is_agent_active('pangtong-fujunshi'):
                # No @mention: spawn 庞统 to decide (skipped if already running,
                # so a long-blocked task does not trigger a new spawn every tick)
                await async_spawn_agent(
                    'pangtong-fujunshi',
                    f"Task {task.id} is blocked with no designated helper. Please decide how to handle it."
                )

    async def _cleanup_sessions(self):
        """Clean up finished sessions."""
        for session in get_active_sessions():
            if session.is_completed():
                archive_dir = f"artifacts/{session.task_id}/archive/"
                cleanup_session(session.agent_id, session.session_id, archive_dir)

    def _is_agent_active(self, agent_id: str) -> bool:
        """Return True if the agent has a running session."""
        # Reuse the tick's shared connection when called inside a tick
        owns_conn = self._conn is None
        conn = get_connection() if owns_conn else self._conn
        try:
            row = conn.execute(
                "SELECT current_status FROM agents WHERE agent_id=?", (agent_id,)
            ).fetchone()
            return row is not None and row['current_status'] == 'working'
        finally:
            if owns_conn:
                conn.close()
```
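`ticker.py` imports `async_spawn_agent`, but §3.4 only shows the synchronous `spawn_agent()`. That function's `time.sleep(0.1)` fail-fast check would stall the event loop, along with every concurrent FastAPI request, if a coroutine called it directly. Assuming Python 3.9+ as listed in §1, one way to bridge the two is to run the blocking call in a worker thread with `asyncio.to_thread`. The sketch below is minimal: its `spawn_agent` body is a hypothetical stand-in that only simulates the 100 ms check.

```python
import asyncio
import time
import uuid

def spawn_agent(agent_id: str, message: str) -> str:
    """Hypothetical stand-in for daemon.spawner.spawn_agent (blocking)."""
    time.sleep(0.1)  # simulates the 100 ms fail-fast check
    return str(uuid.uuid4())

async def async_spawn_agent(agent_id: str, message: str) -> str:
    """Run the blocking spawn in a worker thread so the event loop stays responsive."""
    return await asyncio.to_thread(spawn_agent, agent_id, message)

async def main() -> list:
    # The three spawns overlap in worker threads instead of running back to back.
    start = time.monotonic()
    ids = await asyncio.gather(
        *(async_spawn_agent(f"agent-{i}", "hello") for i in range(3))
    )
    print(f"{len(ids)} sessions in {time.monotonic() - start:.2f}s")
    return list(ids)

session_ids = asyncio.run(main())
```

With this approach, the SQLite writes inside `spawn_agent()` also run on a worker thread. That is fine as long as each helper opens its own connection through `get_connection()` rather than reusing the tick's shared one.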

3.4 daemon/spawner.py: Agent Scheduling

```python
"""Agent spawning and session management"""
import fcntl
import json
import logging
import os
import shutil
import subprocess
import tempfile
import time
import uuid

logger = logging.getLogger(__name__)

AGENTS_DIR = os.path.expanduser("~/.openclaw/agents")

def spawn_agent(agent_id: str, message: str) -> str:
    """Spawn an isolated session without waiting for it to finish.

    Note: the current implementation is synchronous (Popen + SQLite writes).
    That is acceptable at the 6-agent scale (each SQLite write takes <1 ms).
    When scaling up, switch to asyncio.create_subprocess_exec + async SQLite.
    """
    session_id = str(uuid.uuid4())
    cmd = [
        "openclaw", "agent",
        "--agent", agent_id,
        "--session-id", session_id,
        "--message", message,
        "--json"
    ]
    logger.info(f"Spawning {agent_id} session={session_id[:8]}")
    # Send output to an anonymous temp file instead of PIPE: nothing reads the pipes
    # after spawn, so a chatty child would fill the pipe buffer (64 KiB on Linux) and hang.
    out = tempfile.TemporaryFile()
    proc = subprocess.Popen(cmd, stdout=out, stderr=subprocess.STDOUT)

    # Fail fast: exiting within 100 ms counts as a spawn failure
    time.sleep(0.1)
    if proc.poll() is not None:
        out.seek(0)
        output = out.read().decode(errors="replace")
        out.close()
        logger.error(f"Spawn failed immediately: {output[:500]}")
        raise RuntimeError(f"Spawn failed: {output[:200]}")
    out.close()  # the child keeps its own file descriptor

    # Update the agents table
    _update_agent_status(agent_id, 'working')

    # Record session metadata (used for later cleanup)
    _register_session(agent_id, session_id)

    return session_id

def cleanup_session(agent_id: str, session_id: str, archive_dir: str):
    """Archive the session's jsonl files, then edit sessions.json under a file lock."""
    sessions_dir = f"{AGENTS_DIR}/{agent_id}/sessions"
    store_path = f"{sessions_dir}/sessions.json"
    lock_path = f"{sessions_dir}/.cleanup.lock"

    # 1. Archive the jsonl files
    os.makedirs(archive_dir, exist_ok=True)
    for ext in ['.jsonl', '.trajectory.jsonl', '.trajectory-path.json']:
        src = f"{sessions_dir}/{session_id}{ext}"
        if os.path.exists(src):
            shutil.move(src, f"{archive_dir}/{session_id}{ext}")

    # 2. Edit sessions.json while holding the file lock
    with open(lock_path, 'w') as lock_file:
        fcntl.flock(lock_file, fcntl.LOCK_EX)
        try:
            with open(store_path) as f:
                store = json.load(f)
            keys_to_remove = [k for k in store if session_id in k]
            for k in keys_to_remove:
                del store[k]
            # Write a temp file and swap it in atomically, so a crash cannot truncate the store
            tmp_path = f"{store_path}.tmp"
            with open(tmp_path, 'w') as f:
                json.dump(store, f, indent=2)
            os.replace(tmp_path, store_path)
        finally:
            fcntl.flock(lock_file, fcntl.LOCK_UN)
            # Do not unlink lock_path: a process already blocked in flock() would then
            # hold a lock on a deleted inode while a newcomer locks a fresh file.

    # 3. Update the agents table
    _update_agent_status(agent_id, 'idle')

    logger.info(f"Cleaned up {agent_id} session={session_id[:8]}")
```
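The locking pattern in `cleanup_session()` can be exercised on its own. The sketch below assumes a POSIX system (`fcntl` is not available on Windows). It also assumes that `sessions.json` keys contain the session id, as the key filter above implies. The store is rewritten through a temporary file and swapped in with `os.replace`, so a crash mid-write never leaves a truncated file. The lock file is deliberately left on disk. The lock only excludes writers that take the same lock; whether openclaw itself honors it is outside this sketch. `remove_session_keys` and the sample key format are hypothetical.

```python
import fcntl
import json
import os
import tempfile

def remove_session_keys(sessions_dir: str, session_id: str) -> list:
    """Remove every sessions.json entry whose key contains session_id; return removed keys."""
    store_path = os.path.join(sessions_dir, "sessions.json")
    lock_path = os.path.join(sessions_dir, ".cleanup.lock")
    with open(lock_path, "w") as lock_file:
        fcntl.flock(lock_file, fcntl.LOCK_EX)  # blocks until this process owns the lock
        try:
            with open(store_path) as f:
                store = json.load(f)
            removed = [k for k in store if session_id in k]
            for k in removed:
                del store[k]
            # Write to a temp file in the same directory, then atomically replace.
            tmp_path = store_path + ".tmp"
            with open(tmp_path, "w") as f:
                json.dump(store, f, indent=2)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, store_path)
        finally:
            fcntl.flock(lock_file, fcntl.LOCK_UN)  # the lock file itself is kept
    return removed

with tempfile.TemporaryDirectory() as d:
    sample = {"agent:main:abc123": {}, "agent:main:def456": {}}  # hypothetical key format
    with open(os.path.join(d, "sessions.json"), "w") as f:
        json.dump(sample, f)
    removed = remove_session_keys(d, "abc123")
    with open(os.path.join(d, "sessions.json")) as f:
        remaining = json.load(f)

print(removed, list(remaining))
```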

3.5 CLI Tools

blackboard.py: invoked by agents via exec:

```python
#!/usr/bin/env python3
"""Blackboard CLI, invoked by agents via exec"""
import argparse
import json
import os
import sys

# Import from the installation directory via an absolute path
sys.path.insert(0, os.path.expanduser("~/.sanguo_projects/sanguo_moziplus_v2"))

from src.blackboard.operations import (
    create_task, claim_task, update_task_status,
    add_comment, write_output, record_decision, add_observation
)
from src.blackboard.queries import read_board

def main():
    parser = argparse.ArgumentParser(description='Blackboard CLI')
    sub = parser.add_subparsers(dest='command', required=True)

    # read
    r = sub.add_parser('read')
    r.add_argument('--task', required=True)
    r.add_argument('--agent', default=None)
    r.add_argument('--last', type=int, default=None)
    r.add_argument('--type', choices=['comments', 'outputs', 'decisions', 'observations', 'all'], default='all')

    # claim
    c = sub.add_parser('claim')
    c.add_argument('--task', required=True)
    c.add_argument('--agent', required=True)

    # output
    o = sub.add_parser('output')
    o.add_argument('--task', required=True)
    o.add_argument('--agent', required=True)
    o.add_argument('--type', required=True)
    o.add_argument('--title', required=True)
    o.add_argument('--path', required=True)
    o.add_argument('--summary', required=True)

    # comment
    cm = sub.add_parser('comment')
    cm.add_argument('--task', required=True)
    cm.add_argument('--author', required=True)
    cm.add_argument('--body', required=True)
    cm.add_argument('--mentions', default='[]')  # JSON list of agent ids

    # decide
    d = sub.add_parser('decide')
    d.add_argument('--task', required=True)
    d.add_argument('--decider', required=True)
    d.add_argument('--decision', required=True)
    d.add_argument('--rationale', required=True)

    # observe
    ob = sub.add_parser('observe')
    ob.add_argument('--task', required=True)
    ob.add_argument('--observer', required=True)
    ob.add_argument('--severity', required=True, choices=['blocking', 'warning', 'info', 'audit'])
    ob.add_argument('--body', required=True)

    # create
    cr = sub.add_parser('create')
    cr.add_argument('--title', required=True)
    cr.add_argument('--creator', required=True)
    cr.add_argument('--description', default=None)
    cr.add_argument('--task-type', default=None)
    cr.add_argument('--assignee', default=None)

    args = parser.parse_args()
    # ... route to the matching operation function and print the result as JSON

if __name__ == '__main__':
    main()
```
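The routing step elided at the end of `main()` could be a dispatch table from subcommand to handler. Each handler prints one JSON object, so agents can parse the result uniformly. This is a minimal sketch: the stub handlers stand in for the real `operations` functions and only `claim` and `comment` are wired up. The exit-code convention (0 on success, 1 on a lost claim or an error) is an assumption.

```python
import argparse
import json

# Hypothetical stubs standing in for src.blackboard.operations.
def claim_task(task_id: str, agent_id: str) -> bool:
    return task_id == "task-0001"  # pretend only this task is still pending

def add_comment(task_id: str, author: str, body: str, mentions: list = None) -> None:
    pass

def handle_claim(args: argparse.Namespace) -> dict:
    return {"claimed": claim_task(args.task, args.agent)}

def handle_comment(args: argparse.Namespace) -> dict:
    add_comment(args.task, args.author, args.body, json.loads(args.mentions))
    return {"ok": True}

HANDLERS = {"claim": handle_claim, "comment": handle_comment}

def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Blackboard CLI")
    sub = parser.add_subparsers(dest="command", required=True)
    c = sub.add_parser("claim")
    c.add_argument("--task", required=True)
    c.add_argument("--agent", required=True)
    cm = sub.add_parser("comment")
    cm.add_argument("--task", required=True)
    cm.add_argument("--author", required=True)
    cm.add_argument("--body", required=True)
    cm.add_argument("--mentions", default="[]")  # JSON list of agent ids
    return parser

def run(argv: list) -> int:
    """Parse argv, dispatch, print one JSON object; return the process exit code."""
    args = build_parser().parse_args(argv)
    try:
        result = HANDLERS[args.command](args)
    except ValueError as e:  # also covers json.JSONDecodeError
        result = {"error": str(e)}
    print(json.dumps(result, ensure_ascii=False))
    failed = "error" in result or result.get("claimed") is False
    return 1 if failed else 0

# Example invocations, as an agent would issue them through exec
# (the real script would end with: sys.exit(run(sys.argv[1:]))).
exit_ok = run(["claim", "--task", "task-0001", "--agent", "agent-a"])
exit_taken = run(["claim", "--task", "task-9999", "--agent", "agent-a"])
exit_bad_json = run(["comment", "--task", "task-0001", "--author", "agent-a",
                     "--body", "done", "--mentions", "not-json"])
```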

daemon.py: Daemon control CLI

```python
#!/usr/bin/env python3
"""Daemon control CLI"""
# Commands:
#   daemon.py tick      - trigger one tick manually
#   daemon.py status    - show daemon status
#   daemon.py board     - show a blackboard summary
```
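One possible shape for `daemon.py` assumes it talks to the running v2 process through the control API in §4.2 (port 8083 from §1) rather than importing `DaemonTicker` directly. That way a manual tick runs inside the daemon's own event loop and goes through its lock. The `board` command and the response formats are not specified in this document, so the sketch maps only `tick` and `status`. `BASE_URL`, `ROUTES`, `build_request`, and `call` are hypothetical names.

```python
import json
import urllib.request

BASE_URL = "http://127.0.0.1:8083"  # assumed: the v2 API listens locally on 8083

ROUTES = {
    "tick": ("POST", "/api/daemon/tick"),
    "status": ("GET", "/api/daemon/status"),
}

def build_request(command: str) -> urllib.request.Request:
    """Map a CLI command to an HTTP request against the daemon control API."""
    method, path = ROUTES[command]
    data = b"" if method == "POST" else None  # empty body for POST
    return urllib.request.Request(BASE_URL + path, data=data, method=method)

def call(command: str, timeout: float = 10.0) -> dict:
    """Send the request and decode the JSON response body."""
    with urllib.request.urlopen(build_request(command), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))

# Usage (requires the daemon to be running):
#   print(json.dumps(call("status"), ensure_ascii=False, indent=2))
```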

4. API Route Design

4.1 Blackboard API (blackboard_routes.py)

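| Method | Path | Description |
|---|---|---|
| GET | /api/blackboard/tasks | Task list (supports `?status=&assignee=` filters) |
| GET | /api/blackboard/tasks/{id} | Task details (including comments, outputs, decisions, observations) |
| POST | /api/blackboard/tasks | Create a task |
| POST | /api/blackboard/tasks/{id}/claim | Claim a task |
| PATCH | /api/blackboard/tasks/{id}/status | Update status |
| POST | /api/blackboard/tasks/{id}/comments | Add a comment |
| POST | /api/blackboard/tasks/{id}/outputs | Record an output |
| POST | /api/blackboard/tasks/{id}/decisions | Record a decision |
| POST | /api/blackboard/tasks/{id}/observations | Add an observation |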
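`GET /api/blackboard/tasks` accepts optional `status` and `assignee` filters. The sketch below shows one way the query layer might build the `WHERE` clause from whichever filters are present. Every value stays a bound parameter and is never formatted into the SQL string. The three-column table and the `list_tasks` name are hypothetical; the real schema is in architecture-v2.6.md §3.2.

```python
import sqlite3
from typing import Optional

def list_tasks(conn: sqlite3.Connection, status: Optional[str] = None,
               assignee: Optional[str] = None) -> list:
    """Return task ids matching the optional filters (None means no filter)."""
    clauses, params = [], []
    if status is not None:
        clauses.append("status = ?")
        params.append(status)
    if assignee is not None:
        clauses.append("assignee = ?")
        params.append(assignee)
    sql = "SELECT id FROM tasks"
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)  # only fixed column names are joined in
    sql += " ORDER BY id"
    return [row[0] for row in conn.execute(sql, params)]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id TEXT PRIMARY KEY, status TEXT, assignee TEXT)")
conn.executemany("INSERT INTO tasks VALUES (?, ?, ?)", [
    ("task-1", "pending", None),
    ("task-2", "working", "agent-a"),
    ("task-3", "pending", "agent-a"),
])
all_ids = list_tasks(conn)
pending = list_tasks(conn, status="pending")
mine_pending = list_tasks(conn, status="pending", assignee="agent-a")
```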

4.2 Daemon Control API (daemon_routes.py)

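| Method | Path | Description |
|---|---|---|
| POST | /api/daemon/tick | Trigger a tick manually |
| GET | /api/daemon/status | Daemon status (last tick, number of active sessions) |
| GET | /api/daemon/sessions | List of active sessions |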

5. Relationship to v1.0

Fully independent: no code is shared.

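| Dimension | v1.0 (sanguo_moziplus) | v2.0 (sanguo_moziplus_v2) |
|---|---|---|
| Process | sanguo-moziplus (PM2) | sanguo-moziplus-v2 (PM2) |
| Port | 8082 | 8083 |
| Database | moziplus.db | blackboard.db |
| Code | Independent | Independent |
| Orchestration | DAG state machine | Shared Workspace |
| Communication | Sanguo Mail | Blackboard comments |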

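v2 rollout procedure:

  1. Deploy, test, and validate v2
  2. Confirm that v2's functionality covers v1
  3. `pm2 stop sanguo-moziplus`: take v1 offline
  4. `pm2 delete sanguo-moziplus`: remove the v1 process
  5. v2 takes over port 8082 (optional)
  6. Keep the v1 code and database read-only (historical archive)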

6. Test Strategy

6.1 Unit Tests

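| Test file | Coverage |
|---|---|
| test_blackboard.py | All `operations` functions (claim CAS, state-machine validation, comments, outputs, decisions) |
| test_daemon_tick.py | Tick-loop logic (mention handling, blocked handling, cleanup) |
| test_spawner.py | spawn + cleanup (with a mocked openclaw CLI) |
| test_health.py | Stale-task reclaim + zombie detection |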

6.2 Integration Tests

  • Start daemon → create task → spawn agent → agent operates on the blackboard → full loop completes
  • Race test: two "agents" claim the same task at the same time

6.3 Test Database

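Tests use a throwaway SQLite file under pytest's `tmp_path`, so production data is never touched. A plain `:memory:` database does not work here. Every operation opens its own connection through `get_connection()`, and each new `:memory:` connection is a separate, empty database.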

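```python
# tests/conftest.py
import pytest

from src.blackboard import db


@pytest.fixture
def blackboard_db(tmp_path, monkeypatch):
    """Point the blackboard at a temporary database file for one test."""
    # get_connection() reads the module-level DB_PATH on every call, so patching it
    # redirects every operation (each of which opens its own connection)
    monkeypatch.setattr(db, "DB_PATH", tmp_path / "blackboard.db")
    db.init_db()
    conn = db.get_connection()
    yield conn
    conn.close()
```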

7. Implementation Order (Phase 1 Steps)

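| Step | Content | Depends on | Estimate |
|---|---|---|---|
| 1 | `blackboard/db.py`: schema + connection management | none | 0.5h |
| 2 | `blackboard/operations.py`: core operations | step 1 | 2h |
| 3 | `blackboard/queries.py`: queries + filtering | step 1 | 1h |
| 4 | `cli/blackboard.py`: CLI tool | steps 2, 3 | 1h |
| 5 | Unit tests `test_blackboard.py` | steps 2, 3 | 1h |
| 6 | `daemon/spawner.py`: spawn + cleanup | none | 1h |
| 7 | `daemon/health.py`: health checks | step 1 | 0.5h |
| 8 | `daemon/ticker.py`: tick loop | steps 1, 6, 7 | 1.5h |
| 9 | `api/blackboard_routes.py`: HTTP API | steps 2, 3 | 1h |
| 10 | `api/daemon_routes.py`: control API | step 8 | 0.5h |
| 11 | `main.py` integration | steps 1-10 | 0.5h |
| 12 | E2E tests | step 11 | 1h |
| | Total | | ~11.5h |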
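The dependency and estimate columns can be checked mechanically. The sketch below uses the standard-library `graphlib` module (Python 3.9+) with the table transcribed by hand. It verifies that the dependencies are acyclic, produces one valid execution order, and sums the estimates, which come to 11.5 h. It also computes the longest dependency chain, which only matters if steps can run in parallel.

```python
from graphlib import TopologicalSorter

# (hours, dependencies) per step, transcribed from the table above.
PLAN = {
    1: (0.5, []),
    2: (2.0, [1]),
    3: (1.0, [1]),
    4: (1.0, [2, 3]),
    5: (1.0, [2, 3]),
    6: (1.0, []),
    7: (0.5, [1]),
    8: (1.5, [1, 6, 7]),
    9: (1.0, [2, 3]),
    10: (0.5, [8]),
    11: (0.5, list(range(1, 11))),
    12: (1.0, [11]),
}

sorter = TopologicalSorter({step: deps for step, (_, deps) in PLAN.items()})
order = list(sorter.static_order())  # raises graphlib.CycleError if dependencies loop
total_hours = sum(hours for hours, _ in PLAN.values())

# Earliest finish time per step; the maximum is the critical path length.
finish = {}
for step in order:
    hours, deps = PLAN[step]
    finish[step] = hours + max((finish[d] for d in deps), default=0.0)
critical_path_hours = max(finish.values())

print(order, total_hours, critical_path_hours)
```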

8. Risks

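| Risk | Mitigation |
|---|---|
| The asyncio tick and FastAPI requests access SQLite concurrently | WAL mode + busy_timeout + BEGIN IMMEDIATE |
| Child processes started with Popen are never reaped | Detection + reclaim in health.py |
| Tests need to mock the openclaw CLI | A subprocess mock or a test fixture |
| The frontend must be adapted to the blackboard API | Phase 1 covers only backend + CLI; frontend follows in Phase 2 |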
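On the second risk: `spawn_agent()` never waits on its `Popen` handle, so each finished agent process stays a zombie until something reaps it. One possible shape for the `health.py` side is to call `poll()` on each child once per tick, since `poll()` reaps a child as soon as it has exited. This assumes the daemon keeps the `Popen` objects in a dict keyed by session id, which is a hypothetical registry: §3.4 does not show where `_register_session()` stores them. The child commands below are stand-ins for `openclaw agent`.

```python
import subprocess
import sys
import time

def reap_finished(procs: dict) -> dict:
    """Poll every child without blocking; remove and report the ones that have exited."""
    finished = {}
    for session_id, proc in list(procs.items()):
        code = proc.poll()  # reaps the child (no zombie left) once it has exited
        if code is not None:
            finished[session_id] = code
            del procs[session_id]
    return finished

# Stand-ins for `openclaw agent` children: one exits at once with code 3, one keeps running.
procs = {
    "sess-quick": subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(3)"]),
    "sess-slow": subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"]),
}

# A real daemon would call reap_finished() once per tick; here we poll briefly.
done = {}
deadline = time.monotonic() + 10
while "sess-quick" not in done and time.monotonic() < deadline:
    done.update(reap_finished(procs))
    time.sleep(0.05)
still_running = sorted(procs)

procs["sess-slow"].kill()  # demo cleanup only
procs["sess-slow"].wait()
```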