31 KiB
课题11 设计方案:用户级多项目支持
日期: 2026-05-16 作者: 庞统(副军师)🐦 状态: v2(并发调度模型重设计,待评审) 前置: 课题1-4、课题6 已完成设计 变更: v2 新增 §5.4 并发调度模型(per-project 线程 + 全局资源信号量),替代原串行 tick
1. 核心问题
用户同时有多个工作域(量化策略A + 平台开发 + 数据研究),需要项目级隔离——不同项目的任务、配置、产出互不干扰,但共享同一套 Agent 团队和 Daemon 基础设施。
2. 需要隔离什么
| 隔离项 | 原因 | 隔离方式 |
|---|---|---|
| 黑板数据(tasks/comments/outputs/decisions/observations/events/agents/task_attempts/reviews/experiences/experience_tags) | 不同项目的任务不能混在一起 | 独立 SQLite 文件 |
| 配置(guardrails.yaml / prompt_templates / project_context.yaml) | 不同项目可能有不同的审查规则、上下文 | 项目级 config/ 目录,覆盖全局默认 |
| 产出文件 | 不同项目的代码/数据物理隔离 | 项目级 outputs/ 目录 |
| Agent session | 同一 Agent 参与不同项目时上下文不串 | OpenClaw --session-id 已有隔离 |
| Daemon 连接 | 不同数据库连接不能混淆 | 连接池 project_id → Connection 映射 |
不需要隔离的:
- Agent 注册表(agents.yaml)—— 同一套 Agent 团队服务所有项目
- Daemon 进程 —— 单进程管理所有项目
- Schema 定义 —— 所有项目共享同一套表结构
- 全局 prompt_templates —— 项目级覆盖,不是替换
3. 方案选择
3.1 三个方案对比
| 方案 | 做法 | 优点 | 缺点 |
|---|---|---|---|
| A. 多实例 | 每个项目独立 Daemon + 独立 SQLite + 独立端口 | 完全隔离、互不影响 | 资源开销翻倍、管理复杂、Agent 重复注册 |
| B. 单实例多命名空间 | 一个 SQLite,所有表加 project_id |
零额外资源 | 每个查询带 WHERE、单文件性能上限、删除项目危险 |
| C. 单 Daemon 多数据库 | 一个 Daemon,每个项目一个 SQLite 文件 | 物理隔离数据、共享 Daemon | Daemon 需管理多连接 |
3.2 选择方案 C
理由:
- 方案 A 不适合——Mac mini 资源有限,每多一个项目就多一套 Daemon + PM2 进程 + 端口。6 个 Agent 跑在 OpenClaw 上已经固定开销,不需要重复。
- 方案 B 不够安全——
WHERE project_id = ?容易漏,SQLite 单文件多项目并发有 WAL 锁瓶颈,删除项目 = 跨所有表 DELETE。 - 方案 C 是最优点——数据物理隔离(每个
.db文件独立),但共享 Daemon 进程和 Agent 注册表。Daemon 切换项目只是切换 SQLite 连接,无额外资源开销。
优秀实践验证:
- Wanman:Per-Agent Worktree + $HOME 严格隔离 → 验证"物理隔离比逻辑隔离可靠"
- ClawTeam:Git Worktree 隔离 + fcntl 文件锁 → 验证"共享进程 + 独立存储"模式可行
- Cline:Kanban + Worktree → 验证"多任务并行 + 物理隔离"是主流
- Hermes:单 Dispatcher + 单 SQLite → Hermes 是单项目设计,我们没有"多项目用单数据库"的先例
4. 目录结构
~/.sanguo_projects/moziplus_v2/
├── daemon.py # 单 Daemon 进程
├── daemon.yaml # Daemon 全局配置(端口、tick 间隔等)
├── projects/
│ ├── _registry.yaml # 项目注册表(所有项目的元数据)
│ ├── quant-momentum/ # 项目 1
│ │ ├── blackboard.db # 独立 SQLite
│ │ ├── config/
│ │ │ ├── project.yaml # 项目元信息(名称、描述、创建时间)
│ │ │ ├── guardrails.yaml # 项目级审查规则(覆盖全局默认)
│ │ │ ├── project_context.yaml # 项目背景知识(注入 L2)
│ │ │ └── prompt_overrides/ # 可选:覆盖默认 prompt 模板
│ │ └── outputs/ # 项目产出目录
│ ├── quant-pairs/ # 项目 2
│ │ ├── blackboard.db
│ │ ├── config/
│ │ └── outputs/
│ └── moziplus-dev/ # 项目 3(自身开发)
│ ├── blackboard.db
│ ├── config/
│ └── outputs/
└── shared/
├── prompt_templates/ # 全局默认模板
├── schemas/ # 全局 Schema
└── agents.yaml # 全局 Agent 注册表
4.1 项目注册表(_registry.yaml)
# projects/_registry.yaml
default_project: quant-momentum
projects:
quant-momentum:
display_name: "动量因子策略"
description: "基于动量因子的量化策略研发"
created_at: "2026-05-16T10:00:00Z"
status: active # active / archived
agents: [zhangfei-dev, zhaoyun-data, guanyu-dev] # 该项目可用的 Agent
quant-pairs:
display_name: "配对交易策略"
description: "统计套利配对交易研究"
created_at: "2026-05-16T10:00:00Z"
status: active
agents: [zhangfei-dev, zhaoyun-data]
moziplus-dev:
display_name: "平台开发"
description: "moziplus 自身开发"
created_at: "2026-05-16T10:00:00Z"
status: active
agents: [zhangfei-dev, simayi-challenger]
5. Daemon 变更
5.1 多连接池
# daemon 内部
class ProjectManager:
def __init__(self, projects_dir: Path):
self.projects_dir = projects_dir
self._connections: dict[str, sqlite3.Connection] = {}
self._configs: dict[str, ProjectConfig] = {}
self._load_registry()
def get_connection(self, project_id: str) -> sqlite3.Connection:
if project_id not in self._connections:
db_path = self.projects_dir / project_id / "blackboard.db"
self._connections[project_id] = sqlite_connect(db_path)
return self._connections[project_id]
def get_config(self, project_id: str) -> ProjectConfig:
if project_id not in self._configs:
config_path = self.projects_dir / project_id / "config" / "project.yaml"
self._configs[project_id] = ProjectConfig.load(config_path)
return self._configs[project_id]
def load_guardrails(self, project_id: str) -> dict:
"""项目级 guardrails.yaml 覆盖全局默认"""
global_guardrails = load_yaml("shared/guardrails.yaml")
project_guardrails_path = self.projects_dir / project_id / "config" / "guardrails.yaml"
if project_guardrails_path.exists():
project_guardrails = load_yaml(project_guardrails_path)
return deep_merge(global_guardrails, project_guardrails) # 项目级覆盖
return global_guardrails
5.2 Tick 逻辑变更(已废弃,见 §5.4)
原设计:Daemon 主循环串行遍历所有项目 tick。每个项目 tick 完再 tick 下一个。 问题:所有项目/任务一起排队,项目 A 的长任务阻塞项目 B。 新设计:见 §5.4 per-project 并发调度。
5.3 Daemon 逻辑健康自检 + 线程存活监控(v2 扩展)
# §14 风险缓解:连续 N tick 无状态变更则告警
STALE_TICK_THRESHOLD = 20
class DaemonHealth:
def __init__(self, project_id: str):
self.project_id = project_id
self._idle_ticks = 0
def record_idle(self):
self._idle_ticks += 1
def record_change(self):
self._idle_ticks = 0
def is_stale(self) -> bool:
return self._idle_ticks >= STALE_TICK_THRESHOLD
线程存活监控(见 §5.4.4 Daemon._check_slot_health()):
- Daemon 主线程每 60s 检查所有 ProjectSlot 线程是否存活
- 线程死亡 → 记录日志 + 自动重启
- 连续重启 3 次失败 → 告警(通过 Sanguo Mail 通知用户)
计数器超时兜底:
- 如果 Agent 完成回调丢失(进程被杀、网络断),
ActiveAgentCounter不会归零 _check_working_tasks()中,working 任务超过task_timeout(默认 10 分钟)视为完成- 视为完成时主动
decrement(),防止计数器泄漏
5.5 Daemon 崩溃恢复(v2 新增)
设计原则:状态全在 SQLite,Daemon 无状态。重启 = 重新加载所有项目 + 所有任务状态,继续执行。
恢复流程:
PM2 检测 Daemon 挂了 → 重启 Daemon
│
├── 读取 _registry.yaml → 恢复项目列表
├── 遍历每个 active 项目 → 打开 SQLite 连接
│ └── 读取所有任务状态(pending/working/completed/failed/...)
├── 为每个 active 项目启动 ProjectSlot 线程
│
└── ProjectSlot._tick() 扫描 working 任务
└── working 任务 → 重新 spawn Agent(可能冗余执行一次)
└── Agent 自己判断:上次做了没有?产出在不在?
├── 产出已存在 → 跳过,直接报告完成
├── 做到一半 → 继续完成
└── 没做过 → 正常执行
关键假设:
- SQLite 是真相来源——所有任务状态、产出记录都在
.db文件里,Daemon 内存无状态 - Agent 能判断重复——AI native,Agent 看到 task context + 已有 output 文件,能自主判断是否需要重新执行
- 冗余执行无害——即使一个节点被多执行一次,结果是幂等的(Agent 会检查产出是否已存在)
- 无限续杯——重启后 working 节点重新执行,属于已有的"无限续杯"机制(项目内运转机制,不在此设计)
不需要额外存储:
- 不需要 checkpoint 文件——SQLite 就是 checkpoint
- 不需要 recovery log——
task_attempts表已记录每次尝试 - 不需要状态快照——每次 tick 从 SQLite 实时读取
唯一注意:ActiveAgentCounter 重启后从零开始——但 _tick() 会重新扫描 working 任务并重新计数,所以没问题。
5.4 并发调度模型(v2 新增)
5.4.1 问题
原设计中 Daemon 主循环串行 tick 所有项目:
Tick → Project A(30s)→ Project B(等A完成)→ Project C(等B完成)
问题:
- 项目间互相阻塞——Project A 有一个长任务在执行,Project B 的独立任务必须等
- 响应延迟——3 个项目 tick 一次可能要 90s+,Project C 要等 60s 才被检查
- 不符合业界实践——调研 7 个项目(Hermes/open-multi-agent/Wanman/Google ADK/Microsoft AutoGen/AgentScope/GSD),没有一个用全局串行排队
5.4.2 业界并发模型调研
| 项目 | 并发模型 | 核心机制 |
|---|---|---|
| open-multi-agent | AgentPool + Semaphore | 全局 maxConcurrency=5,per-agent 互斥锁,Promise.allSettled 并行执行独立任务 |
| Wanman | per-agent 进程 | 每个 Agent 独立进程+独立 runLoop,Supervisor 通过消息总线协调 |
| Google ADK | asyncio.TaskGroup | ParallelAgent 用 TaskGroup 并行执行子 Agent |
| Microsoft AutoGen | Pregel Superstep | 每个 superstep 内所有激活 Executor 并行执行 |
| Hermes | 单线程 tick + flock | 单项目设计,tick 内只有几个 cron job,不需要并发 |
关键发现:open-multi-agent 的 AgentPool + Semaphore + per-agent Lock 是最成熟、最可借鉴的模型。
5.4.3 设计:per-project 线程 + 全局资源信号量
Daemon 主进程(轻量路由器 + 资源管控)
│
├── 全局 LLM Semaphore(max_concurrent=3)
├── per-agent Lock(张飞不能同时在两个项目里跑)
│
├── ProjectSlot A(独立线程)
│ └── 自己的 SQLite 连接
│ └── 自己的 tick 循环(30s)
│ └── spawn Agent 时:acquire agent_lock → acquire llm_semaphore
│
├── ProjectSlot B(独立线程)
│ └── (同上)
│
└── ProjectSlot C(独立线程)
└── (同上)
三层资源控制:
| 层级 | 控制对象 | 机制 | 原因 |
|---|---|---|---|
| L1: 项目隔离 | SQLite 连接 | per-project 独立连接 | 数据物理隔离,无竞争 |
| L2: Agent 互斥 | 同一 Agent 不能并行 | threading.Lock per-agent |
Agent session 不是线程安全的,张飞同一时刻只能服务一个任务 |
| L3: 全局资源 | LLM API 调用并发 | threading.Semaphore(max_concurrent) |
防止 API 限流、控制成本 |
5.4.4 核心代码
import threading
import time
from pathlib import Path
class ActiveAgentCounter:
"""线程安全的 Agent 活跃任务计数器。"""
def __init__(self, max_global: int = 5):
self._counts: dict[str, int] = {} # agent_id → 活跃任务数
self._total = 0 # 全局活跃总数
self._max_global = max_global
self._lock = threading.Lock()
def can_acquire(self, agent_id: str, max_per_agent: int = 1) -> bool:
"""检查是否可以分配(非阻塞)。"""
with self._lock:
if self._total >= self._max_global:
return False
return self._counts.get(agent_id, 0) < max_per_agent
def increment(self, agent_id: str):
with self._lock:
self._counts[agent_id] = self._counts.get(agent_id, 0) + 1
self._total += 1
def decrement(self, agent_id: str):
with self._lock:
if self._counts.get(agent_id, 0) > 0:
self._counts[agent_id] -= 1
self._total -= 1
class Daemon:
"""单进程 Daemon,per-project 线程并发。"""
def __init__(self, config: DaemonConfig):
self.config = config
self.agent_counter = ActiveAgentCounter(max_global=config.max_global_active) # 默认 5
self.slots: dict[str, ProjectSlot] = {}
self._slot_threads: dict[str, threading.Thread] = {} # 线程存活监控
self._shutdown = threading.Event()
def start(self):
"""启动所有 active 项目的独立线程。"""
registry = load_registry()
for project_id, meta in registry["projects"].items():
if meta["status"] != "active":
continue
self._start_slot(project_id, meta)
# 主线程:监控 + 等待 shutdown
while not self._shutdown.is_set():
self._check_slot_health()
self._shutdown.wait(60) # 每 60s 检查一次线程存活
def _start_slot(self, project_id: str, meta: dict):
slot = ProjectSlot(
project_id=project_id,
config=meta,
agent_counter=self.agent_counter,
tick_interval=self.config.tick_interval,
shutdown_event=self._shutdown,
)
self.slots[project_id] = slot
t = threading.Thread(target=slot.run_loop, name=f"project-{project_id}", daemon=True)
t.start()
self._slot_threads[project_id] = t
def _check_slot_health(self):
"""检查所有 ProjectSlot 线程是否存活,死亡则重启。"""
for project_id, thread in list(self._slot_threads.items()):
if not thread.is_alive():
logger.warning(f"ProjectSlot {project_id} thread died, restarting...")
meta = self.slots[project_id].config
self._start_slot(project_id, meta)
def shutdown(self):
self._shutdown.set()
class ProjectSlot:
"""单项目的独立 tick 循环。"""
def __init__(self, project_id, config, agent_counter,
tick_interval=30, shutdown_event=None):
self.project_id = project_id
self.conn = sqlite_connect(Path(f"projects/{project_id}/blackboard.db"))
self.config = config
self.agent_counter = agent_counter # 共享:全局 Agent 活跃计数器
self.tick_interval = tick_interval
self.shutdown = shutdown_event or threading.Event()
self.health = DaemonHealth(project_id)
def run_loop(self):
"""独立线程的主循环。"""
while not self.shutdown.is_set():
try:
self._tick()
except Exception as e:
logger.error(f"[{self.project_id}] tick failed: {e}")
self.shutdown.wait(self.tick_interval)
def _tick(self):
"""单次 tick:找 pending 任务,尝试分配。"""
# 先检查已完成的 Agent,释放计数器
completed = self._check_working_tasks() # 返回已完成的 agent_id 列表
for agent_id in completed:
self.agent_counter.decrement(agent_id)
pending = find_pending(self.conn)
if not pending:
self.health.record_idle()
return
for task in pending:
agent_id = task["assignee"]
max_per_agent = self.config.get("max_active_per_agent", 1)
# 检查全局 + per-agent 并发上限
if not self.agent_counter.can_acquire(agent_id, max_per_agent):
logger.info(f"[{self.project_id}] {agent_id} at capacity, skip task {task['id']}")
continue
# sequential 模式:检查该 Agent 在本项目是否有 working 任务
if self.config.get("agent_parallelism") != "parallel":
if has_working_task(self.conn, agent_id):
continue
# 生成 session_id(§5.4.6 命名规则)
session_id = self._get_session_id(agent_id, task['id'])
# spawn Agent(异步,不阻塞)
spawn_agent(
project_id=self.project_id,
task=task,
session_id=session_id,
)
self.agent_counter.increment(agent_id) # +1
self.health.record_change()
def _get_session_id(self, agent_id: str, task_id: str) -> str:
"""§5.4.6 session 命名规则。"""
if self.config.get("agent_parallelism") != "parallel":
# sequential:同一项目同一 Agent 复用 session(保持上下文连续性)
return f"agent:{agent_id}:project:{self.project_id}"
else:
# parallel:每个任务独立 session
return f"agent:{agent_id}:project:{self.project_id}:task:{task_id}"
5.4.5 资源控制模型(v2 修订:异步计数器替代同步信号量)
原设计问题:spawn_agent() 不是瞬时操作——Agent 执行涉及 LLM API 调用(可能多轮工具调用),如果在 spawn 完成后才释放锁/信号量,并发退化回串行;如果在 spawn 启动后立即释放,信号量没有真正限流。
修订方案(采纳司马懿评审建议):
| 资源 | 原方案 | 修订方案 | 原因 |
|---|---|---|---|
| Agent 互斥 | threading.Lock |
移除。改为 per-project session 命名 + _check_working_tasks() |
同一 Agent 可用不同 session-id 安全服务不同项目 |
| LLM 并发 | threading.Semaphore |
ActiveAgentCounter(异步计数器) |
spawn 是异步的,同步信号量无法精确限流 |
| 项目隔离 | per-project SQLite | 不变 |
新时序:
ProjectSlot._tick()
│
├── 检查 ActiveAgentCounter[agent_id] < max_active?
│ └── 否 → 跳过,下个 tick 再检查
├── 检查 agent_parallelism == "sequential" 且该 Agent 有 working 任务?
│ └── 是 → 跳过
├── spawn_agent(project_id, task)
│ ├── session_id = "agent:{agent_id}:project:{project_id}:task:{task_id}"
│ └── ActiveAgentCounter.increment(agent_id) # +1
│
└── Agent 完成回调(下次 tick 检测到 output 或 webhook)
└── ActiveAgentCounter.decrement(agent_id) # -1
关键变化:
- 不再使用
threading.Lock和threading.Semaphore——它们是同步原语,不适合异步 spawn 场景 - 改用
ActiveAgentCounter(线程安全计数器),spawn 时 +1,Agent 完成回调时 -1 _tick()分配前检查计数器,超过阈值就跳过- Agent session 按
agent:{agent_id}:project:{project_id}:task:{task_id}命名,项目+任务级天然隔离
5.4.6 Agent 并行策略 + Session 隔离
并行策略配置:
# _registry.yaml 中可配置
projects:
quant-momentum:
agent_parallelism: sequential # 同一 Agent 同一时刻只跑一个任务(默认)
max_active_per_agent: 1 # sequential 的显式写法
quant-pairs:
agent_parallelism: parallel # 同一 Agent 可同时跑多个任务
max_active_per_agent: 2 # 最多 2 个并行
Session 命名规则:
格式:agent:{agent_id}:project:{project_id}:task:{task_id}
示例:agent:zhangfei-dev:project:quant-momentum:task:task-001
- 每个任务独立 session,任务间上下文不串
- 同一 Agent 在不同项目用不同 session,项目间上下文不串
sequential模式:同一项目同一 Agent 只有一个活跃 session(新的任务复用或新开)parallel模式:每个任务独立 session
sequential 模式下的 session 复用:
def _get_session_id(self, agent_id: str, task_id: str) -> str:
project_config = self.config
if project_config.get("agent_parallelism") != "parallel":
# sequential:同一项目同一 Agent 复用 session(保持上下文连续性)
return f"agent:{agent_id}:project:{self.project_id}"
else:
# parallel:每个任务独立 session
return f"agent:{agent_id}:project:{self.project_id}:task:{task_id}"
5.4.7 并发安全保证
| 并发场景 | 风险 | 保护机制 |
|---|---|---|
| 两个项目同时写同一个 SQLite | 数据损坏 | 每个项目独立 .db 文件,不存在此场景 |
| 两个项目同时分配同一个 Agent | Agent 资源争抢 | ActiveAgentCounter + max_active_per_agent 限制 |
| LLM API 并发超限 | API 限流/超限 | ActiveAgentCounter 全局计数,_tick() 分配前检查 |
| ProjectSlot 线程异常退出 | 项目 tick 停止 | try/except 包裹 + Daemon 监控线程存活(§5.4.8) |
| Daemon 主进程崩溃 | 所有项目停止 | PM2 自动重启 + SQLite WAL 保护数据完整性 |
| Agent 完成回调丢失 | 计数器不归零 | 超时兜底:working 任务超过 task_timeout 视为完成,计数器 -1 |
| _registry.yaml 并发写入 | 数据损坏 | _registry.yaml 只在 CLI 操作时读写(非 tick 热路径),tick 状态用内存 dict |
5.5 启动状态恢复(v2 新增)
Daemon 进程崩溃后由 PM2 自动重启。启动时需要恢复所有项目状态。
持久化层(已天然支持,无需额外处理):
_registry.yaml→ 项目列表、状态、配置- 每个
blackboard.db→ 任务状态(pending/working/completed/failed...) - SQLite WAL 模式 → 崩溃时未提交的事务自动回滚,数据完整性有保障
内存层(需要恢复):
ActiveAgentCounter→ 重启后归零DaemonHealth→ 重启后归零
恢复流程:
def start(self):
registry = load_registry()
for project_id, meta in registry["projects"].items():
if meta["status"] != "active":
continue
# 启动 ProjectSlot
self._start_slot(project_id, meta)
# 第一次 tick 时,_check_working_tasks() 会扫描所有 working 任务
# 超时的 → 视为完成(decrement 不需要,因为计数器从零开始)
# 未超时的 → 继续等待(计数器会在下次 tick 重新追踪)
working 任务的幽灵问题:
Daemon 崩溃时可能有 Agent 正在执行。重启后这些任务在 blackboard.db 中仍是 working 状态,但 Agent 子进程已经死了(Daemon 重启不等于 Agent 重启,但 Daemon 崩溃通常意味着整个环境有问题)。
处理策略:
- 启动时扫描所有 working 任务
- 检查 Agent 子进程是否还活着(如果有 PID 记录的话)
- 进程已死 + 超过
task_timeout→ 标记为failed(原因:"Daemon restart, agent process lost") - 进程已死 + 未超时 → 标记为
failed(保守策略:Daemon 重启后不信任幽灵任务) - 不尝试重新执行——用户可以手动
retry
为什么保守:Daemon 崩溃是不正常事件。此时 Agent 子进程的状态不可预测(可能写了半个 output.json),重新执行比尝试恢复更安全。
task_attempts 表的 attempt_index 会递增,retry 时从新的 attempt 开始,不覆盖之前的尝试记录。
6.1 项目管理命令
# 创建项目
python3 blackboard.py project create --name quant-momentum --display-name "动量因子策略" --agents zhangfei-dev,zhaoyun-data,guanyu-dev
# 列出项目
python3 blackboard.py project list
# 切换默认项目
python3 blackboard.py project default quant-momentum
# 归档项目(不删除数据,只停 tick)
python3 blackboard.py project archive quant-pairs
# 删除项目(删除数据,需确认)
python3 blackboard.py project delete quant-pairs --confirm
6.2 所有操作指定项目
# 方式1:命令行参数
python3 blackboard.py read --project quant-momentum --task task-001
# 方式2:环境变量(设置后所有命令默认用此项目)
export MOZIPLUS_PROJECT=quant-momentum
python3 blackboard.py read --task task-001
# 方式3:默认项目(_registry.yaml 中 default_project)
# 不指定 --project 也不设环境变量时,使用 default_project
python3 blackboard.py read --task task-001
6.3 Agent 使用的项目解析优先级
1. --project 参数(显式指定)
2. MOZIPLUS_PROJECT 环境变量
3. _registry.yaml 中的 default_project
4. 如果只有一个 active 项目,自动使用
5. 都没有 → 报错"请指定项目"
7. L2 上下文注入变更
7.1 Agent spawn 时注入项目上下文
L2 prompt_template 三段式注入增加项目段:
═══ 项目上下文 ═══
项目: quant-momentum(动量因子策略)
背景: <project_context.yaml 内容,由项目级配置提供>
可用 Agent: 张飞(编码)、赵云(数据)、关羽(风控)
═══ 任务上下文 ═══
(原有内容不变)
7.2 project_context.yaml 示例
# projects/quant-momentum/config/project_context.yaml
description: "基于动量因子的量化策略研发"
domain: "量化交易"
data_sources:
- "NAS /Volumes/stock/ A股日线数据"
- "NAS /Volumes/stock/minute_kline/ 分钟线数据"
code_repo: "~/.openclaw/sanguo_projects/sanguo_quant_live/"
key_constraints:
- "所有策略必须通过回测验证才能上实盘"
- "止损逻辑必须经过关羽风控审查"
8. 跨项目协作
8.1 默认禁止跨项目
Agent 不能跨项目读写黑板。这是安全边界——不同项目的数据、配置、产出互不干扰。
8.2 跨项目数据共享
如果项目 A 需要项目 B 的产出(如"moziplus-dev 需要赵云的数据"),通过文件系统共享:
# 项目 A 中,Agent 把产出写到 NAS 共享路径
# 项目 B 中,Agent 从 NAS 共享路径读取
不需要特殊的跨项目协议——NAS 路径就是跨项目的桥梁,和当前团队的工作方式一致。
8.3 跨项目任务请求(可选扩展)
如果未来需要 Agent 主动发起跨项目请求:
项目 A 黑板 → 创建 cross_project_request 类型任务
→ Daemon 检测到 → 在项目 B 黑板创建对应任务
→ 项目 B Agent 完成 → 产出写入 NAS
→ Daemon 检测项目 B 完成 → 更新项目 A 任务状态
当前不实现,预留设计空间。
8.4 项目归档/删除时正在运行的任务(v2 新增)
归档(archive):
- 检查是否有 working 状态的任务
- 有 → 将这些任务标记为 cancelled,等待 Agent 完成回调(超时兜底 5 分钟)
- 无 → 立即停止该项目的 ProjectSlot 线程
- 将
_registry.yaml中状态改为archived
删除(delete):
- 必须先 archive
- 必须无 working 任务(归档时已处理)
--confirm确认rm -rf projects/{project_id}/- 从
_registry.yaml移除条目
禁止直接删除 active 项目——必须先归档。
9. 与其他课题的关系
| 课题 | 关系 | 说明 |
|---|---|---|
| 课题1(三层执行) | 无冲突 | Agent spawn 时多传一个 project_id,L2 注入多加项目上下文 |
| 课题2(事件驱动) | 微调 | Inbox JSONL 增加 project_id 字段,Daemon 路由到正确连接 |
| 课题3(挑战/评审) | 项目级配置 | guardrails.yaml 项目级覆盖 |
| 课题4(拆解+上下文) | 项目级配置 | project_context.yaml 注入 L2 |
| 课题6(经验沉淀) | 项目级经验 | 每个项目独立的 experiences 表,经验不跨项目污染 |
| 课题7+9(交互+Dashboard) | 多项目视图 | Dashboard 需要项目切换/多项目概览 |
| Worktree 隔离 | 正交 | Worktree 解决"同项目内多 Agent 并行改代码",课题11 解决"不同项目数据隔离" |
10. 黑板 Schema 变更
不增加 project_id 字段——每个项目有独立数据库,表结构不变。
唯一新增:_registry.yaml 项目注册表。
11. 开发清单
| # | 任务 | 依赖 |
|---|---|---|
| 1 | 项目目录结构 + _registry.yaml + project.yaml Schema | 无 |
| 2 | ActiveAgentCounter(线程安全计数器 + 全局/per-agent 双重限制) | 无 |
| 3 | ProjectSlot(独立线程 tick + SQLite 独立连接 + spawn 前检查计数器) | 1, 2 |
| 4 | Daemon 主循环(启动状态恢复 + ProjectSlot 线程监控/重启) | 3 |
| 4a | 启动时幽灵 working 任务处理(扫描+标记 failed) | 4 |
| 5 | CLI project create/list/default/archive/delete 命令 | 1 |
| 6 | CLI 所有操作增加 --project 参数 + 优先级解析 | 1, 3 |
| 7 | L2 prompt_template 注入 project_context + session_id 命名规则 | 3, 6 |
| 8 | Daemon 逻辑健康自检(按项目追踪 + 计数器超时兜底) | 4 |
| 9 | 项目归档/删除安全流程(working 任务处理) | 4, 5 |
| 10 | Dashboard 项目切换 + 多项目概览 | 课题9 |
附录:方案 B 的详细反驳
有人可能觉得方案 B(加 project_id 字段)更简单。但实际上:
-
安全面:方案 B 靠
WHERE project_id = ?逻辑隔离。一个漏掉的 WHERE = 数据泄漏。方案 C 靠物理文件隔离,漏不掉。 -
性能面:方案 B 所有项目共享一个 SQLite 文件。10 个项目各 10 个任务 = 100 个任务在一个
.db里。WAL 写入是串行的,多项目并发 tick 会互相等待。方案 C 每个项目独立文件,互不影响。 -
运维面:方案 B 删除项目 =
DELETE FROM tasks WHERE project_id = ?+ 8 张表都要删。方案 C =rm -rf projects/xxx/。备份/恢复同理。 -
配置面:方案 B 的 guardrails.yaml 要设计"全局默认 + 项目覆盖"的合并逻辑。方案 C 每个项目独立 config/ 目录,天然隔离,覆盖逻辑更清晰。
-
经验沉淀面:方案 B 的经验混在一个 experiences 表里。如果项目 A 的"pytest 参数经验"污染到项目 B(B 可能不做 Python),反而有害。方案 C 每个项目独立经验库。
唯一方案 B 更优的场景:跨项目统计分析("所有项目的平均完成时间")。但这个需求可以后期通过注册表元数据实现,不需要把所有数据放一个库里。