Files
sanguo_moziplus_v2/src/daemon/dispatcher.py
T
2026-05-17 18:36:46 +08:00

274 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Agent 调度器 — 三级决策树
Level 1: 纯机械检查 → Daemon 本地执行
Level 2: 有名字的注册角色 → Full Agent (asyncio.create_subprocess_exec)
Level 3: 无名字的一次性任务 → Subagent (sessions_spawn)
Level 4: 未知 → 庞统裁决
调度器只做决策,实际 spawn 由 spawner.py 执行。
"""
from __future__ import annotations
import logging
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional
from src.blackboard.models import Task
from src.blackboard.queries import Queries
logger = logging.getLogger("moziplus-v2.dispatcher")
class DispatchLevel(str, Enum):
"""调度级别"""
LOCAL = "local" # Daemon 本地执行
FULL_AGENT = "full" # Full Agent spawn
SUB_AGENT = "sub" # Subagent spawn
ESCALATE = "escalate" # 升级庞统
class Dispatcher:
"""Agent 调度决策器"""
# L1 本地执行的 action type
LOCAL_ACTIONS = frozenset({
"L1_guardrail",
"format_check",
"file_exists_check",
"dependency_advance",
})
def __init__(
self,
registered_agents: Optional[List[str]] = None,
spawner: Optional[Any] = None,
counter: Optional[Any] = None,
):
"""
Args:
registered_agents: 项目注册的 Agent ID 列表
spawner: AgentSpawner 实例(执行实际 spawn
counter: ActiveAgentCounter 实例(控制并发)
"""
self.registered_agents = set(registered_agents or [])
self.spawner = spawner
self.counter = counter
def decide(self, task: Task, action_type: str = "") -> Dict[str, Any]:
"""调度决策
Returns:
{"level": DispatchLevel, "agent_id": str, "reason": str}
"""
assignee = task.assignee
# Level 1: 纯机械检查 → 本地执行
if action_type in self.LOCAL_ACTIONS:
return {
"level": DispatchLevel.LOCAL,
"agent_id": "daemon",
"reason": f"Local action: {action_type}",
}
# Level 2: 有名字的注册角色 → Full Agent
if assignee and assignee in self.registered_agents:
new_session = action_type == "adjudication"
return {
"level": DispatchLevel.FULL_AGENT,
"agent_id": assignee,
"new_session": new_session,
"reason": f"Registered agent: {assignee}",
}
# Level 3: 无名字或未注册 → Subagent
if not assignee:
return {
"level": DispatchLevel.SUB_AGENT,
"agent_id": "subagent",
"reason": "No assignee, dispatch as subagent",
}
# Level 4: 有 assignee 但未注册 → 升级庞统
return {
"level": DispatchLevel.ESCALATE,
"agent_id": "pangtong-fujunshi",
"new_session": True,
"reason": f"Unknown agent '{assignee}', escalate to pangtong",
}
async def dispatch(self, task: Task, action_type: str = "",
project_config: Optional[Dict] = None) -> Dict[str, Any]:
"""执行调度(决策 + spawn
Returns:
{"level": str, "agent_id": str, "session_id": Optional[str],
"status": "dispatched"|"skipped"|"error", "reason": str}
"""
decision = self.decide(task, action_type)
level = decision["level"]
agent_id = decision["agent_id"]
# 检查并发限制
if self.counter and level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE):
if not await self.counter.can_acquire(agent_id):
return {
"level": level.value,
"agent_id": agent_id,
"session_id": None,
"status": "skipped",
"reason": "Agent busy (concurrent limit)",
}
# 本地执行
if level == DispatchLevel.LOCAL:
return {
"level": level.value,
"agent_id": "daemon",
"session_id": None,
"status": "dispatched",
"reason": decision["reason"],
}
# Full Agent spawn
if level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE):
if not self.spawner:
return {
"level": level.value,
"agent_id": agent_id,
"session_id": None,
"status": "error",
"reason": "No spawner configured",
}
try:
if self.counter:
await self.counter.acquire(agent_id)
# 优先使用 spawner 的 prompt 模板
if hasattr(self.spawner, 'build_spawn_message') and project_config:
# 构建重试上下文(如果有前轮审查意见)
retry_ctx = self._build_retry_context(task)
message = self.spawner.build_spawn_message(
task_id=task.id,
title=task.title,
description=task.description or "",
task_type=task.task_type or "",
priority=task.priority,
must_haves=task.must_haves or "",
project_id=project_config.get("project_id", ""),
agent_id=agent_id,
current_status=task.status or "claimed",
retry_context=retry_ctx,
)
else:
message = self._build_message(task, action_type)
session_id = await self.spawner.spawn_full_agent(
agent_id=agent_id,
message=message,
new_session=decision.get("new_session", False),
task_id=task.id,
)
return {
"level": level.value,
"agent_id": agent_id,
"session_id": session_id,
"status": "dispatched",
"reason": decision["reason"],
}
except Exception as e:
if self.counter:
self.counter.release(agent_id)
return {
"level": level.value,
"agent_id": agent_id,
"session_id": None,
"status": "error",
"reason": str(e),
}
# Subagent spawn
if level == DispatchLevel.SUB_AGENT:
if not self.spawner:
return {
"level": level.value,
"agent_id": "subagent",
"session_id": None,
"status": "error",
"reason": "No spawner configured",
}
try:
session_id = await self.spawner.spawn_subagent(
task_description=self._build_message(task, action_type),
task_id=task.id,
)
return {
"level": level.value,
"agent_id": "subagent",
"session_id": session_id,
"status": "dispatched",
"reason": decision["reason"],
}
except Exception as e:
return {
"level": level.value,
"agent_id": "subagent",
"session_id": None,
"status": "error",
"reason": str(e),
}
return {
"level": level.value,
"agent_id": agent_id,
"session_id": None,
"status": "error",
"reason": "Unknown dispatch level",
}
def _build_retry_context(self, task: Task) -> str:
"""构建重试上下文(前轮产出摘要 + 审查意见)
如果任务是从 failed/pending 重新调度(有历史 attempts),
注入前轮的产出摘要和审查意见,让 Agent 知道上轮哪里不对。
"""
if not hasattr(task, 'retry_count') or (task.retry_count or 0) == 0:
return ""
parts = ["## ⚠️ 重试上下文(上次执行失败,请注意以下反馈)"]
parts.append(f"这是第 {task.retry_count + 1} 次尝试。上次执行的问题:")
# 尝试从 task 的 metadata 获取上次失败原因
if hasattr(task, 'notes') and task.notes:
parts.append(f"上轮说明: {task.notes}")
parts.append("请仔细检查上次的错误,避免重复犯错。")
return "\n".join(parts)
def _build_message(self, task: Task, action_type: str) -> str:
"""构建给 Agent 的消息"""
parts = [f"Task: {task.title}"]
if task.description:
parts.append(f"Description: {task.description}")
if action_type:
parts.append(f"Action: {action_type}")
if task.must_haves:
import json
parts.append(f"Must-haves: {task.must_haves}")
return "\n".join(parts)
def dispatch_pending(self, tasks: List[Task]) -> List[Dict[str, Any]]:
"""批量决策(不 spawn,只返回决策列表)
用于 ticker 扫描可调度任务后批量判断。
"""
results = []
for task in tasks:
decision = self.decide(task)
results.append({
"task_id": task.id,
**decision,
})
return results