auto-sync: 2026-05-21 11:15:34

This commit is contained in:
cfdaily
2026-05-21 11:15:34 +08:00
parent c1fc3b6485
commit dcb48278fd
+13 -194
View File
@@ -1,17 +1,14 @@
"""Agent 路由器 — 决策层(司马懿建议 1Router/Dispatcher 分层
"""Agent 路由器 — 决策层(v3.0: 去掉独立 LLM,模糊场景 delegate 庞统
三种路由模式:
Mode A: LLM 路由(中心化)— 首次分配、异常升级
Mode B: Agent 声明式交接(去中心化)— 执行者声明下一步需要什么
Mode C: Agent 自主领活 — 未来演进
路由模式:
确定性路由:能力匹配、retry、handoff、assignee(纯 L1,不调 LLM)
模糊路由:delegate 庞统(L3 spawn,通过 Gateway)
Router 只做决策,不做 spawn。返回 RouteDecision。
"""
from __future__ import annotations
import asyncio
import json
import logging
import time
from dataclasses import dataclass, field
@@ -25,9 +22,8 @@ class RouteDecision:
"""路由决策结果"""
agent_id: str
reason: str
mode: str # "deterministic" | "agent_handoff" | "llm_route" | "fallback"
mode: str # "deterministic" | "agent_handoff" | "delegate" | "fallback"
confidence: float = 1.0
model: Optional[str] = None # Mode A 时记录使用的模型
latency_ms: int = 0
@@ -52,149 +48,11 @@ KNOWN_CAPABILITIES = frozenset({
})
class LLMDriver:
    """Mode A: bMAS Control Unit — lightweight LLM routing decision.

    Performs one chat-completion API call (the original note estimates a
    ~300-token prompt and ~1-2 s latency) and converts the JSON reply into a
    RouteDecision. This is a plain API call, not an agent-session spawn.

    Every failure path (client unavailable, API error, malformed reply)
    yields a ``mode="fallback"`` decision pointing at ``FALLBACK_AGENT``
    instead of raising.
    """

    # Agent that receives the task whenever the LLM cannot produce a decision.
    FALLBACK_AGENT = "pangtong-fujunshi"

    def __init__(self, model: str, api_base: str = "", api_key: str = "",
                 timeout: float = 5.0, max_tokens: int = 200,
                 temperature: float = 0.1):
        """Store the call parameters; the client itself is created lazily.

        Args:
            model: Model name sent with every completion request.
            api_base: Optional base URL; empty means the client default.
            api_key: Optional API key; empty means the client default.
            timeout: Per-request timeout passed to the completion call.
            max_tokens: Upper bound on tokens in the reply.
            temperature: Sampling temperature for the reply.
        """
        self.model = model
        self.api_base = api_base
        self.api_key = api_key
        self.timeout = timeout
        self.max_tokens = max_tokens
        self.temperature = temperature
        self._client = None

    def _get_client(self):
        """Return the lazily created OpenAI client, or None if unavailable.

        None is returned when the ``openai`` package is missing or when the
        client constructor itself fails, so that ``route`` can fall back
        instead of propagating the error.
        """
        if self._client is not None:
            return self._client
        try:
            from openai import OpenAI
        except ImportError:
            logger.warning("openai package not installed, LLM routing disabled")
            return None

        kwargs = {}
        if self.api_base:
            kwargs["base_url"] = self.api_base
        if self.api_key:
            kwargs["api_key"] = self.api_key
        # With nothing configured the client uses its environment-variable
        # defaults (per the original note: the OpenClaw Gateway defaults).
        try:
            self._client = OpenAI(**kwargs)
        except Exception as e:
            # The constructor can raise (e.g. no API key configured); treat
            # that exactly like a missing package so routing degrades.
            logger.warning("OpenAI client init failed, LLM routing disabled: %s", e)
            self._client = None
        return self._client

    @staticmethod
    def _parse_confidence(value: Any) -> float:
        """Coerce an LLM-supplied confidence into a finite float in [0, 1].

        Unparseable or non-finite values (NaN/inf) become 0.0 so they can
        never slip past a ``confidence < threshold`` check downstream;
        out-of-range values are clamped.
        """
        import math

        try:
            conf = float(value)
        except (TypeError, ValueError):
            return 0.0
        if not math.isfinite(conf):
            return 0.0
        return min(1.0, max(0.0, conf))

    def _fallback(self, reason: str, start: Optional[float] = None) -> RouteDecision:
        """Build the fallback decision shared by every failure path.

        When ``start`` (a ``time.monotonic()`` reading) is given, the model
        name and elapsed latency are recorded; otherwise both keep their
        RouteDecision defaults.
        """
        if start is None:
            return RouteDecision(
                agent_id=self.FALLBACK_AGENT,
                reason=reason,
                mode="fallback",
                confidence=0.0,
            )
        return RouteDecision(
            agent_id=self.FALLBACK_AGENT,
            reason=reason,
            mode="fallback",
            confidence=0.0,
            model=self.model,
            latency_ms=int((time.monotonic() - start) * 1000),
        )

    def route(self, task_info: Dict[str, Any],
              agent_profiles: Dict[str, AgentProfile],
              active_agents: Dict[str, int]) -> RouteDecision:
        """Ask the LLM which agent should take the task.

        Args:
            task_info: Task fields rendered into the prompt.
            agent_profiles: Candidate agents keyed by agent id.
            active_agents: Current load per agent id (missing means 0).

        Returns:
            A ``mode="llm_route"`` decision built from the model's JSON
            reply, or a ``mode="fallback"`` decision on any failure.
        """
        start = time.monotonic()
        client = self._get_client()
        if client is None:
            return self._fallback("LLM driver unavailable, fallback")

        prompt = self._build_prompt(task_info, agent_profiles, active_agents)
        try:
            response = client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                response_format={"type": "json_object"},
                max_tokens=self.max_tokens,
                temperature=self.temperature,
                timeout=self.timeout,
            )
            result = json.loads(response.choices[0].message.content)
            latency = int((time.monotonic() - start) * 1000)
            return RouteDecision(
                agent_id=result.get("agent_id", ""),
                reason=result.get("reason", "LLM routing"),
                mode="llm_route",
                confidence=self._parse_confidence(result.get("confidence", 0.5)),
                model=self.model,
                latency_ms=latency,
            )
        except json.JSONDecodeError:
            return self._fallback("LLM returned non-JSON, fallback", start)
        except Exception as e:
            # Boundary handler: any API/shape error degrades to the fallback.
            logger.warning("LLM routing failed: %s", e)
            return self._fallback(f"LLM routing error: {e}, fallback", start)

    def _build_prompt(self, task_info: Dict[str, Any],
                      agent_profiles: Dict[str, AgentProfile],
                      active_agents: Dict[str, int]) -> str:
        """Render the routing prompt: task fields, agent list, constraints."""
        agents_str = "\n".join(
            f"- {aid}: capabilities={prof.capabilities}, "
            f"can_review={prof.can_review}, load={active_agents.get(aid, 0)}"
            for aid, prof in agent_profiles.items()
        )
        # The prompt text is runtime data sent to the model; keep it verbatim.
        return f"""你是任务路由器。根据任务需求和 Agent 能力,选择最合适的 Agent。
## 当前任务
- ID: {task_info.get('id', '')}
- 标题: {task_info.get('title', '')}
- 状态: {task_info.get('status', '')}
- 类型: {task_info.get('task_type', '')}
- 描述: {task_info.get('description', '')}
- 当前执行者: {task_info.get('current_agent', '')}
## 可用 Agent
{agents_str}
## 约束
1. review/quality_check 不能选当前执行者
2. 同等能力优先选负载最低的
3. 必须匹配任务所需能力
4. agent_id 必须是上面列出的 Agent 之一
## 输出(JSON
{{"agent_id": "...", "reason": "...", "confidence": 0.0-1.0}}"""
class AgentRouter:
"""Agent 路由器 — 决策层
三种路由模式按场景自动选择:
- 确定性快速路径:机械检查、retry、已知 assignee
- Mode B(Agent Handoff):执行者声明 next_capability
- Mode A(LLM 路由):首次分配、异常场景
v3.0: 去掉 LLMDriver(独立 OpenAI 客户端)。
模糊场景不再调独立 LLM,改为 delegate 庞统(L3 spawn,走 Gateway)。
"""
# 确定性 action type(不需要路由)
@@ -214,11 +72,9 @@ class AgentRouter:
def __init__(
self,
agent_profiles: Optional[Dict[str, AgentProfile]] = None,
llm_driver: Optional[LLMDriver] = None,
counter: Optional[Any] = None,
):
self.agent_profiles = agent_profiles or {}
self.llm_driver = llm_driver
self.counter = counter
# 构建 known capability 集合(从 profiles 中提取)
self._known_capabilities: Set[str] = set()
@@ -272,13 +128,11 @@ class AgentRouter:
mode="agent_handoff",
latency_ms=int((time.monotonic() - start) * 1000),
)
# next_capability 合法但无匹配 → 降级 Mode A
logger.info("next_capability '%s' no match, falling back to LLM", next_cap)
logger.info("next_capability '%s' no match, delegate to coordinator", next_cap)
# ── 快速路径 3: 生命周期流转查表 ──
lifecycle = self.LIFECYCLE_CAPABILITY.get(action_type)
if not lifecycle:
# review 以外的 action 也可能走生命周期
lifecycle = self.LIFECYCLE_CAPABILITY.get(task_info.get("status"))
if lifecycle:
cap = lifecycle["capability"]
@@ -307,46 +161,17 @@ class AgentRouter:
latency_ms=int((time.monotonic() - start) * 1000),
)
# ── Mode A: LLM 路由 ──
if self.llm_driver:
decision = self.llm_driver.route(task_info, self.agent_profiles,
self._get_active_agents())
# 合法性校验
if decision.agent_id not in self.agent_profiles:
latency = decision.latency_ms or int((time.monotonic() - start) * 1000)
return RouteDecision(
agent_id=self.FALLBACK_AGENT,
reason=f"LLM returned invalid agent '{decision.agent_id}', fallback",
mode="fallback",
confidence=0.0,
model=decision.model,
latency_ms=latency,
)
if decision.confidence < 0.7:
latency = decision.latency_ms or int((time.monotonic() - start) * 1000)
return RouteDecision(
agent_id=self.FALLBACK_AGENT,
reason=f"LLM low confidence ({decision.confidence}): {decision.reason}",
mode="fallback",
confidence=decision.confidence,
model=decision.model,
latency_ms=latency,
)
# 补充 latency
if not decision.latency_ms:
decision.latency_ms = int((time.monotonic() - start) * 1000)
return decision
# ── 最终 fallback ──
# ── 模糊场景: delegate 庞统(L3 spawn,走 Gateway──
return RouteDecision(
agent_id=self.FALLBACK_AGENT,
reason="No routing mode available, fallback to coordinator",
mode="fallback",
reason="Uncertain routing, delegate to coordinator",
mode="delegate",
confidence=0.0,
latency_ms=int((time.monotonic() - start) * 1000),
)
def _validate_capability(self, capability: str) -> bool:
"""BUG-2 修复:校验 next_capability 在已知集合内"""
"""校验 next_capability 在已知集合内"""
return capability in self._known_capabilities
def _match_capability(self, capability: str,
@@ -368,9 +193,3 @@ class AgentRouter:
return min(candidates,
key=lambda a: self.counter.active_agents.get(a, 0))
return candidates[0]
def _get_active_agents(self) -> Dict[str, int]:
"""获取当前活跃 Agent 负载"""
if self.counter:
return self.counter.active_agents
return {}