diff --git a/docs/design/topic11-multi-project-proposal.md b/docs/design/topic11-multi-project-proposal.md index 02751db..d2da9d2 100644 --- a/docs/design/topic11-multi-project-proposal.md +++ b/docs/design/topic11-multi-project-proposal.md @@ -230,14 +230,42 @@ import time from pathlib import Path +class ActiveAgentCounter: + """线程安全的 Agent 活跃任务计数器。""" + + def __init__(self, max_global: int = 5): + self._counts: dict[str, int] = {} # agent_id → 活跃任务数 + self._total = 0 # 全局活跃总数 + self._max_global = max_global + self._lock = threading.Lock() + + def can_acquire(self, agent_id: str, max_per_agent: int = 1) -> bool: + """检查是否可以分配(非阻塞)。""" + with self._lock: + if self._total >= self._max_global: + return False + return self._counts.get(agent_id, 0) < max_per_agent + + def increment(self, agent_id: str): + with self._lock: + self._counts[agent_id] = self._counts.get(agent_id, 0) + 1 + self._total += 1 + + def decrement(self, agent_id: str): + with self._lock: + if self._counts.get(agent_id, 0) > 0: + self._counts[agent_id] -= 1 + self._total -= 1 + + class Daemon: """单进程 Daemon,per-project 线程并发。""" def __init__(self, config: DaemonConfig): self.config = config - self.llm_semaphore = threading.Semaphore(config.max_concurrent_llm) # 默认 3 - self.agent_locks: dict[str, threading.Lock] = {} # per-agent 互斥 + self.agent_counter = ActiveAgentCounter(max_global=config.max_global_active) # 默认 5 self.slots: dict[str, ProjectSlot] = {} + self._slot_threads: dict[str, threading.Thread] = {} # 线程存活监控 self._shutdown = threading.Event() def start(self): @@ -246,20 +274,33 @@ class Daemon: for project_id, meta in registry["projects"].items(): if meta["status"] != "active": continue - slot = ProjectSlot( - project_id=project_id, - config=meta, - llm_semaphore=self.llm_semaphore, - agent_locks=self.agent_locks, - tick_interval=self.config.tick_interval, # 默认 30s - shutdown_event=self._shutdown, - ) - self.slots[project_id] = slot - t = threading.Thread(target=slot.run_loop, name=f"project-{project_id}", daemon=True) - t.start() + self._start_slot(project_id, meta) - # 主线程等待 shutdown - self._shutdown.wait() + # 主线程:监控 + 等待 shutdown + while not self._shutdown.is_set(): + self._check_slot_health() + self._shutdown.wait(60) # 每 60s 检查一次线程存活 + + def _start_slot(self, project_id: str, meta: dict): + slot = ProjectSlot( + project_id=project_id, + config=meta, + agent_counter=self.agent_counter, + tick_interval=self.config.tick_interval, + shutdown_event=self._shutdown, + ) + self.slots[project_id] = slot + t = threading.Thread(target=slot.run_loop, name=f"project-{project_id}", daemon=True) + t.start() + self._slot_threads[project_id] = t + + def _check_slot_health(self): + """检查所有 ProjectSlot 线程是否存活,死亡则重启。""" + for project_id, thread in list(self._slot_threads.items()): + if not thread.is_alive(): + logger.warning(f"ProjectSlot {project_id} thread died, restarting...") + meta = self.slots[project_id].config + self._start_slot(project_id, meta) def shutdown(self): self._shutdown.set() @@ -268,13 +309,12 @@ class Daemon: class ProjectSlot: """单项目的独立 tick 循环。""" - def __init__(self, project_id, config, llm_semaphore, agent_locks, + def __init__(self, project_id, config, agent_counter, tick_interval=30, shutdown_event=None): self.project_id = project_id self.conn = sqlite_connect(Path(f"projects/{project_id}/blackboard.db")) self.config = config - self.llm_sem = llm_semaphore # 共享:全局 LLM 信号量 - self.agent_locks = agent_locks # 共享:per-agent 锁字典 + self.agent_counter = agent_counter # 共享:全局 Agent 活跃计数器 self.tick_interval = tick_interval self.shutdown = shutdown_event or threading.Event() self.health = DaemonHealth(project_id) @@ -286,10 +326,15 @@ class ProjectSlot: self._tick() except Exception as e: logger.error(f"[{self.project_id}] tick failed: {e}") - self.shutdown.wait(self.tick_interval) # 可被 shutdown 中断的 sleep + self.shutdown.wait(self.tick_interval) def _tick(self): """单次 tick:找 pending 任务,尝试分配。""" + # 先检查已完成的 Agent,释放计数器 + completed = self._check_working_tasks() # 返回已完成的 agent_id 列表 + for agent_id in completed: + self.agent_counter.decrement(agent_id) + pending = find_pending(self.conn) if not pending: self.health.record_idle() @@ -297,75 +342,126 @@ class ProjectSlot: for task in pending: agent_id = task["assignee"] - lock = self.agent_locks.setdefault(agent_id, threading.Lock()) + max_per_agent = self.config.get("max_active_per_agent", 1) - # 非阻塞尝试:Agent 正忙就跳过,不排队等 - if not lock.acquire(blocking=False): - logger.info(f"[{self.project_id}] {agent_id} busy, skip task {task['id']}") + # 检查全局 + per-agent 并发上限 + if not self.agent_counter.can_acquire(agent_id, max_per_agent): + logger.info(f"[{self.project_id}] {agent_id} at capacity, skip task {task['id']}") continue - try: - # 等待全局 LLM 槽位(阻塞,但持有 agent_lock) - self.llm_sem.acquire() - try: - spawn_agent(self.project_id, task, self.conn) - self.health.record_change() - finally: - # LLM 调用完成后释放信号量 - # 注意:Agent spawn 后 LLM 调用即完成,不需要等 Agent 执行完 - self.llm_sem.release() - finally: - lock.release() + # sequential 模式:检查该 Agent 在本项目是否有 working 任务 + if self.config.get("agent_parallelism") != "parallel": + if has_working_task(self.conn, agent_id): + continue - # 检查 working 任务超时等 - self._check_working_tasks() + # 生成 session_id(§5.4.6 命名规则) + session_id = self._get_session_id(agent_id, task['id']) + + # spawn Agent(异步,不阻塞) + spawn_agent( + project_id=self.project_id, + task=task, + session_id=session_id, + ) + self.agent_counter.increment(agent_id) # +1 + self.health.record_change() + + def _get_session_id(self, agent_id: str, task_id: str) -> str: + """§5.4.6 session 命名规则。""" + if self.config.get("agent_parallelism") != "parallel": + # sequential:同一项目同一 Agent 复用 session(保持上下文连续性) + return f"agent:{agent_id}:project:{self.project_id}" + else: + # parallel:每个任务独立 session + return f"agent:{agent_id}:project:{self.project_id}:task:{task_id}" ``` -#### 5.4.5 资源释放时序 +#### 5.4.5 资源控制模型(v2 修订:异步计数器替代同步信号量) + +**原设计问题**:spawn_agent() 不是瞬时操作——Agent 执行涉及 LLM API 调用(可能多轮工具调用),如果在 spawn 完成后才释放锁/信号量,并发退化回串行;如果在 spawn 启动后立即释放,信号量没有真正限流。 + +**修订方案**(采纳司马懿评审建议): + +| 资源 | 原方案 | 修订方案 | 原因 | +|------|--------|---------|------| +| Agent 互斥 | `threading.Lock` | 移除。改为 per-project session 命名 + `_check_working_tasks()` | 同一 Agent 可用不同 session-id 安全服务不同项目 | +| LLM 并发 | `threading.Semaphore` | `ActiveAgentCounter`(异步计数器) | spawn 是异步的,同步信号量无法精确限流 | +| 项目隔离 | per-project SQLite | 不变 | | + +**新时序**: ``` ProjectSlot._tick() │ - ├── lock.acquire() # 拿到 Agent 锁 - ├── llm_sem.acquire() # 拿到 LLM 槽位 - ├── spawn_agent() # spawn Agent 子进程(LLM 调用在 spawn 瞬间完成) - ├── llm_sem.release() # ✅ spawn 完立即释放 LLM 槽位 - ├── lock.release() # ✅ spawn 完立即释放 Agent 锁 + ├── 检查 ActiveAgentCounter[agent_id] < max_active? + │ └── 否 → 跳过,下个 tick 再检查 + ├── 检查 agent_parallelism == "sequential" 且该 Agent 有 working 任务? + │ └── 是 → 跳过 + ├── spawn_agent(project_id, task) + │ ├── session_id = "agent:{agent_id}:project:{project_id}:task:{task_id}" + │ └── ActiveAgentCounter.increment(agent_id) # +1 │ - └── Agent 子进程独立运行 → 完成后写 output.json → 下次 tick 检测 + └── Agent 完成回调(下次 tick 检测到 output 或 webhook) + └── ActiveAgentCounter.decrement(agent_id) # -1 ``` -**关键设计决策**:`lock` 和 `llm_sem` 在 `spawn_agent()` 返回后立即释放,**不等 Agent 执行完成**。 +**关键变化**: +1. 不再使用 `threading.Lock` 和 `threading.Semaphore`——它们是同步原语,不适合异步 spawn 场景 +2. 改用 `ActiveAgentCounter`(线程安全计数器),spawn 时 +1,Agent 完成回调时 -1 +3. `_tick()` 分配前检查计数器,超过阈值就跳过 +4. Agent session 按 `agent:{agent_id}:project:{project_id}:task:{task_id}` 命名,项目+任务级天然隔离 -原因: -1. `spawn_agent()` 是 `subprocess.Popen` 或 `openclaw agent` CLI 调用,启动后立即返回 -2. Agent 执行是异步的(子进程独立运行) -3. 如果等 Agent 执行完才释放锁,并发就退化回串行 +#### 5.4.6 Agent 并行策略 + Session 隔离 -**这意味着**:同一个 Agent 理论上可以同时有多个任务在跑。如果需要严格串行(一个 Agent 同一时刻只有一个任务),则改为在下次 tick 的 `_check_working_tasks()` 中检测 Agent 是否有 working 任务,有则不再分配新任务。 - -#### 5.4.6 Agent 并行策略配置 +**并行策略配置**: ```yaml # _registry.yaml 中可配置 projects: quant-momentum: agent_parallelism: sequential # 同一 Agent 同一时刻只跑一个任务(默认) + max_active_per_agent: 1 # sequential 的显式写法 quant-pairs: agent_parallelism: parallel # 同一 Agent 可同时跑多个任务 + max_active_per_agent: 2 # 最多 2 个并行 ``` -默认 `sequential`:`_tick()` 分配任务前先检查该 Agent 是否有 working 任务。 +**Session 命名规则**: + +``` +格式:agent:{agent_id}:project:{project_id}:task:{task_id} +示例:agent:zhangfei-dev:project:quant-momentum:task:task-001 +``` + +- 每个任务独立 session,任务间上下文不串 +- 同一 Agent 在不同项目用不同 session,项目间上下文不串 +- `sequential` 模式:同一项目同一 Agent 只有一个活跃 session(新的任务复用或新开) +- `parallel` 模式:每个任务独立 session + +**sequential 模式下的 session 复用**: + +```python +def _get_session_id(self, agent_id: str, task_id: str) -> str: + project_config = self.config + if project_config.get("agent_parallelism") != "parallel": + # sequential:同一项目同一 Agent 复用 session(保持上下文连续性) + return f"agent:{agent_id}:project:{self.project_id}" + else: + # parallel:每个任务独立 session + return f"agent:{agent_id}:project:{self.project_id}:task:{task_id}" +``` #### 5.4.7 并发安全保证 | 并发场景 | 风险 | 保护机制 | |---------|------|---------| | 两个项目同时写同一个 SQLite | 数据损坏 | 每个项目独立 `.db` 文件,不存在此场景 | -| 两个项目同时分配同一个 Agent | Agent session 上下文串 | per-agent Lock 互斥 | -| 三个项目同时调 LLM API | API 限流/超限 | 全局 Semaphore 限流 | -| ProjectSlot 线程异常退出 | 项目 tick 停止 | try/except 包裹 + Daemon 监控线程存活 | +| 两个项目同时分配同一个 Agent | Agent 资源争抢 | `ActiveAgentCounter` + `max_active_per_agent` 限制 | +| LLM API 并发超限 | API 限流/超限 | `ActiveAgentCounter` 全局计数,`_tick()` 分配前检查 | +| ProjectSlot 线程异常退出 | 项目 tick 停止 | try/except 包裹 + Daemon 监控线程存活(§5.4.8) | | Daemon 主进程崩溃 | 所有项目停止 | PM2 自动重启 + SQLite WAL 保护数据完整性 | +| Agent 完成回调丢失 | 计数器不归零 | 超时兜底:working 任务超过 `task_timeout` 视为完成,计数器 -1 | +| _registry.yaml 并发写入 | 数据损坏 | _registry.yaml 只在 CLI 操作时读写(非 tick 热路径),tick 状态用内存 dict | ## 6. CLI 变更