"""toolchain_handler.py - 工具链事件 handler。 处理 Gitea Webhook 事件(CI 失败、Review 请求、Issue 指派等)。 L2 引擎层强约束:输入(结构化步骤)+ 执行(Red Flags)+ 输出(action_report 验证)。 """ from __future__ import annotations import json import logging import os import urllib.request from pathlib import Path from typing import Dict, List from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult from src.daemon.prompt_composer import PromptComposer, PromptContext, GiteaConventionSection, WikiGuideSection from src.daemon.toolchain_templates import render_template, _TEMPLATE_MAP from src.blackboard.db import get_connection logger = logging.getLogger("moziplus-v2.handler.toolchain") # --------------------------------------------------------------------------- # Gitea API 配置 # --------------------------------------------------------------------------- _GITEA_BASE = "http://192.168.2.154:3000/api/v1" _GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "") # action_type → action_hint 映射 _ACTION_HINTS: Dict[str, str] = { "review_result": "你收到一个 Review 结果通知,这是一个需要你执行动作的事件(不是纯通知)。", "review_request": "你收到一个 Review 请求,这是一个需要你审查并提交 Review 的事件。", "review_updated": "你收到一个 PR 更新通知,这是一个需要你重新审查修改部分的事件。", "review_comment": "你收到一个 Review 评论,这是一个需要你查看并响应的事件。", "ci_failure": "你收到一个 CI 失败通知,这是一个需要你修复失败测试的事件。", "issue_assigned": "你收到一个 Issue 指派,这是一个需要你编码实现的事件。", "deploy_failure": "你收到一个部署失败通知,这是一个需要你排查并修复的事件。", "mention": "你收到一个 @mention 通知,这是一个需要你按指引响应的事件。", "review_merged": "你收到一个 PR 合并通知。这是一条纯通知,阅读即可。", "infrastructure_failure": "你收到一个基础设施问题报告,请排查并修复。", } # --------------------------------------------------------------------------- # Toolchain PromptSections # --------------------------------------------------------------------------- class ToolchainContextSection: """事件类型 + 事件详情 + 结构化步骤 + action_hint(priority=10)""" name: str = "toolchain_context" priority: int = 10 def render(self, context: PromptContext) -> str: event_type = context.event_type event_data: Dict = context.event_data or {} # Part 1: 事件信息(现有模板引擎) if event_type in _TEMPLATE_MAP: variables = {k: str(v) for k, v in event_data.items()} event_text = render_template(event_type, variables) else: lines = ["## 工具链事件", ""] lines.append(f"- **事件类型**: {event_type or '未知'}") if event_data: lines.append("- **事件详情**:") for key, value in event_data.items(): lines.append(f" - {key}: {value}") lines.append("") event_text = "\n".join(lines) # Part 2: 结构化编号步骤(新增,从 action_steps 渲染) steps: List[str] = context.action_steps or [] if steps: step_lines = ["", "### 必须执行的步骤", ""] for i, step in enumerate(steps, 1): step_lines.append(f"{i}. {step}") steps_text = "\n".join(step_lines) else: steps_text = "" # Part 3: action 指引(新增,按 action_type 选择) action_hint = _ACTION_HINTS.get( context.action_type, "你收到一个工具链事件,这是一个需要你执行动作的事件。", ) return f"{action_hint}\n\n{event_text}{steps_text}" def should_include(self, context: PromptContext) -> bool: return True class ToolchainApiSection: """API 操作指令(priority=40)-- action_report 提交指引""" name: str = "toolchain_api" priority: int = 40 API_HOST = "localhost:8083" def render(self, context: PromptContext) -> str: task_id = context.task_id project_id = context.project_id agent_id = context.agent_id lines = [ "## API 操作指令", "", f"项目 ID: `{project_id}`", f"任务 ID: `{task_id}`", "", "### 完成后必须提交 action report", "", "执行完所有步骤后,必须提交 action report:", "```bash", f'curl -s -X POST "http://{self.API_HOST}/api/projects/{project_id}/tasks/{task_id}/comments" \\', ' -H "Content-Type: application/json" \\', f' -d \'{{"author": "{agent_id}", "comment_type": "action_report", "body": "简要描述你执行了什么操作及结果"}}\'', "```", "", "⚠️ 不提交 action report 的任务会被标记为 failed。", "", "### 提交产出", "", "如有产出(如 review 结果、修复方案),提交到任务 outputs:", "```bash", f'curl -s -X POST "http://{self.API_HOST}/api/projects/{project_id}/tasks/{task_id}/outputs" \\', ' -H "Content-Type: application/json" \\', ' -d \'{"content": "<你的产出内容>", "type": "text"}\'', "```", "", "### 需要其他角色支持时", "", "如果在执行过程中需要其他角色协助(如缺数据、需要审批等),在关联的 PR/Issue 上创建 comment @对方:", "```bash", f'curl -s -X POST "{_GITEA_BASE}/repos/{{repo}}/issues/{{pr_number}}/comments" \\', ' -H "Authorization: token " \\', ' -H "Content-Type: application/json" \\', ' -d \'{"body": "@{agent-id} 需要你的支持:{描述问题}"}\'', "```", "", "⚠️ 不要使用 Mail API(飞鸽传书)。所有协作通过 Gitea 留痕。", "", ] return "\n".join(lines) def should_include(self, context: PromptContext) -> bool: return True class ToolchainConstraintsSection: """硬约束 + Red Flags(priority=50)""" name: str = "toolchain_constraints" priority: int = 50 def render(self, context: PromptContext) -> str: lines = [ "## 硬约束(必须遵守)", "", "⚠️ 以下是强制要求,不是建议或参考。违反任何一条都会导致任务失败。", "", "### 1. 必须按步骤执行", '- 检查上方“必须执行的步骤”列表', '- 逐条执行每个步骤,不可跳过', '- 不要只读不做——这不是纯通知', "", "### 2. 必须提交 action report", '- 执行完所有步骤后,必须提交 action report', "- 提交方式:POST comment(comment_type='action_report')", '- 报告内容:简要描述你执行了什么操作、结果如何', '- ⚠️ 不提交 action report 的任务会被标记为 failed', "", "### 3. 不要执行任何状态转换命令", '- 不要手动标 working/done/review/failed,系统会自动处理', "", "### 4. 不需要回复", '- action report 就是你的完成凭证', '- 不要发送 Mail(飞鸽传书),你的所有操作在 toolchain 流程内完成', "", "### 5. 所有协作通过 Gitea 完成", '- 如果遇到问题需要其他角色支持,在关联的 PR/Issue 上创建 comment @对方', '- 不要使用 Mail API(飞鸽传书)发送消息', '- 你的所有操作都在 toolchain 流程内,通过 Gitea 留痕', "", "### Red Flags(如果脑海中出现以下想法,说明你错了)", "", '| Agent 想法 | Red Flag 驳回 |', '|------------|--------------|', '| “这个通知看看就行了” | ❌ 错!这是 action 指令,必须执行步骤列表中的每一项 |', '| “我不需要做任何事” | ❌ 错!检查“必须执行的步骤”列表,每一步都要执行 |', '| “先放着等会处理” | ❌ 错!立即执行,不要推迟 |', '| “我已经知道了” | ❌ 知道不等于执行。执行步骤 + 提交 action report 才算完成 |', '| “步骤太多了,选几个做就行” | ❌ 错!必须逐条执行,不可跳过 |', '| “这个步骤不适用于当前情况” | ❌ 如果确实不适用,在 action report 中说明原因,但其他步骤必须执行 |', "", ] return "\n".join(lines) def should_include(self, context: PromptContext) -> bool: return True # --------------------------------------------------------------------------- # ToolchainHandler # --------------------------------------------------------------------------- class ToolchainHandler(BaseTaskHandler): """工具链事件 handler。""" task_type = "toolchain" virtual_project = "_toolchain" display_name = "工具链事件" def target_success_status(self) -> str: return "done" def pre_spawn(self, task_id: str, db_path: Path) -> bool: """auto_working:pending → working""" return self._auto_mark_working(task_id, db_path) def get_sections(self) -> list: """返回 Toolchain PromptSection 实例""" return [ ToolchainContextSection(), ToolchainApiSection(), ToolchainConstraintsSection(), GiteaConventionSection(), WikiGuideSection(), ] def build_prompt(self, context: PromptContext) -> str: """通过 PromptComposer 拼装 sections 为最终 prompt""" composer = PromptComposer() composer.add_many(self.get_sections()) return composer.compose(context) def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult: """检查 action report(精确验证)+ 三层 fallback""" try: conn = get_connection(db_path) try: # 特殊处理:infrastructure_failure 始终通过(防递归) row = conn.execute( "SELECT must_haves FROM tasks WHERE id=?", (task_id,) ).fetchone() if row and row["must_haves"]: try: meta = json.loads(row["must_haves"]) except Exception: meta = {} if meta.get("action_type") == "infrastructure_failure": return VerifyResult(True, "infrastructure_passthrough", "infrastructure_failure auto-pass") # 特殊处理:review_merged 始终通过(纯通知) if meta.get("action_type") == "review_merged": return VerifyResult(True, "merged_passthrough", "review_merged auto-pass") # 1. 优先检查 action_report comment report_row = conn.execute( "SELECT id FROM comments WHERE task_id=? " "AND comment_type='action_report' LIMIT 1", (task_id,) ).fetchone() if report_row: return VerifyResult(True, "has_action_report", "action_report found") # 2. fallback:检查 output(向后兼容) output_count = conn.execute( "SELECT COUNT(*) FROM outputs WHERE task_id=?", (task_id,) ).fetchone()[0] if output_count > 0: return VerifyResult(True, "has_output", f"output_count={output_count}") # 3. fallback:检查有实质内容的 comment(向后兼容) comment_count = conn.execute( "SELECT COUNT(*) FROM comments WHERE task_id=? " "AND author != 'system' AND LENGTH(body) >= 20", (task_id,) ).fetchone()[0] if comment_count > 0: return VerifyResult(True, "has_comment", f"comment_count={comment_count}") return VerifyResult(False, "no_action", "no action_report, no output, no valid comment") finally: conn.close() except Exception as e: logger.error("Toolchain %s: verify error: %s", task_id, e) return VerifyResult(False, "verify_error", str(e)) def on_failure(self, task_id: str, agent_id: str, db_path: Path, verify: VerifyResult) -> None: """验证失败 → 三分路处理(业务/系统/基础设施)""" self._mark_task_status(db_path, task_id, "failed") logger.info("Toolchain %s: verify failed (%s), marked failed", task_id, verify.reason) # 读取 must_hives 获取事件上下文 + assignee 从 tasks 表读取 meta = {} assignee = agent_id try: conn = get_connection(db_path) row = conn.execute( "SELECT must_haves, assignee FROM tasks WHERE id=?", (task_id,) ).fetchone() if row: if row["must_haves"]: meta = json.loads(row["must_haves"]) assignee = row["assignee"] or agent_id conn.close() except Exception: pass action_type = meta.get("action_type", "") context_data = meta.get("context", {}) # 三分路决策 route = self._classify_failure(verify) if route == "business": self._handle_business_failure( task_id, agent_id, verify, action_type, context_data, assignee, db_path) elif route == "system": self._handle_system_failure( task_id, agent_id, verify, action_type, context_data, db_path) else: # infrastructure self._handle_infrastructure_failure( task_id, agent_id, verify, db_path) def _classify_failure(self, verify: VerifyResult) -> str: """分类失败类型:business / infrastructure(system 通过升级到达)""" # verify_error 或 DB 不可用 → 基础设施失败 if verify.reason == "verify_error": return "infrastructure" # 默认:业务失败 return "business" def _handle_business_failure( self, task_id: str, agent_id: str, verify: VerifyResult, action_type: str, context_data: dict, assignee: str, db_path: Path, ) -> None: """业务失败 → 在关联 PR/Issue 上创建 comment @原始 assignee""" repo = context_data.get("repo", "") pr_number = context_data.get("pr_number") or context_data.get("issue_number", "") if repo and pr_number: comment_body = ( f"@{assignee or agent_id} 工具链任务执行失败\n\n" f"任务 ID: {task_id}\n" f"失败原因: {verify.reason}\n" f"证据: {verify.evidence}\n\n" f"请检查黑板任务并处理。" ) success = self._create_gitea_comment(repo, pr_number, comment_body) if success: logger.info("Toolchain %s: business failure → Gitea comment on %s#%s", task_id, repo, pr_number) return # Gitea API failed → escalate to system failure logger.warning( "Toolchain %s: Gitea comment failed, escalating to system failure", task_id) self._handle_system_failure( task_id, agent_id, verify, action_type, context_data, db_path) else: # 没有 PR/Issue 关联 → fallback 到系统失败 logger.warning( "Toolchain %s: no PR/Issue context for business failure, " "escalating to system failure", task_id) self._handle_system_failure( task_id, agent_id, verify, action_type, context_data, db_path) def _handle_system_failure( self, task_id: str, agent_id: str, verify: VerifyResult, action_type: str, context_data: dict, db_path: Path, ) -> None: """系统失败 → 创建 Gitea Issue @pangtong-fujunshi""" repo = context_data.get("repo", "sanguo/sanguo_moziplus_v2") title = f"[toolchain-handler] 工具链事件处理失败: {task_id}" body = ( f"任务 {task_id} 验证失败\n\n" f"事件类型: {action_type or '未知'}\n" f"失败原因: {verify.reason}\n" f"证据: {verify.evidence}\n\n" f"@pangtong-fujunshi 请检查黑板任务并手动处理。" ) # 尝试在 Gitea 创建 Issue created = self._create_gitea_issue(repo, title, body, ["pangtong-fujunshi"]) if created: logger.info("Toolchain %s: system failure → Gitea Issue created on %s", task_id, repo) else: # Gitea API 不可用 → 基础设施失败 logger.error( "Toolchain %s: Gitea API unavailable, escalating to infrastructure failure", task_id) self._handle_infrastructure_failure( task_id, agent_id, verify, db_path) def _handle_infrastructure_failure( self, task_id: str, agent_id: str, verify: VerifyResult, db_path: Path, ) -> None: """基础设施失败 → 直接在 _toolchain DB 创建 task @jiangwei-infra(防递归)""" try: from datetime import datetime new_task_id = f"tc-{int(datetime.now().timestamp() * 1000)}" must_hives = json.dumps({ "event_type": "infrastructure_failure", "action_type": "infrastructure_failure", "steps": [ "检查 Gitea 服务状态(http://192.168.2.154:3000)", "检查网络连通性", "恢复后提交 action report", ], "context": {"original_task_id": task_id, "verify_reason": verify.reason}, "from": "system", "source": "toolchain_handler_on_failure", }, ensure_ascii=False) conn = get_connection(db_path) conn.execute( "INSERT INTO tasks (id, title, description, assignee, assigned_by, " "must_haves, task_type, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", ( new_task_id, f"[基础设施] Gitea API 不可用 - {task_id}", f"Gitea API 不可用,原任务 {task_id} 无法通过正常路径处理。\n" f"请检查 Gitea 服务状态和网络连通性。", "jiangwei-infra", "system", must_hives, "toolchain", "pending", ) ) conn.commit() conn.close() logger.info( "Toolchain %s: infrastructure failure → task %s created for jiangwei-infra", task_id, new_task_id) except Exception as e: logger.error( "Toolchain %s: failed to create infrastructure_failure task: %s", task_id, e) # ----------------------------------------------------------------------- # Gitea API 辅助 # ----------------------------------------------------------------------- def _create_gitea_comment( self, repo: str, pr_number: int, body: str, ) -> bool: """在 PR/Issue 上创建 comment。返回是否成功。""" if not _GITEA_TOKEN: return False payload = json.dumps({"body": body}, ensure_ascii=False).encode("utf-8") try: req = urllib.request.Request( f"{_GITEA_BASE}/repos/{repo}/issues/{pr_number}/comments", data=payload, headers={ "Authorization": f"token {_GITEA_TOKEN}", "Content-Type": "application/json", }, ) urllib.request.urlopen(req, timeout=5) return True except Exception as e: logger.warning("Gitea comment failed on %s#%s: %s", repo, pr_number, e) return False def _create_gitea_issue( self, repo: str, title: str, body: str, assignees: list = None, ) -> bool: """创建 Gitea Issue。返回是否成功。""" if not _GITEA_TOKEN: return False data = {"title": title, "body": body} if assignees: data["assignees"] = assignees payload = json.dumps(data, ensure_ascii=False).encode("utf-8") try: req = urllib.request.Request( f"{_GITEA_BASE}/repos/{repo}/issues", data=payload, headers={ "Authorization": f"token {_GITEA_TOKEN}", "Content-Type": "application/json", }, ) urllib.request.urlopen(req, timeout=5) return True except Exception as e: logger.warning("Gitea create issue failed on %s: %s", repo, e) return False # ----------------------------------------------------------------------- # 兼容:保留旧方法签名(但不再被 on_failure 调用) # ----------------------------------------------------------------------- def _build_gitea_links(self, event_type: str, event_data: dict) -> str: """根据事件类型构建 Gitea 链接。""" links = [] repo = event_data.get("repo", "") base_url = "http://192.168.2.154:3000" if "pr_number" in event_data: links.append(f"PR: {base_url}/{repo}/pulls/{event_data['pr_number']}") if "issue_number" in event_data: links.append(f"Issue: {base_url}/{repo}/issues/{event_data['issue_number']}") if "commit" in event_data: links.append(f"Commit: {base_url}/{repo}/commit/{event_data['commit']}") if "branch" in event_data and "commit" not in event_data: links.append(f"分支: {event_data['branch']}") return "\n".join(links) if links else "(无法提取链接,请检查黑板任务详情)"