Files
sanguo_moziplus_v2/src/daemon/dispatcher.py
T
2026-05-17 05:59:56 +08:00

237 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Agent 调度器 — 三级决策树
Level 1: 纯机械检查 → Daemon 本地执行
Level 2: 有名字的注册角色 → Full Agent (asyncio.create_subprocess_exec)
Level 3: 无名字的一次性任务 → Subagent (sessions_spawn)
Level 4: 未知 → 庞统裁决
调度器只做决策,实际 spawn 由 spawner.py 执行。
"""
from __future__ import annotations
import logging
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional
from src.blackboard.models import Task
from src.blackboard.queries import Queries
logger = logging.getLogger("moziplus-v2.dispatcher")
class DispatchLevel(str, Enum):
"""调度级别"""
LOCAL = "local" # Daemon 本地执行
FULL_AGENT = "full" # Full Agent spawn
SUB_AGENT = "sub" # Subagent spawn
ESCALATE = "escalate" # 升级庞统
class Dispatcher:
"""Agent 调度决策器"""
# L1 本地执行的 action type
LOCAL_ACTIONS = frozenset({
"L1_guardrail",
"format_check",
"file_exists_check",
"dependency_advance",
})
def __init__(
self,
registered_agents: Optional[List[str]] = None,
spawner: Optional[Any] = None,
counter: Optional[Any] = None,
):
"""
Args:
registered_agents: 项目注册的 Agent ID 列表
spawner: AgentSpawner 实例(执行实际 spawn
counter: ActiveAgentCounter 实例(控制并发)
"""
self.registered_agents = set(registered_agents or [])
self.spawner = spawner
self.counter = counter
def decide(self, task: Task, action_type: str = "") -> Dict[str, Any]:
"""调度决策
Returns:
{"level": DispatchLevel, "agent_id": str, "reason": str}
"""
assignee = task.assignee
# Level 1: 纯机械检查 → 本地执行
if action_type in self.LOCAL_ACTIONS:
return {
"level": DispatchLevel.LOCAL,
"agent_id": "daemon",
"reason": f"Local action: {action_type}",
}
# Level 2: 有名字的注册角色 → Full Agent
if assignee and assignee in self.registered_agents:
new_session = action_type == "adjudication"
return {
"level": DispatchLevel.FULL_AGENT,
"agent_id": assignee,
"new_session": new_session,
"reason": f"Registered agent: {assignee}",
}
# Level 3: 无名字或未注册 → Subagent
if not assignee:
return {
"level": DispatchLevel.SUB_AGENT,
"agent_id": "subagent",
"reason": "No assignee, dispatch as subagent",
}
# Level 4: 有 assignee 但未注册 → 升级庞统
return {
"level": DispatchLevel.ESCALATE,
"agent_id": "pangtong-fujunshi",
"new_session": True,
"reason": f"Unknown agent '{assignee}', escalate to pangtong",
}
async def dispatch(self, task: Task, action_type: str = "",
project_config: Optional[Dict] = None) -> Dict[str, Any]:
"""执行调度(决策 + spawn
Returns:
{"level": str, "agent_id": str, "session_id": Optional[str],
"status": "dispatched"|"skipped"|"error", "reason": str}
"""
decision = self.decide(task, action_type)
level = decision["level"]
agent_id = decision["agent_id"]
# 检查并发限制
if self.counter and level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE):
if not await self.counter.can_acquire(agent_id):
return {
"level": level.value,
"agent_id": agent_id,
"session_id": None,
"status": "skipped",
"reason": "Agent busy (concurrent limit)",
}
# 本地执行
if level == DispatchLevel.LOCAL:
return {
"level": level.value,
"agent_id": "daemon",
"session_id": None,
"status": "dispatched",
"reason": decision["reason"],
}
# Full Agent spawn
if level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE):
if not self.spawner:
return {
"level": level.value,
"agent_id": agent_id,
"session_id": None,
"status": "error",
"reason": "No spawner configured",
}
try:
if self.counter:
await self.counter.acquire(agent_id)
session_id = await self.spawner.spawn_full_agent(
agent_id=agent_id,
message=self._build_message(task, action_type),
new_session=decision.get("new_session", False),
task_id=task.id,
)
return {
"level": level.value,
"agent_id": agent_id,
"session_id": session_id,
"status": "dispatched",
"reason": decision["reason"],
}
except Exception as e:
if self.counter:
self.counter.release(agent_id)
return {
"level": level.value,
"agent_id": agent_id,
"session_id": None,
"status": "error",
"reason": str(e),
}
# Subagent spawn
if level == DispatchLevel.SUB_AGENT:
if not self.spawner:
return {
"level": level.value,
"agent_id": "subagent",
"session_id": None,
"status": "error",
"reason": "No spawner configured",
}
try:
session_id = await self.spawner.spawn_subagent(
task_description=self._build_message(task, action_type),
task_id=task.id,
)
return {
"level": level.value,
"agent_id": "subagent",
"session_id": session_id,
"status": "dispatched",
"reason": decision["reason"],
}
except Exception as e:
return {
"level": level.value,
"agent_id": "subagent",
"session_id": None,
"status": "error",
"reason": str(e),
}
return {
"level": level.value,
"agent_id": agent_id,
"session_id": None,
"status": "error",
"reason": "Unknown dispatch level",
}
def _build_message(self, task: Task, action_type: str) -> str:
"""构建给 Agent 的消息"""
parts = [f"Task: {task.title}"]
if task.description:
parts.append(f"Description: {task.description}")
if action_type:
parts.append(f"Action: {action_type}")
if task.must_haves:
import json
parts.append(f"Must-haves: {task.must_haves}")
return "\n".join(parts)
def dispatch_pending(self, tasks: List[Task]) -> List[Dict[str, Any]]:
"""批量决策(不 spawn,只返回决策列表)
用于 ticker 扫描可调度任务后批量判断。
"""
results = []
for task in tasks:
decision = self.decide(task)
results.append({
"task_id": task.id,
**decision,
})
return results