From cab21dcfd8fc148e493d8cf6745ed8478b5b3bc1 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Tue, 26 May 2026 08:21:21 +0800 Subject: [PATCH] auto-sync: 2026-05-26 08:21:21 --- src/daemon/spawner.py | 36 +++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/src/daemon/spawner.py b/src/daemon/spawner.py index 3336d48..0d7e9e6 100644 --- a/src/daemon/spawner.py +++ b/src/daemon/spawner.py @@ -339,14 +339,27 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta ) -> str: """Spawn Full Agent(异步非阻塞) + v2.7.2: counter acquire/release 在内部统一管理。 + 调用级生命周期:spawn 时 acquire,进程退出时 release(通过 wrapped_on_complete)。 + Args: - on_complete: async callback(agent_id, outcome) — Agent 完成后调用 + on_complete: 业务回调(agent_id, outcome) — 不含 counter.release, + counter.release 由内部 wrapped_on_complete 保证。 use_main_session: True = 投递到主 Agent session(不传 --session-id) reuse_session_id: 传入指定 session-id 复用(用于续杯) Returns: session_id + + Raises: + AgentBusyError: agent 被 counter 占用或冷却中 """ + # ── v2.7.2: counter 检查 + acquire ── + if self.counter: + if not await self.counter.can_acquire(agent_id): + raise AgentBusyError(agent_id) + await self.counter.acquire(agent_id) + # Session 策略:main > reuse > new if use_main_session: session_id = None @@ -360,6 +373,20 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta self._register_session(session_id or "main", agent_id, task_id, pid=None) return session_id or "main" + # ── v2.7.2: wrapped_on_complete 保证 counter release ── + async def _wrapped_on_complete(aid, outcome): + try: + if self.counter: + self.counter.release(aid) + finally: + if on_complete: + try: + result = on_complete(aid, outcome) + if asyncio.iscoroutine(result): + await result + except Exception: + logger.warning("Business on_complete failed for %s", aid, exc_info=True) + cmd = [ "openclaw", "agent", "--agent", agent_id, @@ -382,16 +409,19 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta logger.info("Spawned agent %s (session=%s, pid=%d)", agent_id, session_id, proc.pid) - # Schedule monitor + # Schedule monitor(传 wrapped_on_complete) asyncio.create_task( self._monitor_process(session_id, proc, agent_id, task_id, - on_complete=on_complete, + on_complete=_wrapped_on_complete, db_path=task_db_path or self.db_path) ) return session_id except Exception as e: + # spawn 失败也要 release counter + if self.counter: + self.counter.release(agent_id) logger.exception("Failed to spawn agent %s", agent_id) self._record_attempt(task_id, agent_id, "spawn_failed", error=str(e)) raise