"""toolchain_handler.py — toolchain event handler.

Handles Gitea webhook events (CI failures, review requests, issue
assignments, etc.).
"""

from __future__ import annotations

import json
import logging
import urllib.request
from pathlib import Path
from typing import Dict

from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult
from src.daemon.prompt_composer import PromptComposer, PromptContext
from src.daemon.toolchain_templates import render_template, _TEMPLATE_MAP
from src.blackboard.db import get_connection

logger = logging.getLogger("moziplus-v2.handler.toolchain")


# ---------------------------------------------------------------------------
# Toolchain PromptSections
# ---------------------------------------------------------------------------


class ToolchainContextSection:
    """Prompt section describing the event type and event details (priority=10)."""

    name: str = "toolchain_context"
    priority: int = 10

    def render(self, context: PromptContext) -> str:
        """Render the event description for the prompt.

        Event types present in ``_TEMPLATE_MAP`` are rendered through
        ``render_template`` with every event-data value stringified; any
        other event type falls back to a generic markdown bullet list.

        Args:
            context: Prompt context; ``event_type`` and ``event_data`` are read.

        Returns:
            The rendered section text.
        """
        event_type = context.event_type
        event_data: Dict = context.event_data or {}

        if event_type in _TEMPLATE_MAP:
            # Known event: delegate to the template engine.
            variables = {k: str(v) for k, v in event_data.items()}
            return render_template(event_type, variables)

        # Fallback: generic event description.
        lines = ["## 工具链事件", ""]
        lines.append(f"- **事件类型**: {event_type or '未知'}")
        if event_data:
            lines.append("- **事件详情**:")
            for key, value in event_data.items():
                lines.append(f" - {key}: {value}")
        lines.append("")
        return "\n".join(lines)

    def should_include(self, context: PromptContext) -> bool:
        """Always include this section."""
        return True


class ToolchainApiSection:
    """Prompt section with API instructions (priority=40); success status is ``done``."""

    name: str = "toolchain_api"
    priority: int = 40

    # Host:port used when building the curl commands shown to the agent.
    API_HOST = "localhost:8083"

    def render(self, context: PromptContext) -> str:
        """Render curl instructions for updating task status and submitting outputs.

        Args:
            context: Prompt context; ``project_id`` and ``task_id`` are read.

        Returns:
            The rendered section text.
        """
        # Shared URL prefix for both task endpoints (status / outputs).
        task_url = (
            f"http://{self.API_HOST}/api/projects/{context.project_id}"
            f"/tasks/{context.task_id}"
        )
        lines = [
            "## API 操作指令",
            "",
            f"项目 ID: `{context.project_id}`",
            f"任务 ID: `{context.task_id}`",
            "",
            "### 完成后必须更新任务状态",
            "完成后务必通过以下命令将任务标记为 **done**:",
            "```bash",
            f'curl -s -X POST "{task_url}/status" \\',
            ' -H "Content-Type: application/json" \\',
            ' -d \'{"status": "done"}\'',
            "```",
            "",
            "### 提交产出",
            "如有产出(如 review 结果、修复方案),提交到任务 outputs:",
            "```bash",
            f'curl -s -X POST "{task_url}/outputs" \\',
            ' -H "Content-Type: application/json" \\',
            ' -d \'{"content": "<你的产出内容>", "type": "text"}\'',
            "```",
            "",
        ]
        return "\n".join(lines)

    def should_include(self, context: PromptContext) -> bool:
        """Always include this section."""
        return True


class ToolchainConstraintsSection:
    """Prompt section listing the hard constraints (priority=50)."""

    name: str = "toolchain_constraints"
    priority: int = 50

    def render(self, context: PromptContext) -> str:
        """Return the fixed list of hard constraints; ``context`` is not read."""
        lines = [
            "## 硬约束",
            "",
            "1. **必须标 done**:处理完成后必须通过 API 将任务状态更新为 `done`,否则视为未完成",
            "2. **产出不能为空**:必须提交有意义的产出(output 或 comment),不能只改状态",
            "3. **单一职责**:只处理本次事件相关的操作,不要越界执行无关任务",
            "4. **出错即报告**:如果无法处理(如权限不足、资源不存在),在 comment 中说明原因并标 done",
            "5. **不要创建新任务**:工具链事件只处理当前事件,不衍生新任务",
            "",
        ]
        return "\n".join(lines)

    def should_include(self, context: PromptContext) -> bool:
        """Always include this section."""
        return True


