Compare commits

...

17 Commits

Author SHA1 Message Date
cfdaily b90b7b37c7 fix(test): skip the e2e test at collection time (do not import the install directory)
CI / lint (pull_request) Successful in 8s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 1s
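Root cause: the skipif in test_e2e_v27.py was applied only at function level, so pytest
still imported the file during collection, which triggered the sys.path.insert pointing at spawner.py in the install directory.
If the install directory contained leftover merge-conflict markers, the whole test job crashed.

Fix: apply skipif at the pytestmark level so the module is skipped during collection.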
2026-06-10 07:52:41 +08:00
jiangwei-infra 672fadfee4 Merge pull request 'fix: deploy.yml requirements.txt + frontend resumed_from TS compilation' (#18) from fix/deploy-workflow into main
Deploy / ci (push) Successful in 10s
Deploy / deploy (push) Failing after 11s
Deploy / notify-deploy-failure (push) Successful in 1s
2026-06-10 07:21:24 +08:00
cfdaily f380b5f92d fix(frontend): add the resumed_from field to V2Task
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 1s
During deploy, TypeScript compilation failed with TS2339: Property 'resumed_from' does not exist on type 'V2Task'.
The DB table has this field, but the TS interface omitted it.
2026-06-10 07:20:24 +08:00
jiangwei-infra 228a95b9fa Merge pull request 'fix(ci): deploy.yml uses /tmp/ci-venv instead of requirements.txt' (#17) from fix/deploy-workflow into main
Deploy / ci (push) Successful in 23s
Deploy / deploy (push) Failing after 9s
Deploy / notify-deploy-failure (push) Successful in 0s
2026-06-10 07:15:39 +08:00
cfdaily 405b7147a7 fix(ci): deploy.yml uses /tmp/ci-venv + direct pip install instead of requirements.txt
CI / lint (pull_request) Successful in 8s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 1s
The repository has no requirements.txt, so the deploy workflow failed on every push to main.
Switched to the same approach as ci.yml: /tmp/ci-venv plus installing dependencies directly with pip.
2026-06-10 07:14:29 +08:00
jiangwei-infra b876159b52 Merge pull request 'fix(lint): fix the lint regression introduced by PR #14 (119→0)' (#16) from fix/lint-regression into main
Deploy / ci (push) Failing after 8s
Deploy / deploy (push) Has been skipped
Deploy / notify-deploy-failure (push) Successful in 1s
2026-06-10 07:09:44 +08:00
cfdaily d58e38d58f fix(lint): fix the lint regression introduced by PR #14 (119→0)
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
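PR #14 copied files from an old branch, which reverted the lint fixes from PR #10.
Fixes:
- autoflake: remove unused imports/variables
- autopep8: fix indentation/whitespace
- Manual fixes for F821 (pathlib→Path), F541 (f-string), F841 (unused variable)
- All fixes pass flake8 --max-line-length=120 --extend-ignore=E501 (0 errors)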
2026-06-09 23:53:29 +08:00
pangtong-fujunshi 7184079a75 Merge pull request 'fix(spawner): A13 exit=0 always completed' (#15) from fix/a13-exit0-completed into main
Deploy / ci (push) Failing after 6s
Deploy / deploy (push) Has been skipped
Deploy / notify-deploy-failure (push) Successful in 1s
CI / lint (pull_request) Failing after 6s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 4s
2026-06-09 23:42:05 +08:00
cfdaily fc9b66b905 docs(#08): update A13 revised - exit=0 always completed
CI / lint (pull_request) Failing after 9s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 4s
Merge old A12/A13 into single A13 revised: trust exit_code=0
regardless of stdout/JSON output. Old logic caused inform Mail
infinite retry loop.
2026-06-09 23:41:53 +08:00
cfdaily 5bb220d237 fix(spawner): A13 exit=0 always completed, not agent_error
exit=0 means process exited normally. Trust the exit code regardless
of stdout/JSON output or task_status. Old logic misclassified inform
Mail completions as agent_error, causing infinite retry loops.

Includes test update: test_task_status_pending expects completed.
2026-06-09 23:41:53 +08:00
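The rule in the commit above can be sketched as a small classifier. This is a minimal sketch, not the project's spawner code: `classify_outcome`, both keyword tuples, and the fallback `crashed` label for other non-zero exits are assumptions made for illustration. The exit=0, 130/143, and keyword cases follow the A13 revised to A16 rows in the design doc changed by this compare.

```python
# Hypothetical keyword lists; the real spawner's keywords are not shown in this diff.
NETWORK_KEYWORDS = ("network", "econnrefused")
COMPACT_KEYWORDS = ("compact",)


def classify_outcome(exit_code: int, stderr: str = "") -> str:
    """Map a finished agent process to an outcome label."""
    if exit_code == 0:
        # A13 revised: trust the exit code, whatever stdout/JSON or task_status say.
        return "completed"
    if exit_code in (130, 143):  # SIGINT / SIGTERM
        return "interrupted"
    lowered = stderr.lower()
    if any(keyword in lowered for keyword in NETWORK_KEYWORDS):
        return "gateway_unreachable"
    if any(keyword in lowered for keyword in COMPACT_KEYWORDS):
        return "compact_interrupted"
    # Assumption: every other non-zero exit is treated as a crash.
    return "crashed"
```

Because the exit=0 branch comes first and ignores every other input, an inform Mail that exits cleanly can no longer be reclassified as agent_error and retried.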
cfdaily f7fbdac89c chore: simayi-approved changes - lint fixes, toolchain improvements, healthz
All changes reviewed and APPROVED in PR #12 (Review ID: 40):
- toolchain_routes: webhook repo/org format compat, content dedup (sha256), closed issue filter
- dispatcher: fix inform mail crash being wrongly marked done
- ticker: cleanup and improvements
- healthz endpoint
- conftest: integration/e2e deselect markers
- docs: design docs, test-guide updates
- various lint/whitespace fixes across 30 files
2026-06-09 23:41:53 +08:00
cfdaily a1a4d7c5a7 docs: #19 adopt simayi review suggestions (v1.1)
2026-06-09 23:41:53 +08:00
jiangwei-infra 717dbc446a Merge pull request 'fix(CI): notify race fix + double-trigger dedup (PR #12 rebase, reviewed & approved)' (#14) from fix/ci-dedup-v2 into main
Deploy / ci (push) Failing after 6s
Deploy / deploy (push) Has been skipped
Deploy / notify-deploy-failure (push) Successful in 0s
2026-06-09 23:36:19 +08:00
cfdaily ee1ef23ace fix(spawner): tiered crash cooldown + fix inform mail crash wrongly marked done
CI / lint (pull_request) Failing after 7s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 4s
- crashed outcome cooldown 60s (vs 300s for the others)
- import init_db
- whitespace/lint fixes
2026-06-09 23:35:02 +08:00
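A tiered cooldown like the one in this commit can be modelled as a lookup with a default. The sketch below is an assumption-laden illustration: only the two values (60s for `crashed`, 300s otherwise) come from the commit message, while `cooldown_seconds`, `can_retry`, and the dictionary name are hypothetical.

```python
DEFAULT_COOLDOWN_S = 300               # "others" in the commit message
COOLDOWN_BY_OUTCOME = {"crashed": 60}  # shorter tier for crashes


def cooldown_seconds(outcome: str) -> int:
    """Return the cooldown for an outcome, falling back to the default tier."""
    return COOLDOWN_BY_OUTCOME.get(outcome, DEFAULT_COOLDOWN_S)


def can_retry(outcome: str, last_failure_ts: float, now: float) -> bool:
    """True once the outcome-specific cooldown has fully elapsed."""
    return now - last_failure_ts >= cooldown_seconds(outcome)
```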
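cfdaily 20b3b5facb fix(ci): fix the notify race condition by using needs.result instead of querying commit status
Root cause: when the notify-on-failure job queries results through the commit status API,
its own pending status pollutes the query result (a race condition):
1. lint and test both succeed
2. notify starts running and its own pending state is written to the commit status
3. notify queries the commit status → sees pending (its own) ≠ success
4. a false "[CI] 失败" comment is posted and the webhook triggers a Mail notification

Fix:
- Stop querying the commit status API
- Decide directly from needs.lint.result and needs.test.result
- Send a notification only on an explicit failure
- Also remove the push trigger to avoid double runs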
2026-06-09 23:34:44 +08:00
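The difference between the old and new notification rules can be modelled in a few lines. This is a sketch of the decision logic only, written in Python rather than the workflow's shell; the function names are hypothetical, and the result strings (`success`, `failure`, `skipped`) are the job results the workflow passes in through `needs.<job>.result`.

```python
def should_notify_old(commit_status: str) -> bool:
    """Old rule: anything other than "success" triggers a notification."""
    return commit_status != "success"


def should_notify(lint_result: str, test_result: str) -> bool:
    """New rule: notify only when a needed job explicitly failed."""
    return "failure" in (lint_result, test_result)


def failed_jobs(lint_result: str, test_result: str) -> list:
    """Names of the jobs that explicitly failed, for the comment body."""
    results = (("lint", lint_result), ("test", test_result))
    return [name for name, result in results if result == "failure"]
```

Under the old rule the job's own `pending` status counts as "not success" and fires a false alarm; the new rule never consults that status, so the race cannot occur.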
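cfdaily 05201d778e fix(ci): remove the push trigger to avoid double runs + fix notify false positives
1. Triggers: remove push, keep only pull_request (opened, synchronize)
   - Each push to a PR branch no longer runs CI twice
2. notify-on-failure: notify only on an explicit failure status
   - Before: empty/unknown/pending statuses all triggered a notification (root cause of the false positives)
   - Now: only STATUS=failure triggers a notification
3. venv paths: consistently use /tmp/ci-venv-lint and /tmp/ci-venv-test
   - Avoids conflicts with the development directory's .venv in host mode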
2026-06-09 23:34:41 +08:00
pangtong-fujunshi 5b2c42687a Merge pull request 'docs: add #19 toolchain context layers design' (#11) from docs/19-toolchain-context-layers-v2 into main
Deploy / ci (push) Failing after 6s
Deploy / deploy (push) Has been skipped
Deploy / notify-deploy-failure (push) Successful in 0s
2026-06-09 22:26:05 +08:00
35 changed files with 1092 additions and 391 deletions
+18 -17
@@ -1,9 +1,10 @@
# CI pipeline - moziplus v2.0
#
# Triggers:
# - push (non-main branches)
# - pull_request (opened, synchronize)
#
# Note: only the pull_request trigger is kept, to avoid push + pull_request double runs
#
# Gitea v1.23.4 limitations:
# - failure() expressions are not supported; use always() + a shell condition instead
# - concurrency / continue-on-error / timeout-minutes / permissions are not supported
@@ -13,10 +14,6 @@
name: CI
on:
push:
branches:
- '**'
- '!main'
pull_request:
types: [opened, synchronize]
@@ -53,7 +50,8 @@ jobs:
/tmp/ci-venv-test/bin/pytest tests/ -m "not e2e" -x -q
# ── Job 3: CI failure notification ───────────────────────────────
# v1.23 does not support failure(); use always() + a shell check of the commit status instead
# Use needs.<job>.result directly; do not query the commit status API
# Root cause: notify's own pending status pollutes the commit status query result (race condition)
notify-on-failure:
runs-on: macos-arm64
needs: [lint, test]
@@ -62,31 +60,34 @@ jobs:
- name: Check results and notify
env:
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
LINT_RESULT: ${{ needs.lint.result }}
TEST_RESULT: ${{ needs.test.result }}
run: |
# Query the status of the current commit
STATUS=$(curl -sf \
-H "Authorization: token $GITEA_TOKEN" \
"${{ gitea.api_url }}/repos/${{ gitea.repository }}/commits/${{ gitea.sha }}/status" \
| python3 -c "import sys,json; print(json.load(sys.stdin).get('state',''))" 2>/dev/null || echo "")
echo "Lint result: $LINT_RESULT"
echo "Test result: $TEST_RESULT"
echo "Commit status: $STATUS"
if [ "$STATUS" != "success" ]; then
echo "CI failed or status unknown, sending notification..."
# Notify only when lint or test explicitly failed
if [ "$LINT_RESULT" = "failure" ] || [ "$TEST_RESULT" = "failure" ]; then
echo "CI has failures, sending notification..."
# For PR events, post a comment as the notification
PR_NUMBER="${{ gitea.event.pull_request.number }}"
if [ -n "$PR_NUMBER" ]; then
# Build the failure summary
FAILED_JOBS=""
[ "$LINT_RESULT" = "failure" ] && FAILED_JOBS="${FAILED_JOBS}lint "
[ "$TEST_RESULT" = "failure" ] && FAILED_JOBS="${FAILED_JOBS}test "
curl -sf -X POST \
-H "Authorization: token $GITEA_TOKEN" \
-H "Content-Type: application/json" \
"${{ gitea.api_url }}/repos/${{ gitea.repository }}/issues/${PR_NUMBER}/comments" \
-d "{\"body\": \"[CI] 失败\\n\\n分支: ${{ gitea.ref_name }}\\n触发 commit: \`${{ gitea.sha }}\`\\n请检查 CI 日志并修复。\"}" \
-d "{\"body\": \"[CI] 失败\\n\\n分支: ${{ gitea.ref_name }}\\n触发 commit: \`${{ gitea.sha }}\`\\n失败 Job: ${FAILED_JOBS}\\n请检查 CI 日志并修复。\"}" \
|| echo "Failed to post PR comment"
echo "PR comment posted."
else
echo "Not a PR event, skipping PR comment."
fi
else
echo "CI passed, no notification needed."
echo "No explicit failures (results: lint=$LINT_RESULT, test=$TEST_RESULT), no notification needed."
fi
+4 -4
@@ -23,16 +23,16 @@ jobs:
- name: Setup Python
run: |
python3 -m venv .venv
.venv/bin/pip install --quiet -r requirements.txt
python3 -m venv /tmp/ci-venv-deploy
/tmp/ci-venv-deploy/bin/pip install --quiet flake8 fastapi pydantic pyyaml uvicorn requests pytest pytest-asyncio httpx
- name: Lint
run: |
.venv/bin/flake8 src/ --max-line-length=120 --extend-ignore=E501
/tmp/ci-venv-deploy/bin/flake8 src/ --max-line-length=120 --extend-ignore=E501
- name: Unit & Integration Tests
run: |
.venv/bin/pytest tests/ -m "not e2e" -x -q
/tmp/ci-venv-deploy/bin/pytest tests/ -m "not e2e" -x -q
# ── Job 2: Deploy ─────────────────────────────────────
deploy:
@@ -110,8 +110,8 @@ TCP 握手只能检测进程端口是否监听,无法检测 Gateway **业务
| ID | Condition | outcome | Recoverable? | Handling |
|------|------|---------|----------|------|
| A12 | exit=0 + task_status ∈ {done, review} | completed | — | Completed normally |
| A13 | exit=0 + task_status ∉ {done, review} | agent_error | ❌ | Mark failed + write the reason to the blackboard |
| A12 | ~~Merged into A13 revised~~ | — | — | See A13 revised below |
| **A13 revised** | exit=0 (no JSON output) | completed | — | Trust the process exit code: exit=0 means normal completion. The old logic branched on task_status and classified non-terminal states as agent_error, so inform Mail was never marked done, which formed an infinite loop with the dispatcher's inform auto-done |
| **A14** | exit=130 (SIGINT) or exit=143 (SIGTERM) | interrupted | ✅ | retry |
| **A15** | exit≠0 + stderr contains a network keyword | gateway_unreachable | ✅ | retry + cooldown 30s |
| **A16** | exit≠0 + stderr contains a compact keyword | compact_interrupted | ✅ | retry + cooldown 60s |
+16 -4
@@ -1590,7 +1590,7 @@ daemon 内部 ───────┘ │ 5. 创建 Mail │
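| Only handle whitelisted event types | Unknown ones are ignored + logged |
| issue_comment must check the source | Only handle comments written by the CI workflow (matched by a specific prefix: `❌ **CI 失败**` or the unified `[CI]` prefix) |
| PR author/reviewer must be a known Agent | Unknown ones are ignored + logged |
| Idempotent: the same event never creates a duplicate Mail | Dedup by `{x_gitea_event}-{x_gitea_delivery}` (the delivery ID comes from the `X-Gitea-Delivery` header) |
| Idempotent: the same event never creates a duplicate Mail | Two-layer dedup: ① delivery UUID (`{event}-{delivery}`) for standard idempotency; ② payload content dedup for review events (`{event}:{pr_num}:{sender}:{sha256(body_or_content)[:16]}`), guarding against the same review being submitted twice from different sources (added 2026-06-09) |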
---
@@ -2007,6 +2007,9 @@ CI workflow 已有 `notify-on-failure` jobci.yml),当前格式:
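| 7 | Signature algorithm | ✅ Confirmed | Gitea uses HMAC-SHA256; code comment added |
| 8 | Webhook scope | ✅ Organization level | Gitea organization-level webhook (Hook ID=28) covers all repositories under sanguo; new repositories are covered automatically |
| 9 | ALLOWED_HOST_LIST | ✅ Fixed | Gitea container config `192.168.2.153, 127.0.0.1, localhost, 172.17.0.0/16, 192.168.2.0/24` |
| 10 | Gitea review payload format | ✅ Confirmed by Jiang Wei's investigation (2026-06-08) | The Gitea v1.23.4 review payload has only `type` + `content`, with no `state`/`body`/`user`; this is Gitea's design, not an org vs repo difference. The format is unchanged in v1.24.0. Dual-format compatibility is defensive coding; keep as is |
| 11 | Spawner compact detection window | ✅ Fixed | Window 300s→900s, tail read 50KB→1MB. In long conversations the compact record was observed being pushed out of the window, causing missed detection |
| 12 | inform-type Mail crash wrongly marked done | ✅ Fixed | `_mail_auto_complete` is now outcome-aware; inform uses a whitelist (completed/claimed/no_reply) to control the done mark. spawner crash cooldown 300s→60s |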
---
@@ -2713,10 +2716,10 @@ Gitea v1.23.4 自带完整的 CI 管理界面:
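| # | Condition | Status | Confirmed by |
|---|------|------|--------|
| 1 | act-runner registered with label = `macos-arm64` | ✅ Managed by PM2 (sanguo-act-runner, id=44), auto-restart on crash | Confirmed by Jiang Wei |
| 2 | Gitea repository secrets configured (CI_TOKEN) | ⚠️ Needs confirmation | Jiang Wei |
| 2 | Gitea repository secrets configured (CI_TOKEN) | ✅ Confirmed by Jiang Wei (sanguo/moziplus-v2 has CI_TOKEN configured) | Jiang Wei |
| 3 | Gitea organization-level Webhook enabled (Hook ID=28) | ✅ Confirmed | Confirmed |
| 4 | GITEA_TOKEN environment variable for each Agent | ⚠️ To be assigned | Coordinated by Pang Tong |
| 5 | main branch protection rule (Review required to merge) | ⚠️ Needs confirmation | Jiang Wei |
| 4 | GITEA_TOKEN environment variable for each Agent | ✅ Written to each Agent's TOOLS.md; Jiang Wei confirmed the token records exist | Pang Tong + Jiang Wei |
| 5 | main branch protection rule (Review required to merge) | ✅ Configured by Jiang Wei (moziplus-v2 + sanguo_moziplus_v2, requires 1 approve) | Jiang Wei |
| 6 | No full E2E runs while the daemon is running | ✅ Sima Yi has been warned | Confirmed |
> Item 5 is critical: without a protection rule on main, developers can push directly to main and bypass Review.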
@@ -2753,3 +2756,12 @@ Gitea v1.23.4 自带完整的 CI 管理界面:
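| §17.6.4 | Added P3 end-to-end verification results (S1-S6, item by item) |
| §17.6.4 | Added investigation findings: Review API enum values, PullRequestReview webhook support, act-runner managed by PM2 |
| §17.10 | #1 status update: act-runner is now managed by PM2 |
### v3.1 → v3.2 changes (toolchain fixes + Mail delivery bug fixes)
| ID | Change |
|------|----------|
| §16.4 | Review handler dual-format compatibility: the HANDLERS registry registers several event names such as `pull_request_review` / `pull_request_approved`; `_handle_pull_request_review` accepts both the repo webhook (review.state/body/user) and org webhook (review.type/content/sender) payload formats |
| §16.8 #10 | Gitea v1.23.4 review payload investigation conclusion (Jiang Wei, 2026-06-08): the Gitea v1.23.4 review payload has only `type` + `content`, with no `state`/`body`/`user`; this is Gitea's design, not an org vs repo difference. The format is unchanged in v1.24.0. Dual-format compatibility is defensive coding; keep as is |
| §16.8 #11 | Spawner compact detection window fix: window 300s→900s, tail read 50KB→1MB. In long conversations the compact record was observed being pushed out of the window, causing missed detection |
| §16.8 #12 | Fix for inform-type Mail crash wrongly marked done: `_mail_auto_complete` is now outcome-aware; inform uses a whitelist (completed/claimed/no_reply) to control the done mark. spawner crash cooldown 300s→60s |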
+121
@@ -0,0 +1,121 @@
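# §18. Toolchain end-to-end verification test
> Date: 2026-06-09
> Status: completed ✅
> Goal: verify the whole Mail notification chain with real Webhook triggers
## Prerequisites
- Gitea username ↔ Agent ID mapping: identical (admin, guanyu-dev, jiangwei-infra, pangtong-fujunshi, simayi-challenger, zhangfei-dev, zhaoyun-data)
- Gitea organization-level Webhook (Hook ID=28): Jiang Wei confirmed the 5 most recent deliveries all have is_succeed=1
- Daemon online: sanguo-moziplus-v2 is running
- Test repository: sanguo/moziplus-v2
## Naming conventions
- Issue title: `[E2E-TEST] xxx`
- PR title: `[E2E-TEST] xxx`
- Branch name: `test/e2e-<timestamp>`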
## Verification steps
| Step | Action | Triggered event | Expected Mail notification | Check |
|------|------|----------|---------------|--------|
| 1 | Create Issue `[E2E-TEST] Issue指派测试` (assignee=zhangfei-dev) | issues (assigned) | zhangfei-dev receives an "Issue 指派" (Issue assigned) Mail | Mail to/template correct |
| 2 | Open branch `test/e2e-<ts>`, create PR `[E2E-TEST] Review请求测试` | pull_request (opened) | simayi-challenger receives a "Review 请求" (Review request) Mail | Mail to/risk level/file list |
| 3 | PR Review APPROVED | pull_request_review (approved) | PR author (pangtong-fujunshi) receives a "Review 通过 ✓" (Review approved) Mail | result=通过 ✓ |
| 4 | PR Review REQUEST_CHANGES | pull_request_review (rejected) | PR author receives a "Review 驳回 ✗" (Review rejected) Mail | result=驳回 ✗ |
| 5 | Post the comment `[CI] CI 失败 — 分支: test/e2e-xxx, 错误: build timeout` on the Issue | issue_comment | Issue author receives a "CI 失败" (CI failed) Mail | Template includes branch/error summary |
| 6 | Create an Issue whose title contains "部署失败" (deploy failed), with no assignee | issues (opened) | jiangwei-infra + pangtong-fujunshi each receive a "部署失败" Mail | Dual recipients |
| 7 | Close the Issue from step 1, then post another CI failure comment | issue_comment (closed issue) | No Mail is produced (negative test) | handler skips closed |
| 8 | Resend the step 1 Webhook (same delivery ID) | duplicate event | No new Mail is produced (idempotency test) | returns duplicate |
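## Signature verification
Tested (GITEA_WEBHOOK_SECRET is configured and in effect):
- ✅ Valid signature: the request is processed normally
- ✅ No signature: returns 403 `signature verification failed`
## Sources of review feedback
- Jiang Wei (infrastructure confirmation + boundary-verification suggestions)
- Sima Yi (missed items + naming conventions + risk prevention)
---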
## Execution log
> 2026-06-09 00:40~00:50 CST
### Step 1: Issue assignment ✅
- Action: create Issue #22 `[E2E-TEST] Issue指派测试` (assignee=zhangfei-dev)
- Mail: `mail-1780936736480` (from=system, to=zhangfei-dev, title=`Issue 指派: [E2E-TEST] Issue指派测试`)
- Template rendered correctly (includes the Issue link, labels, description, suggested branch name)
### Step 2: PR Review request ✅
- Action: create branch `test/e2e-1780936838`, create PR #23
- Mail: `mail-1780936851715` (from=system, to=simayi-challenger)
- Template includes the PR link, title, author (pangtong-fujunshi), branch, risk level (standard)
- Also received: CI failure notification `mail-1780936876572` (triggered automatically by CI, as expected)
### Step 3: Review APPROVED ✅
- Action: submit an APPROVED review with the simayi-challenger token
- Mail: `mail-1780936968411` (from=system, to=pangtong-fujunshi, title=`Review 通过 ✓`)
- Description includes the reviewer (simayi-challenger) and the review body
- ⚠️ 2 duplicate Mails received (org webhook + repo webhook double trigger)
### Step 4: Review REQUEST_CHANGES ✅
- Action: submit a REQUEST_CHANGES review with the simayi-challenger token
- Mail: `mail-1780936972207` (from=system, to=pangtong-fujunshi, title=`Review 驳回 ✗`)
- ⚠️ Same as above: 2 duplicate Mails received
### Step 5: CI failure comment ✅
- Action: post the comment `[CI] CI 失败 — 分支: test/e2e-1780936838, 错误: build timeout` on Issue #22
- Mail: `mail-1780936994513` (from=system, to=pangtong-fujunshi, title=`CI 失败: sanguo/moziplus-v2#22`)
- Template includes the extracted branch and the error summary
### Step 6: Deploy-failure Issue ✅
- Action: create Issue #24 `[E2E-TEST] 部署失败: test deploy` (no assignee)
- Mail: `mail-1780936999660` to=jiangwei-infra, `mail-1780936999684` to=pangtong-fujunshi
- Dual-recipient check passed ✅
### Step 7: Closed-Issue negative test ✅
- Action: close Issue #22, then post `[CI] CI 失败 — 应被过滤`
- Result: no new Mail was produced ✅ (only the 1 CI Mail from step 5; the step 7 comment was correctly filtered)
### Step 8: Idempotency test ✅
- Action: build a Webhook with a valid HMAC-SHA256 signature and send it twice with the same delivery ID `test-idempotency-002`
- First delivery: returns `ok`, a Mail is produced ✅
- Second delivery: returns `duplicate`, no new Mail ✅
- Extra check: a request without a signature returns 403 `signature verification failed` (signature verification works)
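The signature check exercised in step 8 can be sketched as follows. This is a minimal sketch, not the project's handler: it assumes the signature is the hex-encoded HMAC-SHA256 of the raw request body keyed with the webhook secret (delivered by Gitea in the `X-Gitea-Signature` header), and the names `sign` and `verify_signature` are hypothetical.

```python
import hashlib
import hmac
from typing import Optional


def sign(secret: str, body: bytes) -> str:
    """Hex-encoded HMAC-SHA256 of the raw request body."""
    return hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()


def verify_signature(secret: str, body: bytes, header_value: Optional[str]) -> bool:
    """Return True only when the header carries the expected signature."""
    if not header_value:
        # Unsigned request: the caller would answer 403.
        return False
    expected = sign(secret, body).encode("utf-8")
    # compare_digest runs in constant time, which avoids leaking the
    # position of the first mismatching character.
    return hmac.compare_digest(expected, header_value.encode("utf-8"))
```

The check must run over the exact bytes received; re-serializing the parsed JSON before hashing would change the digest.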
---
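## Summary
| Step | Status | Notes |
|------|------|------|
| 1. Issue assignment | ✅ Passed | Mail to/template correct |
| 2. PR Review request | ✅ Passed | Mail to/risk level/file list correct |
| 3. Review APPROVED | ✅ Passed | 2 Mails produced during the E2E test (root cause identified; not a platform problem) |
| 4. Review REQUEST_CHANGES | ✅ Passed | Same as above |
| 5. CI failure comment | ✅ Passed | Branch extracted correctly |
| 6. Deploy-failure Issue | ✅ Passed | Dual-recipient check passed |
| 7. Closed-Issue filtering | ✅ Passed | Negative test passed, no new Mail |
| 8. Idempotency test | ✅ Passed | Second delivery returns duplicate, no new Mail; signature verification correctly blocks unsigned requests |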
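## Issues found
### Duplicate Mail for Review events (fixed)
- **Symptom**: Review events in E2E steps 3/4 produced 2 Mails
- **Root cause** (confirmed by Jiang Wei's in-depth investigation): during the E2E test Pang Tong manually submitted a Review with the simayi token, while the simayi agent, after receiving the Review request Mail, also submitted a Review on its own. These were two independent API calls, **not a Gitea bug or a platform configuration problem**
- Jiang Wei's controlled experiment: one review API call produces only 1 hook_task
- Gitea router logs confirm the two POSTs were 7 seconds apart with differing payloads (review_comments and updated_at differ)
- The earlier, incorrect analysis ("Gitea webhookNotifier + actionsNotifier double delivery") has been overturned: actionsNotifier goes through handleWorkflows() and does not create a hook_task
- **Fix**: payload content dedup is kept as defensive programming (`_is_duplicate` gains a content dedup key = event + pr_num + sender + sha256(body_or_content)); APPROVED by Sima Yi
- **Verification**: PR #27 produced only 1 Mail in practice ✅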
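The two-layer dedup described above can be sketched like this. It is an illustration under assumptions, not the project's `_is_duplicate`: the in-memory `Deduper` class stands in for whatever store the daemon really uses, and `content_dedup_key` is a hypothetical helper. Only the key layout (`{event}:{pr_num}:{sender}:{sha256(body_or_content)[:16]}`) is taken from the design doc.

```python
import hashlib


def content_dedup_key(event: str, pr_num: int, sender: str, body_or_content: str) -> str:
    """Build the content key: event, PR number, sender, truncated SHA-256."""
    digest = hashlib.sha256(body_or_content.encode("utf-8")).hexdigest()[:16]
    return f"{event}:{pr_num}:{sender}:{digest}"


class Deduper:
    """In-memory stand-in for the real dedup store (an assumption)."""

    def __init__(self) -> None:
        self._seen = set()

    def is_duplicate(self, *keys: str) -> bool:
        """True if any key was seen before; otherwise record all of them."""
        if any(key in self._seen for key in keys):
            return True
        self._seen.update(keys)
        return False
```

A caller would pass both the delivery key and the content key for review events, so a second delivery with a fresh UUID but the same reviewer and body is still rejected.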
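### Lessons from the root-cause analysis
- Jiang Wei's first analysis gave a wrong root cause (Gitea double notifier); he corrected himself after a second, deeper investigation
- Pang Tong reported Jiang Wei's first conclusion to the owner (主公) as fact, without noting "this is Jiang Wei's investigation conclusion and has not been independently verified"
- **Improvement**: SOUL.md gains new rules: explicitly label speculation vs fact, cite the source when quoting someone else's conclusion, and correct promptly when a conclusion is overturned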
@@ -69,6 +69,7 @@
- **Get the full context** → pull the Issue details and comments via the Gitea API; do not rely only on the snapshot in the Mail
### Gitea API quick reference
> Replace `{owner}/{repo}` with the actual repository, e.g. `sanguo/sanguo_moziplus_v2`
- Issue details: GET /api/v1/repos/{owner}/{repo}/issues/{number}
- Issue comments: GET /api/v1/repos/{owner}/{repo}/issues/{number}/comments
- PR diff: GET /api/v1/repos/{owner}/{repo}/pulls/{number}.diff
@@ -193,6 +194,8 @@ AGENT_IDS = {
}
# Prefix mapping: @张飞 → zhangfei-dev
# Chinese-name mapping: Agents may @mention by Chinese name in Gitea Issue comments
# English short-name mapping: Agents may use short names without the -dev/-infra suffix
AGENT_ALIAS = {
"张飞": "zhangfei-dev",
"关羽": "guanyu-dev",
@@ -311,6 +314,13 @@ async def _handle_issue_comment(payload):
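The concrete change is in `_send_mail()` or its call sites: toolchain routes pass `performative="request"` when calling `_send_mail`.
**⚠️ Verification points**: after switching to request, the Agent spawn prompt becomes "请处理以下请求" ("please handle the following request"); confirm that:
1. Agents no longer ignore toolchain Mail as pure notifications
2. Agents correctly handle "read-only" toolchain events (e.g. CI failure notifications: no reply needed, but the Agent must be aware of them)
3. For delayed notifications about already-closed PRs/Issues, Agents do not try to act on them
Verification method: after deployment, send an Issue assignment Mail and check that the Agent behaves as expected.
---
## 5. Full change list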
+5 -4
@@ -11,9 +11,10 @@
| Scenario | Command | Duration | Notes |
|------|------|------|------|
| **Changed a module** | `pytest tests/unit/test_spawner.py` | <5s | Run only the unit tests for the changed module |
| **Changed the API layer** | `pytest tests/integration/` | ~1min | Run all integration tests |
| **Quick pre-commit check** | `pytest -m "not e2e"` | ~2min | Skip E2E; verify existing behavior is not broken |
| **Full pre-deploy verification** | `RUN_INTEGRATION=1 pytest` | ~60min | Includes E2E, real Agents |
| **Changed the API layer** | `RUN_INTEGRATION=1 pytest tests/integration/` | ~1min | Run all integration tests |
| **Quick pre-commit check** | `pytest` | ~2min | Excludes integration and e2e by default |
| **With integration tests** | `RUN_INTEGRATION=1 pytest` | ~5min | Includes integration tests |
| **Full pre-deploy verification** | `RUN_INTEGRATION=1 pytest` | ~60min | Includes e2e, real Agents |
| **E2E scenarios only** | `RUN_INTEGRATION=1 pytest tests/e2e/test_e2e_scenarios.py` | ~30min | Serial; one finishes before the next starts |
| **E2E stress only** | `RUN_INTEGRATION=1 pytest tests/e2e/test_e2e_stress.py` | ~10min | Concurrency test |
@@ -101,7 +102,7 @@ E2E(慢,真实 Agent) → 验证完整链路,需要 RUN_INTEGRATION=1
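## Key rules
1. **Only E2E spawns real Agents**; unit and integration tests do not
2. **Running `pytest` without `RUN_INTEGRATION=1` is safe**: all E2E tests are skipped
2. **Running plain `pytest` is safe**: integration and e2e tests are all excluded (they run only with `RUN_INTEGRATION=1`)
3. **E2E scenario tests run serially**: one finishes before the next starts, and a failure must be root-caused before continuing
4. **E2E stress tests run in parallel**, and only after all scenario tests pass
5. **Test data uses the `e2e-` prefix**; atexit cleanup is the fallback, see above for manual cleanup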
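Default exclusion of the gated markers is typically done in a `conftest.py` hook. The sketch below shows one way to do it and is not the repository's actual conftest: the `GATED_MARKERS` name and the exact environment check are assumptions, while `pytest_collection_modifyitems`, `item.get_closest_marker`, and `config.hook.pytest_deselected` are standard pytest APIs.

```python
import os

# Markers that run only when RUN_INTEGRATION=1 is set (names from pyproject.toml).
GATED_MARKERS = ("integration", "e2e")


def pytest_collection_modifyitems(config, items):
    """Deselect integration/e2e tests unless RUN_INTEGRATION=1 is set."""
    if os.environ.get("RUN_INTEGRATION") == "1":
        return
    selected, deselected = [], []
    for item in items:
        if any(item.get_closest_marker(name) for name in GATED_MARKERS):
            deselected.append(item)
        else:
            selected.append(item)
    if deselected:
        # Report the deselection so the summary line shows "N deselected".
        config.hook.pytest_deselected(items=deselected)
        items[:] = selected  # pytest expects the list to be mutated in place
```

Note that this hook runs after test modules have been imported, so it keeps gated tests from executing but does not prevent import-time side effects in those files.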
+3 -1
@@ -8,8 +8,10 @@ requires-python = ">=3.9"
asyncio_mode = "auto"
testpaths = ["tests"]
markers = [
"integration: real agent tests (requires RUN_INTEGRATION=1)",
"integration: integration tests (requires RUN_INTEGRATION=1)",
"e2e: end-to-end tests with real daemon + Agent (requires RUN_INTEGRATION=1)",
]
# Default deselection of integration/e2e handled in conftest.py pytest_collection_modifyitems
[tool.pyright]
venvPath = "."
+39 -16
@@ -15,7 +15,7 @@ from src.blackboard.queries import Queries
from src.blackboard.db import VALID_STATUSES, OUTPUT_TYPES
from src.blackboard.registry import ProjectRegistry
import src.utils as _utils
from src.utils import get_data_root
router = APIRouter(prefix="/api/projects/{project_id}", tags=["blackboard"])
@@ -27,7 +27,7 @@ def _validate_project(project_id: str) -> str:
"""校验 project_id,已知项目/虚拟项目放行,未知项目返回 400"""
if project_id in _VIRTUAL_PROJECTS:
return project_id
reg = ProjectRegistry(_utils.get_data_root())
reg = ProjectRegistry(get_data_root())
if reg.get_project(project_id):
return project_id
raise HTTPException(400, {
@@ -43,12 +43,12 @@ def _validate_project(project_id: str) -> str:
def _bb(project_id: str) -> Blackboard:
_validate_project(project_id)
return Blackboard(_utils.get_data_root() / project_id / "blackboard.db")
return Blackboard(get_data_root() / project_id / "blackboard.db")
def _q(project_id: str) -> Queries:
_validate_project(project_id)
return Queries(_utils.get_data_root() / project_id / "blackboard.db")
return Queries(get_data_root() / project_id / "blackboard.db")
# --- Tasks ---
@@ -59,7 +59,10 @@ async def list_tasks(project_id: str,
assignee: Optional[str] = None,
parent_task: Optional[str] = None):
bb = _bb(project_id)
tasks = bb.list_tasks(status=status, assignee=assignee, parent_task=parent_task)
tasks = bb.list_tasks(
status=status,
assignee=assignee,
parent_task=parent_task)
return {"tasks": [_task_to_dict(t) for t in tasks]}
@@ -79,10 +82,12 @@ async def get_task(project_id: str, task_id: str,
result["outputs_count"] = detail.get("outputs_count", 0)
result["review_status"] = detail.get("review_status")
result["latest_event_detail"] = detail.get("latest_event_detail")
result["comments"] = [dict(c.__dict__) for c in bb.get_comments(task_id)]
result["comments"] = [dict(c.__dict__)
for c in bb.get_comments(task_id)]
result["outputs"] = [dict(o.__dict__) for o in bb.get_outputs(task_id)]
result["reviews"] = [dict(r.__dict__) for r in bb.get_reviews(task_id)]
result["decisions"] = [dict(d.__dict__) for d in bb.get_decisions(task_id)]
result["decisions"] = [dict(d.__dict__)
for d in bb.get_decisions(task_id)]
result["events"] = q.task_events(task_id)
result["experiences"] = q.task_experiences(task_id)
return result
@@ -100,7 +105,7 @@ async def create_task(project_id: str, body: Dict[str, Any]):
date_str = datetime.now().strftime('%Y%m%d')
# seq: 查当前项目最大 seq
import sqlite3
db_path = _utils.get_data_root() / project_id / "blackboard.db"
db_path = get_data_root() / project_id / "blackboard.db"
try:
conn = sqlite3.connect(str(db_path), timeout=5)
max_id_row = conn.execute(
@@ -134,7 +139,8 @@ async def create_task(project_id: str, body: Dict[str, Any]):
priority=body.get("priority", 5),
assignee=assignee,
assigned_by=body.get("assigned_by", "user"),
depends_on=json.dumps(body["depends_on"]) if "depends_on" in body else None,
depends_on=json.dumps(
body["depends_on"]) if "depends_on" in body else None,
parent_task=body.get("parent_task"),
risk_level=body.get("risk_level", "standard"),
stage=body.get("stage"),
@@ -175,7 +181,8 @@ async def _generate_title(description: str) -> str | None:
resp = client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": "你是一个任务标题生成器。根据用户的需求描述,生成一个简洁的中文标题(5-15字),只输出标题,不要任何其他内容。"},
{"role": "system",
"content": "你是一个任务标题生成器。根据用户的需求描述,生成一个简洁的中文标题(5-15字),只输出标题,不要任何其他内容。"},
{"role": "user", "content": description[:500]},
],
max_tokens=50,
@@ -187,7 +194,8 @@ async def _generate_title(description: str) -> str | None:
return title
except Exception as e:
import logging
logging.getLogger("moziplus-v2").warning(f"Title generation failed: {e}")
logging.getLogger(
"moziplus-v2").warning(f"Title generation failed: {e}")
return None
@@ -205,7 +213,8 @@ async def task_progress(project_id: str, task_id: str):
async def claim_task(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
if not bb.claim_task(task_id, body["agent"]):
raise HTTPException(409, "Claim failed (already claimed or wrong assignee)")
raise HTTPException(
409, "Claim failed (already claimed or wrong assignee)")
return {"ok": True}
@@ -273,10 +282,20 @@ def _init_agent_ids():
return
try:
import yaml
cfg_path = os.path.join(os.path.dirname(__file__), "..", "..", "config", "default.yaml")
cfg_path = os.path.join(
os.path.dirname(__file__),
"..",
"..",
"config",
"default.yaml")
with open(cfg_path) as f:
cfg = yaml.safe_load(f)
_KNOWN_AGENT_IDS = list(cfg.get("daemon", {}).get("agent_profiles", {}).keys())
_KNOWN_AGENT_IDS = list(
cfg.get(
"daemon",
{}).get(
"agent_profiles",
{}).keys())
except Exception:
_KNOWN_AGENT_IDS = []
@@ -285,7 +304,10 @@ def _extract_mentions(text: str) -> list:
"""从文本中自动提取 @agent-id 格式的 mention"""
import re
_init_agent_ids()
candidates = set(re.findall(r'@([a-z][a-z0-9]*(?:-[a-z][a-z0-9]*)+)', text))
candidates = set(
re.findall(
r'@([a-z][a-z0-9]*(?:-[a-z][a-z0-9]*)+)',
text))
return [a for a in candidates if a in _KNOWN_AGENT_IDS]
@@ -397,7 +419,8 @@ async def write_output(project_id: str, task_id: str, body: Dict[str, Any]):
)
os.makedirs(artifacts_dir, exist_ok=True)
# 安全文件名
safe_name = "".join(c if c.isalnum() or c in "._-" else "_" for c in title)
safe_name = "".join(
c if c.isalnum() or c in "._-" else "_" for c in title)
if not safe_name:
safe_name = "output"
file_path = os.path.join(artifacts_dir, safe_name)
+23 -9
@@ -10,9 +10,11 @@ from pydantic import BaseModel
from typing import Optional
from src.blackboard.operations import Blackboard
import src.utils as _utils
from src.utils import get_data_root
router = APIRouter(prefix="/api/projects/{project_id}/tasks/{task_id}/checkpoints", tags=["checkpoints"])
router = APIRouter(
prefix="/api/projects/{project_id}/tasks/{task_id}/checkpoints",
tags=["checkpoints"])
# ── 请求模型 ──
@@ -33,7 +35,7 @@ class ResolveCheckpointRequest(BaseModel):
# ── 工具 ──
def _bb(project_id: str) -> Blackboard:
db_path = _utils.get_data_root() / project_id / "blackboard.db"
db_path = get_data_root() / project_id / "blackboard.db"
if not db_path.exists():
raise HTTPException(status_code=404, detail="Project not found")
return Blackboard(db_path)
@@ -50,10 +52,12 @@ def list_checkpoints(project_id: str, task_id: str):
@router.post("")
def create_checkpoint(project_id: str, task_id: str, req: CreateCheckpointRequest):
def create_checkpoint(project_id: str, task_id: str,
req: CreateCheckpointRequest):
"""Agent 创建 checkpoint"""
if req.type not in ("verify", "decision", "action"):
raise HTTPException(status_code=400, detail=f"Invalid checkpoint type: {req.type}")
raise HTTPException(status_code=400,
detail=f"Invalid checkpoint type: {req.type}")
bb = _bb(project_id)
# 验证 task 存在
@@ -73,10 +77,15 @@ def create_checkpoint(project_id: str, task_id: str, req: CreateCheckpointReques
@router.post("/{checkpoint_id}/approve")
def approve_checkpoint(project_id: str, task_id: str, checkpoint_id: str, req: ResolveCheckpointRequest):
def approve_checkpoint(project_id: str, task_id: str,
checkpoint_id: str, req: ResolveCheckpointRequest):
"""用户通过 checkpoint → 自动推进 task 状态"""
bb = _bb(project_id)
result = bb.resolve_checkpoint(checkpoint_id, "approve", req.resolved_by, req.note)
result = bb.resolve_checkpoint(
checkpoint_id,
"approve",
req.resolved_by,
req.note)
if result is None:
raise HTTPException(status_code=404, detail="Checkpoint not found")
if "error" in result:
@@ -97,10 +106,15 @@ def approve_checkpoint(project_id: str, task_id: str, checkpoint_id: str, req: R
@router.post("/{checkpoint_id}/reject")
def reject_checkpoint(project_id: str, task_id: str, checkpoint_id: str, req: ResolveCheckpointRequest):
def reject_checkpoint(project_id: str, task_id: str,
checkpoint_id: str, req: ResolveCheckpointRequest):
"""用户驳回 checkpoint → task 回到 working"""
bb = _bb(project_id)
result = bb.resolve_checkpoint(checkpoint_id, "reject", req.resolved_by, req.note)
result = bb.resolve_checkpoint(
checkpoint_id,
"reject",
req.resolved_by,
req.note)
if result is None:
raise HTTPException(status_code=404, detail="Checkpoint not found")
if "error" in result:
+18 -8
@@ -17,7 +17,7 @@ from src.blackboard.db import init_db
from src.blackboard.models import Task
from src.blackboard.operations import Blackboard
from src.blackboard.queries import Queries
import src.utils as _utils
from src.utils import get_data_root
def _get_valid_agents() -> set:
@@ -34,7 +34,8 @@ def _get_valid_agents() -> set:
except Exception:
pass
# fallback:硬编码
return {"zhangfei-dev", "guanyu-dev", "zhaoyun-data", "jiangwei-infra", "pangtong-fujunshi", "simayi-challenger"}
return {"zhangfei-dev", "guanyu-dev", "zhaoyun-data",
"jiangwei-infra", "pangtong-fujunshi", "simayi-challenger"}
router = APIRouter(prefix="/api/mail", tags=["mail"])
@@ -43,7 +44,7 @@ MAIL_PROJECT_ID = "_mail"
def _db_path() -> Path:
root = _utils.get_data_root()
root = get_data_root()
db = root / MAIL_PROJECT_ID / "blackboard.db"
db.parent.mkdir(parents=True, exist_ok=True)
init_db(db)
@@ -98,7 +99,10 @@ async def list_mail(
):
"""Mail 列表(按时间倒序)"""
bb = _bb()
tasks = bb.list_tasks(status=status, assignee=to_agent, assigned_by=from_agent)
tasks = bb.list_tasks(
status=status,
assignee=to_agent,
assigned_by=from_agent)
mails = []
for t in tasks:
@@ -229,7 +233,10 @@ async def send_mail(body: Dict[str, Any]):
to_agent = body.get("to", "").strip()
corrected_to = orig_from # 回复方向固定: reply → original sender
if to_agent and to_agent != corrected_to:
auto_corrected = {"field": "to", "original": to_agent, "corrected": corrected_to}
auto_corrected = {
"field": "to",
"original": to_agent,
"corrected": corrected_to}
to_agent = corrected_to
else:
# --- A2: to 必填(非回复场景) ---
@@ -256,7 +263,8 @@ async def send_mail(body: Dict[str, Any]):
conversation_id = body.get("conversation_id")
if not conversation_id and original:
try:
orig_meta = json.loads(original.must_haves) if original.must_haves else {}
orig_meta = json.loads(
original.must_haves) if original.must_haves else {}
conversation_id = orig_meta.get("conversation_id")
except Exception:
pass
@@ -311,10 +319,12 @@ async def delete_mail(prefix: Optional[str] = Query(None)):
for t in tasks:
if t.title and t.title.startswith(prefix):
if t.status not in ("cancelled",):
bb.update_task_status(t.id, "cancelled", agent="mail-cleanup-api")
bb.update_task_status(
t.id, "cancelled", agent="mail-cleanup-api")
deleted_ids.append(t.id)
return {"ok": True, "deleted_count": len(deleted_ids), "deleted_ids": deleted_ids}
return {"ok": True, "deleted_count": len(
deleted_ids), "deleted_ids": deleted_ids}
@router.patch("/{mail_id}")
+22 -10
@@ -8,13 +8,13 @@ from typing import Any, Dict
from fastapi import APIRouter, HTTPException, Query
from src.blackboard.registry import ProjectRegistry
import src.utils as _utils
from src.utils import get_data_root
router = APIRouter(prefix="/api/projects", tags=["projects"])
def _registry() -> ProjectRegistry:
return ProjectRegistry(_utils.get_data_root())
return ProjectRegistry(get_data_root())
@router.get("")
@@ -31,8 +31,10 @@ async def list_projects():
if db_path.exists():
try:
conn = sqlite3.connect(str(db_path), timeout=5)
total = conn.execute("SELECT COUNT(*) FROM tasks WHERE status != 'cancelled'").fetchone()[0]
active = conn.execute("SELECT COUNT(*) FROM tasks WHERE COALESCE(archived,0)=0").fetchone()[0]
total = conn.execute(
"SELECT COUNT(*) FROM tasks WHERE status != 'cancelled'").fetchone()[0]
active = conn.execute(
"SELECT COUNT(*) FROM tasks WHERE COALESCE(archived,0)=0").fetchone()[0]
archived = total - active
conn.close()
info['task_count'] = active
@@ -45,8 +47,10 @@ async def list_projects():
if general_db.exists() and "_general" not in projects:
try:
conn = sqlite3.connect(str(general_db), timeout=5)
total = conn.execute("SELECT COUNT(*) FROM tasks WHERE status != 'cancelled'").fetchone()[0]
active = conn.execute("SELECT COUNT(*) FROM tasks WHERE COALESCE(archived,0)=0").fetchone()[0]
total = conn.execute(
"SELECT COUNT(*) FROM tasks WHERE status != 'cancelled'").fetchone()[0]
active = conn.execute(
"SELECT COUNT(*) FROM tasks WHERE COALESCE(archived,0)=0").fetchone()[0]
conn.close()
projects["_general"] = {
"id": "_general", "name": "一般任务", "description": "无项目归属的通用任务",
@@ -60,8 +64,10 @@ async def list_projects():
if general_db_check.exists():
try:
conn = sqlite3.connect(str(general_db_check), timeout=5)
total = conn.execute("SELECT COUNT(*) FROM tasks WHERE status != 'cancelled'").fetchone()[0]
active = conn.execute("SELECT COUNT(*) FROM tasks WHERE COALESCE(archived,0)=0").fetchone()[0]
total = conn.execute(
"SELECT COUNT(*) FROM tasks WHERE status != 'cancelled'").fetchone()[0]
active = conn.execute(
"SELECT COUNT(*) FROM tasks WHERE COALESCE(archived,0)=0").fetchone()[0]
conn.close()
projects["_general"]["task_count"] = active
projects["_general"]["task_count_total"] = total
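Review note: the three hunks above repeat the same pair of COUNT queries against different database paths. A minimal sketch of how they might be factored into one helper, assuming only what the queries imply (a `tasks` table with `status` and `archived` columns). `count_tasks` is a hypothetical name and is not part of this PR.

```python
import sqlite3
from contextlib import closing
from typing import Tuple


def count_tasks(db_path: str, timeout: float = 5.0) -> Tuple[int, int]:
    """Return (total, active) using the same two queries as the diff.

    Hypothetical helper: the name and signature are illustrative only.
    """
    # closing() guarantees conn.close() runs even if a query raises;
    # a bare `with sqlite3.connect(...)` would only manage the transaction.
    with closing(sqlite3.connect(db_path, timeout=timeout)) as conn:
        total = conn.execute(
            "SELECT COUNT(*) FROM tasks WHERE status != 'cancelled'"
        ).fetchone()[0]
        active = conn.execute(
            "SELECT COUNT(*) FROM tasks WHERE COALESCE(archived,0)=0"
        ).fetchone()[0]
    return total, active
```

Note that the two queries filter on different columns, so `total - active` is only an archived count if cancelled tasks are never left unarchived; that is worth confirming against the real data.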
@@ -173,7 +179,10 @@ async def move_task(project_id: str, task_id: str, body: Dict[str, Any]):
depends_on=child.depends_on, must_haves=child.must_haves,
)
tgt_bb.create_task(moved_child)
src_bb.update_task_status(child.id, "cancelled", detail=f"Moved to {target_project}")
src_bb.update_task_status(
child.id,
"cancelled",
detail=f"Moved to {target_project}")
moved_ids.append(child.id)
# Move the main task
@@ -186,7 +195,10 @@ async def move_task(project_id: str, task_id: str, body: Dict[str, Any]):
depends_on=task.depends_on, must_haves=task.must_haves,
)
tgt_bb.create_task(moved_task)
src_bb.update_task_status(task_id, "cancelled", detail=f"Moved to {target_project}")
src_bb.update_task_status(
task_id,
"cancelled",
detail=f"Moved to {target_project}")
moved_ids.insert(0, task_id)
return {"ok": True, "moved_to": target_project, "moved_ids": moved_ids}
+113 -25
@@ -28,7 +28,7 @@ from src.blackboard.models import Task
from src.blackboard.operations import Blackboard
from src.config.agents import AGENT_IDS
from src.daemon.toolchain_templates import render_template
import src.utils as _utils
from src.utils import get_data_root
logger = logging.getLogger(__name__)
@@ -46,17 +46,48 @@ _TTL_SECONDS = 7 * 24 * 3600
_idempotency_lock = asyncio.Lock()
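def _is_duplicate(event: str, delivery: str) -> bool:
"""Check whether a webhook delivery is a duplicate; expired entries are purged automatically."""
def _is_duplicate(event: str, delivery: str,
payload: Optional[Dict[str, Any]] = None) -> bool:
"""Check whether a webhook delivery is a duplicate; expired entries are purged automatically.
Two-layer deduplication strategy:
1. Delivery UUID dedup (standard idempotency)
2. Payload content dedup, to handle the double delivery in Gitea v1.23.4, where
webhookNotifier and actionsNotifier generate different UUIDs for the same review
"""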
now = time.time()
# Purge expired entries
while _delivery_timestamps and (now - _delivery_timestamps[0][0]) > _TTL_SECONDS:
while _delivery_timestamps and (
now - _delivery_timestamps[0][0]) > _TTL_SECONDS:
_, key = _delivery_timestamps.pop(0)
_delivery_cache.discard(key)
# Dedup by delivery UUID
key = f"{event}-{delivery}"
if key in _delivery_cache:
return True
# Dedup by payload content (review events: same PR + same user + same content)
# Note: Gitea's webhookNotifier uses review.body, while actionsNotifier uses review.content,
# so the dedup key must read both fields to ensure both formats produce the same key
if payload and "review" in event:
pr_num = payload.get("pull_request", {}).get("number")
sender = payload.get("sender", {}).get("login")
review = payload.get("review", {})
# Use body or content, preferring body (the webhookNotifier format)
content = review.get("body", "") or review.get("content", "")
content_hash = hashlib.sha256(content.encode()).hexdigest()[:16]
content_key = f"content:{event}:{pr_num}:{sender}:{content_hash}"
if content_key in _delivery_cache:
logger.info(
"Content-based duplicate detected: %s PR#%s by %s",
event,
pr_num,
sender)
return True
_delivery_cache.add(content_key)
_delivery_timestamps.append((now, content_key))
_delivery_cache.add(key)
_delivery_timestamps.append((now, key))
return False
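Review note: the two-layer dedup in the hunk above can be exercised in isolation. The following is a sketch under stated assumptions, not the PR's code: the module-level names, the injectable `now` parameter, and the use of `collections.deque` (instead of `list.pop(0)`) are all choices made here for illustration.

```python
import hashlib
import time
from collections import deque
from typing import Any, Deque, Dict, Optional, Set, Tuple

TTL_SECONDS = 7 * 24 * 3600

_cache: Set[str] = set()
_timestamps: Deque[Tuple[float, str]] = deque()


def is_duplicate(event: str, delivery: str,
                 payload: Optional[Dict[str, Any]] = None,
                 now: Optional[float] = None) -> bool:
    """Return True if this delivery was already seen by UUID or by content."""
    now = time.time() if now is None else now
    # Purge entries older than the TTL; deque.popleft() is O(1).
    while _timestamps and now - _timestamps[0][0] > TTL_SECONDS:
        _, old_key = _timestamps.popleft()
        _cache.discard(old_key)

    # Layer 1: delivery UUID key.
    keys = [f"{event}-{delivery}"]
    # Layer 2: content key, for review events only.
    if payload and "review" in event:
        review = payload.get("review") or {}
        text = review.get("body") or review.get("content") or ""
        digest = hashlib.sha256(text.encode()).hexdigest()[:16]
        pr_num = (payload.get("pull_request") or {}).get("number")
        sender = (payload.get("sender") or {}).get("login")
        keys.append(f"content:{event}:{pr_num}:{sender}:{digest}")

    if any(k in _cache for k in keys):
        return True
    for k in keys:
        _cache.add(k)
        _timestamps.append((now, k))
    return False
```

As in the diff, the content key includes the event name, so two deliveries collapse only when they arrive under the same event name. Two genuinely separate reviews with identical text from the same user on the same PR would also collapse within the TTL window.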
@@ -112,8 +143,16 @@ async def _fetch_pr_files(repo: str, pr_number: int) -> Tuple[List[str], str]:
last_error = str(e)
if attempt < 2:
await asyncio.sleep(0.5 * (attempt + 1))
logger.warning("Retry %d/3 fetching PR files: %s/pulls/%d", attempt + 1, repo, pr_number)
logger.warning("Failed to fetch PR files after 3 retries: %s/pulls/%d - %s", repo, pr_number, last_error)
logger.warning(
"Retry %d/3 fetching PR files: %s/pulls/%d",
attempt + 1,
repo,
pr_number)
logger.warning(
"Failed to fetch PR files after 3 retries: %s/pulls/%d - %s",
repo,
pr_number,
last_error)
return [], f"获取文件列表失败(重试3次): {last_error}"
@@ -146,7 +185,7 @@ MAIL_PROJECT_ID = "_mail"
def _mail_db_path() -> Path:
"""Return the Mail database path, making sure its directory exists."""
root = _utils.get_data_root()
root = get_data_root()
db = root / MAIL_PROJECT_ID / "blackboard.db"
db.parent.mkdir(parents=True, exist_ok=True)
init_db(db)
@@ -226,7 +265,8 @@ async def _handle_pull_request(payload: Dict[str, Any]) -> None:
pr = payload.get("pull_request")
if not pr or not isinstance(pr, dict):
logger.warning("pull_request event missing pull_request field, skipping")
logger.warning(
"pull_request event missing pull_request field, skipping")
return
repo = _repo_fullname(payload)
pr_number = pr.get("number", 0)
@@ -240,7 +280,8 @@ async def _handle_pull_request(payload: Dict[str, Any]) -> None:
if fetch_error:
file_list = f"⚠️ {fetch_error}"
else:
file_list = "\n".join(f"- {f}" for f in changed_files) if changed_files else "(无文件变更)"
file_list = "\n".join(
f"- {f}" for f in changed_files) if changed_files else "(无文件变更)"
text = render_template("review_request", {
"repo": repo,
@@ -257,16 +298,34 @@ async def _handle_pull_request(payload: Dict[str, Any]) -> None:
async def _handle_pull_request_review(payload: Dict[str, Any]) -> None:
"""Handle the pull_request_review event: notify the PR author unless the state is COMMENTED."""
"""Handle the pull_request_review event: notify the PR author unless the state is COMMENTED.
Two payload formats are supported:
- repo webhook: review.state = "APPROVED" / "REQUEST_CHANGES"
- org webhook (Gitea v1.23.4): review.type = "pull_request_review_approved" / "pull_request_review_rejected"
"""
review = payload.get("review")
if not review or not isinstance(review, dict):
logger.warning("pull_request_review event missing review field, skipping")
logger.warning(
"pull_request_review event missing review field, skipping")
return
pr = payload.get("pull_request")
if not pr or not isinstance(pr, dict):
logger.warning("pull_request_review event missing pull_request field, skipping")
logger.warning(
"pull_request_review event missing pull_request field, skipping")
return
# Extract state, handling both payload formats
state = review.get("state", "")
if not state:
# org webhook format: review.type = "pull_request_review_approved"
review_type = review.get("type", "")
type_map = {
"pull_request_review_approved": "APPROVED",
"pull_request_review_rejected": "REQUEST_CHANGES",
"pull_request_review_comment": "COMMENTED",
}
state = type_map.get(review_type, "")
# Notify only on APPROVED and REQUEST_CHANGES; skip COMMENTED and any other state
if state == "COMMENTED":
@@ -276,8 +335,17 @@ async def _handle_pull_request_review(payload: Dict[str, Any]) -> None:
pr_number = pr.get("number", 0)
pr_title = pr.get("title", "")
pr_author = pr.get("user", {}).get("login", "unknown")
reviewer = review.get("user", {}).get("login", "unknown")
review_body = review.get("body", "(无评论)")
# Compatibility: org webhook reviews have no user, so fall back to sender
reviewer = review.get(
"user",
{}).get(
"login",
"") or payload.get(
"sender",
{}).get(
"login",
"unknown")
review_body = review.get("body", "") or review.get("content", "(无评论)")
result_map = {"APPROVED": "通过 ✓", "REQUEST_CHANGES": "驳回 ✗"}
if state not in result_map:
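Review note: the state, reviewer, and body fallbacks introduced in the two hunks above could be collected into one normalization step. This is a sketch only; `normalize_review` and `TYPE_TO_STATE` are hypothetical names, and the English placeholder stands in for the one used in the diff.

```python
from typing import Any, Dict, Tuple

# Mapping taken from the type_map in the diff.
TYPE_TO_STATE = {
    "pull_request_review_approved": "APPROVED",
    "pull_request_review_rejected": "REQUEST_CHANGES",
    "pull_request_review_comment": "COMMENTED",
}


def normalize_review(payload: Dict[str, Any]) -> Tuple[str, str, str]:
    """Return (state, reviewer, body) for either payload shape."""
    review = payload.get("review") or {}
    # Prefer review.state (repo webhook); otherwise map review.type (org webhook).
    state = review.get("state") or TYPE_TO_STATE.get(review.get("type", ""), "")
    # Prefer review.user.login; otherwise fall back to sender.login.
    reviewer = ((review.get("user") or {}).get("login")
                or (payload.get("sender") or {}).get("login")
                or "unknown")
    # Chain all three with `or` so an empty content also gets the placeholder.
    body = review.get("body") or review.get("content") or "(no comment)"
    return state, reviewer, body
```

One behavioral difference from the diff: `review.get("body", "") or review.get("content", "(无评论)")` yields an empty string when `content` is present but empty, whereas the chained `or` above falls through to the placeholder.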
@@ -323,7 +391,8 @@ async def _handle_issues(payload: Dict[str, Any]) -> None:
logger.debug("Issue assigned but no assignee found, skipping")
return
labels_list = [lbl.get("name", "") for lbl in (issue.get("labels") or [])]
labels_list = [lbl.get("name", "")
for lbl in (issue.get("labels") or [])]
labels = ", ".join(labels_list) if labels_list else "(无标签)"
issue_body = issue.get("body", "(无描述)")
brief = issue_title[:20].replace(" ", "-").lower()
@@ -371,6 +440,14 @@ async def _handle_issue_comment(payload: Dict[str, Any]) -> None:
if not issue or not isinstance(issue, dict):
logger.warning("issue_comment event missing issue field, skipping")
return
# Do not send CI failure notifications for closed issues/PRs
if issue.get("state") == "closed":
logger.debug(
"Skipping CI failure notification for closed issue #%s",
issue.get("number"))
return
repo = _repo_fullname(payload)
issue_number = issue.get("number", 0)
@@ -400,6 +477,12 @@ async def _handle_issue_comment(payload: Dict[str, Any]) -> None:
_EVENT_HANDLERS: Dict[str, Any] = {
"pull_request": _handle_pull_request,
"pull_request_review": _handle_pull_request_review,
"pull_request_review_approved": _handle_pull_request_review,
"pull_request_review_rejected": _handle_pull_request_review,
"pull_request_review_comment": _handle_pull_request_review,
# Review sub-events actually emitted by Gitea v1.23.4 (without the _review_ segment)
"pull_request_approved": _handle_pull_request_review,
"pull_request_rejected": _handle_pull_request_review,
"issues": _handle_issues,
"issue_comment": _handle_issue_comment,
}
@@ -430,27 +513,32 @@ async def gitea_webhook(
# 1. Signature verification
if not _verify_signature(body, x_gitea_signature):
logger.warning("Webhook signature verification failed")
return Response(status_code=403, content="signature verification failed")
return Response(status_code=403,
content="signature verification failed")
# 2. Idempotency check
if x_gitea_event and x_gitea_delivery:
async with _idempotency_lock:
if _is_duplicate(x_gitea_event, x_gitea_delivery):
logger.debug("Duplicate webhook: %s/%s", x_gitea_event, x_gitea_delivery)
return Response(status_code=200, content="duplicate")
# 3. Parse payload
# 3. Parse payload (parsed early so the idempotency check can use it)
try:
payload = await request.json()
except Exception:
logger.warning("Failed to parse webhook payload")
return Response(status_code=200, content="invalid payload")
# 2. Idempotency check (must run after the payload is parsed, to support content-based dedup)
if x_gitea_event and x_gitea_delivery:
async with _idempotency_lock:
if _is_duplicate(x_gitea_event, x_gitea_delivery, payload):
logger.debug(
"Duplicate webhook: %s/%s",
x_gitea_event,
x_gitea_delivery)
return Response(status_code=200, content="duplicate")
# 4. Look up the handler
handler = _EVENT_HANDLERS.get(x_gitea_event or "")
if not handler:
logger.debug("Unhandled event type: %s", x_gitea_event)
return Response(status_code=200, content=f"unhandled event: {x_gitea_event}")
return Response(status_code=200,
content=f"unhandled event: {x_gitea_event}")
# 5. Run the handler
try:
+19 -14
@@ -132,8 +132,10 @@ def _migrate_v28(conn: sqlite3.Connection) -> None:
resolved_by TEXT,
resolve_note TEXT
)""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_checkpoints_task ON checkpoints(task_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_checkpoints_status ON checkpoints(status)")
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_checkpoints_task ON checkpoints(task_id)")
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_checkpoints_status ON checkpoints(status)")
# 4. Extended fields on outputs (M3 deliverables)
_safe_add_column(conn, "outputs", "file_name", "TEXT")
@@ -188,18 +190,20 @@ TERMINAL_STATUSES = frozenset() # v3.1: no terminal states; everything is governed by VALID_TRANSITIONS
MANUAL_STATUSES = frozenset({"cancelled", "paused", "reviewing"})
VALID_TRANSITIONS = {
"pending": {"claimed", "paused", "blocked", "cancelled"},
"claimed": {"working", "paused", "pending", "cancelled"},
"working": {"review", "done", "blocked", "failed", "paused", "escalated", "waiting_human", "cancelled", "pending"}, # pending: fallback when a Mail spawn fails
"paused": {"working", "claimed", "review", "escalated", "waiting_human", "cancelled"}, # resume to the status recorded in resumed_from
"review": {"done", "pending", "failed", "paused", "escalated", "waiting_human", "cancelled"},
"blocked": {"pending", "escalated", "cancelled"},
"failed": {"pending", "escalated", "cancelled"},
"escalated": {"working", "pending", "paused", "cancelled"},
"pending": {"claimed", "paused", "blocked", "cancelled"},
"claimed": {"working", "paused", "pending", "cancelled"},
# pending: fallback when a Mail spawn fails
"working": {"review", "done", "blocked", "failed", "paused", "escalated", "waiting_human", "cancelled", "pending"},
# resume to the status recorded in resumed_from
"paused": {"working", "claimed", "review", "escalated", "waiting_human", "cancelled"},
"review": {"done", "pending", "failed", "paused", "escalated", "waiting_human", "cancelled"},
"blocked": {"pending", "escalated", "cancelled"},
"failed": {"pending", "escalated", "cancelled"},
"escalated": {"working", "pending", "paused", "cancelled"},
"waiting_human": {"working", "done", "paused", "cancelled"},
"done": {"cancelled", "reviewing"},
"reviewing": {"done", "working", "cancelled"},
"cancelled": {"pending"},
"done": {"cancelled", "reviewing"},
"reviewing": {"done", "working", "cancelled"},
"cancelled": {"pending"},
}
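Review note: since `TERMINAL_STATUSES` is empty in v3.1, the table above is the only thing constraining status changes. A minimal sketch of how a caller might consult it, using a subset of the table; `can_transition` is a hypothetical helper name and is not shown anywhere in this diff.

```python
from typing import Dict, FrozenSet

# Subset of the VALID_TRANSITIONS table from the diff, for illustration only.
VALID_TRANSITIONS: Dict[str, FrozenSet[str]] = {
    "pending": frozenset({"claimed", "paused", "blocked", "cancelled"}),
    "done": frozenset({"cancelled", "reviewing"}),
    "cancelled": frozenset({"pending"}),
}


def can_transition(old_status: str, new_status: str) -> bool:
    """Hypothetical helper: True if old_status → new_status is listed in the table."""
    # Unknown source statuses have no outgoing edges.
    return new_status in VALID_TRANSITIONS.get(old_status, frozenset())
```

With this shape, even `done` and `cancelled` have outgoing edges, which matches the "no terminal states" comment.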
COMMENT_TYPES = frozenset({
@@ -223,7 +227,8 @@ EVENT_TYPES = frozenset({
OUTPUT_TYPES = frozenset({"code", "document", "data", "config", "other"})
REVIEW_TYPES = frozenset({"plan_review", "output_review", "guardrail", "final_review"})
REVIEW_TYPES = frozenset(
{"plan_review", "output_review", "guardrail", "final_review"})
VERDICT_TYPES = frozenset({"approved", "rejected", "needs_revision"})
EXPERIENCE_SOURCES = frozenset({
+12 -6
@@ -83,7 +83,8 @@ class Blackboard:
"""Fetch a single task"""
conn = self._conn()
try:
row = conn.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
row = conn.execute(
"SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
return Task.from_row(row) if row else None
finally:
conn.close()
@@ -128,7 +129,8 @@ class Blackboard:
updates["completed_at"] = now # paused also records a timestamp, used when resuming
updates["resumed_from"] = old_status # record the status before pausing
elif new_status == "pending":
# Every →pending transition clears assignee (aligned with ticker._transition_status L414)
# Every →pending transition clears assignee (aligned with ticker._transition_status
# L414)
updates["assignee"] = None
updates["claimed_at"] = None
updates["current_agent"] = None
@@ -707,7 +709,8 @@ class Blackboard:
import uuid
# BUG-33: validate that the payload structure contains a version field
if not isinstance(payload, dict) or "version" not in payload:
raise ValueError("payload must be a dict containing 'version' field")
raise ValueError(
"payload must be a dict containing 'version' field")
cp_id = checkpoint_id or f"cp-{uuid.uuid4().hex[:8]}"
conn = self._conn()
try:
@@ -964,7 +967,8 @@ class Blackboard:
finally:
conn.close()
def get_pending_mentions(self, max_retries: int = 5) -> List[Dict[str, Any]]:
def get_pending_mentions(
self, max_retries: int = 5) -> List[Dict[str, Any]]:
"""Return all pending mentions that have not exceeded the retry limit"""
conn = self._conn()
try:
@@ -999,7 +1003,8 @@ class Blackboard:
conn = self._conn()
try:
conn.execute("BEGIN IMMEDIATE")
conn.execute("UPDATE mention_queue SET retry_count=retry_count+1 WHERE id=?", (mention_id,))
conn.execute(
"UPDATE mention_queue SET retry_count=retry_count+1 WHERE id=?", (mention_id,))
conn.commit()
return True
finally:
@@ -1010,7 +1015,8 @@ class Blackboard:
conn = self._conn()
try:
conn.execute("BEGIN IMMEDIATE")
conn.execute("UPDATE mention_queue SET status='failed' WHERE id=?", (mention_id,))
conn.execute(
"UPDATE mention_queue SET status='failed' WHERE id=?", (mention_id,))
conn.commit()
return True
finally:
+8 -4
@@ -132,7 +132,8 @@ class Queries:
"""Aggregated task detail (including related data)"""
conn = self._conn()
try:
row = conn.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
row = conn.execute(
"SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
if not row:
return None
task = dict(row)
@@ -159,7 +160,8 @@ class Queries:
finally:
conn.close()
def task_events(self, task_id: str, limit: int = 50) -> List[Dict[str, Any]]:
def task_events(self, task_id: str,
limit: int = 50) -> List[Dict[str, Any]]:
"""List of task events"""
conn = self._conn()
try:
@@ -265,7 +267,8 @@ class Queries:
return "review"
# Any working/claimed → working
if status_counts.get("working", 0) > 0 or status_counts.get("claimed", 0) > 0:
if status_counts.get("working", 0) > 0 or status_counts.get(
"claimed", 0) > 0:
return "working"
# Any pending → pending
@@ -337,7 +340,8 @@ class Queries:
# Currently active stage
active_stage = None
for sp in stage_progress:
if sp["active"] > 0 or (sp["total"] > 0 and sp["done"] < sp["total"]):
if sp["active"] > 0 or (
sp["total"] > 0 and sp["done"] < sp["total"]):
if not active_stage and sp["done"] < sp["total"]:
active_stage = sp["label"]
+6 -3
@@ -119,7 +119,8 @@ class ProjectRegistry:
finally:
conn.close()
def list_projects(self, status: Optional[str] = None) -> Dict[str, Dict[str, Any]]:
def list_projects(
self, status: Optional[str] = None) -> Dict[str, Dict[str, Any]]:
"""List projects"""
conn = self._connect()
try:
@@ -178,7 +179,8 @@ class ProjectRegistry:
status="deleted",
)
def physical_delete_project(self, project_id: str) -> Optional[Dict[str, Any]]:
def physical_delete_project(
self, project_id: str) -> Optional[Dict[str, Any]]:
"""Physically delete a project (remove its directory and its registry entry)"""
import shutil
@@ -260,7 +262,8 @@ class ProjectRegistry:
# Migration (from _registry.yaml)
# ===================================================================
def discover_sanguo_projects(self, scan_dir: Optional[Path] = None) -> List[str]:
def discover_sanguo_projects(
self, scan_dir: Optional[Path] = None) -> List[str]:
"""Scan the sanguo_projects development directory and auto-register official projects"""
scan_dir = scan_dir or Path(os.environ.get(
"SANGUO_PROJECTS_DIR",
+14 -6
@@ -9,14 +9,14 @@ from pathlib import Path
from typing import List, Optional
from src.blackboard.operations import Blackboard
import src.utils as _utils
from src.utils import get_data_root
from src.blackboard.models import Task, Review
from src.blackboard.queries import Queries
from src.blackboard.registry import ProjectRegistry
def _find_project_root() -> Path:
return _utils.get_data_root()
return get_data_root()
def _get_bb(project_id: str) -> Blackboard:
@@ -35,7 +35,9 @@ def _get_queries(project_id: str) -> Queries:
def build_blackboard_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="blackboard", description="Agent blackboard operations")
parser = argparse.ArgumentParser(
prog="blackboard",
description="Agent blackboard operations")
sub = parser.add_subparsers(dest="command")
# read
@@ -206,7 +208,11 @@ def _cmd_comment(opts) -> int:
def _cmd_decide(opts) -> int:
bb = _get_bb(opts.project)
did = bb.add_decision(opts.task_id, opts.decider, opts.decision, opts.rationale)
did = bb.add_decision(
opts.task_id,
opts.decider,
opts.decision,
opts.rationale)
print(f"Decision recorded: {did}")
return 0
@@ -251,7 +257,8 @@ def _print_tasks(tasks, as_json: bool):
def build_admin_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="admin", description="Admin operations")
parser = argparse.ArgumentParser(
prog="admin", description="Admin operations")
sub = parser.add_subparsers(dest="command")
# project create
@@ -300,7 +307,8 @@ def run_admin_cli(args: Optional[List[str]] = None) -> int:
for pid, info in projects.items():
status = info.get("status", "?")
agents = ",".join(info.get("agents", []))
print(f" {pid} [{status}] {info.get('name', '')} agents: {agents}")
print(
f" {pid} [{status}] {info.get('name', '')} agents: {agents}")
return 0
elif opts.command == "project-archive":
+10 -7
@@ -27,12 +27,12 @@ class BootstrapBuilder:
"""Builder for the L2 engine injection layer (v2.1, four-section format)"""
ROLE_SKILL_MAP = {
"executor": "blackboard-executor",
"reviewer": "blackboard-reviewer",
"reviewer-simayi": "blackboard-reviewer-simayi",
"executor": "blackboard-executor",
"reviewer": "blackboard-reviewer",
"reviewer-simayi": "blackboard-reviewer-simayi",
"reviewer-pangtong": "blackboard-reviewer-pangtong",
"planner": "blackboard-planner",
"claim": "blackboard-claim",
"planner": "blackboard-planner",
"claim": "blackboard-claim",
}
# Read from an environment variable or config by default; fall back to the default path
@@ -61,7 +61,9 @@ class BootstrapBuilder:
# Section 2: prior outputs (injected when the task has dependencies)
if task.get("depends_on_outputs"):
sections.append(self._format_prior_outputs(task["depends_on_outputs"]))
sections.append(
self._format_prior_outputs(
task["depends_on_outputs"]))
# Section 3: full text of the role's operating guidelines (read from the Skill file via ROLE_SKILL_MAP)
skill_name = self.ROLE_SKILL_MAP.get(role)
@@ -133,7 +135,8 @@ class BootstrapBuilder:
"""Format the summary of prior outputs (section 2)"""
parts = ["## 前序产出"]
for out in outputs:
parts.append(f"- [{out.get('task_id', '?')}] {out.get('summary', '无摘要')}")
parts.append(
f"- [{out.get('task_id', '?')}] {out.get('summary', '无摘要')}")
return "\n".join(parts)
def _format_constraints(self, role: str) -> str:
+8 -4
@@ -68,20 +68,23 @@ class ActiveAgentCounter:
self._cooldown_until.pop(agent_id, None)
return False
def set_cooldown(self, agent_id: str, seconds: Optional[float] = None) -> None:
def set_cooldown(self, agent_id: str,
seconds: Optional[float] = None) -> None:
"""Set a cooldown period (default 120 seconds)"""
cd = seconds if seconds is not None else self._default_cooldown_seconds
self._cooldown_until[agent_id] = time.time() + cd
logger.info("Cooldown set for %s: %.0fs (until %.0f)",
agent_id, cd, self._cooldown_until[agent_id])
async def can_acquire(self, agent_id: str, session_id: str = "main") -> bool:
async def can_acquire(self, agent_id: str,
session_id: str = "main") -> bool:
"""Three-layer check: cooldown → global → per agent → per session key"""
if self.is_cooling_down(agent_id):
return False
if self._global_active >= self._max_global:
return False
if self._agent_active.get(agent_id, 0) >= self._max_concurrent_sessions:
if self._agent_active.get(
agent_id, 0) >= self._max_concurrent_sessions:
return False
key = self._make_key(agent_id, session_id)
if self._active_keys.get(key, 0) >= self._max_per_session:
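Review note: the ordering of the gates in `can_acquire` (cooldown, then global, then per agent, then per session key) can be illustrated with a small stand-in. Everything here is an assumption made for the sketch: the class name, the default limits, the synchronous signature, the injectable `now`, and the `agent:session` key format (the real `_make_key` is not shown in this diff).

```python
import time
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class ConcurrencyGate:
    """Hypothetical stand-in for ActiveAgentCounter; limits and key format are assumed."""
    max_global: int = 2
    max_per_agent: int = 1
    max_per_session: int = 1
    global_active: int = 0
    agent_active: Dict[str, int] = field(default_factory=dict)
    key_active: Dict[str, int] = field(default_factory=dict)
    cooldown_until: Dict[str, float] = field(default_factory=dict)

    def can_acquire(self, agent_id: str, session_id: str = "main",
                    now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        # Gate 1: the agent is still cooling down.
        if self.cooldown_until.get(agent_id, 0.0) > now:
            return False
        # Gate 2: global concurrency limit.
        if self.global_active >= self.max_global:
            return False
        # Gate 3: per-agent concurrency limit.
        if self.agent_active.get(agent_id, 0) >= self.max_per_agent:
            return False
        # Gate 4: per (agent, session) limit; the key format is assumed.
        key = f"{agent_id}:{session_id}"
        return self.key_active.get(key, 0) < self.max_per_session
```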
@@ -122,7 +125,8 @@ class ActiveAgentCounter:
del self._active_keys[key]
if agent_id in self._agent_active:
self._agent_active[agent_id] = max(0, self._agent_active[agent_id] - 1)
self._agent_active[agent_id] = max(
0, self._agent_active[agent_id] - 1)
if self._agent_active[agent_id] == 0:
del self._agent_active[agent_id]
+163 -64
@@ -63,7 +63,8 @@ class Dispatcher:
if self._legacy_mode:
self.registered_agents = set(registered_agents or [])
self.capability_map = capability_map or {}
logger.warning("Dispatcher running in legacy mode (no AgentRouter)")
logger.warning(
"Dispatcher running in legacy mode (no AgentRouter)")
def decide(self, task: Task, action_type: str = "") -> Dict[str, Any]:
"""Dispatch decision (delegated to the Router)
@@ -123,16 +124,21 @@ class Dispatcher:
"""
# Safety red-line check (intercepts before dispatch)
# Mail is agent-to-agent communication, so no guardrail check is applied
is_mail = project_config.get("project_id") == "_mail" if project_config else False
is_mail = project_config.get(
"project_id") == "_mail" if project_config else False
if self.guardrails and not is_mail:
violations = self.guardrails.check_task(task)
critical = [v for v in violations if v.action in ("block_and_notify", "terminate_and_escalate")]
critical = [
v for v in violations if v.action in (
"block_and_notify",
"terminate_and_escalate")]
if critical:
v = critical[0]
logger.warning("Task '%s' blocked by guardrail: %s - %s",
task.title, v.rule_id, v.message)
# Write a blackboard event
_routing_db = Path(project_config["db_path"]) if project_config and "db_path" in project_config else self.db_path
_routing_db = Path(
project_config["db_path"]) if project_config and "db_path" in project_config else self.db_path
if _routing_db:
self._record_routing(task, {"level": DispatchLevel.BLOCKED, "agent_id": "none",
"reason": v.message}, "blocked", v.message, _routing_db)
@@ -151,7 +157,8 @@ class Dispatcher:
decision = self.decide(task, action_type)
level = decision["level"]
# Get the project-level DB path from project_config (routing audit logs are written to the project DB)
_routing_db = Path(project_config["db_path"]) if project_config and "db_path" in project_config else None
_routing_db = Path(
project_config["db_path"]) if project_config and "db_path" in project_config else None
agent_id = decision["agent_id"]
# v2.7.2: counter check moved inside spawn_full_agent
@@ -159,7 +166,8 @@ class Dispatcher:
# Local execution
if level == DispatchLevel.LOCAL:
self._record_routing(task, decision, "dispatched", None, _routing_db)
self._record_routing(
task, decision, "dispatched", None, _routing_db)
return {
"level": level.value,
"agent_id": "daemon",
@@ -171,7 +179,8 @@ class Dispatcher:
# Full Agent / Escalate spawn
if level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE):
if not self.spawner:
self._record_routing(task, decision, "error", "No spawner", _routing_db)
self._record_routing(
task, decision, "error", "No spawner", _routing_db)
return {
"level": level.value,
"agent_id": agent_id,
@@ -182,9 +191,11 @@ class Dispatcher:
try:
# [v2.7.1] Mail: marking working moved inside spawn_full_agent (after checks pass, before the subprocess starts)
is_mail = project_config.get("project_id") == "_mail" if project_config else False
is_mail = project_config.get(
"project_id") == "_mail" if project_config else False
if is_mail:
db_path = Path(project_config["db_path"]) if project_config and "db_path" in project_config else None
db_path = Path(
project_config["db_path"]) if project_config and "db_path" in project_config else None
# on_checks_passed: mark working only after all checks pass; do not mark it if a check fails
on_checks_passed = None
@@ -203,7 +214,8 @@ class Dispatcher:
# Build the spawn message
message = self._build_spawn_message(task, agent_id, project_config,
mode=decision.get("mode", ""),
mode=decision.get(
"mode", ""),
spawn_type=action_type or "executor")
# v2.7.2: on_complete contains only business logic, not counter.release
@@ -218,14 +230,17 @@ class Dispatcher:
def _mail_on_complete(aid, outcome):
# Hallucination gate: check whether a reply exists and auto-mark done/failed
try:
_dispatcher._mail_auto_complete(_task_id, aid, _mail_db, _must_haves)
_dispatcher._mail_auto_complete(
_task_id, aid, _mail_db, _must_haves, outcome=outcome)
except Exception as e:
logger.error("Mail %s: on_complete error: %s", _task_id, e)
logger.error(
"Mail %s: on_complete error: %s", _task_id, e)
on_complete = _mail_on_complete
else:
# #02: the Task path also gets an on_complete (hallucination gate)
_task_id = task.id
_task_db = Path(project_config["db_path"]) if project_config and "db_path" in project_config else None
_task_db = Path(
project_config["db_path"]) if project_config and "db_path" in project_config else None
_dispatcher = self
_is_review = action_type == "review"
@@ -239,10 +254,12 @@ class Dispatcher:
try:
# #07.2: unified crash rollback: both executor and review roll back current_agent
if outcome in ROLLBACK_CURRENT_AGENT_OUTCOMES and _task_db:
_dispatcher._rollback_current_agent(_task_db, _task_id, aid)
_dispatcher._rollback_current_agent(
_task_db, _task_id, aid)
if _is_review:
if _task_db and outcome in ("completed", "session_revived"):
if _task_db and outcome in (
"completed", "session_revived"):
# #09: read the verdict to decide the next action
conn = get_connection(_task_db)
try:
@@ -254,14 +271,18 @@ class Dispatcher:
conn.close()
if review and review["verdict"] == "approved":
_dispatcher._mark_task_status(_task_db, _task_id, "done")
logger.info("Task %s: review approved, marking done", _task_id)
_dispatcher._mark_task_status(
_task_db, _task_id, "done")
logger.info(
"Task %s: review approved, marking done", _task_id)
else:
# Not approved → @mention the reviewed agent (assignee, not current_agent)
# Not approved → @mention the reviewed
# agent (assignee, not current_agent)
verdict_str = review["verdict"] if review else "未知"
conn2 = get_connection(_task_db)
try:
task_row = conn2.execute("SELECT assignee FROM tasks WHERE id=?", (_task_id,)).fetchone()
task_row = conn2.execute(
"SELECT assignee FROM tasks WHERE id=?", (_task_id,)).fetchone()
finally:
conn2.close()
@@ -275,12 +296,15 @@ class Dispatcher:
_task_id, verdict_str, task_row["assignee"] if task_row else "?")
# Do not mark done; stay in review status
else:
logger.warning("Task %s: review agent %s (%s), NOT marking done", _task_id, aid, outcome)
logger.warning(
"Task %s: review agent %s (%s), NOT marking done", _task_id, aid, outcome)
else:
# executor: three-signal verification → mark review
_dispatcher._task_auto_complete(_task_id, _task_db)
_dispatcher._task_auto_complete(
_task_id, _task_db)
except Exception as e:
logger.error("Task %s: on_complete error: %s", _task_id, e)
logger.error(
"Task %s: on_complete error: %s", _task_id, e)
on_complete = _task_on_complete
session_id = await self.spawner.spawn_full_agent(
@@ -289,7 +313,8 @@ class Dispatcher:
task_id=task.id,
on_complete=on_complete,
use_main_session=True, # #02: always deliver to the main session
task_db_path=Path(project_config["db_path"]) if project_config and "db_path" in project_config else None,
task_db_path=Path(
project_config["db_path"]) if project_config and "db_path" in project_config else None,
on_checks_passed=on_checks_passed,
)
@@ -312,9 +337,14 @@ class Dispatcher:
else:
log_level = logger.debug
detail_msg = f"Agent busy: {reason}"
log_level("Dispatch skipped %s for task %s: %s", agent_id, task.id, detail_msg)
log_level(
"Dispatch skipped %s for task %s: %s",
agent_id,
task.id,
detail_msg)
# on_checks_passed did not run (the check failed before it), so working was never set and no rollback is needed
self._record_routing(task, decision, "skipped", detail_msg, _routing_db)
self._record_routing(
task, decision, "skipped", detail_msg, _routing_db)
return {
"level": level.value,
"agent_id": agent_id,
@@ -326,7 +356,8 @@ class Dispatcher:
# on_checks_passed ran but the subprocess failed → roll back working → pending
if _mail_marked_working:
self._mail_revert_to_pending(task.id, db_path)
self._record_routing(task, decision, "error", str(e), _routing_db)
self._record_routing(
task, decision, "error", str(e), _routing_db)
return {
"level": level.value,
"agent_id": agent_id,
@@ -385,9 +416,16 @@ class Dispatcher:
def _build_delegate_prompt(self, task: Task,
project_config: Optional[Dict]) -> str:
"""Build the prompt for delegate mode (the coordinator assigns the task)"""
api_host = getattr(self.spawner, 'api_host', '127.0.0.1') if self.spawner else '127.0.0.1'
api_port = getattr(self.spawner, 'api_port', 8083) if self.spawner else 8083
project_id = project_config.get("project_id", "") if project_config else ""
api_host = getattr(
self.spawner,
'api_host',
'127.0.0.1') if self.spawner else '127.0.0.1'
api_port = getattr(
self.spawner,
'api_port',
8083) if self.spawner else 8083
project_id = project_config.get(
"project_id", "") if project_config else ""
return f"""你是任务协调员。请分析以下任务,决定最合适的执行者并分配。
@@ -478,7 +516,8 @@ class Dispatcher:
# ── Legacy compatibility (deprecated) ──
def _legacy_decide(self, task: Task, action_type: str = "") -> Dict[str, Any]:
def _legacy_decide(
self, task: Task, action_type: str = "") -> Dict[str, Any]:
"""Legacy three-level decision tree (kept for the compatibility transition)"""
LOCAL_ACTIONS = frozenset({
"L1_guardrail", "format_check",
@@ -518,7 +557,8 @@ class Dispatcher:
return registered[0]
return "pangtong-fujunshi"
async def _legacy_dispatch(self, task, action_type="", project_config=None):
async def _legacy_dispatch(
self, task, action_type="", project_config=None):
"""Legacy dispatch (kept for the compatibility transition)
v2.7.2: counter acquire/release moved inside spawn_full_agent
@@ -541,15 +581,19 @@ class Dispatcher:
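# NOTE: _legacy_dispatch is only triggered when router=None; the current configuration never reaches it.
# Mail always takes the main dispatch() path (the on_checks_passed approach), never this one.
# If the legacy path is ever enabled, the on_checks_passed logic must be ported here as well.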
is_mail_legacy = project_config.get("project_id") == "_mail" if project_config else False
is_mail_legacy = project_config.get(
"project_id") == "_mail" if project_config else False
if is_mail_legacy:
db_path_legacy = Path(project_config["db_path"]) if project_config and "db_path" in project_config else None
if not db_path_legacy or not self._mail_auto_working(task.id, db_path_legacy):
db_path_legacy = Path(
project_config["db_path"]) if project_config and "db_path" in project_config else None
if not db_path_legacy or not self._mail_auto_working(
task.id, db_path_legacy):
return {"level": level.value, "agent_id": agent_id,
"session_id": None, "status": "error",
"reason": "mail_auto_working_failed"}
if hasattr(self.spawner, 'build_spawn_message') and project_config:
if hasattr(self.spawner,
'build_spawn_message') and project_config:
retry_ctx = self._build_retry_context(task)
message = self.spawner.build_spawn_message(
task_id=task.id, title=task.title,
@@ -576,9 +620,11 @@ class Dispatcher:
def _mail_oc_legacy(aid, outcome):
try:
_disp._mail_auto_complete(_t_id, aid, _m_db, _m_mh)
_disp._mail_auto_complete(
_t_id, aid, _m_db, _m_mh, outcome=outcome)
except Exception as e:
logger.error("Mail %s: legacy on_complete error: %s", _t_id, e)
logger.error(
"Mail %s: legacy on_complete error: %s", _t_id, e)
on_complete_legacy = _mail_oc_legacy
session_id = await self.spawner.spawn_full_agent(
@@ -586,14 +632,16 @@ class Dispatcher:
task_id=task.id,
on_complete=on_complete_legacy,
use_main_session=True, # #02: always deliver to the main session
task_db_path=Path(project_config["db_path"]) if project_config and "db_path" in project_config else None,
task_db_path=Path(
project_config["db_path"]) if project_config and "db_path" in project_config else None,
)
return {"level": level.value, "agent_id": agent_id,
"session_id": session_id, "status": "dispatched",
"reason": decision["reason"]}
except AgentBusyError as e:
reason = getattr(e, 'reason', 'busy')
detail_msg = f"Session busy: {reason}" if reason.startswith("session_") else f"Agent busy: {reason}"
detail_msg = f"Session busy: {reason}" if reason.startswith(
"session_") else f"Agent busy: {reason}"
return {"level": level.value, "agent_id": agent_id,
"session_id": None, "status": "skipped",
"reason": detail_msg}
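The `AgentBusyError` branch above splits the skip message by reason prefix. A minimal sketch of that split as a standalone function follows; the name `busy_detail` is hypothetical and not part of the diff.

```python
def busy_detail(reason: str) -> str:
    """Return the skip message for a busy agent.

    Reasons starting with "session_" (for example session_locked) describe
    the agent's session; anything else (for example counter_blocked) is
    reported at the agent level.
    """
    if reason.startswith("session_"):
        return f"Session busy: {reason}"
    return f"Agent busy: {reason}"
```

Keeping the rule in one helper would make the prefix convention easy to test in isolation.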
@@ -618,9 +666,11 @@ class Dispatcher:
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
if not row:
logger.warning("Mail %s: cannot mark working (task not found)", task_id)
logger.warning(
"Mail %s: cannot mark working (task not found)", task_id)
return False
if row["status"] not in ("pending", "claimed"):
logger.warning("Mail %s: cannot mark working (status=%s, expected pending/claimed)",
@@ -631,7 +681,10 @@ class Dispatcher:
(task_id,),
)
conn.commit()
logger.info("Mail %s: auto-marked working (system, was %s)", task_id, row["status"])
logger.info(
"Mail %s: auto-marked working (system, was %s)",
task_id,
row["status"])
return True
finally:
conn.close()
@@ -645,30 +698,40 @@ class Dispatcher:
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
if row and row["status"] == "working":
conn.execute(
"UPDATE tasks SET status='pending', updated_at=datetime('now') WHERE id=?",
(task_id,),
)
conn.commit()
logger.info("Mail %s: reverted working → pending (spawn failed)", task_id)
logger.info(
"Mail %s: reverted working → pending (spawn failed)", task_id)
else:
logger.debug("Mail %s: skip revert (status=%s, expected working)", task_id, row["status"] if row else "not_found")
logger.debug(
"Mail %s: skip revert (status=%s, expected working)",
task_id,
row["status"] if row else "not_found")
finally:
conn.close()
except Exception as e:
logger.error("Mail %s: failed to revert to pending: %s", task_id, e)
logger.error(
"Mail %s: failed to revert to pending: %s",
task_id,
e)
def _mail_auto_complete(self, task_id: str, agent_id: str,
db_path: Path, must_haves: str) -> None:
db_path: Path, must_haves: str, outcome=None) -> None:
"""Mail task: automatically mark done/failed after on_complete (includes the hallucination gate)"""
try:
# Parse the performative
performative = "request"
try:
meta = json.loads(must_haves) if must_haves else {}
performative = meta.get("performative", meta.get("type", "request"))
performative = meta.get(
"performative", meta.get(
"type", "request"))
except Exception:
pass
@@ -677,13 +740,15 @@ class Dispatcher:
has_reply = self._mail_check_reply(task_id, db_path)
if not has_reply:
# F3: mark failed immediately (do not wait 30 minutes for the ticker)
logger.error("Mail %s: no reply found, marking failed (no_reply_found)", task_id)
logger.error(
"Mail %s: no reply found, marking failed (no_reply_found)", task_id)
for attempt in range(3):
try:
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
if not row:
return
if row["status"] == "working":
@@ -697,19 +762,35 @@ class Dispatcher:
json.dumps({"reason": "no_reply_found"}, ensure_ascii=False)),
)
conn.commit()
logger.info("Mail %s: marked failed (no_reply_found)", task_id)
logger.info(
"Mail %s: marked failed (no_reply_found)", task_id)
# Mail failure notification: notify the sender
try:
from src.daemon.mail_notify import notify_mail_failed
notify_mail_failed(db_path, task_id, "no_reply_found")
notify_mail_failed(
db_path, task_id, "no_reply_found")
except Exception as ne:
logger.warning("Mail %s: failed to send no_reply_found notification: %s", task_id, ne)
logger.warning(
"Mail %s: failed to send no_reply_found notification: %s", task_id, ne)
return
finally:
conn.close()
except Exception as e:
logger.warning("Mail %s: failed attempt %d: %s", task_id, attempt + 1, e)
logger.error("Mail %s: all 3 failed attempts failed, leaving for ticker", task_id)
logger.warning(
"Mail %s: failed attempt %d: %s", task_id, attempt + 1, e)
logger.error(
"Mail %s: all 3 failed attempts failed, leaving for ticker", task_id)
return
# inform type: mark done only for successful outcomes; failed outcomes stay working until the ticker re-dispatches
# The Task path is not affected by this bug (it uses the separate _task_auto_complete logic)
if performative == "inform":
INFORM_DONE_OUTCOMES = {"completed", "claimed", "no_reply"}
if outcome not in INFORM_DONE_OUTCOMES:
logger.info(
"Mail %s: inform outcome=%s, skip auto-done",
task_id,
outcome)
return
# Mark done (retry up to 3 times)
@@ -718,7 +799,8 @@ class Dispatcher:
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
if not row:
return
if row["status"] == "working":
@@ -733,9 +815,15 @@ class Dispatcher:
finally:
conn.close()
except Exception as e:
logger.warning("Mail %s: done attempt %d failed: %s", task_id, attempt + 1, e)
logger.warning(
"Mail %s: done attempt %d failed: %s",
task_id,
attempt + 1,
e)
# All 3 attempts failed; leave as working for the ticker timeout to handle
logger.error("Mail %s: all 3 done attempts failed, leaving for ticker", task_id)
logger.error(
"Mail %s: all 3 done attempts failed, leaving for ticker",
task_id)
except Exception as e:
logger.error("Mail %s: auto-complete error: %s", task_id, e)
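The done/failed updates in `_mail_auto_complete` share one pattern: open a connection, take the write lock with `BEGIN IMMEDIATE`, re-read the status, update only if the task is still `working`, and retry up to three times. The sketch below illustrates that pattern with the standard `sqlite3` module. It is an assumption-laden illustration, not the project's code: `mark_done_with_retry` is a hypothetical name, the `tasks` table is reduced to `id` and `status`, and the project's own `get_connection` helper is replaced by a plain `sqlite3.connect`.

```python
import sqlite3


def mark_done_with_retry(db_path: str, task_id: str, attempts: int = 3) -> bool:
    """Mark a task done only if it is still 'working'; retry on lock errors."""
    for _ in range(attempts):
        try:
            # isolation_level=None: transactions are controlled explicitly below.
            conn = sqlite3.connect(db_path, isolation_level=None)
            conn.row_factory = sqlite3.Row
            try:
                # Take the write lock up front so the check and the update
                # cannot be interleaved with another writer.
                conn.execute("BEGIN IMMEDIATE")
                row = conn.execute(
                    "SELECT status FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
                if row is None or row["status"] != "working":
                    conn.execute("ROLLBACK")
                    return False
                conn.execute(
                    "UPDATE tasks SET status='done' WHERE id=?", (task_id,)
                )
                conn.execute("COMMIT")
                return True
            finally:
                conn.close()
        except sqlite3.OperationalError:
            # Typically "database is locked"; try again.
            continue
    # All attempts failed: leave the row untouched for a later sweep.
    return False
```

Returning `False` after the last attempt corresponds to the "leaving for ticker" log line in the diff: the row is left as `working` and a periodic job is expected to pick it up.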
@@ -780,7 +868,9 @@ class Dispatcher:
logger.info("Task %s: verify passed, marking review", task_id)
self._mark_task_status(db_path, task_id, "review")
else:
logger.info("Task %s: verify not passed (no signal), leaving working", task_id)
logger.info(
"Task %s: verify not passed (no signal), leaving working",
task_id)
except Exception as e:
logger.error("Task %s: auto-complete error: %s", task_id, e)
@@ -815,7 +905,8 @@ class Dispatcher:
logger.error("Task %s: verify error: %s", task_id, e)
return True
def _rollback_current_agent(self, db_path: Path, task_id: str, agent_id: str) -> None:
def _rollback_current_agent(
self, db_path: Path, task_id: str, agent_id: str) -> None:
"""#07.2: after a crash, roll current_agent back to the assignee so exclude_current does not deadlock"""
try:
conn = get_connection(db_path)
@@ -829,11 +920,18 @@ class Dispatcher:
conn.commit()
finally:
conn.close()
logger.info("Task %s: rolled back current_agent from %s to assignee", task_id, agent_id)
logger.info(
"Task %s: rolled back current_agent from %s to assignee",
task_id,
agent_id)
except Exception as e:
logger.warning("Task %s: failed to rollback current_agent: %s", task_id, e)
logger.warning(
"Task %s: failed to rollback current_agent: %s",
task_id,
e)
def _mark_task_status(self, db_path: Path, task_id: str, status: str) -> None:
def _mark_task_status(self, db_path: Path,
task_id: str, status: str) -> None:
"""Update the task status and write an audit event"""
try:
conn = get_connection(db_path)
@@ -849,7 +947,8 @@ class Dispatcher:
)
conn.execute(
"INSERT INTO events (task_id, agent, event_type, payload) VALUES (?, 'dispatcher', 'status_change', ?)",
(task_id, f'{{"from": "{old_status}", "to": "{status}", "source": "auto_complete"}}'),
(task_id,
f'{{"from": "{old_status}", "to": "{status}", "source": "auto_complete"}}'),
)
conn.commit()
finally:
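The audit event payload above is assembled with an f-string, which would produce invalid JSON if a status value ever contained a quote or backslash. A minimal alternative sketch builds the same payload with `json.dumps`; the helper name `status_change_payload` is hypothetical and not part of the diff.

```python
import json


def status_change_payload(old_status: str, new_status: str) -> str:
    """Serialize the status_change audit payload with proper JSON escaping."""
    return json.dumps(
        {"from": old_status, "to": new_status, "source": "auto_complete"},
        ensure_ascii=False,
    )
```

For ordinary status strings the output is byte-for-byte the same as the f-string version, so existing consumers of the `events` table should not notice a difference.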
+15 -5
@@ -38,7 +38,9 @@ class GuardrailEngine:
data = yaml.safe_load(f)
self.rules = data.get("rules", [])
self.settings = data.get("settings", {"enabled": True})
logger.info("Loaded %d guardrail rules from %s", len(self.rules), config_path)
logger.info(
"Loaded %d guardrail rules from %s", len(
self.rules), config_path)
def check_task(self, task: Any) -> List[GuardrailViolation]:
"""Check whether the Task crosses a safety red line (called before dispatch)"""
@@ -95,7 +97,8 @@ class GuardrailEngine:
return violations
def check_token_usage(self, token_count: int) -> Optional[GuardrailViolation]:
def check_token_usage(
self, token_count: int) -> Optional[GuardrailViolation]:
"""Check whether token consumption exceeds the limit"""
if not self.settings.get("enabled", True):
return None
@@ -103,7 +106,10 @@ class GuardrailEngine:
for rule in self.rules:
if rule["id"] != "high_token_usage":
continue
threshold = rule.get("triggers", [{}])[0].get("token_threshold", 100000)
threshold = rule.get(
"triggers", [
{}])[0].get(
"token_threshold", 100000)
if token_count > threshold:
return GuardrailViolation(
rule_id=rule["id"],
@@ -114,7 +120,8 @@ class GuardrailEngine:
)
return None
def check_consecutive_failure(self, failure_count: int) -> Optional[GuardrailViolation]:
def check_consecutive_failure(
self, failure_count: int) -> Optional[GuardrailViolation]:
"""Check the consecutive failure count"""
if not self.settings.get("enabled", True):
return None
@@ -122,7 +129,10 @@ class GuardrailEngine:
for rule in self.rules:
if rule["id"] != "consecutive_failure":
continue
threshold = rule.get("triggers", [{}])[0].get("consecutive_failures", 3)
threshold = rule.get(
"triggers", [
{}])[0].get(
"consecutive_failures", 3)
if failure_count >= threshold:
return GuardrailViolation(
rule_id=rule["id"],
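Both threshold lookups use `rule.get("triggers", [{}])[0].get(...)`. The default only applies when the `triggers` key is absent; a rule with an explicitly empty `triggers: []` list would raise `IndexError`. The sketch below shows one defensive variant, assuming rules are plain dicts loaded from YAML; `first_trigger_value` is a hypothetical helper, not part of the diff.

```python
from typing import Any, Dict


def first_trigger_value(rule: Dict[str, Any], key: str, default: int) -> int:
    """Read `key` from the first trigger of a rule, tolerating empty input.

    Falls back to `default` when triggers is missing, None, an empty list,
    or when its first entry is not a mapping.
    """
    triggers = rule.get("triggers") or [{}]
    first = triggers[0] if isinstance(triggers[0], dict) else {}
    return first.get(key, default)
```

With such a helper, `check_token_usage` and `check_consecutive_failure` could share one lookup instead of repeating the chained expression.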
+9 -2
@@ -41,6 +41,7 @@ class HealthChecker:
{"healthy": bool, "zombie": bool, "stale_ticks": int,
"alert_written": bool, "resolved": bool}
"""
str(db_path)
result: Dict[str, Any] = {
"healthy": True,
"zombie": False,
@@ -57,6 +58,8 @@ class HealthChecker:
# Use the change in event count to decide whether any real change occurred
conn = queries._conn()
try:
conn.execute(
"SELECT COUNT(*) FROM events").fetchone()[0]
non_tick_events = conn.execute(
"SELECT COUNT(*) FROM events WHERE event_type != 'daemon_tick' "
"AND event_type != 'agent_zombie_detected'"
@@ -83,7 +86,8 @@ class HealthChecker:
self._stale_ticks[project_id] = stale
result["stale_ticks"] = stale
if stale >= self.zombie_threshold and not self._alerted.get(project_id):
if stale >= self.zombie_threshold and not self._alerted.get(
project_id):
# Write the alert
self._write_alert(db_path, project_id, tick_num, stale)
self._alerted[project_id] = True
@@ -124,7 +128,10 @@ class HealthChecker:
conn.commit()
finally:
conn.close()
logger.warning("Zombie detected: %s (stale=%d)", project_id, stale_ticks)
logger.warning(
"Zombie detected: %s (stale=%d)",
project_id,
stale_ticks)
def _write_resolution(self, db_path: Path, project_id: str,
tick_num: int) -> None:
+4 -2
@@ -27,7 +27,8 @@ class InboxWatcher:
def __init__(
self,
inbox_path: Path,
process_callback: Optional[Callable[[Dict[str, Any]], Coroutine[Any, Any, None]]] = None,
process_callback: Optional[Callable[[
Dict[str, Any]], Coroutine[Any, Any, None]]] = None,
watch_interval: float = 1.0,
):
"""
@@ -159,7 +160,8 @@ class InboxWatcher:
line_no, type(event).__name__)
self._total_errors += 1
except json.JSONDecodeError:
logger.warning("Inbox line %d: invalid JSON, skipping", line_no)
logger.warning(
"Inbox line %d: invalid JSON, skipping", line_no)
self._total_errors += 1
return events
+13 -4
@@ -50,7 +50,9 @@ def notify_mail_failed(db_path: Path, original_mail_id: str,
bb = Blackboard(db_path)
original = bb.get_task(original_mail_id)
if not original:
logger.warning("notify_mail_failed: original mail %s not found", original_mail_id)
logger.warning(
"notify_mail_failed: original mail %s not found",
original_mail_id)
return
# Parse the original mail's metadata
@@ -58,7 +60,9 @@ def notify_mail_failed(db_path: Path, original_mail_id: str,
# Recursion guard: a failed system-notification mail does not trigger another notification
if meta.get("system_notify"):
logger.info("Mail %s: system notify mail failed, skipping recursive notification", original_mail_id)
logger.info(
"Mail %s: system notify mail failed, skipping recursive notification",
original_mail_id)
return
# Determine the sender (prefer assigned_by, fall back to must_haves.from)
@@ -67,7 +71,9 @@ def notify_mail_failed(db_path: Path, original_mail_id: str,
title = original.title or ""
if not from_agent:
logger.warning("notify_mail_failed: cannot determine sender for mail %s", original_mail_id)
logger.warning(
"notify_mail_failed: cannot determine sender for mail %s",
original_mail_id)
return
# Sender is not a valid Agent (e.g. system) → notify 庞统 (Pang Tong) to handle it on the sender's behalf, without triggering a broadcast
@@ -111,4 +117,7 @@ def notify_mail_failed(db_path: Path, original_mail_id: str,
original_mail_id, target_agent, from_agent, reason, notify_id)
except Exception as e:
logger.warning("notify_mail_failed: failed to send notification for mail %s: %s", original_mail_id, e)
logger.warning(
"notify_mail_failed: failed to send notification for mail %s: %s",
original_mail_id,
e)
+14 -7
@@ -148,12 +148,14 @@ class ReviewPipeline:
) -> ReviewResult:
"""Step 2: format compliance"""
if not outputs:
return ReviewResult("format", ReviewVerdict.FAIL, 0.0, "No outputs")
return ReviewResult(
"format", ReviewVerdict.FAIL, 0.0, "No outputs")
issues = []
for out in outputs:
# output.md must exist and be non-empty
if out.get("type") == "markdown" or out.get("path", "").endswith(".md"):
if out.get("type") == "markdown" or out.get(
"path", "").endswith(".md"):
content = out.get("content", "")
if not content and out.get("path"):
try:
@@ -164,7 +166,8 @@ class ReviewPipeline:
issues.append(f"Output too short: {out.get('path', '?')}")
# The conclusion JSON must be valid
if out.get("type") == "json" or out.get("path", "").endswith(".json"):
if out.get("type") == "json" or out.get(
"path", "").endswith(".json"):
content = out.get("content", "")
if not content and out.get("path"):
try:
@@ -174,7 +177,8 @@ class ReviewPipeline:
try:
data = json.loads(content)
if not isinstance(data, dict):
issues.append(f"JSON not a dict: {out.get('path', '?')}")
issues.append(
f"JSON not a dict: {out.get('path', '?')}")
except (json.JSONDecodeError, TypeError):
issues.append(f"Invalid JSON: {out.get('path', '?')}")
@@ -191,7 +195,8 @@ class ReviewPipeline:
) -> ReviewResult:
"""Step 3: content quality (custom checks)"""
if not outputs:
return ReviewResult("quality", ReviewVerdict.FAIL, 0.0, "No outputs")
return ReviewResult(
"quality", ReviewVerdict.FAIL, 0.0, "No outputs")
suggestions = []
total_score = 0.0
@@ -212,7 +217,8 @@ class ReviewPipeline:
avg = 1.0 # no custom checks: pass by default
verdict = ReviewVerdict.PASS if avg >= 0.6 else ReviewVerdict.FAIL
return ReviewResult("quality", verdict, round(avg, 2), suggestions=suggestions)
return ReviewResult("quality", verdict, round(
avg, 2), suggestions=suggestions)
def _determine_gate(
self, task: Task, results: List[ReviewResult]
@@ -326,6 +332,7 @@ class RebuttalManager:
return 0
try:
observations = self.bb.get_observations(task_id=task_id)
return sum(1 for o in observations if "Rebuttal round" in (o.body or ""))
return sum(
1 for o in observations if "Rebuttal round" in (o.body or ""))
except Exception:
return 0
+11 -5
@@ -107,7 +107,8 @@ class AgentRouter:
# ── Fast path 2: retry → original executor ──
if action_type == "retry":
current = task_info.get("current_agent") or task_info.get("assignee")
current = task_info.get(
"current_agent") or task_info.get("assignee")
if current and current in self.agent_profiles:
return RouteDecision(
agent_id=current,
@@ -119,7 +120,8 @@ class AgentRouter:
# ── Mode B: declarative handoff by the Agent ──
next_cap = task_info.get("next_capability")
if next_cap and self._validate_capability(next_cap):
current = task_info.get("current_agent") or task_info.get("assignee")
current = task_info.get(
"current_agent") or task_info.get("assignee")
exclude = {current} if current else set()
matched = self._match_capability(next_cap, exclude)
if matched:
@@ -129,7 +131,9 @@ class AgentRouter:
mode="agent_handoff",
latency_ms=int((time.monotonic() - start) * 1000),
)
logger.info("next_capability '%s' no match, delegate to coordinator", next_cap)
logger.info(
"next_capability '%s' no match, delegate to coordinator",
next_cap)
# ── Fast path 3: lifecycle transition table lookup ──
lifecycle = self.LIFECYCLE_CAPABILITY.get(action_type)
@@ -140,7 +144,8 @@ class AgentRouter:
exclude_current = lifecycle.get("exclude_current", False)
exclude = set()
if exclude_current:
current = task_info.get("current_agent") or task_info.get("assignee")
current = task_info.get(
"current_agent") or task_info.get("assignee")
if current:
exclude.add(current)
matched = self._match_capability(cap, exclude)
@@ -154,7 +159,8 @@ class AgentRouter:
# ── Fast path 4: assignee is set and this is not a lifecycle transition ──
assignee = task_info.get("assignee")
if assignee and assignee in self.agent_profiles and action_type not in ("review", "escalation"):
if assignee and assignee in self.agent_profiles and action_type not in (
"review", "escalation"):
return RouteDecision(
agent_id=assignee,
reason=f"Direct assignee: {assignee}",
+165 -68
@@ -164,9 +164,11 @@ class AgentBusyError(Exception):
#07: the reason field distinguishes the specific cause so the dispatcher layer can handle each case differently.
"""
def __init__(self, agent_id: str, reason: str = "busy", detail: Optional[dict] = None):
def __init__(self, agent_id: str, reason: str = "busy",
detail: Optional[dict] = None):
self.agent_id = agent_id
self.reason = reason # counter_blocked / session_locked / session_running / session_compacting / session_stuck
# counter_blocked / session_locked / session_running / session_compacting / session_stuck
self.reason = reason
self.detail = detail or {}
super().__init__(f"{agent_id}: {reason}")
@@ -278,11 +280,15 @@ class AgentSpawner:
# mail tasks use the slim template
if project_id == "_mail":
return self._build_mail_prompt(task_id, title, description, must_haves, agent_id)
return self._build_mail_prompt(
task_id, title, description, must_haves, agent_id)
# Take the new BootstrapBuilder path
if self.bootstrap_builder and task is not None:
role_map = {"executor": "executor", "review": "reviewer", "discussion": "planner"}
role_map = {
"executor": "executor",
"review": "reviewer",
"discussion": "planner"}
role = role_map.get(spawn_type, "executor")
bootstrap_prompt = self.bootstrap_builder.build_for_task(
task=task,
@@ -294,7 +300,8 @@ class AgentSpawner:
# No BootstrapBuilder or no task object → minimal fallback
# Keep only the task context and the API operation instructions
logger.warning("No BootstrapBuilder or task object, using minimal fallback")
logger.warning(
"No BootstrapBuilder or task object, using minimal fallback")
return self._build_minimal_fallback(
task_id, title, description, must_haves,
project_id, agent_id)
@@ -369,7 +376,8 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
if not self.guardrails:
return "无特殊限制"
try:
return "".join(r.get("name", r.get("rule_id", "")) for r in self.guardrails.rules[:6])
return "".join(r.get("name", r.get("rule_id", ""))
for r in self.guardrails.rules[:6])
except Exception:
return "无特殊限制"
@@ -389,7 +397,9 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
try:
meta = json.loads(must_haves) if must_haves else {}
from_agent = meta.get("from", agent_id)
performative = meta.get("performative", meta.get("type", "request"))
performative = meta.get(
"performative", meta.get(
"type", "request"))
except Exception:
pass
@@ -472,7 +482,9 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
self._revive_session(agent_id)
elif pre_state.get("status") == "running" and not pre_state.get("lock_pid_alive"):
# status=running but the lock PID is dead → stale session, revive
logger.warning("Phase 0: %s status=running but lock PID dead, reviving", agent_id)
logger.warning(
"Phase 0: %s status=running but lock PID dead, reviving",
agent_id)
self._revive_session(agent_id)
# Phase 1: Counter acquire (mutex)
@@ -487,12 +499,15 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
if use_main_session:
session_state = self._check_session_state(agent_id)
logger.info("Phase 2 session check for %s: status=%s lock_pid=%s lock_pid_alive=%s compact=%s",
agent_id, session_state.get('status'), session_state.get('lock_pid'),
agent_id, session_state.get(
'status'), session_state.get('lock_pid'),
session_state.get('lock_pid_alive'), session_state.get('recent_compact'))
blockers = []
if session_state.get("lock_pid_alive") and not session_state.get("lock_expired"):
blockers.append(("session_locked", session_state.get("lock_pid")))
if session_state.get(
"lock_pid_alive") and not session_state.get("lock_expired"):
blockers.append(
("session_locked", session_state.get("lock_pid")))
if session_state.get("status") == "running":
if session_state.get("lock_pid_alive"):
# Genuinely running: held by an external process
@@ -515,7 +530,8 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
# Phase 2.5: stale-session repair (status=running + lock PID dead → revive → recheck)
# Phase 0 should already have repaired this case; this is a safety net
if session_state.get("status") == "running" and not session_state.get("lock_pid_alive"):
if session_state.get("status") == "running" and not session_state.get(
"lock_pid_alive"):
logger.warning("Phase 2.5: %s status=running + lock dead (should be caught in Phase 0), reviving",
agent_id)
self._revive_session(agent_id)
@@ -538,7 +554,10 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
raise
if self.dry_run:
logger.info("[DRY RUN] Would spawn agent %s (session=%s)", agent_id, _sid_key)
logger.info(
"[DRY RUN] Would spawn agent %s (session=%s)",
agent_id,
_sid_key)
self._register_session(_sid_key, agent_id, task_id, pid=None)
return _sid_key
@@ -554,7 +573,8 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
if asyncio.iscoroutine(result):
await result
except Exception:
logger.warning("Business on_complete failed for %s", aid, exc_info=True)
logger.warning(
"Business on_complete failed for %s", aid, exc_info=True)
cmd = [
"openclaw", "agent",
@@ -593,7 +613,11 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
if self.counter:
self.counter.release(agent_id, _sid_key)
logger.exception("Failed to spawn agent %s", agent_id)
self._record_attempt(task_id, agent_id, "spawn_failed", error=str(e))
self._record_attempt(
task_id,
agent_id,
"spawn_failed",
error=str(e))
raise
async def spawn_subagent(
@@ -609,7 +633,9 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
session_id = str(uuid.uuid4())
if self.dry_run:
logger.info("[DRY RUN] Would spawn subagent (session=%s)", session_id)
logger.info(
"[DRY RUN] Would spawn subagent (session=%s)",
session_id)
self._register_session(session_id, "subagent", task_id, pid=None)
return session_id
@@ -729,10 +755,16 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
agent_id, session_id, json_result)
# Look up the task's actual status
task_status = self._get_task_status(db_path, task_id) if task_id else None
task_status = self._get_task_status(
db_path, task_id) if task_id else None
# Classify
cls = self._classify_outcome(exit_code, json_result, stderr_text, task_status, stdout_text)
cls = self._classify_outcome(
exit_code,
json_result,
stderr_text,
task_status,
stdout_text)
outcome = cls["outcome"]
# Update the session state
@@ -761,17 +793,21 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
agent_id, session_id, outcome, exit_code, task_status)
# Broadcast feedback tracking (Phase 1 bug fix)
if task_id == "broadcast" and hasattr(self, '_ticker') and self._ticker:
if task_id == "broadcast" and hasattr(
self, '_ticker') and self._ticker:
# Broadcast task: take the real task_id list from the session info and call the tracker for each one
sess_info = self._sessions.get(session_id or "main", {})
bt_ids = sess_info.get("broadcast_task_ids") or []
# The broadcast case always records no_reply: the Agent claims only one task,
# so the trackers of the remaining tasks must not be cleared as claimed
for real_task_id in bt_ids:
self._ticker.record_broadcast_response(real_task_id, agent_id, "no_reply")
self._ticker.record_broadcast_response(
real_task_id, agent_id, "no_reply")
elif task_id and hasattr(self, '_ticker') and self._ticker:
outcome_str = "claimed" if cls.get("status") == "ok" else "no_reply"
self._ticker.record_broadcast_response(task_id, agent_id, outcome_str)
outcome_str = "claimed" if cls.get(
"status") == "ok" else "no_reply"
self._ticker.record_broadcast_response(
task_id, agent_id, outcome_str)
if cls["should_retry"]:
# cooldown: newly added recoverable cases (A14/A15/A16/A8/A10)
@@ -848,13 +884,26 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
# A8(gateway_unreachable), A11(lock_conflict),
# A10(compact_failed), A12(agent_error)
# v2.8.1 Fix-3a: crash-type outcomes set a cooldown to give the agent session time to recover
if outcome in ("crashed", "compact_failed", "process_crash", "session_stuck",
"compact_hanging", "agent_error", "compact_interrupted") and self.counter:
if outcome == "crashed" and self.counter:
self.counter.set_cooldown(agent_id, seconds=60)
logger.info(
"Crash cooldown set for %s: 60s (outcome=%s)",
agent_id,
outcome)
elif outcome in ("compact_failed", "process_crash", "session_stuck",
"compact_hanging", "agent_error", "compact_interrupted") and self.counter:
self.counter.set_cooldown(agent_id, seconds=300) # 5 minutes
logger.info("Crash/error cooldown set for %s: 300s (outcome=%s)", agent_id, outcome)
logger.info(
"Error cooldown set for %s: 300s (outcome=%s)",
agent_id,
outcome)
# F1: unrecoverable outcome → mark failed immediately and write to the blackboard
if outcome in ("auth_failed", "agent_error") and db_path and task_id:
logger.error("Task %s: unrecoverable outcome=%s, marking failed immediately", task_id, outcome)
if outcome in ("auth_failed",
"agent_error") and db_path and task_id:
logger.error(
"Task %s: unrecoverable outcome=%s, marking failed immediately",
task_id,
outcome)
self._mark_task(db_path, task_id, "failed", {
"reason": outcome,
"stderr_preview": (stderr_text or "")[:500],
@@ -878,10 +927,16 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
except Exception:
pass
# stderr collected but not used in this handler
# (kept for potential future diagnostics)
b"".join(stderr_chunks).decode("utf-8", errors="replace")
# Check the session state
state = self._check_session_state(agent_id)
# B1: stale session: revive first; mark failed only after ≥2 consecutive occurrences
if state.get("status") == "running" and not state.get("lock_pid_alive", True):
if state.get("status") == "running" and not state.get(
"lock_pid_alive", True):
# Stale-session counter
stuck_count = self._stuck_counts.get(task_id, 0) + 1
self._stuck_counts[task_id] = stuck_count
@@ -907,7 +962,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
await self._do_on_complete_async(on_complete, agent_id, "session_revived")
else:
# Revive failed → mark failed
logger.error("Agent %s revive failed, marking failed", agent_id)
logger.error(
"Agent %s revive failed, marking failed", agent_id)
self._mark_task(db_path, task_id, "failed",
{"reason": "revive_failed", "stuck_count": stuck_count,
"diagnostics": state})
@@ -988,7 +1044,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
"SELECT status FROM tasks WHERE id=?", (task_id,)
).fetchone()
# Bug-6 fix: pending is not a terminal state
if row and row["status"] in ("done", "failed", "cancelled", "review"):
if row and row["status"] in (
"done", "failed", "cancelled", "review"):
logger.info("Retry skip: task %s already %s (agent=%s)",
task_id, row["status"], agent_id)
# on_complete = wrapped_on_complete, which releases the counter
@@ -997,7 +1054,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
finally:
conn.close()
except Exception:
logger.warning("Retry status check failed for %s, proceeding", task_id)
logger.warning(
"Retry status check failed for %s, proceeding", task_id)
# Read and write retry_count in the tasks table directly
if retry_field == "retry_count" and db_path and task_id:
@@ -1017,7 +1075,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
finally:
conn.close()
except Exception:
logger.exception("Failed to update retry_count for task %s", task_id)
logger.exception(
"Failed to update retry_count for task %s", task_id)
count = 1
else:
retry_counts = self._get_retry_counts(db_path, task_id)
@@ -1101,7 +1160,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
"""
text = stdout_text.strip()
if not text:
return {"status": None, "summary": None, "fallback_used": False, "fallback_reason": None, "payloads": []}
return {"status": None, "summary": None, "fallback_used": False,
"fallback_reason": None, "payloads": []}
try:
data = json.loads(text)
except json.JSONDecodeError:
@@ -1113,7 +1173,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
except json.JSONDecodeError:
continue
else:
return {"status": None, "summary": None, "fallback_used": False, "fallback_reason": None, "payloads": []}
return {"status": None, "summary": None, "fallback_used": False,
"fallback_reason": None, "payloads": []}
# Take the fallback info from data.result.meta.executionTrace
result = data.get("result", {})
@@ -1129,7 +1190,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
}
@staticmethod
def _get_task_status(db_path: Optional[Path], task_id: Optional[str]) -> Optional[str]:
def _get_task_status(
db_path: Optional[Path], task_id: Optional[str]) -> Optional[str]:
"""Look up the task's actual API status"""
if not db_path or not task_id:
return None
@@ -1146,7 +1208,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
return None
@staticmethod
def _get_task_info(db_path: Optional[Path], task_id: Optional[str]) -> Optional[dict]:
def _get_task_info(db_path: Optional[Path],
task_id: Optional[str]) -> Optional[dict]:
"""Look up basic task info"""
if not db_path or not task_id:
return None
@@ -1154,7 +1217,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
conn = get_connection(db_path)
try:
row = conn.execute(
"SELECT id, title, status FROM tasks WHERE id=?", (task_id,)
"SELECT id, title, status FROM tasks WHERE id=?", (
task_id,)
).fetchone()
if not row:
return None
@@ -1186,7 +1250,9 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
sessions[main_key] = main_session
with open(sessions_path, "w") as f:
json.dump(sessions, f, indent=2)
logger.info("Revived %s: sessions.json status changed running→idle", agent_id)
logger.info(
"Revived %s: sessions.json status changed running→idle",
agent_id)
# #07 O4: also clean up the leftover lock file
sf = main_session.get("sessionFile", "")
if sf:
@@ -1194,7 +1260,10 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
if lock_path.exists():
try:
lock_path.unlink()
logger.info("Cleaned stale lock for %s: %s", agent_id, lock_path.name)
logger.info(
"Cleaned stale lock for %s: %s",
agent_id,
lock_path.name)
except Exception:
pass
return True
@@ -1203,7 +1272,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
return False
@staticmethod
def _check_recent_compaction_jsonl(session_file: str, window_seconds: int = 900) -> bool:
def _check_recent_compaction_jsonl(
session_file: str, window_seconds: int = 900) -> bool:
"""v2.8.2 Fix-2: read the tail of the session jsonl and check for a compaction record within window_seconds.
More reliable than compactionCheckpoints: every time the Gateway finishes a compact it appends a record to the end of the jsonl,
@@ -1235,7 +1305,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
ts = obj.get("timestamp", "")
if ts:
try:
ct = datetime.fromisoformat(ts.replace("Z", "+00:00"))
ct = datetime.fromisoformat(
ts.replace("Z", "+00:00"))
if (now - ct).total_seconds() < window_seconds:
return True
except (ValueError, TypeError):
@@ -1259,7 +1330,11 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
v2.8.1: compact detection now scans the tail of the session jsonl (Fix-1),
replacing the broken compactionCheckpoints detection
"""
result = {"status": "unknown", "lock_pid": None, "lock_pid_alive": False, "recent_compact": False}
result = {
"status": "unknown",
"lock_pid": None,
"lock_pid_alive": False,
"recent_compact": False}
sessions_path = Path(os.environ.get(
"OPENCLAW_HOME", str(Path.home() / ".openclaw")
)) / "agents" / agent_id / "sessions" / "sessions.json"
@@ -1298,8 +1373,10 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
created_at_str = lock_data.get("createdAt", "")
if created_at_str:
from datetime import datetime as _dt, timezone as _tz
created_dt = _dt.fromisoformat(created_at_str.replace("Z", "+00:00"))
elapsed = (_dt.now(_tz.utc) - created_dt).total_seconds()
created_dt = _dt.fromisoformat(
created_at_str.replace("Z", "+00:00"))
elapsed = (_dt.now(_tz.utc) -
created_dt).total_seconds()
if elapsed > 1800: # 30 minutes
result["lock_pid_alive"] = False
result["lock_expired"] = True
@@ -1312,8 +1389,10 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
# v2.8.1 Fix-1: compact detection now scans the tail of the session jsonl
# scan only when the agent is not idle (avoids unnecessary I/O)
if result["status"] not in ("done", "idle", "unknown", None) and sf:
result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(sf)
if result["status"] not in (
"done", "idle", "unknown", None) and sf:
result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(
sf)
except Exception:
pass
return result
@@ -1358,45 +1437,53 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
# A15/A16: stderr contains network/compact keywords → recoverable
if stderr_text:
stderr_lower = stderr_text.lower()
if any(kw in stderr_lower for kw in ["econnrefused", "etimedout", "gateway closed", "econnreset"]):
if any(kw in stderr_lower for kw in [
"econnrefused", "etimedout", "gateway closed", "econnreset"]):
return {"outcome": "gateway_unreachable", "should_retry": True,
"retry_field": "retry_count", "cooldown_seconds": 60}
if any(kw in stderr_lower for kw in ["compaction-diag", "context-overflow"]):
if any(kw in stderr_lower for kw in [
"compaction-diag", "context-overflow"]):
return {"outcome": "compact_interrupted", "should_retry": True,
"retry_field": "retry_count", "cooldown_seconds": 60}
# A17: a real crash → keep working; the ticker is the fallback
return {"outcome": "crashed", "should_retry": False, "original": "process_crash"}
return {"outcome": "crashed", "should_retry": False,
"original": "process_crash"}
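# stdout is empty but exit=0: possibly a normal completion where --json produced no output
# check the task status to decide
# A13 revised: stdout is empty but exit=0 → trust the process exit code and treat it as a normal completion
# In practice, openclaw session=None + exit=0 is a normal scenario (inform notifications, etc.)
# The old logic branched on task_status and classified non-terminal states as agent_error → inform mails were never marked done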
if status is None and not stdout_text.strip() and exit_code == 0:
terminal_statuses = {"done", "review"}
if task_status in terminal_statuses:
return {"outcome": "completed", "should_retry": False}
return {"outcome": "agent_error", "should_retry": False}
return {"outcome": "completed", "should_retry": False}
# A7-A12: status=error → do not resume; stderr helps with classification
if status == "error":
stderr_lower = stderr_text.lower()
if any(kw in stderr_lower for kw in ["401", "403", "unauthorized", "auth"]):
if any(kw in stderr_lower for kw in [
"401", "403", "unauthorized", "auth"]):
return {"outcome": "auth_failed", "should_retry": False}
if any(kw in stderr_lower for kw in ["econnrefused", "etimedout", "gateway closed", "econnreset"]):
if any(kw in stderr_lower for kw in [
"econnrefused", "etimedout", "gateway closed", "econnreset"]):
return {"outcome": "gateway_unreachable", "should_retry": True,
"retry_field": "retry_count", "cooldown_seconds": 60}
if any(kw in stderr_lower for kw in ["rate_limit", "500", "503", "api error"]):
if any(kw in stderr_lower for kw in [
"rate_limit", "500", "503", "api error"]):
return {"outcome": "api_error", "should_retry": False}
if any(kw in stderr_lower for kw in ["compaction-diag", "context-overflow"]):
if any(kw in stderr_lower for kw in [
"compaction-diag", "context-overflow"]):
return {"outcome": "compact_failed", "should_retry": False}
if any(kw in stderr_lower for kw in ["lock", "busy", "concurrent", "lane task error"]):
if any(kw in stderr_lower for kw in [
"lock", "busy", "concurrent", "lane task error"]):
return {"outcome": "lock_conflict", "should_retry": True,
"retry_field": "retry_count", "cooldown_seconds": 60}
return {"outcome": "agent_error", "should_retry": False}
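The `status == "error"` branch above is an ordered keyword scan over stderr, where the first matching group wins. A minimal table-driven sketch of that ordering follows; `ERROR_RULES` and `classify_error_stderr` are hypothetical names used for illustration only, and the keywords, outcomes, and retry fields are copied from the hunk.

```python
from typing import Dict, List, Tuple

# (keywords, outcome, should_retry), checked in the same order as the diff.
ERROR_RULES: List[Tuple[Tuple[str, ...], str, bool]] = [
    (("401", "403", "unauthorized", "auth"), "auth_failed", False),
    (("econnrefused", "etimedout", "gateway closed", "econnreset"),
     "gateway_unreachable", True),
    (("rate_limit", "500", "503", "api error"), "api_error", False),
    (("compaction-diag", "context-overflow"), "compact_failed", False),
    (("lock", "busy", "concurrent", "lane task error"), "lock_conflict", True),
]


def classify_error_stderr(stderr_text: str) -> Dict[str, object]:
    """Classify stderr for a run whose status is already known to be "error"."""
    stderr_lower = stderr_text.lower()
    for keywords, outcome, should_retry in ERROR_RULES:
        if any(kw in stderr_lower for kw in keywords):
            result: Dict[str, object] = {
                "outcome": outcome, "should_retry": should_retry}
            if should_retry:
                # Retryable outcomes in the diff carry a counter field and a cooldown.
                result["retry_field"] = "retry_count"
                result["cooldown_seconds"] = 60
            return result
    # No keyword matched: generic agent error, not retried.
    return {"outcome": "agent_error", "should_retry": False}


gateway = classify_error_stderr("Error: connect ECONNREFUSED 127.0.0.1:8083")
auth = classify_error_stderr("HTTP 401 Unauthorized, then ECONNRESET")
```

Because the match is by substring, order matters: in the second call the auth group is checked first, so the later `ECONNRESET` does not make the run retryable.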
# Fallback: unknown status value
return {"outcome": "agent_error", "should_retry": False, "original": "unknown_status"}
return {"outcome": "agent_error",
"should_retry": False, "original": "unknown_status"}
@staticmethod
def _get_retry_counts(db_path: Optional[Path], task_id: Optional[str]) -> dict:
def _get_retry_counts(
db_path: Optional[Path], task_id: Optional[str]) -> dict:
"""Read the counters from the metadata of the latest task_attempt"""
defaults = {"retry_count": 0, "connect_retry_count": 0,
"api_retry_count": 0, "lock_retry_count": 0,
@@ -1436,7 +1523,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
(task_id,)
).fetchone()
if row:
meta = json.loads(row["metadata"]) if row["metadata"] else {}
meta = json.loads(
row["metadata"]) if row["metadata"] else {}
meta.update(counts)
conn.execute(
"UPDATE task_attempts SET metadata=? WHERE rowid=?",
@@ -1446,7 +1534,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
finally:
conn.close()
except Exception:
logger.exception("Failed to update retry counts for task %s", task_id)
logger.exception(
"Failed to update retry counts for task %s", task_id)
def _mark_task(self, db_path: Optional[Path], task_id: Optional[str],
status: str, detail: Optional[dict] = None):
@@ -1464,7 +1553,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
if detail:
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task_id, "daemon", status, json.dumps(detail, ensure_ascii=False))
(task_id, "daemon", status, json.dumps(
detail, ensure_ascii=False))
)
conn.commit()
finally:
@@ -1485,7 +1575,10 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
f"@pangtong-fujunshi 任务执行失败: {reason},请评估是否需要介入",
comment_type="system")
bb.record_mentions(cid, task_id, ["pangtong-fujunshi"])
logger.info("Task %s: failure notified pangtong via comment+mention (reason=%s)", task_id, reason)
logger.info(
"Task %s: failure notified pangtong via comment+mention (reason=%s)",
task_id,
reason)
except Exception as e:
logger.warning("Task %s: failed to notify: %s", task_id, e)
except Exception:
@@ -1514,7 +1607,10 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
if asyncio.iscoroutine(result):
await result
except Exception:
logger.warning("on_complete callback failed for %s", agent_id, exc_info=True)
logger.warning(
"on_complete callback failed for %s",
agent_id,
exc_info=True)
def _register_session(
self,
@@ -1592,7 +1688,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
def get_session_by_agent(self, agent_id: str) -> Optional[Dict[str, Any]]:
"""v2.7.2: look up the active session info by agent_id (used for process liveness checks)"""
for sid, info in self._sessions.items():
if info.get("agent_id") == agent_id and info.get("status") == "running":
if info.get("agent_id") == agent_id and info.get(
"status") == "running":
return info
return None
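Both the compaction check and the lock-expiry check in this file parse an ISO-8601 timestamp after rewriting a trailing `Z` to `+00:00`, then compare the age against a window. A minimal sketch of that pattern, assuming a hypothetical helper `is_within_window` that takes `now` as a parameter so it is easy to test:

```python
from datetime import datetime, timedelta, timezone


def is_within_window(ts: str, window_seconds: int, now: datetime) -> bool:
    """Return True if `ts` is less than `window_seconds` older than `now`."""
    if not ts:
        return False
    try:
        # datetime.fromisoformat() rejects a trailing "Z" before Python 3.11,
        # so it is rewritten as an explicit UTC offset first.
        parsed = datetime.fromisoformat(ts.replace("Z", "+00:00"))
        return (now - parsed).total_seconds() < window_seconds
    except (ValueError, TypeError):
        # ValueError: malformed string. TypeError: naive timestamp
        # subtracted from an aware `now`. Both count as "not recent".
        return False


now = datetime(2026, 6, 10, 0, 0, 0, tzinfo=timezone.utc)
recent = (now - timedelta(seconds=60)).isoformat().replace("+00:00", "Z")
stale = (now - timedelta(hours=1)).isoformat().replace("+00:00", "Z")
```

With the 900-second default from the diff, `recent` falls inside the window and `stale` does not.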
+2 -1
@@ -49,7 +49,8 @@ class SSEEvent:
"""Format as SSE protocol text"""
lines = [f"id: {self.id}"]
lines.append(f"event: {self.event_type}")
lines.append(f"data: {json.dumps(self.data, ensure_ascii=False, default=str)}")
lines.append(
f"data: {json.dumps(self.data, ensure_ascii=False, default=str)}")
return "\n".join(lines) + "\n\n"
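The hunk above builds one Server-Sent Events frame: `id`, `event`, and `data` fields on separate lines, terminated by a blank line. A self-contained sketch of the same formatting, using a hypothetical `SketchEvent` dataclass and method name `format` (the real method name is not visible in this hunk):

```python
import json
from dataclasses import dataclass
from typing import Any


@dataclass
class SketchEvent:
    id: str
    event_type: str
    data: Any

    def format(self) -> str:
        """Serialize the event as a single SSE frame."""
        lines = [f"id: {self.id}"]
        lines.append(f"event: {self.event_type}")
        # ensure_ascii=False keeps non-ASCII text readable on the wire;
        # default=str stringifies values json cannot encode (e.g. datetime).
        lines.append(
            f"data: {json.dumps(self.data, ensure_ascii=False, default=str)}")
        # A blank line (two newlines in total) ends the frame.
        return "\n".join(lines) + "\n\n"


frame = SketchEvent(id="1", event_type="task_update",
                    data={"title": "任务"}).format()
```

Since `json.dumps` emits a single line by default, the payload never needs to be split across multiple `data:` fields.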
+178 -74
@@ -31,7 +31,8 @@ class BroadcastRound:
"""Track the broadcast state of a single task"""
task_id: str
notified_agents: set = dc_field(default_factory=set) # Agents that have already been spawned
responded_agents: set = dc_field(default_factory=set) # Agents that have returned feedback (including NO_REPLY)
responded_agents: set = dc_field(
default_factory=set) # Agents that have returned feedback (including NO_REPLY)
round_number: int = 0 # current round (0 = not started, 1 = round 1)
@@ -46,7 +47,8 @@ class Ticker:
registry: ProjectRegistry,
tick_interval: float = 30.0,
max_ticks: Optional[int] = None,
on_tick_complete: Optional[Callable[[], Coroutine[Any, Any, None]]] = None,
on_tick_complete: Optional[Callable[[],
Coroutine[Any, Any, None]]] = None,
dispatcher: Optional[Any] = None,
spawner: Optional[Any] = None,
max_dispatch_per_tick: int = 3,
@@ -194,7 +196,10 @@ class Ticker:
pr = await self._tick_project(project_id, project_info)
results["projects"][project_id] = pr
except Exception as e:
logger.exception("Tick %d project %s error", tick_num, project_id)
logger.exception(
"Tick %d project %s error",
tick_num,
project_id)
results["projects"][project_id] = {"error": str(e)}
# Virtual project _general: not in the registry but still needs scheduling
@@ -223,7 +228,10 @@ class Ticker:
logger.exception("Tick %d _mail error", tick_num)
results["projects"]["_mail"] = {"error": str(e)}
logger.debug("Tick %d complete: %d projects", tick_num, len(active_projects))
logger.debug(
"Tick %d complete: %d projects",
tick_num,
len(active_projects))
if self.on_tick_complete:
try:
@@ -314,7 +322,8 @@ class Ticker:
# 8. Health check (zombie detection)
if self.health_checker:
try:
self.health_checker.check(project_id, db_path, self._tick_count)
self.health_checker.check(
project_id, db_path, self._tick_count)
except Exception as e:
logger.warning("HealthChecker error for %s: %s", project_id, e)
@@ -335,7 +344,8 @@ class Ticker:
task_id=t.id, task_title=t.title, task_type=t.task_type
)
except Exception as e:
logger.warning("ExperienceDistiller error for %s: %s", project_id, e)
logger.warning(
"ExperienceDistiller error for %s: %s", project_id, e)
# 10. State after the scan
result["summary_after"] = queries.task_summary()
@@ -375,7 +385,8 @@ class Ticker:
(computed, pid),
)
refreshed.append(pid)
logger.info("Parent %s status aggregated: → %s", pid, computed)
logger.info(
"Parent %s status aggregated: → %s", pid, computed)
if refreshed:
conn.commit()
@@ -543,6 +554,7 @@ Parent Task ID: {parent_task.id}
"""
try:
agent_id = "pangtong-fujunshi"
f"review-{parent_task.id}-r{new_round}"
# Build the on_complete callback: parse pangtong's conclusion and update the parent status
async def _on_review_complete(aid: str, outcome: str):
@@ -554,7 +566,8 @@ Parent Task ID: {parent_task.id}
latest_meta = None
latest_time = ""
for sid, sess in self.spawner._sessions.items():
if sess.get("agent_id") == agent_id and sess.get("meta"):
if sess.get(
"agent_id") == agent_id and sess.get("meta"):
t = sess.get("completed_at", "")
if t > latest_time:
latest_time = t
@@ -586,7 +599,9 @@ Parent Task ID: {parent_task.id}
return True
return False
except Exception:
logger.exception("Failed to spawn pangtong review for %s", parent_task.id)
logger.exception(
"Failed to spawn pangtong review for %s",
parent_task.id)
return False
def _set_parent_reviewing(self, parent_id: str, project_id: str):
@@ -618,7 +633,8 @@ Parent Task ID: {parent_task.id}
conn = get_connection(db_path)
try:
# Parse GOAL_ACHIEVED
is_achieved = bool(review_text and "GOAL_ACHIEVED" in review_text.upper())
is_achieved = bool(
review_text and "GOAL_ACHIEVED" in review_text.upper())
if is_achieved:
# Goal achieved → parent is finally complete
@@ -648,7 +664,9 @@ Parent Task ID: {parent_task.id}
"(round %d, subs=%d)",
parent_id, round_num, sub_count)
except Exception:
logger.exception("Failed to handle review conclusion for %s", parent_id)
logger.exception(
"Failed to handle review conclusion for %s",
parent_id)
# Safe recovery: reviewing → working
try:
conn.execute("BEGIN IMMEDIATE")
@@ -664,8 +682,8 @@ Parent Task ID: {parent_task.id}
def _resolve_db_path(self, project_id: str) -> Path:
"""Resolve the project DB path"""
import src.utils as _utils
return _utils.get_data_root() / project_id / "blackboard.db"
from src.utils import get_data_root
return get_data_root() / project_id / "blackboard.db"
# ------------------------------------------------------------------
# @mention notification handling (v2.9 #01)
@@ -686,7 +704,8 @@ Parent Task ID: {parent_task.id}
return []
bb = Blackboard(db_path)
mentions = bb.get_pending_mentions(max_retries=self.MENTION_MAX_RETRIES)
mentions = bb.get_pending_mentions(
max_retries=self.MENTION_MAX_RETRIES)
if not mentions:
return []
@@ -750,16 +769,19 @@ Parent Task ID: {parent_task.id}
if new_review and new_review["verdict"] == "approved":
_ticker._transition_status(
get_connection(rdb_path), _t_id, "done",
get_connection(
rdb_path), _t_id, "done",
agent="daemon",
detail={"reason": "rebuttal_approved"})
logger.info("Rebuttal: task %s approved after rebuttal", _t_id)
logger.info(
"Rebuttal: task %s approved after rebuttal", _t_id)
else:
# Still not approved → @mention the assignee
verdict_str = new_review["verdict"] if new_review else "未知"
rconn2 = get_connection(rdb_path)
try:
t_row = rconn2.execute("SELECT assignee FROM tasks WHERE id=?", (_t_id,)).fetchone()
t_row = rconn2.execute(
"SELECT assignee FROM tasks WHERE id=?", (_t_id,)).fetchone()
finally:
rconn2.close()
if t_row and t_row["assignee"]:
@@ -768,9 +790,11 @@ Parent Task ID: {parent_task.id}
bb2.add_comment(_t_id, "daemon",
f"@{t_row['assignee']} 审查结论: {verdict_str},请查看详情并决定接受或反驳",
comment_type="review")
logger.info("Rebuttal: task %s still %s after rebuttal", _t_id, verdict_str)
logger.info(
"Rebuttal: task %s still %s after rebuttal", _t_id, verdict_str)
except Exception:
logger.exception("Rebuttal on_complete failed for task %s", _t_id)
logger.exception(
"Rebuttal on_complete failed for task %s", _t_id)
result = await self.spawner.spawn_full_agent(
agent_id=agent_id,
@@ -793,22 +817,30 @@ Parent Task ID: {parent_task.id}
for item in items:
bb.mark_mention_notified(item["id"])
processed.append(agent_id)
logger.info("Mention spawn success: %s (%d mentions)", agent_id, len(items))
logger.info(
"Mention spawn success: %s (%d mentions)",
agent_id,
len(items))
else:
# spawn returned None (other reasons) → increment retry_count
for item in items:
bb.mark_mention_retry(item["id"])
logger.warning("Mention spawn failed: %s, retrying next tick", agent_id)
logger.warning(
"Mention spawn failed: %s, retrying next tick", agent_id)
except AgentBusyError:
# Agent is busy: do not increment retry_count; the next tick retries naturally
logger.info("Mention spawn skipped: %s busy, will retry next tick", agent_id)
logger.info(
"Mention spawn skipped: %s busy, will retry next tick",
agent_id)
except Exception:
logger.exception("Mention processing error for agent %s", agent_id)
logger.exception(
"Mention processing error for agent %s", agent_id)
for item in items:
try:
if item.get("retry_count", 0) >= self.MENTION_MAX_RETRIES - 1:
if item.get("retry_count",
0) >= self.MENTION_MAX_RETRIES - 1:
bb.mark_mention_failed(item["id"])
else:
bb.mark_mention_retry(item["id"])
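The branches above amount to a small state decision per pending mention: a successful spawn marks it notified, a busy agent leaves it untouched, a `None` result counts as a retry, and an exception either retries or gives up once the retry cap is about to be reached. A minimal sketch of that decision as a pure function; `next_mention_state` and its event labels are hypothetical, and the cap of 3 used below is an assumed value, since `MENTION_MAX_RETRIES` is not shown in this diff.

```python
def next_mention_state(event: str, retry_count: int, max_retries: int) -> str:
    """Map a spawn result to the next state of a pending mention."""
    if event == "spawned":
        return "notified"
    if event == "busy":
        # Agent busy: retry_count is not incremented, the next tick retries.
        return "pending"
    if event == "spawn_none":
        return "retry"
    # Any other event is treated as a processing error. The mention fails
    # once one more retry would reach the cap.
    if retry_count >= max_retries - 1:
        return "failed"
    return "retry"


ASSUMED_MAX_RETRIES = 3  # assumption for illustration only
states = [next_mention_state("error", n, ASSUMED_MAX_RETRIES) for n in range(3)]
```

Keeping the busy case out of the retry budget means a long-running agent cannot exhaust a mention's retries on its own.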
@@ -821,8 +853,14 @@ Parent Task ID: {parent_task.id}
mention_lines: List[str],
project_id: str) -> str:
"""#03: @mention prompt (identity injection)"""
api_host = getattr(self.spawner, 'api_host', '127.0.0.1') if self.spawner else '127.0.0.1'
api_port = getattr(self.spawner, 'api_port', 8083) if self.spawner else 8083
api_host = getattr(
self.spawner,
'api_host',
'127.0.0.1') if self.spawner else '127.0.0.1'
api_port = getattr(
self.spawner,
'api_port',
8083) if self.spawner else 8083
api_base = f"http://{api_host}:{api_port}/api"
# Get the Agent's specialties
@@ -898,7 +936,8 @@ Parent Task ID: {parent_task.id}
from datetime import datetime
conn.execute("BEGIN IMMEDIATE")
row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
if not row:
return False
old_status = row["status"]
@@ -937,7 +976,8 @@ Parent Task ID: {parent_task.id}
event_type = "daemon_tick"
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task_id, agent, event_type, json.dumps({"from": old_status, "to": new_status, **(detail or {})})),
(task_id, agent, event_type, json.dumps(
{"from": old_status, "to": new_status, **(detail or {})})),
)
conn.commit()
return True
@@ -977,9 +1017,12 @@ Parent Task ID: {parent_task.id}
try:
result = await self.dispatcher.dispatch(
task,
project_config={"project_id": project_id, "db_path": db_path},
project_config={
"project_id": project_id,
"db_path": db_path},
)
if result["status"] == "dispatched" and result["level"] in ("full", "escalate"):
if result["status"] == "dispatched" and result["level"] in (
"full", "escalate"):
conn = get_connection(db_path)
try:
# [v2.7.1] Mail is already marked working in the dispatcher; skip claimed
@@ -1072,7 +1115,8 @@ Parent Task ID: {parent_task.id}
detail={"reason": "no_taker_after_3_broadcasts",
"round_number": self._broadcast_tracker.get(t.id).round_number if self._broadcast_tracker.get(t.id) else 0},
)
logger.warning("Escalated %s: no taker after 3 broadcast rounds", t.id)
logger.warning(
"Escalated %s: no taker after 3 broadcast rounds", t.id)
self._broadcast_tracker.pop(t.id, None)
finally:
conn.close()
@@ -1082,7 +1126,8 @@ Parent Task ID: {parent_task.id}
idle_agents = self._get_idle_agents()
if not idle_agents:
logger.warning("No idle agents for broadcast, skipping (capacity issue)")
logger.warning(
"No idle agents for broadcast, skipping (capacity issue)")
return []
task_ids = [t.id for t in broadcastable]
@@ -1113,7 +1158,8 @@ Parent Task ID: {parent_task.id}
spawned = []
for agent_id in idle_agents:
prompt = self._build_claim_prompt(agent_id, broadcastable, project_id)
prompt = self._build_claim_prompt(
agent_id, broadcastable, project_id)
try:
session_id = await self.spawner.spawn_full_agent(
agent_id=agent_id,
@@ -1127,7 +1173,8 @@ Parent Task ID: {parent_task.id}
spawned.append(session_id)
# Record the Agents that have been notified
for t in broadcastable:
self._broadcast_tracker[t.id].notified_agents.add(agent_id)
self._broadcast_tracker[t.id].notified_agents.add(
agent_id)
except AgentBusyError:
logger.debug("Broadcast skip %s: busy", agent_id)
except Exception:
@@ -1138,8 +1185,14 @@ Parent Task ID: {parent_task.id}
def _build_claim_prompt(self, agent_id: str, tasks: list,
project_id: str) -> str:
"""#03: broadcast claim prompt (identity + specialty injection)"""
api_host = getattr(self.spawner, 'api_host', '127.0.0.1') if self.spawner else '127.0.0.1'
api_port = getattr(self.spawner, 'api_port', 8083) if self.spawner else 8083
api_host = getattr(
self.spawner,
'api_host',
'127.0.0.1') if self.spawner else '127.0.0.1'
api_port = getattr(
self.spawner,
'api_port',
8083) if self.spawner else 8083
api_base = f"http://{api_host}:{api_port}/api"
# Get the Agent's specialties
@@ -1194,7 +1247,8 @@ Parent Task ID: {parent_task.id}
@property
def counter(self):
"""Get the counter from the Dispatcher"""
return getattr(self.dispatcher, 'counter', None) if self.dispatcher else None
return getattr(self.dispatcher, 'counter',
None) if self.dispatcher else None
@staticmethod
def _is_pid_alive(pid: int) -> bool:
@@ -1206,7 +1260,8 @@ Parent Task ID: {parent_task.id}
except (ProcessLookupError, PermissionError):
return False
def record_broadcast_response(self, task_id: str, agent_id: str, outcome: str):
def record_broadcast_response(
self, task_id: str, agent_id: str, outcome: str):
"""Record an Agent's feedback on a broadcast task (public API called by the Spawner)"""
tracker = self._broadcast_tracker.get(task_id)
if not tracker:
@@ -1227,7 +1282,8 @@ Parent Task ID: {parent_task.id}
def _get_all_agent_ids(self) -> List[str]:
"""Get all configured Agent IDs"""
if self.dispatcher and hasattr(self.dispatcher, 'router') and self.dispatcher.router:
if self.dispatcher and hasattr(
self.dispatcher, 'router') and self.dispatcher.router:
return list(self.dispatcher.router.agent_profiles.keys())
return []
@@ -1236,7 +1292,8 @@ Parent Task ID: {parent_task.id}
if not self.counter:
return []
# agent_profiles is populated from config when the Router initializes; it is the full Agent list
all_agents = list(self.dispatcher.router.agent_profiles.keys()) if self.dispatcher else []
all_agents = list(
self.dispatcher.router.agent_profiles.keys()) if self.dispatcher else []
active = self.counter.active_agents
return [aid for aid in all_agents if active.get(aid, 0) == 0]
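The idle-agent lookup above filters the full agent list down to those with no active run in the counter. A small standalone sketch, assuming `active` is a plain mapping of agent ID to active-run count (the real `active_agents` object may be richer) and using made-up agent IDs:

```python
from typing import Dict, List


def idle_agents(all_agents: List[str], active: Dict[str, int]) -> List[str]:
    """Return the agents whose active-run count is zero or missing."""
    # active.get(aid, 0) treats agents absent from the counter as idle.
    return [aid for aid in all_agents if active.get(aid, 0) == 0]


# Hypothetical agent IDs, for illustration only.
agents = ["agent-a", "agent-b", "agent-c"]
idle = idle_agents(agents, {"agent-a": 1, "agent-b": 0})
```

The result keeps the order of `all_agents`, so broadcast order follows configuration order.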
@@ -1290,7 +1347,9 @@ Parent Task ID: {parent_task.id}
result = await self.dispatcher.dispatch(
task,
action_type="review",
project_config={"project_id": project_id, "db_path": db_path},
project_config={
"project_id": project_id,
"db_path": db_path},
)
if result["status"] == "dispatched":
dispatched.append(task.id)
@@ -1374,8 +1433,10 @@ Parent Task ID: {parent_task.id}
working = queries.tasks_by_status("working")
for task in working:
# #07.2: unified crash_limit check (a more serious signal than a timeout)
if self.dispatcher and hasattr(self.dispatcher, '_check_crash_limit'):
if self.dispatcher._check_crash_limit(task.id, db_path, limit=3, window_minutes=30):
if self.dispatcher and hasattr(
self.dispatcher, '_check_crash_limit'):
if self.dispatcher._check_crash_limit(
task.id, db_path, limit=3, window_minutes=30):
conn = get_connection(db_path)
try:
self._transition_status(
@@ -1387,7 +1448,8 @@ Parent Task ID: {parent_task.id}
finally:
conn.close()
reclaimed.append(task.id)
logger.error("Task %s: executor crash limit (3/30m), marking failed", task.id)
logger.error(
"Task %s: executor crash limit (3/30m), marking failed", task.id)
continue
# #07.3 ACT-1: updated_at fallback covers mail auto-working (no started_at/claimed_at)
@@ -1399,7 +1461,8 @@ Parent Task ID: {parent_task.id}
# per-task timeout: deadline takes priority, otherwise use the default
if task.deadline:
deadline_time = datetime.fromisoformat(task.deadline)
timeout_minutes = (deadline_time - start_time).total_seconds() / 60.0
timeout_minutes = (
deadline_time - start_time).total_seconds() / 60.0
if timeout_minutes < 1:
timeout_minutes = self.default_task_timeout_minutes
else:
@@ -1446,8 +1509,10 @@ Parent Task ID: {parent_task.id}
pass
# v2.7.2: process liveness check, a fallback for when the counter is held but the process is dead
if self.spawner and self.counter and hasattr(self.counter, "active_agents"):
for agent_id in list(self.counter.active_agents.keys()) if hasattr(self.counter, "active_agents") else []:
if self.spawner and self.counter and hasattr(
self.counter, "active_agents"):
for agent_id in list(self.counter.active_agents.keys()) if hasattr(
self.counter, "active_agents") else []:
session_info = self.spawner.get_session_by_agent(agent_id)
if not session_info:
continue
@@ -1464,20 +1529,24 @@ Parent Task ID: {parent_task.id}
conn = get_connection(db_path)
try:
current_row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task_id_check,)
"SELECT status FROM tasks WHERE id=?", (
task_id_check,)
).fetchone()
if current_row and current_row["status"] == "review":
logger.info("Task %s in review, keeping status (process dead)", task_id_check)
logger.info(
"Task %s in review, keeping status (process dead)", task_id_check)
else:
self._transition_status(
conn, task_id_check, "pending",
agent="daemon",
detail={"reason": "process_dead", "pid": pid},
detail={
"reason": "process_dead", "pid": pid},
)
finally:
conn.close()
except Exception:
logger.exception("Failed to handle process dead for task %s", task_id_check)
logger.exception(
"Failed to handle process dead for task %s", task_id_check)
# #07.2: Fix-3b removed. review timeouts/crashes are handled uniformly by process_dead + _check_timeouts
@@ -1496,7 +1565,10 @@ Parent Task ID: {parent_task.id}
finally:
conn.close()
except Exception as e:
logger.error("Mail %s: ticker reply check error: %s", original_task_id, e)
logger.error(
"Mail %s: ticker reply check error: %s",
original_task_id,
e)
return True # conservative: if the query fails, assume there is a reply
def _check_recent_routing(self, db_path: Path, task_id: str,
@@ -1505,7 +1577,8 @@ Parent Task ID: {parent_task.id}
try:
conn = get_connection(db_path)
try:
# Check for a dispatched record with from_status=review (prevents duplicate review dispatch)
# Check for a dispatched record with from_status=review (prevents duplicate review
# dispatch)
if action_type == "review":
row = conn.execute(
"SELECT COUNT(*) as cnt FROM routing_decisions "
@@ -1536,17 +1609,22 @@ Parent Task ID: {parent_task.id}
NON_TERMINAL = {"claimed", "working", "review", "reviewing"}
projects = self.registry.list_projects()
recovery_report = {"projects": {}, "total_recovered": 0, "total_noop": 0}
recovery_report = {
"projects": {},
"total_recovered": 0,
"total_noop": 0}
# Collect all projects that need scanning (registry + virtual projects)
project_dirs = {}
for project_id, project_info in projects.items():
if project_info.get("status") == "active":
project_dirs[project_id] = self.registry.root / project_id / "blackboard.db"
project_dirs[project_id] = self.registry.root / \
project_id / "blackboard.db"
# Virtual projects
for virtual_id in ("_general", "_mail"):
virtual_db = Path(self.registry.root) / virtual_id / "blackboard.db"
virtual_db = Path(self.registry.root) / \
virtual_id / "blackboard.db"
if virtual_db.exists() and virtual_id not in project_dirs:
project_dirs[virtual_id] = virtual_db
@@ -1566,13 +1644,15 @@ Parent Task ID: {parent_task.id}
old_pid = self._current_project_id
self._current_project_id = project_id
try:
recovered, noop_count = self._recover_project(db_path, NON_TERMINAL)
recovered, noop_count = self._recover_project(
db_path, NON_TERMINAL)
if recovered:
recovery_report["projects"][project_id] = recovered
recovery_report["total_recovered"] += len(recovered)
recovery_report["total_noop"] += noop_count
except Exception:
logger.exception("Startup recovery failed for project %s", project_id)
logger.exception(
"Startup recovery failed for project %s", project_id)
finally:
self._current_project_id = old_pid
@@ -1584,7 +1664,8 @@ Parent Task ID: {parent_task.id}
logger.info("Startup recovery: %d tasks kept as-is (no recovery needed)",
recovery_report["total_noop"])
else:
logger.info("Startup recovery: no non-terminal tasks found, clean start")
logger.info(
"Startup recovery: no non-terminal tasks found, clean start")
return recovery_report
@@ -1607,10 +1688,13 @@ Parent Task ID: {parent_task.id}
for task in rows:
try:
action = self._determine_recovery_action(conn, task, status, db_path)
action = self._determine_recovery_action(
conn, task, status, db_path)
if action:
self._execute_recovery(conn, task["id"], action, db_path)
recovered.append({"task_id": task["id"], "from": status, "action": action})
self._execute_recovery(
conn, task["id"], action, db_path)
recovered.append(
{"task_id": task["id"], "from": status, "action": action})
else:
# Audit: also record an event for tasks that are left as-is
noop_count += 1
@@ -1621,7 +1705,8 @@ Parent Task ID: {parent_task.id}
)
conn.commit()
except Exception:
logger.exception("Startup recovery failed for task %s", task["id"])
logger.exception(
"Startup recovery failed for task %s", task["id"])
finally:
conn.close()
@@ -1699,7 +1784,8 @@ Parent Task ID: {parent_task.id}
# No review conclusion → keep review; the ticker will dispatch a reviewer naturally
return None
def _execute_recovery(self, conn, task_id: str, action: str, db_path: Path):
def _execute_recovery(self, conn, task_id: str,
action: str, db_path: Path):
"""Execute the recovery action"""
# Get the original status (for auditing)
orig_row = conn.execute(
@@ -1711,17 +1797,22 @@ Parent Task ID: {parent_task.id}
self._transition_status(
conn, task_id, "pending",
agent="daemon",
detail={"reason": "startup_recovery", "original_status": orig_status},
detail={
"reason": "startup_recovery",
"original_status": orig_status},
)
# Clear current_agent (regular push to pending; no specific agent takes over)
conn.execute("UPDATE tasks SET current_agent=NULL WHERE id=?", (task_id,))
conn.execute(
"UPDATE tasks SET current_agent=NULL WHERE id=?", (task_id,))
conn.commit()
elif action == "push_to_pending_keep_agent":
self._transition_status(
conn, task_id, "pending",
agent="daemon",
detail={"reason": "startup_recovery", "original_status": orig_status},
detail={
"reason": "startup_recovery",
"original_status": orig_status},
)
# Keep current_agent so the same agent picks the task up again
conn.commit()
@@ -1730,7 +1821,9 @@ Parent Task ID: {parent_task.id}
self._transition_status(
conn, task_id, "review",
agent="daemon",
detail={"reason": "startup_recovery", "original_status": "working"},
detail={
"reason": "startup_recovery",
"original_status": "working"},
)
conn.commit()
@@ -1738,7 +1831,9 @@ Parent Task ID: {parent_task.id}
self._transition_status(
conn, task_id, "done",
agent="daemon",
detail={"reason": "startup_recovery", "original_status": orig_status},
detail={
"reason": "startup_recovery",
"original_status": orig_status},
)
conn.commit()
@@ -1746,22 +1841,30 @@ Parent Task ID: {parent_task.id}
self._transition_status(
conn, task_id, "failed",
agent="daemon",
detail={"reason": "startup_recovery", "original_status": orig_status},
detail={
"reason": "startup_recovery",
"original_status": orig_status},
)
conn.commit()
# Record the recovery audit event
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, ?, ?)",
(task_id, "daemon", "startup_recovery", json.dumps({"action": action}))
(task_id, "daemon", "startup_recovery",
json.dumps({"action": action}))
)
conn.commit()
logger.info("Recovery: task %s%s (action=%s)", task_id, action, action)
logger.info(
"Recovery: task %s%s (action=%s)",
task_id,
action,
action)
def _find_pre_reviewing_status(self, conn, task_id: str) -> str:
"""Look up the events table for the status before reviewing (done or failed)"""
# _transition_status writes event_type=f"task_{new_status}"; detail uses from/to
# _transition_status writes event_type=f"task_{new_status}"; detail uses
# from/to
rows = conn.execute(
"""SELECT detail FROM events
WHERE task_id=? AND event_type='task_reviewing'
@@ -1772,7 +1875,8 @@ Parent Task ID: {parent_task.id}
for event in rows:
try:
detail = json.loads(event["detail"])
# _transition_status detail format: {"from": old_status, "to": new_status, ...}
# _transition_status detail format: {"from": old_status, "to":
# new_status, ...}
prev = detail.get("from") or detail.get("old_status")
if prev in ("done", "failed"):
return prev
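The lookup above walks `task_reviewing` events and returns the first recorded previous status that is `done` or `failed`. A standalone sketch of the parsing step over raw `detail` strings, without the database; `pre_reviewing_status` and its `default` argument are hypothetical, since the fallback the real method returns is outside this hunk.

```python
import json
from typing import Iterable, Optional


def pre_reviewing_status(details: Iterable[Optional[str]],
                         default: str = "done") -> str:
    """Return the first 'from' status that is done/failed, else `default`."""
    for raw in details:
        try:
            detail = json.loads(raw)
            # Newer events use "from"; "old_status" is accepted as a legacy key.
            prev = detail.get("from") or detail.get("old_status")
        except (TypeError, ValueError, AttributeError):
            # None, invalid JSON, or JSON that is not an object: skip the row.
            continue
        if prev in ("done", "failed"):
            return prev
    return default  # assumed fallback, not taken from the diff


rows = [
    '{"from": "working", "to": "reviewing"}',
    "not json",
    None,
    '{"old_status": "failed", "to": "reviewing"}',
]
found = pre_reviewing_status(rows)
```

Rows that do not parse are skipped rather than aborting the search, which matches the per-row `try` in the hunk.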
+1
@@ -57,6 +57,7 @@ export interface V2Task {
estimated_duration_minutes: number | null;
escalated: number;
archived: number; // v2.8: archive flag
resumed_from: string | null; // v2.8: resume source
// API aggregate fields
comments_count?: number;
outputs_count?: number;
+28 -12
@@ -1,6 +1,13 @@
"""v2.6 main entry point - FastAPI + Daemon ticker sharing one asyncio event loop"""
from __future__ import annotations
from src.api.toolchain_routes import router as toolchain_router
from src.api.mail_routes import router as mail_router
from src.api.sse_routes import router as sse_router
from src.api.project_routes import router as project_router
from src.api.daemon_routes import router as daemon_router
from src.api.checkpoint_routes import router as checkpoint_router
from src.api.blackboard_routes import router as blackboard_router
import logging
from contextlib import asynccontextmanager
@@ -23,15 +30,7 @@ from src.daemon.health import HealthChecker
from src.daemon.experience import ExperienceDistiller, ExperienceStore
from src.daemon.inbox import InboxWatcher
from src.daemon.guardrails import GuardrailEngine
import src.utils as _utils
from src.api.blackboard_routes import router as blackboard_router
from src.api.checkpoint_routes import router as checkpoint_router
from src.api.daemon_routes import router as daemon_router
from src.api.project_routes import router as project_router
from src.api.sse_routes import router as sse_router
from src.api.mail_routes import router as mail_router
from src.api.toolchain_routes import router as toolchain_router
from src.utils import get_data_root
logger = logging.getLogger("moziplus-v2")
@@ -86,7 +85,7 @@ config = load_config()
# Global components
# ---------------------------------------------------------------------------
DATA_ROOT = _utils.get_data_root()
DATA_ROOT = get_data_root()
ticker: Optional[Ticker] = None
@@ -139,7 +138,8 @@ async def lifespan(app: FastAPI):
counter = ActiveAgentCounter(
max_global=daemon_config.get("max_global_agents", 5),
max_per_session=daemon_config.get("max_per_session", 1),
max_concurrent_sessions=daemon_config.get("max_concurrent_sessions", 3),
max_concurrent_sessions=daemon_config.get(
"max_concurrent_sessions", 3),
default_cooldown_seconds=daemon_config.get("cooldown_seconds", 120),
)
# BootstrapBuilder (L2 four-stage engine injection layer, v2.1)
@@ -189,7 +189,10 @@ async def lifespan(app: FastAPI):
spawner=spawner,
counter=counter,
db_path=default_db_path,
guardrails=GuardrailEngine(config_path=Path(__file__).parent.parent / "config" / "guardrails.yaml"),
guardrails=GuardrailEngine(
config_path=Path(__file__).parent.parent /
"config" /
"guardrails.yaml"),
)
# ── Integration modules ──
@@ -199,6 +202,7 @@ async def lifespan(app: FastAPI):
)
# ExperienceDistiller (automatic experience distillation)
config.get("experience", {})
experience_distiller = ExperienceDistiller(
store=ExperienceStore(store_path=DATA_ROOT / "experiences.jsonl"),
)
@@ -259,6 +263,7 @@ app.add_middleware(
# API route registration
# ---------------------------------------------------------------------------
app.include_router(blackboard_router)
app.include_router(checkpoint_router)
app.include_router(daemon_router)
@@ -267,6 +272,17 @@ app.include_router(sse_router)
app.include_router(mail_router)
app.include_router(toolchain_router)
# ---------------------------------------------------------------------------
# Health check endpoint
# ---------------------------------------------------------------------------
@app.get("/api/healthz")
async def healthz():
"""Lightweight health check, no authentication required"""
return {"status": "ok"}
# ---------------------------------------------------------------------------
# Compatibility endpoints
# ---------------------------------------------------------------------------
+15
@@ -55,6 +55,21 @@ def client_with_isolation(isolated_data_root):
# ── E2E gate ──
def pytest_collection_modifyitems(config, items):
if not os.environ.get("RUN_INTEGRATION"):
skip_reason = "needs RUN_INTEGRATION=1"
remaining = []
deselected = []
for item in items:
if "integration" in item.keywords or "e2e" in item.keywords:
deselected.append(item)
else:
remaining.append(item)
if deselected:
config.hook.pytest_deselected(items=deselected)
items[:] = remaining
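The hook above splits collected tests into those that stay and those that are deselected when `RUN_INTEGRATION` is unset. The partition step can be sketched without pytest, assuming each item only needs a `keywords` container; `partition_items` and the `SimpleNamespace` stand-ins are hypothetical, and real pytest items carry much more state.

```python
from types import SimpleNamespace
from typing import Any, List, Sequence, Tuple

GATED_MARKERS = ("integration", "e2e")


def partition_items(items: Sequence[Any],
                    run_integration: bool) -> Tuple[List[Any], List[Any]]:
    """Split items into (remaining, deselected) based on gated markers."""
    if run_integration:
        # Gate open: nothing is deselected.
        return list(items), []
    remaining: List[Any] = []
    deselected: List[Any] = []
    for item in items:
        if any(marker in item.keywords for marker in GATED_MARKERS):
            deselected.append(item)
        else:
            remaining.append(item)
    return remaining, deselected


# Stand-in items for illustration only.
collected = [
    SimpleNamespace(name="test_unit", keywords={"unit"}),
    SimpleNamespace(name="test_full_chain", keywords={"e2e"}),
    SimpleNamespace(name="test_daemon", keywords={"integration", "slow"}),
]
kept, dropped = partition_items(collected, run_integration=False)
```

In the real hook the deselected list is reported through `config.hook.pytest_deselected` and `items[:]` is reassigned in place, so pytest sees the filtered list.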
skip_no_integration = pytest.mark.skipif(
not os.environ.get("RUN_INTEGRATION"),
reason="Set RUN_INTEGRATION=1 to run E2E tests against real daemon",
+2 -2
@@ -1,12 +1,12 @@
import pytest
pytestmark = pytest.mark.e2e
skip_no_integration = pytest.mark.skipif(
not __import__("os").environ.get("RUN_INTEGRATION"),
reason="Set RUN_INTEGRATION=1 to run E2E tests against real daemon",
)
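pytestmark = [pytest.mark.e2e, skip_no_integration]
"""v2.7 end-to-end tests: full pipeline in a real environment
Covers: project management, Task CRUD, SubTask, Stage progress, status aggregation, dependency chain, timeout, Mail, real Agent dispatch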
+1 -1
@@ -123,7 +123,7 @@ class TestClassifyNoJsonExit0:
def test_task_status_pending(self):
result = Spawner._classify_outcome(0, {}, "", "pending", "")
assert result["outcome"] == "agent_error"
assert result["outcome"] == "completed"
assert result["should_retry"] is False
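The updated assertion reflects the revised A13 rule: with no parsed status, empty stdout, and exit code 0, the run is treated as completed regardless of the task's current status. A minimal sketch of just that rule, assuming a hypothetical helper `classify_empty_exit0` that returns `None` when the rule does not apply (the real `_classify_outcome` takes more arguments and handles many more cases):

```python
from typing import Optional


def classify_empty_exit0(status: Optional[str], stdout_text: str,
                         exit_code: int) -> Optional[dict]:
    """Apply the 'empty stdout + exit 0 means completed' rule, if it matches."""
    if status is None and not stdout_text.strip() and exit_code == 0:
        # Trust the exit code: task_status is no longer consulted here.
        return {"outcome": "completed", "should_retry": False}
    return None  # rule not applicable; other branches would decide


pending_case = classify_empty_exit0(None, "", 0)
crashed_case = classify_empty_exit0(None, "", 1)
```

Whitespace-only stdout is treated the same as empty stdout because of the `strip()` call.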