Files
sanguo_moziplus_v2/src/daemon/spawner.py
T
2026-05-26 11:48:30 +08:00

1266 lines
51 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Agent Spawner — 异步 spawn Full Agent / Subagent
Full Agent: asyncio.create_subprocess_exec(异步非阻塞,不 await 完成)
Subagent: 占位(实际通过 OpenClaw Gateway API sessions_spawnF17 完善)
"""
from __future__ import annotations
import asyncio
import json
import logging
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
from src.blackboard.db import get_connection, init_db
logger = logging.getLogger("moziplus-v2.spawner")
# ── Prompt templates ──
# Mail template for "inform" messages: pure notification, no reply expected.
# Task status is managed by the daemon, so the agent is told not to touch it.
MAIL_INFORM_TEMPLATE = """你收到一封飞鸽传书(纯通知,不需要回复)。
发件者: {from_agent}
主题: {title}
内容: {text}
已阅即可,无需操作。
⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。
"""
# Mail template for "request" messages: the agent handles the mail and replies
# with an "inform" mail (inform replies cannot trigger further replies).
# The mail endpoint is built from {api_host}/{api_port} (which
# AgentSpawner._build_mail_prompt supplies) instead of a hard-coded
# localhost:8083, so a spawner configured for another host/port gets replies
# delivered to the right API. "\\" + newline renders as a shell continuation.
MAIL_REQUEST_TEMPLATE = """你收到一封飞鸽传书,需要你处理并回复。
发件者: {from_agent}
主题: {title}
内容: {text}
请处理后回复发件者:
curl -s -X POST http://{api_host}:{api_port}/api/mail \\
 -H 'Content-Type: application/json' \\
 -d '{{"from": "{agent_id}", "to": "{from_agent}", "title": "回复: {title}", "text": "你的回复内容", "type": "inform", "in_reply_to": "{task_id}"}}'
⚠️ 将"你的回复内容"替换为实际回复。type 必须用 inform 防止循环。
⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。
"""
# Generic task prompt: the fallback used by AgentSpawner.build_spawn_message when
# no BootstrapBuilder is configured (or it fails). Filled with str.format, so the
# literal JSON braces are doubled ({{ }}). A single trailing backslash inside this
# non-raw string is a Python line continuation, so each curl command renders on
# one line. {completion_status} is the status the agent submits on success.
# NOTE(review): the "## Fallback…" heading below has an unbalanced parenthesis as
# shown here, and its instruction calls the same status endpoint that just
# failed — confirm the intended wording.
SPAWN_PROMPT_TEMPLATE = """你收到一个 v2.6 黑板任务。请严格按照下面的步骤执行。
## 任务信息
- 项目: {project_id}
- 任务ID: {task_id}
- 标题: {title}
- 描述: {description}
- 类型: {task_type}
- 优先级: {priority}
- 必要条件: {must_haves}
{retry_context}
## 状态机(你必须遵守的状态流转)
```
pending → claimed → working → review → done
│ │
│ └→ pending(驳回重做)
├──→ failed
├──→ blocked
└──→ cancelled
```
你当前处于 **{current_status}** 状态。
## 执行步骤
### 步骤 1: 开始工作
立即调 API 标记你已开始:
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \
-H 'Content-Type: application/json' \
-d '{{"status": "working", "agent": "{agent_id}"}}'
```
### 步骤 2: 执行任务
根据任务描述完成你的工作(编码/回测/数据检查/审查等)。
### 步骤 3: 写入产出
⚠️ 这一步是必须的!不写产出 = 任务没完成。
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \
-H 'Content-Type: application/json' \
-d '{{"agent": "{agent_id}", "type": "<产出类型>", "title": "<产出标题>", "content": "<你的产出内容>", "summary": "<简要说明>"}}'
```
**type 必须是以下之一**: code, document, data, config, other
如果产出太长,可以写文件后用路径引用:
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \
-H 'Content-Type: application/json' \
-d '{{"agent": "{agent_id}", "type": "code", "title": "main.py", "content_path": "/path/to/file.py", "summary": "主程序"}}'
```
### 步骤 4: 提交完成或标记失败
✅ 成功完成:
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \
-H 'Content-Type: application/json' \
-d '{{"status": "{completion_status}", "agent": "{agent_id}"}}'
```
❌ 无法完成:
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \
-H 'Content-Type: application/json' \
-d '{{"status": "failed", "agent": "{agent_id}", "detail": "<失败原因>"}}'
```
## FallbackAPI 调用失败时)
如果 API 失败 2 次,尝试:
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \
-H 'Content-Type: application/json' \
-d '{{"status": "failed", "agent": "{agent_id}", "detail": "API回写失败,产出在本地文件"}}'
```
## 参考链接
- 查看任务完整信息: GET http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}?expand=all
- 写评论: POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/comments {{"author": "{agent_id}", "body": "..."}}
- 完整 API 契约: docs/design/agent-api-contract.md
"""
# Retry ("续杯") template for mail tasks. It deliberately contains no
# status-transition commands, because the system marks mail tasks done itself.
# Placeholders: from_agent, title, retry_count, max_retries.
MAIL_RETRY_PROMPT = """你收到一个续杯提醒。你的任务在执行过程中被中断了。
发件者: {from_agent}
主题: {title}
续杯次数: 第 {retry_count} 次(上限 {max_retries} 次)
请检查 session 历史中你之前做了什么,然后继续未完成的工作。
⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。
⚠️ 如果任务已完成,直接写产出即可,不要调 status API。
"""
class AgentBusyError(Exception):
    """Raised when an agent cannot be spawned right now.

    Either the concurrency counter refuses the agent (occupied or cooling
    down), or, for main-session spawns, the session is locked, processing or
    compacting. The message is the agent id, optionally followed by a reason.
    """
class AgentSpawner:
"""Agent spawn 管理"""
def __init__(
    self,
    db_path: Optional[Path] = None,
    agent_timeout: float = 630.0,
    dry_run: bool = False,
    api_host: str = "127.0.0.1",
    api_port: int = 8083,
    bootstrap_builder: Optional[Any] = None,
    gateway_timeout: float = 600.0,
    max_retries: int = 3,
    max_monitor_timeouts: int = 3,
    counter: Optional[Any] = None,
):
    """Configure an AgentSpawner.

    Args:
        db_path: default project blackboard DB, used (e.g. for task_attempts)
            when a spawn does not name its own DB.
        agent_timeout: seconds a single monitor round waits for the agent
            process before it counts as a monitor timeout.
        dry_run: test mode; log what would be spawned instead of launching.
        api_host: blackboard API host that agents are told to write back to.
        api_port: blackboard API port that agents are told to write back to.
        bootstrap_builder: optional prompt builder exposing ``build_for_task``;
            when None the built-in prompt template is used.
        gateway_timeout: value passed to ``openclaw agent --timeout``.
        max_retries: retry count at which a task is marked failed instead of
            being retried again.
        max_monitor_timeouts: number of monitor timeouts (and, separately,
            compaction waits) after which a task is marked failed.
        counter: optional concurrency counter; spawn_full_agent acquires it
            and the process-exit path releases it (v2.7.2).
    """
    # Execution and timing.
    self.db_path = db_path
    self.dry_run = dry_run
    self.agent_timeout = agent_timeout
    self.gateway_timeout = gateway_timeout
    # Where spawned agents write their results back, and how prompts are built.
    self.api_host = api_host
    self.api_port = api_port
    self.bootstrap_builder = bootstrap_builder
    # Failure-policy limits.
    self.max_retries = max_retries
    self.max_monitor_timeouts = max_monitor_timeouts
    # Concurrency counter (acquire/release handled inside spawn_full_agent).
    self.counter = counter
    # Registry of spawned sessions, keyed by session id.
    self._sessions: Dict[str, Dict[str, Any]] = {}
    # Per-task counters keyed by task id: compaction waits (B2) and
    # stuck-session detections (B1).
    self._compact_waits: Dict[str, int] = {}
    self._stuck_counts: Dict[str, int] = {}
@property
def active_sessions(self) -> Dict[str, Dict[str, Any]]:
    """Spawn sessions whose recorded status is still "running"."""
    running: Dict[str, Dict[str, Any]] = {}
    for session_key, info in self._sessions.items():
        if info.get("status") == "running":
            running[session_key] = info
    return running
def build_spawn_message(
    self,
    task_id: str,
    title: str,
    description: str,
    task_type: str = "",
    priority: int = 5,
    must_haves: str = "",
    project_id: str = "",
    agent_id: str = "",
    current_status: str = "claimed",
    retry_context: str = "",
    task: Optional[Any] = None,
    project_config: Optional[Dict[str, Any]] = None,
) -> str:
    """Build the prompt sent to a spawned agent.

    Selection order:
      1. Mail tasks (``project_id == "_mail"``) always get the compact mail prompt.
      2. With a BootstrapBuilder and a Task object, use the builder's
         executor prompt followed by the API write-back section.
      3. Otherwise, or if the builder raises, fill SPAWN_PROMPT_TEMPLATE
         (completion status "review").

    Args:
        task_id, title, description, task_type, priority, must_haves,
        project_id, agent_id: task fields interpolated into the prompt
            (for mail tasks, must_haves carries the mail metadata JSON).
        current_status: task status shown in the state-machine hint.
        retry_context: retry context (previous output summary + review
            comments); only used by the template path.
        task: Task object for the BootstrapBuilder.
        project_config: project configuration for the BootstrapBuilder.
    """
    # Decide mail first, so no bootstrap prompt is built (or a builder failure
    # logged) only for the result to be thrown away.
    if project_id == "_mail":
        return self._build_mail_prompt(task_id, title, description, must_haves, agent_id)

    if self.bootstrap_builder and task is not None:
        try:
            bootstrap_prompt = self.bootstrap_builder.build_for_task(
                task=task,
                role="executor",
                project_config=project_config,
            )
            api_section = self._build_api_section(project_id, task_id, agent_id)
            return bootstrap_prompt + "\n\n---\n\n" + api_section
        except Exception:
            logger.exception("BootstrapBuilder failed, falling back to template")

    # Fallback: hard-coded template. Mail tasks returned above, so completion
    # always goes through review here.
    # NOTE(review): retry_context is not forwarded on the BootstrapBuilder path;
    # confirm the builder derives it from the Task object.
    return SPAWN_PROMPT_TEMPLATE.format(
        project_id=project_id,
        task_id=task_id,
        title=title,
        description=description or "(无描述)",
        task_type=task_type or "general",
        priority=priority,
        must_haves=must_haves or "(无)",
        agent_id=agent_id,
        api_host=self.api_host,
        api_port=self.api_port,
        current_status=current_status or "claimed",
        retry_context=retry_context or "",
        completion_status="review",
    )
def _build_api_section(self, project_id: str, task_id: str,
                       agent_id: str) -> str:
    """Build the API write-back instructions appended to a BootstrapBuilder prompt.

    Covers marking the task "working", writing outputs, and the final status:
    "done" for the "_mail" project, "review" otherwise, "failed" on failure.
    A single trailing backslash inside the f-string below is a Python line
    continuation, so each curl command renders on one line.
    """
    # Mail tasks go straight to done instead of review.
    success_status = '"done"' if project_id == "_mail" else '"review"'
    return f"""## 操作指令
