Files
sanguo_moziplus_v2/docs/design/07-spawner-acquire-first.md
cfdaily cb81251111
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
fix(docs): 24-compact-detection-fix → 15-compact-detection-fix 路径引用修正
2026-06-13 14:45:34 +08:00

23 KiB
Raw Permalink Blame History

#07 Spawner Acquire-First 设计

状态: 已完成(#07.1-#07.2 已实施) 作者:庞统 日期:2026-06-01 评审:司马懿

一、问题

1.1 现状

spawn_full_agent() 的检查流程是串行递进的:

session check (S1-S5) → counter acquire (C1-C4) → spawn subprocess

1.2 问题

# 问题 影响
P1 mozi 内部竞争窗口:两个 tick 同时检查同一 agentsession check 都通过 → counter acquire 时第二个被拦,但第一个还没 spawn,浪费了一轮检查 效率
P2 session check 和 spawn 之间间隙大session check 通过后要经过 counter acquire 才到 spawn,这段时间 webchat 可能抢占 → Gateway lock 竞争 正确性
P3 检查串行耦合S1→S2→S3→S4→S5 之间有隐含依赖(如 S2 判断依赖 S1 结果),维护困难 可维护性
P4 所有 AgentBusyError 不可区分:外部占用 vs mozi 占用 vs cooldowndispatcher 统一处理为 skipped 可观测性

1.3 设计目标

核心原则:先 acquire(锁住 mozi 内部),再检查(贴近 spawn),确保 Gateway 只收到确认空闲的请求。

  • mozi 内部不会有两个 spawn 同时检查同一 agent
  • session check 尽可能贴近 spawn,最小化外部抢占窗口
  • 检查逻辑从串行递进改为并列收集

二、设计方案

2.1 新执行顺序

spawn_full_agent(agent_id, use_main_session=True)
  │
  ├─ Phase 0: Pre-acquire 修复(无锁)
  │   └─ 检测 timeout/failed → revive(仅在无 counter 占用时)
  │
  ├─ Phase 1: Counter acquire(互斥锁)
  │   └─ counter.acquire() → 失败则 AgentBusyError(moji_busy)
  │
  ├─ Phase 2: Session check(在锁保护下,贴近 spawn)
  │   ├─ 并列收集所有状态
  │   └─ 统一判定:任一不满足 → release counter → AgentBusyError(具体原因)
  │
  ├─ Phase 3: on_checks_passed 回调
  │
  └─ Phase 4: spawn subprocess(无额外检查,直接执行)

2.2 对比

【当前】                          【改后】
session check (S1-S5)            Phase 0: pre-acquire revive(无锁)
  ↓                                ↓
counter acquire (C1-C4)          Phase 1: counter acquire(互斥锁)
  ↓                                ↓
on_checks_passed                  Phase 2: session check(锁保护下)
  ↓                                ↓
spawn subprocess                  Phase 3: on_checks_passed
                                   ↓
                                 Phase 4: spawn subprocess

2.3 Phase 0: Pre-acquire 修复

S4timeout/failed → revive)是修复性操作,不是判定。在 counter acquire 之前执行:

# Phase 0: Pre-acquire 修复(无锁,快速)
if use_main_session:
    state = self._check_session_state(agent_id)
    if state.get("status") in ("timeout", "failed"):
        logger.info("Pre-acquire: %s status=%s, reviving", agent_id, state["status"])
        self._revive_session(agent_id)

为什么不在锁内 revive 因为 revive 是写 sessions.json,如果 agent 正在被另一个 spawn 持有 counterworking),revive 不应该干扰。counter 未持有时 revive 是安全的。

幂等性revive 只在 status=running 时改为 idle,多次调用安全。

2.4 Phase 1: Counter acquire

Counter 作为 mozi 内部的互斥锁。acquire 包含:

  • C1 cooldown
  • C2 global 上限
  • C3 per-agent 上限
  • C4 per-session-key 上限

