auto-sync: 2026-05-17 21:14:20

This commit is contained in:
cfdaily
2026-05-17 21:14:21 +08:00
parent 45b24e9336
commit af2382ddcb
+244 -171
View File
@@ -1,22 +1,26 @@
"""Agent 调度器 — 三级决策树
"""Agent 调度器 — 执行层(司马懿建议 1Router/Dispatcher 分层)
Level 1: 纯机械检查 → Daemon 本地执行
Level 2: 有名字的注册角色 → Full Agent (asyncio.create_subprocess_exec)
Level 3: 无名字的一次性任务 → Subagent (sessions_spawn)
Level 4: 未知 → 庞统裁决
Dispatcher 负责:
1. 从 Router 获取路由决策
2. 执行 spawn(通过 Spawner
3. 更新 counter(并发控制)
4. 写路由审计日志(routing_decisions
调度器只做决策,实际 spawn 由 spawner.py 执行
路由决策全部委托给 AgentRouter
"""
from __future__ import annotations
import json
import logging
import sqlite3
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional
from src.blackboard.models import Task
from src.blackboard.queries import Queries
from src.daemon.router import AgentRouter, RouteDecision
logger = logging.getLogger("moziplus-v2.dispatcher")
@@ -30,86 +34,90 @@ class DispatchLevel(str, Enum):
class Dispatcher:
"""Agent 调度决策"""
"""Agent 调度执行
# L1 本地执行的 action type
LOCAL_ACTIONS = frozenset({
"L1_guardrail",
"format_check",
"file_exists_check",
"dependency_advance",
})
v2.6.1: 路由决策委托给 AgentRouter,本类只做执行。
"""
def __init__(
self,
registered_agents: Optional[List[str]] = None,
router: Optional[AgentRouter] = None,
spawner: Optional[Any] = None,
counter: Optional[Any] = None,
db_path: Optional[Path] = None,
# 兼容旧接口(deprecated,逐步移除)
registered_agents: Optional[List[str]] = None,
capability_map: Optional[Dict[str, List[str]]] = None,
):
"""
Args:
registered_agents: 项目注册的 Agent ID 列表
spawner: AgentSpawner 实例(执行实际 spawn
counter: ActiveAgentCounter 实例(控制并发)
capability_map: task_type → [agent_id] 映射(无 assignee 时的自动分配)
"""
self.registered_agents = set(registered_agents or [])
self.capability_map = capability_map or {}
self.router = router or AgentRouter()
self.spawner = spawner
self.counter = counter
self.db_path = db_path
# 兼容:如果没有 router,用旧逻辑(临时)
self._legacy_mode = router is None
if self._legacy_mode:
self.registered_agents = set(registered_agents or [])
self.capability_map = capability_map or {}
logger.warning("Dispatcher running in legacy mode (no AgentRouter)")
def decide(self, task: Task, action_type: str = "") -> Dict[str, Any]:
"""调度决策
"""调度决策(委托给 Router
Returns:
{"level": DispatchLevel, "agent_id": str, "reason": str}
{"level": DispatchLevel, "agent_id": str, "reason": str, ...}
"""
assignee = task.assignee
if self._legacy_mode:
return self._legacy_decide(task, action_type)
# Level 1: 纯机械检查 → 本地执行
if action_type in self.LOCAL_ACTIONS:
# 构建 task_info 给 Router
task_info = {
"id": task.id,
"title": task.title,
"description": task.description,
"status": task.status,
"assignee": task.assignee,
"task_type": getattr(task, 'task_type', ''),
"current_agent": getattr(task, 'current_agent', None),
"next_capability": getattr(task, 'next_capability', None),
}
decision = self.router.route(task_info, action_type)
# Router 返回 agent_id="daemon" → 本地执行
if decision.agent_id == "daemon":
return {
"level": DispatchLevel.LOCAL,
"agent_id": "daemon",
"reason": f"Local action: {action_type}",
"reason": decision.reason,
"mode": decision.mode,
}
# Level 2: 有名字的注册角色 → Full Agent
if assignee and assignee in self.registered_agents:
new_session = action_type == "adjudication"
return {
"level": DispatchLevel.FULL_AGENT,
"agent_id": assignee,
"new_session": new_session,
"reason": f"Registered agent: {assignee}",
}
# 判断是否升级(fallback 庞统)
level = DispatchLevel.FULL_AGENT
if decision.mode == "fallback" and decision.agent_id == "pangtong-fujunshi":
level = DispatchLevel.ESCALATE
# Level 3: 无 assignee → 能力映射或 fallback 庞统
if not assignee:
agent_id = self._resolve_by_capability(task)
return {
"level": DispatchLevel.FULL_AGENT,
"agent_id": agent_id,
"reason": f"Auto-assigned via capability_map: {agent_id}",
}
# Level 4: 有 assignee 但未注册 → 升级庞统
return {
"level": DispatchLevel.ESCALATE,
"agent_id": "pangtong-fujunshi",
"new_session": True,
"reason": f"Unknown agent '{assignee}', escalate to pangtong",
"level": level,
"agent_id": decision.agent_id,
"reason": decision.reason,
"mode": decision.mode,
"confidence": decision.confidence,
"routed_by": decision.mode,
}
async def dispatch(self, task: Task, action_type: str = "",
project_config: Optional[Dict] = None) -> Dict[str, Any]:
"""执行调度(决策 + spawn
"""执行调度(决策 + spawn + 审计日志
Returns:
{"level": str, "agent_id": str, "session_id": Optional[str],
"status": "dispatched"|"skipped"|"error", "reason": str}
"""
if self._legacy_mode:
return await self._legacy_dispatch(task, action_type, project_config)
decision = self.decide(task, action_type)
level = decision["level"]
agent_id = decision["agent_id"]
@@ -117,6 +125,7 @@ class Dispatcher:
# 检查并发限制
if self.counter and level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE):
if not await self.counter.can_acquire(agent_id):
self._record_routing(task, decision, "skipped", "Agent busy")
return {
"level": level.value,
"agent_id": agent_id,
@@ -127,6 +136,7 @@ class Dispatcher:
# 本地执行
if level == DispatchLevel.LOCAL:
self._record_routing(task, decision, "dispatched", None)
return {
"level": level.value,
"agent_id": "daemon",
@@ -135,9 +145,10 @@ class Dispatcher:
"reason": decision["reason"],
}
# Full Agent spawn
# Full Agent / Escalate spawn
if level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE):
if not self.spawner:
self._record_routing(task, decision, "error", "No spawner")
return {
"level": level.value,
"agent_id": agent_id,
@@ -149,30 +160,20 @@ class Dispatcher:
try:
if self.counter:
await self.counter.acquire(agent_id)
# 优先使用 spawner 的 prompt 模板
if hasattr(self.spawner, 'build_spawn_message') and project_config:
# 构建重试上下文(如果有前轮审查意见)
retry_ctx = self._build_retry_context(task)
message = self.spawner.build_spawn_message(
task_id=task.id,
title=task.title,
description=task.description or "",
task_type=task.task_type or "",
priority=task.priority,
must_haves=task.must_haves or "",
project_id=project_config.get("project_id", ""),
agent_id=agent_id,
current_status=task.status or "claimed",
retry_context=retry_ctx,
)
else:
message = self._build_message(task, action_type)
# 构建 spawn message
message = self._build_spawn_message(task, agent_id, project_config)
session_id = await self.spawner.spawn_full_agent(
agent_id=agent_id,
message=message,
new_session=decision.get("new_session", False),
new_session=(level == DispatchLevel.ESCALATE),
task_id=task.id,
)
self._record_routing(task, decision, "dispatched",
f"session={session_id}")
return {
"level": level.value,
"agent_id": agent_id,
@@ -183,6 +184,7 @@ class Dispatcher:
except Exception as e:
if self.counter:
self.counter.release(agent_id)
self._record_routing(task, decision, "error", str(e))
return {
"level": level.value,
"agent_id": agent_id,
@@ -191,38 +193,6 @@ class Dispatcher:
"reason": str(e),
}
# Subagent spawn
if level == DispatchLevel.SUB_AGENT:
if not self.spawner:
return {
"level": level.value,
"agent_id": "subagent",
"session_id": None,
"status": "error",
"reason": "No spawner configured",
}
try:
session_id = await self.spawner.spawn_subagent(
task_description=self._build_message(task, action_type),
task_id=task.id,
)
return {
"level": level.value,
"agent_id": "subagent",
"session_id": session_id,
"status": "dispatched",
"reason": decision["reason"],
}
except Exception as e:
return {
"level": level.value,
"agent_id": "subagent",
"session_id": None,
"status": "error",
"reason": str(e),
}
return {
"level": level.value,
"agent_id": agent_id,
@@ -231,82 +201,185 @@ class Dispatcher:
"reason": "Unknown dispatch level",
}
def _resolve_by_capability(self, task: Task) -> str:
"""两级路由:capability_map 匹配 → fallback 庞统"""
task_type = getattr(task, 'task_type', '') or ''
# Tier 1: task_type → capability_map
if task_type in self.capability_map:
candidates = self.capability_map[task_type]
# 过滤只保留注册的 Agent
registered = [a for a in candidates if a in self.registered_agents]
if registered:
# 多候选时选负载最低的
if self.counter and len(registered) > 1:
best = min(registered,
key=lambda a: self.counter.active_agents.get(a, 0))
return best
return registered[0]
# Tier 2: fallback 庞统
return "pangtong-fujunshi"
def _build_spawn_message(self, task: Task, agent_id: str,
project_config: Optional[Dict]) -> str:
"""构建 Agent spawn 消息"""
if hasattr(self.spawner, 'build_spawn_message') and project_config:
retry_ctx = self._build_retry_context(task)
return self.spawner.build_spawn_message(
task_id=task.id,
title=task.title,
description=task.description or "",
task_type=getattr(task, 'task_type', '') or "",
priority=task.priority,
must_haves=task.must_haves or "",
project_id=project_config.get("project_id", ""),
agent_id=agent_id,
current_status=task.status or "claimed",
retry_context=retry_ctx,
)
# fallback
parts = [f"Task: {task.title}"]
if task.description:
parts.append(f"Description: {task.description}")
if task.must_haves:
parts.append(f"Must-haves: {task.must_haves}")
return "\n".join(parts)
def _build_retry_context(self, task: Task) -> str:
"""构建重试上下文(前轮产出摘要 + 审查意见)
如果任务是从 failed/pending 重新调度(有历史 attempts),
注入前轮的失败原因,让 Agent 知道上轮哪里不对。
"""
"""构建重试上下文"""
if not hasattr(task, 'retry_count') or (task.retry_count or 0) == 0:
return ""
parts = ["## ⚠️ 重试上下文(上次执行失败,请注意以下反馈)"]
parts.append(f"这是第 {task.retry_count + 1} 次尝试。上次执行的问题:")
parts.append(f"这是第 {task.retry_count + 1} 次尝试。")
return "\n".join(parts)
# 从 task_attempts 查上次 attempt 的 summary/outcome
if hasattr(self, '_queries') and self._queries:
def _record_routing(self, task: Task, decision: Dict[str, Any],
outcome: str, detail: Optional[str]) -> None:
"""写路由审计日志到 routing_decisions 表"""
if not self.db_path:
return
try:
conn = sqlite3.connect(str(self.db_path))
conn.row_factory = sqlite3.Row
try:
rows = self._queries.conn.execute(
"SELECT outcome, summary, agent FROM task_attempts "
"WHERE task_id = ? ORDER BY attempt_number DESC LIMIT 1",
(task.id,)
).fetchall()
if rows:
outcome, summary, agent = rows[0]
if outcome:
parts.append(f"上轮结果: {outcome}")
if summary:
parts.append(f"上轮说明: {summary}")
if agent:
parts.append(f"上轮执行者: {agent}")
except Exception:
pass
conn.execute("BEGIN IMMEDIATE")
conn.execute(
"INSERT INTO routing_decisions "
"(task_id, from_status, to_status, mode, selected_agent, "
" previous_agent, reason, confidence, model, latency_ms, "
" task_type, requested_capability, outcome, detail) "
"VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
(
task.id,
task.status,
None, # to_status 在 dispatch 时还不知道
decision.get("mode", ""),
decision.get("agent_id", ""),
getattr(task, 'current_agent', None) or task.assignee,
decision.get("reason", ""),
decision.get("confidence"),
decision.get("routed_by") == "llm_route" and None,
None,
getattr(task, 'task_type', ''),
getattr(task, 'next_capability', ''),
outcome,
detail,
),
)
conn.commit()
finally:
conn.close()
except Exception:
logger.debug("Failed to record routing decision", exc_info=True)
parts.append("请仔细检查上次的错误,避免重复犯错。")
return "\n".join(parts)
def _build_message(self, task: Task, action_type: str) -> str:
"""构建给 Agent 的消息"""
parts = [f"Task: {task.title}"]
if task.description:
parts.append(f"Description: {task.description}")
if action_type:
parts.append(f"Action: {action_type}")
if task.must_haves:
import json
parts.append(f"Must-haves: {task.must_haves}")
return "\n".join(parts)
# ── 批量决策(兼容接口) ──
def dispatch_pending(self, tasks: List[Task]) -> List[Dict[str, Any]]:
"""批量决策(不 spawn,只返回决策列表)
用于 ticker 扫描可调度任务后批量判断。
"""
"""批量决策(不 spawn,只返回决策列表)"""
results = []
for task in tasks:
decision = self.decide(task)
results.append({
"task_id": task.id,
**decision,
})
results.append({"task_id": task.id, **decision})
return results
# ── Legacy 兼容(deprecated ──
def _legacy_decide(self, task: Task, action_type: str = "") -> Dict[str, Any]:
"""旧版三级决策树(兼容过渡用)"""
LOCAL_ACTIONS = frozenset({
"L1_guardrail", "format_check",
"file_exists_check", "dependency_advance",
})
assignee = task.assignee
if action_type in LOCAL_ACTIONS:
return {"level": DispatchLevel.LOCAL, "agent_id": "daemon",
"reason": f"Local action: {action_type}"}
if assignee and assignee in self.registered_agents:
new_session = action_type == "adjudication"
return {"level": DispatchLevel.FULL_AGENT, "agent_id": assignee,
"new_session": new_session,
"reason": f"Registered agent: {assignee}"}
if not assignee:
agent_id = self._resolve_by_capability(task)
return {"level": DispatchLevel.FULL_AGENT, "agent_id": agent_id,
"reason": f"Auto-assigned via capability_map: {agent_id}"}
return {"level": DispatchLevel.ESCALATE, "agent_id": "pangtong-fujunshi",
"new_session": True,
"reason": f"Unknown agent '{assignee}', escalate to pangtong"}
def _resolve_by_capability(self, task: Task) -> str:
"""旧版能力映射"""
task_type = getattr(task, 'task_type', '') or ''
if task_type in self.capability_map:
candidates = self.capability_map[task_type]
registered = [a for a in candidates if a in self.registered_agents]
if registered:
if self.counter and len(registered) > 1:
return min(registered,
key=lambda a: self.counter.active_agents.get(a, 0))
return registered[0]
return "pangtong-fujunshi"
async def _legacy_dispatch(self, task, action_type="", project_config=None):
"""旧版 dispatch(兼容过渡用)"""
decision = self._legacy_decide(task, action_type)
level = decision["level"]
agent_id = decision["agent_id"]
if self.counter and level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE):
if not await self.counter.can_acquire(agent_id):
return {"level": level.value, "agent_id": agent_id,
"session_id": None, "status": "skipped",
"reason": "Agent busy (concurrent limit)"}
if level == DispatchLevel.LOCAL:
return {"level": level.value, "agent_id": "daemon",
"session_id": None, "status": "dispatched",
"reason": decision["reason"]}
if level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE):
if not self.spawner:
return {"level": level.value, "agent_id": agent_id,
"session_id": None, "status": "error",
"reason": "No spawner configured"}
try:
if self.counter:
await self.counter.acquire(agent_id)
if hasattr(self.spawner, 'build_spawn_message') and project_config:
retry_ctx = self._build_retry_context(task)
message = self.spawner.build_spawn_message(
task_id=task.id, title=task.title,
description=task.description or "",
task_type=getattr(task, 'task_type', '') or "",
priority=task.priority,
must_haves=task.must_haves or "",
project_id=project_config.get("project_id", ""),
agent_id=agent_id,
current_status=task.status or "claimed",
retry_context=retry_ctx,
)
else:
message = f"Task: {task.title}"
session_id = await self.spawner.spawn_full_agent(
agent_id=agent_id, message=message,
new_session=decision.get("new_session", False),
task_id=task.id,
)
return {"level": level.value, "agent_id": agent_id,
"session_id": session_id, "status": "dispatched",
"reason": decision["reason"]}
except Exception as e:
if self.counter:
self.counter.release(agent_id)
return {"level": level.value, "agent_id": agent_id,
"session_id": None, "status": "error",
"reason": str(e)}
return {"level": level.value, "agent_id": agent_id,
"session_id": None, "status": "error",
"reason": "Unknown dispatch level"}