Files
sanguo_moziplus_v2/docs/design/07-spawner-acquire-first.md
cfdaily cb81251111
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
fix(docs): 24-compact-detection-fix → 15-compact-detection-fix 路径引用修正
2026-06-13 14:45:34 +08:00

552 lines
23 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# #07 Spawner Acquire-First 设计
> 状态:✅ 已完成(#07.1-#07.2 已实施)
> 作者:庞统
> 日期:2026-06-01
> 评审:司马懿
## 一、问题
### 1.1 现状
`spawn_full_agent()` 的检查流程是串行递进的:
```
session check (S1-S5) → counter acquire (C1-C4) → spawn subprocess
```
### 1.2 问题
| # | 问题 | 影响 |
|---|------|------|
| P1 | **mozi 内部竞争窗口**:两个 tick 同时检查同一 agentsession check 都通过 → counter acquire 时第二个被拦,但第一个还没 spawn,浪费了一轮检查 | 效率 |
| P2 | **session check 和 spawn 之间间隙大**session check 通过后要经过 counter acquire 才到 spawn,这段时间 webchat 可能抢占 → Gateway lock 竞争 | 正确性 |
| P3 | **检查串行耦合**S1→S2→S3→S4→S5 之间有隐含依赖(如 S2 判断依赖 S1 结果),维护困难 | 可维护性 |
| P4 | **所有 AgentBusyError 不可区分**:外部占用 vs mozi 占用 vs cooldowndispatcher 统一处理为 skipped | 可观测性 |
### 1.3 设计目标
**核心原则:先 acquire(锁住 mozi 内部),再检查(贴近 spawn),确保 Gateway 只收到确认空闲的请求。**
- mozi 内部不会有两个 spawn 同时检查同一 agent
- session check 尽可能贴近 spawn,最小化外部抢占窗口
- 检查逻辑从串行递进改为并列收集
## 二、设计方案
### 2.1 新执行顺序
```
spawn_full_agent(agent_id, use_main_session=True)
├─ Phase 0: Pre-acquire 修复(无锁)
│ └─ 检测 timeout/failed → revive(仅在无 counter 占用时)
├─ Phase 1: Counter acquire(互斥锁)
│ └─ counter.acquire() → 失败则 AgentBusyError(moji_busy)
├─ Phase 2: Session check(在锁保护下,贴近 spawn
│ ├─ 并列收集所有状态
│ └─ 统一判定:任一不满足 → release counter → AgentBusyError(具体原因)
├─ Phase 3: on_checks_passed 回调
└─ Phase 4: spawn subprocess(无额外检查,直接执行)
```
### 2.2 对比
```
【当前】 【改后】
session check (S1-S5) Phase 0: pre-acquire revive(无锁)
↓ ↓
counter acquire (C1-C4) Phase 1: counter acquire(互斥锁)
↓ ↓
on_checks_passed Phase 2: session check(锁保护下)
↓ ↓
spawn subprocess Phase 3: on_checks_passed
Phase 4: spawn subprocess
```
### 2.3 Phase 0: Pre-acquire 修复
S4timeout/failed → revive)是**修复性操作**,不是判定。在 counter acquire 之前执行:
```python
# Phase 0: Pre-acquire 修复(无锁,快速)
if use_main_session:
state = self._check_session_state(agent_id)
if state.get("status") in ("timeout", "failed"):
logger.info("Pre-acquire: %s status=%s, reviving", agent_id, state["status"])
self._revive_session(agent_id)
```
**为什么不在锁内 revive** 因为 revive 是写 sessions.json,如果 agent 正在被另一个 spawn 持有 counterworking),revive 不应该干扰。counter 未持有时 revive 是安全的。
**幂等性**revive 只在 status=running 时改为 idle,多次调用安全。
### 2.4 Phase 1: Counter acquire
Counter 作为 mozi 内部的互斥锁。acquire 包含:
- C1 cooldown
- C2 global 上限
- C3 per-agent 上限
- C4 per-session-key 上限
失败时抛 `AgentBusyError(agent_id, reason="counter_blocked", detail={...})`
**改动**:去掉外层冗余的 `can_acquire` 调用,只用 `acquire`acquire 内部已有 can_acquire 检查)。
```python
# Phase 1: Counter acquire
if self.counter and not skip_counter:
acquired = await self.counter.acquire(agent_id, _sid_key)
if not acquired:
raise AgentBusyError(agent_id, reason="counter_blocked")
```
### 2.5 Phase 2: Session check(在锁保护下)
将 S1/S3/S5 改为**并列收集 → 统一判定**,而非串行递进:
```python
# Phase 2: Session state check(锁保护下,贴近 spawn
if use_main_session:
state = self._check_session_state(agent_id)
# 收集所有 block 原因
blockers = []
if state.get("lock_pid_alive") and not state.get("lock_expired"):
blockers.append(("session_locked", state.get("lock_pid")))
if state.get("status") == "running":
blockers.append(("session_running", None))
if state.get("recent_compact"):
blockers.append(("session_compacting", None))
# 统一判定
if blockers:
# 释放 counter,报具体原因
if self.counter:
self.counter.release(agent_id, _sid_key)
primary = blockers[0]
raise AgentBusyError(agent_id, reason=primary[0], detail={"blockers": blockers})
```
**关键变化**
1. 检查在 counter 锁内执行 → mozi 内部不会并发检查同一 agent
2. 并列收集所有问题 → 日志中能看到所有 block 原因
3. session check 紧贴 spawn → 外部抢占窗口最小
### 2.6 Phase 3 & 4: 不变
`on_checks_passed` 回调和 `spawn subprocess` 逻辑不变。
### 2.7 AgentBusyError 增强
```python
class AgentBusyError(Exception):
def __init__(self, agent_id: str, reason: str = "", detail: dict = None):
self.agent_id = agent_id
self.reason = reason # "counter_blocked" | "session_locked" | "session_running" | "session_compacting"
self.detail = detail or {}
super().__init__(f"{agent_id}: {reason}")
```
Dispatcher 层可以区分原因:
```python
except AgentBusyError as e:
if e.reason.startswith("session_"):
# 外部占用(webchat/compact),等外部释放
self._record_routing(task, decision, "skipped", f"Session busy: {e.reason}")
elif e.reason == "counter_blocked":
# mozi 内部占用,等 counter release
self._record_routing(task, decision, "skipped", "Agent busy (counter)")
```
## 三、sub session 路径
`use_main_session=False` 时:
```
spawn_full_agent(agent_id, use_main_session=False)
├─ Phase 1: Counter acquire
├─ Phase 2: 跳过(sub session 不检查 session state
├─ Phase 3: on_checks_passed
└─ Phase 4: spawn subprocess
```
Sub session 有独立的 session file → 独立的 lock → 不受 main session 的外部占用影响。
只需 counter acquire 保护 mozi 内部并发即可。
## 四、优化细节
### 4.1 O1: lock PID 死 + status=running 假死漏洞
**当前问题**:lock 文件存在但 PID 已死 + sessions.json status=running → 被 S3 拦截而非 revive。
**修复**:在 Phase 2 并列收集中增加判定:
```python
if state.get("status") == "running" and not state.get("lock_pid_alive"):
# status=running 但 lock PID 已死 → 假死,revive
logger.warning("Agent %s: status=running but lock PID dead, reviving", agent_id)
self._revive_session(agent_id)
# revive 后重新检查
state = self._check_session_state(agent_id)
if state.get("status") == "running":
blockers.append(("session_stuck", None))
```
### 4.2 O2: 去掉冗余 can_acquire
当前 spawner 中 `can_acquire` + `acquire` 分两步调用。`acquire` 内部已有 `can_acquire` 检查。
**改后**:只用 `acquire`,返回 False 时抛 AgentBusyError。
### 4.3 O3: AgentBusyError 携带具体原因
reason 字段区分:`counter_blocked` / `session_locked` / `session_running` / `session_compacting` / `session_stuck`
Dispatcher/ticker 日志中能看到具体原因。
### 4.4 O4: revive 清理 lock 文件
`_revive_session` 改 sessions.json status 外,同时删除残留的 lock 文件:
```python
def _revive_session(agent_id: str) -> bool:
# ... 改 status: running → idle ...
# 清理残留 lock 文件
if sf:
lock_path = Path(sf + ".lock")
if lock_path.exists():
try:
lock_path.unlink()
logger.info("Cleaned stale lock for %s", agent_id)
except Exception:
pass
```
### 4.5 O5: compact 检测(§15 rotation-only v3
§15 设计文档:`docs/design/15-compact-detection-fix.md`
**检测方法**:读 gateway 日志尾部 2MB,按 sessionKey 过滤 `[compaction] rotated active transcript` 事件。
如果最近的 rotation 事件在 120s 窗口内 → 视为 compact 循环进行中(可能还在 post-compact retry)。
旧方法 `_check_recent_compaction_jsonl`(扫描 session jsonl 的 `type=compaction` 事件)保留作为 fallback。
```python
# §15 v3: compact 检测优先用 gateway 日志 rotation 事件
if result["status"] not in ("idle", "unknown", None):
session_key = f"agent:{agent_id}:main"
result["recent_compact"] = AgentSpawner._check_compact_in_progress_gateway(
session_key)
if not result["recent_compact"] and sf:
result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(sf)
```
注:Gateway 的 sessions.json status 实际值主要是 `idle/running/timeout/failed`
非空闲状态(`running`/`timeout`/`failed`)时检查 compact 有意义。
其他状态不需要检查。
## 五、改动范围
| 文件 | 改动 | 行数估计 |
|------|------|---------|
| `spawner.py` `spawn_full_agent()` | 重组检查顺序 | ~50 行 |
| `spawner.py` `_check_session_state()` | 不变(数据收集,不含判定) | 0 |
| `spawner.py` `_revive_session()` | 增加清理 lock 文件 | +8 行 |
| `spawner.py` `AgentBusyError` | 增加 reason/detail 字段 | +5 行 |
| `dispatcher.py` | 捕获 AgentBusyError 时区分原因(日志) | +10 行 |
| `counter.py` | 不变 | 0 |
| `ticker.py` | 不变 | 0 |
**总计 ~70 行改动,单文件核心改动 ~50 行。**
## 六、Executor/Review 统一(#07.2
### 6.1 问题
Executor 和 Review 本质上都是「agent 执行子任务」,spawner 层已完全统一。但 ticker 和 dispatcher 层存在大量 review-only 特殊判断,增加维护负担且逻辑不完整(executor 反而缺少某些保护)。
**冗余的特殊路径:**
| 特殊路径 | 位置 | 问题 |
|----------|------|------|
| Fix-3b review_timeout | `_check_timeouts` 末尾 | 独立的 15min 超时回收,与上方 process_dead 检测功能重叠 |
| `_check_crash_limit` 只在 review | `_dispatch_reviews` | executor 路径没有 crash 上限保护 |
| review 的 crash rollback | dispatcher `on_complete` | executor crash 时没有回退 current_agent |
### 6.2 统一原则
**所有非终态状态(claimed/working/review)的超时、crash、进程检测逻辑统一走 `_check_timeouts`,不因状态类型分叉。**
Executor 和 Review 的唯二必须差异:
1. **completion_status**executor 完成 → 标 reviewreview 完成 → 标 done
2. **dispatch 入口**executor 从 pending dispatchreview 从 review 状态 dispatch(检查产出后再 dispatch
其余全是同一套逻辑,不应有 if-review 分支。
### 6.3 改动
#### 6.3.1 删除 Fix-3b~40 行)
`_check_timeouts` 末尾的 `review_timeout_minutes` 整段删除(约 40 行)。
**理由**
- v2.7.2 的 process_dead 检测已覆盖「进程死 → 推回 pending」的场景
- review agent crash → 进程死 → process_dead → 推回 pending → `_dispatch_reviews` 重新 dispatch → crash_limit 拦截
- Fix-3b 用超时时间(15min)做同一件事,是冗余的特殊路径
#### 6.3.1b process_dead 对 review 状态的处理
process_dead 检测到进程死后,区分任务状态:
- `working` → 推回 `pending`(原有行为)
- `review`**不推 pending**,只 release counter,任务保持 `review`,等 `_dispatch_reviews` 下个 tick 自然 dispatch
**理由**review → pending 后走 `_dispatch_pending`executor dispatch),可能分配给非 reviewer。保持 review 状态让 `_dispatch_reviews` 正确处理。
#### 6.3.1c 实施顺序
6.3.1(删 Fix-3b)和 6.3.3on_complete 统一)**必须在同一 commit 中完成**。否则中间窗口期 review crash 后 current_agent 未回退,会导致 `_dispatch_reviews` 的 exclude_current 卡死。
#### 6.3.2 crash_limit 统一到 `_check_timeouts`
`_dispatch_reviews` 里的 `_check_crash_limit` 调用移到 `_check_timeouts`,覆盖 working 和 review 状态:
```python
# _check_timeouts 中,working/review 超时检测之前
if status in ("working", "review"):
if self.dispatcher._check_crash_limit(task.id, db_path, limit=3, window_minutes=30):
# 标 failed,写 observation
...
continue
```
同时删除 `_dispatch_reviews` 里的 crash_limit 检查(~15 行)。
#### 6.3.3 on_complete crash 回退统一
dispatcher `_task_on_complete` 中:
- 删除 `_is_review` 分支中的 `ROLLBACK_CURRENT_AGENT_OUTCOMES` 处理(~20 行)
- 改为 executor/review 共用统一的 crash 回退逻辑:
```python
def _task_on_complete(aid, outcome):
try:
if outcome in ROLLBACK_CURRENT_AGENT_OUTCOMES:
# 统一 crash 回退:executor 和 review 都回退 current_agent
_dispatcher._rollback_current_agent(_task_db, _task_id, aid)
if _is_review:
if outcome in ("completed", "session_revived"):
_dispatcher._mark_task_status(_task_db, _task_id, "done")
# else: crash/error,保持 review,等待 ticker 处理
else:
_dispatcher._task_auto_complete(_task_id, _task_db)
except Exception as e:
logger.error("Task %s: on_complete error: %s", _task_id, e)
```
#### 6.3.4 `_dispatch_reviews` 精简
删除 crash_limit 检查后,`_dispatch_reviews` 只保留 review 特有的业务逻辑:
- 检查已有 review 记录 → 跳过
- 检查活跃 routing → 防重复
- 检查产出 → 无产出标 failed
- 调度 review agent
保留独立函数的理由:review 的前置检查(产出验证、已有 review 防重复)与 executor 的 dispatch 逻辑不同,不适合合并到 `_dispatch_pending`
### 6.4 改动范围
| 文件 | 改动 | 行数 |
|------|------|------|
| `ticker.py` `_check_timeouts` | 增加 crash_limit 覆盖 working/review | +15 行 |
| `ticker.py` `_check_timeouts` | 删除 Fix-3b 整段 | -40 行 |
| `ticker.py` `_dispatch_reviews` | 删除 crash_limit 检查 | -15 行 |
| `dispatcher.py` `_task_on_complete` | 统一 crash 回退 | ~25 行(净减 ~10 行) |
| `spawner.py` | 不变 | 0 |
**总计:净减 ~30 行,简化逻辑。**
### 6.5 验证
| 测试 | 预期 |
|------|------|
| executor crash 3 次 | 统一走 `_check_timeouts` → failed |
| review crash 3 次 | 同上(不再走 `_dispatch_reviews` 的 crash_limit |
| review agent 进程死 | process_dead 检测 → 推回 pending(不再走 Fix-3b |
| executor crash 后 current_agent | 回退到 assignee(统一 rollback |
| review 正常完成 | 标 done(不变) |
| executor 正常完成 | 标 review(不变) |
## 七、#07.3 Startup Recovery + Compact Retry
### 7.1 问题
#### 问题 APM2 重启后 mail 任务变孤儿
**现象**18:42 PM2 restart 后,mail-1780310389485 保持 working 状态 34 分钟无人处理。
**根因**startup recovery 的主路径是 `_check_timeouts` 扫 DB working 任务按时间戳判断超时。但 mail auto-working 时 dispatcher 不设 `started_at` 也不设 `claimed_at`,导致 `_check_timeouts` 行 1284-1286 跳过:
```python
start_time_str = task.started_at or task.claimed_at
if not start_time_str:
continue # ← mail 任务永远走这里
```
**Startup recovery 的两层设计**
| 层 | 机制 | 重启后 |
|---|------|--------|
| L1 主路径 | `_check_timeouts` 扫 DB working → 按时间戳判断超时 → 回收 | ✅ 普通 task 生效,✗ mail 不生效 |
| L2 补充 | `process_dead` 扫 counter.active_agents → 检查进程存活 | ✗ 重启后 counter 为空 |
L2 重启后失效是预期行为(纯内存态),L1 才是 startup recovery 的主路径。问题出在 L1 对 mail 的时间戳 fallback 缺失。
#### 问题 Bcompact_hanging 直接标 failed,不 retry
**现象**agent 执行过程中触发 auto-compactionmonitor 等了 3×630s=31.5 分钟后放弃,标 `compact_hanging` → failed。
**期望行为**:compact 只是暂时性阻塞,不应该直接 failed。应该让任务有机会重试,agent 在 compact 完成后继续。
#### 问题 Cretry 遇 session busy 时 counter 处理不一致
**现象**`_do_retry``spawn_full_agent(skip_counter=True)`Phase 2 发现 session busycompact/lock/running)→ AgentBusyError → `_do_retry` catch 后调 `on_complete("retry_agent_busy")` → counter release → retry 丢失。
**原始设计**spawner-monitor-design.md 原则 5):“续杯只有 Gateway timeout 才触发:lock/compact/api_error 等不续杯,等 ticker”。设计上 retry 遇 busy 应 release counter → 任务保持 working → ticker 30 秒后重新 dispatch。
**但 Bug-4 fix 后** counter 不 release 了(retry 期间保持持有),导致和原始设计矛盾:counter 不 release → ticker 不会重新 dispatch → 任务卡住。
### 7.2 设计原则
1. **PM2 重启后所有 working 任务必须可回收**`_check_timeouts` 不能因时间戳缺失跳过任何任务
2. **compact 是暂时性阻塞,不应导致任务 failed**compact_hanging 应释放 counter → 等 ticker 重新 dispatch
3. **retry 遇 session busy 释放 counter → 等 ticker**:spawn 失败了,继续持有 counter 无意义,不如释放给 ticker 走正常 dispatch
4. **agent 不能同时执行两个任务**spawn 前 Phase 2 的所有检查(lock/running/compact)都必须通过
### 7.3 改动
#### ACT-1_check_timeouts 时间戳 fallbackP0
**文件**`ticker.py` `_check_timeouts`1 行改动
```python
# Before
start_time_str = task.started_at or task.claimed_at
# After
start_time_str = task.started_at or task.claimed_at or task.updated_at
```
**理由**`_transition_status` 每次状态变更都更新 `updated_at`mail auto-working 时必然有值。用 `updated_at` 做 fallback 后,重启后超时检测对所有任务类型生效。
#### ACT-2compact_hanging 不标 failedP1
**文件**`spawner.py` `_handle_monitor_timeout`~8 行改动
**问题**compact_hanging 时进程还活着(monitor 等了 3×630s),直接 retry spawn 会撞上正在运行的 session。设计原则“不主动 kill 进程”。
**方案**compact_hanging 时不标 failed**只 release counter + 任务保持 working** → 等 ticker 重新 dispatch。
```python
# Before (compact_hanging)
self._mark_task(db_path, task_id, "failed", {"reason": "compact_hanging", ...})
await self._do_on_complete_async(on_complete, agent_id, "compact_hanging")
# After
# compact 超限 → release counter + 保持 working
# 等进程自然结束后,ticker _check_timeouts 检测到超时 → 推回 pending → 重新 dispatch
logger.warning("Agent %s compact hanging after %d waits, releasing counter for ticker re-dispatch",
agent_id, compact_wait_count)
self._compact_waits.pop(task_id, None) # 清理计数器
await self._do_on_complete_async(on_complete, agent_id, "compact_hanging")
# 不标 failed,任务保持 working
```
**流程**
1. compact_hanging → release counter → 任务保持 working(无 active monitor
2. 进程自然结束(Gateway timeout / compact 完成 / agent 正常退出)
3. ticker `_check_timeouts` 检测 working + 超时 → 推回 pending
4. `_dispatch_pending``spawn_full_agent` → Phase 2 检查 session
5. session 已空闲 → 正常 spawnsession 仍 busy → AgentBusyError → 30 秒后再试
**好处**
- 不 kill 进程(遵循设计原则 4)
- 不走 spawner retry(遵循设计原则 5lock/compact 等 ticker
- 利用已有 ticker 循环自然重试
- 不需要新的“延迟 retry”机制
#### ACT-3retry 遇 session busy 释放 counterP1
**文件**`spawner.py` `_do_retry`~5 行改动
**方案**retry 遇 AgentBusyError 时 release counter + 任务保持 working → 等 ticker。和 ACT-2 一致。
```python
# Before
except AgentBusyError:
logger.warning("Retry spawn skipped: %s busy (unexpected)", agent_id)
await self._do_on_complete_async(on_complete, agent_id, "retry_agent_busy")
# After
except AgentBusyError as e:
logger.warning("Retry spawn deferred: %s session busy (%s), releasing counter for ticker re-dispatch",
agent_id, e.reason)
# release counter + 任务保持 working → ticker 下次 tick 重新 dispatch
await self._do_on_complete_async(on_complete, agent_id, "retry_session_busy")
```
**对 Bug-4 fix 的影响**Bug-4 fix 让 retry 期间 counter 保持持有(防止 ticker acquire 同一 agent)。但 retry 遇 session busy 时 release counter 是合理的——spawn 失败了,继续持有 counter 无意义(没有 monitor 在等),不如释放给 ticker 走正常 dispatch 路径。
### 7.4 改动范围
| 文件 | 改动 | 行数 |
|------|------|------|
| `ticker.py` `_check_timeouts` | `started_at or claimed_at or updated_at` fallback | 1 行 |
| `spawner.py` `_handle_monitor_timeout` | compact_hanging:删 `_mark_task(failed)`,只 release counter | ~8 行 |
| `spawner.py` `_do_retry` | AgentBusyError catch 日志优化 | ~5 行 |
**总计:~14 行改动。**
### 7.5 验证
| 测试 | 预期 |
|------|------|
| PM2 重启后 mail 孤儿 | `_check_timeouts``updated_at` fallback → 30 分钟后回收 |
| compact 等超限 | compact_hanging → release counter → 任务 working → ticker 重新 dispatch |
| retry 遇 compact | AgentBusyError → release counter → ticker 重新 dispatch |
| retry 遇 lock | 同上 |
| 正常 retrygateway_timeout | 不变,`_do_retry` 正常执行 |
### 7.6 待讨论
1. **compact_hanging 后进程还活着**ticker 重新 dispatch 时会撞上 running session → AgentBusyError → 30 秒后再试。如果 compact 持续很久,可能循环多次。是否需要加一个“compact 重试总上限”?
2. **retry 遇 busy 后 counter 释放**ticker 重新 dispatch 走正常 `_dispatch_pending``spawn_full_agent`(不带 skip_counter)→ acquire counter → Phase 2 检查。如果此时 session 仍然 busy,会再次 AgentBusyError。这是预期行为,但意味着“ticker 重试”比“spawner retry”多走一层(acquire + Phase 2 检查),效率稍低。可接受吗?
## 八、不在这个方案范围内
| 项目 | 说明 | 后续 |
|------|------|------|
| Per-agent 等待队列 | acquire 失败后排队而非 tick 重试 | 独立设计 #08 |
| Session watcher | 检测 webchat 释放后自动 dequeue | 依赖 #08 |
| counter.clear_cooldown | 正常完成后主动清除 cooldown | 可选优化 |
## 八、验证计划
1. **V1 基本功能**:创建任务 → broadcast → claim → 执行 → 完成(不变)
2. **V2 mozi 内部竞争**:同 agent 两个 pending 任务 → 第二个 AgentBusyError(reason=counter_blocked)
3. **V3 webchat 争抢**mozi 执行中 → webchat 发消息 → webchat 正常 → mozi spawn 检测到 session_locked
4. **V4 假死恢复**lock PID 死 + status=running → 自动 revive → 正常 spawn
5. **V5 compact 期间**compact 进行中 → spawn AgentBusyError(reason=session_compacting) → compact 结束后正常