失败时抛 AgentBusyError(agent_id, reason="counter_blocked", detail={...})

改动:去掉外层冗余的 can_acquire 调用,只用 acquireacquire 内部已有 can_acquire 检查)。

# Phase 1: Counter acquire
if self.counter and not skip_counter:
    acquired = await self.counter.acquire(agent_id, _sid_key)
    if not acquired:
        raise AgentBusyError(agent_id, reason="counter_blocked")

2.5 Phase 2: Session check(在锁保护下)

将 S1/S3/S5 改为并列收集 → 统一判定,而非串行递进:

# Phase 2: Session state check(锁保护下,贴近 spawn
if use_main_session:
    state = self._check_session_state(agent_id)
    
    # 收集所有 block 原因
    blockers = []
    
    if state.get("lock_pid_alive") and not state.get("lock_expired"):
        blockers.append(("session_locked", state.get("lock_pid")))
    
    if state.get("status") == "running":
        blockers.append(("session_running", None))
    
    if state.get("recent_compact"):
        blockers.append(("session_compacting", None))
    
    # 统一判定
    if blockers:
        # 释放 counter,报具体原因
        if self.counter:
            self.counter.release(agent_id, _sid_key)
        primary = blockers[0]
        raise AgentBusyError(agent_id, reason=primary[0], detail={"blockers": blockers})

关键变化

  1. 检查在 counter 锁内执行 → mozi 内部不会并发检查同一 agent
  2. 并列收集所有问题 → 日志中能看到所有 block 原因
  3. session check 紧贴 spawn → 外部抢占窗口最小

2.6 Phase 3 & 4: 不变

on_checks_passed 回调和 spawn subprocess 逻辑不变。

2.7 AgentBusyError 增强

class AgentBusyError(Exception):
    def __init__(self, agent_id: str, reason: str = "", detail: dict = None):
        self.agent_id = agent_id
        self.reason = reason      # "counter_blocked" | "session_locked" | "session_running" | "session_compacting"
        self.detail = detail or {}
        super().__init__(f"{agent_id}: {reason}")

Dispatcher 层可以区分原因:

except AgentBusyError as e:
    if e.reason.startswith("session_"):
        # 外部占用(webchat/compact),等外部释放
        self._record_routing(task, decision, "skipped", f"Session busy: {e.reason}")
    elif e.reason == "counter_blocked":
        # mozi 内部占用,等 counter release
        self._record_routing(task, decision, "skipped", "Agent busy (counter)")

三、sub session 路径

use_main_session=False 时:

