auto-sync: 2026-05-29 22:21:52

This commit is contained in:
cfdaily
2026-05-29 22:21:52 +08:00
parent ab9accca99
commit b0bd78dc34
+145 -132
View File
@@ -1,7 +1,7 @@
"""Agent Spawner 异步 spawn Full Agent / Subagent
"""Agent Spawner - 异步 spawn Full Agent / Subagent
Full Agent: asyncio.create_subprocess_exec异步非阻塞不 await 完成
Subagent: 占位实际通过 OpenClaw Gateway API sessions_spawnF17 完善
Full Agent: asyncio.create_subprocess_exec(异步非阻塞,不 await 完成)
Subagent: 占位(实际通过 OpenClaw Gateway API sessions_spawn,F17 完善)
"""
from __future__ import annotations
@@ -21,15 +21,15 @@ logger = logging.getLogger("moziplus-v2.spawner")
# ── Prompt 模板 ──
# Mail 专用模板inform 类型纯通知状态由系统管理
MAIL_INFORM_TEMPLATE = """你收到一封飞鸽传书(纯通知,不需要回复)。
# Mail 专用模板:inform 类型(纯通知,状态由系统管理)
MAIL_INFORM_TEMPLATE = """你收到一封飞鸽传书(纯通知)。
发件者: {from_agent}
主题: {title}
内容: {text}
已阅即可,无需操作
⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理
已阅即可。如需回复,用 in_reply_to 回复发件者(不需要填 to)
⚠️ 不要执行任何状态转换命令。
"""
# Mail 专用模板:request 类型(需要处理并回复,状态由系统管理)
@@ -39,10 +39,23 @@ MAIL_REQUEST_TEMPLATE = """你收到一封飞鸽传书,需要你处理并回
主题: {title}
内容: {text}
请处理后回复发件者
curl -s -X POST http://localhost:8083/api/mail \\\n -H 'Content-Type: application/json' \\\n -d '{{"from": "{agent_id}", "to": "{from_agent}", "title": "回复: {title}", "text": "你的回复内容", "type": "inform", "in_reply_to": "{task_id}"}}'
### 如何回复发件者
⚠️ 将"你的回复内容"替换为实际回复。type 必须用 inform 防止循环。
curl -s -X POST http://localhost:8083/api/mail \\
-H 'Content-Type: application/json' \\
-d '{{"from": "{agent_id}", "in_reply_to": "{task_id}", "title": "回复: {title}", "text": "你的回复内容"}}'
⚠️ 不需要填 "to",系统自动回复给发件者。
### 如何给其他人发新邮件
curl -s -X POST http://localhost:8083/api/mail \\
-H 'Content-Type: application/json' \\
-d '{{"from": "{agent_id}", "to": "对方agent-id", "title": "标题", "text": "正文", "type": "inform"}}'
⚠️ to 必须是有效的 agent id: {valid_agents}
⚠️ 纯通知用 type=inform,需要对方回复不填 type(默认 request
⚠️ 不能给自己发邮件
⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。
"""
@@ -60,12 +73,12 @@ SPAWN_PROMPT_TEMPLATE = """你收到一个 v2.6 黑板任务。请严格按照
{retry_context}
## 状态机你必须遵守的状态流转
## 状态机(你必须遵守的状态流转)
```
pending → claimed → working → review → done
│ │
│ └→ pending驳回重做
│ └→ pending(驳回重做)
├──→ failed
├──→ blocked
└──→ cancelled
@@ -77,7 +90,7 @@ pending → claimed → working → review → done
### 步骤 1: 开始工作
立即调 API 标记你已开始
立即调 API 标记你已开始:
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \
-H 'Content-Type: application/json' \
@@ -86,11 +99,11 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
### 步骤 2: 执行任务
根据任务描述完成你的工作编码/回测/数据检查/审查等
根据任务描述完成你的工作(编码/回测/数据检查/审查等)
### 步骤 3: 写入产出
⚠️ 这一步是必须的不写产出 = 任务没完成。
⚠️ 这一步是必须的!不写产出 = 任务没完成。
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \
@@ -100,7 +113,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
**type 必须是以下之一**: code, document, data, config, other
如果产出太长可以写文件后用路径引用
如果产出太长,可以写文件后用路径引用:
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \
-H 'Content-Type: application/json' \
@@ -109,27 +122,27 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
### 步骤 4: 提交完成或标记失败
✅ 成功完成
✅ 成功完成:
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \
-H 'Content-Type: application/json' \
-d '{{"status": "{completion_status}", "agent": "{agent_id}"}}'
```
❌ 无法完成
❌ 无法完成:
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \
-H 'Content-Type: application/json' \
-d '{{"status": "failed", "agent": "{agent_id}", "detail": "<失败原因>"}}'
```
## FallbackAPI 调用失败时
## Fallback(API 调用失败时)
如果 API 失败 2 次尝试
如果 API 失败 2 次,尝试:
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \
-H 'Content-Type: application/json' \
-d '{{"status": "failed", "agent": "{agent_id}", "detail": "API回写失败产出在本地文件"}}'
-d '{{"status": "failed", "agent": "{agent_id}", "detail": "API回写失败,产出在本地文件"}}'
```
## 参考链接
@@ -151,28 +164,28 @@ DISCUSSION_PROMPT_TEMPLATE = """你被 spawn 来参与黑板讨论。这是一
## 黑板 API
你可以随时
- 读黑板GET http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}?expand=all含 comments、outputs
- 写 commentPOST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/comments
你可以随时:
- 读黑板:GET http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}?expand=all(含 comments、outputs)
- 写 comment:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/comments
body: {{"author": "{agent_id}", "body": "内容", "mentions": ["agent_id"]}}
- 创建 sub taskPOST http://{api_host}:{api_port}/api/projects/{project_id}/tasks
- 创建 sub task:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks
body: {{"title": "...", "description": "...", "task_type": "...", "parent_task": "{task_id}", "must_haves": "{{\"capability\": \"...\"}}"}}
- 认领任务POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{{sub_task_id}}/claim
- 认领任务:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{{sub_task_id}}/claim
## 行为准则
1. **你是自主的。**读黑板、思考、行动不要等指令。
2. **不重复别人的工作。**动手前先读黑板看谁在做什么Separation
3. **保持方向对齐。**你的产出方向和 parent goal 对齐不确定时 @pangtong-fujunshiAlignment
4. **产出可共享。**产出写入黑板让其他人能看到你的成果Cohesion
5. **不越界。**安全红线不要碰超出能力的 @ 庞统升级Boundary
6. **随时讨论。**执行过程中需要协作时 @ 对应 Agent讨论是灵活的不是固定阶段的。
1. **你是自主的。**读黑板、思考、行动,不要等指令。
2. **不重复别人的工作。**动手前先读黑板看谁在做什么(Separation)
3. **保持方向对齐。**你的产出方向和 parent goal 对齐,不确定时 @pangtong-fujunshi(Alignment)
4. **产出可共享。**产出写入黑板,让其他人能看到你的成果(Cohesion)
5. **不越界。**安全红线不要碰,超出能力的 @ 庞统升级(Boundary)
6. **随时讨论。**执行过程中需要协作时 @ 对应 Agent,讨论是灵活的不是固定阶段的。
## 讨论完成后
- 如果讨论收敛到可执行的任务直接创建 sub task
- 如果有分歧或不确定在黑板上写 comment @ 庞统裁决
- 标记完成
- 如果讨论收敛到可执行的任务,直接创建 sub task
- 如果有分歧或不确定,在黑板上写 comment @ 庞统裁决
- 标记完成:
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \
-H 'Content-Type: application/json' \
@@ -181,22 +194,22 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
"""
# Mail 续杯专用模板不包含状态转换指令系统自动标 done
# Mail 续杯专用模板:不包含状态转换指令(系统自动标 done)
MAIL_RETRY_PROMPT = """你收到一个续杯提醒。你的任务在执行过程中被中断了。
发件者: {from_agent}
主题: {title}
续杯次数: 第 {retry_count}上限 {max_retries}
续杯次数: 第 {retry_count}(上限 {max_retries})
请检查 session 历史中你之前做了什么然后继续未完成的工作。
请检查 session 历史中你之前做了什么,然后继续未完成的工作。
⚠️ 不要执行任何状态转换命令标 working/done/review/failed 等),系统会自动处理。
⚠️ 如果任务已完成直接写产出即可不要调 status API。
⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。
⚠️ 如果任务已完成,直接写产出即可,不要调 status API。
"""
class AgentBusyError(Exception):
"""Agent 被 counter 占用无法 spawn"""
"""Agent 被 counter 占用,无法 spawn"""
pass
@@ -218,11 +231,11 @@ class AgentSpawner:
):
"""
Args:
db_path: 项目黑板 DB 路径用于写 task_attempts
db_path: 项目黑板 DB 路径(用于写 task_attempts)
agent_timeout: Agent 超时秒数
dry_run: 测试模式不实际 spawn
api_host: API 地址供 Agent 回写
api_port: API 端口供 Agent 回写
dry_run: 测试模式,不实际 spawn
api_host: API 地址(供 Agent 回写)
api_port: API 端口(供 Agent 回写)
"""
self.db_path = db_path
self.agent_timeout = agent_timeout
@@ -233,7 +246,7 @@ class AgentSpawner:
self.gateway_timeout = gateway_timeout
self.max_retries = max_retries
self.max_monitor_timeouts = max_monitor_timeouts
# v2.7.2: counter 引用spawn_full_agent 内部 acquire/release
# v2.7.2: counter 引用(spawn_full_agent 内部 acquire/release)
self.counter = counter
# session 注册表 {session_id: {...}}
@@ -265,16 +278,16 @@ class AgentSpawner:
project_config: Optional[Dict[str, Any]] = None,
spawn_type: str = "executor", # executor | discussion | review
) -> str:
"""构建 Agent spawn 的消息优先用 BootstrapBuilderfallback 用模板
"""构建 Agent spawn 的消息(优先用 BootstrapBuilder,fallback 用模板)
Args:
current_status: 任务当前状态动态生成状态机提示
retry_context: 重试上下文前轮产出摘要 + 审查意见
task: Task 对象BootstrapBuilder 用
project_config: 项目配置BootstrapBuilder 用
spawn_type: spawn 类型executor=执行, discussion=讨论, review=审查
current_status: 任务当前状态(动态生成状态机提示)
retry_context: 重试上下文(前轮产出摘要 + 审查意见)
task: Task 对象(BootstrapBuilder 用)
project_config: 项目配置(BootstrapBuilder 用)
spawn_type: spawn 类型(executor=执行, discussion=讨论, review=审查)
"""
# discussion 类型直接用模板不走 BootstrapBuilder
# discussion 类型直接用模板(不走 BootstrapBuilder)
if spawn_type == "discussion":
return self._build_discussion_prompt(
task_id, title, description, must_haves,
@@ -288,7 +301,7 @@ class AgentSpawner:
role="executor",
project_config=project_config,
)
# mail 任务用精简模板不走 BootstrapBuilder
# mail 任务用精简模板,不走 BootstrapBuilder
if project_id == "_mail":
return self._build_mail_prompt(task_id, title, description, must_haves, agent_id)
api_section = self._build_api_section(
@@ -302,16 +315,16 @@ class AgentSpawner:
return self._build_mail_prompt(task_id, title, description, must_haves, agent_id)
# Fallback: 使用硬编码模板
# mail 任务直接 done不走 review
# mail 任务直接 done,不走 review
completion_status = "done" if project_id == "_mail" else "review"
return SPAWN_PROMPT_TEMPLATE.format(
project_id=project_id,
task_id=task_id,
title=title,
description=description or "无描述",
description=description or "(无描述)",
task_type=task_type or "general",
priority=priority,
must_haves=must_haves or "(无)",
must_haves=must_haves or "(无)",
agent_id=agent_id,
api_host=self.api_host,
api_port=self.api_port,
@@ -322,13 +335,13 @@ class AgentSpawner:
def _build_api_section(self, project_id: str, task_id: str,
agent_id: str) -> str:
"""构建 API 回写操作指令BootstrapBuilder 模式下补充"""
# mail 任务直接 done不走 review
"""构建 API 回写操作指令(BootstrapBuilder 模式下补充)"""
# mail 任务直接 done,不走 review
success_status = '"done"' if project_id == "_mail" else '"review"'
return f"""## 操作指令
### 状态回写
开始工作
开始工作:
```bash
curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/tasks/{task_id}/status \
-H 'Content-Type: application/json' \
@@ -343,15 +356,15 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
```
### 完成后
成功status → {success_status} | 失败status → "failed"
成功:status → {success_status} | 失败:status → "failed"
"""
def _build_discussion_prompt(self, task_id: str, title: str,
description: str, must_haves: str,
project_id: str, agent_id: str) -> str:
"""构建讨论类 spawn prompt§3.3 框架 + Boids"""
"""构建讨论类 spawn prompt(§3.3 框架 + Boids)"""
goal_snapshot = description or title
constraints = must_haves or "无特殊约束"
constraints = must_haves or "(无特殊约束)"
return DISCUSSION_PROMPT_TEMPLATE.format(
goal_snapshot=goal_snapshot,
@@ -407,17 +420,17 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
reuse_session_id: Optional[str] = None,
on_checks_passed: Optional[Any] = None,
) -> str:
"""Spawn Full Agent异步非阻塞
"""Spawn Full Agent(异步非阻塞)
v2.7.2: counter acquire/release 在内部统一管理。
调用级生命周期spawn 时 acquire进程退出时 release通过 wrapped_on_complete
调用级生命周期:spawn 时 acquire,进程退出时 release(通过 wrapped_on_complete)
Args:
on_complete: 业务回调(agent_id, outcome) 不含 counter.release
on_complete: 业务回调(agent_id, outcome) - 不含 counter.release,
counter.release 由内部 wrapped_on_complete 保证。
use_main_session: True = 投递到主 Agent session不传 --session-id
on_checks_passed: 所有检查通过后的回调session check + counter acquire 后、subprocess 前
reuse_session_id: 传入指定 session-id 复用用于续杯
use_main_session: True = 投递到主 Agent session(不传 --session-id)
on_checks_passed: 所有检查通过后的回调(session check + counter acquire 后、subprocess 前)
reuse_session_id: 传入指定 session-id 复用(用于续杯)
Returns:
session_id
@@ -425,9 +438,9 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
Raises:
AgentBusyError: agent 被 counter 占用或冷却中
"""
# ── v2.7.2 → v2.1: 先分配 session_id再 counter acquire ──
# ── v2.7.2 → v2.1: 先分配 session_id,再 counter acquire ──
# 1. 分配 session_id纯计算无 IO
# 1. 分配 session_id(纯计算,无 IO)
if use_main_session:
session_id = None
elif reuse_session_id:
@@ -436,7 +449,7 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
session_id = str(uuid.uuid4())
_sid_key = session_id or "main" # counter 用的 key
# 2. session state 检查main session 防外部占用
# 2. session state 检查(main session 防外部占用)
if use_main_session:
session_state = self._check_session_state(agent_id)
if session_state.get("lock_pid_alive"):
@@ -450,14 +463,14 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
logger.info("Spawn skipped: %s compacting", agent_id)
raise AgentBusyError(f"{agent_id}: compacting")
# 3. counter acquireper session key 粒度
# 3. counter acquire(per session key 粒度)
if self.counter:
if not await self.counter.can_acquire(agent_id, _sid_key):
raise AgentBusyError(agent_id)
await self.counter.acquire(agent_id, _sid_key)
# 3.5 on_checks_passed: 所有检查通过后的回调session + counter
# 注意如果回调抛异常counter 已 acquire 但 subprocess 未启动
# 3.5 on_checks_passed: 所有检查通过后的回调(session + counter)
# 注意:如果回调抛异常,counter 已 acquire 但 subprocess 未启动,
# wrapped_on_complete 不会执行。需在此 try/except 中手动 release。
if on_checks_passed:
try:
@@ -472,7 +485,7 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
self._register_session(_sid_key, agent_id, task_id, pid=None)
return _sid_key
# 4. wrapped_on_complete 保证 counter release闭包捕获 _sid_key
# 4. wrapped_on_complete 保证 counter release(闭包捕获 _sid_key)
async def _wrapped_on_complete(aid, outcome):
try:
if self.counter:
@@ -508,7 +521,7 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
logger.info("Spawned agent %s (session=%s, pid=%d)",
agent_id, session_id, proc.pid)
# Schedule monitor传 wrapped_on_complete
# Schedule monitor(传 wrapped_on_complete)
asyncio.create_task(
self._monitor_process(session_id, proc, agent_id, task_id,
on_complete=_wrapped_on_complete,
@@ -530,7 +543,7 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
task_description: str,
task_id: Optional[str] = None,
) -> str:
"""Spawn Subagent占位实际通过 Gateway API
"""Spawn Subagent(占位,实际通过 Gateway API)
Returns:
session_id
@@ -556,9 +569,9 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
- 项目: {project_id}
- 任务ID: {task_id}
- 标题: {title}
- 续杯次数: 第 {retry_count}上限 {max_retries}
- 续杯次数: 第 {retry_count}(上限 {max_retries})
请检查 session 历史中你之前做了什么然后继续未完成的工作。
请检查 session 历史中你之前做了什么,然后继续未完成的工作。
## 操作指令
@@ -567,21 +580,21 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
curl http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}?expand=all
```
### 如果已经完成标记 review
### 如果已经完成,标记 review
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \\
-H 'Content-Type: application/json' \\
-d '{{"status": "review", "agent": "{agent_id}"}}'
```
### 写入产出如果之前没写
### 写入产出(如果之前没写)
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \\
-H 'Content-Type: application/json' \\
-d '{{"agent": "{agent_id}", "type": "<类型>", "title": "<标题>", "content": "<内容>", "summary": "<摘要>"}}'
```
### 如果无法解决标记失败
### 如果无法解决,标记失败
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \\
-H 'Content-Type: application/json' \\
@@ -600,7 +613,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
db_path: Optional[Path] = None,
monitor_timeout_count: int = 0,
) -> None:
"""监控子进程全生命周期设计文档 spawner-monitor-design.md"""
"""监控子进程全生命周期(设计文档 spawner-monitor-design.md)"""
stdout_chunks: list = []
stderr_chunks: list = []
@@ -624,7 +637,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
await asyncio.gather(_read_out(), _read_err(), proc.wait())
await asyncio.wait_for(_read_streams(), timeout=self.agent_timeout)
# ── 情况 A进程退出 ──
# ── 情况 A:进程退出 ──
exit_code = proc.returncode
await self._handle_exit(
session_id, agent_id, task_id, exit_code,
@@ -632,7 +645,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
)
except asyncio.TimeoutError:
# ── 情况 Bmonitor timeout进程没退出──
# ── 情况 B:monitor timeout(进程没退出)──
logger.warning("Agent %s monitor timeout (session=%s, count=%d/%d)",
agent_id, session_id, monitor_timeout_count + 1,
self.max_monitor_timeouts)
@@ -643,11 +656,11 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
async def _handle_exit(self, session_id, agent_id, task_id, exit_code,
stdout_chunks, stderr_chunks, on_complete, db_path):
"""情况 A进程退出后的处理
"""情况 A:进程退出后的处理
v2.7.2: 进程退出 = counter release由 on_complete = wrapped_on_complete 保证
只有 A2/A3gateway_timeout触发续杯其他都不 retry。
A9api_error/429额外推回 pending + 设冷却。
v2.7.2: 进程退出 = counter release(由 on_complete = wrapped_on_complete 保证)
只有 A2/A3(gateway_timeout)触发续杯,其他都不 retry。
A9(api_error/429)额外推回 pending + 设冷却。
"""
stdout_text = b"".join(stdout_chunks).decode("utf-8", errors="replace")
stderr_text = b"".join(stderr_chunks).decode("utf-8", errors="replace")
@@ -690,13 +703,13 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
agent_id, session_id, outcome, exit_code, task_status)
if cls["should_retry"]:
# A2/A3: gateway_timeout → 续杯on_complete 会 release counter
# A2/A3: gateway_timeout → 续杯(on_complete 会 release counter)
await self._do_retry(
session_id, agent_id, task_id, on_complete, db_path,
cls.get("retry_field", "retry_count")
)
elif outcome == "api_error":
# A9: 429/API 错误 → release counteron_complete+ 推回 pending + 冷却
# A9: 429/API 错误 → release counter(on_complete)+ 推回 pending + 冷却
await self._do_on_complete_async(on_complete, agent_id, outcome)
if self.counter:
self.counter.set_cooldown(agent_id)
@@ -706,7 +719,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
})
logger.info("Task %s pushed back to pending (api_error)", task_id)
elif outcome == "fallback_timeout" and not cls["should_retry"]:
# A5/A6: fallback 不应出现标 failed + escalate + context 日志
# A5/A6: fallback 不应出现,标 failed + escalate + context 日志
logger.error("UNEXPECTED FALLBACK: agent=%s session=%s task=%s "
"fallback_used=%s fallback_reason=%s counter_active=%s "
"This indicates counter check failed to prevent concurrent spawn.",
@@ -721,17 +734,17 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
"fallback_reason": json_result.get("fallback_reason"),
})
else:
# 其他A1(completed), A4(agent_failed), A7(auth_failed),
# 其他:A1(completed), A4(agent_failed), A7(auth_failed),
# A8(gateway_unreachable), A11(lock_conflict),
# A10(compact_failed), A12(agent_error)
# 进程退出 → on_complete release counter
# 任务状态由各 outcome 自行处理或等 ticker
# 任务状态由各 outcome 自行处理(或等 ticker)
await self._do_on_complete_async(on_complete, agent_id, outcome)
async def _handle_monitor_timeout(self, session_id, agent_id, task_id, proc,
on_complete, db_path, stderr_chunks,
monitor_timeout_count):
"""情况 Bmonitor timeout"""
"""情况 B:monitor timeout"""
# 读已缓冲的 stderr
try:
remaining = await asyncio.wait_for(proc.stderr.read(), timeout=2.0)
@@ -745,14 +758,14 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
# 检查 session 状态
state = self._check_session_state(agent_id)
# B1: 假死 先复活连续假死 ≥2 次再 failed
# B1: 假死 - 先复活,连续假死 ≥2 次再 failed
if state.get("status") == "running" and not state.get("lock_pid_alive", True):
# 假死计数
stuck_count = self._stuck_counts.get(task_id, 0) + 1
self._stuck_counts[task_id] = stuck_count
if stuck_count >= 2:
# 连续假死 ≥2 次标 failed
# 连续假死 ≥2 次,标 failed
logger.error("Agent %s session stuck %d times (session=%s, lock PID dead)",
agent_id, stuck_count, session_id)
self._mark_task(db_path, task_id, "failed",
@@ -780,12 +793,12 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
return
# B2/B3/B4: 进程还活着
# B2: compact 进行中 不计入 monitor timeout 计数继续等
# B2: compact 进行中 - 不计入 monitor timeout 计数,继续等
if state.get("recent_compact"):
logger.info("Agent %s recent compaction detected, extending patience "
"(session=%s, monitor=%d/%d)",
agent_id, session_id, monitor_timeout_count, self.max_monitor_timeouts)
# 不递增 monitor_timeout_count但最多额外等 max_monitor_timeouts 次
# 不递增 monitor_timeout_count,但最多额外等 max_monitor_timeouts 次
# 用独立计数器防止无限等待
compact_wait_count = self._compact_waits.get(task_id, 0) + 1
self._compact_waits[task_id] = compact_wait_count
@@ -809,7 +822,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
)
return
# B3/B4: 无 compact正常计数
# B3/B4: 无 compact,正常计数
monitor_timeout_count += 1
if monitor_timeout_count >= self.max_monitor_timeouts:
logger.error("Agent %s max monitor timeouts (session=%s, count=%d)",
@@ -823,7 +836,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
await self._do_on_complete_async(on_complete, agent_id, "max_monitor_timeouts")
return
# 未超限继续等不 release counter
# 未超限:继续等(不 release counter)
logger.info("Agent %s continuing monitor (session=%s, count=%d/%d)",
agent_id, session_id, monitor_timeout_count, self.max_monitor_timeouts)
asyncio.create_task(
@@ -836,19 +849,19 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
async def _do_retry(self, session_id, agent_id, task_id, on_complete,
db_path, retry_field="retry_count"):
"""续杯手动 release counter 后通过 spawn_full_agent 重新 spawn
"""续杯:手动 release counter 后通过 spawn_full_agent 重新 spawn
v2.7.2: 进程已退出但 wrapped_on_complete 未被调用只有 should_retry 分支走到这里
需要手动 release counter然后 spawn_full_agent 内部会 acquire。
on_complete含 counter release置为 None避免 double release。
v2.7.2: 进程已退出但 wrapped_on_complete 未被调用(只有 should_retry 分支走到这里)
需要手动 release counter,然后 spawn_full_agent 内部会 acquire。
on_complete(含 counter release)置为 None,避免 double release。
"""
# ── 关键手动 release counter进程退出 = agent 空闲──
# ── 关键:手动 release counter(进程退出 = agent 空闲)──
if self.counter:
self.counter.release(agent_id, session_id or "main")
# 旧 wrapped_on_complete 含 counter.release不再使用防止 double release
# 旧 wrapped_on_complete 含 counter.release,不再使用,防止 double release
on_complete = None
# 续杯前检查任务状态已终态则跳过
# 续杯前检查任务状态,已终态则跳过
if db_path and task_id:
try:
conn = get_connection(db_path)
@@ -859,7 +872,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
if row and row["status"] in ("done", "failed", "cancelled", "review", "pending"):
logger.info("Retry skip: task %s already %s (agent=%s)",
task_id, row["status"], agent_id)
# on_complete = wrapped_on_complete会 release counter
# on_complete = wrapped_on_complete,会 release counter
await self._do_on_complete_async(on_complete, agent_id, "task_already_done")
return
finally:
@@ -905,7 +918,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
logger.info("Agent %s retry %s=%d/%d (session=%s)",
agent_id, retry_field, count, self.max_retries, session_id)
# 构建续杯 messageMail 用专用模板Task 用标准模板
# 构建续杯 message(Mail 用专用模板,Task 用标准模板)
task_info = self._get_task_info(db_path, task_id) or {}
project_id = task_info.get("project_id", "")
is_mail = project_id == "_mail"
@@ -923,7 +936,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
max_retries=self.max_retries,
)
else:
fallback_hint = "\n⚠️ 之前有 fallback 执行请调 API 检查任务当前状态和已有产出确认是否已完成。" if retry_field == "retry_count" else ""
fallback_hint = "\n⚠️ 之前有 fallback 执行,请调 API 检查任务当前状态和已有产出,确认是否已完成。" if retry_field == "retry_count" else ""
message = self.RETRY_PROMPT.format(
project_id=project_id,
task_id=task_id or "",
@@ -936,8 +949,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
fallback_hint=fallback_hint,
)
# v2.7.2: 通过 spawn_full_agent 重新 spawn内部 can_acquire + acquire
# on_complete = wrapped_on_complete含 counter release),作为业务回调传入
# v2.7.2: 通过 spawn_full_agent 重新 spawn(内部 can_acquire + acquire)
# on_complete = wrapped_on_complete(含 counter release),作为业务回调传入
try:
await self.spawn_full_agent(
agent_id=agent_id,
@@ -949,7 +962,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
task_db_path=db_path,
)
except AgentBusyError:
# agent 被其他任务占用不应发生但防御
# agent 被其他任务占用(不应发生,但防御)
logger.warning("Retry spawn skipped: %s busy (unexpected)", agent_id)
await self._do_on_complete_async(on_complete, agent_id, "retry_agent_busy")
except Exception:
@@ -962,8 +975,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
def _parse_stdout_json(stdout_text: str) -> dict:
"""解析 openclaw agent --json 的 stdout 输出
返回可直接使用的字段status, summary, fallback_used, fallback_reason, payloads
不再提取 meta直接用顶层字段。
返回可直接使用的字段:status, summary, fallback_used, fallback_reason, payloads
不再提取 meta,直接用顶层字段。
"""
text = stdout_text.strip()
if not text:
@@ -971,7 +984,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
try:
data = json.loads(text)
except json.JSONDecodeError:
# 多行输出找最后一个 JSON
# 多行输出,找最后一个 JSON
for line in reversed(text.splitlines()):
try:
data = json.loads(line)
@@ -1035,7 +1048,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
@staticmethod
def _revive_session(agent_id: str) -> bool:
"""假死复活术修改 sessions.json status 从 running 改为 idle"""
"""假死复活术:修改 sessions.json status 从 running 改为 idle"""
sessions_path = Path.home() / ".openclaw" / "agents" / agent_id / "sessions" / "sessions.json"
if not sessions_path.exists():
return False
@@ -1045,7 +1058,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
main_key = f"agent:{agent_id}:main"
main_session = sessions.get(main_key, {})
if main_session.get("status") != "running":
return False # 不是 running 状态不需要复活
return False # 不是 running 状态,不需要复活
main_session["status"] = "idle"
sessions[main_key] = main_session
with open(sessions_path, "w") as f:
@@ -1103,16 +1116,16 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
@staticmethod
def _classify_outcome(exit_code: int, json_result: dict, stderr_text: str,
task_status: Optional[str], stdout_text: str = "") -> dict:
"""分类退出原因返回处理策略
"""分类退出原因,返回处理策略
v3.0: 基于 JSON status/summary/executionTrace 判定不再依赖 transport 字段。
只有 status="timeout" 触发 retry其他都不 retry。
v3.0: 基于 JSON status/summary/executionTrace 判定,不再依赖 transport 字段。
只有 status="timeout" 触发 retry,其他都不 retry。
"""
status = json_result.get("status")
summary = json_result.get("summary", "")
fallback_used = json_result.get("fallback_used", False)
# A4: 任务 DB status=failedAgent 自己标的
# A4: 任务 DB status=failed(Agent 自己标的)
if task_status == "failed":
return {"outcome": "agent_failed", "should_retry": False}
@@ -1130,12 +1143,12 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
"retry_field": "retry_count"}
# A0: stdout 为空且 exit≠0 = 进程异常终止
# 注意exit=0 + stdout 为空可能是正常完成--json 没输出),
# 注意:exit=0 + stdout 为空可能是正常完成(--json 没输出),
# 此时 task_status 如果是 done/review 会被上面的 A4 兜住
if status is None and not stdout_text.strip() and exit_code != 0:
return {"outcome": "process_crash", "should_retry": False}
# stdout 为空但 exit=0可能是正常完成但 --json 没输出
# stdout 为空但 exit=0:可能是正常完成但 --json 没输出
# 查任务状态判断
if status is None and not stdout_text.strip() and exit_code == 0:
terminal_statuses = {"done", "review"}
@@ -1143,7 +1156,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
return {"outcome": "completed", "should_retry": False}
return {"outcome": "agent_error", "should_retry": False}
# A7-A12: status=error → 不续杯stderr 辅助分类
# A7-A12: status=error → 不续杯,stderr 辅助分类
if status == "error":
stderr_lower = stderr_text.lower()
if any(kw in stderr_lower for kw in ["401", "403", "unauthorized", "auth"]):
@@ -1158,7 +1171,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
return {"outcome": "lock_conflict", "should_retry": False}
return {"outcome": "agent_error", "should_retry": False}
# 兜底status 未知值
# 兜底:status 未知值
return {"outcome": "unknown_status", "should_retry": False}
@staticmethod
@@ -1216,7 +1229,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
def _mark_task(self, db_path: Optional[Path], task_id: Optional[str],
status: str, detail: Optional[dict] = None):
"""标记任务状态用于 failed/escalate"""
"""标记任务状态(用于 failed/escalate)"""
if not db_path or not task_id:
return
try:
@@ -1240,13 +1253,13 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
@staticmethod
def _do_on_complete(on_complete, agent_id, outcome):
"""执行 on_complete 回调同步+异步兼容"""
"""执行 on_complete 回调(同步+异步兼容)"""
if not on_complete:
return
try:
result = on_complete(agent_id, outcome)
if asyncio.iscoroutine(result):
# 注意这里是同步调用的不能 await
# 注意:这里是同步调用的,不能 await
# 在 _monitor_process 的 async 上下文中应该用 await
pass
except Exception:
@@ -1332,7 +1345,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
return self._sessions.get(session_id)
def get_session_by_agent(self, agent_id: str) -> Optional[Dict[str, Any]]:
"""v2.7.2: 根据 agent_id 获取活跃 session 信息用于进程存活性检查"""
"""v2.7.2: 根据 agent_id 获取活跃 session 信息(用于进程存活性检查)"""
for sid, info in self._sessions.items():
if info.get("agent_id") == agent_id and info.get("status") == "running":
return info