686 lines
29 KiB
Markdown
686 lines
29 KiB
Markdown
# 课题11 设计方案:用户级多项目支持
|
||
|
||
> **日期**: 2026-05-16
|
||
> **作者**: 庞统(副军师)🐦
|
||
> **状态**: v2(并发调度模型重设计,待评审)
|
||
> **前置**: 课题1-4、课题6 已完成设计
|
||
> **变更**: v2 新增 §5.4 并发调度模型(per-project 线程 + 全局资源信号量),替代原串行 tick
|
||
|
||
---
|
||
|
||
## 1. 核心问题
|
||
|
||
用户同时有多个工作域(量化策略A + 平台开发 + 数据研究),需要项目级隔离——不同项目的任务、配置、产出互不干扰,但共享同一套 Agent 团队和 Daemon 基础设施。
|
||
|
||
## 2. 需要隔离什么
|
||
|
||
| 隔离项 | 原因 | 隔离方式 |
|
||
|--------|------|---------|
|
||
| 黑板数据(tasks/comments/outputs/decisions/observations/events/agents/task_attempts/reviews/experiences/experience_tags) | 不同项目的任务不能混在一起 | 独立 SQLite 文件 |
|
||
| 配置(guardrails.yaml / prompt_templates / project_context.yaml) | 不同项目可能有不同的审查规则、上下文 | 项目级 config/ 目录,覆盖全局默认 |
|
||
| 产出文件 | 不同项目的代码/数据物理隔离 | 项目级 outputs/ 目录 |
|
||
| Agent session | 同一 Agent 参与不同项目时上下文不串 | OpenClaw `--session-id` 已有隔离 |
|
||
| Daemon 连接 | 不同数据库连接不能混淆 | 连接池 project_id → Connection 映射 |
|
||
|
||
**不需要隔离的**:
|
||
- Agent 注册表(agents.yaml)—— 同一套 Agent 团队服务所有项目
|
||
- Daemon 进程 —— 单进程管理所有项目
|
||
- Schema 定义 —— 所有项目共享同一套表结构
|
||
- 全局 prompt_templates —— 项目级覆盖,不是替换
|
||
|
||
## 3. 方案选择
|
||
|
||
### 3.1 三个方案对比
|
||
|
||
| 方案 | 做法 | 优点 | 缺点 |
|
||
|------|------|------|------|
|
||
| **A. 多实例** | 每个项目独立 Daemon + 独立 SQLite + 独立端口 | 完全隔离、互不影响 | 资源开销翻倍、管理复杂、Agent 重复注册 |
|
||
| **B. 单实例多命名空间** | 一个 SQLite,所有表加 `project_id` | 零额外资源 | 每个查询带 WHERE、单文件性能上限、删除项目危险 |
|
||
| **C. 单 Daemon 多数据库** | 一个 Daemon,每个项目一个 SQLite 文件 | 物理隔离数据、共享 Daemon | Daemon 需管理多连接 |
|
||
|
||
### 3.2 选择方案 C
|
||
|
||
理由:
|
||
1. **方案 A 不适合**——Mac mini 资源有限,每多一个项目就多一套 Daemon + PM2 进程 + 端口。6 个 Agent 跑在 OpenClaw 上已经固定开销,不需要重复。
|
||
2. **方案 B 不够安全**——`WHERE project_id = ?` 容易漏,SQLite 单文件多项目并发有 WAL 锁瓶颈,删除项目 = 跨所有表 DELETE。
|
||
3. **方案 C 是最优点**——数据物理隔离(每个 `.db` 文件独立),但共享 Daemon 进程和 Agent 注册表。Daemon 切换项目只是切换 SQLite 连接,无额外资源开销。
|
||
|
||
**优秀实践验证**:
|
||
- Wanman:Per-Agent Worktree + $HOME 严格隔离 → 验证"物理隔离比逻辑隔离可靠"
|
||
- ClawTeam:Git Worktree 隔离 + fcntl 文件锁 → 验证"共享进程 + 独立存储"模式可行
|
||
- Cline:Kanban + Worktree → 验证"多任务并行 + 物理隔离"是主流
|
||
- Hermes:单 Dispatcher + 单 SQLite → **Hermes 是单项目设计**,我们没有"多项目用单数据库"的先例
|
||
|
||
## 4. 目录结构
|
||
|
||
```
|
||
~/.sanguo_projects/moziplus_v2/
|
||
├── daemon.py # 单 Daemon 进程
|
||
├── daemon.yaml # Daemon 全局配置(端口、tick 间隔等)
|
||
├── projects/
|
||
│ ├── _registry.yaml # 项目注册表(所有项目的元数据)
|
||
│ ├── quant-momentum/ # 项目 1
|
||
│ │ ├── blackboard.db # 独立 SQLite
|
||
│ │ ├── config/
|
||
│ │ │ ├── project.yaml # 项目元信息(名称、描述、创建时间)
|
||
│ │ │ ├── guardrails.yaml # 项目级审查规则(覆盖全局默认)
|
||
│ │ │ ├── project_context.yaml # 项目背景知识(注入 L2)
|
||
│ │ │ └── prompt_overrides/ # 可选:覆盖默认 prompt 模板
|
||
│ │ └── outputs/ # 项目产出目录
|
||
│ ├── quant-pairs/ # 项目 2
|
||
│ │ ├── blackboard.db
|
||
│ │ ├── config/
|
||
│ │ └── outputs/
|
||
│ └── moziplus-dev/ # 项目 3(自身开发)
|
||
│ ├── blackboard.db
|
||
│ ├── config/
|
||
│ └── outputs/
|
||
└── shared/
|
||
├── prompt_templates/ # 全局默认模板
|
||
├── schemas/ # 全局 Schema
|
||
└── agents.yaml # 全局 Agent 注册表
|
||
```
|
||
|
||
### 4.1 项目注册表(_registry.yaml)
|
||
|
||
```yaml
|
||
# projects/_registry.yaml
|
||
default_project: quant-momentum
|
||
projects:
|
||
quant-momentum:
|
||
display_name: "动量因子策略"
|
||
description: "基于动量因子的量化策略研发"
|
||
created_at: "2026-05-16T10:00:00Z"
|
||
status: active # active / archived
|
||
agents: [zhangfei-dev, zhaoyun-data, guanyu-dev] # 该项目可用的 Agent
|
||
quant-pairs:
|
||
display_name: "配对交易策略"
|
||
description: "统计套利配对交易研究"
|
||
created_at: "2026-05-16T10:00:00Z"
|
||
status: active
|
||
agents: [zhangfei-dev, zhaoyun-data]
|
||
moziplus-dev:
|
||
display_name: "平台开发"
|
||
description: "moziplus 自身开发"
|
||
created_at: "2026-05-16T10:00:00Z"
|
||
status: active
|
||
agents: [zhangfei-dev, simayi-challenger]
|
||
```
|
||
|
||
## 5. Daemon 变更
|
||
|
||
### 5.1 多连接池
|
||
|
||
```python
|
||
# daemon 内部
|
||
class ProjectManager:
|
||
def __init__(self, projects_dir: Path):
|
||
self.projects_dir = projects_dir
|
||
self._connections: dict[str, sqlite3.Connection] = {}
|
||
self._configs: dict[str, ProjectConfig] = {}
|
||
self._load_registry()
|
||
|
||
def get_connection(self, project_id: str) -> sqlite3.Connection:
|
||
if project_id not in self._connections:
|
||
db_path = self.projects_dir / project_id / "blackboard.db"
|
||
self._connections[project_id] = sqlite_connect(db_path)
|
||
return self._connections[project_id]
|
||
|
||
def get_config(self, project_id: str) -> ProjectConfig:
|
||
if project_id not in self._configs:
|
||
config_path = self.projects_dir / project_id / "config" / "project.yaml"
|
||
self._configs[project_id] = ProjectConfig.load(config_path)
|
||
return self._configs[project_id]
|
||
|
||
def load_guardrails(self, project_id: str) -> dict:
|
||
"""项目级 guardrails.yaml 覆盖全局默认"""
|
||
global_guardrails = load_yaml("shared/guardrails.yaml")
|
||
project_guardrails_path = self.projects_dir / project_id / "config" / "guardrails.yaml"
|
||
if project_guardrails_path.exists():
|
||
project_guardrails = load_yaml(project_guardrails_path)
|
||
return deep_merge(global_guardrails, project_guardrails) # 项目级覆盖
|
||
return global_guardrails
|
||
```
|
||
|
||
### 5.2 ~~Tick 逻辑变更~~(已废弃,见 §5.4)
|
||
|
||
> **原设计**:Daemon 主循环串行遍历所有项目 tick。每个项目 tick 完再 tick 下一个。
|
||
> **问题**:所有项目/任务一起排队,项目 A 的长任务阻塞项目 B。
|
||
> **新设计**:见 §5.4 per-project 并发调度。
|
||
|
||
### 5.3 Daemon 逻辑健康自检 + 线程存活监控(v2 扩展)
|
||
|
||
```python
|
||
# §14 风险缓解:连续 N tick 无状态变更则告警
|
||
STALE_TICK_THRESHOLD = 20
|
||
|
||
class DaemonHealth:
|
||
def __init__(self, project_id: str):
|
||
self.project_id = project_id
|
||
self._idle_ticks = 0
|
||
|
||
def record_idle(self):
|
||
self._idle_ticks += 1
|
||
|
||
def record_change(self):
|
||
self._idle_ticks = 0
|
||
|
||
def is_stale(self) -> bool:
|
||
return self._idle_ticks >= STALE_TICK_THRESHOLD
|
||
```
|
||
|
||
**线程存活监控**(见 §5.4.4 `Daemon._check_slot_health()`):
|
||
- Daemon 主线程每 60s 检查所有 ProjectSlot 线程是否存活
|
||
- 线程死亡 → 记录日志 + 自动重启
|
||
- 连续重启 3 次失败 → 告警(通过 Sanguo Mail 通知用户)
|
||
|
||
**计数器超时兜底**:
|
||
- 如果 Agent 完成回调丢失(进程被杀、网络断),`ActiveAgentCounter` 不会归零
|
||
- `_check_working_tasks()` 中,working 任务超过 `task_timeout`(默认 10 分钟)视为完成
|
||
- 视为完成时主动 `decrement()`,防止计数器泄漏
|
||
|
||
### 5.5 Daemon 崩溃恢复(v2 新增)
|
||
|
||
**设计原则**:状态全在 SQLite,Daemon 无状态。重启 = 重新加载所有项目 + 所有任务状态,继续执行。
|
||
|
||
**恢复流程(保守策略)**:
|
||
|
||
```
|
||
PM2 检测 Daemon 挂了 → 重启 Daemon
|
||
│
|
||
├── 读取 _registry.yaml → 恢复项目列表
|
||
├── 遍历每个 active 项目 → 打开 SQLite 连接
|
||
├── 扫描所有 working 任务 → 标记为 failed(原因: "Daemon restart, agent process lost")
|
||
├── 启动 ProjectSlot 线程
|
||
└── 后续 pending 任务正常分配
|
||
```
|
||
|
||
**为什么不重新执行**:
|
||
1. Daemon 崩溃是不正常事件,Agent 子进程状态不可预测
|
||
2. output.json 可能写了一半,重新执行比恢复更安全
|
||
3. 用户手动 retry 比自动重新执行更可控
|
||
4. task_attempts 表记录完整,不丢信息
|
||
|
||
**关键设计**:
|
||
1. **SQLite 是真相来源**——所有任务状态、产出记录都在 `.db` 文件里,Daemon 内存无状态
|
||
2. **SQLite WAL 保护数据完整性**——崩溃时未提交的事务自动回滚
|
||
3. **ActiveAgentCounter / DaemonHealth 重启后归零**——不需要持久化
|
||
4. **task_attempts 的 attempt_index 递增**——retry 不覆盖历史
|
||
|
||
**不需要额外存储**:
|
||
- 不需要 checkpoint 文件——SQLite 就是 checkpoint
|
||
- 不需要 recovery log——task_attempts 表已经记录所有尝试
|
||
- 不需要 recovery log——`task_attempts` 表已记录每次尝试
|
||
- 不需要状态快照——每次 tick 从 SQLite 实时读取
|
||
|
||
**唯一注意**:`ActiveAgentCounter` 重启后从零开始——但 `_tick()` 会重新扫描 working 任务并重新计数,所以没问题。
|
||
|
||
### 5.4 并发调度模型(v2 新增)
|
||
|
||
#### 5.4.1 问题
|
||
|
||
原设计中 Daemon 主循环串行 tick 所有项目:
|
||
|
||
```
|
||
Tick → Project A(30s)→ Project B(等A完成)→ Project C(等B完成)
|
||
```
|
||
|
||
问题:
|
||
1. **项目间互相阻塞**——Project A 有一个长任务在执行,Project B 的独立任务必须等
|
||
2. **响应延迟**——3 个项目 tick 一次可能要 90s+,Project C 要等 60s 才被检查
|
||
3. **不符合业界实践**——调研 7 个项目(Hermes/open-multi-agent/Wanman/Google ADK/Microsoft AutoGen/AgentScope/GSD),没有一个用全局串行排队
|
||
|
||
#### 5.4.2 业界并发模型调研
|
||
|
||
| 项目 | 并发模型 | 核心机制 |
|
||
|------|---------|---------|
|
||
| **open-multi-agent** | AgentPool + Semaphore | 全局 `maxConcurrency=5`,per-agent 互斥锁,`Promise.allSettled` 并行执行独立任务 |
|
||
| **Wanman** | per-agent 进程 | 每个 Agent 独立进程+独立 runLoop,Supervisor 通过消息总线协调 |
|
||
| **Google ADK** | asyncio.TaskGroup | `ParallelAgent` 用 `TaskGroup` 并行执行子 Agent |
|
||
| **Microsoft AutoGen** | Pregel Superstep | 每个 superstep 内所有激活 Executor 并行执行 |
|
||
| **Hermes** | 单线程 tick + flock | **单项目设计**,tick 内只有几个 cron job,不需要并发 |
|
||
|
||
**关键发现**:open-multi-agent 的 `AgentPool + Semaphore + per-agent Lock` 是最成熟、最可借鉴的模型。
|
||
|
||
#### 5.4.3 设计:per-project 线程 + 全局资源信号量
|
||
|
||
```
|
||
Daemon 主进程(轻量路由器 + 资源管控)
|
||
│
|
||
├── 全局 LLM Semaphore(max_concurrent=3)
|
||
├── per-agent Lock(张飞不能同时在两个项目里跑)
|
||
│
|
||
├── ProjectSlot A(独立线程)
|
||
│ └── 自己的 SQLite 连接
|
||
│ └── 自己的 tick 循环(30s)
|
||
│ └── spawn Agent 时:acquire agent_lock → acquire llm_semaphore
|
||
│
|
||
├── ProjectSlot B(独立线程)
|
||
│ └── (同上)
|
||
│
|
||
└── ProjectSlot C(独立线程)
|
||
└── (同上)
|
||
```
|
||
|
||
**三层资源控制**:
|
||
|
||
| 层级 | 控制对象 | 机制 | 原因 |
|
||
|------|---------|------|------|
|
||
| L1: 项目隔离 | SQLite 连接 | per-project 独立连接 | 数据物理隔离,无竞争 |
|
||
| L2: Agent 互斥 | 同一 Agent 不能并行 | `threading.Lock` per-agent | Agent session 不是线程安全的,张飞同一时刻只能服务一个任务 |
|
||
| L3: 全局资源 | LLM API 调用并发 | `threading.Semaphore(max_concurrent)` | 防止 API 限流、控制成本 |
|
||
|
||
#### 5.4.4 核心代码
|
||
|
||
```python
|
||
import threading
|
||
import time
|
||
from pathlib import Path
|
||
|
||
|
||
class ActiveAgentCounter:
|
||
"""线程安全的 Agent 活跃任务计数器。"""
|
||
|
||
def __init__(self, max_global: int = 5):
|
||
self._counts: dict[str, int] = {} # agent_id → 活跃任务数
|
||
self._total = 0 # 全局活跃总数
|
||
self._max_global = max_global
|
||
self._lock = threading.Lock()
|
||
|
||
def can_acquire(self, agent_id: str, max_per_agent: int = 1) -> bool:
|
||
"""检查是否可以分配(非阻塞)。"""
|
||
with self._lock:
|
||
if self._total >= self._max_global:
|
||
return False
|
||
return self._counts.get(agent_id, 0) < max_per_agent
|
||
|
||
def increment(self, agent_id: str):
|
||
with self._lock:
|
||
self._counts[agent_id] = self._counts.get(agent_id, 0) + 1
|
||
self._total += 1
|
||
|
||
def decrement(self, agent_id: str):
|
||
with self._lock:
|
||
if self._counts.get(agent_id, 0) > 0:
|
||
self._counts[agent_id] -= 1
|
||
self._total -= 1
|
||
|
||
|
||
class Daemon:
|
||
"""单进程 Daemon,per-project 线程并发。"""
|
||
|
||
def __init__(self, config: DaemonConfig):
|
||
self.config = config
|
||
self.agent_counter = ActiveAgentCounter(max_global=config.max_global_active) # 默认 5
|
||
self.slots: dict[str, ProjectSlot] = {}
|
||
self._slot_threads: dict[str, threading.Thread] = {} # 线程存活监控
|
||
self._shutdown = threading.Event()
|
||
|
||
def start(self):
|
||
"""启动所有 active 项目的独立线程。"""
|
||
registry = load_registry()
|
||
for project_id, meta in registry["projects"].items():
|
||
if meta["status"] != "active":
|
||
continue
|
||
self._start_slot(project_id, meta)
|
||
|
||
# 主线程:监控 + 等待 shutdown
|
||
while not self._shutdown.is_set():
|
||
self._check_slot_health()
|
||
self._shutdown.wait(60) # 每 60s 检查一次线程存活
|
||
|
||
def _start_slot(self, project_id: str, meta: dict):
|
||
slot = ProjectSlot(
|
||
project_id=project_id,
|
||
config=meta,
|
||
agent_counter=self.agent_counter,
|
||
tick_interval=self.config.tick_interval,
|
||
shutdown_event=self._shutdown,
|
||
)
|
||
self.slots[project_id] = slot
|
||
t = threading.Thread(target=slot.run_loop, name=f"project-{project_id}", daemon=True)
|
||
t.start()
|
||
self._slot_threads[project_id] = t
|
||
|
||
def _check_slot_health(self):
|
||
"""检查所有 ProjectSlot 线程是否存活,死亡则重启。"""
|
||
for project_id, thread in list(self._slot_threads.items()):
|
||
if not thread.is_alive():
|
||
logger.warning(f"ProjectSlot {project_id} thread died, restarting...")
|
||
meta = self.slots[project_id].config
|
||
self._start_slot(project_id, meta)
|
||
|
||
def shutdown(self):
|
||
self._shutdown.set()
|
||
|
||
|
||
class ProjectSlot:
|
||
"""单项目的独立 tick 循环。"""
|
||
|
||
def __init__(self, project_id, config, agent_counter,
|
||
tick_interval=30, shutdown_event=None):
|
||
self.project_id = project_id
|
||
self.conn = sqlite_connect(Path(f"projects/{project_id}/blackboard.db"))
|
||
self.config = config
|
||
self.agent_counter = agent_counter # 共享:全局 Agent 活跃计数器
|
||
self.tick_interval = tick_interval
|
||
self.shutdown = shutdown_event or threading.Event()
|
||
self.health = DaemonHealth(project_id)
|
||
|
||
def run_loop(self):
|
||
"""独立线程的主循环。"""
|
||
while not self.shutdown.is_set():
|
||
try:
|
||
self._tick()
|
||
except Exception as e:
|
||
logger.error(f"[{self.project_id}] tick failed: {e}")
|
||
self.shutdown.wait(self.tick_interval)
|
||
|
||
def _tick(self):
|
||
"""单次 tick:找 pending 任务,尝试分配。"""
|
||
# 先检查已完成的 Agent,释放计数器
|
||
completed = self._check_working_tasks() # 返回已完成的 agent_id 列表
|
||
for agent_id in completed:
|
||
self.agent_counter.decrement(agent_id)
|
||
|
||
pending = find_pending(self.conn)
|
||
if not pending:
|
||
self.health.record_idle()
|
||
return
|
||
|
||
for task in pending:
|
||
agent_id = task["assignee"]
|
||
max_per_agent = self.config.get("max_active_per_agent", 1)
|
||
|
||
# 检查全局 + per-agent 并发上限
|
||
if not self.agent_counter.can_acquire(agent_id, max_per_agent):
|
||
logger.info(f"[{self.project_id}] {agent_id} at capacity, skip task {task['id']}")
|
||
continue
|
||
|
||
# sequential 模式:检查该 Agent 在本项目是否有 working 任务
|
||
if self.config.get("agent_parallelism") != "parallel":
|
||
if has_working_task(self.conn, agent_id):
|
||
continue
|
||
|
||
# 生成 session_id(§5.4.6 命名规则)
|
||
session_id = self._get_session_id(agent_id, task['id'])
|
||
|
||
# spawn Agent(异步,不阻塞)
|
||
spawn_agent(
|
||
project_id=self.project_id,
|
||
task=task,
|
||
session_id=session_id,
|
||
)
|
||
self.agent_counter.increment(agent_id) # +1
|
||
self.health.record_change()
|
||
|
||
def _get_session_id(self, agent_id: str, task_id: str) -> str:
|
||
"""§5.4.6 session 命名规则。"""
|
||
if self.config.get("agent_parallelism") != "parallel":
|
||
# sequential:同一项目同一 Agent 复用 session(保持上下文连续性)
|
||
return f"agent:{agent_id}:project:{self.project_id}"
|
||
else:
|
||
# parallel:每个任务独立 session
|
||
return f"agent:{agent_id}:project:{self.project_id}:task:{task_id}"
|
||
```
|
||
|
||
#### 5.4.5 资源控制模型(v2 修订:异步计数器替代同步信号量)
|
||
|
||
**原设计问题**:spawn_agent() 不是瞬时操作——Agent 执行涉及 LLM API 调用(可能多轮工具调用),如果在 spawn 完成后才释放锁/信号量,并发退化回串行;如果在 spawn 启动后立即释放,信号量没有真正限流。
|
||
|
||
**修订方案**(采纳司马懿评审建议):
|
||
|
||
| 资源 | 原方案 | 修订方案 | 原因 |
|
||
|------|--------|---------|------|
|
||
| Agent 互斥 | `threading.Lock` | 移除。改为 per-project session 命名 + `_check_working_tasks()` | 同一 Agent 可用不同 session-id 安全服务不同项目 |
|
||
| LLM 并发 | `threading.Semaphore` | `ActiveAgentCounter`(异步计数器) | spawn 是异步的,同步信号量无法精确限流 |
|
||
| 项目隔离 | per-project SQLite | 不变 | |
|
||
|
||
**新时序**:
|
||
|
||
```
|
||
ProjectSlot._tick()
|
||
│
|
||
├── 检查 ActiveAgentCounter[agent_id] < max_active?
|
||
│ └── 否 → 跳过,下个 tick 再检查
|
||
├── 检查 agent_parallelism == "sequential" 且该 Agent 有 working 任务?
|
||
│ └── 是 → 跳过
|
||
├── spawn_agent(project_id, task)
|
||
│ ├── session_id = "agent:{agent_id}:project:{project_id}:task:{task_id}"
|
||
│ └── ActiveAgentCounter.increment(agent_id) # +1
|
||
│
|
||
└── Agent 完成回调(下次 tick 检测到 output 或 webhook)
|
||
└── ActiveAgentCounter.decrement(agent_id) # -1
|
||
```
|
||
|
||
**关键变化**:
|
||
1. 不再使用 `threading.Lock` 和 `threading.Semaphore`——它们是同步原语,不适合异步 spawn 场景
|
||
2. 改用 `ActiveAgentCounter`(线程安全计数器),spawn 时 +1,Agent 完成回调时 -1
|
||
3. `_tick()` 分配前检查计数器,超过阈值就跳过
|
||
4. Agent session 按 `agent:{agent_id}:project:{project_id}:task:{task_id}` 命名,项目+任务级天然隔离
|
||
|
||
#### 5.4.6 Agent 并行策略 + Session 隔离
|
||
|
||
**并行策略配置**:
|
||
|
||
```yaml
|
||
# _registry.yaml 中可配置
|
||
projects:
|
||
quant-momentum:
|
||
agent_parallelism: sequential # 同一 Agent 同一时刻只跑一个任务(默认)
|
||
max_active_per_agent: 1 # sequential 的显式写法
|
||
quant-pairs:
|
||
agent_parallelism: parallel # 同一 Agent 可同时跑多个任务
|
||
max_active_per_agent: 2 # 最多 2 个并行
|
||
```
|
||
|
||
**Session 命名规则**:
|
||
|
||
```
|
||
格式:agent:{agent_id}:project:{project_id}:task:{task_id}
|
||
示例:agent:zhangfei-dev:project:quant-momentum:task:task-001
|
||
```
|
||
|
||
- 每个任务独立 session,任务间上下文不串
|
||
- 同一 Agent 在不同项目用不同 session,项目间上下文不串
|
||
- `sequential` 模式:同一项目同一 Agent 只有一个活跃 session(新的任务复用或新开)
|
||
- `parallel` 模式:每个任务独立 session
|
||
|
||
**sequential 模式下的 session 复用**:
|
||
|
||
```python
|
||
def _get_session_id(self, agent_id: str, task_id: str) -> str:
|
||
project_config = self.config
|
||
if project_config.get("agent_parallelism") != "parallel":
|
||
# sequential:同一项目同一 Agent 复用 session(保持上下文连续性)
|
||
return f"agent:{agent_id}:project:{self.project_id}"
|
||
else:
|
||
# parallel:每个任务独立 session
|
||
return f"agent:{agent_id}:project:{self.project_id}:task:{task_id}"
|
||
```
|
||
|
||
#### 5.4.7 并发安全保证
|
||
|
||
| 并发场景 | 风险 | 保护机制 |
|
||
|---------|------|---------|
|
||
| 两个项目同时写同一个 SQLite | 数据损坏 | 每个项目独立 `.db` 文件,不存在此场景 |
|
||
| 两个项目同时分配同一个 Agent | Agent 资源争抢 | `ActiveAgentCounter` + `max_active_per_agent` 限制 |
|
||
| LLM API 并发超限 | API 限流/超限 | `ActiveAgentCounter` 全局计数,`_tick()` 分配前检查 |
|
||
| ProjectSlot 线程异常退出 | 项目 tick 停止 | try/except 包裹 + Daemon 监控线程存活(§5.4.8) |
|
||
| Daemon 主进程崩溃 | 所有项目停止 | PM2 自动重启 + SQLite WAL 保护数据完整性 |
|
||
| Agent 完成回调丢失 | 计数器不归零 | 超时兜底:working 任务超过 `task_timeout` 视为完成,计数器 -1 |
|
||
| _registry.yaml 并发写入 | 数据损坏 | _registry.yaml 只在 CLI 操作时读写(非 tick 热路径),tick 状态用内存 dict |
|
||
|
||
### 6.1 项目管理命令
|
||
|
||
```bash
|
||
# 创建项目
|
||
python3 blackboard.py project create --name quant-momentum --display-name "动量因子策略" --agents zhangfei-dev,zhaoyun-data,guanyu-dev
|
||
|
||
# 列出项目
|
||
python3 blackboard.py project list
|
||
|
||
# 切换默认项目
|
||
python3 blackboard.py project default quant-momentum
|
||
|
||
# 归档项目(不删除数据,只停 tick)
|
||
python3 blackboard.py project archive quant-pairs
|
||
|
||
# 删除项目(删除数据,需确认)
|
||
python3 blackboard.py project delete quant-pairs --confirm
|
||
```
|
||
|
||
### 6.2 所有操作指定项目
|
||
|
||
```bash
|
||
# 方式1:命令行参数
|
||
python3 blackboard.py read --project quant-momentum --task task-001
|
||
|
||
# 方式2:环境变量(设置后所有命令默认用此项目)
|
||
export MOZIPLUS_PROJECT=quant-momentum
|
||
python3 blackboard.py read --task task-001
|
||
|
||
# 方式3:默认项目(_registry.yaml 中 default_project)
|
||
# 不指定 --project 也不设环境变量时,使用 default_project
|
||
python3 blackboard.py read --task task-001
|
||
```
|
||
|
||
### 6.3 Agent 使用的项目解析优先级
|
||
|
||
```
|
||
1. --project 参数(显式指定)
|
||
2. MOZIPLUS_PROJECT 环境变量
|
||
3. _registry.yaml 中的 default_project
|
||
4. 如果只有一个 active 项目,自动使用
|
||
5. 都没有 → 报错"请指定项目"
|
||
```
|
||
|
||
## 7. L2 上下文注入变更
|
||
|
||
### 7.1 Agent spawn 时注入项目上下文
|
||
|
||
L2 prompt_template 三段式注入增加项目段:
|
||
|
||
```
|
||
═══ 项目上下文 ═══
|
||
项目: quant-momentum(动量因子策略)
|
||
背景: <project_context.yaml 内容,由项目级配置提供>
|
||
可用 Agent: 张飞(编码)、赵云(数据)、关羽(风控)
|
||
═══ 任务上下文 ═══
|
||
(原有内容不变)
|
||
```
|
||
|
||
### 7.2 project_context.yaml 示例
|
||
|
||
```yaml
|
||
# projects/quant-momentum/config/project_context.yaml
|
||
description: "基于动量因子的量化策略研发"
|
||
domain: "量化交易"
|
||
data_sources:
|
||
- "NAS /Volumes/stock/ A股日线数据"
|
||
- "NAS /Volumes/stock/minute_kline/ 分钟线数据"
|
||
code_repo: "~/.openclaw/sanguo_projects/sanguo_quant_live/"
|
||
key_constraints:
|
||
- "所有策略必须通过回测验证才能上实盘"
|
||
- "止损逻辑必须经过关羽风控审查"
|
||
```
|
||
|
||
## 8. 跨项目协作
|
||
|
||
### 8.1 默认禁止跨项目
|
||
|
||
Agent 不能跨项目读写黑板。这是安全边界——不同项目的数据、配置、产出互不干扰。
|
||
|
||
### 8.2 跨项目数据共享
|
||
|
||
如果项目 A 需要项目 B 的产出(如"moziplus-dev 需要赵云的数据"),通过文件系统共享:
|
||
|
||
```bash
|
||
# 项目 A 中,Agent 把产出写到 NAS 共享路径
|
||
# 项目 B 中,Agent 从 NAS 共享路径读取
|
||
```
|
||
|
||
不需要特殊的跨项目协议——**NAS 路径就是跨项目的桥梁**,和当前团队的工作方式一致。
|
||
|
||
### 8.3 跨项目任务请求(可选扩展)
|
||
|
||
如果未来需要 Agent 主动发起跨项目请求:
|
||
|
||
```
|
||
项目 A 黑板 → 创建 cross_project_request 类型任务
|
||
→ Daemon 检测到 → 在项目 B 黑板创建对应任务
|
||
→ 项目 B Agent 完成 → 产出写入 NAS
|
||
→ Daemon 检测项目 B 完成 → 更新项目 A 任务状态
|
||
```
|
||
|
||
当前不实现,预留设计空间。
|
||
|
||
### 8.4 项目归档/删除时正在运行的任务(v2 新增)
|
||
|
||
**归档(archive)**:
|
||
1. 检查是否有 working 状态的任务
|
||
2. 有 → 将这些任务标记为 cancelled,等待 Agent 完成回调(超时兜底 5 分钟)
|
||
3. 无 → 立即停止该项目的 ProjectSlot 线程
|
||
4. 将 `_registry.yaml` 中状态改为 `archived`
|
||
|
||
**删除(delete)**:
|
||
1. 必须先 archive
|
||
2. 必须无 working 任务(归档时已处理)
|
||
3. `--confirm` 确认
|
||
4. `rm -rf projects/{project_id}/`
|
||
5. 从 `_registry.yaml` 移除条目
|
||
|
||
**禁止直接删除 active 项目**——必须先归档。
|
||
|
||
## 9. 与其他课题的关系
|
||
|
||
| 课题 | 关系 | 说明 |
|
||
|------|------|------|
|
||
| 课题1(三层执行) | 无冲突 | Agent spawn 时多传一个 project_id,L2 注入多加项目上下文 |
|
||
| 课题2(事件驱动) | 微调 | Inbox JSONL 增加 project_id 字段,Daemon 路由到正确连接 |
|
||
| 课题3(挑战/评审) | 项目级配置 | guardrails.yaml 项目级覆盖 |
|
||
| 课题4(拆解+上下文) | 项目级配置 | project_context.yaml 注入 L2 |
|
||
| 课题6(经验沉淀) | 项目级经验 | 每个项目独立的 experiences 表,经验不跨项目污染 |
|
||
| 课题7+9(交互+Dashboard) | 多项目视图 | Dashboard 需要项目切换/多项目概览 |
|
||
| Worktree 隔离 | 正交 | Worktree 解决"同项目内多 Agent 并行改代码",课题11 解决"不同项目数据隔离" |
|
||
|
||
## 10. 黑板 Schema 变更
|
||
|
||
**不增加 project_id 字段**——每个项目有独立数据库,表结构不变。
|
||
|
||
唯一新增:`_registry.yaml` 项目注册表。
|
||
|
||
## 11. 开发清单
|
||
|
||
| # | 任务 | 依赖 |
|
||
|---|------|------|
|
||
| 1 | 项目目录结构 + _registry.yaml + project.yaml Schema | 无 |
|
||
| 2 | ActiveAgentCounter(线程安全计数器 + 全局/per-agent 双重限制) | 无 |
|
||
| 3 | ProjectSlot(独立线程 tick + SQLite 独立连接 + spawn 前检查计数器) | 1, 2 |
|
||
| 4 | Daemon 主循环(启动状态恢复 + ProjectSlot 线程监控/重启) | 3 |
|
||
| 4a | 启动时幽灵 working 任务处理(扫描+标记 failed) | 4 |
|
||
| 5 | CLI project create/list/default/archive/delete 命令 | 1 |
|
||
| 6 | CLI 所有操作增加 --project 参数 + 优先级解析 | 1, 3 |
|
||
| 7 | L2 prompt_template 注入 project_context + session_id 命名规则 | 3, 6 |
|
||
| 8 | Daemon 逻辑健康自检(按项目追踪 + 计数器超时兜底) | 4 |
|
||
| 9 | 项目归档/删除安全流程(working 任务处理) | 4, 5 |
|
||
| 10 | Dashboard 项目切换 + 多项目概览 | 课题9 |
|
||
|
||
---
|
||
|
||
## 附录:方案 B 的详细反驳
|
||
|
||
有人可能觉得方案 B(加 project_id 字段)更简单。但实际上:
|
||
|
||
1. **安全面**:方案 B 靠 `WHERE project_id = ?` 逻辑隔离。一个漏掉的 WHERE = 数据泄漏。方案 C 靠物理文件隔离,漏不掉。
|
||
|
||
2. **性能面**:方案 B 所有项目共享一个 SQLite 文件。10 个项目各 10 个任务 = 100 个任务在一个 `.db` 里。WAL 写入是串行的,多项目并发 tick 会互相等待。方案 C 每个项目独立文件,互不影响。
|
||
|
||
3. **运维面**:方案 B 删除项目 = `DELETE FROM tasks WHERE project_id = ?` + 8 张表都要删。方案 C = `rm -rf projects/xxx/`。备份/恢复同理。
|
||
|
||
4. **配置面**:方案 B 的 guardrails.yaml 要设计"全局默认 + 项目覆盖"的合并逻辑。方案 C 每个项目独立 config/ 目录,天然隔离,覆盖逻辑更清晰。
|
||
|
||
5. **经验沉淀面**:方案 B 的经验混在一个 experiences 表里。如果项目 A 的"pytest 参数经验"污染到项目 B(B 可能不做 Python),反而有害。方案 C 每个项目独立经验库。
|
||
|
||
**唯一方案 B 更优的场景**:跨项目统计分析("所有项目的平均完成时间")。但这个需求可以后期通过注册表元数据实现,不需要把所有数据放一个库里。
|