"""Agent Spawner — 异步 spawn Full Agent / Subagent Full Agent: asyncio.create_subprocess_exec(异步非阻塞,不 await 完成) Subagent: 占位(实际通过 OpenClaw Gateway API sessions_spawn,F17 完善) """ from __future__ import annotations import asyncio import json import logging import uuid from datetime import datetime from pathlib import Path from typing import Any, Dict, Optional from src.blackboard.db import get_connection, init_db logger = logging.getLogger("moziplus-v2.spawner") # ── Prompt 模板 ── SPAWN_PROMPT_TEMPLATE = """你收到一个 v2.6 黑板任务。请严格按照下面的步骤执行。 ## 任务信息 - 项目: {project_id} - 任务ID: {task_id} - 标题: {title} - 描述: {description} - 类型: {task_type} - 优先级: {priority} - 必要条件: {must_haves} {retry_context} ## 状态机(你必须遵守的状态流转) ``` pending → claimed → working → review → done │ │ │ └→ pending(驳回重做) ├──→ failed ├──→ blocked └──→ cancelled ``` 你当前处于 **{current_status}** 状态。 ## 执行步骤 ### 步骤 1: 开始工作 立即调 API 标记你已开始: ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \ -H 'Content-Type: application/json' \ -d '{{"status": "working", "agent": "{agent_id}"}}' ``` ### 步骤 2: 执行任务 根据任务描述完成你的工作(编码/回测/数据检查/审查等)。 ### 步骤 3: 写入产出 ⚠️ 这一步是必须的!不写产出 = 任务没完成。 ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \ -H 'Content-Type: application/json' \ -d '{{"agent": "{agent_id}", "type": "<产出类型>", "title": "<产出标题>", "content": "<你的产出内容>", "summary": "<简要说明>"}}' ``` **type 必须是以下之一**: code, document, data, config, other 如果产出太长,可以写文件后用路径引用: ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \ -H 'Content-Type: application/json' \ -d '{{"agent": "{agent_id}", "type": "code", "title": "main.py", "content_path": "/path/to/file.py", "summary": "主程序"}}' ``` ### 步骤 4: 提交审查或标记失败 ✅ 成功完成: ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \ -H 'Content-Type: application/json' \ -d '{{"status": "review", "agent": "{agent_id}"}}' ``` ❌ 无法完成: ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \ -H 'Content-Type: application/json' \ -d '{{"status": "failed", "agent": "{agent_id}", "detail": "<失败原因>"}}' ``` ## Fallback(API 调用失败时) 如果 API 失败 2 次,尝试: ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \ -H 'Content-Type: application/json' \ -d '{{"status": "failed", "agent": "{agent_id}", "detail": "API回写失败,产出在本地文件"}}' ``` ## 参考链接 - 查看任务完整信息: GET http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}?expand=all - 写评论: POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/comments {{"author": "{agent_id}", "body": "..."}} - 完整 API 契约: docs/design/agent-api-contract.md """ class AgentSpawner: """Agent spawn 管理""" def __init__( self, db_path: Optional[Path] = None, agent_timeout: float = 630.0, dry_run: bool = False, api_host: str = "127.0.0.1", api_port: int = 8083, bootstrap_builder: Optional[Any] = None, gateway_timeout: float = 600.0, max_retries: int = 3, max_monitor_timeouts: int = 3, ): """ Args: db_path: 项目黑板 DB 路径(用于写 task_attempts) agent_timeout: Agent 超时秒数 dry_run: 测试模式,不实际 spawn api_host: API 地址(供 Agent 回写) api_port: API 端口(供 Agent 回写) """ self.db_path = db_path self.agent_timeout = agent_timeout self.dry_run = dry_run self.api_host = api_host self.api_port = api_port self.bootstrap_builder = bootstrap_builder self.gateway_timeout = gateway_timeout self.max_retries = max_retries self.max_monitor_timeouts = max_monitor_timeouts # session 注册表 {session_id: {...}} self._sessions: Dict[str, Dict[str, Any]] = {} @property def active_sessions(self) -> Dict[str, Dict[str, Any]]: """当前活跃的 spawn sessions""" return {sid: s for sid, s in self._sessions.items() if s.get("status") == "running"} def build_spawn_message( self, task_id: str, title: str, description: str, task_type: str = "", priority: int = 5, must_haves: str = "", project_id: str = "", agent_id: str = "", current_status: str = "claimed", retry_context: str = "", task: Optional[Any] = None, project_config: Optional[Dict[str, Any]] = None, ) -> str: """构建 Agent spawn 的消息(优先用 BootstrapBuilder,fallback 用模板) Args: current_status: 任务当前状态(动态生成状态机提示) retry_context: 重试上下文(前轮产出摘要 + 审查意见) task: Task 对象(BootstrapBuilder 用) project_config: 项目配置(BootstrapBuilder 用) """ # 尝试 BootstrapBuilder if self.bootstrap_builder and task is not None: try: bootstrap_prompt = self.bootstrap_builder.build_for_task( task=task, role="executor", project_config=project_config, ) # 在 bootstrap 后追加操作指令(状态机 + API 回写) api_section = self._build_api_section( project_id, task_id, agent_id) return bootstrap_prompt + "\n\n---\n\n" + api_section except Exception: logger.exception("BootstrapBuilder failed, falling back to template") # Fallback: 使用硬编码模板 return SPAWN_PROMPT_TEMPLATE.format( project_id=project_id, task_id=task_id, title=title, description=description or "(无描述)", task_type=task_type or "general", priority=priority, must_haves=must_haves or "(无)", agent_id=agent_id, api_host=self.api_host, api_port=self.api_port, current_status=current_status or "claimed", retry_context=retry_context or "", ) def _build_api_section(self, project_id: str, task_id: str, agent_id: str) -> str: """构建 API 回写操作指令(BootstrapBuilder 模式下补充)""" return f"""## 操作指令 ### 状态回写 开始工作: ```bash curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/tasks/{task_id}/status \ -H 'Content-Type: application/json' \ -d '{{"status": "working", "agent": "{agent_id}"}}' ``` ### 写入产出 ```bash curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \ -H 'Content-Type: application/json' \ -d '{{"agent": "{agent_id}", "type": "<类型>", "title": "<标题>", "content": "<内容>", "summary": "<摘要>"}}' ``` ### 完成后 成功:status → "review" | 失败:status → "failed" """ async def spawn_full_agent( self, agent_id: str, message: str, new_session: bool = False, task_id: Optional[str] = None, on_complete: Optional[Any] = None, use_main_session: bool = False, task_db_path: Optional[Path] = None, reuse_session_id: Optional[str] = None, ) -> str: """Spawn Full Agent(异步非阻塞) Args: on_complete: async callback(agent_id, outcome) — Agent 完成后调用 use_main_session: True = 投递到主 Agent session(不传 --session-id) reuse_session_id: 传入指定 session-id 复用(用于续杯) Returns: session_id """ # Session 策略:main > reuse > new if use_main_session: session_id = None elif reuse_session_id: session_id = reuse_session_id else: session_id = str(uuid.uuid4()) if self.dry_run: logger.info("[DRY RUN] Would spawn agent %s (session=%s)", agent_id, session_id or "main") self._register_session(session_id or "main", agent_id, task_id, pid=None) return session_id or "main" cmd = [ "openclaw", "agent", "--agent", agent_id, ] if session_id: cmd.extend(["--session-id", session_id]) cmd.extend([ "--message", message, "--json", "--timeout", str(int(self.gateway_timeout)), ]) try: proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) self._register_session(session_id, agent_id, task_id, proc.pid) logger.info("Spawned agent %s (session=%s, pid=%d)", agent_id, session_id, proc.pid) # Schedule monitor asyncio.create_task( self._monitor_process(session_id, proc, agent_id, task_id, on_complete=on_complete, db_path=task_db_path or self.db_path) ) return session_id except Exception as e: logger.exception("Failed to spawn agent %s", agent_id) self._record_attempt(task_id, agent_id, "spawn_failed", error=str(e)) raise async def spawn_subagent( self, task_description: str, task_id: Optional[str] = None, ) -> str: """Spawn Subagent(占位,实际通过 Gateway API) Returns: session_id """ session_id = str(uuid.uuid4()) if self.dry_run: logger.info("[DRY RUN] Would spawn subagent (session=%s)", session_id) self._register_session(session_id, "subagent", task_id, pid=None) return session_id # TODO: F17 通过 Gateway API sessions_spawn 实现 logger.info("Subagent spawn (session=%s) - placeholder", session_id) self._register_session(session_id, "subagent", task_id, pid=None) return session_id # ── 续杯 Prompt 模板 ── RETRY_PROMPT = """你收到一个续杯提醒。你的任务在执行过程中被中断了。 ## 任务信息 - 项目: {project_id} - 任务ID: {task_id} - 标题: {title} - 续杯次数: 第 {retry_count} 次(上限 {max_retries} 次) 请检查 session 历史中你之前做了什么,然后继续未完成的工作。 ## 操作指令 ### 查看任务当前状态 ```bash curl http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}?expand=all ``` ### 如果已经完成,标记 review ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \\ -H 'Content-Type: application/json' \\ -d '{{"status": "review", "agent": "{agent_id}"}}' ``` ### 写入产出(如果之前没写) ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \\ -H 'Content-Type: application/json' \\ -d '{{"agent": "{agent_id}", "type": "<类型>", "title": "<标题>", "content": "<内容>", "summary": "<摘要>"}}' ``` ### 如果无法解决,标记失败 ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \\ -H 'Content-Type: application/json' \\ -d '{{"status": "failed", "agent": "{agent_id}", "detail": "<失败原因>"}}' ``` {fallback_hint}""" async def _monitor_process( self, session_id: Optional[str], proc: asyncio.subprocess.Process, agent_id: str, task_id: Optional[str], on_complete: Optional[Any] = None, db_path: Optional[Path] = None, monitor_timeout_count: int = 0, ) -> None: """监控子进程全生命周期(设计文档 spawner-monitor-design.md)""" stdout_chunks: list = [] stderr_chunks: list = [] try: # ── 等待进程退出 + 流式读取 ── async def _read_streams(): async def _read_out(): while True: chunk = await proc.stdout.read(4096) if not chunk: break stdout_chunks.append(chunk) async def _read_err(): while True: chunk = await proc.stderr.read(4096) if not chunk: break stderr_chunks.append(chunk) await asyncio.gather(_read_out(), _read_err(), proc.wait()) await asyncio.wait_for(_read_streams(), timeout=self.agent_timeout) # ── 情况 A:进程退出 ── exit_code = proc.returncode await self._handle_exit( session_id, agent_id, task_id, exit_code, stdout_chunks, stderr_chunks, on_complete, db_path ) except asyncio.TimeoutError: # ── 情况 B:monitor timeout(进程没退出)── logger.warning("Agent %s monitor timeout (session=%s, count=%d/%d)", agent_id, session_id, monitor_timeout_count + 1, self.max_monitor_timeouts) await self._handle_monitor_timeout( session_id, agent_id, task_id, proc, on_complete, db_path, stderr_chunks, monitor_timeout_count ) async def _handle_exit(self, session_id, agent_id, task_id, exit_code, stdout_chunks, stderr_chunks, on_complete, db_path): """情况 A:进程退出后的处理""" stdout_text = b"".join(stdout_chunks).decode("utf-8", errors="replace") stderr_text = b"".join(stderr_chunks).decode("utf-8", errors="replace") # 解析 stdout JSON meta = self._parse_stdout_json(stdout_text) # 查任务实际状态 task_status = self._get_task_status(db_path, task_id) if task_id else None # 分类 cls = self._classify_outcome(exit_code, meta, stderr_text, task_status) outcome = cls["outcome"] # 更新 session 状态 sid = session_id or "main" if sid in self._sessions: self._sessions[sid]["status"] = outcome self._sessions[sid]["completed_at"] = datetime.utcnow().isoformat() self._sessions[sid]["exit_code"] = exit_code if meta: self._sessions[sid]["meta"] = meta # 记录 attempt self._record_attempt( task_id, agent_id, outcome, exit_code=exit_code, db_path=db_path, metadata={ "transport": meta.get("transport"), "fallback_reason": meta.get("fallbackReason"), "duration_ms": meta.get("durationMs"), "task_status_at_exit": task_status, } ) logger.info("Agent %s finished (session=%s, outcome=%s, exit=%d, task_status=%s)", agent_id, session_id, outcome, exit_code, task_status) if cls["release_counter"]: await self._do_on_complete_async(on_complete, agent_id, outcome) elif cls["should_retry"]: # 续杯:先释放 counter,再 spawn # on_complete 不传入续杯链(避免 double release) # 续杯 Agent 退出后由 ticker 自然发现状态变化 if on_complete: on_complete(agent_id, "retry_release") await self._do_retry( session_id, agent_id, task_id, None, db_path, cls.get("retry_field", "retry_count") ) # else: 暂时性失败(A8/A9/A11),不 release,不 retry,等 ticker async def _handle_monitor_timeout(self, session_id, agent_id, task_id, proc, on_complete, db_path, stderr_chunks, monitor_timeout_count): """情况 B:monitor timeout""" # 读已缓冲的 stderr try: remaining = await asyncio.wait_for(proc.stderr.read(), timeout=2.0) if remaining: stderr_chunks.append(remaining) except Exception: pass stderr_text = b"".join(stderr_chunks).decode("utf-8", errors="replace") # 检查 session 状态 state = self._check_session_state(agent_id) # B1: 假死 if state.get("status") == "running" and not state.get("lock_pid_alive", True): logger.error("Agent %s session stuck (session=%s, lock PID dead)", agent_id, session_id) self._mark_task(db_path, task_id, "failed", {"reason": "session_stuck", "diagnostics": state}) await self._do_on_complete_async(on_complete, agent_id, "session_stuck") return # B2/B3/B4: 进程还活着 # B2: compact 进行中 — 不计入 monitor timeout 计数,继续等 if state.get("recent_compact"): logger.info("Agent %s recent compaction detected, extending patience " "(session=%s, monitor=%d/%d)", agent_id, session_id, monitor_timeout_count, self.max_monitor_timeouts) # 不递增 monitor_timeout_count,但最多额外等 max_monitor_timeouts 次 # 用独立计数器防止无限等待 compact_wait_count = getattr(self, f"_compact_waits_{task_id}", 0) + 1 setattr(self, f"_compact_waits_{task_id}", compact_wait_count) if compact_wait_count >= self.max_monitor_timeouts: logger.error("Agent %s max compact waits reached (session=%s, count=%d)", agent_id, session_id, compact_wait_count) self._mark_task(db_path, task_id, "failed", { "reason": "compact_hanging", "compact_wait_count": compact_wait_count, "diagnostics": state, }) await self._do_on_complete_async(on_complete, agent_id, "compact_hanging") return # 继续等 asyncio.create_task( self._monitor_process( session_id, proc, agent_id, task_id, on_complete=on_complete, db_path=db_path, monitor_timeout_count=monitor_timeout_count, ) ) return # B3/B4: 无 compact,正常计数 monitor_timeout_count += 1 if monitor_timeout_count >= self.max_monitor_timeouts: logger.error("Agent %s max monitor timeouts (session=%s, count=%d)", agent_id, session_id, monitor_timeout_count) self._mark_task(db_path, task_id, "failed", { "reason": "max_monitor_timeouts", "count": monitor_timeout_count, "elapsed_seconds": monitor_timeout_count * int(self.agent_timeout), "diagnostics": state, }) await self._do_on_complete_async(on_complete, agent_id, "max_monitor_timeouts") return # 未超限:继续等(不 release counter) logger.info("Agent %s continuing monitor (session=%s, count=%d/%d)", agent_id, session_id, monitor_timeout_count, self.max_monitor_timeouts) asyncio.create_task( self._monitor_process( session_id, proc, agent_id, task_id, on_complete=on_complete, db_path=db_path, monitor_timeout_count=monitor_timeout_count, ) ) async def _do_retry(self, session_id, agent_id, task_id, on_complete, db_path, retry_field="retry_count"): """续杯:用同一 session_id 再 spawn 一次""" retry_counts = self._get_retry_counts(db_path, task_id) count = retry_counts.get(retry_field, 0) + 1 # 更新计数器并写回最新 attempt 的 metadata retry_counts[retry_field] = count self._update_retry_counts(db_path, task_id, retry_counts) if count >= self.max_retries: logger.error("Agent %s max retries (session=%s, %s=%d)", agent_id, session_id, retry_field, count) self._mark_task(db_path, task_id, "failed", { "reason": f"max_{retry_field}", "count": count, }) await self._do_on_complete_async(on_complete, agent_id, "max_retries") return logger.info("Agent %s retry %s=%d/%d (session=%s)", agent_id, retry_field, count, self.max_retries, session_id) # 构建续杯 message task_info = self._get_task_info(db_path, task_id) or {} fallback_hint = "\n⚠️ 之前有 fallback 执行,请调 API 检查任务当前状态和已有产出,确认是否已完成。" if retry_field == "retry_count" else "" message = self.RETRY_PROMPT.format( project_id=task_info.get("project_id", ""), task_id=task_id or "", title=task_info.get("title", ""), retry_count=count, max_retries=self.max_retries, api_host=self.api_host, api_port=self.api_port, agent_id=agent_id, fallback_hint=fallback_hint, ) # 续杯 spawn(counter 已在 _handle_exit 释放) try: await self.spawn_full_agent( agent_id=agent_id, message=message, task_id=task_id, on_complete=on_complete, reuse_session_id=session_id, task_db_path=db_path, ) except Exception: logger.exception("Retry spawn failed for %s", agent_id) await self._do_on_complete_async(on_complete, agent_id, "retry_spawn_failed") # ── 辅助方法 ── @staticmethod def _parse_stdout_json(stdout_text: str) -> dict: """解析 openclaw agent --json 的 stdout 输出""" text = stdout_text.strip() if not text: return {} try: data = json.loads(text) return data.get("meta", {}) except json.JSONDecodeError: # 多行输出,找最后一个 JSON for line in reversed(text.splitlines()): try: data = json.loads(line) return data.get("meta", {}) except json.JSONDecodeError: continue return {} @staticmethod def _get_task_status(db_path: Optional[Path], task_id: Optional[str]) -> Optional[str]: """查任务实际 API 状态""" if not db_path or not task_id: return None try: conn = get_connection(db_path) try: row = conn.execute( "SELECT status FROM tasks WHERE id=?", (task_id,) ).fetchone() return row["status"] if row else None finally: conn.close() except Exception: return None @staticmethod def _get_task_info(db_path: Optional[Path], task_id: Optional[str]) -> Optional[dict]: """查任务基本信息""" if not db_path or not task_id: return None try: conn = get_connection(db_path) try: row = conn.execute( "SELECT id, title, status FROM tasks WHERE id=?", (task_id,) ).fetchone() if not row: return None info = dict(row) # 从 db_path 推断 project_id: data//blackboard.db info["project_id"] = db_path.parent.name return info finally: conn.close() except Exception: return None @staticmethod def _check_session_state(agent_id: str) -> dict: """检查 sessions.json 和 lock 状态""" result = {"status": "unknown", "lock_pid": None, "lock_pid_alive": False, "recent_compact": False} sessions_path = Path.home() / ".openclaw" / "agents" / agent_id / "sessions" / "sessions.json" if not sessions_path.exists(): return result try: with open(sessions_path) as f: sessions = json.load(f) main_key = f"agent:{agent_id}:main" main_session = sessions.get(main_key, {}) result["status"] = main_session.get("status", "unknown") # 检查 lock sf = main_session.get("sessionFile", "") if sf: lock_path = Path(sf + ".lock") if lock_path.exists(): try: lock_data = json.loads(lock_path.read_text()) pid = lock_data.get("pid") result["lock_pid"] = pid if pid: import os try: os.kill(pid, 0) result["lock_pid_alive"] = True except ProcessLookupError: result["lock_pid_alive"] = False except Exception: pass # 最近 5 分钟的 compact import time now_ms = time.time() * 1000 for cp in main_session.get("compactionCheckpoints", []): if (now_ms - cp.get("createdAt", 0)) < 300_000: result["recent_compact"] = True break except Exception: pass return result @staticmethod def _classify_outcome(exit_code: int, meta: dict, stderr_text: str, task_status: Optional[str]) -> dict: """分类退出原因,返回处理策略""" transport = meta.get("transport", "") fallback_reason = meta.get("fallbackReason") # 终态判断 terminal_statuses = {"done", "review", "failed", "cancelled"} is_terminal = task_status in terminal_statuses # A4: 任务自己 failed if task_status == "failed": return {"outcome": "agent_failed", "release_counter": True, "should_retry": False} # A1: 正常完成 if exit_code == 0 and transport != "embedded" and is_terminal: return {"outcome": "completed", "release_counter": True, "should_retry": False} # A5/A6: fallback if exit_code == 0 and transport == "embedded": if is_terminal: return {"outcome": "fallback_timeout", "release_counter": True, "should_retry": False} # fallback 完成但任务没 done → 续杯 return {"outcome": "fallback_timeout", "release_counter": False, "should_retry": True, "retry_field": "retry_count"} # A2/A3: Gateway timeout(任务没完成) if exit_code == 0 and not is_terminal: return {"outcome": "gateway_timeout", "release_counter": False, "should_retry": True, "retry_field": "retry_count"} # A7: 认证失败 if exit_code != 0 and any(kw in stderr_text for kw in ["401", "403", "unauthorized", "auth"]): return {"outcome": "auth_failed", "release_counter": True, "should_retry": False} # A8: Gateway 不可达 if exit_code != 0 and any(kw in stderr_text for kw in ["ECONNREFUSED", "ETIMEDOUT", "gateway closed", "ECONNRESET"]): return {"outcome": "gateway_unreachable", "release_counter": True, "should_retry": False, # 让 ticker 自然重试 "count_field": "connect_retry_count"} # A9: API 错误 if exit_code != 0 and any(kw in stderr_text for kw in ["rate_limit", "500", "503", "API error"]): return {"outcome": "api_error", "release_counter": True, "should_retry": False, "count_field": "api_retry_count"} # A10: compact 失败 if exit_code != 0 and any(kw in stderr_text for kw in ["compaction-diag", "context-overflow", "timeout-compaction"]): return {"outcome": "compact_failed", "release_counter": False, "should_retry": True, "retry_field": "retry_count"} # A11: Lock 冲突 if exit_code != 0 and any(kw in stderr_text for kw in ["lock", "busy", "concurrent", "lane task error"]): return {"outcome": "lock_conflict", "release_counter": True, "should_retry": False, "count_field": "lock_retry_count"} # A12: 其他 return {"outcome": "agent_error", "release_counter": False, "should_retry": True, "retry_field": "retry_count"} @staticmethod def _get_retry_counts(db_path: Optional[Path], task_id: Optional[str]) -> dict: """从最新 task_attempt 的 metadata 读计数器""" defaults = {"retry_count": 0, "connect_retry_count": 0, "api_retry_count": 0, "lock_retry_count": 0, "monitor_timeout_count": 0} if not db_path or not task_id: return defaults try: conn = get_connection(db_path) try: row = conn.execute( "SELECT metadata FROM task_attempts WHERE task_id=? ORDER BY attempt_number DESC LIMIT 1", (task_id,) ).fetchone() if row and row["metadata"]: stored = json.loads(row["metadata"]) for k in defaults: if k in stored: defaults[k] = stored[k] finally: conn.close() except Exception: pass return defaults def _mark_task(self, db_path: Optional[Path], task_id: Optional[str], status: str, detail: Optional[dict] = None): """标记任务状态(用于 failed/escalate)""" if not db_path or not task_id: return try: conn = get_connection(db_path) try: conn.execute("BEGIN IMMEDIATE") conn.execute( "UPDATE tasks SET status=?, completed_at=datetime('now') WHERE id=?", (status, task_id) ) if detail: conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", (task_id, "daemon", status, json.dumps(detail, ensure_ascii=False)) ) conn.commit() finally: conn.close() except Exception: logger.exception("Failed to mark task %s as %s", task_id, status) @staticmethod def _do_on_complete(on_complete, agent_id, outcome): """执行 on_complete 回调(同步+异步兼容)""" if not on_complete: return try: result = on_complete(agent_id, outcome) if asyncio.iscoroutine(result): # 注意:这里是同步调用的,不能 await # 在 _monitor_process 的 async 上下文中应该用 await pass except Exception: pass async def _do_on_complete_async(self, on_complete, agent_id, outcome): """异步执行 on_complete 回调""" if not on_complete: return try: result = on_complete(agent_id, outcome) if asyncio.iscoroutine(result): await result except Exception: logger.warning("on_complete callback failed for %s", agent_id, exc_info=True) def _register_session( self, session_id: str, agent_id: str, task_id: Optional[str], pid: Optional[int], ) -> None: """注册 spawn session""" self._sessions[session_id] = { "agent_id": agent_id, "task_id": task_id, "pid": pid, "status": "running", "started_at": datetime.utcnow().isoformat(), "completed_at": None, } def _record_attempt( self, task_id: Optional[str], agent_id: str, outcome: str, exit_code: Optional[int] = None, error: Optional[str] = None, metadata: Optional[dict] = None, db_path: Optional[Path] = None, ) -> None: """记录 task_attempt""" effective_db = db_path or self.db_path if not task_id or not effective_db: return try: conn = get_connection(effective_db) try: conn.execute("BEGIN IMMEDIATE") row = conn.execute( "SELECT MAX(attempt_number) as max_a FROM task_attempts WHERE task_id=?", (task_id,), ).fetchone() attempt_number = (row["max_a"] or 0) + 1 meta = metadata or {} if error: meta["error"] = error conn.execute( "INSERT INTO task_attempts " "(task_id, attempt_number, agent, outcome, exit_code, metadata, completed_at) " "VALUES (?,?,?,?,?,?,datetime('now'))", (task_id, attempt_number, agent_id, outcome, exit_code, json.dumps(meta)), ) conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", (task_id, agent_id, "agent_completed" if outcome == "completed" else "daemon_tick", json.dumps({"outcome": outcome, "attempt": attempt_number})), ) conn.commit() finally: conn.close() except Exception: logger.exception("Failed to record attempt for task %s", task_id) def get_session(self, session_id: str) -> Optional[Dict[str, Any]]: """获取 session 信息""" return self._sessions.get(session_id) def cleanup_session(self, session_id: str) -> None: """清理 session""" if session_id in self._sessions: del self._sessions[session_id]