220 lines
9.4 KiB
Python
220 lines
9.4 KiB
Python
"""Mail 失败通知 v2.0 — 以 system 身份通知发件人(AI Native)"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from typing import Dict, Optional
|
||
|
||
from src.blackboard.models import Task
|
||
from src.blackboard.operations import Blackboard
|
||
from src.config.agents import AGENT_IDS
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
# ── Reason 人话翻译 + detail 提取 ──────────────────────────────
|
||
|
||
def _extract_stderr(detail: Optional[dict], max_len: int = 200) -> str:
    """Return the ``stderr_preview`` entry of *detail*, truncated for display.

    Args:
        detail: Failure-detail mapping; ``None`` is treated as empty.
        max_len: Maximum number of characters kept before ``"..."`` is appended.

    Returns:
        The (possibly truncated) preview, or ``""`` when absent, ``None`` or empty.
        Always a ``str``.
    """
    preview = (detail or {}).get("stderr_preview") or ""
    if isinstance(preview, (bytes, bytearray)):
        # Raw process output: decode rather than crash on `bytes + "..."`.
        preview = bytes(preview).decode("utf-8", errors="replace")
    elif not isinstance(preview, str):
        preview = str(preview)
    if len(preview) > max_len:
        preview = preview[:max_len] + "..."
    return preview
|
||
|
||
|
||
# Failure reasons for which no automatic retry is ever attempted.
_NO_RETRY_REASONS = frozenset({
    "no_reply_found", "auth_failed", "agent_error",
    "agent_failed", "compact_failed",
})


def _as_count(value) -> int:
    """Best-effort conversion of a retry counter to ``int``; 0 when unusable."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return 0


def _fmt_retry_info(reason: str, detail: Optional[dict]) -> str:
    """Describe how the system retried before giving up on a mail.

    Args:
        reason: Raw failure reason code.
        detail: Failure-detail mapping (``None`` treated as empty). Reads the
            optional ``count`` and ``fallback_count`` entries; non-numeric
            values are treated as 0 instead of raising, so a malformed detail
            cannot abort the whole failure notice.

    Returns:
        A human-readable (Chinese) retry summary.
    """
    if reason in _NO_RETRY_REASONS:
        # _REASON_MAP is defined later in this module; it is only looked up
        # at call time.
        reason_human = _REASON_MAP.get(
            reason, _REASON_MAP.get("_default", ("未知原因", lambda d: "")))[0]
        return f"无法重试({reason_human})"

    info = detail or {}
    count = _as_count(info.get("count", 0))
    fallback_count = _as_count(info.get("fallback_count", 0))

    if count > 0:
        return f"已自动重试 {count} 次"
    if fallback_count > 0:
        return f"已自动重试 {fallback_count} 次(fallback)"
    return "系统已尝试恢复,但仍失败"
|
||
|
||
|
||
def _stderr_line(d: dict, empty: str = "") -> str:
    """Render d's stderr preview as ``"stderr: ..."``, or *empty* if there is none."""
    preview = _extract_stderr(d)
    return f"stderr: {preview}" if preview else empty


def _elapsed_minutes(d: dict):
    """Whole minutes in ``d["elapsed_seconds"]`` (default 0), or ``"?"`` if unusable.

    Accepts ints, floats and numeric strings. Floats are floored to whole
    minutes, so the result never renders as "31.0". ``None`` or garbage yields
    "?" instead of raising, which would otherwise abort the whole notice.
    """
    try:
        return int(float(d.get("elapsed_seconds", 0))) // 60
    except (TypeError, ValueError, OverflowError):
        return "?"