### 状态回写
开始工作:
```bash
curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/tasks/{task_id}/status \
-H 'Content-Type: application/json' \
-d '{{"status": "working", "agent": "{agent_id}"}}'
```
### 写入产出
```bash
curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \
-H 'Content-Type: application/json' \
-d '{{"agent": "{agent_id}", "type": "<类型>", "title": "<标题>", "content": "<内容>", "summary": "<摘要>"}}'
```
### 完成后
成功:status → {success_status} | 失败:status → "failed"
"""
def _build_mail_prompt(self, task_id: str, title: str, description: str,
                       must_haves: str, agent_id: str) -> str:
    """Build the compact prompt for a mail ("飞鸽传书") task.

    ``must_haves`` carries the mail metadata as a JSON object string (an
    already-decoded dict is accepted too): "from" is the sender, and
    "performative" (falling back to "type") selects the template. "inform"
    gets the notification-only template; anything else gets the
    request/reply template. Unparseable or non-object metadata falls back
    to sender = ``agent_id`` and "request".

    The title is embedded both as display text and inside the JSON body of
    a single-quoted ``curl -d '...'`` argument. It is therefore truncated
    to 100 characters first and only then escaped, so the cut can never
    split an escape sequence (a dangling backslash would swallow the
    closing JSON quote). JSON escaping covers backslashes, double quotes
    and control characters. Single quotes become the JSON escape
    ``\\u0027`` so they cannot terminate the shell quoting.
    The mail body only appears as display text in both mail templates, so
    it is passed through unmodified.
    """
    meta: Any = None
    if isinstance(must_haves, dict):
        meta = must_haves
    elif must_haves:
        try:
            meta = json.loads(must_haves)
        except (TypeError, ValueError):
            meta = None

    if isinstance(meta, dict):
        from_agent = meta.get("from", agent_id)
        performative = meta.get("performative", meta.get("type", "request"))
    else:
        from_agent = agent_id
        performative = "request"

    safe_title = (
        json.dumps((title or "")[:100], ensure_ascii=False)[1:-1]
        .replace("'", "\\u0027")
    )
    fields = {
        "from_agent": from_agent,
        "title": safe_title,
        "text": description or "",
        "task_id": task_id,
        "agent_id": agent_id,
        "api_host": self.api_host,
        "api_port": self.api_port,
    }
    template = MAIL_INFORM_TEMPLATE if performative == "inform" else MAIL_REQUEST_TEMPLATE
    return template.format(**fields)
async def spawn_full_agent(
    self,
    agent_id: str,
    message: str,
    new_session: bool = False,
    task_id: Optional[str] = None,
    on_complete: Optional[Any] = None,
    use_main_session: bool = False,
    task_db_path: Optional[Path] = None,
    reuse_session_id: Optional[str] = None,
) -> str:
    """Spawn a Full Agent as an ``openclaw agent`` subprocess (non-blocking).

    v2.7.2: counter acquire/release is managed here. The slot is acquired at
    spawn time and released when the process exits, through the internal
    ``_wrapped_on_complete`` handed to the monitor.

    Args:
        agent_id: agent to run.
        message: prompt passed as ``--message``.
        new_session: accepted for compatibility; not used by this method.
        task_id: blackboard task this run works on.
        on_complete: business callback ``(agent_id, outcome)``, sync or async.
            It must not release the counter; the internal wrapper does that.
        use_main_session: deliver to the agent's main session (no --session-id).
        task_db_path: DB the monitor uses; defaults to ``self.db_path``.
        reuse_session_id: reuse this session id (used for retries).

    Returns:
        The session id used. When ``use_main_session`` is set, a real spawn
        returns None and dry-run mode returns "main".

    Raises:
        AgentBusyError: the main session is locked, processing or compacting,
            or the counter refuses the agent (occupied or cooling down).
    """
    # Check the main session BEFORE touching the counter. These checks raise
    # AgentBusyError, and raising after acquire() would leak the counter slot
    # because nothing would ever release it.
    if use_main_session:
        session_state = self._check_session_state(agent_id)
        if session_state.get("lock_pid_alive"):
            # The main session is held by webchat / Control UI / cron.
            logger.info("Spawn skipped: %s main session locked by PID %d",
                        agent_id, session_state.get("lock_pid"))
            raise AgentBusyError(f"{agent_id}: session locked by PID {session_state.get('lock_pid')}")
        if session_state.get("status") == "processing":
            logger.info("Spawn skipped: %s main session processing", agent_id)
            raise AgentBusyError(f"{agent_id}: session processing")
        if session_state.get("recent_compact"):
            logger.info("Spawn skipped: %s compacting", agent_id)
            raise AgentBusyError(f"{agent_id}: compacting")

    # v2.7.2: counter check + acquire.
    if self.counter:
        if not await self.counter.can_acquire(agent_id):
            raise AgentBusyError(agent_id)
        await self.counter.acquire(agent_id)

    # Session strategy: main > reuse > new.
    if use_main_session:
        session_id = None
    elif reuse_session_id:
        session_id = reuse_session_id
    else:
        session_id = str(uuid.uuid4())
    # Key under which the session is registered; _handle_exit looks sessions
    # up as ``session_id or "main"``.
    registry_key = session_id or "main"

    if self.dry_run:
        # NOTE(review): no process or monitor runs in dry-run mode, so the
        # counter slot acquired above is never released. Confirm this is intended.
        logger.info("[DRY RUN] Would spawn agent %s (session=%s)", agent_id, registry_key)
        self._register_session(registry_key, agent_id, task_id, pid=None)
        return registry_key

    async def _wrapped_on_complete(aid, outcome):
        # Release the counter first, then run the business callback. A failing
        # business callback is logged, never propagated.
        try:
            if self.counter:
                self.counter.release(aid)
        finally:
            if on_complete:
                try:
                    result = on_complete(aid, outcome)
                    if asyncio.iscoroutine(result):
                        await result
                except Exception:
                    logger.warning("Business on_complete failed for %s", aid, exc_info=True)

    cmd = ["openclaw", "agent", "--agent", agent_id]
    if session_id:
        cmd += ["--session-id", session_id]
    cmd += ["--message", message, "--json", "--timeout", str(int(self.gateway_timeout))]

    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except Exception as e:
        # Nothing was started: give the counter slot back before propagating.
        if self.counter:
            self.counter.release(agent_id)
        logger.exception("Failed to spawn agent %s", agent_id)
        self._record_attempt(task_id, agent_id, "spawn_failed", error=str(e))
        raise

    # The process is running from here on. Hand it to the monitor first, so
    # its exit always releases the counter even if the bookkeeping below
    # fails. The task only starts at the next await, which is after the
    # registration below.
    monitor = asyncio.create_task(
        self._monitor_process(session_id, proc, agent_id, task_id,
                              on_complete=_wrapped_on_complete,
                              db_path=task_db_path or self.db_path)
    )
    # Keep a strong reference: the event loop only holds weak references to tasks.
    background = vars(self).setdefault("_background_tasks", set())
    background.add(monitor)
    monitor.add_done_callback(background.discard)

    self._register_session(registry_key, agent_id, task_id, proc.pid)
    logger.info("Spawned agent %s (session=%s, pid=%d)",
                agent_id, session_id, proc.pid)
    return session_id
async def spawn_subagent(
    self,
    task_description: str,
    task_id: Optional[str] = None,
) -> str:
    """Spawn a Subagent (placeholder; meant to go through the Gateway API).

    No process is launched in either mode. A fresh UUID session is logged,
    registered under the agent name "subagent", and returned.
    """
    new_session_id = str(uuid.uuid4())
    if not self.dry_run:
        # TODO(F17): implement through the Gateway API sessions_spawn.
        logger.info("Subagent spawn (session=%s) - placeholder", new_session_id)
    else:
        logger.info("[DRY RUN] Would spawn subagent (session=%s)", new_session_id)
    self._register_session(new_session_id, "subagent", task_id, pid=None)
    return new_session_id
# ── Retry ("续杯") prompt template ──
# Used by _do_retry for non-mail tasks (mail tasks use MAIL_RETRY_PROMPT).
# Unlike the module-level task templates, the shell continuations here are
# written as "\\", so the rendered prompt keeps a literal backslash + newline
# and the curl commands stay multi-line. {fallback_hint} is appended verbatim.
RETRY_PROMPT = """你收到一个续杯提醒。你的任务在执行过程中被中断了。
## 任务信息
- 项目: {project_id}
- 任务ID: {task_id}
- 标题: {title}
- 续杯次数: 第 {retry_count} 次(上限 {max_retries} 次)
请检查 session 历史中你之前做了什么,然后继续未完成的工作。
## 操作指令
### 查看任务当前状态
```bash
curl http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}?expand=all
```
### 如果已经完成,标记 review
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \\
-H 'Content-Type: application/json' \\
-d '{{"status": "review", "agent": "{agent_id}"}}'
```
### 写入产出(如果之前没写)
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \\
-H 'Content-Type: application/json' \\
-d '{{"agent": "{agent_id}", "type": "<类型>", "title": "<标题>", "content": "<内容>", "summary": "<摘要>"}}'
```
### 如果无法解决,标记失败
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \\
-H 'Content-Type: application/json' \\
-d '{{"status": "failed", "agent": "{agent_id}", "detail": "<失败原因>"}}'
```
{fallback_hint}"""
async def _monitor_process(
    self,
    session_id: Optional[str],
    proc: asyncio.subprocess.Process,
    agent_id: str,
    task_id: Optional[str],
    on_complete: Optional[Any] = None,
    db_path: Optional[Path] = None,
    monitor_timeout_count: int = 0,
    stdout_chunks: Optional[list] = None,
    stderr_chunks: Optional[list] = None,
) -> None:
    """Monitor one agent subprocess for its whole lifetime (spawner-monitor-design.md).

    One call is one monitor round: stdout/stderr are drained and the process
    awaited for at most ``agent_timeout`` seconds.

    * Case A (the process exited): ``_handle_exit`` classifies and finishes it.
    * Case B (the round timed out): ``_handle_monitor_timeout`` decides whether
      to fail the task or run another round.

    Args:
        on_complete: completion callback ``(agent_id, outcome)``. It is wrapped
            so that it fires at most once per call chain.
        monitor_timeout_count: monitor timeouts already counted by earlier rounds.
        stdout_chunks / stderr_chunks: output captured by earlier rounds. The
            new round keeps appending to them, so a process that exits after a
            timeout is classified on its complete output.

    If a handler raises unexpectedly, the error is logged and ``on_complete``
    still fires with outcome "monitor_error", so a dead monitor task cannot
    leak the agent's counter slot.
    """
    if stdout_chunks is None:
        stdout_chunks = []
    if stderr_chunks is None:
        stderr_chunks = []

    fired = False

    async def _fire_once(aid, outcome):
        # Both the normal handlers and the error fallback below may try to
        # complete; only the first call reaches the real callback.
        nonlocal fired
        if fired:
            return
        fired = True
        result = on_complete(aid, outcome)
        if asyncio.iscoroutine(result):
            await result

    guarded_on_complete = _fire_once if on_complete is not None else None

    async def _drain(stream, sink):
        # Append each chunk as soon as it is read, so the data survives if
        # this round is cancelled by the timeout.
        while True:
            chunk = await stream.read(4096)
            if not chunk:
                break
            sink.append(chunk)

    async def _drain_until_exit():
        await asyncio.gather(
            _drain(proc.stdout, stdout_chunks),
            _drain(proc.stderr, stderr_chunks),
            proc.wait(),
        )

    try:
        try:
            await asyncio.wait_for(_drain_until_exit(), timeout=self.agent_timeout)
        except asyncio.TimeoutError:
            # Case B: monitor timeout (the process has not exited yet).
            logger.warning("Agent %s monitor timeout (session=%s, count=%d/%d)",
                           agent_id, session_id, monitor_timeout_count + 1,
                           self.max_monitor_timeouts)
            await self._handle_monitor_timeout(
                session_id, agent_id, task_id, proc,
                guarded_on_complete, db_path, stderr_chunks, monitor_timeout_count,
                stdout_chunks=stdout_chunks,
            )
            return
        # Case A: the process exited.
        await self._handle_exit(
            session_id, agent_id, task_id, proc.returncode,
            stdout_chunks, stderr_chunks, guarded_on_complete, db_path,
        )
    except Exception:
        logger.exception("Monitor failed for agent %s (session=%s, task=%s)",
                         agent_id, session_id, task_id)
        await self._do_on_complete_async(guarded_on_complete, agent_id, "monitor_error")

async def _handle_exit(self, session_id, agent_id, task_id, exit_code,
                       stdout_chunks, stderr_chunks, on_complete, db_path):
    """Case A: handle an agent process that has exited.

    v2.7.2: process exit means the counter slot is released, which happens
    through ``on_complete`` (the spawner's counter-releasing wrapper).
    Only A2/A3 (gateway_timeout) trigger a retry; nothing else is retried.
    A9 (api_error / 429) additionally pushes the task back to pending and
    puts the agent into cooldown.
    """
    # The process is gone, so any stuck-session / compaction-wait streak for
    # this task ends here. This also keeps the per-task maps from growing.
    self._stuck_counts.pop(task_id, None)
    self._compact_waits.pop(task_id, None)

    stdout_text = b"".join(stdout_chunks).decode("utf-8", errors="replace")
    stderr_text = b"".join(stderr_chunks).decode("utf-8", errors="replace")
    meta = self._parse_stdout_json(stdout_text)
    if not isinstance(meta, dict):
        # e.g. "meta": null in the agent's JSON; the .get() calls below need a dict.
        meta = {}
    # The task's real status in the DB drives most outcomes.
    task_status = self._get_task_status(db_path, task_id) if task_id else None
    cls = self._classify_outcome(exit_code, meta, stderr_text, task_status)
    outcome = cls["outcome"]

    sid = session_id or "main"
    if sid in self._sessions:
        session = self._sessions[sid]
        session["status"] = outcome
        session["completed_at"] = datetime.utcnow().isoformat()
        session["exit_code"] = exit_code
        if meta:
            session["meta"] = meta

    self._record_attempt(
        task_id, agent_id, outcome, exit_code=exit_code,
        db_path=db_path,
        metadata={
            "transport": meta.get("transport"),
            "fallback_reason": meta.get("fallbackReason"),
            "duration_ms": meta.get("durationMs"),
            "task_status_at_exit": task_status,
        }
    )
    logger.info("Agent %s finished (session=%s, outcome=%s, exit=%d, task_status=%s)",
                agent_id, session_id, outcome, exit_code, task_status)

    if cls["should_retry"]:
        # A2/A3: gateway_timeout -> retry. _do_retry releases the counter itself.
        await self._do_retry(
            session_id, agent_id, task_id, on_complete, db_path,
            cls.get("retry_field", "retry_count")
        )
    elif outcome == "api_error":
        # A9: 429 / API error -> release the counter (on_complete), set the
        # cooldown, and push the task back to pending.
        await self._do_on_complete_async(on_complete, agent_id, outcome)
        if self.counter:
            self.counter.set_cooldown(agent_id)
        if db_path and task_id:
            self._mark_task(db_path, task_id, "pending", {
                "reason": "api_error_retry",
            })
            logger.info("Task %s pushed back to pending (api_error)", task_id)
    elif outcome == "fallback_timeout":
        # A5/A6: an embedded fallback should never happen. Mark the task
        # failed, escalate, and log the context.
        logger.error("UNEXPECTED FALLBACK: agent=%s session=%s task=%s "
                     "transport=%s fallbackReason=%s counter_active=%s "
                     "This indicates counter check failed to prevent concurrent spawn.",
                     agent_id, session_id, task_id,
                     meta.get("transport"), meta.get("fallbackReason"),
                     self.counter.active_agents if self.counter else "N/A")
        await self._do_on_complete_async(on_complete, agent_id, outcome)
        if db_path and task_id:
            self._mark_task(db_path, task_id, "failed", {
                "reason": "unexpected_fallback",
                "transport": meta.get("transport"),
                "fallback_reason": meta.get("fallbackReason"),
                "duration_ms": meta.get("durationMs"),
            })
    else:
        # Everything else: A1 completed, A4 agent_failed, A7 auth_failed,
        # A8 gateway_unreachable, A10 compact_failed, A11 lock_conflict,
        # A12 agent_error. Process exit -> on_complete releases the counter;
        # the task status is left as is (or to the ticker).
        await self._do_on_complete_async(on_complete, agent_id, outcome)

async def _handle_monitor_timeout(self, session_id, agent_id, task_id, proc,
                                  on_complete, db_path, stderr_chunks,
                                  monitor_timeout_count, stdout_chunks=None):
    """Case B: a monitor round timed out while the process is still running.

    Decisions are based on the agent's main-session state (_check_session_state):
      B1: status "running" but the lock PID is dead means the session is
          stuck. The first detection tries to revive it; a second consecutive
          detection, or a failed revive, marks the task failed.
      B2: a recent compaction means keep waiting without counting a monitor
          timeout, capped at ``max_monitor_timeouts`` compaction waits.
      B3/B4: otherwise count a monitor timeout. At ``max_monitor_timeouts``
          the task is marked failed; below that, another round runs.

    Output captured so far (``stdout_chunks`` / ``stderr_chunks``) is handed
    to the next round instead of being drained here, so nothing is lost.
    Terminal branches call ``on_complete``. Branches that keep monitoring do
    not, so the counter stays held while the process runs.
    NOTE(review): the failed / revived branches do not terminate ``proc``; the
    still-running process is no longer monitored. Confirm this is intended.
    """
    def _forget_task_counters():
        self._stuck_counts.pop(task_id, None)
        self._compact_waits.pop(task_id, None)

    state = self._check_session_state(agent_id)

    # B1: stuck session. Revive first; fail after 2 consecutive detections.
    if state.get("status") == "running" and not state.get("lock_pid_alive", True):
        stuck_count = self._stuck_counts.get(task_id, 0) + 1
        self._stuck_counts[task_id] = stuck_count
        if stuck_count >= 2:
            logger.error("Agent %s session stuck %d times (session=%s, lock PID dead)",
                         agent_id, stuck_count, session_id)
            _forget_task_counters()
            self._mark_task(db_path, task_id, "failed",
                            {"reason": "session_stuck", "stuck_count": stuck_count,
                             "diagnostics": state})
            await self._do_on_complete_async(on_complete, agent_id, "session_stuck")
            return
        logger.warning("Agent %s session stuck (attempt %d), reviving (session=%s)",
                       agent_id, stuck_count, session_id)
        if self._revive_session(agent_id):
            logger.info("Agent %s session revived, releasing counter for ticker re-dispatch",
                        agent_id)
            # Release the counter. The task stays "working" and the ticker
            # re-dispatches it; the stuck count is kept so a second
            # consecutive detection fails the task.
            await self._do_on_complete_async(on_complete, agent_id, "session_revived")
        else:
            logger.error("Agent %s revive failed, marking failed", agent_id)
            _forget_task_counters()
            self._mark_task(db_path, task_id, "failed",
                            {"reason": "revive_failed", "stuck_count": stuck_count,
                             "diagnostics": state})
            await self._do_on_complete_async(on_complete, agent_id, "revive_failed")
        return

    # Not stuck this round, so the "consecutive stuck" streak is broken.
    self._stuck_counts.pop(task_id, None)

    # B2: compaction in progress. Do not count a monitor timeout, but cap the
    # waits with a separate counter to avoid waiting forever.
    if state.get("recent_compact"):
        logger.info("Agent %s recent compaction detected, extending patience "
                    "(session=%s, monitor=%d/%d)",
                    agent_id, session_id, monitor_timeout_count, self.max_monitor_timeouts)
        compact_wait_count = self._compact_waits.get(task_id, 0) + 1
        self._compact_waits[task_id] = compact_wait_count
        if compact_wait_count >= self.max_monitor_timeouts:
            logger.error("Agent %s max compact waits reached (session=%s, count=%d)",
                         agent_id, session_id, compact_wait_count)
            _forget_task_counters()
            self._mark_task(db_path, task_id, "failed", {
                "reason": "compact_hanging",
                "compact_wait_count": compact_wait_count,
                "diagnostics": state,
            })
            await self._do_on_complete_async(on_complete, agent_id, "compact_hanging")
            return
        await self._monitor_process(
            session_id, proc, agent_id, task_id,
            on_complete=on_complete, db_path=db_path,
            monitor_timeout_count=monitor_timeout_count,
            stdout_chunks=stdout_chunks, stderr_chunks=stderr_chunks,
        )
        return

    # B3/B4: no compaction, so count this timeout normally.
    monitor_timeout_count += 1
    if monitor_timeout_count >= self.max_monitor_timeouts:
        logger.error("Agent %s max monitor timeouts (session=%s, count=%d)",
                     agent_id, session_id, monitor_timeout_count)
        _forget_task_counters()
        self._mark_task(db_path, task_id, "failed", {
            "reason": "max_monitor_timeouts",
            "count": monitor_timeout_count,
            "elapsed_seconds": monitor_timeout_count * int(self.agent_timeout),
            "diagnostics": state,
        })
        await self._do_on_complete_async(on_complete, agent_id, "max_monitor_timeouts")
        return

    # Below the limit: keep waiting (the counter is not released).
    logger.info("Agent %s continuing monitor (session=%s, count=%d/%d)",
                agent_id, session_id, monitor_timeout_count, self.max_monitor_timeouts)
    await self._monitor_process(
        session_id, proc, agent_id, task_id,
        on_complete=on_complete, db_path=db_path,
        monitor_timeout_count=monitor_timeout_count,
        stdout_chunks=stdout_chunks, stderr_chunks=stderr_chunks,
    )
async def _do_retry(self, session_id, agent_id, task_id, on_complete,
                    db_path, retry_field="retry_count"):
    """Retry ("续杯") a task by re-spawning the agent with a retry prompt.

    v2.7.2: reached only from the should_retry branch of _handle_exit. The
    process has exited, but the counter-releasing completion wrapper has not
    been called. The counter is therefore released manually here, and
    spawn_full_agent acquires it again for the new run. ``on_complete`` (which
    would also release the counter) is dropped to avoid a double release.

    Steps:
      1. Skip if the task already left the working states.
      2. Bump the retry counter: the ``tasks.retry_count`` column, or the
         attempt-metadata counter named by ``retry_field``.
      3. Mark the task failed once the count reaches ``max_retries``.
      4. Otherwise re-spawn with the mail or task retry prompt.
    """
    # ── Key step: release the counter manually (process exited = agent idle) ──
    if self.counter:
        self.counter.release(agent_id)
    # The old completion wrapper also releases the counter; drop it so the
    # slot is not released twice.
    # NOTE(review): this also drops any business callback wrapped inside it,
    # so the original caller's on_complete never runs for this retry chain.
    # Confirm this is intended.
    on_complete = None
    # Before retrying, skip tasks that have already moved to one of these statuses.
    if db_path and task_id:
        try:
            conn = get_connection(db_path)
            try:
                row = conn.execute(
                    "SELECT status FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
                if row and row["status"] in ("done", "failed", "cancelled", "review", "pending"):
                    logger.info("Retry skip: task %s already %s (agent=%s)",
                                task_id, row["status"], agent_id)
                    # on_complete is None here (reset above) and the counter was
                    # already released manually; presumably this call is a no-op.
                    # Confirm in _do_on_complete_async.
                    await self._do_on_complete_async(on_complete, agent_id, "task_already_done")
                    return
            finally:
                conn.close()
        except Exception:
            logger.warning("Retry status check failed for %s, proceeding", task_id)
    # Increment and read back the retry_count column of the tasks table directly.
    if retry_field == "retry_count" and db_path and task_id:
        try:
            conn = get_connection(db_path)
            try:
                conn.execute("BEGIN IMMEDIATE")
                conn.execute(
                    "UPDATE tasks SET retry_count = COALESCE(retry_count, 0) + 1 WHERE id=?",
                    (task_id,),
                )
                conn.commit()
                row = conn.execute(
                    "SELECT retry_count FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
                count = row["retry_count"] if row else 1
            finally:
                conn.close()
        except Exception:
            logger.exception("Failed to update retry_count for task %s", task_id)
            count = 1
    else:
        # Other counters live in the metadata of the latest task_attempt.
        retry_counts = self._get_retry_counts(db_path, task_id)
        count = retry_counts.get(retry_field, 0) + 1
        retry_counts[retry_field] = count
        self._update_retry_counts(db_path, task_id, retry_counts)
    # NOTE(review): with ">=" a re-spawn only happens while count < max_retries
    # (counts 1 and 2 for the default of 3), so the prompt never shows
    # "第 {max_retries} 次". Confirm whether ">" was intended.
    if count >= self.max_retries:
        logger.error("Agent %s max retries (session=%s, %s=%d)",
                     agent_id, session_id, retry_field, count)
        self._mark_task(db_path, task_id, "failed", {
            "reason": f"max_{retry_field}", "count": count,
        })
        await self._do_on_complete_async(on_complete, agent_id, "max_retries")
        return
    logger.info("Agent %s retry %s=%d/%d (session=%s)",
                agent_id, retry_field, count, self.max_retries, session_id)
    # Build the retry message (mail tasks use their own template, others the standard one).
    task_info = self._get_task_info(db_path, task_id) or {}
    project_id = task_info.get("project_id", "")
    is_mail = project_id == "_mail"
    if is_mail:
        # The sender comes from the mail metadata in must_haves. "{}" is used
        # (sender then shows as "unknown") when task_info has no must_haves entry.
        must_haves = task_info.get("must_haves", "{}")
        try:
            meta = json.loads(must_haves) if must_haves else {}
        except Exception:
            meta = {}
        message = MAIL_RETRY_PROMPT.format(
            from_agent=meta.get("from", "unknown"),
            title=task_info.get("title", ""),
            retry_count=count,
            max_retries=self.max_retries,
        )
    else:
        fallback_hint = "\n⚠️ 之前有 fallback 执行,请调 API 检查任务当前状态和已有产出,确认是否已完成。" if retry_field == "retry_count" else ""
        message = self.RETRY_PROMPT.format(
            project_id=project_id,
            task_id=task_id or "",
            title=task_info.get("title", ""),
            retry_count=count,
            max_retries=self.max_retries,
            api_host=self.api_host,
            api_port=self.api_port,
            agent_id=agent_id,
            fallback_hint=fallback_hint,
        )
    # v2.7.2: re-spawn through spawn_full_agent, which does can_acquire + acquire.
    # on_complete is None at this point; spawn_full_agent installs its own
    # counter-releasing wrapper for the new process.
    try:
        await self.spawn_full_agent(
            agent_id=agent_id,
            message=message,
            task_id=task_id,
            on_complete=on_complete,
            use_main_session=(session_id is None),
            reuse_session_id=session_id if session_id else None,
            task_db_path=db_path,
        )
    except AgentBusyError:
        # Agent taken by another task (not expected; defensive).
        logger.warning("Retry spawn skipped: %s busy (unexpected)", agent_id)
        await self._do_on_complete_async(on_complete, agent_id, "retry_agent_busy")
    except Exception:
        logger.exception("Retry spawn failed for %s", agent_id)
        await self._do_on_complete_async(on_complete, agent_id, "retry_spawn_failed")
# ── Helper methods ──
@staticmethod
def _parse_stdout_json(stdout_text: str) -> dict:
    """Extract the ``meta`` object from ``openclaw agent --json`` stdout.

    Expected output shape::

        { "kind": "agent-response", "response": { "meta": { "transport": ..., ... } } }

    A top-level object without "response" is treated as the response itself.
    Lookup order:
      1. the whole stdout as one JSON document;
      2. the last line that is a JSON object;
      3. the last object starting at column 0 of some line (pretty-printed
         JSON surrounded by log lines).

    Always returns a dict, so callers can safely use ``.get``. The result is
    ``{}`` when nothing usable is found or when "response"/"meta" are not
    JSON objects.
    """
    def _meta_of(payload):
        # None means "not a JSON object, keep looking"; otherwise a dict.
        if not isinstance(payload, dict):
            return None
        response = payload.get("response", payload)
        if not isinstance(response, dict):
            return {}
        meta = response.get("meta", {})
        return meta if isinstance(meta, dict) else {}

    text = (stdout_text or "").strip()
    if not text:
        return {}

    try:
        found = _meta_of(json.loads(text))
    except json.JSONDecodeError:
        found = None
    if found is not None:
        return found

    # Mixed output: search lines from the end for a single-line JSON object.
    for line in reversed(text.splitlines()):
        try:
            found = _meta_of(json.loads(line))
        except json.JSONDecodeError:
            continue
        if found is not None:
            return found

    # Pretty-printed object: decode from each line that starts with "{" at
    # column 0 (nested objects are indented), last candidate first.
    decoder = json.JSONDecoder()
    starts = []
    offset = 0
    for line in text.splitlines(keepends=True):
        if line.startswith("{"):
            starts.append(offset)
        offset += len(line)
    for start in reversed(starts):
        try:
            payload, _ = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            continue
        found = _meta_of(payload)
        if found is not None:
            return found
    return {}
@staticmethod
def _get_task_status(db_path: Optional[Path], task_id: Optional[str]) -> Optional[str]:
    """Read the task's current status straight from the blackboard DB.

    Returns None when db_path or task_id is missing, when no such task
    exists, or when the lookup fails for any reason (errors are swallowed).
    """
    if not (db_path and task_id):
        return None
    status = None
    try:
        conn = get_connection(db_path)
        try:
            found = conn.execute(
                "SELECT status FROM tasks WHERE id=?", (task_id,)
            ).fetchone()
            if found:
                status = found["status"]
        finally:
            conn.close()
    except Exception:
        status = None
    return status
@staticmethod
def _get_task_info(db_path: Optional[Path], task_id: Optional[str]) -> Optional[dict]:
    """Return the task row as a dict, plus an inferred "project_id", or None.

    Every column is returned. The retry path reads "title" and, for mail
    tasks, "must_haves", so selecting all columns makes the mail metadata
    available there. project_id comes from the DB location, which is assumed
    to be data/<project_id>/blackboard.db. Returns None when an argument is
    missing, the row does not exist, or the query fails.
    """
    if not db_path or not task_id:
        return None
    try:
        conn = get_connection(db_path)
        try:
            row = conn.execute(
                "SELECT * FROM tasks WHERE id=?", (task_id,)
            ).fetchone()
        finally:
            conn.close()
        if not row:
            return None
        info = dict(row)
        info["project_id"] = Path(db_path).parent.name
        return info
    except Exception:
        return None
@staticmethod
def _revive_session(agent_id: str) -> bool:
    """Revive a stuck main session by flipping its status from "running" to "idle".

    Edits ``~/.openclaw/agents/<agent_id>/sessions/sessions.json``. Returns
    True only if the main session was "running" and the file was rewritten.
    Returns False if the file is missing, the session is not running, or any
    error occurs; errors are logged.

    The new content goes to a sibling temp file (with the same permission
    bits) and is moved over the original with ``os.replace``. A concurrent
    reader therefore never sees a truncated or half-written sessions.json.
    """
    import os

    sessions_path = Path.home() / ".openclaw" / "agents" / agent_id / "sessions" / "sessions.json"
    if not sessions_path.exists():
        return False
    tmp_path = sessions_path.with_name(f"{sessions_path.name}.{os.getpid()}.tmp")
    try:
        with open(sessions_path, encoding="utf-8") as f:
            sessions = json.load(f)
        main_key = f"agent:{agent_id}:main"
        main_session = sessions.get(main_key, {})
        if main_session.get("status") != "running":
            return False  # not running: nothing to revive
        main_session["status"] = "idle"
        sessions[main_key] = main_session
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(sessions, f, indent=2)
        os.chmod(tmp_path, sessions_path.stat().st_mode & 0o777)
        os.replace(tmp_path, sessions_path)
        logger.info("Revived %s: sessions.json status changed running→idle", agent_id)
        return True
    except Exception:
        logger.exception("Failed to revive %s", agent_id)
        return False
    finally:
        # A temp file is left behind only if something failed before os.replace.
        try:
            tmp_path.unlink(missing_ok=True)
        except OSError:
            pass
@staticmethod
def _check_session_state(agent_id: str) -> dict:
    """Inspect the agent's main session in ``sessions.json`` and its lock file.

    Reads ``~/.openclaw/agents/<agent_id>/sessions/sessions.json`` and returns:
        status:         the main session's "status" ("unknown" if unavailable)
        lock_pid:       the "pid" recorded in ``<sessionFile>.lock`` (None if no lock)
        lock_pid_alive: True if a process with that pid exists
        recent_compact: True if a compaction checkpoint is less than 5 minutes old
    Best-effort: unreadable or malformed data leaves the affected fields at
    their defaults instead of raising.
    """
    import os
    import time

    result = {"status": "unknown", "lock_pid": None, "lock_pid_alive": False, "recent_compact": False}
    sessions_path = Path.home() / ".openclaw" / "agents" / agent_id / "sessions" / "sessions.json"
    if not sessions_path.exists():
        return result
    try:
        with open(sessions_path, encoding="utf-8") as f:
            sessions = json.load(f)
        main_session = sessions.get(f"agent:{agent_id}:main", {})
        result["status"] = main_session.get("status", "unknown")

        # Lock file next to the session file.
        session_file = main_session.get("sessionFile", "")
        if session_file:
            lock_path = Path(session_file + ".lock")
            if lock_path.exists():
                try:
                    lock_data = json.loads(lock_path.read_text(encoding="utf-8"))
                    pid = lock_data.get("pid")
                    result["lock_pid"] = pid
                    # Only probe real pids: kill() with 0 or a negative pid
                    # addresses a process group instead of one process.
                    if isinstance(pid, int) and not isinstance(pid, bool) and pid > 0:
                        try:
                            os.kill(pid, 0)
                            result["lock_pid_alive"] = True
                        except ProcessLookupError:
                            result["lock_pid_alive"] = False
                        except PermissionError:
                            # EPERM: the process exists but belongs to another user.
                            result["lock_pid_alive"] = True
                except Exception:
                    pass

        # Compaction within the last 5 minutes (createdAt compared as epoch ms).
        # A malformed checkpoint is skipped rather than aborting the scan.
        now_ms = time.time() * 1000
        for checkpoint in main_session.get("compactionCheckpoints", []):
            created = checkpoint.get("createdAt", 0) if isinstance(checkpoint, dict) else 0
            if isinstance(created, (int, float)) and now_ms - created < 300_000:
                result["recent_compact"] = True
                break
    except Exception:
        pass
    return result
@staticmethod
def _classify_outcome(exit_code: int, meta: dict, stderr_text: str,
                      task_status: Optional[str]) -> dict:
    """Classify why an agent process exited and return the handling policy.

    v2.7.2: there is no release_counter field; process exit always releases
    the counter through the spawner's completion wrapper. Only A2/A3
    (gateway_timeout) set ``should_retry``.

    Returns a dict with "outcome" and "should_retry", plus "retry_field" for
    the retry case and "count_field" for some non-zero-exit outcomes.

    stderr keyword matching is case-insensitive throughout. Numeric HTTP
    codes (401/403/500/503) only match as standalone tokens, so durations,
    PIDs or timestamps such as "1500ms" or "pid 5003" are not mistaken for
    status codes.
    """
    import re

    stderr_lower = (stderr_text or "").lower()

    def mentions(*keywords):
        for keyword in keywords:
            if keyword.isdigit():
                if re.search(rf"\b{keyword}\b", stderr_lower):
                    return True
            elif keyword.lower() in stderr_lower:
                return True
        return False

    transport = (meta or {}).get("transport", "")
    terminal_statuses = {"done", "review", "failed", "cancelled"}
    is_terminal = task_status in terminal_statuses

    # A4: the agent marked the task failed itself.
    if task_status == "failed":
        return {"outcome": "agent_failed", "should_retry": False}
    # A1: normal completion.
    if exit_code == 0 and transport != "embedded" and is_terminal:
        return {"outcome": "completed", "should_retry": False}
    # A5/A6: embedded fallback. This should never appear; seeing it means the
    # counter check failed. Never retried; _handle_exit escalates it.
    if exit_code == 0 and transport == "embedded":
        return {"outcome": "fallback_timeout", "should_retry": False}
    # A2/A3: gateway timeout with the task unfinished, the only retry case.
    if exit_code == 0 and not is_terminal:
        # P2: without a transport (stdout not parsed), use stderr to rule out
        # lock / compaction / API problems first.
        if not transport:
            if mentions("lock", "busy", "concurrent", "lane task error"):
                return {"outcome": "lock_conflict", "should_retry": False}
            if mentions("compaction-diag", "context-overflow"):
                return {"outcome": "compact_failed", "should_retry": False}
            if mentions("rate_limit", "500", "503"):
                return {"outcome": "api_error", "should_retry": False}
        return {"outcome": "gateway_timeout", "should_retry": True,
                "retry_field": "retry_count"}
    # A7: authentication failure.
    if exit_code != 0 and mentions("401", "403", "unauthorized", "auth"):
        return {"outcome": "auth_failed", "should_retry": False}
    # A8: gateway unreachable.
    if exit_code != 0 and mentions("ECONNREFUSED", "ETIMEDOUT", "gateway closed", "ECONNRESET"):
        return {"outcome": "gateway_unreachable", "should_retry": False,
                "count_field": "connect_retry_count"}
    # A9: API error (429 etc.). Not retried; _handle_exit pushes the task back
    # to pending and sets a cooldown.
    if exit_code != 0 and mentions("rate_limit", "500", "503", "API error"):
        return {"outcome": "api_error", "should_retry": False,
                "count_field": "api_retry_count"}
    # A10: compaction failure. Not retried; left to the ticker to reschedule.
    if exit_code != 0 and mentions("compaction-diag", "context-overflow", "timeout-compaction"):
        return {"outcome": "compact_failed", "should_retry": False}
    # A11: lock conflict.
    if exit_code != 0 and mentions("lock", "busy", "concurrent", "lane task error"):
        return {"outcome": "lock_conflict", "should_retry": False,
                "count_field": "lock_retry_count"}
    # A12: anything else. Not retried.
    return {"outcome": "agent_error", "should_retry": False}
@staticmethod
def _get_retry_counts(db_path: Optional[Path], task_id: Optional[str]) -> dict:
    """Load the retry counters stored in the latest task_attempt's metadata.

    Every known counter starts at 0; values present in the latest attempt's
    metadata JSON override the defaults. Missing arguments, a missing row,
    or any read/parse error leave the defaults in place (errors are swallowed).
    """
    counters = dict.fromkeys(
        ("retry_count", "connect_retry_count", "api_retry_count",
         "lock_retry_count", "monitor_timeout_count"),
        0,
    )
    if not (db_path and task_id):
        return counters
    try:
        conn = get_connection(db_path)
        try:
            latest = conn.execute(
                "SELECT metadata FROM task_attempts WHERE task_id=? ORDER BY attempt_number DESC LIMIT 1",
                (task_id,)
            ).fetchone()
            if latest and latest["metadata"]:
                stored = json.loads(latest["metadata"])
                for name in counters:
                    if name in stored:
                        counters[name] = stored[name]
        finally:
            conn.close()
    except Exception:
        pass
    return counters
def _update_retry_counts(self, db_path: Optional[Path],
task_id: Optional[str], counts: dict):
"""将 retry counts 写回最新 task_attempt 的 metadata"""
if not db_path or not task_id:
return
try:
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
row = conn.execute(
"SELECT rowid, metadata FROM task_attempts "
"WHERE task_id=? ORDER BY attempt_number DESC LIMIT 1",
(task_id,)
).fetchone()
if row:
meta = json.loads(row["metadata"]) if row["metadata"] else {}
meta.update(counts)
conn.execute(
"UPDATE task_attempts SET metadata=? WHERE rowid=?",
(json.dumps(meta), row["rowid"])
)
conn.commit()
finally:
conn.close()
except Exception:
logger.exception("Failed to update retry counts for task %s", task_id)
def _mark_task(self, db_path: Optional[Path], task_id: Optional[str],
status: str, detail: Optional[dict] = None):
"""标记任务状态(用于 failed/escalate"""
if not db_path or not task_id:
return
try:
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
conn.execute(
"UPDATE tasks SET status=?, completed_at=datetime('now') WHERE id=?",
(status, task_id)
)
if detail:
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task_id, "daemon", status, json.dumps(detail, ensure_ascii=False))
)
conn.commit()
finally:
conn.close()
except Exception:
logger.exception("Failed to mark task %s as %s", task_id, status)
@staticmethod
def _do_on_complete(on_complete, agent_id, outcome):
"""执行 on_complete 回调(同步+异步兼容)"""
if not on_complete:
return
try:
result = on_complete(agent_id, outcome)
if asyncio.iscoroutine(result):
# 注意:这里是同步调用的,不能 await
# 在 _monitor_process 的 async 上下文中应该用 await
pass
except Exception:
pass
async def _do_on_complete_async(self, on_complete, agent_id, outcome):
"""异步执行 on_complete 回调"""
if not on_complete:
return
try:
result = on_complete(agent_id, outcome)
if asyncio.iscoroutine(result):
await result
except Exception:
logger.warning("on_complete callback failed for %s", agent_id, exc_info=True)
def _register_session(
self,
session_id: str,
agent_id: str,
task_id: Optional[str],
pid: Optional[int],
) -> None:
"""注册 spawn session"""
self._sessions[session_id] = {
"agent_id": agent_id,
"task_id": task_id,
"pid": pid,
"status": "running",
"started_at": datetime.utcnow().isoformat(),
"completed_at": None,
}
def _record_attempt(
self,
task_id: Optional[str],
agent_id: str,
outcome: str,
exit_code: Optional[int] = None,
error: Optional[str] = None,
metadata: Optional[dict] = None,
db_path: Optional[Path] = None,
) -> None:
"""记录 task_attempt"""
effective_db = db_path or self.db_path
if not task_id or not effective_db:
return
try:
conn = get_connection(effective_db)
try:
conn.execute("BEGIN IMMEDIATE")
row = conn.execute(
"SELECT MAX(attempt_number) as max_a FROM task_attempts WHERE task_id=?",
(task_id,),
).fetchone()
attempt_number = (row["max_a"] or 0) + 1
meta = metadata or {}
if error:
meta["error"] = error
conn.execute(
"INSERT INTO task_attempts "
"(task_id, attempt_number, agent, outcome, exit_code, metadata, completed_at) "
"VALUES (?,?,?,?,?,?,datetime('now'))",
(task_id, attempt_number, agent_id, outcome,
exit_code, json.dumps(meta)),
)
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task_id, agent_id,
"agent_completed" if outcome == "completed" else "daemon_tick",
json.dumps({"outcome": outcome, "attempt": attempt_number})),
)
conn.commit()
finally:
conn.close()
except Exception:
logger.exception("Failed to record attempt for task %s", task_id)
def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
"""获取 session 信息"""
return self._sessions.get(session_id)
def get_session_by_agent(self, agent_id: str) -> Optional[Dict[str, Any]]:
"""v2.7.2: 根据 agent_id 获取活跃 session 信息(用于进程存活性检查)"""
for sid, info in self._sessions.items():
if info.get("agent_id") == agent_id and info.get("status") == "running":
return info
return None
def cleanup_session(self, session_id: str) -> None:
"""清理 session"""
if session_id in self._sessions:
session = self._sessions[session_id]
task_id = session.get("task_id")
del self._sessions[session_id]
# 清理 B2 compact 等待计数器
if task_id and task_id in self._compact_waits:
del self._compact_waits[task_id]