"""Mail 失败通知 v2.0 — 以 system 身份通知发件人(AI Native)""" from __future__ import annotations import json import logging from datetime import datetime from pathlib import Path from typing import Dict, Optional from src.blackboard.models import Task from src.blackboard.operations import Blackboard from src.config.agents import AGENT_IDS logger = logging.getLogger(__name__) # ── Reason 人话翻译 + detail 提取 ────────────────────────────── def _extract_stderr(detail: dict, max_len: int = 200) -> str: """从 detail 中提取 stderr_preview""" preview = (detail or {}).get("stderr_preview", "") if preview and len(preview) > max_len: preview = preview[:max_len] + "..." return preview def _fmt_retry_info(reason: str, detail: dict) -> str: """格式化重试情况描述""" _NO_RETRY_REASONS = { "no_reply_found", "auth_failed", "agent_error", "agent_failed", "compact_failed", } if reason in _NO_RETRY_REASONS: reason_human = _REASON_MAP.get(reason, _REASON_MAP.get("_default", ("未知原因", lambda d: "")))[0] return f"无法重试({reason_human})" count = (detail or {}).get("count", 0) fallback_count = (detail or {}).get("fallback_count", 0) if count > 0: return f"已自动重试 {count} 次" if fallback_count > 0: return f"已自动重试 {fallback_count} 次(fallback)" return "系统已尝试恢复,但仍失败" # reason_raw → (reason_human_readable, detail_format_fn) _REASON_MAP: Dict[str, tuple] = { "no_reply_found": ("收件人未回复(Agent 未能识别或处理此邮件)", lambda d: ""), "crashed": ("处理时进程崩溃", lambda d: f"stderr: {_extract_stderr(d)}" if _extract_stderr(d) else "无 stderr 输出"), "max_crash_count": ("连续崩溃达上限", lambda d: f"崩溃 {d.get('count', '?')} 次"), "max_retries": ("续杯耗尽(已自动重试)", lambda d: f"重试 {d.get('count', '?')} 次"), "max_api_retry_count": ("API 连续失败达上限", lambda d: f"API 重试 {d.get('count', '?')} 次"), "max_monitor_timeouts": ( "处理超时达上限", lambda d: f"超时 {d.get('count', '?')} 次," f"共约 {d.get('elapsed_seconds', 0) // 60} 分钟"), "gateway_timeout": ("Agent 执行超时(已续杯重试)", lambda d: ""), "session_stuck": ("会话假死(lock PID 死亡)", lambda d: f"假死 {d.get('stuck_count', '?')} 次"), "revive_failed": ("会话恢复失败", lambda d: f"假死 {d.get('stuck_count', '?')} 次"), "auth_failed": ("Agent 认证失败(配置问题)", lambda d: f"stderr: {_extract_stderr(d)}" if _extract_stderr(d) else ""), "fallback_exhausted": ( "主模型和备用模型均失败", lambda d: f"fallback {d.get('fallback_count', '?')} 次," f"原因: {d.get('fallback_reason', '未知')}"), "agent_error": ( "Agent 内部错误", lambda d: f"stderr: {_extract_stderr(d)}" if _extract_stderr(d) else ""), "agent_failed": ("收件人主动标记失败", lambda d: ""), "compact_failed": ("上下文压缩失败", lambda d: f"stderr: {_extract_stderr(d)}" if _extract_stderr(d) else ""), "compact_hanging": ("上下文压缩长时间未完成", lambda d: ""), "compact_interrupted": ("上下文压缩被中断(已自动重试)", lambda d: ""), "gateway_unreachable": ( "Gateway 不可达(已自动重试)", lambda d: f"stderr: {_extract_stderr(d)}" if _extract_stderr(d) else ""), "lock_conflict": ("会话锁冲突(已自动重试)", lambda d: ""), "max_retry_count": ("重试耗尽", lambda d: f"重试 {d.get('count', '?')} 次"), "max_lock_retry_count": ("锁冲突重试耗尽", lambda d: f"重试 {d.get('count', '?')} 次"), "max_connect_retry_count": ("连接重试耗尽", lambda d: f"重试 {d.get('count', '?')} 次"), "_default": ("未知原因", lambda d: f"stderr: {_extract_stderr(d)}" if _extract_stderr(d) else ""), } # 常见失败原因参考(AI Native:提供知识库让收件 AI 自行判断) _REASON_REFERENCE = """常见失败原因参考: • no_reply_found:收件人未回复(Agent 未能识别或处理此邮件) • crashed / max_crash_count:收件人处理时进程崩溃(已自动重试 3 次) • max_retries:续杯耗尽(已自动重试 3 次,共约 34 分钟) • max_api_retry_count:API 连续失败达上限(rate_limit/500/503) • max_monitor_timeouts:处理超时达上限(共约 31.5 分钟) • gateway_timeout:Agent 执行超时(已续杯重试) • session_stuck:Agent 会话假死(lock PID 死亡,revive 失败) • revive_failed:会话假死后恢复失败 • auth_failed:Agent 认证失败(配置问题) • fallback_exhausted:主模型和备用模型均失败 • agent_failed:收件人主动标记失败 • compact_failed:上下文压缩失败 • compact_hanging:上下文压缩长时间未完成(等待超 31.5 分钟) • compact_interrupted:上下文压缩被中断(已自动重试 3 次) • gateway_unreachable:Gateway 不可达(已自动重试 3 次) • lock_conflict:会话锁冲突(已自动重试 3 次) • 其他:建议排查系统日志""" def _build_notify_text(title: str, to_agent: str, reason: str, detail: Optional[dict] = None) -> str: """构建通知正文(v2.0 AI Native)""" reason_human, detail_fn = _REASON_MAP.get(reason, _REASON_MAP["_default"]) detail_info = detail_fn(detail or {}) retry_info = _fmt_retry_info(reason, detail or {}) lines = [ "邮件投递失败通知", "", f"📧 原始邮件:「{title}」", f"👤 收件人:{to_agent}", f"❌ 失败原因:{reason_human}({reason})", f"📊 重试情况:{retry_info}", ] if detail_info: lines.append("📋 上下文信息:") lines.append(f" {detail_info}") lines.append("") lines.append(_REASON_REFERENCE) lines.append("") lines.append("——系统自动通知") return "\n".join(lines) def _is_mail_project(db_path: Path) -> bool: """从 db_path 推断是否为 _mail 项目""" path_str = str(db_path) return "/_mail/" in path_str or path_str.endswith("_mail.db") def notify_mail_failed(db_path: Path, original_mail_id: str, reason: str, detail: Optional[dict] = None) -> None: """Mail 失败后以 system 身份给发件人发通知邮件 直接通过 Blackboard 创建 Task,不走 HTTP API。 防递归:检查原邮件 must_haves.system_notify,为 true 则跳过。 发件人不是有效 Agent(如 system)→ 通知庞统代处理,避免广播风暴。 """ try: bb = Blackboard(db_path) original = bb.get_task(original_mail_id) if not original: logger.warning( "notify_mail_failed: original mail %s not found", original_mail_id) return # 解析原邮件元数据 meta = json.loads(original.must_haves) if original.must_haves else {} # 防递归:系统通知邮件失败不再发通知 if meta.get("system_notify"): logger.info( "Mail %s: system notify mail failed, skipping recursive notification", original_mail_id) return # 获取发件人(优先 assigned_by,fallback must_haves.from) from_agent = original.assigned_by or meta.get("from", "") to_agent = original.assignee or "" title = original.title or "" if not from_agent: logger.warning( "notify_mail_failed: cannot determine sender for mail %s", original_mail_id) return # 发件人不是有效 Agent(如 system)→ 通知庞统代处理,不触发广播 target_agent = from_agent if from_agent not in AGENT_IDS: logger.warning("Mail %s: sender '%s' is not a valid agent, routing failure notice to pangtong-fujunshi", original_mail_id, from_agent) target_agent = "pangtong-fujunshi" # 构造通知正文(v2.0 AI Native) text = _build_notify_text(title, to_agent, reason, detail) # 创建通知邮件 Task notify_id = f"mail-{int(datetime.now().timestamp() * 1000)}" notify_meta = { "type": "inform", "performative": "inform", "is_read": False, "conversation_id": meta.get("conversation_id", ""), "in_reply_to": original_mail_id, "from": "system", "system_notify": True, } notify_task = Task( id=notify_id, title=f"[投递失败] {title}", description=text, assignee=target_agent, assigned_by="system", must_haves=json.dumps(notify_meta, ensure_ascii=False), task_type="mail", status="pending", ) bb.create_task(notify_task) logger.info("Mail %s: sent failure notification to %s (original_sender=%s, reason=%s, notify_id=%s)", original_mail_id, target_agent, from_agent, reason, notify_id) except Exception as e: logger.warning( "notify_mail_failed: failed to send notification for mail %s: %s", original_mail_id, e)