diff --git a/src/daemon/router.py b/src/daemon/router.py index 649f748..96a66e4 100644 --- a/src/daemon/router.py +++ b/src/daemon/router.py @@ -1,17 +1,14 @@ -"""Agent 路由器 — 决策层(司马懿建议 1:Router/Dispatcher 分层) +"""Agent 路由器 — 决策层(v3.0: 去掉独立 LLM,模糊场景 delegate 庞统) -三种路由模式: - Mode A: LLM 路由(中心化)— 首次分配、异常升级 - Mode B: Agent 声明式交接(去中心化)— 执行者声明下一步需要什么 - Mode C: Agent 自主领活 — 未来演进 +路由模式: + 确定性路由:能力匹配、retry、handoff、assignee(纯 L1,不调 LLM) + 模糊路由:delegate 庞统(L3 spawn,通过 Gateway) Router 只做决策,不做 spawn。返回 RouteDecision。 """ from __future__ import annotations -import asyncio -import json import logging import time from dataclasses import dataclass, field @@ -25,9 +22,8 @@ class RouteDecision: """路由决策结果""" agent_id: str reason: str - mode: str # "deterministic" | "agent_handoff" | "llm_route" | "fallback" + mode: str # "deterministic" | "agent_handoff" | "delegate" | "fallback" confidence: float = 1.0 - model: Optional[str] = None # Mode A 时记录使用的模型 latency_ms: int = 0 @@ -52,149 +48,11 @@ KNOWN_CAPABILITIES = frozenset({ }) -class LLMDriver: - """Mode A: bMAS Control Unit — 轻量 LLM 路由决策 - - 一次 LLM API 调用,~300 token prompt,~1-2s,返回路由决策。 - 不是 spawn Agent session,是 API 调用。 - """ - - def __init__(self, model: str, api_base: str = "", api_key: str = "", - timeout: float = 5.0, max_tokens: int = 200, - temperature: float = 0.1): - self.model = model - self.api_base = api_base - self.api_key = api_key - self.timeout = timeout - self.max_tokens = max_tokens - self.temperature = temperature - self._client = None - - def _get_client(self): - """延迟初始化 OpenAI client""" - if self._client is not None: - return self._client - try: - from openai import OpenAI - kwargs = {} - if self.api_base: - kwargs["base_url"] = self.api_base - if self.api_key: - kwargs["api_key"] = self.api_key - # 如果都没配,走 OpenClaw Gateway 的默认环境变量 - self._client = OpenAI(**kwargs) if kwargs else OpenAI() - except ImportError: - logger.warning("openai package not installed, LLM routing disabled") - self._client = None - return self._client - - def route(self, task_info: Dict[str, Any], - agent_profiles: Dict[str, AgentProfile], - active_agents: Dict[str, int]) -> RouteDecision: - """LLM 路由决策""" - start = time.monotonic() - client = self._get_client() - - if client is None: - return RouteDecision( - agent_id="pangtong-fujunshi", - reason="LLM driver unavailable, fallback", - mode="fallback", - confidence=0.0, - ) - - prompt = self._build_prompt(task_info, agent_profiles, active_agents) - - try: - response = client.chat.completions.create( - model=self.model, - messages=[{"role": "user", "content": prompt}], - response_format={"type": "json_object"}, - max_tokens=self.max_tokens, - temperature=self.temperature, - timeout=self.timeout, - ) - raw = response.choices[0].message.content - result = json.loads(raw) - latency = int((time.monotonic() - start) * 1000) - - agent_id = result.get("agent_id", "") - reason = result.get("reason", "LLM routing") - confidence = float(result.get("confidence", 0.5)) - - return RouteDecision( - agent_id=agent_id, - reason=reason, - mode="llm_route", - confidence=confidence, - model=self.model, - latency_ms=latency, - ) - except json.JSONDecodeError: - latency = int((time.monotonic() - start) * 1000) - return RouteDecision( - agent_id="pangtong-fujunshi", - reason="LLM returned non-JSON, fallback", - mode="fallback", - confidence=0.0, - model=self.model, - latency_ms=latency, - ) - except Exception as e: - latency = int((time.monotonic() - start) * 1000) - logger.warning("LLM routing failed: %s", e) - return RouteDecision( - agent_id="pangtong-fujunshi", - reason=f"LLM routing error: {e}, fallback", - mode="fallback", - confidence=0.0, - model=self.model, - latency_ms=latency, - ) - - def _build_prompt(self, task_info: Dict[str, Any], - agent_profiles: Dict[str, AgentProfile], - active_agents: Dict[str, int]) -> str: - """构建路由 prompt""" - agents_block = [] - for aid, prof in agent_profiles.items(): - load = active_agents.get(aid, 0) - agents_block.append( - f"- {aid}: capabilities={prof.capabilities}, " - f"can_review={prof.can_review}, load={load}" - ) - agents_str = "\n".join(agents_block) - - return f"""你是任务路由器。根据任务需求和 Agent 能力,选择最合适的 Agent。 - -## 当前任务 -- ID: {task_info.get('id', '')} -- 标题: {task_info.get('title', '')} -- 状态: {task_info.get('status', '')} -- 类型: {task_info.get('task_type', '')} -- 描述: {task_info.get('description', '')} -- 当前执行者: {task_info.get('current_agent', '无')} - -## 可用 Agent -{agents_str} - -## 约束 -1. review/quality_check 不能选当前执行者 -2. 同等能力优先选负载最低的 -3. 必须匹配任务所需能力 -4. agent_id 必须是上面列出的 Agent 之一 - -## 输出(JSON) -{{"agent_id": "...", "reason": "...", "confidence": 0.0-1.0}}""" - - class AgentRouter: """Agent 路由器 — 决策层 - 三种路由模式按场景自动选择: - - 确定性快速路径:机械检查、retry、已知 assignee - - Mode B(Agent Handoff):执行者声明 next_capability - - Mode A(LLM 路由):首次分配、异常场景 + v3.0: 去掉 LLMDriver(独立 OpenAI 客户端)。 + 模糊场景不再调独立 LLM,改为 delegate 庞统(L3 spawn,走 Gateway)。 """ # 确定性 action type(不需要路由) @@ -214,11 +72,9 @@ class AgentRouter: def __init__( self, agent_profiles: Optional[Dict[str, AgentProfile]] = None, - llm_driver: Optional[LLMDriver] = None, counter: Optional[Any] = None, ): self.agent_profiles = agent_profiles or {} - self.llm_driver = llm_driver self.counter = counter # 构建 known capability 集合(从 profiles 中提取) self._known_capabilities: Set[str] = set() @@ -272,13 +128,11 @@ class AgentRouter: mode="agent_handoff", latency_ms=int((time.monotonic() - start) * 1000), ) - # next_capability 合法但无匹配 → 降级 Mode A - logger.info("next_capability '%s' no match, falling back to LLM", next_cap) + logger.info("next_capability '%s' no match, delegate to coordinator", next_cap) # ── 快速路径 3: 生命周期流转查表 ── lifecycle = self.LIFECYCLE_CAPABILITY.get(action_type) if not lifecycle: - # review 以外的 action 也可能走生命周期 lifecycle = self.LIFECYCLE_CAPABILITY.get(task_info.get("status")) if lifecycle: cap = lifecycle["capability"] @@ -307,46 +161,17 @@ class AgentRouter: latency_ms=int((time.monotonic() - start) * 1000), ) - # ── Mode A: LLM 路由 ── - if self.llm_driver: - decision = self.llm_driver.route(task_info, self.agent_profiles, - self._get_active_agents()) - # 合法性校验 - if decision.agent_id not in self.agent_profiles: - latency = decision.latency_ms or int((time.monotonic() - start) * 1000) - return RouteDecision( - agent_id=self.FALLBACK_AGENT, - reason=f"LLM returned invalid agent '{decision.agent_id}', fallback", - mode="fallback", - confidence=0.0, - model=decision.model, - latency_ms=latency, - ) - if decision.confidence < 0.7: - latency = decision.latency_ms or int((time.monotonic() - start) * 1000) - return RouteDecision( - agent_id=self.FALLBACK_AGENT, - reason=f"LLM low confidence ({decision.confidence}): {decision.reason}", - mode="fallback", - confidence=decision.confidence, - model=decision.model, - latency_ms=latency, - ) - # 补充 latency - if not decision.latency_ms: - decision.latency_ms = int((time.monotonic() - start) * 1000) - return decision - - # ── 最终 fallback ── + # ── 模糊场景: delegate 庞统(L3 spawn,走 Gateway)── return RouteDecision( agent_id=self.FALLBACK_AGENT, - reason="No routing mode available, fallback to coordinator", - mode="fallback", + reason="Uncertain routing, delegate to coordinator", + mode="delegate", + confidence=0.0, latency_ms=int((time.monotonic() - start) * 1000), ) def _validate_capability(self, capability: str) -> bool: - """BUG-2 修复:校验 next_capability 在已知集合内""" + """校验 next_capability 在已知集合内""" return capability in self._known_capabilities def _match_capability(self, capability: str, @@ -368,9 +193,3 @@ class AgentRouter: return min(candidates, key=lambda a: self.counter.active_agents.get(a, 0)) return candidates[0] - - def _get_active_agents(self) -> Dict[str, int]: - """获取当前活跃 Agent 负载""" - if self.counter: - return self.counter.active_agents - return {}