Files
sanguo_moziplus_v2/src/daemon/router.py
T
2026-05-17 21:13:19 +08:00

377 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Agent 路由器 — 决策层(司马懿建议 1:Router/Dispatcher 分层)

三种路由模式:
    Mode A: LLM 路由(中心化)— 首次分配、异常升级
    Mode B: Agent 声明式交接(去中心化)— 执行者声明下一步需要什么
    Mode C: Agent 自主领活 — 未来演进

Router 只做决策,不做 spawn,返回 RouteDecision。
"""
from __future__ import annotations
import asyncio
import json
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Set
# Module-level logger shared by the routing classes in this module.
logger: logging.Logger = logging.getLogger("moziplus-v2.router")
@dataclass
class RouteDecision:
    """Outcome of one routing decision.

    Attributes:
        agent_id: Agent selected to handle the task.
        reason: Human-readable explanation of the choice.
        mode: Which path produced the decision; one of "deterministic",
            "agent_handoff", "llm_route" or "fallback".
        confidence: Confidence of the decision; defaults to 1.0 for paths
            that do not set it (nominally in [0, 1]).
        model: Name of the LLM model used, when the decision came from the
            LLM (Mode A) path.
        latency_ms: Time spent deciding, in milliseconds.
    """

    agent_id: str
    reason: str
    mode: str
    confidence: float = field(default=1.0)
    model: Optional[str] = field(default=None)
    latency_ms: int = field(default=0)
@dataclass
class AgentProfile:
    """Capability profile of a single agent, consumed by the routers.

    Attributes:
        agent_id: Identifier of the agent.
        capabilities: Capability tags the agent can serve; a fresh list per
            instance.
        can_review: Whether the agent may review; reported to the LLM router
            in its prompt.
        max_concurrent: Configured concurrency limit.
            NOTE(review): not consulted by the routing code in this module.
        is_fallback: Flag marking the agent as a fallback candidate.
    """

    agent_id: str
    capabilities: List[str] = field(default_factory=list)
    can_review: bool = field(default=False)
    max_concurrent: int = field(default=1)
    is_fallback: bool = field(default=False)
# Whitelist of known capability tags (Sima Yi BUG-2: next_capability validation).
# NOTE(review): AgentRouter._validate_capability checks next_capability against
# the capabilities declared in its agent profiles, not against this set —
# confirm whether this whitelist is still used elsewhere.
KNOWN_CAPABILITIES = frozenset().union(
    ("coding", "implementation", "scripting"),
    ("review", "quality_check", "debate"),
    ("deploy", "infrastructure", "docker", "vnpy"),
    ("risk", "compliance", "position_check"),
    ("data", "acquisition", "cleaning", "verification"),
    ("planning", "coordination", "escalation", "strategy"),
)
class LLMDriver:
    """Mode A driver (bMAS control unit): a lightweight LLM routing call.

    One chat-completion request (the design note estimates a ~300-token
    prompt and ~1-2 s) returns a routing decision. This is a plain API call,
    not a spawned agent session. Client-construction, API and payload errors
    are turned into a ``"fallback"`` RouteDecision to ``FALLBACK_AGENT``.
    """

    # Coordinator used when the LLM cannot produce a usable answer; kept
    # identical to AgentRouter.FALLBACK_AGENT.
    FALLBACK_AGENT = "pangtong-fujunshi"

    def __init__(self, model: str, api_base: str = "", api_key: str = "",
                 timeout: float = 5.0, max_tokens: int = 200,
                 temperature: float = 0.1):
        """Store request settings; the OpenAI client is created lazily.

        Args:
            model: Chat model name sent with each request.
            api_base: Optional base-URL override for the client.
            api_key: Optional API-key override for the client.
            timeout: Per-request timeout passed to ``create``.
            max_tokens: Completion token cap.
            temperature: Sampling temperature.
        """
        self.model = model
        self.api_base = api_base
        self.api_key = api_key
        self.timeout = timeout
        self.max_tokens = max_tokens
        self.temperature = temperature
        self._client = None

    def _get_client(self):
        """Return the cached OpenAI client, creating it on first use.

        Returns None (after logging a warning) when the ``openai`` package is
        missing or the client cannot be constructed, so ``route`` can fall
        back instead of raising. A failed attempt is retried on the next call.
        """
        if self._client is not None:
            return self._client
        try:
            from openai import OpenAI
        except ImportError:
            logger.warning("openai package not installed, LLM routing disabled")
            return None
        kwargs = {}
        if self.api_base:
            kwargs["base_url"] = self.api_base
        if self.api_key:
            kwargs["api_key"] = self.api_key
        try:
            # With neither override set, the client relies on its
            # environment-variable defaults (per the original note: the
            # OpenClaw Gateway's defaults).
            self._client = OpenAI(**kwargs)
        except Exception as e:  # e.g. no API key configured anywhere
            logger.warning("OpenAI client init failed, LLM routing disabled: %s", e)
            return None
        return self._client

    def route(self, task_info: Dict[str, Any],
              agent_profiles: Dict[str, AgentProfile],
              active_agents: Dict[str, int]) -> RouteDecision:
        """Ask the LLM which agent should handle the task.

        Args:
            task_info: Task fields rendered into the prompt (id, title,
                status, task_type, description, current_agent).
            agent_profiles: Candidate agents keyed by agent id.
            active_agents: Current load per agent id (missing means 0).

        Returns:
            A ``"llm_route"`` decision carrying the LLM's agent_id, reason
            and confidence (clamped to [0, 1]). Otherwise a ``"fallback"``
            decision to ``FALLBACK_AGENT`` with confidence 0.0. The agent_id
            is not checked against ``agent_profiles`` here.
        """
        start = time.monotonic()
        client = self._get_client()
        if client is None:
            return RouteDecision(
                agent_id=self.FALLBACK_AGENT,
                reason="LLM driver unavailable, fallback",
                mode="fallback",
                confidence=0.0,
            )

        prompt = self._build_prompt(task_info, agent_profiles, active_agents)
        try:
            response = client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                response_format={"type": "json_object"},
                max_tokens=self.max_tokens,
                temperature=self.temperature,
                timeout=self.timeout,
            )
            raw = response.choices[0].message.content
        except Exception as e:
            logger.warning("LLM routing failed: %s", e)
            return self._fallback(f"LLM routing error: {e}, fallback", start)

        try:
            agent_id, reason, confidence = self._parse_payload(raw)
        except json.JSONDecodeError:
            return self._fallback("LLM returned non-JSON, fallback", start)
        except ValueError as e:
            return self._fallback(f"LLM returned invalid payload: {e}, fallback", start)

        return RouteDecision(
            agent_id=agent_id,
            reason=reason,
            mode="llm_route",
            confidence=confidence,
            model=self.model,
            latency_ms=int((time.monotonic() - start) * 1000),
        )

    def _fallback(self, reason: str, start: float) -> RouteDecision:
        """Build a zero-confidence fallback decision timed from ``start``."""
        return RouteDecision(
            agent_id=self.FALLBACK_AGENT,
            reason=reason,
            mode="fallback",
            confidence=0.0,
            model=self.model,
            latency_ms=int((time.monotonic() - start) * 1000),
        )

    @classmethod
    def _parse_payload(cls, raw: Any) -> tuple[str, str, float]:
        """Parse the LLM message content into (agent_id, reason, confidence).

        Raises:
            json.JSONDecodeError: content is empty, non-string or not JSON.
            ValueError: JSON is not an object, or agent_id is not a string.
        """
        # Non-string content (e.g. None) is treated like an empty reply.
        result = json.loads(raw if isinstance(raw, str) else "")
        if not isinstance(result, dict):
            raise ValueError(f"expected a JSON object, got {type(result).__name__}")
        agent_id = result.get("agent_id", "")
        if not isinstance(agent_id, str):
            # Non-string ids (lists, dicts) would later break dict lookups.
            raise ValueError(f"agent_id must be a string, got {type(agent_id).__name__}")
        reason = result.get("reason", "LLM routing")
        if not isinstance(reason, str):
            reason = str(reason)
        return agent_id, reason, cls._coerce_confidence(result.get("confidence", 0.5))

    @staticmethod
    def _coerce_confidence(value: Any) -> float:
        """Convert a raw confidence to a float in [0, 1].

        Unparseable or non-finite values (NaN would otherwise slip past a
        ``< threshold`` check) become 0.0.
        """
        import math

        try:
            confidence = float(value)
        except (TypeError, ValueError):
            return 0.0
        if not math.isfinite(confidence):
            return 0.0
        return min(1.0, max(0.0, confidence))

    def _build_prompt(self, task_info: Dict[str, Any],
                      agent_profiles: Dict[str, AgentProfile],
                      active_agents: Dict[str, int]) -> str:
        """Render the routing prompt: task fields, candidate agents with
        their capabilities/review flag/load, constraints and the JSON
        output schema."""
        agents_str = "\n".join(
            f"- {aid}: capabilities={prof.capabilities}, "
            f"can_review={prof.can_review}, load={active_agents.get(aid, 0)}"
            for aid, prof in agent_profiles.items()
        )
        return f"""你是任务路由器。根据任务需求和 Agent 能力,选择最合适的 Agent。
