Compare commits

..

12 Commits

Author SHA1 Message Date
cfdaily 149921fb5f docs: add #19 toolchain context layers design
CI / lint (push) Successful in 7s
CI / test (push) Successful in 14s
CI / lint (pull_request) Successful in 7s
CI / notify-on-failure (push) Successful in 0s
CI / test (pull_request) Successful in 14s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-09 22:25:17 +08:00
pangtong-fujunshi 59068b8d2a Merge pull request 'fix: resolve all flake8 lint errors (118 → 0)' (#10) from fix/lint-cleanup into main
Deploy / ci (push) Failing after 6s
Deploy / deploy (push) Has been skipped
Deploy / notify-deploy-failure (push) Successful in 1s
2026-06-09 22:24:10 +08:00
cfdaily 242057dfd6 fix: remove dead code config.get experience
CI / lint (push) Successful in 6s
CI / test (push) Successful in 14s
CI / notify-on-failure (push) Successful in 1s
CI / lint (pull_request) Failing after 13m39s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Failing after 14m58s
2026-06-09 22:23:58 +08:00
cfdaily 09a0928bbc fix: resolve all flake8 lint errors (118 → 0)
CI / lint (push) Successful in 8s
CI / lint (pull_request) Successful in 5s
CI / test (push) Failing after 8s
CI / test (pull_request) Failing after 8s
CI / notify-on-failure (push) Successful in 1s
CI / notify-on-failure (pull_request) Successful in 3s
2026-06-09 16:43:41 +08:00
jiangwei-infra 62d8ced8ed Merge pull request 'fix(ci): install all test dependencies' (#8) from fix/ci-deps into main
Deploy / ci (push) Failing after 6s
Deploy / deploy (push) Has been skipped
Deploy / notify-deploy-failure (push) Successful in 3s
2026-06-09 14:53:50 +08:00
jiangwei-infra 51ccbbf4b5 fix(ci): install all test dependencies (fastapi, pydantic, pyyaml, etc.)
CI / lint (push) Failing after 6s
CI / test (push) Has been skipped
CI / notify-on-failure (push) Successful in 0s
CI / lint (pull_request) Failing after 7s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 6s
2026-06-09 14:53:24 +08:00
jiangwei-infra fe24a86d7d Merge pull request 'fix(ci): install pytest directly instead of editable mode' (#7) from fix/ci-pytest into main
Deploy / ci (push) Failing after 7s
Deploy / deploy (push) Has been skipped
Deploy / notify-deploy-failure (push) Successful in 0s
2026-06-09 14:33:52 +08:00
jiangwei-infra 25c9cfd1ed fix(ci): install pytest directly instead of editable mode
CI / lint (push) Failing after 6s
CI / test (push) Has been skipped
CI / notify-on-failure (push) Successful in 0s
CI / lint (pull_request) Failing after 7s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 6s
2026-06-09 14:33:28 +08:00
jiangwei-infra 5af0e0e91d Merge pull request 'fix(ci): use pyproject.toml instead of missing requirements.txt' (#6) from fix/ci-requirements into main
Deploy / ci (push) Failing after 7s
Deploy / deploy (push) Has been skipped
Deploy / notify-deploy-failure (push) Successful in 1s
2026-06-09 14:24:42 +08:00
jiangwei-infra 05246d6469 fix(ci): use pyproject.toml instead of missing requirements.txt
CI / lint (push) Failing after 7s
CI / test (push) Has been skipped
CI / notify-on-failure (push) Successful in 1s
CI / lint (pull_request) Failing after 7s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 5s
2026-06-09 14:24:02 +08:00
jiangwei-infra 90e657636c Merge pull request 'fix(ci): use /tmp/ci-venv-* to avoid host .venv conflict' (#5) from fix/ci-venv-path into main
Deploy / ci (push) Failing after 10s
Deploy / deploy (push) Has been skipped
Deploy / notify-deploy-failure (push) Successful in 0s
2026-06-09 13:22:34 +08:00
jiangwei-infra cbdc965a0e fix(ci): use /tmp/ci-venv-* to avoid host .venv conflict
CI / lint (push) Failing after 6s
CI / test (push) Has been skipped
CI / lint (pull_request) Failing after 6s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (push) Successful in 1s
CI / notify-on-failure (pull_request) Successful in 3s
2026-06-09 13:21:01 +08:00
31 changed files with 139 additions and 384 deletions
+6 -6
View File
@@ -29,12 +29,12 @@ jobs:
- name: Setup Python
run: |
python3 -m venv .venv
.venv/bin/pip install --quiet flake8
python3 -m venv /tmp/ci-venv-lint
/tmp/ci-venv-lint/bin/pip install --quiet flake8
- name: Lint with flake8
run: |
.venv/bin/flake8 src/ --max-line-length=120 --extend-ignore=E501
/tmp/ci-venv-lint/bin/flake8 src/ --max-line-length=120 --extend-ignore=E501
# ── Job 2: Test ──────────────────────────────────────
test:
@@ -45,12 +45,12 @@ jobs:
- name: Setup Python
run: |
python3 -m venv .venv
.venv/bin/pip install --quiet -r requirements.txt
python3 -m venv /tmp/ci-venv-test
/tmp/ci-venv-test/bin/pip install --quiet fastapi pydantic pyyaml uvicorn requests pytest pytest-asyncio httpx
- name: Run tests (exclude E2E)
run: |
.venv/bin/pytest tests/ -m "not e2e" -x -q
/tmp/ci-venv-test/bin/pytest tests/ -m "not e2e" -x -q
# ── Job 3: CI 失败通知 ───────────────────────────────
# v1.23 不支持 failure(),用 always() + shell 检查 commit status 替代
+4 -16
View File
@@ -1590,7 +1590,7 @@ daemon 内部 ───────┘ │ 5. 创建 Mail │
| 只处理白名单内的事件类型 | 未知的忽略 + 日志 |
| issue_comment 需判断来源 | 只处理 CI workflow 写的评论(按特定前缀匹配:`❌ **CI 失败**` 或统一后的 `[CI]` 前缀) |
| PR 作者/审查者必须是已知 Agent | 未知的忽略 + 日志 |
| 幂等:同一事件不重复创建 Mail | 双重去重:① delivery UUID`{event}-{delivery}`)标准幂等;② review 事件 payload 内容去重(`{event}:{pr_num}:{sender}:{sha256(body_or_content)[:16]}`),防御同一 review 被不同来源重复提交(2026-06-09 新增 |
| 幂等:同一事件不重复创建 Mail | `{x_gitea_event}-{x_gitea_delivery}` 去重(delivery ID 来自 `X-Gitea-Delivery` header |
---
@@ -2007,9 +2007,6 @@ CI workflow 已有 `notify-on-failure` jobci.yml),当前格式:
| 7 | 签名算法 | ✅ 已确认 | Gitea 使用 HMAC-SHA256,代码注释已补 |
| 8 | Webhook 作用范围 | ✅ 组织级 | Gitea 组织级 webhookHook ID=28),覆盖 sanguo 下所有仓库,新增仓库自动覆盖 |
| 9 | ALLOWED_HOST_LIST | ✅ 已修复 | Gitea 容器配置 `192.168.2.153, 127.0.0.1, localhost, 172.17.0.0/16, 192.168.2.0/24` |
| 10 | Gitea review payload 格式 | ✅ 姜维调研确认(2026-06-08 | Gitea v1.23.4 review payload 只有 `type` + `content`,没有 `state`/`body`/`user`,这不是 org vs repo 差异而是 Gitea 设计。v1.24.0 格式不变。双格式兼容是防御性编码,保持现状 |
| 11 | Spawner compact 检测窗口 | ✅ 已修复 | 窗口 300s→900s,尾部读取 50KB→1MB。实测长对话中 compact 记录被推出窗口导致漏检 |
| 12 | inform 类型 Mail crash 误标 done | ✅ 已修复 | `_mail_auto_complete` 增加 outcome 感知,inform 用白名单(completed/claimed/no_reply)控制 done 标记。spawner crash cooldown 300s→60s |
---
@@ -2716,10 +2713,10 @@ Gitea v1.23.4 自带完整的 CI 管理界面:
| # | 条件 | 状态 | 谁确认 |
|---|------|------|--------|
| 1 | act-runner 已注册且 label = `macos-arm64` | ✅ PM2 托管(sanguo-act-runner, id=44),崩溃自动重启 | 姜维确认 |
| 2 | Gitea repository secrets 已配置(CI_TOKEN | ✅ 姜维确认(sanguo/moziplus-v2 已配 CI_TOKEN | 姜维 |
| 2 | Gitea repository secrets 已配置(CI_TOKEN | ⚠️ 需确认 | 姜维 |
| 3 | Gitea 组织级 Webhook 已启用(Hook ID=28 | ✅ 已确认 | 已确认 |
| 4 | 各 Agent 的 GITEA_TOKEN 环境变量 | ✅ 已写入各 Agent TOOLS.md,姜维确认 token 记录存在 | 庞统+姜维 |
| 5 | main 分支保护规则(Review 才能 merge | ✅ 姜维已配置(moziplus-v2 + sanguo_moziplus_v2,需1个approve | 姜维 |
| 4 | 各 Agent 的 GITEA_TOKEN 环境变量 | ⚠️ 待分配 | 庞统协调 |
| 5 | main 分支保护规则(Review 才能 merge | ⚠️ 需确认 | 姜维 |
| 6 | 禁止在 daemon 运行时跑全量 E2E | ✅ 已警告司马懿 | 已确认 |
> 第 5 点很关键——如果 main 分支没有保护规则,开发者可以直接 push main 跳过 Review。
@@ -2756,12 +2753,3 @@ Gitea v1.23.4 自带完整的 CI 管理界面:
| §17.6.4 | 新增 P3 端到端验证结果(S1-S6 逐项) |
| §17.6.4 | 新增调研发现:Review API 枚举值、PullRequestReview webhook 支持、act-runner PM2 托管 |
| §17.10 | #1 状态更新:act-runner 已纳入 PM2 托管 |
### v3.1 → v3.2 变更(工具链修复 + Mail 投递 bug 修复)
| 编号 | 变更内容 |
|------|----------|
| §16.4 | Review handler 双格式兼容:HANDLERS 注册表同时注册 `pull_request_review` / `pull_request_approved` 等多种事件名;`_handle_pull_request_review` 兼容 repo webhookreview.state/body/user)和 org webhookreview.type/content/sender)两种 payload 格式 |
| §16.8 #10 | Gitea v1.23.4 review payload 调研结论(姜维 2026-06-08):Gitea v1.23.4 review payload 只有 `type` + `content`,没有 `state`/`body`/`user`,这不是 org vs repo 差异而是 Gitea 设计。v1.24.0 格式不变。双格式兼容是防御性编码,保持现状 |
| §16.8 #11 | Spawner compact 检测窗口修复:窗口 300s→900s,尾部读取 50KB→1MB。实测长对话中 compact 记录被推出窗口导致漏检 |
| §16.8 #12 | inform 类型 Mail crash 误标 done bug 修复:`_mail_auto_complete` 增加 outcome 感知,inform 用白名单(completed/claimed/no_reply)控制 done 标记。spawner crash cooldown 300s→60s |
-121
View File
@@ -1,121 +0,0 @@
# §18. 工具链端到端验证测试
> 日期:2026-06-09
> 状态:已完成 ✅
> 目标:用真实 Webhook 触发验证整条 Mail 通知链路
## 前置确认
- Gitea 用户名 ↔ Agent ID 映射:完全一致(admin, guanyu-dev, jiangwei-infra, pangtong-fujunshi, simayi-challenger, zhangfei-dev, zhaoyun-data
- Gitea 组织级 WebhookHook ID=28):姜维确认最近 5 条投递全部 is_succeed=1
- Daemon 在线:sanguo-moziplus-v2 运行中
- 测试仓库:sanguo/moziplus-v2
## 命名规范
- Issue 标题:`[E2E-TEST] xxx`
- PR 标题:`[E2E-TEST] xxx`
- 分支名:`test/e2e-<timestamp>`
## 验证步骤
| 步骤 | 操作 | 触发事件 | 预期 Mail 通知 | 验证点 |
|------|------|----------|---------------|--------|
| 1 | 创建 Issue `[E2E-TEST] Issue指派测试`assignee=zhangfei-dev | issues (assigned) | zhangfei-dev 收到 "Issue 指派" Mail | Mail to/模板正确 |
| 2 | 开分支 `test/e2e-<ts>`,创建 PR `[E2E-TEST] Review请求测试` | pull_request (opened) | simayi-challenger 收到 "Review 请求" Mail | Mail to/风险级别/文件列表 |
| 3 | PR Review APPROVED | pull_request_review (approved) | PR 作者(pangtong-fujunshi) 收到 "Review 通过 ✓" Mail | result=通过 ✓ |
| 4 | PR Review REQUEST_CHANGES | pull_request_review (rejected) | PR 作者收到 "Review 驳回 ✗" Mail | result=驳回 ✗ |
| 5 | Issue 上发评论 `[CI] CI 失败 — 分支: test/e2e-xxx, 错误: build timeout` | issue_comment | Issue 作者收到 "CI 失败" Mail | 模板含分支/错误摘要 |
| 6 | 创建标题含"部署失败"的 Issue(无指派) | issues (opened) | jiangwei-infra + pangtong-fujunshi 各收到 "部署失败" Mail | 双收件人 |
| 7 | 关闭步骤 1 的 Issue,再发 CI 失败评论 | issue_comment (closed issue) | 不产生 Mail(负面测试) | handler 跳过 closed |
| 8 | 重发步骤 1 Webhook(相同 delivery ID | 重复事件 | 不产生新 Mail(幂等测试) | 返回 duplicate |
## 签名校验
已测试(GITEA_WEBHOOK_SECRET 已配置且生效):
- ✅ 正确签名:请求正常处理
- ✅ 无签名:返回 403 `signature verification failed`
## Review 意见来源
- 姜维(基础设施确认 + 边界验证建议)
- 司马懿(遗漏点补充 + 命名规范 + 风险防范)
---
## 执行记录
> 2026-06-09 00:40~00:50 CST
### 步骤 1Issue 指派 ✅
- 操作:创建 Issue #22 `[E2E-TEST] Issue指派测试`assignee=zhangfei-dev
- Mail`mail-1780936736480`from=system, to=zhangfei-dev, title=`Issue 指派: [E2E-TEST] Issue指派测试`
- 模板渲染正确(含 Issue 链接、标签、描述、建议分支名)
### 步骤 2PR Review 请求 ✅
- 操作:创建分支 `test/e2e-1780936838`,创建 PR #23
- Mail`mail-1780936851715`from=system, to=simayi-challenger
- 模板含 PR 链接、标题、作者(pangtong-fujunshi)、分支、风险级别(standard)
- 附带:CI 失败通知 `mail-1780936876572`CI 自动触发,符合预期)
### 步骤 3Review APPROVED ✅
- 操作:用 simayi-challenger token 提交 APPROVED review
- Mail`mail-1780936968411`from=system, to=pangtong-fujunshi, title=`Review 通过 ✓`
- 描述含审查者(simayi-challenger)、review body
- ⚠️ 收到 2 封重复 Mailorg webhook + repo webhook 双触发)
### 步骤 4Review REQUEST_CHANGES ✅
- 操作:用 simayi-challenger token 提交 REQUEST_CHANGES review
- Mail`mail-1780936972207`from=system, to=pangtong-fujunshi, title=`Review 驳回 ✗`
- ⚠️ 同上,收到 2 封重复 Mail
### 步骤 5CI 失败评论 ✅
- 操作:在 Issue #22 发评论 `[CI] CI 失败 — 分支: test/e2e-1780936838, 错误: build timeout`
- Mail`mail-1780936994513`from=system, to=pangtong-fujunshi, title=`CI 失败: sanguo/moziplus-v2#22`
- 模板含分支提取和错误摘要
### 步骤 6:部署失败 Issue ✅
- 操作:创建 Issue #24 `[E2E-TEST] 部署失败: test deploy`(无指派)
- Mail`mail-1780936999660` to=jiangwei-infra, `mail-1780936999684` to=pangtong-fujunshi
- 双收件人验证通过 ✅
### 步骤 7:已关闭 Issue 负面测试 ✅
- 操作:关闭 Issue #22 后发 `[CI] CI 失败 — 应被过滤`
- 结果:未产生新 Mail ✅(只有步骤 5 的 1 封 CI Mail,步骤 7 的评论被正确过滤)
### 步骤 8:幂等测试 ✅
- 操作:构造带正确 HMAC-SHA256 签名的 Webhook,用同一 delivery ID `test-idempotency-002` 发两次
- 第一次:返回 `ok`,产生 Mail ✅
- 第二次:返回 `duplicate`,无新 Mail ✅
- 额外验证:不带签名的请求返回 403 `signature verification failed`(签名校验正常工作)
---
## 汇总
| 步骤 | 状态 | 备注 |
|------|------|------|
| 1. Issue 指派 | ✅ 通过 | Mail to/模板正确 |
| 2. PR Review 请求 | ✅ 通过 | Mail to/风险级别/文件列表正确 |
| 3. Review APPROVED | ✅ 通过 | E2E 测试中产生 2 封 Mail(根因已查明,非平台问题) |
| 4. Review REQUEST_CHANGES | ✅ 通过 | 同上 |
| 5. CI 失败评论 | ✅ 通过 | 分支提取正确 |
| 6. 部署失败 Issue | ✅ 通过 | 双收件人验证通过 |
| 7. 已关闭 Issue 过滤 | ✅ 通过 | 负面测试通过,无新 Mail |
| 8. 幂等测试 | ✅ 通过 | 第二次返回 duplicate,无新 Mail;签名校验正常拦截无签名请求 |
## 发现的问题
### Review 事件双 Mail(已修复)
- **现象**E2E 测试步骤 3/4 中 Review 事件产生 2 封 Mail
- **根因**(姜维深入调查确认):E2E 测试中庞统手动用 simayi token 提交了 Review,同时 simayi agent 收到 Review 请求 Mail 后也自主提交了 Review。是两次独立的 API 调用,**不是 Gitea bug 或平台配置问题**
- 姜维控制实验:一次 review API 调用只产生 1 个 hook_task
- Gitea 路由日志确认两次 POST 间隔 7 秒,payload 有差异(review_comments、updated_at 不同)
- 之前的错误分析("Gitea webhookNotifier + actionsNotifier 双投递")已被推翻:actionsNotifier 走 handleWorkflows() 不创建 hook_task
- **修复**:payload 内容去重作为防御性编程保留(`_is_duplicate` 新增内容去重 key = event + pr_num + sender + sha256(body_or_content)),司马懿 APPROVED
- **验证**PR #27 实测只产生 1 封 Mail ✅
### 根因分析教训
- 姜维第一次分析给出了错误根因(Gitea 双 notifier),第二次深入调查后自我纠正
- 庞统把姜维的第一次结论当事实汇报给主公,没有标注"这是姜维的调查结论,尚未独立验证"
- **改进**:SOUL.md 新增规则——推测 vs 事实显式标注、引用他人结论时标注来源、结论被推翻时及时更正
+4 -5
View File
@@ -11,10 +11,9 @@
| 场景 | 命令 | 耗时 | 说明 |
|------|------|------|------|
| **改了某个模块** | `pytest tests/unit/test_spawner.py` | <5s | 只跑改动的模块对应的单元测试 |
| **改了 API 层** | `RUN_INTEGRATION=1 pytest tests/integration/` | ~1min | 跑全部集成测试 |
| **提交前快速验证** | `pytest` | ~2min | 默认排除 integration 和 e2e |
| **含集成测试** | `RUN_INTEGRATION=1 pytest` | ~5min | integration 测试 |
| **部署前全量验证** | `RUN_INTEGRATION=1 pytest` | ~60min | 含 e2e,真实 Agent |
| **改了 API 层** | `pytest tests/integration/` | ~1min | 跑全部集成测试 |
| **提交前快速验证** | `pytest -m "not e2e"` | ~2min | 不跑 E2E,验证不破坏现有功能 |
| **部署前全量验证** | `RUN_INTEGRATION=1 pytest` | ~60min | 含 E2E,真实 Agent |
| **只跑 E2E 场景** | `RUN_INTEGRATION=1 pytest tests/e2e/test_e2e_scenarios.py` | ~30min | 串行,一个跑完再下一个 |
| **只跑 E2E 压力** | `RUN_INTEGRATION=1 pytest tests/e2e/test_e2e_stress.py` | ~10min | 并发测试 |
@@ -102,7 +101,7 @@ E2E(慢,真实 Agent) → 验证完整链路,需要 RUN_INTEGRATION=1
## 关键规则
1. **只有 E2E 会 spawn 真实 Agent**,单元和集成不会
2. **直接跑 `pytest` 是安全的**integration 和 e2e 全部被排除(需 `RUN_INTEGRATION=1` 才跑)
2. **不带 `RUN_INTEGRATION=1` 跑 `pytest` 是安全的**E2E 全部 skip
3. **E2E 场景测试串行**,一个完成再下一个,失败要分析根因再继续
4. **E2E 压力测试并行**,场景测试全通过后再跑
5. **测试数据用 `e2e-` 前缀**,atexit 兜底清理,手动清理见上方
+1 -3
View File
@@ -8,10 +8,8 @@ requires-python = ">=3.9"
asyncio_mode = "auto"
testpaths = ["tests"]
markers = [
"integration: integration tests (requires RUN_INTEGRATION=1)",
"e2e: end-to-end tests with real daemon + Agent (requires RUN_INTEGRATION=1)",
"integration: real agent tests (requires RUN_INTEGRATION=1)",
]
# Default deselection of integration/e2e handled in conftest.py pytest_collection_modifyitems
[tool.pyright]
venvPath = "."
+15 -13
View File
@@ -5,17 +5,17 @@ from __future__ import annotations
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Optional
from fastapi import APIRouter, HTTPException, Query
from src.blackboard.operations import Blackboard
from src.blackboard.models import Task, Review
from src.blackboard.queries import Queries
from src.blackboard.db import VALID_STATUSES, VALID_TRANSITIONS, COMMENT_TYPES, OUTPUT_TYPES
from src.blackboard.db import VALID_STATUSES, OUTPUT_TYPES
from src.blackboard.registry import ProjectRegistry
from src.utils import get_data_root
import src.utils as _utils
router = APIRouter(prefix="/api/projects/{project_id}", tags=["blackboard"])
@@ -27,7 +27,7 @@ def _validate_project(project_id: str) -> str:
"""校验 project_id,已知项目/虚拟项目放行,未知项目返回 400"""
if project_id in _VIRTUAL_PROJECTS:
return project_id
reg = ProjectRegistry(get_data_root())
reg = ProjectRegistry(_utils.get_data_root())
if reg.get_project(project_id):
return project_id
raise HTTPException(400, {
@@ -43,12 +43,12 @@ def _validate_project(project_id: str) -> str:
def _bb(project_id: str) -> Blackboard:
_validate_project(project_id)
return Blackboard(get_data_root() / project_id / "blackboard.db")
return Blackboard(_utils.get_data_root() / project_id / "blackboard.db")
def _q(project_id: str) -> Queries:
_validate_project(project_id)
return Queries(get_data_root() / project_id / "blackboard.db")
return Queries(_utils.get_data_root() / project_id / "blackboard.db")
# --- Tasks ---
@@ -100,7 +100,7 @@ async def create_task(project_id: str, body: Dict[str, Any]):
date_str = datetime.now().strftime('%Y%m%d')
# seq: 查当前项目最大 seq
import sqlite3
db_path = get_data_root() / project_id / "blackboard.db"
db_path = _utils.get_data_root() / project_id / "blackboard.db"
try:
conn = sqlite3.connect(str(db_path), timeout=5)
max_id_row = conn.execute(
@@ -240,7 +240,7 @@ async def update_status(project_id: str, task_id: str, body: Dict[str, Any]):
})
if not bb.update_task_status(task_id, new_status,
agent=body.get("agent")):
agent=body.get("agent")):
raise HTTPException(409, {
"error": "transition_failed",
"detail": f"Status update failed for {task_id}",
@@ -265,6 +265,7 @@ async def update_status(project_id: str, task_id: str, body: Dict[str, Any]):
# --- @mention 自动提取(#04 ---
_KNOWN_AGENT_IDS: list = []
def _init_agent_ids():
"""从配置文件加载 Agent ID 列表"""
global _KNOWN_AGENT_IDS
@@ -279,6 +280,7 @@ def _init_agent_ids():
except Exception:
_KNOWN_AGENT_IDS = []
def _extract_mentions(text: str) -> list:
"""从文本中自动提取 @agent-id 格式的 mention"""
import re
@@ -317,8 +319,8 @@ async def add_comment(project_id: str, task_id: str, body: Dict[str, Any]):
merged_mentions = list(set(explicit_mentions + auto_mentions))
cid = bb.add_comment(task_id, body["author"], comment_body,
comment_type=body.get("comment_type", "general"),
mentions=merged_mentions)
comment_type=body.get("comment_type", "general"),
mentions=merged_mentions)
if merged_mentions:
bb.record_mentions(cid, task_id, merged_mentions)
# #10: SSE 通知前端黑板有新 comment
@@ -424,8 +426,8 @@ async def get_decisions(project_id: str, task_id: str):
async def add_decision(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
did = bb.add_decision(task_id, body["decider"], body["decision"],
body["rationale"],
alternatives=body.get("alternatives"))
body["rationale"],
alternatives=body.get("alternatives"))
return {"ok": True, "decision_id": did}
@@ -435,7 +437,7 @@ async def add_decision(project_id: str, task_id: str, body: Dict[str, Any]):
async def add_observation(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
oid = bb.add_observation(task_id, body["observer"], body["body"],
severity=body.get("severity", "info"))
severity=body.get("severity", "info"))
return {"ok": True, "observation_id": oid}
+2 -2
View File
@@ -10,7 +10,7 @@ from pydantic import BaseModel
from typing import Optional
from src.blackboard.operations import Blackboard
from src.utils import get_data_root
import src.utils as _utils
router = APIRouter(prefix="/api/projects/{project_id}/tasks/{task_id}/checkpoints", tags=["checkpoints"])
@@ -33,7 +33,7 @@ class ResolveCheckpointRequest(BaseModel):
# ── 工具 ──
def _bb(project_id: str) -> Blackboard:
db_path = get_data_root() / project_id / "blackboard.db"
db_path = _utils.get_data_root() / project_id / "blackboard.db"
if not db_path.exists():
raise HTTPException(status_code=404, detail="Project not found")
return Blackboard(db_path)
+5 -4
View File
@@ -9,7 +9,7 @@ from __future__ import annotations
import json
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Optional
from fastapi import APIRouter, HTTPException, Query
@@ -17,7 +17,7 @@ from src.blackboard.db import init_db
from src.blackboard.models import Task
from src.blackboard.operations import Blackboard
from src.blackboard.queries import Queries
from src.utils import get_data_root
import src.utils as _utils
def _get_valid_agents() -> set:
@@ -36,13 +36,14 @@ def _get_valid_agents() -> set:
# fallback:硬编码
return {"zhangfei-dev", "guanyu-dev", "zhaoyun-data", "jiangwei-infra", "pangtong-fujunshi", "simayi-challenger"}
router = APIRouter(prefix="/api/mail", tags=["mail"])
MAIL_PROJECT_ID = "_mail"
def _db_path() -> Path:
root = get_data_root()
root = _utils.get_data_root()
db = root / MAIL_PROJECT_ID / "blackboard.db"
db.parent.mkdir(parents=True, exist_ok=True)
init_db(db)
@@ -222,7 +223,7 @@ async def send_mail(body: Dict[str, Any]):
# A8: 只有原邮件的双方能回复(严格 1 对 1)
if from_agent not in (orig_from, orig_to):
raise HTTPException(400, f"只有邮件的发送者或接收者可以回复")
raise HTTPException(400, "只有邮件的发送者或接收者可以回复")
# A6/A7: 自动纠正 to → 原邮件发件者
to_agent = body.get("to", "").strip()
+4 -4
View File
@@ -3,18 +3,18 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, List, Optional
from typing import Any, Dict
from fastapi import APIRouter, HTTPException, Query
from src.blackboard.registry import ProjectRegistry
from src.utils import get_data_root
import src.utils as _utils
router = APIRouter(prefix="/api/projects", tags=["projects"])
def _registry() -> ProjectRegistry:
return ProjectRegistry(get_data_root())
return ProjectRegistry(_utils.get_data_root())
@router.get("")
@@ -76,7 +76,7 @@ async def list_projects():
async def create_project(body: Dict[str, Any]):
reg = _registry()
try:
info = reg.create_project(
reg.create_project(
body["id"], body["name"],
agents=body.get("agents", []),
description=body.get("description", ""),
+15 -70
View File
@@ -28,7 +28,7 @@ from src.blackboard.models import Task
from src.blackboard.operations import Blackboard
from src.config.agents import AGENT_IDS
from src.daemon.toolchain_templates import render_template
from src.utils import get_data_root
import src.utils as _utils
logger = logging.getLogger(__name__)
@@ -46,42 +46,17 @@ _TTL_SECONDS = 7 * 24 * 3600
_idempotency_lock = asyncio.Lock()
def _is_duplicate(event: str, delivery: str, payload: Optional[Dict[str, Any]] = None) -> bool:
"""检查 Webhook 是否重复投递,自动清理过期条目。
双重去重策略
1. delivery UUID 去重标准幂等
2. payload 内容去重应对 Gitea v1.23.4 webhookNotifier + actionsNotifier
对同一 review 生成不同 UUID 的双投递问题
"""
def _is_duplicate(event: str, delivery: str) -> bool:
"""检查 Webhook 是否重复投递,自动清理过期条目。"""
now = time.time()
# 清理过期条目
while _delivery_timestamps and (now - _delivery_timestamps[0][0]) > _TTL_SECONDS:
_, key = _delivery_timestamps.pop(0)
_delivery_cache.discard(key)
# 检查 delivery UUID 去重
key = f"{event}-{delivery}"
if key in _delivery_cache:
return True
# 检查 payload 内容去重(review 事件:同一 PR + 同一用户 + 同一内容)
# 注意:Gitea webhookNotifier 用 review.bodyactionsNotifier 用 review.content
# 所以去重 key 需要同时取两个字段,确保两种格式生成相同 key
if payload and "review" in event:
pr_num = payload.get("pull_request", {}).get("number")
sender = payload.get("sender", {}).get("login")
review = payload.get("review", {})
# 取 body 或 content,优先 bodywebhookNotifier 格式)
content = review.get("body", "") or review.get("content", "")
content_hash = hashlib.sha256(content.encode()).hexdigest()[:16]
content_key = f"content:{event}:{pr_num}:{sender}:{content_hash}"
if content_key in _delivery_cache:
logger.info("Content-based duplicate detected: %s PR#%s by %s", event, pr_num, sender)
return True
_delivery_cache.add(content_key)
_delivery_timestamps.append((now, content_key))
_delivery_cache.add(key)
_delivery_timestamps.append((now, key))
return False
@@ -166,13 +141,12 @@ def _calc_risk_level(changed_files: List[str]) -> str:
# ---------------------------------------------------------------------------
MAIL_PROJECT_ID = "_mail"
def _mail_db_path() -> Path:
"""获取 Mail 数据库路径,确保目录存在。"""
root = get_data_root()
root = _utils.get_data_root()
db = root / MAIL_PROJECT_ID / "blackboard.db"
db.parent.mkdir(parents=True, exist_ok=True)
init_db(db)
@@ -283,12 +257,7 @@ async def _handle_pull_request(payload: Dict[str, Any]) -> None:
async def _handle_pull_request_review(payload: Dict[str, Any]) -> None:
"""处理 pull_request_review 事件:非 COMMENTED → 通知 PR 作者。
支持两种 payload 格式
- repo webhook: review.state = "APPROVED" / "REQUEST_CHANGES"
- org webhook (Gitea v1.23.4): review.type = "pull_request_review_approved" / "pull_request_review_rejected"
"""
"""处理 pull_request_review 事件:非 COMMENTED → 通知 PR 作者。"""
review = payload.get("review")
if not review or not isinstance(review, dict):
logger.warning("pull_request_review event missing review field, skipping")
@@ -297,18 +266,7 @@ async def _handle_pull_request_review(payload: Dict[str, Any]) -> None:
if not pr or not isinstance(pr, dict):
logger.warning("pull_request_review event missing pull_request field, skipping")
return
# 兼容两种 payload 格式提取 state
state = review.get("state", "")
if not state:
# org webhook 格式:review.type = "pull_request_review_approved"
review_type = review.get("type", "")
type_map = {
"pull_request_review_approved": "APPROVED",
"pull_request_review_rejected": "REQUEST_CHANGES",
"pull_request_review_comment": "COMMENTED",
}
state = type_map.get(review_type, "")
# 只通知 APPROVED 和 REQUEST_CHANGES,跳过 COMMENTED 和其他状态
if state == "COMMENTED":
@@ -318,9 +276,8 @@ async def _handle_pull_request_review(payload: Dict[str, Any]) -> None:
pr_number = pr.get("number", 0)
pr_title = pr.get("title", "")
pr_author = pr.get("user", {}).get("login", "unknown")
# 兼容:org webhook 的 review 没有 user,从 sender 取
reviewer = review.get("user", {}).get("login", "") or payload.get("sender", {}).get("login", "unknown")
review_body = review.get("body", "") or review.get("content", "(无评论)")
reviewer = review.get("user", {}).get("login", "unknown")
review_body = review.get("body", "(无评论)")
result_map = {"APPROVED": "通过 ✓", "REQUEST_CHANGES": "驳回 ✗"}
if state not in result_map:
@@ -414,12 +371,6 @@ async def _handle_issue_comment(payload: Dict[str, Any]) -> None:
if not issue or not isinstance(issue, dict):
logger.warning("issue_comment event missing issue field, skipping")
return
# 已关闭的 Issue/PR 不再发送 CI 失败通知
if issue.get("state") == "closed":
logger.debug("Skipping CI failure notification for closed issue #%s", issue.get("number"))
return
repo = _repo_fullname(payload)
issue_number = issue.get("number", 0)
@@ -449,12 +400,6 @@ async def _handle_issue_comment(payload: Dict[str, Any]) -> None:
_EVENT_HANDLERS: Dict[str, Any] = {
"pull_request": _handle_pull_request,
"pull_request_review": _handle_pull_request_review,
"pull_request_review_approved": _handle_pull_request_review,
"pull_request_review_rejected": _handle_pull_request_review,
"pull_request_review_comment": _handle_pull_request_review,
# Gitea v1.23.4 实际发出的 review 子事件(无 _review_ 中间段)
"pull_request_approved": _handle_pull_request_review,
"pull_request_rejected": _handle_pull_request_review,
"issues": _handle_issues,
"issue_comment": _handle_issue_comment,
}
@@ -487,20 +432,20 @@ async def gitea_webhook(
logger.warning("Webhook signature verification failed")
return Response(status_code=403, content="signature verification failed")
# 3. 解析 payload(提前解析,用于幂等检查
# 2. 幂等检查
if x_gitea_event and x_gitea_delivery:
async with _idempotency_lock:
if _is_duplicate(x_gitea_event, x_gitea_delivery):
logger.debug("Duplicate webhook: %s/%s", x_gitea_event, x_gitea_delivery)
return Response(status_code=200, content="duplicate")
# 3. 解析 payload
try:
payload = await request.json()
except Exception:
logger.warning("Failed to parse webhook payload")
return Response(status_code=200, content="invalid payload")
# 2. 幂等检查(需要在 payload 解析后,以支持内容去重)
if x_gitea_event and x_gitea_delivery:
async with _idempotency_lock:
if _is_duplicate(x_gitea_event, x_gitea_delivery, payload):
logger.debug("Duplicate webhook: %s/%s", x_gitea_event, x_gitea_delivery)
return Response(status_code=200, content="duplicate")
# 4. 查找 handler
handler = _EVENT_HANDLERS.get(x_gitea_event or "")
if not handler:
-1
View File
@@ -4,7 +4,6 @@ from __future__ import annotations
import sqlite3
from pathlib import Path
from typing import Optional
def init_db(db_path: Path) -> None:
+1 -1
View File
@@ -3,7 +3,7 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from typing import Any, List, Optional
@dataclass
-2
View File
@@ -11,7 +11,6 @@ from typing import Any, Dict, List, Optional
from .db import (
VALID_TRANSITIONS,
VALID_STATUSES,
COMMENT_TYPES,
EVENT_TYPES,
OUTPUT_TYPES,
@@ -693,7 +692,6 @@ class Blackboard:
finally:
conn.close()
# ── Checkpoint CRUDM3 ──
def create_checkpoint(
-1
View File
@@ -355,4 +355,3 @@ class ProjectRegistry:
def reload(self) -> None:
"""兼容旧接口(SQLite 不需要 reload cache"""
pass
+4 -4
View File
@@ -9,14 +9,14 @@ from pathlib import Path
from typing import List, Optional
from src.blackboard.operations import Blackboard
from src.utils import get_data_root
from src.blackboard.models import Task, Comment, Output, Decision, Observation, Review, Experience
import src.utils as _utils
from src.blackboard.models import Task, Review
from src.blackboard.queries import Queries
from src.blackboard.registry import ProjectRegistry
def _find_project_root() -> Path:
return get_data_root()
return _utils.get_data_root()
def _get_bb(project_id: str) -> Blackboard:
@@ -262,7 +262,7 @@ def build_admin_parser() -> argparse.ArgumentParser:
p_pc.add_argument("--description", default="")
# project list
p_pl = sub.add_parser("project-list", help="List projects")
sub.add_parser("project-list", help="List projects")
# project archive
p_pa = sub.add_parser("project-archive", help="Archive project")
+1 -2
View File
@@ -11,8 +11,7 @@ A 类 Skill 由引擎确定性注入全文,不靠 Description 触发。
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
from typing import Any, List
logger = logging.getLogger("moziplus-v2.bootstrap")
+1 -1
View File
@@ -73,7 +73,7 @@ class ActiveAgentCounter:
cd = seconds if seconds is not None else self._default_cooldown_seconds
self._cooldown_until[agent_id] = time.time() + cd
logger.info("Cooldown set for %s: %.0fs (until %.0f)",
agent_id, cd, self._cooldown_until[agent_id])
agent_id, cd, self._cooldown_until[agent_id])
async def can_acquire(self, agent_id: str, session_id: str = "main") -> bool:
"""三层检查:cooldown → global → per agent → per session key"""
+10 -18
View File
@@ -14,7 +14,6 @@ from __future__ import annotations
import json
import logging
import sqlite3
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional
@@ -22,7 +21,7 @@ from typing import Any, Dict, List, Optional
from src.blackboard.models import Task
from src.blackboard.db import get_connection
from src.daemon.spawner import AgentBusyError
from src.daemon.router import AgentRouter, RouteDecision
from src.daemon.router import AgentRouter
logger = logging.getLogger("moziplus-v2.dispatcher")
@@ -194,6 +193,7 @@ class Dispatcher:
_task_id = task.id
_mail_db = db_path
_disp = self
def _mail_on_checks_passed():
nonlocal _mail_marked_working
if not _disp._mail_auto_working(_task_id, _mail_db):
@@ -203,8 +203,8 @@ class Dispatcher:
# 构建 spawn message
message = self._build_spawn_message(task, agent_id, project_config,
mode=decision.get("mode", ""),
spawn_type=action_type or "executor")
mode=decision.get("mode", ""),
spawn_type=action_type or "executor")
# v2.7.2: on_complete 只含业务逻辑,不含 counter.release
# counter.release 由 spawn_full_agent 内部的 wrapped_on_complete 保证
@@ -218,7 +218,7 @@ class Dispatcher:
def _mail_on_complete(aid, outcome):
# 幻觉门控:检查是否有回复,自动标 done/failed
try:
_dispatcher._mail_auto_complete(_task_id, aid, _mail_db, _must_haves, outcome=outcome)
_dispatcher._mail_auto_complete(_task_id, aid, _mail_db, _must_haves)
except Exception as e:
logger.error("Mail %s: on_complete error: %s", _task_id, e)
on_complete = _mail_on_complete
@@ -269,8 +269,8 @@ class Dispatcher:
from src.blackboard.blackboard import Blackboard
bb = Blackboard(_task_db)
bb.add_comment(_task_id, "daemon",
f"@{task_row['assignee']} 审查结论: {verdict_str},请查看详情并决定接受或反驳",
comment_type="review")
f"@{task_row['assignee']} 审查结论: {verdict_str},请查看详情并决定接受或反驳",
comment_type="review")
logger.info("Task %s: review verdict=%s, notified assignee=%s",
_task_id, verdict_str, task_row["assignee"] if task_row else "?")
# 不标 done,保持 review 状态
@@ -576,7 +576,7 @@ class Dispatcher:
def _mail_oc_legacy(aid, outcome):
try:
_disp._mail_auto_complete(_t_id, aid, _m_db, _m_mh, outcome=outcome)
_disp._mail_auto_complete(_t_id, aid, _m_db, _m_mh)
except Exception as e:
logger.error("Mail %s: legacy on_complete error: %s", _t_id, e)
on_complete_legacy = _mail_oc_legacy
@@ -661,7 +661,7 @@ class Dispatcher:
logger.error("Mail %s: failed to revert to pending: %s", task_id, e)
def _mail_auto_complete(self, task_id: str, agent_id: str,
db_path: Path, must_haves: str, outcome=None) -> None:
db_path: Path, must_haves: str) -> None:
"""Mail 任务:on_complete 后自动标 done/failed(含幻觉门控)"""
try:
# 解析 performative
@@ -712,14 +712,6 @@ class Dispatcher:
logger.error("Mail %s: all 3 failed attempts failed, leaving for ticker", task_id)
return
# inform 类型:只对成功 outcome 标 done,失败 outcome 留 working 等 ticker 重投
# Task 路径不受此 bug 影响(走 _task_auto_complete 独立逻辑)
if performative == "inform":
INFORM_DONE_OUTCOMES = {"completed", "claimed", "no_reply"}
if outcome not in INFORM_DONE_OUTCOMES:
logger.info("Mail %s: inform outcome=%s, skip auto-done", task_id, outcome)
return
# 标 done(重试 3 次)
for attempt in range(3):
try:
@@ -866,7 +858,7 @@ class Dispatcher:
logger.error("Task %s: mark status error: %s", task_id, e)
@staticmethod
def _check_crash_limit(task_id: str, db_path: pathlib.Path, limit: int = 3,
def _check_crash_limit(task_id: str, db_path: Path, limit: int = 3,
window_minutes: int = 30) -> bool:
"""v2.8.1 Fix-3c: 检查 task 最近 window_minutes 内的 crash 次数是否超限。
+3 -3
View File
@@ -14,7 +14,7 @@ import logging
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional
logger = logging.getLogger("moziplus-v2.experience")
@@ -68,7 +68,7 @@ class Experience:
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> Experience:
return cls(**{k: v for k, v in data.items() if k != "id"},
experience_id=data.get("id"))
experience_id=data.get("id"))
class ExperienceStore:
@@ -284,7 +284,7 @@ class ExperienceDistiller:
all_tags.append(task_type)
results = self.store.search(tags=all_tags if all_tags else None,
query=query, limit=limit)
query=query, limit=limit)
# 按置信度排序
results.sort(key=lambda e: e.confidence, reverse=True)
+1 -1
View File
@@ -4,7 +4,7 @@ from __future__ import annotations
import logging
import re
from dataclasses import dataclass, field
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional
+2 -4
View File
@@ -9,9 +9,9 @@ from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import Any, Dict, Optional
from typing import Any, Dict
from src.blackboard.db import get_connection, init_db
from src.blackboard.db import get_connection
from src.blackboard.queries import Queries
logger = logging.getLogger("moziplus-v2.health")
@@ -41,7 +41,6 @@ class HealthChecker:
{"healthy": bool, "zombie": bool, "stale_ticks": int,
"alert_written": bool, "resolved": bool}
"""
db_key = str(db_path)
result: Dict[str, Any] = {
"healthy": True,
"zombie": False,
@@ -58,7 +57,6 @@ class HealthChecker:
# 用 event count 变化判断是否有真实变更
conn = queries._conn()
try:
total_events = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
non_tick_events = conn.execute(
"SELECT COUNT(*) FROM events WHERE event_type != 'daemon_tick' "
"AND event_type != 'agent_zombie_detected'"
+2 -3
View File
@@ -15,7 +15,6 @@ from __future__ import annotations
import asyncio
import json
import logging
import os
from pathlib import Path
from typing import Any, Callable, Coroutine, Dict, List, Optional
@@ -57,7 +56,7 @@ class InboxWatcher:
self._running = True
self._task = asyncio.create_task(self._loop())
logger.info("Inbox watcher started (path=%s, interval=%.1fs)",
self.inbox_path, self.watch_interval)
self.inbox_path, self.watch_interval)
async def stop(self) -> None:
"""停止监听"""
@@ -69,7 +68,7 @@ class InboxWatcher:
except asyncio.CancelledError:
pass
logger.info("Inbox watcher stopped (processed=%d, errors=%d)",
self._total_processed, self._total_errors)
self._total_processed, self._total_errors)
@property
def is_running(self) -> bool:
+1 -1
View File
@@ -108,7 +108,7 @@ def notify_mail_failed(db_path: Path, original_mail_id: str,
)
bb.create_task(notify_task)
logger.info("Mail %s: sent failure notification to %s (original_sender=%s, reason=%s, notify_id=%s)",
original_mail_id, target_agent, from_agent, reason, notify_id)
original_mail_id, target_agent, from_agent, reason, notify_id)
except Exception as e:
logger.warning("notify_mail_failed: failed to send notification for mail %s: %s", original_mail_id, e)
+1 -4
View File
@@ -8,15 +8,12 @@ from __future__ import annotations
import json
import logging
import re
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
from typing import Any, Callable, Dict, List, Optional
from src.blackboard.models import Task
from src.blackboard.operations import Blackboard
from src.blackboard.queries import Queries
logger = logging.getLogger("moziplus-v2.review")
+1 -2
View File
@@ -10,12 +10,11 @@ from __future__ import annotations
import json
import logging
import re
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger("moziplus-v2.skill")
+14 -20
View File
@@ -15,7 +15,7 @@ from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from src.blackboard.db import get_connection, init_db
from src.blackboard.db import get_connection
logger = logging.getLogger("moziplus-v2.spawner")
@@ -163,6 +163,7 @@ class AgentBusyError(Exception):
#07: reason 字段区分具体原因,便于 dispatcher 层区分处理。
"""
def __init__(self, agent_id: str, reason: str = "busy", detail: Optional[dict] = None):
self.agent_id = agent_id
self.reason = reason # counter_blocked / session_locked / session_running / session_compacting / session_stuck
@@ -299,7 +300,7 @@ class AgentSpawner:
project_id, agent_id)
def _build_minimal_fallback(self, task_id, title, description, must_haves,
project_id, agent_id):
project_id, agent_id):
"""最小 fallback:只有任务上下文 + API 指令"""
task_section = f"""## 任务
{title}
@@ -311,7 +312,7 @@ class AgentSpawner:
return task_section + "\n\n---\n\n" + api_section
def _build_api_section(self, project_id: str, task_id: str,
agent_id: str) -> str:
agent_id: str) -> str:
"""构建 API 回写操作指令(BootstrapBuilder 模式下补充)"""
# mail 任务直接 done,不走 review
success_status = '"done"' if project_id == "_mail" else '"review"'
@@ -337,8 +338,8 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
"""
def _build_discussion_prompt(self, task_id: str, title: str,
description: str, must_haves: str,
project_id: str, agent_id: str) -> str:
description: str, must_haves: str,
project_id: str, agent_id: str) -> str:
"""构建讨论类 spawn prompt(§3.3 框架 + Boids)"""
goal_snapshot = description or title
constraints = must_haves or "(无特殊约束)"
@@ -379,9 +380,8 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
return router.agent_profiles.get(agent_id)
return None
def _build_mail_prompt(self, task_id: str, title: str, description: str,
must_haves: str, agent_id: str) -> str:
must_haves: str, agent_id: str) -> str:
"""构建 Mail 专用精简模板"""
# 解析 must_haves 获取 from 和 performative
from_agent = agent_id
@@ -575,7 +575,7 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
stderr=asyncio.subprocess.PIPE,
)
self._register_session(session_id, agent_id, task_id, proc.pid,
broadcast_task_ids=broadcast_task_ids)
broadcast_task_ids=broadcast_task_ids)
logger.info("Spawned agent %s (session=%s, pid=%d)",
agent_id, session_id, proc.pid)
@@ -848,13 +848,10 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
# A8(gateway_unreachable), A11(lock_conflict),
# A10(compact_failed), A12(agent_error)
# v2.8.1 Fix-3a: crash 类 outcome 设 cooldown,给 agent session 恢复时间
if outcome == "crashed" and self.counter:
self.counter.set_cooldown(agent_id, seconds=60)
logger.info("Crash cooldown set for %s: 60s (outcome=%s)", agent_id, outcome)
elif outcome in ("compact_failed", "process_crash", "session_stuck",
if outcome in ("crashed", "compact_failed", "process_crash", "session_stuck",
"compact_hanging", "agent_error", "compact_interrupted") and self.counter:
self.counter.set_cooldown(agent_id, seconds=300) # 5 分钟
logger.info("Error cooldown set for %s: 300s (outcome=%s)", agent_id, outcome)
logger.info("Crash/error cooldown set for %s: 300s (outcome=%s)", agent_id, outcome)
# F1: 不可恢复 outcome → 立刻标 failed + 写黑板
if outcome in ("auth_failed", "agent_error") and db_path and task_id:
logger.error("Task %s: unrecoverable outcome=%s, marking failed immediately", task_id, outcome)
@@ -881,9 +878,6 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
except Exception:
pass
stderr_text = b"".join(stderr_chunks).decode("utf-8", errors="replace")
# 检查 session 状态
state = self._check_session_state(agent_id)
# B1: 假死 - 先复活,连续假死 ≥2 次再 failed
@@ -1219,7 +1213,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
实测 50KB 在长对话中不够compact 记录被推出窗口导致漏检
正常扫描量不变从尾部往前扫遇到超过 15min timestamp break
"""
if not session_file or not pathlib.Path(session_file).exists():
if not session_file or not Path(session_file).exists():
return False
try:
from datetime import datetime, timezone
@@ -1428,7 +1422,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
return defaults
def _update_retry_counts(self, db_path: Optional[Path],
task_id: Optional[str], counts: dict):
task_id: Optional[str], counts: dict):
"""将 retry counts 写回最新 task_attempt 的 metadata"""
if not db_path or not task_id:
return
@@ -1488,8 +1482,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
from src.blackboard.operations import Blackboard
bb = Blackboard(db_path)
cid = bb.add_comment(task_id, "daemon",
f"@pangtong-fujunshi 任务执行失败: {reason},请评估是否需要介入",
comment_type="system")
f"@pangtong-fujunshi 任务执行失败: {reason},请评估是否需要介入",
comment_type="system")
bb.record_mentions(cid, task_id, ["pangtong-fujunshi"])
logger.info("Task %s: failure notified pangtong via comment+mention (reason=%s)", task_id, reason)
except Exception as e:
+1 -4
View File
@@ -9,14 +9,11 @@ from __future__ import annotations
import asyncio
import json
import logging
import subprocess
import uuid
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set
from typing import Any, Dict, List, Optional
from src.blackboard.models import Event
logger = logging.getLogger("moziplus-v2.sse")
+25 -26
View File
@@ -21,7 +21,6 @@ from dataclasses import dataclass, field as dc_field
from src.blackboard.operations import Blackboard
from src.blackboard.db import get_connection
from src.blackboard.models import Task
from src.daemon.spawner import AgentBusyError
from src.blackboard.queries import Queries
from src.blackboard.registry import ProjectRegistry
@@ -35,6 +34,7 @@ class BroadcastRound:
responded_agents: set = dc_field(default_factory=set) # 已返回反馈的 Agent(含 NO_REPLY
round_number: int = 0 # 当前第几轮(0=未开始,1=第1轮)
logger = logging.getLogger("moziplus-v2.ticker")
@@ -391,7 +391,7 @@ class Ticker:
MAX_ROUNDS = 5 # §4.5 防无限循环
async def _check_round_complete(self, db_path: Path,
project_id: str) -> List[str]:
project_id: str) -> List[str]:
"""检测 parent task 下所有 sub task 终态 → spawn 庞统 review
流程§4.4
@@ -462,7 +462,7 @@ class Ticker:
"Round %d review spawned for parent %s (subs: %s)",
new_round, parent_id, summary
)
except Exception as e:
except Exception:
logger.exception("Round check error for parent %s", parent_id)
return reviewed
@@ -531,9 +531,9 @@ Parent Task ID: {parent_task.id}
"""
async def _spawn_pangtong_review(self, parent_task,
review_prompt: str,
project_id: str,
new_round: int = 0) -> bool:
review_prompt: str,
project_id: str,
new_round: int = 0) -> bool:
"""Spawn 庞统进行 review
流程
@@ -543,7 +543,6 @@ Parent Task ID: {parent_task.id}
"""
try:
agent_id = "pangtong-fujunshi"
session_id = f"review-{parent_task.id}-r{new_round}"
# 构造 on_complete 回调:解析庞统结论,更新 parent 状态
async def _on_review_complete(aid: str, outcome: str):
@@ -586,7 +585,7 @@ Parent Task ID: {parent_task.id}
self._set_parent_reviewing(parent_task.id, project_id)
return True
return False
except Exception as e:
except Exception:
logger.exception("Failed to spawn pangtong review for %s", parent_task.id)
return False
@@ -603,14 +602,14 @@ Parent Task ID: {parent_task.id}
(parent_id,))
conn.commit()
logger.info("Parent %s → reviewing (round review in progress)",
parent_id)
parent_id)
finally:
conn.close()
except Exception:
logger.exception("Failed to set parent %s to reviewing", parent_id)
def _handle_review_conclusion(self, parent_id: str, project_id: str,
review_text: str, round_num: int):
review_text: str, round_num: int):
"""解析庞统 review 结论,更新 parent 状态
review_text 是庞统回复的文本 spawner session meta payloads 拼接
@@ -665,8 +664,8 @@ Parent Task ID: {parent_task.id}
def _resolve_db_path(self, project_id: str) -> Path:
"""解析项目 DB 路径"""
from src.utils import get_data_root
return get_data_root() / project_id / "blackboard.db"
import src.utils as _utils
return _utils.get_data_root() / project_id / "blackboard.db"
# ------------------------------------------------------------------
# @mention 通知处理 (v2.9 #01)
@@ -675,7 +674,7 @@ Parent Task ID: {parent_task.id}
MENTION_MAX_RETRIES = 5
async def _process_mentions(self, db_path: Path,
project_id: str) -> List[str]:
project_id: str) -> List[str]:
"""扫描 pending mentions → spawn 被 @ 的 Agent
流程§3.4
@@ -767,8 +766,8 @@ Parent Task ID: {parent_task.id}
from src.blackboard.blackboard import Blackboard
bb2 = Blackboard(rdb_path)
bb2.add_comment(_t_id, "daemon",
f"@{t_row['assignee']} 审查结论: {verdict_str},请查看详情并决定接受或反驳",
comment_type="review")
f"@{t_row['assignee']} 审查结论: {verdict_str},请查看详情并决定接受或反驳",
comment_type="review")
logger.info("Rebuttal: task %s still %s after rebuttal", _t_id, verdict_str)
except Exception:
logger.exception("Rebuttal on_complete failed for task %s", _t_id)
@@ -805,7 +804,7 @@ Parent Task ID: {parent_task.id}
# Agent 忙,不递增 retry_count,等下次 tick 自然重试
logger.info("Mention spawn skipped: %s busy, will retry next tick", agent_id)
except Exception as e:
except Exception:
logger.exception("Mention processing error for agent %s", agent_id)
for item in items:
try:
@@ -948,7 +947,7 @@ Parent Task ID: {parent_task.id}
# ------------------------------------------------------------------
async def _dispatch_pending(self, db_path: Path,
project_id: str) -> List[str]:
project_id: str) -> List[str]:
"""扫描 pending 任务并调度
v3.0: 两条路径
@@ -1242,7 +1241,7 @@ Parent Task ID: {parent_task.id}
return [aid for aid in all_agents if active.get(aid, 0) == 0]
async def _dispatch_reviews(self, db_path: Path,
project_id: str) -> List[str]:
project_id: str) -> List[str]:
"""扫描 review 状态任务,检查是否有产出,调度审查 Agent"""
# mail 任务不走 review 流程,直接跳过
if project_id == "_mail":
@@ -1344,7 +1343,7 @@ Parent Task ID: {parent_task.id}
)
reclaimed.append(task.id)
logger.warning("Escalated %s: no taker after %d broadcasts",
task.id, retry_count)
task.id, retry_count)
finally:
conn.close()
else:
@@ -1423,7 +1422,7 @@ Parent Task ID: {parent_task.id}
if ok:
reclaimed.append(task.id)
logger.info("Mail %s: ticker recheck found reply, marked done (%.1fm)",
task.id, elapsed)
task.id, elapsed)
finally:
conn.close()
continue
@@ -1440,7 +1439,7 @@ Parent Task ID: {parent_task.id}
if ok:
reclaimed.append(task.id)
logger.warning("Task %s timed out (working %.1fm > %.1fm)",
task.id, elapsed, timeout_minutes)
task.id, elapsed, timeout_minutes)
finally:
conn.close()
except (ValueError, TypeError):
@@ -1501,7 +1500,7 @@ Parent Task ID: {parent_task.id}
return True # 保守:查询失败假设有回复
def _check_recent_routing(self, db_path: Path, task_id: str,
action_type: str) -> bool:
action_type: str) -> bool:
"""检查最近 5 分钟内是否已 dispatch 过指定类型的路由(防重复)"""
try:
conn = get_connection(db_path)
@@ -1579,11 +1578,11 @@ Parent Task ID: {parent_task.id}
if recovery_report["total_recovered"] > 0:
logger.info("Startup recovery: %d tasks recovered across %d projects",
recovery_report["total_recovered"],
len(recovery_report["projects"]))
recovery_report["total_recovered"],
len(recovery_report["projects"]))
elif recovery_report["total_noop"] > 0:
logger.info("Startup recovery: %d tasks kept as-is (no recovery needed)",
recovery_report["total_noop"])
recovery_report["total_noop"])
else:
logger.info("Startup recovery: no non-terminal tasks found, clean start")
@@ -1629,7 +1628,7 @@ Parent Task ID: {parent_task.id}
return recovered, noop_count
def _determine_recovery_action(self, conn, task, status: str,
db_path: Path) -> Optional[str]:
db_path: Path) -> Optional[str]:
"""根据黑板线索决定恢复动作,返回 None 表示不需要干预"""
task_id = task["id"]
+15 -26
View File
@@ -23,7 +23,15 @@ from src.daemon.health import HealthChecker
from src.daemon.experience import ExperienceDistiller, ExperienceStore
from src.daemon.inbox import InboxWatcher
from src.daemon.guardrails import GuardrailEngine
from src.utils import get_data_root
import src.utils as _utils
from src.api.blackboard_routes import router as blackboard_router
from src.api.checkpoint_routes import router as checkpoint_router
from src.api.daemon_routes import router as daemon_router
from src.api.project_routes import router as project_router
from src.api.sse_routes import router as sse_router
from src.api.mail_routes import router as mail_router
from src.api.toolchain_routes import router as toolchain_router
logger = logging.getLogger("moziplus-v2")
@@ -78,7 +86,7 @@ config = load_config()
# 全局组件
# ---------------------------------------------------------------------------
DATA_ROOT = get_data_root()
DATA_ROOT = _utils.get_data_root()
ticker: Optional[Ticker] = None
@@ -191,7 +199,6 @@ async def lifespan(app: FastAPI):
)
# ExperienceDistiller(经验自动蒸馏)
experience_config = config.get("experience", {})
experience_distiller = ExperienceDistiller(
store=ExperienceStore(store_path=DATA_ROOT / "experiences.jsonl"),
)
@@ -252,14 +259,6 @@ app.add_middleware(
# API 路由注册
# ---------------------------------------------------------------------------
from src.api.blackboard_routes import router as blackboard_router
from src.api.checkpoint_routes import router as checkpoint_router
from src.api.daemon_routes import router as daemon_router
from src.api.project_routes import router as project_router
from src.api.sse_routes import router as sse_router
from src.api.mail_routes import router as mail_router
from src.api.toolchain_routes import router as toolchain_router
app.include_router(blackboard_router)
app.include_router(checkpoint_router)
app.include_router(daemon_router)
@@ -268,17 +267,6 @@ app.include_router(sse_router)
app.include_router(mail_router)
app.include_router(toolchain_router)
# ---------------------------------------------------------------------------
# 健康检查端点
# ---------------------------------------------------------------------------
@app.get("/api/healthz")
async def healthz():
"""轻量级健康检查,无需认证"""
return {"status": "ok"}
# ---------------------------------------------------------------------------
# 兼容端点
# ---------------------------------------------------------------------------
@@ -300,16 +288,17 @@ async def list_projects_compat():
DIST_DIR = Path(__file__).parent / "frontend" / "dist"
if DIST_DIR.exists():
# v3.1: 缓存策略 - HTML 不缓存(确保新版本生效),JS/CSS 长缓存(Vite content hash 已处理)
import mimetypes
_static_app = StaticFiles(directory=str(DIST_DIR), html=True)
class CachedStaticFiles:
"""包装 StaticFiles,添加 Cache-Control 头"""
def __init__(self, app):
self._app = app
async def __call__(self, scope, receive, send):
original_send = send
async def patched_send(message):
if message.get("type") == "http.response.start":
headers = dict(message.get("headers", []))
@@ -321,5 +310,5 @@ if DIST_DIR.exists():
message["headers"] = list(headers.items())
await original_send(message)
await self._app(scope, receive, patched_send)
app.mount("/", CachedStaticFiles(_static_app), name="frontend")
-1
View File
@@ -10,7 +10,6 @@ from __future__ import annotations
import os
from pathlib import Path
from typing import Optional
def get_data_root() -> Path:
-15
View File
@@ -55,21 +55,6 @@ def client_with_isolation(isolated_data_root):
# ── E2E gate ──
def pytest_collection_modifyitems(config, items):
if not os.environ.get("RUN_INTEGRATION"):
skip_reason = "needs RUN_INTEGRATION=1"
remaining = []
deselected = []
for item in items:
if "integration" in item.keywords or "e2e" in item.keywords:
deselected.append(item)
else:
remaining.append(item)
if deselected:
config.hook.pytest_deselected(items=deselected)
items[:] = remaining
skip_no_integration = pytest.mark.skipif(
not os.environ.get("RUN_INTEGRATION"),
reason="Set RUN_INTEGRATION=1 to run E2E tests against real daemon",