diff --git a/docs/design/topic11-multi-project-proposal.md b/docs/design/topic11-multi-project-proposal.md index 43a4fb6..02751db 100644 --- a/docs/design/topic11-multi-project-proposal.md +++ b/docs/design/topic11-multi-project-proposal.md @@ -2,8 +2,9 @@ > **日期**: 2026-05-16 > **作者**: 庞统(副军师)🐦 -> **状态**: 初稿待确认 +> **状态**: v2(并发调度模型重设计,待评审) > **前置**: 课题1-4、课题6 已完成设计 +> **变更**: v2 新增 §5.4 并发调度模型(per-project 线程 + 全局资源信号量),替代原串行 tick --- @@ -141,21 +142,11 @@ class ProjectManager: return global_guardrails ``` -### 5.2 Tick 逻辑变更 +### 5.2 ~~Tick 逻辑变更~~(已废弃,见 §5.4) -```python -# Daemon 主循环 -def tick(self): - registry = load_registry() - for project_id, project_meta in registry["projects"].items(): - if project_meta["status"] != "active": - continue - conn = self.project_manager.get_connection(project_id) - config = self.project_manager.get_config(project_id) - self._tick_project(project_id, conn, config) # 原有的 tick 逻辑 -``` - -每个项目的 tick 逻辑完全相同,只是操作的 SQLite 连接不同。 +> **原设计**:Daemon 主循环串行遍历所有项目 tick。每个项目 tick 完再 tick 下一个。 +> **问题**:所有项目/任务一起排队,项目 A 的长任务阻塞项目 B。 +> **新设计**:见 §5.4 per-project 并发调度。 ### 5.3 Daemon 逻辑健康自检 @@ -176,6 +167,206 @@ class DaemonHealth: return self._tick_state_changes[project_id] >= STALE_TICK_THRESHOLD ``` +### 5.4 并发调度模型(v2 新增) + +#### 5.4.1 问题 + +原设计中 Daemon 主循环串行 tick 所有项目: + +``` +Tick → Project A(30s)→ Project B(等A完成)→ Project C(等B完成) +``` + +问题: +1. **项目间互相阻塞**——Project A 有一个长任务在执行,Project B 的独立任务必须等 +2. **响应延迟**——3 个项目 tick 一次可能要 90s+,Project C 要等 60s 才被检查 +3. **不符合业界实践**——调研 7 个项目(Hermes/open-multi-agent/Wanman/Google ADK/Microsoft AutoGen/AgentScope/GSD),没有一个用全局串行排队 + +#### 5.4.2 业界并发模型调研 + +| 项目 | 并发模型 | 核心机制 | +|------|---------|---------| +| **open-multi-agent** | AgentPool + Semaphore | 全局 `maxConcurrency=5`,per-agent 互斥锁,`Promise.allSettled` 并行执行独立任务 | +| **Wanman** | per-agent 进程 | 每个 Agent 独立进程+独立 runLoop,Supervisor 通过消息总线协调 | +| **Google ADK** | asyncio.TaskGroup | `ParallelAgent` 用 `TaskGroup` 并行执行子 Agent | +| **Microsoft AutoGen** | Pregel Superstep | 每个 superstep 内所有激活 Executor 并行执行 | +| **Hermes** | 单线程 tick + flock | **单项目设计**,tick 内只有几个 cron job,不需要并发 | + +**关键发现**:open-multi-agent 的 `AgentPool + Semaphore + per-agent Lock` 是最成熟、最可借鉴的模型。 + +#### 5.4.3 设计:per-project 线程 + 全局资源信号量 + +``` +Daemon 主进程(轻量路由器 + 资源管控) +│ +├── 全局 LLM Semaphore(max_concurrent=3) +├── per-agent Lock(张飞不能同时在两个项目里跑) +│ +├── ProjectSlot A(独立线程) +│ └── 自己的 SQLite 连接 +│ └── 自己的 tick 循环(30s) +│ └── spawn Agent 时:acquire agent_lock → acquire llm_semaphore +│ +├── ProjectSlot B(独立线程) +│ └── (同上) +│ +└── ProjectSlot C(独立线程) + └── (同上) +``` + +**三层资源控制**: + +| 层级 | 控制对象 | 机制 | 原因 | +|------|---------|------|------| +| L1: 项目隔离 | SQLite 连接 | per-project 独立连接 | 数据物理隔离,无竞争 | +| L2: Agent 互斥 | 同一 Agent 不能并行 | `threading.Lock` per-agent | Agent session 不是线程安全的,张飞同一时刻只能服务一个任务 | +| L3: 全局资源 | LLM API 调用并发 | `threading.Semaphore(max_concurrent)` | 防止 API 限流、控制成本 | + +#### 5.4.4 核心代码 + +```python +import threading +import time +from pathlib import Path + + +class Daemon: + """单进程 Daemon,per-project 线程并发。""" + + def __init__(self, config: DaemonConfig): + self.config = config + self.llm_semaphore = threading.Semaphore(config.max_concurrent_llm) # 默认 3 + self.agent_locks: dict[str, threading.Lock] = {} # per-agent 互斥 + self.slots: dict[str, ProjectSlot] = {} + self._shutdown = threading.Event() + + def start(self): + """启动所有 active 项目的独立线程。""" + registry = load_registry() + for project_id, meta in registry["projects"].items(): + if meta["status"] != "active": + continue + slot = ProjectSlot( + project_id=project_id, + config=meta, + llm_semaphore=self.llm_semaphore, + agent_locks=self.agent_locks, + tick_interval=self.config.tick_interval, # 默认 30s + shutdown_event=self._shutdown, + ) + self.slots[project_id] = slot + t = threading.Thread(target=slot.run_loop, name=f"project-{project_id}", daemon=True) + t.start() + + # 主线程等待 shutdown + self._shutdown.wait() + + def shutdown(self): + self._shutdown.set() + + +class ProjectSlot: + """单项目的独立 tick 循环。""" + + def __init__(self, project_id, config, llm_semaphore, agent_locks, + tick_interval=30, shutdown_event=None): + self.project_id = project_id + self.conn = sqlite_connect(Path(f"projects/{project_id}/blackboard.db")) + self.config = config + self.llm_sem = llm_semaphore # 共享:全局 LLM 信号量 + self.agent_locks = agent_locks # 共享:per-agent 锁字典 + self.tick_interval = tick_interval + self.shutdown = shutdown_event or threading.Event() + self.health = DaemonHealth(project_id) + + def run_loop(self): + """独立线程的主循环。""" + while not self.shutdown.is_set(): + try: + self._tick() + except Exception as e: + logger.error(f"[{self.project_id}] tick failed: {e}") + self.shutdown.wait(self.tick_interval) # 可被 shutdown 中断的 sleep + + def _tick(self): + """单次 tick:找 pending 任务,尝试分配。""" + pending = find_pending(self.conn) + if not pending: + self.health.record_idle() + return + + for task in pending: + agent_id = task["assignee"] + lock = self.agent_locks.setdefault(agent_id, threading.Lock()) + + # 非阻塞尝试:Agent 正忙就跳过,不排队等 + if not lock.acquire(blocking=False): + logger.info(f"[{self.project_id}] {agent_id} busy, skip task {task['id']}") + continue + + try: + # 等待全局 LLM 槽位(阻塞,但持有 agent_lock) + self.llm_sem.acquire() + try: + spawn_agent(self.project_id, task, self.conn) + self.health.record_change() + finally: + # LLM 调用完成后释放信号量 + # 注意:Agent spawn 后 LLM 调用即完成,不需要等 Agent 执行完 + self.llm_sem.release() + finally: + lock.release() + + # 检查 working 任务超时等 + self._check_working_tasks() +``` + +#### 5.4.5 资源释放时序 + +``` +ProjectSlot._tick() + │ + ├── lock.acquire() # 拿到 Agent 锁 + ├── llm_sem.acquire() # 拿到 LLM 槽位 + ├── spawn_agent() # spawn Agent 子进程(LLM 调用在 spawn 瞬间完成) + ├── llm_sem.release() # ✅ spawn 完立即释放 LLM 槽位 + ├── lock.release() # ✅ spawn 完立即释放 Agent 锁 + │ + └── Agent 子进程独立运行 → 完成后写 output.json → 下次 tick 检测 +``` + +**关键设计决策**:`lock` 和 `llm_sem` 在 `spawn_agent()` 返回后立即释放,**不等 Agent 执行完成**。 + +原因: +1. `spawn_agent()` 是 `subprocess.Popen` 或 `openclaw agent` CLI 调用,启动后立即返回 +2. Agent 执行是异步的(子进程独立运行) +3. 如果等 Agent 执行完才释放锁,并发就退化回串行 + +**这意味着**:同一个 Agent 理论上可以同时有多个任务在跑。如果需要严格串行(一个 Agent 同一时刻只有一个任务),则改为在下次 tick 的 `_check_working_tasks()` 中检测 Agent 是否有 working 任务,有则不再分配新任务。 + +#### 5.4.6 Agent 并行策略配置 + +```yaml +# _registry.yaml 中可配置 +projects: + quant-momentum: + agent_parallelism: sequential # 同一 Agent 同一时刻只跑一个任务(默认) + quant-pairs: + agent_parallelism: parallel # 同一 Agent 可同时跑多个任务 +``` + +默认 `sequential`:`_tick()` 分配任务前先检查该 Agent 是否有 working 任务。 + +#### 5.4.7 并发安全保证 + +| 并发场景 | 风险 | 保护机制 | +|---------|------|---------| +| 两个项目同时写同一个 SQLite | 数据损坏 | 每个项目独立 `.db` 文件,不存在此场景 | +| 两个项目同时分配同一个 Agent | Agent session 上下文串 | per-agent Lock 互斥 | +| 三个项目同时调 LLM API | API 限流/超限 | 全局 Semaphore 限流 | +| ProjectSlot 线程异常退出 | 项目 tick 停止 | try/except 包裹 + Daemon 监控线程存活 | +| Daemon 主进程崩溃 | 所有项目停止 | PM2 自动重启 + SQLite WAL 保护数据完整性 | + ## 6. CLI 变更 ### 6.1 项目管理命令