# reason_raw → (reason_human_readable, detail_format_fn)
# Each detail_format_fn receives the failure-detail dict (never None) and
# returns a one-line context string, or "" when there is nothing to add.
_REASON_MAP: Dict[str, tuple] = {
    "no_reply_found": ("收件人未回复(Agent 未能识别或处理此邮件)", lambda d: ""),
    "crashed": ("处理时进程崩溃", lambda d: _stderr_line(d, "无 stderr 输出")),
    "max_crash_count": ("连续崩溃达上限", lambda d: f"崩溃 {d.get('count', '?')} 次"),
    "max_retries": ("续杯耗尽(已自动重试)", lambda d: f"重试 {d.get('count', '?')} 次"),
    "max_api_retry_count": ("API 连续失败达上限", lambda d: f"API 重试 {d.get('count', '?')} 次"),
    "max_monitor_timeouts": (
        "处理超时达上限",
        lambda d: f"超时 {d.get('count', '?')} 次,"
                  f"共约 {_elapsed_minutes(d)} 分钟"),
    "gateway_timeout": ("Agent 执行超时(已续杯重试)", lambda d: ""),
    "session_stuck": ("会话假死(lock PID 死亡)", lambda d: f"假死 {d.get('stuck_count', '?')} 次"),
    "revive_failed": ("会话恢复失败", lambda d: f"假死 {d.get('stuck_count', '?')} 次"),
    "auth_failed": ("Agent 认证失败(配置问题)", _stderr_line),
    "fallback_exhausted": (
        "主模型和备用模型均失败",
        lambda d: f"fallback {d.get('fallback_count', '?')} 次,"
                  f"原因: {d.get('fallback_reason', '未知')}"),
    "agent_error": ("Agent 内部错误", _stderr_line),
    "agent_failed": ("收件人主动标记失败", lambda d: ""),
    "compact_failed": ("上下文压缩失败", _stderr_line),
    "compact_hanging": ("上下文压缩长时间未完成", lambda d: ""),
    "compact_interrupted": ("上下文压缩被中断(已自动重试)", lambda d: ""),
    "gateway_unreachable": ("Gateway 不可达(已自动重试)", _stderr_line),
    "lock_conflict": ("会话锁冲突(已自动重试)", lambda d: ""),
    "max_retry_count": ("重试耗尽", lambda d: f"重试 {d.get('count', '?')} 次"),
    "max_lock_retry_count": ("锁冲突重试耗尽", lambda d: f"重试 {d.get('count', '?')} 次"),
    "max_connect_retry_count": ("连接重试耗尽", lambda d: f"重试 {d.get('count', '?')} 次"),
    "_default": ("未知原因", _stderr_line),
}
|
||
|
||
# Reference list of common failure reasons ("AI Native": give the receiving AI
# a knowledge base so it can judge the failure itself). It is appended
# verbatim to every failure notice. Keep it in sync with the reason codes in
# _REASON_MAP.
_REASON_REFERENCE = """常见失败原因参考:
• no_reply_found:收件人未回复(Agent 未能识别或处理此邮件)
• crashed / max_crash_count:收件人处理时进程崩溃(已自动重试 3 次)
• max_retries:续杯耗尽(已自动重试 3 次,共约 34 分钟)
• max_api_retry_count:API 连续失败达上限(rate_limit/500/503)
• max_monitor_timeouts:处理超时达上限(共约 31.5 分钟)
• gateway_timeout:Agent 执行超时(已续杯重试)
• session_stuck:Agent 会话假死(lock PID 死亡,revive 失败)
• revive_failed:会话假死后恢复失败
• auth_failed:Agent 认证失败(配置问题)
• fallback_exhausted:主模型和备用模型均失败
• agent_error:Agent 内部错误
• agent_failed:收件人主动标记失败
• compact_failed:上下文压缩失败
• compact_hanging:上下文压缩长时间未完成(等待超 31.5 分钟)
• compact_interrupted:上下文压缩被中断(已自动重试 3 次)
• gateway_unreachable:Gateway 不可达(已自动重试 3 次)
• lock_conflict:会话锁冲突(已自动重试 3 次)
• max_retry_count:重试耗尽
• max_lock_retry_count:锁冲突重试耗尽
• max_connect_retry_count:连接重试耗尽
• 其他:建议排查系统日志"""
|
||
|
||
|
||
def _build_notify_text(title: str, to_agent: str, reason: str,
                       detail: Optional[dict] = None) -> str:
    """Render the body of a mail-delivery-failure notice (v2.0, AI Native).

    The notice lists the original title, the recipient, and the human-readable
    plus raw failure reason. It also gives a retry summary, an optional
    context line from the reason's detail formatter, and the static
    reason-reference list.

    Args:
        title: Title of the mail that failed.
        to_agent: Recipient of the failed mail.
        reason: Raw failure reason code; unknown codes use the "_default" entry.
        detail: Optional failure-detail mapping passed to the formatters.

    Returns:
        The newline-joined notice text.
    """
    human_reason, render_detail = _REASON_MAP.get(reason, _REASON_MAP["_default"])
    extra = render_detail(detail or {})
    retry_summary = _fmt_retry_info(reason, detail or {})

    # The context section is only emitted when the formatter produced something.
    context_section = ["📋 上下文信息:", f" {extra}"] if extra else []

    return "\n".join([
        "邮件投递失败通知",
        "",
        f"📧 原始邮件:「{title}」",
        f"👤 收件人:{to_agent}",
        f"❌ 失败原因:{human_reason}({reason})",
        f"📊 重试情况:{retry_summary}",
        *context_section,
        "",
        _REASON_REFERENCE,
        "",
        "——系统自动通知",
    ])
|
||
|
||
|
||
def _is_mail_project(db_path: Path) -> bool:
    """Infer from *db_path* whether it belongs to the ``_mail`` project.

    A path qualifies when one of its parent directories is named exactly
    ``_mail``, or when its file name ends with ``_mail.db``. Comparing path
    components, instead of searching the string for ``"/_mail/"``, also
    recognizes relative paths such as ``_mail/tasks.db`` and Windows-style
    paths.

    Args:
        db_path: Database path; a ``PurePath`` (any flavour) or a plain string.

    Returns:
        True if the path looks like a ``_mail`` project database.
    """
    from pathlib import PurePath

    path = db_path if isinstance(db_path, PurePath) else Path(db_path)
    # parts[:-1] excludes the final component, matching the original rule that
    # "_mail" must be a directory that contains the database file.
    return "_mail" in path.parts[:-1] or path.name.endswith("_mail.db")
