From 3a9478a22ef586142c04bcff5bad8495c69c8781 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Mon, 1 Jun 2026 13:50:21 +0800 Subject: [PATCH] auto-sync: 2026-06-01 13:50:21 --- src/daemon/dispatcher.py | 21 ++++++-- src/daemon/spawner.py | 108 ++++++++++++++++++++++++++++----------- 2 files changed, 94 insertions(+), 35 deletions(-) diff --git a/src/daemon/dispatcher.py b/src/daemon/dispatcher.py index a296926..d35f659 100644 --- a/src/daemon/dispatcher.py +++ b/src/daemon/dispatcher.py @@ -291,15 +291,24 @@ class Dispatcher: "status": "dispatched", "reason": decision["reason"], } - except AgentBusyError: + except AgentBusyError as e: + # #07: 区分外部占用 vs mozi 内部占用 + reason = getattr(e, 'reason', 'busy') + if reason.startswith("session_"): + log_level = logger.info + detail_msg = f"Session busy: {reason}" + else: + log_level = logger.debug + detail_msg = f"Agent busy: {reason}" + log_level("Dispatch skipped %s for task %s: %s", agent_id, task.id, detail_msg) # on_checks_passed 未执行(check 失败在它之前),working 未标,无需回退 - self._record_routing(task, decision, "skipped", "Agent busy", _routing_db) + self._record_routing(task, decision, "skipped", detail_msg, _routing_db) return { "level": level.value, "agent_id": agent_id, "session_id": None, "status": "skipped", - "reason": "Agent busy (concurrent limit or cooling down)", + "reason": detail_msg, } except Exception as e: # on_checks_passed 已执行但 subprocess 失败 → 回退 working → pending @@ -570,10 +579,12 @@ class Dispatcher: return {"level": level.value, "agent_id": agent_id, "session_id": session_id, "status": "dispatched", "reason": decision["reason"]} - except AgentBusyError: + except AgentBusyError as e: + reason = getattr(e, 'reason', 'busy') + detail_msg = f"Session busy: {reason}" if reason.startswith("session_") else f"Agent busy: {reason}" return {"level": level.value, "agent_id": agent_id, "session_id": None, "status": "skipped", - "reason": "Agent busy (concurrent limit or cooling down)"} + "reason": detail_msg} except Exception as e: return {"level": level.value, "agent_id": agent_id, "session_id": None, "status": "error", diff --git a/src/daemon/spawner.py b/src/daemon/spawner.py index c9c3bc0..606243b 100644 --- a/src/daemon/spawner.py +++ b/src/daemon/spawner.py @@ -158,8 +158,15 @@ MAIL_RETRY_PROMPT = """你收到一个续杯提醒。你的任务在执行过程 class AgentBusyError(Exception): - """Agent 被 counter 占用,无法 spawn""" - pass + """Agent 无法 spawn(被占用/冷却/session 锁等) + + #07: reason 字段区分具体原因,便于 dispatcher 层区分处理。 + """ + def __init__(self, agent_id: str, reason: str = "busy", detail: Optional[dict] = None): + self.agent_id = agent_id + self.reason = reason # counter_blocked / session_locked / session_running / session_compacting / session_stuck + self.detail = detail or {} + super().__init__(f"{agent_id}: {reason}") class AgentSpawner: @@ -450,9 +457,9 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta Raises: AgentBusyError: agent 被 counter 占用或冷却中 """ - # ── v2.7.2 → v2.1: 先分配 session_id,再 counter acquire ── + # ── #07 Acquire-First: counter 前置 → session check 在锁内贴近 spawn ── - # 1. 分配 session_id(纯计算,无 IO) + # Step 0: 分配 session_id(纯计算,无 IO) if use_main_session: session_id = None elif reuse_session_id: @@ -461,44 +468,75 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta session_id = str(uuid.uuid4()) _sid_key = session_id or "main" # counter 用的 key - # 2. session state 检查(main session 防外部占用) + # Phase 0: Pre-acquire 修复(无锁) + # timeout/failed 状态先修复再 acquire。revive 只改 running→idle,幂等安全。 + # asyncio 协作式并发保证同一时刻只有一个协程在执行,revive 的 sessions.json + # 写操作不会真正并行。 if use_main_session: - session_state = self._check_session_state(agent_id) - logger.info("Session state check for %s: status=%s lock_pid=%s lock_pid_alive=%s recent_compact=%s", - agent_id, session_state.get('status'), session_state.get('lock_pid'), - session_state.get('lock_pid_alive'), session_state.get('recent_compact')) - if session_state.get("lock_pid_alive") and not session_state.get("lock_expired"): - logger.info("Spawn skipped: %s main session locked by PID %d", - agent_id, session_state.get("lock_pid")) - raise AgentBusyError(f"{agent_id}: session locked by PID {session_state.get('lock_pid')}") - if session_state.get("lock_expired"): - logger.info("Session lock expired for %s, clearing stale lock", agent_id) - if session_state.get("status") == "running": - logger.info("Spawn skipped: %s main session processing", agent_id) - raise AgentBusyError(f"{agent_id}: session processing") - # timeout/failed 状态:Gateway 会阻止新请求,需要先 reset - if session_state.get("status") in ("timeout", "failed"): - logger.info("Session %s detected for %s, attempting revive", session_state.get("status"), agent_id) + pre_state = self._check_session_state(agent_id) + if pre_state.get("status") in ("timeout", "failed"): + logger.info("Phase 0: %s status=%s, reviving before acquire", + agent_id, pre_state["status"]) + self._revive_session(agent_id) + elif pre_state.get("status") == "running" and not pre_state.get("lock_pid_alive"): + # status=running 但 lock PID 已死 → 假死,revive + logger.warning("Phase 0: %s status=running but lock PID dead, reviving", agent_id) self._revive_session(agent_id) - if session_state.get("recent_compact"): - logger.info("Spawn skipped: %s compacting", agent_id) - raise AgentBusyError(f"{agent_id}: compacting") - # 3. counter acquire(per session key 粒度) + # Phase 1: Counter acquire(互斥锁) # v2.8.1 Bug-4 fix: retry 时跳过 counter(counter 从原始 spawn 保持到 retry 完成) if self.counter and not skip_counter: - if not await self.counter.can_acquire(agent_id, _sid_key): - raise AgentBusyError(agent_id) - await self.counter.acquire(agent_id, _sid_key) + acquired = await self.counter.acquire(agent_id, _sid_key) + if not acquired: + raise AgentBusyError(agent_id, reason="counter_blocked") - # 3.5 on_checks_passed: 所有检查通过后的回调(session + counter) + # Phase 2: Session check(在锁保护下,贴近 spawn) + # 并列收集所有 block 原因,统一判定。 + if use_main_session: + session_state = self._check_session_state(agent_id) + logger.info("Phase 2 session check for %s: status=%s lock_pid=%s lock_pid_alive=%s compact=%s", + agent_id, session_state.get('status'), session_state.get('lock_pid'), + session_state.get('lock_pid_alive'), session_state.get('recent_compact')) + + blockers = [] + if session_state.get("lock_pid_alive") and not session_state.get("lock_expired"): + blockers.append(("session_locked", session_state.get("lock_pid"))) + if session_state.get("status") == "running": + blockers.append(("session_running", None)) + if session_state.get("recent_compact"): + blockers.append(("session_compacting", None)) + + if blockers: + # 释放 counter,报具体原因 + if self.counter and not skip_counter: + self.counter.release(agent_id, _sid_key) + primary_reason, primary_detail = blockers[0] + logger.info("Phase 2 blocked %s: %s (all=%s)", + agent_id, primary_reason, blockers) + raise AgentBusyError(agent_id, reason=primary_reason, + detail={"blockers": blockers}) + + # Phase 2.5: 假死修复(status=running + lock PID 死 → revive → 重检) + # 此场景应被 Phase 0 提前修复,这里做兜底 + if session_state.get("status") == "running" and not session_state.get("lock_pid_alive"): + logger.warning("Phase 2.5: %s status=running + lock dead (should be caught in Phase 0), reviving", + agent_id) + self._revive_session(agent_id) + session_state = self._check_session_state(agent_id) + if session_state.get("status") == "running": + if self.counter and not skip_counter: + self.counter.release(agent_id, _sid_key) + raise AgentBusyError(agent_id, reason="session_stuck", + detail={"status": "running after revive"}) + + # Phase 3: on_checks_passed 回调 # 注意:如果回调抛异常,counter 已 acquire 但 subprocess 未启动, # wrapped_on_complete 不会执行。需在此 try/except 中手动 release。 if on_checks_passed: try: on_checks_passed() except Exception: - if self.counter: + if self.counter and not skip_counter: self.counter.release(agent_id, _sid_key) raise @@ -1090,6 +1128,16 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ with open(sessions_path, "w") as f: json.dump(sessions, f, indent=2) logger.info("Revived %s: sessions.json status changed running→idle", agent_id) + # #07 O4: 同时清理残留 lock 文件 + sf = main_session.get("sessionFile", "") + if sf: + lock_path = Path(sf + ".lock") + if lock_path.exists(): + try: + lock_path.unlink() + logger.info("Cleaned stale lock for %s: %s", agent_id, lock_path.name) + except Exception: + pass return True except Exception: logger.exception("Failed to revive %s", agent_id)