# ---------------------------------------------------------------------------
# ToolchainHandler
# ---------------------------------------------------------------------------


class ToolchainHandler(BaseTaskHandler):
    """Handler for toolchain events."""

    task_type = "toolchain"
    virtual_project = "_toolchain"
    display_name = "工具链事件"

    # Base URL used when building Gitea links for failure notifications.
    GITEA_BASE_URL = "http://192.168.2.154:3000"
    # Endpoint that receives failure notifications.
    MAIL_API_URL = "http://localhost:8083/api/mail"

    def target_success_status(self) -> str:
        """Return the status that marks a successfully handled task."""
        return "done"

    def pre_spawn(self, task_id: str, db_path: Path) -> bool:
        """Auto-mark the task as working (pending → working) before spawning.

        Delegates to ``self._auto_mark_working``, which is not defined in this
        file (presumably inherited from ``BaseTaskHandler``).
        """
        return self._auto_mark_working(task_id, db_path)

    def get_sections(self) -> list:
        """Return fresh instances of the three toolchain prompt sections."""
        return [
            ToolchainContextSection(),
            ToolchainApiSection(),
            ToolchainConstraintsSection(),
        ]

    def build_prompt(self, context: PromptContext) -> str:
        """Compose the final prompt from the sections via ``PromptComposer``."""
        composer = PromptComposer()
        composer.add_many(self.get_sections())
        return composer.compose(context)

    def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult:
        """Check that the task produced an output or a substantive comment.

        A task passes when it has at least one row in ``outputs``, or at
        least one row in ``comments`` whose author is not ``'system'`` and
        whose content is at least 20 characters long.

        Args:
            task_id: Task to verify.
            db_path: Database path passed to ``get_connection``.

        Returns:
            A ``VerifyResult``; any exception is logged and reported as a
            failed result with reason ``"verify_error"``.
        """
        try:
            conn = get_connection(db_path)
            try:
                # Check outputs first.
                output_count = conn.execute(
                    "SELECT COUNT(*) FROM outputs WHERE task_id=?", (task_id,)
                ).fetchone()[0]
                if output_count > 0:
                    return VerifyResult(
                        True, "has_output", f"output_count={output_count}"
                    )

                # Then comments (non-system, with substantive content).
                comment_count = conn.execute(
                    "SELECT COUNT(*) FROM comments WHERE task_id=? "
                    "AND author != 'system' AND LENGTH(content) >= 20",
                    (task_id,),
                ).fetchone()[0]
                if comment_count > 0:
                    return VerifyResult(
                        True, "has_comment", f"comment_count={comment_count}"
                    )

                return VerifyResult(False, "no_action", "output=0, comment=0")
            finally:
                conn.close()
        except Exception as e:
            logger.error("Toolchain %s: verify error: %s", task_id, e)
            return VerifyResult(False, "verify_error", str(e))

    def on_failure(self, task_id: str, agent_id: str, db_path: Path,
                   verify: VerifyResult) -> None:
        """Mark the task failed and send a failure notification via the Mail API.

        The event context (``event_type`` / ``event_data``) is read from the
        JSON stored in the task's ``must_haves`` column. Loading it is
        best-effort: on any error a warning is logged and the notification
        is still sent with whatever context was recovered.

        Args:
            task_id: Task that failed verification.
            agent_id: Unused here; kept for interface compatibility.
            db_path: Database path passed to ``get_connection``.
            verify: Verification result whose ``reason`` and ``evidence`` are
                included in the notification.
        """
        self._mark_task_status(db_path, task_id, "failed")
        logger.info(
            "Toolchain %s: verify failed (%s), marked failed",
            task_id, verify.reason,
        )

        # Load the event context from the database.
        event_type = ""
        event_data: Dict = {}
        try:
            conn = get_connection(db_path)
            try:
                row = conn.execute(
                    "SELECT must_haves FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
                if row and row["must_haves"]:
                    meta = json.loads(row["must_haves"])
                    # A JSON null would otherwise break ``.lower()`` later.
                    event_type = str(meta.get("event_type") or "")
                    raw = meta.get("event_data", "{}")
                    parsed = json.loads(raw) if isinstance(raw, str) else raw
                    # Only a mapping is usable downstream; ignore null/list/etc.
                    if isinstance(parsed, dict):
                        event_data = parsed
            finally:
                # Close the connection even when the query or parsing fails.
                conn.close()
        except Exception as e:
            logger.warning(
                "Toolchain %s: failed to load event context: %s", task_id, e
            )

        self._notify_via_mail_api(
            task_id, verify.reason, verify.evidence, event_type, event_data,
        )

    def _build_gitea_links(self, event_type: str, event_data: dict) -> str:
        """Build Gitea links from the keys present in ``event_data``.

        Recognised keys: ``pr_number``, ``issue_number``, ``commit`` and
        ``branch`` (the branch is listed only when no commit is present).
        ``repo`` supplies the repository path segment of each URL.

        Args:
            event_type: Unused; kept for interface compatibility.
            event_data: Event payload; a non-dict value is treated as empty.

        Returns:
            Newline-joined link lines, or a placeholder message when no
            link could be derived.
        """
        if not isinstance(event_data, dict):
            event_data = {}

        links = []
        repo = event_data.get("repo", "")
        base_url = self.GITEA_BASE_URL

        if "pr_number" in event_data:
            links.append(f"PR: {base_url}/{repo}/pulls/{event_data['pr_number']}")
        if "issue_number" in event_data:
            links.append(
                f"Issue: {base_url}/{repo}/issues/{event_data['issue_number']}"
            )
        if "commit" in event_data:
            links.append(f"Commit: {base_url}/{repo}/commit/{event_data['commit']}")
        if "branch" in event_data and "commit" not in event_data:
            links.append(f"分支: {event_data['branch']}")

        return "\n".join(links) if links else "(无法提取链接,请检查黑板任务详情)"

    def _notify_via_mail_api(
        self,
        task_id: str,
        reason: str,
        evidence: str,
        event_type: str,
        event_data: Dict,
    ) -> None:
        """Send a detailed failure notification through the Mail API.

        Sending is best-effort: any error is logged as a warning and not
        re-raised.

        Args:
            task_id: Task that failed.
            reason: Failure reason from verification.
            evidence: Supporting evidence from verification.
            event_type: Event type; used to pick an action hint by substring.
            event_data: Event payload, listed in the message body.
        """
        # Pick the action hint from substrings of the event type.
        action_hint = "请检查黑板任务并手动处理。"
        et_lower = (event_type or "").lower()
        if "ci" in et_lower or "deploy" in et_lower:
            action_hint = "建议创建任务派给 jiangwei-infra 检查 CI/部署问题。"
        elif "review" in et_lower:
            action_hint = "建议查看 PR review 状态,必要时通知相关开发者。"
        elif "issue" in et_lower:
            action_hint = "建议创建任务派给对应开发者处理 Issue。"

        # Event details, one bullet per key.
        event_details = ""
        if event_data:
            event_details = "\n".join(
                f" - {k}: {v}" for k, v in event_data.items()
            )

        # Gitea links.
        gitea_links = self._build_gitea_links(event_type, event_data)

        title = f"[toolchain-handler] 工具链事件处理失败: {task_id}"
        text = (
            f"任务 {task_id} 验证失败\n\n"
            f"事件类型: {event_type or '未知'}\n"
            f"事件详情:\n{event_details or ' (无)'}\n\n"
            f"失败原因: {reason}\n"
            f"证据: {evidence}\n\n"
            f"{gitea_links}\n\n"
            f"行动指引: {action_hint}"
        )

        payload = json.dumps({
            "from": "daemon",
            "to": "pangtong-fujunshi",
            "title": title,
            "text": text,
            "type": "inform",
        }, ensure_ascii=False).encode("utf-8")

        try:
            req = urllib.request.Request(
                self.MAIL_API_URL,
                data=payload,
                headers={"Content-Type": "application/json"},
            )
            # The response body is not needed; the context manager closes it.
            with urllib.request.urlopen(req, timeout=5):
                pass
            logger.info(
                "Toolchain %s: sent failure notification via Mail API", task_id
            )
        except Exception as e:
            logger.warning(
                "Toolchain %s: failed to notify via Mail API: %s", task_id, e
            )