auto-sync: 2026-05-17 21:13:19

This commit is contained in:
cfdaily
2026-05-17 21:13:19 +08:00
parent 55d7a6b37a
commit 45b24e9336
+376
View File
@@ -0,0 +1,376 @@
"""Agent 路由器 — 决策层(司马懿建议 1Router/Dispatcher 分层)
三种路由模式:
Mode A: LLM 路由(中心化)— 首次分配、异常升级
Mode B: Agent 声明式交接(去中心化)— 执行者声明下一步需要什么
Mode C: Agent 自主领活 — 未来演进
Router 只做决策,不做 spawn。返回 RouteDecision。
"""
from __future__ import annotations
import asyncio
import json
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Set
# Module-level logger, registered under the fixed name "moziplus-v2.router"
# (not derived from __name__).
logger = logging.getLogger("moziplus-v2.router")
@dataclass
class RouteDecision:
    """Result of one routing decision.

    The first three fields are required; the remaining ones carry defaults.
    """

    # Identifier of the agent the task is routed to.
    agent_id: str
    # Human-readable explanation of the decision.
    reason: str
    # "deterministic" | "agent_handoff" | "llm_route" | "fallback"
    mode: str
    # Confidence attached to the decision (1.0 unless stated otherwise).
    confidence: float = field(default=1.0)
    # Name of the model used; recorded for Mode A (LLM routing).
    model: Optional[str] = field(default=None)
    # Time spent producing the decision, in milliseconds.
    latency_ms: int = field(default=0)
@dataclass
class AgentProfile:
    """Capability profile of a single agent."""

    # Identifier of the agent this profile describes.
    agent_id: str
    # Capability names the agent offers; every instance gets its own list.
    capabilities: List[str] = field(default_factory=list)
    # Whether the agent may act as a reviewer.
    can_review: bool = field(default=False)
    # Declared concurrency limit for the agent.
    max_concurrent: int = field(default=1)
    # Whether the agent is flagged as a fallback agent.
    is_fallback: bool = field(default=False)
