From af2382ddcb6a06d979ed071cfc6641925d922405 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Sun, 17 May 2026 21:14:21 +0800 Subject: [PATCH] auto-sync: 2026-05-17 21:14:20 --- src/daemon/dispatcher.py | 415 +++++++++++++++++++++++---------------- 1 file changed, 244 insertions(+), 171 deletions(-) diff --git a/src/daemon/dispatcher.py b/src/daemon/dispatcher.py index 7206e08..6870c98 100644 --- a/src/daemon/dispatcher.py +++ b/src/daemon/dispatcher.py @@ -1,22 +1,26 @@ -"""Agent 调度器 — 三级决策树 +"""Agent 调度器 — 执行层(司马懿建议 1:Router/Dispatcher 分层) -Level 1: 纯机械检查 → Daemon 本地执行 -Level 2: 有名字的注册角色 → Full Agent (asyncio.create_subprocess_exec) -Level 3: 无名字的一次性任务 → Subagent (sessions_spawn) -Level 4: 未知 → 庞统裁决 +Dispatcher 负责: +1. 从 Router 获取路由决策 +2. 执行 spawn(通过 Spawner) +3. 更新 counter(并发控制) +4. 写路由审计日志(routing_decisions) -调度器只做决策,实际 spawn 由 spawner.py 执行。 +路由决策全部委托给 AgentRouter。 """ from __future__ import annotations +import json import logging +import sqlite3 +from datetime import datetime from enum import Enum from pathlib import Path from typing import Any, Dict, List, Optional from src.blackboard.models import Task -from src.blackboard.queries import Queries +from src.daemon.router import AgentRouter, RouteDecision logger = logging.getLogger("moziplus-v2.dispatcher") @@ -30,86 +34,90 @@ class DispatchLevel(str, Enum): class Dispatcher: - """Agent 调度决策器""" + """Agent 调度执行器 - # L1 本地执行的 action type - LOCAL_ACTIONS = frozenset({ - "L1_guardrail", - "format_check", - "file_exists_check", - "dependency_advance", - }) + v2.6.1: 路由决策委托给 AgentRouter,本类只做执行。 + """ def __init__( self, - registered_agents: Optional[List[str]] = None, + router: Optional[AgentRouter] = None, spawner: Optional[Any] = None, counter: Optional[Any] = None, + db_path: Optional[Path] = None, + # 兼容旧接口(deprecated,逐步移除) + registered_agents: Optional[List[str]] = None, capability_map: Optional[Dict[str, List[str]]] = None, ): - """ - Args: - registered_agents: 项目注册的 Agent ID 列表 - spawner: AgentSpawner 实例(执行实际 spawn) - counter: ActiveAgentCounter 实例(控制并发) - capability_map: task_type → [agent_id] 映射(无 assignee 时的自动分配) - """ - self.registered_agents = set(registered_agents or []) - self.capability_map = capability_map or {} + self.router = router or AgentRouter() self.spawner = spawner self.counter = counter + self.db_path = db_path + + # 兼容:如果没有 router,用旧逻辑(临时) + self._legacy_mode = router is None + if self._legacy_mode: + self.registered_agents = set(registered_agents or []) + self.capability_map = capability_map or {} + logger.warning("Dispatcher running in legacy mode (no AgentRouter)") def decide(self, task: Task, action_type: str = "") -> Dict[str, Any]: - """调度决策 + """调度决策(委托给 Router) Returns: - {"level": DispatchLevel, "agent_id": str, "reason": str} + {"level": DispatchLevel, "agent_id": str, "reason": str, ...} """ - assignee = task.assignee + if self._legacy_mode: + return self._legacy_decide(task, action_type) - # Level 1: 纯机械检查 → 本地执行 - if action_type in self.LOCAL_ACTIONS: + # 构建 task_info 给 Router + task_info = { + "id": task.id, + "title": task.title, + "description": task.description, + "status": task.status, + "assignee": task.assignee, + "task_type": getattr(task, 'task_type', ''), + "current_agent": getattr(task, 'current_agent', None), + "next_capability": getattr(task, 'next_capability', None), + } + + decision = self.router.route(task_info, action_type) + + # Router 返回 agent_id="daemon" → 本地执行 + if decision.agent_id == "daemon": return { "level": DispatchLevel.LOCAL, "agent_id": "daemon", - "reason": f"Local action: {action_type}", + "reason": decision.reason, + "mode": decision.mode, } - # Level 2: 有名字的注册角色 → Full Agent - if assignee and assignee in self.registered_agents: - new_session = action_type == "adjudication" - return { - "level": DispatchLevel.FULL_AGENT, - "agent_id": assignee, - "new_session": new_session, - "reason": f"Registered agent: {assignee}", - } + # 判断是否升级(fallback 庞统) + level = DispatchLevel.FULL_AGENT + if decision.mode == "fallback" and decision.agent_id == "pangtong-fujunshi": + level = DispatchLevel.ESCALATE - # Level 3: 无 assignee → 能力映射或 fallback 庞统 - if not assignee: - agent_id = self._resolve_by_capability(task) - return { - "level": DispatchLevel.FULL_AGENT, - "agent_id": agent_id, - "reason": f"Auto-assigned via capability_map: {agent_id}", - } - - # Level 4: 有 assignee 但未注册 → 升级庞统 return { - "level": DispatchLevel.ESCALATE, - "agent_id": "pangtong-fujunshi", - "new_session": True, - "reason": f"Unknown agent '{assignee}', escalate to pangtong", + "level": level, + "agent_id": decision.agent_id, + "reason": decision.reason, + "mode": decision.mode, + "confidence": decision.confidence, + "routed_by": decision.mode, } async def dispatch(self, task: Task, action_type: str = "", project_config: Optional[Dict] = None) -> Dict[str, Any]: - """执行调度(决策 + spawn) + """执行调度(决策 + spawn + 审计日志) Returns: {"level": str, "agent_id": str, "session_id": Optional[str], "status": "dispatched"|"skipped"|"error", "reason": str} """ + if self._legacy_mode: + return await self._legacy_dispatch(task, action_type, project_config) + decision = self.decide(task, action_type) level = decision["level"] agent_id = decision["agent_id"] @@ -117,6 +125,7 @@ class Dispatcher: # 检查并发限制 if self.counter and level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE): if not await self.counter.can_acquire(agent_id): + self._record_routing(task, decision, "skipped", "Agent busy") return { "level": level.value, "agent_id": agent_id, @@ -127,6 +136,7 @@ class Dispatcher: # 本地执行 if level == DispatchLevel.LOCAL: + self._record_routing(task, decision, "dispatched", None) return { "level": level.value, "agent_id": "daemon", @@ -135,9 +145,10 @@ class Dispatcher: "reason": decision["reason"], } - # Full Agent spawn + # Full Agent / Escalate spawn if level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE): if not self.spawner: + self._record_routing(task, decision, "error", "No spawner") return { "level": level.value, "agent_id": agent_id, @@ -149,30 +160,20 @@ class Dispatcher: try: if self.counter: await self.counter.acquire(agent_id) - # 优先使用 spawner 的 prompt 模板 - if hasattr(self.spawner, 'build_spawn_message') and project_config: - # 构建重试上下文(如果有前轮审查意见) - retry_ctx = self._build_retry_context(task) - message = self.spawner.build_spawn_message( - task_id=task.id, - title=task.title, - description=task.description or "", - task_type=task.task_type or "", - priority=task.priority, - must_haves=task.must_haves or "", - project_id=project_config.get("project_id", ""), - agent_id=agent_id, - current_status=task.status or "claimed", - retry_context=retry_ctx, - ) - else: - message = self._build_message(task, action_type) + + # 构建 spawn message + message = self._build_spawn_message(task, agent_id, project_config) + session_id = await self.spawner.spawn_full_agent( agent_id=agent_id, message=message, - new_session=decision.get("new_session", False), + new_session=(level == DispatchLevel.ESCALATE), task_id=task.id, ) + + self._record_routing(task, decision, "dispatched", + f"session={session_id}") + return { "level": level.value, "agent_id": agent_id, @@ -183,6 +184,7 @@ class Dispatcher: except Exception as e: if self.counter: self.counter.release(agent_id) + self._record_routing(task, decision, "error", str(e)) return { "level": level.value, "agent_id": agent_id, @@ -191,38 +193,6 @@ class Dispatcher: "reason": str(e), } - # Subagent spawn - if level == DispatchLevel.SUB_AGENT: - if not self.spawner: - return { - "level": level.value, - "agent_id": "subagent", - "session_id": None, - "status": "error", - "reason": "No spawner configured", - } - - try: - session_id = await self.spawner.spawn_subagent( - task_description=self._build_message(task, action_type), - task_id=task.id, - ) - return { - "level": level.value, - "agent_id": "subagent", - "session_id": session_id, - "status": "dispatched", - "reason": decision["reason"], - } - except Exception as e: - return { - "level": level.value, - "agent_id": "subagent", - "session_id": None, - "status": "error", - "reason": str(e), - } - return { "level": level.value, "agent_id": agent_id, @@ -231,82 +201,185 @@ class Dispatcher: "reason": "Unknown dispatch level", } - def _resolve_by_capability(self, task: Task) -> str: - """两级路由:capability_map 匹配 → fallback 庞统""" - task_type = getattr(task, 'task_type', '') or '' - - # Tier 1: task_type → capability_map - if task_type in self.capability_map: - candidates = self.capability_map[task_type] - # 过滤只保留注册的 Agent - registered = [a for a in candidates if a in self.registered_agents] - if registered: - # 多候选时选负载最低的 - if self.counter and len(registered) > 1: - best = min(registered, - key=lambda a: self.counter.active_agents.get(a, 0)) - return best - return registered[0] - - # Tier 2: fallback 庞统 - return "pangtong-fujunshi" + def _build_spawn_message(self, task: Task, agent_id: str, + project_config: Optional[Dict]) -> str: + """构建 Agent spawn 消息""" + if hasattr(self.spawner, 'build_spawn_message') and project_config: + retry_ctx = self._build_retry_context(task) + return self.spawner.build_spawn_message( + task_id=task.id, + title=task.title, + description=task.description or "", + task_type=getattr(task, 'task_type', '') or "", + priority=task.priority, + must_haves=task.must_haves or "", + project_id=project_config.get("project_id", ""), + agent_id=agent_id, + current_status=task.status or "claimed", + retry_context=retry_ctx, + ) + # fallback + parts = [f"Task: {task.title}"] + if task.description: + parts.append(f"Description: {task.description}") + if task.must_haves: + parts.append(f"Must-haves: {task.must_haves}") + return "\n".join(parts) def _build_retry_context(self, task: Task) -> str: - """构建重试上下文(前轮产出摘要 + 审查意见) - - 如果任务是从 failed/pending 重新调度(有历史 attempts), - 注入前轮的失败原因,让 Agent 知道上轮哪里不对。 - """ + """构建重试上下文""" if not hasattr(task, 'retry_count') or (task.retry_count or 0) == 0: return "" parts = ["## ⚠️ 重试上下文(上次执行失败,请注意以下反馈)"] - parts.append(f"这是第 {task.retry_count + 1} 次尝试。上次执行的问题:") + parts.append(f"这是第 {task.retry_count + 1} 次尝试。") + return "\n".join(parts) - # 从 task_attempts 查上次 attempt 的 summary/outcome - if hasattr(self, '_queries') and self._queries: + def _record_routing(self, task: Task, decision: Dict[str, Any], + outcome: str, detail: Optional[str]) -> None: + """写路由审计日志到 routing_decisions 表""" + if not self.db_path: + return + try: + conn = sqlite3.connect(str(self.db_path)) + conn.row_factory = sqlite3.Row try: - rows = self._queries.conn.execute( - "SELECT outcome, summary, agent FROM task_attempts " - "WHERE task_id = ? ORDER BY attempt_number DESC LIMIT 1", - (task.id,) - ).fetchall() - if rows: - outcome, summary, agent = rows[0] - if outcome: - parts.append(f"上轮结果: {outcome}") - if summary: - parts.append(f"上轮说明: {summary}") - if agent: - parts.append(f"上轮执行者: {agent}") - except Exception: - pass + conn.execute("BEGIN IMMEDIATE") + conn.execute( + "INSERT INTO routing_decisions " + "(task_id, from_status, to_status, mode, selected_agent, " + " previous_agent, reason, confidence, model, latency_ms, " + " task_type, requested_capability, outcome, detail) " + "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)", + ( + task.id, + task.status, + None, # to_status 在 dispatch 时还不知道 + decision.get("mode", ""), + decision.get("agent_id", ""), + getattr(task, 'current_agent', None) or task.assignee, + decision.get("reason", ""), + decision.get("confidence"), + decision.get("routed_by") == "llm_route" and None, + None, + getattr(task, 'task_type', ''), + getattr(task, 'next_capability', ''), + outcome, + detail, + ), + ) + conn.commit() + finally: + conn.close() + except Exception: + logger.debug("Failed to record routing decision", exc_info=True) - parts.append("请仔细检查上次的错误,避免重复犯错。") - return "\n".join(parts) - - def _build_message(self, task: Task, action_type: str) -> str: - """构建给 Agent 的消息""" - parts = [f"Task: {task.title}"] - if task.description: - parts.append(f"Description: {task.description}") - if action_type: - parts.append(f"Action: {action_type}") - if task.must_haves: - import json - parts.append(f"Must-haves: {task.must_haves}") - return "\n".join(parts) + # ── 批量决策(兼容接口) ── def dispatch_pending(self, tasks: List[Task]) -> List[Dict[str, Any]]: - """批量决策(不 spawn,只返回决策列表) - - 用于 ticker 扫描可调度任务后批量判断。 - """ + """批量决策(不 spawn,只返回决策列表)""" results = [] for task in tasks: decision = self.decide(task) - results.append({ - "task_id": task.id, - **decision, - }) + results.append({"task_id": task.id, **decision}) return results + + # ── Legacy 兼容(deprecated) ── + + def _legacy_decide(self, task: Task, action_type: str = "") -> Dict[str, Any]: + """旧版三级决策树(兼容过渡用)""" + LOCAL_ACTIONS = frozenset({ + "L1_guardrail", "format_check", + "file_exists_check", "dependency_advance", + }) + assignee = task.assignee + + if action_type in LOCAL_ACTIONS: + return {"level": DispatchLevel.LOCAL, "agent_id": "daemon", + "reason": f"Local action: {action_type}"} + + if assignee and assignee in self.registered_agents: + new_session = action_type == "adjudication" + return {"level": DispatchLevel.FULL_AGENT, "agent_id": assignee, + "new_session": new_session, + "reason": f"Registered agent: {assignee}"} + + if not assignee: + agent_id = self._resolve_by_capability(task) + return {"level": DispatchLevel.FULL_AGENT, "agent_id": agent_id, + "reason": f"Auto-assigned via capability_map: {agent_id}"} + + return {"level": DispatchLevel.ESCALATE, "agent_id": "pangtong-fujunshi", + "new_session": True, + "reason": f"Unknown agent '{assignee}', escalate to pangtong"} + + def _resolve_by_capability(self, task: Task) -> str: + """旧版能力映射""" + task_type = getattr(task, 'task_type', '') or '' + if task_type in self.capability_map: + candidates = self.capability_map[task_type] + registered = [a for a in candidates if a in self.registered_agents] + if registered: + if self.counter and len(registered) > 1: + return min(registered, + key=lambda a: self.counter.active_agents.get(a, 0)) + return registered[0] + return "pangtong-fujunshi" + + async def _legacy_dispatch(self, task, action_type="", project_config=None): + """旧版 dispatch(兼容过渡用)""" + decision = self._legacy_decide(task, action_type) + level = decision["level"] + agent_id = decision["agent_id"] + + if self.counter and level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE): + if not await self.counter.can_acquire(agent_id): + return {"level": level.value, "agent_id": agent_id, + "session_id": None, "status": "skipped", + "reason": "Agent busy (concurrent limit)"} + + if level == DispatchLevel.LOCAL: + return {"level": level.value, "agent_id": "daemon", + "session_id": None, "status": "dispatched", + "reason": decision["reason"]} + + if level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE): + if not self.spawner: + return {"level": level.value, "agent_id": agent_id, + "session_id": None, "status": "error", + "reason": "No spawner configured"} + try: + if self.counter: + await self.counter.acquire(agent_id) + if hasattr(self.spawner, 'build_spawn_message') and project_config: + retry_ctx = self._build_retry_context(task) + message = self.spawner.build_spawn_message( + task_id=task.id, title=task.title, + description=task.description or "", + task_type=getattr(task, 'task_type', '') or "", + priority=task.priority, + must_haves=task.must_haves or "", + project_id=project_config.get("project_id", ""), + agent_id=agent_id, + current_status=task.status or "claimed", + retry_context=retry_ctx, + ) + else: + message = f"Task: {task.title}" + session_id = await self.spawner.spawn_full_agent( + agent_id=agent_id, message=message, + new_session=decision.get("new_session", False), + task_id=task.id, + ) + return {"level": level.value, "agent_id": agent_id, + "session_id": session_id, "status": "dispatched", + "reason": decision["reason"]} + except Exception as e: + if self.counter: + self.counter.release(agent_id) + return {"level": level.value, "agent_id": agent_id, + "session_id": None, "status": "error", + "reason": str(e)} + return {"level": level.value, "agent_id": agent_id, + "session_id": None, "status": "error", + "reason": "Unknown dispatch level"}