ce1b0902dd
- S1: vp_name 硬编码字典 → handler.display_name 属性 - S2: ticker/spawner 中 TaskTypeRegistry 局部 import → 移文件顶部 - W1: TaskHandler executor verify 失败不调 on_failure 加注释说明
211 lines
8.0 KiB
Python
211 lines
8.0 KiB
Python
"""mail_handler.py — Mail 任务 handler。
|
||
|
||
处理 Agent 间通信(飞鸽传书),含 inform 和 request 两种类型。
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
from pathlib import Path
|
||
from typing import Dict, Optional
|
||
|
||
from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult
|
||
from src.daemon.prompt_composer import PromptComposer, PromptContext
|
||
from src.blackboard.db import get_connection
|
||
|
||
logger = logging.getLogger("moziplus-v2.handler.mail")
|
||
|
||
class MailHandler(BaseTaskHandler):
|
||
"""Mail 任务 handler。"""
|
||
|
||
task_type = "mail"
|
||
virtual_project = "_mail"
|
||
display_name = "飞鸽传书"
|
||
|
||
def target_success_status(self) -> str:
|
||
return "done"
|
||
|
||
def pre_spawn(self, task_id: str, db_path: Path) -> bool:
|
||
"""auto_working:pending → working"""
|
||
return self._auto_mark_working(task_id, db_path)
|
||
|
||
def build_prompt(self, context: PromptContext) -> str:
|
||
"""通过 PromptComposer 拼装 3 个 section。"""
|
||
composer = PromptComposer()
|
||
composer.add_many(self.get_sections())
|
||
return composer.compose(context)
|
||
|
||
def get_sections(self) -> list:
|
||
return [MailContextSection(), MailApiSection(), MailConstraintsSection()]
|
||
|
||
def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult:
|
||
"""Mail 完成验证:区分 inform/request。
|
||
|
||
- inform: 始终通过(通知已阅即 done,不需要检查产出)
|
||
- request: 检查是否已回复
|
||
"""
|
||
performative = self._parse_performative(task_id, db_path)
|
||
|
||
if performative == "inform":
|
||
return VerifyResult(True, "inform_auto", f"performative={performative}")
|
||
|
||
# request: 检查是否已回复
|
||
has_reply = self._check_reply(task_id, db_path)
|
||
if has_reply:
|
||
return VerifyResult(True, "has_reply", f"performative={performative}")
|
||
return VerifyResult(False, "no_reply", f"performative={performative}")
|
||
|
||
# post_complete 由基类 BaseTaskHandler 统一处理(crash→verify→mark→notify)
|
||
# inform: verify 始终通过 → 基类 mark done ✅
|
||
# request 有回复: verify 通过 → 基类 mark done ✅
|
||
# request 无回复: verify 失败 → 基类调 on_failure ✅
|
||
|
||
def on_failure(self, task_id: str, agent_id: str,
|
||
db_path: Path, verify: VerifyResult) -> None:
|
||
"""request 验证失败 → 标 failed + 通知发件人"""
|
||
self._mark_task_status(db_path, task_id, "failed")
|
||
logger.info("Mail %s: request verify failed (%s), marked failed",
|
||
task_id, verify.reason)
|
||
|
||
# 通知发件人
|
||
try:
|
||
from src.daemon.mail_notify import notify_mail_failed
|
||
notify_mail_failed(db_path, task_id, "no_reply_found")
|
||
except Exception as e:
|
||
logger.warning("Mail %s: failed to send notification: %s", task_id, e)
|
||
|
||
# === 内部方法 ===
|
||
|
||
def _parse_performative(self, task_id: str, db_path: Path) -> str:
|
||
"""解析 mail 类型(inform/request)"""
|
||
try:
|
||
conn = get_connection(db_path)
|
||
try:
|
||
row = conn.execute(
|
||
"SELECT must_haves FROM tasks WHERE id=?", (task_id,)
|
||
).fetchone()
|
||
if row and row["must_haves"]:
|
||
meta = json.loads(row["must_haves"])
|
||
return meta.get("performative", meta.get("type", "request"))
|
||
finally:
|
||
conn.close()
|
||
except Exception:
|
||
pass
|
||
return "request"
|
||
|
||
def _check_reply(self, task_id: str, db_path: Path) -> bool:
|
||
"""检查是否已回复(查 tasks 表找 in_reply_to 回复邮件)
|
||
|
||
从 dispatcher._mail_check_reply 迁移。
|
||
Mail 回复机制:创建新 task,must_haves JSON 中包含 in_reply_to = original_task_id。
|
||
不能查 comments 表——回复邮件是独立的 task,不是 comment。
|
||
"""
|
||
try:
|
||
conn = get_connection(db_path)
|
||
try:
|
||
row = conn.execute(
|
||
"SELECT id FROM tasks WHERE id != ? AND must_haves LIKE ? LIMIT 1",
|
||
(task_id, f'%{task_id}%'),
|
||
).fetchone()
|
||
return row is not None
|
||
finally:
|
||
conn.close()
|
||
except Exception as e:
|
||
logger.error("Mail %s: check reply error: %s", task_id, e)
|
||
# 查询失败时保守处理:假设有回复(避免误标 failed)
|
||
return True
|
||
|
||
def check_completion(self, task_id: str, db_path: Path) -> bool:
|
||
"""ticker 级别的完成检查:检查是否已回复"""
|
||
return self._check_reply(task_id, db_path)
|
||
|
||
|
||
# ===================================================================
|
||
# Mail PromptSections
|
||
# ===================================================================
|
||
|
||
class MailContextSection:
|
||
"""邮件上下文段 — 发件人/收件人/主题/内容,区分 inform/request。"""
|
||
|
||
name: str = "mail_context"
|
||
priority: int = 10
|
||
|
||
def render(self, context: PromptContext) -> str:
|
||
if context.mail_type == "inform":
|
||
return self._render_inform(context)
|
||
return self._render_request(context)
|
||
|
||
def should_include(self, context: PromptContext) -> bool: # noqa: ARG002
|
||
return True
|
||
|
||
@staticmethod
|
||
def _render_inform(context: PromptContext) -> str:
|
||
return (
|
||
f"你收到一封飞鸽传书(纯通知)。\n\n"
|
||
f"发件者: {context.from_agent}\n"
|
||
f"主题: {context.title}\n"
|
||
f"内容: {context.description}\n\n"
|
||
f"已阅即可。如需回复,用 in_reply_to 回复发件者(不需要填 to)。\n"
|
||
f"⚠️ 不要执行任何状态转换命令。"
|
||
)
|
||
|
||
@staticmethod
|
||
def _render_request(context: PromptContext) -> str:
|
||
return (
|
||
f"你收到一封飞鸽传书,需要你处理并回复。\n\n"
|
||
f"发件者: {context.from_agent}\n"
|
||
f"主题: {context.title}\n"
|
||
f"内容: {context.description}\n\n"
|
||
f"### 如何回复发件者\n\n"
|
||
f'curl -s -X POST http://localhost:8083/api/mail \\\n'
|
||
f" -H 'Content-Type: application/json' \\\n"
|
||
f' -d \'{{"from": "{context.agent_id}", '
|
||
f'"in_reply_to": "{context.task_id}", '
|
||
f'"title": "回复: {context.title}", '
|
||
f'"text": "你的回复内容"}}\'\n\n'
|
||
f"⚠️ 不需要填 \"to\",系统自动回复给发件者。"
|
||
)
|
||
|
||
|
||
class MailApiSection:
|
||
"""Mail API 操作指令段。"""
|
||
|
||
name: str = "mail_api"
|
||
priority: int = 40
|
||
|
||
def render(self, context: PromptContext) -> str:
|
||
return (
|
||
f"### 如何给其他人发新邮件\n\n"
|
||
f'curl -s -X POST http://localhost:8083/api/mail \\\n'
|
||
f" -H 'Content-Type: application/json' \\\n"
|
||
f' -d \'{{"from": "{context.agent_id}", '
|
||
f'"to": "对方agent-id", '
|
||
f'"title": "标题", '
|
||
f'"text": "正文", '
|
||
f'"type": "inform"}}\'\n\n'
|
||
f"⚠️ to 必须是有效的 agent id\n"
|
||
f"⚠️ 纯通知用 type=inform,需要对方回复不填 type(默认 request)"
|
||
)
|
||
|
||
def should_include(self, context: PromptContext) -> bool:
|
||
return context.mail_type == "request"
|
||
|
||
|
||
class MailConstraintsSection:
|
||
"""Mail 硬约束段。"""
|
||
|
||
name: str = "mail_constraints"
|
||
priority: int = 50
|
||
|
||
def render(self, context: PromptContext) -> str: # noqa: ARG002
|
||
return (
|
||
"## 硬约束\n\n"
|
||
"1. ⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。\n"
|
||
"2. ⚠️ 不能给自己发邮件\n"
|
||
"3. ⚠️ 发邮件时 to 必须是有效的 agent id\n"
|
||
"4. ⚠️ 纯通知用 type=inform,需要对方回复不填 type(默认 request)"
|
||
)
|
||
|
||
def should_include(self, context: PromptContext) -> bool: # noqa: ARG002
|
||
return True
|