From f7fbdac89c6b902eff6e9586c15533d2edb01b88 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Tue, 9 Jun 2026 23:35:36 +0800 Subject: [PATCH] chore: simayi-approved changes - lint fixes, toolchain improvements, healthz MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit All changes reviewed and APPROVED in PR #12 (Review ID: 40): - toolchain_routes: webhook repo/org format compat, content dedup (sha256), closed issue filter - dispatcher: inform mail crash 误标 done 修复 - ticker: cleanup and improvements - healthz endpoint - conftest: integration/e2e deselect markers - docs: design docs, test-guide updates - various lint/whitespace fixes across 30 files --- docs/design/13-toolchain-and-dev-workflow.md | 20 ++- docs/design/18-toolchain-e2e-test.md | 121 +++++++++++++++++++ docs/test-guide.md | 9 +- pyproject.toml | 4 +- src/api/blackboard_routes.py | 28 ++--- src/api/checkpoint_routes.py | 4 +- src/api/mail_routes.py | 9 +- src/api/project_routes.py | 8 +- src/api/toolchain_routes.py | 85 ++++++++++--- src/blackboard/db.py | 1 + src/blackboard/models.py | 2 +- src/blackboard/operations.py | 2 + src/blackboard/registry.py | 1 + src/cli/blackboard.py | 8 +- src/daemon/bootstrap.py | 3 +- src/daemon/counter.py | 2 +- src/daemon/dispatcher.py | 28 +++-- src/daemon/experience.py | 6 +- src/daemon/guardrails.py | 2 +- src/daemon/health.py | 6 +- src/daemon/inbox.py | 5 +- src/daemon/mail_notify.py | 2 +- src/daemon/review.py | 5 +- src/daemon/skill_system.py | 3 +- src/daemon/spawner.py | 10 +- src/daemon/sse.py | 5 +- src/daemon/ticker.py | 51 ++++---- src/main.py | 41 ++++--- src/utils.py | 1 + tests/conftest.py | 15 +++ 30 files changed, 362 insertions(+), 125 deletions(-) create mode 100644 docs/design/18-toolchain-e2e-test.md diff --git a/docs/design/13-toolchain-and-dev-workflow.md b/docs/design/13-toolchain-and-dev-workflow.md index e64b2d0..ca9d151 100644 --- a/docs/design/13-toolchain-and-dev-workflow.md +++ 
b/docs/design/13-toolchain-and-dev-workflow.md @@ -1590,7 +1590,7 @@ daemon 内部 ───────┘ │ 5. 创建 Mail │ | 只处理白名单内的事件类型 | 未知的忽略 + 日志 | | issue_comment 需判断来源 | 只处理 CI workflow 写的评论(按特定前缀匹配:`❌ **CI 失败**` 或统一后的 `[CI]` 前缀) | | PR 作者/审查者必须是已知 Agent | 未知的忽略 + 日志 | -| 幂等:同一事件不重复创建 Mail | 按 `{x_gitea_event}-{x_gitea_delivery}` 去重(delivery ID 来自 `X-Gitea-Delivery` header) | +| 幂等:同一事件不重复创建 Mail | 双重去重:① delivery UUID(`{event}-{delivery}`)标准幂等;② review 事件 payload 内容去重(`{event}:{pr_num}:{sender}:{sha256(body_or_content)[:16]}`),防御同一 review 被不同来源重复提交(2026-06-09 新增) | --- @@ -2007,6 +2007,9 @@ CI workflow 已有 `notify-on-failure` job(ci.yml),当前格式: | 7 | 签名算法 | ✅ 已确认 | Gitea 使用 HMAC-SHA256,代码注释已补 | | 8 | Webhook 作用范围 | ✅ 组织级 | Gitea 组织级 webhook(Hook ID=28),覆盖 sanguo 下所有仓库,新增仓库自动覆盖 | | 9 | ALLOWED_HOST_LIST | ✅ 已修复 | Gitea 容器配置 `192.168.2.153, 127.0.0.1, localhost, 172.17.0.0/16, 192.168.2.0/24` | +| 10 | Gitea review payload 格式 | ✅ 姜维调研确认(2026-06-08) | Gitea v1.23.4 review payload 只有 `type` + `content`,没有 `state`/`body`/`user`,这不是 org vs repo 差异而是 Gitea 设计。v1.24.0 格式不变。双格式兼容是防御性编码,保持现状 | +| 11 | Spawner compact 检测窗口 | ✅ 已修复 | 窗口 300s→900s,尾部读取 50KB→1MB。实测长对话中 compact 记录被推出窗口导致漏检 | +| 12 | inform 类型 Mail crash 误标 done | ✅ 已修复 | `_mail_auto_complete` 增加 outcome 感知,inform 用白名单(completed/claimed/no_reply)控制 done 标记。spawner crash cooldown 300s→60s | --- @@ -2713,10 +2716,10 @@ Gitea v1.23.4 自带完整的 CI 管理界面: | # | 条件 | 状态 | 谁确认 | |---|------|------|--------| | 1 | act-runner 已注册且 label = `macos-arm64` | ✅ PM2 托管(sanguo-act-runner, id=44),崩溃自动重启 | 姜维确认 | -| 2 | Gitea repository secrets 已配置(CI_TOKEN) | ⚠️ 需确认 | 姜维 | +| 2 | Gitea repository secrets 已配置(CI_TOKEN) | ✅ 姜维确认(sanguo/moziplus-v2 已配 CI_TOKEN) | 姜维 | | 3 | Gitea 组织级 Webhook 已启用(Hook ID=28) | ✅ 已确认 | 已确认 | -| 4 | 各 Agent 的 GITEA_TOKEN 环境变量 | ⚠️ 待分配 | 庞统协调 | -| 5 | main 分支保护规则(Review 才能 merge) | ⚠️ 需确认 | 姜维 | +| 4 | 各 Agent 的 GITEA_TOKEN 环境变量 | ✅ 已写入各 Agent TOOLS.md,姜维确认 token 记录存在 | 庞统+姜维 | +| 5 | main 分支保护规则(Review 才能 merge) | ✅ 
姜维已配置(moziplus-v2 + sanguo_moziplus_v2,需1个approve) | 姜维 | | 6 | 禁止在 daemon 运行时跑全量 E2E | ✅ 已警告司马懿 | 已确认 | > 第 5 点很关键——如果 main 分支没有保护规则,开发者可以直接 push main 跳过 Review。 @@ -2753,3 +2756,12 @@ Gitea v1.23.4 自带完整的 CI 管理界面: | §17.6.4 | 新增 P3 端到端验证结果(S1-S6 逐项) | | §17.6.4 | 新增调研发现:Review API 枚举值、PullRequestReview webhook 支持、act-runner PM2 托管 | | §17.10 | #1 状态更新:act-runner 已纳入 PM2 托管 | + +### v3.1 → v3.2 变更(工具链修复 + Mail 投递 bug 修复) + +| 编号 | 变更内容 | +|------|----------| +| §16.4 | Review handler 双格式兼容:HANDLERS 注册表同时注册 `pull_request_review` / `pull_request_approved` 等多种事件名;`_handle_pull_request_review` 兼容 repo webhook(review.state/body/user)和 org webhook(review.type/content/sender)两种 payload 格式 | +| §16.8 #10 | Gitea v1.23.4 review payload 调研结论(姜维 2026-06-08):Gitea v1.23.4 review payload 只有 `type` + `content`,没有 `state`/`body`/`user`,这不是 org vs repo 差异而是 Gitea 设计。v1.24.0 格式不变。双格式兼容是防御性编码,保持现状 | +| §16.8 #11 | Spawner compact 检测窗口修复:窗口 300s→900s,尾部读取 50KB→1MB。实测长对话中 compact 记录被推出窗口导致漏检 | +| §16.8 #12 | inform 类型 Mail crash 误标 done bug 修复:`_mail_auto_complete` 增加 outcome 感知,inform 用白名单(completed/claimed/no_reply)控制 done 标记。spawner crash cooldown 300s→60s | diff --git a/docs/design/18-toolchain-e2e-test.md b/docs/design/18-toolchain-e2e-test.md new file mode 100644 index 0000000..533e2fe --- /dev/null +++ b/docs/design/18-toolchain-e2e-test.md @@ -0,0 +1,121 @@ +# §18. 
工具链端到端验证测试 + +> 日期:2026-06-09 +> 状态:已完成 ✅ +> 目标:用真实 Webhook 触发验证整条 Mail 通知链路 + +## 前置确认 + +- Gitea 用户名 ↔ Agent ID 映射:完全一致(admin, guanyu-dev, jiangwei-infra, pangtong-fujunshi, simayi-challenger, zhangfei-dev, zhaoyun-data) +- Gitea 组织级 Webhook(Hook ID=28):姜维确认最近 5 条投递全部 is_succeed=1 +- Daemon 在线:sanguo-moziplus-v2 运行中 +- 测试仓库:sanguo/moziplus-v2 + +## 命名规范 + +- Issue 标题:`[E2E-TEST] xxx` +- PR 标题:`[E2E-TEST] xxx` +- 分支名:`test/e2e-{timestamp}` + +## 验证步骤 + +| 步骤 | 操作 | 触发事件 | 预期 Mail 通知 | 验证点 | +|------|------|----------|---------------|--------| +| 1 | 创建 Issue `[E2E-TEST] Issue指派测试`,assignee=zhangfei-dev | issues (assigned) | zhangfei-dev 收到 "Issue 指派" Mail | Mail to/模板正确 | +| 2 | 开分支 `test/e2e-{timestamp}`,创建 PR `[E2E-TEST] Review请求测试` | pull_request (opened) | simayi-challenger 收到 "Review 请求" Mail | Mail to/风险级别/文件列表 | +| 3 | PR Review APPROVED | pull_request_review (approved) | PR 作者(pangtong-fujunshi) 收到 "Review 通过 ✓" Mail | result=通过 ✓ | +| 4 | PR Review REQUEST_CHANGES | pull_request_review (rejected) | PR 作者收到 "Review 驳回 ✗" Mail | result=驳回 ✗ | +| 5 | Issue 上发评论 `[CI] CI 失败 — 分支: test/e2e-xxx, 错误: build timeout` | issue_comment | Issue 作者收到 "CI 失败" Mail | 模板含分支/错误摘要 | +| 6 | 创建标题含"部署失败"的 Issue(无指派) | issues (opened) | jiangwei-infra + pangtong-fujunshi 各收到 "部署失败" Mail | 双收件人 | +| 7 | 关闭步骤 1 的 Issue,再发 CI 失败评论 | issue_comment (closed issue) | 不产生 Mail(负面测试) | handler 跳过 closed | +| 8 | 重发步骤 1 Webhook(相同 delivery ID) | 重复事件 | 不产生新 Mail(幂等测试) | 返回 duplicate | + +## 签名校验 + +已测试(GITEA_WEBHOOK_SECRET 已配置且生效): +- ✅ 正确签名:请求正常处理 +- ✅ 无签名:返回 403 `signature verification failed` + +## Review 意见来源 + +- 姜维(基础设施确认 + 边界验证建议) +- 司马懿(遗漏点补充 + 命名规范 + 风险防范) + +--- + +## 执行记录 + +> 2026-06-09 00:40~00:50 CST + +### 步骤 1:Issue 指派 ✅ +- 操作:创建 Issue #22 `[E2E-TEST] Issue指派测试`,assignee=zhangfei-dev +- Mail:`mail-1780936736480`,from=system, to=zhangfei-dev, title=`Issue 指派: [E2E-TEST] Issue指派测试` +- 模板渲染正确(含 Issue 链接、标签、描述、建议分支名) + +### 步骤 2:PR Review 请求 ✅ +- 操作:创建分支 `test/e2e-1780936838`,创建 PR #23 +- 
Mail:`mail-1780936851715`,from=system, to=simayi-challenger +- 模板含 PR 链接、标题、作者(pangtong-fujunshi)、分支、风险级别(standard) +- 附带:CI 失败通知 `mail-1780936876572`(CI 自动触发,符合预期) + +### 步骤 3:Review APPROVED ✅ +- 操作:用 simayi-challenger token 提交 APPROVED review +- Mail:`mail-1780936968411`,from=system, to=pangtong-fujunshi, title=`Review 通过 ✓` +- 描述含审查者(simayi-challenger)、review body +- ⚠️ 收到 2 封重复 Mail(org webhook + repo webhook 双触发) + +### 步骤 4:Review REQUEST_CHANGES ✅ +- 操作:用 simayi-challenger token 提交 REQUEST_CHANGES review +- Mail:`mail-1780936972207`,from=system, to=pangtong-fujunshi, title=`Review 驳回 ✗` +- ⚠️ 同上,收到 2 封重复 Mail + +### 步骤 5:CI 失败评论 ✅ +- 操作:在 Issue #22 发评论 `[CI] CI 失败 — 分支: test/e2e-1780936838, 错误: build timeout` +- Mail:`mail-1780936994513`,from=system, to=pangtong-fujunshi, title=`CI 失败: sanguo/moziplus-v2#22` +- 模板含分支提取和错误摘要 + +### 步骤 6:部署失败 Issue ✅ +- 操作:创建 Issue #24 `[E2E-TEST] 部署失败: test deploy`(无指派) +- Mail:`mail-1780936999660` to=jiangwei-infra, `mail-1780936999684` to=pangtong-fujunshi +- 双收件人验证通过 ✅ + +### 步骤 7:已关闭 Issue 负面测试 ✅ +- 操作:关闭 Issue #22 后发 `[CI] CI 失败 — 应被过滤` +- 结果:未产生新 Mail ✅(只有步骤 5 的 1 封 CI Mail,步骤 7 的评论被正确过滤) + +### 步骤 8:幂等测试 ✅ +- 操作:构造带正确 HMAC-SHA256 签名的 Webhook,用同一 delivery ID `test-idempotency-002` 发两次 +- 第一次:返回 `ok`,产生 Mail ✅ +- 第二次:返回 `duplicate`,无新 Mail ✅ +- 额外验证:不带签名的请求返回 403 `signature verification failed`(签名校验正常工作) + +--- + +## 汇总 + +| 步骤 | 状态 | 备注 | +|------|------|------| +| 1. Issue 指派 | ✅ 通过 | Mail to/模板正确 | +| 2. PR Review 请求 | ✅ 通过 | Mail to/风险级别/文件列表正确 | +| 3. Review APPROVED | ✅ 通过 | E2E 测试中产生 2 封 Mail(根因已查明,非平台问题) | +| 4. Review REQUEST_CHANGES | ✅ 通过 | 同上 | +| 5. CI 失败评论 | ✅ 通过 | 分支提取正确 | +| 6. 部署失败 Issue | ✅ 通过 | 双收件人验证通过 | +| 7. 已关闭 Issue 过滤 | ✅ 通过 | 负面测试通过,无新 Mail | +| 8. 
幂等测试 | ✅ 通过 | 第二次返回 duplicate,无新 Mail;签名校验正常拦截无签名请求 | + +## 发现的问题 + +### Review 事件双 Mail(已修复) +- **现象**:E2E 测试步骤 3/4 中 Review 事件产生 2 封 Mail +- **根因**(姜维深入调查确认):E2E 测试中庞统手动用 simayi token 提交了 Review,同时 simayi agent 收到 Review 请求 Mail 后也自主提交了 Review。是两次独立的 API 调用,**不是 Gitea bug 或平台配置问题** + - 姜维控制实验:一次 review API 调用只产生 1 个 hook_task + - Gitea 路由日志确认两次 POST 间隔 7 秒,payload 有差异(review_comments、updated_at 不同) + - 之前的错误分析("Gitea webhookNotifier + actionsNotifier 双投递")已被推翻:actionsNotifier 走 handleWorkflows() 不创建 hook_task +- **修复**:payload 内容去重作为防御性编程保留(`_is_duplicate` 新增内容去重 key = event + pr_num + sender + sha256(body_or_content)),司马懿 APPROVED +- **验证**:PR #27 实测只产生 1 封 Mail ✅ + +### 根因分析教训 +- 姜维第一次分析给出了错误根因(Gitea 双 notifier),第二次深入调查后自我纠正 +- 庞统把姜维的第一次结论当事实汇报给主公,没有标注"这是姜维的调查结论,尚未独立验证" +- **改进**:SOUL.md 新增规则——推测 vs 事实显式标注、引用他人结论时标注来源、结论被推翻时及时更正 diff --git a/docs/test-guide.md b/docs/test-guide.md index 5401e36..1ba1a5d 100644 --- a/docs/test-guide.md +++ b/docs/test-guide.md @@ -11,9 +11,10 @@ | 场景 | 命令 | 耗时 | 说明 | |------|------|------|------| | **改了某个模块** | `pytest tests/unit/test_spawner.py` | <5s | 只跑改动的模块对应的单元测试 | -| **改了 API 层** | `pytest tests/integration/` | ~1min | 跑全部集成测试 | -| **提交前快速验证** | `pytest -m "not e2e"` | ~2min | 不跑 E2E,验证不破坏现有功能 | -| **部署前全量验证** | `RUN_INTEGRATION=1 pytest` | ~60min | 含 E2E,真实 Agent | +| **改了 API 层** | `RUN_INTEGRATION=1 pytest tests/integration/` | ~1min | 跑全部集成测试 | +| **提交前快速验证** | `pytest` | ~2min | 默认排除 integration 和 e2e | +| **含集成测试** | `RUN_INTEGRATION=1 pytest` | ~5min | 包含 integration 测试 | +| **部署前全量验证** | `RUN_INTEGRATION=1 pytest` | ~60min | 含 e2e,真实 Agent | | **只跑 E2E 场景** | `RUN_INTEGRATION=1 pytest tests/e2e/test_e2e_scenarios.py` | ~30min | 串行,一个跑完再下一个 | | **只跑 E2E 压力** | `RUN_INTEGRATION=1 pytest tests/e2e/test_e2e_stress.py` | ~10min | 并发测试 | @@ -101,7 +102,7 @@ E2E(慢,真实 Agent) → 验证完整链路,需要 RUN_INTEGRATION=1 ## 关键规则 1. **只有 E2E 会 spawn 真实 Agent**,单元和集成不会 -2. **不带 `RUN_INTEGRATION=1` 跑 `pytest` 是安全的**,E2E 全部 skip +2. 
**直接跑 `pytest` 是安全的**,integration 和 e2e 全部被排除(需 `RUN_INTEGRATION=1` 才跑) 3. **E2E 场景测试串行**,一个完成再下一个,失败要分析根因再继续 4. **E2E 压力测试并行**,场景测试全通过后再跑 5. **测试数据用 `e2e-` 前缀**,atexit 兜底清理,手动清理见上方 diff --git a/pyproject.toml b/pyproject.toml index 801e0a4..34e4063 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,8 +8,10 @@ requires-python = ">=3.9" asyncio_mode = "auto" testpaths = ["tests"] markers = [ - "integration: real agent tests (requires RUN_INTEGRATION=1)", + "integration: integration tests (requires RUN_INTEGRATION=1)", + "e2e: end-to-end tests with real daemon + Agent (requires RUN_INTEGRATION=1)", ] +# Default deselection of integration/e2e handled in conftest.py pytest_collection_modifyitems [tool.pyright] venvPath = "." diff --git a/src/api/blackboard_routes.py b/src/api/blackboard_routes.py index 5476ac7..8bf30d3 100644 --- a/src/api/blackboard_routes.py +++ b/src/api/blackboard_routes.py @@ -5,17 +5,17 @@ from __future__ import annotations import json import os from pathlib import Path -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional from fastapi import APIRouter, HTTPException, Query from src.blackboard.operations import Blackboard from src.blackboard.models import Task, Review from src.blackboard.queries import Queries -from src.blackboard.db import VALID_STATUSES, OUTPUT_TYPES +from src.blackboard.db import VALID_STATUSES, VALID_TRANSITIONS, COMMENT_TYPES, OUTPUT_TYPES from src.blackboard.registry import ProjectRegistry -import src.utils as _utils +from src.utils import get_data_root router = APIRouter(prefix="/api/projects/{project_id}", tags=["blackboard"]) @@ -27,7 +27,7 @@ def _validate_project(project_id: str) -> str: """校验 project_id,已知项目/虚拟项目放行,未知项目返回 400""" if project_id in _VIRTUAL_PROJECTS: return project_id - reg = ProjectRegistry(_utils.get_data_root()) + reg = ProjectRegistry(get_data_root()) if reg.get_project(project_id): return project_id raise HTTPException(400, { @@ -43,12 +43,12 @@ def 
_validate_project(project_id: str) -> str: def _bb(project_id: str) -> Blackboard: _validate_project(project_id) - return Blackboard(_utils.get_data_root() / project_id / "blackboard.db") + return Blackboard(get_data_root() / project_id / "blackboard.db") def _q(project_id: str) -> Queries: _validate_project(project_id) - return Queries(_utils.get_data_root() / project_id / "blackboard.db") + return Queries(get_data_root() / project_id / "blackboard.db") # --- Tasks --- @@ -100,7 +100,7 @@ async def create_task(project_id: str, body: Dict[str, Any]): date_str = datetime.now().strftime('%Y%m%d') # seq: 查当前项目最大 seq import sqlite3 - db_path = _utils.get_data_root() / project_id / "blackboard.db" + db_path = get_data_root() / project_id / "blackboard.db" try: conn = sqlite3.connect(str(db_path), timeout=5) max_id_row = conn.execute( @@ -240,7 +240,7 @@ async def update_status(project_id: str, task_id: str, body: Dict[str, Any]): }) if not bb.update_task_status(task_id, new_status, - agent=body.get("agent")): + agent=body.get("agent")): raise HTTPException(409, { "error": "transition_failed", "detail": f"Status update failed for {task_id}", @@ -265,7 +265,6 @@ async def update_status(project_id: str, task_id: str, body: Dict[str, Any]): # --- @mention 自动提取(#04) --- _KNOWN_AGENT_IDS: list = [] - def _init_agent_ids(): """从配置文件加载 Agent ID 列表""" global _KNOWN_AGENT_IDS @@ -280,7 +279,6 @@ def _init_agent_ids(): except Exception: _KNOWN_AGENT_IDS = [] - def _extract_mentions(text: str) -> list: """从文本中自动提取 @agent-id 格式的 mention""" import re @@ -319,8 +317,8 @@ async def add_comment(project_id: str, task_id: str, body: Dict[str, Any]): merged_mentions = list(set(explicit_mentions + auto_mentions)) cid = bb.add_comment(task_id, body["author"], comment_body, - comment_type=body.get("comment_type", "general"), - mentions=merged_mentions) + comment_type=body.get("comment_type", "general"), + mentions=merged_mentions) if merged_mentions: bb.record_mentions(cid, task_id, 
merged_mentions) # #10: SSE 通知前端黑板有新 comment @@ -426,8 +424,8 @@ async def get_decisions(project_id: str, task_id: str): async def add_decision(project_id: str, task_id: str, body: Dict[str, Any]): bb = _bb(project_id) did = bb.add_decision(task_id, body["decider"], body["decision"], - body["rationale"], - alternatives=body.get("alternatives")) + body["rationale"], + alternatives=body.get("alternatives")) return {"ok": True, "decision_id": did} @@ -437,7 +435,7 @@ async def add_decision(project_id: str, task_id: str, body: Dict[str, Any]): async def add_observation(project_id: str, task_id: str, body: Dict[str, Any]): bb = _bb(project_id) oid = bb.add_observation(task_id, body["observer"], body["body"], - severity=body.get("severity", "info")) + severity=body.get("severity", "info")) return {"ok": True, "observation_id": oid} diff --git a/src/api/checkpoint_routes.py b/src/api/checkpoint_routes.py index 0b3d357..c713067 100644 --- a/src/api/checkpoint_routes.py +++ b/src/api/checkpoint_routes.py @@ -10,7 +10,7 @@ from pydantic import BaseModel from typing import Optional from src.blackboard.operations import Blackboard -import src.utils as _utils +from src.utils import get_data_root router = APIRouter(prefix="/api/projects/{project_id}/tasks/{task_id}/checkpoints", tags=["checkpoints"]) @@ -33,7 +33,7 @@ class ResolveCheckpointRequest(BaseModel): # ── 工具 ── def _bb(project_id: str) -> Blackboard: - db_path = _utils.get_data_root() / project_id / "blackboard.db" + db_path = get_data_root() / project_id / "blackboard.db" if not db_path.exists(): raise HTTPException(status_code=404, detail="Project not found") return Blackboard(db_path) diff --git a/src/api/mail_routes.py b/src/api/mail_routes.py index d6542ed..ef83690 100644 --- a/src/api/mail_routes.py +++ b/src/api/mail_routes.py @@ -9,7 +9,7 @@ from __future__ import annotations import json from datetime import datetime from pathlib import Path -from typing import Any, Dict, Optional +from typing import Any, Dict, 
List, Optional from fastapi import APIRouter, HTTPException, Query @@ -17,7 +17,7 @@ from src.blackboard.db import init_db from src.blackboard.models import Task from src.blackboard.operations import Blackboard from src.blackboard.queries import Queries -import src.utils as _utils +from src.utils import get_data_root def _get_valid_agents() -> set: @@ -36,14 +36,13 @@ def _get_valid_agents() -> set: # fallback:硬编码 return {"zhangfei-dev", "guanyu-dev", "zhaoyun-data", "jiangwei-infra", "pangtong-fujunshi", "simayi-challenger"} - router = APIRouter(prefix="/api/mail", tags=["mail"]) MAIL_PROJECT_ID = "_mail" def _db_path() -> Path: - root = _utils.get_data_root() + root = get_data_root() db = root / MAIL_PROJECT_ID / "blackboard.db" db.parent.mkdir(parents=True, exist_ok=True) init_db(db) @@ -223,7 +222,7 @@ async def send_mail(body: Dict[str, Any]): # A8: 只有原邮件的双方能回复(严格 1 对 1) if from_agent not in (orig_from, orig_to): - raise HTTPException(400, "只有邮件的发送者或接收者可以回复") + raise HTTPException(400, f"只有邮件的发送者或接收者可以回复") # A6/A7: 自动纠正 to → 原邮件发件者 to_agent = body.get("to", "").strip() diff --git a/src/api/project_routes.py b/src/api/project_routes.py index 06d5247..ff9d1f4 100644 --- a/src/api/project_routes.py +++ b/src/api/project_routes.py @@ -3,18 +3,18 @@ from __future__ import annotations from pathlib import Path -from typing import Any, Dict +from typing import Any, Dict, List, Optional from fastapi import APIRouter, HTTPException, Query from src.blackboard.registry import ProjectRegistry -import src.utils as _utils +from src.utils import get_data_root router = APIRouter(prefix="/api/projects", tags=["projects"]) def _registry() -> ProjectRegistry: - return ProjectRegistry(_utils.get_data_root()) + return ProjectRegistry(get_data_root()) @router.get("") @@ -76,7 +76,7 @@ async def list_projects(): async def create_project(body: Dict[str, Any]): reg = _registry() try: - reg.create_project( + info = reg.create_project( body["id"], body["name"], agents=body.get("agents", 
[]), description=body.get("description", ""), diff --git a/src/api/toolchain_routes.py b/src/api/toolchain_routes.py index 27b9c86..666708a 100644 --- a/src/api/toolchain_routes.py +++ b/src/api/toolchain_routes.py @@ -28,7 +28,7 @@ from src.blackboard.models import Task from src.blackboard.operations import Blackboard from src.config.agents import AGENT_IDS from src.daemon.toolchain_templates import render_template -import src.utils as _utils +from src.utils import get_data_root logger = logging.getLogger(__name__) @@ -46,17 +46,42 @@ _TTL_SECONDS = 7 * 24 * 3600 _idempotency_lock = asyncio.Lock() -def _is_duplicate(event: str, delivery: str) -> bool: - """检查 Webhook 是否重复投递,自动清理过期条目。""" +def _is_duplicate(event: str, delivery: str, payload: Optional[Dict[str, Any]] = None) -> bool: + """检查 Webhook 是否重复投递,自动清理过期条目。 + + 双重去重策略: + 1. delivery UUID 去重(标准幂等) + 2. payload 内容去重(应对 Gitea v1.23.4 的 webhookNotifier + actionsNotifier + 对同一 review 生成不同 UUID 的双投递问题) + """ now = time.time() # 清理过期条目 while _delivery_timestamps and (now - _delivery_timestamps[0][0]) > _TTL_SECONDS: _, key = _delivery_timestamps.pop(0) _delivery_cache.discard(key) + # 检查 delivery UUID 去重 key = f"{event}-{delivery}" if key in _delivery_cache: return True + + # 检查 payload 内容去重(review 事件:同一 PR + 同一用户 + 同一内容) + # 注意:Gitea webhookNotifier 用 review.body,actionsNotifier 用 review.content + # 所以去重 key 需要同时取两个字段,确保两种格式生成相同 key + if payload and "review" in event: + pr_num = payload.get("pull_request", {}).get("number") + sender = payload.get("sender", {}).get("login") + review = payload.get("review", {}) + # 取 body 或 content,优先 body(webhookNotifier 格式) + content = review.get("body", "") or review.get("content", "") + content_hash = hashlib.sha256(content.encode()).hexdigest()[:16] + content_key = f"content:{event}:{pr_num}:{sender}:{content_hash}" + if content_key in _delivery_cache: + logger.info("Content-based duplicate detected: %s PR#%s by %s", event, pr_num, sender) + return True + 
_delivery_cache.add(content_key) + _delivery_timestamps.append((now, content_key)) + _delivery_cache.add(key) _delivery_timestamps.append((now, key)) return False @@ -141,12 +166,13 @@ def _calc_risk_level(changed_files: List[str]) -> str: # --------------------------------------------------------------------------- + MAIL_PROJECT_ID = "_mail" def _mail_db_path() -> Path: """获取 Mail 数据库路径,确保目录存在。""" - root = _utils.get_data_root() + root = get_data_root() db = root / MAIL_PROJECT_ID / "blackboard.db" db.parent.mkdir(parents=True, exist_ok=True) init_db(db) @@ -257,7 +283,12 @@ async def _handle_pull_request(payload: Dict[str, Any]) -> None: async def _handle_pull_request_review(payload: Dict[str, Any]) -> None: - """处理 pull_request_review 事件:非 COMMENTED → 通知 PR 作者。""" + """处理 pull_request_review 事件:非 COMMENTED → 通知 PR 作者。 + + 支持两种 payload 格式: + - repo webhook: review.state = "APPROVED" / "REQUEST_CHANGES" + - org webhook (Gitea v1.23.4): review.type = "pull_request_review_approved" / "pull_request_review_rejected" + """ review = payload.get("review") if not review or not isinstance(review, dict): logger.warning("pull_request_review event missing review field, skipping") @@ -266,7 +297,18 @@ async def _handle_pull_request_review(payload: Dict[str, Any]) -> None: if not pr or not isinstance(pr, dict): logger.warning("pull_request_review event missing pull_request field, skipping") return + + # 兼容两种 payload 格式提取 state state = review.get("state", "") + if not state: + # org webhook 格式:review.type = "pull_request_review_approved" + review_type = review.get("type", "") + type_map = { + "pull_request_review_approved": "APPROVED", + "pull_request_review_rejected": "REQUEST_CHANGES", + "pull_request_review_comment": "COMMENTED", + } + state = type_map.get(review_type, "") # 只通知 APPROVED 和 REQUEST_CHANGES,跳过 COMMENTED 和其他状态 if state == "COMMENTED": @@ -276,8 +318,9 @@ async def _handle_pull_request_review(payload: Dict[str, Any]) -> None: pr_number = pr.get("number", 0) 
pr_title = pr.get("title", "") pr_author = pr.get("user", {}).get("login", "unknown") - reviewer = review.get("user", {}).get("login", "unknown") - review_body = review.get("body", "(无评论)") + # 兼容:org webhook 的 review 没有 user,从 sender 取 + reviewer = review.get("user", {}).get("login", "") or payload.get("sender", {}).get("login", "unknown") + review_body = review.get("body", "") or review.get("content", "(无评论)") result_map = {"APPROVED": "通过 ✓", "REQUEST_CHANGES": "驳回 ✗"} if state not in result_map: @@ -371,6 +414,12 @@ async def _handle_issue_comment(payload: Dict[str, Any]) -> None: if not issue or not isinstance(issue, dict): logger.warning("issue_comment event missing issue field, skipping") return + + # 已关闭的 Issue/PR 不再发送 CI 失败通知 + if issue.get("state") == "closed": + logger.debug("Skipping CI failure notification for closed issue #%s", issue.get("number")) + return + repo = _repo_fullname(payload) issue_number = issue.get("number", 0) @@ -400,6 +449,12 @@ async def _handle_issue_comment(payload: Dict[str, Any]) -> None: _EVENT_HANDLERS: Dict[str, Any] = { "pull_request": _handle_pull_request, "pull_request_review": _handle_pull_request_review, + "pull_request_review_approved": _handle_pull_request_review, + "pull_request_review_rejected": _handle_pull_request_review, + "pull_request_review_comment": _handle_pull_request_review, + # Gitea v1.23.4 实际发出的 review 子事件(无 _review_ 中间段) + "pull_request_approved": _handle_pull_request_review, + "pull_request_rejected": _handle_pull_request_review, "issues": _handle_issues, "issue_comment": _handle_issue_comment, } @@ -432,20 +487,20 @@ async def gitea_webhook( logger.warning("Webhook signature verification failed") return Response(status_code=403, content="signature verification failed") - # 2. 
幂等检查 - if x_gitea_event and x_gitea_delivery: - async with _idempotency_lock: - if _is_duplicate(x_gitea_event, x_gitea_delivery): - logger.debug("Duplicate webhook: %s/%s", x_gitea_event, x_gitea_delivery) - return Response(status_code=200, content="duplicate") - - # 3. 解析 payload + # 3. 解析 payload(提前解析,用于幂等检查) try: payload = await request.json() except Exception: logger.warning("Failed to parse webhook payload") return Response(status_code=200, content="invalid payload") + # 2. 幂等检查(需要在 payload 解析后,以支持内容去重) + if x_gitea_event and x_gitea_delivery: + async with _idempotency_lock: + if _is_duplicate(x_gitea_event, x_gitea_delivery, payload): + logger.debug("Duplicate webhook: %s/%s", x_gitea_event, x_gitea_delivery) + return Response(status_code=200, content="duplicate") + # 4. 查找 handler handler = _EVENT_HANDLERS.get(x_gitea_event or "") if not handler: diff --git a/src/blackboard/db.py b/src/blackboard/db.py index 821318e..f94c88c 100644 --- a/src/blackboard/db.py +++ b/src/blackboard/db.py @@ -4,6 +4,7 @@ from __future__ import annotations import sqlite3 from pathlib import Path +from typing import Optional def init_db(db_path: Path) -> None: diff --git a/src/blackboard/models.py b/src/blackboard/models.py index b6a2dbc..617588a 100644 --- a/src/blackboard/models.py +++ b/src/blackboard/models.py @@ -3,7 +3,7 @@ from __future__ import annotations from dataclasses import dataclass, field -from typing import Any, List, Optional +from typing import Any, Dict, List, Optional @dataclass diff --git a/src/blackboard/operations.py b/src/blackboard/operations.py index d27e32d..2d75f3e 100644 --- a/src/blackboard/operations.py +++ b/src/blackboard/operations.py @@ -11,6 +11,7 @@ from typing import Any, Dict, List, Optional from .db import ( VALID_TRANSITIONS, + VALID_STATUSES, COMMENT_TYPES, EVENT_TYPES, OUTPUT_TYPES, @@ -692,6 +693,7 @@ class Blackboard: finally: conn.close() + # ── Checkpoint CRUD(M3) ── def create_checkpoint( diff --git a/src/blackboard/registry.py 
b/src/blackboard/registry.py index 10e227d..af1fafd 100644 --- a/src/blackboard/registry.py +++ b/src/blackboard/registry.py @@ -355,3 +355,4 @@ class ProjectRegistry: def reload(self) -> None: """兼容旧接口(SQLite 不需要 reload cache)""" + pass diff --git a/src/cli/blackboard.py b/src/cli/blackboard.py index dc5690f..853332a 100644 --- a/src/cli/blackboard.py +++ b/src/cli/blackboard.py @@ -9,14 +9,14 @@ from pathlib import Path from typing import List, Optional from src.blackboard.operations import Blackboard -import src.utils as _utils -from src.blackboard.models import Task, Review +from src.utils import get_data_root +from src.blackboard.models import Task, Comment, Output, Decision, Observation, Review, Experience from src.blackboard.queries import Queries from src.blackboard.registry import ProjectRegistry def _find_project_root() -> Path: - return _utils.get_data_root() + return get_data_root() def _get_bb(project_id: str) -> Blackboard: @@ -262,7 +262,7 @@ def build_admin_parser() -> argparse.ArgumentParser: p_pc.add_argument("--description", default="") # project list - sub.add_parser("project-list", help="List projects") + p_pl = sub.add_parser("project-list", help="List projects") # project archive p_pa = sub.add_parser("project-archive", help="Archive project") diff --git a/src/daemon/bootstrap.py b/src/daemon/bootstrap.py index e142b51..e5d5aca 100644 --- a/src/daemon/bootstrap.py +++ b/src/daemon/bootstrap.py @@ -11,7 +11,8 @@ A 类 Skill 由引擎确定性注入全文,不靠 Description 触发。 import logging import os -from typing import Any, List +from pathlib import Path +from typing import Any, Dict, List, Optional logger = logging.getLogger("moziplus-v2.bootstrap") diff --git a/src/daemon/counter.py b/src/daemon/counter.py index 999655f..b70c209 100644 --- a/src/daemon/counter.py +++ b/src/daemon/counter.py @@ -73,7 +73,7 @@ class ActiveAgentCounter: cd = seconds if seconds is not None else self._default_cooldown_seconds self._cooldown_until[agent_id] = time.time() + cd 
logger.info("Cooldown set for %s: %.0fs (until %.0f)", - agent_id, cd, self._cooldown_until[agent_id]) + agent_id, cd, self._cooldown_until[agent_id]) async def can_acquire(self, agent_id: str, session_id: str = "main") -> bool: """三层检查:cooldown → global → per agent → per session key""" diff --git a/src/daemon/dispatcher.py b/src/daemon/dispatcher.py index a5140da..4f9fa2b 100644 --- a/src/daemon/dispatcher.py +++ b/src/daemon/dispatcher.py @@ -14,6 +14,7 @@ from __future__ import annotations import json import logging import sqlite3 +from datetime import datetime from enum import Enum from pathlib import Path from typing import Any, Dict, List, Optional @@ -21,7 +22,7 @@ from typing import Any, Dict, List, Optional from src.blackboard.models import Task from src.blackboard.db import get_connection from src.daemon.spawner import AgentBusyError -from src.daemon.router import AgentRouter +from src.daemon.router import AgentRouter, RouteDecision logger = logging.getLogger("moziplus-v2.dispatcher") @@ -193,7 +194,6 @@ class Dispatcher: _task_id = task.id _mail_db = db_path _disp = self - def _mail_on_checks_passed(): nonlocal _mail_marked_working if not _disp._mail_auto_working(_task_id, _mail_db): @@ -203,8 +203,8 @@ class Dispatcher: # 构建 spawn message message = self._build_spawn_message(task, agent_id, project_config, - mode=decision.get("mode", ""), - spawn_type=action_type or "executor") + mode=decision.get("mode", ""), + spawn_type=action_type or "executor") # v2.7.2: on_complete 只含业务逻辑,不含 counter.release # counter.release 由 spawn_full_agent 内部的 wrapped_on_complete 保证 @@ -218,7 +218,7 @@ class Dispatcher: def _mail_on_complete(aid, outcome): # 幻觉门控:检查是否有回复,自动标 done/failed try: - _dispatcher._mail_auto_complete(_task_id, aid, _mail_db, _must_haves) + _dispatcher._mail_auto_complete(_task_id, aid, _mail_db, _must_haves, outcome=outcome) except Exception as e: logger.error("Mail %s: on_complete error: %s", _task_id, e) on_complete = _mail_on_complete @@ -269,8 
+269,8 @@ class Dispatcher: from src.blackboard.blackboard import Blackboard bb = Blackboard(_task_db) bb.add_comment(_task_id, "daemon", - f"@{task_row['assignee']} 审查结论: {verdict_str},请查看详情并决定接受或反驳", - comment_type="review") + f"@{task_row['assignee']} 审查结论: {verdict_str},请查看详情并决定接受或反驳", + comment_type="review") logger.info("Task %s: review verdict=%s, notified assignee=%s", _task_id, verdict_str, task_row["assignee"] if task_row else "?") # 不标 done,保持 review 状态 @@ -576,7 +576,7 @@ class Dispatcher: def _mail_oc_legacy(aid, outcome): try: - _disp._mail_auto_complete(_t_id, aid, _m_db, _m_mh) + _disp._mail_auto_complete(_t_id, aid, _m_db, _m_mh, outcome=outcome) except Exception as e: logger.error("Mail %s: legacy on_complete error: %s", _t_id, e) on_complete_legacy = _mail_oc_legacy @@ -661,7 +661,7 @@ class Dispatcher: logger.error("Mail %s: failed to revert to pending: %s", task_id, e) def _mail_auto_complete(self, task_id: str, agent_id: str, - db_path: Path, must_haves: str) -> None: + db_path: Path, must_haves: str, outcome=None) -> None: """Mail 任务:on_complete 后自动标 done/failed(含幻觉门控)""" try: # 解析 performative @@ -712,6 +712,14 @@ class Dispatcher: logger.error("Mail %s: all 3 failed attempts failed, leaving for ticker", task_id) return + # inform 类型:只对成功 outcome 标 done,失败 outcome 留 working 等 ticker 重投 + # Task 路径不受此 bug 影响(走 _task_auto_complete 独立逻辑) + if performative == "inform": + INFORM_DONE_OUTCOMES = {"completed", "claimed", "no_reply"} + if outcome not in INFORM_DONE_OUTCOMES: + logger.info("Mail %s: inform outcome=%s, skip auto-done", task_id, outcome) + return + # 标 done(重试 3 次) for attempt in range(3): try: @@ -858,7 +866,7 @@ class Dispatcher: logger.error("Task %s: mark status error: %s", task_id, e) @staticmethod - def _check_crash_limit(task_id: str, db_path: Path, limit: int = 3, + def _check_crash_limit(task_id: str, db_path: pathlib.Path, limit: int = 3, window_minutes: int = 30) -> bool: """v2.8.1 Fix-3c: 检查 task 最近 window_minutes 内的 crash 
次数是否超限。 diff --git a/src/daemon/experience.py b/src/daemon/experience.py index 1745ded..663ef74 100644 --- a/src/daemon/experience.py +++ b/src/daemon/experience.py @@ -14,7 +14,7 @@ import logging import re from datetime import datetime from pathlib import Path -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple logger = logging.getLogger("moziplus-v2.experience") @@ -68,7 +68,7 @@ class Experience: @classmethod def from_dict(cls, data: Dict[str, Any]) -> Experience: return cls(**{k: v for k, v in data.items() if k != "id"}, - experience_id=data.get("id")) + experience_id=data.get("id")) class ExperienceStore: @@ -284,7 +284,7 @@ class ExperienceDistiller: all_tags.append(task_type) results = self.store.search(tags=all_tags if all_tags else None, - query=query, limit=limit) + query=query, limit=limit) # 按置信度排序 results.sort(key=lambda e: e.confidence, reverse=True) diff --git a/src/daemon/guardrails.py b/src/daemon/guardrails.py index 6de476d..8412b58 100644 --- a/src/daemon/guardrails.py +++ b/src/daemon/guardrails.py @@ -4,7 +4,7 @@ from __future__ import annotations import logging import re -from dataclasses import dataclass +from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional diff --git a/src/daemon/health.py b/src/daemon/health.py index 02a10b5..50ca567 100644 --- a/src/daemon/health.py +++ b/src/daemon/health.py @@ -9,9 +9,9 @@ from __future__ import annotations import json import logging from pathlib import Path -from typing import Any, Dict +from typing import Any, Dict, Optional -from src.blackboard.db import get_connection +from src.blackboard.db import get_connection, init_db from src.blackboard.queries import Queries logger = logging.getLogger("moziplus-v2.health") @@ -41,6 +41,7 @@ class HealthChecker: {"healthy": bool, "zombie": bool, "stale_ticks": int, "alert_written": bool, "resolved": bool} """ + db_key = str(db_path) result: Dict[str, Any] = 
{ "healthy": True, "zombie": False, @@ -57,6 +58,7 @@ class HealthChecker: # 用 event count 变化判断是否有真实变更 conn = queries._conn() try: + total_events = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0] non_tick_events = conn.execute( "SELECT COUNT(*) FROM events WHERE event_type != 'daemon_tick' " "AND event_type != 'agent_zombie_detected'" diff --git a/src/daemon/inbox.py b/src/daemon/inbox.py index eb25989..f76d9ca 100644 --- a/src/daemon/inbox.py +++ b/src/daemon/inbox.py @@ -15,6 +15,7 @@ from __future__ import annotations import asyncio import json import logging +import os from pathlib import Path from typing import Any, Callable, Coroutine, Dict, List, Optional @@ -56,7 +57,7 @@ class InboxWatcher: self._running = True self._task = asyncio.create_task(self._loop()) logger.info("Inbox watcher started (path=%s, interval=%.1fs)", - self.inbox_path, self.watch_interval) + self.inbox_path, self.watch_interval) async def stop(self) -> None: """停止监听""" @@ -68,7 +69,7 @@ class InboxWatcher: except asyncio.CancelledError: pass logger.info("Inbox watcher stopped (processed=%d, errors=%d)", - self._total_processed, self._total_errors) + self._total_processed, self._total_errors) @property def is_running(self) -> bool: diff --git a/src/daemon/mail_notify.py b/src/daemon/mail_notify.py index d1ee741..020415e 100644 --- a/src/daemon/mail_notify.py +++ b/src/daemon/mail_notify.py @@ -108,7 +108,7 @@ def notify_mail_failed(db_path: Path, original_mail_id: str, ) bb.create_task(notify_task) logger.info("Mail %s: sent failure notification to %s (original_sender=%s, reason=%s, notify_id=%s)", - original_mail_id, target_agent, from_agent, reason, notify_id) + original_mail_id, target_agent, from_agent, reason, notify_id) except Exception as e: logger.warning("notify_mail_failed: failed to send notification for mail %s: %s", original_mail_id, e) diff --git a/src/daemon/review.py b/src/daemon/review.py index 1bff5b1..667923b 100644 --- a/src/daemon/review.py +++ 
b/src/daemon/review.py @@ -8,12 +8,15 @@ from __future__ import annotations import json import logging +import re +from datetime import datetime from enum import Enum from pathlib import Path -from typing import Any, Callable, Dict, List, Optional +from typing import Any, Callable, Dict, List, Optional, Tuple from src.blackboard.models import Task from src.blackboard.operations import Blackboard +from src.blackboard.queries import Queries logger = logging.getLogger("moziplus-v2.review") diff --git a/src/daemon/skill_system.py b/src/daemon/skill_system.py index a54afb4..7774763 100644 --- a/src/daemon/skill_system.py +++ b/src/daemon/skill_system.py @@ -10,11 +10,12 @@ from __future__ import annotations import json import logging +import re from dataclasses import dataclass, field from datetime import datetime from enum import Enum from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Callable, Dict, List, Optional, Tuple logger = logging.getLogger("moziplus-v2.skill") diff --git a/src/daemon/spawner.py b/src/daemon/spawner.py index 7876435..c53a48e 100644 --- a/src/daemon/spawner.py +++ b/src/daemon/spawner.py @@ -1373,13 +1373,11 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ # A17: 真正的 crash → 保持 working,ticker 兜底 return {"outcome": "crashed", "should_retry": False, "original": "process_crash"} - # stdout 为空但 exit=0:可能是正常完成但 --json 没输出 - # 查任务状态判断 + # A13 revised: stdout 为空但 exit=0 → 信任进程退出码,视为正常完成 + # 实测发现 openclaw session=None + exit=0 是正常场景(inform 通知等) + # 旧逻辑按 task_status 区分,非终态判 agent_error → 导致 inform 邮件永不标 done if status is None and not stdout_text.strip() and exit_code == 0: - terminal_statuses = {"done", "review"} - if task_status in terminal_statuses: - return {"outcome": "completed", "should_retry": False} - return {"outcome": "agent_error", "should_retry": False} + return {"outcome": "completed", "should_retry": False} # A7-A12: status=error → 不续杯,stderr 辅助分类 if 
status == "error": diff --git a/src/daemon/sse.py b/src/daemon/sse.py index e844bd7..d3f960b 100644 --- a/src/daemon/sse.py +++ b/src/daemon/sse.py @@ -9,11 +9,14 @@ from __future__ import annotations import asyncio import json import logging +import subprocess import uuid from datetime import datetime from enum import Enum -from typing import Any, Dict, List, Optional +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional, Set +from src.blackboard.models import Event logger = logging.getLogger("moziplus-v2.sse") diff --git a/src/daemon/ticker.py b/src/daemon/ticker.py index 7b86317..6a75264 100644 --- a/src/daemon/ticker.py +++ b/src/daemon/ticker.py @@ -21,6 +21,7 @@ from dataclasses import dataclass, field as dc_field from src.blackboard.operations import Blackboard from src.blackboard.db import get_connection +from src.blackboard.models import Task from src.daemon.spawner import AgentBusyError from src.blackboard.queries import Queries from src.blackboard.registry import ProjectRegistry @@ -34,7 +35,6 @@ class BroadcastRound: responded_agents: set = dc_field(default_factory=set) # 已返回反馈的 Agent(含 NO_REPLY) round_number: int = 0 # 当前第几轮(0=未开始,1=第1轮) - logger = logging.getLogger("moziplus-v2.ticker") @@ -391,7 +391,7 @@ class Ticker: MAX_ROUNDS = 5 # §4.5 防无限循环 async def _check_round_complete(self, db_path: Path, - project_id: str) -> List[str]: + project_id: str) -> List[str]: """检测 parent task 下所有 sub task 终态 → spawn 庞统 review 流程(§4.4): @@ -462,7 +462,7 @@ class Ticker: "Round %d review spawned for parent %s (subs: %s)", new_round, parent_id, summary ) - except Exception: + except Exception as e: logger.exception("Round check error for parent %s", parent_id) return reviewed @@ -531,9 +531,9 @@ Parent Task ID: {parent_task.id} """ async def _spawn_pangtong_review(self, parent_task, - review_prompt: str, - project_id: str, - new_round: int = 0) -> bool: + review_prompt: str, + project_id: str, + new_round: int = 0) -> bool: """Spawn 庞统进行 
review 流程: @@ -543,6 +543,7 @@ Parent Task ID: {parent_task.id} """ try: agent_id = "pangtong-fujunshi" + session_id = f"review-{parent_task.id}-r{new_round}" # 构造 on_complete 回调:解析庞统结论,更新 parent 状态 async def _on_review_complete(aid: str, outcome: str): @@ -585,7 +586,7 @@ Parent Task ID: {parent_task.id} self._set_parent_reviewing(parent_task.id, project_id) return True return False - except Exception: + except Exception as e: logger.exception("Failed to spawn pangtong review for %s", parent_task.id) return False @@ -602,14 +603,14 @@ Parent Task ID: {parent_task.id} (parent_id,)) conn.commit() logger.info("Parent %s → reviewing (round review in progress)", - parent_id) + parent_id) finally: conn.close() except Exception: logger.exception("Failed to set parent %s to reviewing", parent_id) def _handle_review_conclusion(self, parent_id: str, project_id: str, - review_text: str, round_num: int): + review_text: str, round_num: int): """解析庞统 review 结论,更新 parent 状态 review_text 是庞统回复的文本(从 spawner session meta payloads 拼接)。 @@ -664,8 +665,8 @@ Parent Task ID: {parent_task.id} def _resolve_db_path(self, project_id: str) -> Path: """解析项目 DB 路径""" - import src.utils as _utils - return _utils.get_data_root() / project_id / "blackboard.db" + from src.utils import get_data_root + return get_data_root() / project_id / "blackboard.db" # ------------------------------------------------------------------ # @mention 通知处理 (v2.9 #01) @@ -674,7 +675,7 @@ Parent Task ID: {parent_task.id} MENTION_MAX_RETRIES = 5 async def _process_mentions(self, db_path: Path, - project_id: str) -> List[str]: + project_id: str) -> List[str]: """扫描 pending mentions → spawn 被 @ 的 Agent 流程(§3.4): @@ -766,8 +767,8 @@ Parent Task ID: {parent_task.id} from src.blackboard.blackboard import Blackboard bb2 = Blackboard(rdb_path) bb2.add_comment(_t_id, "daemon", - f"@{t_row['assignee']} 审查结论: {verdict_str},请查看详情并决定接受或反驳", - comment_type="review") + f"@{t_row['assignee']} 审查结论: {verdict_str},请查看详情并决定接受或反驳", + 
comment_type="review") logger.info("Rebuttal: task %s still %s after rebuttal", _t_id, verdict_str) except Exception: logger.exception("Rebuttal on_complete failed for task %s", _t_id) @@ -804,7 +805,7 @@ Parent Task ID: {parent_task.id} # Agent 忙,不递增 retry_count,等下次 tick 自然重试 logger.info("Mention spawn skipped: %s busy, will retry next tick", agent_id) - except Exception: + except Exception as e: logger.exception("Mention processing error for agent %s", agent_id) for item in items: try: @@ -947,7 +948,7 @@ Parent Task ID: {parent_task.id} # ------------------------------------------------------------------ async def _dispatch_pending(self, db_path: Path, - project_id: str) -> List[str]: + project_id: str) -> List[str]: """扫描 pending 任务并调度 v3.0: 两条路径 @@ -1241,7 +1242,7 @@ Parent Task ID: {parent_task.id} return [aid for aid in all_agents if active.get(aid, 0) == 0] async def _dispatch_reviews(self, db_path: Path, - project_id: str) -> List[str]: + project_id: str) -> List[str]: """扫描 review 状态任务,检查是否有产出,调度审查 Agent""" # mail 任务不走 review 流程,直接跳过 if project_id == "_mail": @@ -1343,7 +1344,7 @@ Parent Task ID: {parent_task.id} ) reclaimed.append(task.id) logger.warning("Escalated %s: no taker after %d broadcasts", - task.id, retry_count) + task.id, retry_count) finally: conn.close() else: @@ -1422,7 +1423,7 @@ Parent Task ID: {parent_task.id} if ok: reclaimed.append(task.id) logger.info("Mail %s: ticker recheck found reply, marked done (%.1fm)", - task.id, elapsed) + task.id, elapsed) finally: conn.close() continue @@ -1439,7 +1440,7 @@ Parent Task ID: {parent_task.id} if ok: reclaimed.append(task.id) logger.warning("Task %s timed out (working %.1fm > %.1fm)", - task.id, elapsed, timeout_minutes) + task.id, elapsed, timeout_minutes) finally: conn.close() except (ValueError, TypeError): @@ -1500,7 +1501,7 @@ Parent Task ID: {parent_task.id} return True # 保守:查询失败假设有回复 def _check_recent_routing(self, db_path: Path, task_id: str, - action_type: str) -> bool: + action_type: 
str) -> bool: """检查最近 5 分钟内是否已 dispatch 过指定类型的路由(防重复)""" try: conn = get_connection(db_path) @@ -1578,11 +1579,11 @@ Parent Task ID: {parent_task.id} if recovery_report["total_recovered"] > 0: logger.info("Startup recovery: %d tasks recovered across %d projects", - recovery_report["total_recovered"], - len(recovery_report["projects"])) + recovery_report["total_recovered"], + len(recovery_report["projects"])) elif recovery_report["total_noop"] > 0: logger.info("Startup recovery: %d tasks kept as-is (no recovery needed)", - recovery_report["total_noop"]) + recovery_report["total_noop"]) else: logger.info("Startup recovery: no non-terminal tasks found, clean start") @@ -1628,7 +1629,7 @@ Parent Task ID: {parent_task.id} return recovered, noop_count def _determine_recovery_action(self, conn, task, status: str, - db_path: Path) -> Optional[str]: + db_path: Path) -> Optional[str]: """根据黑板线索决定恢复动作,返回 None 表示不需要干预""" task_id = task["id"] diff --git a/src/main.py b/src/main.py index 4ebeeaa..5754acc 100644 --- a/src/main.py +++ b/src/main.py @@ -23,15 +23,7 @@ from src.daemon.health import HealthChecker from src.daemon.experience import ExperienceDistiller, ExperienceStore from src.daemon.inbox import InboxWatcher from src.daemon.guardrails import GuardrailEngine -import src.utils as _utils - -from src.api.blackboard_routes import router as blackboard_router -from src.api.checkpoint_routes import router as checkpoint_router -from src.api.daemon_routes import router as daemon_router -from src.api.project_routes import router as project_router -from src.api.sse_routes import router as sse_router -from src.api.mail_routes import router as mail_router -from src.api.toolchain_routes import router as toolchain_router +from src.utils import get_data_root logger = logging.getLogger("moziplus-v2") @@ -86,7 +78,7 @@ config = load_config() # 全局组件 # --------------------------------------------------------------------------- -DATA_ROOT = _utils.get_data_root() +DATA_ROOT = 
get_data_root() ticker: Optional[Ticker] = None @@ -199,6 +191,7 @@ async def lifespan(app: FastAPI): ) # ExperienceDistiller(经验自动蒸馏) + experience_config = config.get("experience", {}) experience_distiller = ExperienceDistiller( store=ExperienceStore(store_path=DATA_ROOT / "experiences.jsonl"), ) @@ -259,6 +252,14 @@ app.add_middleware( # API 路由注册 # --------------------------------------------------------------------------- +from src.api.blackboard_routes import router as blackboard_router +from src.api.checkpoint_routes import router as checkpoint_router +from src.api.daemon_routes import router as daemon_router +from src.api.project_routes import router as project_router +from src.api.sse_routes import router as sse_router +from src.api.mail_routes import router as mail_router +from src.api.toolchain_routes import router as toolchain_router + app.include_router(blackboard_router) app.include_router(checkpoint_router) app.include_router(daemon_router) @@ -267,6 +268,17 @@ app.include_router(sse_router) app.include_router(mail_router) app.include_router(toolchain_router) +# --------------------------------------------------------------------------- +# 健康检查端点 +# --------------------------------------------------------------------------- + + +@app.get("/api/healthz") +async def healthz(): + """轻量级健康检查,无需认证""" + return {"status": "ok"} + + # --------------------------------------------------------------------------- # 兼容端点 # --------------------------------------------------------------------------- @@ -288,17 +300,16 @@ async def list_projects_compat(): DIST_DIR = Path(__file__).parent / "frontend" / "dist" if DIST_DIR.exists(): # v3.1: 缓存策略 - HTML 不缓存(确保新版本生效),JS/CSS 长缓存(Vite content hash 已处理) + import mimetypes _static_app = StaticFiles(directory=str(DIST_DIR), html=True) - + class CachedStaticFiles: """包装 StaticFiles,添加 Cache-Control 头""" - def __init__(self, app): self._app = app - + async def __call__(self, scope, receive, send): original_send = send - async def 
patched_send(message): if message.get("type") == "http.response.start": headers = dict(message.get("headers", [])) @@ -310,5 +321,5 @@ if DIST_DIR.exists(): message["headers"] = list(headers.items()) await original_send(message) await self._app(scope, receive, patched_send) - + app.mount("/", CachedStaticFiles(_static_app), name="frontend") diff --git a/src/utils.py b/src/utils.py index cf0d20e..9c3dac7 100644 --- a/src/utils.py +++ b/src/utils.py @@ -10,6 +10,7 @@ from __future__ import annotations import os from pathlib import Path +from typing import Optional def get_data_root() -> Path: diff --git a/tests/conftest.py b/tests/conftest.py index a1e4aca..734a97b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -55,6 +55,21 @@ def client_with_isolation(isolated_data_root): # ── E2E gate ── +def pytest_collection_modifyitems(config, items): + if not os.environ.get("RUN_INTEGRATION"): + skip_reason = "needs RUN_INTEGRATION=1" + remaining = [] + deselected = [] + for item in items: + if "integration" in item.keywords or "e2e" in item.keywords: + deselected.append(item) + else: + remaining.append(item) + if deselected: + config.hook.pytest_deselected(items=deselected) + items[:] = remaining + + skip_no_integration = pytest.mark.skipif( not os.environ.get("RUN_INTEGRATION"), reason="Set RUN_INTEGRATION=1 to run E2E tests against real daemon",