From b0bd78dc3435ea390bcc6d93d9093c2ef20680fe Mon Sep 17 00:00:00 2001 From: cfdaily Date: Fri, 29 May 2026 22:21:52 +0800 Subject: [PATCH] auto-sync: 2026-05-29 22:21:52 --- src/daemon/spawner.py | 277 ++++++++++++++++++++++-------------------- 1 file changed, 145 insertions(+), 132 deletions(-) diff --git a/src/daemon/spawner.py b/src/daemon/spawner.py index e082764..11622ae 100644 --- a/src/daemon/spawner.py +++ b/src/daemon/spawner.py @@ -1,7 +1,7 @@ -"""Agent Spawner — 异步 spawn Full Agent / Subagent +"""Agent Spawner - 异步 spawn Full Agent / Subagent -Full Agent: asyncio.create_subprocess_exec(异步非阻塞,不 await 完成) -Subagent: 占位(实际通过 OpenClaw Gateway API sessions_spawn,F17 完善) +Full Agent: asyncio.create_subprocess_exec(异步非阻塞,不 await 完成) +Subagent: 占位(实际通过 OpenClaw Gateway API sessions_spawn,F17 完善) """ from __future__ import annotations @@ -21,15 +21,15 @@ logger = logging.getLogger("moziplus-v2.spawner") # ── Prompt 模板 ── -# Mail 专用模板:inform 类型(纯通知,状态由系统管理) -MAIL_INFORM_TEMPLATE = """你收到一封飞鸽传书(纯通知,不需要回复)。 +# Mail 专用模板:inform 类型(纯通知,状态由系统管理) +MAIL_INFORM_TEMPLATE = """你收到一封飞鸽传书(纯通知)。 发件者: {from_agent} 主题: {title} 内容: {text} -已阅即可,无需操作。 -⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。 +已阅即可。如需回复,用 in_reply_to 回复发件者(不需要填 to)。 +⚠️ 不要执行任何状态转换命令。 """ # Mail 专用模板:request 类型(需要处理并回复,状态由系统管理) @@ -39,10 +39,23 @@ MAIL_REQUEST_TEMPLATE = """你收到一封飞鸽传书,需要你处理并回 主题: {title} 内容: {text} -请处理后回复发件者: -curl -s -X POST http://localhost:8083/api/mail \\\n -H 'Content-Type: application/json' \\\n -d '{{"from": "{agent_id}", "to": "{from_agent}", "title": "回复: {title}", "text": "你的回复内容", "type": "inform", "in_reply_to": "{task_id}"}}' +### 如何回复发件者 -⚠️ 将"你的回复内容"替换为实际回复。type 必须用 inform 防止循环。 +curl -s -X POST http://localhost:8083/api/mail \\ + -H 'Content-Type: application/json' \\ + -d '{{"from": "{agent_id}", "in_reply_to": "{task_id}", "title": "回复: {title}", "text": "你的回复内容"}}' + +⚠️ 不需要填 "to",系统自动回复给发件者。 + +### 如何给其他人发新邮件 + +curl -s -X POST http://localhost:8083/api/mail \\ + -H 'Content-Type: application/json' \\ + -d '{{"from": "{agent_id}", "to": "对方agent-id", "title": "标题", "text": "正文", "type": "inform"}}' + +⚠️ to 必须是有效的 agent id: {valid_agents} +⚠️ 纯通知用 type=inform,需要对方回复不填 type(默认 request) +⚠️ 不能给自己发邮件 ⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。 """ @@ -60,12 +73,12 @@ SPAWN_PROMPT_TEMPLATE = """你收到一个 v2.6 黑板任务。请严格按照 {retry_context} -## 状态机(你必须遵守的状态流转) +## 状态机(你必须遵守的状态流转) ``` pending → claimed → working → review → done │ │ - │ └→ pending(驳回重做) + │ └→ pending(驳回重做) ├──→ failed ├──→ blocked └──→ cancelled @@ -77,7 +90,7 @@ pending → claimed → working → review → done ### 步骤 1: 开始工作 -立即调 API 标记你已开始: +立即调 API 标记你已开始: ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \ -H 'Content-Type: application/json' \ @@ -86,11 +99,11 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ ### 步骤 2: 执行任务 -根据任务描述完成你的工作(编码/回测/数据检查/审查等)。 +根据任务描述完成你的工作(编码/回测/数据检查/审查等)。 ### 步骤 3: 写入产出 -⚠️ 这一步是必须的!不写产出 = 任务没完成。 +⚠️ 这一步是必须的!不写产出 = 任务没完成。 ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \ @@ -100,7 +113,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ **type 必须是以下之一**: code, document, data, config, other -如果产出太长,可以写文件后用路径引用: +如果产出太长,可以写文件后用路径引用: ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \ -H 'Content-Type: application/json' \ @@ -109,27 +122,27 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ ### 步骤 4: 提交完成或标记失败 -✅ 成功完成: +✅ 成功完成: ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \ -H 'Content-Type: application/json' \ -d '{{"status": "{completion_status}", "agent": "{agent_id}"}}' ``` -❌ 无法完成: +❌ 无法完成: ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \ -H 'Content-Type: application/json' \ -d '{{"status": "failed", "agent": "{agent_id}", "detail": "<失败原因>"}}' ``` -## Fallback(API 调用失败时) +## Fallback(API 调用失败时) -如果 API 失败 2 次,尝试: +如果 API 失败 2 次,尝试: ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \ -H 'Content-Type: application/json' \ - -d '{{"status": "failed", "agent": "{agent_id}", "detail": "API回写失败,产出在本地文件"}}' + -d '{{"status": "failed", "agent": "{agent_id}", "detail": "API回写失败,产出在本地文件"}}' ``` ## 参考链接 @@ -151,28 +164,28 @@ DISCUSSION_PROMPT_TEMPLATE = """你被 spawn 来参与黑板讨论。这是一 ## 黑板 API -你可以随时: -- 读黑板:GET http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}?expand=all(含 comments、outputs) -- 写 comment:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/comments +你可以随时: +- 读黑板:GET http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}?expand=all(含 comments、outputs) +- 写 comment:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/comments body: {{"author": "{agent_id}", "body": "内容", "mentions": ["agent_id"]}} -- 创建 sub task:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks +- 创建 sub task:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks body: {{"title": "...", "description": "...", "task_type": "...", "parent_task": "{task_id}", "must_haves": "{{\"capability\": \"...\"}}"}} -- 认领任务:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{{sub_task_id}}/claim +- 认领任务:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{{sub_task_id}}/claim ## 行为准则 -1. **你是自主的。**读黑板、思考、行动,不要等指令。 -2. **不重复别人的工作。**动手前先读黑板看谁在做什么(Separation)。 -3. **保持方向对齐。**你的产出方向和 parent goal 对齐,不确定时 @pangtong-fujunshi(Alignment)。 -4. **产出可共享。**产出写入黑板,让其他人能看到你的成果(Cohesion)。 -5. **不越界。**安全红线不要碰,超出能力的 @ 庞统升级(Boundary)。 -6. **随时讨论。**执行过程中需要协作时 @ 对应 Agent,讨论是灵活的不是固定阶段的。 +1. **你是自主的。**读黑板、思考、行动,不要等指令。 +2. **不重复别人的工作。**动手前先读黑板看谁在做什么(Separation)。 +3. **保持方向对齐。**你的产出方向和 parent goal 对齐,不确定时 @pangtong-fujunshi(Alignment)。 +4. **产出可共享。**产出写入黑板,让其他人能看到你的成果(Cohesion)。 +5. **不越界。**安全红线不要碰,超出能力的 @ 庞统升级(Boundary)。 +6. **随时讨论。**执行过程中需要协作时 @ 对应 Agent,讨论是灵活的不是固定阶段的。 ## 讨论完成后 -- 如果讨论收敛到可执行的任务,直接创建 sub task -- 如果有分歧或不确定,在黑板上写 comment @ 庞统裁决 -- 标记完成: +- 如果讨论收敛到可执行的任务,直接创建 sub task +- 如果有分歧或不确定,在黑板上写 comment @ 庞统裁决 +- 标记完成: ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \ -H 'Content-Type: application/json' \ @@ -181,22 +194,22 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ """ -# Mail 续杯专用模板:不包含状态转换指令(系统自动标 done) +# Mail 续杯专用模板:不包含状态转换指令(系统自动标 done) MAIL_RETRY_PROMPT = """你收到一个续杯提醒。你的任务在执行过程中被中断了。 发件者: {from_agent} 主题: {title} -续杯次数: 第 {retry_count} 次(上限 {max_retries} 次) +续杯次数: 第 {retry_count} 次(上限 {max_retries} 次) -请检查 session 历史中你之前做了什么,然后继续未完成的工作。 +请检查 session 历史中你之前做了什么,然后继续未完成的工作。 -⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。 -⚠️ 如果任务已完成,直接写产出即可,不要调 status API。 +⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。 +⚠️ 如果任务已完成,直接写产出即可,不要调 status API。 """ class AgentBusyError(Exception): - """Agent 被 counter 占用,无法 spawn""" + """Agent 被 counter 占用,无法 spawn""" pass @@ -218,11 +231,11 @@ class AgentSpawner: ): """ Args: - db_path: 项目黑板 DB 路径(用于写 task_attempts) + db_path: 项目黑板 DB 路径(用于写 task_attempts) agent_timeout: Agent 超时秒数 - dry_run: 测试模式,不实际 spawn - api_host: API 地址(供 Agent 回写) - api_port: API 端口(供 Agent 回写) + dry_run: 测试模式,不实际 spawn + api_host: API 地址(供 Agent 回写) + api_port: API 端口(供 Agent 回写) """ self.db_path = db_path self.agent_timeout = agent_timeout @@ -233,7 +246,7 @@ class AgentSpawner: self.gateway_timeout = gateway_timeout self.max_retries = max_retries self.max_monitor_timeouts = max_monitor_timeouts - # v2.7.2: counter 引用(spawn_full_agent 内部 acquire/release) + # v2.7.2: counter 引用(spawn_full_agent 内部 acquire/release) self.counter = counter # session 注册表 {session_id: {...}} @@ -265,16 +278,16 @@ class AgentSpawner: project_config: Optional[Dict[str, Any]] = None, spawn_type: str = "executor", # executor | discussion | review ) -> str: - """构建 Agent spawn 的消息(优先用 BootstrapBuilder,fallback 用模板) + """构建 Agent spawn 的消息(优先用 BootstrapBuilder,fallback 用模板) Args: - current_status: 任务当前状态(动态生成状态机提示) - retry_context: 重试上下文(前轮产出摘要 + 审查意见) - task: Task 对象(BootstrapBuilder 用) - project_config: 项目配置(BootstrapBuilder 用) - spawn_type: spawn 类型(executor=执行, discussion=讨论, review=审查) + current_status: 任务当前状态(动态生成状态机提示) + retry_context: 重试上下文(前轮产出摘要 + 审查意见) + task: Task 对象(BootstrapBuilder 用) + project_config: 项目配置(BootstrapBuilder 用) + spawn_type: spawn 类型(executor=执行, discussion=讨论, review=审查) """ - # discussion 类型直接用模板(不走 BootstrapBuilder) + # discussion 类型直接用模板(不走 BootstrapBuilder) if spawn_type == "discussion": return self._build_discussion_prompt( task_id, title, description, must_haves, @@ -288,7 +301,7 @@ class AgentSpawner: role="executor", project_config=project_config, ) - # mail 任务用精简模板,不走 BootstrapBuilder + # mail 任务用精简模板,不走 BootstrapBuilder if project_id == "_mail": return self._build_mail_prompt(task_id, title, description, must_haves, agent_id) api_section = self._build_api_section( @@ -302,16 +315,16 @@ class AgentSpawner: return self._build_mail_prompt(task_id, title, description, must_haves, agent_id) # Fallback: 使用硬编码模板 - # mail 任务直接 done,不走 review + # mail 任务直接 done,不走 review completion_status = "done" if project_id == "_mail" else "review" return SPAWN_PROMPT_TEMPLATE.format( project_id=project_id, task_id=task_id, title=title, - description=description or "(无描述)", + description=description or "(无描述)", task_type=task_type or "general", priority=priority, - must_haves=must_haves or "(无)", + must_haves=must_haves or "(无)", agent_id=agent_id, api_host=self.api_host, api_port=self.api_port, @@ -322,13 +335,13 @@ class AgentSpawner: def _build_api_section(self, project_id: str, task_id: str, agent_id: str) -> str: - """构建 API 回写操作指令(BootstrapBuilder 模式下补充)""" - # mail 任务直接 done,不走 review + """构建 API 回写操作指令(BootstrapBuilder 模式下补充)""" + # mail 任务直接 done,不走 review success_status = '"done"' if project_id == "_mail" else '"review"' return f"""## 操作指令 ### 状态回写 -开始工作: +开始工作: ```bash curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/tasks/{task_id}/status \ -H 'Content-Type: application/json' \ @@ -343,15 +356,15 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta ``` ### 完成后 -成功:status → {success_status} | 失败:status → "failed" +成功:status → {success_status} | 失败:status → "failed" """ def _build_discussion_prompt(self, task_id: str, title: str, description: str, must_haves: str, project_id: str, agent_id: str) -> str: - """构建讨论类 spawn prompt(§3.3 框架 + Boids)""" + """构建讨论类 spawn prompt(§3.3 框架 + Boids)""" goal_snapshot = description or title - constraints = must_haves or "(无特殊约束)" + constraints = must_haves or "(无特殊约束)" return DISCUSSION_PROMPT_TEMPLATE.format( goal_snapshot=goal_snapshot, @@ -407,17 +420,17 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta reuse_session_id: Optional[str] = None, on_checks_passed: Optional[Any] = None, ) -> str: - """Spawn Full Agent(异步非阻塞) + """Spawn Full Agent(异步非阻塞) v2.7.2: counter acquire/release 在内部统一管理。 - 调用级生命周期:spawn 时 acquire,进程退出时 release(通过 wrapped_on_complete)。 + 调用级生命周期:spawn 时 acquire,进程退出时 release(通过 wrapped_on_complete)。 Args: - on_complete: 业务回调(agent_id, outcome) — 不含 counter.release, + on_complete: 业务回调(agent_id, outcome) - 不含 counter.release, counter.release 由内部 wrapped_on_complete 保证。 - use_main_session: True = 投递到主 Agent session(不传 --session-id) - on_checks_passed: 所有检查通过后的回调(session check + counter acquire 后、subprocess 前) - reuse_session_id: 传入指定 session-id 复用(用于续杯) + use_main_session: True = 投递到主 Agent session(不传 --session-id) + on_checks_passed: 所有检查通过后的回调(session check + counter acquire 后、subprocess 前) + reuse_session_id: 传入指定 session-id 复用(用于续杯) Returns: session_id @@ -425,9 +438,9 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta Raises: AgentBusyError: agent 被 counter 占用或冷却中 """ - # ── v2.7.2 → v2.1: 先分配 session_id,再 counter acquire ── + # ── v2.7.2 → v2.1: 先分配 session_id,再 counter acquire ── - # 1. 分配 session_id(纯计算,无 IO) + # 1. 分配 session_id(纯计算,无 IO) if use_main_session: session_id = None elif reuse_session_id: @@ -436,7 +449,7 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta session_id = str(uuid.uuid4()) _sid_key = session_id or "main" # counter 用的 key - # 2. session state 检查(main session 防外部占用) + # 2. session state 检查(main session 防外部占用) if use_main_session: session_state = self._check_session_state(agent_id) if session_state.get("lock_pid_alive"): @@ -450,14 +463,14 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta logger.info("Spawn skipped: %s compacting", agent_id) raise AgentBusyError(f"{agent_id}: compacting") - # 3. counter acquire(per session key 粒度) + # 3. counter acquire(per session key 粒度) if self.counter: if not await self.counter.can_acquire(agent_id, _sid_key): raise AgentBusyError(agent_id) await self.counter.acquire(agent_id, _sid_key) - # 3.5 on_checks_passed: 所有检查通过后的回调(session + counter) - # 注意:如果回调抛异常,counter 已 acquire 但 subprocess 未启动, + # 3.5 on_checks_passed: 所有检查通过后的回调(session + counter) + # 注意:如果回调抛异常,counter 已 acquire 但 subprocess 未启动, # wrapped_on_complete 不会执行。需在此 try/except 中手动 release。 if on_checks_passed: try: @@ -472,7 +485,7 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta self._register_session(_sid_key, agent_id, task_id, pid=None) return _sid_key - # 4. wrapped_on_complete 保证 counter release(闭包捕获 _sid_key) + # 4. wrapped_on_complete 保证 counter release(闭包捕获 _sid_key) async def _wrapped_on_complete(aid, outcome): try: if self.counter: @@ -508,7 +521,7 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta logger.info("Spawned agent %s (session=%s, pid=%d)", agent_id, session_id, proc.pid) - # Schedule monitor(传 wrapped_on_complete) + # Schedule monitor(传 wrapped_on_complete) asyncio.create_task( self._monitor_process(session_id, proc, agent_id, task_id, on_complete=_wrapped_on_complete, @@ -530,7 +543,7 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta task_description: str, task_id: Optional[str] = None, ) -> str: - """Spawn Subagent(占位,实际通过 Gateway API) + """Spawn Subagent(占位,实际通过 Gateway API) Returns: session_id @@ -556,9 +569,9 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta - 项目: {project_id} - 任务ID: {task_id} - 标题: {title} -- 续杯次数: 第 {retry_count} 次(上限 {max_retries} 次) +- 续杯次数: 第 {retry_count} 次(上限 {max_retries} 次) -请检查 session 历史中你之前做了什么,然后继续未完成的工作。 +请检查 session 历史中你之前做了什么,然后继续未完成的工作。 ## 操作指令 @@ -567,21 +580,21 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta curl http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}?expand=all ``` -### 如果已经完成,标记 review +### 如果已经完成,标记 review ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \\ -H 'Content-Type: application/json' \\ -d '{{"status": "review", "agent": "{agent_id}"}}' ``` -### 写入产出(如果之前没写) +### 写入产出(如果之前没写) ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \\ -H 'Content-Type: application/json' \\ -d '{{"agent": "{agent_id}", "type": "<类型>", "title": "<标题>", "content": "<内容>", "summary": "<摘要>"}}' ``` -### 如果无法解决,标记失败 +### 如果无法解决,标记失败 ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \\ -H 'Content-Type: application/json' \\ @@ -600,7 +613,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ db_path: Optional[Path] = None, monitor_timeout_count: int = 0, ) -> None: - """监控子进程全生命周期(设计文档 spawner-monitor-design.md)""" + """监控子进程全生命周期(设计文档 spawner-monitor-design.md)""" stdout_chunks: list = [] stderr_chunks: list = [] @@ -624,7 +637,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ await asyncio.gather(_read_out(), _read_err(), proc.wait()) await asyncio.wait_for(_read_streams(), timeout=self.agent_timeout) - # ── 情况 A:进程退出 ── + # ── 情况 A:进程退出 ── exit_code = proc.returncode await self._handle_exit( session_id, agent_id, task_id, exit_code, @@ -632,7 +645,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ ) except asyncio.TimeoutError: - # ── 情况 B:monitor timeout(进程没退出)── + # ── 情况 B:monitor timeout(进程没退出)── logger.warning("Agent %s monitor timeout (session=%s, count=%d/%d)", agent_id, session_id, monitor_timeout_count + 1, self.max_monitor_timeouts) @@ -643,11 +656,11 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ async def _handle_exit(self, session_id, agent_id, task_id, exit_code, stdout_chunks, stderr_chunks, on_complete, db_path): - """情况 A:进程退出后的处理 + """情况 A:进程退出后的处理 - v2.7.2: 进程退出 = counter release(由 on_complete = wrapped_on_complete 保证)。 - 只有 A2/A3(gateway_timeout)触发续杯,其他都不 retry。 - A9(api_error/429)额外推回 pending + 设冷却。 + v2.7.2: 进程退出 = counter release(由 on_complete = wrapped_on_complete 保证)。 + 只有 A2/A3(gateway_timeout)触发续杯,其他都不 retry。 + A9(api_error/429)额外推回 pending + 设冷却。 """ stdout_text = b"".join(stdout_chunks).decode("utf-8", errors="replace") stderr_text = b"".join(stderr_chunks).decode("utf-8", errors="replace") @@ -690,13 +703,13 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ agent_id, session_id, outcome, exit_code, task_status) if cls["should_retry"]: - # A2/A3: gateway_timeout → 续杯(on_complete 会 release counter) + # A2/A3: gateway_timeout → 续杯(on_complete 会 release counter) await self._do_retry( session_id, agent_id, task_id, on_complete, db_path, cls.get("retry_field", "retry_count") ) elif outcome == "api_error": - # A9: 429/API 错误 → release counter(on_complete)+ 推回 pending + 冷却 + # A9: 429/API 错误 → release counter(on_complete)+ 推回 pending + 冷却 await self._do_on_complete_async(on_complete, agent_id, outcome) if self.counter: self.counter.set_cooldown(agent_id) @@ -706,7 +719,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ }) logger.info("Task %s pushed back to pending (api_error)", task_id) elif outcome == "fallback_timeout" and not cls["should_retry"]: - # A5/A6: fallback 不应出现,标 failed + escalate + context 日志 + # A5/A6: fallback 不应出现,标 failed + escalate + context 日志 logger.error("UNEXPECTED FALLBACK: agent=%s session=%s task=%s " "fallback_used=%s fallback_reason=%s counter_active=%s " "This indicates counter check failed to prevent concurrent spawn.", @@ -721,17 +734,17 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ "fallback_reason": json_result.get("fallback_reason"), }) else: - # 其他:A1(completed), A4(agent_failed), A7(auth_failed), + # 其他:A1(completed), A4(agent_failed), A7(auth_failed), # A8(gateway_unreachable), A11(lock_conflict), # A10(compact_failed), A12(agent_error) # 进程退出 → on_complete release counter - # 任务状态由各 outcome 自行处理(或等 ticker) + # 任务状态由各 outcome 自行处理(或等 ticker) await self._do_on_complete_async(on_complete, agent_id, outcome) async def _handle_monitor_timeout(self, session_id, agent_id, task_id, proc, on_complete, db_path, stderr_chunks, monitor_timeout_count): - """情况 B:monitor timeout""" + """情况 B:monitor timeout""" # 读已缓冲的 stderr try: remaining = await asyncio.wait_for(proc.stderr.read(), timeout=2.0) @@ -745,14 +758,14 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ # 检查 session 状态 state = self._check_session_state(agent_id) - # B1: 假死 — 先复活,连续假死 ≥2 次再 failed + # B1: 假死 - 先复活,连续假死 ≥2 次再 failed if state.get("status") == "running" and not state.get("lock_pid_alive", True): # 假死计数 stuck_count = self._stuck_counts.get(task_id, 0) + 1 self._stuck_counts[task_id] = stuck_count if stuck_count >= 2: - # 连续假死 ≥2 次,标 failed + # 连续假死 ≥2 次,标 failed logger.error("Agent %s session stuck %d times (session=%s, lock PID dead)", agent_id, stuck_count, session_id) self._mark_task(db_path, task_id, "failed", @@ -780,12 +793,12 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ return # B2/B3/B4: 进程还活着 - # B2: compact 进行中 — 不计入 monitor timeout 计数,继续等 + # B2: compact 进行中 - 不计入 monitor timeout 计数,继续等 if state.get("recent_compact"): logger.info("Agent %s recent compaction detected, extending patience " "(session=%s, monitor=%d/%d)", agent_id, session_id, monitor_timeout_count, self.max_monitor_timeouts) - # 不递增 monitor_timeout_count,但最多额外等 max_monitor_timeouts 次 + # 不递增 monitor_timeout_count,但最多额外等 max_monitor_timeouts 次 # 用独立计数器防止无限等待 compact_wait_count = self._compact_waits.get(task_id, 0) + 1 self._compact_waits[task_id] = compact_wait_count @@ -809,7 +822,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ ) return - # B3/B4: 无 compact,正常计数 + # B3/B4: 无 compact,正常计数 monitor_timeout_count += 1 if monitor_timeout_count >= self.max_monitor_timeouts: logger.error("Agent %s max monitor timeouts (session=%s, count=%d)", @@ -823,7 +836,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ await self._do_on_complete_async(on_complete, agent_id, "max_monitor_timeouts") return - # 未超限:继续等(不 release counter) + # 未超限:继续等(不 release counter) logger.info("Agent %s continuing monitor (session=%s, count=%d/%d)", agent_id, session_id, monitor_timeout_count, self.max_monitor_timeouts) asyncio.create_task( @@ -836,19 +849,19 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ async def _do_retry(self, session_id, agent_id, task_id, on_complete, db_path, retry_field="retry_count"): - """续杯:手动 release counter 后通过 spawn_full_agent 重新 spawn + """续杯:手动 release counter 后通过 spawn_full_agent 重新 spawn - v2.7.2: 进程已退出但 wrapped_on_complete 未被调用(只有 should_retry 分支走到这里)。 - 需要手动 release counter,然后 spawn_full_agent 内部会 acquire。 - on_complete(含 counter release)置为 None,避免 double release。 + v2.7.2: 进程已退出但 wrapped_on_complete 未被调用(只有 should_retry 分支走到这里)。 + 需要手动 release counter,然后 spawn_full_agent 内部会 acquire。 + on_complete(含 counter release)置为 None,避免 double release。 """ - # ── 关键:手动 release counter(进程退出 = agent 空闲)── + # ── 关键:手动 release counter(进程退出 = agent 空闲)── if self.counter: self.counter.release(agent_id, session_id or "main") - # 旧 wrapped_on_complete 含 counter.release,不再使用,防止 double release + # 旧 wrapped_on_complete 含 counter.release,不再使用,防止 double release on_complete = None - # 续杯前检查任务状态,已终态则跳过 + # 续杯前检查任务状态,已终态则跳过 if db_path and task_id: try: conn = get_connection(db_path) @@ -859,7 +872,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ if row and row["status"] in ("done", "failed", "cancelled", "review", "pending"): logger.info("Retry skip: task %s already %s (agent=%s)", task_id, row["status"], agent_id) - # on_complete = wrapped_on_complete,会 release counter + # on_complete = wrapped_on_complete,会 release counter await self._do_on_complete_async(on_complete, agent_id, "task_already_done") return finally: @@ -905,7 +918,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ logger.info("Agent %s retry %s=%d/%d (session=%s)", agent_id, retry_field, count, self.max_retries, session_id) - # 构建续杯 message(Mail 用专用模板,Task 用标准模板) + # 构建续杯 message(Mail 用专用模板,Task 用标准模板) task_info = self._get_task_info(db_path, task_id) or {} project_id = task_info.get("project_id", "") is_mail = project_id == "_mail" @@ -923,7 +936,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ max_retries=self.max_retries, ) else: - fallback_hint = "\n⚠️ 之前有 fallback 执行,请调 API 检查任务当前状态和已有产出,确认是否已完成。" if retry_field == "retry_count" else "" + fallback_hint = "\n⚠️ 之前有 fallback 执行,请调 API 检查任务当前状态和已有产出,确认是否已完成。" if retry_field == "retry_count" else "" message = self.RETRY_PROMPT.format( project_id=project_id, task_id=task_id or "", @@ -936,8 +949,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ fallback_hint=fallback_hint, ) - # v2.7.2: 通过 spawn_full_agent 重新 spawn(内部 can_acquire + acquire) - # on_complete = wrapped_on_complete(含 counter release),作为业务回调传入 + # v2.7.2: 通过 spawn_full_agent 重新 spawn(内部 can_acquire + acquire) + # on_complete = wrapped_on_complete(含 counter release),作为业务回调传入 try: await self.spawn_full_agent( agent_id=agent_id, @@ -949,7 +962,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ task_db_path=db_path, ) except AgentBusyError: - # agent 被其他任务占用(不应发生,但防御) + # agent 被其他任务占用(不应发生,但防御) logger.warning("Retry spawn skipped: %s busy (unexpected)", agent_id) await self._do_on_complete_async(on_complete, agent_id, "retry_agent_busy") except Exception: @@ -962,8 +975,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ def _parse_stdout_json(stdout_text: str) -> dict: """解析 openclaw agent --json 的 stdout 输出 - 返回可直接使用的字段:status, summary, fallback_used, fallback_reason, payloads - 不再提取 meta,直接用顶层字段。 + 返回可直接使用的字段:status, summary, fallback_used, fallback_reason, payloads + 不再提取 meta,直接用顶层字段。 """ text = stdout_text.strip() if not text: @@ -971,7 +984,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ try: data = json.loads(text) except json.JSONDecodeError: - # 多行输出,找最后一个 JSON + # 多行输出,找最后一个 JSON for line in reversed(text.splitlines()): try: data = json.loads(line) @@ -1035,7 +1048,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ @staticmethod def _revive_session(agent_id: str) -> bool: - """假死复活术:修改 sessions.json status 从 running 改为 idle""" + """假死复活术:修改 sessions.json status 从 running 改为 idle""" sessions_path = Path.home() / ".openclaw" / "agents" / agent_id / "sessions" / "sessions.json" if not sessions_path.exists(): return False @@ -1045,7 +1058,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ main_key = f"agent:{agent_id}:main" main_session = sessions.get(main_key, {}) if main_session.get("status") != "running": - return False # 不是 running 状态,不需要复活 + return False # 不是 running 状态,不需要复活 main_session["status"] = "idle" sessions[main_key] = main_session with open(sessions_path, "w") as f: @@ -1103,16 +1116,16 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ @staticmethod def _classify_outcome(exit_code: int, json_result: dict, stderr_text: str, task_status: Optional[str], stdout_text: str = "") -> dict: - """分类退出原因,返回处理策略 + """分类退出原因,返回处理策略 - v3.0: 基于 JSON status/summary/executionTrace 判定,不再依赖 transport 字段。 - 只有 status="timeout" 触发 retry,其他都不 retry。 + v3.0: 基于 JSON status/summary/executionTrace 判定,不再依赖 transport 字段。 + 只有 status="timeout" 触发 retry,其他都不 retry。 """ status = json_result.get("status") summary = json_result.get("summary", "") fallback_used = json_result.get("fallback_used", False) - # A4: 任务 DB status=failed(Agent 自己标的) + # A4: 任务 DB status=failed(Agent 自己标的) if task_status == "failed": return {"outcome": "agent_failed", "should_retry": False} @@ -1130,12 +1143,12 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ "retry_field": "retry_count"} # A0: stdout 为空且 exit≠0 = 进程异常终止 - # 注意:exit=0 + stdout 为空可能是正常完成(--json 没输出), + # 注意:exit=0 + stdout 为空可能是正常完成(--json 没输出), # 此时 task_status 如果是 done/review 会被上面的 A4 兜住 if status is None and not stdout_text.strip() and exit_code != 0: return {"outcome": "process_crash", "should_retry": False} - # stdout 为空但 exit=0:可能是正常完成但 --json 没输出 + # stdout 为空但 exit=0:可能是正常完成但 --json 没输出 # 查任务状态判断 if status is None and not stdout_text.strip() and exit_code == 0: terminal_statuses = {"done", "review"} @@ -1143,7 +1156,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ return {"outcome": "completed", "should_retry": False} return {"outcome": "agent_error", "should_retry": False} - # A7-A12: status=error → 不续杯,stderr 辅助分类 + # A7-A12: status=error → 不续杯,stderr 辅助分类 if status == "error": stderr_lower = stderr_text.lower() if any(kw in stderr_lower for kw in ["401", "403", "unauthorized", "auth"]): @@ -1158,7 +1171,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ return {"outcome": "lock_conflict", "should_retry": False} return {"outcome": "agent_error", "should_retry": False} - # 兜底:status 未知值 + # 兜底:status 未知值 return {"outcome": "unknown_status", "should_retry": False} @staticmethod @@ -1216,7 +1229,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ def _mark_task(self, db_path: Optional[Path], task_id: Optional[str], status: str, detail: Optional[dict] = None): - """标记任务状态(用于 failed/escalate)""" + """标记任务状态(用于 failed/escalate)""" if not db_path or not task_id: return try: @@ -1240,13 +1253,13 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ @staticmethod def _do_on_complete(on_complete, agent_id, outcome): - """执行 on_complete 回调(同步+异步兼容)""" + """执行 on_complete 回调(同步+异步兼容)""" if not on_complete: return try: result = on_complete(agent_id, outcome) if asyncio.iscoroutine(result): - # 注意:这里是同步调用的,不能 await + # 注意:这里是同步调用的,不能 await # 在 _monitor_process 的 async 上下文中应该用 await pass except Exception: @@ -1332,7 +1345,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ return self._sessions.get(session_id) def get_session_by_agent(self, agent_id: str) -> Optional[Dict[str, Any]]: - """v2.7.2: 根据 agent_id 获取活跃 session 信息(用于进程存活性检查)""" + """v2.7.2: 根据 agent_id 获取活跃 session 信息(用于进程存活性检查)""" for sid, info in self._sessions.items(): if info.get("agent_id") == agent_id and info.get("status") == "running": return info