"""Agent 路由器 — 决策层(v3.0: 去掉独立 LLM,模糊场景 delegate 庞统) 路由模式: 确定性路由:能力匹配、retry、handoff、assignee(纯 L1,不调 LLM) 模糊路由:delegate 庞统(L3 spawn,通过 Gateway) Router 只做决策,不做 spawn。返回 RouteDecision。 """ from __future__ import annotations import logging import time from dataclasses import dataclass, field from typing import Any, Dict, List, Optional, Set logger = logging.getLogger("moziplus-v2.router") @dataclass class RouteDecision: """路由决策结果""" agent_id: str reason: str mode: str # "deterministic" | "agent_handoff" | "delegate" | "fallback" confidence: float = 1.0 latency_ms: int = 0 @dataclass class AgentProfile: """Agent 能力画像""" agent_id: str capabilities: List[str] = field(default_factory=list) can_review: bool = False max_concurrent: int = 1 is_fallback: bool = False # 已知 capability 白名单(司马懿 BUG-2:next_capability 校验) KNOWN_CAPABILITIES = frozenset({ "coding", "implementation", "scripting", "review", "quality_check", "debate", "deploy", "infrastructure", "docker", "vnpy", "risk", "compliance", "position_check", "data", "acquisition", "cleaning", "verification", "planning", "coordination", "escalation", "strategy", }) class AgentRouter: """Agent 路由器 — 决策层 v3.0: 去掉 LLMDriver(独立 OpenAI 客户端)。 模糊场景不再调独立 LLM,改为 delegate 庞统(L3 spawn,走 Gateway)。 """ # 确定性 action type(不需要路由) LOCAL_ACTIONS = frozenset({ "L1_guardrail", "format_check", "file_exists_check", "dependency_advance", }) # 生命周期流转的状态-能力映射 LIFECYCLE_CAPABILITY = { "review": {"capability": "review", "exclude_current": True}, "failed": {"capability": "escalation", "exclude_current": False}, } FALLBACK_AGENT = "pangtong-fujunshi" def __init__( self, agent_profiles: Optional[Dict[str, AgentProfile]] = None, counter: Optional[Any] = None, ): self.agent_profiles = agent_profiles or {} self.counter = counter # 构建 known capability 集合(从 profiles 中提取) self._known_capabilities: Set[str] = set() for prof in self.agent_profiles.values(): self._known_capabilities.update(prof.capabilities) def route(self, task_info: Dict[str, Any], action_type: str = "") -> RouteDecision: """路由决策入口 Args: task_info: 任务信息字典(需含 id, title, status, assignee, current_agent, next_capability, task_type, description) action_type: 动作类型 Returns: RouteDecision """ start = time.monotonic() # ── 快速路径 1: 本地执行 ── if action_type in self.LOCAL_ACTIONS: return RouteDecision( agent_id="daemon", reason=f"Local action: {action_type}", mode="deterministic", latency_ms=int((time.monotonic() - start) * 1000), ) # ── 快速路径 2: retry → 原执行者 ── if action_type == "retry": current = task_info.get("current_agent") or task_info.get("assignee") if current and current in self.agent_profiles: return RouteDecision( agent_id=current, reason="Retry original executor", mode="deterministic", latency_ms=int((time.monotonic() - start) * 1000), ) # ── Mode B: Agent 声明式交接 ── next_cap = task_info.get("next_capability") if next_cap and self._validate_capability(next_cap): current = task_info.get("current_agent") or task_info.get("assignee") exclude = {current} if current else set() matched = self._match_capability(next_cap, exclude) if matched: return RouteDecision( agent_id=matched, reason=f"Agent handoff: needs {next_cap}", mode="agent_handoff", latency_ms=int((time.monotonic() - start) * 1000), ) logger.info("next_capability '%s' no match, delegate to coordinator", next_cap) # ── 快速路径 3: 生命周期流转查表 ── lifecycle = self.LIFECYCLE_CAPABILITY.get(action_type) if not lifecycle: lifecycle = self.LIFECYCLE_CAPABILITY.get(task_info.get("status")) if lifecycle: cap = lifecycle["capability"] exclude_current = lifecycle.get("exclude_current", False) exclude = set() if exclude_current: current = task_info.get("current_agent") or task_info.get("assignee") if current: exclude.add(current) matched = self._match_capability(cap, exclude) if matched: return RouteDecision( agent_id=matched, reason=f"Lifecycle: {action_type or task_info.get('status')} needs {cap}", mode="deterministic", latency_ms=int((time.monotonic() - start) * 1000), ) # ── 快速路径 4: 有 assignee 且非生命周期流转 ── assignee = task_info.get("assignee") if assignee and assignee in self.agent_profiles and action_type not in ("review", "escalation"): return RouteDecision( agent_id=assignee, reason=f"Direct assignee: {assignee}", mode="deterministic", latency_ms=int((time.monotonic() - start) * 1000), ) # ── 模糊场景: delegate 庞统(L3 spawn,走 Gateway)── return RouteDecision( agent_id=self.FALLBACK_AGENT, reason="Uncertain routing, delegate to coordinator", mode="delegate", confidence=0.0, latency_ms=int((time.monotonic() - start) * 1000), ) def _validate_capability(self, capability: str) -> bool: """校验 next_capability 在已知集合内""" return capability in self._known_capabilities def _match_capability(self, capability: str, exclude: Optional[Set[str]] = None) -> Optional[str]: """从能力画像中匹配 Agent""" candidates = [] for aid, prof in self.agent_profiles.items(): if aid in (exclude or set()): continue if capability in prof.capabilities: candidates.append(aid) if not candidates: return None if len(candidates) == 1: return candidates[0] # 多候选:选负载最低的 if self.counter: return min(candidates, key=lambda a: self.counter.active_agents.get(a, 0)) return candidates[0]