Files
sanguo_moziplus_v2/src/daemon/spawner.py
T
2026-05-30 00:00:45 +08:00

1391 lines
56 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Agent Spawner - 异步 spawn Full Agent / Subagent
Full Agent: asyncio.create_subprocess_exec(异步非阻塞,不 await 完成)
Subagent: 占位(实际通过 OpenClaw Gateway API sessions_spawn,F17 完善)
"""
from __future__ import annotations
import asyncio
import json
import logging
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
from src.blackboard.db import get_connection, init_db
# Module-wide logger. The dotted name places it under the "moziplus-v2"
# parent logger in the logging hierarchy.
_SPAWNER_LOGGER_NAME = "moziplus-v2.spawner"
logger = logging.getLogger(_SPAWNER_LOGGER_NAME)
# ── Prompt templates ──
# Mail template for the "inform" performative: a pure notification. The
# recipient only reads it; task status is managed by the system.
# Placeholders: {from_agent}, {title}, {text}.
MAIL_INFORM_TEMPLATE = """你收到一封飞鸽传书(纯通知)。
发件者: {from_agent}
主题: {title}
内容: {text}
已阅即可。如需回复,用 in_reply_to 回复发件者(不需要填 to)。
⚠️ 不要执行任何状态转换命令。
"""
# Mail template for the "request" performative: the recipient must handle
# the mail and reply. Task status is managed by the system.
# Placeholders: {from_agent}, {title}, {text}, {agent_id}, {task_id},
# {api_host}, {api_port}, {valid_agents}.
# The mail API address is built from {api_host}/{api_port} so it follows the
# spawner's configured API endpoint instead of a fixed localhost:8083.
# "\\" renders as a literal backslash, i.e. a shell line continuation in the
# curl examples.
MAIL_REQUEST_TEMPLATE = """你收到一封飞鸽传书,需要你处理并回复。
发件者: {from_agent}
主题: {title}
内容: {text}
### 如何回复发件者
curl -s -X POST http://{api_host}:{api_port}/api/mail \\
-H 'Content-Type: application/json' \\
-d '{{"from": "{agent_id}", "in_reply_to": "{task_id}", "title": "回复: {title}", "text": "你的回复内容"}}'
⚠️ 不需要填 "to",系统自动回复给发件者。
### 如何给其他人发新邮件
curl -s -X POST http://{api_host}:{api_port}/api/mail \\
-H 'Content-Type: application/json' \\
-d '{{"from": "{agent_id}", "to": "对方agent-id", "title": "标题", "text": "正文", "type": "inform"}}'
⚠️ to 必须是有效的 agent id: {valid_agents}
⚠️ 纯通知用 type=inform,需要对方回复不填 type(默认 request)
⚠️ 不能给自己发邮件
⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。
"""
# Fallback executor prompt, used by build_spawn_message when no
# BootstrapBuilder prompt is produced.
# Placeholders: {project_id}, {task_id}, {title}, {description}, {task_type},
# {priority}, {must_haves}, {retry_context}, {current_status}, {api_host},
# {api_port}, {agent_id}, {completion_status}.
# NOTE: a single "\" at the end of a line inside this (non-raw) string is a
# Python line continuation, so each curl command renders on one line.
SPAWN_PROMPT_TEMPLATE = """你收到一个 v2.6 黑板任务。请严格按照下面的步骤执行。
## 任务信息
- 项目: {project_id}
- 任务ID: {task_id}
- 标题: {title}
- 描述: {description}
- 类型: {task_type}
- 优先级: {priority}
- 必要条件: {must_haves}
{retry_context}
## 状态机(你必须遵守的状态流转)
```
pending → claimed → working → review → done
│ │
│ └→ pending(驳回重做)
├──→ failed
├──→ blocked
└──→ cancelled
```
你当前处于 **{current_status}** 状态。
## 执行步骤
### 步骤 1: 开始工作
立即调 API 标记你已开始:
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \
-H 'Content-Type: application/json' \
-d '{{"status": "working", "agent": "{agent_id}"}}'
```
### 步骤 2: 执行任务
根据任务描述完成你的工作(编码/回测/数据检查/审查等)。
### 步骤 3: 写入产出
⚠️ 这一步是必须的!不写产出 = 任务没完成。
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \
-H 'Content-Type: application/json' \
-d '{{"agent": "{agent_id}", "type": "<产出类型>", "title": "<产出标题>", "content": "<你的产出内容>", "summary": "<简要说明>"}}'
```
**type 必须是以下之一**: code, document, data, config, other
如果产出太长,可以写文件后用路径引用:
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \
-H 'Content-Type: application/json' \
-d '{{"agent": "{agent_id}", "type": "code", "title": "main.py", "content_path": "/path/to/file.py", "summary": "主程序"}}'
```
### 步骤 4: 提交完成或标记失败
✅ 成功完成:
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \
-H 'Content-Type: application/json' \
-d '{{"status": "{completion_status}", "agent": "{agent_id}"}}'
```
❌ 无法完成:
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \
-H 'Content-Type: application/json' \
-d '{{"status": "failed", "agent": "{agent_id}", "detail": "<失败原因>"}}'
```
## Fallback(API 调用失败时)
如果 API 失败 2 次,尝试:
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \
-H 'Content-Type: application/json' \
-d '{{"status": "failed", "agent": "{agent_id}", "detail": "API回写失败,产出在本地文件"}}'
```
## 参考链接
- 查看任务完整信息: GET http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}?expand=all
- 写评论: POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/comments {{"author": "{agent_id}", "body": "..."}}
- 完整 API 契约: docs/design/agent-api-contract.md
"""
# Discussion-phase prompt (v2.9 four-phase loop), rendered by
# _build_discussion_prompt.
# Placeholders: {goal_snapshot}, {constraints}, {api_host}, {api_port},
# {project_id}, {task_id}, {agent_id}. "{{sub_task_id}}" renders as the
# literal placeholder "{sub_task_id}" for the agent to fill in.
# The must_haves example is a JSON string that itself contains JSON, so its
# inner quotes must render as \" ("\\" in this non-raw literal). A bare \"
# would render as a plain quote and make the example body invalid JSON.
DISCUSSION_PROMPT_TEMPLATE = """你被 spawn 来参与黑板讨论。这是一个 v2.9 四相循环的讨论环节。
## 你的任务
{goal_snapshot}
## 约束
{constraints}
## 黑板 API
你可以随时:
- 读黑板:GET http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}?expand=all(含 comments、outputs)
- 写 comment:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/comments
body: {{"author": "{agent_id}", "body": "内容", "mentions": ["agent_id"]}}
- 创建 sub task:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks
body: {{"title": "...", "description": "...", "task_type": "...", "parent_task": "{task_id}", "must_haves": "{{\\"capability\\": \\"...\\"}}"}}
- 认领任务:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{{sub_task_id}}/claim
## 行为准则
1. **你是自主的。**读黑板、思考、行动,不要等指令。
2. **不重复别人的工作。**动手前先读黑板看谁在做什么(Separation)。
3. **保持方向对齐。**你的产出方向和 parent goal 对齐,不确定时 @pangtong-fujunshi(Alignment)。
4. **产出可共享。**产出写入黑板,让其他人能看到你的成果(Cohesion)。
5. **不越界。**安全红线不要碰,超出能力的 @ 庞统升级(Boundary)。
6. **随时讨论。**执行过程中需要协作时 @ 对应 Agent,讨论是灵活的不是固定阶段的。
## 讨论完成后
- 如果讨论收敛到可执行的任务,直接创建 sub task
- 如果有分歧或不确定,在黑板上写 comment @ 庞统裁决
- 标记完成:
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \
-H 'Content-Type: application/json' \
-d '{{"status": "done", "agent": "{agent_id}"}}'
```
"""
# Mail retry ("续杯") template. It deliberately contains no status-transition
# instructions, because the system marks mail tasks done by itself.
# Placeholders: {from_agent}, {title}, {retry_count}, {max_retries}.
MAIL_RETRY_PROMPT = """你收到一个续杯提醒。你的任务在执行过程中被中断了。
发件者: {from_agent}
主题: {title}
续杯次数: 第 {retry_count} 次(上限 {max_retries} 次)
请检查 session 历史中你之前做了什么,然后继续未完成的工作。
⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。
⚠️ 如果任务已完成,直接写产出即可,不要调 status API。
"""
class AgentBusyError(Exception):
    """Raised when an agent cannot be spawned right now.

    Causes: the concurrency counter refuses the slot (busy or cooling down),
    or the agent's main session is locked, still processing, or compacting.
    """
class AgentSpawner:
"""Agent spawn 管理"""
def __init__(
self,
db_path: Optional[Path] = None,
agent_timeout: float = 630.0,
dry_run: bool = False,
api_host: str = "127.0.0.1",
api_port: int = 8083,
bootstrap_builder: Optional[Any] = None,
gateway_timeout: float = 600.0,
max_retries: int = 3,
max_monitor_timeouts: int = 3,
counter: Optional[Any] = None,
):
"""
Args:
db_path: 项目黑板 DB 路径(用于写 task_attempts)
agent_timeout: Agent 超时秒数
dry_run: 测试模式,不实际 spawn
api_host: API 地址(供 Agent 回写)
api_port: API 端口(供 Agent 回写)
"""
self.db_path = db_path
self.agent_timeout = agent_timeout
self.dry_run = dry_run
self.api_host = api_host
self.api_port = api_port
self.bootstrap_builder = bootstrap_builder
self.gateway_timeout = gateway_timeout
self.max_retries = max_retries
self.max_monitor_timeouts = max_monitor_timeouts
# v2.7.2: counter 引用(spawn_full_agent 内部 acquire/release)
self.counter = counter
# session 注册表 {session_id: {...}}
self._sessions: Dict[str, Dict[str, Any]] = {}
# B2 compact 等待计数器 {task_id: count}
self._compact_waits: Dict[str, int] = {}
# B1 假死计数器 {task_id: count}
self._stuck_counts: Dict[str, int] = {}
self._valid_agents_cache: Optional[set] = None
def _load_valid_agents(self) -> set:
"""从 config/default.yaml 读取有效 Agent ID 列表(带缓存)"""
if self._valid_agents_cache is not None:
return self._valid_agents_cache
config_path = Path(__file__).parent.parent / "config" / "default.yaml"
if config_path.exists():
try:
import yaml
with open(config_path) as f:
cfg = yaml.safe_load(f)
profiles = cfg.get("daemon", {}).get("agent_profiles", {})
if profiles:
self._valid_agents_cache = set(profiles.keys())
return self._valid_agents_cache
except Exception:
pass
self._valid_agents_cache = {
"zhangfei-dev", "guanyu-dev", "zhaoyun-data",
"jiangwei-infra", "pangtong-fujunshi", "simayi-challenger"
}
return self._valid_agents_cache
@property
def active_sessions(self) -> Dict[str, Dict[str, Any]]:
"""当前活跃的 spawn sessions"""
return {sid: s for sid, s in self._sessions.items()
if s.get("status") == "running"}
def build_spawn_message(
self,
task_id: str,
title: str,
description: str,
task_type: str = "",
priority: int = 5,
must_haves: str = "",
project_id: str = "",
agent_id: str = "",
current_status: str = "claimed",
retry_context: str = "",
task: Optional[Any] = None,
project_config: Optional[Dict[str, Any]] = None,
spawn_type: str = "executor", # executor | discussion | review
) -> str:
"""构建 Agent spawn 的消息(优先用 BootstrapBuilder,fallback 用模板)
Args:
current_status: 任务当前状态(动态生成状态机提示)
retry_context: 重试上下文(前轮产出摘要 + 审查意见)
task: Task 对象(BootstrapBuilder 用)
project_config: 项目配置(BootstrapBuilder 用)
spawn_type: spawn 类型(executor=执行, discussion=讨论, review=审查)
"""
# discussion 类型直接用模板(不走 BootstrapBuilder)
if spawn_type == "discussion":
return self._build_discussion_prompt(
task_id, title, description, must_haves,
project_id, agent_id)
# 尝试 BootstrapBuilder
if self.bootstrap_builder and task is not None:
try:
bootstrap_prompt = self.bootstrap_builder.build_for_task(
task=task,
role="executor",
project_config=project_config,
)
# mail 任务用精简模板,不走 BootstrapBuilder
if project_id == "_mail":
return self._build_mail_prompt(task_id, title, description, must_haves, agent_id)
api_section = self._build_api_section(
project_id, task_id, agent_id)
return bootstrap_prompt + "\n\n---\n\n" + api_section
except Exception:
logger.exception("BootstrapBuilder failed, falling back to template")
# mail 任务用精简模板
if project_id == "_mail":
return self._build_mail_prompt(task_id, title, description, must_haves, agent_id)
# Fallback: 使用硬编码模板
# mail 任务直接 done,不走 review
completion_status = "done" if project_id == "_mail" else "review"
return SPAWN_PROMPT_TEMPLATE.format(
project_id=project_id,
task_id=task_id,
title=title,
description=description or "(无描述)",
task_type=task_type or "general",
priority=priority,
must_haves=must_haves or "(无)",
agent_id=agent_id,
api_host=self.api_host,
api_port=self.api_port,
current_status=current_status or "claimed",
retry_context=retry_context or "",
completion_status=completion_status,
)
def _build_api_section(self, project_id: str, task_id: str,
agent_id: str) -> str:
"""构建 API 回写操作指令(BootstrapBuilder 模式下补充)"""
# mail 任务直接 done,不走 review
success_status = '"done"' if project_id == "_mail" else '"review"'
return f"""## 操作指令
### 状态回写
开始工作:
```bash
curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/tasks/{task_id}/status \
-H 'Content-Type: application/json' \
-d '{{"status": "working", "agent": "{agent_id}"}}'
```
### 写入产出
```bash
curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \
-H 'Content-Type: application/json' \
-d '{{"agent": "{agent_id}", "type": "<类型>", "title": "<标题>", "content": "<内容>", "summary": "<摘要>"}}'
```
### 完成后
成功:status → {success_status} | 失败:status → "failed"
"""
def _build_discussion_prompt(self, task_id: str, title: str,
                             description: str, must_haves: str,
                             project_id: str, agent_id: str) -> str:
    """Render the discussion-type spawn prompt (§3.3 framework + Boids rules).

    The goal falls back to the title when there is no description, and the
    constraints fall back to a "no special constraints" placeholder.
    """
    fields = {
        "goal_snapshot": description if description else title,
        "constraints": must_haves if must_haves else "(无特殊约束)",
        "project_id": project_id,
        "task_id": task_id,
        "agent_id": agent_id,
        "api_host": self.api_host,
        "api_port": self.api_port,
    }
    return DISCUSSION_PROMPT_TEMPLATE.format(**fields)
def _build_mail_prompt(self, task_id: str, title: str, description: str,
                       must_haves: str, agent_id: str) -> str:
    """Render the compact mail prompt (inform or request variant).

    ``must_haves`` is expected to be a JSON object string with the mail
    metadata: ``from`` (sender) and ``performative`` (or legacy ``type``).
    Missing, unparseable or non-object metadata falls back to
    sender=``agent_id`` and performative="request".
    """
    from_agent = agent_id
    performative = "request"
    meta = None
    if must_haves:
        try:
            meta = json.loads(must_haves)
        except (TypeError, ValueError):
            logger.debug("Unparseable mail metadata for %s: %r", task_id, must_haves)
    if isinstance(meta, dict):
        from_agent = meta.get("from", agent_id)
        performative = meta.get("performative", meta.get("type", "request"))

    # The title is embedded in a JSON string in the request template's curl
    # example. Truncate FIRST, then escape: slicing after escaping can cut a
    # \" pair in half and leave a dangling backslash that escapes the closing
    # quote. Backslashes are escaped before quotes for the same reason.
    safe_title = (title or "")[:100].replace("\\", "\\\\").replace('"', '\\"')
    # NOTE(review): the body is only shown as plain text in both mail
    # templates, so this quote escaping may be unnecessary; confirm before
    # removing.
    safe_text = (description or "").replace('"', '\\"')
    # Valid agent IDs (from config/default.yaml, or the built-in defaults).
    valid_agents_str = " / ".join(sorted(self._load_valid_agents()))
    common_kwargs = dict(
        from_agent=from_agent,
        title=safe_title,
        text=safe_text,
        task_id=task_id,
        agent_id=agent_id,
        api_host=self.api_host,
        api_port=self.api_port,
        valid_agents=valid_agents_str,
    )
    if performative == "inform":
        return MAIL_INFORM_TEMPLATE.format(**common_kwargs)
    return MAIL_REQUEST_TEMPLATE.format(**common_kwargs)
async def spawn_full_agent(
    self,
    agent_id: str,
    message: str,
    new_session: bool = False,
    task_id: Optional[str] = None,
    on_complete: Optional[Any] = None,
    use_main_session: bool = False,
    task_db_path: Optional[Path] = None,
    reuse_session_id: Optional[str] = None,
    on_checks_passed: Optional[Any] = None,
) -> str:
    """Spawn a Full Agent process without waiting for it to finish.

    v2.7.2: counter acquire/release is managed here. The counter is acquired
    before the process starts and released by the internal wrapped completion
    callback when the process exits. It is released immediately if
    ``on_checks_passed`` raises or the process cannot be started.

    Args:
        agent_id: agent to run (``openclaw agent --agent``).
        message: prompt passed via ``--message``.
        new_session: accepted for compatibility; not used by this method.
        task_id: blackboard task id (session registry and attempt records).
        on_complete: business callback ``(agent_id, outcome)``; may return a
            coroutine. It must not release the counter; the wrapper does.
        use_main_session: deliver to the agent's main session (no ``--session-id``).
        task_db_path: task DB used for monitoring and attempt records
            (defaults to ``self.db_path``).
        reuse_session_id: reuse this session id (used by retries).
        on_checks_passed: called after the session checks and the counter
            acquire, before the subprocess is started.

    Returns:
        The session id. A real main-session spawn returns None; a dry-run
        main-session spawn returns "main".

    Raises:
        AgentBusyError: the main session is locked, processing or compacting,
            or the counter refuses the acquire.
    """
    # ── v2.7.2 → v2.1: allocate the session id first, then acquire the counter ──
    # 1. Allocate the session id (pure computation, no I/O).
    if use_main_session:
        session_id = None
    elif reuse_session_id:
        session_id = reuse_session_id
    else:
        session_id = str(uuid.uuid4())
    # Key used for the counter and the session registry ("main" for the main
    # session, matching the lookup in _handle_exit).
    _sid_key = session_id or "main"
    # 2. Main session: refuse if something else is using it.
    if use_main_session:
        session_state = self._check_session_state(agent_id)
        if session_state.get("lock_pid_alive"):
            logger.info("Spawn skipped: %s main session locked by PID %d",
                        agent_id, session_state.get("lock_pid"))
            raise AgentBusyError(f"{agent_id}: session locked by PID {session_state.get('lock_pid')}")
        if session_state.get("status") == "running":
            logger.info("Spawn skipped: %s main session processing", agent_id)
            raise AgentBusyError(f"{agent_id}: session processing")
        if session_state.get("recent_compact"):
            logger.info("Spawn skipped: %s compacting", agent_id)
            raise AgentBusyError(f"{agent_id}: compacting")
    # 3. Counter acquire (per session-key granularity).
    if self.counter:
        if not await self.counter.can_acquire(agent_id, _sid_key):
            raise AgentBusyError(agent_id)
        await self.counter.acquire(agent_id, _sid_key)
    # 3.5 All checks passed. If the hook raises, the counter is held but no
    # process (and so no wrapped_on_complete) exists, so release it here.
    if on_checks_passed:
        try:
            on_checks_passed()
        except Exception:
            if self.counter:
                self.counter.release(agent_id, _sid_key)
            raise
    if self.dry_run:
        logger.info("[DRY RUN] Would spawn agent %s (session=%s)", agent_id, _sid_key)
        self._register_session(_sid_key, agent_id, task_id, pid=None)
        return _sid_key

    # 4. The wrapped callback guarantees the counter release (closure over _sid_key).
    async def _wrapped_on_complete(aid, outcome):
        try:
            if self.counter:
                self.counter.release(aid, _sid_key)
        finally:
            if on_complete:
                try:
                    result = on_complete(aid, outcome)
                    if asyncio.iscoroutine(result):
                        await result
                except Exception:
                    logger.warning("Business on_complete failed for %s", aid, exc_info=True)

    cmd = [
        "openclaw", "agent",
        "--agent", agent_id,
    ]
    if session_id:
        cmd.extend(["--session-id", session_id])
    cmd.extend([
        "--message", message,
        "--json",
        "--timeout", str(int(self.gateway_timeout)),
    ])
    monitor_db_path = task_db_path or self.db_path
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        # Register under the same key _handle_exit looks up, so the session
        # record's status is updated when the process exits.
        self._register_session(_sid_key, agent_id, task_id, proc.pid)
        logger.info("Spawned agent %s (session=%s, pid=%d)",
                    agent_id, session_id, proc.pid)
        monitor = asyncio.create_task(
            self._monitor_process(session_id, proc, agent_id, task_id,
                                  on_complete=_wrapped_on_complete,
                                  db_path=monitor_db_path)
        )
        # The event loop only keeps weak references to tasks; hold a strong
        # one until the monitor finishes.
        pending = getattr(self, "_monitor_tasks", None)
        if pending is None:
            pending = self._monitor_tasks = set()
        pending.add(monitor)
        monitor.add_done_callback(pending.discard)
        return session_id
    except Exception as e:
        # The process never started (or setup failed): release the counter too.
        if self.counter:
            self.counter.release(agent_id, _sid_key)
        logger.exception("Failed to spawn agent %s", agent_id)
        # Record against the task's own DB, like the attempts written on exit.
        self._record_attempt(task_id, agent_id, "spawn_failed", error=str(e),
                             db_path=monitor_db_path)
        raise
async def spawn_subagent(
    self,
    task_description: str,
    task_id: Optional[str] = None,
) -> str:
    """Placeholder subagent spawn: registers a session without launching anything.

    The real implementation (Gateway API ``sessions_spawn``) is pending
    (F17). ``task_description`` is currently unused.

    Returns:
        The newly generated session id.
    """
    new_sid = str(uuid.uuid4())
    if self.dry_run:
        logger.info("[DRY RUN] Would spawn subagent (session=%s)", new_sid)
    else:
        # TODO: F17 — implement via the Gateway API sessions_spawn.
        logger.info("Subagent spawn (session=%s) - placeholder", new_sid)
    self._register_session(new_sid, "subagent", task_id, pid=None)
    return new_sid
# ── 续杯 Prompt 模板 ──
RETRY_PROMPT = """你收到一个续杯提醒。你的任务在执行过程中被中断了。
## 任务信息
- 项目: {project_id}
- 任务ID: {task_id}
- 标题: {title}
- 续杯次数: 第 {retry_count} 次(上限 {max_retries} 次)
请检查 session 历史中你之前做了什么,然后继续未完成的工作。
## 操作指令
### 查看任务当前状态
```bash
curl http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}?expand=all
```
### 如果已经完成,标记 review
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \\
-H 'Content-Type: application/json' \\
-d '{{"status": "review", "agent": "{agent_id}"}}'
```
### 写入产出(如果之前没写)
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \\
-H 'Content-Type: application/json' \\
-d '{{"agent": "{agent_id}", "type": "<类型>", "title": "<标题>", "content": "<内容>", "summary": "<摘要>"}}'
```
### 如果无法解决,标记失败
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \\
-H 'Content-Type: application/json' \\
-d '{{"status": "failed", "agent": "{agent_id}", "detail": "<失败原因>"}}'
```
{fallback_hint}"""
async def _monitor_process(
self,
session_id: Optional[str],
proc: asyncio.subprocess.Process,
agent_id: str,
task_id: Optional[str],
on_complete: Optional[Any] = None,
db_path: Optional[Path] = None,
monitor_timeout_count: int = 0,
) -> None:
"""监控子进程全生命周期(设计文档 spawner-monitor-design.md)"""
stdout_chunks: list = []
stderr_chunks: list = []
try:
# ── 等待进程退出 + 流式读取 ──
async def _read_streams():
async def _read_out():
while True:
chunk = await proc.stdout.read(4096)
if not chunk:
break
stdout_chunks.append(chunk)
async def _read_err():
while True:
chunk = await proc.stderr.read(4096)
if not chunk:
break
stderr_chunks.append(chunk)
await asyncio.gather(_read_out(), _read_err(), proc.wait())
await asyncio.wait_for(_read_streams(), timeout=self.agent_timeout)
# ── 情况 A:进程退出 ──
exit_code = proc.returncode
await self._handle_exit(
session_id, agent_id, task_id, exit_code,
stdout_chunks, stderr_chunks, on_complete, db_path
)
except asyncio.TimeoutError:
# ── 情况 B:monitor timeout(进程没退出)──
logger.warning("Agent %s monitor timeout (session=%s, count=%d/%d)",
agent_id, session_id, monitor_timeout_count + 1,
self.max_monitor_timeouts)
await self._handle_monitor_timeout(
session_id, agent_id, task_id, proc,
on_complete, db_path, stderr_chunks, monitor_timeout_count
)
async def _handle_exit(self, session_id, agent_id, task_id, exit_code,
                       stdout_chunks, stderr_chunks, on_complete, db_path):
    """Case A: handle an agent process that has exited.

    v2.7.2: process exit means the counter is released, which ``on_complete``
    (the wrapped callback from ``spawn_full_agent``) does. Only A2/A3
    (``should_retry``, gateway timeout) trigger a retry; that path goes
    through ``_do_retry`` instead of ``on_complete``. A9 (api_error/429)
    additionally pushes the task back to pending and sets an agent cooldown.

    Args:
        session_id: spawn session id (None for the main session).
        agent_id: agent whose process exited.
        task_id: blackboard task id (may be None).
        exit_code: process return code.
        stdout_chunks: raw stdout byte chunks.
        stderr_chunks: raw stderr byte chunks.
        on_complete: completion callback (counter release + business callback).
        db_path: task blackboard DB path.
    """
    stdout_text = b"".join(stdout_chunks).decode("utf-8", errors="replace")
    stderr_text = b"".join(stderr_chunks).decode("utf-8", errors="replace")
    # Parse the JSON printed on stdout.
    json_result = self._parse_stdout_json(stdout_text)
    logger.info("Parsed JSON result for agent=%s session=%s: %s",
                agent_id, session_id, json_result)
    # Look up the task's actual status in the DB.
    task_status = self._get_task_status(db_path, task_id) if task_id else None
    # Classify the exit.
    cls = self._classify_outcome(exit_code, json_result, stderr_text, task_status, stdout_text)
    outcome = cls["outcome"]
    # Update the in-memory session record (the main session is keyed "main").
    sid = session_id or "main"
    if sid in self._sessions:
        self._sessions[sid]["status"] = outcome
        self._sessions[sid]["completed_at"] = datetime.utcnow().isoformat()
        self._sessions[sid]["exit_code"] = exit_code
        if json_result:
            self._sessions[sid]["meta"] = json_result
    # Record the attempt.
    self._record_attempt(
        task_id, agent_id, outcome, exit_code=exit_code,
        db_path=db_path,
        metadata={
            "status": json_result.get("status"),
            "summary": json_result.get("summary"),
            "fallback_used": json_result.get("fallback_used"),
            "fallback_reason": json_result.get("fallback_reason"),
            "task_status_at_exit": task_status,
        }
    )
    logger.info("Agent %s finished (session=%s, outcome=%s, exit=%d, task_status=%s)",
                agent_id, session_id, outcome, exit_code, task_status)
    if cls["should_retry"]:
        # A2/A3: gateway timeout -> retry. _do_retry releases the counter
        # itself and does not invoke on_complete.
        await self._do_retry(
            session_id, agent_id, task_id, on_complete, db_path,
            cls.get("retry_field", "retry_count")
        )
    elif outcome == "api_error":
        # A9: 429/API error -> release the counter (via on_complete), push the
        # task back to pending, and put the agent into cooldown.
        await self._do_on_complete_async(on_complete, agent_id, outcome)
        if self.counter:
            self.counter.set_cooldown(agent_id)
        if db_path and task_id:
            self._mark_task(db_path, task_id, "pending", {
                "reason": "api_error_retry",
            })
            logger.info("Task %s pushed back to pending (api_error)", task_id)
    elif outcome == "fallback_timeout" and not cls["should_retry"]:
        # A5/A6: a fallback should never happen here. Mark the task failed and
        # log an error with context. (`not should_retry` is always true at this
        # point because the first branch took every retry.)
        logger.error("UNEXPECTED FALLBACK: agent=%s session=%s task=%s "
                     "fallback_used=%s fallback_reason=%s counter_active=%s "
                     "This indicates counter check failed to prevent concurrent spawn.",
                     agent_id, session_id, task_id,
                     json_result.get("fallback_used"), json_result.get("fallback_reason"),
                     self.counter.active_agents if self.counter else "N/A")
        await self._do_on_complete_async(on_complete, agent_id, outcome)
        if db_path and task_id:
            self._mark_task(db_path, task_id, "failed", {
                "reason": "unexpected_fallback",
                "status": json_result.get("status"),
                "fallback_reason": json_result.get("fallback_reason"),
            })
    else:
        # All other outcomes: A1 (completed), A4 (agent_failed), A7 (auth_failed),
        # A8 (gateway_unreachable), A11 (lock_conflict),
        # A10 (compact_failed), A12 (agent_error).
        # The process exited, so on_complete releases the counter; the task
        # status is left as-is here (handled per outcome or by the ticker).
        await self._do_on_complete_async(on_complete, agent_id, outcome)
async def _handle_monitor_timeout(self, session_id, agent_id, task_id, proc,
                                  on_complete, db_path, stderr_chunks,
                                  monitor_timeout_count):
    """Case B: a monitor round timed out while the agent process is still running.

    Based on the agent's main-session state:
    - B1 stuck session (status "running" but the lock PID is dead): revive it
      on the first occurrence; fail the task once it has been stuck twice.
    - B2 recent compaction: keep waiting without counting a monitor timeout,
      bounded by a separate per-task compaction-wait counter.
    - B3/B4 otherwise: count the timeout; fail the task at
      ``max_monitor_timeouts``, else start another monitor round.

    Args:
        session_id: spawn session id (None for the main session).
        agent_id: agent being monitored.
        task_id: blackboard task id (may be None).
        proc: the still-running agent subprocess.
        on_complete: completion callback (releases the counter).
        db_path: task blackboard DB path.
        stderr_chunks: stderr bytes collected by the timed-out round.
        monitor_timeout_count: non-compaction monitor timeouts so far.
    """

    def _remonitor(count):
        # Start a new monitor round for the still-running process. The event
        # loop only holds weak references to tasks, so keep a strong one.
        monitor = asyncio.create_task(
            self._monitor_process(
                session_id, proc, agent_id, task_id,
                on_complete=on_complete, db_path=db_path,
                monitor_timeout_count=count,
            )
        )
        pending = getattr(self, "_monitor_tasks", None)
        if pending is None:
            pending = self._monitor_tasks = set()
        pending.add(monitor)
        monitor.add_done_callback(pending.discard)

    def _mark_failed(detail):
        # Same guard _handle_exit uses: only mark when there is a task to mark.
        if db_path and task_id:
            self._mark_task(db_path, task_id, "failed", detail)

    # Grab stderr that is already buffered. A bounded read returns as soon as
    # data is available. read() without a size waits for EOF, which a live
    # process never sends, so it always timed out and dropped what it had
    # consumed.
    try:
        remaining = await asyncio.wait_for(proc.stderr.read(65536), timeout=2.0)
        if remaining:
            stderr_chunks.append(remaining)
    except Exception:
        # Best effort: nothing buffered within the window, or no stderr pipe.
        pass
    stderr_text = b"".join(stderr_chunks).decode("utf-8", errors="replace")
    if stderr_text:
        logger.debug("Agent %s stderr tail at monitor timeout (session=%s): %s",
                     agent_id, session_id, stderr_text[-2000:])

    state = self._check_session_state(agent_id)

    # B1: stuck session. Revive first; fail after being stuck twice.
    if state.get("status") == "running" and not state.get("lock_pid_alive", True):
        stuck_count = self._stuck_counts.get(task_id, 0) + 1
        self._stuck_counts[task_id] = stuck_count
        if stuck_count >= 2:
            logger.error("Agent %s session stuck %d times (session=%s, lock PID dead)",
                         agent_id, stuck_count, session_id)
            _mark_failed({"reason": "session_stuck", "stuck_count": stuck_count,
                          "diagnostics": state})
            await self._do_on_complete_async(on_complete, agent_id, "session_stuck")
            return
        logger.warning("Agent %s session stuck (attempt %d), reviving (session=%s)",
                       agent_id, stuck_count, session_id)
        revived = self._revive_session(agent_id)
        if revived:
            logger.info("Agent %s session revived, releasing counter for ticker re-dispatch",
                        agent_id)
            # Release the counter; the task stays "working" and the ticker
            # re-dispatches it. NOTE(review): the old process is no longer
            # monitored after this.
            await self._do_on_complete_async(on_complete, agent_id, "session_revived")
        else:
            logger.error("Agent %s revive failed, marking failed", agent_id)
            _mark_failed({"reason": "revive_failed", "stuck_count": stuck_count,
                          "diagnostics": state})
            await self._do_on_complete_async(on_complete, agent_id, "revive_failed")
        return

    # B2: compaction in progress. Do not count a monitor timeout, but bound the
    # extra waiting with a separate per-task counter.
    if state.get("recent_compact"):
        logger.info("Agent %s recent compaction detected, extending patience "
                    "(session=%s, monitor=%d/%d)",
                    agent_id, session_id, monitor_timeout_count, self.max_monitor_timeouts)
        compact_wait_count = self._compact_waits.get(task_id, 0) + 1
        self._compact_waits[task_id] = compact_wait_count
        if compact_wait_count >= self.max_monitor_timeouts:
            logger.error("Agent %s max compact waits reached (session=%s, count=%d)",
                         agent_id, session_id, compact_wait_count)
            _mark_failed({
                "reason": "compact_hanging",
                "compact_wait_count": compact_wait_count,
                "diagnostics": state,
            })
            await self._do_on_complete_async(on_complete, agent_id, "compact_hanging")
            return
        _remonitor(monitor_timeout_count)
        return

    # B3/B4: no compaction; count the timeout normally.
    monitor_timeout_count += 1
    if monitor_timeout_count >= self.max_monitor_timeouts:
        logger.error("Agent %s max monitor timeouts (session=%s, count=%d)",
                     agent_id, session_id, monitor_timeout_count)
        _mark_failed({
            "reason": "max_monitor_timeouts",
            "count": monitor_timeout_count,
            "elapsed_seconds": monitor_timeout_count * int(self.agent_timeout),
            "diagnostics": state,
        })
        await self._do_on_complete_async(on_complete, agent_id, "max_monitor_timeouts")
        return
    # Under the limit: keep waiting (the counter stays held).
    logger.info("Agent %s continuing monitor (session=%s, count=%d/%d)",
                agent_id, session_id, monitor_timeout_count, self.max_monitor_timeouts)
    _remonitor(monitor_timeout_count)
async def _do_retry(self, session_id, agent_id, task_id, on_complete,
                    db_path, retry_field="retry_count"):
    """Retry ("续杯"): release the counter and re-spawn the agent.

    Reached from the ``should_retry`` branch of ``_handle_exit``. The process
    has exited but the wrapped ``on_complete`` was not called, so the counter
    is released here manually; ``spawn_full_agent`` acquires it again. The
    old ``on_complete`` would release the counter a second time, so it is
    dropped (set to None).

    NOTE(review): dropping ``on_complete`` also drops the business callback
    it wraps. That callback therefore never fires for this task after a retry
    (skip, max retries, spawn failure, or the retried run's exit). Confirm
    this is intended.

    Steps:
    1. Skip when the task status is already done/failed/cancelled/review/pending.
    2. Increment the retry counter (``tasks.retry_count``, or ``retry_field``
       through the retry-count helpers).
    3. Mark the task failed once the count reaches ``max_retries``.
    4. Otherwise re-spawn with a retry prompt (mail or task variant), reusing
       the same session id (or the main session).

    Args:
        session_id: session of the exited run (None for the main session).
        agent_id: agent to re-spawn.
        task_id: blackboard task id.
        on_complete: wrapped callback of the exited run (not reused).
        db_path: task blackboard DB path.
        retry_field: which retry counter to increment.
    """
    # ── Key step: release the counter manually (process exited = agent idle) ──
    if self.counter:
        self.counter.release(agent_id, session_id or "main")
    # The old wrapped on_complete also releases the counter; drop it to avoid
    # a double release.
    on_complete = None

    # Before retrying, skip if the task has already reached a settled state.
    if db_path and task_id:
        try:
            conn = get_connection(db_path)
            try:
                row = conn.execute(
                    "SELECT status FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
                if row and row["status"] in ("done", "failed", "cancelled", "review", "pending"):
                    logger.info("Retry skip: task %s already %s (agent=%s)",
                                task_id, row["status"], agent_id)
                    # on_complete was set to None above.
                    await self._do_on_complete_async(on_complete, agent_id, "task_already_done")
                    return
            finally:
                conn.close()
        except Exception:
            logger.warning("Retry status check failed for %s, proceeding", task_id,
                           exc_info=True)

    # Increment the retry counter.
    if retry_field == "retry_count" and db_path and task_id:
        # Directly read/write tasks.retry_count.
        try:
            conn = get_connection(db_path)
            try:
                conn.execute("BEGIN IMMEDIATE")
                conn.execute(
                    "UPDATE tasks SET retry_count = COALESCE(retry_count, 0) + 1 WHERE id=?",
                    (task_id,),
                )
                conn.commit()
                row = conn.execute(
                    "SELECT retry_count FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
                count = row["retry_count"] if row else 1
            finally:
                conn.close()
        except Exception:
            logger.exception("Failed to update retry_count for task %s", task_id)
            count = 1
    else:
        retry_counts = self._get_retry_counts(db_path, task_id)
        count = retry_counts.get(retry_field, 0) + 1
        retry_counts[retry_field] = count
        self._update_retry_counts(db_path, task_id, retry_counts)

    if count >= self.max_retries:
        logger.error("Agent %s max retries (session=%s, %s=%d)",
                     agent_id, session_id, retry_field, count)
        if db_path and task_id:
            self._mark_task(db_path, task_id, "failed", {
                "reason": f"max_{retry_field}", "count": count,
            })
        await self._do_on_complete_async(on_complete, agent_id, "max_retries")
        return
    logger.info("Agent %s retry %s=%d/%d (session=%s)",
                agent_id, retry_field, count, self.max_retries, session_id)

    # Build the retry message (mail uses its own template, tasks the standard one).
    task_info = self._get_task_info(db_path, task_id) or {}
    project_id = task_info.get("project_id", "")
    is_mail = project_id == "_mail"
    if is_mail:
        must_haves = task_info.get("must_haves", "{}")
        try:
            meta = json.loads(must_haves) if must_haves else {}
        except Exception:
            meta = {}
        message = MAIL_RETRY_PROMPT.format(
            from_agent=meta.get("from", "unknown"),
            title=task_info.get("title", ""),
            retry_count=count,
            max_retries=self.max_retries,
        )
    else:
        fallback_hint = "\n⚠️ 之前有 fallback 执行,请调 API 检查任务当前状态和已有产出,确认是否已完成。" if retry_field == "retry_count" else ""
        message = self.RETRY_PROMPT.format(
            project_id=project_id,
            task_id=task_id or "",
            title=task_info.get("title", ""),
            retry_count=count,
            max_retries=self.max_retries,
            api_host=self.api_host,
            api_port=self.api_port,
            agent_id=agent_id,
            fallback_hint=fallback_hint,
        )

    # v2.7.2: re-spawn through spawn_full_agent (it does can_acquire + acquire
    # internally). on_complete is None here, so no business callback is passed.
    try:
        await self.spawn_full_agent(
            agent_id=agent_id,
            message=message,
            task_id=task_id,
            on_complete=on_complete,
            use_main_session=(session_id is None),
            reuse_session_id=session_id if session_id else None,
            task_db_path=db_path,
        )
    except AgentBusyError:
        # Agent taken by another task (should not happen; defensive).
        logger.warning("Retry spawn skipped: %s busy (unexpected)", agent_id)
        await self._do_on_complete_async(on_complete, agent_id, "retry_agent_busy")
    except Exception:
        logger.exception("Retry spawn failed for %s", agent_id)
        await self._do_on_complete_async(on_complete, agent_id, "retry_spawn_failed")
# ── 辅助方法 ──
@staticmethod
def _parse_stdout_json(stdout_text: str) -> dict:
"""解析 openclaw agent --json 的 stdout 输出
返回可直接使用的字段:status, summary, fallback_used, fallback_reason, payloads
不再提取 meta,直接用顶层字段。
"""
text = stdout_text.strip()
if not text:
return {"status": None, "summary": None, "fallback_used": False, "fallback_reason": None, "payloads": []}
try:
data = json.loads(text)
except json.JSONDecodeError:
# 多行输出,找最后一个 JSON
for line in reversed(text.splitlines()):
try:
data = json.loads(line)
break
except json.JSONDecodeError:
continue
else:
return {"status": None, "summary": None, "fallback_used": False, "fallback_reason": None, "payloads": []}
# 从 data.result.meta.executionTrace 取 fallback 信息
result = data.get("result", {})
meta = result.get("meta", {})
trace = meta.get("executionTrace", {})
return {
"status": data.get("status"),
"summary": data.get("summary"),
"fallback_used": trace.get("fallbackUsed", False),
"fallback_reason": trace.get("fallbackReason"),
"payloads": result.get("payloads", []),
}
@staticmethod
def _get_task_status(db_path: Optional[Path], task_id: Optional[str]) -> Optional[str]:
"""查任务实际 API 状态"""
if not db_path or not task_id:
return None
try:
conn = get_connection(db_path)
try:
row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task_id,)
).fetchone()
return row["status"] if row else None
finally:
conn.close()
except Exception:
return None
@staticmethod
def _get_task_info(db_path: Optional[Path], task_id: Optional[str]) -> Optional[dict]:
"""查任务基本信息"""
if not db_path or not task_id:
return None
try:
conn = get_connection(db_path)
try:
row = conn.execute(
"SELECT id, title, status FROM tasks WHERE id=?", (task_id,)
).fetchone()
if not row:
return None
info = dict(row)
# 从 db_path 推断 project_id: data/<project_id>/blackboard.db
info["project_id"] = db_path.parent.name
return info
finally:
conn.close()
except Exception:
return None
@staticmethod
def _revive_session(agent_id: str) -> bool:
"""假死复活术:修改 sessions.json status 从 running 改为 idle"""
sessions_path = Path.home() / ".openclaw" / "agents" / agent_id / "sessions" / "sessions.json"
if not sessions_path.exists():
return False
try:
with open(sessions_path) as f:
sessions = json.load(f)
main_key = f"agent:{agent_id}:main"
main_session = sessions.get(main_key, {})
if main_session.get("status") != "running":
return False # 不是 running 状态,不需要复活
main_session["status"] = "idle"
sessions[main_key] = main_session
with open(sessions_path, "w") as f:
json.dump(sessions, f, indent=2)
logger.info("Revived %s: sessions.json status changed running→idle", agent_id)
return True
except Exception:
logger.exception("Failed to revive %s", agent_id)
return False
@staticmethod
def _check_session_state(agent_id: str) -> dict:
"""检查 sessions.json 和 lock 状态"""
result = {"status": "unknown", "lock_pid": None, "lock_pid_alive": False, "recent_compact": False}
sessions_path = Path.home() / ".openclaw" / "agents" / agent_id / "sessions" / "sessions.json"
if not sessions_path.exists():
return result
try:
with open(sessions_path) as f:
sessions = json.load(f)
main_key = f"agent:{agent_id}:main"
main_session = sessions.get(main_key, {})
result["status"] = main_session.get("status", "unknown")
# 检查 lock
sf = main_session.get("sessionFile", "")
if sf:
lock_path = Path(sf + ".lock")
if lock_path.exists():
try:
lock_data = json.loads(lock_path.read_text())
pid = lock_data.get("pid")
result["lock_pid"] = pid
if pid:
import os
try:
os.kill(pid, 0)
result["lock_pid_alive"] = True
except ProcessLookupError:
result["lock_pid_alive"] = False
except Exception:
pass
# 最近 5 分钟的 compact
import time
now_ms = time.time() * 1000
for cp in main_session.get("compactionCheckpoints", []):
if (now_ms - cp.get("createdAt", 0)) < 300_000:
result["recent_compact"] = True
break
except Exception:
pass
return result
@staticmethod
def _classify_outcome(exit_code: int, json_result: dict, stderr_text: str,
task_status: Optional[str], stdout_text: str = "") -> dict:
"""分类退出原因,返回处理策略
v3.0: 基于 JSON status/summary/executionTrace 判定,不再依赖 transport 字段。
只有 status="timeout" 触发 retry,其他都不 retry。
"""
status = json_result.get("status")
summary = json_result.get("summary", "")
fallback_used = json_result.get("fallback_used", False)
# A4: 任务 DB status=failed(Agent 自己标的)
if task_status == "failed":
return {"outcome": "agent_failed", "should_retry": False}
# A1: status=ok + completed + 非 fallback
if status == "ok" and summary == "completed" and not fallback_used:
return {"outcome": "completed", "should_retry": False}
# A5/A6: status=ok + fallback
if status == "ok" and fallback_used:
return {"outcome": "fallback_timeout", "should_retry": False}
# A2/A3: status=timeout → 唯一续杯场景
if status == "timeout":
return {"outcome": "gateway_timeout", "should_retry": True,
"retry_field": "retry_count"}
# A0: stdout 为空且 exit≠0 = 进程异常终止
# 注意:exit=0 + stdout 为空可能是正常完成(--json 没输出),
# 此时 task_status 如果是 done/review 会被上面的 A4 兜住
if status is None and not stdout_text.strip() and exit_code != 0:
return {"outcome": "process_crash", "should_retry": False}
# stdout 为空但 exit=0:可能是正常完成但 --json 没输出
# 查任务状态判断
if status is None and not stdout_text.strip() and exit_code == 0:
terminal_statuses = {"done", "review"}
if task_status in terminal_statuses:
return {"outcome": "completed", "should_retry": False}
return {"outcome": "agent_error", "should_retry": False}
# A7-A12: status=error → 不续杯,stderr 辅助分类
if status == "error":
stderr_lower = stderr_text.lower()
if any(kw in stderr_lower for kw in ["401", "403", "unauthorized", "auth"]):
return {"outcome": "auth_failed", "should_retry": False}
if any(kw in stderr_lower for kw in ["econnrefused", "etimedout", "gateway closed", "econnreset"]):
return {"outcome": "gateway_unreachable", "should_retry": False}
if any(kw in stderr_lower for kw in ["rate_limit", "500", "503", "api error"]):
return {"outcome": "api_error", "should_retry": False}
if any(kw in stderr_lower for kw in ["compaction-diag", "context-overflow"]):
return {"outcome": "compact_failed", "should_retry": False}
if any(kw in stderr_lower for kw in ["lock", "busy", "concurrent", "lane task error"]):
return {"outcome": "lock_conflict", "should_retry": False}
return {"outcome": "agent_error", "should_retry": False}
# 兜底:status 未知值
return {"outcome": "unknown_status", "should_retry": False}
@staticmethod
def _get_retry_counts(db_path: Optional[Path], task_id: Optional[str]) -> dict:
"""从最新 task_attempt 的 metadata 读计数器"""
defaults = {"retry_count": 0, "connect_retry_count": 0,
"api_retry_count": 0, "lock_retry_count": 0,
"monitor_timeout_count": 0}
if not db_path or not task_id:
return defaults
try:
conn = get_connection(db_path)
try:
row = conn.execute(
"SELECT metadata FROM task_attempts WHERE task_id=? ORDER BY attempt_number DESC LIMIT 1",
(task_id,)
).fetchone()
if row and row["metadata"]:
stored = json.loads(row["metadata"])
for k in defaults:
if k in stored:
defaults[k] = stored[k]
finally:
conn.close()
except Exception:
pass
return defaults
def _update_retry_counts(self, db_path: Optional[Path],
task_id: Optional[str], counts: dict):
"""将 retry counts 写回最新 task_attempt 的 metadata"""
if not db_path or not task_id:
return
try:
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
row = conn.execute(
"SELECT rowid, metadata FROM task_attempts "
"WHERE task_id=? ORDER BY attempt_number DESC LIMIT 1",
(task_id,)
).fetchone()
if row:
meta = json.loads(row["metadata"]) if row["metadata"] else {}
meta.update(counts)
conn.execute(
"UPDATE task_attempts SET metadata=? WHERE rowid=?",
(json.dumps(meta), row["rowid"])
)
conn.commit()
finally:
conn.close()
except Exception:
logger.exception("Failed to update retry counts for task %s", task_id)
def _mark_task(self, db_path: Optional[Path], task_id: Optional[str],
status: str, detail: Optional[dict] = None):
"""标记任务状态(用于 failed/escalate)"""
if not db_path or not task_id:
return
try:
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
conn.execute(
"UPDATE tasks SET status=?, completed_at=datetime('now') WHERE id=?",
(status, task_id)
)
if detail:
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task_id, "daemon", status, json.dumps(detail, ensure_ascii=False))
)
conn.commit()
finally:
conn.close()
except Exception:
logger.exception("Failed to mark task %s as %s", task_id, status)
@staticmethod
def _do_on_complete(on_complete, agent_id, outcome):
"""执行 on_complete 回调(同步+异步兼容)"""
if not on_complete:
return
try:
result = on_complete(agent_id, outcome)
if asyncio.iscoroutine(result):
# 注意:这里是同步调用的,不能 await
# 在 _monitor_process 的 async 上下文中应该用 await
pass
except Exception:
pass
async def _do_on_complete_async(self, on_complete, agent_id, outcome):
"""异步执行 on_complete 回调"""
if not on_complete:
return
try:
result = on_complete(agent_id, outcome)
if asyncio.iscoroutine(result):
await result
except Exception:
logger.warning("on_complete callback failed for %s", agent_id, exc_info=True)
def _register_session(
self,
session_id: str,
agent_id: str,
task_id: Optional[str],
pid: Optional[int],
) -> None:
"""注册 spawn session"""
self._sessions[session_id] = {
"agent_id": agent_id,
"task_id": task_id,
"pid": pid,
"status": "running",
"started_at": datetime.utcnow().isoformat(),
"completed_at": None,
}
def _record_attempt(
self,
task_id: Optional[str],
agent_id: str,
outcome: str,
exit_code: Optional[int] = None,
error: Optional[str] = None,
metadata: Optional[dict] = None,
db_path: Optional[Path] = None,
) -> None:
"""记录 task_attempt"""
effective_db = db_path or self.db_path
if not task_id or not effective_db:
return
try:
conn = get_connection(effective_db)
try:
conn.execute("BEGIN IMMEDIATE")
row = conn.execute(
"SELECT MAX(attempt_number) as max_a FROM task_attempts WHERE task_id=?",
(task_id,),
).fetchone()
attempt_number = (row["max_a"] or 0) + 1
meta = metadata or {}
if error:
meta["error"] = error
conn.execute(
"INSERT INTO task_attempts "
"(task_id, attempt_number, agent, outcome, exit_code, metadata, completed_at) "
"VALUES (?,?,?,?,?,?,datetime('now'))",
(task_id, attempt_number, agent_id, outcome,
exit_code, json.dumps(meta)),
)
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task_id, agent_id,
"agent_completed" if outcome == "completed" else "daemon_tick",
json.dumps({"outcome": outcome, "attempt": attempt_number})),
)
conn.commit()
finally:
conn.close()
except Exception:
logger.exception("Failed to record attempt for task %s", task_id)
def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
"""获取 session 信息"""
return self._sessions.get(session_id)
def get_session_by_agent(self, agent_id: str) -> Optional[Dict[str, Any]]:
"""v2.7.2: 根据 agent_id 获取活跃 session 信息(用于进程存活性检查)"""
for sid, info in self._sessions.items():
if info.get("agent_id") == agent_id and info.get("status") == "running":
return info
return None
    def cleanup_session(self, session_id: str) -> None:
        """Drop a tracked session and its task's B2 compact-wait counter.

        Unknown ``session_id`` values are ignored. When the removed session
        has a truthy ``task_id`` that also has an entry in
        ``self._compact_waits``, that entry is removed as well.
        """
        if session_id in self._sessions:
            session = self._sessions[session_id]
            task_id = session.get("task_id")
            del self._sessions[session_id]
            # Clear the B2 compact wait counter for this session's task
            if task_id and task_id in self._compact_waits:
                del self._compact_waits[task_id]