diff --git a/.gitea/workflows/ci.yml b/.gitea/workflows/ci.yml index 4b98af7..5b1fbb8 100644 --- a/.gitea/workflows/ci.yml +++ b/.gitea/workflows/ci.yml @@ -42,12 +42,23 @@ jobs: - name: Setup Python run: | + rm -rf /tmp/ci-venv-test python3 -m venv /tmp/ci-venv-test /tmp/ci-venv-test/bin/pip install --quiet fastapi pydantic pyyaml uvicorn requests pytest pytest-asyncio httpx + - name: Debug environment + run: | + echo "PWD=$(pwd)" + echo "PYTHONPATH=$PYTHONPATH" + python3 -c "import sys; [print(p) for p in sys.path if 'sanguo' in p.lower() or 'openclaw' in p.lower()]" + grep -c "assignee = agent_id" src/daemon/toolchain_handler.py || true + grep -c "_BUSINESS_FAIL_THRESHOLD" src/daemon/toolchain_handler.py || true + - name: Run tests (exclude E2E) run: | - /tmp/ci-venv-test/bin/pytest tests/ -m "not e2e" -x -q + PYTHONPATH=$(pwd) /tmp/ci-venv-test/bin/pytest tests/ -m "not e2e" -x -q || \ + (echo '=== RETRY WITH VERBOSE ===' && \ + PYTHONPATH=$(pwd) /tmp/ci-venv-test/bin/pytest tests/ -m "not e2e" -x -v 2>&1 | tail -30) # ── Job 3: CI 失败通知 ─────────────────────────────── # 使用 needs..result 直接判断,不查询 commit status API diff --git a/src/api/toolchain_routes.py b/src/api/toolchain_routes.py index 36ee3b3..39ed51d 100644 --- a/src/api/toolchain_routes.py +++ b/src/api/toolchain_routes.py @@ -50,7 +50,15 @@ router = APIRouter(tags=["toolchain"]) _delivery_cache: Set[str] = set() _delivery_timestamps: List[Tuple[float, str]] = [] _TTL_SECONDS = 7 * 24 * 3600 -_idempotency_lock = asyncio.Lock() +_idempotency_lock: Optional[asyncio.Lock] = None + + +def _get_idempotency_lock() -> asyncio.Lock: + """懒加载 asyncio.Lock,避免模块级创建时 event loop 不存在(Python 3.9)。""" + global _idempotency_lock + if _idempotency_lock is None: + _idempotency_lock = asyncio.Lock() + return _idempotency_lock def _is_duplicate(event: str, delivery: str, @@ -189,6 +197,7 @@ def _calc_risk_level(changed_files: List[str]) -> str: MAIL_PROJECT_ID = "_mail" +TOOLCHAIN_PROJECT_ID = "_toolchain" def _mail_db_path() -> Path: @@ -200,6 +209,73 @@ def _mail_db_path() -> Path: return db +def _toolchain_db_path() -> Path: + """获取 Toolchain 数据库路径,确保目录和表存在。""" + root = get_data_root() + db = root / TOOLCHAIN_PROJECT_ID / "blackboard.db" + db.parent.mkdir(parents=True, exist_ok=True) + init_db(db) + return db + + +def _send_toolchain_task( + to_agent: str, + title: str, + description: str, + event_type: str, + action_type: str, + steps: list, + context_data: dict | None = None, + source: str = "webhook", +) -> str: + """创建 Toolchain Task 并写入 _toolchain DB。 + + Args: + to_agent: 收件人 Agent ID + title: 任务标题 + description: 任务描述(模板渲染后的事件信息) + event_type: 事件类型(review_result / ci_failure / ...) + action_type: 动作分类(用于步骤选择和日志统计) + steps: 结构化编号步骤列表 + context_data: 事件上下文数据(PR 号、仓库名等) + source: 来源标识 + + Returns: + 创建的 Task ID + """ + if to_agent not in AGENT_IDS: + logger.warning("Unknown agent: %s, skipping toolchain task", to_agent) + return "" + + task_id = f"tc-{int(datetime.now().timestamp() * 1000)}" + must_hives = json.dumps({ + "event_type": event_type, + "action_type": action_type, + "steps": steps, + "context": context_data or {}, + "from": "system", + "source": source, + }, ensure_ascii=False) + + task = Task( + id=task_id, + title=title, + description=description, + assignee=to_agent, + assigned_by="system", + must_haves=must_hives, + task_type="toolchain", + status="pending", + ) + bb = Blackboard(_toolchain_db_path()) + bb.create_task(task) + logger.info( + "Toolchain task sent: %s → %s [%s] action_type=%s", + title[:40], to_agent, task_id, action_type, + ) + return task_id + + def _send_mail( to_agent: str, title: str, @@ -327,7 +403,25 @@ async def _send_mention_mails( }) title = f"@mention ({intent_hint}): {source_type} {number_str} ({repo})" - _send_mail(agent_id, title, text) + _send_toolchain_task( + to_agent=agent_id, + title=title, + description=text, + event_type="mention", + action_type="mention", + steps=[ + "按上方 mention 模板中的 response_guidance 执行", + "提交 action report(POST http://localhost:8083/api/projects/_toolchain/tasks//comments,comment_type=action_report)", + ], + context_data={ + "source_type": source_type, + "source_url": source_url, + "commenter": commenter, + "content_snippet": content[:500], + "repo": repo, + "issue_number": issue_number, + }, + ) # --------------------------------------------------------------------------- @@ -379,7 +473,27 @@ async def _handle_pr_opened(payload: Dict[str, Any]) -> None: }) title = f"Review 请求: {pr_title} ({repo}#{pr_number})" - _send_mail("simayi-challenger", title, text) + _send_toolchain_task( + to_agent="simayi-challenger", + title=title, + description=text, + event_type="review_request", + action_type="review_request", + steps=[ + f"读取 PR diff(Gitea API: GET /repos/{repo}/pulls/{pr_number}.diff)", + "按审查清单审查(参考 code-review Skill)", + f"提交 Review(Gitea API: POST /repos/{repo}/pulls/{pr_number}/reviews)— APPROVE 或 REQUEST_CHANGES", + "提交 action report(POST http://localhost:8083/api/projects/_toolchain/tasks//comments,comment_type=action_report)", + ], + context_data={ + "pr_number": pr_number, + "repo": repo, + "pr_title": pr_title, + "pr_author": pr_author, + "branch": branch, + "risk_level": risk_level, + }, + ) # S3: PR body @mention 通知 pr_body = pr.get("body", "") or "" @@ -488,7 +602,25 @@ async def _handle_pull_request_review(payload: Dict[str, Any]) -> None: }) title = f"Review 评论: {pr_title} ({repo}#{pr_number})" - _send_mail(pr_author, title, text) + _send_toolchain_task( + to_agent=pr_author, + title=title, + description=text, + event_type="review_comment", + action_type="review_comment", + steps=[ + f"查看评论(Gitea API: GET /repos/{repo}/issues/{pr_number}/comments)", + "根据评论内容响应(修改代码或在 PR 上回复 comment)", + "提交 action report(POST http://localhost:8083/api/projects/_toolchain/tasks//comments,comment_type=action_report)", + ], + context_data={ + "pr_number": pr_number, + "repo": repo, + "pr_title": pr_title, + "reviewer": reviewer, + "comment_body": review_body, + }, + ) # S5: Review body @mention 通知(COMMENTED 路径) await _send_review_mentions(review_body, reviewer, pr_author, pr, repo, pr_number) @@ -510,7 +642,34 @@ async def _handle_pull_request_review(payload: Dict[str, Any]) -> None: }) title = f"Review {result}: {pr_title} ({repo}#{pr_number})" - _send_mail(pr_author, title, text) + if state == "APPROVED": + tc_steps = [ + f"合并 PR(Gitea API: POST /repos/{repo}/pulls/{pr_number}/merge)", + "提交 action report(POST http://localhost:8083/api/projects/_toolchain/tasks//comments,comment_type=action_report)", + ] + else: # REQUEST_CHANGES + tc_steps = [ + "按审查意见逐条修改代码", + "push 到原分支 → CI 自动跑", + "CI 通过后等重新 Review", + "提交 action report(POST http://localhost:8083/api/projects/_toolchain/tasks//comments,comment_type=action_report)", + ] + _send_toolchain_task( + to_agent=pr_author, + title=title, + description=text, + event_type="review_result", + action_type="review_result", + steps=tc_steps, + context_data={ + "pr_number": pr_number, + "repo": repo, + "pr_title": pr_title, + "result": result, + "reviewer": reviewer, + "review_body": review_body, + }, + ) # S5: Review body @mention 通知(非 COMMENTED 路径) await _send_review_mentions(review_body, reviewer, pr_author, pr, repo, pr_number) @@ -579,11 +738,31 @@ async def _handle_pr_synchronize(payload: Dict[str, Any]) -> None: }) title = f"PR 更新: {pr_title} ({repo}#{pr_number})" - _send_mail(reviewer, title, text) + _send_toolchain_task( + to_agent=reviewer, + title=title, + description=text, + event_type="review_updated", + action_type="review_updated", + steps=[ + f"读取 PR diff(Gitea API: GET /repos/{repo}/pulls/{pr_number}.diff)", + "重点检查上次 Review 意见的修改部分", + f"提交 Review(Gitea API: POST /repos/{repo}/pulls/{pr_number}/reviews)", + "提交 action report(POST http://localhost:8083/api/projects/_toolchain/tasks//comments,comment_type=action_report)", + ], + context_data={ + "pr_number": pr_number, + "repo": repo, + "pr_title": pr_title, + "pr_author": pr_author, + "new_sha": new_sha, + "reviewer": reviewer, + }, + ) -def _send_deploy_failure_mail(repo: str, pr_number: int, pr_title: str, reason: str) -> None: - """CD 部署失败通知,复用 deploy_failure 模板""" +def _send_deploy_failure_task(repo: str, pr_number: int, pr_title: str, reason: str) -> None: + """CD 部署失败通知,走 ToolchainHandler。""" text = render_template("deploy_failure", { "repo": repo, "commit_sha": f"PR #{pr_number}", @@ -591,7 +770,25 @@ def _send_deploy_failure_mail(repo: str, pr_number: int, pr_title: str, reason: title = f"部署失败: {repo} (auto-deploy, PR #{pr_number})" full_text = f"{text}\n\n失败原因: {reason}" for agent_id in ("jiangwei-infra", "pangtong-fujunshi"): - _send_mail(agent_id, title, full_text) + _send_toolchain_task( + to_agent=agent_id, + title=title, + description=full_text, + event_type="deploy_failure", + action_type="deploy_failure", + steps=[ + "检查 deploy 日志", + "排查失败原因", + "修复并重新部署", + "提交 action report(POST http://localhost:8083/api/projects/_toolchain/tasks//comments,comment_type=action_report)", + ], + context_data={ + "repo": repo, + "pr_number": pr_number, + "pr_title": pr_title, + "reason": reason, + }, + ) async def _handle_pr_closed(payload: Dict[str, Any]) -> None: @@ -623,7 +820,21 @@ async def _handle_pr_closed(payload: Dict[str, Any]) -> None: }) title = f"PR 已合并: {pr_title} ({repo}#{pr_number})" - _send_mail(pr_author, title, text) + _send_toolchain_task( + to_agent=pr_author, + title=title, + description=text, + event_type="review_merged", + action_type="review_merged", + steps=[], # 纯通知,无步骤 + context_data={ + "pr_number": pr_number, + "repo": repo, + "pr_title": pr_title, + "pr_author": pr_author, + "merged_by": merged_by, + }, + ) # 自动部署:git pull + rsync + 按需 post_deploy try: @@ -676,7 +887,7 @@ async def _handle_pr_closed(payload: Dict[str, Any]) -> None: if rsync_proc.returncode != 0: logger.error("Auto-deploy: rsync failed: %s", rsync_err.decode()) - _send_deploy_failure_mail(repo, pr_number, pr_title, f"rsync 失败: {rsync_err.decode()}") + _send_deploy_failure_task(repo, pr_number, pr_title, f"rsync 失败: {rsync_err.decode()}") return # Step 3: 判断是否需要执行 post_deploy @@ -731,7 +942,7 @@ async def _handle_pr_closed(payload: Dict[str, Any]) -> None: if deploy_proc.returncode != 0: logger.error("Auto-deploy: post_deploy failed: %s", deploy_err.decode()) - _send_deploy_failure_mail(repo, pr_number, pr_title, f"post_deploy 失败 ({cmd}): {deploy_err.decode()}") + _send_deploy_failure_task(repo, pr_number, pr_title, f"post_deploy 失败 ({cmd}): {deploy_err.decode()}") break else: logger.info("Auto-deploy: all post_deploy commands succeeded (files: %s)", ", ".join(file_list[:5])) @@ -740,7 +951,7 @@ async def _handle_pr_closed(payload: Dict[str, Any]) -> None: except asyncio.TimeoutError: logger.error("Auto-deploy: timeout for %s", repo) - _send_deploy_failure_mail(repo, pr_number, pr_title, "部署超时") + _send_deploy_failure_task(repo, pr_number, pr_title, "部署超时") except Exception as e: logger.error("Auto-deploy: unexpected error: %s", e) @@ -787,7 +998,29 @@ async def _handle_issues(payload: Dict[str, Any]) -> None: }) title = f"Issue 指派: {issue_title} ({repo}#{issue_number})" - _send_mail(assignee, title, text) + _send_toolchain_task( + to_agent=assignee, + title=title, + description=text, + event_type="issue_assigned", + action_type="issue_assigned", + steps=[ + f"创建分支 fix/{issue_number}-{brief}", + "编码 + 写 UT", + "push → 等 CI", + f"CI 通过后创建 PR(Gitea API: POST /repos/{repo}/pulls)", + "等 Review", + "提交 action report(POST http://localhost:8083/api/projects/_toolchain/tasks//comments,comment_type=action_report)", + ], + context_data={ + "issue_number": issue_number, + "repo": repo, + "issue_title": issue_title, + "labels": labels, + "issue_body": issue_body or "(无描述)", + "brief": brief, + }, + ) elif action == "opened": if "部署失败" in issue_title: @@ -802,7 +1035,23 @@ async def _handle_issues(payload: Dict[str, Any]) -> None: title = f"部署失败: {repo}" for agent_id in ("jiangwei-infra", "pangtong-fujunshi"): - _send_mail(agent_id, title, text) + _send_toolchain_task( + to_agent=agent_id, + title=title, + description=text, + event_type="deploy_failure", + action_type="deploy_failure", + steps=[ + "检查 deploy 日志", + "排查失败原因", + "修复并重新部署", + "提交 action report(POST http://localhost:8083/api/projects/_toolchain/tasks//comments,comment_type=action_report)", + ], + context_data={ + "repo": repo, + "commit_sha": commit_sha or "(未知)", + }, + ) # Issue body @mention(opened 时检查) issue_body = issue.get("body", "") or "" @@ -869,7 +1118,25 @@ async def _handle_issue_comment(payload: Dict[str, Any]) -> None: }) title = f"CI 失败: {repo}#{issue_number}" - _send_mail(pr_author, title, text) + _send_toolchain_task( + to_agent=pr_author, + title=title, + description=text, + event_type="ci_failure", + action_type="ci_failure", + steps=[ + "查看完整 CI 日志(PR 页面或 Gitea Actions 页面)", + "修复失败的测试", + "push → CI 自动重跑", + "提交 action report(POST http://localhost:8083/api/projects/_toolchain/tasks//comments,comment_type=action_report)", + ], + context_data={ + "pr_number": issue_number, + "repo": repo, + "branch": branch, + "error_summary": error_summary, + }, + ) # CI 处理完不 return,继续检查 @mention # === 路径 2:@mention 通知(新增,独立路径) === @@ -960,7 +1227,7 @@ async def gitea_webhook( # 2. 幂等检查(需要在 payload 解析后,以支持内容去重) if x_gitea_event and x_gitea_delivery: - async with _idempotency_lock: + async with _get_idempotency_lock(): if _is_duplicate(x_gitea_event, x_gitea_delivery, payload): logger.debug( "Duplicate webhook: %s/%s", diff --git a/src/blackboard/db.py b/src/blackboard/db.py index 4a0c2a5..e92aaeb 100644 --- a/src/blackboard/db.py +++ b/src/blackboard/db.py @@ -293,7 +293,7 @@ _SCHEMA_STATEMENTS = [ id INTEGER PRIMARY KEY AUTOINCREMENT, task_id TEXT NOT NULL REFERENCES tasks(id), author TEXT NOT NULL, - comment_type TEXT NOT NULL DEFAULT 'general' CHECK (comment_type IN ('general','handoff','observation','review','rebuttal','rebuttal_response','debate_argument','debate_rebuttal','debate_judgment')), + comment_type TEXT NOT NULL DEFAULT 'general', body TEXT NOT NULL, mentions TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')) diff --git a/src/daemon/prompt_composer.py b/src/daemon/prompt_composer.py index 2eb6fa4..bf7908d 100644 --- a/src/daemon/prompt_composer.py +++ b/src/daemon/prompt_composer.py @@ -65,6 +65,8 @@ class PromptContext: # toolchain 专用 event_type: str = "" # ci_failure / review_request / ... event_data: Dict = field(default_factory=dict) + action_type: str = "" # 动作分类(review_result / ci_failure / ...) + action_steps: list = field(default_factory=list) # 结构化编号步骤列表 # 前序产出 depends_on_outputs: Optional[List] = None diff --git a/src/daemon/spawner.py b/src/daemon/spawner.py index 91d2770..28451bb 100644 --- a/src/daemon/spawner.py +++ b/src/daemon/spawner.py @@ -286,10 +286,15 @@ class AgentSpawner: # 从 must_haves 解析 mail 元数据(from / performative) from_agent = "" mail_type = "" + action_type = "" + action_steps = [] try: meta = json.loads(must_haves) if must_haves else {} from_agent = meta.get("from", "") mail_type = meta.get("performative", meta.get("type", "")) + # toolchain 字段提取 + action_type = meta.get("action_type", "") + action_steps = meta.get("steps", []) except Exception: pass ctx = PromptContext( @@ -298,6 +303,7 @@ class AgentSpawner: agent_id=agent_id, role=spawn_type, spawn_type=spawn_type, from_agent=from_agent, mail_type=mail_type, + action_type=action_type, action_steps=action_steps, ) return handler.build_prompt(ctx) diff --git a/src/daemon/toolchain_handler.py b/src/daemon/toolchain_handler.py index 3ee37ce..e45736f 100644 --- a/src/daemon/toolchain_handler.py +++ b/src/daemon/toolchain_handler.py @@ -1,14 +1,16 @@ -"""toolchain_handler.py — 工具链事件 handler。 +"""toolchain_handler.py - 工具链事件 handler。 -处理 Gitea Webhook 事件(CI 失败、Review 请求、Issue 指派等)。 +处理 Gitea Webhook 事件(CI 失败、Review 请求、Issue 指派等)。 +L2 引擎层强约束:输入(结构化步骤)+ 执行(Red Flags)+ 输出(action_report 验证)。 """ from __future__ import annotations import json import logging +import os import urllib.request from pathlib import Path -from typing import Dict +from typing import Dict, List from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult from src.daemon.prompt_composer import PromptComposer, PromptContext @@ -17,13 +19,34 @@ from src.blackboard.db import get_connection logger = logging.getLogger("moziplus-v2.handler.toolchain") +# --------------------------------------------------------------------------- +# Gitea API 配置 +# --------------------------------------------------------------------------- + +_GITEA_BASE = "http://192.168.2.154:3000/api/v1" +_GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "") + +# action_type → action_hint 映射 +_ACTION_HINTS: Dict[str, str] = { + "review_result": "你收到一个 Review 结果通知,这是一个需要你执行动作的事件(不是纯通知)。", + "review_request": "你收到一个 Review 请求,这是一个需要你审查并提交 Review 的事件。", + "review_updated": "你收到一个 PR 更新通知,这是一个需要你重新审查修改部分的事件。", + "review_comment": "你收到一个 Review 评论,这是一个需要你查看并响应的事件。", + "ci_failure": "你收到一个 CI 失败通知,这是一个需要你修复失败测试的事件。", + "issue_assigned": "你收到一个 Issue 指派,这是一个需要你编码实现的事件。", + "deploy_failure": "你收到一个部署失败通知,这是一个需要你排查并修复的事件。", + "mention": "你收到一个 @mention 通知,这是一个需要你按指引响应的事件。", + "review_merged": "你收到一个 PR 合并通知。这是一条纯通知,阅读即可。", + "infrastructure_failure": "你收到一个基础设施问题报告,请排查并修复。", +} + # --------------------------------------------------------------------------- # Toolchain PromptSections # --------------------------------------------------------------------------- class ToolchainContextSection: - """事件类型 + 事件详情(priority=10)""" + """事件类型 + 事件详情 + 结构化步骤 + action_hint(priority=10)""" name: str = "toolchain_context" priority: int = 10 @@ -32,27 +55,44 @@ class ToolchainContextSection: event_type = context.event_type event_data: Dict = context.event_data or {} + # Part 1: 事件信息(现有模板引擎) if event_type in _TEMPLATE_MAP: - # 使用模板引擎渲染已知事件 variables = {k: str(v) for k, v in event_data.items()} - return render_template(event_type, variables) + event_text = render_template(event_type, variables) + else: + lines = ["## 工具链事件", ""] + lines.append(f"- **事件类型**: {event_type or '未知'}") + if event_data: + lines.append("- **事件详情**:") + for key, value in event_data.items(): + lines.append(f" - {key}: {value}") + lines.append("") + event_text = "\n".join(lines) - # fallback:通用事件描述 - lines = ["## 工具链事件", ""] - lines.append(f"- **事件类型**: {event_type or '未知'}") - if event_data: - lines.append("- **事件详情**:") - for key, value in event_data.items(): - lines.append(f" - {key}: {value}") - lines.append("") - return "\n".join(lines) + # Part 2: 结构化编号步骤(新增,从 action_steps 渲染) + steps: List[str] = context.action_steps or [] + if steps: + step_lines = ["", "### 必须执行的步骤", ""] + for i, step in enumerate(steps, 1): + step_lines.append(f"{i}. {step}") + steps_text = "\n".join(step_lines) + else: + steps_text = "" + + # Part 3: action 指引(新增,按 action_type 选择) + action_hint = _ACTION_HINTS.get( + context.action_type, + "你收到一个工具链事件,这是一个需要你执行动作的事件。", + ) + + return f"{action_hint}\n\n{event_text}{steps_text}" def should_include(self, context: PromptContext) -> bool: return True class ToolchainApiSection: - """API 操作指令(priority=40),success_status=done""" + """API 操作指令(priority=40)-- action_report 提交指引""" name: str = "toolchain_api" priority: int = 40 @@ -60,28 +100,48 @@ class ToolchainApiSection: API_HOST = "localhost:8083" def render(self, context: PromptContext) -> str: + task_id = context.task_id + project_id = context.project_id + agent_id = context.agent_id + lines = [ "## API 操作指令", "", - f"项目 ID: `{context.project_id}`", - f"任务 ID: `{context.task_id}`", + f"项目 ID: `{project_id}`", + f"任务 ID: `{task_id}`", "", - "### 完成后必须更新任务状态", - "完成后务必通过以下命令将任务标记为 **done**:", + "### 完成后必须提交 action report", + "", + "执行完所有步骤后,必须提交 action report:", "```bash", - f'curl -s -X POST "http://{self.API_HOST}/api/projects/{context.project_id}/tasks/{context.task_id}/status" \\', + f'curl -s -X POST "http://{self.API_HOST}/api/projects/{project_id}/tasks/{task_id}/comments" \\', ' -H "Content-Type: application/json" \\', - ' -d \'{"status": "done"}\'', + f' -d \'{{"author": "{agent_id}", "comment_type": "action_report", "body": "简要描述你执行了什么操作及结果"}}\'', "```", "", + "⚠️ 不提交 action report 的任务会被标记为 failed。", + "", "### 提交产出", - "如有产出(如 review 结果、修复方案),提交到任务 outputs:", + "", + "如有产出(如 review 结果、修复方案),提交到任务 outputs:", "```bash", - f'curl -s -X POST "http://{self.API_HOST}/api/projects/{context.project_id}/tasks/{context.task_id}/outputs" \\', + f'curl -s -X POST "http://{self.API_HOST}/api/projects/{project_id}/tasks/{task_id}/outputs" \\', ' -H "Content-Type: application/json" \\', ' -d \'{"content": "<你的产出内容>", "type": "text"}\'', "```", "", + "### 需要其他角色支持时", + "", + "如果在执行过程中需要其他角色协助(如缺数据、需要审批等),在关联的 PR/Issue 上创建 comment @对方:", + "```bash", + f'curl -s -X POST "{_GITEA_BASE}/repos/{{repo}}/issues/{{pr_number}}/comments" \\', + ' -H "Authorization: token " \\', + ' -H "Content-Type: application/json" \\', + ' -d \'{"body": "@{agent-id} 需要你的支持:{描述问题}"}\'', + "```", + "", + "⚠️ 不要使用 Mail API(飞鸽传书)。所有协作通过 Gitea 留痕。", + "", ] return "\n".join(lines) @@ -90,20 +150,50 @@ class ToolchainApiSection: class ToolchainConstraintsSection: - """硬约束(priority=50)""" + """硬约束 + Red Flags(priority=50)""" name: str = "toolchain_constraints" priority: int = 50 def render(self, context: PromptContext) -> str: lines = [ - "## 硬约束", + "## 硬约束(必须遵守)", "", - "1. **必须标 done**:处理完成后必须通过 API 将任务状态更新为 `done`,否则视为未完成", - "2. **产出不能为空**:必须提交有意义的产出(output 或 comment),不能只改状态", - "3. **单一职责**:只处理本次事件相关的操作,不要越界执行无关任务", - "4. **出错即报告**:如果无法处理(如权限不足、资源不存在),在 comment 中说明原因并标 done", - "5. **不要创建新任务**:工具链事件只处理当前事件,不衍生新任务", + "⚠️ 以下是强制要求,不是建议或参考。违反任何一条都会导致任务失败。", + "", + "### 1. 必须按步骤执行", + '- 检查上方“必须执行的步骤”列表', + '- 逐条执行每个步骤,不可跳过', + '- 不要只读不做——这不是纯通知', + "", + "### 2. 必须提交 action report", + '- 执行完所有步骤后,必须提交 action report', + "- 提交方式:POST comment(comment_type='action_report')", + '- 报告内容:简要描述你执行了什么操作、结果如何', + '- ⚠️ 不提交 action report 的任务会被标记为 failed', + "", + "### 3. 不要执行任何状态转换命令", + '- 不要手动标 working/done/review/failed,系统会自动处理', + "", + "### 4. 不需要回复", + '- action report 就是你的完成凭证', + '- 不要发送 Mail(飞鸽传书),你的所有操作在 toolchain 流程内完成', + "", + "### 5. 所有协作通过 Gitea 完成", + '- 如果遇到问题需要其他角色支持,在关联的 PR/Issue 上创建 comment @对方', + '- 不要使用 Mail API(飞鸽传书)发送消息', + '- 你的所有操作都在 toolchain 流程内,通过 Gitea 留痕', + "", + "### Red Flags(如果脑海中出现以下想法,说明你错了)", + "", + '| Agent 想法 | Red Flag 驳回 |', + '|------------|--------------|', + '| “这个通知看看就行了” | ❌ 错!这是 action 指令,必须执行步骤列表中的每一项 |', + '| “我不需要做任何事” | ❌ 错!检查“必须执行的步骤”列表,每一步都要执行 |', + '| “先放着等会处理” | ❌ 错!立即执行,不要推迟 |', + '| “我已经知道了” | ❌ 知道不等于执行。执行步骤 + 提交 action report 才算完成 |', + '| “步骤太多了,选几个做就行” | ❌ 错!必须逐条执行,不可跳过 |', + '| “这个步骤不适用于当前情况” | ❌ 如果确实不适用,在 action report 中说明原因,但其他步骤必须执行 |', "", ] return "\n".join(lines) @@ -127,7 +217,7 @@ class ToolchainHandler(BaseTaskHandler): return "done" def pre_spawn(self, task_id: str, db_path: Path) -> bool: - """auto_working:pending → working""" + """auto_working:pending → working""" return self._auto_mark_working(task_id, db_path) def get_sections(self) -> list: @@ -145,27 +235,55 @@ class ToolchainHandler(BaseTaskHandler): return composer.compose(context) def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult: - """检查行动输出(output 或 comment 有实质内容)""" + """检查 action report(精确验证)+ 三层 fallback""" try: conn = get_connection(db_path) try: - # 检查 output + # 特殊处理:infrastructure_failure 始终通过(防递归) + row = conn.execute( + "SELECT must_haves FROM tasks WHERE id=?", (task_id,) + ).fetchone() + if row and row["must_haves"]: + try: + meta = json.loads(row["must_haves"]) + except Exception: + meta = {} + if meta.get("action_type") == "infrastructure_failure": + return VerifyResult(True, "infrastructure_passthrough", + "infrastructure_failure auto-pass") + + # 特殊处理:review_merged 始终通过(纯通知) + if meta.get("action_type") == "review_merged": + return VerifyResult(True, "merged_passthrough", + "review_merged auto-pass") + + # 1. 优先检查 action_report comment + report_row = conn.execute( + "SELECT id FROM comments WHERE task_id=? " + "AND comment_type='action_report' LIMIT 1", + (task_id,) + ).fetchone() + if report_row: + return VerifyResult(True, "has_action_report", "action_report found") + + # 2. fallback:检查 output(向后兼容) output_count = conn.execute( "SELECT COUNT(*) FROM outputs WHERE task_id=?", (task_id,) ).fetchone()[0] if output_count > 0: return VerifyResult(True, "has_output", f"output_count={output_count}") - # 检查 comment(非系统、有实质内容) + # 3. fallback:检查有实质内容的 comment(向后兼容) comment_count = conn.execute( "SELECT COUNT(*) FROM comments WHERE task_id=? " - "AND author != 'system' AND LENGTH(content) >= 20", + "AND author != 'system' AND LENGTH(body) >= 20", (task_id,) ).fetchone()[0] if comment_count > 0: return VerifyResult(True, "has_comment", f"comment_count={comment_count}") - return VerifyResult(False, "no_action", "output=0, comment=0") + return VerifyResult(False, "no_action", + "no action_report, no output, no valid comment") finally: conn.close() except Exception as e: @@ -174,32 +292,217 @@ class ToolchainHandler(BaseTaskHandler): def on_failure(self, task_id: str, agent_id: str, db_path: Path, verify: VerifyResult) -> None: - """验证失败 → 标 failed + Mail API 通知主公""" + """验证失败 → 三分路处理(业务/系统/基础设施)""" self._mark_task_status(db_path, task_id, "failed") - logger.info("Toolchain %s: verify failed (%s), marked failed", task_id, verify.reason) + logger.info("Toolchain %s: verify failed (%s), marked failed", + task_id, verify.reason) - # 从 db 读取事件上下文 - event_type = "" - event_data: Dict = {} + # 读取 must_hives 获取事件上下文 + assignee 从 tasks 表读取 + meta = {} + assignee = agent_id try: conn = get_connection(db_path) row = conn.execute( - "SELECT must_haves FROM tasks WHERE id=?", (task_id,) + "SELECT must_haves, assignee FROM tasks WHERE id=?", (task_id,) ).fetchone() - if row and row["must_haves"]: - meta = json.loads(row["must_haves"]) - event_type = meta.get("event_type", "") - raw = meta.get("event_data", "{}") - event_data = json.loads(raw) if isinstance(raw, str) else raw + if row: + if row["must_haves"]: + meta = json.loads(row["must_haves"]) + assignee = row["assignee"] or agent_id conn.close() except Exception: pass - self._notify_via_mail_api( - task_id, verify.reason, verify.evidence, - event_type, event_data, + action_type = meta.get("action_type", "") + context_data = meta.get("context", {}) + + # 三分路决策 + route = self._classify_failure(verify) + + if route == "business": + self._handle_business_failure( + task_id, agent_id, verify, action_type, context_data, assignee, db_path) + elif route == "system": + self._handle_system_failure( + task_id, agent_id, verify, action_type, context_data, db_path) + else: # infrastructure + self._handle_infrastructure_failure( + task_id, agent_id, verify, db_path) + + def _classify_failure(self, verify: VerifyResult) -> str: + """分类失败类型:business / infrastructure(system 通过升级到达)""" + # verify_error 或 DB 不可用 → 基础设施失败 + if verify.reason == "verify_error": + return "infrastructure" + # 默认:业务失败 + return "business" + + def _handle_business_failure( + self, task_id: str, agent_id: str, verify: VerifyResult, + action_type: str, context_data: dict, assignee: str, + db_path: Path, + ) -> None: + """业务失败 → 在关联 PR/Issue 上创建 comment @原始 assignee""" + repo = context_data.get("repo", "") + pr_number = context_data.get("pr_number") or context_data.get("issue_number", "") + + if repo and pr_number: + comment_body = ( + f"@{assignee or agent_id} 工具链任务执行失败\n\n" + f"任务 ID: {task_id}\n" + f"失败原因: {verify.reason}\n" + f"证据: {verify.evidence}\n\n" + f"请检查黑板任务并处理。" + ) + success = self._create_gitea_comment(repo, pr_number, comment_body) + if success: + logger.info("Toolchain %s: business failure → Gitea comment on %s#%s", + task_id, repo, pr_number) + return + # Gitea API failed → escalate to system failure + logger.warning( + "Toolchain %s: Gitea comment failed, escalating to system failure", + task_id) + self._handle_system_failure( + task_id, agent_id, verify, action_type, context_data, db_path) + else: + # 没有 PR/Issue 关联 → fallback 到系统失败 + logger.warning( + "Toolchain %s: no PR/Issue context for business failure, " + "escalating to system failure", task_id) + self._handle_system_failure( + task_id, agent_id, verify, action_type, context_data, db_path) + + def _handle_system_failure( + self, task_id: str, agent_id: str, verify: VerifyResult, + action_type: str, context_data: dict, db_path: Path, + ) -> None: + """系统失败 → 创建 Gitea Issue @pangtong-fujunshi""" + repo = context_data.get("repo", "sanguo/sanguo_moziplus_v2") + title = f"[toolchain-handler] 工具链事件处理失败: {task_id}" + body = ( + f"任务 {task_id} 验证失败\n\n" + f"事件类型: {action_type or '未知'}\n" + f"失败原因: {verify.reason}\n" + f"证据: {verify.evidence}\n\n" + f"@pangtong-fujunshi 请检查黑板任务并手动处理。" ) + # 尝试在 Gitea 创建 Issue + created = self._create_gitea_issue(repo, title, body, ["pangtong-fujunshi"]) + if created: + logger.info("Toolchain %s: system failure → Gitea Issue created on %s", + task_id, repo) + else: + # Gitea API 不可用 → 基础设施失败 + logger.error( + "Toolchain %s: Gitea API unavailable, escalating to infrastructure failure", + task_id) + self._handle_infrastructure_failure( + task_id, agent_id, verify, db_path) + + def _handle_infrastructure_failure( + self, task_id: str, agent_id: str, + verify: VerifyResult, db_path: Path, + ) -> None: + """基础设施失败 → 直接在 _toolchain DB 创建 task @jiangwei-infra(防递归)""" + try: + from datetime import datetime + new_task_id = f"tc-{int(datetime.now().timestamp() * 1000)}" + must_hives = json.dumps({ + "event_type": "infrastructure_failure", + "action_type": "infrastructure_failure", + "steps": [ + "检查 Gitea 服务状态(http://192.168.2.154:3000)", + "检查网络连通性", + "恢复后提交 action report", + ], + "context": {"original_task_id": task_id, "verify_reason": verify.reason}, + "from": "system", + "source": "toolchain_handler_on_failure", + }, ensure_ascii=False) + conn = get_connection(db_path) + conn.execute( + "INSERT INTO tasks (id, title, description, assignee, assigned_by, " + "must_haves, task_type, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + ( + new_task_id, + f"[基础设施] Gitea API 不可用 - {task_id}", + f"Gitea API 不可用,原任务 {task_id} 无法通过正常路径处理。\n" + f"请检查 Gitea 服务状态和网络连通性。", + "jiangwei-infra", + "system", + must_hives, + "toolchain", + "pending", + ) + ) + conn.commit() + conn.close() + logger.info( + "Toolchain %s: infrastructure failure → task %s created for jiangwei-infra", + task_id, new_task_id) + except Exception as e: + logger.error( + "Toolchain %s: failed to create infrastructure_failure task: %s", + task_id, e) + + # ----------------------------------------------------------------------- + # Gitea API 辅助 + # ----------------------------------------------------------------------- + + def _create_gitea_comment( + self, repo: str, pr_number: int, body: str, + ) -> bool: + """在 PR/Issue 上创建 comment。返回是否成功。""" + if not _GITEA_TOKEN: + return False + payload = json.dumps({"body": body}, ensure_ascii=False).encode("utf-8") + try: + req = urllib.request.Request( + f"{_GITEA_BASE}/repos/{repo}/issues/{pr_number}/comments", + data=payload, + headers={ + "Authorization": f"token {_GITEA_TOKEN}", + "Content-Type": "application/json", + }, + ) + urllib.request.urlopen(req, timeout=5) + return True + except Exception as e: + logger.warning("Gitea comment failed on %s#%s: %s", repo, pr_number, e) + return False + + def _create_gitea_issue( + self, repo: str, title: str, body: str, + assignees: list = None, + ) -> bool: + """创建 Gitea Issue。返回是否成功。""" + if not _GITEA_TOKEN: + return False + data = {"title": title, "body": body} + if assignees: + data["assignees"] = assignees + payload = json.dumps(data, ensure_ascii=False).encode("utf-8") + try: + req = urllib.request.Request( + f"{_GITEA_BASE}/repos/{repo}/issues", + data=payload, + headers={ + "Authorization": f"token {_GITEA_TOKEN}", + "Content-Type": "application/json", + }, + ) + urllib.request.urlopen(req, timeout=5) + return True + except Exception as e: + logger.warning("Gitea create issue failed on %s: %s", repo, e) + return False + + # ----------------------------------------------------------------------- + # 兼容:保留旧方法签名(但不再被 on_failure 调用) + # ----------------------------------------------------------------------- + def _build_gitea_links(self, event_type: str, event_data: dict) -> str: """根据事件类型构建 Gitea 链接。""" links = [] @@ -215,63 +518,4 @@ class ToolchainHandler(BaseTaskHandler): if "branch" in event_data and "commit" not in event_data: links.append(f"分支: {event_data['branch']}") - return "\n".join(links) if links else "(无法提取链接,请检查黑板任务详情)" - - def _notify_via_mail_api( - self, - task_id: str, - reason: str, - evidence: str, - event_type: str, - event_data: Dict, - ) -> None: - """通过 Mail API 发送丰富的失败通知给主公。""" - # 构建行动指引 - action_hint = "请检查黑板任务并手动处理。" - et_lower = event_type.lower() - if "ci" in et_lower or "deploy" in et_lower: - action_hint = "建议创建任务派给 jiangwei-infra 检查 CI/部署问题。" - elif "review" in et_lower: - action_hint = "建议查看 PR review 状态,必要时通知相关开发者。" - elif "issue" in et_lower: - action_hint = "建议创建任务派给对应开发者处理 Issue。" - - # 构建事件详情 - event_details = "" - if event_data: - event_details = "\n".join( - f" - {k}: {v}" for k, v in event_data.items() - ) - - # 构建 Gitea 链接 - gitea_links = self._build_gitea_links(event_type, event_data) - - title = f"[toolchain-handler] 工具链事件处理失败: {task_id}" - text = ( - f"任务 {task_id} 验证失败\n\n" - f"事件类型: {event_type or '未知'}\n" - f"事件详情:\n{event_details or ' (无)'}\n\n" - f"失败原因: {reason}\n" - f"证据: {evidence}\n\n" - f"{gitea_links}\n\n" - f"行动指引: {action_hint}" - ) - - payload = json.dumps({ - "from": "daemon", - "to": "pangtong-fujunshi", - "title": title, - "text": text, - "type": "inform", - }, ensure_ascii=False).encode("utf-8") - - try: - req = urllib.request.Request( - "http://localhost:8083/api/mail", - data=payload, - headers={"Content-Type": "application/json"}, - ) - urllib.request.urlopen(req, timeout=5) - logger.info("Toolchain %s: sent failure notification via Mail API", task_id) - except Exception as e: - logger.warning("Toolchain %s: failed to notify via Mail API: %s", task_id, e) + return "\n".join(links) if links else "(无法提取链接,请检查黑板任务详情)" diff --git a/tests/unit/test_toolchain_handler_v2.py b/tests/unit/test_toolchain_handler_v2.py new file mode 100644 index 0000000..620495b --- /dev/null +++ b/tests/unit/test_toolchain_handler_v2.py @@ -0,0 +1,525 @@ +"""Unit tests for §17 ToolchainHandler 强约束实现.""" +import json +import os +import sys +import tempfile +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +# Add project root to path +PROJECT_ROOT = Path(__file__).parent.parent.parent +sys.path.insert(0, str(PROJECT_ROOT)) + +from src.daemon.prompt_composer import PromptContext, PromptComposer +from src.daemon.toolchain_handler import ( + ToolchainHandler, + ToolchainContextSection, + ToolchainApiSection, + ToolchainConstraintsSection, + _ACTION_HINTS, +) +from src.daemon.base_task_handler import VerifyResult +from src.blackboard.db import init_db, get_connection + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture +def tmp_db(): + """Create a temporary _toolchain DB for testing.""" + with tempfile.TemporaryDirectory() as d: + db_path = Path(d) / "blackboard.db" + init_db(db_path) + yield db_path + + +@pytest.fixture +def handler(): + return ToolchainHandler() + + +def _insert_task(db_path, task_id, must_haves_json, status="working"): + """Insert a task into DB for testing.""" + conn = get_connection(db_path) + conn.execute( + "INSERT INTO tasks (id, title, description, assignee, assigned_by, " + "must_haves, task_type, status) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (task_id, "test", "test desc", "zhangfei-dev", "system", + must_haves_json, "toolchain", status) + ) + conn.commit() + conn.close() + + +def _insert_comment(db_path, task_id, author, body, comment_type="general"): + """Insert a comment into DB.""" + conn = get_connection(db_path) + conn.execute( + "INSERT INTO comments (task_id, author, comment_type, body) VALUES (?, ?, ?, ?)", + (task_id, author, comment_type, body) + ) + conn.commit() + conn.close() + + +def _insert_output(db_path, task_id, content="test output"): + """Insert an output into DB.""" + conn = get_connection(db_path) + conn.execute( + "INSERT INTO outputs (task_id, agent, output_type, title, summary) " + "VALUES (?, ?, ?, ?, ?)", + (task_id, "zhangfei-dev", "document", "test", content) + ) + conn.commit() + conn.close() + + +# --------------------------------------------------------------------------- +# Step 1a: PromptContext new fields +# --------------------------------------------------------------------------- + +class TestPromptContextFields: + def test_action_type_default(self): + ctx = PromptContext( + task_id="t1", title="test", description="d", + must_haves="", project_id="_toolchain", agent_id="a1", + ) + assert ctx.action_type == "" + + def test_action_steps_default(self): + ctx = PromptContext( + task_id="t1", title="test", description="d", + must_haves="", project_id="_toolchain", agent_id="a1", + ) + assert ctx.action_steps == [] + + def test_action_type_set(self): + ctx = PromptContext( + task_id="t1", title="test", description="d", + must_haves="", project_id="_toolchain", agent_id="a1", + action_type="review_result", + ) + assert ctx.action_type == "review_result" + + def test_action_steps_set(self): + steps = ["step 1", "step 2"] + ctx = PromptContext( + task_id="t1", title="test", description="d", + must_haves="", project_id="_toolchain", agent_id="a1", + action_steps=steps, + ) + assert ctx.action_steps == steps + + +# --------------------------------------------------------------------------- +# Step 2a: ToolchainContextSection steps rendering + action_hint +# --------------------------------------------------------------------------- + +class TestToolchainContextSection: + def test_renders_steps(self): + ctx = PromptContext( + task_id="t1", title="test", description="d", + must_haves="", project_id="_toolchain", agent_id="a1", + event_type="review_result", + event_data={"pr_number": "42", "repo": "sanguo/test"}, + action_type="review_result", + action_steps=["合并 PR", "提交 action report"], + ) + section = ToolchainContextSection() + result = section.render(ctx) + assert "必须执行的步骤" in result + assert "1. 合并 PR" in result + assert "2. 提交 action report" in result + + def test_renders_action_hint(self): + ctx = PromptContext( + task_id="t1", title="test", description="d", + must_haves="", project_id="_toolchain", agent_id="a1", + event_type="ci_failure", + action_type="ci_failure", + action_steps=[], + ) + section = ToolchainContextSection() + result = section.render(ctx) + assert "CI 失败" in result + assert "需要你修复" in result + + def test_renders_default_hint_for_unknown_action_type(self): + ctx = PromptContext( + task_id="t1", title="test", description="d", + must_haves="", project_id="_toolchain", agent_id="a1", + event_type="unknown", + action_type="unknown_type", + action_steps=[], + ) + section = ToolchainContextSection() + result = section.render(ctx) + assert "需要你执行动作的事件" in result + + def test_no_steps_no_steps_section(self): + ctx = PromptContext( + task_id="t1", title="test", description="d", + must_haves="", project_id="_toolchain", agent_id="a1", + event_type="review_merged", + action_type="review_merged", + action_steps=[], + ) + section = ToolchainContextSection() + result = section.render(ctx) + assert "必须执行的步骤" not in result + + +# --------------------------------------------------------------------------- +# Step 2b: ToolchainApiSection action_report guidance +# --------------------------------------------------------------------------- + +class TestToolchainApiSection: + def test_has_action_report_instruction(self): + ctx = PromptContext( + task_id="tc-123", title="test", description="d", + must_haves="", project_id="_toolchain", agent_id="zhangfei-dev", + ) + section = ToolchainApiSection() + result = section.render(ctx) + assert "action_report" in result + assert "comment_type" in result + assert "tc-123" in result + + def test_no_manual_done_instruction(self): + ctx = PromptContext( + task_id="tc-123", title="test", description="d", + must_haves="", project_id="_toolchain", agent_id="zhangfei-dev", + ) + section = ToolchainApiSection() + result = section.render(ctx) + # Should NOT contain the old "标记为 done" instruction + assert "标记为 **done**" not in result + assert '"status": "done"' not in result + + def test_has_outputs_instruction(self): + ctx = PromptContext( + task_id="tc-123", title="test", description="d", + must_haves="", project_id="_toolchain", agent_id="zhangfei-dev", + ) + section = ToolchainApiSection() + result = section.render(ctx) + assert "outputs" in result + + def test_has_gitea_collaboration_instruction(self): + ctx = PromptContext( + task_id="tc-123", title="test", description="d", + must_haves="", project_id="_toolchain", agent_id="zhangfei-dev", + ) + section = ToolchainApiSection() + result = section.render(ctx) + assert "Gitea" in result + assert "Mail API" in result + + +# --------------------------------------------------------------------------- +# Step 2c: ToolchainConstraintsSection Red Flags +# --------------------------------------------------------------------------- + +class TestToolchainConstraintsSection: + def test_has_red_flags_table(self): + ctx = PromptContext( + task_id="t1", title="test", description="d", + must_haves="", project_id="_toolchain", agent_id="a1", + ) + section = ToolchainConstraintsSection() + result = section.render(ctx) + assert "Red Flags" in result + assert "❌" in result + + def test_has_all_5_constraints(self): + ctx = PromptContext( + task_id="t1", title="test", description="d", + must_haves="", project_id="_toolchain", agent_id="a1", + ) + section = ToolchainConstraintsSection() + result = section.render(ctx) + assert "必须按步骤执行" in result + assert "必须提交 action report" in result + assert "不要执行任何状态转换命令" in result + assert "不需要回复" in result + assert "所有协作通过 Gitea 完成" in result + + def test_has_strong_language(self): + ctx = PromptContext( + task_id="t1", title="test", description="d", + must_haves="", project_id="_toolchain", agent_id="a1", + ) + section = ToolchainConstraintsSection() + result = section.render(ctx) + assert "强制要求" in result + assert "不是建议" in result + + +# --------------------------------------------------------------------------- +# Step 2d: verify_completion tests +# --------------------------------------------------------------------------- + +class TestVerifyCompletion: + def test_action_report_passes(self, handler, tmp_db): + """action_report comment → pass""" + must_haves = json.dumps({"action_type": "review_result"}) + _insert_task(tmp_db, "t1", must_haves) + _insert_comment(tmp_db, "t1", "zhangfei-dev", + "已修复 CI", comment_type="action_report") + + result = handler.verify_completion("t1", tmp_db) + assert result.passed is True + assert result.reason == "has_action_report" + + def test_no_action_report_fallback_output(self, handler, tmp_db): + """No action_report but has output → pass (fallback)""" + must_haves = json.dumps({"action_type": "review_result"}) + _insert_task(tmp_db, "t2", must_haves) + _insert_output(tmp_db, "t2", "review result content") + + result = handler.verify_completion("t2", tmp_db) + assert result.passed is True + assert result.reason == "has_output" + + def test_no_action_report_fallback_comment(self, handler, tmp_db): + """No action_report but has substantial comment → pass (fallback)""" + must_haves = json.dumps({"action_type": "review_result"}) + _insert_task(tmp_db, "t3", must_haves) + _insert_comment(tmp_db, "t3", "zhangfei-dev", + "This is a sufficiently long comment about the task.") + + result = handler.verify_completion("t3", tmp_db) + assert result.passed is True + assert result.reason == "has_comment" + + def test_nothing_passes(self, handler, tmp_db): + """No action_report, no output, no comment → fail""" + must_haves = json.dumps({"action_type": "review_result"}) + _insert_task(tmp_db, "t4", must_haves) + + result = handler.verify_completion("t4", tmp_db) + assert result.passed is False + assert result.reason == "no_action" + + def test_short_comment_fails(self, handler, tmp_db): + """Comment < 20 chars → fail""" + must_haves = json.dumps({"action_type": "review_result"}) + _insert_task(tmp_db, "t5", must_haves) + _insert_comment(tmp_db, "t5", "zhangfei-dev", "ok") + + result = handler.verify_completion("t5", tmp_db) + assert result.passed is False + + def test_review_merged_auto_passes(self, handler, tmp_db): + """review_merged → always pass""" + must_haves = json.dumps({"action_type": "review_merged"}) + _insert_task(tmp_db, "t6", must_haves) + + result = handler.verify_completion("t6", tmp_db) + assert result.passed is True + assert result.reason == "merged_passthrough" + + def test_infrastructure_failure_auto_passes(self, handler, tmp_db): + """infrastructure_failure → always pass (anti-recursion)""" + must_haves = json.dumps({"action_type": "infrastructure_failure"}) + _insert_task(tmp_db, "t7", must_haves) + + result = handler.verify_completion("t7", tmp_db) + assert result.passed is True + assert result.reason == "infrastructure_passthrough" + + +# --------------------------------------------------------------------------- +# Step 3a: _send_toolchain_task tests +# --------------------------------------------------------------------------- + +class TestSendToolchainTask: + def test_creates_task_in_toolchain_db(self): + """_send_toolchain_task creates a task in _toolchain DB.""" + from src.api.toolchain_routes import _send_toolchain_task, _toolchain_db_path + + with patch("src.api.toolchain_routes.get_data_root") as mock_root: + with tempfile.TemporaryDirectory() as d: + mock_root.return_value = Path(d) + + task_id = _send_toolchain_task( + to_agent="zhangfei-dev", + title="Test Task", + description="Test description", + event_type="ci_failure", + action_type="ci_failure", + steps=["Fix test", "Submit report"], + context_data={"pr_number": 42}, + ) + + assert task_id.startswith("tc-") + + # Verify task was written to _toolchain DB + db_path = _toolchain_db_path() + conn = get_connection(db_path) + row = conn.execute( + "SELECT * FROM tasks WHERE id=?", (task_id,) + ).fetchone() + assert row is not None + assert row["task_type"] == "toolchain" + assert row["assignee"] == "zhangfei-dev" + + # Verify must_haves JSON + meta = json.loads(row["must_haves"]) + assert meta["event_type"] == "ci_failure" + assert meta["action_type"] == "ci_failure" + assert meta["steps"] == ["Fix test", "Submit report"] + assert meta["context"]["pr_number"] == 42 + conn.close() + + def test_unknown_agent_returns_empty(self): + """_send_toolchain_task with unknown agent returns empty string.""" + from src.api.toolchain_routes import _send_toolchain_task + + task_id = _send_toolchain_task( + to_agent="unknown-agent", + title="Test", + description="desc", + event_type="test", + action_type="test", + steps=[], + ) + assert task_id == "" + + +# --------------------------------------------------------------------------- +# Step 2e: on_failure three-way routing tests +# --------------------------------------------------------------------------- + +class TestOnFailureRouting: + def test_business_failure_creates_gitea_comment(self, handler, tmp_db): + """Business failure → Gitea PR comment @task assignee (not must_hives field)""" + # S4: must_hives does NOT contain assignee — production data doesn't have it + must_haves = json.dumps({ + "action_type": "review_result", + "context": {"repo": "sanguo/test", "pr_number": 42}, + "from": "system", + }) + # assignee is set on the tasks table row (as production code writes it) + _insert_task(tmp_db, "t-fail", must_haves) + + with patch.object(handler, "_create_gitea_comment") as mock_comment: + mock_comment.return_value = True + verify = VerifyResult(False, "no_action", "no action_report") + handler.on_failure("t-fail", "zhangfei-dev", tmp_db, verify) + mock_comment.assert_called_once() + call_args = mock_comment.call_args + assert call_args[0][0] == "sanguo/test" + assert call_args[0][1] == 42 + # M2: comment body should @ the task's assignee from tasks table + comment_body = call_args[0][2] + assert "@zhangfei-dev" in comment_body + + def test_infrastructure_failure_creates_task(self, handler, tmp_db): + """Infrastructure failure → direct DB task for jiangwei-infra (no reverse dep)""" + must_haves = json.dumps({ + "action_type": "review_result", + "context": {"repo": "sanguo/test", "pr_number": 42}, + }) + _insert_task(tmp_db, "t-infra", must_haves) + + with patch.object(handler, "_create_gitea_comment") as mock_comment: + mock_comment.return_value = False # Gitea API down + with patch.object(handler, "_create_gitea_issue") as mock_issue: + mock_issue.return_value = False # Gitea API still down + verify = VerifyResult(False, "no_action", "no action_report") + handler.on_failure("t-infra", "zhangfei-dev", tmp_db, verify) + + # S3: should directly INSERT into DB, not call _send_toolchain_task + # Verify a new task was created in DB for jiangwei-infra + conn = get_connection(tmp_db) + rows = conn.execute( + "SELECT * FROM tasks WHERE assignee=?", + ("jiangwei-infra",) + ).fetchall() + conn.close() + assert len(rows) >= 1, "No infrastructure_failure task created" + infra_task = rows[0] + assert infra_task["task_type"] == "toolchain" + meta = json.loads(infra_task["must_haves"]) + assert meta["action_type"] == "infrastructure_failure" + + +# --------------------------------------------------------------------------- +# Regression: _mail path unaffected +# --------------------------------------------------------------------------- + +class TestMailRegression: + def test_send_mail_still_exists(self): + """_send_mail function is preserved.""" + from src.api.toolchain_routes import _send_mail + assert callable(_send_mail) + + def test_send_mail_not_called_by_handlers(self): + """No toolchain handler calls _send_mail.""" + import inspect + from src.api import toolchain_routes + + # Get source of handler functions + source = inspect.getsource(toolchain_routes) + # _send_mail should appear only in its own definition, not in handler bodies + lines = source.split("\n") + in_handler = False + handler_send_mail_calls = [] + for i, line in enumerate(lines): + if line.strip().startswith("async def _handle_") or line.strip().startswith("async def _send_mention_mails"): + in_handler = True + elif line.strip().startswith("async def ") or line.strip().startswith("def _"): + if not line.strip().startswith("async def _handle_") and not line.strip().startswith("async def _send_mention_mails"): + in_handler = False + if in_handler and "_send_mail(" in line and not line.strip().startswith("#"): + handler_send_mail_calls.append((i, line.strip())) + + assert len(handler_send_mail_calls) == 0, \ + f"_send_mail still called in handlers: {handler_send_mail_calls}" + + +# --------------------------------------------------------------------------- +# Integration: full prompt build +# --------------------------------------------------------------------------- + +class TestFullPromptBuild: + def test_prompt_contains_all_sections(self, handler): + """Full prompt has context, API, and constraints sections.""" + ctx = PromptContext( + task_id="tc-test", + title="CI 失败修复", + description="Fix CI failure", + must_haves=json.dumps({ + "event_type": "ci_failure", + "action_type": "ci_failure", + "steps": ["Fix test", "Push", "Submit report"], + "context": {"pr_number": 42}, + }), + project_id="_toolchain", + agent_id="zhangfei-dev", + event_type="ci_failure", + event_data={"pr_number": "42", "repo": "sanguo/test"}, + action_type="ci_failure", + action_steps=["Fix test", "Push", "Submit report"], + ) + + prompt = handler.build_prompt(ctx) + + # Must have action hint + assert "CI 失败" in prompt + assert "需要你修复" in prompt + # Must have steps + assert "必须执行的步骤" in prompt + assert "1. Fix test" in prompt + # Must have API section with action_report + assert "action_report" in prompt + assert "tc-test" in prompt + # Must have constraints with Red Flags + assert "Red Flags" in prompt + assert "强制要求" in prompt