From 9e4fbabe211ef8fc4ab2a4c06630a32730955c2a Mon Sep 17 00:00:00 2001 From: cfdaily Date: Sun, 24 May 2026 19:51:27 +0800 Subject: [PATCH] auto-sync: 2026-05-24 19:51:27 --- src/daemon/dispatcher.py | 98 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/src/daemon/dispatcher.py b/src/daemon/dispatcher.py index f6f85a3..b0d59a4 100644 --- a/src/daemon/dispatcher.py +++ b/src/daemon/dispatcher.py @@ -20,6 +20,7 @@ from pathlib import Path from typing import Any, Dict, List, Optional from src.blackboard.models import Task +from src.blackboard.db import get_connection from src.daemon.router import AgentRouter, RouteDecision logger = logging.getLogger("moziplus-v2.dispatcher") @@ -473,3 +474,100 @@ class Dispatcher: return {"level": level.value, "agent_id": agent_id, "session_id": None, "status": "error", "reason": "Unknown dispatch level"} + + # ── Mail 信封/载荷分离辅助方法 ── + + def _mail_auto_working(self, task_id: str, db_path: Path) -> bool: + """Mail 任务:系统自动标 working(spawn 前) + + Returns: + True=标成功, False=标失败(需中止 spawn) + """ + try: + conn = get_connection(db_path) + try: + conn.execute("BEGIN IMMEDIATE") + row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone() + if not row or row["status"] != "claimed": + logger.warning("Mail %s: cannot mark working (status=%s, expected claimed)", + task_id, row["status"] if row else "None") + return False + conn.execute( + "UPDATE tasks SET status='working', updated_at=datetime('now') WHERE id=?", + (task_id,), + ) + conn.commit() + logger.info("Mail %s: auto-marked working (system)", task_id) + return True + finally: + conn.close() + except Exception as e: + logger.error("Mail %s: failed to mark working: %s", task_id, e) + return False + + def _mail_auto_complete(self, task_id: str, agent_id: str, + db_path: Path, must_haves: str) -> None: + """Mail 任务:on_complete 后自动标 done/failed(含幻觉门控)""" + try: + # 解析 performative + performative = "request" + try: + meta = json.loads(must_haves) if must_haves else {} + performative = meta.get("performative", meta.get("type", "request")) + except Exception: + pass + + # request 类型:幻觉门控验证 + if performative == "request": + has_reply = self._mail_check_reply(task_id, db_path) + if not has_reply: + # 不直接标 failed,留 working 等 ticker 下一轮再查 + logger.warning("Mail %s: no reply found on on_complete, " + "leaving working for ticker recheck", task_id) + return + + # 标 done(重试 3 次) + for attempt in range(3): + try: + conn = get_connection(db_path) + try: + conn.execute("BEGIN IMMEDIATE") + row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone() + if not row: + return + if row["status"] == "working": + conn.execute( + "UPDATE tasks SET status='done', updated_at=datetime('now') WHERE id=?", + (task_id,), + ) + conn.commit() + logger.info("Mail %s: auto-marked done (system, performative=%s)", + task_id, performative) + return + finally: + conn.close() + except Exception as e: + logger.warning("Mail %s: done attempt %d failed: %s", task_id, attempt + 1, e) + # 3 次都失败,留 working 等 ticker 超时兜底 + logger.error("Mail %s: all 3 done attempts failed, leaving for ticker", task_id) + + except Exception as e: + logger.error("Mail %s: auto-complete error: %s", task_id, e) + + def _mail_check_reply(self, original_task_id: str, db_path: Path) -> bool: + """幻觉门控:检查是否有回复邮件(in_reply_to = original_task_id)""" + try: + conn = get_connection(db_path) + try: + # 查 must_haves JSON 里包含 in_reply_to = original_task_id 的记录 + row = conn.execute( + "SELECT id FROM tasks WHERE id != ? AND must_haves LIKE ? LIMIT 1", + (original_task_id, f'%{original_task_id}%'), + ).fetchone() + return row is not None + finally: + conn.close() + except Exception as e: + logger.error("Mail %s: reply check error: %s", original_task_id, e) + # 查询失败时保守处理:假设有回复(避免误标 failed) + return True