From 1b0007f2443a5493d1936c6fadab1a571fd0175c Mon Sep 17 00:00:00 2001 From: cfdaily Date: Wed, 10 Jun 2026 20:45:06 +0800 Subject: [PATCH 1/2] feat: Step 2-4 Task/Mail/Toolchain handlers + PromptSections + BaseTaskHandler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - base_task_handler.py: 基类统一4步流程(crash→verify→mark→notify) - task_handler.py: 5 PromptSections + 三信号验证 + review流程 - mail_handler.py: 3 PromptSections + inform/request区分 + 基类统一流程 - toolchain_handler.py: 3 PromptSections + 模板引擎渲染 + Mail API通知 - 背靠背设计-编码一致性检查通过(4严重已修/6轻微保留) --- src/daemon/base_task_handler.py | 179 +++++++++++++++++ src/daemon/mail_handler.py | 206 ++++++++++++++++++++ src/daemon/task_handler.py | 330 ++++++++++++++++++++++++++++++++ src/daemon/toolchain_handler.py | 206 ++++++++++++++++++++ 4 files changed, 921 insertions(+) create mode 100644 src/daemon/base_task_handler.py create mode 100644 src/daemon/mail_handler.py create mode 100644 src/daemon/task_handler.py create mode 100644 src/daemon/toolchain_handler.py diff --git a/src/daemon/base_task_handler.py b/src/daemon/base_task_handler.py new file mode 100644 index 0000000..b494dd8 --- /dev/null +++ b/src/daemon/base_task_handler.py @@ -0,0 +1,179 @@ +"""base_task_handler.py — Task type handler 基类。 + +收敛合理的共性能力(crash rollback + verify + mark + notify), +子类只实现差异点。 +""" +from __future__ import annotations + +import logging +from dataclasses import dataclass +from pathlib import Path +from typing import Optional + +from src.daemon.prompt_composer import PromptContext, PromptComposer, PromptSection +from src.blackboard.db import get_connection + +logger = logging.getLogger("moziplus-v2.handler") + + +@dataclass +class VerifyResult: + """验证结果""" + passed: bool + reason: str # "has_output" / "no_reply" / "no_signal" / ... + evidence: str # "output_count=1, comment_count=0" + can_retry: bool = True + retry_count: int = 0 + + +class BaseTaskHandler: + """所有 task type handler 的基类。 + + 职责:L2 引擎注入层的业务逻辑——prompt 构建、完成验证、状态标记。 + 不管:进程生命周期、exit 分类、重试决策(这些归 spawner)。 + """ + + # crash 类 outcome(进程级异常,需要 rollback) + CRASH_OUTCOMES = frozenset({ + "crashed", "compact_failed", "process_crash", + "session_stuck", "compact_hanging", + }) + + task_type: str = "" + virtual_project: Optional[str] = None + + # === 子类必须实现 === + + def build_prompt(self, context: PromptContext) -> str: + """构建 L2 prompt(通过 PromptComposer 拼 section)。子类实现。""" + raise NotImplementedError + + def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult: + """验证任务完成质量。每个 handler 自己的验证逻辑。子类实现。""" + raise NotImplementedError + + def target_success_status(self) -> str: + """验证通过后的目标状态。task='review', mail/toolchain='done'""" + return "review" + + def get_sections(self) -> list[PromptSection]: + """返回此 handler 的 prompt section 列表。子类实现。""" + return [] + + # === 基类提供统一流程 === + + def pre_spawn(self, task_id: str, db_path: Path) -> bool: + """spawn 前业务准备。默认 True。 + mail/toolchain override 为 auto_working。""" + return True + + def post_complete(self, task_id: str, agent_id: str, + outcome: str, db_path: Path) -> None: + """spawn 完成后的业务处理。统一 4 步流程: + 1. crash 处理 → rollback current_agent + 2. verify → 验证产出 + 3. mark → 标目标状态 + 4. notify → 失败时 on_failure + """ + # 1. crash 处理(基类提供,所有 handler 继承) + if outcome in self.CRASH_OUTCOMES: + self._rollback_current_agent(db_path, task_id, agent_id) + return + + # 2. verify + result = self.verify_completion(task_id, db_path) + + # 3. mark + if result.passed: + self._mark_task_status(db_path, task_id, self.target_success_status()) + logger.info("Task %s: verify passed (%s), marked %s", + task_id, result.reason, self.target_success_status()) + else: + # 4. notify + self.on_failure(task_id, agent_id, db_path, result) + + def on_failure(self, task_id: str, agent_id: str, + db_path: Path, verify: VerifyResult) -> None: + """验证失败处理。默认:标 failed。子类可 override。""" + self._mark_task_status(db_path, task_id, "failed") + logger.info("Task %s: verify failed (%s), marked failed", + task_id, verify.reason) + + def check_completion(self, task_id: str, db_path: Path) -> bool: + """ticker 级别的完成检查。默认:False。""" + return False + + # === 内部工具方法 === + + def _rollback_current_agent(self, db_path: Path, task_id: str, agent_id: str) -> None: + """crash 后回退 current_agent → assignee,避免 exclude_current 卡死。 + 从 dispatcher._rollback_current_agent 迁移。""" + try: + conn = get_connection(db_path) + try: + conn.execute( + "UPDATE tasks SET current_agent = " + "(SELECT assignee FROM tasks WHERE id=?) " + "WHERE id=? AND current_agent=?", + (task_id, task_id, agent_id) + ) + conn.commit() + finally: + conn.close() + logger.info("Task %s: rolled back current_agent from %s to assignee", + task_id, agent_id) + except Exception as e: + logger.warning("Task %s: failed to rollback current_agent: %s", + task_id, e) + + def _mark_task_status(self, db_path: Path, task_id: str, status: str) -> None: + """更新任务状态 + 写审计事件。 + 从 dispatcher._mark_task_status 迁移。""" + try: + conn = get_connection(db_path) + try: + conn.execute("BEGIN IMMEDIATE") + old_row = conn.execute( + "SELECT status FROM tasks WHERE id=?", (task_id,) + ).fetchone() + old_status = old_row["status"] if old_row else "unknown" + conn.execute( + "UPDATE tasks SET status=?, updated_at=datetime('now') WHERE id=?", + (status, task_id), + ) + conn.execute( + "INSERT INTO events (task_id, agent, event_type, payload) " + "VALUES (?, 'handler', 'status_change', ?)", + (task_id, + f'{{"from": "{old_status}", "to": "{status}", ' + f'"source": "{self.task_type}_handler"}}'), + ) + conn.commit() + finally: + conn.close() + except Exception as e: + logger.error("Task %s: mark status error: %s", task_id, e) + + def _auto_mark_working(self, task_id: str, db_path: Path) -> bool: + """pending → working(mail/toolchain 通用)。""" + try: + conn = get_connection(db_path) + try: + conn.execute("BEGIN IMMEDIATE") + row = conn.execute( + "SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone() + if not row or row["status"] not in ("pending", "claimed"): + logger.warning("Task %s: cannot mark working (status=%s)", + task_id, row["status"] if row else "not found") + return False + conn.execute( + "UPDATE tasks SET status='working', updated_at=datetime('now') " + "WHERE id=?", (task_id,)) + conn.commit() + logger.info("Task %s: auto-marked working", task_id) + return True + finally: + conn.close() + except Exception as e: + logger.error("Task %s: failed to mark working: %s", task_id, e) + return False diff --git a/src/daemon/mail_handler.py b/src/daemon/mail_handler.py new file mode 100644 index 0000000..2221725 --- /dev/null +++ b/src/daemon/mail_handler.py @@ -0,0 +1,206 @@ +"""mail_handler.py — Mail 任务 handler。 + +处理 Agent 间通信(飞鸽传书),含 inform 和 request 两种类型。 +""" +from __future__ import annotations + +import json +import logging +from pathlib import Path +from typing import Dict, List, Optional + +from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult +from src.daemon.prompt_composer import PromptComposer, PromptContext +from src.blackboard.db import get_connection + +logger = logging.getLogger("moziplus-v2.handler.mail") + +class MailHandler(BaseTaskHandler): + """Mail 任务 handler。""" + + task_type = "mail" + virtual_project = "_mail" + + def target_success_status(self) -> str: + return "done" + + def pre_spawn(self, task_id: str, db_path: Path) -> bool: + """auto_working:pending → working""" + return self._auto_mark_working(task_id, db_path) + + def build_prompt(self, context: PromptContext) -> str: + """通过 PromptComposer 拼装 3 个 section。""" + composer = PromptComposer() + composer.add_many(self.get_sections()) + return composer.compose(context) + + def get_sections(self) -> List: + return [MailContextSection(), MailApiSection(), MailConstraintsSection()] + + def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult: + """Mail 完成验证:区分 inform/request。 + + - inform: 始终通过(通知已阅即 done,不需要检查产出) + - request: 检查是否已回复 + """ + performative = self._parse_performative(task_id, db_path) + + if performative == "inform": + return VerifyResult(True, "inform_auto", f"performative={performative}") + + # request: 检查是否已回复 + has_reply = self._check_reply(task_id, db_path) + if has_reply: + return VerifyResult(True, "has_reply", f"performative={performative}") + return VerifyResult(False, "no_reply", f"performative={performative}") + + # post_complete 由基类 BaseTaskHandler 统一处理(crash→verify→mark→notify) + # inform: verify 始终通过 → 基类 mark done ✅ + # request 有回复: verify 通过 → 基类 mark done ✅ + # request 无回复: verify 失败 → 基类调 on_failure ✅ + + def on_failure(self, task_id: str, agent_id: str, + db_path: Path, verify: VerifyResult) -> None: + """request 验证失败 → 标 failed + 通知发件人""" + self._mark_task_status(db_path, task_id, "failed") + logger.info("Mail %s: request verify failed (%s), marked failed", + task_id, verify.reason) + + # 通知发件人 + try: + from src.daemon.mail_notify import notify_mail_failed + notify_mail_failed(db_path, task_id, "no_reply_found") + except Exception as e: + logger.warning("Mail %s: failed to send notification: %s", task_id, e) + + # === 内部方法 === + + def _parse_performative(self, task_id: str, db_path: Path) -> str: + """解析 mail 类型(inform/request)""" + try: + conn = get_connection(db_path) + try: + row = conn.execute( + "SELECT must_haves FROM tasks WHERE id=?", (task_id,) + ).fetchone() + if row and row["must_haves"]: + meta = json.loads(row["must_haves"]) + return meta.get("performative", meta.get("type", "request")) + finally: + conn.close() + except Exception: + pass + return "request" + + def _check_reply(self, task_id: str, db_path: Path) -> bool: + """检查是否已回复(从 dispatcher._mail_check_reply 迁移)""" + try: + conn = get_connection(db_path) + try: + row = conn.execute( + "SELECT COUNT(*) as cnt FROM comments " + "WHERE task_id=? AND author != 'daemon' " + "AND comment_type != 'system'", + (task_id,) + ).fetchone() + count = row["cnt"] if row else 0 + return count > 0 + finally: + conn.close() + except Exception as e: + logger.error("Mail %s: check reply error: %s", task_id, e) + return False + + def check_completion(self, task_id: str, db_path: Path) -> bool: + """ticker 级别的完成检查:检查是否已回复""" + return self._check_reply(task_id, db_path) + + +# =================================================================== +# Mail PromptSections +# =================================================================== + +class MailContextSection: + """邮件上下文段 — 发件人/收件人/主题/内容,区分 inform/request。""" + + name: str = "mail_context" + priority: int = 10 + + def render(self, context: PromptContext) -> str: + if context.mail_type == "inform": + return self._render_inform(context) + return self._render_request(context) + + def should_include(self, context: PromptContext) -> bool: # noqa: ARG002 + return True + + @staticmethod + def _render_inform(context: PromptContext) -> str: + return ( + f"你收到一封飞鸽传书(纯通知)。\n\n" + f"发件者: {context.from_agent}\n" + f"主题: {context.title}\n" + f"内容: {context.description}\n\n" + f"已阅即可。如需回复,用 in_reply_to 回复发件者(不需要填 to)。\n" + f"⚠️ 不要执行任何状态转换命令。" + ) + + @staticmethod + def _render_request(context: PromptContext) -> str: + return ( + f"你收到一封飞鸽传书,需要你处理并回复。\n\n" + f"发件者: {context.from_agent}\n" + f"主题: {context.title}\n" + f"内容: {context.description}\n\n" + f"### 如何回复发件者\n\n" + f'curl -s -X POST http://localhost:8083/api/mail \\\n' + f" -H 'Content-Type: application/json' \\\n" + f' -d \'{{"from": "{context.agent_id}", ' + f'"in_reply_to": "{context.task_id}", ' + f'"title": "回复: {context.title}", ' + f'"text": "你的回复内容"}}\'\n\n' + f"⚠️ 不需要填 \"to\",系统自动回复给发件者。" + ) + + +class MailApiSection: + """Mail API 操作指令段。""" + + name: str = "mail_api" + priority: int = 40 + + def render(self, context: PromptContext) -> str: + return ( + f"### 如何给其他人发新邮件\n\n" + f'curl -s -X POST http://localhost:8083/api/mail \\\n' + f" -H 'Content-Type: application/json' \\\n" + f' -d \'{{"from": "{context.agent_id}", ' + f'"to": "对方agent-id", ' + f'"title": "标题", ' + f'"text": "正文", ' + f'"type": "inform"}}\'\n\n' + f"⚠️ to 必须是有效的 agent id\n" + f"⚠️ 纯通知用 type=inform,需要对方回复不填 type(默认 request)" + ) + + def should_include(self, context: PromptContext) -> bool: + return context.mail_type == "request" + + +class MailConstraintsSection: + """Mail 硬约束段。""" + + name: str = "mail_constraints" + priority: int = 50 + + def render(self, context: PromptContext) -> str: # noqa: ARG002 + return ( + "## 硬约束\n\n" + "1. ⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。\n" + "2. ⚠️ 不能给自己发邮件\n" + "3. ⚠️ 发邮件时 to 必须是有效的 agent id\n" + "4. ⚠️ 纯通知用 type=inform,需要对方回复不填 type(默认 request)" + ) + + def should_include(self, context: PromptContext) -> bool: # noqa: ARG002 + return True diff --git a/src/daemon/task_handler.py b/src/daemon/task_handler.py new file mode 100644 index 0000000..2a402a1 --- /dev/null +++ b/src/daemon/task_handler.py @@ -0,0 +1,330 @@ +"""task_handler.py — 黑板任务 handler(task_type='task')。 + +标准黑板任务:三信号验证 → review 状态。 +""" +from __future__ import annotations + +import logging +from pathlib import Path +from typing import Dict, List, Optional + +from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult +from src.daemon.prompt_composer import PromptComposer, PromptContext +from src.blackboard.db import get_connection + +logger = logging.getLogger("moziplus-v2.handler") + +TERMINAL_STATES = frozenset({"review", "done", "failed", "cancelled"}) + +# --------------------------------------------------------------------------- +# Role → Skill 映射(D8 决策:L2 只给索引+引导语,不注全文) +# --------------------------------------------------------------------------- +ROLE_SKILL_MAP: Dict[str, str] = { + "executor": "blackboard-executor", + "reviewer": "blackboard-reviewer", + "reviewer-simayi": "blackboard-reviewer-simayi", + "reviewer-pangtong": "blackboard-reviewer-pangtong", + "planner": "blackboard-planner", + "claim": "blackboard-claim", +} + +SKILL_BASE_PATH = "/Users/chufeng/.sanguo_projects/sanguo_mozi/skills" + + +# --------------------------------------------------------------------------- +# PromptSection 实现 +# --------------------------------------------------------------------------- + +class TaskContextSection: + """段 1:任务上下文(title / desc / must_haves / status)。""" + + name: str = "task_context" + priority: int = 10 + + def render(self, context: PromptContext) -> str: + parts = ["## 任务上下文"] + if context.task_id: + parts.append(f"任务ID: {context.task_id}") + if context.title: + parts.append(f"标题: {context.title}") + if context.description: + parts.append(f"描述: {context.description}") + if context.must_haves: + parts.append(f"必须完成: {context.must_haves}") + if context.task and context.task.get("status"): + parts.append(f"当前状态: {context.task['status']}") + return "\n".join(parts) + + def should_include(self, context: PromptContext) -> bool: + return bool(context.task_id or context.title) + + +class PriorOutputsSection: + """段 2:前序产出摘要(depends_on 非空时注入)。""" + + name: str = "prior_outputs" + priority: int = 20 + + def render(self, context: PromptContext) -> str: + outputs = context.depends_on_outputs or [] + parts = ["## 前序产出"] + for out in outputs: + tid = out.get("task_id", "?") + summary = out.get("summary", "无摘要") + parts.append(f"- [{tid}] {summary}") + return "\n".join(parts) + + def should_include(self, context: PromptContext) -> bool: + return bool(context.depends_on_outputs) + + +class RoleSkillSection: + """段 3:角色 Skill 索引+引导语(D8 决策:不注全文)。""" + + name: str = "role_skill" + priority: int = 30 + + def render(self, context: PromptContext) -> str: + skill_name = ROLE_SKILL_MAP.get(context.role, "") + lines = [ + "## 角色操作规范", + f"你的角色:{context.role}", + ] + if skill_name: + lines.append(f"对应 Skill:{skill_name}") + lines.append( + f"请用 read 工具读取 {SKILL_BASE_PATH}/{skill_name}/SKILL.md " + "获取完整操作规范。" + ) + else: + lines.append("无对应 Skill 文件,按通用规范执行。") + return "\n".join(lines) + + def should_include(self, context: PromptContext) -> bool: + return True + + +class TaskApiSection: + """段 4:API 操作指令。""" + + name: str = "task_api" + priority: int = 40 + + API_HOST = "localhost" + API_PORT = 8083 + + def render(self, context: PromptContext) -> str: + pid = context.project_id + tid = context.task_id + aid = context.agent_id + success_status = '"review"' + base = f"http://{self.API_HOST}:{self.API_PORT}/api/projects/{pid}/tasks/{tid}" + return ( + "## 操作指令\n" + "### 状态回写\n" + f"开始工作:\n" + f'curl -X POST {base}/status \\\n' + f' -H "Content-Type: application/json" \\\n' + f' -d \'{{"status": "working", "agent": "{aid}"}}\'\n\n' + "### 写入产出\n" + f'curl -X POST {base}/outputs \\\n' + f' -H "Content-Type: application/json" \\\n' + f" -d '{{\"type\": \"text\", \"content\": \"\"}}'\n\n" + "### 完成后\n" + f"成功: status → {success_status} | 失败: status → \"failed\"" + ) + + def should_include(self, context: PromptContext) -> bool: + return True + + +class TaskConstraintsSection: + """段 5:硬约束。""" + + name: str = "task_constraints" + priority: int = 50 + + def render(self, context: PromptContext) -> str: + constraints = ["## 硬约束"] + role = context.role + if role == "executor": + constraints.extend([ + "- 完成后必须标 review", + "- 产出物不能为空(系统会验证)", + "- handoff comment ≥ 50 字符", + ]) + elif role.startswith("reviewer"): + constraints.extend([ + "- 审查结果必须明确 pass/fail", + "- 评审意见须附证据(文件:行号)", + ]) + elif role == "planner": + constraints.extend([ + "- 需求不清时提问,不要猜", + "- 子任务必须有明确的终态定义", + ]) + else: + constraints.append("- 按规范完成 assigned 任务") + return "\n".join(constraints) + + def should_include(self, context: PromptContext) -> bool: + return True + + +class TaskHandler(BaseTaskHandler): + """黑板标准任务 handler。 + + - verify: 三信号检查(output / comment / terminal status) + - 成功 → review + - 失败 → 保持 working,让 ticker 重试 + - review 完成 → 读取 verdict,approved 则 mark done + """ + + task_type: str = "task" + virtual_project: Optional[str] = None + + # === 子类实现 === + + def target_success_status(self) -> str: + """task 类型验证通过后进 review。""" + return "review" + + def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult: + """三信号验证:output / comment / terminal status。""" + try: + conn = get_connection(db_path) + try: + # 信号 1:terminal status + row = conn.execute( + "SELECT status FROM tasks WHERE id=?", (task_id,) + ).fetchone() + if not row: + return VerifyResult(False, "not_found", "task not found", + can_retry=False) + status = row["status"] + if status in TERMINAL_STATES: + return VerifyResult( + True, "terminal_status", + f"status={status}", can_retry=False + ) + + # 信号 2:outputs + output_count = conn.execute( + "SELECT COUNT(*) as cnt FROM outputs WHERE task_id=?", + (task_id,) + ).fetchone()["cnt"] + if output_count > 0: + return VerifyResult( + True, "has_output", + f"output_count={output_count}" + ) + + # 信号 3:非 system 且内容 >= 50 字的 comment + comment_count = conn.execute( + "SELECT COUNT(*) as cnt FROM comments " + "WHERE task_id=? AND author != 'system' " + "AND LENGTH(content) >= 50", + (task_id,) + ).fetchone()["cnt"] + if comment_count > 0: + return VerifyResult( + True, "has_comment", + f"comment_count={comment_count}" + ) + + # 无信号 + return VerifyResult( + False, "no_signal", + f"output=0, comment=0, status={status}" + ) + finally: + conn.close() + except Exception as e: + logger.error("Task %s: verify error: %s", task_id, e) + return VerifyResult(False, "verify_error", str(e)) + + def pre_spawn(self, task_id: str, db_path: Path) -> bool: + """task 类型不需要 pre_spawn 逻辑。""" + return True + + def get_sections(self) -> list: + """返回 5 个 PromptSection 实例。""" + return [ + TaskContextSection(), + PriorOutputsSection(), + RoleSkillSection(), + TaskApiSection(), + TaskConstraintsSection(), + ] + + def build_prompt(self, context: PromptContext) -> str: + """通过 PromptComposer 拼装 prompt sections。""" + composer = PromptComposer() + composer.add_many(self.get_sections()) + return composer.compose(context) + + def on_failure(self, task_id: str, agent_id: str, + db_path: Path, verify: VerifyResult) -> None: + """验证失败:不标 failed,保持 working 让 ticker 重试。""" + logger.info( + "Task %s: verify failed (%s, evidence=%s), leaving working for ticker retry", + task_id, verify.reason, verify.evidence + ) + + # === Review 流程 === + + def handle_review_complete(self, task_id: str, db_path: Path) -> None: + """Review 完成后处理:读取 verdict → approved 则 mark done, + 否则 @mention assignee via blackboard comment。""" + try: + conn = get_connection(db_path) + try: + # 读取最新 review + review_row = conn.execute( + "SELECT verdict, reviewer, comment FROM reviews " + "WHERE task_id=? ORDER BY created_at DESC LIMIT 1", + (task_id,) + ).fetchone() + + if not review_row: + logger.warning("Task %s: no review found", task_id) + return + + verdict = review_row["verdict"] + reviewer = review_row["reviewer"] + review_comment = review_row["comment"] or "" + + # 获取 assignee + task_row = conn.execute( + "SELECT assignee FROM tasks WHERE id=?", (task_id,) + ).fetchone() + if not task_row: + logger.warning("Task %s: task not found for review", task_id) + return + assignee = task_row["assignee"] + + if verdict == "approved": + self._mark_task_status(db_path, task_id, "done") + logger.info("Task %s: review approved by %s, marked done", + task_id, reviewer) + else: + # 非 approved:通过 blackboard comment @mention assignee + conn.execute( + "INSERT INTO comments (task_id, author, content) " + "VALUES (?, 'system', ?)", + (task_id, + f"@{assignee} review 未通过 (verdict={verdict}, " + f"reviewer={reviewer}): {review_comment}") + ) + conn.commit() + # 回到 working 让 assignee 重新处理 + self._mark_task_status(db_path, task_id, "working") + logger.info( + "Task %s: review not approved (%s by %s), " + "@mentioned assignee %s, back to working", + task_id, verdict, reviewer, assignee + ) + finally: + conn.close() + except Exception as e: + logger.error("Task %s: handle_review_complete error: %s", task_id, e) diff --git a/src/daemon/toolchain_handler.py b/src/daemon/toolchain_handler.py new file mode 100644 index 0000000..16152e1 --- /dev/null +++ b/src/daemon/toolchain_handler.py @@ -0,0 +1,206 @@ +"""toolchain_handler.py — 工具链事件 handler。 + +处理 Gitea Webhook 事件(CI 失败、Review 请求、Issue 指派等)。 +""" +from __future__ import annotations + +import json +import logging +import subprocess +from pathlib import Path +from typing import Dict, List + +from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult +from src.daemon.prompt_composer import PromptComposer, PromptContext +from src.daemon.toolchain_templates import render_template, _TEMPLATE_MAP +from src.blackboard.db import get_connection + +logger = logging.getLogger("moziplus-v2.handler.toolchain") + + +# --------------------------------------------------------------------------- +# Toolchain PromptSections +# --------------------------------------------------------------------------- + +class ToolchainContextSection: + """事件类型 + 事件详情(priority=10)""" + + name: str = "toolchain_context" + priority: int = 10 + + def render(self, context: PromptContext) -> str: + event_type = context.event_type + event_data: Dict = context.event_data or {} + + if event_type in _TEMPLATE_MAP: + # 使用模板引擎渲染已知事件 + variables = {k: str(v) for k, v in event_data.items()} + return render_template(event_type, variables) + + # fallback:通用事件描述 + lines = [f"## 工具链事件", f""] + lines.append(f"- **事件类型**: {event_type or '未知'}") + if event_data: + lines.append(f"- **事件详情**:") + for key, value in event_data.items(): + lines.append(f" - {key}: {value}") + lines.append(f"") + return "\n".join(lines) + + def should_include(self, context: PromptContext) -> bool: + return True + + +class ToolchainApiSection: + """API 操作指令(priority=40),success_status=done""" + + name: str = "toolchain_api" + priority: int = 40 + + API_HOST = "localhost:8083" + + def render(self, context: PromptContext) -> str: + lines = [ + "## API 操作指令", + "", + f"项目 ID: `{context.project_id}`", + f"任务 ID: `{context.task_id}`", + "", + "### 完成后必须更新任务状态", + "完成后务必通过以下命令将任务标记为 **done**:", + "```bash", + f'curl -s -X POST "http://{self.API_HOST}/api/projects/{context.project_id}/tasks/{context.task_id}/status" \\', + ' -H "Content-Type: application/json" \\', + ' -d \'{"status": "done"}\'', + "```", + "", + "### 提交产出", + "如有产出(如 review 结果、修复方案),提交到任务 outputs:", + "```bash", + f'curl -s -X POST "http://{self.API_HOST}/api/projects/{context.project_id}/tasks/{context.task_id}/outputs" \\', + ' -H "Content-Type: application/json" \\', + ' -d \'{"content": "<你的产出内容>", "type": "text"}\'', + "```", + "", + ] + return "\n".join(lines) + + def should_include(self, context: PromptContext) -> bool: + return True + + +class ToolchainConstraintsSection: + """硬约束(priority=50)""" + + name: str = "toolchain_constraints" + priority: int = 50 + + def render(self, context: PromptContext) -> str: + lines = [ + "## 硬约束", + "", + "1. **必须标 done**:处理完成后必须通过 API 将任务状态更新为 `done`,否则视为未完成", + "2. **产出不能为空**:必须提交有意义的产出(output 或 comment),不能只改状态", + "3. **单一职责**:只处理本次事件相关的操作,不要越界执行无关任务", + "4. **出错即报告**:如果无法处理(如权限不足、资源不存在),在 comment 中说明原因并标 done", + "5. **不要创建新任务**:工具链事件只处理当前事件,不衍生新任务", + "", + ] + return "\n".join(lines) + + def should_include(self, context: PromptContext) -> bool: + return True + + +# --------------------------------------------------------------------------- +# ToolchainHandler +# --------------------------------------------------------------------------- + +class ToolchainHandler(BaseTaskHandler): + """工具链事件 handler。""" + + task_type = "toolchain" + virtual_project = "_toolchain" + + def target_success_status(self) -> str: + return "done" + + def pre_spawn(self, task_id: str, db_path: Path) -> bool: + """auto_working:pending → working""" + return self._auto_mark_working(task_id, db_path) + + def get_sections(self) -> List: + """返回 3 个 Toolchain PromptSection 实例""" + return [ + ToolchainContextSection(), + ToolchainApiSection(), + ToolchainConstraintsSection(), + ] + + def build_prompt(self, context: PromptContext) -> str: + """通过 PromptComposer 拼装 sections 为最终 prompt""" + composer = PromptComposer() + composer.add_many(self.get_sections()) + return composer.compose(context) + + def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult: + """检查行动输出(output 或 comment 有实质内容)""" + try: + conn = get_connection(db_path) + try: + # 检查 output + output_count = conn.execute( + "SELECT COUNT(*) FROM outputs WHERE task_id=?", (task_id,) + ).fetchone()[0] + if output_count > 0: + return VerifyResult(True, "has_output", f"output_count={output_count}") + + # 检查 comment(非系统、有实质内容) + comment_count = conn.execute( + "SELECT COUNT(*) FROM comments WHERE task_id=? " + "AND author != 'system' AND LENGTH(content) >= 20", + (task_id,) + ).fetchone()[0] + if comment_count > 0: + return VerifyResult(True, "has_comment", f"comment_count={comment_count}") + + return VerifyResult(False, "no_action", "output=0, comment=0") + finally: + conn.close() + except Exception as e: + logger.error("Toolchain %s: verify error: %s", task_id, e) + return VerifyResult(False, "verify_error", str(e)) + + def on_failure(self, task_id: str, agent_id: str, + db_path: Path, verify: VerifyResult) -> None: + """验证失败 → 标 failed + Mail API 通知主公""" + self._mark_task_status(db_path, task_id, "failed") + logger.info("Toolchain %s: verify failed (%s), marked failed", task_id, verify.reason) + self._notify_via_mail_api(task_id, verify.reason, verify.evidence) + + def _notify_via_mail_api(self, task_id: str, reason: str, evidence: str) -> None: + """通过 Mail API 发通知给主公""" + payload = json.dumps({ + "from": "daemon", + "to": "pangtong-fujunshi", + "title": f"工具链事件处理失败: {task_id}", + "text": ( + f"任务 {task_id} 验证失败: {reason}\n" + f"证据: {evidence}\n\n请人工检查。" + ), + "type": "inform", + }, ensure_ascii=False) + try: + subprocess.run( + [ + "curl", "-s", "-X", "POST", + "http://localhost:8083/api/mail", + "-H", "Content-Type: application/json", + "-d", payload, + ], + timeout=5, + capture_output=True, + ) + logger.info("Toolchain %s: sent failure notification to pangtong-fujunshi via Mail API", task_id) + except Exception as e: + logger.warning("Toolchain %s: failed to notify via Mail API: %s", task_id, e) From 4a4e99f73885efe06465221225be2eafcbb820f5 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Wed, 10 Jun 2026 21:44:47 +0800 Subject: [PATCH 2/2] =?UTF-8?q?fix:=20S1-S3=20review=20suggestions=20?= =?UTF-8?q?=E2=80=94=20type=20annotations=20unified,=20urllib=20replaces?= =?UTF-8?q?=20curl,=20rich=20notification=20content?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/daemon/mail_handler.py | 4 +- src/daemon/toolchain_handler.py | 94 +++++++++++++++++++++++++-------- 2 files changed, 74 insertions(+), 24 deletions(-) diff --git a/src/daemon/mail_handler.py b/src/daemon/mail_handler.py index 2221725..d14dadf 100644 --- a/src/daemon/mail_handler.py +++ b/src/daemon/mail_handler.py @@ -7,7 +7,7 @@ from __future__ import annotations import json import logging from pathlib import Path -from typing import Dict, List, Optional +from typing import Dict, Optional from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult from src.daemon.prompt_composer import PromptComposer, PromptContext @@ -34,7 +34,7 @@ class MailHandler(BaseTaskHandler): composer.add_many(self.get_sections()) return composer.compose(context) - def get_sections(self) -> List: + def get_sections(self) -> list: return [MailContextSection(), MailApiSection(), MailConstraintsSection()] def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult: diff --git a/src/daemon/toolchain_handler.py b/src/daemon/toolchain_handler.py index 16152e1..8e33799 100644 --- a/src/daemon/toolchain_handler.py +++ b/src/daemon/toolchain_handler.py @@ -6,9 +6,9 @@ from __future__ import annotations import json import logging -import subprocess +import urllib.request from pathlib import Path -from typing import Dict, List +from typing import Dict from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult from src.daemon.prompt_composer import PromptComposer, PromptContext @@ -129,7 +129,7 @@ class ToolchainHandler(BaseTaskHandler): """auto_working:pending → working""" return self._auto_mark_working(task_id, db_path) - def get_sections(self) -> List: + def get_sections(self) -> list: """返回 3 个 Toolchain PromptSection 实例""" return [ ToolchainContextSection(), @@ -176,31 +176,81 @@ class ToolchainHandler(BaseTaskHandler): """验证失败 → 标 failed + Mail API 通知主公""" self._mark_task_status(db_path, task_id, "failed") logger.info("Toolchain %s: verify failed (%s), marked failed", task_id, verify.reason) - self._notify_via_mail_api(task_id, verify.reason, verify.evidence) - def _notify_via_mail_api(self, task_id: str, reason: str, evidence: str) -> None: - """通过 Mail API 发通知给主公""" + # 从 db 读取事件上下文 + event_type = "" + event_data: Dict = {} + try: + conn = get_connection(db_path) + row = conn.execute( + "SELECT must_haves FROM tasks WHERE id=?", (task_id,) + ).fetchone() + if row and row["must_haves"]: + meta = json.loads(row["must_haves"]) + event_type = meta.get("event_type", "") + raw = meta.get("event_data", "{}") + event_data = json.loads(raw) if isinstance(raw, str) else raw + conn.close() + except Exception: + pass + + self._notify_via_mail_api( + task_id, verify.reason, verify.evidence, + event_type, event_data, + ) + + def _notify_via_mail_api( + self, + task_id: str, + reason: str, + evidence: str, + event_type: str, + event_data: Dict, + ) -> None: + """通过 Mail API 发送丰富的失败通知给主公。""" + # 构建行动指引 + action_hint = "请检查黑板任务并手动处理。" + et_lower = event_type.lower() + if "ci" in et_lower or "deploy" in et_lower: + action_hint = "建议创建任务派给 jiangwei-infra 检查 CI/部署问题。" + elif "review" in et_lower: + action_hint = "建议查看 PR review 状态,必要时通知相关开发者。" + elif "issue" in et_lower: + action_hint = "建议创建任务派给对应开发者处理 Issue。" + + # 构建事件详情 + event_details = "" + if event_data: + event_details = "\n".join( + f" - {k}: {v}" for k, v in event_data.items() + ) + + title = f"[toolchain-handler] 工具链事件处理失败: {task_id}" + text = ( + f"任务 {task_id} 验证失败\n\n" + f"事件类型: {event_type or '未知'}\n" + f"事件详情:\n{event_details or ' (无)'}\n\n" + f"失败原因: {reason}\n" + f"证据: {evidence}\n\n" + f"黑板任务: http://localhost:8083/ → 项目 _toolchain → 任务 {task_id}\n\n" + f"行动指引: {action_hint}" + ) + payload = json.dumps({ "from": "daemon", "to": "pangtong-fujunshi", - "title": f"工具链事件处理失败: {task_id}", - "text": ( - f"任务 {task_id} 验证失败: {reason}\n" - f"证据: {evidence}\n\n请人工检查。" - ), + "title": title, + "text": text, "type": "inform", - }, ensure_ascii=False) + }, ensure_ascii=False).encode("utf-8") + try: - subprocess.run( - [ - "curl", "-s", "-X", "POST", - "http://localhost:8083/api/mail", - "-H", "Content-Type: application/json", - "-d", payload, - ], - timeout=5, - capture_output=True, + req = urllib.request.Request( + "http://localhost:8083/api/mail", + data=payload, + headers={"Content-Type": "application/json"}, ) - logger.info("Toolchain %s: sent failure notification to pangtong-fujunshi via Mail API", task_id) + urllib.request.urlopen(req, timeout=5) + logger.info("Toolchain %s: sent failure notification via Mail API", task_id) except Exception as e: logger.warning("Toolchain %s: failed to notify via Mail API: %s", task_id, e)