1614 lines
71 KiB
Python
1614 lines
71 KiB
Python
"""Agent Spawner - 异步 spawn Full Agent / Subagent
|
||
|
||
Full Agent: asyncio.create_subprocess_exec(异步非阻塞,不 await 完成)
|
||
Subagent: 占位(实际通过 OpenClaw Gateway API sessions_spawn,F17 完善)
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import os
|
||
import uuid
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from typing import Any, Dict, List, Optional
|
||
|
||
from src.blackboard.db import get_connection, init_db
|
||
|
||
# Module-level logger shared by every function and method in this file.
logger = logging.getLogger("moziplus-v2.spawner")
|
||
|
||
|
||
# ── Prompt templates ──

# Mail-only template for the "inform" performative: a pure notification whose
# task status is managed by the system (the agent is told not to change it).
# Filled with str.format(); placeholders: {from_agent}, {title}, {text}.
MAIL_INFORM_TEMPLATE = """你收到一封飞鸽传书(纯通知)。

发件者: {from_agent}
主题: {title}
内容: {text}

已阅即可。如需回复,用 in_reply_to 回复发件者(不需要填 to)。
⚠️ 不要执行任何状态转换命令。
"""
|
||
|
||
# Mail-only template for the "request" performative: the recipient must handle
# the mail and reply; task status is still managed by the system.
# Filled with str.format(); placeholders: {from_agent}, {title}, {text},
# {agent_id}, {task_id}, {valid_agents}. Doubled braces render as literal
# braces, and "\\" renders as a single backslash (shell line continuation).
# NOTE(review): the mail endpoint is hard-coded to localhost:8083 here even
# though _build_mail_prompt also passes api_host/api_port - confirm intended.
MAIL_REQUEST_TEMPLATE = """你收到一封飞鸽传书,需要你处理并回复。

发件者: {from_agent}
主题: {title}
内容: {text}

### 如何回复发件者

curl -s -X POST http://localhost:8083/api/mail \\
-H 'Content-Type: application/json' \\
-d '{{"from": "{agent_id}", "in_reply_to": "{task_id}", "title": "回复: {title}", "text": "你的回复内容"}}'

⚠️ 不需要填 "to",系统自动回复给发件者。

### 如何给其他人发新邮件

curl -s -X POST http://localhost:8083/api/mail \\
-H 'Content-Type: application/json' \\
-d '{{"from": "{agent_id}", "to": "对方agent-id", "title": "标题", "text": "正文", "type": "inform"}}'

⚠️ to 必须是有效的 agent id: {valid_agents}
⚠️ 纯通知用 type=inform,需要对方回复不填 type(默认 request)
⚠️ 不能给自己发邮件
⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。
"""
|
||
|
||
SPAWN_PROMPT_TEMPLATE = """{identity_section}
|
||
|
||
## 任务
|
||
{title}
|
||
{description}
|
||
|
||
项目: {project_id} | ID: {task_id}
|
||
类型: {task_type} | 优先级: {priority}
|
||
验收标准: {must_haves}
|
||
|
||
{retry_context}
|
||
|
||
## 你能做什么
|
||
- 读任务详情(含依赖、讨论、产出): GET {api_base}/projects/{project_id}/tasks/{task_id}?expand=all
|
||
- 读所有活跃任务: GET {api_base}/projects/{project_id}/tasks?status=working,claimed,review
|
||
- 写产出: POST {api_base}/projects/{project_id}/tasks/{task_id}/outputs
|
||
- 写评论/交接: POST {api_base}/projects/{project_id}/tasks/{task_id}/comments
|
||
- 更新状态: POST {api_base}/projects/{project_id}/tasks/{task_id}/status
|
||
- 创建子任务: POST {api_base}/projects/{project_id}/tasks
|
||
- 认领任务: POST {api_base}/projects/{project_id}/tasks/{{{{id}}}}/claim
|
||
|
||
## 约束
|
||
- 完成后必须写产出物(output)并标 review,不能无产出就提交
|
||
- 失败了标 failed 并写明原因
|
||
- 产出物 handoff comment ≥ 50 字符(用于系统验证)
|
||
- 禁止使用 sessions_send 直接发消息(用 Mail API 或黑板 comment)
|
||
- 委托他人做事用黑板 comment @agent-id,系统自动路由(如 @zhaoyun-data 你来获取数据,无需手动传 mentions 数组)
|
||
- 安全红线: {guardrails_summary}
|
||
|
||
### API 请求体示例
|
||
写产出: POST .../outputs
|
||
```json
|
||
{{{{"agent": "{agent_id}", "content_type": "code", "title": "产出标题", "content_path": "/path/to/file", "summary": "简要说明"}}}}
|
||
```
|
||
|
||
写评论: POST .../comments
|
||
```json
|
||
{{{{"author": "{agent_id}", "body": "评论内容(≥50字符)", "comment_type": "handoff"}}}}
|
||
```
|
||
"""
|
||
|
||
|
||
# Prompt for agents spawned to take part in a blackboard discussion.
# Filled with a single str.format() pass; placeholders: {goal_snapshot},
# {constraints}, {api_host}, {api_port}, {project_id}, {task_id}, {agent_id}.
# Doubled braces render as literal braces.
# Escaping notes (this is a normal, non-raw string literal):
#   * '\\"' renders as \" so the nested must_haves example stays valid JSON
#     (a bare \" would lose its backslash and produce a broken example).
#   * a trailing '\\' renders as one backslash followed by a newline, i.e. a
#     shell line continuation; a single backslash would instead swallow the
#     newline and glue the curl command onto one line.
DISCUSSION_PROMPT_TEMPLATE = """你被 spawn 来参与黑板讨论。这是一个 v2.9 四相循环的讨论环节。

## 你的任务

{goal_snapshot}

## 约束

{constraints}

## 黑板 API

你可以随时:
- 读黑板:GET http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}?expand=all(含 comments、outputs)
- 写 comment:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/comments
body: {{"author": "{agent_id}", "body": "内容(@agent-id 自动路由)"}}
- 创建 sub task:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks
body: {{"title": "...", "description": "...", "task_type": "...", "parent_task": "{task_id}", "must_haves": "{{\\"capability\\": \\"...\\"}}"}}
- 认领任务:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{{sub_task_id}}/claim

## 行为准则

1. **你是自主的。**读黑板、思考、行动,不要等指令。
2. **不重复别人的工作。**动手前先读黑板看谁在做什么(Separation)。
3. **保持方向对齐。**你的产出方向和 parent goal 对齐,不确定时 @pangtong-fujunshi(Alignment)。
4. **产出可共享。**产出写入黑板,让其他人能看到你的成果(Cohesion)。
5. **不越界。**安全红线不要碰,超出能力的 @ 庞统升级(Boundary)。
6. **随时讨论。**执行过程中需要协作时 @ 对应 Agent,讨论是灵活的不是固定阶段的。

## 讨论完成后