## 当前任务
- ID: {task_info.get('id', '')}
- 标题: {task_info.get('title', '')}
- 状态: {task_info.get('status', '')}
- 类型: {task_info.get('task_type', '')}
- 描述: {task_info.get('description', '')}
- 当前执行者: {task_info.get('current_agent', '')}
## 可用 Agent
{agents_str}
## 约束
1. review/quality_check 不能选当前执行者
2. 同等能力优先选负载最低的
3. 必须匹配任务所需能力
4. agent_id 必须是上面列出的 Agent 之一
## 输出(JSON
{{"agent_id": "...", "reason": "...", "confidence": 0.0-1.0}}"""
class AgentRouter:
    """Agent router — the decision layer.

    The routing path is chosen automatically per call:

    - Deterministic fast paths: daemon-local actions, retries, lifecycle
      table lookups and explicit assignees.
    - Mode B (agent handoff): the previous executor declared
      ``next_capability``.
    - Mode A (LLM routing): first assignment and anything the fast paths
      could not resolve.

    The router only decides; it returns a RouteDecision and never spawns
    agents.
    """

    # Action types executed by the daemon itself (no agent routing needed).
    LOCAL_ACTIONS = frozenset({
        "L1_guardrail", "format_check", "file_exists_check",
        "dependency_advance",
    })

    # Lifecycle transitions, keyed by action type or task status: the
    # capability the next agent needs and whether the current executor must
    # be excluded from the candidates.
    LIFECYCLE_CAPABILITY = {
        "review": {"capability": "review", "exclude_current": True},
        "failed": {"capability": "escalation", "exclude_current": False},
    }

    # Coordinator used when no routing path yields a usable agent.
    FALLBACK_AGENT = "pangtong-fujunshi"

    # Mode A decisions below this confidence are replaced by the fallback.
    MIN_LLM_CONFIDENCE = 0.7

    def __init__(
        self,
        agent_profiles: Optional[Dict[str, AgentProfile]] = None,
        llm_driver: Optional[LLMDriver] = None,
        counter: Optional[Any] = None,
    ):
        """Create a router.

        Args:
            agent_profiles: Candidate agents keyed by agent id.
            llm_driver: Optional Mode A driver; without it unresolved tasks
                go straight to ``FALLBACK_AGENT``.
            counter: Optional object exposing an ``active_agents`` mapping of
                agent id -> current load, used to prefer idle agents.
        """
        self.agent_profiles = agent_profiles or {}
        self.llm_driver = llm_driver
        self.counter = counter
        # Capabilities declared by the profiles, snapshotted at construction;
        # used to validate Mode B's next_capability.
        self._known_capabilities: Set[str] = set()
        for prof in self.agent_profiles.values():
            self._known_capabilities.update(prof.capabilities)

    def route(self, task_info: Dict[str, Any],
              action_type: str = "") -> RouteDecision:
        """Decide which agent should handle a task.

        Args:
            task_info: Task fields. This method reads ``current_agent``,
                ``assignee``, ``next_capability`` and ``status``; the whole
                dict is passed to the LLM driver in Mode A.
            action_type: The action being routed, e.g. ``"retry"``,
                ``"review"`` or one of ``LOCAL_ACTIONS``.

        Returns:
            A RouteDecision. In Mode A, an unknown or non-string agent, an
            agent excluded by the lifecycle rule, or a low/NaN confidence is
            replaced by a ``"fallback"`` decision to ``FALLBACK_AGENT``.
        """
        start = time.monotonic()
        current = self._current_agent(task_info)

        # Fast path 1: actions the daemon runs locally.
        if action_type in self.LOCAL_ACTIONS:
            return RouteDecision(
                agent_id="daemon",
                reason=f"Local action: {action_type}",
                mode="deterministic",
                latency_ms=self._elapsed_ms(start),
            )

        # Fast path 2: a retry goes back to the original executor.
        if action_type == "retry" and current and current in self.agent_profiles:
            return RouteDecision(
                agent_id=current,
                reason="Retry original executor",
                mode="deterministic",
                latency_ms=self._elapsed_ms(start),
            )

        # Mode B: the previous executor declared the capability it needs next.
        next_cap = task_info.get("next_capability")
        if next_cap and self._validate_capability(next_cap):
            matched = self._match_capability(next_cap, {current} if current else set())
            if matched:
                return RouteDecision(
                    agent_id=matched,
                    reason=f"Agent handoff: needs {next_cap}",
                    mode="agent_handoff",
                    latency_ms=self._elapsed_ms(start),
                )
            # Valid capability but no eligible agent: continue down the
            # remaining paths (ultimately Mode A).
            logger.info("next_capability '%s' no match, falling back to LLM", next_cap)

        # Fast path 3: lifecycle transition table (action type, then status).
        lifecycle_key = self._lifecycle_key(task_info, action_type)
        exclude: Set[str] = set()
        if lifecycle_key is not None:
            rule = self.LIFECYCLE_CAPABILITY[lifecycle_key]
            cap = rule["capability"]
            if rule.get("exclude_current", False) and current:
                exclude.add(current)
            matched = self._match_capability(cap, exclude)
            if matched:
                return RouteDecision(
                    agent_id=matched,
                    reason=f"Lifecycle: {lifecycle_key} needs {cap}",
                    mode="deterministic",
                    latency_ms=self._elapsed_ms(start),
                )

        # Fast path 4: explicit assignee, only outside lifecycle transitions.
        # A lifecycle transition with no eligible agent must not land here,
        # otherwise e.g. a review could be handed back to its own executor.
        assignee = task_info.get("assignee")
        if (lifecycle_key is None
                and action_type not in ("review", "escalation")
                and assignee and assignee in self.agent_profiles):
            return RouteDecision(
                agent_id=assignee,
                reason=f"Direct assignee: {assignee}",
                mode="deterministic",
                latency_ms=self._elapsed_ms(start),
            )

        # Mode A: LLM routing, validated before it is trusted.
        if self.llm_driver:
            return self._route_via_llm(task_info, start, exclude)

        # Final fallback.
        return RouteDecision(
            agent_id=self.FALLBACK_AGENT,
            reason="No routing mode available, fallback to coordinator",
            mode="fallback",
            latency_ms=self._elapsed_ms(start),
        )

    def _route_via_llm(self, task_info: Dict[str, Any], start: float,
                       exclude: Set[str]) -> RouteDecision:
        """Run Mode A and replace unusable LLM decisions with the fallback.

        Args:
            task_info: Task fields forwarded to the driver.
            start: ``time.monotonic()`` taken at the start of routing.
            exclude: Agents the lifecycle rule forbids (e.g. the executor
                of a task under review).
        """
        decision = self.llm_driver.route(task_info, self.agent_profiles,
                                         self._get_active_agents())
        latency = decision.latency_ms or self._elapsed_ms(start)

        def fallback(reason: str, confidence: float = 0.0) -> RouteDecision:
            return RouteDecision(
                agent_id=self.FALLBACK_AGENT,
                reason=reason,
                mode="fallback",
                confidence=confidence,
                model=decision.model,
                latency_ms=latency,
            )

        agent_id = decision.agent_id
        if decision.mode == "fallback":
            # The driver already gave up; keep its reason, use our fallback agent.
            return fallback(decision.reason, decision.confidence)
        # isinstance first: an unhashable id would make the dict lookup raise.
        if not isinstance(agent_id, str) or agent_id not in self.agent_profiles:
            return fallback(f"LLM returned invalid agent '{agent_id}', fallback")
        if agent_id in exclude:
            return fallback(f"LLM chose excluded agent '{agent_id}', fallback")
        # "not >=" (rather than "<") also rejects a NaN confidence.
        if not decision.confidence >= self.MIN_LLM_CONFIDENCE:
            return fallback(
                f"LLM low confidence ({decision.confidence}): {decision.reason}",
                decision.confidence,
            )
        decision.latency_ms = latency
        return decision

    @staticmethod
    def _current_agent(task_info: Dict[str, Any]) -> Any:
        """Return the task's executor: ``current_agent``, else ``assignee``."""
        return task_info.get("current_agent") or task_info.get("assignee")

    @staticmethod
    def _elapsed_ms(start: float) -> int:
        """Milliseconds elapsed since the monotonic timestamp ``start``."""
        return int((time.monotonic() - start) * 1000)

    def _lifecycle_key(self, task_info: Dict[str, Any],
                       action_type: str) -> Optional[str]:
        """Return the LIFECYCLE_CAPABILITY key that applies, or None.

        The action type takes precedence; otherwise the task status is used.
        Non-string values never match.
        """
        for key in (action_type, task_info.get("status")):
            if isinstance(key, str) and key in self.LIFECYCLE_CAPABILITY:
                return key
        return None

    def _validate_capability(self, capability: Any) -> bool:
        """BUG-2 fix: accept next_capability only if some profile declares it."""
        return isinstance(capability, str) and capability in self._known_capabilities

    def _match_capability(self, capability: str,
                          exclude: Optional[Set[str]] = None) -> Optional[str]:
        """Pick an agent that has ``capability`` and is not in ``exclude``.

        Among several candidates the one with the lowest current load wins;
        ties (including when no counter is configured) keep profile order.
        Returns None when nobody qualifies.
        """
        excluded = exclude or set()
        candidates = [
            aid for aid, prof in self.agent_profiles.items()
            if aid not in excluded and capability in prof.capabilities
        ]
        if not candidates:
            return None
        load = self._get_active_agents()
        # min() keeps the first of equally-loaded candidates.
        return min(candidates, key=lambda aid: load.get(aid, 0))

    def _get_active_agents(self) -> Dict[str, int]:
        """Current load per agent from the counter, or {} without one."""
        if self.counter:
            return self.counter.active_agents
        return {}