|
||
|
||
|
||
# Agent that receives failure notices whose original sender is not a
# registered agent (e.g. mail sent by "system").
_FALLBACK_NOTIFY_AGENT = "pangtong-fujunshi"


def _parse_mail_meta(raw, mail_id: str) -> dict:
    """Decode a mail's ``must_haves`` metadata into a dict.

    Returns ``{}`` when *raw* is empty, is not valid JSON, or does not decode
    to a JSON object, so a corrupt metadata field cannot stop the failure
    notice from being sent. A value that is already a dict is returned as-is.
    """
    if not raw:
        return {}
    if isinstance(raw, dict):
        return raw
    try:
        meta = json.loads(raw)
    except (TypeError, ValueError) as exc:  # JSONDecodeError is a ValueError
        logger.warning(
            "notify_mail_failed: mail %s has unparseable must_haves: %s",
            mail_id, exc)
        return {}
    if not isinstance(meta, dict):
        logger.warning(
            "notify_mail_failed: mail %s must_haves is not a JSON object",
            mail_id)
        return {}
    return meta


def notify_mail_failed(db_path: Path, original_mail_id: str,
                       reason: str, detail: Optional[dict] = None) -> None:
    """Send a failure-notice mail, as ``system``, to the sender of a failed mail.

    The notice Task is created directly through ``Blackboard`` rather than
    through the HTTP API.

    Recursion guard: if the original mail's ``must_haves.system_notify`` is
    truthy, nothing is sent (a failed notice never triggers another notice).

    If the sender is not in ``AGENT_IDS`` (e.g. ``system``), the notice goes to
    ``_FALLBACK_NOTIFY_AGENT`` instead, to avoid a broadcast storm.

    Args:
        db_path: Path of the Blackboard database holding the mail.
        original_mail_id: Task id of the mail that failed.
        reason: Raw failure reason code (see ``_REASON_MAP``).
        detail: Optional failure-detail mapping used to build the notice text.

    This is best-effort: any error is logged (with traceback) and swallowed.
    """
    try:
        bb = Blackboard(db_path)
        original = bb.get_task(original_mail_id)
        if not original:
            logger.warning(
                "notify_mail_failed: original mail %s not found",
                original_mail_id)
            return

        meta = _parse_mail_meta(original.must_haves, original_mail_id)

        # Recursion guard: a failed system notice does not produce another notice.
        if meta.get("system_notify"):
            logger.info(
                "Mail %s: system notify mail failed, skipping recursive notification",
                original_mail_id)
            return

        # Sender: prefer assigned_by, fall back to must_haves.from.
        from_agent = original.assigned_by or meta.get("from", "")
        to_agent = original.assignee or ""
        title = original.title or ""

        if not from_agent:
            logger.warning(
                "notify_mail_failed: cannot determine sender for mail %s",
                original_mail_id)
            return

        # Sender is not a valid agent (e.g. system) → route to the fallback
        # agent rather than triggering a broadcast.
        target_agent = from_agent
        if from_agent not in AGENT_IDS:
            logger.warning(
                "Mail %s: sender '%s' is not a valid agent, routing failure notice to %s",
                original_mail_id, from_agent, _FALLBACK_NOTIFY_AGENT)
            target_agent = _FALLBACK_NOTIFY_AGENT

        text = _build_notify_text(title, to_agent, reason, detail)

        notify_id = f"mail-{int(datetime.now().timestamp() * 1000)}"
        notify_meta = {
            "type": "inform",
            "performative": "inform",
            "is_read": False,
            "conversation_id": meta.get("conversation_id", ""),
            "in_reply_to": original_mail_id,
            "from": "system",
            # Marks this mail so a failure of the notice itself is not re-notified.
            "system_notify": True,
        }

        notify_task = Task(
            id=notify_id,
            title=f"[投递失败] {title}",
            description=text,
            assignee=target_agent,
            assigned_by="system",
            must_haves=json.dumps(notify_meta, ensure_ascii=False),
            task_type="mail",
            status="pending",
        )
        bb.create_task(notify_task)
        logger.info(
            "Mail %s: sent failure notification to %s (original_sender=%s, reason=%s, notify_id=%s)",
            original_mail_id, target_agent, from_agent, reason, notify_id)

    except Exception as e:
        # Best-effort: never propagate, but keep the traceback for diagnosis.
        logger.warning(
            "notify_mail_failed: failed to send notification for mail %s: %s",
            original_mail_id,
            e,
            exc_info=True)
|