- 如果讨论收敛到可执行的任务,直接创建 sub task
- 如果有分歧或不确定,在黑板上写 comment @ 庞统裁决
- 标记完成:
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \\
-H 'Content-Type: application/json' \\
-d '{{"status": "done", "agent": "{agent_id}"}}'
```
"""
|
||
|
||
|
||
# Mail-only "refill" (retry) template: deliberately contains no status
# transition instructions because the system marks mail tasks done itself.
# Filled with str.format(); placeholders: {from_agent}, {title},
# {retry_count}, {max_retries}.
MAIL_RETRY_PROMPT = """你收到一个续杯提醒。你的任务在执行过程中被中断了。

发件者: {from_agent}
主题: {title}
续杯次数: 第 {retry_count} 次(上限 {max_retries} 次)

请检查 session 历史中你之前做了什么,然后继续未完成的工作。

⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。
⚠️ 如果任务已完成,直接写产出即可,不要调 status API。
"""
|
||
|
||
|
||
class AgentBusyError(Exception):
    """Raised when an agent cannot be spawned (occupied, cooling down, session locked, ...).

    #07: ``reason`` distinguishes the concrete cause so the dispatcher layer
    can react differently per cause.

    Attributes:
        agent_id: The agent that could not be spawned.
        reason: Short machine-readable cause, e.g. counter_blocked /
            session_locked / session_running / session_compacting /
            session_stuck. Defaults to "busy".
        detail: Extra diagnostic data; always a dict (empty when none given).
    """

    def __init__(self, agent_id: str, reason: str = "busy", detail: Optional[dict] = None):
        # The exception message is "<agent_id>: <reason>".
        message = "{}: {}".format(agent_id, reason)
        super().__init__(message)
        self.agent_id = agent_id
        self.reason = reason
        self.detail = detail if detail else {}
|
||
|
||
|
||
class AgentSpawner:
|
||
"""Agent spawn 管理"""
|
||
|
||
def __init__(
    self,
    db_path: Optional[Path] = None,
    agent_timeout: float = 630.0,
    dry_run: bool = False,
    api_host: str = "127.0.0.1",
    api_port: int = 8083,
    bootstrap_builder: Optional[Any] = None,
    gateway_timeout: float = 600.0,
    max_retries: int = 3,
    max_monitor_timeouts: int = 3,
    counter: Optional[Any] = None,
):
    """Store the spawn configuration and create empty bookkeeping tables.

    Args:
        db_path: Project blackboard DB path (used when writing task_attempts).
        agent_timeout: Agent timeout in seconds (the monitor's wait limit).
        dry_run: Test mode; nothing is actually spawned.
        api_host: API host the agents write back to.
        api_port: API port the agents write back to.
        bootstrap_builder: Optional object whose ``build_for_task`` is used
            by ``build_spawn_message`` to build the main prompt.
        gateway_timeout: Seconds passed as ``--timeout`` to the spawned CLI.
        max_retries: Upper bound the retry counters are compared against.
        max_monitor_timeouts: Upper bound reported for monitor timeouts.
        counter: Optional per-agent counter; ``spawn_full_agent`` calls its
            ``acquire`` / ``release`` (v2.7.2).
    """
    # Bookkeeping tables, all empty at construction time.
    self._valid_agents_cache: Optional[set] = None
    self._stuck_counts: Dict[str, int] = {}      # B1 stuck counter {task_id: count}
    self._compact_waits: Dict[str, int] = {}     # B2 compact-wait counter {task_id: count}
    self._sessions: Dict[str, Dict[str, Any]] = {}  # session registry {session_id: {...}}

    # guardrails: assigned by main.py after initialisation.
    self.guardrails = None

    # Collaborators.
    self.counter = counter
    self.bootstrap_builder = bootstrap_builder

    # Limits and timeouts.
    self.max_retries, self.max_monitor_timeouts = max_retries, max_monitor_timeouts
    self.agent_timeout, self.gateway_timeout = agent_timeout, gateway_timeout

    # Where agents call back, and where attempts are recorded.
    self.api_host, self.api_port = api_host, api_port
    self.db_path = db_path
    self.dry_run = dry_run
|
||
|
||
def _load_valid_agents(self) -> set:
    """Return the set of valid agent ids, cached after the first call.

    The ids are the keys of ``daemon.agent_profiles`` in
    ``config/default.yaml`` (resolved relative to this file's parent
    package). When the file is missing, unreadable, empty, or lists no
    profiles, a built-in default set is used instead.

    Returns:
        The cached set of agent ids (the same object on every later call).
    """
    if self._valid_agents_cache is not None:
        return self._valid_agents_cache

    config_path = Path(__file__).parent.parent / "config" / "default.yaml"
    if config_path.exists():
        try:
            import yaml

            # Explicit encoding: do not depend on the platform default.
            with open(config_path, encoding="utf-8") as f:
                cfg = yaml.safe_load(f) or {}  # an empty file loads as None
            profiles = (cfg.get("daemon") or {}).get("agent_profiles") or {}
            if profiles:
                self._valid_agents_cache = set(profiles.keys())
                return self._valid_agents_cache
        except Exception:
            # Best effort: keep falling back to the defaults, but leave a
            # trace instead of hiding a broken or unreadable config.
            logger.warning("Failed to load agent profiles from %s, using defaults",
                           config_path, exc_info=True)

    self._valid_agents_cache = {
        "zhangfei-dev", "guanyu-dev", "zhaoyun-data",
        "jiangwei-infra", "pangtong-fujunshi", "simayi-challenger"
    }
    return self._valid_agents_cache
|
||
|
||
@property
def active_sessions(self) -> Dict[str, Dict[str, Any]]:
    """Currently active spawn sessions: registry entries whose status is "running"."""
    running: Dict[str, Dict[str, Any]] = {}
    for sid, info in self._sessions.items():
        if info.get("status") == "running":
            running[sid] = info
    return running
|
||
|
||
def build_spawn_message(
    self,
    task_id: str,
    title: str,
    description: str,
    task_type: str = "",
    priority: int = 5,
    must_haves: str = "",
    project_id: str = "",
    agent_id: str = "",
    current_status: str = "claimed",
    retry_context: str = "",
    task: Optional[Any] = None,
    project_config: Optional[Dict[str, Any]] = None,
    spawn_type: str = "executor",  # executor | discussion | review
) -> str:
    """Build the message an agent is spawned with.

    Selection order:
      1. ``spawn_type == "discussion"`` -> discussion template.
      2. ``project_id == "_mail"`` -> compact mail template.
      3. BootstrapBuilder configured and ``task`` given -> bootstrap prompt
         followed by the API write-back section.
      4. Otherwise -> minimal fallback (task context + API instructions).

    Args:
        current_status: Current task status (not read by this method).
        retry_context: Retry context (not read by this method).
        task: Task object handed to the BootstrapBuilder.
        project_config: Project configuration (not read by this method).
        spawn_type: executor = execute, discussion = discuss, review = review.

    Returns:
        The prompt text.
    """
    # Discussion spawns never go through the BootstrapBuilder.
    if spawn_type == "discussion":
        return self._build_discussion_prompt(
            task_id, title, description, must_haves, project_id, agent_id)

    # Mail tasks use the compact mail template.
    if project_id == "_mail":
        return self._build_mail_prompt(
            task_id, title, description, must_haves, agent_id)

    # Without a builder or a task object only the minimal prompt is possible.
    if not self.bootstrap_builder or task is None:
        logger.warning("No BootstrapBuilder or task object, using minimal fallback")
        return self._build_minimal_fallback(
            task_id, title, description, must_haves, project_id, agent_id)

    # "discussion" already returned above, so only "review" maps to a
    # non-default role; every other spawn type is an executor.
    role = "reviewer" if spawn_type == "review" else "executor"
    bootstrap_prompt = self.bootstrap_builder.build_for_task(task=task, role=role)
    api_section = self._build_api_section(project_id, task_id, agent_id)
    separator = "\n\n---\n\n"
    return bootstrap_prompt + separator + api_section
|
||
|
||
def _build_minimal_fallback(self, task_id, title, description, must_haves,
                            project_id, agent_id):
    """Minimal fallback prompt: task context followed by the API instructions.

    Empty ``description`` / ``must_haves`` are replaced by placeholder text.
    """
    shown_description = description or "(无描述)"
    shown_criteria = must_haves or "(无)"
    task_lines = [
        "## 任务",
        f"{title}",
        f"{shown_description}",
        "",
        f"项目: {project_id} | ID: {task_id}",
        f"验收标准: {shown_criteria}",
    ]
    task_section = "\n".join(task_lines)
    api_section = self._build_api_section(project_id, task_id, agent_id)
    return task_section + "\n\n---\n\n" + api_section
|
||
|
||
def _build_api_section(self, project_id: str, task_id: str,
                       agent_id: str) -> str:
    """Build the API write-back instructions appended to a task prompt.

    The section shows curl examples for marking the task ``working`` and for
    writing an output, then names the final status to set.

    Args:
        project_id: Project the task belongs to; ``"_mail"`` tasks finish
            with ``"done"`` directly, every other project with ``"review"``.
        task_id: Task id used in the example URLs.
        agent_id: Agent id embedded in the example request bodies.

    Returns:
        The instruction text.
    """
    # Mail tasks go straight to done and skip review.
    success_status = '"done"' if project_id == "_mail" else '"review"'
    # "\\" renders as a backslash plus newline (a shell line continuation),
    # matching the other curl templates in this file. A single backslash in
    # this non-raw literal would swallow the newline and glue each command
    # onto one line.
    return f"""## 操作指令

### 状态回写
开始工作:
```bash
curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/tasks/{task_id}/status \\
-H 'Content-Type: application/json' \\
-d '{{"status": "working", "agent": "{agent_id}"}}'
```

### 写入产出
```bash
curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \\
-H 'Content-Type: application/json' \\
-d '{{"agent": "{agent_id}", "type": "<类型>", "title": "<标题>", "content": "<内容>", "summary": "<摘要>"}}'
```

### 完成后
成功:status → {success_status} | 失败:status → "failed"
"""
|
||
|
||
def _build_discussion_prompt(self, task_id: str, title: str,
                             description: str, must_haves: str,
                             project_id: str, agent_id: str) -> str:
    """Build the prompt for a discussion-type spawn (section 3.3 framework + Boids).

    The goal shown to the agent is ``description``, or ``title`` when the
    description is empty; empty ``must_haves`` becomes a placeholder text.
    """
    fields = {
        "goal_snapshot": description or title,
        "constraints": must_haves or "(无特殊约束)",
        "project_id": project_id,
        "task_id": task_id,
        "agent_id": agent_id,
        "api_host": self.api_host,
        "api_port": self.api_port,
    }
    return DISCUSSION_PROMPT_TEMPLATE.format(**fields)
|
||
|
||
def _inject_agent_identity(self, agent_id: str) -> str:
    """#03: build the identity line (agent id + specialties) for a prompt.

    Specialties come from the ``capabilities_zh`` of the agent's profile on
    the optional ``_router_ref``; a generic label is used when the router,
    the profile, or the capability list is missing or empty.
    """
    router = getattr(self, "_router_ref", None)
    profile = router.agent_profiles.get(agent_id) if router else None
    zh_caps = getattr(profile, "capabilities_zh", None) if profile else None
    specialty = ", ".join(zh_caps) if zh_caps else "通用"
    return f"你是 {agent_id},专长: {specialty}。"
|
||
|
||
def _get_guardrails_summary(self) -> str:
    """#03: summarise the red-line rules of the guardrail engine.

    Joins the ``name`` (falling back to ``rule_id``) of at most the first
    six rules. A default text is returned when no engine is set or when
    reading the rules fails for any reason.
    """
    fallback = "无特殊限制"
    engine = self.guardrails
    if not engine:
        return fallback
    try:
        labels = []
        for rule in engine.rules[:6]:
            labels.append(rule.get("name", rule.get("rule_id", "")))
        return "、".join(labels)
    except Exception:
        return fallback
|
||
|
||
def _get_agent_profile(self, agent_id: str):
    """Look up the capability profile of ``agent_id``.

    Returns the entry of ``_router_ref.agent_profiles`` for the agent, or
    ``None`` when no router is attached or the agent is unknown.
    """
    router = getattr(self, "_router_ref", None)
    return router.agent_profiles.get(agent_id) if router else None
|
||
|
||
|
||
def _build_mail_prompt(self, task_id: str, title: str, description: str,
|
||
must_haves: str, agent_id: str) -> str:
|
||
"""构建 Mail 专用精简模板"""
|
||
# 解析 must_haves 获取 from 和 performative
|
||
from_agent = agent_id
|
||
performative = "request"
|
||
try:
|
||
meta = json.loads(must_haves) if must_haves else {}
|
||
from_agent = meta.get("from", agent_id)
|
||
performative = meta.get("performative", meta.get("type", "request"))
|
||
except Exception:
|
||
pass
|
||
|
||
# 截断 title 和 text 用于模板安全
|
||
safe_title = (title or "").replace('"', '\\"')[:100]
|
||
safe_text = (description or "").replace('"', '\\"')
|
||
|
||
# 获取有效 Agent 列表(从 config/default.yaml 读取)
|
||
valid_agents_list = self._load_valid_agents()
|
||
valid_agents_str = " / ".join(sorted(valid_agents_list))
|
||
|
||
common_kwargs = dict(
|
||
from_agent=from_agent,
|
||
title=safe_title,
|
||
text=safe_text,
|
||
task_id=task_id,
|
||
agent_id=agent_id,
|
||
api_host=self.api_host,
|
||
api_port=self.api_port,
|
||
valid_agents=valid_agents_str,
|
||
)
|
||
|
||
if performative == "inform":
|
||
return MAIL_INFORM_TEMPLATE.format(**common_kwargs)
|
||
else:
|
||
return MAIL_REQUEST_TEMPLATE.format(**common_kwargs)
|
||
|
||
async def spawn_full_agent(
    self,
    agent_id: str,
    message: str,
    new_session: bool = False,
    task_id: Optional[str] = None,
    on_complete: Optional[Any] = None,
    use_main_session: bool = False,
    task_db_path: Optional[Path] = None,
    reuse_session_id: Optional[str] = None,
    on_checks_passed: Optional[Any] = None,
    skip_counter: bool = False,
    broadcast_task_ids: Optional[List[str]] = None,
) -> str:
    """Spawn a Full Agent as a subprocess (async, non-blocking).

    v2.7.2: counter acquire/release is managed here. The counter is acquired
    before the process starts and released when the process ends, through
    the internal ``_wrapped_on_complete`` handed to the monitor.

    Args:
        agent_id: Agent to run (``openclaw agent --agent <agent_id>``).
        message: Prompt passed with ``--message``.
        new_session: Not read by this method.
        task_id: Task the spawn belongs to (bookkeeping and attempt records).
        on_complete: Business callback ``(agent_id, outcome)``; may be a
            coroutine function. It must not release the counter itself.
        use_main_session: True = deliver to the agent's main session (no
            ``--session-id``) and run the session-state checks first.
        task_db_path: DB path for the monitor; defaults to ``self.db_path``.
        reuse_session_id: Reuse this session id (for refills) - deprecated,
            superseded by ``use_main_session=True``.
        on_checks_passed: Called after the session checks and the counter
            acquire succeeded, before the subprocess is started.
        skip_counter: Skip acquiring the counter (v2.8.1 Bug-4: on retry the
            counter is kept from the original spawn).
        broadcast_task_ids: Real task ids behind a broadcast spawn; stored
            with the session registration.

    Returns:
        The session id. On a real main-session spawn this is ``None``
        (no session id exists), while dry-run mode returns ``"main"``.

    Raises:
        AgentBusyError: Counter not acquired, or the main session is locked,
            running, compacting, or still stuck after a revive.
        Exception: Whatever starting the subprocess raised (re-raised after
            the counter was released and the attempt recorded).
    """
    # ── #07 Acquire-First: counter first, session check under the lock ──

    # Step 0: choose the session id (pure computation, no IO).
    if use_main_session:
        session_id = None
    elif reuse_session_id:
        session_id = reuse_session_id
    else:
        session_id = str(uuid.uuid4())
    _sid_key = session_id or "main"  # key used for the counter

    # Phase 0: pre-acquire repair (no lock).
    # timeout/failed sessions are repaired before acquiring. Per the original
    # author's note, revive only flips running -> idle and is idempotent.
    if use_main_session:
        pre_state = self._check_session_state(agent_id)
        if pre_state.get("status") in ("timeout", "failed"):
            logger.info("Phase 0: %s status=%s, reviving before acquire",
                        agent_id, pre_state["status"])
            self._revive_session(agent_id)
        elif pre_state.get("status") == "running" and not pre_state.get("lock_pid_alive"):
            # status=running but the lock PID is dead -> stuck, revive.
            logger.warning("Phase 0: %s status=running but lock PID dead, reviving", agent_id)
            self._revive_session(agent_id)

    # Phase 1: counter acquire (mutual exclusion).
    # v2.8.1 Bug-4 fix: retries skip the counter.
    if self.counter and not skip_counter:
        acquired = await self.counter.acquire(agent_id, _sid_key)
        if not acquired:
            raise AgentBusyError(agent_id, reason="counter_blocked")

    # Phase 2: session check (under the counter, close to the spawn).
    # All blocking reasons are collected first, then judged together.
    if use_main_session:
        session_state = self._check_session_state(agent_id)
        logger.info("Phase 2 session check for %s: status=%s lock_pid=%s lock_pid_alive=%s compact=%s",
                    agent_id, session_state.get('status'), session_state.get('lock_pid'),
                    session_state.get('lock_pid_alive'), session_state.get('recent_compact'))

        blockers = []
        if session_state.get("lock_pid_alive") and not session_state.get("lock_expired"):
            blockers.append(("session_locked", session_state.get("lock_pid")))
        # Truly running: an external process holds the session. A "running"
        # status with a dead lock PID is handled by Phase 2.5 instead.
        if session_state.get("status") == "running" and session_state.get("lock_pid_alive"):
            blockers.append(("session_running", None))
        if session_state.get("recent_compact"):
            blockers.append(("session_compacting", None))

        if blockers:
            # Release the counter and report the first (primary) reason.
            if self.counter and not skip_counter:
                self.counter.release(agent_id, _sid_key)
            primary_reason = blockers[0][0]
            logger.info("Phase 2 blocked %s: %s (all=%s)",
                        agent_id, primary_reason, blockers)
            raise AgentBusyError(agent_id, reason=primary_reason,
                                 detail={"blockers": blockers})

        # Phase 2.5: stuck repair (status=running + dead lock PID -> revive
        # -> re-check). Phase 0 should already have fixed this; safety net.
        if session_state.get("status") == "running" and not session_state.get("lock_pid_alive"):
            logger.warning("Phase 2.5: %s status=running + lock dead (should be caught in Phase 0), reviving",
                           agent_id)
            self._revive_session(agent_id)
            session_state = self._check_session_state(agent_id)
            if session_state.get("status") == "running":
                if self.counter and not skip_counter:
                    self.counter.release(agent_id, _sid_key)
                raise AgentBusyError(agent_id, reason="session_stuck",
                                     detail={"status": "running after revive"})

    # Phase 3: on_checks_passed callback.
    # If it raises, the counter is held but no subprocess exists, so
    # _wrapped_on_complete will never run: release manually here.
    if on_checks_passed:
        try:
            on_checks_passed()
        except Exception:
            if self.counter and not skip_counter:
                self.counter.release(agent_id, _sid_key)
            raise

    if self.dry_run:
        # NOTE(review): a counter acquired in Phase 1 is not released on
        # this path - confirm that dry-run callers expect that.
        logger.info("[DRY RUN] Would spawn agent %s (session=%s)", agent_id, _sid_key)
        self._register_session(_sid_key, agent_id, task_id, pid=None)
        return _sid_key

    # 4. _wrapped_on_complete guarantees the counter release (the closure
    # captures _sid_key); the business callback runs afterwards and its
    # failures are only logged.
    async def _wrapped_on_complete(aid, outcome):
        try:
            if self.counter:
                self.counter.release(aid, _sid_key)
        finally:
            if on_complete:
                try:
                    result = on_complete(aid, outcome)
                    if asyncio.iscoroutine(result):
                        await result
                except Exception:
                    logger.warning("Business on_complete failed for %s", aid, exc_info=True)

    cmd = [
        "openclaw", "agent",
        "--agent", agent_id,
    ]
    if session_id:
        cmd.extend(["--session-id", session_id])
    cmd.extend([
        "--message", message,
        "--json",
        "--timeout", str(int(self.gateway_timeout)),
    ])

    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        self._register_session(session_id, agent_id, task_id, proc.pid,
                               broadcast_task_ids=broadcast_task_ids)
        logger.info("Spawned agent %s (session=%s, pid=%d)",
                    agent_id, session_id, proc.pid)

        # Schedule the monitor (it receives _wrapped_on_complete).
        monitor_task = asyncio.create_task(
            self._monitor_process(session_id, proc, agent_id, task_id,
                                  on_complete=_wrapped_on_complete,
                                  db_path=task_db_path or self.db_path)
        )
        # The event loop only keeps weak references to tasks, so an
        # unreferenced task can be garbage-collected mid-execution. Hold a
        # strong reference until the monitor is done. The set is created
        # lazily so this method does not depend on __init__.
        monitor_tasks = getattr(self, "_monitor_tasks", None)
        if monitor_tasks is None:
            monitor_tasks = self._monitor_tasks = set()
        monitor_tasks.add(monitor_task)
        monitor_task.add_done_callback(monitor_tasks.discard)

        return session_id

    except Exception as e:
        # A failed spawn must release the counter as well.
        if self.counter:
            self.counter.release(agent_id, _sid_key)
        logger.exception("Failed to spawn agent %s", agent_id)
        self._record_attempt(task_id, agent_id, "spawn_failed", error=str(e))
        raise
|
||
|
||
async def spawn_subagent(
    self,
    task_description: str,
    task_id: Optional[str] = None,
) -> str:
    """Spawn a Subagent (placeholder; the real spawn will go through the Gateway API).

    Only a fresh session id is allocated and registered under the agent name
    "subagent"; no process is started. ``task_description`` is not read yet.

    Returns:
        session_id
    """
    new_sid = str(uuid.uuid4())

    if self.dry_run:
        logger.info("[DRY RUN] Would spawn subagent (session=%s)", new_sid)
    else:
        # TODO: F17 - implement via the Gateway API sessions_spawn.
        logger.info("Subagent spawn (session=%s) - placeholder", new_sid)

    # Both modes register the session identically (no pid).
    self._register_session(new_sid, "subagent", task_id, pid=None)
    return new_sid
|
||
|
||
# ── Refill (retry) prompt template ──

# Sent to an agent whose task run was interrupted, asking it to continue.
# Filled with str.format(); placeholders: {project_id}, {task_id}, {title},
# {retry_count}, {max_retries}, {api_host}, {api_port}, {agent_id},
# {fallback_hint}. Doubled braces render as literal braces and "\\" renders
# as a single backslash (shell line continuation).
RETRY_PROMPT = """你收到一个续杯提醒。你的任务在执行过程中被中断了。

## 任务信息

- 项目: {project_id}
- 任务ID: {task_id}
- 标题: {title}
- 续杯次数: 第 {retry_count} 次(上限 {max_retries} 次)

请检查 session 历史中你之前做了什么,然后继续未完成的工作。

## 操作指令

### 查看任务当前状态
```bash
curl http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}?expand=all
```

### 如果已经完成,标记 review
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \\
-H 'Content-Type: application/json' \\
-d '{{"status": "review", "agent": "{agent_id}"}}'
```