# Whitelist of known capability names (Sima Yi BUG-2: next_capability
# validation).
# NOTE(review): nothing in the visible part of this file reads this constant;
# AgentRouter validates against the capabilities found in its profiles.
KNOWN_CAPABILITIES = frozenset((
    "coding",
    "implementation",
    "scripting",
    "review",
    "quality_check",
    "debate",
    "deploy",
    "infrastructure",
    "docker",
    "vnpy",
    "risk",
    "compliance",
    "position_check",
    "data",
    "acquisition",
    "cleaning",
    "verification",
    "planning",
    "coordination",
    "escalation",
    "strategy",
))
class LLMDriver:
    """Mode A: bMAS Control Unit -- lightweight LLM routing decision.

    A decision is a single chat-completion API call (author's estimate: a
    ~300-token prompt, roughly 1-2 s); it is not a spawned agent session.
    Client and API failures are reported as a ``fallback`` RouteDecision
    rather than raised.
    """

    # Agent returned whenever the LLM cannot deliver a usable decision.
    FALLBACK_AGENT = "pangtong-fujunshi"

    def __init__(self, model: str, api_base: str = "", api_key: str = "",
                 timeout: float = 5.0, max_tokens: int = 200,
                 temperature: float = 0.1):
        """Store the call settings; the API client itself is created lazily.

        Args:
            model: Model name passed to the chat-completion call.
            api_base: Optional base URL handed to the client.
            api_key: Optional API key handed to the client.
            timeout: Per-request timeout passed to the API call.
            max_tokens: Completion token limit passed to the API call.
            temperature: Sampling temperature passed to the API call.
        """
        self.model = model
        self.api_base = api_base
        self.api_key = api_key
        self.timeout = timeout
        self.max_tokens = max_tokens
        self.temperature = temperature
        self._client = None

    def _get_client(self):
        """Return the lazily created OpenAI client, or None when unavailable.

        The client is cached once it has been built. A missing ``openai``
        package or a failing constructor (e.g. missing credentials) is logged
        and reported as None so that ``route`` can fall back instead of
        crashing; a later call tries again.
        """
        if self._client is not None:
            return self._client
        try:
            from openai import OpenAI
        except ImportError:
            logger.warning("openai package not installed, LLM routing disabled")
            return None

        options = {}
        if self.api_base:
            options["base_url"] = self.api_base
        if self.api_key:
            options["api_key"] = self.api_key
        try:
            # With nothing configured the client is built without arguments;
            # per the original note it then relies on the OpenClaw Gateway's
            # default environment variables.
            self._client = OpenAI(**options)
        except Exception as exc:
            # Construction happens outside route()'s try block, so an error
            # here would otherwise escape the router entirely.
            logger.warning("OpenAI client initialisation failed: %s", exc)
            self._client = None
        return self._client

    @staticmethod
    def _normalize_confidence(value: Any) -> float:
        """Coerce an LLM-supplied confidence into a float within [0.0, 1.0].

        NaN maps to 0.0: json.loads accepts the ``NaN`` literal, and NaN
        compares False against any threshold, so it would otherwise pass a
        "confidence < limit" check downstream.

        Raises:
            TypeError, ValueError: If ``value`` cannot be converted to float.
        """
        score = float(value)
        if score != score:  # NaN is the only float not equal to itself
            return 0.0
        return min(1.0, max(0.0, score))

    def _fallback_decision(self, reason: str, model: Optional[str] = None,
                           latency_ms: int = 0) -> RouteDecision:
        """Build the zero-confidence fallback decision used on every failure."""
        return RouteDecision(
            agent_id=self.FALLBACK_AGENT,
            reason=reason,
            mode="fallback",
            confidence=0.0,
            model=model,
            latency_ms=latency_ms,
        )

    def route(self, task_info: Dict[str, Any],
              agent_profiles: Dict[str, AgentProfile],
              active_agents: Dict[str, int]) -> RouteDecision:
        """Ask the LLM which agent should take the task.

        Args:
            task_info: Task fields rendered into the prompt.
            agent_profiles: Candidate agents keyed by agent id.
            active_agents: Current load per agent id.

        Returns:
            A ``llm_route`` decision built from the model's JSON answer, or a
            ``fallback`` decision when the client is unavailable, the answer
            is not JSON, or the call or parsing fails. The returned
            ``agent_id`` is not checked against ``agent_profiles`` here; a
            non-string value is replaced by "".
        """
        start = time.monotonic()
        client = self._get_client()
        if client is None:
            return self._fallback_decision("LLM driver unavailable, fallback")

        prompt = self._build_prompt(task_info, agent_profiles, active_agents)
        try:
            response = client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                response_format={"type": "json_object"},
                max_tokens=self.max_tokens,
                temperature=self.temperature,
                timeout=self.timeout,
            )
            raw = response.choices[0].message.content
            result = json.loads(raw)
            if not isinstance(result, dict):
                raise ValueError("LLM response is not a JSON object")
            latency = int((time.monotonic() - start) * 1000)

            agent_id = result.get("agent_id", "")
            if not isinstance(agent_id, str):
                # A list/dict here would be unhashable and break membership
                # tests in the caller; "" is simply an unknown agent.
                agent_id = ""
            return RouteDecision(
                agent_id=agent_id,
                reason=result.get("reason", "LLM routing"),
                mode="llm_route",
                confidence=self._normalize_confidence(
                    result.get("confidence", 0.5)),
                model=self.model,
                latency_ms=latency,
            )
        except json.JSONDecodeError:
            latency = int((time.monotonic() - start) * 1000)
            return self._fallback_decision(
                "LLM returned non-JSON, fallback",
                model=self.model, latency_ms=latency)
        except Exception as e:
            # Boundary catch: any API or parsing failure degrades to fallback.
            latency = int((time.monotonic() - start) * 1000)
            logger.warning("LLM routing failed: %s", e)
            return self._fallback_decision(
                f"LLM routing error: {e}, fallback",
                model=self.model, latency_ms=latency)

    def _build_prompt(self, task_info: Dict[str, Any],
                      agent_profiles: Dict[str, AgentProfile],
                      active_agents: Dict[str, int]) -> str:
        """Render the routing prompt: task fields, agent list, constraints.

        Each agent line shows its capabilities, ``can_review`` flag and
        current load (0 when the agent is absent from ``active_agents``).
        """
        agents_str = "\n".join(
            f"- {aid}: capabilities={prof.capabilities}, "
            f"can_review={prof.can_review}, load={active_agents.get(aid, 0)}"
            for aid, prof in agent_profiles.items()
        )
        # The prompt text is runtime data and is kept verbatim.
        # NOTE(review): the "## 输出(JSON" heading has an unbalanced
        # parenthesis in the source; confirm against the upstream file.
        return f"""你是任务路由器。根据任务需求和 Agent 能力,选择最合适的 Agent。
## 当前任务
- ID: {task_info.get('id', '')}
- 标题: {task_info.get('title', '')}
- 状态: {task_info.get('status', '')}
- 类型: {task_info.get('task_type', '')}
- 描述: {task_info.get('description', '')}
- 当前执行者: {task_info.get('current_agent', '')}
## 可用 Agent
{agents_str}
## 约束
1. review/quality_check 不能选当前执行者
2. 同等能力优先选负载最低的
3. 必须匹配任务所需能力
4. agent_id 必须是上面列出的 Agent 之一
## 输出(JSON
{{"agent_id": "...", "reason": "...", "confidence": 0.0-1.0}}"""
class AgentRouter:
    """Agent router -- the decision layer.

    ``route`` picks a mode by scenario, trying in order:
      1. deterministic fast paths (local actions, retry),
      2. Mode B (agent handoff): the executor declared ``next_capability``,
      3. lifecycle table lookup and direct assignee,
      4. Mode A (LLM routing) for everything else,
      5. the fallback agent.
    """

    # Action types handled locally; they need no routing.
    LOCAL_ACTIONS = frozenset({
        "L1_guardrail", "format_check", "file_exists_check",
        "dependency_advance",
    })
    # Lifecycle transitions: action type / task status -> required capability.
    LIFECYCLE_CAPABILITY = {
        "review": {"capability": "review", "exclude_current": True},
        "failed": {"capability": "escalation", "exclude_current": False},
    }
    FALLBACK_AGENT = "pangtong-fujunshi"
    # LLM decisions below this confidence are replaced by the fallback agent.
    MIN_LLM_CONFIDENCE = 0.7

    def __init__(
        self,
        agent_profiles: Optional[Dict[str, AgentProfile]] = None,
        llm_driver: Optional[LLMDriver] = None,
        counter: Optional[Any] = None,
    ):
        """Set up the router.

        Args:
            agent_profiles: Known agents keyed by agent id.
            llm_driver: Driver used for Mode A; without it Mode A is skipped.
            counter: Optional object exposing an ``active_agents`` mapping of
                agent id to current load.
        """
        self.agent_profiles = agent_profiles or {}
        self.llm_driver = llm_driver
        self.counter = counter
        # The set of valid capabilities is whatever the profiles declare.
        self._known_capabilities: Set[str] = set()
        for prof in self.agent_profiles.values():
            self._known_capabilities.update(prof.capabilities)

    def route(self, task_info: Dict[str, Any],
              action_type: str = "") -> RouteDecision:
        """Routing entry point.

        Args:
            task_info: Task dictionary; the keys read here are ``status``,
                ``assignee``, ``current_agent`` and ``next_capability`` (the
                LLM prompt additionally uses id, title, task_type and
                description).
            action_type: Kind of action being routed.

        Returns:
            The RouteDecision of the first routing step that applies.
        """
        start = time.monotonic()

        def elapsed_ms() -> int:
            return int((time.monotonic() - start) * 1000)

        # -- Fast path 1: local execution --
        if action_type in self.LOCAL_ACTIONS:
            return RouteDecision(
                agent_id="daemon",
                reason=f"Local action: {action_type}",
                mode="deterministic",
                latency_ms=elapsed_ms(),
            )

        current = self._current_agent(task_info)

        # -- Fast path 2: retry goes back to the original executor --
        if action_type == "retry" and current and current in self.agent_profiles:
            return RouteDecision(
                agent_id=current,
                reason="Retry original executor",
                mode="deterministic",
                latency_ms=elapsed_ms(),
            )

        # -- Mode B: declarative agent handoff --
        next_cap = task_info.get("next_capability")
        if next_cap and self._validate_capability(next_cap):
            matched = self._match_capability(
                next_cap, {current} if current else set())
            if matched:
                return RouteDecision(
                    agent_id=matched,
                    reason=f"Agent handoff: needs {next_cap}",
                    mode="agent_handoff",
                    latency_ms=elapsed_ms(),
                )
            # Valid capability but nobody else offers it: keep going down.
            logger.info("next_capability '%s' no match, falling back to LLM",
                        next_cap)

        # -- Fast path 3: lifecycle table lookup --
        lifecycle = self._lifecycle_rule(task_info, action_type)
        if lifecycle:
            cap = lifecycle["capability"]
            exclude = set()
            if lifecycle.get("exclude_current", False) and current:
                exclude.add(current)
            matched = self._match_capability(cap, exclude)
            if matched:
                return RouteDecision(
                    agent_id=matched,
                    reason=f"Lifecycle: {action_type or task_info.get('status')} needs {cap}",
                    mode="deterministic",
                    latency_ms=elapsed_ms(),
                )

        # -- Fast path 4: assignee known and NOT a lifecycle transition --
        assignee = self._direct_assignee(task_info, action_type, lifecycle)
        if assignee:
            return RouteDecision(
                agent_id=assignee,
                reason=f"Direct assignee: {assignee}",
                mode="deterministic",
                latency_ms=elapsed_ms(),
            )

        # -- Mode A: LLM routing --
        if self.llm_driver:
            decision = self.llm_driver.route(task_info, self.agent_profiles,
                                             self._get_active_agents())
            chosen = decision.agent_id
            # The isinstance guard keeps an unhashable id (list/dict from the
            # model) from raising TypeError in the membership test.
            if not isinstance(chosen, str) or chosen not in self.agent_profiles:
                return RouteDecision(
                    agent_id=self.FALLBACK_AGENT,
                    reason=f"LLM returned invalid agent '{chosen}', fallback",
                    mode="fallback",
                    confidence=0.0,
                    model=decision.model,
                    latency_ms=decision.latency_ms or elapsed_ms(),
                )
            # "not (>=)" instead of "<" so that a NaN confidence is rejected.
            if not (decision.confidence >= self.MIN_LLM_CONFIDENCE):
                return RouteDecision(
                    agent_id=self.FALLBACK_AGENT,
                    reason=f"LLM low confidence ({decision.confidence}): {decision.reason}",
                    mode="fallback",
                    confidence=decision.confidence,
                    model=decision.model,
                    latency_ms=decision.latency_ms or elapsed_ms(),
                )
            # Fill in the latency when the driver did not measure it.
            if not decision.latency_ms:
                decision.latency_ms = elapsed_ms()
            return decision

        # -- Final fallback --
        return RouteDecision(
            agent_id=self.FALLBACK_AGENT,
            reason="No routing mode available, fallback to coordinator",
            mode="fallback",
            latency_ms=elapsed_ms(),
        )

    @staticmethod
    def _current_agent(task_info: Dict[str, Any]) -> Optional[str]:
        """Return the task's ``current_agent``, else its ``assignee``."""
        return task_info.get("current_agent") or task_info.get("assignee")

    def _lifecycle_rule(self, task_info: Dict[str, Any],
                        action_type: str) -> Optional[Dict[str, Any]]:
        """Look up the lifecycle rule by action type, then by task status."""
        rule = self.LIFECYCLE_CAPABILITY.get(action_type)
        if not rule:
            rule = self.LIFECYCLE_CAPABILITY.get(task_info.get("status"))
        return rule

    def _direct_assignee(self, task_info: Dict[str, Any], action_type: str,
                         lifecycle: Optional[Dict[str, Any]]) -> Optional[str]:
        """Return the assignee for fast path 4, or None if it does not apply.

        The path is skipped for any lifecycle transition, including one
        triggered only by the task status; otherwise a task in ``review``
        whose reviewer lookup found nobody would be routed straight back to
        its own assignee.
        """
        if lifecycle or action_type in ("review", "escalation"):
            return None
        assignee = task_info.get("assignee")
        if assignee and assignee in self.agent_profiles:
            return assignee
        return None

    def _validate_capability(self, capability: str) -> bool:
        """BUG-2 fix: accept ``next_capability`` only if a profile declares it.

        Non-string values are rejected up front, because an unhashable value
        (e.g. a list) would raise TypeError in the set lookup.
        """
        return isinstance(capability, str) and capability in self._known_capabilities

    def _match_capability(self, capability: str,
                          exclude: Optional[Set[str]] = None) -> Optional[str]:
        """Pick an agent that offers ``capability`` and is not in ``exclude``.

        Returns None when nobody qualifies. With several candidates the one
        with the lowest load according to ``counter`` wins; without a counter
        (or on a tie) the earliest candidate in profile order is returned.
        """
        excluded = exclude or set()
        candidates = [
            aid for aid, prof in self.agent_profiles.items()
            if aid not in excluded and capability in prof.capabilities
        ]
        if not candidates:
            return None
        if len(candidates) > 1 and self.counter:
            loads = self.counter.active_agents
            return min(candidates, key=lambda aid: loads.get(aid, 0))
        return candidates[0]

    def _get_active_agents(self) -> Dict[str, int]:
        """Return the counter's per-agent load mapping, or {} without one."""
        if self.counter:
            return self.counter.active_agents
        return {}