14 KiB
#07 Spawner Acquire-First 设计
状态:设计待评审 作者:庞统 日期:2026-06-01 评审:司马懿
一、问题
1.1 现状
spawn_full_agent() 的检查流程是串行递进的:
session check (S1-S5) → counter acquire (C1-C4) → spawn subprocess
1.2 问题
| # | 问题 | 影响 |
|---|---|---|
| P1 | mozi 内部竞争窗口:两个 tick 同时检查同一 agent,session check 都通过 → counter acquire 时第二个被拦,但第一个还没 spawn,浪费了一轮检查 | 效率 |
| P2 | session check 和 spawn 之间间隙大:session check 通过后要经过 counter acquire 才到 spawn,这段时间 webchat 可能抢占 → Gateway lock 竞争 | 正确性 |
| P3 | 检查串行耦合:S1→S2→S3→S4→S5 之间有隐含依赖(如 S2 判断依赖 S1 结果),维护困难 | 可维护性 |
| P4 | 所有 AgentBusyError 不可区分:外部占用 vs mozi 占用 vs cooldown,dispatcher 统一处理为 skipped | 可观测性 |
1.3 设计目标
核心原则:先 acquire(锁住 mozi 内部),再检查(贴近 spawn),确保 Gateway 只收到确认空闲的请求。
- mozi 内部不会有两个 spawn 同时检查同一 agent
- session check 尽可能贴近 spawn,最小化外部抢占窗口
- 检查逻辑从串行递进改为并列收集
二、设计方案
2.1 新执行顺序
spawn_full_agent(agent_id, use_main_session=True)
│
├─ Phase 0: Pre-acquire 修复(无锁)
│ └─ 检测 timeout/failed → revive(仅在无 counter 占用时)
│
├─ Phase 1: Counter acquire(互斥锁)
│ └─ counter.acquire() → 失败则 AgentBusyError(moji_busy)
│
├─ Phase 2: Session check(在锁保护下,贴近 spawn)
│ ├─ 并列收集所有状态
│ └─ 统一判定:任一不满足 → release counter → AgentBusyError(具体原因)
│
├─ Phase 3: on_checks_passed 回调
│
└─ Phase 4: spawn subprocess(无额外检查,直接执行)
2.2 对比
【当前】 【改后】
session check (S1-S5) Phase 0: pre-acquire revive(无锁)
↓ ↓
counter acquire (C1-C4) Phase 1: counter acquire(互斥锁)
↓ ↓
on_checks_passed Phase 2: session check(锁保护下)
↓ ↓
spawn subprocess Phase 3: on_checks_passed
↓
Phase 4: spawn subprocess
2.3 Phase 0: Pre-acquire 修复
S4(timeout/failed → revive)是修复性操作,不是判定。在 counter acquire 之前执行:
# Phase 0: Pre-acquire 修复(无锁,快速)
if use_main_session:
state = self._check_session_state(agent_id)
if state.get("status") in ("timeout", "failed"):
logger.info("Pre-acquire: %s status=%s, reviving", agent_id, state["status"])
self._revive_session(agent_id)
为什么不在锁内 revive? 因为 revive 是写 sessions.json,如果 agent 正在被另一个 spawn 持有 counter(working),revive 不应该干扰。counter 未持有时 revive 是安全的。
幂等性:revive 只在 status=running 时改为 idle,多次调用安全。
2.4 Phase 1: Counter acquire
Counter 作为 mozi 内部的互斥锁。acquire 包含:
- C1 cooldown
- C2 global 上限
- C3 per-agent 上限
- C4 per-session-key 上限
失败时抛 AgentBusyError(agent_id, reason="counter_blocked", detail={...})。
改动:去掉外层冗余的 can_acquire 调用,只用 acquire(acquire 内部已有 can_acquire 检查)。
# Phase 1: Counter acquire
if self.counter and not skip_counter:
acquired = await self.counter.acquire(agent_id, _sid_key)
if not acquired:
raise AgentBusyError(agent_id, reason="counter_blocked")
2.5 Phase 2: Session check(在锁保护下)
将 S1/S3/S5 改为并列收集 → 统一判定,而非串行递进:
# Phase 2: Session state check(锁保护下,贴近 spawn)
if use_main_session:
state = self._check_session_state(agent_id)
# 收集所有 block 原因
blockers = []
if state.get("lock_pid_alive") and not state.get("lock_expired"):
blockers.append(("session_locked", state.get("lock_pid")))
if state.get("status") == "running":
blockers.append(("session_running", None))
if state.get("recent_compact"):
blockers.append(("session_compacting", None))
# 统一判定
if blockers:
# 释放 counter,报具体原因
if self.counter:
self.counter.release(agent_id, _sid_key)
primary = blockers[0]
raise AgentBusyError(agent_id, reason=primary[0], detail={"blockers": blockers})
关键变化:
- 检查在 counter 锁内执行 → mozi 内部不会并发检查同一 agent
- 并列收集所有问题 → 日志中能看到所有 block 原因
- session check 紧贴 spawn → 外部抢占窗口最小
2.6 Phase 3 & 4: 不变
on_checks_passed 回调和 spawn subprocess 逻辑不变。
2.7 AgentBusyError 增强
class AgentBusyError(Exception):
def __init__(self, agent_id: str, reason: str = "", detail: dict = None):
self.agent_id = agent_id
self.reason = reason # "counter_blocked" | "session_locked" | "session_running" | "session_compacting"
self.detail = detail or {}
super().__init__(f"{agent_id}: {reason}")
Dispatcher 层可以区分原因:
except AgentBusyError as e:
if e.reason.startswith("session_"):
# 外部占用(webchat/compact),等外部释放
self._record_routing(task, decision, "skipped", f"Session busy: {e.reason}")
elif e.reason == "counter_blocked":
# mozi 内部占用,等 counter release
self._record_routing(task, decision, "skipped", "Agent busy (counter)")
三、sub session 路径
use_main_session=False 时:
spawn_full_agent(agent_id, use_main_session=False)
│
├─ Phase 1: Counter acquire
├─ Phase 2: 跳过(sub session 不检查 session state)
├─ Phase 3: on_checks_passed
└─ Phase 4: spawn subprocess
Sub session 有独立的 session file → 独立的 lock → 不受 main session 的外部占用影响。 只需 counter acquire 保护 mozi 内部并发即可。
四、优化细节
4.1 O1: lock PID 死 + status=running 假死漏洞
当前问题:lock 文件存在但 PID 已死 + sessions.json status=running → 被 S3 拦截而非 revive。
修复:在 Phase 2 并列收集中增加判定:
if state.get("status") == "running" and not state.get("lock_pid_alive"):
# status=running 但 lock PID 已死 → 假死,revive
logger.warning("Agent %s: status=running but lock PID dead, reviving", agent_id)
self._revive_session(agent_id)
# revive 后重新检查
state = self._check_session_state(agent_id)
if state.get("status") == "running":
blockers.append(("session_stuck", None))
4.2 O2: 去掉冗余 can_acquire
当前 spawner 中 can_acquire + acquire 分两步调用。acquire 内部已有 can_acquire 检查。
改后:只用 acquire,返回 False 时抛 AgentBusyError。
4.3 O3: AgentBusyError 携带具体原因
reason 字段区分:counter_blocked / session_locked / session_running / session_compacting / session_stuck。
Dispatcher/ticker 日志中能看到具体原因。
4.4 O4: revive 清理 lock 文件
_revive_session 改 sessions.json status 外,同时删除残留的 lock 文件:
def _revive_session(agent_id: str) -> bool:
# ... 改 status: running → idle ...
# 清理残留 lock 文件
if sf:
lock_path = Path(sf + ".lock")
if lock_path.exists():
try:
lock_path.unlink()
logger.info("Cleaned stale lock for %s", agent_id)
except Exception:
pass
4.5 O5: compact 扫描条件收紧
当前 compact 扫描在 status 非 idle/done/unknown/None 时都触发,范围过宽。
改后:只在 status 为 running 或 compacting 相关时扫描:
# 只在这些状态下检查 compact
if result["status"] in ("running",) and sf:
result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(sf)
注:Gateway 的 sessions.json status 实际值主要是 idle/running/timeout/failed。
running 时检查 compact 有意义(agent turn 执行中可能触发 compact)。
其他状态不需要检查。
五、改动范围
| 文件 | 改动 | 行数估计 |
|---|---|---|
spawner.py spawn_full_agent() |
重组检查顺序 | ~50 行 |
spawner.py _check_session_state() |
不变(数据收集,不含判定) | 0 |
spawner.py _revive_session() |
增加清理 lock 文件 | +8 行 |
spawner.py AgentBusyError |
增加 reason/detail 字段 | +5 行 |
dispatcher.py |
捕获 AgentBusyError 时区分原因(日志) | +10 行 |
counter.py |
不变 | 0 |
ticker.py |
不变 | 0 |
总计 ~70 行改动,单文件核心改动 ~50 行。
六、Executor/Review 统一(#07.2)
6.1 问题
Executor 和 Review 本质上都是「agent 执行子任务」,spawner 层已完全统一。但 ticker 和 dispatcher 层存在大量 review-only 特殊判断,增加维护负担且逻辑不完整(executor 反而缺少某些保护)。
冗余的特殊路径:
| 特殊路径 | 位置 | 问题 |
|---|---|---|
| Fix-3b review_timeout | _check_timeouts 末尾 |
独立的 15min 超时回收,与上方 process_dead 检测功能重叠 |
_check_crash_limit 只在 review |
_dispatch_reviews |
executor 路径没有 crash 上限保护 |
| review 的 crash rollback | dispatcher on_complete |
executor crash 时没有回退 current_agent |
6.2 统一原则
所有非终态状态(claimed/working/review)的超时、crash、进程检测逻辑统一走 _check_timeouts,不因状态类型分叉。
Executor 和 Review 的唯二必须差异:
- completion_status:executor 完成 → 标 review;review 完成 → 标 done
- dispatch 入口:executor 从 pending dispatch;review 从 review 状态 dispatch(检查产出后再 dispatch)
其余全是同一套逻辑,不应有 if-review 分支。
6.3 改动
6.3.1 删除 Fix-3b(~40 行)
_check_timeouts 末尾的 review_timeout_minutes 整段删除(约 40 行)。
理由:
- v2.7.2 的 process_dead 检测已覆盖「进程死 → 推回 pending」的场景
- review agent crash → 进程死 → process_dead → 推回 pending →
_dispatch_reviews重新 dispatch → crash_limit 拦截 - Fix-3b 用超时时间(15min)做同一件事,是冗余的特殊路径
6.3.2 crash_limit 统一到 _check_timeouts
把 _dispatch_reviews 里的 _check_crash_limit 调用移到 _check_timeouts,覆盖 working 和 review 状态:
# _check_timeouts 中,working/review 超时检测之前
if status in ("working", "review"):
if self.dispatcher._check_crash_limit(task.id, db_path, limit=3, window_minutes=30):
# 标 failed,写 observation
...
continue
同时删除 _dispatch_reviews 里的 crash_limit 检查(~15 行)。
6.3.3 on_complete crash 回退统一
dispatcher _task_on_complete 中:
- 删除
_is_review分支中的ROLLBACK_CURRENT_AGENT_OUTCOMES处理(~20 行) - 改为 executor/review 共用统一的 crash 回退逻辑:
def _task_on_complete(aid, outcome):
try:
if outcome in ROLLBACK_CURRENT_AGENT_OUTCOMES:
# 统一 crash 回退:executor 和 review 都回退 current_agent
_dispatcher._rollback_current_agent(_task_db, _task_id, aid)
if _is_review:
if outcome in ("completed", "session_revived"):
_dispatcher._mark_task_status(_task_db, _task_id, "done")
# else: crash/error,保持 review,等待 ticker 处理
else:
_dispatcher._task_auto_complete(_task_id, _task_db)
except Exception as e:
logger.error("Task %s: on_complete error: %s", _task_id, e)
6.3.4 _dispatch_reviews 精简
删除 crash_limit 检查后,_dispatch_reviews 只保留 review 特有的业务逻辑:
- 检查已有 review 记录 → 跳过
- 检查活跃 routing → 防重复
- 检查产出 → 无产出标 failed
- 调度 review agent
6.4 改动范围
| 文件 | 改动 | 行数 |
|---|---|---|
ticker.py _check_timeouts |
增加 crash_limit 覆盖 working/review | +15 行 |
ticker.py _check_timeouts |
删除 Fix-3b 整段 | -40 行 |
ticker.py _dispatch_reviews |
删除 crash_limit 检查 | -15 行 |
dispatcher.py _task_on_complete |
统一 crash 回退 | ~25 行(净减 ~10 行) |
spawner.py |
不变 | 0 |
总计:净减 ~30 行,简化逻辑。
6.5 验证
| 测试 | 预期 |
|---|---|
| executor crash 3 次 | 统一走 _check_timeouts → failed |
| review crash 3 次 | 同上(不再走 _dispatch_reviews 的 crash_limit) |
| review agent 进程死 | process_dead 检测 → 推回 pending(不再走 Fix-3b) |
| executor crash 后 current_agent | 回退到 assignee(统一 rollback) |
| review 正常完成 | 标 done(不变) |
| executor 正常完成 | 标 review(不变) |
七、不在这个方案范围内
| 项目 | 说明 | 后续 |
|---|---|---|
| Per-agent 等待队列 | acquire 失败后排队而非 tick 重试 | 独立设计 #08 |
| Session watcher | 检测 webchat 释放后自动 dequeue | 依赖 #08 |
| counter.clear_cooldown | 正常完成后主动清除 cooldown | 可选优化 |
八、验证计划
- V1 基本功能:创建任务 → broadcast → claim → 执行 → 完成(不变)
- V2 mozi 内部竞争:同 agent 两个 pending 任务 → 第二个 AgentBusyError(reason=counter_blocked)
- V3 webchat 争抢:mozi 执行中 → webchat 发消息 → webchat 正常 → mozi spawn 检测到 session_locked
- V4 假死恢复:lock PID 死 + status=running → 自动 revive → 正常 spawn
- V5 compact 期间:compact 进行中 → spawn AgentBusyError(reason=session_compacting) → compact 结束后正常