764 lines
34 KiB
Python
764 lines
34 KiB
Python
"""Agent 调度器 — 执行层(司马懿建议 1:Router/Dispatcher 分层)
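
Agent dispatcher — execution layer of the Router/Dispatcher split.

Dispatcher 负责 (responsibilities):
1. 从 Router 获取路由决策 — obtain the routing decision from the Router
2. 执行 spawn(通过 Spawner)— perform the spawn through the Spawner
3. 并发控制 / concurrency control — since v2.7.2 counter acquire/release
   happens inside spawner.spawn_full_agent; the Dispatcher no longer
   updates the counter itself
4. 写路由审计日志(routing_decisions)— write the routing audit log

路由决策全部委托给 AgentRouter (all routing decisions are delegated to AgentRouter).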
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import sqlite3
|
||
from datetime import datetime
|
||
from enum import Enum
|
||
from pathlib import Path
|
||
from typing import Any, Dict, List, Optional
|
||
|
||
from src.blackboard.models import Task
|
||
from src.blackboard.db import get_connection
|
||
from src.daemon.spawner import AgentBusyError
|
||
from src.daemon.router import AgentRouter, RouteDecision
|
||
|
||
# Module logger; the explicit dotted name groups these records under the
# "moziplus-v2" logger hierarchy.
logger = logging.getLogger(name="moziplus-v2.dispatcher")
|
||
|
||
|
||
class DispatchLevel(str, Enum):
    """How a routed task gets carried out.

    Because of the ``str`` mixin every member compares equal to its string
    value, e.g. ``DispatchLevel.LOCAL == "local"``.
    """

    # Executed by the daemon itself; nothing is spawned.
    LOCAL = "local"
    # A full agent is spawned.
    FULL_AGENT = "full"
    # A sub-agent is spawned.
    SUB_AGENT = "sub"
    # Escalated to pangtong (the fallback coordinator).
    ESCALATE = "escalate"
    # Stopped by a safety guardrail before dispatch.
    BLOCKED = "blocked"
|
||
|
||
|
||
class Dispatcher:
    """Agent dispatch executor.

    Since v2.6.1 routing decisions are delegated to ``AgentRouter``; this
    class only executes them:

    * runs the guardrail pre-check (skipped for the ``_mail`` project),
    * spawns the chosen agent through ``spawner.spawn_full_agent``,
    * attaches completion callbacks that check the work really happened
      (the "hallucination gate") and advance the task/mail status,
    * writes the ``routing_decisions`` audit log.

    Without a router it falls back to a deprecated legacy decision tree
    driven by ``registered_agents`` / ``capability_map``.
    """

    # Legacy mode: action types executed locally by the daemon.
    _LEGACY_LOCAL_ACTIONS = frozenset({
        "L1_guardrail", "format_check",
        "file_exists_check", "dependency_advance",
    })

    # Task statuses that need no further completion signal.
    _VERIFY_TERMINAL_STATES = frozenset({"review", "done", "failed", "cancelled"})

    # Statuses that automatic completion must never overwrite: an explicit
    # failure or cancellation must not be turned into done/review.
    _AUTO_COMPLETE_PROTECTED = frozenset({"failed", "cancelled"})

    def __init__(
        self,
        router: Optional[AgentRouter] = None,
        spawner: Optional[Any] = None,
        counter: Optional[Any] = None,
        db_path: Optional[Path] = None,
        guardrails: Optional[Any] = None,
        # Backward-compatible legacy arguments (deprecated, being phased out)
        registered_agents: Optional[List[str]] = None,
        capability_map: Optional[Dict[str, List[str]]] = None,
    ):
        """
        Args:
            router: routing policy. When omitted a default ``AgentRouter`` is
                still created, but decisions use the legacy tree.
            spawner: object providing ``spawn_full_agent`` (and optionally
                ``build_spawn_message`` / ``api_host`` / ``api_port``).
            counter: concurrency counter; only read by legacy capability
                resolution (``active_agents``).
            db_path: default DB for the routing audit log.
            guardrails: object providing ``check_task(task)``.
            registered_agents: legacy mode only - known agent ids.
            capability_map: legacy mode only - task_type -> candidate agents.
        """
        self.router = router or AgentRouter()
        self.spawner = spawner
        self.counter = counter
        self.db_path = db_path
        self.guardrails = guardrails

        # Compatibility: without an explicit router, use the legacy logic.
        self._legacy_mode = router is None
        if self._legacy_mode:
            self.registered_agents = set(registered_agents or [])
            self.capability_map = capability_map or {}
            logger.warning("Dispatcher running in legacy mode (no AgentRouter)")

    # ── small helpers ──

    @staticmethod
    def _project_db_path(project_config: Optional[Dict]) -> Optional[Path]:
        """Return ``project_config["db_path"]`` as a Path, or None if absent."""
        if project_config and "db_path" in project_config:
            return Path(project_config["db_path"])
        return None

    @staticmethod
    def _is_mail_project(project_config: Optional[Dict]) -> bool:
        """True when the config belongs to the ``_mail`` (agent mail) project."""
        return bool(project_config) and project_config.get("project_id") == "_mail"

    # ── decision ──

    def decide(self, task: Task, action_type: str = "") -> Dict[str, Any]:
        """Make a dispatch decision (delegated to the router).

        Returns:
            {"level": DispatchLevel, "agent_id": str, "reason": str, ...}
            Non-local decisions also carry mode/confidence/routed_by/model/
            latency_ms copied from the router's decision.
        """
        if self._legacy_mode:
            return self._legacy_decide(task, action_type)

        task_info = {
            "id": task.id,
            "title": task.title,
            "description": task.description,
            "status": task.status,
            "assignee": task.assignee,
            "task_type": getattr(task, "task_type", ""),
            "current_agent": getattr(task, "current_agent", None),
            "next_capability": getattr(task, "next_capability", None),
        }
        decision = self.router.route(task_info, action_type)

        # The router answers agent_id="daemon" for work the daemon runs itself.
        if decision.agent_id == "daemon":
            return {
                "level": DispatchLevel.LOCAL,
                "agent_id": "daemon",
                "reason": decision.reason,
                "mode": decision.mode,
            }

        # A fallback to pangtong is an escalation rather than a normal spawn.
        is_escalation = (decision.mode == "fallback"
                         and decision.agent_id == "pangtong-fujunshi")
        level = DispatchLevel.ESCALATE if is_escalation else DispatchLevel.FULL_AGENT

        return {
            "level": level,
            "agent_id": decision.agent_id,
            "reason": decision.reason,
            "mode": decision.mode,
            "confidence": decision.confidence,
            "routed_by": decision.mode,
            "model": getattr(decision, "model", None),
            "latency_ms": decision.latency_ms,
        }

    # ── execution ──

    async def dispatch(self, task: Task, action_type: str = "",
                       project_config: Optional[Dict] = None) -> Dict[str, Any]:
        """Run one dispatch: guardrail check, decision, spawn, audit log.

        Args:
            task: task to dispatch.
            action_type: action hint forwarded to the router.
            project_config: optional project settings. ``project_id ==
                "_mail"`` marks agent-to-agent mail; ``db_path`` is the project
                DB used for the audit log and completion callbacks.

        Returns:
            {"level": str, "agent_id": str, "session_id": Optional[str],
             "status": "dispatched"|"skipped"|"error"|"blocked", "reason": str}
            A blocked result additionally carries "violations".
        """
        # Mail is agent-to-agent communication and skips the guardrails.
        is_mail = self._is_mail_project(project_config)
        if self.guardrails and not is_mail:
            blocked = self._check_guardrails(task, project_config)
            if blocked is not None:
                return blocked

        if self._legacy_mode:
            return await self._legacy_dispatch(task, action_type, project_config)

        decision = self.decide(task, action_type)
        level = decision["level"]
        agent_id = decision["agent_id"]
        # The audit log goes to the project DB; _record_routing falls back to
        # self.db_path when this is None.
        project_db = self._project_db_path(project_config)

        # v2.7.2: counter acquire/release lives inside spawn_full_agent; the
        # dispatcher no longer manages the counter.

        if level == DispatchLevel.LOCAL:
            self._record_routing(task, decision, "dispatched", None, project_db)
            return {
                "level": level.value,
                "agent_id": "daemon",
                "session_id": None,
                "status": "dispatched",
                "reason": decision["reason"],
            }

        if level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE):
            return await self._spawn_agent(task, decision, project_config,
                                           project_db, is_mail)

        return {
            "level": level.value,
            "agent_id": agent_id,
            "session_id": None,
            "status": "error",
            "reason": "Unknown dispatch level",
        }

    def _check_guardrails(self, task: Task,
                          project_config: Optional[Dict]) -> Optional[Dict[str, Any]]:
        """Return a "blocked" dispatch result if a critical guardrail fires, else None."""
        violations = self.guardrails.check_task(task)
        critical = [v for v in violations
                    if v.action in ("block_and_notify", "terminate_and_escalate")]
        if not critical:
            return None

        first = critical[0]
        logger.warning("Task '%s' blocked by guardrail: %s - %s",
                       task.title, first.rule_id, first.message)
        # Record the block in the audit log (project DB, else the default DB).
        routing_db = self._project_db_path(project_config) or self.db_path
        if routing_db:
            self._record_routing(task, {"level": DispatchLevel.BLOCKED, "agent_id": "none",
                                        "reason": first.message},
                                 "blocked", first.message, routing_db)
        return {
            "level": "blocked",
            "agent_id": "none",
            "session_id": None,
            "status": "blocked",
            "reason": first.message,
            "violations": [item.rule_id for item in violations],
        }

    async def _spawn_agent(self, task: Task, decision: Dict[str, Any],
                           project_config: Optional[Dict],
                           project_db: Optional[Path],
                           is_mail: bool) -> Dict[str, Any]:
        """Spawn a full agent for a FULL_AGENT/ESCALATE decision and audit the outcome."""
        level = decision["level"]
        agent_id = decision["agent_id"]

        if not self.spawner:
            self._record_routing(task, decision, "error", "No spawner", project_db)
            return {
                "level": level.value,
                "agent_id": agent_id,
                "session_id": None,
                "status": "error",
                "reason": "No spawner configured",
            }

        task_id = task.id
        # Set by the on_checks_passed hook once the mail row is marked working.
        # Bound before ``try`` so the error handler can always read it.
        mail_marked_working = False

        try:
            # [v2.7.1] Mail: marking working happens inside spawn_full_agent via
            # on_checks_passed (after its checks pass, before the subprocess),
            # so a failed check never leaves the mail stuck in working.
            on_checks_passed = None
            if is_mail and project_db:
                def _mail_on_checks_passed():
                    nonlocal mail_marked_working
                    if not self._mail_auto_working(task_id, project_db):
                        raise RuntimeError("mail_auto_working_failed")
                    mail_marked_working = True

                on_checks_passed = _mail_on_checks_passed

            message = self._build_spawn_message(task, agent_id, project_config,
                                                mode=decision.get("mode", ""))

            # v2.7.2: on_complete carries business logic only; the counter
            # release is guaranteed by spawn_full_agent's own wrapper.
            on_complete = self._make_on_complete(task, is_mail, project_db)

            session_id = await self.spawner.spawn_full_agent(
                agent_id=agent_id,
                message=message,
                task_id=task_id,
                on_complete=on_complete,
                use_main_session=True,  # #02: always deliver to the main session
                task_db_path=project_db,
                on_checks_passed=on_checks_passed,
            )
        except AgentBusyError:
            # The busy check precedes on_checks_passed, so nothing was marked.
            self._record_routing(task, decision, "skipped", "Agent busy", project_db)
            return {
                "level": level.value,
                "agent_id": agent_id,
                "session_id": None,
                "status": "skipped",
                "reason": "Agent busy (concurrent limit or cooling down)",
            }
        except Exception as e:
            # Checks passed (mail marked working) but the spawn failed: revert
            # working -> pending so the mail is not stuck forever.
            if mail_marked_working:
                self._mail_revert_to_pending(task_id, project_db)
            self._record_routing(task, decision, "error", str(e), project_db)
            return {
                "level": level.value,
                "agent_id": agent_id,
                "session_id": None,
                "status": "error",
                "reason": str(e),
            }

        self._record_routing(task, decision, "dispatched",
                             f"session={session_id}", project_db)
        return {
            "level": level.value,
            "agent_id": agent_id,
            "session_id": session_id,
            "status": "dispatched",
            "reason": decision["reason"],
        }

    def _make_on_complete(self, task: Task, is_mail: bool,
                          project_db: Optional[Path]):
        """Build the spawn completion callback (hallucination gate).

        The callback has signature ``(agent_id, outcome)`` and never raises;
        errors are logged.
        """
        task_id = task.id

        if is_mail:
            must_haves = task.must_haves or ""

            def _mail_on_complete(aid, outcome):
                # Verify a reply exists, then auto-mark the mail done.
                try:
                    self._mail_auto_complete(task_id, aid, project_db, must_haves)
                except Exception as e:
                    logger.error("Mail %s: on_complete error: %s", task_id, e)

            return _mail_on_complete

        # #02: ordinary tasks get a hallucination gate too.
        def _task_on_complete(aid, outcome):
            try:
                self._task_auto_complete(task_id, project_db)
            except Exception as e:
                logger.error("Task %s: on_complete error: %s", task_id, e)

        return _task_on_complete

    # ── message building ──

    def _build_spawn_message(self, task: Task, agent_id: str,
                             project_config: Optional[Dict],
                             mode: str = "",
                             spawn_type: str = "executor") -> str:
        """Build the message handed to a spawned agent.

        Args:
            task: task being dispatched.
            agent_id: target agent.
            project_config: project settings (``project_id`` is forwarded).
            mode: routing mode; ``"delegate"`` yields a coordinator prompt.
            spawn_type: spawn type (executor/discussion/review).

        Returns:
            The spawner's own message when it offers ``build_spawn_message``
            and a project config is given; otherwise a plain-text fallback.
        """
        if mode == "delegate":
            return self._build_delegate_prompt(task, project_config)

        if hasattr(self.spawner, "build_spawn_message") and project_config:
            retry_ctx = self._build_retry_context(task)
            return self.spawner.build_spawn_message(
                task_id=task.id,
                title=task.title,
                description=task.description or "",
                task_type=getattr(task, "task_type", "") or "",
                priority=task.priority,
                must_haves=task.must_haves or "",
                project_id=project_config.get("project_id", ""),
                agent_id=agent_id,
                current_status=task.status or "claimed",
                retry_context=retry_ctx,
                spawn_type=spawn_type,
            )

        # Fallback: minimal plain-text message.
        parts = [f"Task: {task.title}"]
        if task.description:
            parts.append(f"Description: {task.description}")
        if task.must_haves:
            parts.append(f"Must-haves: {task.must_haves}")
        return "\n".join(parts)

    def _build_delegate_prompt(self, task: Task,
                               project_config: Optional[Dict]) -> str:
        """Build the delegate-mode prompt asking a coordinator to assign the task.

        The prompt lists the team and tells the coordinator to POST the
        assignment to the local API (host/port taken from the spawner,
        defaulting to 127.0.0.1:8083).
        """
        api_host = getattr(self.spawner, 'api_host', '127.0.0.1') if self.spawner else '127.0.0.1'
        api_port = getattr(self.spawner, 'api_port', 8083) if self.spawner else 8083
        project_id = project_config.get("project_id", "") if project_config else ""

        return f"""你是任务协调员。请分析以下任务,决定最合适的执行者并分配。

## 任务信息
- 项目: {project_id}
- 任务ID: {task.id}
- 标题: {task.title}
- 描述: {task.description or '(无描述)'}
- 类型: {getattr(task, 'task_type', '') or 'general'}
- 优先级: {task.priority}

## 团队
- 张飞(zhangfei-dev): 编码、实现、脚本
- 司马懿(simayi-challenger): 审查、质量检查、辩论
- 关羽(guanyu-dev): 风控、合规
- 赵云(zhaoyun-data): 数据获取、清洗、验证
- 姜维(jiangwei-infra): 部署、基础设施、Docker、vnpy
- 庞统(pangtong-fujunshi): 规划、协调、策略

## 操作
1. 分析任务需求,选择最合适的 Agent
2. 通过 API 回写分配结果:
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task.id}/status \\
-H 'Content-Type: application/json' \\
-d '{{"status": "claimed", "agent": "<agent_id>"}}'
3. 如果你自己能做,直接认领并执行(状态改为 working)
"""

    def _build_retry_context(self, task: Task) -> str:
        """Return a retry header for re-attempted tasks, or "" on the first try."""
        if not hasattr(task, 'retry_count') or (task.retry_count or 0) == 0:
            return ""

        parts = ["## ⚠️ 重试上下文(上次执行失败,请注意以下反馈)"]
        parts.append(f"这是第 {task.retry_count + 1} 次尝试。")
        return "\n".join(parts)

    # ── audit log ──

    def _record_routing(self, task: Task, decision: Dict[str, Any],
                        outcome: str, detail: Optional[str],
                        override_db_path: Optional[Path] = None) -> None:
        """Append one row to the ``routing_decisions`` audit table.

        Uses ``override_db_path`` or ``self.db_path``; does nothing when
        neither is set. Best-effort: failures are logged at debug level and
        never propagate.
        """
        effective_db = override_db_path or self.db_path
        if not effective_db:
            return
        try:
            conn = sqlite3.connect(str(effective_db))
            conn.row_factory = sqlite3.Row
            try:
                conn.execute("BEGIN IMMEDIATE")
                conn.execute(
                    "INSERT INTO routing_decisions "
                    "(task_id, from_status, to_status, mode, selected_agent, "
                    " previous_agent, reason, confidence, model, latency_ms, "
                    " task_type, requested_capability, outcome, detail) "
                    "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
                    (
                        task.id,
                        task.status,
                        None,  # to_status is not known yet at dispatch time
                        decision.get("mode", ""),
                        decision.get("agent_id", ""),
                        getattr(task, 'current_agent', None) or task.assignee,
                        decision.get("reason", ""),
                        decision.get("confidence"),
                        decision.get("model"),
                        decision.get("latency_ms"),
                        getattr(task, 'task_type', ''),
                        getattr(task, 'next_capability', ''),
                        outcome,
                        detail,
                    ),
                )
                conn.commit()
            finally:
                conn.close()
        except Exception:
            logger.debug("Failed to record routing decision", exc_info=True)

    # ── batch decisions (compatibility interface) ──

    def dispatch_pending(self, tasks: List[Task]) -> List[Dict[str, Any]]:
        """Decide for every task without spawning; return the decision list."""
        return [{"task_id": task.id, **self.decide(task)} for task in tasks]

    # ── Legacy compatibility (deprecated) ──

    def _legacy_decide(self, task: Task, action_type: str = "") -> Dict[str, Any]:
        """Old three-level decision tree (kept for the transition period)."""
        assignee = task.assignee

        if action_type in self._LEGACY_LOCAL_ACTIONS:
            return {"level": DispatchLevel.LOCAL, "agent_id": "daemon",
                    "reason": f"Local action: {action_type}"}

        if assignee and assignee in self.registered_agents:
            new_session = action_type == "adjudication"
            return {"level": DispatchLevel.FULL_AGENT, "agent_id": assignee,
                    "new_session": new_session,
                    "reason": f"Registered agent: {assignee}"}

        if not assignee:
            agent_id = self._resolve_by_capability(task)
            return {"level": DispatchLevel.FULL_AGENT, "agent_id": agent_id,
                    "reason": f"Auto-assigned via capability_map: {agent_id}"}

        return {"level": DispatchLevel.ESCALATE, "agent_id": "pangtong-fujunshi",
                "new_session": True,
                "reason": f"Unknown agent '{assignee}', escalate to pangtong"}

    def _resolve_by_capability(self, task: Task) -> str:
        """Legacy capability lookup: pick a registered agent for the task_type.

        With several candidates and a counter, the one with the fewest
        active sessions wins; otherwise the first registered candidate.
        Falls back to "pangtong-fujunshi".
        """
        task_type = getattr(task, 'task_type', '') or ''
        if task_type in self.capability_map:
            candidates = self.capability_map[task_type]
            registered = [a for a in candidates if a in self.registered_agents]
            if registered:
                if self.counter and len(registered) > 1:
                    return min(registered,
                               key=lambda a: self.counter.active_agents.get(a, 0))
                return registered[0]
        return "pangtong-fujunshi"

    async def _legacy_dispatch(self, task, action_type="", project_config=None):
        """Old dispatch path (kept for the transition period).

        v2.7.2: counter acquire/release moved inside spawn_full_agent.
        """
        decision = self._legacy_decide(task, action_type)
        level = decision["level"]
        agent_id = decision["agent_id"]

        if level == DispatchLevel.LOCAL:
            return {"level": level.value, "agent_id": "daemon",
                    "session_id": None, "status": "dispatched",
                    "reason": decision["reason"]}

        if level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE):
            if not self.spawner:
                return {"level": level.value, "agent_id": agent_id,
                        "session_id": None, "status": "error",
                        "reason": "No spawner configured"}

            marked_working = False
            try:
                # NOTE: _legacy_dispatch only runs when router=None, which the
                # current configuration never does. Mail always goes through
                # dispatch() (the on_checks_passed scheme). Here mail is marked
                # working *before* the spawn, so it is reverted on failure below.
                is_mail_legacy = self._is_mail_project(project_config)
                db_path_legacy = self._project_db_path(project_config)
                if is_mail_legacy:
                    if not db_path_legacy or not self._mail_auto_working(task.id, db_path_legacy):
                        return {"level": level.value, "agent_id": agent_id,
                                "session_id": None, "status": "error",
                                "reason": "mail_auto_working_failed"}
                    marked_working = True

                if hasattr(self.spawner, 'build_spawn_message') and project_config:
                    message = self._build_spawn_message(task, agent_id, project_config)
                else:
                    message = f"Task: {task.title}"

                # v2.7.2: on_complete carries business logic only.
                on_complete_legacy = None
                if is_mail_legacy:
                    t_id = task.id
                    must_haves = task.must_haves or ""

                    def _mail_oc_legacy(aid, outcome):
                        try:
                            self._mail_auto_complete(t_id, aid, db_path_legacy, must_haves)
                        except Exception as e:
                            logger.error("Mail %s: legacy on_complete error: %s", t_id, e)

                    on_complete_legacy = _mail_oc_legacy

                session_id = await self.spawner.spawn_full_agent(
                    agent_id=agent_id, message=message,
                    task_id=task.id,
                    on_complete=on_complete_legacy,
                    use_main_session=True,  # #02: always deliver to the main session
                    task_db_path=db_path_legacy,
                )
                return {"level": level.value, "agent_id": agent_id,
                        "session_id": session_id, "status": "dispatched",
                        "reason": decision["reason"]}
            except AgentBusyError:
                if marked_working:
                    self._mail_revert_to_pending(task.id, db_path_legacy)
                return {"level": level.value, "agent_id": agent_id,
                        "session_id": None, "status": "skipped",
                        "reason": "Agent busy (concurrent limit or cooling down)"}
            except Exception as e:
                if marked_working:
                    self._mail_revert_to_pending(task.id, db_path_legacy)
                return {"level": level.value, "agent_id": agent_id,
                        "session_id": None, "status": "error",
                        "reason": str(e)}

        return {"level": level.value, "agent_id": agent_id,
                "session_id": None, "status": "error",
                "reason": "Unknown dispatch level"}

    # ── Mail envelope/payload helpers ──

    def _mail_auto_working(self, task_id: str, db_path: Path) -> bool:
        """Mail: have the system mark the task working (before the spawn).

        Mail skips the claimed state: pending/claimed -> working.

        Returns:
            True if marked; False otherwise (the caller must abort the spawn).
        """
        try:
            conn = get_connection(db_path)
            try:
                conn.execute("BEGIN IMMEDIATE")
                row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
                if not row:
                    logger.warning("Mail %s: cannot mark working (task not found)", task_id)
                    return False
                if row["status"] not in ("pending", "claimed"):
                    logger.warning("Mail %s: cannot mark working (status=%s, expected pending/claimed)",
                                   task_id, row["status"])
                    return False
                conn.execute(
                    "UPDATE tasks SET status='working', updated_at=datetime('now') WHERE id=?",
                    (task_id,),
                )
                conn.commit()
                logger.info("Mail %s: auto-marked working (system, was %s)", task_id, row["status"])
                return True
            finally:
                conn.close()
        except Exception as e:
            logger.error("Mail %s: failed to mark working: %s", task_id, e)
            return False

    def _mail_revert_to_pending(self, task_id: str, db_path: Path) -> None:
        """Revert a mail from working -> pending after a failed spawn (avoids a permanent deadlock)."""
        try:
            conn = get_connection(db_path)
            try:
                conn.execute("BEGIN IMMEDIATE")
                row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
                if row and row["status"] == "working":
                    conn.execute(
                        "UPDATE tasks SET status='pending', updated_at=datetime('now') WHERE id=?",
                        (task_id,),
                    )
                    conn.commit()
                    logger.info("Mail %s: reverted working → pending (spawn failed)", task_id)
                else:
                    logger.debug("Mail %s: skip revert (status=%s, expected working)",
                                 task_id, row["status"] if row else "not_found")
            finally:
                conn.close()
        except Exception as e:
            logger.error("Mail %s: failed to revert to pending: %s", task_id, e)

    def _mail_auto_complete(self, task_id: str, agent_id: str,
                            db_path: Path, must_haves: str) -> None:
        """Mail: after on_complete, auto-mark the mail done (with hallucination gate).

        ``must_haves`` may be a JSON object whose "performative" (or "type")
        selects the behaviour; it defaults to "request". A request is only
        marked done once a reply exists; otherwise it stays working for the
        ticker to recheck. Only a mail still in ``working`` is changed.
        """
        try:
            performative = "request"
            try:
                meta = json.loads(must_haves) if must_haves else {}
            except (TypeError, ValueError):
                meta = {}
            if isinstance(meta, dict):
                performative = meta.get("performative", meta.get("type", "request"))

            # Request mails: the hallucination gate requires a reply first.
            if performative == "request":
                if not self._mail_check_reply(task_id, db_path):
                    # Don't mark failed; leave working for the next ticker round.
                    logger.warning("Mail %s: no reply found on on_complete, "
                                   "leaving working for ticker recheck", task_id)
                    return

            # Mark done, retrying up to 3 times on DB errors.
            for attempt in range(1, 4):
                try:
                    conn = get_connection(db_path)
                    try:
                        conn.execute("BEGIN IMMEDIATE")
                        row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
                        if not row:
                            return
                        if row["status"] != "working":
                            # Someone else already moved it on; retrying would
                            # only end in a misleading "all attempts failed".
                            logger.debug("Mail %s: skip auto-done (status=%s, expected working)",
                                         task_id, row["status"])
                            return
                        conn.execute(
                            "UPDATE tasks SET status='done', updated_at=datetime('now') WHERE id=?",
                            (task_id,),
                        )
                        conn.commit()
                        logger.info("Mail %s: auto-marked done (system, performative=%s)",
                                    task_id, performative)
                        return
                    finally:
                        conn.close()
                except Exception as e:
                    logger.warning("Mail %s: done attempt %d failed: %s", task_id, attempt, e)
            # All 3 attempts failed; leave working for the ticker timeout fallback.
            logger.error("Mail %s: all 3 done attempts failed, leaving for ticker", task_id)

        except Exception as e:
            logger.error("Mail %s: auto-complete error: %s", task_id, e)

    def _mail_check_reply(self, original_task_id: str, db_path: Path) -> bool:
        """Hallucination gate: does a reply to ``original_task_id`` exist?

        A reply is any *other* task whose ``must_haves`` text contains the
        original id as an exact (case-sensitive) substring, e.g. inside an
        ``in_reply_to`` field. ``instr`` is used instead of LIKE so that
        ``_``/``%`` in ids are not treated as wildcards.

        Returns True on query errors (conservative: avoid wrongly failing).
        """
        try:
            conn = get_connection(db_path)
            try:
                row = conn.execute(
                    "SELECT id FROM tasks WHERE id != ? AND instr(must_haves, ?) > 0 LIMIT 1",
                    (original_task_id, original_task_id),
                ).fetchone()
                return row is not None
            finally:
                conn.close()
        except Exception as e:
            logger.error("Mail %s: reply check error: %s", original_task_id, e)
            # On query failure assume a reply exists (avoid a false failure).
            return True

    # ══════════════════════════════════════════════
    # #02: hallucination gate for ordinary tasks
    # ══════════════════════════════════════════════

    def _task_auto_complete(self, task_id: str, db_path) -> None:
        """After on_complete, advance a task to review/done (gate layer 1).

        Design intent:
        - sub-task (has parent_task): -> review, awaiting the parent's review
        - top-level task (no parent_task): -> done
        The task is left alone when no completion signal is found, and a
        ``failed``/``cancelled`` status is never overwritten.
        NOTE(review): a sub-task the agent already marked ``done`` is moved
        back to ``review`` (historical behaviour) - confirm this is intended.
        """
        if not db_path:
            logger.warning("Task %s: no db_path, skip auto_complete", task_id)
            return
        db_path = Path(db_path)
        try:
            if not self._task_verify_completion(task_id, db_path):
                logger.info("Task %s: verify not passed (no signal), leaving working", task_id)
                return

            if self._task_is_top_level(task_id, db_path):
                logger.info("Task %s: top-level task verify passed, marking done", task_id)
                self._mark_task_status(db_path, task_id, "done",
                                       protected_statuses=self._AUTO_COMPLETE_PROTECTED)
            else:
                logger.info("Task %s: sub-task verify passed, marking review", task_id)
                self._mark_task_status(db_path, task_id, "review",
                                       protected_statuses=self._AUTO_COMPLETE_PROTECTED)
        except Exception as e:
            logger.error("Task %s: auto-complete error: %s", task_id, e)

    def _task_is_top_level(self, task_id: str, db_path: Path) -> bool:
        """True unless the task row has a non-empty ``parent_task``.

        Lookup failures are logged and treated as top-level (the historical
        default of this code path).
        """
        try:
            conn = get_connection(db_path)
            try:
                row = conn.execute("SELECT parent_task FROM tasks WHERE id=?", (task_id,)).fetchone()
            finally:
                conn.close()
        except Exception as e:
            logger.warning("Task %s: parent_task lookup failed, assuming top-level: %s", task_id, e)
            return True
        return not (row and row["parent_task"])

    def _task_verify_completion(self, task_id: str, db_path: Path):
        """Completion check for an ordinary task (three signals).

        Passes when the task is missing or already in a terminal/review
        status, has at least one row in ``outputs``, or has a non-system
        comment of at least 50 characters. Errors count as passed.

        Returns: True=passed, False=no signal
        """
        try:
            conn = get_connection(db_path)
            try:
                row = conn.execute(
                    "SELECT status FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
                if not row or row["status"] in self._VERIFY_TERMINAL_STATES:
                    return True
                output_count = conn.execute(
                    "SELECT COUNT(*) FROM outputs WHERE task_id=?", (task_id,)
                ).fetchone()[0]
                if output_count > 0:
                    return True
                comment_count = conn.execute(
                    "SELECT COUNT(*) FROM comments WHERE task_id=? AND author != 'system' AND LENGTH(content) >= 50",
                    (task_id,)
                ).fetchone()[0]
                return comment_count > 0
            finally:
                conn.close()
        except Exception as e:
            logger.error("Task %s: verify error: %s", task_id, e)
            return True

    def _mark_task_status(self, db_path: Path, task_id: str, status: str,
                          protected_statuses: Optional[frozenset] = None) -> None:
        """Set a task's status and write a ``status_change`` audit event.

        Skips (without writing an event) when the task does not exist, is
        already in ``status``, or is currently in one of
        ``protected_statuses``. Errors are logged, never raised.
        """
        try:
            conn = get_connection(db_path)
            try:
                conn.execute("BEGIN IMMEDIATE")
                old_row = conn.execute(
                    "SELECT status FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
                # Early returns leave the transaction uncommitted; close() below
                # discards it.
                if old_row is None:
                    logger.warning("Task %s: cannot mark %s (task not found)", task_id, status)
                    return
                old_status = old_row["status"]
                if old_status == status:
                    return
                if protected_statuses and old_status in protected_statuses:
                    logger.info("Task %s: keep status=%s, skip auto-mark %s",
                                task_id, old_status, status)
                    return
                conn.execute(
                    "UPDATE tasks SET status=?, updated_at=datetime('now') WHERE id=?",
                    (status, task_id),
                )
                # json.dumps escapes values properly (the old f-string did not).
                payload = json.dumps({"from": old_status, "to": status, "source": "auto_complete"})
                conn.execute(
                    "INSERT INTO events (task_id, agent, event_type, payload) VALUES (?, 'dispatcher', 'status_change', ?)",
                    (task_id, payload),
                )
                conn.commit()
            finally:
                conn.close()
        except Exception as e:
            logger.error("Task %s: mark status error: %s", task_id, e)
|