Files
sanguo_moziplus_v2/src/daemon/spawner.py
T
2026-05-25 19:58:58 +08:00

1100 lines
43 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Agent Spawner — 异步 spawn Full Agent / Subagent
Full Agent: asyncio.create_subprocess_exec(异步非阻塞,不 await 完成)
Subagent: 占位(实际通过 OpenClaw Gateway API sessions_spawn,F17 完善)
"""
from __future__ import annotations
import asyncio
import json
import logging
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
from src.blackboard.db import get_connection, init_db
logger = logging.getLogger("moziplus-v2.spawner")
# ── Prompt templates ──
# Mail-only template for the "inform" performative: a pure notification.
# Placeholders used: {from_agent}, {title}, {text}. The prompt itself tells the
# recipient not to run any status-transition command.
MAIL_INFORM_TEMPLATE = """你收到一封飞鸽传书(纯通知,不需要回复)。
发件者: {from_agent}
主题: {title}
内容: {text}
已阅即可,无需操作。
⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。
"""
# Mail-only template for the "request" performative: the recipient must handle
# the mail and reply to the sender through the mail API.
# Placeholders used: {from_agent}, {title}, {text}, {agent_id}, {task_id},
# {api_host}, {api_port}. The reply endpoint is built from {api_host}/{api_port}
# (instead of a hard-coded localhost:8083) so it follows the spawner's
# configured API address like every other prompt in this module.
# The reply is sent with type "inform" so that replies do not trigger replies.
MAIL_REQUEST_TEMPLATE = """你收到一封飞鸽传书,需要你处理并回复。
发件者: {from_agent}
主题: {title}
内容: {text}
请处理后回复发件者:
curl -s -X POST http://{api_host}:{api_port}/api/mail \\\n -H 'Content-Type: application/json' \\\n -d '{{"from": "{agent_id}", "to": "{from_agent}", "title": "回复: {title}", "text": "你的回复内容", "type": "inform", "in_reply_to": "{task_id}"}}'
⚠️ 将"你的回复内容"替换为实际回复。type 必须用 inform 防止循环。
⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。
"""
# Fallback prompt for regular blackboard tasks (used when no BootstrapBuilder
# prompt is available). It walks the agent through: mark working -> do the
# work -> write outputs -> report the final status.
# Placeholders: {project_id}, {task_id}, {title}, {description}, {task_type},
# {priority}, {must_haves}, {retry_context}, {current_status}, {api_host},
# {api_port}, {agent_id}, {completion_status}.
# NOTE: this is a non-raw string, so a backslash directly followed by a newline
# is a line continuation; each multi-line curl command below therefore renders
# as a single line in the final prompt.
# Fix: the fallback heading had lost its opening full-width parenthesis.
SPAWN_PROMPT_TEMPLATE = """你收到一个 v2.6 黑板任务。请严格按照下面的步骤执行。
## 任务信息
- 项目: {project_id}
- 任务ID: {task_id}
- 标题: {title}
- 描述: {description}
- 类型: {task_type}
- 优先级: {priority}
- 必要条件: {must_haves}
{retry_context}
## 状态机(你必须遵守的状态流转)
```
pending → claimed → working → review → done
│ │
│ └→ pending(驳回重做)
├──→ failed
├──→ blocked
└──→ cancelled
```
你当前处于 **{current_status}** 状态。
## 执行步骤
### 步骤 1: 开始工作
立即调 API 标记你已开始:
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \
-H 'Content-Type: application/json' \
-d '{{"status": "working", "agent": "{agent_id}"}}'
```
### 步骤 2: 执行任务
根据任务描述完成你的工作(编码/回测/数据检查/审查等)。
### 步骤 3: 写入产出
⚠️ 这一步是必须的!不写产出 = 任务没完成。
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \
-H 'Content-Type: application/json' \
-d '{{"agent": "{agent_id}", "type": "<产出类型>", "title": "<产出标题>", "content": "<你的产出内容>", "summary": "<简要说明>"}}'
```
**type 必须是以下之一**: code, document, data, config, other
如果产出太长,可以写文件后用路径引用:
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \
-H 'Content-Type: application/json' \
-d '{{"agent": "{agent_id}", "type": "code", "title": "main.py", "content_path": "/path/to/file.py", "summary": "主程序"}}'
```
### 步骤 4: 提交完成或标记失败
✅ 成功完成:
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \
-H 'Content-Type: application/json' \
-d '{{"status": "{completion_status}", "agent": "{agent_id}"}}'
```
❌ 无法完成:
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \
-H 'Content-Type: application/json' \
-d '{{"status": "failed", "agent": "{agent_id}", "detail": "<失败原因>"}}'
```
## Fallback(API 调用失败时)
如果 API 失败 2 次,尝试:
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \
-H 'Content-Type: application/json' \
-d '{{"status": "failed", "agent": "{agent_id}", "detail": "API回写失败,产出在本地文件"}}'
```
## 参考链接
- 查看任务完整信息: GET http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}?expand=all
- 写评论: POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/comments {{"author": "{agent_id}", "body": "..."}}
- 完整 API 契约: docs/design/agent-api-contract.md
"""
# Mail-only retry ("refill") template. It deliberately contains no
# status-transition instructions; the prompt states that the system handles
# status itself. Placeholders: {from_agent}, {title}, {retry_count},
# {max_retries}.
MAIL_RETRY_PROMPT = """你收到一个续杯提醒。你的任务在执行过程中被中断了。
发件者: {from_agent}
主题: {title}
续杯次数: 第 {retry_count} 次(上限 {max_retries} 次)
请检查 session 历史中你之前做了什么,然后继续未完成的工作。
⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。
⚠️ 如果任务已完成,直接写产出即可,不要调 status API。
"""
class AgentSpawner:
    """Spawn agents and keep an in-memory registry of the spawned sessions."""

    def __init__(
        self,
        db_path: Optional[Path] = None,
        agent_timeout: float = 630.0,
        dry_run: bool = False,
        api_host: str = "127.0.0.1",
        api_port: int = 8083,
        bootstrap_builder: Optional[Any] = None,
        gateway_timeout: float = 600.0,
        max_retries: int = 3,
        max_monitor_timeouts: int = 3,
    ):
        """Store the configuration and create the empty registries.

        Args:
            db_path: Default project blackboard DB path; used for writing
                task_attempts when no per-task DB path is supplied.
            agent_timeout: Seconds one monitoring round waits for the agent
                subprocess before it counts as a monitor timeout.
            dry_run: Test mode; spawn calls only log and register a session,
                no subprocess is started.
            api_host: API host embedded in prompts so agents can write back.
            api_port: API port embedded in prompts so agents can write back.
            bootstrap_builder: Optional object whose ``build_for_task`` is
                used to build the task prompt; the hard-coded template is the
                fallback.
            gateway_timeout: Value passed (as an int) to ``openclaw agent
                --timeout``.
            max_retries: Retry count at which a task is marked failed instead
                of being retried again.
            max_monitor_timeouts: Number of monitor timeouts (and, counted
                separately, of compaction waits) at which a task is marked
                failed.
        """
        self.db_path = db_path
        self.agent_timeout = agent_timeout
        self.dry_run = dry_run
        self.api_host = api_host
        self.api_port = api_port
        self.bootstrap_builder = bootstrap_builder
        self.gateway_timeout = gateway_timeout
        self.max_retries = max_retries
        self.max_monitor_timeouts = max_monitor_timeouts
        # Session registry: {session_id: {...}}
        self._sessions: Dict[str, Dict[str, Any]] = {}
        # B2 compaction wait counters: {task_id: count}
        self._compact_waits: Dict[str, int] = {}
@property
def active_sessions(self) -> Dict[str, Dict[str, Any]]:
"""当前活跃的 spawn sessions"""
return {sid: s for sid, s in self._sessions.items()
if s.get("status") == "running"}
def build_spawn_message(
self,
task_id: str,
title: str,
description: str,
task_type: str = "",
priority: int = 5,
must_haves: str = "",
project_id: str = "",
agent_id: str = "",
current_status: str = "claimed",
retry_context: str = "",
task: Optional[Any] = None,
project_config: Optional[Dict[str, Any]] = None,
) -> str:
"""构建 Agent spawn 的消息(优先用 BootstrapBuilderfallback 用模板)
Args:
current_status: 任务当前状态(动态生成状态机提示)
retry_context: 重试上下文(前轮产出摘要 + 审查意见)
task: Task 对象(BootstrapBuilder 用)
project_config: 项目配置(BootstrapBuilder 用)
"""
# 尝试 BootstrapBuilder
if self.bootstrap_builder and task is not None:
try:
bootstrap_prompt = self.bootstrap_builder.build_for_task(
task=task,
role="executor",
project_config=project_config,
)
# mail 任务用精简模板,不走 BootstrapBuilder
if project_id == "_mail":
return self._build_mail_prompt(task_id, title, description, must_haves, agent_id)
api_section = self._build_api_section(
project_id, task_id, agent_id)
return bootstrap_prompt + "\n\n---\n\n" + api_section
except Exception:
logger.exception("BootstrapBuilder failed, falling back to template")
# mail 任务用精简模板
if project_id == "_mail":
return self._build_mail_prompt(task_id, title, description, must_haves, agent_id)
# Fallback: 使用硬编码模板
# mail 任务直接 done,不走 review
completion_status = "done" if project_id == "_mail" else "review"
return SPAWN_PROMPT_TEMPLATE.format(
project_id=project_id,
task_id=task_id,
title=title,
description=description or "(无描述)",
task_type=task_type or "general",
priority=priority,
must_haves=must_haves or "(无)",
agent_id=agent_id,
api_host=self.api_host,
api_port=self.api_port,
current_status=current_status or "claimed",
retry_context=retry_context or "",
completion_status=completion_status,
)
def _build_api_section(self, project_id: str, task_id: str,
agent_id: str) -> str:
"""构建 API 回写操作指令(BootstrapBuilder 模式下补充)"""
# mail 任务直接 done,不走 review
success_status = '"done"' if project_id == "_mail" else '"review"'
return f"""## 操作指令
### 状态回写
开始工作:
```bash
curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/tasks/{task_id}/status \
-H 'Content-Type: application/json' \
-d '{{"status": "working", "agent": "{agent_id}"}}'
```
### 写入产出
```bash
curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \
-H 'Content-Type: application/json' \
-d '{{"agent": "{agent_id}", "type": "<类型>", "title": "<标题>", "content": "<内容>", "summary": "<摘要>"}}'
```
### 完成后
成功:status → {success_status} | 失败:status → "failed"
"""
def _build_mail_prompt(self, task_id: str, title: str, description: str,
                       must_haves: str, agent_id: str) -> str:
    """Build the compact prompt used for mail tasks.

    ``must_haves`` is expected to hold a JSON object; its ``from`` key names
    the sender and its ``performative`` (or, failing that, ``type``) key
    selects the template. Anything unparseable falls back to the defaults
    (sender = ``agent_id``, performative = ``"request"``).

    Returns:
        ``MAIL_INFORM_TEMPLATE`` rendered for the "inform" performative,
        ``MAIL_REQUEST_TEMPLATE`` rendered otherwise.
    """
    from_agent = agent_id
    performative = "request"
    if must_haves:
        try:
            meta = json.loads(must_haves)
        except (TypeError, ValueError):
            # Best effort: a malformed payload must not block delivery.
            logger.debug("Mail task %s: must_haves is not valid JSON, using defaults",
                         task_id)
            meta = None
        if isinstance(meta, dict):
            # A null/empty "from" would otherwise render as "None".
            from_agent = meta.get("from") or agent_id
            performative = meta.get("performative", meta.get("type", "request"))

    # Escape double quotes because the request template embeds these values
    # in a JSON body. The title is truncated BEFORE escaping so the cut can
    # never land between a backslash and the quote it escapes.
    safe_title = (title or "")[:100].replace('"', '\\"')
    safe_text = (description or "").replace('"', '\\"')
    common_kwargs = dict(
        from_agent=from_agent,
        title=safe_title,
        text=safe_text,
        task_id=task_id,
        agent_id=agent_id,
        api_host=self.api_host,
        api_port=self.api_port,
    )
    if performative == "inform":
        return MAIL_INFORM_TEMPLATE.format(**common_kwargs)
    return MAIL_REQUEST_TEMPLATE.format(**common_kwargs)
async def spawn_full_agent(
    self,
    agent_id: str,
    message: str,
    new_session: bool = False,
    task_id: Optional[str] = None,
    on_complete: Optional[Any] = None,
    use_main_session: bool = False,
    task_db_path: Optional[Path] = None,
    reuse_session_id: Optional[str] = None,
) -> str:
    """Spawn a Full Agent subprocess without waiting for it to finish.

    Args:
        agent_id: Agent passed to ``openclaw agent --agent``.
        message: Prompt passed via ``--message``.
        new_session: Not used by this method.
        task_id: Task the spawn belongs to (for attempts and monitoring).
        on_complete: Callback ``(agent_id, outcome)``, sync or async,
            invoked by the monitor once the run is finally settled.
        use_main_session: True = deliver to the agent's main session
            (no ``--session-id`` is passed).
        task_db_path: Per-task DB path; falls back to ``self.db_path``.
        reuse_session_id: Existing session id to reuse (used for retries).

    Returns:
        The session id. In dry-run mode a main-session spawn returns
        ``"main"``; in a real spawn it returns ``None`` (kept as-is for
        existing callers).

    Raises:
        Exception: Whatever starting the subprocess raised, after a
            ``spawn_failed`` attempt has been recorded.
    """
    # Session strategy: main > reuse > new
    if use_main_session:
        session_id = None
    elif reuse_session_id:
        session_id = reuse_session_id
    else:
        session_id = str(uuid.uuid4())
    # Registry key; _handle_exit looks sessions up under `session_id or "main"`.
    registry_key = session_id or "main"
    effective_db = task_db_path or self.db_path

    if self.dry_run:
        logger.info("[DRY RUN] Would spawn agent %s (session=%s)", agent_id, registry_key)
        self._register_session(registry_key, agent_id, task_id, pid=None)
        return registry_key

    cmd = [
        "openclaw", "agent",
        "--agent", agent_id,
    ]
    if session_id:
        cmd.extend(["--session-id", session_id])
    cmd.extend([
        "--message", message,
        "--json",
        "--timeout", str(int(self.gateway_timeout)),
    ])
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        # Fix: main-session runs used to be registered under the key None,
        # so their status was never updated when the process exited.
        self._register_session(registry_key, agent_id, task_id, proc.pid)
        logger.info("Spawned agent %s (session=%s, pid=%d)",
                    agent_id, session_id, proc.pid)
        # Schedule the monitor. The event loop only keeps weak references to
        # tasks, so hold a strong one until the monitor finishes.
        monitor = asyncio.create_task(
            self._monitor_process(session_id, proc, agent_id, task_id,
                                  on_complete=on_complete,
                                  db_path=effective_db)
        )
        pending = getattr(self, "_monitor_tasks", None)
        if pending is None:
            pending = self._monitor_tasks = set()
        pending.add(monitor)
        monitor.add_done_callback(pending.discard)
        return session_id
    except Exception as e:
        logger.exception("Failed to spawn agent %s", agent_id)
        # Fix: record against the per-task DB when one was supplied.
        self._record_attempt(task_id, agent_id, "spawn_failed", error=str(e),
                             db_path=effective_db)
        raise
async def spawn_subagent(
    self,
    task_description: str,
    task_id: Optional[str] = None,
) -> str:
    """Register a placeholder Subagent session and return its id.

    No subagent is actually started; in both dry-run and normal mode the
    method only logs and registers a session under a fresh UUID.

    Returns:
        The new session id.
    """
    new_id = str(uuid.uuid4())
    if self.dry_run:
        logger.info("[DRY RUN] Would spawn subagent (session=%s)", new_id)
    else:
        # TODO: F17 - implement through the Gateway API sessions_spawn
        logger.info("Subagent spawn (session=%s) - placeholder", new_id)
    self._register_session(new_id, "subagent", task_id, pid=None)
    return new_id
# ── Retry ("refill") prompt template for regular tasks ──
# Placeholders: {project_id}, {task_id}, {title}, {retry_count},
# {max_retries}, {api_host}, {api_port}, {agent_id}, {fallback_hint}.
# The doubled backslashes render as a literal "\" at the end of each curl line.
RETRY_PROMPT = """你收到一个续杯提醒。你的任务在执行过程中被中断了。
## 任务信息
- 项目: {project_id}
- 任务ID: {task_id}
- 标题: {title}
- 续杯次数: 第 {retry_count} 次(上限 {max_retries} 次)
请检查 session 历史中你之前做了什么,然后继续未完成的工作。
## 操作指令
### 查看任务当前状态
```bash
curl http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}?expand=all
```
### 如果已经完成,标记 review
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \\
-H 'Content-Type: application/json' \\
-d '{{"status": "review", "agent": "{agent_id}"}}'
```
### 写入产出(如果之前没写)
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \\
-H 'Content-Type: application/json' \\
-d '{{"agent": "{agent_id}", "type": "<类型>", "title": "<标题>", "content": "<内容>", "summary": "<摘要>"}}'
```
### 如果无法解决,标记失败
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \\
-H 'Content-Type: application/json' \\
-d '{{"status": "failed", "agent": "{agent_id}", "detail": "<失败原因>"}}'
```
{fallback_hint}"""
async def _monitor_process(
    self,
    session_id: Optional[str],
    proc: asyncio.subprocess.Process,
    agent_id: str,
    task_id: Optional[str],
    on_complete: Optional[Any] = None,
    db_path: Optional[Path] = None,
    monitor_timeout_count: int = 0,
) -> None:
    """Run one monitoring round for an agent subprocess.

    Drains stdout/stderr while waiting up to ``self.agent_timeout`` seconds
    for the process to exit. On exit the result goes to ``_handle_exit``
    (case A); on timeout it goes to ``_handle_monitor_timeout`` (case B).
    Design reference named by the original docstring:
    spawner-monitor-design.md.

    Args:
        session_id: Session id, or None for the main session.
        proc: The subprocess to watch.
        agent_id: Agent the process belongs to.
        task_id: Task the process works on, if any.
        on_complete: Callback forwarded to the handlers.
        db_path: Blackboard DB path forwarded to the handlers.
        monitor_timeout_count: Monitor timeouts already counted for this
            process.
    """
    # NOTE(review): these buffers are local to one round. Output read before
    # a timeout is not carried into the next round that
    # _handle_monitor_timeout starts, so it is absent when _handle_exit
    # finally parses stdout/stderr.
    stdout_chunks: list = []
    stderr_chunks: list = []
    try:
        # ── Wait for process exit while streaming both pipes ──
        async def _read_streams():
            async def _read_out():
                while True:
                    chunk = await proc.stdout.read(4096)
                    if not chunk:
                        break
                    stdout_chunks.append(chunk)
            async def _read_err():
                while True:
                    chunk = await proc.stderr.read(4096)
                    if not chunk:
                        break
                    stderr_chunks.append(chunk)
            await asyncio.gather(_read_out(), _read_err(), proc.wait())
        await asyncio.wait_for(_read_streams(), timeout=self.agent_timeout)
        # ── Case A: the process exited ──
        exit_code = proc.returncode
        await self._handle_exit(
            session_id, agent_id, task_id, exit_code,
            stdout_chunks, stderr_chunks, on_complete, db_path
        )
    except asyncio.TimeoutError:
        # ── Case B: monitor timeout (the process has not exited) ──
        logger.warning("Agent %s monitor timeout (session=%s, count=%d/%d)",
                       agent_id, session_id, monitor_timeout_count + 1,
                       self.max_monitor_timeouts)
        await self._handle_monitor_timeout(
            session_id, agent_id, task_id, proc,
            on_complete, db_path, stderr_chunks, monitor_timeout_count
        )
async def _handle_exit(self, session_id, agent_id, task_id, exit_code,
                       stdout_chunks, stderr_chunks, on_complete, db_path):
    """Case A: handle a subprocess that has exited.

    Decodes the captured output, classifies the outcome, updates the
    in-memory session record, records a task attempt, and then either calls
    ``on_complete`` (counter released) or starts a retry.

    Args:
        session_id: Session id, or None for the main session.
        agent_id: Agent that ran.
        task_id: Task id, if any.
        exit_code: Process return code.
        stdout_chunks: Raw stdout byte chunks.
        stderr_chunks: Raw stderr byte chunks.
        on_complete: Callback ``(agent_id, outcome)``, sync or async.
        db_path: Blackboard DB path used for status lookup and attempts.
    """
    stdout_text = b"".join(stdout_chunks).decode("utf-8", errors="replace")
    stderr_text = b"".join(stderr_chunks).decode("utf-8", errors="replace")
    # Parse the stdout JSON
    meta = self._parse_stdout_json(stdout_text)
    # Look up the task's actual status
    task_status = self._get_task_status(db_path, task_id) if task_id else None
    # Classify
    cls = self._classify_outcome(exit_code, meta, stderr_text, task_status)
    outcome = cls["outcome"]
    # Update the session record (main-session runs are looked up as "main")
    sid = session_id or "main"
    if sid in self._sessions:
        self._sessions[sid]["status"] = outcome
        self._sessions[sid]["completed_at"] = datetime.utcnow().isoformat()
        self._sessions[sid]["exit_code"] = exit_code
        if meta:
            self._sessions[sid]["meta"] = meta
    # Record the attempt
    self._record_attempt(
        task_id, agent_id, outcome, exit_code=exit_code,
        db_path=db_path,
        metadata={
            "transport": meta.get("transport"),
            "fallback_reason": meta.get("fallbackReason"),
            "duration_ms": meta.get("durationMs"),
            "task_status_at_exit": task_status,
        }
    )
    logger.info("Agent %s finished (session=%s, outcome=%s, exit=%d, task_status=%s)",
                agent_id, session_id, outcome, exit_code, task_status)
    if cls["release_counter"]:
        await self._do_on_complete_async(on_complete, agent_id, outcome)
    elif cls["should_retry"]:
        # The counter stays held during a retry (per the design document).
        # on_complete is handed to the retry chain; _do_retry calls it on
        # final completion or when the retry limit is exceeded.
        await self._do_retry(
            session_id, agent_id, task_id, on_complete, db_path,
            cls.get("retry_field", "retry_count")
        )
    # else: originally described as "transient failure (A8/A9/A11): no
    # release, no retry, wait for the ticker".
    # NOTE(review): _classify_outcome in this file returns
    # release_counter=True for A8/A9/A11, so those take the first branch and
    # this implicit else is never reached with the current classification.
async def _handle_monitor_timeout(self, session_id, agent_id, task_id, proc,
                                  on_complete, db_path, stderr_chunks,
                                  monitor_timeout_count):
    """Case B: the monitoring round timed out and the process is still up.

    Outcomes:
      * B1 - session reports "running" but its lock PID is not alive: mark
        the task failed (``session_stuck``) and call ``on_complete``.
      * B2 - a recent compaction was detected: wait another round without
        increasing ``monitor_timeout_count``; a separate per-task counter
        caps this at ``max_monitor_timeouts`` (``compact_hanging``).
      * B3/B4 - otherwise: increase the count; at ``max_monitor_timeouts``
        mark the task failed, else start another monitoring round.

    Args:
        session_id: Session id, or None for the main session.
        agent_id: Agent being monitored.
        task_id: Task id, if any.
        proc: The still-running subprocess.
        on_complete: Callback ``(agent_id, outcome)``, sync or async.
        db_path: Blackboard DB path.
        stderr_chunks: stderr chunks collected so far (appended to in place).
        monitor_timeout_count: Monitor timeouts counted before this one.
    """

    def _rearm_monitor(count):
        # Start another monitoring round. The event loop keeps only weak
        # references to tasks, so hold a strong one until the round ends.
        task = asyncio.create_task(
            self._monitor_process(
                session_id, proc, agent_id, task_id,
                on_complete=on_complete, db_path=db_path,
                monitor_timeout_count=count,
            )
        )
        pending = getattr(self, "_monitor_tasks", None)
        if pending is None:
            pending = self._monitor_tasks = set()
        pending.add(task)
        task.add_done_callback(pending.discard)

    # Best-effort read of stderr. read() without a size reads until EOF, so
    # with a live process this normally just hits the 2 s timeout.
    # NOTE(review): nothing in this method consumes stderr_chunks afterwards.
    try:
        remaining = await asyncio.wait_for(proc.stderr.read(), timeout=2.0)
        if remaining:
            stderr_chunks.append(remaining)
    except Exception:
        pass

    # Inspect the session state on disk
    state = self._check_session_state(agent_id)

    # B1: stuck session
    if state.get("status") == "running" and not state.get("lock_pid_alive", True):
        logger.error("Agent %s session stuck (session=%s, lock PID dead)",
                     agent_id, session_id)
        self._mark_task(db_path, task_id, "failed",
                        {"reason": "session_stuck", "diagnostics": state})
        await self._do_on_complete_async(on_complete, agent_id, "session_stuck")
        return

    # B2: compaction in progress - does not count as a monitor timeout
    if state.get("recent_compact"):
        logger.info("Agent %s recent compaction detected, extending patience "
                    "(session=%s, monitor=%d/%d)",
                    agent_id, session_id, monitor_timeout_count, self.max_monitor_timeouts)
        # Separate counter so compaction cannot extend the wait forever.
        compact_wait_count = self._compact_waits.get(task_id, 0) + 1
        self._compact_waits[task_id] = compact_wait_count
        if compact_wait_count >= self.max_monitor_timeouts:
            logger.error("Agent %s max compact waits reached (session=%s, count=%d)",
                         agent_id, session_id, compact_wait_count)
            self._mark_task(db_path, task_id, "failed", {
                "reason": "compact_hanging",
                "compact_wait_count": compact_wait_count,
                "diagnostics": state,
            })
            await self._do_on_complete_async(on_complete, agent_id, "compact_hanging")
            return
        _rearm_monitor(monitor_timeout_count)
        return

    # B3/B4: no compaction, count normally
    monitor_timeout_count += 1
    if monitor_timeout_count >= self.max_monitor_timeouts:
        logger.error("Agent %s max monitor timeouts (session=%s, count=%d)",
                     agent_id, session_id, monitor_timeout_count)
        self._mark_task(db_path, task_id, "failed", {
            "reason": "max_monitor_timeouts",
            "count": monitor_timeout_count,
            "elapsed_seconds": monitor_timeout_count * int(self.agent_timeout),
            "diagnostics": state,
        })
        # NOTE(review): the subprocess is left running here - confirm
        # whether it should be terminated once the task is marked failed.
        await self._do_on_complete_async(on_complete, agent_id, "max_monitor_timeouts")
        return

    # Below the limit: keep waiting (the counter is not released)
    logger.info("Agent %s continuing monitor (session=%s, count=%d/%d)",
                agent_id, session_id, monitor_timeout_count, self.max_monitor_timeouts)
    _rearm_monitor(monitor_timeout_count)
async def _do_retry(self, session_id, agent_id, task_id, on_complete,
                    db_path, retry_field="retry_count"):
    """Retry ("refill"): spawn the agent once more on the same session.

    Steps: skip if the task already left the working states; bump the retry
    counter; mark the task failed once the counter reaches
    ``self.max_retries``; otherwise build the retry prompt and spawn again.
    ``on_complete`` is called on every path that ends the retry chain here.

    Args:
        session_id: Session to reuse; None means the main session.
        agent_id: Agent to respawn.
        task_id: Task being retried, if any.
        on_complete: Callback ``(agent_id, outcome)``, sync or async.
        db_path: Blackboard DB path.
        retry_field: Name of the counter to increment.
    """
    # Bug-6: check the task status before retrying; skip when the task is
    # already in one of the listed states.
    if db_path and task_id:
        try:
            conn = get_connection(db_path)
            try:
                row = conn.execute(
                    "SELECT status FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
                if row and row["status"] in ("done", "failed", "cancelled", "review", "pending"):
                    logger.info("Retry skip: task %s already %s (agent=%s)",
                                task_id, row["status"], agent_id)
                    # The counter is still held; release it via on_complete
                    await self._do_on_complete_async(on_complete, agent_id, "task_already_done")
                    return
            finally:
                conn.close()
        except Exception:
            logger.warning("Retry status check failed for %s, proceeding", task_id)
    # Read/write retry_count directly on the tasks table (in broadcast
    # scenarios all agents share the same tasks row). The retry_count kept in
    # task_attempts metadata is unreliable (agents overwrite each other).
    if retry_field == "retry_count" and db_path and task_id:
        try:
            conn = get_connection(db_path)
            try:
                conn.execute("BEGIN IMMEDIATE")
                conn.execute(
                    "UPDATE tasks SET retry_count = COALESCE(retry_count, 0) + 1 WHERE id=?",
                    (task_id,),
                )
                conn.commit()
                row = conn.execute(
                    "SELECT retry_count FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
                count = row["retry_count"] if row else 1
            finally:
                conn.close()
        except Exception:
            logger.exception("Failed to update retry_count for task %s", task_id)
            # NOTE(review): falling back to 1 means a persistently failing DB
            # never reaches max_retries on this path.
            count = 1
    else:
        # Counters other than retry_count (connect/api/lock) still live in
        # the task_attempts metadata
        retry_counts = self._get_retry_counts(db_path, task_id)
        count = retry_counts.get(retry_field, 0) + 1
        retry_counts[retry_field] = count
        self._update_retry_counts(db_path, task_id, retry_counts)
    if count >= self.max_retries:
        logger.error("Agent %s max retries (session=%s, %s=%d)",
                     agent_id, session_id, retry_field, count)
        self._mark_task(db_path, task_id, "failed", {
            "reason": f"max_{retry_field}", "count": count,
        })
        await self._do_on_complete_async(on_complete, agent_id, "max_retries")
        return
    logger.info("Agent %s retry %s=%d/%d (session=%s)",
                agent_id, retry_field, count, self.max_retries, session_id)
    # Build the retry message (mail uses its own template, tasks use the
    # standard one)
    task_info = self._get_task_info(db_path, task_id) or {}
    project_id = task_info.get("project_id", "")
    is_mail = project_id == "_mail"
    if is_mail:
        # Mail retry: compact template without status-transition commands
        must_haves = task_info.get("must_haves", "{}")
        try:
            meta = json.loads(must_haves) if must_haves else {}
        except Exception:
            meta = {}
        message = MAIL_RETRY_PROMPT.format(
            from_agent=meta.get("from", "unknown"),
            title=task_info.get("title", ""),
            retry_count=count,
            max_retries=self.max_retries,
        )
    else:
        # Task retry: standard template
        fallback_hint = "\n⚠️ 之前有 fallback 执行,请调 API 检查任务当前状态和已有产出,确认是否已完成。" if retry_field == "retry_count" else ""
        message = self.RETRY_PROMPT.format(
            project_id=project_id,
            task_id=task_id or "",
            title=task_info.get("title", ""),
            retry_count=count,
            max_retries=self.max_retries,
            api_host=self.api_host,
            api_port=self.api_port,
            agent_id=agent_id,
            fallback_hint=fallback_hint,
        )
    # Retry spawn: the counter stays held until max_retries or final
    # completion releases it.
    # Session strategy: if the original run used the main session
    # (session_id=None), the retry uses the main session as well.
    try:
        await self.spawn_full_agent(
            agent_id=agent_id,
            message=message,
            task_id=task_id,
            on_complete=on_complete,
            use_main_session=(session_id is None),
            reuse_session_id=session_id if session_id else None,
            task_db_path=db_path,
        )
    except Exception:
        logger.exception("Retry spawn failed for %s", agent_id)
        await self._do_on_complete_async(on_complete, agent_id, "retry_spawn_failed")
# ── 辅助方法 ──
@staticmethod
def _parse_stdout_json(stdout_text: str) -> dict:
"""解析 openclaw agent --json 的 stdout 输出"""
text = stdout_text.strip()
if not text:
return {}
try:
data = json.loads(text)
return data.get("meta", {})
except json.JSONDecodeError:
# 多行输出,找最后一个 JSON
for line in reversed(text.splitlines()):
try:
data = json.loads(line)
return data.get("meta", {})
except json.JSONDecodeError:
continue
return {}
@staticmethod
def _get_task_status(db_path: Optional[Path], task_id: Optional[str]) -> Optional[str]:
"""查任务实际 API 状态"""
if not db_path or not task_id:
return None
try:
conn = get_connection(db_path)
try:
row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task_id,)
).fetchone()
return row["status"] if row else None
finally:
conn.close()
except Exception:
return None
@staticmethod
def _get_task_info(db_path: Optional[Path], task_id: Optional[str]) -> Optional[dict]:
"""查任务基本信息"""
if not db_path or not task_id:
return None
try:
conn = get_connection(db_path)
try:
row = conn.execute(
"SELECT id, title, status FROM tasks WHERE id=?", (task_id,)
).fetchone()
if not row:
return None
info = dict(row)
# 从 db_path 推断 project_id: data/<project_id>/blackboard.db
info["project_id"] = db_path.parent.name
return info
finally:
conn.close()
except Exception:
return None
@staticmethod
def _check_session_state(agent_id: str) -> dict:
"""检查 sessions.json 和 lock 状态"""
result = {"status": "unknown", "lock_pid": None, "lock_pid_alive": False, "recent_compact": False}
sessions_path = Path.home() / ".openclaw" / "agents" / agent_id / "sessions" / "sessions.json"
if not sessions_path.exists():
return result
try:
with open(sessions_path) as f:
sessions = json.load(f)
main_key = f"agent:{agent_id}:main"
main_session = sessions.get(main_key, {})
result["status"] = main_session.get("status", "unknown")
# 检查 lock
sf = main_session.get("sessionFile", "")
if sf:
lock_path = Path(sf + ".lock")
if lock_path.exists():
try:
lock_data = json.loads(lock_path.read_text())
pid = lock_data.get("pid")
result["lock_pid"] = pid
if pid:
import os
try:
os.kill(pid, 0)
result["lock_pid_alive"] = True
except ProcessLookupError:
result["lock_pid_alive"] = False
except Exception:
pass
# 最近 5 分钟的 compact
import time
now_ms = time.time() * 1000
for cp in main_session.get("compactionCheckpoints", []):
if (now_ms - cp.get("createdAt", 0)) < 300_000:
result["recent_compact"] = True
break
except Exception:
pass
return result
@staticmethod
def _classify_outcome(exit_code: int, meta: dict, stderr_text: str,
task_status: Optional[str]) -> dict:
"""分类退出原因,返回处理策略"""
transport = meta.get("transport", "")
fallback_reason = meta.get("fallbackReason")
# 终态判断
terminal_statuses = {"done", "review", "failed", "cancelled"}
is_terminal = task_status in terminal_statuses
# A4: 任务自己 failed
if task_status == "failed":
return {"outcome": "agent_failed", "release_counter": True,
"should_retry": False}
# A1: 正常完成
if exit_code == 0 and transport != "embedded" and is_terminal:
return {"outcome": "completed", "release_counter": True,
"should_retry": False}
# A5/A6: fallback
if exit_code == 0 and transport == "embedded":
if is_terminal:
return {"outcome": "fallback_timeout", "release_counter": True,
"should_retry": False}
# fallback 完成但任务没 done → 续杯
return {"outcome": "fallback_timeout", "release_counter": False,
"should_retry": True, "retry_field": "retry_count"}
# A2/A3: Gateway timeout(任务没完成)
if exit_code == 0 and not is_terminal:
return {"outcome": "gateway_timeout", "release_counter": False,
"should_retry": True, "retry_field": "retry_count"}
# A7: 认证失败
if exit_code != 0 and any(kw in stderr_text for kw in ["401", "403", "unauthorized", "auth"]):
return {"outcome": "auth_failed", "release_counter": True,
"should_retry": False}
# A8: Gateway 不可达
if exit_code != 0 and any(kw in stderr_text for kw in ["ECONNREFUSED", "ETIMEDOUT", "gateway closed", "ECONNRESET"]):
return {"outcome": "gateway_unreachable", "release_counter": True,
"should_retry": False, # 让 ticker 自然重试
"count_field": "connect_retry_count"}
# A9: API 错误
if exit_code != 0 and any(kw in stderr_text for kw in ["rate_limit", "500", "503", "API error"]):
return {"outcome": "api_error", "release_counter": True,
"should_retry": False,
"count_field": "api_retry_count"}
# A10: compact 失败
if exit_code != 0 and any(kw in stderr_text for kw in ["compaction-diag", "context-overflow", "timeout-compaction"]):
return {"outcome": "compact_failed", "release_counter": False,
"should_retry": True, "retry_field": "retry_count"}
# A11: Lock 冲突
if exit_code != 0 and any(kw in stderr_text for kw in ["lock", "busy", "concurrent", "lane task error"]):
return {"outcome": "lock_conflict", "release_counter": True,
"should_retry": False,
"count_field": "lock_retry_count"}
# A12: 其他
return {"outcome": "agent_error", "release_counter": False,
"should_retry": True, "retry_field": "retry_count"}
@staticmethod
def _get_retry_counts(db_path: Optional[Path], task_id: Optional[str]) -> dict:
"""从最新 task_attempt 的 metadata 读计数器"""
defaults = {"retry_count": 0, "connect_retry_count": 0,
"api_retry_count": 0, "lock_retry_count": 0,
"monitor_timeout_count": 0}
if not db_path or not task_id:
return defaults
try:
conn = get_connection(db_path)
try:
row = conn.execute(
"SELECT metadata FROM task_attempts WHERE task_id=? ORDER BY attempt_number DESC LIMIT 1",
(task_id,)
).fetchone()
if row and row["metadata"]:
stored = json.loads(row["metadata"])
for k in defaults:
if k in stored:
defaults[k] = stored[k]
finally:
conn.close()
except Exception:
pass
return defaults
def _update_retry_counts(self, db_path: Optional[Path],
task_id: Optional[str], counts: dict):
"""将 retry counts 写回最新 task_attempt 的 metadata"""
if not db_path or not task_id:
return
try:
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
row = conn.execute(
"SELECT rowid, metadata FROM task_attempts "
"WHERE task_id=? ORDER BY attempt_number DESC LIMIT 1",
(task_id,)
).fetchone()
if row:
meta = json.loads(row["metadata"]) if row["metadata"] else {}
meta.update(counts)
conn.execute(
"UPDATE task_attempts SET metadata=? WHERE rowid=?",
(json.dumps(meta), row["rowid"])
)
conn.commit()
finally:
conn.close()
except Exception:
logger.exception("Failed to update retry counts for task %s", task_id)
def _mark_task(self, db_path: Optional[Path], task_id: Optional[str],
status: str, detail: Optional[dict] = None):
"""标记任务状态(用于 failed/escalate"""
if not db_path or not task_id:
return
try:
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
conn.execute(
"UPDATE tasks SET status=?, completed_at=datetime('now') WHERE id=?",
(status, task_id)
)
if detail:
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task_id, "daemon", status, json.dumps(detail, ensure_ascii=False))
)
conn.commit()
finally:
conn.close()
except Exception:
logger.exception("Failed to mark task %s as %s", task_id, status)
@staticmethod
def _do_on_complete(on_complete, agent_id, outcome):
"""执行 on_complete 回调(同步+异步兼容)"""
if not on_complete:
return
try:
result = on_complete(agent_id, outcome)
if asyncio.iscoroutine(result):
# 注意:这里是同步调用的,不能 await
# 在 _monitor_process 的 async 上下文中应该用 await
pass
except Exception:
pass
async def _do_on_complete_async(self, on_complete, agent_id, outcome):
"""异步执行 on_complete 回调"""
if not on_complete:
return
try:
result = on_complete(agent_id, outcome)
if asyncio.iscoroutine(result):
await result
except Exception:
logger.warning("on_complete callback failed for %s", agent_id, exc_info=True)
def _register_session(
self,
session_id: str,
agent_id: str,
task_id: Optional[str],
pid: Optional[int],
) -> None:
"""注册 spawn session"""
self._sessions[session_id] = {
"agent_id": agent_id,
"task_id": task_id,
"pid": pid,
"status": "running",
"started_at": datetime.utcnow().isoformat(),
"completed_at": None,
}
def _record_attempt(
self,
task_id: Optional[str],
agent_id: str,
outcome: str,
exit_code: Optional[int] = None,
error: Optional[str] = None,
metadata: Optional[dict] = None,
db_path: Optional[Path] = None,
) -> None:
"""记录 task_attempt"""
effective_db = db_path or self.db_path
if not task_id or not effective_db:
return
try:
conn = get_connection(effective_db)
try:
conn.execute("BEGIN IMMEDIATE")
row = conn.execute(
"SELECT MAX(attempt_number) as max_a FROM task_attempts WHERE task_id=?",
(task_id,),
).fetchone()
attempt_number = (row["max_a"] or 0) + 1
meta = metadata or {}
if error:
meta["error"] = error
conn.execute(
"INSERT INTO task_attempts "
"(task_id, attempt_number, agent, outcome, exit_code, metadata, completed_at) "
"VALUES (?,?,?,?,?,?,datetime('now'))",
(task_id, attempt_number, agent_id, outcome,
exit_code, json.dumps(meta)),
)
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task_id, agent_id,
"agent_completed" if outcome == "completed" else "daemon_tick",
json.dumps({"outcome": outcome, "attempt": attempt_number})),
)
conn.commit()
finally:
conn.close()
except Exception:
logger.exception("Failed to record attempt for task %s", task_id)
def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
"""获取 session 信息"""
return self._sessions.get(session_id)
def cleanup_session(self, session_id: str) -> None:
"""清理 session"""
if session_id in self._sessions:
session = self._sessions[session_id]
task_id = session.get("task_id")
del self._sessions[session_id]
# 清理 B2 compact 等待计数器
if task_id and task_id in self._compact_waits:
del self._compact_waits[task_id]