Files
sanguo_moziplus_v2/docs/design/07-spawner-acquire-first.md
T
2026-06-01 13:39:29 +08:00

281 lines
10 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# #07 Spawner Acquire-First 设计
> 状态:设计待评审
> 作者:庞统
> 日期:2026-06-01
> 评审:司马懿
## 一、问题
### 1.1 现状
`spawn_full_agent()` 的检查流程是串行递进的:
```
session check (S1-S5) → counter acquire (C1-C4) → spawn subprocess
```
### 1.2 问题
| # | 问题 | 影响 |
|---|------|------|
| P1 | **mozi 内部竞争窗口**:两个 tick 同时检查同一 agentsession check 都通过 → counter acquire 时第二个被拦,但第一个还没 spawn,浪费了一轮检查 | 效率 |
| P2 | **session check 和 spawn 之间间隙大**session check 通过后要经过 counter acquire 才到 spawn,这段时间 webchat 可能抢占 → Gateway lock 竞争 | 正确性 |
| P3 | **检查串行耦合**S1→S2→S3→S4→S5 之间有隐含依赖(如 S2 判断依赖 S1 结果),维护困难 | 可维护性 |
| P4 | **所有 AgentBusyError 不可区分**:外部占用 vs mozi 占用 vs cooldowndispatcher 统一处理为 skipped | 可观测性 |
### 1.3 设计目标
**核心原则:先 acquire(锁住 mozi 内部),再检查(贴近 spawn),确保 Gateway 只收到确认空闲的请求。**
- mozi 内部不会有两个 spawn 同时检查同一 agent
- session check 尽可能贴近 spawn,最小化外部抢占窗口
- 检查逻辑从串行递进改为并列收集
## 二、设计方案
### 2.1 新执行顺序
```
spawn_full_agent(agent_id, use_main_session=True)
├─ Phase 0: Pre-acquire 修复(无锁)
│ └─ 检测 timeout/failed → revive(仅在无 counter 占用时)
├─ Phase 1: Counter acquire(互斥锁)
│ └─ counter.acquire() → 失败则 AgentBusyError(moji_busy)
├─ Phase 2: Session check(在锁保护下,贴近 spawn
│ ├─ 并列收集所有状态
│ └─ 统一判定:任一不满足 → release counter → AgentBusyError(具体原因)
├─ Phase 3: on_checks_passed 回调
└─ Phase 4: spawn subprocess(无额外检查,直接执行)
```
### 2.2 对比
```
【当前】 【改后】
session check (S1-S5) Phase 0: pre-acquire revive(无锁)
↓ ↓
counter acquire (C1-C4) Phase 1: counter acquire(互斥锁)
↓ ↓
on_checks_passed Phase 2: session check(锁保护下)
↓ ↓
spawn subprocess Phase 3: on_checks_passed
Phase 4: spawn subprocess
```
### 2.3 Phase 0: Pre-acquire 修复
S4timeout/failed → revive)是**修复性操作**,不是判定。在 counter acquire 之前执行:
```python
# Phase 0: Pre-acquire 修复(无锁,快速)
if use_main_session:
state = self._check_session_state(agent_id)
if state.get("status") in ("timeout", "failed"):
logger.info("Pre-acquire: %s status=%s, reviving", agent_id, state["status"])
self._revive_session(agent_id)
```
**为什么不在锁内 revive** 因为 revive 是写 sessions.json,如果 agent 正在被另一个 spawn 持有 counterworking),revive 不应该干扰。counter 未持有时 revive 是安全的。
**幂等性**revive 只在 status=running 时改为 idle,多次调用安全。
### 2.4 Phase 1: Counter acquire
Counter 作为 mozi 内部的互斥锁。acquire 包含:
- C1 cooldown
- C2 global 上限
- C3 per-agent 上限
- C4 per-session-key 上限
失败时抛 `AgentBusyError(agent_id, reason="counter_blocked", detail={...})`
**改动**:去掉外层冗余的 `can_acquire` 调用,只用 `acquire`acquire 内部已有 can_acquire 检查)。
```python
# Phase 1: Counter acquire
if self.counter and not skip_counter:
acquired = await self.counter.acquire(agent_id, _sid_key)
if not acquired:
raise AgentBusyError(agent_id, reason="counter_blocked")
```
### 2.5 Phase 2: Session check(在锁保护下)
将 S1/S3/S5 改为**并列收集 → 统一判定**,而非串行递进:
```python
# Phase 2: Session state check(锁保护下,贴近 spawn
if use_main_session:
state = self._check_session_state(agent_id)
# 收集所有 block 原因
blockers = []
if state.get("lock_pid_alive") and not state.get("lock_expired"):
blockers.append(("session_locked", state.get("lock_pid")))
if state.get("status") == "running":
blockers.append(("session_running", None))
if state.get("recent_compact"):
blockers.append(("session_compacting", None))
# 统一判定
if blockers:
# 释放 counter,报具体原因
if self.counter:
self.counter.release(agent_id, _sid_key)
primary = blockers[0]
raise AgentBusyError(agent_id, reason=primary[0], detail={"blockers": blockers})
```
**关键变化**
1. 检查在 counter 锁内执行 → mozi 内部不会并发检查同一 agent
2. 并列收集所有问题 → 日志中能看到所有 block 原因
3. session check 紧贴 spawn → 外部抢占窗口最小
### 2.6 Phase 3 & 4: 不变
`on_checks_passed` 回调和 `spawn subprocess` 逻辑不变。
### 2.7 AgentBusyError 增强
```python
class AgentBusyError(Exception):
def __init__(self, agent_id: str, reason: str = "", detail: dict = None):
self.agent_id = agent_id
self.reason = reason # "counter_blocked" | "session_locked" | "session_running" | "session_compacting"
self.detail = detail or {}
super().__init__(f"{agent_id}: {reason}")
```
Dispatcher 层可以区分原因:
```python
except AgentBusyError as e:
if e.reason.startswith("session_"):
# 外部占用(webchat/compact),等外部释放
self._record_routing(task, decision, "skipped", f"Session busy: {e.reason}")
elif e.reason == "counter_blocked":
# mozi 内部占用,等 counter release
self._record_routing(task, decision, "skipped", "Agent busy (counter)")
```
## 三、sub session 路径
`use_main_session=False` 时:
```
spawn_full_agent(agent_id, use_main_session=False)
├─ Phase 1: Counter acquire
├─ Phase 2: 跳过(sub session 不检查 session state
├─ Phase 3: on_checks_passed
└─ Phase 4: spawn subprocess
```
Sub session 有独立的 session file → 独立的 lock → 不受 main session 的外部占用影响。
只需 counter acquire 保护 mozi 内部并发即可。
## 四、优化细节
### 4.1 O1: lock PID 死 + status=running 假死漏洞
**当前问题**:lock 文件存在但 PID 已死 + sessions.json status=running → 被 S3 拦截而非 revive。
**修复**:在 Phase 2 并列收集中增加判定:
```python
if state.get("status") == "running" and not state.get("lock_pid_alive"):
# status=running 但 lock PID 已死 → 假死,revive
logger.warning("Agent %s: status=running but lock PID dead, reviving", agent_id)
self._revive_session(agent_id)
# revive 后重新检查
state = self._check_session_state(agent_id)
if state.get("status") == "running":
blockers.append(("session_stuck", None))
```
### 4.2 O2: 去掉冗余 can_acquire
当前 spawner 中 `can_acquire` + `acquire` 分两步调用。`acquire` 内部已有 `can_acquire` 检查。
**改后**:只用 `acquire`,返回 False 时抛 AgentBusyError。
### 4.3 O3: AgentBusyError 携带具体原因
reason 字段区分:`counter_blocked` / `session_locked` / `session_running` / `session_compacting` / `session_stuck`
Dispatcher/ticker 日志中能看到具体原因。
### 4.4 O4: revive 清理 lock 文件
`_revive_session` 改 sessions.json status 外,同时删除残留的 lock 文件:
```python
def _revive_session(agent_id: str) -> bool:
# ... 改 status: running → idle ...
# 清理残留 lock 文件
if sf:
lock_path = Path(sf + ".lock")
if lock_path.exists():
try:
lock_path.unlink()
logger.info("Cleaned stale lock for %s", agent_id)
except Exception:
pass
```
### 4.5 O5: compact 扫描条件收紧
当前 compact 扫描在 status 非 idle/done/unknown/None 时都触发,范围过宽。
**改后**:只在 status 为 running 或 compacting 相关时扫描:
```python
# 只在这些状态下检查 compact
if result["status"] in ("running",) and sf:
result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(sf)
```
注:Gateway 的 sessions.json status 实际值主要是 `idle/running/timeout/failed`
`running` 时检查 compact 有意义(agent turn 执行中可能触发 compact)。
其他状态不需要检查。
## 五、改动范围
| 文件 | 改动 | 行数估计 |
|------|------|---------|
| `spawner.py` `spawn_full_agent()` | 重组检查顺序 | ~50 行 |
| `spawner.py` `_check_session_state()` | 不变(数据收集,不含判定) | 0 |
| `spawner.py` `_revive_session()` | 增加清理 lock 文件 | +8 行 |
| `spawner.py` `AgentBusyError` | 增加 reason/detail 字段 | +5 行 |
| `dispatcher.py` | 捕获 AgentBusyError 时区分原因(日志) | +10 行 |
| `counter.py` | 不变 | 0 |
| `ticker.py` | 不变 | 0 |
**总计 ~70 行改动,单文件核心改动 ~50 行。**
## 六、不在这个方案范围内
| 项目 | 说明 | 后续 |
|------|------|------|
| Per-agent 等待队列 | acquire 失败后排队而非 tick 重试 | 独立设计 #08 |
| Session watcher | 检测 webchat 释放后自动 dequeue | 依赖 #08 |
| counter.clear_cooldown | 正常完成后主动清除 cooldown | 可选优化 |
## 七、验证计划
1. **V1 基本功能**:创建任务 → broadcast → claim → 执行 → 完成(不变)
2. **V2 mozi 内部竞争**:同 agent 两个 pending 任务 → 第二个 AgentBusyError(reason=counter_blocked)
3. **V3 webchat 争抢**mozi 执行中 → webchat 发消息 → webchat 正常 → mozi spawn 检测到 session_locked
4. **V4 假死恢复**lock PID 死 + status=running → 自动 revive → 正常 spawn
5. **V5 compact 期间**compact 进行中 → spawn AgentBusyError(reason=session_compacting) → compact 结束后正常