From 06978696265f11128059906579e3812da7485262 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Tue, 26 May 2026 08:23:16 +0800 Subject: [PATCH] auto-sync: 2026-05-26 08:23:16 --- src/daemon/spawner.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/daemon/spawner.py b/src/daemon/spawner.py index cc7b894..a48882f 100644 --- a/src/daemon/spawner.py +++ b/src/daemon/spawner.py @@ -710,8 +710,12 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ async def _do_retry(self, session_id, agent_id, task_id, on_complete, db_path, retry_field="retry_count"): - """续杯:用同一 session_id 再 spawn 一次""" - # Bug-6: 续杯前检查任务状态,已终态则跳过 + """续杯:release counter 后通过 spawn_full_agent 重新 spawn + + v2.7.2: on_complete 是 wrapped_on_complete(含 counter release)。 + 调用 spawn_full_agent 时 counter 已 release,内部会 can_acquire + acquire。 + """ + # 续杯前检查任务状态,已终态则跳过 if db_path and task_id: try: conn = get_connection(db_path) @@ -722,7 +726,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ if row and row["status"] in ("done", "failed", "cancelled", "review", "pending"): logger.info("Retry skip: task %s already %s (agent=%s)", task_id, row["status"], agent_id) - # counter 仍然占用,通过 on_complete release + # on_complete = wrapped_on_complete,会 release counter await self._do_on_complete_async(on_complete, agent_id, "task_already_done") return finally: @@ -730,8 +734,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ except Exception: logger.warning("Retry status check failed for %s, proceeding", task_id) - # 直接读写 tasks 表的 retry_count(广播场景下所有 Agent 共享同一 tasks 记录) - # task_attempts metadata 的 retry_count 不可靠(多 Agent 互相覆盖) + # 直接读写 tasks 表的 retry_count if retry_field == "retry_count" and db_path and task_id: try: conn = get_connection(db_path) @@ -752,7 +755,6 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ logger.exception("Failed to update retry_count for task %s", task_id) count = 1 else: - # 非 retry_count 的计数器(connect/api/lock)仍用 task_attempts metadata retry_counts = self._get_retry_counts(db_path, task_id) count = retry_counts.get(retry_field, 0) + 1 retry_counts[retry_field] = count @@ -776,7 +778,6 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ is_mail = project_id == "_mail" if is_mail: - # Mail 续杯:精简模板,不含状态转换指令 must_haves = task_info.get("must_haves", "{}") try: meta = json.loads(must_haves) if must_haves else {} @@ -789,7 +790,6 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ max_retries=self.max_retries, ) else: - # Task 续杯:标准模板 fallback_hint = "\n⚠️ 之前有 fallback 执行,请调 API 检查任务当前状态和已有产出,确认是否已完成。" if retry_field == "retry_count" else "" message = self.RETRY_PROMPT.format( project_id=project_id, @@ -803,8 +803,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ fallback_hint=fallback_hint, ) - # 续杯 spawn(counter 保持占用,直到 max_retries 或最终完成时 release) - # session 策略:原始是 main session (session_id=None) 时,retry 也用 main session + # v2.7.2: 通过 spawn_full_agent 重新 spawn(内部 can_acquire + acquire) + # on_complete = wrapped_on_complete(含 counter release),作为业务回调传入 try: await self.spawn_full_agent( agent_id=agent_id, @@ -815,6 +815,10 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ reuse_session_id=session_id if session_id else None, task_db_path=db_path, ) + except AgentBusyError: + # agent 被其他任务占用(不应发生,但防御) + logger.warning("Retry spawn skipped: %s busy (unexpected)", agent_id) + await self._do_on_complete_async(on_complete, agent_id, "retry_agent_busy") except Exception: logger.exception("Retry spawn failed for %s", agent_id) await self._do_on_complete_async(on_complete, agent_id, "retry_spawn_failed")