Files
sanguo_moziplus_v2/src/api/mail_routes.py
T
2026-06-02 11:35:40 +08:00

321 lines
9.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""API 路由 — Mail Tabv2.7
Mail 是一个特殊的 Project (_mail),每封 Mail 是一个两点 Task(from → to)。
显示为独立 Tab,list 形式展示:时间 | From | To | Title | 状态(已读/已执行)。
"""
from __future__ import annotations
import json
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, HTTPException, Query
from src.blackboard.db import init_db
from src.blackboard.models import Task
from src.blackboard.operations import Blackboard
from src.blackboard.queries import Queries
from src.utils import get_data_root
def _get_valid_agents() -> set:
    """Return the set of agent IDs that are allowed to send or receive mail.

    The IDs are the keys of ``daemon.agent_profiles`` in
    ``../config/default.yaml`` (relative to this module's directory).

    Returns:
        The configured agent IDs, or a hard-coded fallback set when the
        config file is missing, unreadable, malformed, or defines no profiles.
    """
    config_path = Path(__file__).parent.parent / "config" / "default.yaml"
    if config_path.exists():
        import yaml

        try:
            # Decode explicitly as UTF-8 so parsing does not depend on the
            # platform's default locale encoding.
            with open(config_path, encoding="utf-8") as f:
                cfg = yaml.safe_load(f)
            # safe_load yields None for an empty document and may yield any
            # YAML type, so verify the shape at every level before using it.
            daemon = cfg.get("daemon") if isinstance(cfg, dict) else None
            profiles = (
                daemon.get("agent_profiles") if isinstance(daemon, dict) else None
            )
            if isinstance(profiles, dict) and profiles:
                return set(profiles.keys())
        except Exception:
            # Deliberate best-effort: any read/parse failure falls through to
            # the hard-coded list below instead of breaking the mail API.
            pass
    # Fallback: hard-coded agent IDs.
    return {
        "zhangfei-dev",
        "guanyu-dev",
        "zhaoyun-data",
        "jiangwei-infra",
        "pangtong-fujunshi",
        "simayi-challenger",
    }
# Router for the mail endpoints; every route in this module is registered under /api/mail.
router = APIRouter(prefix="/api/mail", tags=["mail"])
# ID of the special mail project; _db_path() also uses it as the directory name under the data root.
MAIL_PROJECT_ID = "_mail"
def _db_path() -> Path:
    """Return the path of the mail project's ``blackboard.db``.

    On every call the parent directory is created if it is missing and
    ``init_db`` is run on the database path.
    """
    db_file = get_data_root() / MAIL_PROJECT_ID / "blackboard.db"
    db_file.parent.mkdir(parents=True, exist_ok=True)
    init_db(db_file)
    return db_file
def _bb() -> Blackboard:
    """Build a ``Blackboard`` bound to the mail project's database."""
    mail_db = _db_path()
    return Blackboard(mail_db)
def _q() -> Queries:
    """Build a ``Queries`` helper bound to the mail project's database."""
    mail_db = _db_path()
    return Queries(mail_db)
def _mail_meta(task: Task) -> Dict[str, Any]:
    """Parse the mail metadata stored as JSON in ``task.must_haves``.

    Args:
        task: Task whose ``must_haves`` attribute holds the JSON metadata.

    Returns:
        The decoded metadata dict, or an empty dict when ``must_haves`` is
        empty, is not valid JSON, or decodes to something other than a dict.
    """
    if not task.must_haves:
        return {}
    try:
        meta = json.loads(task.must_haves)
    except (json.JSONDecodeError, TypeError):
        return {}
    # Valid JSON may still be a list, string or number; callers use ``.get``
    # on the result, so anything that is not a dict counts as "no metadata".
    return meta if isinstance(meta, dict) else {}
def _task_to_mail(task: Task) -> Dict[str, Any]:
    """Convert a Task into the JSON-serialisable mail dict returned by the API.

    Fields stored directly on the task are copied over; the remaining ones
    come from the metadata parsed out of ``must_haves``, with defaults for
    ``type``/``performative`` ("inform") and ``is_read`` (False).
    """
    info = _mail_meta(task)
    sender = task.assigned_by or "unknown"

    # Columns stored directly on the task.
    mail: Dict[str, Any] = {
        "id": task.id,
        "title": task.title,
        "from": sender,
        "to": task.assignee,
        "status": task.status,
    }
    # Values carried in the metadata JSON.
    mail["type"] = info.get("type", "inform")
    mail["performative"] = info.get("performative", "inform")
    mail["is_read"] = info.get("is_read", False)
    # Timestamps and body.
    mail["created_at"] = task.created_at
    mail["updated_at"] = task.updated_at
    mail["description"] = task.description
    # Threading information.
    mail["conversation_id"] = info.get("conversation_id")
    mail["in_reply_to"] = info.get("in_reply_to")
    return mail
@router.get("")
async def list_mail(
    status: Optional[str] = None,
    from_agent: Optional[str] = None,
    to_agent: Optional[str] = None,
    unread: Optional[bool] = None,
    limit: int = Query(50, le=200),
):
    """List mails, newest first.

    Args:
        status: Optional task status to filter by.
        from_agent: Optional sender filter (matched against ``assigned_by``).
        to_agent: Optional recipient filter (matched against ``assignee``).
        unread: When True, only mails not yet marked as read are returned.
        limit: Maximum number of mails to return (default 50, at most 200).

    Returns:
        ``{"mails": [...], "total": n}`` where ``total`` is the number of
        mails in the returned list (i.e. after ``limit`` is applied).
    """
    bb = _bb()
    tasks = bb.list_tasks(status=status, assignee=to_agent, assigned_by=from_agent)
    mails = []
    for t in tasks:
        mail = _task_to_mail(t)
        # ``is_read`` lives inside the must_haves JSON, so the unread filter
        # is applied here in Python rather than in the task query.
        if unread is True and mail["is_read"]:
            continue
        mails.append(mail)
    # Newest first. ``or ""`` keeps the sort from raising when a task has no
    # creation timestamp (None cannot be compared with str).
    mails.sort(key=lambda m: m.get("created_at") or "", reverse=True)
    # Clamp at zero: a negative limit would otherwise slice from the end and
    # silently drop the oldest entries instead of limiting the result.
    mails = mails[: max(limit, 0)]
    return {"mails": mails, "total": len(mails)}
@router.get("/agents/list")
async def list_mail_agents():
    """Return the sorted, de-duplicated agents that appear as a mail sender or recipient.

    Intended for populating filter choices in the UI.
    """
    conn = _q()._conn()
    try:
        sender_rows = conn.execute(
            "SELECT DISTINCT assigned_by FROM tasks WHERE assigned_by IS NOT NULL"
        ).fetchall()
        receiver_rows = conn.execute(
            "SELECT DISTINCT assignee FROM tasks WHERE assignee IS NOT NULL"
        ).fetchall()
        # Merge both columns into one set so an agent seen on both sides is listed once.
        names = {row["assigned_by"] for row in sender_rows}
        names.update(row["assignee"] for row in receiver_rows)
        return {"agents": sorted(names)}
    finally:
        conn.close()
@router.get("/summary")
async def mail_summary():
    """Return mail counters: total, unread, and a per-type breakdown.

    A mail without an ``is_read`` flag counts as unread; a mail without a
    ``type`` is counted under "inform".
    """
    board = _bb()
    metas = [_mail_meta(task) for task in board.list_tasks()]

    unread_count = sum(1 for meta in metas if not meta.get("is_read", False))

    type_counts: Dict[str, int] = {}
    for meta in metas:
        kind = meta.get("type", "inform")
        type_counts[kind] = type_counts.get(kind, 0) + 1

    return {"total": len(metas), "unread": unread_count, "by_type": type_counts}
@router.get("/{mail_id}")
async def get_mail(mail_id: str):
    """Return one mail together with its comments (the conversation thread).

    Raises:
        HTTPException: 404 when no task with ``mail_id`` exists.
    """
    board = _bb()
    task = board.get_task(mail_id)
    if not task:
        raise HTTPException(404, f"Mail not found: {mail_id}")

    detail = _task_to_mail(task)

    # Attach the comments so the client can render the thread.
    thread = []
    for comment in board.get_comments(mail_id):
        thread.append(
            {
                "id": comment.id,
                "author": comment.author,
                "type": comment.comment_type,
                "body": comment.body,
                "created_at": comment.created_at,
            }
        )
    detail["comments"] = thread
    return detail
def _mail_str_field(body: Dict[str, Any], key: str) -> str:
    """Read an optional string field from the request body, stripped.

    A missing or null value is returned as ``""`` so the caller's own
    "required" check produces the error message.

    Raises:
        HTTPException: 400 when the value is present but not a string.
    """
    value = body.get(key)
    if value is None:
        return ""
    if not isinstance(value, str):
        raise HTTPException(400, f"`{key}` 必须是字符串")
    return value.strip()


@router.post("")
async def send_mail(body: Dict[str, Any]):
    """Send a mail by creating a task in the mail project.

    Validation performed, in order (rule IDs A1-A10):
        A1:    ``from`` is required.
        A9:    ``from`` must be a valid agent.
        A5:    ``in_reply_to`` must reference an existing mail.
        A8:    only the sender or recipient of the original mail may reply
               (strictly one-to-one).
        A6/A7: on a reply, ``to`` is auto-corrected to the other party of the
               original mail.
        A2:    ``to`` is required when the mail is not a reply.
        A3:    ``from`` and ``to`` must differ (no self-mail).
        A4:    ``to`` must be a valid agent.
        A10:   the body text must not be empty.

    Args:
        body: JSON object. Keys read: ``from``, ``to``, ``in_reply_to``,
            ``text`` (or ``description``), ``title``, ``id``,
            ``conversation_id``, ``type``, ``performative``.

    Returns:
        ``{"ok": True, "mail_id": ...}``, plus an ``auto_corrected`` entry
        when a supplied ``to`` was replaced on a reply.

    Raises:
        HTTPException: 400 when any of the validations above fails.
    """
    bb = _bb()
    valid_agents = _get_valid_agents()
    auto_corrected = None

    # --- A1: `from` is required ---
    from_agent = _mail_str_field(body, "from")
    if not from_agent:
        raise HTTPException(400, "`from` 必填")

    # --- A9: `from` must be a valid agent ---
    if from_agent not in valid_agents:
        raise HTTPException(400, f"`from` 不是有效的 Agent: {from_agent}")

    # --- A5/A6/A7/A8: reply handling ---
    in_reply_to = body.get("in_reply_to")
    original = None
    if in_reply_to:
        original = bb.get_task(in_reply_to)
        # A5: the mail being replied to must exist.
        if not original:
            raise HTTPException(400, f"回复的邮件不存在: {in_reply_to}")
        orig_from = original.assigned_by or ""
        orig_to = original.assignee or ""
        # A8: only the two parties of the original mail may reply.
        if from_agent not in (orig_from, orig_to):
            raise HTTPException(400, "只有邮件的发送者或接收者可以回复")
        # A6/A7: a reply always goes to the *other* party. Always targeting
        # the original sender would make every reply written by that sender
        # fail the self-mail check (A3), although A8 allows it.
        to_agent = _mail_str_field(body, "to")
        corrected_to = orig_to if from_agent == orig_from else orig_from
        if to_agent and to_agent != corrected_to:
            auto_corrected = {
                "field": "to",
                "original": to_agent,
                "corrected": corrected_to,
            }
        to_agent = corrected_to
    else:
        # --- A2: `to` is required when this is not a reply ---
        to_agent = _mail_str_field(body, "to")
        if not to_agent:
            raise HTTPException(400, "`to` 必填")

    # --- A3: no self-mail ---
    if from_agent == to_agent:
        raise HTTPException(400, "不能给自己发邮件")

    # --- A4: `to` must be a valid agent ---
    if to_agent not in valid_agents:
        raise HTTPException(400, f"`to` 不是有效的 Agent: {to_agent}")

    # --- A10: body text must not be empty ---
    text = body.get("text", body.get("description", "")) or ""
    if not isinstance(text, str):
        raise HTTPException(400, "邮件正文必须是字符串")
    if not text.strip():
        raise HTTPException(400, "邮件正文不能为空")

    mail_id = body.get("id", f"mail-{int(datetime.now().timestamp() * 1000)}")

    # conversation_id: an explicit value wins; otherwise a reply inherits the
    # original mail's conversation; otherwise a new conversation is started.
    conversation_id = body.get("conversation_id")
    if not conversation_id and original:
        orig_meta = _mail_meta(original)
        if isinstance(orig_meta, dict):
            conversation_id = orig_meta.get("conversation_id")
    if not conversation_id:
        conversation_id = f"conv-{int(datetime.now().timestamp() * 1000)}"

    # type: when not given, replies default to "inform" (avoids reply loops)
    # and new mails default to "request".
    mail_type = body.get("type")
    if mail_type is None:
        mail_type = "inform" if in_reply_to else "request"

    # performative is a synonym of type and defaults to it.
    performative = body.get("performative", mail_type)

    meta = {
        "type": mail_type,
        "performative": performative,
        "is_read": False,
        "conversation_id": conversation_id,
        "in_reply_to": in_reply_to,
        "from": from_agent,
    }
    task = Task(
        id=mail_id,
        title=body.get("title", ""),
        description=text,
        assignee=to_agent,
        assigned_by=from_agent,
        must_haves=json.dumps(meta),
        task_type="mail",
        status="pending",
    )
    bb.create_task(task)

    result = {"ok": True, "mail_id": task.id}
    if auto_corrected:
        result["auto_corrected"] = auto_corrected
    return result
@router.patch("/{mail_id}")
async def update_mail(mail_id: str, body: Dict[str, Any]):
    """Update a mail's read flag and/or mark it as executed.

    ``is_read`` in the body overwrites the stored flag. A truthy
    ``mark_executed`` forces the flag to True and moves the task to "done"
    unless it is already "done" or "cancelled". The metadata is written back
    on every call.

    Raises:
        HTTPException: 404 when no task with ``mail_id`` exists.
    """
    board = _bb()
    task = board.get_task(mail_id)
    if not task:
        raise HTTPException(404, f"Mail not found: {mail_id}")

    info = _mail_meta(task)
    if "is_read" in body:
        info["is_read"] = body["is_read"]

    wants_execute = body.get("mark_executed")
    if wants_execute:
        # Executing a mail implies it has been read.
        info["is_read"] = True
        already_final = task.status in ("done", "cancelled")
        if not already_final:
            board.update_task_status(mail_id, "done", agent="mail-api")

    serialized = json.dumps(info)
    board.update_must_haves(mail_id, serialized)
    return {"ok": True}