"""Agent Spawner - 异步 spawn Full Agent / Subagent Full Agent: asyncio.create_subprocess_exec(异步非阻塞,不 await 完成) Subagent: 占位(实际通过 OpenClaw Gateway API sessions_spawn,F17 完善) """ from __future__ import annotations import asyncio import json import logging import uuid from datetime import datetime from pathlib import Path from typing import Any, Dict, Optional from src.blackboard.db import get_connection, init_db logger = logging.getLogger("moziplus-v2.spawner") # ── Prompt 模板 ── # Mail 专用模板:inform 类型(纯通知,状态由系统管理) MAIL_INFORM_TEMPLATE = """你收到一封飞鸽传书(纯通知)。 发件者: {from_agent} 主题: {title} 内容: {text} 已阅即可。如需回复,用 in_reply_to 回复发件者(不需要填 to)。 ⚠️ 不要执行任何状态转换命令。 """ # Mail 专用模板:request 类型(需要处理并回复,状态由系统管理) MAIL_REQUEST_TEMPLATE = """你收到一封飞鸽传书,需要你处理并回复。 发件者: {from_agent} 主题: {title} 内容: {text} ### 如何回复发件者 curl -s -X POST http://localhost:8083/api/mail \\ -H 'Content-Type: application/json' \\ -d '{{"from": "{agent_id}", "in_reply_to": "{task_id}", "title": "回复: {title}", "text": "你的回复内容"}}' ⚠️ 不需要填 "to",系统自动回复给发件者。 ### 如何给其他人发新邮件 curl -s -X POST http://localhost:8083/api/mail \\ -H 'Content-Type: application/json' \\ -d '{{"from": "{agent_id}", "to": "对方agent-id", "title": "标题", "text": "正文", "type": "inform"}}' ⚠️ to 必须是有效的 agent id: {valid_agents} ⚠️ 纯通知用 type=inform,需要对方回复不填 type(默认 request) ⚠️ 不能给自己发邮件 ⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。 """ SPAWN_PROMPT_TEMPLATE = """{identity_section} ## 任务 {title} {description} 项目: {project_id} | ID: {task_id} 类型: {task_type} | 优先级: {priority} 验收标准: {must_haves} {retry_context} ## 你能做什么 - 读任务详情(含依赖、讨论、产出): GET {api_base}/projects/{project_id}/tasks/{task_id}?expand=all - 读所有活跃任务: GET {api_base}/projects/{project_id}/tasks?status=working,claimed,review - 写产出: POST {api_base}/projects/{project_id}/tasks/{task_id}/outputs - 写评论/交接: POST {api_base}/projects/{project_id}/tasks/{task_id}/comments - 更新状态: POST {api_base}/projects/{project_id}/tasks/{task_id}/status - 创建子任务: POST {api_base}/projects/{project_id}/tasks - 认领任务: POST {api_base}/projects/{project_id}/tasks/{{{{id}}}}/claim ## 约束 - 完成后必须写产出物(output)并标 review,不能无产出就提交 - 失败了标 failed 并写明原因 - 产出物 handoff comment ≥ 50 字符(用于系统验证) - 禁止使用 sessions_send 直接发消息(用 Mail API 或黑板 comment) - 委托他人做事用黑板 comment @agent-id,系统自动路由(如 @zhaoyun-data 你来获取数据,无需手动传 mentions 数组) - 安全红线: {guardrails_summary} ### API 请求体示例 写产出: POST .../outputs ```json {{{{"agent": "{agent_id}", "content_type": "code", "title": "产出标题", "content_path": "/path/to/file", "summary": "简要说明"}}}} ``` 写评论: POST .../comments ```json {{{{"author": "{agent_id}", "body": "评论内容(≥50字符)", "comment_type": "handoff"}}}} ``` """ DISCUSSION_PROMPT_TEMPLATE = """你被 spawn 来参与黑板讨论。这是一个 v2.9 四相循环的讨论环节。 ## 你的任务 {goal_snapshot} ## 约束 {constraints} ## 黑板 API 你可以随时: - 读黑板:GET http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}?expand=all(含 comments、outputs) - 写 comment:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/comments body: {{"author": "{agent_id}", "body": "内容(@agent-id 自动路由)"}} - 创建 sub task:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks body: {{"title": "...", "description": "...", "task_type": "...", "parent_task": "{task_id}", "must_haves": "{{\"capability\": \"...\"}}"}} - 认领任务:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{{sub_task_id}}/claim ## 行为准则 1. **你是自主的。**读黑板、思考、行动,不要等指令。 2. **不重复别人的工作。**动手前先读黑板看谁在做什么(Separation)。 3. **保持方向对齐。**你的产出方向和 parent goal 对齐,不确定时 @pangtong-fujunshi(Alignment)。 4. **产出可共享。**产出写入黑板,让其他人能看到你的成果(Cohesion)。 5. **不越界。**安全红线不要碰,超出能力的 @ 庞统升级(Boundary)。 6. **随时讨论。**执行过程中需要协作时 @ 对应 Agent,讨论是灵活的不是固定阶段的。 ## 讨论完成后 - 如果讨论收敛到可执行的任务,直接创建 sub task - 如果有分歧或不确定,在黑板上写 comment @ 庞统裁决 - 标记完成: ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \ -H 'Content-Type: application/json' \ -d '{{"status": "done", "agent": "{agent_id}"}}' ``` """ # Mail 续杯专用模板:不包含状态转换指令(系统自动标 done) MAIL_RETRY_PROMPT = """你收到一个续杯提醒。你的任务在执行过程中被中断了。 发件者: {from_agent} 主题: {title} 续杯次数: 第 {retry_count} 次(上限 {max_retries} 次) 请检查 session 历史中你之前做了什么,然后继续未完成的工作。 ⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。 ⚠️ 如果任务已完成,直接写产出即可,不要调 status API。 """ class AgentBusyError(Exception): """Agent 无法 spawn(被占用/冷却/session 锁等) #07: reason 字段区分具体原因,便于 dispatcher 层区分处理。 """ def __init__(self, agent_id: str, reason: str = "busy", detail: Optional[dict] = None): self.agent_id = agent_id self.reason = reason # counter_blocked / session_locked / session_running / session_compacting / session_stuck self.detail = detail or {} super().__init__(f"{agent_id}: {reason}") class AgentSpawner: """Agent spawn 管理""" def __init__( self, db_path: Optional[Path] = None, agent_timeout: float = 630.0, dry_run: bool = False, api_host: str = "127.0.0.1", api_port: int = 8083, bootstrap_builder: Optional[Any] = None, gateway_timeout: float = 600.0, max_retries: int = 3, max_monitor_timeouts: int = 3, counter: Optional[Any] = None, ): """ Args: db_path: 项目黑板 DB 路径(用于写 task_attempts) agent_timeout: Agent 超时秒数 dry_run: 测试模式,不实际 spawn api_host: API 地址(供 Agent 回写) api_port: API 端口(供 Agent 回写) """ self.db_path = db_path self.agent_timeout = agent_timeout self.dry_run = dry_run self.api_host = api_host self.api_port = api_port self.bootstrap_builder = bootstrap_builder self.gateway_timeout = gateway_timeout self.max_retries = max_retries self.max_monitor_timeouts = max_monitor_timeouts # v2.7.2: counter 引用(spawn_full_agent 内部 acquire/release) self.counter = counter # guardrails: 由 main.py 在初始化后赋值 self.guardrails = None # session 注册表 {session_id: {...}} self._sessions: Dict[str, Dict[str, Any]] = {} # B2 compact 等待计数器 {task_id: count} self._compact_waits: Dict[str, int] = {} # B1 假死计数器 {task_id: count} self._stuck_counts: Dict[str, int] = {} self._valid_agents_cache: Optional[set] = None def _load_valid_agents(self) -> set: """从 config/default.yaml 读取有效 Agent ID 列表(带缓存)""" if self._valid_agents_cache is not None: return self._valid_agents_cache config_path = Path(__file__).parent.parent / "config" / "default.yaml" if config_path.exists(): try: import yaml with open(config_path) as f: cfg = yaml.safe_load(f) profiles = cfg.get("daemon", {}).get("agent_profiles", {}) if profiles: self._valid_agents_cache = set(profiles.keys()) return self._valid_agents_cache except Exception: pass self._valid_agents_cache = { "zhangfei-dev", "guanyu-dev", "zhaoyun-data", "jiangwei-infra", "pangtong-fujunshi", "simayi-challenger" } return self._valid_agents_cache @property def active_sessions(self) -> Dict[str, Dict[str, Any]]: """当前活跃的 spawn sessions""" return {sid: s for sid, s in self._sessions.items() if s.get("status") == "running"} def build_spawn_message( self, task_id: str, title: str, description: str, task_type: str = "", priority: int = 5, must_haves: str = "", project_id: str = "", agent_id: str = "", current_status: str = "claimed", retry_context: str = "", task: Optional[Any] = None, project_config: Optional[Dict[str, Any]] = None, spawn_type: str = "executor", # executor | discussion | review ) -> str: """构建 Agent spawn 的消息(优先用 BootstrapBuilder,fallback 用模板) Args: current_status: 任务当前状态(动态生成状态机提示) retry_context: 重试上下文(前轮产出摘要 + 审查意见) task: Task 对象(BootstrapBuilder 用) project_config: 项目配置(BootstrapBuilder 用) spawn_type: spawn 类型(executor=执行, discussion=讨论, review=审查) """ # discussion 类型直接用模板(不走 BootstrapBuilder) if spawn_type == "discussion": return self._build_discussion_prompt( task_id, title, description, must_haves, project_id, agent_id) # 尝试 BootstrapBuilder if self.bootstrap_builder and task is not None: try: # v3.1: spawn_type 映射到角色 (executor→executor, review→reviewer, discussion→planner) role_map = {"executor": "executor", "review": "reviewer", "discussion": "planner"} role = role_map.get(spawn_type, "executor") bootstrap_prompt = self.bootstrap_builder.build_for_task( task=task, role=role, project_config=project_config, ) # mail 任务用精简模板,不走 BootstrapBuilder if project_id == "_mail": return self._build_mail_prompt(task_id, title, description, must_haves, agent_id) api_section = self._build_api_section( project_id, task_id, agent_id) return bootstrap_prompt + "\n\n---\n\n" + api_section except Exception: logger.exception("BootstrapBuilder failed, falling back to template") # mail 任务用精简模板 if project_id == "_mail": return self._build_mail_prompt(task_id, title, description, must_haves, agent_id) # Fallback: 使用硬编码模板 # mail 任务直接 done,不走 review completion_status = "done" if project_id == "_mail" else "review" identity_section = self._inject_agent_identity(agent_id) guardrails_summary = self._get_guardrails_summary() return SPAWN_PROMPT_TEMPLATE.format( identity_section=identity_section, project_id=project_id, task_id=task_id, title=title, description=description or "(无描述)", task_type=task_type or "general", priority=priority, must_haves=must_haves or "(无)", agent_id=agent_id, api_base=f"http://{self.api_host}:{self.api_port}/api", retry_context=retry_context or "", completion_status=completion_status, guardrails_summary=guardrails_summary, ) def _build_api_section(self, project_id: str, task_id: str, agent_id: str) -> str: """构建 API 回写操作指令(BootstrapBuilder 模式下补充)""" # mail 任务直接 done,不走 review success_status = '"done"' if project_id == "_mail" else '"review"' return f"""## 操作指令 ### 状态回写 开始工作: ```bash curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/tasks/{task_id}/status \ -H 'Content-Type: application/json' \ -d '{{"status": "working", "agent": "{agent_id}"}}' ``` ### 写入产出 ```bash curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \ -H 'Content-Type: application/json' \ -d '{{"agent": "{agent_id}", "type": "<类型>", "title": "<标题>", "content": "<内容>", "summary": "<摘要>"}}' ``` ### 完成后 成功:status → {success_status} | 失败:status → "failed" """ def _build_discussion_prompt(self, task_id: str, title: str, description: str, must_haves: str, project_id: str, agent_id: str) -> str: """构建讨论类 spawn prompt(§3.3 框架 + Boids)""" goal_snapshot = description or title constraints = must_haves or "(无特殊约束)" return DISCUSSION_PROMPT_TEMPLATE.format( goal_snapshot=goal_snapshot, constraints=constraints, project_id=project_id, task_id=task_id, agent_id=agent_id, api_host=self.api_host, api_port=self.api_port, ) def _inject_agent_identity(self, agent_id: str) -> str: """#03: 注入 Agent 身份+专长""" caps = "通用" router = getattr(self, '_router_ref', None) if router: profile = router.agent_profiles.get(agent_id) if profile and getattr(profile, 'capabilities_zh', None): caps = ", ".join(profile.capabilities_zh) return f"你是 {agent_id},专长: {caps}。" def _get_guardrails_summary(self) -> str: """#03: 从 GuardrailEngine 提取红线摘要""" if not self.guardrails: return "无特殊限制" try: return "、".join(r.get("name", r.get("rule_id", "")) for r in self.guardrails.rules[:6]) except Exception: return "无特殊限制" def _get_agent_profile(self, agent_id: str): """获取 Agent 能力画像""" router = getattr(self, '_router_ref', None) if router: return router.agent_profiles.get(agent_id) return None def _build_mail_prompt(self, task_id: str, title: str, description: str, must_haves: str, agent_id: str) -> str: """构建 Mail 专用精简模板""" # 解析 must_haves 获取 from 和 performative from_agent = agent_id performative = "request" try: meta = json.loads(must_haves) if must_haves else {} from_agent = meta.get("from", agent_id) performative = meta.get("performative", meta.get("type", "request")) except Exception: pass # 截断 title 和 text 用于模板安全 safe_title = (title or "").replace('"', '\\"')[:100] safe_text = (description or "").replace('"', '\\"') # 获取有效 Agent 列表(从 config/default.yaml 读取) valid_agents_list = self._load_valid_agents() valid_agents_str = " / ".join(sorted(valid_agents_list)) common_kwargs = dict( from_agent=from_agent, title=safe_title, text=safe_text, task_id=task_id, agent_id=agent_id, api_host=self.api_host, api_port=self.api_port, valid_agents=valid_agents_str, ) if performative == "inform": return MAIL_INFORM_TEMPLATE.format(**common_kwargs) else: return MAIL_REQUEST_TEMPLATE.format(**common_kwargs) async def spawn_full_agent( self, agent_id: str, message: str, new_session: bool = False, task_id: Optional[str] = None, on_complete: Optional[Any] = None, use_main_session: bool = False, task_db_path: Optional[Path] = None, reuse_session_id: Optional[str] = None, on_checks_passed: Optional[Any] = None, skip_counter: bool = False, ) -> str: """Spawn Full Agent(异步非阻塞) v2.7.2: counter acquire/release 在内部统一管理。 调用级生命周期:spawn 时 acquire,进程退出时 release(通过 wrapped_on_complete)。 Args: on_complete: 业务回调(agent_id, outcome) - 不含 counter.release, counter.release 由内部 wrapped_on_complete 保证。 use_main_session: True = 投递到主 Agent session(不传 --session-id) on_checks_passed: 所有检查通过后的回调(session check + counter acquire 后、subprocess 前) reuse_session_id: 传入指定 session-id 复用(用于续杯) — deprecated,use_main_session=True 已替代 Returns: session_id Raises: AgentBusyError: agent 被 counter 占用或冷却中 """ # ── #07 Acquire-First: counter 前置 → session check 在锁内贴近 spawn ── # Step 0: 分配 session_id(纯计算,无 IO) if use_main_session: session_id = None elif reuse_session_id: session_id = reuse_session_id else: session_id = str(uuid.uuid4()) _sid_key = session_id or "main" # counter 用的 key # Phase 0: Pre-acquire 修复(无锁) # timeout/failed 状态先修复再 acquire。revive 只改 running→idle,幂等安全。 # asyncio 协作式并发保证同一时刻只有一个协程在执行,revive 的 sessions.json # 写操作不会真正并行。 if use_main_session: pre_state = self._check_session_state(agent_id) if pre_state.get("status") in ("timeout", "failed"): logger.info("Phase 0: %s status=%s, reviving before acquire", agent_id, pre_state["status"]) self._revive_session(agent_id) elif pre_state.get("status") == "running" and not pre_state.get("lock_pid_alive"): # status=running 但 lock PID 已死 → 假死,revive logger.warning("Phase 0: %s status=running but lock PID dead, reviving", agent_id) self._revive_session(agent_id) # Phase 1: Counter acquire(互斥锁) # v2.8.1 Bug-4 fix: retry 时跳过 counter(counter 从原始 spawn 保持到 retry 完成) if self.counter and not skip_counter: acquired = await self.counter.acquire(agent_id, _sid_key) if not acquired: raise AgentBusyError(agent_id, reason="counter_blocked") # Phase 2: Session check(在锁保护下,贴近 spawn) # 并列收集所有 block 原因,统一判定。 if use_main_session: session_state = self._check_session_state(agent_id) logger.info("Phase 2 session check for %s: status=%s lock_pid=%s lock_pid_alive=%s compact=%s", agent_id, session_state.get('status'), session_state.get('lock_pid'), session_state.get('lock_pid_alive'), session_state.get('recent_compact')) blockers = [] if session_state.get("lock_pid_alive") and not session_state.get("lock_expired"): blockers.append(("session_locked", session_state.get("lock_pid"))) if session_state.get("status") == "running": if session_state.get("lock_pid_alive"): # 真 running:外部进程占用 blockers.append(("session_running", None)) else: # 假 running:lock PID 死了但 status 还在 running → Phase 2.5 处理 pass if session_state.get("recent_compact"): blockers.append(("session_compacting", None)) if blockers: # 释放 counter,报具体原因 if self.counter and not skip_counter: self.counter.release(agent_id, _sid_key) primary_reason, primary_detail = blockers[0] logger.info("Phase 2 blocked %s: %s (all=%s)", agent_id, primary_reason, blockers) raise AgentBusyError(agent_id, reason=primary_reason, detail={"blockers": blockers}) # Phase 2.5: 假死修复(status=running + lock PID 死 → revive → 重检) # 此场景应被 Phase 0 提前修复,这里做兜底 if session_state.get("status") == "running" and not session_state.get("lock_pid_alive"): logger.warning("Phase 2.5: %s status=running + lock dead (should be caught in Phase 0), reviving", agent_id) self._revive_session(agent_id) session_state = self._check_session_state(agent_id) if session_state.get("status") == "running": if self.counter and not skip_counter: self.counter.release(agent_id, _sid_key) raise AgentBusyError(agent_id, reason="session_stuck", detail={"status": "running after revive"}) # Phase 3: on_checks_passed 回调 # 注意:如果回调抛异常,counter 已 acquire 但 subprocess 未启动, # wrapped_on_complete 不会执行。需在此 try/except 中手动 release。 if on_checks_passed: try: on_checks_passed() except Exception: if self.counter and not skip_counter: self.counter.release(agent_id, _sid_key) raise if self.dry_run: logger.info("[DRY RUN] Would spawn agent %s (session=%s)", agent_id, _sid_key) self._register_session(_sid_key, agent_id, task_id, pid=None) return _sid_key # 4. wrapped_on_complete 保证 counter release(闭包捕获 _sid_key) async def _wrapped_on_complete(aid, outcome): try: if self.counter: self.counter.release(aid, _sid_key) finally: if on_complete: try: result = on_complete(aid, outcome) if asyncio.iscoroutine(result): await result except Exception: logger.warning("Business on_complete failed for %s", aid, exc_info=True) cmd = [ "openclaw", "agent", "--agent", agent_id, ] if session_id: cmd.extend(["--session-id", session_id]) cmd.extend([ "--message", message, "--json", "--timeout", str(int(self.gateway_timeout)), ]) try: proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) self._register_session(session_id, agent_id, task_id, proc.pid) logger.info("Spawned agent %s (session=%s, pid=%d)", agent_id, session_id, proc.pid) # Schedule monitor(传 wrapped_on_complete) asyncio.create_task( self._monitor_process(session_id, proc, agent_id, task_id, on_complete=_wrapped_on_complete, db_path=task_db_path or self.db_path) ) return session_id except Exception as e: # spawn 失败也要 release counter if self.counter: self.counter.release(agent_id, _sid_key) logger.exception("Failed to spawn agent %s", agent_id) self._record_attempt(task_id, agent_id, "spawn_failed", error=str(e)) raise async def spawn_subagent( self, task_description: str, task_id: Optional[str] = None, ) -> str: """Spawn Subagent(占位,实际通过 Gateway API) Returns: session_id """ session_id = str(uuid.uuid4()) if self.dry_run: logger.info("[DRY RUN] Would spawn subagent (session=%s)", session_id) self._register_session(session_id, "subagent", task_id, pid=None) return session_id # TODO: F17 通过 Gateway API sessions_spawn 实现 logger.info("Subagent spawn (session=%s) - placeholder", session_id) self._register_session(session_id, "subagent", task_id, pid=None) return session_id # ── 续杯 Prompt 模板 ── RETRY_PROMPT = """你收到一个续杯提醒。你的任务在执行过程中被中断了。 ## 任务信息 - 项目: {project_id} - 任务ID: {task_id} - 标题: {title} - 续杯次数: 第 {retry_count} 次(上限 {max_retries} 次) 请检查 session 历史中你之前做了什么,然后继续未完成的工作。 ## 操作指令 ### 查看任务当前状态 ```bash curl http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}?expand=all ``` ### 如果已经完成,标记 review ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \\ -H 'Content-Type: application/json' \\ -d '{{"status": "review", "agent": "{agent_id}"}}' ``` ### 写入产出(如果之前没写) ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \\ -H 'Content-Type: application/json' \\ -d '{{"agent": "{agent_id}", "type": "<类型>", "title": "<标题>", "content": "<内容>", "summary": "<摘要>"}}' ``` ### 如果无法解决,标记失败 ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \\ -H 'Content-Type: application/json' \\ -d '{{"status": "failed", "agent": "{agent_id}", "detail": "<失败原因>"}}' ``` {fallback_hint}""" async def _monitor_process( self, session_id: Optional[str], proc: asyncio.subprocess.Process, agent_id: str, task_id: Optional[str], on_complete: Optional[Any] = None, db_path: Optional[Path] = None, monitor_timeout_count: int = 0, ) -> None: """监控子进程全生命周期(设计文档 spawner-monitor-design.md)""" stdout_chunks: list = [] stderr_chunks: list = [] try: # ── 等待进程退出 + 流式读取 ── async def _read_streams(): async def _read_out(): while True: chunk = await proc.stdout.read(4096) if not chunk: break stdout_chunks.append(chunk) async def _read_err(): while True: chunk = await proc.stderr.read(4096) if not chunk: break stderr_chunks.append(chunk) await asyncio.gather(_read_out(), _read_err(), proc.wait()) await asyncio.wait_for(_read_streams(), timeout=self.agent_timeout) # ── 情况 A:进程退出 ── exit_code = proc.returncode await self._handle_exit( session_id, agent_id, task_id, exit_code, stdout_chunks, stderr_chunks, on_complete, db_path ) except asyncio.TimeoutError: # ── 情况 B:monitor timeout(进程没退出)── logger.warning("Agent %s monitor timeout (session=%s, count=%d/%d)", agent_id, session_id, monitor_timeout_count + 1, self.max_monitor_timeouts) await self._handle_monitor_timeout( session_id, agent_id, task_id, proc, on_complete, db_path, stderr_chunks, monitor_timeout_count ) async def _handle_exit(self, session_id, agent_id, task_id, exit_code, stdout_chunks, stderr_chunks, on_complete, db_path): """情况 A:进程退出后的处理 v2.7.2: 进程退出 = counter release(由 on_complete = wrapped_on_complete 保证)。 只有 A2/A3(gateway_timeout)触发续杯,其他都不 retry。 A9(api_error/429)额外推回 pending + 设冷却。 """ stdout_text = b"".join(stdout_chunks).decode("utf-8", errors="replace") stderr_text = b"".join(stderr_chunks).decode("utf-8", errors="replace") # 解析 stdout JSON json_result = self._parse_stdout_json(stdout_text) logger.info("Parsed JSON result for agent=%s session=%s: %s", agent_id, session_id, json_result) # 查任务实际状态 task_status = self._get_task_status(db_path, task_id) if task_id else None # 分类 cls = self._classify_outcome(exit_code, json_result, stderr_text, task_status, stdout_text) outcome = cls["outcome"] # 更新 session 状态 sid = session_id or "main" if sid in self._sessions: self._sessions[sid]["status"] = outcome self._sessions[sid]["completed_at"] = datetime.utcnow().isoformat() self._sessions[sid]["exit_code"] = exit_code if json_result: self._sessions[sid]["meta"] = json_result # 记录 attempt self._record_attempt( task_id, agent_id, outcome, exit_code=exit_code, db_path=db_path, metadata={ "status": json_result.get("status"), "summary": json_result.get("summary"), "fallback_used": json_result.get("fallback_used"), "fallback_reason": json_result.get("fallback_reason"), "task_status_at_exit": task_status, } ) logger.info("Agent %s finished (session=%s, outcome=%s, exit=%d, task_status=%s)", agent_id, session_id, outcome, exit_code, task_status) if cls["should_retry"]: # A2/A3: gateway_timeout → 续杯(on_complete 会 release counter) await self._do_retry( session_id, agent_id, task_id, on_complete, db_path, cls.get("retry_field", "retry_count") ) elif outcome == "api_error": # A9: 429/API 错误 → release counter(on_complete)+ 推回 pending + 冷却 # 有上限:api_retry_count 累计达 max_retries 则标 failed await self._do_on_complete_async(on_complete, agent_id, outcome) if self.counter: self.counter.set_cooldown(agent_id) if db_path and task_id: retry_counts = self._get_retry_counts(db_path, task_id) api_count = retry_counts.get("api_retry_count", 0) + 1 retry_counts["api_retry_count"] = api_count self._update_retry_counts(db_path, task_id, retry_counts) if api_count >= self.max_retries: logger.error("Task %s api_retry_count=%d >= max_retries, marking failed", task_id, api_count) self._mark_task(db_path, task_id, "failed", { "reason": "max_api_retry_count", "count": api_count, }) else: self._mark_task(db_path, task_id, "pending", { "reason": "api_error_retry", "api_retry_count": api_count, }) logger.info("Task %s pushed back to pending (api_error, api_retry=%d/%d)", task_id, api_count, self.max_retries) elif outcome == "fallback_timeout" and not cls["should_retry"]: # A3/A3b: fallback 分级处理 # fallback_count 从 task_attempts.metadata 读取, # 达 max_retries 标 failed(A3),否则 retry + cooldown(A3b) fallback_count = 0 if db_path and task_id: retry_counts = self._get_retry_counts(db_path, task_id) fallback_count = retry_counts.get("fallback_count", 0) + 1 retry_counts["fallback_count"] = fallback_count self._update_retry_counts(db_path, task_id, retry_counts) if fallback_count >= self.max_retries: # A3: 连续 fallback 达上限,标 failed logger.error("A3 fallback exhausted: agent=%s session=%s task=%s " "fallback_count=%d reason=%s", agent_id, session_id, task_id, fallback_count, json_result.get("fallback_reason")) await self._do_on_complete_async(on_complete, agent_id, outcome) if db_path and task_id: self._mark_task(db_path, task_id, "failed", { "reason": "fallback_exhausted", "fallback_count": fallback_count, "fallback_reason": json_result.get("fallback_reason"), }) else: # A3b: fallback 未达上限,retry + cooldown logger.warning("A3b fallback retry: agent=%s session=%s task=%s " "fallback_count=%d/%d reason=%s", agent_id, session_id, task_id, fallback_count, self.max_retries, json_result.get("fallback_reason")) if self.counter: self.counter.set_cooldown(agent_id, seconds=30) await self._do_retry( session_id, agent_id, task_id, on_complete, db_path, "retry_count" ) else: # 其他:A1(completed), A4(agent_failed), A7(auth_failed), # A8(gateway_unreachable), A11(lock_conflict), # A10(compact_failed), A12(agent_error) # v2.8.1 Fix-3a: crash 类 outcome 设 cooldown,给 agent session 恢复时间 if outcome in ("crashed", "compact_failed", "process_crash", "session_stuck", "compact_hanging", "agent_error") and self.counter: self.counter.set_cooldown(agent_id, seconds=300) # 5 分钟 logger.info("Crash/error cooldown set for %s: 300s (outcome=%s)", agent_id, outcome) # 注意: cooldown 期间任务状态仍为 working,但 counter 已释放。 # DB 中的 working 是“假 working”——ticker 不会重新分配,_check_timeouts 会 # 在 cooldown 结束后回收。如果 ticker 在此期间给同一 agent 分配新任务,属正常行为。 # 进程退出 → on_complete release counter # 任务状态由各 outcome 自行处理(或等 ticker) await self._do_on_complete_async(on_complete, agent_id, outcome) async def _handle_monitor_timeout(self, session_id, agent_id, task_id, proc, on_complete, db_path, stderr_chunks, monitor_timeout_count): """情况 B:monitor timeout""" # 读已缓冲的 stderr try: remaining = await asyncio.wait_for(proc.stderr.read(), timeout=2.0) if remaining: stderr_chunks.append(remaining) except Exception: pass stderr_text = b"".join(stderr_chunks).decode("utf-8", errors="replace") # 检查 session 状态 state = self._check_session_state(agent_id) # B1: 假死 - 先复活,连续假死 ≥2 次再 failed if state.get("status") == "running" and not state.get("lock_pid_alive", True): # 假死计数 stuck_count = self._stuck_counts.get(task_id, 0) + 1 self._stuck_counts[task_id] = stuck_count if stuck_count >= 2: # 连续假死 ≥2 次,标 failed logger.error("Agent %s session stuck %d times (session=%s, lock PID dead)", agent_id, stuck_count, session_id) self._mark_task(db_path, task_id, "failed", {"reason": "session_stuck", "stuck_count": stuck_count, "diagnostics": state}) await self._do_on_complete_async(on_complete, agent_id, "session_stuck") return # 第 1 次假死 → 尝试复活 logger.warning("Agent %s session stuck (attempt %d), reviving (session=%s)", agent_id, stuck_count, session_id) revived = self._revive_session(agent_id) if revived: logger.info("Agent %s session revived, releasing counter for ticker re-dispatch", agent_id) # release counter → 任务保持 working → ticker 下次 re-dispatch await self._do_on_complete_async(on_complete, agent_id, "session_revived") else: # 复活失败 → 标 failed logger.error("Agent %s revive failed, marking failed", agent_id) self._mark_task(db_path, task_id, "failed", {"reason": "revive_failed", "stuck_count": stuck_count, "diagnostics": state}) await self._do_on_complete_async(on_complete, agent_id, "revive_failed") return # B2/B3/B4: 进程还活着 # B2: compact 进行中 - 不计入 monitor timeout 计数,继续等 if state.get("recent_compact"): logger.info("Agent %s recent compaction detected, extending patience " "(session=%s, monitor=%d/%d)", agent_id, session_id, monitor_timeout_count, self.max_monitor_timeouts) # 不递增 monitor_timeout_count,但最多额外等 max_monitor_timeouts 次 # 用独立计数器防止无限等待 compact_wait_count = self._compact_waits.get(task_id, 0) + 1 self._compact_waits[task_id] = compact_wait_count if compact_wait_count >= self.max_monitor_timeouts: # #07.3 ACT-2: compact_hanging 不标 failed,只 release counter # 进程还活着但不 monitor,等 ticker _check_timeouts 超时回收 → 重新 dispatch logger.warning("Agent %s compact hanging after %d waits, releasing counter for ticker re-dispatch", agent_id, compact_wait_count) self._compact_waits.pop(task_id, None) await self._do_on_complete_async(on_complete, agent_id, "compact_hanging") return # 继续等 asyncio.create_task( self._monitor_process( session_id, proc, agent_id, task_id, on_complete=on_complete, db_path=db_path, monitor_timeout_count=monitor_timeout_count, ) ) return # B3/B4: 无 compact,正常计数 monitor_timeout_count += 1 if monitor_timeout_count >= self.max_monitor_timeouts: logger.error("Agent %s max monitor timeouts (session=%s, count=%d)", agent_id, session_id, monitor_timeout_count) self._mark_task(db_path, task_id, "failed", { "reason": "max_monitor_timeouts", "count": monitor_timeout_count, "elapsed_seconds": monitor_timeout_count * int(self.agent_timeout), "diagnostics": state, }) await self._do_on_complete_async(on_complete, agent_id, "max_monitor_timeouts") return # 未超限:继续等(不 release counter) logger.info("Agent %s continuing monitor (session=%s, count=%d/%d)", agent_id, session_id, monitor_timeout_count, self.max_monitor_timeouts) asyncio.create_task( self._monitor_process( session_id, proc, agent_id, task_id, on_complete=on_complete, db_path=db_path, monitor_timeout_count=monitor_timeout_count, ) ) async def _do_retry(self, session_id, agent_id, task_id, on_complete, db_path, retry_field="retry_count"): """续杯:手动 release counter 后通过 spawn_full_agent 重新 spawn v2.7.2: 进程已退出但 wrapped_on_complete 未被调用(只有 should_retry 分支走到这里)。 需要手动 release counter,然后 spawn_full_agent 内部会 acquire。 on_complete(含 counter release)置为 None,避免 double release。 """ # v2.8.1 Bug-4 fix: 不再手动 release counter + 置 None on_complete # counter 从原始 spawn 保持到 retry 完成,避免窗口期 ticker acquire 同一 agent # on_complete 保留原始 wrapped_on_complete,retry 完成后自然 release counter # 续杯前检查任务状态,已终态则跳过 if db_path and task_id: try: conn = get_connection(db_path) try: row = conn.execute( "SELECT status FROM tasks WHERE id=?", (task_id,) ).fetchone() # Bug-6 fix: pending 不是终态 if row and row["status"] in ("done", "failed", "cancelled", "review"): logger.info("Retry skip: task %s already %s (agent=%s)", task_id, row["status"], agent_id) # on_complete = wrapped_on_complete,会 release counter await self._do_on_complete_async(on_complete, agent_id, "task_already_done") return finally: conn.close() except Exception: logger.warning("Retry status check failed for %s, proceeding", task_id) # 直接读写 tasks 表的 retry_count if retry_field == "retry_count" and db_path and task_id: try: conn = get_connection(db_path) try: conn.execute("BEGIN IMMEDIATE") conn.execute( "UPDATE tasks SET retry_count = COALESCE(retry_count, 0) + 1 WHERE id=?", (task_id,), ) conn.commit() row = conn.execute( "SELECT retry_count FROM tasks WHERE id=?", (task_id,) ).fetchone() count = row["retry_count"] if row else 1 finally: conn.close() except Exception: logger.exception("Failed to update retry_count for task %s", task_id) count = 1 else: retry_counts = self._get_retry_counts(db_path, task_id) count = retry_counts.get(retry_field, 0) + 1 retry_counts[retry_field] = count self._update_retry_counts(db_path, task_id, retry_counts) if count >= self.max_retries: logger.error("Agent %s max retries (session=%s, %s=%d)", agent_id, session_id, retry_field, count) self._mark_task(db_path, task_id, "failed", { "reason": f"max_{retry_field}", "count": count, }) await self._do_on_complete_async(on_complete, agent_id, "max_retries") return logger.info("Agent %s retry %s=%d/%d (session=%s)", agent_id, retry_field, count, self.max_retries, session_id) # 构建续杯 message(Mail 用专用模板,Task 用标准模板) task_info = self._get_task_info(db_path, task_id) or {} project_id = task_info.get("project_id", "") is_mail = project_id == "_mail" if is_mail: must_haves = task_info.get("must_haves", "{}") try: meta = json.loads(must_haves) if must_haves else {} except Exception: meta = {} message = MAIL_RETRY_PROMPT.format( from_agent=meta.get("from", "unknown"), title=task_info.get("title", ""), retry_count=count, max_retries=self.max_retries, ) else: fallback_hint = "\n⚠️ 之前有 fallback 执行,请调 API 检查任务当前状态和已有产出,确认是否已完成。" if retry_field == "retry_count" else "" message = self.RETRY_PROMPT.format( project_id=project_id, task_id=task_id or "", title=task_info.get("title", ""), retry_count=count, max_retries=self.max_retries, api_host=self.api_host, api_port=self.api_port, agent_id=agent_id, fallback_hint=fallback_hint, ) # v2.7.2: 通过 spawn_full_agent 重新 spawn(内部 can_acquire + acquire) # on_complete = wrapped_on_complete(含 counter release),作为业务回调传入 try: await self.spawn_full_agent( agent_id=agent_id, message=message, task_id=task_id, on_complete=on_complete, use_main_session=True, # #02: 续杯走 main session task_db_path=db_path, skip_counter=True, # Bug-4 fix: counter 已在原始 spawn 中持有 ) except AgentBusyError as e: # #07.3 ACT-3: session busy(compact/lock/running)= 暂时性阻塞 # release counter → 任务保持 working → ticker 重新 dispatch logger.warning("Retry spawn deferred: %s session busy (%s), releasing counter for ticker re-dispatch", agent_id, e.reason) await self._do_on_complete_async(on_complete, agent_id, "retry_session_busy") except Exception: logger.exception("Retry spawn failed for %s", agent_id) await self._do_on_complete_async(on_complete, agent_id, "retry_spawn_failed") # ── 辅助方法 ── @staticmethod def _parse_stdout_json(stdout_text: str) -> dict: """解析 openclaw agent --json 的 stdout 输出 返回可直接使用的字段:status, summary, fallback_used, fallback_reason, payloads 不再提取 meta,直接用顶层字段。 """ text = stdout_text.strip() if not text: return {"status": None, "summary": None, "fallback_used": False, "fallback_reason": None, "payloads": []} try: data = json.loads(text) except json.JSONDecodeError: # 多行输出,找最后一个 JSON for line in reversed(text.splitlines()): try: data = json.loads(line) break except json.JSONDecodeError: continue else: return {"status": None, "summary": None, "fallback_used": False, "fallback_reason": None, "payloads": []} # 从 data.result.meta.executionTrace 取 fallback 信息 result = data.get("result", {}) meta = result.get("meta", {}) trace = meta.get("executionTrace", {}) return { "status": data.get("status"), "summary": data.get("summary"), "fallback_used": trace.get("fallbackUsed", False), "fallback_reason": trace.get("fallbackReason"), "payloads": result.get("payloads", []), } @staticmethod def _get_task_status(db_path: Optional[Path], task_id: Optional[str]) -> Optional[str]: """查任务实际 API 状态""" if not db_path or not task_id: return None try: conn = get_connection(db_path) try: row = conn.execute( "SELECT status FROM tasks WHERE id=?", (task_id,) ).fetchone() return row["status"] if row else None finally: conn.close() except Exception: return None @staticmethod def _get_task_info(db_path: Optional[Path], task_id: Optional[str]) -> Optional[dict]: """查任务基本信息""" if not db_path or not task_id: return None try: conn = get_connection(db_path) try: row = conn.execute( "SELECT id, title, status FROM tasks WHERE id=?", (task_id,) ).fetchone() if not row: return None info = dict(row) # 从 db_path 推断 project_id: data//blackboard.db info["project_id"] = db_path.parent.name return info finally: conn.close() except Exception: return None @staticmethod def _revive_session(agent_id: str) -> bool: """假死复活术:修改 sessions.json status 从 running 改为 idle""" sessions_path = Path.home() / ".openclaw" / "agents" / agent_id / "sessions" / "sessions.json" if not sessions_path.exists(): return False try: with open(sessions_path) as f: sessions = json.load(f) main_key = f"agent:{agent_id}:main" main_session = sessions.get(main_key, {}) if main_session.get("status") != "running": return False # 不是 running 状态,不需要复活 main_session["status"] = "idle" sessions[main_key] = main_session with open(sessions_path, "w") as f: json.dump(sessions, f, indent=2) logger.info("Revived %s: sessions.json status changed running→idle", agent_id) # #07 O4: 同时清理残留 lock 文件 sf = main_session.get("sessionFile", "") if sf: lock_path = Path(sf + ".lock") if lock_path.exists(): try: lock_path.unlink() logger.info("Cleaned stale lock for %s: %s", agent_id, lock_path.name) except Exception: pass return True except Exception: logger.exception("Failed to revive %s", agent_id) return False @staticmethod def _check_recent_compaction_jsonl(session_file: str, window_seconds: int = 300) -> bool: """v2.8.1 Fix-1: 读 session jsonl 末尾,检查是否有 window_seconds 内的 compaction 记录。 比 compactionCheckpoints 更可靠:Gateway 每次完成 compact 必然在 jsonl 末尾追加记录, 但不保证更新 compactionCheckpoints。 """ if not session_file or not pathlib.Path(session_file).exists(): return False try: from datetime import datetime, timezone now = datetime.now(timezone.utc) with open(session_file, "rb") as sf: sf.seek(0, 2) size = sf.tell() sf.seek(max(0, size - 51200)) tail = sf.read().decode("utf-8", errors="replace") for line in reversed(tail.splitlines()): if not line.strip(): continue try: import json as _json obj = _json.loads(line) except (_json.JSONDecodeError, ValueError): continue if obj.get("type") == "compaction": ts = obj.get("timestamp", "") if ts: try: ct = datetime.fromisoformat(ts.replace("Z", "+00:00")) if (now - ct).total_seconds() < window_seconds: return True except (ValueError, TypeError): pass ts = obj.get("timestamp", "") if ts: try: ct = datetime.fromisoformat(ts.replace("Z", "+00:00")) if (now - ct).total_seconds() >= window_seconds: break except (ValueError, TypeError): pass return False except Exception: return False @staticmethod def _check_session_state(agent_id: str) -> dict: """检查 sessions.json 和 lock 状态 v2.8.1: compact 检测改用 session jsonl 末尾扫描(Fix-1), 替代失效的 compactionCheckpoints 检测。 """ result = {"status": "unknown", "lock_pid": None, "lock_pid_alive": False, "recent_compact": False} sessions_path = Path.home() / ".openclaw" / "agents" / agent_id / "sessions" / "sessions.json" if not sessions_path.exists(): return result try: with open(sessions_path) as f: sessions = json.load(f) main_key = f"agent:{agent_id}:main" main_session = sessions.get(main_key, {}) result["status"] = main_session.get("status", "unknown") # 检查 lock (v3.1: done/timeout 时 lock 视为过期) sf = main_session.get("sessionFile", "") if sf: lock_path = Path(sf + ".lock") if lock_path.exists(): try: lock_data = json.loads(lock_path.read_text()) pid = lock_data.get("pid") result["lock_pid"] = pid if pid: import os try: os.kill(pid, 0) result["lock_pid_alive"] = True except ProcessLookupError: result["lock_pid_alive"] = False # session 已完成/超时 > lock 是 Gateway 冷却锁,不阻塞新 turn if result["status"] in ("done", "timeout"): result["lock_pid_alive"] = False result["lock_expired"] = True except Exception: pass # v2.8.1 Fix-1: compact 检测改用 session jsonl 末尾扫描 # 只在 agent 非空闲时才扫描(减少不必要 I/O) if result["status"] not in ("done", "idle", "unknown", None) and sf: result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(sf) except Exception: pass return result @staticmethod def _classify_outcome(exit_code: int, json_result: dict, stderr_text: str, task_status: Optional[str], stdout_text: str = "") -> dict: """分类退出原因,返回处理策略 v3.0: 基于 JSON status/summary/executionTrace 判定,不再依赖 transport 字段。 只有 status="timeout" 触发 retry,其他都不 retry。 """ status = json_result.get("status") summary = json_result.get("summary", "") fallback_used = json_result.get("fallback_used", False) # A4: 任务 DB status=failed(Agent 自己标的) if task_status == "failed": return {"outcome": "agent_failed", "should_retry": False} # A1: status=ok + completed + 非 fallback if status == "ok" and summary == "completed" and not fallback_used: return {"outcome": "completed", "should_retry": False} # A5/A6: status=ok + fallback if status == "ok" and fallback_used: return {"outcome": "fallback_timeout", "should_retry": False} # A2/A3: status=timeout → 唯一续杯场景 # 注意: PM2 restart 时 daemon 自身也收到 SIGTERM,此时 retry spawn 的新进程 # 会随 daemon 一起被杀。A14 retry 假设 daemon 存活,PM2 级重启不在此场景内。 if status == "timeout": return {"outcome": "gateway_timeout", "should_retry": True, "retry_field": "retry_count"} # A0: stdout 为空且 exit≠0 = 进程异常终止 # 注意:exit=0 + stdout 为空可能是正常完成(--json 没输出), # 此时 task_status 如果是 done/review 会被上面的 A4 兜住 if status is None and not stdout_text.strip() and exit_code != 0: return {"outcome": "crashed", "should_retry": False, "original": "process_crash"} # stdout 为空但 exit=0:可能是正常完成但 --json 没输出 # 查任务状态判断 if status is None and not stdout_text.strip() and exit_code == 0: terminal_statuses = {"done", "review"} if task_status in terminal_statuses: return {"outcome": "completed", "should_retry": False} return {"outcome": "agent_error", "should_retry": False} # A7-A12: status=error → 不续杯,stderr 辅助分类 if status == "error": stderr_lower = stderr_text.lower() if any(kw in stderr_lower for kw in ["401", "403", "unauthorized", "auth"]): return {"outcome": "auth_failed", "should_retry": False} if any(kw in stderr_lower for kw in ["econnrefused", "etimedout", "gateway closed", "econnreset"]): return {"outcome": "gateway_unreachable", "should_retry": False} if any(kw in stderr_lower for kw in ["rate_limit", "500", "503", "api error"]): return {"outcome": "api_error", "should_retry": False} if any(kw in stderr_lower for kw in ["compaction-diag", "context-overflow"]): return {"outcome": "compact_failed", "should_retry": False} if any(kw in stderr_lower for kw in ["lock", "busy", "concurrent", "lane task error"]): return {"outcome": "lock_conflict", "should_retry": False} return {"outcome": "agent_error", "should_retry": False} # 兜底:status 未知值 return {"outcome": "agent_error", "should_retry": False, "original": "unknown_status"} @staticmethod def _get_retry_counts(db_path: Optional[Path], task_id: Optional[str]) -> dict: """从最新 task_attempt 的 metadata 读计数器""" defaults = {"retry_count": 0, "connect_retry_count": 0, "api_retry_count": 0, "lock_retry_count": 0, "monitor_timeout_count": 0} if not db_path or not task_id: return defaults try: conn = get_connection(db_path) try: row = conn.execute( "SELECT metadata FROM task_attempts WHERE task_id=? ORDER BY attempt_number DESC LIMIT 1", (task_id,) ).fetchone() if row and row["metadata"]: stored = json.loads(row["metadata"]) for k in defaults: if k in stored: defaults[k] = stored[k] finally: conn.close() except Exception: pass return defaults def _update_retry_counts(self, db_path: Optional[Path], task_id: Optional[str], counts: dict): """将 retry counts 写回最新 task_attempt 的 metadata""" if not db_path or not task_id: return try: conn = get_connection(db_path) try: conn.execute("BEGIN IMMEDIATE") row = conn.execute( "SELECT rowid, metadata FROM task_attempts " "WHERE task_id=? ORDER BY attempt_number DESC LIMIT 1", (task_id,) ).fetchone() if row: meta = json.loads(row["metadata"]) if row["metadata"] else {} meta.update(counts) conn.execute( "UPDATE task_attempts SET metadata=? WHERE rowid=?", (json.dumps(meta), row["rowid"]) ) conn.commit() finally: conn.close() except Exception: logger.exception("Failed to update retry counts for task %s", task_id) def _mark_task(self, db_path: Optional[Path], task_id: Optional[str], status: str, detail: Optional[dict] = None): """标记任务状态(用于 failed/escalate)""" if not db_path or not task_id: return try: conn = get_connection(db_path) try: conn.execute("BEGIN IMMEDIATE") conn.execute( "UPDATE tasks SET status=?, completed_at=datetime('now') WHERE id=?", (status, task_id) ) if detail: conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", (task_id, "daemon", status, json.dumps(detail, ensure_ascii=False)) ) conn.commit() finally: conn.close() except Exception: logger.exception("Failed to mark task %s as %s", task_id, status) @staticmethod def _do_on_complete(on_complete, agent_id, outcome): """执行 on_complete 回调(同步+异步兼容)""" if not on_complete: return try: result = on_complete(agent_id, outcome) if asyncio.iscoroutine(result): # 注意:这里是同步调用的,不能 await # 在 _monitor_process 的 async 上下文中应该用 await pass except Exception: pass async def _do_on_complete_async(self, on_complete, agent_id, outcome): """异步执行 on_complete 回调""" if not on_complete: return try: result = on_complete(agent_id, outcome) if asyncio.iscoroutine(result): await result except Exception: logger.warning("on_complete callback failed for %s", agent_id, exc_info=True) def _register_session( self, session_id: str, agent_id: str, task_id: Optional[str], pid: Optional[int], ) -> None: """注册 spawn session""" self._sessions[session_id] = { "agent_id": agent_id, "task_id": task_id, "pid": pid, "status": "running", "started_at": datetime.utcnow().isoformat(), "completed_at": None, } def _record_attempt( self, task_id: Optional[str], agent_id: str, outcome: str, exit_code: Optional[int] = None, error: Optional[str] = None, metadata: Optional[dict] = None, db_path: Optional[Path] = None, ) -> None: """记录 task_attempt""" effective_db = db_path or self.db_path if not task_id or not effective_db: return try: conn = get_connection(effective_db) try: conn.execute("BEGIN IMMEDIATE") row = conn.execute( "SELECT MAX(attempt_number) as max_a FROM task_attempts WHERE task_id=?", (task_id,), ).fetchone() attempt_number = (row["max_a"] or 0) + 1 meta = metadata or {} if error: meta["error"] = error conn.execute( "INSERT INTO task_attempts " "(task_id, attempt_number, agent, outcome, exit_code, metadata, completed_at) " "VALUES (?,?,?,?,?,?,datetime('now'))", (task_id, attempt_number, agent_id, outcome, exit_code, json.dumps(meta)), ) conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", (task_id, agent_id, "agent_completed" if outcome == "completed" else "daemon_tick", json.dumps({"outcome": outcome, "attempt": attempt_number})), ) conn.commit() finally: conn.close() except Exception: logger.exception("Failed to record attempt for task %s", task_id) def get_session(self, session_id: str) -> Optional[Dict[str, Any]]: """获取 session 信息""" return self._sessions.get(session_id) def get_session_by_agent(self, agent_id: str) -> Optional[Dict[str, Any]]: """v2.7.2: 根据 agent_id 获取活跃 session 信息(用于进程存活性检查)""" for sid, info in self._sessions.items(): if info.get("agent_id") == agent_id and info.get("status") == "running": return info return None def cleanup_session(self, session_id: str) -> None: """清理 session""" if session_id in self._sessions: session = self._sessions[session_id] task_id = session.get("task_id") del self._sessions[session_id] # 清理 B2 compact 等待计数器 if task_id and task_id in self._compact_waits: del self._compact_waits[task_id]