Files
sanguo_moziplus_v2/docs/design/topic11-multi-project-proposal.md
T
2026-05-16 13:47:29 +08:00

29 KiB
Raw Blame History

课题11 设计方案:用户级多项目支持

日期: 2026-05-16 作者: 庞统(副军师)🐦 状态: v2(并发调度模型重设计,待评审) 前置: 课题1-4、课题6 已完成设计 变更: v2 新增 §5.4 并发调度模型(per-project 线程 + 全局资源信号量),替代原串行 tick


1. 核心问题

用户同时有多个工作域(量化策略A + 平台开发 + 数据研究),需要项目级隔离——不同项目的任务、配置、产出互不干扰,但共享同一套 Agent 团队和 Daemon 基础设施。

2. 需要隔离什么

隔离项 原因 隔离方式
黑板数据(tasks/comments/outputs/decisions/observations/events/agents/task_attempts/reviews/experiences/experience_tags 不同项目的任务不能混在一起 独立 SQLite 文件
配置(guardrails.yaml / prompt_templates / project_context.yaml 不同项目可能有不同的审查规则、上下文 项目级 config/ 目录,覆盖全局默认
产出文件 不同项目的代码/数据物理隔离 项目级 outputs/ 目录
Agent session 同一 Agent 参与不同项目时上下文不串 OpenClaw --session-id 已有隔离
Daemon 连接 不同数据库连接不能混淆 连接池 project_id → Connection 映射

不需要隔离的

  • Agent 注册表(agents.yaml)—— 同一套 Agent 团队服务所有项目
  • Daemon 进程 —— 单进程管理所有项目
  • Schema 定义 —— 所有项目共享同一套表结构
  • 全局 prompt_templates —— 项目级覆盖,不是替换

3. 方案选择

3.1 三个方案对比

方案 做法 优点 缺点
A. 多实例 每个项目独立 Daemon + 独立 SQLite + 独立端口 完全隔离、互不影响 资源开销翻倍、管理复杂、Agent 重复注册
B. 单实例多命名空间 一个 SQLite,所有表加 project_id 零额外资源 每个查询带 WHERE、单文件性能上限、删除项目危险
C. 单 Daemon 多数据库 一个 Daemon,每个项目一个 SQLite 文件 物理隔离数据、共享 Daemon Daemon 需管理多连接

3.2 选择方案 C

理由:

  1. 方案 A 不适合——Mac mini 资源有限,每多一个项目就多一套 Daemon + PM2 进程 + 端口。6 个 Agent 跑在 OpenClaw 上已经固定开销,不需要重复。
  2. 方案 B 不够安全——WHERE project_id = ? 容易漏,SQLite 单文件多项目并发有 WAL 锁瓶颈,删除项目 = 跨所有表 DELETE。
  3. 方案 C 是最优点——数据物理隔离(每个 .db 文件独立),但共享 Daemon 进程和 Agent 注册表。Daemon 切换项目只是切换 SQLite 连接,无额外资源开销。

优秀实践验证

  • WanmanPer-Agent Worktree + $HOME 严格隔离 → 验证"物理隔离比逻辑隔离可靠"
  • ClawTeamGit Worktree 隔离 + fcntl 文件锁 → 验证"共享进程 + 独立存储"模式可行
  • ClineKanban + Worktree → 验证"多任务并行 + 物理隔离"是主流
  • Hermes:单 Dispatcher + 单 SQLite → Hermes 是单项目设计,我们没有"多项目用单数据库"的先例

4. 目录结构

~/.sanguo_projects/moziplus_v2/
├── daemon.py                          # 单 Daemon 进程
├── daemon.yaml                        # Daemon 全局配置(端口、tick 间隔等)
├── projects/
│   ├── _registry.yaml                 # 项目注册表(所有项目的元数据)
│   ├── quant-momentum/                # 项目 1
│   │   ├── blackboard.db              # 独立 SQLite
│   │   ├── config/
│   │   │   ├── project.yaml           # 项目元信息(名称、描述、创建时间)
│   │   │   ├── guardrails.yaml        # 项目级审查规则(覆盖全局默认)
│   │   │   ├── project_context.yaml   # 项目背景知识(注入 L2)
│   │   │   └── prompt_overrides/      # 可选:覆盖默认 prompt 模板
│   │   └── outputs/                   # 项目产出目录
│   ├── quant-pairs/                   # 项目 2
│   │   ├── blackboard.db
│   │   ├── config/
│   │   └── outputs/
│   └── moziplus-dev/                  # 项目 3(自身开发)
│       ├── blackboard.db
│       ├── config/
│       └── outputs/
└── shared/
    ├── prompt_templates/              # 全局默认模板
    ├── schemas/                       # 全局 Schema
    └── agents.yaml                    # 全局 Agent 注册表

4.1 项目注册表(_registry.yaml

# projects/_registry.yaml
default_project: quant-momentum
projects:
  quant-momentum:
    display_name: "动量因子策略"
    description: "基于动量因子的量化策略研发"
    created_at: "2026-05-16T10:00:00Z"
    status: active          # active / archived
    agents: [zhangfei-dev, zhaoyun-data, guanyu-dev]  # 该项目可用的 Agent
  quant-pairs:
    display_name: "配对交易策略"
    description: "统计套利配对交易研究"
    created_at: "2026-05-16T10:00:00Z"
    status: active
    agents: [zhangfei-dev, zhaoyun-data]
  moziplus-dev:
    display_name: "平台开发"
    description: "moziplus 自身开发"
    created_at: "2026-05-16T10:00:00Z"
    status: active
    agents: [zhangfei-dev, simayi-challenger]

5. Daemon 变更

5.1 多连接池

# daemon 内部
class ProjectManager:
    def __init__(self, projects_dir: Path):
        self.projects_dir = projects_dir
        self._connections: dict[str, sqlite3.Connection] = {}
        self._configs: dict[str, ProjectConfig] = {}
        self._load_registry()

    def get_connection(self, project_id: str) -> sqlite3.Connection:
        if project_id not in self._connections:
            db_path = self.projects_dir / project_id / "blackboard.db"
            self._connections[project_id] = sqlite_connect(db_path)
        return self._connections[project_id]

    def get_config(self, project_id: str) -> ProjectConfig:
        if project_id not in self._configs:
            config_path = self.projects_dir / project_id / "config" / "project.yaml"
            self._configs[project_id] = ProjectConfig.load(config_path)
        return self._configs[project_id]

    def load_guardrails(self, project_id: str) -> dict:
        """项目级 guardrails.yaml 覆盖全局默认"""
        global_guardrails = load_yaml("shared/guardrails.yaml")
        project_guardrails_path = self.projects_dir / project_id / "config" / "guardrails.yaml"
        if project_guardrails_path.exists():
            project_guardrails = load_yaml(project_guardrails_path)
            return deep_merge(global_guardrails, project_guardrails)  # 项目级覆盖
        return global_guardrails

5.2 Tick 逻辑变更(已废弃,见 §5.4

原设计:Daemon 主循环串行遍历所有项目 tick。每个项目 tick 完再 tick 下一个。 问题:所有项目/任务一起排队,项目 A 的长任务阻塞项目 B。 新设计:见 §5.4 per-project 并发调度。

5.3 Daemon 逻辑健康自检 + 线程存活监控(v2 扩展)

# §14 风险缓解:连续 N tick 无状态变更则告警
STALE_TICK_THRESHOLD = 20

class DaemonHealth:
    def __init__(self, project_id: str):
        self.project_id = project_id
        self._idle_ticks = 0

    def record_idle(self):
        self._idle_ticks += 1

    def record_change(self):
        self._idle_ticks = 0

    def is_stale(self) -> bool:
        return self._idle_ticks >= STALE_TICK_THRESHOLD

线程存活监控(见 §5.4.4 Daemon._check_slot_health()):

  • Daemon 主线程每 60s 检查所有 ProjectSlot 线程是否存活
  • 线程死亡 → 记录日志 + 自动重启
  • 连续重启 3 次失败 → 告警(通过 Sanguo Mail 通知用户)

计数器超时兜底

  • 如果 Agent 完成回调丢失(进程被杀、网络断),ActiveAgentCounter 不会归零
  • _check_working_tasks() 中,working 任务超过 task_timeout(默认 10 分钟)视为完成
  • 视为完成时主动 decrement(),防止计数器泄漏

5.4 并发调度模型(v2 新增)

5.4.1 问题

原设计中 Daemon 主循环串行 tick 所有项目:

Tick → Project A30s)→ Project B(等A完成)→ Project C(等B完成)

问题:

  1. 项目间互相阻塞——Project A 有一个长任务在执行,Project B 的独立任务必须等
  2. 响应延迟——3 个项目 tick 一次可能要 90s+Project C 要等 60s 才被检查
  3. 不符合业界实践——调研 7 个项目(Hermes/open-multi-agent/Wanman/Google ADK/Microsoft AutoGen/AgentScope/GSD),没有一个用全局串行排队

5.4.2 业界并发模型调研

项目 并发模型 核心机制
open-multi-agent AgentPool + Semaphore 全局 maxConcurrency=5per-agent 互斥锁,Promise.allSettled 并行执行独立任务
Wanman per-agent 进程 每个 Agent 独立进程+独立 runLoopSupervisor 通过消息总线协调
Google ADK asyncio.TaskGroup ParallelAgentTaskGroup 并行执行子 Agent
Microsoft AutoGen Pregel Superstep 每个 superstep 内所有激活 Executor 并行执行
Hermes 单线程 tick + flock 单项目设计tick 内只有几个 cron job,不需要并发

关键发现open-multi-agent 的 AgentPool + Semaphore + per-agent Lock 是最成熟、最可借鉴的模型。

5.4.3 设计:per-project 线程 + 全局资源信号量

Daemon 主进程(轻量路由器 + 资源管控)
│
├── 全局 LLM Semaphoremax_concurrent=3
├── per-agent Lock(张飞不能同时在两个项目里跑)
│
├── ProjectSlot A(独立线程)
│   └── 自己的 SQLite 连接
│   └── 自己的 tick 循环(30s
│   └── spawn Agent 时:acquire agent_lock → acquire llm_semaphore
│
├── ProjectSlot B(独立线程)
│   └── (同上)
│
└── ProjectSlot C(独立线程)
    └── (同上)

三层资源控制

层级 控制对象 机制 原因
L1: 项目隔离 SQLite 连接 per-project 独立连接 数据物理隔离,无竞争
L2: Agent 互斥 同一 Agent 不能并行 threading.Lock per-agent Agent session 不是线程安全的,张飞同一时刻只能服务一个任务
L3: 全局资源 LLM API 调用并发 threading.Semaphore(max_concurrent) 防止 API 限流、控制成本

5.4.4 核心代码

import threading
import time
from pathlib import Path


class ActiveAgentCounter:
    """线程安全的 Agent 活跃任务计数器。"""

    def __init__(self, max_global: int = 5):
        self._counts: dict[str, int] = {}       # agent_id → 活跃任务数
        self._total = 0                          # 全局活跃总数
        self._max_global = max_global
        self._lock = threading.Lock()

    def can_acquire(self, agent_id: str, max_per_agent: int = 1) -> bool:
        """检查是否可以分配(非阻塞)。"""
        with self._lock:
            if self._total >= self._max_global:
                return False
            return self._counts.get(agent_id, 0) < max_per_agent

    def increment(self, agent_id: str):
        with self._lock:
            self._counts[agent_id] = self._counts.get(agent_id, 0) + 1
            self._total += 1

    def decrement(self, agent_id: str):
        with self._lock:
            if self._counts.get(agent_id, 0) > 0:
                self._counts[agent_id] -= 1
                self._total -= 1


class Daemon:
    """单进程 Daemonper-project 线程并发。"""

    def __init__(self, config: DaemonConfig):
        self.config = config
        self.agent_counter = ActiveAgentCounter(max_global=config.max_global_active)  # 默认 5
        self.slots: dict[str, ProjectSlot] = {}
        self._slot_threads: dict[str, threading.Thread] = {}  # 线程存活监控
        self._shutdown = threading.Event()

    def start(self):
        """启动所有 active 项目的独立线程。"""
        registry = load_registry()
        for project_id, meta in registry["projects"].items():
            if meta["status"] != "active":
                continue
            self._start_slot(project_id, meta)

        # 主线程:监控 + 等待 shutdown
        while not self._shutdown.is_set():
            self._check_slot_health()
            self._shutdown.wait(60)  # 每 60s 检查一次线程存活

    def _start_slot(self, project_id: str, meta: dict):
        slot = ProjectSlot(
            project_id=project_id,
            config=meta,
            agent_counter=self.agent_counter,
            tick_interval=self.config.tick_interval,
            shutdown_event=self._shutdown,
        )
        self.slots[project_id] = slot
        t = threading.Thread(target=slot.run_loop, name=f"project-{project_id}", daemon=True)
        t.start()
        self._slot_threads[project_id] = t

    def _check_slot_health(self):
        """检查所有 ProjectSlot 线程是否存活,死亡则重启。"""
        for project_id, thread in list(self._slot_threads.items()):
            if not thread.is_alive():
                logger.warning(f"ProjectSlot {project_id} thread died, restarting...")
                meta = self.slots[project_id].config
                self._start_slot(project_id, meta)

    def shutdown(self):
        self._shutdown.set()


class ProjectSlot:
    """单项目的独立 tick 循环。"""

    def __init__(self, project_id, config, agent_counter,
                 tick_interval=30, shutdown_event=None):
        self.project_id = project_id
        self.conn = sqlite_connect(Path(f"projects/{project_id}/blackboard.db"))
        self.config = config
        self.agent_counter = agent_counter      # 共享:全局 Agent 活跃计数器
        self.tick_interval = tick_interval
        self.shutdown = shutdown_event or threading.Event()
        self.health = DaemonHealth(project_id)

    def run_loop(self):
        """独立线程的主循环。"""
        while not self.shutdown.is_set():
            try:
                self._tick()
            except Exception as e:
                logger.error(f"[{self.project_id}] tick failed: {e}")
            self.shutdown.wait(self.tick_interval)

    def _tick(self):
        """单次 tick:找 pending 任务,尝试分配。"""
        # 先检查已完成的 Agent,释放计数器
        completed = self._check_working_tasks()  # 返回已完成的 agent_id 列表
        for agent_id in completed:
            self.agent_counter.decrement(agent_id)

        pending = find_pending(self.conn)
        if not pending:
            self.health.record_idle()
            return

        for task in pending:
            agent_id = task["assignee"]
            max_per_agent = self.config.get("max_active_per_agent", 1)

            # 检查全局 + per-agent 并发上限
            if not self.agent_counter.can_acquire(agent_id, max_per_agent):
                logger.info(f"[{self.project_id}] {agent_id} at capacity, skip task {task['id']}")
                continue

            # sequential 模式:检查该 Agent 在本项目是否有 working 任务
            if self.config.get("agent_parallelism") != "parallel":
                if has_working_task(self.conn, agent_id):
                    continue

            # 生成 session_id(§5.4.6 命名规则)
            session_id = self._get_session_id(agent_id, task['id'])

            # spawn Agent(异步,不阻塞)
            spawn_agent(
                project_id=self.project_id,
                task=task,
                session_id=session_id,
            )
            self.agent_counter.increment(agent_id)  # +1
            self.health.record_change()

    def _get_session_id(self, agent_id: str, task_id: str) -> str:
        """§5.4.6 session 命名规则。"""
        if self.config.get("agent_parallelism") != "parallel":
            # sequential:同一项目同一 Agent 复用 session(保持上下文连续性)
            return f"agent:{agent_id}:project:{self.project_id}"
        else:
            # parallel:每个任务独立 session
            return f"agent:{agent_id}:project:{self.project_id}:task:{task_id}"

5.4.5 资源控制模型(v2 修订:异步计数器替代同步信号量)

原设计问题spawn_agent() 不是瞬时操作——Agent 执行涉及 LLM API 调用(可能多轮工具调用),如果在 spawn 完成后才释放锁/信号量,并发退化回串行;如果在 spawn 启动后立即释放,信号量没有真正限流。

修订方案(采纳司马懿评审建议):

资源 原方案 修订方案 原因
Agent 互斥 threading.Lock 移除。改为 per-project session 命名 + _check_working_tasks() 同一 Agent 可用不同 session-id 安全服务不同项目
LLM 并发 threading.Semaphore ActiveAgentCounter(异步计数器) spawn 是异步的,同步信号量无法精确限流
项目隔离 per-project SQLite 不变

新时序

ProjectSlot._tick()
  │
  ├── 检查 ActiveAgentCounter[agent_id] < max_active
  │   └── 否 → 跳过,下个 tick 再检查
  ├── 检查 agent_parallelism == "sequential" 且该 Agent 有 working 任务?
  │   └── 是 → 跳过
  ├── spawn_agent(project_id, task)
  │   ├── session_id = "agent:{agent_id}:project:{project_id}:task:{task_id}"
  │   └── ActiveAgentCounter.increment(agent_id)  # +1
  │
  └── Agent 完成回调(下次 tick 检测到 output 或 webhook
      └── ActiveAgentCounter.decrement(agent_id)   # -1

关键变化

  1. 不再使用 threading.Lockthreading.Semaphore——它们是同步原语,不适合异步 spawn 场景
  2. 改用 ActiveAgentCounter(线程安全计数器),spawn 时 +1,Agent 完成回调时 -1
  3. _tick() 分配前检查计数器,超过阈值就跳过
  4. Agent session 按 agent:{agent_id}:project:{project_id}:task:{task_id} 命名,项目+任务级天然隔离

5.4.6 Agent 并行策略 + Session 隔离

并行策略配置

# _registry.yaml 中可配置
projects:
  quant-momentum:
    agent_parallelism: sequential    # 同一 Agent 同一时刻只跑一个任务(默认)
    max_active_per_agent: 1          # sequential 的显式写法
  quant-pairs:
    agent_parallelism: parallel      # 同一 Agent 可同时跑多个任务
    max_active_per_agent: 2          # 最多 2 个并行

Session 命名规则

格式:agent:{agent_id}:project:{project_id}:task:{task_id}
示例:agent:zhangfei-dev:project:quant-momentum:task:task-001
  • 每个任务独立 session,任务间上下文不串
  • 同一 Agent 在不同项目用不同 session,项目间上下文不串
  • sequential 模式:同一项目同一 Agent 只有一个活跃 session(新的任务复用或新开)
  • parallel 模式:每个任务独立 session

sequential 模式下的 session 复用

def _get_session_id(self, agent_id: str, task_id: str) -> str:
    project_config = self.config
    if project_config.get("agent_parallelism") != "parallel":
        # sequential:同一项目同一 Agent 复用 session(保持上下文连续性)
        return f"agent:{agent_id}:project:{self.project_id}"
    else:
        # parallel:每个任务独立 session
        return f"agent:{agent_id}:project:{self.project_id}:task:{task_id}"

5.4.7 并发安全保证

并发场景 风险 保护机制
两个项目同时写同一个 SQLite 数据损坏 每个项目独立 .db 文件,不存在此场景
两个项目同时分配同一个 Agent Agent 资源争抢 ActiveAgentCounter + max_active_per_agent 限制
LLM API 并发超限 API 限流/超限 ActiveAgentCounter 全局计数,_tick() 分配前检查
ProjectSlot 线程异常退出 项目 tick 停止 try/except 包裹 + Daemon 监控线程存活(§5.4.8
Daemon 主进程崩溃 所有项目停止 PM2 自动重启 + SQLite WAL 保护数据完整性
Agent 完成回调丢失 计数器不归零 超时兜底:working 任务超过 task_timeout 视为完成,计数器 -1
_registry.yaml 并发写入 数据损坏 _registry.yaml 只在 CLI 操作时读写(非 tick 热路径),tick 状态用内存 dict

5.5 启动状态恢复(v2 新增)

Daemon 进程崩溃后由 PM2 自动重启。启动时需要恢复所有项目状态。

持久化层(已天然支持,无需额外处理):

  • _registry.yaml → 项目列表、状态、配置
  • 每个 blackboard.db → 任务状态(pending/working/completed/failed...
  • SQLite WAL 模式 → 崩溃时未提交的事务自动回滚,数据完整性有保障

内存层(需要恢复):

  • ActiveAgentCounter → 重启后归零
  • DaemonHealth → 重启后归零

恢复流程

def start(self):
    registry = load_registry()
    for project_id, meta in registry["projects"].items():
        if meta["status"] != "active":
            continue
        # 启动 ProjectSlot
        self._start_slot(project_id, meta)
        # 第一次 tick 时,_check_working_tasks() 会扫描所有 working 任务
        # 超时的 → 视为完成(decrement 不需要,因为计数器从零开始)
        # 未超时的 → 继续等待(计数器会在下次 tick 重新追踪)

working 任务的幽灵问题

Daemon 崩溃时可能有 Agent 正在执行。重启后这些任务在 blackboard.db 中仍是 working 状态,但 Agent 子进程已经死了(Daemon 重启不等于 Agent 重启,但 Daemon 崩溃通常意味着整个环境有问题)。

处理策略:

  1. 启动时扫描所有 working 任务
  2. 检查 Agent 子进程是否还活着(如果有 PID 记录的话)
  3. 进程已死 + 超过 task_timeout → 标记为 failed(原因:"Daemon restart, agent process lost"
  4. 进程已死 + 未超时 → 标记为 failed(保守策略:Daemon 重启后不信任幽灵任务)
  5. 不尝试重新执行——用户可以手动 retry

为什么保守:Daemon 崩溃是不正常事件。此时 Agent 子进程的状态不可预测(可能写了半个 output.json),重新执行比尝试恢复更安全。

task_attempts 表attempt_index 会递增,retry 时从新的 attempt 开始,不覆盖之前的尝试记录。

6.1 项目管理命令

# 创建项目
python3 blackboard.py project create --name quant-momentum --display-name "动量因子策略" --agents zhangfei-dev,zhaoyun-data,guanyu-dev

# 列出项目
python3 blackboard.py project list

# 切换默认项目
python3 blackboard.py project default quant-momentum

# 归档项目(不删除数据,只停 tick)
python3 blackboard.py project archive quant-pairs

# 删除项目(删除数据,需确认)
python3 blackboard.py project delete quant-pairs --confirm

6.2 所有操作指定项目

# 方式1:命令行参数
python3 blackboard.py read --project quant-momentum --task task-001

# 方式2:环境变量(设置后所有命令默认用此项目)
export MOZIPLUS_PROJECT=quant-momentum
python3 blackboard.py read --task task-001

# 方式3:默认项目(_registry.yaml 中 default_project
# 不指定 --project 也不设环境变量时,使用 default_project
python3 blackboard.py read --task task-001

6.3 Agent 使用的项目解析优先级

1. --project 参数(显式指定)
2. MOZIPLUS_PROJECT 环境变量
3. _registry.yaml 中的 default_project
4. 如果只有一个 active 项目,自动使用
5. 都没有 → 报错"请指定项目"

7. L2 上下文注入变更

7.1 Agent spawn 时注入项目上下文

L2 prompt_template 三段式注入增加项目段:

═══ 项目上下文 ═══
项目: quant-momentum(动量因子策略)
背景: <project_context.yaml 内容,由项目级配置提供>
可用 Agent: 张飞(编码)、赵云(数据)、关羽(风控)
═══ 任务上下文 ═══
(原有内容不变)

7.2 project_context.yaml 示例

# projects/quant-momentum/config/project_context.yaml
description: "基于动量因子的量化策略研发"
domain: "量化交易"
data_sources:
  - "NAS /Volumes/stock/ A股日线数据"
  - "NAS /Volumes/stock/minute_kline/ 分钟线数据"
code_repo: "~/.openclaw/sanguo_projects/sanguo_quant_live/"
key_constraints:
  - "所有策略必须通过回测验证才能上实盘"
  - "止损逻辑必须经过关羽风控审查"

8. 跨项目协作

8.1 默认禁止跨项目

Agent 不能跨项目读写黑板。这是安全边界——不同项目的数据、配置、产出互不干扰。

8.2 跨项目数据共享

如果项目 A 需要项目 B 的产出(如"moziplus-dev 需要赵云的数据"),通过文件系统共享:

# 项目 A 中,Agent 把产出写到 NAS 共享路径
# 项目 B 中,Agent 从 NAS 共享路径读取

不需要特殊的跨项目协议——NAS 路径就是跨项目的桥梁,和当前团队的工作方式一致。

8.3 跨项目任务请求(可选扩展)

如果未来需要 Agent 主动发起跨项目请求:

项目 A 黑板 → 创建 cross_project_request 类型任务
→ Daemon 检测到 → 在项目 B 黑板创建对应任务
→ 项目 B Agent 完成 → 产出写入 NAS
→ Daemon 检测项目 B 完成 → 更新项目 A 任务状态

当前不实现,预留设计空间。

8.4 项目归档/删除时正在运行的任务(v2 新增)

归档(archive

  1. 检查是否有 working 状态的任务
  2. 有 → 将这些任务标记为 cancelled,等待 Agent 完成回调(超时兜底 5 分钟)
  3. 无 → 立即停止该项目的 ProjectSlot 线程
  4. _registry.yaml 中状态改为 archived

删除(delete

  1. 必须先 archive
  2. 必须无 working 任务(归档时已处理)
  3. --confirm 确认
  4. rm -rf projects/{project_id}/
  5. _registry.yaml 移除条目

禁止直接删除 active 项目——必须先归档。

9. 与其他课题的关系

课题 关系 说明
课题1(三层执行) 无冲突 Agent spawn 时多传一个 project_id,L2 注入多加项目上下文
课题2(事件驱动) 微调 Inbox JSONL 增加 project_id 字段,Daemon 路由到正确连接
课题3(挑战/评审) 项目级配置 guardrails.yaml 项目级覆盖
课题4(拆解+上下文) 项目级配置 project_context.yaml 注入 L2
课题6(经验沉淀) 项目级经验 每个项目独立的 experiences 表,经验不跨项目污染
课题7+9(交互+Dashboard 多项目视图 Dashboard 需要项目切换/多项目概览
Worktree 隔离 正交 Worktree 解决"同项目内多 Agent 并行改代码",课题11 解决"不同项目数据隔离"

10. 黑板 Schema 变更

不增加 project_id 字段——每个项目有独立数据库,表结构不变。

唯一新增:_registry.yaml 项目注册表。

11. 开发清单

# 任务 依赖
1 项目目录结构 + _registry.yaml + project.yaml Schema
2 ActiveAgentCounter(线程安全计数器 + 全局/per-agent 双重限制)
3 ProjectSlot(独立线程 tick + SQLite 独立连接 + spawn 前检查计数器) 1, 2
4 Daemon 主循环(启动状态恢复 + ProjectSlot 线程监控/重启) 3
4a 启动时幽灵 working 任务处理(扫描+标记 failed) 4
5 CLI project create/list/default/archive/delete 命令 1
6 CLI 所有操作增加 --project 参数 + 优先级解析 1, 3
7 L2 prompt_template 注入 project_context + session_id 命名规则 3, 6
8 Daemon 逻辑健康自检(按项目追踪 + 计数器超时兜底) 4
9 项目归档/删除安全流程(working 任务处理) 4, 5
10 Dashboard 项目切换 + 多项目概览 课题9

附录:方案 B 的详细反驳

有人可能觉得方案 B(加 project_id 字段)更简单。但实际上:

  1. 安全面:方案 B 靠 WHERE project_id = ? 逻辑隔离。一个漏掉的 WHERE = 数据泄漏。方案 C 靠物理文件隔离,漏不掉。

  2. 性能面:方案 B 所有项目共享一个 SQLite 文件。10 个项目各 10 个任务 = 100 个任务在一个 .db 里。WAL 写入是串行的,多项目并发 tick 会互相等待。方案 C 每个项目独立文件,互不影响。

  3. 运维面:方案 B 删除项目 = DELETE FROM tasks WHERE project_id = ? + 8 张表都要删。方案 C = rm -rf projects/xxx/。备份/恢复同理。

  4. 配置面:方案 B 的 guardrails.yaml 要设计"全局默认 + 项目覆盖"的合并逻辑。方案 C 每个项目独立 config/ 目录,天然隔离,覆盖逻辑更清晰。

  5. 经验沉淀面:方案 B 的经验混在一个 experiences 表里。如果项目 A 的"pytest 参数经验"污染到项目 B(B 可能不做 Python),反而有害。方案 C 每个项目独立经验库。

唯一方案 B 更优的场景:跨项目统计分析("所有项目的平均完成时间")。但这个需求可以后期通过注册表元数据实现,不需要把所有数据放一个库里。