diff --git a/src/daemon/router.py b/src/daemon/router.py new file mode 100644 index 0000000..649f748 --- /dev/null +++ b/src/daemon/router.py @@ -0,0 +1,376 @@ +"""Agent 路由器 — 决策层(司马懿建议 1:Router/Dispatcher 分层) + +三种路由模式: + Mode A: LLM 路由(中心化)— 首次分配、异常升级 + Mode B: Agent 声明式交接(去中心化)— 执行者声明下一步需要什么 + Mode C: Agent 自主领活 — 未来演进 + +Router 只做决策,不做 spawn。返回 RouteDecision。 +""" + +from __future__ import annotations + +import asyncio +import json +import logging +import time +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional, Set + +logger = logging.getLogger("moziplus-v2.router") + + +@dataclass +class RouteDecision: + """路由决策结果""" + agent_id: str + reason: str + mode: str # "deterministic" | "agent_handoff" | "llm_route" | "fallback" + confidence: float = 1.0 + model: Optional[str] = None # Mode A 时记录使用的模型 + latency_ms: int = 0 + + +@dataclass +class AgentProfile: + """Agent 能力画像""" + agent_id: str + capabilities: List[str] = field(default_factory=list) + can_review: bool = False + max_concurrent: int = 1 + is_fallback: bool = False + + +# 已知 capability 白名单(司马懿 BUG-2:next_capability 校验) +KNOWN_CAPABILITIES = frozenset({ + "coding", "implementation", "scripting", + "review", "quality_check", "debate", + "deploy", "infrastructure", "docker", "vnpy", + "risk", "compliance", "position_check", + "data", "acquisition", "cleaning", "verification", + "planning", "coordination", "escalation", "strategy", +}) + + +class LLMDriver: + """Mode A: bMAS Control Unit — 轻量 LLM 路由决策 + + 一次 LLM API 调用,~300 token prompt,~1-2s,返回路由决策。 + 不是 spawn Agent session,是 API 调用。 + """ + + def __init__(self, model: str, api_base: str = "", api_key: str = "", + timeout: float = 5.0, max_tokens: int = 200, + temperature: float = 0.1): + self.model = model + self.api_base = api_base + self.api_key = api_key + self.timeout = timeout + self.max_tokens = max_tokens + self.temperature = temperature + self._client = None + + def _get_client(self): + """延迟初始化 OpenAI client""" + if self._client is not None: + return self._client + try: + from openai import OpenAI + kwargs = {} + if self.api_base: + kwargs["base_url"] = self.api_base + if self.api_key: + kwargs["api_key"] = self.api_key + # 如果都没配,走 OpenClaw Gateway 的默认环境变量 + self._client = OpenAI(**kwargs) if kwargs else OpenAI() + except ImportError: + logger.warning("openai package not installed, LLM routing disabled") + self._client = None + return self._client + + def route(self, task_info: Dict[str, Any], + agent_profiles: Dict[str, AgentProfile], + active_agents: Dict[str, int]) -> RouteDecision: + """LLM 路由决策""" + start = time.monotonic() + client = self._get_client() + + if client is None: + return RouteDecision( + agent_id="pangtong-fujunshi", + reason="LLM driver unavailable, fallback", + mode="fallback", + confidence=0.0, + ) + + prompt = self._build_prompt(task_info, agent_profiles, active_agents) + + try: + response = client.chat.completions.create( + model=self.model, + messages=[{"role": "user", "content": prompt}], + response_format={"type": "json_object"}, + max_tokens=self.max_tokens, + temperature=self.temperature, + timeout=self.timeout, + ) + raw = response.choices[0].message.content + result = json.loads(raw) + latency = int((time.monotonic() - start) * 1000) + + agent_id = result.get("agent_id", "") + reason = result.get("reason", "LLM routing") + confidence = float(result.get("confidence", 0.5)) + + return RouteDecision( + agent_id=agent_id, + reason=reason, + mode="llm_route", + confidence=confidence, + model=self.model, + latency_ms=latency, + ) + except json.JSONDecodeError: + latency = int((time.monotonic() - start) * 1000) + return RouteDecision( + agent_id="pangtong-fujunshi", + reason="LLM returned non-JSON, fallback", + mode="fallback", + confidence=0.0, + model=self.model, + latency_ms=latency, + ) + except Exception as e: + latency = int((time.monotonic() - start) * 1000) + logger.warning("LLM routing failed: %s", e) + return RouteDecision( + agent_id="pangtong-fujunshi", + reason=f"LLM routing error: {e}, fallback", + mode="fallback", + confidence=0.0, + model=self.model, + latency_ms=latency, + ) + + def _build_prompt(self, task_info: Dict[str, Any], + agent_profiles: Dict[str, AgentProfile], + active_agents: Dict[str, int]) -> str: + """构建路由 prompt""" + agents_block = [] + for aid, prof in agent_profiles.items(): + load = active_agents.get(aid, 0) + agents_block.append( + f"- {aid}: capabilities={prof.capabilities}, " + f"can_review={prof.can_review}, load={load}" + ) + agents_str = "\n".join(agents_block) + + return f"""你是任务路由器。根据任务需求和 Agent 能力,选择最合适的 Agent。 + +## 当前任务 +- ID: {task_info.get('id', '')} +- 标题: {task_info.get('title', '')} +- 状态: {task_info.get('status', '')} +- 类型: {task_info.get('task_type', '')} +- 描述: {task_info.get('description', '')} +- 当前执行者: {task_info.get('current_agent', '无')} + +## 可用 Agent +{agents_str} + +## 约束 +1. review/quality_check 不能选当前执行者 +2. 同等能力优先选负载最低的 +3. 必须匹配任务所需能力 +4. agent_id 必须是上面列出的 Agent 之一 + +## 输出(JSON) +{{"agent_id": "...", "reason": "...", "confidence": 0.0-1.0}}""" + + +class AgentRouter: + """Agent 路由器 — 决策层 + + 三种路由模式按场景自动选择: + - 确定性快速路径:机械检查、retry、已知 assignee + - Mode B(Agent Handoff):执行者声明 next_capability + - Mode A(LLM 路由):首次分配、异常场景 + """ + + # 确定性 action type(不需要路由) + LOCAL_ACTIONS = frozenset({ + "L1_guardrail", "format_check", "file_exists_check", + "dependency_advance", + }) + + # 生命周期流转的状态-能力映射 + LIFECYCLE_CAPABILITY = { + "review": {"capability": "review", "exclude_current": True}, + "failed": {"capability": "escalation", "exclude_current": False}, + } + + FALLBACK_AGENT = "pangtong-fujunshi" + + def __init__( + self, + agent_profiles: Optional[Dict[str, AgentProfile]] = None, + llm_driver: Optional[LLMDriver] = None, + counter: Optional[Any] = None, + ): + self.agent_profiles = agent_profiles or {} + self.llm_driver = llm_driver + self.counter = counter + # 构建 known capability 集合(从 profiles 中提取) + self._known_capabilities: Set[str] = set() + for prof in self.agent_profiles.values(): + self._known_capabilities.update(prof.capabilities) + + def route(self, task_info: Dict[str, Any], + action_type: str = "") -> RouteDecision: + """路由决策入口 + + Args: + task_info: 任务信息字典(需含 id, title, status, assignee, + current_agent, next_capability, task_type, description) + action_type: 动作类型 + + Returns: + RouteDecision + """ + start = time.monotonic() + + # ── 快速路径 1: 本地执行 ── + if action_type in self.LOCAL_ACTIONS: + return RouteDecision( + agent_id="daemon", + reason=f"Local action: {action_type}", + mode="deterministic", + latency_ms=int((time.monotonic() - start) * 1000), + ) + + # ── 快速路径 2: retry → 原执行者 ── + if action_type == "retry": + current = task_info.get("current_agent") or task_info.get("assignee") + if current and current in self.agent_profiles: + return RouteDecision( + agent_id=current, + reason="Retry original executor", + mode="deterministic", + latency_ms=int((time.monotonic() - start) * 1000), + ) + + # ── Mode B: Agent 声明式交接 ── + next_cap = task_info.get("next_capability") + if next_cap and self._validate_capability(next_cap): + current = task_info.get("current_agent") or task_info.get("assignee") + exclude = {current} if current else set() + matched = self._match_capability(next_cap, exclude) + if matched: + return RouteDecision( + agent_id=matched, + reason=f"Agent handoff: needs {next_cap}", + mode="agent_handoff", + latency_ms=int((time.monotonic() - start) * 1000), + ) + # next_capability 合法但无匹配 → 降级 Mode A + logger.info("next_capability '%s' no match, falling back to LLM", next_cap) + + # ── 快速路径 3: 生命周期流转查表 ── + lifecycle = self.LIFECYCLE_CAPABILITY.get(action_type) + if not lifecycle: + # review 以外的 action 也可能走生命周期 + lifecycle = self.LIFECYCLE_CAPABILITY.get(task_info.get("status")) + if lifecycle: + cap = lifecycle["capability"] + exclude_current = lifecycle.get("exclude_current", False) + exclude = set() + if exclude_current: + current = task_info.get("current_agent") or task_info.get("assignee") + if current: + exclude.add(current) + matched = self._match_capability(cap, exclude) + if matched: + return RouteDecision( + agent_id=matched, + reason=f"Lifecycle: {action_type or task_info.get('status')} needs {cap}", + mode="deterministic", + latency_ms=int((time.monotonic() - start) * 1000), + ) + + # ── 快速路径 4: 有 assignee 且非生命周期流转 ── + assignee = task_info.get("assignee") + if assignee and assignee in self.agent_profiles and action_type not in ("review", "escalation"): + return RouteDecision( + agent_id=assignee, + reason=f"Direct assignee: {assignee}", + mode="deterministic", + latency_ms=int((time.monotonic() - start) * 1000), + ) + + # ── Mode A: LLM 路由 ── + if self.llm_driver: + decision = self.llm_driver.route(task_info, self.agent_profiles, + self._get_active_agents()) + # 合法性校验 + if decision.agent_id not in self.agent_profiles: + latency = decision.latency_ms or int((time.monotonic() - start) * 1000) + return RouteDecision( + agent_id=self.FALLBACK_AGENT, + reason=f"LLM returned invalid agent '{decision.agent_id}', fallback", + mode="fallback", + confidence=0.0, + model=decision.model, + latency_ms=latency, + ) + if decision.confidence < 0.7: + latency = decision.latency_ms or int((time.monotonic() - start) * 1000) + return RouteDecision( + agent_id=self.FALLBACK_AGENT, + reason=f"LLM low confidence ({decision.confidence}): {decision.reason}", + mode="fallback", + confidence=decision.confidence, + model=decision.model, + latency_ms=latency, + ) + # 补充 latency + if not decision.latency_ms: + decision.latency_ms = int((time.monotonic() - start) * 1000) + return decision + + # ── 最终 fallback ── + return RouteDecision( + agent_id=self.FALLBACK_AGENT, + reason="No routing mode available, fallback to coordinator", + mode="fallback", + latency_ms=int((time.monotonic() - start) * 1000), + ) + + def _validate_capability(self, capability: str) -> bool: + """BUG-2 修复:校验 next_capability 在已知集合内""" + return capability in self._known_capabilities + + def _match_capability(self, capability: str, + exclude: Optional[Set[str]] = None) -> Optional[str]: + """从能力画像中匹配 Agent""" + candidates = [] + for aid, prof in self.agent_profiles.items(): + if aid in (exclude or set()): + continue + if capability in prof.capabilities: + candidates.append(aid) + + if not candidates: + return None + if len(candidates) == 1: + return candidates[0] + # 多候选:选负载最低的 + if self.counter: + return min(candidates, + key=lambda a: self.counter.active_agents.get(a, 0)) + return candidates[0] + + def _get_active_agents(self) -> Dict[str, int]: + """获取当前活跃 Agent 负载""" + if self.counter: + return self.counter.active_agents + return {}