Files
sanguo_moziplus_v2/src/daemon/dispatcher.py
T
cfdaily d58e38d58f
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
fix(lint): 修复 PR #14 引入的 lint 回退 (119→0)
PR #14 从旧分支复制文件导致回退了 PR #10 的 lint 修复。
修复内容:
- autoflake 移除未使用导入/变量
- autopep8 修复缩进/空格
- 手动修复 F821(pathlib→Path), F541(f-string), F841(未使用变量)
- 所有修复均通过 flake8 --max-line-length=120 --extend-ignore=E501 检查 (0 errors)
2026-06-09 23:53:29 +08:00

986 lines
44 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Agent 调度器 — 执行层(司马懿建议 1Router/Dispatcher 分层)
Dispatcher 负责:
1. 从 Router 获取路由决策
2. 执行 spawn(通过 Spawner
3. 更新 counter(并发控制)
4. 写路由审计日志(routing_decisions
路由决策全部委托给 AgentRouter。
"""
from __future__ import annotations
import json
import logging
import sqlite3
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional
from src.blackboard.models import Task
from src.blackboard.db import get_connection
from src.daemon.spawner import AgentBusyError
from src.daemon.router import AgentRouter
logger = logging.getLogger("moziplus-v2.dispatcher")
class DispatchLevel(str, Enum):
    """Dispatch level chosen for a task; every member is also a plain ``str``."""

    # Executed locally by the daemon.
    LOCAL = "local"
    # Spawned as a full agent.
    FULL_AGENT = "full"
    # Spawned as a subagent.
    SUB_AGENT = "sub"
    # Escalated to Pang Tong.
    ESCALATE = "escalate"
    # Intercepted by a safety red line.
    BLOCKED = "blocked"
class Dispatcher:
    """Agent dispatch executor.

    Since v2.6.1 routing decisions are delegated to ``AgentRouter``; this
    class only executes them: it spawns the selected agent through the
    spawner, wires up the completion callbacks and writes the routing audit
    log (``routing_decisions`` table).
    """

    # #07.2: completion outcomes after which ``current_agent`` is rolled back
    # to the assignee, for executor and review spawns alike.
    _ROLLBACK_CURRENT_AGENT_OUTCOMES = frozenset({
        "crashed", "compact_failed", "process_crash",
        "session_stuck", "compact_hanging",
    })

    def __init__(
        self,
        router: Optional[AgentRouter] = None,
        spawner: Optional[Any] = None,
        counter: Optional[Any] = None,
        db_path: Optional[Path] = None,
        guardrails: Optional[Any] = None,
        # Legacy interface (deprecated, being phased out).
        registered_agents: Optional[List[str]] = None,
        capability_map: Optional[Dict[str, List[str]]] = None,
    ):
        """Store the collaborators; enter legacy mode when no router is given.

        Args:
            router: Routing decision maker. When omitted a default
                ``AgentRouter`` is still created, but the dispatcher runs the
                legacy decision tree instead of consulting it.
            spawner: Object providing ``spawn_full_agent`` (and optionally
                ``build_spawn_message``).
            counter: Concurrency counter; only read by the legacy capability
                resolution (``active_agents``).
            db_path: Fallback database for the routing audit log.
            guardrails: Optional safety checker providing ``check_task``.
            registered_agents: Legacy mode only — known agent ids.
            capability_map: Legacy mode only — task type -> candidate agents.
        """
        self.router = router or AgentRouter()
        self.spawner = spawner
        self.counter = counter
        self.db_path = db_path
        self.guardrails = guardrails
        # Compatibility: without an explicit router the old logic is used (temporary).
        self._legacy_mode = router is None
        if self._legacy_mode:
            self.registered_agents = set(registered_agents or [])
            self.capability_map = capability_map or {}
            logger.warning(
                "Dispatcher running in legacy mode (no AgentRouter)")

    # ── Small shared helpers ──

    @staticmethod
    def _project_db_path(project_config: Optional[Dict]) -> Optional[Path]:
        """Return the project-level DB path from ``project_config``, or None if absent."""
        if project_config and "db_path" in project_config:
            return Path(project_config["db_path"])
        return None

    @staticmethod
    def _is_mail_project(project_config: Optional[Dict]) -> bool:
        """Return True when ``project_config`` identifies the ``_mail`` project."""
        return project_config.get(
            "project_id") == "_mail" if project_config else False

    @staticmethod
    def _escape_like(value: Any) -> str:
        """Escape ``\\``, ``%`` and ``_`` so ``value`` matches literally in ``LIKE ... ESCAPE '\\'``."""
        return (str(value)
                .replace("\\", "\\\\")
                .replace("%", "\\%")
                .replace("_", "\\_"))

    # ── Decision ──

    def decide(self, task: Task, action_type: str = "") -> Dict[str, Any]:
        """Return the dispatch decision for ``task`` (delegated to the router).

        Returns:
            A dict with at least ``level`` (a ``DispatchLevel``), ``agent_id``
            and ``reason``. Router-based decisions also carry ``mode``; agent
            spawns additionally carry ``confidence``, ``routed_by``, ``model``
            and ``latency_ms``.
        """
        if self._legacy_mode:
            return self._legacy_decide(task, action_type)

        # Flatten the task into the dict the router consumes.
        task_info = {
            "id": task.id,
            "title": task.title,
            "description": task.description,
            "status": task.status,
            "assignee": task.assignee,
            "task_type": getattr(task, 'task_type', ''),
            "current_agent": getattr(task, 'current_agent', None),
            "next_capability": getattr(task, 'next_capability', None),
        }
        decision = self.router.route(task_info, action_type)

        # The router answers agent_id="daemon" for work executed locally.
        if decision.agent_id == "daemon":
            return {
                "level": DispatchLevel.LOCAL,
                "agent_id": "daemon",
                "reason": decision.reason,
                "mode": decision.mode,
            }

        # A fallback onto Pang Tong counts as an escalation.
        escalated = (decision.mode == "fallback"
                     and decision.agent_id == "pangtong-fujunshi")
        return {
            "level": DispatchLevel.ESCALATE if escalated else DispatchLevel.FULL_AGENT,
            "agent_id": decision.agent_id,
            "reason": decision.reason,
            "mode": decision.mode,
            "confidence": decision.confidence,
            "routed_by": decision.mode,
            "model": getattr(decision, "model", None),
            "latency_ms": decision.latency_ms,
        }

    # ── Dispatch ──

    async def dispatch(self, task: Task, action_type: str = "",
                       project_config: Optional[Dict] = None) -> Dict[str, Any]:
        """Run a full dispatch: guardrail check, decision, spawn and audit log.

        Args:
            task: The task to dispatch.
            action_type: Routing hint; also used as the spawn type
                (``"review"`` selects the review completion handling, an empty
                value is spawned as ``"executor"``).
            project_config: Optional project settings; ``project_id`` and
                ``db_path`` are read from it.

        Returns:
            ``{"level": str, "agent_id": str, "session_id": Optional[str],
            "status": "dispatched"|"skipped"|"error"|"blocked", "reason": str}``;
            blocked results additionally carry ``violations``.
        """
        # Mail is agent-to-agent communication and is exempt from guardrails.
        is_mail = self._is_mail_project(project_config)

        # Safety red lines are checked before anything is dispatched.
        if self.guardrails and not is_mail:
            violations = self.guardrails.check_task(task)
            critical = [
                v for v in violations
                if v.action in ("block_and_notify", "terminate_and_escalate")
            ]
            if critical:
                first = critical[0]
                logger.warning("Task '%s' blocked by guardrail: %s - %s",
                               task.title, first.rule_id, first.message)
                # Record the block in the project DB, else in the default DB.
                blocked_db = self._project_db_path(project_config) or self.db_path
                if blocked_db:
                    self._record_routing(
                        task,
                        {"level": DispatchLevel.BLOCKED, "agent_id": "none",
                         "reason": first.message},
                        "blocked", first.message, blocked_db)
                return {
                    "level": "blocked",
                    "agent_id": "none",
                    "session_id": None,
                    "status": "blocked",
                    "reason": first.message,
                    "violations": [v.rule_id for v in violations],
                }

        if self._legacy_mode:
            return await self._legacy_dispatch(task, action_type, project_config)

        decision = self.decide(task, action_type)
        level = decision["level"]
        # The routing audit log goes to the project-level DB when one is configured.
        project_db = self._project_db_path(project_config)
        agent_id = decision["agent_id"]

        def _result(status: str, reason: str,
                    session_id: Optional[str] = None) -> Dict[str, Any]:
            return {
                "level": level.value,
                "agent_id": agent_id,
                "session_id": session_id,
                "status": status,
                "reason": reason,
            }

        # v2.7.2: the counter check lives inside spawn_full_agent; the
        # dispatcher no longer acquires or releases the counter.

        # Local execution.
        if level == DispatchLevel.LOCAL:
            self._record_routing(task, decision, "dispatched", None, project_db)
            return {
                "level": level.value,
                "agent_id": "daemon",
                "session_id": None,
                "status": "dispatched",
                "reason": decision["reason"],
            }

        # Full agent / escalation spawn.
        if level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE):
            if not self.spawner:
                self._record_routing(
                    task, decision, "error", "No spawner", project_db)
                return _result("error", "No spawner configured")

            # Set by the on_checks_passed hook once the mail is marked working;
            # read by the generic error handler to decide whether to revert.
            mail_marked_working = False
            try:
                task_id = task.id

                # [v2.7.1] Mail is marked working inside spawn_full_agent, via
                # this hook, only after all checks passed and before the
                # subprocess starts; a failed check leaves it untouched.
                on_checks_passed = None
                if is_mail and project_db:
                    def _mail_on_checks_passed():
                        nonlocal mail_marked_working
                        if not self._mail_auto_working(task_id, project_db):
                            raise RuntimeError("mail_auto_working_failed")
                        mail_marked_working = True
                    on_checks_passed = _mail_on_checks_passed

                message = self._build_spawn_message(
                    task, agent_id, project_config,
                    mode=decision.get("mode", ""),
                    spawn_type=action_type or "executor")

                # v2.7.2: on_complete carries business logic only;
                # counter.release is guaranteed by spawn_full_agent's wrapper.
                if is_mail:
                    must_haves = task.must_haves or ""

                    def on_complete(aid, outcome):
                        # Hallucination gate: check for a reply, then mark done/failed.
                        try:
                            self._mail_auto_complete(
                                task_id, aid, project_db, must_haves,
                                outcome=outcome)
                        except Exception as e:
                            logger.error(
                                "Mail %s: on_complete error: %s", task_id, e)
                else:
                    # #02: the task path gets a hallucination-gated on_complete too.
                    is_review = action_type == "review"

                    def on_complete(aid, outcome):
                        self._on_task_complete(
                            task_id, project_db, is_review, aid, outcome)

                session_id = await self.spawner.spawn_full_agent(
                    agent_id=agent_id,
                    message=message,
                    task_id=task.id,
                    on_complete=on_complete,
                    use_main_session=True,  # #02: always deliver to the main session
                    task_db_path=project_db,
                    on_checks_passed=on_checks_passed,
                )
                self._record_routing(task, decision, "dispatched",
                                     f"session={session_id}", project_db)
                return _result("dispatched", decision["reason"], session_id)
            except AgentBusyError as e:
                # #07: tell a busy session apart from the other busy reasons.
                reason = getattr(e, 'reason', 'busy')
                if reason.startswith("session_"):
                    log_level = logger.info
                    detail_msg = f"Session busy: {reason}"
                else:
                    log_level = logger.debug
                    detail_msg = f"Agent busy: {reason}"
                log_level(
                    "Dispatch skipped %s for task %s: %s",
                    agent_id, task.id, detail_msg)
                # The checks fail before on_checks_passed runs, so the mail
                # was never marked working and there is nothing to revert.
                self._record_routing(
                    task, decision, "skipped", detail_msg, project_db)
                return _result("skipped", detail_msg)
            except Exception as e:
                # on_checks_passed ran but the spawn failed afterwards:
                # revert working -> pending.
                if mail_marked_working:
                    self._mail_revert_to_pending(task.id, project_db)
                self._record_routing(
                    task, decision, "error", str(e), project_db)
                return _result("error", str(e))

        return _result("error", "Unknown dispatch level")

    def _on_task_complete(self, task_id: str, task_db: Optional[Path],
                          is_review: bool, agent_id: str, outcome: Any) -> None:
        """Completion callback body for non-mail spawns; never raises."""
        try:
            # #07.2: unified crash rollback — executor and review spawns both
            # roll current_agent back.
            if outcome in self._ROLLBACK_CURRENT_AGENT_OUTCOMES and task_db:
                self._rollback_current_agent(task_db, task_id, agent_id)

            if not is_review:
                # Executor: verify completion signals, then mark review.
                self._task_auto_complete(task_id, task_db)
            elif task_db and outcome in ("completed", "session_revived"):
                self._apply_review_verdict(task_id, task_db)
            else:
                logger.warning(
                    "Task %s: review agent %s (%s), NOT marking done",
                    task_id, agent_id, outcome)
        except Exception as e:
            logger.error("Task %s: on_complete error: %s", task_id, e)

    def _apply_review_verdict(self, task_id: str, task_db: Path) -> None:
        """#09: act on the latest review verdict: mark done, or @mention the assignee."""
        conn = get_connection(task_db)
        try:
            review = conn.execute(
                "SELECT verdict FROM reviews WHERE task_id=? ORDER BY created_at DESC LIMIT 1",
                (task_id,)
            ).fetchone()
        finally:
            conn.close()

        if review and review["verdict"] == "approved":
            self._mark_task_status(task_db, task_id, "done")
            logger.info("Task %s: review approved, marking done", task_id)
            return

        # Not approved: @mention the reviewed agent, i.e. the task's assignee
        # (not current_agent). The task is not marked done; its status stays.
        verdict_str = review["verdict"] if review else "未知"
        conn = get_connection(task_db)
        try:
            task_row = conn.execute(
                "SELECT assignee FROM tasks WHERE id=?", (task_id,)).fetchone()
        finally:
            conn.close()
        if task_row and task_row["assignee"]:
            from src.blackboard.blackboard import Blackboard
            Blackboard(task_db).add_comment(
                task_id, "daemon",
                f"@{task_row['assignee']} 审查结论: {verdict_str},请查看详情并决定接受或反驳",
                comment_type="review")
        logger.info("Task %s: review verdict=%s, notified assignee=%s",
                    task_id, verdict_str,
                    task_row["assignee"] if task_row else "?")

    # ── Spawn message construction ──

    def _build_spawn_message(self, task: Task, agent_id: str,
                             project_config: Optional[Dict],
                             mode: str = "",
                             spawn_type: str = "executor") -> str:
        """Build the message handed to the spawned agent.

        Args:
            mode: Routing mode; ``"delegate"`` produces the coordinator prompt.
            spawn_type: Spawn type forwarded to the spawner
                (executor/discussion/review).
        """
        # Delegate mode: the coordinator assignment prompt.
        if mode == "delegate":
            return self._build_delegate_prompt(task, project_config)

        # Prefer the spawner's own builder when there is a project context.
        if hasattr(self.spawner, 'build_spawn_message') and project_config:
            return self.spawner.build_spawn_message(
                task_id=task.id,
                title=task.title,
                description=task.description or "",
                task_type=getattr(task, 'task_type', '') or "",
                priority=task.priority,
                must_haves=task.must_haves or "",
                project_id=project_config.get("project_id", ""),
                agent_id=agent_id,
                current_status=task.status or "claimed",
                retry_context=self._build_retry_context(task),
                spawn_type=spawn_type,
                task=task,
                project_config=project_config,
            )

        # Fallback: a minimal plain-text summary of the task.
        lines = [f"Task: {task.title}"]
        if task.description:
            lines.append(f"Description: {task.description}")
        if task.must_haves:
            lines.append(f"Must-haves: {task.must_haves}")
        return "\n".join(lines)

    def _build_delegate_prompt(self, task: Task,
                               project_config: Optional[Dict]) -> str:
        """Build the delegate-mode prompt (a coordinator assigns the task)."""
        # API endpoint of the spawner, with local defaults when it is unset.
        spawner = self.spawner
        api_host = getattr(spawner, 'api_host', '127.0.0.1') if spawner else '127.0.0.1'
        api_port = getattr(spawner, 'api_port', 8083) if spawner else 8083
        project_id = project_config.get(
            "project_id", "") if project_config else ""
        return f"""你是任务协调员。请分析以下任务,决定最合适的执行者并分配。
## 任务信息
- 项目: {project_id}
- 任务ID: {task.id}
- 标题: {task.title}
- 描述: {task.description or '(无描述)'}
- 类型: {getattr(task, 'task_type', '') or 'general'}
- 优先级: {task.priority}
## 团队
- 张飞(zhangfei-dev): 编码、实现、脚本
- 司马懿(simayi-challenger): 审查、质量检查、辩论
- 关羽(guanyu-dev): 风控、合规
- 赵云(zhaoyun-data): 数据获取、清洗、验证
- 姜维(jiangwei-infra): 部署、基础设施、Docker、vnpy
- 庞统(pangtong-fujunshi): 规划、协调、策略
## 操作
1. 分析任务需求,选择最合适的 Agent
2. 通过 API 回写分配结果:
curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task.id}/status \\
-H 'Content-Type: application/json' \\
-d '{{"status": "claimed", "agent": "<agent_id>"}}'
3. 如果你自己能做,直接认领并执行(状态改为 working)
"""

    def _build_retry_context(self, task: Task) -> str:
        """Return the retry notice for a task that was attempted before, else ``""``."""
        retries = getattr(task, 'retry_count', None) or 0
        if retries == 0:
            return ""
        return "\n".join([
            "## ⚠️ 重试上下文(上次执行失败,请注意以下反馈)",
            f"这是第 {retries + 1} 次尝试。",
        ])

    # ── Routing audit log ──

    def _record_routing(self, task: Task, decision: Dict[str, Any],
                        outcome: str, detail: Optional[str],
                        override_db_path: Optional[Path] = None) -> None:
        """Append one row to the ``routing_decisions`` audit table (best effort).

        Writes to ``override_db_path`` when given, else to ``self.db_path``;
        does nothing when neither is set. Failures are logged at debug level
        and never propagate, so auditing cannot break a dispatch.
        """
        effective_db = override_db_path or self.db_path
        if not effective_db:
            return

        insert_sql = (
            "INSERT INTO routing_decisions "
            "(task_id, from_status, to_status, mode, selected_agent, "
            " previous_agent, reason, confidence, model, latency_ms, "
            " task_type, requested_capability, outcome, detail) "
            "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
        )
        values = (
            task.id,
            task.status,
            None,  # to_status is not known yet at dispatch time
            decision.get("mode", ""),
            decision.get("agent_id", ""),
            getattr(task, 'current_agent', None) or task.assignee,
            decision.get("reason", ""),
            decision.get("confidence"),
            decision.get("model"),
            decision.get("latency_ms"),
            getattr(task, 'task_type', ''),
            getattr(task, 'next_capability', ''),
            outcome,
            detail,
        )
        try:
            conn = sqlite3.connect(str(effective_db))
            try:
                conn.execute("BEGIN IMMEDIATE")
                conn.execute(insert_sql, values)
                conn.commit()
            finally:
                conn.close()
        except Exception:
            logger.debug("Failed to record routing decision", exc_info=True)

    # ── Batch decisions (compatibility interface) ──

    def dispatch_pending(self, tasks: List[Task]) -> List[Dict[str, Any]]:
        """Decide for every task without spawning; each entry is the decision plus ``task_id``."""
        return [{"task_id": task.id, **self.decide(task)} for task in tasks]

    # ── Legacy compatibility (deprecated) ──

    def _legacy_decide(
            self, task: Task, action_type: str = "") -> Dict[str, Any]:
        """Old three-level decision tree (kept for the transition period)."""
        LOCAL_ACTIONS = frozenset({
            "L1_guardrail", "format_check",
            "file_exists_check", "dependency_advance",
        })
        assignee = task.assignee

        # 1. Actions the daemon handles itself.
        if action_type in LOCAL_ACTIONS:
            return {"level": DispatchLevel.LOCAL, "agent_id": "daemon",
                    "reason": f"Local action: {action_type}"}

        # 2. An assignee that is a registered agent gets the task directly.
        if assignee and assignee in self.registered_agents:
            return {"level": DispatchLevel.FULL_AGENT, "agent_id": assignee,
                    "new_session": action_type == "adjudication",
                    "reason": f"Registered agent: {assignee}"}

        # 3. No assignee: pick one through the capability map.
        if not assignee:
            agent_id = self._resolve_by_capability(task)
            return {"level": DispatchLevel.FULL_AGENT, "agent_id": agent_id,
                    "reason": f"Auto-assigned via capability_map: {agent_id}"}

        # 4. Unknown assignee: escalate to Pang Tong.
        return {"level": DispatchLevel.ESCALATE, "agent_id": "pangtong-fujunshi",
                "new_session": True,
                "reason": f"Unknown agent '{assignee}', escalate to pangtong"}

    def _resolve_by_capability(self, task: Task) -> str:
        """Legacy capability mapping: pick a registered agent for the task type.

        With a counter and several candidates the one with the fewest active
        spawns wins; without any registered candidate Pang Tong is returned.
        """
        task_type = getattr(task, 'task_type', '') or ''
        registered = [
            agent for agent in self.capability_map.get(task_type, ())
            if agent in self.registered_agents
        ]
        if not registered:
            return "pangtong-fujunshi"
        if self.counter and len(registered) > 1:
            return min(registered,
                       key=lambda a: self.counter.active_agents.get(a, 0))
        return registered[0]

    async def _legacy_dispatch(
            self, task, action_type="", project_config=None):
        """Old dispatch path (kept for the transition period).

        v2.7.2: counter acquire/release lives inside spawn_full_agent.

        Unlike ``dispatch`` this path marks a mail ``working`` *before* the
        spawn, so every spawn failure (busy or otherwise) reverts it to
        ``pending`` to avoid leaving the mail stuck.
        """
        decision = self._legacy_decide(task, action_type)
        level = decision["level"]
        agent_id = decision["agent_id"]

        def _result(status, reason, session_id=None):
            return {"level": level.value, "agent_id": agent_id,
                    "session_id": session_id, "status": status,
                    "reason": reason}

        if level == DispatchLevel.LOCAL:
            return {"level": level.value, "agent_id": "daemon",
                    "session_id": None, "status": "dispatched",
                    "reason": decision["reason"]}

        if level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE):
            if not self.spawner:
                return _result("error", "No spawner configured")

            mail_db = None
            mail_marked_working = False
            try:
                # NOTE: _legacy_dispatch only runs when router=None, which the
                # current configuration never does. Mail normally goes through
                # dispatch() (on_checks_passed scheme); if the legacy path is
                # ever enabled the on_checks_passed logic must be ported here.
                is_mail = self._is_mail_project(project_config)
                if is_mail:
                    mail_db = self._project_db_path(project_config)
                    if not mail_db or not self._mail_auto_working(
                            task.id, mail_db):
                        return _result("error", "mail_auto_working_failed")
                    mail_marked_working = True

                if hasattr(self.spawner,
                           'build_spawn_message') and project_config:
                    message = self.spawner.build_spawn_message(
                        task_id=task.id, title=task.title,
                        description=task.description or "",
                        task_type=getattr(task, 'task_type', '') or "",
                        priority=task.priority,
                        must_haves=task.must_haves or "",
                        project_id=project_config.get("project_id", ""),
                        agent_id=agent_id,
                        current_status=task.status or "claimed",
                        retry_context=self._build_retry_context(task),
                        spawn_type="executor",
                    )
                else:
                    message = f"Task: {task.title}"

                # v2.7.2: on_complete carries business logic only.
                on_complete = None
                if is_mail:
                    task_id = task.id
                    must_haves = task.must_haves or ""

                    def on_complete(aid, outcome):
                        try:
                            self._mail_auto_complete(
                                task_id, aid, mail_db, must_haves,
                                outcome=outcome)
                        except Exception as e:
                            logger.error(
                                "Mail %s: legacy on_complete error: %s",
                                task_id, e)

                session_id = await self.spawner.spawn_full_agent(
                    agent_id=agent_id, message=message,
                    task_id=task.id,
                    on_complete=on_complete,
                    use_main_session=True,  # #02: always deliver to the main session
                    task_db_path=self._project_db_path(project_config),
                )
                return _result("dispatched", decision["reason"], session_id)
            except AgentBusyError as e:
                if mail_marked_working:
                    self._mail_revert_to_pending(task.id, mail_db)
                reason = getattr(e, 'reason', 'busy')
                detail_msg = f"Session busy: {reason}" if reason.startswith(
                    "session_") else f"Agent busy: {reason}"
                return _result("skipped", detail_msg)
            except Exception as e:
                if mail_marked_working:
                    self._mail_revert_to_pending(task.id, mail_db)
                return _result("error", str(e))

        return _result("error", "Unknown dispatch level")

    # ── Mail envelope/payload helpers ──

    def _mail_auto_working(self, task_id: str, db_path: Path) -> bool:
        """Mark a mail task ``working`` on behalf of the system (before the spawn).

        Mail needs no ``claimed`` intermediate state and goes straight from
        pending to working; a task already ``claimed`` is accepted as well.

        Returns:
            True when the task was marked; False when it is missing, in any
            other status, or the update failed (the spawn must be aborted).
        """
        try:
            conn = get_connection(db_path)
            try:
                conn.execute("BEGIN IMMEDIATE")
                row = conn.execute(
                    "SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
                if not row:
                    logger.warning(
                        "Mail %s: cannot mark working (task not found)", task_id)
                    return False
                previous = row["status"]
                if previous not in ("pending", "claimed"):
                    logger.warning(
                        "Mail %s: cannot mark working (status=%s, expected pending/claimed)",
                        task_id, previous)
                    return False
                conn.execute(
                    "UPDATE tasks SET status='working', updated_at=datetime('now') WHERE id=?",
                    (task_id,),
                )
                conn.commit()
                logger.info(
                    "Mail %s: auto-marked working (system, was %s)",
                    task_id, previous)
                return True
            finally:
                conn.close()
        except Exception as e:
            logger.error("Mail %s: failed to mark working: %s", task_id, e)
            return False

    def _mail_revert_to_pending(self, task_id: str, db_path: Path) -> None:
        """Revert a mail from working to pending after a failed spawn (avoids a permanent deadlock)."""
        try:
            conn = get_connection(db_path)
            try:
                conn.execute("BEGIN IMMEDIATE")
                row = conn.execute(
                    "SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
                if not (row and row["status"] == "working"):
                    logger.debug(
                        "Mail %s: skip revert (status=%s, expected working)",
                        task_id,
                        row["status"] if row else "not_found")
                    return
                conn.execute(
                    "UPDATE tasks SET status='pending', updated_at=datetime('now') WHERE id=?",
                    (task_id,),
                )
                conn.commit()
                logger.info(
                    "Mail %s: reverted working → pending (spawn failed)", task_id)
            finally:
                conn.close()
        except Exception as e:
            logger.error(
                "Mail %s: failed to revert to pending: %s", task_id, e)

    def _mail_auto_complete(self, task_id: str, agent_id: str,
                            db_path: Path, must_haves: str, outcome=None) -> None:
        """Mark a mail done/failed after on_complete (with hallucination gate).

        * ``request`` mails must have produced a reply, else they are marked
          failed immediately.
        * ``inform`` mails are only marked done for a successful ``outcome``;
          otherwise they stay working for the ticker to re-deliver.
        * Everything that passes these gates is marked done.

        Never raises; unexpected errors are logged.
        """
        try:
            performative = self._mail_parse_performative(must_haves)

            # request: hallucination gate — a reply must exist.
            if performative == "request" and not self._mail_check_reply(
                    task_id, db_path):
                # F3: mark failed right away instead of waiting for the ticker.
                logger.error(
                    "Mail %s: no reply found, marking failed (no_reply_found)", task_id)
                self._mail_mark_failed_no_reply(task_id, db_path)
                return

            # inform: only successful outcomes are auto-marked done. The task
            # path is unaffected (it uses _task_auto_complete instead).
            if performative == "inform":
                INFORM_DONE_OUTCOMES = {"completed", "claimed", "no_reply"}
                if outcome not in INFORM_DONE_OUTCOMES:
                    logger.info(
                        "Mail %s: inform outcome=%s, skip auto-done",
                        task_id, outcome)
                    return

            self._mail_mark_done(task_id, db_path, performative)
        except Exception as e:
            logger.error("Mail %s: auto-complete error: %s", task_id, e)

    @staticmethod
    def _mail_parse_performative(must_haves: str) -> str:
        """Read ``performative`` (or ``type``) from the must_haves JSON; default ``"request"``."""
        try:
            meta = json.loads(must_haves) if must_haves else {}
            return meta.get("performative", meta.get("type", "request"))
        except Exception:
            # Deliberate best effort: unparsable metadata is treated as a request.
            logger.debug(
                "Unparsable mail must_haves, assuming performative=request",
                exc_info=True)
            return "request"

    def _mail_mark_failed_no_reply(self, task_id: str, db_path: Path) -> None:
        """Mark a still-working mail failed (no_reply_found) and notify; up to 3 attempts."""
        for attempt in range(3):
            try:
                conn = get_connection(db_path)
                try:
                    conn.execute("BEGIN IMMEDIATE")
                    row = conn.execute(
                        "SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
                    if not row:
                        return
                    if row["status"] == "working":
                        conn.execute(
                            "UPDATE tasks SET status='failed', updated_at=datetime('now') WHERE id=?",
                            (task_id,),
                        )
                        # NOTE(review): this writes events.detail while
                        # _mark_task_status writes events.payload — confirm
                        # which column the schema actually has.
                        conn.execute(
                            "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
                            (task_id, "daemon", "failed",
                             json.dumps({"reason": "no_reply_found"}, ensure_ascii=False)),
                        )
                        conn.commit()
                        logger.info(
                            "Mail %s: marked failed (no_reply_found)", task_id)
                        # Failure notification to the sender (best effort).
                        try:
                            from src.daemon.mail_notify import notify_mail_failed
                            notify_mail_failed(
                                db_path, task_id, "no_reply_found")
                        except Exception as ne:
                            logger.warning(
                                "Mail %s: failed to send no_reply_found notification: %s",
                                task_id, ne)
                    return
                finally:
                    conn.close()
            except Exception as e:
                logger.warning(
                    "Mail %s: failed attempt %d: %s", task_id, attempt + 1, e)
        logger.error(
            "Mail %s: all 3 failed attempts failed, leaving for ticker", task_id)

    def _mail_mark_done(self, task_id: str, db_path: Path,
                        performative: str) -> None:
        """Mark a still-working mail done; up to 3 attempts."""
        for attempt in range(3):
            try:
                conn = get_connection(db_path)
                try:
                    conn.execute("BEGIN IMMEDIATE")
                    row = conn.execute(
                        "SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
                    if not row:
                        return
                    if row["status"] == "working":
                        conn.execute(
                            "UPDATE tasks SET status='done', updated_at=datetime('now') WHERE id=?",
                            (task_id,),
                        )
                        conn.commit()
                        logger.info(
                            "Mail %s: auto-marked done (system, performative=%s)",
                            task_id, performative)
                    return
                finally:
                    conn.close()
            except Exception as e:
                logger.warning(
                    "Mail %s: done attempt %d failed: %s",
                    task_id, attempt + 1, e)
        # All attempts failed: leave it working for the ticker timeout.
        logger.error(
            "Mail %s: all 3 done attempts failed, leaving for ticker", task_id)

    def _mail_check_reply(self, original_task_id: str, db_path: Path) -> bool:
        """Hallucination gate: does any other task's must_haves mention ``original_task_id``?

        The id is matched as a literal substring; LIKE wildcards in the id
        (``_``, ``%``) are escaped so they cannot match unrelated tasks.
        A failing query is treated as "reply exists" to avoid wrongly marking
        the mail failed.
        """
        try:
            conn = get_connection(db_path)
            try:
                # Looks for a record whose must_haves JSON contains the id
                # (intended to be its in_reply_to value).
                row = conn.execute(
                    "SELECT id FROM tasks WHERE id != ? "
                    "AND must_haves LIKE ? ESCAPE '\\' LIMIT 1",
                    (original_task_id,
                     f"%{self._escape_like(original_task_id)}%"),
                ).fetchone()
                return row is not None
            finally:
                conn.close()
        except Exception as e:
            logger.error("Mail %s: reply check error: %s", original_task_id, e)
            # Conservative on failure: assume a reply exists.
            return True

    # ══════════════════════════════════════════════
    # #02: hallucination gate for the task path
    # ══════════════════════════════════════════════

    def _task_auto_complete(self, task_id: str, db_path) -> None:
        """Mark a task ``review`` after on_complete when verification passes.

        Design intent (#02 §5.2 + §10 Phase 1): this is the first layer of the
        three-layer hallucination gate. Every finished task — top-level or
        subtask — is uniformly marked review, and the review itself provides
        the second and third layers. Without a completion signal the task is
        left in its current status.
        """
        if not db_path:
            logger.warning("Task %s: no db_path, skip auto_complete", task_id)
            return
        if not isinstance(db_path, Path):
            db_path = Path(db_path)
        try:
            if self._task_verify_completion(task_id, db_path):
                logger.info("Task %s: verify passed, marking review", task_id)
                self._mark_task_status(db_path, task_id, "review")
            else:
                logger.info(
                    "Task %s: verify not passed (no signal), leaving working",
                    task_id)
        except Exception as e:
            logger.error("Task %s: auto-complete error: %s", task_id, e)

    def _task_verify_completion(self, task_id: str, db_path: Path) -> bool:
        """Check the three completion signals of an ordinary task.

        Signals, in order: the task is missing or already in a terminal state;
        it has at least one output; it has a non-system comment of 50+ chars.

        Returns:
            True when any signal is present (or the check itself failed),
            False when there is no signal.
        """
        # NOTE(review): a True result makes _task_auto_complete set the status
        # to "review" even for tasks already done/failed/cancelled — confirm
        # that this is intended.
        TERMINAL_STATES = {"review", "done", "failed", "cancelled"}
        try:
            conn = get_connection(db_path)
            try:
                row = conn.execute(
                    "SELECT status FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
                if not row or row["status"] in TERMINAL_STATES:
                    return True

                output_count = conn.execute(
                    "SELECT COUNT(*) FROM outputs WHERE task_id=?", (task_id,)
                ).fetchone()[0]
                if output_count > 0:
                    return True

                comment_count = conn.execute(
                    "SELECT COUNT(*) FROM comments WHERE task_id=? AND author != 'system' AND LENGTH(content) >= 50",
                    (task_id,)
                ).fetchone()[0]
                return comment_count > 0
            finally:
                conn.close()
        except Exception as e:
            logger.error("Task %s: verify error: %s", task_id, e)
            return True

    def _rollback_current_agent(
            self, db_path: Path, task_id: str, agent_id: str) -> None:
        """#07.2: after a crash, reset current_agent to the assignee.

        Only applies while ``current_agent`` still equals ``agent_id``; this
        keeps exclude_current routing from getting stuck on the crashed agent.
        """
        try:
            conn = get_connection(db_path)
            try:
                conn.execute(
                    "UPDATE tasks SET current_agent = "
                    "(SELECT assignee FROM tasks WHERE id=?) "
                    "WHERE id=? AND current_agent=?",
                    (task_id, task_id, agent_id)
                )
                conn.commit()
            finally:
                conn.close()
            logger.info(
                "Task %s: rolled back current_agent from %s to assignee",
                task_id, agent_id)
        except Exception as e:
            logger.warning(
                "Task %s: failed to rollback current_agent: %s", task_id, e)

    def _mark_task_status(self, db_path: Path,
                          task_id: str, status: str) -> None:
        """Set the task status and write a ``status_change`` audit event.

        The event payload is JSON of the form
        ``{"from": <old status>, "to": <status>, "source": "auto_complete"}``;
        the old status is ``"unknown"`` when the task row does not exist.
        Errors are logged, not raised.
        """
        try:
            conn = get_connection(db_path)
            try:
                conn.execute("BEGIN IMMEDIATE")
                old_row = conn.execute(
                    "SELECT status FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
                old_status = old_row["status"] if old_row else "unknown"
                conn.execute(
                    "UPDATE tasks SET status=?, updated_at=datetime('now') WHERE id=?",
                    (status, task_id),
                )
                # json.dumps keeps the payload valid JSON even if a status
                # contains quotes or backslashes.
                payload = json.dumps(
                    {"from": old_status, "to": status, "source": "auto_complete"},
                    ensure_ascii=False)
                # NOTE(review): this writes events.payload while the mail path
                # writes events.detail — confirm which column the schema has.
                conn.execute(
                    "INSERT INTO events (task_id, agent, event_type, payload) VALUES (?, 'dispatcher', 'status_change', ?)",
                    (task_id, payload),
                )
                conn.commit()
            finally:
                conn.close()
        except Exception as e:
            logger.error("Task %s: mark status error: %s", task_id, e)

    @staticmethod
    def _check_crash_limit(task_id: str, db_path: Path, limit: int = 3,
                           window_minutes: int = 30) -> bool:
        """v2.8.1 Fix-3c: has the task crashed ``limit`` times within the window?

        Counts rows of the persistent ``task_attempts`` table with
        ``outcome='crashed'`` started within the last ``window_minutes``.

        Returns:
            True when the limit is reached (the caller should escalate);
            False otherwise, including when the check itself fails.
        """
        try:
            conn = get_connection(db_path)
            try:
                row = conn.execute(
                    "SELECT COUNT(*) as cnt FROM task_attempts "
                    "WHERE task_id=? AND outcome='crashed' "
                    "AND started_at > datetime('now', ?)",
                    (task_id, f'-{window_minutes} minutes')
                ).fetchone()
            finally:
                conn.close()
            count = row["cnt"] if row else 0
            if count >= limit:
                logger.warning("Task %s: crash limit reached (%d/%d in %dm)",
                               task_id, count, limit, window_minutes)
                return True
            return False
        except Exception:
            # Best effort: a failing check must not block dispatching, but it
            # should not go unnoticed either.
            logger.warning("Task %s: crash limit check failed, assuming not reached",
                           task_id, exc_info=True)
            return False