"""Agent 调度器 — 执行层(司马懿建议 1:Router/Dispatcher 分层) Dispatcher 负责: 1. 从 Router 获取路由决策 2. 执行 spawn(通过 Spawner) 3. 更新 counter(并发控制) 4. 写路由审计日志(routing_decisions) 路由决策全部委托给 AgentRouter。 """ from __future__ import annotations import json import logging import sqlite3 from datetime import datetime from enum import Enum from pathlib import Path from typing import Any, Dict, List, Optional from src.blackboard.models import Task from src.blackboard.db import get_connection from src.daemon.router import AgentRouter, RouteDecision logger = logging.getLogger("moziplus-v2.dispatcher") class DispatchLevel(str, Enum): """调度级别""" LOCAL = "local" # Daemon 本地执行 FULL_AGENT = "full" # Full Agent spawn SUB_AGENT = "sub" # Subagent spawn ESCALATE = "escalate" # 升级庞统 BLOCKED = "blocked" # 安全红线拦截 class Dispatcher: """Agent 调度执行器 v2.6.1: 路由决策委托给 AgentRouter,本类只做执行。 """ def __init__( self, router: Optional[AgentRouter] = None, spawner: Optional[Any] = None, counter: Optional[Any] = None, db_path: Optional[Path] = None, guardrails: Optional[Any] = None, # 兼容旧接口(deprecated,逐步移除) registered_agents: Optional[List[str]] = None, capability_map: Optional[Dict[str, List[str]]] = None, ): self.router = router or AgentRouter() self.spawner = spawner self.counter = counter self.db_path = db_path self.guardrails = guardrails # 兼容:如果没有 router,用旧逻辑(临时) self._legacy_mode = router is None if self._legacy_mode: self.registered_agents = set(registered_agents or []) self.capability_map = capability_map or {} logger.warning("Dispatcher running in legacy mode (no AgentRouter)") def decide(self, task: Task, action_type: str = "") -> Dict[str, Any]: """调度决策(委托给 Router) Returns: {"level": DispatchLevel, "agent_id": str, "reason": str, ...} """ if self._legacy_mode: return self._legacy_decide(task, action_type) # 构建 task_info 给 Router task_info = { "id": task.id, "title": task.title, "description": task.description, "status": task.status, "assignee": task.assignee, "task_type": getattr(task, 'task_type', ''), "current_agent": getattr(task, 'current_agent', None), "next_capability": getattr(task, 'next_capability', None), } decision = self.router.route(task_info, action_type) # Router 返回 agent_id="daemon" → 本地执行 if decision.agent_id == "daemon": return { "level": DispatchLevel.LOCAL, "agent_id": "daemon", "reason": decision.reason, "mode": decision.mode, } # 判断是否升级(fallback 庞统) level = DispatchLevel.FULL_AGENT if decision.mode == "fallback" and decision.agent_id == "pangtong-fujunshi": level = DispatchLevel.ESCALATE return { "level": level, "agent_id": decision.agent_id, "reason": decision.reason, "mode": decision.mode, "confidence": decision.confidence, "routed_by": decision.mode, "model": getattr(decision, "model", None), "latency_ms": decision.latency_ms, } async def dispatch(self, task: Task, action_type: str = "", project_config: Optional[Dict] = None) -> Dict[str, Any]: """执行调度(决策 + spawn + 审计日志) Returns: {"level": str, "agent_id": str, "session_id": Optional[str], "status": "dispatched"|"skipped"|"error"|"blocked", "reason": str} """ # 安全红线检查(调度前拦截) # Mail 是 Agent 间通信,不做 guardrail 检查 is_mail = project_config.get("project_id") == "_mail" if project_config else False if self.guardrails and not is_mail: violations = self.guardrails.check_task(task) critical = [v for v in violations if v.action in ("block_and_notify", "terminate_and_escalate")] if critical: v = critical[0] logger.warning("Task '%s' blocked by guardrail: %s - %s", task.title, v.rule_id, v.message) # 写入黑板事件 _routing_db = Path(project_config["db_path"]) if project_config and "db_path" in project_config else self.db_path if _routing_db: self._record_routing(task, {"level": DispatchLevel.BLOCKED, "agent_id": "none", "reason": v.message}, "blocked", v.message, _routing_db) return { "level": "blocked", "agent_id": "none", "session_id": None, "status": "blocked", "reason": v.message, "violations": [v.rule_id for v in violations], } if self._legacy_mode: return await self._legacy_dispatch(task, action_type, project_config) decision = self.decide(task, action_type) level = decision["level"] # 从 project_config 获取项目级 DB 路径(路由审计日志写入项目 DB) _routing_db = Path(project_config["db_path"]) if project_config and "db_path" in project_config else None agent_id = decision["agent_id"] # 检查并发限制 if self.counter and level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE): if not await self.counter.can_acquire(agent_id): self._record_routing(task, decision, "skipped", "Agent busy", _routing_db) return { "level": level.value, "agent_id": agent_id, "session_id": None, "status": "skipped", "reason": "Agent busy (concurrent limit)", } # 本地执行 if level == DispatchLevel.LOCAL: self._record_routing(task, decision, "dispatched", None, _routing_db) return { "level": level.value, "agent_id": "daemon", "session_id": None, "status": "dispatched", "reason": decision["reason"], } # Full Agent / Escalate spawn if level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE): if not self.spawner: self._record_routing(task, decision, "error", "No spawner", _routing_db) return { "level": level.value, "agent_id": agent_id, "session_id": None, "status": "error", "reason": "No spawner configured", } try: if self.counter: await self.counter.acquire(agent_id) # [v2.7.1] Mail: spawn 前系统标 working is_mail = project_config.get("project_id") == "_mail" if project_config else False if is_mail: db_path = Path(project_config["db_path"]) if project_config and "db_path" in project_config else None if not db_path or not self._mail_auto_working(task.id, db_path): if self.counter: self.counter.release(agent_id) self._record_routing(task, decision, "error", "mail working failed", _routing_db) return {"level": level.value, "agent_id": agent_id, "session_id": None, "status": "error", "reason": "mail_auto_working_failed"} # 构建 spawn message message = self._build_spawn_message(task, agent_id, project_config, mode=decision.get("mode", "")) # [v2.7.1] Mail: on_complete 增强自动标 done/failed on_complete = None if is_mail: _task_id = task.id _agent_id = agent_id _mail_db = db_path _must_haves = task.must_haves or "" _counter = self.counter _dispatcher = self def _mail_on_complete(aid, outcome): # 先 release counter if _counter: _counter.release(aid) # 自动标 done/failed(幻觉门控) # 不管 outcome 是什么,都检查是否有回复 try: _dispatcher._mail_auto_complete(_task_id, aid, _mail_db, _must_haves) except Exception as e: logger.error("Mail %s: on_complete error: %s", _task_id, e) on_complete = _mail_on_complete else: on_complete = ( lambda aid, _outcome: self.counter.release(aid) ) if self.counter else None session_id = await self.spawner.spawn_full_agent( agent_id=agent_id, message=message, new_session=(level == DispatchLevel.ESCALATE), task_id=task.id, on_complete=on_complete, use_main_session=is_mail, task_db_path=Path(project_config["db_path"]) if project_config and "db_path" in project_config else None, ) self._record_routing(task, decision, "dispatched", f"session={session_id}", _routing_db) return { "level": level.value, "agent_id": agent_id, "session_id": session_id, "status": "dispatched", "reason": decision["reason"], } except Exception as e: if self.counter: self.counter.release(agent_id) self._record_routing(task, decision, "error", str(e), _routing_db) return { "level": level.value, "agent_id": agent_id, "session_id": None, "status": "error", "reason": str(e), } return { "level": level.value, "agent_id": agent_id, "session_id": None, "status": "error", "reason": "Unknown dispatch level", } def _build_spawn_message(self, task: Task, agent_id: str, project_config: Optional[Dict], mode: str = "") -> str: """构建 Agent spawn 消息 Args: mode: 路由模式("delegate" 时生成协调员 prompt) """ # delegate 模式:生成协调员分配 prompt if mode == "delegate": return self._build_delegate_prompt(task, project_config) if hasattr(self.spawner, 'build_spawn_message') and project_config: retry_ctx = self._build_retry_context(task) return self.spawner.build_spawn_message( task_id=task.id, title=task.title, description=task.description or "", task_type=getattr(task, 'task_type', '') or "", priority=task.priority, must_haves=task.must_haves or "", project_id=project_config.get("project_id", ""), agent_id=agent_id, current_status=task.status or "claimed", retry_context=retry_ctx, ) # fallback parts = [f"Task: {task.title}"] if task.description: parts.append(f"Description: {task.description}") if task.must_haves: parts.append(f"Must-haves: {task.must_haves}") return "\n".join(parts) def _build_delegate_prompt(self, task: Task, project_config: Optional[Dict]) -> str: """构建 delegate 模式的 prompt(协调员分配任务)""" api_host = getattr(self.spawner, 'api_host', '127.0.0.1') if self.spawner else '127.0.0.1' api_port = getattr(self.spawner, 'api_port', 8083) if self.spawner else 8083 project_id = project_config.get("project_id", "") if project_config else "" return f"""你是任务协调员。请分析以下任务,决定最合适的执行者并分配。 ## 任务信息 - 项目: {project_id} - 任务ID: {task.id} - 标题: {task.title} - 描述: {task.description or '(无描述)'} - 类型: {getattr(task, 'task_type', '') or 'general'} - 优先级: {task.priority} ## 团队 - 张飞(zhangfei-dev): 编码、实现、脚本 - 司马懿(simayi-challenger): 审查、质量检查、辩论 - 关羽(guanyu-dev): 风控、合规 - 赵云(zhaoyun-data): 数据获取、清洗、验证 - 姜维(jiangwei-infra): 部署、基础设施、Docker、vnpy - 庞统(pangtong-fujunshi): 规划、协调、策略 ## 操作 1. 分析任务需求,选择最合适的 Agent 2. 通过 API 回写分配结果: curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task.id}/status \\ -H 'Content-Type: application/json' \\ -d '{{"status": "claimed", "agent": ""}}' 3. 如果你自己能做,直接认领并执行(状态改为 working) """ def _build_retry_context(self, task: Task) -> str: """构建重试上下文""" if not hasattr(task, 'retry_count') or (task.retry_count or 0) == 0: return "" parts = ["## ⚠️ 重试上下文(上次执行失败,请注意以下反馈)"] parts.append(f"这是第 {task.retry_count + 1} 次尝试。") return "\n".join(parts) def _record_routing(self, task: Task, decision: Dict[str, Any], outcome: str, detail: Optional[str], override_db_path: Optional[Path] = None) -> None: """写路由审计日志到 routing_decisions 表""" effective_db = override_db_path or self.db_path if not effective_db: return try: conn = sqlite3.connect(str(effective_db)) conn.row_factory = sqlite3.Row try: conn.execute("BEGIN IMMEDIATE") conn.execute( "INSERT INTO routing_decisions " "(task_id, from_status, to_status, mode, selected_agent, " " previous_agent, reason, confidence, model, latency_ms, " " task_type, requested_capability, outcome, detail) " "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)", ( task.id, task.status, None, # to_status 在 dispatch 时还不知道 decision.get("mode", ""), decision.get("agent_id", ""), getattr(task, 'current_agent', None) or task.assignee, decision.get("reason", ""), decision.get("confidence"), decision.get("model"), decision.get("latency_ms"), getattr(task, 'task_type', ''), getattr(task, 'next_capability', ''), outcome, detail, ), ) conn.commit() finally: conn.close() except Exception: logger.debug("Failed to record routing decision", exc_info=True) # ── 批量决策(兼容接口) ── def dispatch_pending(self, tasks: List[Task]) -> List[Dict[str, Any]]: """批量决策(不 spawn,只返回决策列表)""" results = [] for task in tasks: decision = self.decide(task) results.append({"task_id": task.id, **decision}) return results # ── Legacy 兼容(deprecated) ── def _legacy_decide(self, task: Task, action_type: str = "") -> Dict[str, Any]: """旧版三级决策树(兼容过渡用)""" LOCAL_ACTIONS = frozenset({ "L1_guardrail", "format_check", "file_exists_check", "dependency_advance", }) assignee = task.assignee if action_type in LOCAL_ACTIONS: return {"level": DispatchLevel.LOCAL, "agent_id": "daemon", "reason": f"Local action: {action_type}"} if assignee and assignee in self.registered_agents: new_session = action_type == "adjudication" return {"level": DispatchLevel.FULL_AGENT, "agent_id": assignee, "new_session": new_session, "reason": f"Registered agent: {assignee}"} if not assignee: agent_id = self._resolve_by_capability(task) return {"level": DispatchLevel.FULL_AGENT, "agent_id": agent_id, "reason": f"Auto-assigned via capability_map: {agent_id}"} return {"level": DispatchLevel.ESCALATE, "agent_id": "pangtong-fujunshi", "new_session": True, "reason": f"Unknown agent '{assignee}', escalate to pangtong"} def _resolve_by_capability(self, task: Task) -> str: """旧版能力映射""" task_type = getattr(task, 'task_type', '') or '' if task_type in self.capability_map: candidates = self.capability_map[task_type] registered = [a for a in candidates if a in self.registered_agents] if registered: if self.counter and len(registered) > 1: return min(registered, key=lambda a: self.counter.active_agents.get(a, 0)) return registered[0] return "pangtong-fujunshi" async def _legacy_dispatch(self, task, action_type="", project_config=None): """旧版 dispatch(兼容过渡用)""" decision = self._legacy_decide(task, action_type) level = decision["level"] agent_id = decision["agent_id"] if self.counter and level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE): if not await self.counter.can_acquire(agent_id): return {"level": level.value, "agent_id": agent_id, "session_id": None, "status": "skipped", "reason": "Agent busy (concurrent limit)"} if level == DispatchLevel.LOCAL: return {"level": level.value, "agent_id": "daemon", "session_id": None, "status": "dispatched", "reason": decision["reason"]} if level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE): if not self.spawner: return {"level": level.value, "agent_id": agent_id, "session_id": None, "status": "error", "reason": "No spawner configured"} try: if self.counter: await self.counter.acquire(agent_id) # [v2.7.1] Mail: spawn 前系统标 working is_mail_legacy = project_config.get("project_id") == "_mail" if project_config else False if is_mail_legacy: db_path_legacy = Path(project_config["db_path"]) if project_config and "db_path" in project_config else None if not db_path_legacy or not self._mail_auto_working(task.id, db_path_legacy): if self.counter: self.counter.release(agent_id) return {"level": level.value, "agent_id": agent_id, "session_id": None, "status": "error", "reason": "mail_auto_working_failed"} if hasattr(self.spawner, 'build_spawn_message') and project_config: retry_ctx = self._build_retry_context(task) message = self.spawner.build_spawn_message( task_id=task.id, title=task.title, description=task.description or "", task_type=getattr(task, 'task_type', '') or "", priority=task.priority, must_haves=task.must_haves or "", project_id=project_config.get("project_id", ""), agent_id=agent_id, current_status=task.status or "claimed", retry_context=retry_ctx, ) else: message = f"Task: {task.title}" # [v2.7.1] Mail: on_complete 增强 on_complete_legacy = None if is_mail_legacy: _t_id = task.id _a_id = agent_id _m_db = db_path_legacy _m_mh = task.must_haves or "" _ct = self.counter _disp = self def _mail_oc_legacy(aid, outcome): if _ct: _ct.release(aid) try: _disp._mail_auto_complete(_t_id, aid, _m_db, _m_mh) except Exception as e: logger.error("Mail %s: legacy on_complete error: %s", _t_id, e) on_complete_legacy = _mail_oc_legacy else: on_complete_legacy = ( lambda aid, _outcome: self.counter.release(aid) ) if self.counter else None session_id = await self.spawner.spawn_full_agent( agent_id=agent_id, message=message, new_session=decision.get("new_session", False), task_id=task.id, on_complete=on_complete_legacy, use_main_session=is_mail_legacy, task_db_path=Path(project_config["db_path"]) if project_config and "db_path" in project_config else None, ) return {"level": level.value, "agent_id": agent_id, "session_id": session_id, "status": "dispatched", "reason": decision["reason"]} except Exception as e: if self.counter: self.counter.release(agent_id) return {"level": level.value, "agent_id": agent_id, "session_id": None, "status": "error", "reason": str(e)} return {"level": level.value, "agent_id": agent_id, "session_id": None, "status": "error", "reason": "Unknown dispatch level"} # ── Mail 信封/载荷分离辅助方法 ── def _mail_auto_working(self, task_id: str, db_path: Path) -> bool: """Mail 任务:系统自动标 working(spawn 前) Mail 不需要 claimed 中间态,直接 pending → working。 Returns: True=标成功, False=标失败(需中止 spawn) """ try: conn = get_connection(db_path) try: conn.execute("BEGIN IMMEDIATE") row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone() if not row: logger.warning("Mail %s: cannot mark working (task not found)", task_id) return False if row["status"] not in ("pending", "claimed"): logger.warning("Mail %s: cannot mark working (status=%s, expected pending/claimed)", task_id, row["status"]) return False conn.execute( "UPDATE tasks SET status='working', updated_at=datetime('now') WHERE id=?", (task_id,), ) conn.commit() logger.info("Mail %s: auto-marked working (system, was %s)", task_id, row["status"]) return True finally: conn.close() except Exception as e: logger.error("Mail %s: failed to mark working: %s", task_id, e) return False def _mail_auto_complete(self, task_id: str, agent_id: str, db_path: Path, must_haves: str) -> None: """Mail 任务:on_complete 后自动标 done/failed(含幻觉门控)""" try: # 解析 performative performative = "request" try: meta = json.loads(must_haves) if must_haves else {} performative = meta.get("performative", meta.get("type", "request")) except Exception: pass # request 类型:幻觉门控验证 if performative == "request": has_reply = self._mail_check_reply(task_id, db_path) if not has_reply: # 不直接标 failed,留 working 等 ticker 下一轮再查 logger.warning("Mail %s: no reply found on on_complete, " "leaving working for ticker recheck", task_id) return # 标 done(重试 3 次) for attempt in range(3): try: conn = get_connection(db_path) try: conn.execute("BEGIN IMMEDIATE") row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone() if not row: return if row["status"] == "working": conn.execute( "UPDATE tasks SET status='done', updated_at=datetime('now') WHERE id=?", (task_id,), ) conn.commit() logger.info("Mail %s: auto-marked done (system, performative=%s)", task_id, performative) return finally: conn.close() except Exception as e: logger.warning("Mail %s: done attempt %d failed: %s", task_id, attempt + 1, e) # 3 次都失败,留 working 等 ticker 超时兜底 logger.error("Mail %s: all 3 done attempts failed, leaving for ticker", task_id) except Exception as e: logger.error("Mail %s: auto-complete error: %s", task_id, e) def _mail_check_reply(self, original_task_id: str, db_path: Path) -> bool: """幻觉门控:检查是否有回复邮件(in_reply_to = original_task_id)""" try: conn = get_connection(db_path) try: # 查 must_haves JSON 里包含 in_reply_to = original_task_id 的记录 row = conn.execute( "SELECT id FROM tasks WHERE id != ? AND must_haves LIKE ? LIMIT 1", (original_task_id, f'%{original_task_id}%'), ).fetchone() return row is not None finally: conn.close() except Exception as e: logger.error("Mail %s: reply check error: %s", original_task_id, e) # 查询失败时保守处理:假设有回复(避免误标 failed) return True