auto-sync: 2026-05-17 05:59:56

This commit is contained in:
cfdaily
2026-05-17 05:59:56 +08:00
parent a477581605
commit 0e2721de80
2 changed files with 454 additions and 0 deletions
+236
View File
@@ -0,0 +1,236 @@
"""Agent 调度器 — 三级决策树
Level 1: 纯机械检查 → Daemon 本地执行
Level 2: 有名字的注册角色 → Full Agent (asyncio.create_subprocess_exec)
Level 3: 无名字的一次性任务 → Subagent (sessions_spawn)
Level 4: 未知 → 庞统裁决
调度器只做决策,实际 spawn 由 spawner.py 执行。
"""
from __future__ import annotations
import logging
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional
from src.blackboard.models import Task
from src.blackboard.queries import Queries
logger = logging.getLogger("moziplus-v2.dispatcher")
class DispatchLevel(str, Enum):
"""调度级别"""
LOCAL = "local" # Daemon 本地执行
FULL_AGENT = "full" # Full Agent spawn
SUB_AGENT = "sub" # Subagent spawn
ESCALATE = "escalate" # 升级庞统
class Dispatcher:
"""Agent 调度决策器"""
# L1 本地执行的 action type
LOCAL_ACTIONS = frozenset({
"L1_guardrail",
"format_check",
"file_exists_check",
"dependency_advance",
})
def __init__(
self,
registered_agents: Optional[List[str]] = None,
spawner: Optional[Any] = None,
counter: Optional[Any] = None,
):
"""
Args:
registered_agents: 项目注册的 Agent ID 列表
spawner: AgentSpawner 实例(执行实际 spawn
counter: ActiveAgentCounter 实例(控制并发)
"""
self.registered_agents = set(registered_agents or [])
self.spawner = spawner
self.counter = counter
def decide(self, task: Task, action_type: str = "") -> Dict[str, Any]:
"""调度决策
Returns:
{"level": DispatchLevel, "agent_id": str, "reason": str}
"""
assignee = task.assignee
# Level 1: 纯机械检查 → 本地执行
if action_type in self.LOCAL_ACTIONS:
return {
"level": DispatchLevel.LOCAL,
"agent_id": "daemon",
"reason": f"Local action: {action_type}",
}
# Level 2: 有名字的注册角色 → Full Agent
if assignee and assignee in self.registered_agents:
new_session = action_type == "adjudication"
return {
"level": DispatchLevel.FULL_AGENT,
"agent_id": assignee,
"new_session": new_session,
"reason": f"Registered agent: {assignee}",
}
# Level 3: 无名字或未注册 → Subagent
if not assignee:
return {
"level": DispatchLevel.SUB_AGENT,
"agent_id": "subagent",
"reason": "No assignee, dispatch as subagent",
}
# Level 4: 有 assignee 但未注册 → 升级庞统
return {
"level": DispatchLevel.ESCALATE,
"agent_id": "pangtong-fujunshi",
"new_session": True,
"reason": f"Unknown agent '{assignee}', escalate to pangtong",
}
async def dispatch(self, task: Task, action_type: str = "",
project_config: Optional[Dict] = None) -> Dict[str, Any]:
"""执行调度(决策 + spawn
Returns:
{"level": str, "agent_id": str, "session_id": Optional[str],
"status": "dispatched"|"skipped"|"error", "reason": str}
"""
decision = self.decide(task, action_type)
level = decision["level"]
agent_id = decision["agent_id"]
# 检查并发限制
if self.counter and level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE):
if not await self.counter.can_acquire(agent_id):
return {
"level": level.value,
"agent_id": agent_id,
"session_id": None,
"status": "skipped",
"reason": "Agent busy (concurrent limit)",
}
# 本地执行
if level == DispatchLevel.LOCAL:
return {
"level": level.value,
"agent_id": "daemon",
"session_id": None,
"status": "dispatched",
"reason": decision["reason"],
}
# Full Agent spawn
if level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE):
if not self.spawner:
return {
"level": level.value,
"agent_id": agent_id,
"session_id": None,
"status": "error",
"reason": "No spawner configured",
}
try:
if self.counter:
await self.counter.acquire(agent_id)
session_id = await self.spawner.spawn_full_agent(
agent_id=agent_id,
message=self._build_message(task, action_type),
new_session=decision.get("new_session", False),
task_id=task.id,
)
return {
"level": level.value,
"agent_id": agent_id,
"session_id": session_id,
"status": "dispatched",
"reason": decision["reason"],
}
except Exception as e:
if self.counter:
self.counter.release(agent_id)
return {
"level": level.value,
"agent_id": agent_id,
"session_id": None,
"status": "error",
"reason": str(e),
}
# Subagent spawn
if level == DispatchLevel.SUB_AGENT:
if not self.spawner:
return {
"level": level.value,
"agent_id": "subagent",
"session_id": None,
"status": "error",
"reason": "No spawner configured",
}
try:
session_id = await self.spawner.spawn_subagent(
task_description=self._build_message(task, action_type),
task_id=task.id,
)
return {
"level": level.value,
"agent_id": "subagent",
"session_id": session_id,
"status": "dispatched",
"reason": decision["reason"],
}
except Exception as e:
return {
"level": level.value,
"agent_id": "subagent",
"session_id": None,
"status": "error",
"reason": str(e),
}
return {
"level": level.value,
"agent_id": agent_id,
"session_id": None,
"status": "error",
"reason": "Unknown dispatch level",
}
def _build_message(self, task: Task, action_type: str) -> str:
"""构建给 Agent 的消息"""
parts = [f"Task: {task.title}"]
if task.description:
parts.append(f"Description: {task.description}")
if action_type:
parts.append(f"Action: {action_type}")
if task.must_haves:
import json
parts.append(f"Must-haves: {task.must_haves}")
return "\n".join(parts)
def dispatch_pending(self, tasks: List[Task]) -> List[Dict[str, Any]]:
"""批量决策(不 spawn,只返回决策列表)
用于 ticker 扫描可调度任务后批量判断。
"""
results = []
for task in tasks:
decision = self.decide(task)
results.append({
"task_id": task.id,
**decision,
})
return results
+218
View File
@@ -0,0 +1,218 @@
"""Agent Spawner — 异步 spawn Full Agent / Subagent
Full Agent: asyncio.create_subprocess_exec(异步非阻塞,不 await 完成)
Subagent: 占位(实际通过 OpenClaw Gateway API sessions_spawnF17 完善)
"""
from __future__ import annotations
import asyncio
import json
import logging
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
from src.blackboard.db import get_connection, init_db
logger = logging.getLogger("moziplus-v2.spawner")
class AgentSpawner:
"""Agent spawn 管理"""
def __init__(
self,
db_path: Optional[Path] = None,
agent_timeout: float = 600.0,
dry_run: bool = False,
):
"""
Args:
db_path: 项目黑板 DB 路径(用于写 task_attempts
agent_timeout: Agent 超时秒数
dry_run: 测试模式,不实际 spawn
"""
self.db_path = db_path
self.agent_timeout = agent_timeout
self.dry_run = dry_run
# session 注册表 {session_id: {...}}
self._sessions: Dict[str, Dict[str, Any]] = {}
@property
def active_sessions(self) -> Dict[str, Dict[str, Any]]:
"""当前活跃的 spawn sessions"""
return {sid: s for sid, s in self._sessions.items()
if s.get("status") == "running"}
async def spawn_full_agent(
self,
agent_id: str,
message: str,
new_session: bool = False,
task_id: Optional[str] = None,
) -> str:
"""Spawn Full Agent(异步非阻塞)
Returns:
session_id
"""
session_id = str(uuid.uuid4())
if self.dry_run:
logger.info("[DRY RUN] Would spawn agent %s (session=%s)", agent_id, session_id)
self._register_session(session_id, agent_id, task_id, pid=None)
return session_id
cmd = [
"openclaw", "agent",
"--agent", agent_id,
"--session-id", session_id,
"--message", message,
"--json",
]
try:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
self._register_session(session_id, agent_id, task_id, proc.pid)
logger.info("Spawned agent %s (session=%s, pid=%d)",
agent_id, session_id, proc.pid)
# Schedule timeout + cleanup
asyncio.create_task(
self._monitor_process(session_id, proc, agent_id, task_id)
)
return session_id
except Exception as e:
logger.exception("Failed to spawn agent %s", agent_id)
self._record_attempt(task_id, agent_id, "spawn_failed", error=str(e))
raise
async def spawn_subagent(
self,
task_description: str,
task_id: Optional[str] = None,
) -> str:
"""Spawn Subagent(占位,实际通过 Gateway API
Returns:
session_id
"""
session_id = str(uuid.uuid4())
if self.dry_run:
logger.info("[DRY RUN] Would spawn subagent (session=%s)", session_id)
self._register_session(session_id, "subagent", task_id, pid=None)
return session_id
# TODO: F17 通过 Gateway API sessions_spawn 实现
logger.info("Subagent spawn (session=%s) - placeholder", session_id)
self._register_session(session_id, "subagent", task_id, pid=None)
return session_id
async def _monitor_process(
self,
session_id: str,
proc: asyncio.subprocess.Process,
agent_id: str,
task_id: Optional[str],
) -> None:
"""监控子进程,超时 kill,完成后记录"""
try:
await asyncio.wait_for(proc.wait(), timeout=self.agent_timeout)
outcome = "completed"
exit_code = proc.returncode
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
outcome = "timed_out"
exit_code = -1
logger.warning("Agent %s timed out (session=%s)", agent_id, session_id)
# 更新 session 状态
if session_id in self._sessions:
self._sessions[session_id]["status"] = outcome
self._sessions[session_id]["completed_at"] = datetime.utcnow().isoformat()
# 记录 task_attempt
self._record_attempt(task_id, agent_id, outcome, exit_code=exit_code)
logger.info("Agent %s finished (session=%s, outcome=%s, exit=%d)",
agent_id, session_id, outcome, exit_code)
def _register_session(
self,
session_id: str,
agent_id: str,
task_id: Optional[str],
pid: Optional[int],
) -> None:
"""注册 spawn session"""
self._sessions[session_id] = {
"agent_id": agent_id,
"task_id": task_id,
"pid": pid,
"status": "running",
"started_at": datetime.utcnow().isoformat(),
"completed_at": None,
}
def _record_attempt(
self,
task_id: Optional[str],
agent_id: str,
outcome: str,
exit_code: Optional[int] = None,
error: Optional[str] = None,
) -> None:
"""记录 task_attempt"""
if not task_id or not self.db_path:
return
try:
conn = get_connection(self.db_path)
try:
conn.execute("BEGIN IMMEDIATE")
# 获取 attempt_number
row = conn.execute(
"SELECT MAX(attempt_number) as max_a FROM task_attempts WHERE task_id=?",
(task_id,),
).fetchone()
attempt_number = (row["max_a"] or 0) + 1
metadata = {"error": error} if error else {}
conn.execute(
"INSERT INTO task_attempts "
"(task_id, attempt_number, agent, outcome, exit_code, metadata, completed_at) "
"VALUES (?,?,?,?,?,?,datetime('now'))",
(task_id, attempt_number, agent_id, outcome,
exit_code, json.dumps(metadata)),
)
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task_id, agent_id,
"agent_completed" if outcome == "completed" else "daemon_tick",
json.dumps({"outcome": outcome, "attempt": attempt_number})),
)
conn.commit()
finally:
conn.close()
except Exception:
logger.exception("Failed to record attempt for task %s", task_id)
def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
"""获取 session 信息"""
return self._sessions.get(session_id)
def cleanup_session(self, session_id: str) -> None:
"""清理 session"""
if session_id in self._sessions:
del self._sessions[session_id]