From 0e2721de8064e52b02a76f2f7810d6d72a5904b4 Mon Sep 17 00:00:00 2001
From: cfdaily
Date: Sun, 17 May 2026 05:59:56 +0800
Subject: [PATCH] auto-sync: 2026-05-17 05:59:56

---
 src/daemon/dispatcher.py | 204 ++++++++++++++++++++++++++++++++
 src/daemon/spawner.py    | 252 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 456 insertions(+)
 create mode 100644 src/daemon/dispatcher.py
 create mode 100644 src/daemon/spawner.py

diff --git a/src/daemon/dispatcher.py b/src/daemon/dispatcher.py
new file mode 100644
index 0000000..3e344c3
--- /dev/null
+++ b/src/daemon/dispatcher.py
@@ -0,0 +1,204 @@
+"""Agent dispatcher: a tiered decision tree.
+
+Level 1: purely mechanical check -> executed locally by the daemon
+Level 2: named, registered role -> Full Agent (asyncio.create_subprocess_exec)
+Level 3: unnamed one-off task -> Subagent (sessions_spawn)
+Level 4: unknown assignee -> escalated to pangtong for adjudication
+
+The dispatcher only decides; the actual spawn is performed by spawner.py.
+"""
+
+from __future__ import annotations
+
+import logging
+from enum import Enum
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+from src.blackboard.models import Task
+from src.blackboard.queries import Queries
+
+logger = logging.getLogger("moziplus-v2.dispatcher")
+
+
+class DispatchLevel(str, Enum):
+    """Dispatch level chosen for a task."""
+
+    LOCAL = "local"  # executed locally by the daemon
+    FULL_AGENT = "full"  # Full Agent spawn
+    SUB_AGENT = "sub"  # Subagent spawn
+    ESCALATE = "escalate"  # escalate to pangtong
+
+
+class Dispatcher:
+    """Decide how a task is dispatched and hand the spawn to the spawner."""
+
+    # Action types that are executed locally (level 1).
+    LOCAL_ACTIONS = frozenset({
+        "L1_guardrail",
+        "format_check",
+        "file_exists_check",
+        "dependency_advance",
+    })
+
+    def __init__(
+        self,
+        registered_agents: Optional[List[str]] = None,
+        spawner: Optional[Any] = None,
+        counter: Optional[Any] = None,
+    ):
+        """Create a dispatcher.
+
+        Args:
+            registered_agents: Agent IDs registered for the project.
+            spawner: AgentSpawner instance that performs the actual spawn.
+            counter: ActiveAgentCounter instance that limits concurrency.
+        """
+        self.registered_agents = set(registered_agents or [])
+        self.spawner = spawner
+        self.counter = counter
+
+    def decide(self, task: Task, action_type: str = "") -> Dict[str, Any]:
+        """Choose the dispatch level for ``task`` without spawning anything.
+
+        Returns:
+            {"level": DispatchLevel, "agent_id": str, "reason": str}; the
+            FULL_AGENT and ESCALATE results also carry "new_session": bool.
+        """
+        assignee = task.assignee
+
+        # Level 1: purely mechanical check -> run locally.
+        if action_type in self.LOCAL_ACTIONS:
+            return {
+                "level": DispatchLevel.LOCAL,
+                "agent_id": "daemon",
+                "reason": f"Local action: {action_type}",
+            }
+
+        # Level 2: assignee is a registered agent -> Full Agent.
+        if assignee and assignee in self.registered_agents:
+            return {
+                "level": DispatchLevel.FULL_AGENT,
+                "agent_id": assignee,
+                # Only adjudication requests a fresh session.
+                "new_session": action_type == "adjudication",
+                "reason": f"Registered agent: {assignee}",
+            }
+
+        # Level 3: no assignee -> Subagent.
+        if not assignee:
+            return {
+                "level": DispatchLevel.SUB_AGENT,
+                "agent_id": "subagent",
+                "reason": "No assignee, dispatch as subagent",
+            }
+
+        # Level 4: assignee given but not registered -> escalate to pangtong.
+        return {
+            "level": DispatchLevel.ESCALATE,
+            "agent_id": "pangtong-fujunshi",
+            "new_session": True,
+            "reason": f"Unknown agent '{assignee}', escalate to pangtong",
+        }
+
+    @staticmethod
+    def _result(level: DispatchLevel, agent_id: str, status: str, reason: str,
+                session_id: Optional[str] = None) -> Dict[str, Any]:
+        """Build the result dict returned by ``dispatch``."""
+        return {
+            "level": level.value,
+            "agent_id": agent_id,
+            "session_id": session_id,
+            "status": status,
+            "reason": reason,
+        }
+
+    async def dispatch(self, task: Task, action_type: str = "",
+                       project_config: Optional[Dict] = None) -> Dict[str, Any]:
+        """Decide and, unless the action is local, spawn the agent.
+
+        ``project_config`` is accepted but not used by this method. After a
+        successful Full Agent spawn the counter slot stays held; this method
+        only releases it when the spawn fails.
+
+        Returns:
+            {"level": str, "agent_id": str, "session_id": Optional[str],
+             "status": "dispatched"|"skipped"|"error", "reason": str}
+        """
+        decision = self.decide(task, action_type)
+        level = decision["level"]
+        agent_id = decision["agent_id"]
+        reason = decision["reason"]
+        full_agent = level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE)
+
+        # The concurrency limit only applies to Full Agent spawns.
+        if full_agent and self.counter:
+            if not await self.counter.can_acquire(agent_id):
+                return self._result(level, agent_id, "skipped",
+                                    "Agent busy (concurrent limit)")
+
+        # Local execution: nothing to spawn.
+        if level == DispatchLevel.LOCAL:
+            return self._result(level, "daemon", "dispatched", reason)
+
+        # Full Agent spawn (registered agent or escalation).
+        if full_agent:
+            if not self.spawner:
+                return self._result(level, agent_id, "error",
+                                    "No spawner configured")
+
+            # Remember whether the slot is really held, so that a failing
+            # acquire() is not followed by releasing a slot we never got.
+            acquired = False
+            try:
+                if self.counter:
+                    await self.counter.acquire(agent_id)
+                    acquired = True
+                session_id = await self.spawner.spawn_full_agent(
+                    agent_id=agent_id,
+                    message=self._build_message(task, action_type),
+                    new_session=decision.get("new_session", False),
+                    task_id=task.id,
+                )
+            except Exception as e:
+                if acquired:
+                    # NOTE(review): release() is not awaited while acquire()
+                    # is; confirm that release() is a synchronous method.
+                    self.counter.release(agent_id)
+                return self._result(level, agent_id, "error", str(e))
+            return self._result(level, agent_id, "dispatched", reason, session_id)
+
+        # Subagent spawn.
+        if level == DispatchLevel.SUB_AGENT:
+            if not self.spawner:
+                return self._result(level, "subagent", "error",
+                                    "No spawner configured")
+            try:
+                session_id = await self.spawner.spawn_subagent(
+                    task_description=self._build_message(task, action_type),
+                    task_id=task.id,
+                )
+            except Exception as e:
+                return self._result(level, "subagent", "error", str(e))
+            return self._result(level, "subagent", "dispatched", reason, session_id)
+
+        return self._result(level, agent_id, "error", "Unknown dispatch level")
+
+    def _build_message(self, task: Task, action_type: str) -> str:
+        """Build the text message handed to the agent."""
+        parts = [f"Task: {task.title}"]
+        if task.description:
+            parts.append(f"Description: {task.description}")
+        if action_type:
+            parts.append(f"Action: {action_type}")
+        if task.must_haves:
+            # must_haves is interpolated as-is; no JSON encoding happens here.
+            parts.append(f"Must-haves: {task.must_haves}")
+        return "\n".join(parts)
+
+    def dispatch_pending(self, tasks: List[Task]) -> List[Dict[str, Any]]:
+        """Decide for a batch of tasks without spawning anything.
+
+        Used by the ticker after it has scanned for dispatchable tasks.
+        """
+        return [{"task_id": task.id, **self.decide(task)} for task in tasks]
diff --git a/src/daemon/spawner.py b/src/daemon/spawner.py
new file mode 100644
index 0000000..c8e6e39
--- /dev/null
+++ b/src/daemon/spawner.py
@@ -0,0 +1,252 @@
+"""Agent spawner: asynchronously spawn Full Agents and Subagents.
+
+Full Agent: asyncio.create_subprocess_exec (non-blocking; the caller does not
+await completion).
+Subagent: placeholder (the real spawn goes through the OpenClaw Gateway API
+sessions_spawn, to be completed in F17).
+"""
+
+from __future__ import annotations
+
+import asyncio
+import json
+import logging
+import uuid
+from datetime import datetime
+from pathlib import Path
+from typing import Any, Dict, Optional
+
+from src.blackboard.db import get_connection, init_db
+
+logger = logging.getLogger("moziplus-v2.spawner")
+
+
+class AgentSpawner:
+    """Spawn agents and keep an in-memory registry of their sessions."""
+
+    def __init__(
+        self,
+        db_path: Optional[Path] = None,
+        agent_timeout: float = 600.0,
+        dry_run: bool = False,
+    ):
+        """Create a spawner.
+
+        Args:
+            db_path: Path of the project blackboard DB (used to write
+                task_attempts).
+            agent_timeout: Agent timeout in seconds.
+            dry_run: Test mode; nothing is actually spawned.
+        """
+        self.db_path = db_path
+        self.agent_timeout = agent_timeout
+        self.dry_run = dry_run
+
+        # Session registry: {session_id: {...}}
+        self._sessions: Dict[str, Dict[str, Any]] = {}
+        # Strong references to the monitor tasks: the event loop only keeps
+        # weak references, so an unreferenced task can be garbage-collected
+        # before it has finished.
+        self._monitor_tasks: set = set()
+
+    @property
+    def active_sessions(self) -> Dict[str, Dict[str, Any]]:
+        """Sessions whose status is still "running"."""
+        return {
+            sid: info
+            for sid, info in self._sessions.items()
+            if info.get("status") == "running"
+        }
+
+    async def spawn_full_agent(
+        self,
+        agent_id: str,
+        message: str,
+        new_session: bool = False,
+        task_id: Optional[str] = None,
+    ) -> str:
+        """Spawn a Full Agent subprocess without waiting for it to finish.
+
+        A background task monitors the process, enforces the timeout and
+        records the outcome.
+
+        NOTE(review): ``new_session`` is accepted but not used; a fresh
+        session id is generated on every call. Confirm the intended behavior.
+
+        Returns:
+            The generated session_id.
+
+        Raises:
+            Exception: whatever starting the subprocess raised; a
+                "spawn_failed" attempt is recorded before re-raising.
+        """
+        session_id = str(uuid.uuid4())
+
+        if self.dry_run:
+            logger.info("[DRY RUN] Would spawn agent %s (session=%s)", agent_id, session_id)
+            self._register_session(session_id, agent_id, task_id, pid=None)
+            return session_id
+
+        cmd = [
+            "openclaw", "agent",
+            "--agent", agent_id,
+            "--session-id", session_id,
+            "--message", message,
+            "--json",
+        ]
+
+        try:
+            proc = await asyncio.create_subprocess_exec(
+                *cmd,
+                stdout=asyncio.subprocess.PIPE,
+                stderr=asyncio.subprocess.PIPE,
+            )
+        except Exception as e:
+            logger.exception("Failed to spawn agent %s", agent_id)
+            self._record_attempt(task_id, agent_id, "spawn_failed", error=str(e))
+            raise
+
+        self._register_session(session_id, agent_id, task_id, proc.pid)
+        logger.info("Spawned agent %s (session=%s, pid=%d)",
+                    agent_id, session_id, proc.pid)
+
+        # Schedule timeout handling + cleanup, keeping a reference to the task.
+        monitor = asyncio.create_task(
+            self._monitor_process(session_id, proc, agent_id, task_id)
+        )
+        self._monitor_tasks.add(monitor)
+        monitor.add_done_callback(self._monitor_tasks.discard)
+
+        return session_id
+
+    async def spawn_subagent(
+        self,
+        task_description: str,
+        task_id: Optional[str] = None,
+    ) -> str:
+        """Spawn a Subagent (placeholder; the real spawn goes via the Gateway API).
+
+        For now this only registers a session; ``task_description`` is unused.
+
+        Returns:
+            The generated session_id.
+        """
+        session_id = str(uuid.uuid4())
+
+        if self.dry_run:
+            logger.info("[DRY RUN] Would spawn subagent (session=%s)", session_id)
+        else:
+            # TODO: F17 - implement via the Gateway API sessions_spawn.
+            logger.info("Subagent spawn (session=%s) - placeholder", session_id)
+
+        self._register_session(session_id, "subagent", task_id, pid=None)
+        return session_id
+
+    async def _monitor_process(
+        self,
+        session_id: str,
+        proc: asyncio.subprocess.Process,
+        agent_id: str,
+        task_id: Optional[str],
+    ) -> None:
+        """Wait for the subprocess, kill it on timeout, then record the outcome."""
+        try:
+            # stdout/stderr are pipes, so they must be drained while waiting: a
+            # bare proc.wait() can deadlock once the child fills the OS pipe
+            # buffer. communicate() reads both streams until the process exits.
+            await asyncio.wait_for(proc.communicate(), timeout=self.agent_timeout)
+            outcome = "completed"
+            exit_code = proc.returncode
+        except asyncio.TimeoutError:
+            try:
+                proc.kill()
+            except ProcessLookupError:
+                pass  # the process exited on its own right after the timeout
+            await proc.wait()
+            outcome = "timed_out"
+            exit_code = -1
+            logger.warning("Agent %s timed out (session=%s)", agent_id, session_id)
+
+        # Update the session status, unless the session was already cleaned up.
+        session = self._sessions.get(session_id)
+        if session is not None:
+            session["status"] = outcome
+            session["completed_at"] = datetime.utcnow().isoformat()
+
+        # Record the task_attempt.
+        self._record_attempt(task_id, agent_id, outcome, exit_code=exit_code)
+
+        logger.info("Agent %s finished (session=%s, outcome=%s, exit=%d)",
+                    agent_id, session_id, outcome, exit_code)
+
+    def _register_session(
+        self,
+        session_id: str,
+        agent_id: str,
+        task_id: Optional[str],
+        pid: Optional[int],
+    ) -> None:
+        """Add a session to the registry with status "running"."""
+        self._sessions[session_id] = {
+            "agent_id": agent_id,
+            "task_id": task_id,
+            "pid": pid,
+            "status": "running",
+            "started_at": datetime.utcnow().isoformat(),
+            "completed_at": None,
+        }
+
+    def _record_attempt(
+        self,
+        task_id: Optional[str],
+        agent_id: str,
+        outcome: str,
+        exit_code: Optional[int] = None,
+        error: Optional[str] = None,
+    ) -> None:
+        """Write a task_attempts row and a matching events row.
+
+        Does nothing when ``task_id`` or ``db_path`` is missing. Database
+        errors are logged and swallowed rather than propagated to the caller.
+        """
+        if not task_id or not self.db_path:
+            return
+
+        try:
+            conn = get_connection(self.db_path)
+            try:
+                conn.execute("BEGIN IMMEDIATE")
+                # Next attempt number for this task.
+                row = conn.execute(
+                    "SELECT MAX(attempt_number) as max_a FROM task_attempts WHERE task_id=?",
+                    (task_id,),
+                ).fetchone()
+                attempt_number = (row["max_a"] or 0) + 1
+
+                metadata = {"error": error} if error else {}
+                conn.execute(
+                    "INSERT INTO task_attempts "
+                    "(task_id, attempt_number, agent, outcome, exit_code, metadata, completed_at) "
+                    "VALUES (?,?,?,?,?,?,datetime('now'))",
+                    (task_id, attempt_number, agent_id, outcome,
+                     exit_code, json.dumps(metadata)),
+                )
+                conn.execute(
+                    "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
+                    (task_id, agent_id,
+                     "agent_completed" if outcome == "completed" else "daemon_tick",
+                     json.dumps({"outcome": outcome, "attempt": attempt_number})),
+                )
+                conn.commit()
+            finally:
+                conn.close()
+        except Exception:
+            logger.exception("Failed to record attempt for task %s", task_id)
+
+    def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
+        """Return the session record, or None if it is unknown."""
+        return self._sessions.get(session_id)
+
+    def cleanup_session(self, session_id: str) -> None:
+        """Remove a session from the registry (no-op if it is unknown)."""
+        self._sessions.pop(session_id, None)