"""Agent Spawner - 异步 spawn Full Agent / Subagent Full Agent: asyncio.create_subprocess_exec(异步非阻塞,不 await 完成) Subagent: 占位(实际通过 OpenClaw Gateway API sessions_spawn,F17 完善) """ from __future__ import annotations import asyncio import json import logging import os import uuid from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional from src.blackboard.db import get_connection logger = logging.getLogger("moziplus-v2.spawner") # ── Prompt 模板 ── # Mail 专用模板:inform 类型(纯通知,状态由系统管理) MAIL_INFORM_TEMPLATE = """你收到一封飞鸽传书(纯通知)。 发件者: {from_agent} 主题: {title} 内容: {text} 已阅即可。如需回复,用 in_reply_to 回复发件者(不需要填 to)。 ⚠️ 不要执行任何状态转换命令。 """ # Mail 专用模板:request 类型(需要处理并回复,状态由系统管理) MAIL_REQUEST_TEMPLATE = """你收到一封飞鸽传书,需要你处理并回复。 发件者: {from_agent} 主题: {title} 内容: {text} ### 如何回复发件者 curl -s -X POST http://localhost:8083/api/mail \\ -H 'Content-Type: application/json' \\ -d '{{"from": "{agent_id}", "in_reply_to": "{task_id}", "title": "回复: {title}", "text": "你的回复内容"}}' ⚠️ 不需要填 "to",系统自动回复给发件者。 ### 如何给其他人发新邮件 curl -s -X POST http://localhost:8083/api/mail \\ -H 'Content-Type: application/json' \\ -d '{{"from": "{agent_id}", "to": "对方agent-id", "title": "标题", "text": "正文", "type": "inform"}}' ⚠️ to 必须是有效的 agent id: {valid_agents} ⚠️ 纯通知用 type=inform,需要对方回复不填 type(默认 request) ⚠️ 不能给自己发邮件 ⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。 """ SPAWN_PROMPT_TEMPLATE = """{identity_section} ## 任务 {title} {description} 项目: {project_id} | ID: {task_id} 类型: {task_type} | 优先级: {priority} 验收标准: {must_haves} {retry_context} ## 你能做什么 - 读任务详情(含依赖、讨论、产出): GET {api_base}/projects/{project_id}/tasks/{task_id}?expand=all - 读所有活跃任务: GET {api_base}/projects/{project_id}/tasks?status=working,claimed,review - 写产出: POST {api_base}/projects/{project_id}/tasks/{task_id}/outputs - 写评论/交接: POST {api_base}/projects/{project_id}/tasks/{task_id}/comments - 更新状态: POST {api_base}/projects/{project_id}/tasks/{task_id}/status - 创建子任务: POST {api_base}/projects/{project_id}/tasks - 认领任务: POST {api_base}/projects/{project_id}/tasks/{{{{id}}}}/claim ## 约束 - 完成后必须写产出物(output)并标 review,不能无产出就提交 - 失败了标 failed 并写明原因 - 产出物 handoff comment ≥ 50 字符(用于系统验证) - 禁止使用 sessions_send 直接发消息(用 Mail API 或黑板 comment) - 委托他人做事用黑板 comment @agent-id,系统自动路由(如 @zhaoyun-data 你来获取数据,无需手动传 mentions 数组) - 安全红线: {guardrails_summary} ### API 请求体示例 写产出: POST .../outputs ```json {{{{"agent": "{agent_id}", "content_type": "code", "title": "产出标题", "content_path": "/path/to/file", "summary": "简要说明"}}}} ``` 写评论: POST .../comments ```json {{{{"author": "{agent_id}", "body": "评论内容(≥50字符)", "comment_type": "handoff"}}}} ``` """ DISCUSSION_PROMPT_TEMPLATE = """你被 spawn 来参与黑板讨论。这是一个 v2.9 四相循环的讨论环节。 ## 你的任务 {goal_snapshot} ## 约束 {constraints} ## 黑板 API 你可以随时: - 读黑板:GET http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}?expand=all(含 comments、outputs) - 写 comment:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/comments body: {{"author": "{agent_id}", "body": "内容(@agent-id 自动路由)"}} - 创建 sub task:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks body: {{"title": "...", "description": "...", "task_type": "...", "parent_task": "{task_id}", "must_haves": "{{\"capability\": \"...\"}}"}} - 认领任务:POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{{sub_task_id}}/claim ## 行为准则 1. **你是自主的。**读黑板、思考、行动,不要等指令。 2. **不重复别人的工作。**动手前先读黑板看谁在做什么(Separation)。 3. **保持方向对齐。**你的产出方向和 parent goal 对齐,不确定时 @pangtong-fujunshi(Alignment)。 4. **产出可共享。**产出写入黑板,让其他人能看到你的成果(Cohesion)。 5. **不越界。**安全红线不要碰,超出能力的 @ 庞统升级(Boundary)。 6. **随时讨论。**执行过程中需要协作时 @ 对应 Agent,讨论是灵活的不是固定阶段的。 ## 讨论完成后 - 如果讨论收敛到可执行的任务,直接创建 sub task - 如果有分歧或不确定,在黑板上写 comment @ 庞统裁决 - 标记完成: ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \ -H 'Content-Type: application/json' \ -d '{{"status": "done", "agent": "{agent_id}"}}' ``` """ # Mail 续杯专用模板:不包含状态转换指令(系统自动标 done) MAIL_RETRY_PROMPT = """你收到一个续杯提醒。你的任务在执行过程中被中断了。 发件者: {from_agent} 主题: {title} 续杯次数: 第 {retry_count} 次(上限 {max_retries} 次) 请检查 session 历史中你之前做了什么,然后继续未完成的工作。 ⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。 ⚠️ 如果任务已完成,直接写产出即可,不要调 status API。 """ class AgentBusyError(Exception): """Agent 无法 spawn(被占用/冷却/session 锁等) #07: reason 字段区分具体原因,便于 dispatcher 层区分处理。 """ def __init__(self, agent_id: str, reason: str = "busy", detail: Optional[dict] = None): self.agent_id = agent_id self.reason = reason # counter_blocked / session_locked / session_running / session_compacting / session_stuck self.detail = detail or {} super().__init__(f"{agent_id}: {reason}") class AgentSpawner: """Agent spawn 管理""" def __init__( self, db_path: Optional[Path] = None, agent_timeout: float = 630.0, dry_run: bool = False, api_host: str = "127.0.0.1", api_port: int = 8083, bootstrap_builder: Optional[Any] = None, gateway_timeout: float = 600.0, max_retries: int = 3, max_monitor_timeouts: int = 3, counter: Optional[Any] = None, ): """ Args: db_path: 项目黑板 DB 路径(用于写 task_attempts) agent_timeout: Agent 超时秒数 dry_run: 测试模式,不实际 spawn api_host: API 地址(供 Agent 回写) api_port: API 端口(供 Agent 回写) """ self.db_path = db_path self.agent_timeout = agent_timeout self.dry_run = dry_run self.api_host = api_host self.api_port = api_port self.bootstrap_builder = bootstrap_builder self.gateway_timeout = gateway_timeout self.max_retries = max_retries self.max_monitor_timeouts = max_monitor_timeouts # v2.7.2: counter 引用(spawn_full_agent 内部 acquire/release) self.counter = counter # guardrails: 由 main.py 在初始化后赋值 self.guardrails = None # session 注册表 {session_id: {...}} self._sessions: Dict[str, Dict[str, Any]] = {} # B2 compact 等待计数器 {task_id: count} self._compact_waits: Dict[str, int] = {} # B1 假死计数器 {task_id: count} self._stuck_counts: Dict[str, int] = {} self._valid_agents_cache: Optional[set] = None def _load_valid_agents(self) -> set: """从 config/default.yaml 读取有效 Agent ID 列表(带缓存)""" if self._valid_agents_cache is not None: return self._valid_agents_cache config_path = Path(__file__).parent.parent / "config" / "default.yaml" if config_path.exists(): try: import yaml with open(config_path) as f: cfg = yaml.safe_load(f) profiles = cfg.get("daemon", {}).get("agent_profiles", {}) if profiles: self._valid_agents_cache = set(profiles.keys()) return self._valid_agents_cache except Exception: pass self._valid_agents_cache = { "zhangfei-dev", "guanyu-dev", "zhaoyun-data", "jiangwei-infra", "pangtong-fujunshi", "simayi-challenger" } return self._valid_agents_cache @property def active_sessions(self) -> Dict[str, Dict[str, Any]]: """当前活跃的 spawn sessions""" return {sid: s for sid, s in self._sessions.items() if s.get("status") == "running"} def build_spawn_message( self, task_id: str, title: str, description: str, task_type: str = "", priority: int = 5, must_haves: str = "", project_id: str = "", agent_id: str = "", current_status: str = "claimed", retry_context: str = "", task: Optional[Any] = None, project_config: Optional[Dict[str, Any]] = None, spawn_type: str = "executor", # executor | discussion | review ) -> str: """构建 Agent spawn 的消息(优先用 BootstrapBuilder,fallback 用模板) Args: current_status: 任务当前状态(动态生成状态机提示) retry_context: 重试上下文(前轮产出摘要 + 审查意见) task: Task 对象(BootstrapBuilder 用) project_config: 项目配置(BootstrapBuilder 用) spawn_type: spawn 类型(executor=执行, discussion=讨论, review=审查) """ # discussion 类型直接用模板(不走 BootstrapBuilder) if spawn_type == "discussion": return self._build_discussion_prompt( task_id, title, description, must_haves, project_id, agent_id) # mail 任务用精简模板 if project_id == "_mail": return self._build_mail_prompt(task_id, title, description, must_haves, agent_id) # 走 BootstrapBuilder 新路径 if self.bootstrap_builder and task is not None: role_map = {"executor": "executor", "review": "reviewer", "discussion": "planner"} role = role_map.get(spawn_type, "executor") bootstrap_prompt = self.bootstrap_builder.build_for_task( task=task, role=role, ) api_section = self._build_api_section( project_id, task_id, agent_id) return bootstrap_prompt + "\n\n---\n\n" + api_section # 无 BootstrapBuilder 或无 task 对象 → 最小 fallback # 只保留任务上下文 + API 操作指令 logger.warning("No BootstrapBuilder or task object, using minimal fallback") return self._build_minimal_fallback( task_id, title, description, must_haves, project_id, agent_id) def _build_minimal_fallback(self, task_id, title, description, must_haves, project_id, agent_id): """最小 fallback:只有任务上下文 + API 指令""" task_section = f"""## 任务 {title} {description or "(无描述)"} 项目: {project_id} | ID: {task_id} 验收标准: {must_haves or "(无)"}""" api_section = self._build_api_section(project_id, task_id, agent_id) return task_section + "\n\n---\n\n" + api_section def _build_api_section(self, project_id: str, task_id: str, agent_id: str) -> str: """构建 API 回写操作指令(BootstrapBuilder 模式下补充)""" # mail 任务直接 done,不走 review success_status = '"done"' if project_id == "_mail" else '"review"' return f"""## 操作指令 ### 状态回写 开始工作: ```bash curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/tasks/{task_id}/status \ -H 'Content-Type: application/json' \ -d '{{"status": "working", "agent": "{agent_id}"}}' ``` ### 写入产出 ```bash curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \ -H 'Content-Type: application/json' \ -d '{{"agent": "{agent_id}", "type": "<类型>", "title": "<标题>", "content": "<内容>", "summary": "<摘要>"}}' ``` ### 完成后 成功:status → {success_status} | 失败:status → "failed" """ def _build_discussion_prompt(self, task_id: str, title: str, description: str, must_haves: str, project_id: str, agent_id: str) -> str: """构建讨论类 spawn prompt(§3.3 框架 + Boids)""" goal_snapshot = description or title constraints = must_haves or "(无特殊约束)" return DISCUSSION_PROMPT_TEMPLATE.format( goal_snapshot=goal_snapshot, constraints=constraints, project_id=project_id, task_id=task_id, agent_id=agent_id, api_host=self.api_host, api_port=self.api_port, ) def _inject_agent_identity(self, agent_id: str) -> str: """#03: 注入 Agent 身份+专长""" caps = "通用" router = getattr(self, '_router_ref', None) if router: profile = router.agent_profiles.get(agent_id) if profile and getattr(profile, 'capabilities_zh', None): caps = ", ".join(profile.capabilities_zh) return f"你是 {agent_id},专长: {caps}。" def _get_guardrails_summary(self) -> str: """#03: 从 GuardrailEngine 提取红线摘要""" if not self.guardrails: return "无特殊限制" try: return "、".join(r.get("name", r.get("rule_id", "")) for r in self.guardrails.rules[:6]) except Exception: return "无特殊限制" def _get_agent_profile(self, agent_id: str): """获取 Agent 能力画像""" router = getattr(self, '_router_ref', None) if router: return router.agent_profiles.get(agent_id) return None def _build_mail_prompt(self, task_id: str, title: str, description: str, must_haves: str, agent_id: str) -> str: """构建 Mail 专用精简模板""" # 解析 must_haves 获取 from 和 performative from_agent = agent_id performative = "request" try: meta = json.loads(must_haves) if must_haves else {} from_agent = meta.get("from", agent_id) performative = meta.get("performative", meta.get("type", "request")) except Exception: pass # 截断 title 和 text 用于模板安全 safe_title = (title or "").replace('"', '\\"')[:100] safe_text = (description or "").replace('"', '\\"') # 获取有效 Agent 列表(从 config/default.yaml 读取) valid_agents_list = self._load_valid_agents() valid_agents_str = " / ".join(sorted(valid_agents_list)) common_kwargs = dict( from_agent=from_agent, title=safe_title, text=safe_text, task_id=task_id, agent_id=agent_id, api_host=self.api_host, api_port=self.api_port, valid_agents=valid_agents_str, ) if performative == "inform": return MAIL_INFORM_TEMPLATE.format(**common_kwargs) else: return MAIL_REQUEST_TEMPLATE.format(**common_kwargs) async def spawn_full_agent( self, agent_id: str, message: str, new_session: bool = False, task_id: Optional[str] = None, on_complete: Optional[Any] = None, use_main_session: bool = False, task_db_path: Optional[Path] = None, reuse_session_id: Optional[str] = None, on_checks_passed: Optional[Any] = None, skip_counter: bool = False, broadcast_task_ids: Optional[List[str]] = None, ) -> str: """Spawn Full Agent(异步非阻塞) v2.7.2: counter acquire/release 在内部统一管理。 调用级生命周期:spawn 时 acquire,进程退出时 release(通过 wrapped_on_complete)。 Args: on_complete: 业务回调(agent_id, outcome) - 不含 counter.release, counter.release 由内部 wrapped_on_complete 保证。 use_main_session: True = 投递到主 Agent session(不传 --session-id) on_checks_passed: 所有检查通过后的回调(session check + counter acquire 后、subprocess 前) reuse_session_id: 传入指定 session-id 复用(用于续杯) - deprecated,use_main_session=True 已替代 Returns: session_id Raises: AgentBusyError: agent 被 counter 占用或冷却中 """ # ── #07 Acquire-First: counter 前置 → session check 在锁内贴近 spawn ── # Step 0: 分配 session_id(纯计算,无 IO) if use_main_session: session_id = None elif reuse_session_id: session_id = reuse_session_id else: session_id = str(uuid.uuid4()) _sid_key = session_id or "main" # counter 用的 key # Phase 0: Pre-acquire 修复(无锁) # timeout/failed 状态先修复再 acquire。revive 只改 running→idle,幂等安全。 # asyncio 协作式并发保证同一时刻只有一个协程在执行,revive 的 sessions.json # 写操作不会真正并行。 if use_main_session: pre_state = self._check_session_state(agent_id) if pre_state.get("status") in ("timeout", "failed"): logger.info("Phase 0: %s status=%s, reviving before acquire", agent_id, pre_state["status"]) self._revive_session(agent_id) elif pre_state.get("status") == "running" and not pre_state.get("lock_pid_alive"): # status=running 但 lock PID 已死 → 假死,revive logger.warning("Phase 0: %s status=running but lock PID dead, reviving", agent_id) self._revive_session(agent_id) # Phase 1: Counter acquire(互斥锁) # v2.8.1 Bug-4 fix: retry 时跳过 counter(counter 从原始 spawn 保持到 retry 完成) if self.counter and not skip_counter: acquired = await self.counter.acquire(agent_id, _sid_key) if not acquired: raise AgentBusyError(agent_id, reason="counter_blocked") # Phase 2: Session check(在锁保护下,贴近 spawn) # 并列收集所有 block 原因,统一判定。 if use_main_session: session_state = self._check_session_state(agent_id) logger.info("Phase 2 session check for %s: status=%s lock_pid=%s lock_pid_alive=%s compact=%s", agent_id, session_state.get('status'), session_state.get('lock_pid'), session_state.get('lock_pid_alive'), session_state.get('recent_compact')) blockers = [] if session_state.get("lock_pid_alive") and not session_state.get("lock_expired"): blockers.append(("session_locked", session_state.get("lock_pid"))) if session_state.get("status") == "running": if session_state.get("lock_pid_alive"): # 真 running:外部进程占用 blockers.append(("session_running", None)) else: # 假 running:lock PID 死了但 status 还在 running → Phase 2.5 处理 pass if session_state.get("recent_compact"): blockers.append(("session_compacting", None)) if blockers: # 释放 counter,报具体原因 if self.counter and not skip_counter: self.counter.release(agent_id, _sid_key) primary_reason, primary_detail = blockers[0] logger.info("Phase 2 blocked %s: %s (all=%s)", agent_id, primary_reason, blockers) raise AgentBusyError(agent_id, reason=primary_reason, detail={"blockers": blockers}) # Phase 2.5: 假死修复(status=running + lock PID 死 → revive → 重检) # 此场景应被 Phase 0 提前修复,这里做兜底 if session_state.get("status") == "running" and not session_state.get("lock_pid_alive"): logger.warning("Phase 2.5: %s status=running + lock dead (should be caught in Phase 0), reviving", agent_id) self._revive_session(agent_id) session_state = self._check_session_state(agent_id) if session_state.get("status") == "running": if self.counter and not skip_counter: self.counter.release(agent_id, _sid_key) raise AgentBusyError(agent_id, reason="session_stuck", detail={"status": "running after revive"}) # Phase 3: on_checks_passed 回调 # 注意:如果回调抛异常,counter 已 acquire 但 subprocess 未启动, # wrapped_on_complete 不会执行。需在此 try/except 中手动 release。 if on_checks_passed: try: on_checks_passed() except Exception: if self.counter and not skip_counter: self.counter.release(agent_id, _sid_key) raise if self.dry_run: logger.info("[DRY RUN] Would spawn agent %s (session=%s)", agent_id, _sid_key) self._register_session(_sid_key, agent_id, task_id, pid=None) return _sid_key # 4. wrapped_on_complete 保证 counter release(闭包捕获 _sid_key) async def _wrapped_on_complete(aid, outcome): try: if self.counter: self.counter.release(aid, _sid_key) finally: if on_complete: try: result = on_complete(aid, outcome) if asyncio.iscoroutine(result): await result except Exception: logger.warning("Business on_complete failed for %s", aid, exc_info=True) cmd = [ "openclaw", "agent", "--agent", agent_id, ] if session_id: cmd.extend(["--session-id", session_id]) cmd.extend([ "--message", message, "--json", "--timeout", str(int(self.gateway_timeout)), ]) try: proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) self._register_session(session_id, agent_id, task_id, proc.pid, broadcast_task_ids=broadcast_task_ids) logger.info("Spawned agent %s (session=%s, pid=%d)", agent_id, session_id, proc.pid) # Schedule monitor(传 wrapped_on_complete) asyncio.create_task( self._monitor_process(session_id, proc, agent_id, task_id, on_complete=_wrapped_on_complete, db_path=task_db_path or self.db_path) ) return session_id except Exception as e: # spawn 失败也要 release counter if self.counter: self.counter.release(agent_id, _sid_key) logger.exception("Failed to spawn agent %s", agent_id) self._record_attempt(task_id, agent_id, "spawn_failed", error=str(e)) raise async def spawn_subagent( self, task_description: str, task_id: Optional[str] = None, ) -> str: """Spawn Subagent(占位,实际通过 Gateway API) Returns: session_id """ session_id = str(uuid.uuid4()) if self.dry_run: logger.info("[DRY RUN] Would spawn subagent (session=%s)", session_id) self._register_session(session_id, "subagent", task_id, pid=None) return session_id # TODO: F17 通过 Gateway API sessions_spawn 实现 logger.info("Subagent spawn (session=%s) - placeholder", session_id) self._register_session(session_id, "subagent", task_id, pid=None) return session_id # ── 续杯 Prompt 模板 ── RETRY_PROMPT = """你收到一个续杯提醒。你的任务在执行过程中被中断了。 ## 任务信息 - 项目: {project_id} - 任务ID: {task_id} - 标题: {title} - 续杯次数: 第 {retry_count} 次(上限 {max_retries} 次) 请检查 session 历史中你之前做了什么,然后继续未完成的工作。 ## 操作指令 ### 查看任务当前状态 ```bash curl http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}?expand=all ``` ### 如果已经完成,标记 review ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \\ -H 'Content-Type: application/json' \\ -d '{{"status": "review", "agent": "{agent_id}"}}' ``` ### 写入产出(如果之前没写) ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \\ -H 'Content-Type: application/json' \\ -d '{{"agent": "{agent_id}", "type": "<类型>", "title": "<标题>", "content": "<内容>", "summary": "<摘要>"}}' ``` ### 如果无法解决,标记失败 ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \\ -H 'Content-Type: application/json' \\ -d '{{"status": "failed", "agent": "{agent_id}", "detail": "<失败原因>"}}' ``` {fallback_hint}""" async def _monitor_process( self, session_id: Optional[str], proc: asyncio.subprocess.Process, agent_id: str, task_id: Optional[str], on_complete: Optional[Any] = None, db_path: Optional[Path] = None, monitor_timeout_count: int = 0, ) -> None: """监控子进程全生命周期(设计文档 spawner-monitor-design.md)""" stdout_chunks: list = [] stderr_chunks: list = [] try: # ── 等待进程退出 + 流式读取 ── async def _read_streams(): async def _read_out(): while True: chunk = await proc.stdout.read(4096) if not chunk: break stdout_chunks.append(chunk) async def _read_err(): while True: chunk = await proc.stderr.read(4096) if not chunk: break stderr_chunks.append(chunk) await asyncio.gather(_read_out(), _read_err(), proc.wait()) await asyncio.wait_for(_read_streams(), timeout=self.agent_timeout) # ── 情况 A:进程退出 ── exit_code = proc.returncode await self._handle_exit( session_id, agent_id, task_id, exit_code, stdout_chunks, stderr_chunks, on_complete, db_path ) except asyncio.TimeoutError: # ── 情况 B:monitor timeout(进程没退出)── logger.warning("Agent %s monitor timeout (session=%s, count=%d/%d)", agent_id, session_id, monitor_timeout_count + 1, self.max_monitor_timeouts) await self._handle_monitor_timeout( session_id, agent_id, task_id, proc, on_complete, db_path, stderr_chunks, monitor_timeout_count ) async def _handle_exit(self, session_id, agent_id, task_id, exit_code, stdout_chunks, stderr_chunks, on_complete, db_path): """情况 A:进程退出后的处理 v2.7.2: 进程退出 = counter release(由 on_complete = wrapped_on_complete 保证)。 只有 A2/A3(gateway_timeout)触发续杯,其他都不 retry。 A9(api_error/429)额外推回 pending + 设冷却。 """ stdout_text = b"".join(stdout_chunks).decode("utf-8", errors="replace") stderr_text = b"".join(stderr_chunks).decode("utf-8", errors="replace") # 解析 stdout JSON json_result = self._parse_stdout_json(stdout_text) logger.info("Parsed JSON result for agent=%s session=%s: %s", agent_id, session_id, json_result) # 查任务实际状态 task_status = self._get_task_status(db_path, task_id) if task_id else None # 分类 cls = self._classify_outcome(exit_code, json_result, stderr_text, task_status, stdout_text) outcome = cls["outcome"] # 更新 session 状态 sid = session_id or "main" if sid in self._sessions: self._sessions[sid]["status"] = outcome self._sessions[sid]["completed_at"] = datetime.utcnow().isoformat() self._sessions[sid]["exit_code"] = exit_code if json_result: self._sessions[sid]["meta"] = json_result # 记录 attempt self._record_attempt( task_id, agent_id, outcome, exit_code=exit_code, db_path=db_path, metadata={ "status": json_result.get("status"), "summary": json_result.get("summary"), "fallback_used": json_result.get("fallback_used"), "fallback_reason": json_result.get("fallback_reason"), "task_status_at_exit": task_status, } ) logger.info("Agent %s finished (session=%s, outcome=%s, exit=%d, task_status=%s)", agent_id, session_id, outcome, exit_code, task_status) # 广播反馈追踪(Phase 1 bug fix) if task_id == "broadcast" and hasattr(self, '_ticker') and self._ticker: # 广播任务:从 session 信息取真实 task_id 列表,逐一回调 tracker sess_info = self._sessions.get(session_id or "main", {}) bt_ids = sess_info.get("broadcast_task_ids") or [] # 广播场景一律标 no_reply:Agent 只 claim 一个任务, # 其余任务的 tracker 不能被 claimed 清除 for real_task_id in bt_ids: self._ticker.record_broadcast_response(real_task_id, agent_id, "no_reply") elif task_id and hasattr(self, '_ticker') and self._ticker: outcome_str = "claimed" if cls.get("status") == "ok" else "no_reply" self._ticker.record_broadcast_response(task_id, agent_id, outcome_str) if cls["should_retry"]: # cooldown: 新增的可恢复场景(A14/A15/A16/A8/A10) cooldown_seconds = cls.get("cooldown_seconds", 0) if cooldown_seconds and self.counter: self.counter.set_cooldown(agent_id, seconds=cooldown_seconds) # A2/A3: gateway_timeout → 续杯(on_complete 会 release counter) await self._do_retry( session_id, agent_id, task_id, on_complete, db_path, cls.get("retry_field", "retry_count") ) elif outcome == "api_error": # A9: 429/API 错误 → release counter(on_complete)+ 推回 pending + 冷却 # 有上限:api_retry_count 累计达 max_retries 则标 failed await self._do_on_complete_async(on_complete, agent_id, outcome) if self.counter: self.counter.set_cooldown(agent_id) if db_path and task_id: retry_counts = self._get_retry_counts(db_path, task_id) api_count = retry_counts.get("api_retry_count", 0) + 1 retry_counts["api_retry_count"] = api_count self._update_retry_counts(db_path, task_id, retry_counts) if api_count >= self.max_retries: logger.error("Task %s api_retry_count=%d >= max_retries, marking failed", task_id, api_count) self._mark_task(db_path, task_id, "failed", { "reason": "max_api_retry_count", "count": api_count, }) else: self._mark_task(db_path, task_id, "pending", { "reason": "api_error_retry", "api_retry_count": api_count, }) logger.info("Task %s pushed back to pending (api_error, api_retry=%d/%d)", task_id, api_count, self.max_retries) elif outcome == "fallback_timeout" and not cls["should_retry"]: # A3/A3b: fallback 分级处理 # fallback_count 从 task_attempts.metadata 读取, # 达 max_retries 标 failed(A3),否则 retry + cooldown(A3b) fallback_count = 0 if db_path and task_id: retry_counts = self._get_retry_counts(db_path, task_id) fallback_count = retry_counts.get("fallback_count", 0) + 1 retry_counts["fallback_count"] = fallback_count self._update_retry_counts(db_path, task_id, retry_counts) if fallback_count >= self.max_retries: # A3: 连续 fallback 达上限,标 failed logger.error("A3 fallback exhausted: agent=%s session=%s task=%s " "fallback_count=%d reason=%s", agent_id, session_id, task_id, fallback_count, json_result.get("fallback_reason")) await self._do_on_complete_async(on_complete, agent_id, outcome) if db_path and task_id: self._mark_task(db_path, task_id, "failed", { "reason": "fallback_exhausted", "fallback_count": fallback_count, "fallback_reason": json_result.get("fallback_reason"), }) else: # A3b: fallback 未达上限,retry + cooldown logger.warning("A3b fallback retry: agent=%s session=%s task=%s " "fallback_count=%d/%d reason=%s", agent_id, session_id, task_id, fallback_count, self.max_retries, json_result.get("fallback_reason")) if self.counter: self.counter.set_cooldown(agent_id, seconds=60) await self._do_retry( session_id, agent_id, task_id, on_complete, db_path, "fallback_retry_count" # 独立计数,不与 gateway_timeout 的 retry_count 共用 ) else: # 其他:A1(completed), A4(agent_failed), A7(auth_failed), # A8(gateway_unreachable), A11(lock_conflict), # A10(compact_failed), A12(agent_error) # v2.8.1 Fix-3a: crash 类 outcome 设 cooldown,给 agent session 恢复时间 if outcome in ("crashed", "compact_failed", "process_crash", "session_stuck", "compact_hanging", "agent_error", "compact_interrupted") and self.counter: self.counter.set_cooldown(agent_id, seconds=300) # 5 分钟 logger.info("Crash/error cooldown set for %s: 300s (outcome=%s)", agent_id, outcome) # F1: 不可恢复 outcome → 立刻标 failed + 写黑板 if outcome in ("auth_failed", "agent_error") and db_path and task_id: logger.error("Task %s: unrecoverable outcome=%s, marking failed immediately", task_id, outcome) self._mark_task(db_path, task_id, "failed", { "reason": outcome, "stderr_preview": (stderr_text or "")[:500], }) # 注意: cooldown 期间任务状态仍为 working,但 counter 已释放。 # DB 中的 working 是"假 working"——ticker 不会重新分配,_check_timeouts 会 # 在 cooldown 结束后回收。如果 ticker 在此期间给同一 agent 分配新任务,属正常行为。 # 进程退出 → on_complete release counter # 任务状态由各 outcome 自行处理(或等 ticker) await self._do_on_complete_async(on_complete, agent_id, outcome) async def _handle_monitor_timeout(self, session_id, agent_id, task_id, proc, on_complete, db_path, stderr_chunks, monitor_timeout_count): """情况 B:monitor timeout""" # 读已缓冲的 stderr try: remaining = await asyncio.wait_for(proc.stderr.read(), timeout=2.0) if remaining: stderr_chunks.append(remaining) except Exception: pass state = self._check_session_state(agent_id) # B1: 假死 - 先复活,连续假死 ≥2 次再 failed if state.get("status") == "running" and not state.get("lock_pid_alive", True): # 假死计数 stuck_count = self._stuck_counts.get(task_id, 0) + 1 self._stuck_counts[task_id] = stuck_count if stuck_count >= 2: # 连续假死 ≥2 次,标 failed logger.error("Agent %s session stuck %d times (session=%s, lock PID dead)", agent_id, stuck_count, session_id) self._mark_task(db_path, task_id, "failed", {"reason": "session_stuck", "stuck_count": stuck_count, "diagnostics": state}) await self._do_on_complete_async(on_complete, agent_id, "session_stuck") return # 第 1 次假死 → 尝试复活 logger.warning("Agent %s session stuck (attempt %d), reviving (session=%s)", agent_id, stuck_count, session_id) revived = self._revive_session(agent_id) if revived: logger.info("Agent %s session revived, releasing counter for ticker re-dispatch", agent_id) # release counter → 任务保持 working → ticker 下次 re-dispatch await self._do_on_complete_async(on_complete, agent_id, "session_revived") else: # 复活失败 → 标 failed logger.error("Agent %s revive failed, marking failed", agent_id) self._mark_task(db_path, task_id, "failed", {"reason": "revive_failed", "stuck_count": stuck_count, "diagnostics": state}) await self._do_on_complete_async(on_complete, agent_id, "revive_failed") return # B2/B3/B4: 进程还活着 # B2: compact 进行中 - 不计入 monitor timeout 计数,继续等 if state.get("recent_compact"): logger.info("Agent %s recent compaction detected, extending patience " "(session=%s, monitor=%d/%d)", agent_id, session_id, monitor_timeout_count, self.max_monitor_timeouts) # 不递增 monitor_timeout_count,但最多额外等 max_monitor_timeouts 次 # 用独立计数器防止无限等待 compact_wait_count = self._compact_waits.get(task_id, 0) + 1 self._compact_waits[task_id] = compact_wait_count if compact_wait_count >= self.max_monitor_timeouts: # #07.3 ACT-2: compact_hanging 不标 failed,只 release counter # 进程还活着但不 monitor,等 ticker _check_timeouts 超时回收 → 重新 dispatch logger.warning("Agent %s compact hanging after %d waits, releasing counter for ticker re-dispatch", agent_id, compact_wait_count) self._compact_waits.pop(task_id, None) await self._do_on_complete_async(on_complete, agent_id, "compact_hanging") return # 继续等 asyncio.create_task( self._monitor_process( session_id, proc, agent_id, task_id, on_complete=on_complete, db_path=db_path, monitor_timeout_count=monitor_timeout_count, ) ) return # B3/B4: 无 compact,正常计数 monitor_timeout_count += 1 if monitor_timeout_count >= self.max_monitor_timeouts: logger.error("Agent %s max monitor timeouts (session=%s, count=%d)", agent_id, session_id, monitor_timeout_count) self._mark_task(db_path, task_id, "failed", { "reason": "max_monitor_timeouts", "count": monitor_timeout_count, "elapsed_seconds": monitor_timeout_count * int(self.agent_timeout), "diagnostics": state, }) await self._do_on_complete_async(on_complete, agent_id, "max_monitor_timeouts") return # 未超限:继续等(不 release counter) logger.info("Agent %s continuing monitor (session=%s, count=%d/%d)", agent_id, session_id, monitor_timeout_count, self.max_monitor_timeouts) asyncio.create_task( self._monitor_process( session_id, proc, agent_id, task_id, on_complete=on_complete, db_path=db_path, monitor_timeout_count=monitor_timeout_count, ) ) async def _do_retry(self, session_id, agent_id, task_id, on_complete, db_path, retry_field="retry_count"): """续杯:手动 release counter 后通过 spawn_full_agent 重新 spawn v2.7.2: 进程已退出但 wrapped_on_complete 未被调用(只有 should_retry 分支走到这里)。 需要手动 release counter,然后 spawn_full_agent 内部会 acquire。 on_complete(含 counter release)置为 None,避免 double release。 """ # v2.8.1 Bug-4 fix: 不再手动 release counter + 置 None on_complete # counter 从原始 spawn 保持到 retry 完成,避免窗口期 ticker acquire 同一 agent # on_complete 保留原始 wrapped_on_complete,retry 完成后自然 release counter # 续杯前检查任务状态,已终态则跳过 if db_path and task_id: try: conn = get_connection(db_path) try: row = conn.execute( "SELECT status FROM tasks WHERE id=?", (task_id,) ).fetchone() # Bug-6 fix: pending 不是终态 if row and row["status"] in ("done", "failed", "cancelled", "review"): logger.info("Retry skip: task %s already %s (agent=%s)", task_id, row["status"], agent_id) # on_complete = wrapped_on_complete,会 release counter await self._do_on_complete_async(on_complete, agent_id, "task_already_done") return finally: conn.close() except Exception: logger.warning("Retry status check failed for %s, proceeding", task_id) # 直接读写 tasks 表的 retry_count if retry_field == "retry_count" and db_path and task_id: try: conn = get_connection(db_path) try: conn.execute("BEGIN IMMEDIATE") conn.execute( "UPDATE tasks SET retry_count = COALESCE(retry_count, 0) + 1 WHERE id=?", (task_id,), ) conn.commit() row = conn.execute( "SELECT retry_count FROM tasks WHERE id=?", (task_id,) ).fetchone() count = row["retry_count"] if row else 1 finally: conn.close() except Exception: logger.exception("Failed to update retry_count for task %s", task_id) count = 1 else: retry_counts = self._get_retry_counts(db_path, task_id) count = retry_counts.get(retry_field, 0) + 1 retry_counts[retry_field] = count self._update_retry_counts(db_path, task_id, retry_counts) if count >= self.max_retries: logger.error("Agent %s max retries (session=%s, %s=%d)", agent_id, session_id, retry_field, count) self._mark_task(db_path, task_id, "failed", { "reason": f"max_{retry_field}", "count": count, }) await self._do_on_complete_async(on_complete, agent_id, "max_retries") return logger.info("Agent %s retry %s=%d/%d (session=%s)", agent_id, retry_field, count, self.max_retries, session_id) # 构建续杯 message(Mail 用专用模板,Task 用标准模板) task_info = self._get_task_info(db_path, task_id) or {} project_id = task_info.get("project_id", "") is_mail = project_id == "_mail" if is_mail: must_haves = task_info.get("must_haves", "{}") try: meta = json.loads(must_haves) if must_haves else {} except Exception: meta = {} message = MAIL_RETRY_PROMPT.format( from_agent=meta.get("from", "unknown"), title=task_info.get("title", ""), retry_count=count, max_retries=self.max_retries, ) else: fallback_hint = "\n⚠️ 之前有 fallback 执行,请调 API 检查任务当前状态和已有产出,确认是否已完成。" if retry_field == "retry_count" else "" message = self.RETRY_PROMPT.format( project_id=project_id, task_id=task_id or "", title=task_info.get("title", ""), retry_count=count, max_retries=self.max_retries, api_host=self.api_host, api_port=self.api_port, agent_id=agent_id, fallback_hint=fallback_hint, ) # v2.7.2: 通过 spawn_full_agent 重新 spawn(内部 can_acquire + acquire) # on_complete = wrapped_on_complete(含 counter release),作为业务回调传入 try: await self.spawn_full_agent( agent_id=agent_id, message=message, task_id=task_id, on_complete=on_complete, use_main_session=True, # #02: 续杯走 main session task_db_path=db_path, skip_counter=True, # Bug-4 fix: counter 已在原始 spawn 中持有 ) except AgentBusyError as e: # #07.3 ACT-3: session busy(compact/lock/running)= 暂时性阻塞 # release counter → 任务保持 working → ticker 重新 dispatch logger.warning("Retry spawn deferred: %s session busy (%s), releasing counter for ticker re-dispatch", agent_id, e.reason) await self._do_on_complete_async(on_complete, agent_id, "retry_session_busy") except Exception: logger.exception("Retry spawn failed for %s", agent_id) await self._do_on_complete_async(on_complete, agent_id, "retry_spawn_failed") # ── 辅助方法 ── @staticmethod def _parse_stdout_json(stdout_text: str) -> dict: """解析 openclaw agent --json 的 stdout 输出 返回可直接使用的字段:status, summary, fallback_used, fallback_reason, payloads 不再提取 meta,直接用顶层字段。 """ text = stdout_text.strip() if not text: return {"status": None, "summary": None, "fallback_used": False, "fallback_reason": None, "payloads": []} try: data = json.loads(text) except json.JSONDecodeError: # 多行输出,找最后一个 JSON for line in reversed(text.splitlines()): try: data = json.loads(line) break except json.JSONDecodeError: continue else: return {"status": None, "summary": None, "fallback_used": False, "fallback_reason": None, "payloads": []} # 从 data.result.meta.executionTrace 取 fallback 信息 result = data.get("result", {}) meta = result.get("meta", {}) trace = meta.get("executionTrace", {}) return { "status": data.get("status"), "summary": data.get("summary"), "fallback_used": trace.get("fallbackUsed", False), "fallback_reason": trace.get("fallbackReason"), "payloads": result.get("payloads", []), } @staticmethod def _get_task_status(db_path: Optional[Path], task_id: Optional[str]) -> Optional[str]: """查任务实际 API 状态""" if not db_path or not task_id: return None try: conn = get_connection(db_path) try: row = conn.execute( "SELECT status FROM tasks WHERE id=?", (task_id,) ).fetchone() return row["status"] if row else None finally: conn.close() except Exception: return None @staticmethod def _get_task_info(db_path: Optional[Path], task_id: Optional[str]) -> Optional[dict]: """查任务基本信息""" if not db_path or not task_id: return None try: conn = get_connection(db_path) try: row = conn.execute( "SELECT id, title, status FROM tasks WHERE id=?", (task_id,) ).fetchone() if not row: return None info = dict(row) # 从 db_path 推断 project_id: data//blackboard.db info["project_id"] = db_path.parent.name return info finally: conn.close() except Exception: return None @staticmethod def _revive_session(agent_id: str) -> bool: """假死复活术:修改 sessions.json status 从 running 改为 idle""" sessions_path = Path(os.environ.get( "OPENCLAW_HOME", str(Path.home() / ".openclaw") )) / "agents" / agent_id / "sessions" / "sessions.json" if not sessions_path.exists(): return False try: with open(sessions_path) as f: sessions = json.load(f) main_key = f"agent:{agent_id}:main" main_session = sessions.get(main_key, {}) if main_session.get("status") != "running": return False # 不是 running 状态,不需要复活 main_session["status"] = "idle" sessions[main_key] = main_session with open(sessions_path, "w") as f: json.dump(sessions, f, indent=2) logger.info("Revived %s: sessions.json status changed running→idle", agent_id) # #07 O4: 同时清理残留 lock 文件 sf = main_session.get("sessionFile", "") if sf: lock_path = Path(sf + ".lock") if lock_path.exists(): try: lock_path.unlink() logger.info("Cleaned stale lock for %s: %s", agent_id, lock_path.name) except Exception: pass return True except Exception: logger.exception("Failed to revive %s", agent_id) return False @staticmethod def _check_recent_compaction_jsonl(session_file: str, window_seconds: int = 900) -> bool: """v2.8.2 Fix-2: 读 session jsonl 末尾,检查是否有 window_seconds 内的 compaction 记录。 比 compactionCheckpoints 更可靠:Gateway 每次完成 compact 必然在 jsonl 末尾追加记录, 但不保证更新 compactionCheckpoints。 v2.8.2: 窗口从 300s→900s(15min), 尾部读取从 50KB→1MB。 实测 50KB 在长对话中不够(compact 记录被推出窗口导致漏检)。 正常扫描量不变:从尾部往前扫,遇到超过 15min 的 timestamp 即 break。 """ if not session_file or not Path(session_file).exists(): return False try: from datetime import datetime, timezone now = datetime.now(timezone.utc) with open(session_file, "rb") as sf: sf.seek(0, 2) size = sf.tell() sf.seek(max(0, size - 1048576)) tail = sf.read().decode("utf-8", errors="replace") for line in reversed(tail.splitlines()): if not line.strip(): continue try: import json as _json obj = _json.loads(line) except (_json.JSONDecodeError, ValueError): continue if obj.get("type") == "compaction": ts = obj.get("timestamp", "") if ts: try: ct = datetime.fromisoformat(ts.replace("Z", "+00:00")) if (now - ct).total_seconds() < window_seconds: return True except (ValueError, TypeError): pass ts = obj.get("timestamp", "") if ts: try: ct = datetime.fromisoformat(ts.replace("Z", "+00:00")) if (now - ct).total_seconds() >= window_seconds: break except (ValueError, TypeError): pass return False except Exception: return False @staticmethod def _check_session_state(agent_id: str) -> dict: """检查 sessions.json 和 lock 状态 v2.8.1: compact 检测改用 session jsonl 末尾扫描(Fix-1), 替代失效的 compactionCheckpoints 检测。 """ result = {"status": "unknown", "lock_pid": None, "lock_pid_alive": False, "recent_compact": False} sessions_path = Path(os.environ.get( "OPENCLAW_HOME", str(Path.home() / ".openclaw") )) / "agents" / agent_id / "sessions" / "sessions.json" if not sessions_path.exists(): return result try: with open(sessions_path) as f: sessions = json.load(f) main_key = f"agent:{agent_id}:main" main_session = sessions.get(main_key, {}) result["status"] = main_session.get("status", "unknown") # 检查 lock (v3.1: done/timeout 时 lock 视为过期) sf = main_session.get("sessionFile", "") if sf: lock_path = Path(sf + ".lock") if lock_path.exists(): try: lock_data = json.loads(lock_path.read_text()) pid = lock_data.get("pid") result["lock_pid"] = pid if pid: try: os.kill(pid, 0) result["lock_pid_alive"] = True except ProcessLookupError: result["lock_pid_alive"] = False # session 已完成/超时 > lock 是 Gateway 冷却锁,不阻塞新 turn if result["status"] in ("done", "timeout"): result["lock_pid_alive"] = False result["lock_expired"] = True # running + lock 超时 >30分钟 > 视为 idle,允许 dispatch elif result["status"] == "running" and result["lock_pid_alive"]: try: lock_data = json.loads(lock_path.read_text()) created_at_str = lock_data.get("createdAt", "") if created_at_str: from datetime import datetime as _dt, timezone as _tz created_dt = _dt.fromisoformat(created_at_str.replace("Z", "+00:00")) elapsed = (_dt.now(_tz.utc) - created_dt).total_seconds() if elapsed > 1800: # 30 minutes result["lock_pid_alive"] = False result["lock_expired"] = True logger.info("Lock expired for %s: running + lock age %.0fs > 1800s", agent_id, elapsed) except Exception: pass except Exception: pass # v2.8.1 Fix-1: compact 检测改用 session jsonl 末尾扫描 # 只在 agent 非空闲时才扫描(减少不必要 I/O) if result["status"] not in ("done", "idle", "unknown", None) and sf: result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(sf) except Exception: pass return result @staticmethod def _classify_outcome(exit_code: int, json_result: dict, stderr_text: str, task_status: Optional[str], stdout_text: str = "") -> dict: """分类退出原因,返回处理策略 v3.1: A0 拆分为 A14-A17(信号中断/stderr 智能分类)。 A8/A10 改为可恢复 retry。cooldown 统一 60s。 """ status = json_result.get("status") summary = json_result.get("summary", "") fallback_used = json_result.get("fallback_used", False) # A4: 任务 DB status=failed(Agent 自己标的) if task_status == "failed": return {"outcome": "agent_failed", "should_retry": False} # A1: status=ok + completed + 非 fallback if status == "ok" and summary == "completed" and not fallback_used: return {"outcome": "completed", "should_retry": False} # A5/A6: status=ok + fallback if status == "ok" and fallback_used: return {"outcome": "fallback_timeout", "should_retry": False} # A2/A3: status=timeout → 唯一续杯场景 # 注意: PM2 restart 时 daemon 自身也收到 SIGTERM,此时 retry spawn 的新进程 # 会随 daemon 一起被杀。A14 retry 假设 daemon 存活,PM2 级重启不在此场景内。 if status == "timeout": return {"outcome": "gateway_timeout", "should_retry": True, "retry_field": "retry_count"} # A0 拆分: 无 JSON 输出 + exit≠0 if status is None and not stdout_text.strip() and exit_code != 0: # A14: SIGINT(130) / SIGTERM(143) → 外部中断,可恢复 if exit_code in (130, 143): return {"outcome": "interrupted", "should_retry": True, "retry_field": "retry_count", "cooldown_seconds": 60} # A15/A16: stderr 含 network/compact 关键字 → 可恢复 if stderr_text: stderr_lower = stderr_text.lower() if any(kw in stderr_lower for kw in ["econnrefused", "etimedout", "gateway closed", "econnreset"]): return {"outcome": "gateway_unreachable", "should_retry": True, "retry_field": "retry_count", "cooldown_seconds": 60} if any(kw in stderr_lower for kw in ["compaction-diag", "context-overflow"]): return {"outcome": "compact_interrupted", "should_retry": True, "retry_field": "retry_count", "cooldown_seconds": 60} # A17: 真正的 crash → 保持 working,ticker 兜底 return {"outcome": "crashed", "should_retry": False, "original": "process_crash"} # stdout 为空但 exit=0:可能是正常完成但 --json 没输出 # 查任务状态判断 if status is None and not stdout_text.strip() and exit_code == 0: terminal_statuses = {"done", "review"} if task_status in terminal_statuses: return {"outcome": "completed", "should_retry": False} return {"outcome": "agent_error", "should_retry": False} # A7-A12: status=error → 不续杯,stderr 辅助分类 if status == "error": stderr_lower = stderr_text.lower() if any(kw in stderr_lower for kw in ["401", "403", "unauthorized", "auth"]): return {"outcome": "auth_failed", "should_retry": False} if any(kw in stderr_lower for kw in ["econnrefused", "etimedout", "gateway closed", "econnreset"]): return {"outcome": "gateway_unreachable", "should_retry": True, "retry_field": "retry_count", "cooldown_seconds": 60} if any(kw in stderr_lower for kw in ["rate_limit", "500", "503", "api error"]): return {"outcome": "api_error", "should_retry": False} if any(kw in stderr_lower for kw in ["compaction-diag", "context-overflow"]): return {"outcome": "compact_failed", "should_retry": False} if any(kw in stderr_lower for kw in ["lock", "busy", "concurrent", "lane task error"]): return {"outcome": "lock_conflict", "should_retry": True, "retry_field": "retry_count", "cooldown_seconds": 60} return {"outcome": "agent_error", "should_retry": False} # 兜底:status 未知值 return {"outcome": "agent_error", "should_retry": False, "original": "unknown_status"} @staticmethod def _get_retry_counts(db_path: Optional[Path], task_id: Optional[str]) -> dict: """从最新 task_attempt 的 metadata 读计数器""" defaults = {"retry_count": 0, "connect_retry_count": 0, "api_retry_count": 0, "lock_retry_count": 0, "monitor_timeout_count": 0} if not db_path or not task_id: return defaults try: conn = get_connection(db_path) try: row = conn.execute( "SELECT metadata FROM task_attempts WHERE task_id=? ORDER BY attempt_number DESC LIMIT 1", (task_id,) ).fetchone() if row and row["metadata"]: stored = json.loads(row["metadata"]) for k in defaults: if k in stored: defaults[k] = stored[k] finally: conn.close() except Exception: pass return defaults def _update_retry_counts(self, db_path: Optional[Path], task_id: Optional[str], counts: dict): """将 retry counts 写回最新 task_attempt 的 metadata""" if not db_path or not task_id: return try: conn = get_connection(db_path) try: conn.execute("BEGIN IMMEDIATE") row = conn.execute( "SELECT rowid, metadata FROM task_attempts " "WHERE task_id=? ORDER BY attempt_number DESC LIMIT 1", (task_id,) ).fetchone() if row: meta = json.loads(row["metadata"]) if row["metadata"] else {} meta.update(counts) conn.execute( "UPDATE task_attempts SET metadata=? WHERE rowid=?", (json.dumps(meta), row["rowid"]) ) conn.commit() finally: conn.close() except Exception: logger.exception("Failed to update retry counts for task %s", task_id) def _mark_task(self, db_path: Optional[Path], task_id: Optional[str], status: str, detail: Optional[dict] = None): """标记任务状态(用于 failed/escalate)""" if not db_path or not task_id: return try: conn = get_connection(db_path) try: conn.execute("BEGIN IMMEDIATE") conn.execute( "UPDATE tasks SET status=?, completed_at=datetime('now') WHERE id=?", (status, task_id) ) if detail: conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", (task_id, "daemon", status, json.dumps(detail, ensure_ascii=False)) ) conn.commit() finally: conn.close() # F2: conn 已关闭,Blackboard 内部自己 get_connection if status == "failed": reason = (detail or {}).get("reason", "unknown") try: from src.daemon.mail_notify import _is_mail_project, notify_mail_failed if _is_mail_project(db_path): # Mail 失败:通知发件人,不 @pangtong notify_mail_failed(db_path, task_id, reason, detail) else: # Task 失败:@pangtong(F2 原逻辑) from src.blackboard.operations import Blackboard bb = Blackboard(db_path) cid = bb.add_comment(task_id, "daemon", f"@pangtong-fujunshi 任务执行失败: {reason},请评估是否需要介入", comment_type="system") bb.record_mentions(cid, task_id, ["pangtong-fujunshi"]) logger.info("Task %s: failure notified pangtong via comment+mention (reason=%s)", task_id, reason) except Exception as e: logger.warning("Task %s: failed to notify: %s", task_id, e) except Exception: logger.exception("Failed to mark task %s as %s", task_id, status) @staticmethod def _do_on_complete(on_complete, agent_id, outcome): """执行 on_complete 回调(同步+异步兼容)""" if not on_complete: return try: result = on_complete(agent_id, outcome) if asyncio.iscoroutine(result): # 注意:这里是同步调用的,不能 await # 在 _monitor_process 的 async 上下文中应该用 await pass except Exception: pass async def _do_on_complete_async(self, on_complete, agent_id, outcome): """异步执行 on_complete 回调""" if not on_complete: return try: result = on_complete(agent_id, outcome) if asyncio.iscoroutine(result): await result except Exception: logger.warning("on_complete callback failed for %s", agent_id, exc_info=True) def _register_session( self, session_id: str, agent_id: str, task_id: Optional[str], pid: Optional[int], broadcast_task_ids: Optional[List[str]] = None, ) -> None: """注册 spawn session""" self._sessions[session_id] = { "agent_id": agent_id, "task_id": task_id, "pid": pid, "status": "running", "started_at": datetime.utcnow().isoformat(), "completed_at": None, "broadcast_task_ids": broadcast_task_ids, } def _record_attempt( self, task_id: Optional[str], agent_id: str, outcome: str, exit_code: Optional[int] = None, error: Optional[str] = None, metadata: Optional[dict] = None, db_path: Optional[Path] = None, ) -> None: """记录 task_attempt""" # 广播 spawn 产生的 "broadcast" task_id 不记录 attempts,避免脏数据 if task_id == "broadcast": return effective_db = db_path or self.db_path if not task_id or not effective_db: return try: conn = get_connection(effective_db) try: conn.execute("BEGIN IMMEDIATE") row = conn.execute( "SELECT MAX(attempt_number) as max_a FROM task_attempts WHERE task_id=?", (task_id,), ).fetchone() attempt_number = (row["max_a"] or 0) + 1 meta = metadata or {} if error: meta["error"] = error conn.execute( "INSERT INTO task_attempts " "(task_id, attempt_number, agent, outcome, exit_code, metadata, completed_at) " "VALUES (?,?,?,?,?,?,datetime('now'))", (task_id, attempt_number, agent_id, outcome, exit_code, json.dumps(meta)), ) conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", (task_id, agent_id, "agent_completed" if outcome == "completed" else "daemon_tick", json.dumps({"outcome": outcome, "attempt": attempt_number})), ) conn.commit() finally: conn.close() except Exception: logger.exception("Failed to record attempt for task %s", task_id) def get_session(self, session_id: str) -> Optional[Dict[str, Any]]: """获取 session 信息""" return self._sessions.get(session_id) def get_session_by_agent(self, agent_id: str) -> Optional[Dict[str, Any]]: """v2.7.2: 根据 agent_id 获取活跃 session 信息(用于进程存活性检查)""" for sid, info in self._sessions.items(): if info.get("agent_id") == agent_id and info.get("status") == "running": return info return None def cleanup_session(self, session_id: str) -> None: """清理 session""" if session_id in self._sessions: session = self._sessions[session_id] task_id = session.get("task_id") del self._sessions[session_id] # 清理 B2 compact 等待计数器 if task_id and task_id in self._compact_waits: del self._compact_waits[task_id]