"""mail_handler.py — Mail 任务 handler。 处理 Agent 间通信(飞鸽传书),含 inform 和 request 两种类型。 """ from __future__ import annotations import json import logging from pathlib import Path from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult from src.daemon.prompt_composer import PromptComposer, PromptContext, GiteaConventionSection, WikiGuideSection from src.blackboard.db import get_connection logger = logging.getLogger("moziplus-v2.handler.mail") class MailHandler(BaseTaskHandler): """Mail 任务 handler。""" task_type = "mail" virtual_project = "_mail" display_name = "飞鸽传书" def target_success_status(self) -> str: return "done" def pre_spawn(self, task_id: str, db_path: Path) -> bool: """auto_working:pending → working""" return self._auto_mark_working(task_id, db_path) def build_prompt(self, context: PromptContext) -> str: """通过 PromptComposer 拼装 3 个 section。""" composer = PromptComposer() composer.add_many(self.get_sections()) return composer.compose(context) def get_sections(self) -> list: return [MailContextSection(), MailApiSection(), MailConstraintsSection(), GiteaConventionSection(), WikiGuideSection()] def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult: """Mail 完成验证:区分 inform/request。 - inform: 始终通过(通知已阅即 done,不需要检查产出) - request: 检查是否已回复 """ performative = self._parse_performative(task_id, db_path) if performative == "inform": return VerifyResult(True, "inform_auto", f"performative={performative}") # request: 检查是否已回复 has_reply = self._check_reply(task_id, db_path) if has_reply: return VerifyResult(True, "has_reply", f"performative={performative}") return VerifyResult(False, "no_reply", f"performative={performative}") # post_complete 由基类 BaseTaskHandler 统一处理(crash→verify→mark→notify) # inform: verify 始终通过 → 基类 mark done ✅ # request 有回复: verify 通过 → 基类 mark done ✅ # request 无回复: verify 失败 → 基类调 on_failure ✅ def on_failure(self, task_id: str, agent_id: str, db_path: Path, verify: VerifyResult) -> None: """request 验证失败 → 标 failed + 通知发件人""" self._mark_task_status(db_path, task_id, "failed") logger.info("Mail %s: request verify failed (%s), marked failed", task_id, verify.reason) # 通知发件人 try: from src.daemon.mail_notify import notify_mail_failed notify_mail_failed(db_path, task_id, "no_reply_found") except Exception as e: logger.warning("Mail %s: failed to send notification: %s", task_id, e) # === 内部方法 === def _parse_performative(self, task_id: str, db_path: Path) -> str: """解析 mail 类型(inform/request)""" try: conn = get_connection(db_path) try: row = conn.execute( "SELECT must_haves FROM tasks WHERE id=?", (task_id,) ).fetchone() if row and row["must_haves"]: meta = json.loads(row["must_haves"]) return meta.get("performative", meta.get("type", "request")) finally: conn.close() except Exception: pass return "request" def _check_reply(self, task_id: str, db_path: Path) -> bool: """检查是否已回复(查 tasks 表找 in_reply_to 回复邮件) 从 dispatcher._mail_check_reply 迁移。 Mail 回复机制:创建新 task,must_haves JSON 中包含 in_reply_to = original_task_id。 不能查 comments 表——回复邮件是独立的 task,不是 comment。 """ try: conn = get_connection(db_path) try: row = conn.execute( "SELECT id FROM tasks WHERE id != ? AND must_haves LIKE ? LIMIT 1", (task_id, f'%{task_id}%'), ).fetchone() return row is not None finally: conn.close() except Exception as e: logger.error("Mail %s: check reply error: %s", task_id, e) # 查询失败时保守处理:假设有回复(避免误标 failed) return True def check_completion(self, task_id: str, db_path: Path) -> bool: """ticker 级别的完成检查:检查是否已回复""" return self._check_reply(task_id, db_path) # =================================================================== # Mail PromptSections # =================================================================== class MailContextSection: """邮件上下文段 — 发件人/收件人/主题/内容,区分 inform/request。""" name: str = "mail_context" priority: int = 10 def render(self, context: PromptContext) -> str: if context.mail_type == "inform": return self._render_inform(context) return self._render_request(context) def should_include(self, context: PromptContext) -> bool: # noqa: ARG002 return True @staticmethod def _render_inform(context: PromptContext) -> str: return ( f"你收到一封飞鸽传书(纯通知)。\n\n" f"发件者: {context.from_agent}\n" f"主题: {context.title}\n" f"内容: {context.description}\n\n" f"已阅即可。如需回复,用 in_reply_to 回复发件者(不需要填 to)。\n" f"⚠️ 不要执行任何状态转换命令。" ) @staticmethod def _render_request(context: PromptContext) -> str: return ( f"你收到一封飞鸽传书,需要你处理并回复。\n\n" f"发件者: {context.from_agent}\n" f"主题: {context.title}\n" f"内容: {context.description}\n\n" f"### 如何回复发件者\n\n" f'curl -s -X POST http://localhost:8083/api/mail \\\n' f" -H 'Content-Type: application/json' \\\n" f' -d \'{{"from": "{context.agent_id}", ' f'"in_reply_to": "{context.task_id}", ' f'"title": "回复: {context.title}", ' f'"text": "你的回复内容"}}\'\n\n' f"⚠️ 不需要填 \"to\",系统自动回复给发件者。" ) class MailApiSection: """Mail API 操作指令段。""" name: str = "mail_api" priority: int = 40 def render(self, context: PromptContext) -> str: return ( f"### 如何给其他人发新邮件\n\n" f'curl -s -X POST http://localhost:8083/api/mail \\\n' f" -H 'Content-Type: application/json' \\\n" f' -d \'{{"from": "{context.agent_id}", ' f'"to": "对方agent-id", ' f'"title": "标题", ' f'"text": "正文", ' f'"type": "inform"}}\'\n\n' f"⚠️ to 必须是有效的 agent id\n" f"⚠️ 纯通知用 type=inform,需要对方回复不填 type(默认 request)" ) def should_include(self, context: PromptContext) -> bool: return context.mail_type == "request" class MailConstraintsSection: """Mail 硬约束段。""" name: str = "mail_constraints" priority: int = 50 def render(self, context: PromptContext) -> str: # noqa: ARG002 return ( "## 硬约束\n\n" "1. ⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。\n" "2. ⚠️ 不能给自己发邮件\n" "3. ⚠️ 发邮件时 to 必须是有效的 agent id\n" "4. ⚠️ 纯通知用 type=inform,需要对方回复不填 type(默认 request)" ) def should_include(self, context: PromptContext) -> bool: # noqa: ARG002 return True