Files
sanguo_moziplus_v2/src/daemon/router.py
T
2026-05-21 11:15:34 +08:00

196 lines
7.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Agent 路由器 — 决策层(v3.0: 去掉独立 LLM,模糊场景 delegate 庞统)
路由模式:
确定性路由:能力匹配、retry、handoff、assignee(纯 L1,不调 LLM
模糊路由:delegate 庞统(L3 spawn,通过 Gateway
Router 只做决策,不做 spawn。返回 RouteDecision。
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Set
# Module-level logger for the router, registered under the "moziplus-v2.router" name.
logger = logging.getLogger("moziplus-v2.router")
@dataclass
class RouteDecision:
    """Result of a routing decision."""

    # Identifier of the agent (or "daemon" for local actions) chosen to act.
    agent_id: str
    # Human-readable explanation of why this agent was chosen.
    reason: str
    mode: str  # "deterministic" | "agent_handoff" | "delegate" | "fallback"
    # Confidence in the decision; defaults to 1.0.
    confidence: float = 1.0
    # Time spent making the decision, in milliseconds.
    latency_ms: int = 0
@dataclass
class AgentProfile:
    """Capability profile of an agent."""

    # Identifier of the agent this profile describes.
    agent_id: str
    # Capability tags of the agent; each instance gets its own fresh list.
    capabilities: List[str] = field(default_factory=list)
    # NOTE(review): presumably marks agents allowed to review; not read in this file — confirm.
    can_review: bool = False
    # NOTE(review): presumably a cap on concurrent tasks; not read in this file — confirm.
    max_concurrent: int = 1
    # NOTE(review): presumably marks the fallback agent; not read in this file — confirm.
    is_fallback: bool = False
# Whitelist of known capabilities (Sima Yi, BUG-2: next_capability validation).
# NOTE(review): no code visible in this file reads this constant — confirm
# whether it is consulted elsewhere.
KNOWN_CAPABILITIES = frozenset((
    "coding",
    "implementation",
    "scripting",
    "review",
    "quality_check",
    "debate",
    "deploy",
    "infrastructure",
    "docker",
    "vnpy",
    "risk",
    "compliance",
    "position_check",
    "data",
    "acquisition",
    "cleaning",
    "verification",
    "planning",
    "coordination",
    "escalation",
    "strategy",
))
class AgentRouter:
    """Agent router — the decision layer.

    v3.0: the LLMDriver (a standalone OpenAI client) was removed. Ambiguous
    cases no longer call a standalone LLM; they are delegated to Pang Tong
    (L3 spawn, via the Gateway).
    """

    # Deterministic action types (no routing needed; handled by "daemon").
    LOCAL_ACTIONS = frozenset({
        "L1_guardrail", "format_check", "file_exists_check",
        "dependency_advance",
    })

    # Status -> required-capability mapping for lifecycle transitions.
    LIFECYCLE_CAPABILITY = {
        "review": {"capability": "review", "exclude_current": True},
        "failed": {"capability": "escalation", "exclude_current": False},
    }

    # Agent that receives every task the router cannot place deterministically.
    FALLBACK_AGENT = "pangtong-fujunshi"

    def __init__(
        self,
        agent_profiles: Optional[Dict[str, "AgentProfile"]] = None,
        counter: Optional[Any] = None,
    ):
        """Create a router.

        Args:
            agent_profiles: Mapping of agent id to its capability profile.
                ``None`` (or an empty mapping) yields a router with no agents.
            counter: Optional object exposing an ``active_agents`` mapping
                (agent id -> current load) used to break ties between
                several capable agents.
        """
        self.agent_profiles = agent_profiles or {}
        self.counter = counter
        # Known capabilities are the union of what the profiles declare.
        self._known_capabilities: Set[str] = set()
        for prof in self.agent_profiles.values():
            self._known_capabilities.update(prof.capabilities)

    def route(self, task_info: Dict[str, Any],
              action_type: str = "") -> "RouteDecision":
        """Routing decision entry point.

        Args:
            task_info: Task information dictionary (expected to contain id,
                title, status, assignee, current_agent, next_capability,
                task_type, description).
            action_type: Action type.

        Returns:
            RouteDecision describing the chosen agent, the reason, the
            routing mode and the time spent deciding.
        """
        start = time.monotonic()
        fields = self._decide(task_info, action_type)
        return RouteDecision(
            latency_ms=int((time.monotonic() - start) * 1000),
            **fields,
        )

    def _decide(self, task_info: Dict[str, Any],
                action_type: str = "") -> Dict[str, Any]:
        """Pick the target agent; return the RouteDecision fields except latency."""
        # -- Fast path 1: local execution --
        if action_type in self.LOCAL_ACTIONS:
            return {
                "agent_id": "daemon",
                "reason": f"Local action: {action_type}",
                "mode": "deterministic",
            }

        # The agent currently holding the task, falling back to the assignee.
        current = task_info.get("current_agent") or task_info.get("assignee")

        # -- Fast path 2: retry -> original executor --
        if action_type == "retry" and current and current in self.agent_profiles:
            return {
                "agent_id": current,
                "reason": "Retry original executor",
                "mode": "deterministic",
            }

        # -- Mode B: declarative agent handoff --
        next_cap = task_info.get("next_capability")
        if next_cap:
            if self._validate_capability(next_cap):
                # Never hand the task back to the agent that is handing it off.
                matched = self._match_capability(
                    next_cap, {current} if current else set())
                if matched:
                    return {
                        "agent_id": matched,
                        "reason": f"Agent handoff: needs {next_cap}",
                        "mode": "agent_handoff",
                    }
                logger.info("next_capability '%s' no match, delegate to coordinator", next_cap)
            else:
                # Make a rejected handoff request visible instead of dropping it silently.
                logger.warning("Ignoring unknown next_capability %r", next_cap)

        # -- Fast path 3: lifecycle transition lookup --
        # action_type takes precedence; the task status is the fallback key.
        lifecycle = (self.LIFECYCLE_CAPABILITY.get(action_type)
                     or self.LIFECYCLE_CAPABILITY.get(task_info.get("status")))
        if lifecycle:
            cap = lifecycle["capability"]
            exclude = set()
            if lifecycle.get("exclude_current", False) and current:
                exclude.add(current)
            matched = self._match_capability(cap, exclude)
            if matched:
                return {
                    "agent_id": matched,
                    "reason": f"Lifecycle: {action_type or task_info.get('status')} needs {cap}",
                    "mode": "deterministic",
                }

        # -- Fast path 4: has an assignee and is NOT a lifecycle transition --
        # `not lifecycle` keeps an unmatched lifecycle transition (e.g. a task
        # in "review" with no other reviewer) from falling back to the
        # assignee, which would hand authors their own review.
        assignee = task_info.get("assignee")
        if (assignee and assignee in self.agent_profiles and not lifecycle
                and action_type not in ("review", "escalation")):
            return {
                "agent_id": assignee,
                "reason": f"Direct assignee: {assignee}",
                "mode": "deterministic",
            }

        # -- Ambiguous case: delegate to Pang Tong (L3 spawn, via the Gateway) --
        return {
            "agent_id": self.FALLBACK_AGENT,
            "reason": "Uncertain routing, delegate to coordinator",
            "mode": "delegate",
            "confidence": 0.0,
        }

    def _validate_capability(self, capability: str) -> bool:
        """Return True if *capability* is declared by at least one known profile."""
        # The isinstance guard keeps unhashable junk (e.g. a list) from raising.
        return isinstance(capability, str) and capability in self._known_capabilities

    def _match_capability(self, capability: str,
                          exclude: Optional[Set[str]] = None) -> Optional[str]:
        """Return the id of an agent offering *capability*, or None.

        Agents listed in *exclude* are skipped. When several agents qualify
        and a counter is configured, the one with the lowest
        ``counter.active_agents`` load wins; otherwise the first match in
        profile order is returned.
        """
        excluded = exclude or set()
        candidates = [
            aid for aid, prof in self.agent_profiles.items()
            if aid not in excluded and capability in prof.capabilities
        ]
        if not candidates:
            return None
        # `is not None` rather than truthiness: a counter object may be falsy.
        if len(candidates) > 1 and self.counter is not None:
            load = self.counter.active_agents
            return min(candidates, key=lambda aid: load.get(aid, 0))
        return candidates[0]