### 写入产出(如果之前没写)
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \\
-H 'Content-Type: application/json' \\
-d '{{"agent": "{agent_id}", "type": "<类型>", "title": "<标题>", "content": "<内容>", "summary": "<摘要>"}}'
```

### 如果无法解决,标记失败
```bash
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \\
-H 'Content-Type: application/json' \\
-d '{{"status": "failed", "agent": "{agent_id}", "detail": "<失败原因>"}}'
```

{fallback_hint}"""
|
||
|
||
async def _monitor_process(
    self,
    session_id: Optional[str],
    proc: asyncio.subprocess.Process,
    agent_id: str,
    task_id: Optional[str],
    on_complete: Optional[Any] = None,
    db_path: Optional[Path] = None,
    monitor_timeout_count: int = 0,
) -> None:
    """Watch a spawned subprocess for its whole lifetime (design doc: spawner-monitor-design.md).

    stdout and stderr are drained concurrently while waiting for the process
    to exit, bounded by ``self.agent_timeout`` seconds.

    * Case A - the process exited in time: ``_handle_exit`` receives the exit
      code and the collected output chunks.
    * Case B - the wait timed out: ``_handle_monitor_timeout`` receives the
      still-running process and the stderr chunks collected so far.
    """
    out_buf: list = []
    err_buf: list = []

    async def _drain(stream, sink):
        # Read 4 KiB at a time; an empty read means end of stream.
        chunk = await stream.read(4096)
        while chunk:
            sink.append(chunk)
            chunk = await stream.read(4096)

    async def _pump_until_exit():
        # Drain both pipes and wait for the exit together.
        await asyncio.gather(
            _drain(proc.stdout, out_buf),
            _drain(proc.stderr, err_buf),
            proc.wait(),
        )

    try:
        await asyncio.wait_for(_pump_until_exit(), timeout=self.agent_timeout)
        # ── Case A: the process exited ──
        await self._handle_exit(
            session_id, agent_id, task_id, proc.returncode,
            out_buf, err_buf, on_complete, db_path
        )
    except asyncio.TimeoutError:
        # ── Case B: monitor timeout (the process has not exited) ──
        logger.warning("Agent %s monitor timeout (session=%s, count=%d/%d)",
                       agent_id, session_id, monitor_timeout_count + 1,
                       self.max_monitor_timeouts)
        await self._handle_monitor_timeout(
            session_id, agent_id, task_id, proc,
            on_complete, db_path, err_buf, monitor_timeout_count
        )
|
||
|
||
async def _handle_exit(self, session_id, agent_id, task_id, exit_code,
                       stdout_chunks, stderr_chunks, on_complete, db_path):
    """Case A: handle a subprocess that has exited.

    Steps: decode the captured output, parse the stdout JSON, read the task
    status, classify the outcome, update the session registry, record the
    attempt, report to the ticker (when one is attached), then act on the
    outcome.

    v2.7.2: process exit = counter release, guaranteed through on_complete
    (the spawner's wrapped callback).

    Outcome handling as implemented below:
      * ``should_retry`` set by the classifier -> optional cooldown, then
        ``_do_retry``.
      * ``api_error`` (A9, e.g. 429) -> on_complete, cooldown, then the task
        goes back to pending, or to failed once ``api_retry_count`` reaches
        ``max_retries``.
      * ``fallback_timeout`` (A3/A3b) -> failed once ``fallback_count``
        reaches ``max_retries``, otherwise a 60s cooldown plus retry.
      * anything else -> optional cooldown, immediate failed for
        auth_failed / agent_error, then on_complete.

    Args:
        session_id: Session id, or None for the main session (registry key
            "main").
        agent_id: Agent that ran.
        task_id: Related task id; may be None or the literal "broadcast".
        exit_code: Process return code.
        stdout_chunks: Collected stdout byte chunks.
        stderr_chunks: Collected stderr byte chunks.
        on_complete: Async callback ``(agent_id, outcome)``.
        db_path: Blackboard DB path used for task bookkeeping.
    """
    # Undecodable bytes are replaced rather than raising.
    stdout_text = b"".join(stdout_chunks).decode("utf-8", errors="replace")
    stderr_text = b"".join(stderr_chunks).decode("utf-8", errors="replace")

    # Parse the stdout JSON.
    json_result = self._parse_stdout_json(stdout_text)
    logger.info("Parsed JSON result for agent=%s session=%s: %s",
                agent_id, session_id, json_result)

    # Look up the task's actual status.
    task_status = self._get_task_status(db_path, task_id) if task_id else None

    # Classify.
    cls = self._classify_outcome(exit_code, json_result, stderr_text, task_status, stdout_text)
    outcome = cls["outcome"]

    # Update the session registry entry.
    sid = session_id or "main"
    if sid in self._sessions:
        self._sessions[sid]["status"] = outcome
        self._sessions[sid]["completed_at"] = datetime.utcnow().isoformat()
        self._sessions[sid]["exit_code"] = exit_code
        if json_result:
            self._sessions[sid]["meta"] = json_result

    # Record the attempt.
    # NOTE(review): json_result.get(...) assumes _parse_stdout_json always
    # returns a dict (possibly empty) and never None - confirm.
    self._record_attempt(
        task_id, agent_id, outcome, exit_code=exit_code,
        db_path=db_path,
        metadata={
            "status": json_result.get("status"),
            "summary": json_result.get("summary"),
            "fallback_used": json_result.get("fallback_used"),
            "fallback_reason": json_result.get("fallback_reason"),
            "task_status_at_exit": task_status,
        }
    )

    logger.info("Agent %s finished (session=%s, outcome=%s, exit=%d, task_status=%s)",
                agent_id, session_id, outcome, exit_code, task_status)

    # Broadcast feedback tracking (Phase 1 bug fix).
    if task_id == "broadcast" and hasattr(self, '_ticker') and self._ticker:
        # Broadcast task: take the real task ids from the session info and
        # report each of them to the tracker.
        sess_info = self._sessions.get(session_id or "main", {})
        bt_ids = sess_info.get("broadcast_task_ids") or []
        # Broadcasts are always reported as no_reply: the agent claims only
        # one task, and the trackers of the others must not be cleared by a
        # "claimed" response.
        for real_task_id in bt_ids:
            self._ticker.record_broadcast_response(real_task_id, agent_id, "no_reply")
    elif task_id and hasattr(self, '_ticker') and self._ticker:
        outcome_str = "claimed" if cls.get("status") == "ok" else "no_reply"
        self._ticker.record_broadcast_response(task_id, agent_id, outcome_str)

    if cls["should_retry"]:
        # cooldown: the newer recoverable scenarios (A14/A15/A16/A8/A10).
        cooldown_seconds = cls.get("cooldown_seconds", 0)
        if cooldown_seconds and self.counter:
            self.counter.set_cooldown(agent_id, seconds=cooldown_seconds)
        # A2/A3: gateway_timeout -> refill (on_complete releases the counter).
        await self._do_retry(
            session_id, agent_id, task_id, on_complete, db_path,
            cls.get("retry_field", "retry_count")
        )
    elif outcome == "api_error":
        # A9: 429 / API error -> release the counter (on_complete), push the
        # task back to pending, and cool down. Bounded: once api_retry_count
        # reaches max_retries the task is marked failed.
        await self._do_on_complete_async(on_complete, agent_id, outcome)
        if self.counter:
            self.counter.set_cooldown(agent_id)
        if db_path and task_id:
            retry_counts = self._get_retry_counts(db_path, task_id)
            api_count = retry_counts.get("api_retry_count", 0) + 1
            retry_counts["api_retry_count"] = api_count
            self._update_retry_counts(db_path, task_id, retry_counts)
            if api_count >= self.max_retries:
                logger.error("Task %s api_retry_count=%d >= max_retries, marking failed",
                             task_id, api_count)
                self._mark_task(db_path, task_id, "failed", {
                    "reason": "max_api_retry_count", "count": api_count,
                })
            else:
                self._mark_task(db_path, task_id, "pending", {
                    "reason": "api_error_retry",
                    "api_retry_count": api_count,
                })
                logger.info("Task %s pushed back to pending (api_error, api_retry=%d/%d)",
                            task_id, api_count, self.max_retries)
    elif outcome == "fallback_timeout" and not cls["should_retry"]:
        # A3/A3b: graded fallback handling.
        # fallback_count comes from the stored retry counters; at max_retries
        # the task is marked failed (A3), otherwise retry + cooldown (A3b).
        # (should_retry is necessarily falsy here: the first branch did not
        # match.)
        fallback_count = 0
        if db_path and task_id:
            retry_counts = self._get_retry_counts(db_path, task_id)
            fallback_count = retry_counts.get("fallback_count", 0) + 1
            retry_counts["fallback_count"] = fallback_count
            self._update_retry_counts(db_path, task_id, retry_counts)

        if fallback_count >= self.max_retries:
            # A3: consecutive fallbacks reached the limit, mark failed.
            logger.error("A3 fallback exhausted: agent=%s session=%s task=%s "
                         "fallback_count=%d reason=%s",
                         agent_id, session_id, task_id, fallback_count,
                         json_result.get("fallback_reason"))
            await self._do_on_complete_async(on_complete, agent_id, outcome)
            if db_path and task_id:
                self._mark_task(db_path, task_id, "failed", {
                    "reason": "fallback_exhausted",
                    "fallback_count": fallback_count,
                    "fallback_reason": json_result.get("fallback_reason"),
                })
        else:
            # A3b: fallback below the limit, retry + cooldown.
            logger.warning("A3b fallback retry: agent=%s session=%s task=%s "
                           "fallback_count=%d/%d reason=%s",
                           agent_id, session_id, task_id, fallback_count,
                           self.max_retries, json_result.get("fallback_reason"))
            if self.counter:
                self.counter.set_cooldown(agent_id, seconds=60)
            await self._do_retry(
                session_id, agent_id, task_id, on_complete, db_path,
                "fallback_retry_count"  # separate counter, not shared with gateway_timeout's retry_count
            )
    else:
        # Everything else: A1(completed), A4(agent_failed), A7(auth_failed),
        # A8(gateway_unreachable), A11(lock_conflict),
        # A10(compact_failed), A12(agent_error)
        # v2.8.1 Fix-3a: crash-like outcomes get a cooldown so the agent
        # session has time to recover.
        if outcome == "crashed" and self.counter:
            self.counter.set_cooldown(agent_id, seconds=60)
            logger.info("Crash cooldown set for %s: 60s (outcome=%s)", agent_id, outcome)
        elif outcome in ("compact_failed", "process_crash", "session_stuck",
                         "compact_hanging", "agent_error", "compact_interrupted") and self.counter:
            self.counter.set_cooldown(agent_id, seconds=300)  # 5 minutes
            logger.info("Error cooldown set for %s: 300s (outcome=%s)", agent_id, outcome)
        # F1: unrecoverable outcome -> mark failed right away and write it
        # to the blackboard.
        if outcome in ("auth_failed", "agent_error") and db_path and task_id:
            logger.error("Task %s: unrecoverable outcome=%s, marking failed immediately", task_id, outcome)
            self._mark_task(db_path, task_id, "failed", {
                "reason": outcome,
                "stderr_preview": (stderr_text or "")[:500],
            })
        # Note (original author): during the cooldown the task status is
        # still "working" although the counter is already released. That DB
        # state is a "fake working" - the ticker will not reassign it and
        # _check_timeouts reclaims it after the cooldown. The ticker giving
        # the same agent a new task in the meantime is normal behaviour.
        # Process exit -> on_complete releases the counter.
        # The task status is handled per outcome (or left to the ticker).
        await self._do_on_complete_async(on_complete, agent_id, outcome)
|
||
|
||
async def _handle_monitor_timeout(self, session_id, agent_id, task_id, proc,
                                  on_complete, db_path, stderr_chunks,
                                  monitor_timeout_count):
    """Case B: decide what to do after a monitor wait on the agent timed out.

    The state reported by ``_check_session_state`` selects one of:

    * B1 - status is ``running`` but the lock PID is not alive (stuck
      session). The first occurrence for a task tries ``_revive_session``;
      a second occurrence, or a failed revive, marks the task ``failed``.
    * B2 - a recent compaction was detected. Monitoring continues without
      incrementing ``monitor_timeout_count``; a separate per-task counter
      bounds how many times this may happen.
    * B3/B4 - otherwise ``monitor_timeout_count`` is incremented and the
      task is either marked ``failed`` (limit reached) or monitored again.

    Args:
        session_id: Spawn session identifier (used for logging and passed
            on to ``_monitor_process``).
        agent_id: Agent whose process is being monitored.
        task_id: Task the agent is working on.
        proc: The subprocess object; its ``stderr`` stream is drained
            best-effort.
        on_complete: Callback invoked via ``_do_on_complete_async`` on every
            exit path that stops monitoring.
        db_path: Blackboard database used by ``_mark_task``.
        stderr_chunks: List of byte chunks; newly read stderr is appended.
        monitor_timeout_count: Number of monitor timeouts counted so far.
    """
    # Best-effort drain of stderr. read() waits for EOF, so on a live
    # process this normally just times out after 2s; that is expected and
    # deliberately not logged.
    try:
        remaining = await asyncio.wait_for(proc.stderr.read(), timeout=2.0)
        if remaining:
            stderr_chunks.append(remaining)
    except Exception:
        pass

    def _resume_monitor(count):
        """Schedule another monitor round and keep a strong reference to it."""
        task = asyncio.create_task(
            self._monitor_process(
                session_id, proc, agent_id, task_id,
                on_complete=on_complete, db_path=db_path,
                monitor_timeout_count=count,
            )
        )
        # The event loop only holds weak references to tasks; without a
        # strong reference a pending task may be garbage-collected.
        pending = getattr(self, "_monitor_tasks", None)
        if pending is None:
            pending = set()
            self._monitor_tasks = pending
        pending.add(task)
        task.add_done_callback(pending.discard)

    # Inspect the session state
    state = self._check_session_state(agent_id)

    # B1: stuck session - revive first, fail after 2 consecutive detections
    if state.get("status") == "running" and not state.get("lock_pid_alive", True):
        stuck_count = self._stuck_counts.get(task_id, 0) + 1
        self._stuck_counts[task_id] = stuck_count

        if stuck_count >= 2:
            # Stuck at least twice: give up and mark the task failed
            logger.error("Agent %s session stuck %d times (session=%s, lock PID dead)",
                         agent_id, stuck_count, session_id)
            self._mark_task(db_path, task_id, "failed",
                            {"reason": "session_stuck", "stuck_count": stuck_count,
                             "diagnostics": state})
            # The task is terminal now; drop the counter so it neither
            # leaks nor poisons a later run of the same task id.
            self._stuck_counts.pop(task_id, None)
            await self._do_on_complete_async(on_complete, agent_id, "session_stuck")
            return

        # First detection: try to revive the session
        logger.warning("Agent %s session stuck (attempt %d), reviving (session=%s)",
                       agent_id, stuck_count, session_id)
        revived = self._revive_session(agent_id)
        if revived:
            logger.info("Agent %s session revived, releasing counter for ticker re-dispatch",
                        agent_id)
            # on_complete fires while the task status is left untouched;
            # the stuck counter is kept so a second detection fails the task.
            await self._do_on_complete_async(on_complete, agent_id, "session_revived")
        else:
            # Revive failed: mark the task failed
            logger.error("Agent %s revive failed, marking failed", agent_id)
            self._mark_task(db_path, task_id, "failed",
                            {"reason": "revive_failed", "stuck_count": stuck_count,
                             "diagnostics": state})
            self._stuck_counts.pop(task_id, None)
            await self._do_on_complete_async(on_complete, agent_id, "revive_failed")
        return

    # B2/B3/B4: the process is still alive
    # B2: compaction in progress - does not count as a monitor timeout
    if state.get("recent_compact"):
        logger.info("Agent %s recent compaction detected, extending patience "
                    "(session=%s, monitor=%d/%d)",
                    agent_id, session_id, monitor_timeout_count, self.max_monitor_timeouts)
        # monitor_timeout_count is not incremented; an independent counter
        # caps the extra waits so this cannot go on forever.
        compact_wait_count = self._compact_waits.get(task_id, 0) + 1
        self._compact_waits[task_id] = compact_wait_count
        if compact_wait_count >= self.max_monitor_timeouts:
            # #07.3 ACT-2: compact_hanging does not mark the task failed;
            # monitoring stops and only on_complete is invoked.
            logger.warning("Agent %s compact hanging after %d waits, releasing counter for ticker re-dispatch",
                           agent_id, compact_wait_count)
            self._compact_waits.pop(task_id, None)
            await self._do_on_complete_async(on_complete, agent_id, "compact_hanging")
            return
        # Keep waiting with the unchanged timeout count
        _resume_monitor(monitor_timeout_count)
        return

    # B3/B4: no compaction, count this timeout
    monitor_timeout_count += 1
    if monitor_timeout_count >= self.max_monitor_timeouts:
        logger.error("Agent %s max monitor timeouts (session=%s, count=%d)",
                     agent_id, session_id, monitor_timeout_count)
        self._mark_task(db_path, task_id, "failed", {
            "reason": "max_monitor_timeouts",
            "count": monitor_timeout_count,
            "elapsed_seconds": monitor_timeout_count * int(self.agent_timeout),
            "diagnostics": state,
        })
        await self._do_on_complete_async(on_complete, agent_id, "max_monitor_timeouts")
        return

    # Below the limit: keep monitoring (on_complete is not invoked)
    logger.info("Agent %s continuing monitor (session=%s, count=%d/%d)",
                agent_id, session_id, monitor_timeout_count, self.max_monitor_timeouts)
    _resume_monitor(monitor_timeout_count)
|
||
|
||
async def _do_retry(self, session_id, agent_id, task_id, on_complete,
                    db_path, retry_field="retry_count"):
    """Re-spawn the agent for another attempt at the same task.

    Steps, in order:

    1. If the task row already has status done/failed/cancelled/review,
       skip the retry and just invoke ``on_complete``.
    2. Increment the counter named by ``retry_field``: ``retry_count`` lives
       in the ``tasks`` table, any other field goes through
       ``_get_retry_counts`` / ``_update_retry_counts``.
    3. If the counter reached ``self.max_retries``, mark the task failed.
    4. Otherwise build the retry prompt (mail template for project
       ``_mail``, standard template otherwise) and call
       ``spawn_full_agent`` with ``skip_counter=True`` and the original
       ``on_complete``.

    v2.8.1 Bug-4: the counter is no longer released manually here and
    ``on_complete`` is no longer replaced by None; the original callback is
    passed through to the re-spawned agent.
    """
    has_task_row = bool(db_path and task_id)

    # Skip the retry when the task already reached a final status
    if has_task_row:
        try:
            conn = get_connection(db_path)
            try:
                current = conn.execute(
                    "SELECT status FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
                # Bug-6 fix: pending is not a final status
                if current and current["status"] in ("done", "failed", "cancelled", "review"):
                    logger.info("Retry skip: task %s already %s (agent=%s)",
                                task_id, current["status"], agent_id)
                    await self._do_on_complete_async(on_complete, agent_id, "task_already_done")
                    return
            finally:
                conn.close()
        except Exception:
            logger.warning("Retry status check failed for %s, proceeding", task_id)

    def _bump_task_retry_count():
        """Increment tasks.retry_count and return the new value (1 on error)."""
        try:
            conn = get_connection(db_path)
            try:
                conn.execute("BEGIN IMMEDIATE")
                conn.execute(
                    "UPDATE tasks SET retry_count = COALESCE(retry_count, 0) + 1 WHERE id=?",
                    (task_id,),
                )
                conn.commit()
                fresh = conn.execute(
                    "SELECT retry_count FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
                if fresh:
                    return fresh["retry_count"]
                return 1
            finally:
                conn.close()
        except Exception:
            logger.exception("Failed to update retry_count for task %s", task_id)
            return 1

    if has_task_row and retry_field == "retry_count":
        # retry_count is read and written directly on the tasks table
        count = _bump_task_retry_count()
    else:
        counters = self._get_retry_counts(db_path, task_id)
        count = counters.get(retry_field, 0) + 1
        counters[retry_field] = count
        self._update_retry_counts(db_path, task_id, counters)

    if count >= self.max_retries:
        logger.error("Agent %s max retries (session=%s, %s=%d)",
                     agent_id, session_id, retry_field, count)
        self._mark_task(db_path, task_id, "failed", {
            "reason": f"max_{retry_field}", "count": count,
        })
        await self._do_on_complete_async(on_complete, agent_id, "max_retries")
        return

    logger.info("Agent %s retry %s=%d/%d (session=%s)",
                agent_id, retry_field, count, self.max_retries, session_id)

    # Build the retry message (mail tasks use their own template)
    task_info = self._get_task_info(db_path, task_id) or {}
    project_id = task_info.get("project_id", "")

    if project_id == "_mail":
        raw_meta = task_info.get("must_haves", "{}")
        try:
            mail_meta = json.loads(raw_meta) if raw_meta else {}
        except Exception:
            mail_meta = {}
        message = MAIL_RETRY_PROMPT.format(
            from_agent=mail_meta.get("from", "unknown"),
            title=task_info.get("title", ""),
            retry_count=count,
            max_retries=self.max_retries,
        )
    else:
        # The hint is appended only for the "retry_count" field
        if retry_field == "retry_count":
            fallback_hint = "\n⚠️ 之前有 fallback 执行,请调 API 检查任务当前状态和已有产出,确认是否已完成。"
        else:
            fallback_hint = ""
        message = self.RETRY_PROMPT.format(
            project_id=project_id,
            task_id=task_id or "",
            title=task_info.get("title", ""),
            retry_count=count,
            max_retries=self.max_retries,
            api_host=self.api_host,
            api_port=self.api_port,
            agent_id=agent_id,
            fallback_hint=fallback_hint,
        )

    # v2.7.2: re-spawn through spawn_full_agent; the original on_complete
    # is handed over as the business callback.
    try:
        await self.spawn_full_agent(
            agent_id=agent_id,
            message=message,
            task_id=task_id,
            on_complete=on_complete,
            use_main_session=True,  # #02: retries run in the main session
            task_db_path=db_path,
            skip_counter=True,  # Bug-4 fix: counter is still held from the original spawn
        )
    except AgentBusyError as busy:
        # #07.3 ACT-3: session busy (compact/lock/running) is a transient
        # block; invoke on_complete and leave the task status untouched.
        logger.warning("Retry spawn deferred: %s session busy (%s), releasing counter for ticker re-dispatch",
                       agent_id, busy.reason)
        await self._do_on_complete_async(on_complete, agent_id, "retry_session_busy")
    except Exception:
        logger.exception("Retry spawn failed for %s", agent_id)
        await self._do_on_complete_async(on_complete, agent_id, "retry_spawn_failed")
|
||
|
||
# ── 辅助方法 ──
|
||
|
||
@staticmethod
|
||
def _parse_stdout_json(stdout_text: str) -> dict:
|
||
"""解析 openclaw agent --json 的 stdout 输出
|
||
|
||
返回可直接使用的字段:status, summary, fallback_used, fallback_reason, payloads
|
||
不再提取 meta,直接用顶层字段。
|
||
"""
|
||
text = stdout_text.strip()
|
||
if not text:
|
||
return {"status": None, "summary": None, "fallback_used": False, "fallback_reason": None, "payloads": []}
|
||
try:
|
||
data = json.loads(text)
|
||
except json.JSONDecodeError:
|
||
# 多行输出,找最后一个 JSON
|
||
for line in reversed(text.splitlines()):
|
||
try:
|
||
data = json.loads(line)
|
||
break
|
||
except json.JSONDecodeError:
|
||
continue
|
||
else:
|
||
return {"status": None, "summary": None, "fallback_used": False, "fallback_reason": None, "payloads": []}
|
||
|
||
# 从 data.result.meta.executionTrace 取 fallback 信息
|
||
result = data.get("result", {})
|
||
meta = result.get("meta", {})
|
||
trace = meta.get("executionTrace", {})
|
||
|
||
return {
|
||
"status": data.get("status"),
|
||
"summary": data.get("summary"),
|
||
"fallback_used": trace.get("fallbackUsed", False),
|
||
"fallback_reason": trace.get("fallbackReason"),
|
||
"payloads": result.get("payloads", []),
|
||
}
|
||
|
||
@staticmethod
|
||
def _get_task_status(db_path: Optional[Path], task_id: Optional[str]) -> Optional[str]:
|
||
"""查任务实际 API 状态"""
|
||
if not db_path or not task_id:
|
||
return None
|
||
try:
|
||
conn = get_connection(db_path)
|
||
try:
|
||
row = conn.execute(
|
||
"SELECT status FROM tasks WHERE id=?", (task_id,)
|
||
).fetchone()
|
||
return row["status"] if row else None
|
||
finally:
|
||
conn.close()
|
||
except Exception:
|
||
return None
|
||
|
||
@staticmethod
|
||
def _get_task_info(db_path: Optional[Path], task_id: Optional[str]) -> Optional[dict]:
|
||
"""查任务基本信息"""
|
||
if not db_path or not task_id:
|
||
return None
|
||
try:
|
||
conn = get_connection(db_path)
|
||
try:
|
||
row = conn.execute(
|
||
"SELECT id, title, status FROM tasks WHERE id=?", (task_id,)
|
||
).fetchone()
|
||
if not row:
|
||
return None
|
||
info = dict(row)
|
||
# 从 db_path 推断 project_id: data/<project_id>/blackboard.db
|
||
info["project_id"] = db_path.parent.name
|
||
return info
|
||
finally:
|
||
conn.close()
|
||
except Exception:
|
||
return None
|
||
|
||
@staticmethod
|
||
def _revive_session(agent_id: str) -> bool:
|
||
"""假死复活术:修改 sessions.json status 从 running 改为 idle"""
|
||
sessions_path = Path(os.environ.get(
|
||
"OPENCLAW_HOME", str(Path.home() / ".openclaw")
|
||
)) / "agents" / agent_id / "sessions" / "sessions.json"
|
||
if not sessions_path.exists():
|
||
return False
|
||
try:
|
||
with open(sessions_path) as f:
|
||
sessions = json.load(f)
|
||
main_key = f"agent:{agent_id}:main"
|
||
main_session = sessions.get(main_key, {})
|
||
if main_session.get("status") != "running":
|
||
return False # 不是 running 状态,不需要复活
|
||
main_session["status"] = "idle"
|
||
sessions[main_key] = main_session
|
||
with open(sessions_path, "w") as f:
|
||
json.dump(sessions, f, indent=2)
|
||
logger.info("Revived %s: sessions.json status changed running→idle", agent_id)
|
||
# #07 O4: 同时清理残留 lock 文件
|
||
sf = main_session.get("sessionFile", "")
|
||
if sf:
|
||
lock_path = Path(sf + ".lock")
|
||
if lock_path.exists():
|
||
try:
|
||
lock_path.unlink()
|
||
logger.info("Cleaned stale lock for %s: %s", agent_id, lock_path.name)
|
||
except Exception:
|
||
pass
|
||
return True
|
||
except Exception:
|
||
logger.exception("Failed to revive %s", agent_id)
|
||
return False
|
||
|
||
@staticmethod
|
||
def _check_recent_compaction_jsonl(session_file: str, window_seconds: int = 900) -> bool:
|
||
"""v2.8.2 Fix-2: 读 session jsonl 末尾,检查是否有 window_seconds 内的 compaction 记录。
|
||
|
||
比 compactionCheckpoints 更可靠:Gateway 每次完成 compact 必然在 jsonl 末尾追加记录,
|
||
但不保证更新 compactionCheckpoints。
|
||
|
||
v2.8.2: 窗口从 300s→900s(15min), 尾部读取从 50KB→1MB。
|
||
实测 50KB 在长对话中不够(compact 记录被推出窗口导致漏检)。
|
||
正常扫描量不变:从尾部往前扫,遇到超过 15min 的 timestamp 即 break。
|
||
"""
|
||
if not session_file or not pathlib.Path(session_file).exists():
|
||
return False
|
||
try:
|
||
from datetime import datetime, timezone
|
||
now = datetime.now(timezone.utc)
|
||
with open(session_file, "rb") as sf:
|
||
sf.seek(0, 2)
|
||
size = sf.tell()
|
||
sf.seek(max(0, size - 1048576))
|
||
tail = sf.read().decode("utf-8", errors="replace")
|
||
for line in reversed(tail.splitlines()):
|
||
if not line.strip():
|
||
continue
|
||
try:
|
||
import json as _json
|
||
obj = _json.loads(line)
|
||
except (_json.JSONDecodeError, ValueError):
|
||
continue
|
||
if obj.get("type") == "compaction":
|
||
ts = obj.get("timestamp", "")
|
||
if ts:
|
||
try:
|
||
ct = datetime.fromisoformat(ts.replace("Z", "+00:00"))
|
||
if (now - ct).total_seconds() < window_seconds:
|
||
return True
|
||
except (ValueError, TypeError):
|
||
pass
|
||
ts = obj.get("timestamp", "")
|
||
if ts:
|
||
try:
|
||
ct = datetime.fromisoformat(ts.replace("Z", "+00:00"))
|
||
if (now - ct).total_seconds() >= window_seconds:
|
||
break
|
||
except (ValueError, TypeError):
|
||
pass
|
||
return False
|
||
except Exception:
|
||
return False
|
||
|
||
@staticmethod
|
||
def _check_session_state(agent_id: str) -> dict:
|
||
"""检查 sessions.json 和 lock 状态
|
||
|
||
v2.8.1: compact 检测改用 session jsonl 末尾扫描(Fix-1),
|
||
替代失效的 compactionCheckpoints 检测。
|
||
"""
|
||
result = {"status": "unknown", "lock_pid": None, "lock_pid_alive": False, "recent_compact": False}
|
||
sessions_path = Path(os.environ.get(
|
||
"OPENCLAW_HOME", str(Path.home() / ".openclaw")
|
||
)) / "agents" / agent_id / "sessions" / "sessions.json"
|
||
if not sessions_path.exists():
|
||
return result
|
||
try:
|
||
with open(sessions_path) as f:
|
||
sessions = json.load(f)
|
||
main_key = f"agent:{agent_id}:main"
|
||
main_session = sessions.get(main_key, {})
|
||
result["status"] = main_session.get("status", "unknown")
|
||
|
||
# 检查 lock (v3.1: done/timeout 时 lock 视为过期)
|
||
sf = main_session.get("sessionFile", "")
|
||
if sf:
|
||
lock_path = Path(sf + ".lock")
|
||
if lock_path.exists():
|
||
try:
|
||
lock_data = json.loads(lock_path.read_text())
|
||
pid = lock_data.get("pid")
|
||
result["lock_pid"] = pid
|
||
if pid:
|
||
try:
|
||
os.kill(pid, 0)
|
||
result["lock_pid_alive"] = True
|
||
except ProcessLookupError:
|
||
result["lock_pid_alive"] = False
|
||
# session 已完成/超时 > lock 是 Gateway 冷却锁,不阻塞新 turn
|
||
if result["status"] in ("done", "timeout"):
|
||
result["lock_pid_alive"] = False
|
||
result["lock_expired"] = True
|
||
# running + lock 超时 >30分钟 > 视为 idle,允许 dispatch
|
||
elif result["status"] == "running" and result["lock_pid_alive"]:
|
||
try:
|
||
lock_data = json.loads(lock_path.read_text())
|
||
created_at_str = lock_data.get("createdAt", "")
|
||
if created_at_str:
|
||
from datetime import datetime as _dt, timezone as _tz
|
||
created_dt = _dt.fromisoformat(created_at_str.replace("Z", "+00:00"))
|
||
elapsed = (_dt.now(_tz.utc) - created_dt).total_seconds()
|
||
if elapsed > 1800: # 30 minutes
|
||
result["lock_pid_alive"] = False
|
||
result["lock_expired"] = True
|
||
logger.info("Lock expired for %s: running + lock age %.0fs > 1800s",
|
||
agent_id, elapsed)
|
||
except Exception:
|
||
pass
|
||
except Exception:
|
||
pass
|
||
|
||
# v2.8.1 Fix-1: compact 检测改用 session jsonl 末尾扫描
|
||
# 只在 agent 非空闲时才扫描(减少不必要 I/O)
|
||
if result["status"] not in ("done", "idle", "unknown", None) and sf:
|
||
result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(sf)
|
||
except Exception:
|
||
pass
|
||
return result
|
||
|
||
@staticmethod
|
||
def _classify_outcome(exit_code: int, json_result: dict, stderr_text: str,
|
||
task_status: Optional[str], stdout_text: str = "") -> dict:
|
||
"""分类退出原因,返回处理策略
|
||
|
||
v3.1: A0 拆分为 A14-A17(信号中断/stderr 智能分类)。
|
||
A8/A10 改为可恢复 retry。cooldown 统一 60s。
|
||
"""
|
||
status = json_result.get("status")
|
||
summary = json_result.get("summary", "")
|
||
fallback_used = json_result.get("fallback_used", False)
|
||
|
||
# A4: 任务 DB status=failed(Agent 自己标的)
|
||
if task_status == "failed":
|
||
return {"outcome": "agent_failed", "should_retry": False}
|
||
|
||
# A1: status=ok + completed + 非 fallback
|
||
if status == "ok" and summary == "completed" and not fallback_used:
|
||
return {"outcome": "completed", "should_retry": False}
|
||
|
||
# A5/A6: status=ok + fallback
|
||
if status == "ok" and fallback_used:
|
||
return {"outcome": "fallback_timeout", "should_retry": False}
|
||
|
||
# A2/A3: status=timeout → 唯一续杯场景
|
||
# 注意: PM2 restart 时 daemon 自身也收到 SIGTERM,此时 retry spawn 的新进程
|
||
# 会随 daemon 一起被杀。A14 retry 假设 daemon 存活,PM2 级重启不在此场景内。
|
||
if status == "timeout":
|
||
return {"outcome": "gateway_timeout", "should_retry": True,
|
||
"retry_field": "retry_count"}
|
||
|
||
# A0 拆分: 无 JSON 输出 + exit≠0
|
||
if status is None and not stdout_text.strip() and exit_code != 0:
|
||
# A14: SIGINT(130) / SIGTERM(143) → 外部中断,可恢复
|
||
if exit_code in (130, 143):
|
||
return {"outcome": "interrupted", "should_retry": True,
|
||
"retry_field": "retry_count", "cooldown_seconds": 60}
|
||
# A15/A16: stderr 含 network/compact 关键字 → 可恢复
|
||
if stderr_text:
|
||
stderr_lower = stderr_text.lower()
|
||
if any(kw in stderr_lower for kw in ["econnrefused", "etimedout", "gateway closed", "econnreset"]):
|
||
return {"outcome": "gateway_unreachable", "should_retry": True,
|
||
"retry_field": "retry_count", "cooldown_seconds": 60}
|
||
if any(kw in stderr_lower for kw in ["compaction-diag", "context-overflow"]):
|
||
return {"outcome": "compact_interrupted", "should_retry": True,
|
||
"retry_field": "retry_count", "cooldown_seconds": 60}
|
||
# A17: 真正的 crash → 保持 working,ticker 兜底
|
||
return {"outcome": "crashed", "should_retry": False, "original": "process_crash"}
|
||
|
||
# stdout 为空但 exit=0:可能是正常完成但 --json 没输出
|
||
# 查任务状态判断
|
||
if status is None and not stdout_text.strip() and exit_code == 0:
|
||
terminal_statuses = {"done", "review"}
|
||
if task_status in terminal_statuses:
|
||
return {"outcome": "completed", "should_retry": False}
|
||
return {"outcome": "agent_error", "should_retry": False}
|
||
|
||
# A7-A12: status=error → 不续杯,stderr 辅助分类
|
||
if status == "error":
|
||
stderr_lower = stderr_text.lower()
|
||
if any(kw in stderr_lower for kw in ["401", "403", "unauthorized", "auth"]):
|
||
return {"outcome": "auth_failed", "should_retry": False}
|
||
if any(kw in stderr_lower for kw in ["econnrefused", "etimedout", "gateway closed", "econnreset"]):
|
||
return {"outcome": "gateway_unreachable", "should_retry": True,
|
||
"retry_field": "retry_count", "cooldown_seconds": 60}
|
||
if any(kw in stderr_lower for kw in ["rate_limit", "500", "503", "api error"]):
|
||
return {"outcome": "api_error", "should_retry": False}
|
||
if any(kw in stderr_lower for kw in ["compaction-diag", "context-overflow"]):
|
||
return {"outcome": "compact_failed", "should_retry": False}
|
||
if any(kw in stderr_lower for kw in ["lock", "busy", "concurrent", "lane task error"]):
|
||
return {"outcome": "lock_conflict", "should_retry": True,
|
||
"retry_field": "retry_count", "cooldown_seconds": 60}
|
||
return {"outcome": "agent_error", "should_retry": False}
|
||
|
||
# 兜底:status 未知值
|
||
return {"outcome": "agent_error", "should_retry": False, "original": "unknown_status"}
|
||
|
||
@staticmethod
|
||
def _get_retry_counts(db_path: Optional[Path], task_id: Optional[str]) -> dict:
|
||
"""从最新 task_attempt 的 metadata 读计数器"""
|
||
defaults = {"retry_count": 0, "connect_retry_count": 0,
|
||
"api_retry_count": 0, "lock_retry_count": 0,
|
||
"monitor_timeout_count": 0}
|
||
if not db_path or not task_id:
|
||
return defaults
|
||
try:
|
||
conn = get_connection(db_path)
|
||
try:
|
||
row = conn.execute(
|
||
"SELECT metadata FROM task_attempts WHERE task_id=? ORDER BY attempt_number DESC LIMIT 1",
|
||
(task_id,)
|
||
).fetchone()
|
||
if row and row["metadata"]:
|
||
stored = json.loads(row["metadata"])
|
||
for k in defaults:
|
||
if k in stored:
|
||
defaults[k] = stored[k]
|
||
finally:
|
||
conn.close()
|
||
except Exception:
|
||
pass
|
||
return defaults
|
||
|
||
def _update_retry_counts(self, db_path: Optional[Path],
|
||
task_id: Optional[str], counts: dict):
|
||
"""将 retry counts 写回最新 task_attempt 的 metadata"""
|
||
if not db_path or not task_id:
|
||
return
|
||
try:
|
||
conn = get_connection(db_path)
|
||
try:
|
||
conn.execute("BEGIN IMMEDIATE")
|
||
row = conn.execute(
|
||
"SELECT rowid, metadata FROM task_attempts "
|
||
"WHERE task_id=? ORDER BY attempt_number DESC LIMIT 1",
|
||
(task_id,)
|
||
).fetchone()
|
||
if row:
|
||
meta = json.loads(row["metadata"]) if row["metadata"] else {}
|
||
meta.update(counts)
|
||
conn.execute(
|
||
"UPDATE task_attempts SET metadata=? WHERE rowid=?",
|
||
(json.dumps(meta), row["rowid"])
|
||
)
|
||
conn.commit()
|
||
finally:
|
||
conn.close()
|
||
except Exception:
|
||
logger.exception("Failed to update retry counts for task %s", task_id)
|
||
|
||
def _mark_task(self, db_path: Optional[Path], task_id: Optional[str],
|
||
status: str, detail: Optional[dict] = None):
|
||
"""标记任务状态(用于 failed/escalate)"""
|
||
if not db_path or not task_id:
|
||
return
|
||
try:
|
||
conn = get_connection(db_path)
|
||
try:
|
||
conn.execute("BEGIN IMMEDIATE")
|
||
conn.execute(
|
||
"UPDATE tasks SET status=?, completed_at=datetime('now') WHERE id=?",
|
||
(status, task_id)
|
||
)
|
||
if detail:
|
||
conn.execute(
|
||
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
|
||
(task_id, "daemon", status, json.dumps(detail, ensure_ascii=False))
|
||
)
|
||
conn.commit()
|
||
finally:
|
||
conn.close()
|
||
# F2: conn 已关闭,Blackboard 内部自己 get_connection
|
||
if status == "failed":
|
||
reason = (detail or {}).get("reason", "unknown")
|
||
try:
|
||
from src.daemon.mail_notify import _is_mail_project, notify_mail_failed
|
||
if _is_mail_project(db_path):
|
||
# Mail 失败:通知发件人,不 @pangtong
|
||
notify_mail_failed(db_path, task_id, reason, detail)
|
||
else:
|
||
# Task 失败:@pangtong(F2 原逻辑)
|
||
from src.blackboard.operations import Blackboard
|
||
bb = Blackboard(db_path)
|
||
cid = bb.add_comment(task_id, "daemon",
|
||
f"@pangtong-fujunshi 任务执行失败: {reason},请评估是否需要介入",
|
||
comment_type="system")
|
||
bb.record_mentions(cid, task_id, ["pangtong-fujunshi"])
|
||
logger.info("Task %s: failure notified pangtong via comment+mention (reason=%s)", task_id, reason)
|
||
except Exception as e:
|
||
logger.warning("Task %s: failed to notify: %s", task_id, e)
|
||
except Exception:
|
||
logger.exception("Failed to mark task %s as %s", task_id, status)
|
||
|
||
@staticmethod
|
||
def _do_on_complete(on_complete, agent_id, outcome):
|
||
"""执行 on_complete 回调(同步+异步兼容)"""
|
||
if not on_complete:
|
||
return
|
||
try:
|
||
result = on_complete(agent_id, outcome)
|
||
if asyncio.iscoroutine(result):
|
||
# 注意:这里是同步调用的,不能 await
|
||
# 在 _monitor_process 的 async 上下文中应该用 await
|
||
pass
|
||
except Exception:
|
||
pass
|
||
|
||
async def _do_on_complete_async(self, on_complete, agent_id, outcome):
|
||
"""异步执行 on_complete 回调"""
|
||
if not on_complete:
|
||
return
|
||
try:
|
||
result = on_complete(agent_id, outcome)
|
||
if asyncio.iscoroutine(result):
|
||
await result
|
||
except Exception:
|
||
logger.warning("on_complete callback failed for %s", agent_id, exc_info=True)
|
||
|
||
def _register_session(
|
||
self,
|
||
session_id: str,
|
||
agent_id: str,
|
||
task_id: Optional[str],
|
||
pid: Optional[int],
|
||
broadcast_task_ids: Optional[List[str]] = None,
|
||
) -> None:
|
||
"""注册 spawn session"""
|
||
self._sessions[session_id] = {
|
||
"agent_id": agent_id,
|
||
"task_id": task_id,
|
||
"pid": pid,
|
||
"status": "running",
|
||
"started_at": datetime.utcnow().isoformat(),
|
||
"completed_at": None,
|
||
"broadcast_task_ids": broadcast_task_ids,
|
||
}
|
||
|
||
def _record_attempt(
|
||
self,
|
||
task_id: Optional[str],
|
||
agent_id: str,
|
||
outcome: str,
|
||
exit_code: Optional[int] = None,
|
||
error: Optional[str] = None,
|
||
metadata: Optional[dict] = None,
|
||
db_path: Optional[Path] = None,
|
||
) -> None:
|
||
"""记录 task_attempt"""
|
||
# 广播 spawn 产生的 "broadcast" task_id 不记录 attempts,避免脏数据
|
||
if task_id == "broadcast":
|
||
return
|
||
effective_db = db_path or self.db_path
|
||
if not task_id or not effective_db:
|
||
return
|
||
|
||
try:
|
||
conn = get_connection(effective_db)
|
||
try:
|
||
conn.execute("BEGIN IMMEDIATE")
|
||
row = conn.execute(
|
||
"SELECT MAX(attempt_number) as max_a FROM task_attempts WHERE task_id=?",
|
||
(task_id,),
|
||
).fetchone()
|
||
attempt_number = (row["max_a"] or 0) + 1
|
||
|
||
meta = metadata or {}
|
||
if error:
|
||
meta["error"] = error
|
||
conn.execute(
|
||
"INSERT INTO task_attempts "
|
||
"(task_id, attempt_number, agent, outcome, exit_code, metadata, completed_at) "
|
||
"VALUES (?,?,?,?,?,?,datetime('now'))",
|
||
(task_id, attempt_number, agent_id, outcome,
|
||
exit_code, json.dumps(meta)),
|
||
)
|
||
conn.execute(
|
||
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
|
||
(task_id, agent_id,
|
||
"agent_completed" if outcome == "completed" else "daemon_tick",
|
||
json.dumps({"outcome": outcome, "attempt": attempt_number})),
|
||
)
|
||
conn.commit()
|
||
finally:
|
||
conn.close()
|
||
except Exception:
|
||
logger.exception("Failed to record attempt for task %s", task_id)
|
||
|
||
def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
    """Return the record registered for ``session_id``, or None if unknown."""
    registry = self._sessions
    return registry.get(session_id, None)
|
||
|
||
def get_session_by_agent(self, agent_id: str) -> Optional[Dict[str, Any]]:
    """v2.7.2: return the first ``running`` session record of ``agent_id``.

    Used for process liveness checks. Records are examined in registration
    order; None is returned when the agent has no running session.
    """
    return next(
        (
            record
            for record in self._sessions.values()
            if record.get("agent_id") == agent_id and record.get("status") == "running"
        ),
        None,
    )
|
||
|
||
def cleanup_session(self, session_id: str) -> None:
|
||
"""清理 session"""
|
||
if session_id in self._sessions:
|
||
session = self._sessions[session_id]
|
||
task_id = session.get("task_id")
|
||
del self._sessions[session_id]
|
||
# 清理 B2 compact 等待计数器
|
||
if task_id and task_id in self._compact_waits:
|
||
del self._compact_waits[task_id]
|