diff --git a/docs/design/07-spawner-acquire-first.md b/docs/design/07-spawner-acquire-first.md new file mode 100644 index 0000000..902c4c9 --- /dev/null +++ b/docs/design/07-spawner-acquire-first.md @@ -0,0 +1,280 @@ +# #07 Spawner Acquire-First 设计 + +> 状态:设计待评审 +> 作者:庞统 +> 日期:2026-06-01 +> 评审:司马懿 + +## 一、问题 + +### 1.1 现状 + +`spawn_full_agent()` 的检查流程是串行递进的: + +``` +session check (S1-S5) → counter acquire (C1-C4) → spawn subprocess +``` + +### 1.2 问题 + +| # | 问题 | 影响 | +|---|------|------| +| P1 | **mozi 内部竞争窗口**:两个 tick 同时检查同一 agent,session check 都通过 → counter acquire 时第二个被拦,但第一个还没 spawn,浪费了一轮检查 | 效率 | +| P2 | **session check 和 spawn 之间间隙大**:session check 通过后要经过 counter acquire 才到 spawn,这段时间 webchat 可能抢占 → Gateway lock 竞争 | 正确性 | +| P3 | **检查串行耦合**:S1→S2→S3→S4→S5 之间有隐含依赖(如 S2 判断依赖 S1 结果),维护困难 | 可维护性 | +| P4 | **所有 AgentBusyError 不可区分**:外部占用 vs mozi 占用 vs cooldown,dispatcher 统一处理为 skipped | 可观测性 | + +### 1.3 设计目标 + +**核心原则:先 acquire(锁住 mozi 内部),再检查(贴近 spawn),确保 Gateway 只收到确认空闲的请求。** + +- mozi 内部不会有两个 spawn 同时检查同一 agent +- session check 尽可能贴近 spawn,最小化外部抢占窗口 +- 检查逻辑从串行递进改为并列收集 + +## 二、设计方案 + +### 2.1 新执行顺序 + +``` +spawn_full_agent(agent_id, use_main_session=True) + │ + ├─ Phase 0: Pre-acquire 修复(无锁) + │ └─ 检测 timeout/failed → revive(仅在无 counter 占用时) + │ + ├─ Phase 1: Counter acquire(互斥锁) + │ └─ counter.acquire() → 失败则 AgentBusyError(moji_busy) + │ + ├─ Phase 2: Session check(在锁保护下,贴近 spawn) + │ ├─ 并列收集所有状态 + │ └─ 统一判定:任一不满足 → release counter → AgentBusyError(具体原因) + │ + ├─ Phase 3: on_checks_passed 回调 + │ + └─ Phase 4: spawn subprocess(无额外检查,直接执行) +``` + +### 2.2 对比 + +``` +【当前】 【改后】 +session check (S1-S5) Phase 0: pre-acquire revive(无锁) + ↓ ↓ +counter acquire (C1-C4) Phase 1: counter acquire(互斥锁) + ↓ ↓ +on_checks_passed Phase 2: session check(锁保护下) + ↓ ↓ +spawn subprocess Phase 3: on_checks_passed + ↓ + Phase 4: spawn subprocess +``` + +### 2.3 Phase 0: Pre-acquire 修复 + +S4(timeout/failed → revive)是**修复性操作**,不是判定。在 counter acquire 之前执行: + +```python +# Phase 0: Pre-acquire 修复(无锁,快速) +if use_main_session: + state = self._check_session_state(agent_id) + if state.get("status") in ("timeout", "failed"): + logger.info("Pre-acquire: %s status=%s, reviving", agent_id, state["status"]) + self._revive_session(agent_id) +``` + +**为什么不在锁内 revive?** 因为 revive 是写 sessions.json,如果 agent 正在被另一个 spawn 持有 counter(working),revive 不应该干扰。counter 未持有时 revive 是安全的。 + +**幂等性**:revive 只在 status=running 时改为 idle,多次调用安全。 + +### 2.4 Phase 1: Counter acquire + +Counter 作为 mozi 内部的互斥锁。acquire 包含: +- C1 cooldown +- C2 global 上限 +- C3 per-agent 上限 +- C4 per-session-key 上限 + +失败时抛 `AgentBusyError(agent_id, reason="counter_blocked", detail={...})`。 + +**改动**:去掉外层冗余的 `can_acquire` 调用,只用 `acquire`(acquire 内部已有 can_acquire 检查)。 + +```python +# Phase 1: Counter acquire +if self.counter and not skip_counter: + acquired = await self.counter.acquire(agent_id, _sid_key) + if not acquired: + raise AgentBusyError(agent_id, reason="counter_blocked") +``` + +### 2.5 Phase 2: Session check(在锁保护下) + +将 S1/S3/S5 改为**并列收集 → 统一判定**,而非串行递进: + +```python +# Phase 2: Session state check(锁保护下,贴近 spawn) +if use_main_session: + state = self._check_session_state(agent_id) + + # 收集所有 block 原因 + blockers = [] + + if state.get("lock_pid_alive") and not state.get("lock_expired"): + blockers.append(("session_locked", state.get("lock_pid"))) + + if state.get("status") == "running": + blockers.append(("session_running", None)) + + if state.get("recent_compact"): + blockers.append(("session_compacting", None)) + + # 统一判定 + if blockers: + # 释放 counter,报具体原因 + if self.counter: + self.counter.release(agent_id, _sid_key) + primary = blockers[0] + raise AgentBusyError(agent_id, reason=primary[0], detail={"blockers": blockers}) +``` + +**关键变化**: +1. 检查在 counter 锁内执行 → mozi 内部不会并发检查同一 agent +2. 并列收集所有问题 → 日志中能看到所有 block 原因 +3. session check 紧贴 spawn → 外部抢占窗口最小 + +### 2.6 Phase 3 & 4: 不变 + +`on_checks_passed` 回调和 `spawn subprocess` 逻辑不变。 + +### 2.7 AgentBusyError 增强 + +```python +class AgentBusyError(Exception): + def __init__(self, agent_id: str, reason: str = "", detail: dict = None): + self.agent_id = agent_id + self.reason = reason # "counter_blocked" | "session_locked" | "session_running" | "session_compacting" + self.detail = detail or {} + super().__init__(f"{agent_id}: {reason}") +``` + +Dispatcher 层可以区分原因: +```python +except AgentBusyError as e: + if e.reason.startswith("session_"): + # 外部占用(webchat/compact),等外部释放 + self._record_routing(task, decision, "skipped", f"Session busy: {e.reason}") + elif e.reason == "counter_blocked": + # mozi 内部占用,等 counter release + self._record_routing(task, decision, "skipped", "Agent busy (counter)") +``` + +## 三、sub session 路径 + +`use_main_session=False` 时: + +``` +spawn_full_agent(agent_id, use_main_session=False) + │ + ├─ Phase 1: Counter acquire + ├─ Phase 2: 跳过(sub session 不检查 session state) + ├─ Phase 3: on_checks_passed + └─ Phase 4: spawn subprocess +``` + +Sub session 有独立的 session file → 独立的 lock → 不受 main session 的外部占用影响。 +只需 counter acquire 保护 mozi 内部并发即可。 + +## 四、优化细节 + +### 4.1 O1: lock PID 死 + status=running 假死漏洞 + +**当前问题**:lock 文件存在但 PID 已死 + sessions.json status=running → 被 S3 拦截而非 revive。 + +**修复**:在 Phase 2 并列收集中增加判定: + +```python +if state.get("status") == "running" and not state.get("lock_pid_alive"): + # status=running 但 lock PID 已死 → 假死,revive + logger.warning("Agent %s: status=running but lock PID dead, reviving", agent_id) + self._revive_session(agent_id) + # revive 后重新检查 + state = self._check_session_state(agent_id) + if state.get("status") == "running": + blockers.append(("session_stuck", None)) +``` + +### 4.2 O2: 去掉冗余 can_acquire + +当前 spawner 中 `can_acquire` + `acquire` 分两步调用。`acquire` 内部已有 `can_acquire` 检查。 + +**改后**:只用 `acquire`,返回 False 时抛 AgentBusyError。 + +### 4.3 O3: AgentBusyError 携带具体原因 + +reason 字段区分:`counter_blocked` / `session_locked` / `session_running` / `session_compacting` / `session_stuck`。 + +Dispatcher/ticker 日志中能看到具体原因。 + +### 4.4 O4: revive 清理 lock 文件 + +`_revive_session` 改 sessions.json status 外,同时删除残留的 lock 文件: + +```python +def _revive_session(agent_id: str) -> bool: + # ... 改 status: running → idle ... + + # 清理残留 lock 文件 + if sf: + lock_path = Path(sf + ".lock") + if lock_path.exists(): + try: + lock_path.unlink() + logger.info("Cleaned stale lock for %s", agent_id) + except Exception: + pass +``` + +### 4.5 O5: compact 扫描条件收紧 + +当前 compact 扫描在 status 非 idle/done/unknown/None 时都触发,范围过宽。 + +**改后**:只在 status 为 running 或 compacting 相关时扫描: + +```python +# 只在这些状态下检查 compact +if result["status"] in ("running",) and sf: + result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(sf) +``` + +注:Gateway 的 sessions.json status 实际值主要是 `idle/running/timeout/failed`。 +`running` 时检查 compact 有意义(agent turn 执行中可能触发 compact)。 +其他状态不需要检查。 + +## 五、改动范围 + +| 文件 | 改动 | 行数估计 | +|------|------|---------| +| `spawner.py` `spawn_full_agent()` | 重组检查顺序 | ~50 行 | +| `spawner.py` `_check_session_state()` | 不变(数据收集,不含判定) | 0 | +| `spawner.py` `_revive_session()` | 增加清理 lock 文件 | +8 行 | +| `spawner.py` `AgentBusyError` | 增加 reason/detail 字段 | +5 行 | +| `dispatcher.py` | 捕获 AgentBusyError 时区分原因(日志) | +10 行 | +| `counter.py` | 不变 | 0 | +| `ticker.py` | 不变 | 0 | + +**总计 ~70 行改动,单文件核心改动 ~50 行。** + +## 六、不在这个方案范围内 + +| 项目 | 说明 | 后续 | +|------|------|------| +| Per-agent 等待队列 | acquire 失败后排队而非 tick 重试 | 独立设计 #08 | +| Session watcher | 检测 webchat 释放后自动 dequeue | 依赖 #08 | +| counter.clear_cooldown | 正常完成后主动清除 cooldown | 可选优化 | + +## 七、验证计划 + +1. **V1 基本功能**:创建任务 → broadcast → claim → 执行 → 完成(不变) +2. **V2 mozi 内部竞争**:同 agent 两个 pending 任务 → 第二个 AgentBusyError(reason=counter_blocked) +3. **V3 webchat 争抢**:mozi 执行中 → webchat 发消息 → webchat 正常 → mozi spawn 检测到 session_locked +4. **V4 假死恢复**:lock PID 死 + status=running → 自动 revive → 正常 spawn +5. **V5 compact 期间**:compact 进行中 → spawn AgentBusyError(reason=session_compacting) → compact 结束后正常