spawn_full_agent(agent_id, use_main_session=False)
  │
  ├─ Phase 1: Counter acquire
  ├─ Phase 2: 跳过(sub session 不检查 session state
  ├─ Phase 3: on_checks_passed
  └─ Phase 4: spawn subprocess

Sub session 有独立的 session file → 独立的 lock → 不受 main session 的外部占用影响。 只需 counter acquire 保护 mozi 内部并发即可。

四、优化细节

4.1 O1: lock PID 死 + status=running 假死漏洞

当前问题:lock 文件存在但 PID 已死 + sessions.json status=running → 被 S3 拦截而非 revive。

修复:在 Phase 2 并列收集中增加判定:

if state.get("status") == "running" and not state.get("lock_pid_alive"):
    # status=running 但 lock PID 已死 → 假死,revive
    logger.warning("Agent %s: status=running but lock PID dead, reviving", agent_id)
    self._revive_session(agent_id)
    # revive 后重新检查
    state = self._check_session_state(agent_id)
    if state.get("status") == "running":
        blockers.append(("session_stuck", None))

4.2 O2: 去掉冗余 can_acquire

当前 spawner 中 can_acquire + acquire 分两步调用。acquire 内部已有 can_acquire 检查。

改后:只用 acquire,返回 False 时抛 AgentBusyError。

4.3 O3: AgentBusyError 携带具体原因

reason 字段区分:counter_blocked / session_locked / session_running / session_compacting / session_stuck

Dispatcher/ticker 日志中能看到具体原因。

4.4 O4: revive 清理 lock 文件

_revive_session 改 sessions.json status 外,同时删除残留的 lock 文件:

def _revive_session(agent_id: str) -> bool:
    # ... 改 status: running → idle ...
    
    # 清理残留 lock 文件
    if sf:
        lock_path = Path(sf + ".lock")
        if lock_path.exists():
            try:
                lock_path.unlink()
                logger.info("Cleaned stale lock for %s", agent_id)
            except Exception:
                pass

4.5 O5: compact 检测(§15 rotation-only v3

§15 设计文档:docs/design/15-compact-detection-fix.md

检测方法:读 gateway 日志尾部 2MB,按 sessionKey 过滤 [compaction] rotated active transcript 事件。 如果最近的 rotation 事件在 120s 窗口内 → 视为 compact 循环进行中(可能还在 post-compact retry)。

旧方法 _check_recent_compaction_jsonl(扫描 session jsonl 的 type=compaction 事件)保留作为 fallback。

# §15 v3: compact 检测优先用 gateway 日志 rotation 事件
if result["status"] not in ("idle", "unknown", None):
    session_key = f"agent:{agent_id}:main"
    result["recent_compact"] = AgentSpawner._check_compact_in_progress_gateway(
        session_key)
    if not result["recent_compact"] and sf:
        result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(sf)

注:Gateway 的 sessions.json status 实际值主要是 idle/running/timeout/failed。 非空闲状态(running/timeout/failed)时检查 compact 有意义。 其他状态不需要检查。

五、改动范围

文件 改动 行数估计
spawner.py spawn_full_agent() 重组检查顺序 ~50 行
spawner.py _check_session_state() 不变(数据收集,不含判定) 0
spawner.py _revive_session() 增加清理 lock 文件 +8 行
spawner.py AgentBusyError 增加 reason/detail 字段 +5 行
dispatcher.py 捕获 AgentBusyError 时区分原因(日志) +10 行
counter.py 不变 0
ticker.py 不变 0

总计 ~70 行改动,单文件核心改动 ~50 行。

六、Executor/Review 统一(#07.2

6.1 问题

Executor 和 Review 本质上都是「agent 执行子任务」,spawner 层已完全统一。但 ticker 和 dispatcher 层存在大量 review-only 特殊判断,增加维护负担且逻辑不完整(executor 反而缺少某些保护)。

冗余的特殊路径:

特殊路径 位置 问题
Fix-3b review_timeout _check_timeouts 末尾 独立的 15min 超时回收,与上方 process_dead 检测功能重叠
_check_crash_limit 只在 review _dispatch_reviews executor 路径没有 crash 上限保护
review 的 crash rollback dispatcher on_complete executor crash 时没有回退 current_agent

6.2 统一原则

所有非终态状态(claimed/working/review)的超时、crash、进程检测逻辑统一走 _check_timeouts,不因状态类型分叉。

Executor 和 Review 的唯二必须差异:

  1. completion_statusexecutor 完成 → 标 reviewreview 完成 → 标 done
  2. dispatch 入口executor 从 pending dispatchreview 从 review 状态 dispatch(检查产出后再 dispatch

其余全是同一套逻辑,不应有 if-review 分支。

6.3 改动

6.3.1 删除 Fix-3b~40 行)

_check_timeouts 末尾的 review_timeout_minutes 整段删除(约 40 行)。

理由

  • v2.7.2 的 process_dead 检测已覆盖「进程死 → 推回 pending」的场景
  • review agent crash → 进程死 → process_dead → 推回 pending → _dispatch_reviews 重新 dispatch → crash_limit 拦截
  • Fix-3b 用超时时间(15min)做同一件事,是冗余的特殊路径

6.3.1b process_dead 对 review 状态的处理

process_dead 检测到进程死后,区分任务状态:

  • working → 推回 pending(原有行为)
  • review不推 pending,只 release counter,任务保持 review,等 _dispatch_reviews 下个 tick 自然 dispatch

理由review → pending 后走 _dispatch_pendingexecutor dispatch),可能分配给非 reviewer。保持 review 状态让 _dispatch_reviews 正确处理。

6.3.1c 实施顺序

6.3.1(删 Fix-3b)和 6.3.3on_complete 统一)必须在同一 commit 中完成。否则中间窗口期 review crash 后 current_agent 未回退,会导致 _dispatch_reviews 的 exclude_current 卡死。

6.3.2 crash_limit 统一到 _check_timeouts

_dispatch_reviews 里的 _check_crash_limit 调用移到 _check_timeouts,覆盖 working 和 review 状态:

# _check_timeouts 中,working/review 超时检测之前
if status in ("working", "review"):
    if self.dispatcher._check_crash_limit(task.id, db_path, limit=3, window_minutes=30):
        # 标 failed,写 observation
        ...
        continue

同时删除 _dispatch_reviews 里的 crash_limit 检查(~15 行)。

6.3.3 on_complete crash 回退统一

dispatcher _task_on_complete 中:

  • 删除 _is_review 分支中的 ROLLBACK_CURRENT_AGENT_OUTCOMES 处理(~20 行)
  • 改为 executor/review 共用统一的 crash 回退逻辑:
def _task_on_complete(aid, outcome):
    try:
        if outcome in ROLLBACK_CURRENT_AGENT_OUTCOMES:
            # 统一 crash 回退:executor 和 review 都回退 current_agent
            _dispatcher._rollback_current_agent(_task_db, _task_id, aid)

        if _is_review:
            if outcome in ("completed", "session_revived"):
                _dispatcher._mark_task_status(_task_db, _task_id, "done")
            # else: crash/error,保持 review等待 ticker 处理
        else:
            _dispatcher._task_auto_complete(_task_id, _task_db)
    except Exception as e:
        logger.error("Task %s: on_complete error: %s", _task_id, e)

6.3.4 _dispatch_reviews 精简

删除 crash_limit 检查后,_dispatch_reviews 只保留 review 特有的业务逻辑:

  • 检查已有 review 记录 → 跳过
  • 检查活跃 routing → 防重复
  • 检查产出 → 无产出标 failed
  • 调度 review agent

保留独立函数的理由:review 的前置检查(产出验证、已有 review 防重复)与 executor 的 dispatch 逻辑不同,不适合合并到 _dispatch_pending

6.4 改动范围

文件 改动 行数
ticker.py _check_timeouts 增加 crash_limit 覆盖 working/review +15 行
ticker.py _check_timeouts 删除 Fix-3b 整段 -40 行
ticker.py _dispatch_reviews 删除 crash_limit 检查 -15 行
dispatcher.py _task_on_complete 统一 crash 回退 ~25 行(净减 ~10 行)
spawner.py 不变 0

总计:净减 ~30 行,简化逻辑。

6.5 验证

测试 预期
executor crash 3 次 统一走 _check_timeouts → failed
review crash 3 次 同上(不再走 _dispatch_reviews 的 crash_limit
review agent 进程死 process_dead 检测 → 推回 pending(不再走 Fix-3b
executor crash 后 current_agent 回退到 assignee(统一 rollback
review 正常完成 标 done(不变)
executor 正常完成 标 review(不变)

七、#07.3 Startup Recovery + Compact Retry

7.1 问题

问题 A:PM2 重启后 mail 任务变孤儿

现象18:42 PM2 restart 后,mail-1780310389485 保持 working 状态 34 分钟无人处理。

根因startup recovery 的主路径是 _check_timeouts 扫 DB working 任务按时间戳判断超时。但 mail auto-working 时 dispatcher 不设 started_at 也不设 claimed_at,导致 _check_timeouts 行 1284-1286 跳过:

start_time_str = task.started_at or task.claimed_at
if not start_time_str:
    continue  # ← mail 任务永远走这里

Startup recovery 的两层设计

机制 重启后
L1 主路径 _check_timeouts 扫 DB working → 按时间戳判断超时 → 回收 普通 task 生效,✗ mail 不生效
L2 补充 process_dead 扫 counter.active_agents → 检查进程存活 ✗ 重启后 counter 为空

L2 重启后失效是预期行为(纯内存态),L1 才是 startup recovery 的主路径。问题出在 L1 对 mail 的时间戳 fallback 缺失。

问题 Bcompact_hanging 直接标 failed,不 retry

现象agent 执行过程中触发 auto-compactionmonitor 等了 3×630s=31.5 分钟后放弃,标 compact_hanging → failed。

期望行为:compact 只是暂时性阻塞,不应该直接 failed。应该让任务有机会重试,agent 在 compact 完成后继续。

问题 Cretry 遇 session busy 时 counter 处理不一致

现象_do_retryspawn_full_agent(skip_counter=True)Phase 2 发现 session busycompact/lock/running)→ AgentBusyError → _do_retry catch 后调 on_complete("retry_agent_busy") → counter release → retry 丢失。

原始设计spawner-monitor-design.md 原则 5):“续杯只有 Gateway timeout 才触发:lock/compact/api_error 等不续杯,等 ticker”。设计上 retry 遇 busy 应 release counter → 任务保持 working → ticker 30 秒后重新 dispatch。

但 Bug-4 fix 后 counter 不 release 了(retry 期间保持持有),导致和原始设计矛盾:counter 不 release → ticker 不会重新 dispatch → 任务卡住。

7.2 设计原则

  1. PM2 重启后所有 working 任务必须可回收_check_timeouts 不能因时间戳缺失跳过任何任务
  2. compact 是暂时性阻塞,不应导致任务 failedcompact_hanging 应释放 counter → 等 ticker 重新 dispatch
  3. retry 遇 session busy 释放 counter → 等 ticker:spawn 失败了,继续持有 counter 无意义,不如释放给 ticker 走正常 dispatch
  4. agent 不能同时执行两个任务spawn 前 Phase 2 的所有检查(lock/running/compact)都必须通过

7.3 改动

ACT-1_check_timeouts 时间戳 fallbackP0

文件ticker.py _check_timeouts1 行改动

# Before
start_time_str = task.started_at or task.claimed_at

# After
start_time_str = task.started_at or task.claimed_at or task.updated_at

理由_transition_status 每次状态变更都更新 updated_atmail auto-working 时必然有值。用 updated_at 做 fallback 后,重启后超时检测对所有任务类型生效。

ACT-2compact_hanging 不标 failedP1

文件spawner.py _handle_monitor_timeout~8 行改动

问题compact_hanging 时进程还活着(monitor 等了 3×630s),直接 retry spawn 会撞上正在运行的 session。设计原则“不主动 kill 进程”。

方案compact_hanging 时不标 failed只 release counter + 任务保持 working → 等 ticker 重新 dispatch。

# Before (compact_hanging)
self._mark_task(db_path, task_id, "failed", {"reason": "compact_hanging", ...})
await self._do_on_complete_async(on_complete, agent_id, "compact_hanging")

# After
# compact 超限 → release counter + 保持 working
# 等进程自然结束后,ticker _check_timeouts 检测到超时 → 推回 pending → 重新 dispatch
logger.warning("Agent %s compact hanging after %d waits, releasing counter for ticker re-dispatch",
               agent_id, compact_wait_count)
self._compact_waits.pop(task_id, None)  # 清理计数器
await self._do_on_complete_async(on_complete, agent_id, "compact_hanging")
# 不标 failed,任务保持 working

流程

  1. compact_hanging → release counter → 任务保持 working(无 active monitor
  2. 进程自然结束(Gateway timeout / compact 完成 / agent 正常退出)
  3. ticker _check_timeouts 检测 working + 超时 → 推回 pending
  4. _dispatch_pendingspawn_full_agent → Phase 2 检查 session
  5. session 已空闲 → 正常 spawnsession 仍 busy → AgentBusyError → 30 秒后再试

好处

  • 不 kill 进程(遵循设计原则 4
  • 不走 spawner retry(遵循设计原则 5lock/compact 等 ticker
  • 利用已有 ticker 循环自然重试
  • 不需要新的“延迟 retry”机制

ACT-3retry 遇 session busy 释放 counterP1

文件spawner.py _do_retry~5 行改动

方案retry 遇 AgentBusyError 时 release counter + 任务保持 working → 等 ticker。和 ACT-2 一致。

# Before
except AgentBusyError:
    logger.warning("Retry spawn skipped: %s busy (unexpected)", agent_id)
    await self._do_on_complete_async(on_complete, agent_id, "retry_agent_busy")

# After
except AgentBusyError as e:
    logger.warning("Retry spawn deferred: %s session busy (%s), releasing counter for ticker re-dispatch",
                   agent_id, e.reason)
    # release counter + 任务保持 working → ticker 下次 tick 重新 dispatch
    await self._do_on_complete_async(on_complete, agent_id, "retry_session_busy")

对 Bug-4 fix 的影响Bug-4 fix 让 retry 期间 counter 保持持有(防止 ticker acquire 同一 agent)。但 retry 遇 session busy 时 release counter 是合理的——spawn 失败了,继续持有 counter 无意义(没有 monitor 在等),不如释放给 ticker 走正常 dispatch 路径。

7.4 改动范围

文件 改动 行数
ticker.py _check_timeouts started_at or claimed_at or updated_at fallback 1 行
spawner.py _handle_monitor_timeout compact_hanging:删 _mark_task(failed),只 release counter ~8 行
spawner.py _do_retry AgentBusyError catch 日志优化 ~5 行

总计:~14 行改动。

7.5 验证

测试 预期
PM2 重启后 mail 孤儿 _check_timeoutsupdated_at fallback → 30 分钟后回收
compact 等超限 compact_hanging → release counter → 任务 working → ticker 重新 dispatch
retry 遇 compact AgentBusyError → release counter → ticker 重新 dispatch
retry 遇 lock 同上
正常 retrygateway_timeout 不变,_do_retry 正常执行

7.6 待讨论

  1. compact_hanging 后进程还活着ticker 重新 dispatch 时会撞上 running session → AgentBusyError → 30 秒后再试。如果 compact 持续很久,可能循环多次。是否需要加一个“compact 重试总上限”?

  2. retry 遇 busy 后 counter 释放ticker 重新 dispatch 走正常 _dispatch_pendingspawn_full_agent(不带 skip_counter)→ acquire counter → Phase 2 检查。如果此时 session 仍然 busy,会再次 AgentBusyError。这是预期行为,但意味着“ticker 重试”比“spawner retry”多走一层(acquire + Phase 2 检查),效率稍低。可接受吗?

八、不在这个方案范围内

项目 说明 后续
Per-agent 等待队列 acquire 失败后排队而非 tick 重试 独立设计 #08
Session watcher 检测 webchat 释放后自动 dequeue 依赖 #08
counter.clear_cooldown 正常完成后主动清除 cooldown 可选优化

八、验证计划

  1. V1 基本功能:创建任务 → broadcast → claim → 执行 → 完成(不变)
  2. V2 mozi 内部竞争:同 agent 两个 pending 任务 → 第二个 AgentBusyError(reason=counter_blocked)
  3. V3 webchat 争抢mozi 执行中 → webchat 发消息 → webchat 正常 → mozi spawn 检测到 session_locked
  4. V4 假死恢复lock PID 死 + status=running → 自动 revive → 正常 spawn
  5. V5 compact 期间compact 进行中 → spawn AgentBusyError(reason=session_compacting) → compact 结束后正常