auto-sync: 2026-06-01 13:50:21

This commit is contained in:
cfdaily
2026-06-01 13:50:21 +08:00
parent fb8807f5a8
commit 3a9478a22e
2 changed files with 94 additions and 35 deletions
+16 -5
View File
@@ -291,15 +291,24 @@ class Dispatcher:
"status": "dispatched",
"reason": decision["reason"],
}
except AgentBusyError:
except AgentBusyError as e:
# #07: 区分外部占用 vs mozi 内部占用
reason = getattr(e, 'reason', 'busy')
if reason.startswith("session_"):
log_level = logger.info
detail_msg = f"Session busy: {reason}"
else:
log_level = logger.debug
detail_msg = f"Agent busy: {reason}"
log_level("Dispatch skipped %s for task %s: %s", agent_id, task.id, detail_msg)
# on_checks_passed 未执行(check 失败在它之前),working 未标,无需回退
self._record_routing(task, decision, "skipped", "Agent busy", _routing_db)
self._record_routing(task, decision, "skipped", detail_msg, _routing_db)
return {
"level": level.value,
"agent_id": agent_id,
"session_id": None,
"status": "skipped",
"reason": "Agent busy (concurrent limit or cooling down)",
"reason": detail_msg,
}
except Exception as e:
# on_checks_passed 已执行但 subprocess 失败 → 回退 working → pending
@@ -570,10 +579,12 @@ class Dispatcher:
return {"level": level.value, "agent_id": agent_id,
"session_id": session_id, "status": "dispatched",
"reason": decision["reason"]}
except AgentBusyError:
except AgentBusyError as e:
reason = getattr(e, 'reason', 'busy')
detail_msg = f"Session busy: {reason}" if reason.startswith("session_") else f"Agent busy: {reason}"
return {"level": level.value, "agent_id": agent_id,
"session_id": None, "status": "skipped",
"reason": "Agent busy (concurrent limit or cooling down)"}
"reason": detail_msg}
except Exception as e:
return {"level": level.value, "agent_id": agent_id,
"session_id": None, "status": "error",
+78 -30
View File
@@ -158,8 +158,15 @@ MAIL_RETRY_PROMPT = """你收到一个续杯提醒。你的任务在执行过程
class AgentBusyError(Exception):
"""Agent 被 counter 占用,无法 spawn"""
pass
"""Agent 无法 spawn(被占用/冷却/session 锁等)
#07: reason 字段区分具体原因,便于 dispatcher 层区分处理。
"""
def __init__(self, agent_id: str, reason: str = "busy", detail: Optional[dict] = None):
self.agent_id = agent_id
self.reason = reason # counter_blocked / session_locked / session_running / session_compacting / session_stuck
self.detail = detail or {}
super().__init__(f"{agent_id}: {reason}")
class AgentSpawner:
@@ -450,9 +457,9 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
Raises:
AgentBusyError: agent 被 counter 占用或冷却中
"""
# ── v2.7.2 → v2.1: 先分配 session_id,再 counter acquire ──
# ── #07 Acquire-First: counter 前置 → session check 在锁内贴近 spawn ──
# 1. 分配 session_id(纯计算,无 IO)
# Step 0: 分配 session_id纯计算,无 IO
if use_main_session:
session_id = None
elif reuse_session_id:
@@ -461,44 +468,75 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
session_id = str(uuid.uuid4())
_sid_key = session_id or "main" # counter 用的 key
# 2. session state 检查(main session 防外部占用)
# Phase 0: Pre-acquire 修复(无锁)
# timeout/failed 状态先修复再 acquire。revive 只改 running→idle,幂等安全。
# asyncio 协作式并发保证同一时刻只有一个协程在执行,revive 的 sessions.json
# 写操作不会真正并行。
if use_main_session:
session_state = self._check_session_state(agent_id)
logger.info("Session state check for %s: status=%s lock_pid=%s lock_pid_alive=%s recent_compact=%s",
agent_id, session_state.get('status'), session_state.get('lock_pid'),
session_state.get('lock_pid_alive'), session_state.get('recent_compact'))
if session_state.get("lock_pid_alive") and not session_state.get("lock_expired"):
logger.info("Spawn skipped: %s main session locked by PID %d",
agent_id, session_state.get("lock_pid"))
raise AgentBusyError(f"{agent_id}: session locked by PID {session_state.get('lock_pid')}")
if session_state.get("lock_expired"):
logger.info("Session lock expired for %s, clearing stale lock", agent_id)
if session_state.get("status") == "running":
logger.info("Spawn skipped: %s main session processing", agent_id)
raise AgentBusyError(f"{agent_id}: session processing")
# timeout/failed 状态:Gateway 会阻止新请求,需要先 reset
if session_state.get("status") in ("timeout", "failed"):
logger.info("Session %s detected for %s, attempting revive", session_state.get("status"), agent_id)
pre_state = self._check_session_state(agent_id)
if pre_state.get("status") in ("timeout", "failed"):
logger.info("Phase 0: %s status=%s, reviving before acquire",
agent_id, pre_state["status"])
self._revive_session(agent_id)
elif pre_state.get("status") == "running" and not pre_state.get("lock_pid_alive"):
# status=running 但 lock PID 已死 → 假死,revive
logger.warning("Phase 0: %s status=running but lock PID dead, reviving", agent_id)
self._revive_session(agent_id)
if session_state.get("recent_compact"):
logger.info("Spawn skipped: %s compacting", agent_id)
raise AgentBusyError(f"{agent_id}: compacting")
# 3. counter acquire(per session key 粒度)
# Phase 1: Counter acquire(互斥锁)
# v2.8.1 Bug-4 fix: retry 时跳过 countercounter 从原始 spawn 保持到 retry 完成)
if self.counter and not skip_counter:
if not await self.counter.can_acquire(agent_id, _sid_key):
raise AgentBusyError(agent_id)
await self.counter.acquire(agent_id, _sid_key)
acquired = await self.counter.acquire(agent_id, _sid_key)
if not acquired:
raise AgentBusyError(agent_id, reason="counter_blocked")
# 3.5 on_checks_passed: 所有检查通过后的回调(session + counter)
# Phase 2: Session check(在锁保护下,贴近 spawn)
# 并列收集所有 block 原因,统一判定。
if use_main_session:
session_state = self._check_session_state(agent_id)
logger.info("Phase 2 session check for %s: status=%s lock_pid=%s lock_pid_alive=%s compact=%s",
agent_id, session_state.get('status'), session_state.get('lock_pid'),
session_state.get('lock_pid_alive'), session_state.get('recent_compact'))
blockers = []
if session_state.get("lock_pid_alive") and not session_state.get("lock_expired"):
blockers.append(("session_locked", session_state.get("lock_pid")))
if session_state.get("status") == "running":
blockers.append(("session_running", None))
if session_state.get("recent_compact"):
blockers.append(("session_compacting", None))
if blockers:
# 释放 counter,报具体原因
if self.counter and not skip_counter:
self.counter.release(agent_id, _sid_key)
primary_reason, primary_detail = blockers[0]
logger.info("Phase 2 blocked %s: %s (all=%s)",
agent_id, primary_reason, blockers)
raise AgentBusyError(agent_id, reason=primary_reason,
detail={"blockers": blockers})
# Phase 2.5: 假死修复(status=running + lock PID 死 → revive → 重检)
# 此场景应被 Phase 0 提前修复,这里做兜底
if session_state.get("status") == "running" and not session_state.get("lock_pid_alive"):
logger.warning("Phase 2.5: %s status=running + lock dead (should be caught in Phase 0), reviving",
agent_id)
self._revive_session(agent_id)
session_state = self._check_session_state(agent_id)
if session_state.get("status") == "running":
if self.counter and not skip_counter:
self.counter.release(agent_id, _sid_key)
raise AgentBusyError(agent_id, reason="session_stuck",
detail={"status": "running after revive"})
# Phase 3: on_checks_passed 回调
# 注意:如果回调抛异常,counter 已 acquire 但 subprocess 未启动,
# wrapped_on_complete 不会执行。需在此 try/except 中手动 release。
if on_checks_passed:
try:
on_checks_passed()
except Exception:
if self.counter:
if self.counter and not skip_counter:
self.counter.release(agent_id, _sid_key)
raise
@@ -1090,6 +1128,16 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
with open(sessions_path, "w") as f:
json.dump(sessions, f, indent=2)
logger.info("Revived %s: sessions.json status changed running→idle", agent_id)
# #07 O4: 同时清理残留 lock 文件
sf = main_session.get("sessionFile", "")
if sf:
lock_path = Path(sf + ".lock")
if lock_path.exists():
try:
lock_path.unlink()
logger.info("Cleaned stale lock for %s: %s", agent_id, lock_path.name)
except Exception:
pass
return True
except Exception:
logger.exception("Failed to revive %s", agent_id)