Files
sanguo_moziplus_v2/docs/design/topic11-multi-project-proposal.md
T
2026-05-16 13:39:32 +08:00

22 KiB
Raw Blame History

课题11 设计方案:用户级多项目支持

日期: 2026-05-16 作者: 庞统(副军师)🐦 状态: v2(并发调度模型重设计,待评审) 前置: 课题1-4、课题6 已完成设计 变更: v2 新增 §5.4 并发调度模型(per-project 线程 + 全局资源信号量),替代原串行 tick


1. 核心问题

用户同时有多个工作域(量化策略A + 平台开发 + 数据研究),需要项目级隔离——不同项目的任务、配置、产出互不干扰,但共享同一套 Agent 团队和 Daemon 基础设施。

2. 需要隔离什么

隔离项 原因 隔离方式
黑板数据(tasks/comments/outputs/decisions/observations/events/agents/task_attempts/reviews/experiences/experience_tags 不同项目的任务不能混在一起 独立 SQLite 文件
配置(guardrails.yaml / prompt_templates / project_context.yaml 不同项目可能有不同的审查规则、上下文 项目级 config/ 目录,覆盖全局默认
产出文件 不同项目的代码/数据物理隔离 项目级 outputs/ 目录
Agent session 同一 Agent 参与不同项目时上下文不串 OpenClaw --session-id 已有隔离
Daemon 连接 不同数据库连接不能混淆 连接池 project_id → Connection 映射

不需要隔离的

  • Agent 注册表(agents.yaml)—— 同一套 Agent 团队服务所有项目
  • Daemon 进程 —— 单进程管理所有项目
  • Schema 定义 —— 所有项目共享同一套表结构
  • 全局 prompt_templates —— 项目级覆盖,不是替换

3. 方案选择

3.1 三个方案对比

方案 做法 优点 缺点
A. 多实例 每个项目独立 Daemon + 独立 SQLite + 独立端口 完全隔离、互不影响 资源开销翻倍、管理复杂、Agent 重复注册
B. 单实例多命名空间 一个 SQLite,所有表加 project_id 零额外资源 每个查询带 WHERE、单文件性能上限、删除项目危险
C. 单 Daemon 多数据库 一个 Daemon,每个项目一个 SQLite 文件 物理隔离数据、共享 Daemon Daemon 需管理多连接

3.2 选择方案 C

理由:

  1. 方案 A 不适合——Mac mini 资源有限,每多一个项目就多一套 Daemon + PM2 进程 + 端口。6 个 Agent 跑在 OpenClaw 上已经固定开销,不需要重复。
  2. 方案 B 不够安全——WHERE project_id = ? 容易漏,SQLite 单文件多项目并发有 WAL 锁瓶颈,删除项目 = 跨所有表 DELETE。
  3. 方案 C 是最优点——数据物理隔离(每个 .db 文件独立),但共享 Daemon 进程和 Agent 注册表。Daemon 切换项目只是切换 SQLite 连接,无额外资源开销。

优秀实践验证

  • WanmanPer-Agent Worktree + $HOME 严格隔离 → 验证"物理隔离比逻辑隔离可靠"
  • ClawTeamGit Worktree 隔离 + fcntl 文件锁 → 验证"共享进程 + 独立存储"模式可行
  • ClineKanban + Worktree → 验证"多任务并行 + 物理隔离"是主流
  • Hermes:单 Dispatcher + 单 SQLite → Hermes 是单项目设计,我们没有"多项目用单数据库"的先例

4. 目录结构

~/.sanguo_projects/moziplus_v2/
├── daemon.py                          # 单 Daemon 进程
├── daemon.yaml                        # Daemon 全局配置(端口、tick 间隔等)
├── projects/
│   ├── _registry.yaml                 # 项目注册表(所有项目的元数据)
│   ├── quant-momentum/                # 项目 1
│   │   ├── blackboard.db              # 独立 SQLite
│   │   ├── config/
│   │   │   ├── project.yaml           # 项目元信息(名称、描述、创建时间)
│   │   │   ├── guardrails.yaml        # 项目级审查规则(覆盖全局默认)
│   │   │   ├── project_context.yaml   # 项目背景知识(注入 L2)
│   │   │   └── prompt_overrides/      # 可选:覆盖默认 prompt 模板
│   │   └── outputs/                   # 项目产出目录
│   ├── quant-pairs/                   # 项目 2
│   │   ├── blackboard.db
│   │   ├── config/
│   │   └── outputs/
│   └── moziplus-dev/                  # 项目 3(自身开发)
│       ├── blackboard.db
│       ├── config/
│       └── outputs/
└── shared/
    ├── prompt_templates/              # 全局默认模板
    ├── schemas/                       # 全局 Schema
    └── agents.yaml                    # 全局 Agent 注册表

4.1 项目注册表(_registry.yaml

# projects/_registry.yaml
default_project: quant-momentum
projects:
  quant-momentum:
    display_name: "动量因子策略"
    description: "基于动量因子的量化策略研发"
    created_at: "2026-05-16T10:00:00Z"
    status: active          # active / archived
    agents: [zhangfei-dev, zhaoyun-data, guanyu-dev]  # 该项目可用的 Agent
  quant-pairs:
    display_name: "配对交易策略"
    description: "统计套利配对交易研究"
    created_at: "2026-05-16T10:00:00Z"
    status: active
    agents: [zhangfei-dev, zhaoyun-data]
  moziplus-dev:
    display_name: "平台开发"
    description: "moziplus 自身开发"
    created_at: "2026-05-16T10:00:00Z"
    status: active
    agents: [zhangfei-dev, simayi-challenger]

5. Daemon 变更

5.1 多连接池

# daemon 内部
class ProjectManager:
    def __init__(self, projects_dir: Path):
        self.projects_dir = projects_dir
        self._connections: dict[str, sqlite3.Connection] = {}
        self._configs: dict[str, ProjectConfig] = {}
        self._load_registry()

    def get_connection(self, project_id: str) -> sqlite3.Connection:
        if project_id not in self._connections:
            db_path = self.projects_dir / project_id / "blackboard.db"
            self._connections[project_id] = sqlite_connect(db_path)
        return self._connections[project_id]

    def get_config(self, project_id: str) -> ProjectConfig:
        if project_id not in self._configs:
            config_path = self.projects_dir / project_id / "config" / "project.yaml"
            self._configs[project_id] = ProjectConfig.load(config_path)
        return self._configs[project_id]

    def load_guardrails(self, project_id: str) -> dict:
        """项目级 guardrails.yaml 覆盖全局默认"""
        global_guardrails = load_yaml("shared/guardrails.yaml")
        project_guardrails_path = self.projects_dir / project_id / "config" / "guardrails.yaml"
        if project_guardrails_path.exists():
            project_guardrails = load_yaml(project_guardrails_path)
            return deep_merge(global_guardrails, project_guardrails)  # 项目级覆盖
        return global_guardrails

5.2 Tick 逻辑变更(已废弃,见 §5.4

原设计:Daemon 主循环串行遍历所有项目 tick。每个项目 tick 完再 tick 下一个。 问题:所有项目/任务一起排队,项目 A 的长任务阻塞项目 B。 新设计:见 §5.4 per-project 并发调度。

5.3 Daemon 逻辑健康自检

# §14 风险缓解:连续 N tick 无状态变更则告警
STALE_TICK_THRESHOLD = 20

class DaemonHealth:
    def __init__(self):
        self._tick_state_changes: dict[str, int] = {}  # project_id → 连续无变更 tick 数

    def record_change(self, project_id: str):
        self._tick_state_changes[project_id] = 0

    def check_stale(self, project_id: str) -> bool:
        self._tick_state_changes.setdefault(project_id, 0)
        self._tick_state_changes[project_id] += 1
        return self._tick_state_changes[project_id] >= STALE_TICK_THRESHOLD

5.4 并发调度模型(v2 新增)

5.4.1 问题

原设计中 Daemon 主循环串行 tick 所有项目:

Tick → Project A30s)→ Project B(等A完成)→ Project C(等B完成)

问题:

  1. 项目间互相阻塞——Project A 有一个长任务在执行,Project B 的独立任务必须等
  2. 响应延迟——3 个项目 tick 一次可能要 90s+Project C 要等 60s 才被检查
  3. 不符合业界实践——调研 7 个项目(Hermes/open-multi-agent/Wanman/Google ADK/Microsoft AutoGen/AgentScope/GSD),没有一个用全局串行排队

5.4.2 业界并发模型调研

项目 并发模型 核心机制
open-multi-agent AgentPool + Semaphore 全局 maxConcurrency=5per-agent 互斥锁,Promise.allSettled 并行执行独立任务
Wanman per-agent 进程 每个 Agent 独立进程+独立 runLoopSupervisor 通过消息总线协调
Google ADK asyncio.TaskGroup ParallelAgentTaskGroup 并行执行子 Agent
Microsoft AutoGen Pregel Superstep 每个 superstep 内所有激活 Executor 并行执行
Hermes 单线程 tick + flock 单项目设计tick 内只有几个 cron job,不需要并发

关键发现open-multi-agent 的 AgentPool + Semaphore + per-agent Lock 是最成熟、最可借鉴的模型。

5.4.3 设计:per-project 线程 + 全局资源信号量

Daemon 主进程(轻量路由器 + 资源管控)
│
├── 全局 LLM Semaphoremax_concurrent=3
├── per-agent Lock(张飞不能同时在两个项目里跑)
│
├── ProjectSlot A(独立线程)
│   └── 自己的 SQLite 连接
│   └── 自己的 tick 循环(30s
│   └── spawn Agent 时:acquire agent_lock → acquire llm_semaphore
│
├── ProjectSlot B(独立线程)
│   └── (同上)
│
└── ProjectSlot C(独立线程)
    └── (同上)

三层资源控制

层级 控制对象 机制 原因
L1: 项目隔离 SQLite 连接 per-project 独立连接 数据物理隔离,无竞争
L2: Agent 互斥 同一 Agent 不能并行 threading.Lock per-agent Agent session 不是线程安全的,张飞同一时刻只能服务一个任务
L3: 全局资源 LLM API 调用并发 threading.Semaphore(max_concurrent) 防止 API 限流、控制成本

5.4.4 核心代码

import threading
import time
from pathlib import Path


class Daemon:
    """单进程 Daemonper-project 线程并发。"""

    def __init__(self, config: DaemonConfig):
        self.config = config
        self.llm_semaphore = threading.Semaphore(config.max_concurrent_llm)  # 默认 3
        self.agent_locks: dict[str, threading.Lock] = {}  # per-agent 互斥
        self.slots: dict[str, ProjectSlot] = {}
        self._shutdown = threading.Event()

    def start(self):
        """启动所有 active 项目的独立线程。"""
        registry = load_registry()
        for project_id, meta in registry["projects"].items():
            if meta["status"] != "active":
                continue
            slot = ProjectSlot(
                project_id=project_id,
                config=meta,
                llm_semaphore=self.llm_semaphore,
                agent_locks=self.agent_locks,
                tick_interval=self.config.tick_interval,  # 默认 30s
                shutdown_event=self._shutdown,
            )
            self.slots[project_id] = slot
            t = threading.Thread(target=slot.run_loop, name=f"project-{project_id}", daemon=True)
            t.start()

        # 主线程等待 shutdown
        self._shutdown.wait()

    def shutdown(self):
        self._shutdown.set()


class ProjectSlot:
    """单项目的独立 tick 循环。"""

    def __init__(self, project_id, config, llm_semaphore, agent_locks,
                 tick_interval=30, shutdown_event=None):
        self.project_id = project_id
        self.conn = sqlite_connect(Path(f"projects/{project_id}/blackboard.db"))
        self.config = config
        self.llm_sem = llm_semaphore        # 共享:全局 LLM 信号量
        self.agent_locks = agent_locks        # 共享:per-agent 锁字典
        self.tick_interval = tick_interval
        self.shutdown = shutdown_event or threading.Event()
        self.health = DaemonHealth(project_id)

    def run_loop(self):
        """独立线程的主循环。"""
        while not self.shutdown.is_set():
            try:
                self._tick()
            except Exception as e:
                logger.error(f"[{self.project_id}] tick failed: {e}")
            self.shutdown.wait(self.tick_interval)  # 可被 shutdown 中断的 sleep

    def _tick(self):
        """单次 tick:找 pending 任务,尝试分配。"""
        pending = find_pending(self.conn)
        if not pending:
            self.health.record_idle()
            return

        for task in pending:
            agent_id = task["assignee"]
            lock = self.agent_locks.setdefault(agent_id, threading.Lock())

            # 非阻塞尝试:Agent 正忙就跳过,不排队等
            if not lock.acquire(blocking=False):
                logger.info(f"[{self.project_id}] {agent_id} busy, skip task {task['id']}")
                continue

            try:
                # 等待全局 LLM 槽位(阻塞,但持有 agent_lock)
                self.llm_sem.acquire()
                try:
                    spawn_agent(self.project_id, task, self.conn)
                    self.health.record_change()
                finally:
                    # LLM 调用完成后释放信号量
                    # 注意:Agent spawn 后 LLM 调用即完成,不需要等 Agent 执行完
                    self.llm_sem.release()
            finally:
                lock.release()

        # 检查 working 任务超时等
        self._check_working_tasks()

5.4.5 资源释放时序

ProjectSlot._tick()
  │
  ├── lock.acquire()          # 拿到 Agent 锁
  ├── llm_sem.acquire()       # 拿到 LLM 槽位
  ├── spawn_agent()           # spawn Agent 子进程(LLM 调用在 spawn 瞬间完成)
  ├── llm_sem.release()       # ✅ spawn 完立即释放 LLM 槽位
  ├── lock.release()          # ✅ spawn 完立即释放 Agent 锁
  │
  └── Agent 子进程独立运行 → 完成后写 output.json → 下次 tick 检测

关键设计决策lockllm_semspawn_agent() 返回后立即释放,不等 Agent 执行完成

原因:

  1. spawn_agent()subprocess.Popenopenclaw agent CLI 调用,启动后立即返回
  2. Agent 执行是异步的(子进程独立运行)
  3. 如果等 Agent 执行完才释放锁,并发就退化回串行

这意味着:同一个 Agent 理论上可以同时有多个任务在跑。如果需要严格串行(一个 Agent 同一时刻只有一个任务),则改为在下次 tick 的 _check_working_tasks() 中检测 Agent 是否有 working 任务,有则不再分配新任务。

5.4.6 Agent 并行策略配置

# _registry.yaml 中可配置
projects:
  quant-momentum:
    agent_parallelism: sequential    # 同一 Agent 同一时刻只跑一个任务(默认)
  quant-pairs:
    agent_parallelism: parallel      # 同一 Agent 可同时跑多个任务

默认 sequential_tick() 分配任务前先检查该 Agent 是否有 working 任务。

5.4.7 并发安全保证

并发场景 风险 保护机制
两个项目同时写同一个 SQLite 数据损坏 每个项目独立 .db 文件,不存在此场景
两个项目同时分配同一个 Agent Agent session 上下文串 per-agent Lock 互斥
三个项目同时调 LLM API API 限流/超限 全局 Semaphore 限流
ProjectSlot 线程异常退出 项目 tick 停止 try/except 包裹 + Daemon 监控线程存活
Daemon 主进程崩溃 所有项目停止 PM2 自动重启 + SQLite WAL 保护数据完整性

6. CLI 变更

6.1 项目管理命令

# 创建项目
python3 blackboard.py project create --name quant-momentum --display-name "动量因子策略" --agents zhangfei-dev,zhaoyun-data,guanyu-dev

# 列出项目
python3 blackboard.py project list

# 切换默认项目
python3 blackboard.py project default quant-momentum

# 归档项目(不删除数据,只停 tick)
python3 blackboard.py project archive quant-pairs

# 删除项目(删除数据,需确认)
python3 blackboard.py project delete quant-pairs --confirm

6.2 所有操作指定项目

# 方式1:命令行参数
python3 blackboard.py read --project quant-momentum --task task-001

# 方式2:环境变量(设置后所有命令默认用此项目)
export MOZIPLUS_PROJECT=quant-momentum
python3 blackboard.py read --task task-001

# 方式3:默认项目(_registry.yaml 中 default_project
# 不指定 --project 也不设环境变量时,使用 default_project
python3 blackboard.py read --task task-001

6.3 Agent 使用的项目解析优先级

1. --project 参数(显式指定)
2. MOZIPLUS_PROJECT 环境变量
3. _registry.yaml 中的 default_project
4. 如果只有一个 active 项目,自动使用
5. 都没有 → 报错"请指定项目"

7. L2 上下文注入变更

7.1 Agent spawn 时注入项目上下文

L2 prompt_template 三段式注入增加项目段:

═══ 项目上下文 ═══
项目: quant-momentum(动量因子策略)
背景: <project_context.yaml 内容,由项目级配置提供>
可用 Agent: 张飞(编码)、赵云(数据)、关羽(风控)
═══ 任务上下文 ═══
(原有内容不变)

7.2 project_context.yaml 示例

# projects/quant-momentum/config/project_context.yaml
description: "基于动量因子的量化策略研发"
domain: "量化交易"
data_sources:
  - "NAS /Volumes/stock/ A股日线数据"
  - "NAS /Volumes/stock/minute_kline/ 分钟线数据"
code_repo: "~/.openclaw/sanguo_projects/sanguo_quant_live/"
key_constraints:
  - "所有策略必须通过回测验证才能上实盘"
  - "止损逻辑必须经过关羽风控审查"

8. 跨项目协作

8.1 默认禁止跨项目

Agent 不能跨项目读写黑板。这是安全边界——不同项目的数据、配置、产出互不干扰。

8.2 跨项目数据共享

如果项目 A 需要项目 B 的产出(如"moziplus-dev 需要赵云的数据"),通过文件系统共享:

# 项目 A 中,Agent 把产出写到 NAS 共享路径
# 项目 B 中,Agent 从 NAS 共享路径读取

不需要特殊的跨项目协议——NAS 路径就是跨项目的桥梁,和当前团队的工作方式一致。

8.3 跨项目任务请求(可选扩展)

如果未来需要 Agent 主动发起跨项目请求:

项目 A 黑板 → 创建 cross_project_request 类型任务
→ Daemon 检测到 → 在项目 B 黑板创建对应任务
→ 项目 B Agent 完成 → 产出写入 NAS
→ Daemon 检测项目 B 完成 → 更新项目 A 任务状态

当前不实现,预留设计空间。

9. 与其他课题的关系

课题 关系 说明
课题1(三层执行) 无冲突 Agent spawn 时多传一个 project_id,L2 注入多加项目上下文
课题2(事件驱动) 微调 Inbox JSONL 增加 project_id 字段,Daemon 路由到正确连接
课题3(挑战/评审) 项目级配置 guardrails.yaml 项目级覆盖
课题4(拆解+上下文) 项目级配置 project_context.yaml 注入 L2
课题6(经验沉淀) 项目级经验 每个项目独立的 experiences 表,经验不跨项目污染
课题7+9(交互+Dashboard 多项目视图 Dashboard 需要项目切换/多项目概览
Worktree 隔离 正交 Worktree 解决"同项目内多 Agent 并行改代码",课题11 解决"不同项目数据隔离"

10. 黑板 Schema 变更

不增加 project_id 字段——每个项目有独立数据库,表结构不变。

唯一新增:_registry.yaml 项目注册表。

11. 开发清单

# 任务 依赖
1 项目目录结构 + _registry.yaml + project.yaml Schema
2 ProjectManager(多连接池 + 配置加载 + 项目级 guardrails 覆盖) 1
3 CLI project create/list/default/archive/delete 命令 1
4 CLI 所有操作增加 --project 参数 + 优先级解析 1, 2
5 Daemon tick 遍历所有 active 项目 2
6 L2 prompt_template 注入 project_context 2, 4
7 Daemon 逻辑健康自检(按项目追踪) 5
8 Dashboard 项目切换 + 多项目概览 课题9

附录:方案 B 的详细反驳

有人可能觉得方案 B(加 project_id 字段)更简单。但实际上:

  1. 安全面:方案 B 靠 WHERE project_id = ? 逻辑隔离。一个漏掉的 WHERE = 数据泄漏。方案 C 靠物理文件隔离,漏不掉。

  2. 性能面:方案 B 所有项目共享一个 SQLite 文件。10 个项目各 10 个任务 = 100 个任务在一个 .db 里。WAL 写入是串行的,多项目并发 tick 会互相等待。方案 C 每个项目独立文件,互不影响。

  3. 运维面:方案 B 删除项目 = DELETE FROM tasks WHERE project_id = ? + 8 张表都要删。方案 C = rm -rf projects/xxx/。备份/恢复同理。

  4. 配置面:方案 B 的 guardrails.yaml 要设计"全局默认 + 项目覆盖"的合并逻辑。方案 C 每个项目独立 config/ 目录,天然隔离,覆盖逻辑更清晰。

  5. 经验沉淀面:方案 B 的经验混在一个 experiences 表里。如果项目 A 的"pytest 参数经验"污染到项目 B(B 可能不做 Python),反而有害。方案 C 每个项目独立经验库。

唯一方案 B 更优的场景:跨项目统计分析("所有项目的平均完成时间")。但这个需求可以后期通过注册表元数据实现,不需要把所有数据放一个库里。