388 lines
15 KiB
Python
388 lines
15 KiB
Python
"""Agent 调度器 — 执行层(司马懿建议 1:Router/Dispatcher 分层)
|
||
|
||
Dispatcher 负责:
|
||
1. 从 Router 获取路由决策
|
||
2. 执行 spawn(通过 Spawner)
|
||
3. 更新 counter(并发控制)
|
||
4. 写路由审计日志(routing_decisions)
|
||
|
||
路由决策全部委托给 AgentRouter。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import sqlite3
|
||
from datetime import datetime
|
||
from enum import Enum
|
||
from pathlib import Path
|
||
from typing import Any, Dict, List, Optional
|
||
|
||
from src.blackboard.models import Task
|
||
from src.daemon.router import AgentRouter, RouteDecision
|
||
|
||
# Module-level logger used by the dispatcher for legacy-mode warnings and
# best-effort audit-log failures.
logger = logging.getLogger(name="moziplus-v2.dispatcher")
|
||
|
||
|
||
class DispatchLevel(str, Enum):
    """Dispatch level: how a task is executed."""

    LOCAL = "local"  # executed locally inside the daemon
    FULL_AGENT = "full"  # spawn a full agent
    SUB_AGENT = "sub"  # spawn a subagent (no decision path in this module emits it yet)
    ESCALATE = "escalate"  # escalate to Pangtong ("pangtong-fujunshi")


class Dispatcher:
    """Agent dispatch executor.

    v2.6.1: routing decisions are delegated to ``AgentRouter``; this class only
    executes them. Execution means a concurrency check via ``counter``, a spawn
    via ``spawner``, and an audit row in ``routing_decisions`` when ``db_path``
    is set.

    When no ``router`` is passed, the dispatcher runs in a deprecated legacy
    mode that decides with ``registered_agents`` / ``capability_map`` instead.
    """

    # Agent that receives escalations and tasks no capability mapping resolves.
    _ESCALATION_AGENT = "pangtong-fujunshi"

    # Action types the legacy decision tree runs locally in the daemon.
    _LEGACY_LOCAL_ACTIONS = frozenset({
        "L1_guardrail", "format_check",
        "file_exists_check", "dependency_advance",
    })

    def __init__(
        self,
        router: Optional[AgentRouter] = None,
        spawner: Optional[Any] = None,
        counter: Optional[Any] = None,
        db_path: Optional[Path] = None,
        # Backward-compatible legacy parameters (deprecated, to be removed).
        registered_agents: Optional[List[str]] = None,
        capability_map: Optional[Dict[str, List[str]]] = None,
    ):
        """Create a dispatcher.

        Args:
            router: Routing policy object exposing ``route(task_info, action_type)``.
                If omitted, legacy mode is enabled. An ``AgentRouter()`` is still
                constructed and stored, but the legacy decision tree is used.
            spawner: Object with an awaitable ``spawn_full_agent(...)`` and,
                optionally, ``build_spawn_message(...)``.
            counter: Concurrency limiter with awaitable ``can_acquire`` /
                ``acquire`` and a ``release`` method (called without ``await``).
            db_path: SQLite database containing ``routing_decisions``. When
                ``None``, no audit rows are written.
            registered_agents: Legacy mode only. Agents that may be spawned.
            capability_map: Legacy mode only. Maps task_type to candidate agents.
        """
        self.router = router or AgentRouter()
        self.spawner = spawner
        self.counter = counter
        self.db_path = db_path

        # Compatibility: without an explicit router, use the old logic (temporary).
        self._legacy_mode = router is None
        if self._legacy_mode:
            self.registered_agents = set(registered_agents or [])
            self.capability_map = capability_map or {}
            logger.warning("Dispatcher running in legacy mode (no AgentRouter)")

    def decide(self, task: Task, action_type: str = "") -> Dict[str, Any]:
        """Make a dispatch decision (delegated to the router).

        Args:
            task: Task to route.
            action_type: Optional action hint forwarded to the router.

        Returns:
            ``{"level": DispatchLevel, "agent_id": str, "reason": str, "mode": str, ...}``.
            Non-local decisions also carry ``confidence``, ``routed_by``,
            ``model`` and ``latency_ms``, copied from the router's decision.
        """
        if self._legacy_mode:
            return self._legacy_decide(task, action_type)

        # Build the plain task_info dict the router consumes.
        task_info = {
            "id": task.id,
            "title": task.title,
            "description": task.description,
            "status": task.status,
            "assignee": task.assignee,
            "task_type": getattr(task, 'task_type', ''),
            "current_agent": getattr(task, 'current_agent', None),
            "next_capability": getattr(task, 'next_capability', None),
        }

        decision = self.router.route(task_info, action_type)

        # The router returns agent_id="daemon" for work the daemon runs itself.
        if decision.agent_id == "daemon":
            return {
                "level": DispatchLevel.LOCAL,
                "agent_id": "daemon",
                "reason": decision.reason,
                "mode": decision.mode,
            }

        # A router fallback onto Pangtong is treated as an escalation.
        level = DispatchLevel.FULL_AGENT
        if decision.mode == "fallback" and decision.agent_id == self._ESCALATION_AGENT:
            level = DispatchLevel.ESCALATE

        return {
            "level": level,
            "agent_id": decision.agent_id,
            "reason": decision.reason,
            "mode": decision.mode,
            "confidence": decision.confidence,
            "routed_by": decision.mode,
            "model": decision.model,
            "latency_ms": decision.latency_ms,
        }

    async def dispatch(self, task: Task, action_type: str = "",
                       project_config: Optional[Dict] = None) -> Dict[str, Any]:
        """Dispatch a task: decide, spawn, and write the audit log.

        Args:
            task: Task to dispatch.
            action_type: Optional action hint for the decision step.
            project_config: Optional project settings. When present and the
                spawner has ``build_spawn_message``, it is used to build the
                spawn message.

        Returns:
            {"level": str, "agent_id": str, "session_id": Optional[str],
             "status": "dispatched"|"skipped"|"error", "reason": str}
        """
        if self._legacy_mode:
            return await self._legacy_dispatch(task, action_type, project_config)

        decision = self.decide(task, action_type)
        return await self._execute_decision(task, decision, project_config, audit=True)

    async def _execute_decision(self, task: Task, decision: Dict[str, Any],
                                project_config: Optional[Dict],
                                *, audit: bool) -> Dict[str, Any]:
        """Carry out ``decision`` for ``task``; shared by router and legacy mode.

        Args:
            task: Task being dispatched.
            decision: Output of ``decide`` / ``_legacy_decide``.
            project_config: Forwarded to ``_build_spawn_message``.
            audit: Whether to write ``routing_decisions`` rows. Only router
                mode audits.

        Returns:
            The result dict documented on ``dispatch``.
        """
        level = decision["level"]
        agent_id = decision["agent_id"]
        spawns_agent = level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE)

        def _result(status: str, reason: Optional[str],
                    session_id: Optional[str] = None,
                    agent: str = agent_id) -> Dict[str, Any]:
            return {
                "level": level.value,
                "agent_id": agent,
                "session_id": session_id,
                "status": status,
                "reason": reason,
            }

        def _audit(outcome: str, detail: Optional[str]) -> None:
            if audit:
                self._record_routing(task, decision, outcome, detail)

        # Concurrency limit (only agent spawns consume a slot).
        if self.counter and spawns_agent:
            if not await self.counter.can_acquire(agent_id):
                _audit("skipped", "Agent busy")
                return _result("skipped", "Agent busy (concurrent limit)")

        # Local execution by the daemon.
        if level == DispatchLevel.LOCAL:
            _audit("dispatched", None)
            return _result("dispatched", decision["reason"], agent="daemon")

        if not spawns_agent:
            return _result("error", "Unknown dispatch level")

        # Full Agent / Escalate spawn.
        if not self.spawner:
            _audit("error", "No spawner")
            return _result("error", "No spawner configured")

        # Track whether we actually hold a slot: if acquire() itself fails,
        # releasing would free a slot owned by someone else.
        acquired = False
        try:
            if self.counter:
                await self.counter.acquire(agent_id)
                acquired = True

            message = self._build_spawn_message(task, agent_id, project_config)

            session_id = await self.spawner.spawn_full_agent(
                agent_id=agent_id,
                message=message,
                # Legacy decisions carry an explicit "new_session"; router
                # decisions do not, and escalations always get a fresh session.
                new_session=decision.get("new_session", level == DispatchLevel.ESCALATE),
                task_id=task.id,
            )
        except Exception as e:
            if acquired:
                self.counter.release(agent_id)
            _audit("error", str(e))
            return _result("error", str(e))

        # On success the slot stays held (it is not released here).
        _audit("dispatched", f"session={session_id}")
        return _result("dispatched", decision["reason"], session_id)

    def _build_spawn_message(self, task: Task, agent_id: str,
                             project_config: Optional[Dict]) -> str:
        """Build the message sent to a spawned agent.

        Uses ``spawner.build_spawn_message`` when the spawner provides it and
        ``project_config`` is given. Otherwise it falls back to a plain-text
        summary of the title, plus the description and must-haves when set.
        """
        if hasattr(self.spawner, 'build_spawn_message') and project_config:
            retry_ctx = self._build_retry_context(task)
            return self.spawner.build_spawn_message(
                task_id=task.id,
                title=task.title,
                description=task.description or "",
                task_type=getattr(task, 'task_type', '') or "",
                priority=task.priority,
                must_haves=task.must_haves or "",
                project_id=project_config.get("project_id", ""),
                agent_id=agent_id,
                current_status=task.status or "claimed",
                retry_context=retry_ctx,
            )
        # Fallback: plain-text message.
        parts = [f"Task: {task.title}"]
        if task.description:
            parts.append(f"Description: {task.description}")
        if task.must_haves:
            parts.append(f"Must-haves: {task.must_haves}")
        return "\n".join(parts)

    def _build_retry_context(self, task: Task) -> str:
        """Return a retry notice for tasks with a non-zero ``retry_count``, else ""."""
        if not hasattr(task, 'retry_count') or (task.retry_count or 0) == 0:
            return ""

        parts = ["## ⚠️ 重试上下文(上次执行失败,请注意以下反馈)"]
        parts.append(f"这是第 {task.retry_count + 1} 次尝试。")
        return "\n".join(parts)

    def _record_routing(self, task: Task, decision: Dict[str, Any],
                        outcome: str, detail: Optional[str]) -> None:
        """Write one best-effort audit row to the ``routing_decisions`` table.

        Does nothing when ``db_path`` is unset. Any failure, including building
        the row, is logged at debug level and swallowed, so auditing never
        breaks dispatch.
        """
        if not self.db_path:
            return
        try:
            row = (
                task.id,
                task.status,  # from_status
                None,  # to_status is not known yet at dispatch time
                decision.get("mode", ""),
                decision.get("agent_id", ""),
                getattr(task, 'current_agent', None) or task.assignee,  # previous_agent
                decision.get("reason", ""),
                decision.get("confidence"),
                decision.get("model"),  # model reported by the router (None for local)
                decision.get("latency_ms"),  # router latency (None for local)
                getattr(task, 'task_type', ''),
                getattr(task, 'next_capability', ''),  # requested_capability
                outcome,
                detail,
            )
            conn = sqlite3.connect(str(self.db_path))
            try:
                # Take the write lock up front.
                conn.execute("BEGIN IMMEDIATE")
                conn.execute(
                    "INSERT INTO routing_decisions "
                    "(task_id, from_status, to_status, mode, selected_agent, "
                    " previous_agent, reason, confidence, model, latency_ms, "
                    " task_type, requested_capability, outcome, detail) "
                    "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
                    row,
                )
                conn.commit()
            finally:
                conn.close()
        except Exception:
            logger.debug("Failed to record routing decision", exc_info=True)

    # ── Batch decisions (compatibility interface) ──

    def dispatch_pending(self, tasks: List[Task]) -> List[Dict[str, Any]]:
        """Decide for each task without spawning; return ``[{"task_id": ..., **decision}]``."""
        return [{"task_id": task.id, **self.decide(task)} for task in tasks]

    # ── Legacy compatibility (deprecated) ──

    def _legacy_decide(self, task: Task, action_type: str = "") -> Dict[str, Any]:
        """Legacy three-level decision tree (kept for the transition period).

        The tree works in this order:
        1. A local action type is handled by the daemon.
        2. A registered assignee gets a full agent. Adjudication uses a new session.
        3. A missing assignee is resolved through ``capability_map``.
        4. An unknown assignee is escalated to Pangtong in a new session.
        """
        assignee = task.assignee

        if action_type in self._LEGACY_LOCAL_ACTIONS:
            return {"level": DispatchLevel.LOCAL, "agent_id": "daemon",
                    "reason": f"Local action: {action_type}"}

        if assignee and assignee in self.registered_agents:
            new_session = action_type == "adjudication"
            return {"level": DispatchLevel.FULL_AGENT, "agent_id": assignee,
                    "new_session": new_session,
                    "reason": f"Registered agent: {assignee}"}

        if not assignee:
            agent_id = self._resolve_by_capability(task)
            return {"level": DispatchLevel.FULL_AGENT, "agent_id": agent_id,
                    "reason": f"Auto-assigned via capability_map: {agent_id}"}

        return {"level": DispatchLevel.ESCALATE, "agent_id": self._ESCALATION_AGENT,
                "new_session": True,
                "reason": f"Unknown agent '{assignee}', escalate to pangtong"}

    def _resolve_by_capability(self, task: Task) -> str:
        """Legacy capability mapping: pick a registered agent for the task type.

        When several candidates are registered and a counter exists, the
        candidate with the fewest entries in ``counter.active_agents`` wins.
        Falls back to the escalation agent.
        """
        task_type = getattr(task, 'task_type', '') or ''
        if task_type in self.capability_map:
            candidates = self.capability_map[task_type]
            registered = [a for a in candidates if a in self.registered_agents]
            if registered:
                if self.counter and len(registered) > 1:
                    return min(registered,
                               key=lambda a: self.counter.active_agents.get(a, 0))
                return registered[0]
        return self._ESCALATION_AGENT

    async def _legacy_dispatch(self, task, action_type="", project_config=None):
        """Legacy dispatch (deprecated, transition only).

        Decides with the legacy tree and executes through the same path as
        router mode, without writing audit rows. The spawn message comes from
        ``_build_spawn_message``, as in router mode.
        """
        decision = self._legacy_decide(task, action_type)
        return await self._execute_decision(task, decision, project_config, audit=False)
|