"""Agent Spawner — 异步 spawn Full Agent / Subagent Full Agent: asyncio.create_subprocess_exec(异步非阻塞,不 await 完成) Subagent: 占位(实际通过 OpenClaw Gateway API sessions_spawn,F17 完善) """ from __future__ import annotations import asyncio import json import logging import uuid from datetime import datetime from pathlib import Path from typing import Any, Dict, Optional from src.blackboard.db import get_connection, init_db logger = logging.getLogger("moziplus-v2.spawner") # ── Prompt 模板 ── SPAWN_PROMPT_TEMPLATE = """你收到一个 v2.6 黑板任务。请严格按照下面的步骤执行。 ## 任务信息 - 项目: {project_id} - 任务ID: {task_id} - 标题: {title} - 描述: {description} - 类型: {task_type} - 优先级: {priority} - 必要条件: {must_haves} {retry_context} ## 状态机(你必须遵守的状态流转) ``` pending → claimed → working → review → done │ │ │ └→ pending(驳回重做) ├──→ failed ├──→ blocked └──→ cancelled ``` 你当前处于 **{current_status}** 状态。 ## 执行步骤 ### 步骤 1: 开始工作 立即调 API 标记你已开始: ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \ -H 'Content-Type: application/json' \ -d '{{"status": "working", "agent": "{agent_id}"}}' ``` ### 步骤 2: 执行任务 根据任务描述完成你的工作(编码/回测/数据检查/审查等)。 ### 步骤 3: 写入产出 ⚠️ 这一步是必须的!不写产出 = 任务没完成。 ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \ -H 'Content-Type: application/json' \ -d '{{"agent": "{agent_id}", "type": "<产出类型>", "title": "<产出标题>", "content": "<你的产出内容>", "summary": "<简要说明>"}}' ``` **type 必须是以下之一**: code, document, data, config, other 如果产出太长,可以写文件后用路径引用: ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \ -H 'Content-Type: application/json' \ -d '{{"agent": "{agent_id}", "type": "code", "title": "main.py", "content_path": "/path/to/file.py", "summary": "主程序"}}' ``` ### 步骤 4: 提交审查或标记失败 ✅ 成功完成: ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \ -H 'Content-Type: application/json' \ -d '{{"status": "review", "agent": "{agent_id}"}}' ``` ❌ 无法完成: ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \ -H 'Content-Type: application/json' \ -d '{{"status": "failed", "agent": "{agent_id}", "detail": "<失败原因>"}}' ``` ## Fallback(API 调用失败时) 如果 API 失败 2 次,尝试: ```bash curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status \ -H 'Content-Type: application/json' \ -d '{{"status": "failed", "agent": "{agent_id}", "detail": "API回写失败,产出在本地文件"}}' ``` ## 参考链接 - 查看任务完整信息: GET http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}?expand=all - 写评论: POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/comments {{"author": "{agent_id}", "body": "..."}} - 完整 API 契约: docs/design/agent-api-contract.md """ class AgentSpawner: """Agent spawn 管理""" def __init__( self, db_path: Optional[Path] = None, agent_timeout: float = 600.0, dry_run: bool = False, api_host: str = "127.0.0.1", api_port: int = 8083, bootstrap_builder: Optional[Any] = None, ): """ Args: db_path: 项目黑板 DB 路径(用于写 task_attempts) agent_timeout: Agent 超时秒数 dry_run: 测试模式,不实际 spawn api_host: API 地址(供 Agent 回写) api_port: API 端口(供 Agent 回写) """ self.db_path = db_path self.agent_timeout = agent_timeout self.dry_run = dry_run self.api_host = api_host self.api_port = api_port self.bootstrap_builder = bootstrap_builder # session 注册表 {session_id: {...}} self._sessions: Dict[str, Dict[str, Any]] = {} @property def active_sessions(self) -> Dict[str, Dict[str, Any]]: """当前活跃的 spawn sessions""" return {sid: s for sid, s in self._sessions.items() if s.get("status") == "running"} def build_spawn_message( self, task_id: str, title: str, description: str, task_type: str = "", priority: int = 5, must_haves: str = "", project_id: str = "", agent_id: str = "", current_status: str = "claimed", retry_context: str = "", task: Optional[Any] = None, project_config: Optional[Dict[str, Any]] = None, ) -> str: """构建 Agent spawn 的消息(优先用 BootstrapBuilder,fallback 用模板) Args: current_status: 任务当前状态(动态生成状态机提示) retry_context: 重试上下文(前轮产出摘要 + 审查意见) task: Task 对象(BootstrapBuilder 用) project_config: 项目配置(BootstrapBuilder 用) """ # 尝试 BootstrapBuilder if self.bootstrap_builder and task is not None: try: bootstrap_prompt = self.bootstrap_builder.build_for_task( task=task, role="executor", project_config=project_config, ) # 在 bootstrap 后追加操作指令(状态机 + API 回写) api_section = self._build_api_section( project_id, task_id, agent_id) return bootstrap_prompt + "\n\n---\n\n" + api_section except Exception: logger.exception("BootstrapBuilder failed, falling back to template") # Fallback: 使用硬编码模板 return SPAWN_PROMPT_TEMPLATE.format( project_id=project_id, task_id=task_id, title=title, description=description or "(无描述)", task_type=task_type or "general", priority=priority, must_haves=must_haves or "(无)", agent_id=agent_id, api_host=self.api_host, api_port=self.api_port, current_status=current_status or "claimed", retry_context=retry_context or "", ) def _build_api_section(self, project_id: str, task_id: str, agent_id: str) -> str: """构建 API 回写操作指令(BootstrapBuilder 模式下补充)""" return f"""## 操作指令 ### 状态回写 开始工作: ```bash curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/tasks/{task_id}/status \ -H 'Content-Type: application/json' \ -d '{{"status": "working", "agent": "{agent_id}"}}' ``` ### 写入产出 ```bash curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/tasks/{task_id}/outputs \ -H 'Content-Type: application/json' \ -d '{{"agent": "{agent_id}", "type": "<类型>", "title": "<标题>", "content": "<内容>", "summary": "<摘要>"}}' ``` ### 完成后 成功:status → "review" | 失败:status → "failed" """ async def spawn_full_agent( self, agent_id: str, message: str, new_session: bool = False, task_id: Optional[str] = None, on_complete: Optional[Any] = None, ) -> str: """Spawn Full Agent(异步非阻塞) Args: on_complete: async callback(agent_id, outcome) — Agent 完成后调用 Returns: session_id """ session_id = str(uuid.uuid4()) if self.dry_run: logger.info("[DRY RUN] Would spawn agent %s (session=%s)", agent_id, session_id) self._register_session(session_id, agent_id, task_id, pid=None) return session_id cmd = [ "openclaw", "agent", "--agent", agent_id, "--session-id", session_id, "--message", message, "--json", ] try: proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) self._register_session(session_id, agent_id, task_id, proc.pid) logger.info("Spawned agent %s (session=%s, pid=%d)", agent_id, session_id, proc.pid) # Schedule timeout + cleanup asyncio.create_task( self._monitor_process(session_id, proc, agent_id, task_id, on_complete=on_complete) ) return session_id except Exception as e: logger.exception("Failed to spawn agent %s", agent_id) self._record_attempt(task_id, agent_id, "spawn_failed", error=str(e)) raise async def spawn_subagent( self, task_description: str, task_id: Optional[str] = None, ) -> str: """Spawn Subagent(占位,实际通过 Gateway API) Returns: session_id """ session_id = str(uuid.uuid4()) if self.dry_run: logger.info("[DRY RUN] Would spawn subagent (session=%s)", session_id) self._register_session(session_id, "subagent", task_id, pid=None) return session_id # TODO: F17 通过 Gateway API sessions_spawn 实现 logger.info("Subagent spawn (session=%s) - placeholder", session_id) self._register_session(session_id, "subagent", task_id, pid=None) return session_id async def _monitor_process( self, session_id: str, proc: asyncio.subprocess.Process, agent_id: str, task_id: Optional[str], on_complete: Optional[Any] = None, ) -> None: """监控子进程,超时 kill,完成后记录""" try: await asyncio.wait_for(proc.wait(), timeout=self.agent_timeout) outcome = "completed" exit_code = proc.returncode except asyncio.TimeoutError: proc.kill() await proc.wait() outcome = "timed_out" exit_code = -1 logger.warning("Agent %s timed out (session=%s)", agent_id, session_id) # 更新 session 状态 if session_id in self._sessions: self._sessions[session_id]["status"] = outcome self._sessions[session_id]["completed_at"] = datetime.utcnow().isoformat() # 记录 task_attempt self._record_attempt(task_id, agent_id, outcome, exit_code=exit_code) logger.info("Agent %s finished (session=%s, outcome=%s, exit=%d)", agent_id, session_id, outcome, exit_code) # 完成回调(释放 counter 等) if on_complete: try: result = on_complete(agent_id, outcome) import asyncio if asyncio.iscoroutine(result): await result except Exception: logger.warning("on_complete callback failed for %s", agent_id, exc_info=True) def _register_session( self, session_id: str, agent_id: str, task_id: Optional[str], pid: Optional[int], ) -> None: """注册 spawn session""" self._sessions[session_id] = { "agent_id": agent_id, "task_id": task_id, "pid": pid, "status": "running", "started_at": datetime.utcnow().isoformat(), "completed_at": None, } def _record_attempt( self, task_id: Optional[str], agent_id: str, outcome: str, exit_code: Optional[int] = None, error: Optional[str] = None, ) -> None: """记录 task_attempt""" if not task_id or not self.db_path: return try: conn = get_connection(self.db_path) try: conn.execute("BEGIN IMMEDIATE") # 获取 attempt_number row = conn.execute( "SELECT MAX(attempt_number) as max_a FROM task_attempts WHERE task_id=?", (task_id,), ).fetchone() attempt_number = (row["max_a"] or 0) + 1 metadata = {"error": error} if error else {} conn.execute( "INSERT INTO task_attempts " "(task_id, attempt_number, agent, outcome, exit_code, metadata, completed_at) " "VALUES (?,?,?,?,?,?,datetime('now'))", (task_id, attempt_number, agent_id, outcome, exit_code, json.dumps(metadata)), ) conn.execute( "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)", (task_id, agent_id, "agent_completed" if outcome == "completed" else "daemon_tick", json.dumps({"outcome": outcome, "attempt": attempt_number})), ) conn.commit() finally: conn.close() except Exception: logger.exception("Failed to record attempt for task %s", task_id) def get_session(self, session_id: str) -> Optional[Dict[str, Any]]: """获取 session 信息""" return self._sessions.get(session_id) def cleanup_session(self, session_id: str) -> None: """清理 session""" if session_id in self._sessions: del self._sessions[session_id]