Compare commits

...

27 Commits

Author SHA1 Message Date
cfdaily f380b5f92d fix(frontend): V2Task 添加 resumed_from 字段
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 1s
deploy 时 TypeScript 编译报 TS2339: Property 'resumed_from' does not exist on type 'V2Task'。
DB 表有此字段但 TS interface 遗漏。
2026-06-10 07:20:24 +08:00
jiangwei-infra 228a95b9fa Merge pull request 'fix(ci): deploy.yml 用 /tmp/ci-venv 替代 requirements.txt' (#17) from fix/deploy-workflow into main
Deploy / ci (push) Successful in 23s
Deploy / deploy (push) Failing after 9s
Deploy / notify-deploy-failure (push) Successful in 0s
2026-06-10 07:15:39 +08:00
cfdaily 405b7147a7 fix(ci): deploy.yml 用 /tmp/ci-venv + 直接 pip install 替代 requirements.txt
CI / lint (pull_request) Successful in 8s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 1s
仓库没有 requirements.txt,deploy workflow 每次 push 到 main 都报错。
改为与 ci.yml 一致的方式:/tmp/ci-venv + 直接 pip install 依赖。
2026-06-10 07:14:29 +08:00
jiangwei-infra b876159b52 Merge pull request 'fix(lint): 修复 PR #14 引入的 lint 回退 (119→0)' (#16) from fix/lint-regression into main
Deploy / ci (push) Failing after 8s
Deploy / deploy (push) Has been skipped
Deploy / notify-deploy-failure (push) Successful in 1s
2026-06-10 07:09:44 +08:00
cfdaily d58e38d58f fix(lint): 修复 PR #14 引入的 lint 回退 (119→0)
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
PR #14 从旧分支复制文件导致回退了 PR #10 的 lint 修复。
修复内容:
- autoflake 移除未使用导入/变量
- autopep8 修复缩进/空格
- 手动修复 F821(pathlib→Path), F541(f-string), F841(未使用变量)
- 所有修复均通过 flake8 --max-line-length=120 --extend-ignore=E501 检查 (0 errors)
2026-06-09 23:53:29 +08:00
pangtong-fujunshi 7184079a75 Merge pull request 'fix(spawner): A13 exit=0 always completed' (#15) from fix/a13-exit0-completed into main
Deploy / ci (push) Failing after 6s
Deploy / deploy (push) Has been skipped
Deploy / notify-deploy-failure (push) Successful in 1s
CI / lint (pull_request) Failing after 6s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 4s
2026-06-09 23:42:05 +08:00
cfdaily fc9b66b905 docs(#08): update A13 revised - exit=0 always completed
CI / lint (pull_request) Failing after 9s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 4s
Merge old A12/A13 into single A13 revised: trust exit_code=0
regardless of stdout/JSON output. Old logic caused inform Mail
infinite retry loop.
2026-06-09 23:41:53 +08:00
cfdaily 5bb220d237 fix(spawner): A13 exit=0 always completed, not agent_error
exit=0 means process exited normally. Trust the exit code regardless
of stdout/JSON output or task_status. Old logic misclassified inform
Mail completions as agent_error, causing infinite retry loops.

Includes test update: test_task_status_pending expects completed.
2026-06-09 23:41:53 +08:00
cfdaily f7fbdac89c chore: simayi-approved changes - lint fixes, toolchain improvements, healthz
All changes reviewed and APPROVED in PR #12 (Review ID: 40):
- toolchain_routes: webhook repo/org format compat, content dedup (sha256), closed issue filter
- dispatcher: inform mail crash 误标 done 修复
- ticker: cleanup and improvements
- healthz endpoint
- conftest: integration/e2e deselect markers
- docs: design docs, test-guide updates
- various lint/whitespace fixes across 30 files
2026-06-09 23:41:53 +08:00
cfdaily a1a4d7c5a7 docs: #19 adopt simayi review suggestions (v1.1) 2026-06-09 23:41:53 +08:00
jiangwei-infra 717dbc446a Merge pull request 'fix(CI): notify竞态修复 + 双倍触发去重 (PR #12 rebase, reviewed & approved)' (#14) from fix/ci-dedup-v2 into main
Deploy / ci (push) Failing after 6s
Deploy / deploy (push) Has been skipped
Deploy / notify-deploy-failure (push) Successful in 0s
2026-06-09 23:36:19 +08:00
cfdaily ee1ef23ace fix(spawner): crash cooldown分级 + inform mail crash误标done修复
CI / lint (pull_request) Failing after 7s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 4s
- crashed outcome cooldown 60s(vs 其他 300s)
- import init_db
- whitespace/lint fixes
2026-06-09 23:35:02 +08:00
cfdaily 20b3b5facb fix(ci): 修复notify竞态条件 - 用needs.result替代commit status查询
根因:notify-on-failure job 通过 commit status API 查询结果时,
自身的 pending status 会污染查询结果(竞态条件):
1. lint/test 都 success
2. notify 开始运行,自身状态 pending 写入 commit status
3. notify 查询 commit status → 看到 pending(自己的)≠ success
4. 误发 [CI] 失败 评论 + webhook 触发 Mail 通知

修复方案:
- 不再查询 commit status API
- 直接用 needs.lint.result 和 needs.test.result 判断
- 只有明确的 failure 才发通知
- 同时去掉 push 触发避免双倍运行
2026-06-09 23:34:44 +08:00
cfdaily 05201d778e fix(ci): 去掉push触发避免双倍触发 + 修复notify误报
1. 触发器:去掉 push,只保留 pull_request(opened, synchronize)
   - 每次 push 到 PR 分支不再跑 2 次 CI
2. notify-on-failure:只有明确的 failure 状态才发通知
   - 之前:空状态/unknown/pending 都触发通知(误报根因)
   - 现在:只有 STATUS=failure 才发通知
3. venv 路径:统一用 /tmp/ci-venv-lint 和 /tmp/ci-venv-test
   - 避免 host 模式下与开发目录 .venv 冲突
2026-06-09 23:34:41 +08:00
pangtong-fujunshi 5b2c42687a Merge pull request 'docs: add #19 toolchain context layers design' (#11) from docs/19-toolchain-context-layers-v2 into main
Deploy / ci (push) Failing after 6s
Deploy / deploy (push) Has been skipped
Deploy / notify-deploy-failure (push) Successful in 0s
2026-06-09 22:26:05 +08:00
cfdaily 149921fb5f docs: add #19 toolchain context layers design
CI / lint (push) Successful in 7s
CI / test (push) Successful in 14s
CI / lint (pull_request) Successful in 7s
CI / notify-on-failure (push) Successful in 0s
CI / test (pull_request) Successful in 14s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-09 22:25:17 +08:00
pangtong-fujunshi 59068b8d2a Merge pull request 'fix: resolve all flake8 lint errors (118 → 0)' (#10) from fix/lint-cleanup into main
Deploy / ci (push) Failing after 6s
Deploy / deploy (push) Has been skipped
Deploy / notify-deploy-failure (push) Successful in 1s
2026-06-09 22:24:10 +08:00
cfdaily 242057dfd6 fix: remove dead code config.get experience
CI / lint (push) Successful in 6s
CI / test (push) Successful in 14s
CI / notify-on-failure (push) Successful in 1s
CI / lint (pull_request) Failing after 13m39s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Failing after 14m58s
2026-06-09 22:23:58 +08:00
cfdaily 09a0928bbc fix: resolve all flake8 lint errors (118 → 0)
CI / lint (push) Successful in 8s
CI / lint (pull_request) Successful in 5s
CI / test (push) Failing after 8s
CI / test (pull_request) Failing after 8s
CI / notify-on-failure (push) Successful in 1s
CI / notify-on-failure (pull_request) Successful in 3s
2026-06-09 16:43:41 +08:00
jiangwei-infra 62d8ced8ed Merge pull request 'fix(ci): install all test dependencies' (#8) from fix/ci-deps into main
Deploy / ci (push) Failing after 6s
Deploy / deploy (push) Has been skipped
Deploy / notify-deploy-failure (push) Successful in 3s
2026-06-09 14:53:50 +08:00
jiangwei-infra 51ccbbf4b5 fix(ci): install all test dependencies (fastapi, pydantic, pyyaml, etc.)
CI / lint (push) Failing after 6s
CI / test (push) Has been skipped
CI / notify-on-failure (push) Successful in 0s
CI / lint (pull_request) Failing after 7s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 6s
2026-06-09 14:53:24 +08:00
jiangwei-infra fe24a86d7d Merge pull request 'fix(ci): install pytest directly instead of editable mode' (#7) from fix/ci-pytest into main
Deploy / ci (push) Failing after 7s
Deploy / deploy (push) Has been skipped
Deploy / notify-deploy-failure (push) Successful in 0s
2026-06-09 14:33:52 +08:00
jiangwei-infra 25c9cfd1ed fix(ci): install pytest directly instead of editable mode
CI / lint (push) Failing after 6s
CI / test (push) Has been skipped
CI / notify-on-failure (push) Successful in 0s
CI / lint (pull_request) Failing after 7s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 6s
2026-06-09 14:33:28 +08:00
jiangwei-infra 5af0e0e91d Merge pull request 'fix(ci): use pyproject.toml instead of missing requirements.txt' (#6) from fix/ci-requirements into main
Deploy / ci (push) Failing after 7s
Deploy / deploy (push) Has been skipped
Deploy / notify-deploy-failure (push) Successful in 1s
2026-06-09 14:24:42 +08:00
jiangwei-infra 05246d6469 fix(ci): use pyproject.toml instead of missing requirements.txt
CI / lint (push) Failing after 7s
CI / test (push) Has been skipped
CI / notify-on-failure (push) Successful in 1s
CI / lint (pull_request) Failing after 7s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 5s
2026-06-09 14:24:02 +08:00
jiangwei-infra 90e657636c Merge pull request 'fix(ci): use /tmp/ci-venv-* to avoid host .venv conflict' (#5) from fix/ci-venv-path into main
Deploy / ci (push) Failing after 10s
Deploy / deploy (push) Has been skipped
Deploy / notify-deploy-failure (push) Successful in 0s
2026-06-09 13:22:34 +08:00
jiangwei-infra cbdc965a0e fix(ci): use /tmp/ci-venv-* to avoid host .venv conflict
CI / lint (push) Failing after 6s
CI / test (push) Has been skipped
CI / lint (pull_request) Failing after 6s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (push) Successful in 1s
CI / notify-on-failure (pull_request) Successful in 3s
2026-06-09 13:21:01 +08:00
38 changed files with 1524 additions and 469 deletions
+24 -23
View File
@@ -1,9 +1,10 @@
# CI 管道 — moziplus v2.0
#
# 触发条件:
# - push(非 main 分支)
# - pull_requestopened, synchronize
#
# 注意:只保留 pull_request 触发,避免 push + pull_request 双倍触发
#
# Gitea v1.23.4 限制注意:
# - 不支持 failure() 表达式,用 always() + shell 条件判断替代
# - 不支持 concurrency / continue-on-error / timeout-minutes / permissions
@@ -13,10 +14,6 @@
name: CI
on:
push:
branches:
- '**'
- '!main'
pull_request:
types: [opened, synchronize]
@@ -29,12 +26,12 @@ jobs:
- name: Setup Python
run: |
python3 -m venv .venv
.venv/bin/pip install --quiet flake8
python3 -m venv /tmp/ci-venv-lint
/tmp/ci-venv-lint/bin/pip install --quiet flake8
- name: Lint with flake8
run: |
.venv/bin/flake8 src/ --max-line-length=120 --extend-ignore=E501
/tmp/ci-venv-lint/bin/flake8 src/ --max-line-length=120 --extend-ignore=E501
# ── Job 2: Test ──────────────────────────────────────
test:
@@ -45,15 +42,16 @@ jobs:
- name: Setup Python
run: |
python3 -m venv .venv
.venv/bin/pip install --quiet -r requirements.txt
python3 -m venv /tmp/ci-venv-test
/tmp/ci-venv-test/bin/pip install --quiet fastapi pydantic pyyaml uvicorn requests pytest pytest-asyncio httpx
- name: Run tests (exclude E2E)
run: |
.venv/bin/pytest tests/ -m "not e2e" -x -q
/tmp/ci-venv-test/bin/pytest tests/ -m "not e2e" -x -q
# ── Job 3: CI 失败通知 ───────────────────────────────
# v1.23 不支持 failure(),用 always() + shell 检查 commit status 替代
# 使用 needs.<job>.result 直接判断,不查询 commit status API
# 根因:notify 自身的 pending status 会污染 commit status 查询结果(竞态条件)
notify-on-failure:
runs-on: macos-arm64
needs: [lint, test]
@@ -62,31 +60,34 @@ jobs:
- name: Check results and notify
env:
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
LINT_RESULT: ${{ needs.lint.result }}
TEST_RESULT: ${{ needs.test.result }}
run: |
# 查询当前 commit 的 status
STATUS=$(curl -sf \
-H "Authorization: token $GITEA_TOKEN" \
"${{ gitea.api_url }}/repos/${{ gitea.repository }}/commits/${{ gitea.sha }}/status" \
| python3 -c "import sys,json; print(json.load(sys.stdin).get('state',''))" 2>/dev/null || echo "")
echo "Lint result: $LINT_RESULT"
echo "Test result: $TEST_RESULT"
echo "Commit status: $STATUS"
if [ "$STATUS" != "success" ]; then
echo "CI failed or status unknown, sending notification..."
# 只有 lint 或 test 明确失败时才发通知
if [ "$LINT_RESULT" = "failure" ] || [ "$TEST_RESULT" = "failure" ]; then
echo "CI has failures, sending notification..."
# 如果是 PR 事件,写评论通知
PR_NUMBER="${{ gitea.event.pull_request.number }}"
if [ -n "$PR_NUMBER" ]; then
# 构建失败摘要
FAILED_JOBS=""
[ "$LINT_RESULT" = "failure" ] && FAILED_JOBS="${FAILED_JOBS}lint "
[ "$TEST_RESULT" = "failure" ] && FAILED_JOBS="${FAILED_JOBS}test "
curl -sf -X POST \
-H "Authorization: token $GITEA_TOKEN" \
-H "Content-Type: application/json" \
"${{ gitea.api_url }}/repos/${{ gitea.repository }}/issues/${PR_NUMBER}/comments" \
-d "{\"body\": \"[CI] 失败\\n\\n分支: ${{ gitea.ref_name }}\\n触发 commit: \`${{ gitea.sha }}\`\\n请检查 CI 日志并修复。\"}" \
-d "{\"body\": \"[CI] 失败\\n\\n分支: ${{ gitea.ref_name }}\\n触发 commit: \`${{ gitea.sha }}\`\\n失败 Job: ${FAILED_JOBS}\\n请检查 CI 日志并修复。\"}" \
|| echo "Failed to post PR comment"
echo "PR comment posted."
else
echo "Not a PR event, skipping PR comment."
fi
else
echo "CI passed, no notification needed."
echo "No explicit failures (results: lint=$LINT_RESULT, test=$TEST_RESULT), no notification needed."
fi
+4 -4
View File
@@ -23,16 +23,16 @@ jobs:
- name: Setup Python
run: |
python3 -m venv .venv
.venv/bin/pip install --quiet -r requirements.txt
python3 -m venv /tmp/ci-venv-deploy
/tmp/ci-venv-deploy/bin/pip install --quiet flake8 fastapi pydantic pyyaml uvicorn requests pytest pytest-asyncio httpx
- name: Lint
run: |
.venv/bin/flake8 src/ --max-line-length=120 --extend-ignore=E501
/tmp/ci-venv-deploy/bin/flake8 src/ --max-line-length=120 --extend-ignore=E501
- name: Unit & Integration Tests
run: |
.venv/bin/pytest tests/ -m "not e2e" -x -q
/tmp/ci-venv-deploy/bin/pytest tests/ -m "not e2e" -x -q
# ── Job 2: 部署 ─────────────────────────────────────
deploy:
@@ -110,8 +110,8 @@ TCP 握手只能检测进程端口是否监听,无法检测 Gateway **业务
| 编号 | 条件 | outcome | 可恢复? | 处理 |
|------|------|---------|----------|------|
| A12 | exit=0 + task_status ∈ {done, review} | completed | — | 正常完成 |
| A13 | exit=0 + task_status ∉ {done, review} | agent_error | ❌ | 标 failed + 原因写黑板 |
| A12 | ~~已合并到 A13 revised~~ | — | — | 见下方 A13 revised |
| **A13 revised** | exit=0(无 JSON 输出) | completed | — | 信任进程退出码,exit=0 即正常完成。旧逻辑按 task_status 区分,非终态判 agent_error → 导致 inform Mail 永不标 done,与 dispatcher inform auto-done 形成死循环 |
| **A14** | exit=130 (SIGINT) 或 exit=143 (SIGTERM) | interrupted | ✅ | retry |
| **A15** | exit≠0 + stderr 含 network 关键字 | gateway_unreachable | ✅ | retry + cooldown 30s |
| **A16** | exit≠0 + stderr 含 compact 关键字 | compact_interrupted | ✅ | retry + cooldown 60s |
+16 -4
View File
@@ -1590,7 +1590,7 @@ daemon 内部 ───────┘ │ 5. 创建 Mail │
| 只处理白名单内的事件类型 | 未知的忽略 + 日志 |
| issue_comment 需判断来源 | 只处理 CI workflow 写的评论(按特定前缀匹配:`❌ **CI 失败**` 或统一后的 `[CI]` 前缀) |
| PR 作者/审查者必须是已知 Agent | 未知的忽略 + 日志 |
| 幂等:同一事件不重复创建 Mail | `{x_gitea_event}-{x_gitea_delivery}` 去重(delivery ID 来自 `X-Gitea-Delivery` header |
| 幂等:同一事件不重复创建 Mail | 双重去重:① delivery UUID`{event}-{delivery}`)标准幂等;② review 事件 payload 内容去重(`{event}:{pr_num}:{sender}:{sha256(body_or_content)[:16]}`),防御同一 review 被不同来源重复提交(2026-06-09 新增 |
---
@@ -2007,6 +2007,9 @@ CI workflow 已有 `notify-on-failure` jobci.yml),当前格式:
| 7 | 签名算法 | ✅ 已确认 | Gitea 使用 HMAC-SHA256,代码注释已补 |
| 8 | Webhook 作用范围 | ✅ 组织级 | Gitea 组织级 webhookHook ID=28),覆盖 sanguo 下所有仓库,新增仓库自动覆盖 |
| 9 | ALLOWED_HOST_LIST | ✅ 已修复 | Gitea 容器配置 `192.168.2.153, 127.0.0.1, localhost, 172.17.0.0/16, 192.168.2.0/24` |
| 10 | Gitea review payload 格式 | ✅ 姜维调研确认(2026-06-08 | Gitea v1.23.4 review payload 只有 `type` + `content`,没有 `state`/`body`/`user`,这不是 org vs repo 差异而是 Gitea 设计。v1.24.0 格式不变。双格式兼容是防御性编码,保持现状 |
| 11 | Spawner compact 检测窗口 | ✅ 已修复 | 窗口 300s→900s,尾部读取 50KB→1MB。实测长对话中 compact 记录被推出窗口导致漏检 |
| 12 | inform 类型 Mail crash 误标 done | ✅ 已修复 | `_mail_auto_complete` 增加 outcome 感知,inform 用白名单(completed/claimed/no_reply)控制 done 标记。spawner crash cooldown 300s→60s |
---
@@ -2713,10 +2716,10 @@ Gitea v1.23.4 自带完整的 CI 管理界面:
| # | 条件 | 状态 | 谁确认 |
|---|------|------|--------|
| 1 | act-runner 已注册且 label = `macos-arm64` | ✅ PM2 托管(sanguo-act-runner, id=44),崩溃自动重启 | 姜维确认 |
| 2 | Gitea repository secrets 已配置(CI_TOKEN | ⚠️ 需确认 | 姜维 |
| 2 | Gitea repository secrets 已配置(CI_TOKEN | ✅ 姜维确认(sanguo/moziplus-v2 已配 CI_TOKEN | 姜维 |
| 3 | Gitea 组织级 Webhook 已启用(Hook ID=28 | ✅ 已确认 | 已确认 |
| 4 | 各 Agent 的 GITEA_TOKEN 环境变量 | ⚠️ 待分配 | 庞统协调 |
| 5 | main 分支保护规则(Review 才能 merge | ⚠️ 需确认 | 姜维 |
| 4 | 各 Agent 的 GITEA_TOKEN 环境变量 | ✅ 已写入各 Agent TOOLS.md,姜维确认 token 记录存在 | 庞统+姜维 |
| 5 | main 分支保护规则(Review 才能 merge | ✅ 姜维已配置(moziplus-v2 + sanguo_moziplus_v2,需1个approve | 姜维 |
| 6 | 禁止在 daemon 运行时跑全量 E2E | ✅ 已警告司马懿 | 已确认 |
> 第 5 点很关键——如果 main 分支没有保护规则,开发者可以直接 push main 跳过 Review。
@@ -2753,3 +2756,12 @@ Gitea v1.23.4 自带完整的 CI 管理界面:
| §17.6.4 | 新增 P3 端到端验证结果(S1-S6 逐项) |
| §17.6.4 | 新增调研发现:Review API 枚举值、PullRequestReview webhook 支持、act-runner PM2 托管 |
| §17.10 | #1 状态更新:act-runner 已纳入 PM2 托管 |
### v3.1 → v3.2 变更(工具链修复 + Mail 投递 bug 修复)
| 编号 | 变更内容 |
|------|----------|
| §16.4 | Review handler 双格式兼容:HANDLERS 注册表同时注册 `pull_request_review` / `pull_request_approved` 等多种事件名;`_handle_pull_request_review` 兼容 repo webhookreview.state/body/user)和 org webhookreview.type/content/sender)两种 payload 格式 |
| §16.8 #10 | Gitea v1.23.4 review payload 调研结论(姜维 2026-06-08):Gitea v1.23.4 review payload 只有 `type` + `content`,没有 `state`/`body`/`user`,这不是 org vs repo 差异而是 Gitea 设计。v1.24.0 格式不变。双格式兼容是防御性编码,保持现状 |
| §16.8 #11 | Spawner compact 检测窗口修复:窗口 300s→900s,尾部读取 50KB→1MB。实测长对话中 compact 记录被推出窗口导致漏检 |
| §16.8 #12 | inform 类型 Mail crash 误标 done bug 修复:`_mail_auto_complete` 增加 outcome 感知,inform 用白名单(completed/claimed/no_reply)控制 done 标记。spawner crash cooldown 300s→60s |
+121
View File
@@ -0,0 +1,121 @@
# §18. 工具链端到端验证测试
> 日期:2026-06-09
> 状态:已完成 ✅
> 目标:用真实 Webhook 触发验证整条 Mail 通知链路
## 前置确认
- Gitea 用户名 ↔ Agent ID 映射:完全一致(admin, guanyu-dev, jiangwei-infra, pangtong-fujunshi, simayi-challenger, zhangfei-dev, zhaoyun-data
- Gitea 组织级 WebhookHook ID=28):姜维确认最近 5 条投递全部 is_succeed=1
- Daemon 在线:sanguo-moziplus-v2 运行中
- 测试仓库:sanguo/moziplus-v2
## 命名规范
- Issue 标题:`[E2E-TEST] xxx`
- PR 标题:`[E2E-TEST] xxx`
- 分支名:`test/e2e-<timestamp>`
## 验证步骤
| 步骤 | 操作 | 触发事件 | 预期 Mail 通知 | 验证点 |
|------|------|----------|---------------|--------|
| 1 | 创建 Issue `[E2E-TEST] Issue指派测试`assignee=zhangfei-dev | issues (assigned) | zhangfei-dev 收到 "Issue 指派" Mail | Mail to/模板正确 |
| 2 | 开分支 `test/e2e-<ts>`,创建 PR `[E2E-TEST] Review请求测试` | pull_request (opened) | simayi-challenger 收到 "Review 请求" Mail | Mail to/风险级别/文件列表 |
| 3 | PR Review APPROVED | pull_request_review (approved) | PR 作者(pangtong-fujunshi) 收到 "Review 通过 ✓" Mail | result=通过 ✓ |
| 4 | PR Review REQUEST_CHANGES | pull_request_review (rejected) | PR 作者收到 "Review 驳回 ✗" Mail | result=驳回 ✗ |
| 5 | Issue 上发评论 `[CI] CI 失败 — 分支: test/e2e-xxx, 错误: build timeout` | issue_comment | Issue 作者收到 "CI 失败" Mail | 模板含分支/错误摘要 |
| 6 | 创建标题含"部署失败"的 Issue(无指派) | issues (opened) | jiangwei-infra + pangtong-fujunshi 各收到 "部署失败" Mail | 双收件人 |
| 7 | 关闭步骤 1 的 Issue,再发 CI 失败评论 | issue_comment (closed issue) | 不产生 Mail(负面测试) | handler 跳过 closed |
| 8 | 重发步骤 1 Webhook(相同 delivery ID | 重复事件 | 不产生新 Mail(幂等测试) | 返回 duplicate |
## 签名校验
已测试(GITEA_WEBHOOK_SECRET 已配置且生效):
- ✅ 正确签名:请求正常处理
- ✅ 无签名:返回 403 `signature verification failed`
## Review 意见来源
- 姜维(基础设施确认 + 边界验证建议)
- 司马懿(遗漏点补充 + 命名规范 + 风险防范)
---
## 执行记录
> 2026-06-09 00:40~00:50 CST
### 步骤 1Issue 指派 ✅
- 操作:创建 Issue #22 `[E2E-TEST] Issue指派测试`assignee=zhangfei-dev
- Mail`mail-1780936736480`from=system, to=zhangfei-dev, title=`Issue 指派: [E2E-TEST] Issue指派测试`
- 模板渲染正确(含 Issue 链接、标签、描述、建议分支名)
### 步骤 2PR Review 请求 ✅
- 操作:创建分支 `test/e2e-1780936838`,创建 PR #23
- Mail`mail-1780936851715`from=system, to=simayi-challenger
- 模板含 PR 链接、标题、作者(pangtong-fujunshi)、分支、风险级别(standard)
- 附带:CI 失败通知 `mail-1780936876572`CI 自动触发,符合预期)
### 步骤 3Review APPROVED ✅
- 操作:用 simayi-challenger token 提交 APPROVED review
- Mail`mail-1780936968411`from=system, to=pangtong-fujunshi, title=`Review 通过 ✓`
- 描述含审查者(simayi-challenger)、review body
- ⚠️ 收到 2 封重复 Mailorg webhook + repo webhook 双触发)
### 步骤 4Review REQUEST_CHANGES ✅
- 操作:用 simayi-challenger token 提交 REQUEST_CHANGES review
- Mail`mail-1780936972207`from=system, to=pangtong-fujunshi, title=`Review 驳回 ✗`
- ⚠️ 同上,收到 2 封重复 Mail
### 步骤 5CI 失败评论 ✅
- 操作:在 Issue #22 发评论 `[CI] CI 失败 — 分支: test/e2e-1780936838, 错误: build timeout`
- Mail`mail-1780936994513`from=system, to=pangtong-fujunshi, title=`CI 失败: sanguo/moziplus-v2#22`
- 模板含分支提取和错误摘要
### 步骤 6:部署失败 Issue ✅
- 操作:创建 Issue #24 `[E2E-TEST] 部署失败: test deploy`(无指派)
- Mail`mail-1780936999660` to=jiangwei-infra, `mail-1780936999684` to=pangtong-fujunshi
- 双收件人验证通过 ✅
### 步骤 7:已关闭 Issue 负面测试 ✅
- 操作:关闭 Issue #22 后发 `[CI] CI 失败 — 应被过滤`
- 结果:未产生新 Mail ✅(只有步骤 5 的 1 封 CI Mail,步骤 7 的评论被正确过滤)
### 步骤 8:幂等测试 ✅
- 操作:构造带正确 HMAC-SHA256 签名的 Webhook,用同一 delivery ID `test-idempotency-002` 发两次
- 第一次:返回 `ok`,产生 Mail ✅
- 第二次:返回 `duplicate`,无新 Mail ✅
- 额外验证:不带签名的请求返回 403 `signature verification failed`(签名校验正常工作)
---
## 汇总
| 步骤 | 状态 | 备注 |
|------|------|------|
| 1. Issue 指派 | ✅ 通过 | Mail to/模板正确 |
| 2. PR Review 请求 | ✅ 通过 | Mail to/风险级别/文件列表正确 |
| 3. Review APPROVED | ✅ 通过 | E2E 测试中产生 2 封 Mail(根因已查明,非平台问题) |
| 4. Review REQUEST_CHANGES | ✅ 通过 | 同上 |
| 5. CI 失败评论 | ✅ 通过 | 分支提取正确 |
| 6. 部署失败 Issue | ✅ 通过 | 双收件人验证通过 |
| 7. 已关闭 Issue 过滤 | ✅ 通过 | 负面测试通过,无新 Mail |
| 8. 幂等测试 | ✅ 通过 | 第二次返回 duplicate,无新 Mail;签名校验正常拦截无签名请求 |
## 发现的问题
### Review 事件双 Mail(已修复)
- **现象**E2E 测试步骤 3/4 中 Review 事件产生 2 封 Mail
- **根因**(姜维深入调查确认):E2E 测试中庞统手动用 simayi token 提交了 Review,同时 simayi agent 收到 Review 请求 Mail 后也自主提交了 Review。是两次独立的 API 调用,**不是 Gitea bug 或平台配置问题**
- 姜维控制实验:一次 review API 调用只产生 1 个 hook_task
- Gitea 路由日志确认两次 POST 间隔 7 秒,payload 有差异(review_comments、updated_at 不同)
- 之前的错误分析("Gitea webhookNotifier + actionsNotifier 双投递")已被推翻:actionsNotifier 走 handleWorkflows() 不创建 hook_task
- **修复**:payload 内容去重作为防御性编程保留(`_is_duplicate` 新增内容去重 key = event + pr_num + sender + sha256(body_or_content)),司马懿 APPROVED
- **验证**PR #27 实测只产生 1 封 Mail ✅
### 根因分析教训
- 姜维第一次分析给出了错误根因(Gitea 双 notifier),第二次深入调查后自我纠正
- 庞统把姜维的第一次结论当事实汇报给主公,没有标注"这是姜维的调查结论,尚未独立验证"
- **改进**SOUL.md 新增规则——推测 vs 事实显式标注、引用他人结论时标注来源、结论被推翻时及时更正
+382
View File
@@ -0,0 +1,382 @@
# #19 工具链事件中枢 — 上下文四层改造方案
> 版本: v1.0
> 日期: 2026-06-09
> 作者: 庞统(副军师)
> 状态: 待主公确认
> 前置: #13 工具链与开发流程 §16, #05 上下文四层架构
> 来源: E2E 真实场景测试暴露的三个断层
---
## 一、问题诊断
### 1.1 E2E 真实场景测试暴露的三个断层
主公在 moziplus-v2 仓库创建了 Issue #32(添加 /api/stats 端点),指派张飞。链条在第一步就断了。
| 断层 | 现象 | 根因 |
|------|------|------|
| **Agent 不知道该做什么** | 张飞收到 Issue 指派 Mail,回复"已阅"就结束了 | Mail 模板(issue_assigned.md5 行信息,无流程引导;spawn prompt 说"已阅即可" |
| **Agent 去错了仓库** | 张飞去读了 sanguo_moziplus_v2 平台代码,而不是空的实验仓库 moziplus-v2 | Mail 模板没有仓库 clone URL,张飞凭习惯去了开发目录 |
| **Agent 在 Control UI 提问** | 张飞遇到问题直接在 Control UI 问主公,没有去 Gitea Issue 评论 | 没有任何地方引导"有疑问去 Gitea Issue 评论" |
| **Agent 不知道怎么协作** | 张飞判断任务需要澄清,但不知道该怎么请求澄清 | 没有"做不了→在 Issue 评论 / Mail 庞统"的回退路径 |
| **跨 Agent @mention 无法通知** | 张飞在 Issue 评论 @赵云,赵云收不到通知 | issue_comment handler 只处理 [CI] 评论,@mention 被忽略 |
### 1.2 根因:工具链在四层架构中的断层
| 层 | 应该有 | 实际有 | Gap |
|---|---|---|---|
| **L0 铁律** | — | — | 无需改动 |
| **L1 角色** | 工具链协作行为规范(所有 Agent 共享) | 无 | AGENTS.md 没有工具链相关内容 |
| **L2 引擎注入** | 事件上下文(仓库 clone URL、Gitea API、Issue/PR 详情) | Mail 模板只有 5 行摘要 | 缺仓库信息和流程引导 |
| **L3 被动参考** | 技术细节(分支命名、commit 规范、PR 创建方式) | git-workflow 等 Skill 已存在但没人触发 | Agent 不知道该加载哪个 Skill |
---
## 二、改造方案:四层归属
### 2.1 分层原则
| 层 | 放什么 | 不放什么 | 理由 |
|---|---|---|---|
| **L0** | 不放 | — | 工具链不是安全底线 |
| **L1** | 协作行为规范:收到什么通知该做什么、遇到问题怎么办 | 技术细节(分支命名、commit 格式) | 行为规范是团队常识,每个 Agent 都要知道 |
| **L2** | 事件上下文:仓库 clone URL、Gitea API URL、Issue/PR 链接、动态信息 | 固定的协作流程 | 动态信息每次不同,由 Mail 模板 + spawn 时注入 |
| **L3** | 技术细节:git-workflow、code-review 等 Skill 全文 | — | 按需加载,Agent 知道"我要提 PR"后自己读 |
### 2.2 各层具体内容
#### L1AGENTS.md 加工具链协作行为段(所有 Agent 统一)
```markdown
## 工具链协作(Gitea
收到 Gitea 事件通知(Issue 指派、Review 请求、CI 失败等)时,按以下流程操作:
### 基本流程
- **Issue 指派** → clone 仓库 → 开分支 → 编码 → 提 PR(参考 git-workflow Skill
- **Review 请求** → 读 PR diffGitea API)→ 提交 Review(参考 code-review Skill
- **Review 通过** → 等 merge
- **Review 驳回** → 看 review body → 修代码 → 重新 push
- **CI 失败** → 看错误摘要 → 修代码 → push(自动重触发 CI)
- **部署失败** → 查 deploy 日志 → 修复
### 协作规则
- **有疑问?** 在 Gitea Issue 下评论,不要在 Control UI 或 Mail 里问
- **需要别人帮忙?** 在 Issue 评论中 @mention 对应 Agent(如 @zhaoyun-data
- **做不了?** 回复 Mail 说明原因和建议的接手人
- **获取完整上下文** → 用 Gitea API 拉取 Issue 详情和评论,不要只看 Mail 里的快照
### Gitea API 速查
> 其中 `{owner}/{repo}` 替换为实际仓库,如 `sanguo/sanguo_moziplus_v2`
- Issue 详情: GET /api/v1/repos/{owner}/{repo}/issues/{number}
- Issue 评论: GET /api/v1/repos/{owner}/{repo}/issues/{number}/comments
- PR diff: GET /api/v1/repos/{owner}/{repo}/pulls/{number}.diff
- 提交 Review: POST /api/v1/repos/{owner}/{repo}/pulls/{number}/reviews
```
**改动范围**6 个 Agent 的 AGENTS.md 各加一段(内容统一)。
#### L2:Mail 模板精简 + 事件上下文注入
**原则**:模板只放摘要 + 链接 + 仓库信息,不写固定步骤(步骤在 L1)。
**issue_assigned.md** 改为:
```markdown
Issue 指派
Issue: {issue_url}
标题: {issue_title}
标签: {labels}
📋 获取完整上下文(先读再动手):
- Issue 详情: GET {gitea_api}/repos/{repo}/issues/{issue_number}
- Issue 评论: GET {gitea_api}/repos/{repo}/issues/{issue_number}/comments
仓库: {repo_clone_url}
建议分支: feat/issue-{issue_number}-{brief}
```
**review_request.md** 改为:
```markdown
PR Review 请求
PR: {pr_url}
标题: {pr_title}
作者: {pr_author}
分支: {branch}
风险级别: {risk_level}
📋 获取完整上下文:
- PR diff: GET {gitea_api}/repos/{repo}/pulls/{pr_number}.diff
- PR 文件列表: GET {gitea_api}/repos/{repo}/pulls/{pr_number}/files
```
**review_result.md** 改为:
```markdown
Review {result}
PR: {pr_url}
标题: {pr_title}
审查者: {reviewer}
{review_body}
```
**ci_failure.md** 改为:
```markdown
CI 失败
Issue: {issue_url}
分支: {branch}
错误摘要:
{error_summary}
📋 CI 日志: {gitea_url}/{repo}/actions
修复后 push 会自动重触发 CI。
```
**deploy_failure.md** 改为:
```markdown
部署失败
仓库: {repo}
Commit: {commit_sha}
📋 排查步骤:
- CI 日志: {gitea_url}/{repo}/actions
- 服务器: pm2 logs {service_name}
```
**L2 代码改动**`toolchain_routes.py`):
1. 从 Webhook payload 的 `repository` 对象提取 `clone_url``html_url`
2. `render_template()` 传入新变量:`gitea_api``gitea_url``repo_clone_url`
3. 所有模板变量统一补齐
#### L3Skill 按需加载(不改 Skill 本身)
git-workflow、code-review 等 Skill 保持不变。
L1 的协作行为段里会引用 Skill 名称("参考 git-workflow Skill"),Agent 收到 Mail 后根据 L1 的引导自主加载对应 Skill。
**不改 Skill 路由机制**——靠 L1 的文案触发 Agent 的 Skill 路由器匹配。
---
## 三、新增功能:issue_comment @mention 通知
### 3.1 设计
当前 `_handle_issue_comment` 只处理 `[CI]` 前缀评论。扩展为:
```
issue_comment 事件
├── 含 [CI] / CI 失败 → 原有 CI 失败通知逻辑
└── 含 @username → 解析 @mention → Mail 通知被 @的 Agent
```
### 3.2 实现
**`toolchain_routes.py` 新增 `_handle_issue_comment_mention()`**
```python
AGENT_IDS = {
"zhangfei-dev", "guanyu-dev", "zhaoyun-data",
"jiangwei-infra", "simayi-challenger", "pangtong-fujunshi",
}
# 前缀映射:@张飞 → zhangfei-dev
# 中文名映射:Agent 在 Gitea Issue 评论中可能用中文名 @mention
# 英文短名映射:Agent 可能用不带 -dev/-infra 后缀的短名
AGENT_ALIAS = {
"张飞": "zhangfei-dev",
"关羽": "guanyu-dev",
"赵云": "zhaoyun-data",
"姜维": "jiangwei-infra",
"司马懿": "simayi-challenger",
"庞统": "pangtong-fujunshi",
"pangtong": "pangtong-fujunshi",
"simayi": "simayi-challenger",
"zhangfei": "zhangfei-dev",
"guanyu": "guanyu-dev",
"zhaoyun": "zhaoyun-data",
"jiangwei": "jiangwei-infra",
}
def extract_mentions(body: str, sender: str) -> list[str]:
"""从评论 body 中提取 @mention 的 Agent ID"""
candidates = re.findall(r"@([a-zA-Z\u4e00-\u9fa5][a-zA-Z0-9\u4e00-\u9fff-]*)", body)
result = set()
for c in candidates:
# 精确匹配
if c in AGENT_IDS:
result.add(c)
# 前缀/别名匹配
elif c in AGENT_ALIAS:
result.add(AGENT_ALIAS[c])
else:
# 前缀模糊匹配:@zhangfei → zhangfei-dev
for aid in AGENT_IDS:
if aid.startswith(c):
result.add(aid)
break
# 过滤掉评论者自己
result.discard(sender)
return list(result)
```
**新增 mention 通知模板** `templates/toolchain/mention.md`
```markdown
你在 Issue 中被 @mention
Issue: {issue_url}
评论者: {commenter}
评论内容:
{comment_body}
📋 获取完整上下文:
- Issue 详情: GET {gitea_api}/repos/{repo}/issues/{issue_number}
- Issue 评论: GET {gitea_api}/repos/{repo}/issues/{issue_number}/comments
```
**改动 `_handle_issue_comment`**
```python
async def _handle_issue_comment(payload):
comment = payload.get("comment", {})
body = comment.get("body", "")
sender = comment.get("user", {}).get("login", "")
repo = _repo_fullname(payload)
issue = payload.get("issue", {})
# 原有 CI 失败逻辑(不变)
if "[CI]" in body or "CI 失败" in body:
# ... 原有逻辑 ...
# 新增:@mention 通知
mentions = extract_mentions(body, sender)
if mentions:
issue_number = issue.get("number", 0)
issue_title = issue.get("title", "")
text = render_template("mention", {
"repo": repo,
"issue_number": str(issue_number),
"issue_url": issue.get("html_url", ""),
"commenter": sender,
"comment_body": body[:500],
"gitea_api": "http://192.168.2.154:3000/api/v1",
})
title = f"@mention: {issue_title} ({repo}#{issue_number})"
for agent_id in mentions:
_send_mail(agent_id, title, text)
```
### 3.3 去重
- 同一条评论 @多人:每人一封 Mail(不同 to,内容相同)
- 同一事件 org webhook + repo webhook 双触发:现有 delivery UUID 去重机制覆盖
- 同一人被 @多次`extract_mentions` 返回 set,自动去重
---
## 四、Mail Spawn Prompt 改造
### 4.1 问题
当前工具链 Mail 走 Mail 通道,spawn prompt 是:
```
你收到一封飞鸽传书(纯通知)。
发件者: system
主题: Issue 指派: xxx
内容: [工具链模板]
已阅即可。
```
"已阅即可"直接让 Agent 不做事。
### 4.2 方案
**不改 MAIL_INFORM_TEMPLATE / MAIL_REQUEST_TEMPLATE 本身**(那是 Mail 系统通用的)。
改为:**工具链 Mail 使用 `type=request`(而不是默认的 inform**。
`_send_mail()` 中,工具链事件创建的 Mail 默认 `performative=request`,这样 Agent 收到时走 `MAIL_REQUEST_TEMPLATE`,知道需要处理。
具体改动在 `_send_mail()` 函数或其调用处:工具链路由调用 `_send_mail` 时传入 `performative="request"`
**⚠️ 验证要点**:改为 request 后,Agent spawn prompt 变为 "请处理以下请求",需确认:
1. Agent 不再把工具链 Mail 当纯通知忽略
2. Agent 能正确处理「已阅型」工具链事件(如 CI 失败通知——不需要回复,但需要知道)
3. 对已关闭 PR/Issue 的延迟通知,Agent 不会尝试去处理
验证方法:部署后发一条 Issue 指派 Mail,观察 Agent 行为是否符合预期。
---
## 五、完整改动清单
| # | 改什么 | 改动内容 | 层 | 风险 |
|---|--------|---------|---|------|
| 1 | 6 个 Agent 的 `AGENTS.md` | 加"工具链协作"段(内容统一) | L1 | 低(纯追加) |
| 2 | `templates/toolchain/issue_assigned.md` | 精简 + 加仓库上下文 + Gitea API 引导 | L2 | 低 |
| 3 | `templates/toolchain/review_request.md` | 精简 + 加 Gitea API 引导 | L2 | 低 |
| 4 | `templates/toolchain/review_result.md` | 精简 | L2 | 低 |
| 5 | `templates/toolchain/ci_failure.md` | 精简 + 加 CI 日志链接 | L2 | 低 |
| 6 | `templates/toolchain/deploy_failure.md` | 精简 + 加排查步骤 | L2 | 低 |
| 7 | **新建** `templates/toolchain/mention.md` | @mention 通知模板 | L2 | 低 |
| 8 | `src/api/toolchain_routes.py` | 提取 clone_url/html_url 传入模板;issue_comment 增加 @mention 解析;工具链 Mail 改为 request 类型 | L2 | 中 |
| 9 | 不改 | git-workflow 等 Skill 保持不变 | L3 | — |
| 10 | 不改 | daemon 核心逻辑、BootstrapBuilder、Skill 路由 | — | — |
---
## 六、验证方案
### 6.1 单元验证
| 验证点 | 方法 |
|--------|------|
| `extract_mentions()` 提取 `@zhangfei-dev` | unit test |
| `extract_mentions()` 别名匹配 `@张飞` → zhangfei-dev | unit test |
| `extract_mentions()` 前缀匹配 `@zhangfei` → zhangfei-dev | unit test |
| `extract_mentions()` 过滤自己 | unit test |
| 模板渲染新变量不报错 | unit test |
### 6.2 真实场景 E2E 验证
重复 Issue #32 的场景:
1. 创建 Issue 指派张飞
2. **验证**:张飞收到的 Mail 含 clone URL + Gitea API 引导
3. **验证**:张飞 spawn 后知道该做什么(L1 AGENTS.md 有流程引导)
4. **验证**:张飞有疑问时去 Gitea Issue 评论(而不是 Control UI
5. 在 Issue 评论 @赵云
6. **验证**:赵云收到 @mention Mail
---
## 七、不做的事(标记为后续)
| 标记 | 描述 | 原因 |
|------|------|------|
| 后续-1 | Agent 离开工具链讨论后,是否有意识回到工具链 | 需要更多真实场景观察 |
| 后续-2 | 工具链使用标准在所有 Agent 间的一致性验证 | L1 统一段落是第一步,需要 E2E 验证 |
| 后续-3 | Mail 通道接入 BootstrapBuilder L2 注入 | 改动大,当前方案(L1 统一段落 + 模板引导)够用 |
| 后续-4 | Skill 路由器自动触发(引擎注入) | 改动 daemon 核心,当前靠 L1 文案触发 |
---
## 八、变更记录
| 日期 | 版本 | 变更 |
|------|------|------|
| 2026-06-09 | v1.0 | 初版:E2E 真实场景暴露问题 → 四层改造方案 + @mention 通知 + Mail type 改造 |
+5 -4
View File
@@ -11,9 +11,10 @@
| 场景 | 命令 | 耗时 | 说明 |
|------|------|------|------|
| **改了某个模块** | `pytest tests/unit/test_spawner.py` | <5s | 只跑改动的模块对应的单元测试 |
| **改了 API 层** | `pytest tests/integration/` | ~1min | 跑全部集成测试 |
| **提交前快速验证** | `pytest -m "not e2e"` | ~2min | 不跑 E2E,验证不破坏现有功能 |
| **部署前全量验证** | `RUN_INTEGRATION=1 pytest` | ~60min | 含 E2E,真实 Agent |
| **改了 API 层** | `RUN_INTEGRATION=1 pytest tests/integration/` | ~1min | 跑全部集成测试 |
| **提交前快速验证** | `pytest` | ~2min | 默认排除 integration 和 e2e |
| **含集成测试** | `RUN_INTEGRATION=1 pytest` | ~5min | integration 测试 |
| **部署前全量验证** | `RUN_INTEGRATION=1 pytest` | ~60min | 含 e2e,真实 Agent |
| **只跑 E2E 场景** | `RUN_INTEGRATION=1 pytest tests/e2e/test_e2e_scenarios.py` | ~30min | 串行,一个跑完再下一个 |
| **只跑 E2E 压力** | `RUN_INTEGRATION=1 pytest tests/e2e/test_e2e_stress.py` | ~10min | 并发测试 |
@@ -101,7 +102,7 @@ E2E(慢,真实 Agent) → 验证完整链路,需要 RUN_INTEGRATION=1
## 关键规则
1. **只有 E2E 会 spawn 真实 Agent**,单元和集成不会
2. **不带 `RUN_INTEGRATION=1` 跑 `pytest` 是安全的**E2E 全部 skip
2. **直接跑 `pytest` 是安全的**integration 和 e2e 全部被排除(需 `RUN_INTEGRATION=1` 才跑)
3. **E2E 场景测试串行**,一个完成再下一个,失败要分析根因再继续
4. **E2E 压力测试并行**,场景测试全通过后再跑
5. **测试数据用 `e2e-` 前缀**,atexit 兜底清理,手动清理见上方
+3 -1
View File
@@ -8,8 +8,10 @@ requires-python = ">=3.9"
asyncio_mode = "auto"
testpaths = ["tests"]
markers = [
"integration: real agent tests (requires RUN_INTEGRATION=1)",
"integration: integration tests (requires RUN_INTEGRATION=1)",
"e2e: end-to-end tests with real daemon + Agent (requires RUN_INTEGRATION=1)",
]
# Default deselection of integration/e2e handled in conftest.py pytest_collection_modifyitems
[tool.pyright]
venvPath = "."
+44 -19
View File
@@ -5,14 +5,14 @@ from __future__ import annotations
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Optional
from fastapi import APIRouter, HTTPException, Query
from src.blackboard.operations import Blackboard
from src.blackboard.models import Task, Review
from src.blackboard.queries import Queries
from src.blackboard.db import VALID_STATUSES, VALID_TRANSITIONS, COMMENT_TYPES, OUTPUT_TYPES
from src.blackboard.db import VALID_STATUSES, OUTPUT_TYPES
from src.blackboard.registry import ProjectRegistry
from src.utils import get_data_root
@@ -59,7 +59,10 @@ async def list_tasks(project_id: str,
assignee: Optional[str] = None,
parent_task: Optional[str] = None):
bb = _bb(project_id)
tasks = bb.list_tasks(status=status, assignee=assignee, parent_task=parent_task)
tasks = bb.list_tasks(
status=status,
assignee=assignee,
parent_task=parent_task)
return {"tasks": [_task_to_dict(t) for t in tasks]}
@@ -79,10 +82,12 @@ async def get_task(project_id: str, task_id: str,
result["outputs_count"] = detail.get("outputs_count", 0)
result["review_status"] = detail.get("review_status")
result["latest_event_detail"] = detail.get("latest_event_detail")
result["comments"] = [dict(c.__dict__) for c in bb.get_comments(task_id)]
result["comments"] = [dict(c.__dict__)
for c in bb.get_comments(task_id)]
result["outputs"] = [dict(o.__dict__) for o in bb.get_outputs(task_id)]
result["reviews"] = [dict(r.__dict__) for r in bb.get_reviews(task_id)]
result["decisions"] = [dict(d.__dict__) for d in bb.get_decisions(task_id)]
result["decisions"] = [dict(d.__dict__)
for d in bb.get_decisions(task_id)]
result["events"] = q.task_events(task_id)
result["experiences"] = q.task_experiences(task_id)
return result
@@ -134,7 +139,8 @@ async def create_task(project_id: str, body: Dict[str, Any]):
priority=body.get("priority", 5),
assignee=assignee,
assigned_by=body.get("assigned_by", "user"),
depends_on=json.dumps(body["depends_on"]) if "depends_on" in body else None,
depends_on=json.dumps(
body["depends_on"]) if "depends_on" in body else None,
parent_task=body.get("parent_task"),
risk_level=body.get("risk_level", "standard"),
stage=body.get("stage"),
@@ -175,7 +181,8 @@ async def _generate_title(description: str) -> str | None:
resp = client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": "你是一个任务标题生成器。根据用户的需求描述,生成一个简洁的中文标题(5-15字),只输出标题,不要任何其他内容。"},
{"role": "system",
"content": "你是一个任务标题生成器。根据用户的需求描述,生成一个简洁的中文标题(5-15字),只输出标题,不要任何其他内容。"},
{"role": "user", "content": description[:500]},
],
max_tokens=50,
@@ -187,7 +194,8 @@ async def _generate_title(description: str) -> str | None:
return title
except Exception as e:
import logging
logging.getLogger("moziplus-v2").warning(f"Title generation failed: {e}")
logging.getLogger(
"moziplus-v2").warning(f"Title generation failed: {e}")
return None
@@ -205,7 +213,8 @@ async def task_progress(project_id: str, task_id: str):
async def claim_task(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
if not bb.claim_task(task_id, body["agent"]):
raise HTTPException(409, "Claim failed (already claimed or wrong assignee)")
raise HTTPException(
409, "Claim failed (already claimed or wrong assignee)")
return {"ok": True}
@@ -240,7 +249,7 @@ async def update_status(project_id: str, task_id: str, body: Dict[str, Any]):
})
if not bb.update_task_status(task_id, new_status,
agent=body.get("agent")):
agent=body.get("agent")):
raise HTTPException(409, {
"error": "transition_failed",
"detail": f"Status update failed for {task_id}",
@@ -265,6 +274,7 @@ async def update_status(project_id: str, task_id: str, body: Dict[str, Any]):
# --- @mention 自动提取(#04 ---
_KNOWN_AGENT_IDS: list = []
def _init_agent_ids():
"""从配置文件加载 Agent ID 列表"""
global _KNOWN_AGENT_IDS
@@ -272,18 +282,32 @@ def _init_agent_ids():
return
try:
import yaml
cfg_path = os.path.join(os.path.dirname(__file__), "..", "..", "config", "default.yaml")
cfg_path = os.path.join(
os.path.dirname(__file__),
"..",
"..",
"config",
"default.yaml")
with open(cfg_path) as f:
cfg = yaml.safe_load(f)
_KNOWN_AGENT_IDS = list(cfg.get("daemon", {}).get("agent_profiles", {}).keys())
_KNOWN_AGENT_IDS = list(
cfg.get(
"daemon",
{}).get(
"agent_profiles",
{}).keys())
except Exception:
_KNOWN_AGENT_IDS = []
def _extract_mentions(text: str) -> list:
"""从文本中自动提取 @agent-id 格式的 mention"""
import re
_init_agent_ids()
candidates = set(re.findall(r'@([a-z][a-z0-9]*(?:-[a-z][a-z0-9]*)+)', text))
candidates = set(
re.findall(
r'@([a-z][a-z0-9]*(?:-[a-z][a-z0-9]*)+)',
text))
return [a for a in candidates if a in _KNOWN_AGENT_IDS]
@@ -317,8 +341,8 @@ async def add_comment(project_id: str, task_id: str, body: Dict[str, Any]):
merged_mentions = list(set(explicit_mentions + auto_mentions))
cid = bb.add_comment(task_id, body["author"], comment_body,
comment_type=body.get("comment_type", "general"),
mentions=merged_mentions)
comment_type=body.get("comment_type", "general"),
mentions=merged_mentions)
if merged_mentions:
bb.record_mentions(cid, task_id, merged_mentions)
# #10: SSE 通知前端黑板有新 comment
@@ -395,7 +419,8 @@ async def write_output(project_id: str, task_id: str, body: Dict[str, Any]):
)
os.makedirs(artifacts_dir, exist_ok=True)
# 安全文件名
safe_name = "".join(c if c.isalnum() or c in "._-" else "_" for c in title)
safe_name = "".join(
c if c.isalnum() or c in "._-" else "_" for c in title)
if not safe_name:
safe_name = "output"
file_path = os.path.join(artifacts_dir, safe_name)
@@ -424,8 +449,8 @@ async def get_decisions(project_id: str, task_id: str):
async def add_decision(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
did = bb.add_decision(task_id, body["decider"], body["decision"],
body["rationale"],
alternatives=body.get("alternatives"))
body["rationale"],
alternatives=body.get("alternatives"))
return {"ok": True, "decision_id": did}
@@ -435,7 +460,7 @@ async def add_decision(project_id: str, task_id: str, body: Dict[str, Any]):
async def add_observation(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
oid = bb.add_observation(task_id, body["observer"], body["body"],
severity=body.get("severity", "info"))
severity=body.get("severity", "info"))
return {"ok": True, "observation_id": oid}
+21 -7
View File
@@ -12,7 +12,9 @@ from typing import Optional
from src.blackboard.operations import Blackboard
from src.utils import get_data_root
router = APIRouter(prefix="/api/projects/{project_id}/tasks/{task_id}/checkpoints", tags=["checkpoints"])
router = APIRouter(
prefix="/api/projects/{project_id}/tasks/{task_id}/checkpoints",
tags=["checkpoints"])
# ── 请求模型 ──
@@ -50,10 +52,12 @@ def list_checkpoints(project_id: str, task_id: str):
@router.post("")
def create_checkpoint(project_id: str, task_id: str, req: CreateCheckpointRequest):
def create_checkpoint(project_id: str, task_id: str,
req: CreateCheckpointRequest):
"""Agent 创建 checkpoint"""
if req.type not in ("verify", "decision", "action"):
raise HTTPException(status_code=400, detail=f"Invalid checkpoint type: {req.type}")
raise HTTPException(status_code=400,
detail=f"Invalid checkpoint type: {req.type}")
bb = _bb(project_id)
# 验证 task 存在
@@ -73,10 +77,15 @@ def create_checkpoint(project_id: str, task_id: str, req: CreateCheckpointReques
@router.post("/{checkpoint_id}/approve")
def approve_checkpoint(project_id: str, task_id: str, checkpoint_id: str, req: ResolveCheckpointRequest):
def approve_checkpoint(project_id: str, task_id: str,
checkpoint_id: str, req: ResolveCheckpointRequest):
"""用户通过 checkpoint → 自动推进 task 状态"""
bb = _bb(project_id)
result = bb.resolve_checkpoint(checkpoint_id, "approve", req.resolved_by, req.note)
result = bb.resolve_checkpoint(
checkpoint_id,
"approve",
req.resolved_by,
req.note)
if result is None:
raise HTTPException(status_code=404, detail="Checkpoint not found")
if "error" in result:
@@ -97,10 +106,15 @@ def approve_checkpoint(project_id: str, task_id: str, checkpoint_id: str, req: R
@router.post("/{checkpoint_id}/reject")
def reject_checkpoint(project_id: str, task_id: str, checkpoint_id: str, req: ResolveCheckpointRequest):
def reject_checkpoint(project_id: str, task_id: str,
checkpoint_id: str, req: ResolveCheckpointRequest):
"""用户驳回 checkpoint → task 回到 working"""
bb = _bb(project_id)
result = bb.resolve_checkpoint(checkpoint_id, "reject", req.resolved_by, req.note)
result = bb.resolve_checkpoint(
checkpoint_id,
"reject",
req.resolved_by,
req.note)
if result is None:
raise HTTPException(status_code=404, detail="Checkpoint not found")
if "error" in result:
+19 -8
View File
@@ -9,7 +9,7 @@ from __future__ import annotations
import json
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Optional
from fastapi import APIRouter, HTTPException, Query
@@ -34,7 +34,9 @@ def _get_valid_agents() -> set:
except Exception:
pass
# fallback:硬编码
return {"zhangfei-dev", "guanyu-dev", "zhaoyun-data", "jiangwei-infra", "pangtong-fujunshi", "simayi-challenger"}
return {"zhangfei-dev", "guanyu-dev", "zhaoyun-data",
"jiangwei-infra", "pangtong-fujunshi", "simayi-challenger"}
router = APIRouter(prefix="/api/mail", tags=["mail"])
@@ -97,7 +99,10 @@ async def list_mail(
):
"""Mail 列表(按时间倒序)"""
bb = _bb()
tasks = bb.list_tasks(status=status, assignee=to_agent, assigned_by=from_agent)
tasks = bb.list_tasks(
status=status,
assignee=to_agent,
assigned_by=from_agent)
mails = []
for t in tasks:
@@ -222,13 +227,16 @@ async def send_mail(body: Dict[str, Any]):
# A8: 只有原邮件的双方能回复(严格 1 对 1)
if from_agent not in (orig_from, orig_to):
raise HTTPException(400, f"只有邮件的发送者或接收者可以回复")
raise HTTPException(400, "只有邮件的发送者或接收者可以回复")
# A6/A7: 自动纠正 to → 原邮件发件者
to_agent = body.get("to", "").strip()
corrected_to = orig_from # 回复方向固定: reply → original sender
if to_agent and to_agent != corrected_to:
auto_corrected = {"field": "to", "original": to_agent, "corrected": corrected_to}
auto_corrected = {
"field": "to",
"original": to_agent,
"corrected": corrected_to}
to_agent = corrected_to
else:
# --- A2: to 必填(非回复场景) ---
@@ -255,7 +263,8 @@ async def send_mail(body: Dict[str, Any]):
conversation_id = body.get("conversation_id")
if not conversation_id and original:
try:
orig_meta = json.loads(original.must_haves) if original.must_haves else {}
orig_meta = json.loads(
original.must_haves) if original.must_haves else {}
conversation_id = orig_meta.get("conversation_id")
except Exception:
pass
@@ -310,10 +319,12 @@ async def delete_mail(prefix: Optional[str] = Query(None)):
for t in tasks:
if t.title and t.title.startswith(prefix):
if t.status not in ("cancelled",):
bb.update_task_status(t.id, "cancelled", agent="mail-cleanup-api")
bb.update_task_status(
t.id, "cancelled", agent="mail-cleanup-api")
deleted_ids.append(t.id)
return {"ok": True, "deleted_count": len(deleted_ids), "deleted_ids": deleted_ids}
return {"ok": True, "deleted_count": len(
deleted_ids), "deleted_ids": deleted_ids}
@router.patch("/{mail_id}")
+22 -10
View File
@@ -3,7 +3,7 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, List, Optional
from typing import Any, Dict
from fastapi import APIRouter, HTTPException, Query
@@ -31,8 +31,10 @@ async def list_projects():
if db_path.exists():
try:
conn = sqlite3.connect(str(db_path), timeout=5)
total = conn.execute("SELECT COUNT(*) FROM tasks WHERE status != 'cancelled'").fetchone()[0]
active = conn.execute("SELECT COUNT(*) FROM tasks WHERE COALESCE(archived,0)=0").fetchone()[0]
total = conn.execute(
"SELECT COUNT(*) FROM tasks WHERE status != 'cancelled'").fetchone()[0]
active = conn.execute(
"SELECT COUNT(*) FROM tasks WHERE COALESCE(archived,0)=0").fetchone()[0]
archived = total - active
conn.close()
info['task_count'] = active
@@ -45,8 +47,10 @@ async def list_projects():
if general_db.exists() and "_general" not in projects:
try:
conn = sqlite3.connect(str(general_db), timeout=5)
total = conn.execute("SELECT COUNT(*) FROM tasks WHERE status != 'cancelled'").fetchone()[0]
active = conn.execute("SELECT COUNT(*) FROM tasks WHERE COALESCE(archived,0)=0").fetchone()[0]
total = conn.execute(
"SELECT COUNT(*) FROM tasks WHERE status != 'cancelled'").fetchone()[0]
active = conn.execute(
"SELECT COUNT(*) FROM tasks WHERE COALESCE(archived,0)=0").fetchone()[0]
conn.close()
projects["_general"] = {
"id": "_general", "name": "一般任务", "description": "无项目归属的通用任务",
@@ -60,8 +64,10 @@ async def list_projects():
if general_db_check.exists():
try:
conn = sqlite3.connect(str(general_db_check), timeout=5)
total = conn.execute("SELECT COUNT(*) FROM tasks WHERE status != 'cancelled'").fetchone()[0]
active = conn.execute("SELECT COUNT(*) FROM tasks WHERE COALESCE(archived,0)=0").fetchone()[0]
total = conn.execute(
"SELECT COUNT(*) FROM tasks WHERE status != 'cancelled'").fetchone()[0]
active = conn.execute(
"SELECT COUNT(*) FROM tasks WHERE COALESCE(archived,0)=0").fetchone()[0]
conn.close()
projects["_general"]["task_count"] = active
projects["_general"]["task_count_total"] = total
@@ -76,7 +82,7 @@ async def list_projects():
async def create_project(body: Dict[str, Any]):
reg = _registry()
try:
info = reg.create_project(
reg.create_project(
body["id"], body["name"],
agents=body.get("agents", []),
description=body.get("description", ""),
@@ -173,7 +179,10 @@ async def move_task(project_id: str, task_id: str, body: Dict[str, Any]):
depends_on=child.depends_on, must_haves=child.must_haves,
)
tgt_bb.create_task(moved_child)
src_bb.update_task_status(child.id, "cancelled", detail=f"Moved to {target_project}")
src_bb.update_task_status(
child.id,
"cancelled",
detail=f"Moved to {target_project}")
moved_ids.append(child.id)
# 移动主任务
@@ -186,7 +195,10 @@ async def move_task(project_id: str, task_id: str, body: Dict[str, Any]):
depends_on=task.depends_on, must_haves=task.must_haves,
)
tgt_bb.create_task(moved_task)
src_bb.update_task_status(task_id, "cancelled", detail=f"Moved to {target_project}")
src_bb.update_task_status(
task_id,
"cancelled",
detail=f"Moved to {target_project}")
moved_ids.insert(0, task_id)
return {"ok": True, "moved_to": target_project, "moved_ids": moved_ids}
+111 -24
View File
@@ -46,17 +46,48 @@ _TTL_SECONDS = 7 * 24 * 3600
_idempotency_lock = asyncio.Lock()
def _is_duplicate(event: str, delivery: str) -> bool:
"""检查 Webhook 是否重复投递,自动清理过期条目。"""
def _is_duplicate(event: str, delivery: str,
payload: Optional[Dict[str, Any]] = None) -> bool:
"""检查 Webhook 是否重复投递,自动清理过期条目。
双重去重策略
1. delivery UUID 去重标准幂等
2. payload 内容去重应对 Gitea v1.23.4 webhookNotifier + actionsNotifier
对同一 review 生成不同 UUID 的双投递问题
"""
now = time.time()
# 清理过期条目
while _delivery_timestamps and (now - _delivery_timestamps[0][0]) > _TTL_SECONDS:
while _delivery_timestamps and (
now - _delivery_timestamps[0][0]) > _TTL_SECONDS:
_, key = _delivery_timestamps.pop(0)
_delivery_cache.discard(key)
# 检查 delivery UUID 去重
key = f"{event}-{delivery}"
if key in _delivery_cache:
return True
# 检查 payload 内容去重(review 事件:同一 PR + 同一用户 + 同一内容)
# 注意:Gitea webhookNotifier 用 review.bodyactionsNotifier 用 review.content
# 所以去重 key 需要同时取两个字段,确保两种格式生成相同 key
if payload and "review" in event:
pr_num = payload.get("pull_request", {}).get("number")
sender = payload.get("sender", {}).get("login")
review = payload.get("review", {})
# 取 body 或 content,优先 bodywebhookNotifier 格式)
content = review.get("body", "") or review.get("content", "")
content_hash = hashlib.sha256(content.encode()).hexdigest()[:16]
content_key = f"content:{event}:{pr_num}:{sender}:{content_hash}"
if content_key in _delivery_cache:
logger.info(
"Content-based duplicate detected: %s PR#%s by %s",
event,
pr_num,
sender)
return True
_delivery_cache.add(content_key)
_delivery_timestamps.append((now, content_key))
_delivery_cache.add(key)
_delivery_timestamps.append((now, key))
return False
@@ -112,8 +143,16 @@ async def _fetch_pr_files(repo: str, pr_number: int) -> Tuple[List[str], str]:
last_error = str(e)
if attempt < 2:
await asyncio.sleep(0.5 * (attempt + 1))
logger.warning("Retry %d/3 fetching PR files: %s/pulls/%d", attempt + 1, repo, pr_number)
logger.warning("Failed to fetch PR files after 3 retries: %s/pulls/%d - %s", repo, pr_number, last_error)
logger.warning(
"Retry %d/3 fetching PR files: %s/pulls/%d",
attempt + 1,
repo,
pr_number)
logger.warning(
"Failed to fetch PR files after 3 retries: %s/pulls/%d - %s",
repo,
pr_number,
last_error)
return [], f"获取文件列表失败(重试3次): {last_error}"
@@ -141,7 +180,6 @@ def _calc_risk_level(changed_files: List[str]) -> str:
# ---------------------------------------------------------------------------
MAIL_PROJECT_ID = "_mail"
@@ -227,7 +265,8 @@ async def _handle_pull_request(payload: Dict[str, Any]) -> None:
pr = payload.get("pull_request")
if not pr or not isinstance(pr, dict):
logger.warning("pull_request event missing pull_request field, skipping")
logger.warning(
"pull_request event missing pull_request field, skipping")
return
repo = _repo_fullname(payload)
pr_number = pr.get("number", 0)
@@ -241,7 +280,8 @@ async def _handle_pull_request(payload: Dict[str, Any]) -> None:
if fetch_error:
file_list = f"⚠️ {fetch_error}"
else:
file_list = "\n".join(f"- {f}" for f in changed_files) if changed_files else "(无文件变更)"
file_list = "\n".join(
f"- {f}" for f in changed_files) if changed_files else "(无文件变更)"
text = render_template("review_request", {
"repo": repo,
@@ -258,16 +298,34 @@ async def _handle_pull_request(payload: Dict[str, Any]) -> None:
async def _handle_pull_request_review(payload: Dict[str, Any]) -> None:
"""处理 pull_request_review 事件:非 COMMENTED → 通知 PR 作者。"""
"""处理 pull_request_review 事件:非 COMMENTED → 通知 PR 作者。
支持两种 payload 格式
- repo webhook: review.state = "APPROVED" / "REQUEST_CHANGES"
- org webhook (Gitea v1.23.4): review.type = "pull_request_review_approved" / "pull_request_review_rejected"
"""
review = payload.get("review")
if not review or not isinstance(review, dict):
logger.warning("pull_request_review event missing review field, skipping")
logger.warning(
"pull_request_review event missing review field, skipping")
return
pr = payload.get("pull_request")
if not pr or not isinstance(pr, dict):
logger.warning("pull_request_review event missing pull_request field, skipping")
logger.warning(
"pull_request_review event missing pull_request field, skipping")
return
# 兼容两种 payload 格式提取 state
state = review.get("state", "")
if not state:
# org webhook 格式:review.type = "pull_request_review_approved"
review_type = review.get("type", "")
type_map = {
"pull_request_review_approved": "APPROVED",
"pull_request_review_rejected": "REQUEST_CHANGES",
"pull_request_review_comment": "COMMENTED",
}
state = type_map.get(review_type, "")
# 只通知 APPROVED 和 REQUEST_CHANGES,跳过 COMMENTED 和其他状态
if state == "COMMENTED":
@@ -277,8 +335,17 @@ async def _handle_pull_request_review(payload: Dict[str, Any]) -> None:
pr_number = pr.get("number", 0)
pr_title = pr.get("title", "")
pr_author = pr.get("user", {}).get("login", "unknown")
reviewer = review.get("user", {}).get("login", "unknown")
review_body = review.get("body", "(无评论)")
# 兼容:org webhook 的 review 没有 user,从 sender 取
reviewer = review.get(
"user",
{}).get(
"login",
"") or payload.get(
"sender",
{}).get(
"login",
"unknown")
review_body = review.get("body", "") or review.get("content", "(无评论)")
result_map = {"APPROVED": "通过 ✓", "REQUEST_CHANGES": "驳回 ✗"}
if state not in result_map:
@@ -324,7 +391,8 @@ async def _handle_issues(payload: Dict[str, Any]) -> None:
logger.debug("Issue assigned but no assignee found, skipping")
return
labels_list = [lbl.get("name", "") for lbl in (issue.get("labels") or [])]
labels_list = [lbl.get("name", "")
for lbl in (issue.get("labels") or [])]
labels = ", ".join(labels_list) if labels_list else "(无标签)"
issue_body = issue.get("body", "(无描述)")
brief = issue_title[:20].replace(" ", "-").lower()
@@ -372,6 +440,14 @@ async def _handle_issue_comment(payload: Dict[str, Any]) -> None:
if not issue or not isinstance(issue, dict):
logger.warning("issue_comment event missing issue field, skipping")
return
# 已关闭的 Issue/PR 不再发送 CI 失败通知
if issue.get("state") == "closed":
logger.debug(
"Skipping CI failure notification for closed issue #%s",
issue.get("number"))
return
repo = _repo_fullname(payload)
issue_number = issue.get("number", 0)
@@ -401,6 +477,12 @@ async def _handle_issue_comment(payload: Dict[str, Any]) -> None:
_EVENT_HANDLERS: Dict[str, Any] = {
"pull_request": _handle_pull_request,
"pull_request_review": _handle_pull_request_review,
"pull_request_review_approved": _handle_pull_request_review,
"pull_request_review_rejected": _handle_pull_request_review,
"pull_request_review_comment": _handle_pull_request_review,
# Gitea v1.23.4 实际发出的 review 子事件(无 _review_ 中间段)
"pull_request_approved": _handle_pull_request_review,
"pull_request_rejected": _handle_pull_request_review,
"issues": _handle_issues,
"issue_comment": _handle_issue_comment,
}
@@ -431,27 +513,32 @@ async def gitea_webhook(
# 1. 签名验证
if not _verify_signature(body, x_gitea_signature):
logger.warning("Webhook signature verification failed")
return Response(status_code=403, content="signature verification failed")
return Response(status_code=403,
content="signature verification failed")
# 2. 幂等检查
if x_gitea_event and x_gitea_delivery:
async with _idempotency_lock:
if _is_duplicate(x_gitea_event, x_gitea_delivery):
logger.debug("Duplicate webhook: %s/%s", x_gitea_event, x_gitea_delivery)
return Response(status_code=200, content="duplicate")
# 3. 解析 payload
# 3. 解析 payload(提前解析,用于幂等检查
try:
payload = await request.json()
except Exception:
logger.warning("Failed to parse webhook payload")
return Response(status_code=200, content="invalid payload")
# 2. 幂等检查(需要在 payload 解析后,以支持内容去重)
if x_gitea_event and x_gitea_delivery:
async with _idempotency_lock:
if _is_duplicate(x_gitea_event, x_gitea_delivery, payload):
logger.debug(
"Duplicate webhook: %s/%s",
x_gitea_event,
x_gitea_delivery)
return Response(status_code=200, content="duplicate")
# 4. 查找 handler
handler = _EVENT_HANDLERS.get(x_gitea_event or "")
if not handler:
logger.debug("Unhandled event type: %s", x_gitea_event)
return Response(status_code=200, content=f"unhandled event: {x_gitea_event}")
return Response(status_code=200,
content=f"unhandled event: {x_gitea_event}")
# 5. 执行 handler
try:
+19 -15
View File
@@ -4,7 +4,6 @@ from __future__ import annotations
import sqlite3
from pathlib import Path
from typing import Optional
def init_db(db_path: Path) -> None:
@@ -133,8 +132,10 @@ def _migrate_v28(conn: sqlite3.Connection) -> None:
resolved_by TEXT,
resolve_note TEXT
)""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_checkpoints_task ON checkpoints(task_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_checkpoints_status ON checkpoints(status)")
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_checkpoints_task ON checkpoints(task_id)")
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_checkpoints_status ON checkpoints(status)")
# 4. outputs 扩展字段(M3 成果物)
_safe_add_column(conn, "outputs", "file_name", "TEXT")
@@ -189,18 +190,20 @@ TERMINAL_STATUSES = frozenset() # v3.1: 无终态,全靠 VALID_TRANSITIONS
MANUAL_STATUSES = frozenset({"cancelled", "paused", "reviewing"})
VALID_TRANSITIONS = {
"pending": {"claimed", "paused", "blocked", "cancelled"},
"claimed": {"working", "paused", "pending", "cancelled"},
"working": {"review", "done", "blocked", "failed", "paused", "escalated", "waiting_human", "cancelled", "pending"}, # pending: Mail spawn 失败回退
"paused": {"working", "claimed", "review", "escalated", "waiting_human", "cancelled"}, # 恢复到 resumed_from 记录的状态
"review": {"done", "pending", "failed", "paused", "escalated", "waiting_human", "cancelled"},
"blocked": {"pending", "escalated", "cancelled"},
"failed": {"pending", "escalated", "cancelled"},
"escalated": {"working", "pending", "paused", "cancelled"},
"pending": {"claimed", "paused", "blocked", "cancelled"},
"claimed": {"working", "paused", "pending", "cancelled"},
# pending: Mail spawn 失败回退
"working": {"review", "done", "blocked", "failed", "paused", "escalated", "waiting_human", "cancelled", "pending"},
# 恢复到 resumed_from 记录的状态
"paused": {"working", "claimed", "review", "escalated", "waiting_human", "cancelled"},
"review": {"done", "pending", "failed", "paused", "escalated", "waiting_human", "cancelled"},
"blocked": {"pending", "escalated", "cancelled"},
"failed": {"pending", "escalated", "cancelled"},
"escalated": {"working", "pending", "paused", "cancelled"},
"waiting_human": {"working", "done", "paused", "cancelled"},
"done": {"cancelled", "reviewing"},
"reviewing": {"done", "working", "cancelled"},
"cancelled": {"pending"},
"done": {"cancelled", "reviewing"},
"reviewing": {"done", "working", "cancelled"},
"cancelled": {"pending"},
}
COMMENT_TYPES = frozenset({
@@ -224,7 +227,8 @@ EVENT_TYPES = frozenset({
OUTPUT_TYPES = frozenset({"code", "document", "data", "config", "other"})
REVIEW_TYPES = frozenset({"plan_review", "output_review", "guardrail", "final_review"})
REVIEW_TYPES = frozenset(
{"plan_review", "output_review", "guardrail", "final_review"})
VERDICT_TYPES = frozenset({"approved", "rejected", "needs_revision"})
EXPERIENCE_SOURCES = frozenset({
+1 -1
View File
@@ -3,7 +3,7 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from typing import Any, List, Optional
@dataclass
+12 -8
View File
@@ -11,7 +11,6 @@ from typing import Any, Dict, List, Optional
from .db import (
VALID_TRANSITIONS,
VALID_STATUSES,
COMMENT_TYPES,
EVENT_TYPES,
OUTPUT_TYPES,
@@ -84,7 +83,8 @@ class Blackboard:
"""获取单个任务"""
conn = self._conn()
try:
row = conn.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
row = conn.execute(
"SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
return Task.from_row(row) if row else None
finally:
conn.close()
@@ -129,7 +129,8 @@ class Blackboard:
updates["completed_at"] = now # paused 也记录时间用于恢复
updates["resumed_from"] = old_status # 记录暂停前状态
elif new_status == "pending":
# 所有 →pending 转换都清空 assignee(与 ticker._transition_status L414 对齐)
# 所有 →pending 转换都清空 assignee(与 ticker._transition_status L414
# 对齐)
updates["assignee"] = None
updates["claimed_at"] = None
updates["current_agent"] = None
@@ -693,7 +694,6 @@ class Blackboard:
finally:
conn.close()
# ── Checkpoint CRUDM3 ──
def create_checkpoint(
@@ -709,7 +709,8 @@ class Blackboard:
import uuid
# BUG-33: 校验 payload 结构必须含 version 字段
if not isinstance(payload, dict) or "version" not in payload:
raise ValueError("payload must be a dict containing 'version' field")
raise ValueError(
"payload must be a dict containing 'version' field")
cp_id = checkpoint_id or f"cp-{uuid.uuid4().hex[:8]}"
conn = self._conn()
try:
@@ -966,7 +967,8 @@ class Blackboard:
finally:
conn.close()
def get_pending_mentions(self, max_retries: int = 5) -> List[Dict[str, Any]]:
def get_pending_mentions(
self, max_retries: int = 5) -> List[Dict[str, Any]]:
"""获取所有 pending 且未超过重试上限的 mentions"""
conn = self._conn()
try:
@@ -1001,7 +1003,8 @@ class Blackboard:
conn = self._conn()
try:
conn.execute("BEGIN IMMEDIATE")
conn.execute("UPDATE mention_queue SET retry_count=retry_count+1 WHERE id=?", (mention_id,))
conn.execute(
"UPDATE mention_queue SET retry_count=retry_count+1 WHERE id=?", (mention_id,))
conn.commit()
return True
finally:
@@ -1012,7 +1015,8 @@ class Blackboard:
conn = self._conn()
try:
conn.execute("BEGIN IMMEDIATE")
conn.execute("UPDATE mention_queue SET status='failed' WHERE id=?", (mention_id,))
conn.execute(
"UPDATE mention_queue SET status='failed' WHERE id=?", (mention_id,))
conn.commit()
return True
finally:
+8 -4
View File
@@ -132,7 +132,8 @@ class Queries:
"""任务详情聚合(含关联数据)"""
conn = self._conn()
try:
row = conn.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
row = conn.execute(
"SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
if not row:
return None
task = dict(row)
@@ -159,7 +160,8 @@ class Queries:
finally:
conn.close()
def task_events(self, task_id: str, limit: int = 50) -> List[Dict[str, Any]]:
def task_events(self, task_id: str,
limit: int = 50) -> List[Dict[str, Any]]:
"""任务事件列表"""
conn = self._conn()
try:
@@ -265,7 +267,8 @@ class Queries:
return "review"
# 有 working/claimed → working
if status_counts.get("working", 0) > 0 or status_counts.get("claimed", 0) > 0:
if status_counts.get("working", 0) > 0 or status_counts.get(
"claimed", 0) > 0:
return "working"
# 有 pending → pending
@@ -337,7 +340,8 @@ class Queries:
# 当前活跃 stage
active_stage = None
for sp in stage_progress:
if sp["active"] > 0 or (sp["total"] > 0 and sp["done"] < sp["total"]):
if sp["active"] > 0 or (
sp["total"] > 0 and sp["done"] < sp["total"]):
if not active_stage and sp["done"] < sp["total"]:
active_stage = sp["label"]
+6 -4
View File
@@ -119,7 +119,8 @@ class ProjectRegistry:
finally:
conn.close()
def list_projects(self, status: Optional[str] = None) -> Dict[str, Dict[str, Any]]:
def list_projects(
self, status: Optional[str] = None) -> Dict[str, Dict[str, Any]]:
"""列出项目"""
conn = self._connect()
try:
@@ -178,7 +179,8 @@ class ProjectRegistry:
status="deleted",
)
def physical_delete_project(self, project_id: str) -> Optional[Dict[str, Any]]:
def physical_delete_project(
self, project_id: str) -> Optional[Dict[str, Any]]:
"""物理删除项目(删目录 + 删 registry 条目)"""
import shutil
@@ -260,7 +262,8 @@ class ProjectRegistry:
# 迁移(从 _registry.yaml
# ===================================================================
def discover_sanguo_projects(self, scan_dir: Optional[Path] = None) -> List[str]:
def discover_sanguo_projects(
self, scan_dir: Optional[Path] = None) -> List[str]:
"""扫描 sanguo_projects 开发目录,自动注册正式项目"""
scan_dir = scan_dir or Path(os.environ.get(
"SANGUO_PROJECTS_DIR",
@@ -355,4 +358,3 @@ class ProjectRegistry:
def reload(self) -> None:
"""兼容旧接口(SQLite 不需要 reload cache"""
pass
+14 -6
View File
@@ -10,7 +10,7 @@ from typing import List, Optional
from src.blackboard.operations import Blackboard
from src.utils import get_data_root
from src.blackboard.models import Task, Comment, Output, Decision, Observation, Review, Experience
from src.blackboard.models import Task, Review
from src.blackboard.queries import Queries
from src.blackboard.registry import ProjectRegistry
@@ -35,7 +35,9 @@ def _get_queries(project_id: str) -> Queries:
def build_blackboard_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="blackboard", description="Agent blackboard operations")
parser = argparse.ArgumentParser(
prog="blackboard",
description="Agent blackboard operations")
sub = parser.add_subparsers(dest="command")
# read
@@ -206,7 +208,11 @@ def _cmd_comment(opts) -> int:
def _cmd_decide(opts) -> int:
bb = _get_bb(opts.project)
did = bb.add_decision(opts.task_id, opts.decider, opts.decision, opts.rationale)
did = bb.add_decision(
opts.task_id,
opts.decider,
opts.decision,
opts.rationale)
print(f"Decision recorded: {did}")
return 0
@@ -251,7 +257,8 @@ def _print_tasks(tasks, as_json: bool):
def build_admin_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="admin", description="Admin operations")
parser = argparse.ArgumentParser(
prog="admin", description="Admin operations")
sub = parser.add_subparsers(dest="command")
# project create
@@ -262,7 +269,7 @@ def build_admin_parser() -> argparse.ArgumentParser:
p_pc.add_argument("--description", default="")
# project list
p_pl = sub.add_parser("project-list", help="List projects")
sub.add_parser("project-list", help="List projects")
# project archive
p_pa = sub.add_parser("project-archive", help="Archive project")
@@ -300,7 +307,8 @@ def run_admin_cli(args: Optional[List[str]] = None) -> int:
for pid, info in projects.items():
status = info.get("status", "?")
agents = ",".join(info.get("agents", []))
print(f" {pid} [{status}] {info.get('name', '')} agents: {agents}")
print(
f" {pid} [{status}] {info.get('name', '')} agents: {agents}")
return 0
elif opts.command == "project-archive":
+11 -9
View File
@@ -11,8 +11,7 @@ A 类 Skill 由引擎确定性注入全文,不靠 Description 触发。
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
from typing import Any, List
logger = logging.getLogger("moziplus-v2.bootstrap")
@@ -28,12 +27,12 @@ class BootstrapBuilder:
"""L2 引擎注入层构建器(v2.1 四段式)"""
ROLE_SKILL_MAP = {
"executor": "blackboard-executor",
"reviewer": "blackboard-reviewer",
"reviewer-simayi": "blackboard-reviewer-simayi",
"executor": "blackboard-executor",
"reviewer": "blackboard-reviewer",
"reviewer-simayi": "blackboard-reviewer-simayi",
"reviewer-pangtong": "blackboard-reviewer-pangtong",
"planner": "blackboard-planner",
"claim": "blackboard-claim",
"planner": "blackboard-planner",
"claim": "blackboard-claim",
}
# 默认从环境变量或配置读取,fallback 到默认路径
@@ -62,7 +61,9 @@ class BootstrapBuilder:
# 段 2: 前序产出(有依赖时注入)
if task.get("depends_on_outputs"):
sections.append(self._format_prior_outputs(task["depends_on_outputs"]))
sections.append(
self._format_prior_outputs(
task["depends_on_outputs"]))
# 段 3: 角色操作规范全文(通过 ROLE_SKILL_MAP 从 Skill 文件读取)
skill_name = self.ROLE_SKILL_MAP.get(role)
@@ -134,7 +135,8 @@ class BootstrapBuilder:
"""格式化前序产出摘要(段 2"""
parts = ["## 前序产出"]
for out in outputs:
parts.append(f"- [{out.get('task_id', '?')}] {out.get('summary', '无摘要')}")
parts.append(
f"- [{out.get('task_id', '?')}] {out.get('summary', '无摘要')}")
return "\n".join(parts)
def _format_constraints(self, role: str) -> str:
+9 -5
View File
@@ -68,20 +68,23 @@ class ActiveAgentCounter:
self._cooldown_until.pop(agent_id, None)
return False
def set_cooldown(self, agent_id: str, seconds: Optional[float] = None) -> None:
def set_cooldown(self, agent_id: str,
seconds: Optional[float] = None) -> None:
"""设置冷却期(默认 120 秒)"""
cd = seconds if seconds is not None else self._default_cooldown_seconds
self._cooldown_until[agent_id] = time.time() + cd
logger.info("Cooldown set for %s: %.0fs (until %.0f)",
agent_id, cd, self._cooldown_until[agent_id])
agent_id, cd, self._cooldown_until[agent_id])
async def can_acquire(self, agent_id: str, session_id: str = "main") -> bool:
async def can_acquire(self, agent_id: str,
session_id: str = "main") -> bool:
"""三层检查:cooldown → global → per agent → per session key"""
if self.is_cooling_down(agent_id):
return False
if self._global_active >= self._max_global:
return False
if self._agent_active.get(agent_id, 0) >= self._max_concurrent_sessions:
if self._agent_active.get(
agent_id, 0) >= self._max_concurrent_sessions:
return False
key = self._make_key(agent_id, session_id)
if self._active_keys.get(key, 0) >= self._max_per_session:
@@ -122,7 +125,8 @@ class ActiveAgentCounter:
del self._active_keys[key]
if agent_id in self._agent_active:
self._agent_active[agent_id] = max(0, self._agent_active[agent_id] - 1)
self._agent_active[agent_id] = max(
0, self._agent_active[agent_id] - 1)
if self._agent_active[agent_id] == 0:
del self._agent_active[agent_id]
+169 -70
View File
@@ -14,7 +14,6 @@ from __future__ import annotations
import json
import logging
import sqlite3
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional
@@ -22,7 +21,7 @@ from typing import Any, Dict, List, Optional
from src.blackboard.models import Task
from src.blackboard.db import get_connection
from src.daemon.spawner import AgentBusyError
from src.daemon.router import AgentRouter, RouteDecision
from src.daemon.router import AgentRouter
logger = logging.getLogger("moziplus-v2.dispatcher")
@@ -64,7 +63,8 @@ class Dispatcher:
if self._legacy_mode:
self.registered_agents = set(registered_agents or [])
self.capability_map = capability_map or {}
logger.warning("Dispatcher running in legacy mode (no AgentRouter)")
logger.warning(
"Dispatcher running in legacy mode (no AgentRouter)")
def decide(self, task: Task, action_type: str = "") -> Dict[str, Any]:
"""调度决策(委托给 Router
@@ -124,16 +124,21 @@ class Dispatcher:
"""
# 安全红线检查(调度前拦截)
# Mail 是 Agent 间通信,不做 guardrail 检查
is_mail = project_config.get("project_id") == "_mail" if project_config else False
is_mail = project_config.get(
"project_id") == "_mail" if project_config else False
if self.guardrails and not is_mail:
violations = self.guardrails.check_task(task)
critical = [v for v in violations if v.action in ("block_and_notify", "terminate_and_escalate")]
critical = [
v for v in violations if v.action in (
"block_and_notify",
"terminate_and_escalate")]
if critical:
v = critical[0]
logger.warning("Task '%s' blocked by guardrail: %s - %s",
task.title, v.rule_id, v.message)
# 写入黑板事件
_routing_db = Path(project_config["db_path"]) if project_config and "db_path" in project_config else self.db_path
_routing_db = Path(
project_config["db_path"]) if project_config and "db_path" in project_config else self.db_path
if _routing_db:
self._record_routing(task, {"level": DispatchLevel.BLOCKED, "agent_id": "none",
"reason": v.message}, "blocked", v.message, _routing_db)
@@ -152,7 +157,8 @@ class Dispatcher:
decision = self.decide(task, action_type)
level = decision["level"]
# 从 project_config 获取项目级 DB 路径(路由审计日志写入项目 DB)
_routing_db = Path(project_config["db_path"]) if project_config and "db_path" in project_config else None
_routing_db = Path(
project_config["db_path"]) if project_config and "db_path" in project_config else None
agent_id = decision["agent_id"]
# v2.7.2: counter 检查移到 spawn_full_agent 内部
@@ -160,7 +166,8 @@ class Dispatcher:
# 本地执行
if level == DispatchLevel.LOCAL:
self._record_routing(task, decision, "dispatched", None, _routing_db)
self._record_routing(
task, decision, "dispatched", None, _routing_db)
return {
"level": level.value,
"agent_id": "daemon",
@@ -172,7 +179,8 @@ class Dispatcher:
# Full Agent / Escalate spawn
if level in (DispatchLevel.FULL_AGENT, DispatchLevel.ESCALATE):
if not self.spawner:
self._record_routing(task, decision, "error", "No spawner", _routing_db)
self._record_routing(
task, decision, "error", "No spawner", _routing_db)
return {
"level": level.value,
"agent_id": agent_id,
@@ -183,9 +191,11 @@ class Dispatcher:
try:
# [v2.7.1] Mail: 标 working 移到 spawn_full_agent 内部(check 通过后、subprocess 前)
is_mail = project_config.get("project_id") == "_mail" if project_config else False
is_mail = project_config.get(
"project_id") == "_mail" if project_config else False
if is_mail:
db_path = Path(project_config["db_path"]) if project_config and "db_path" in project_config else None
db_path = Path(
project_config["db_path"]) if project_config and "db_path" in project_config else None
# on_checks_passed: 所有检查通过后才标 working,检查失败不标
on_checks_passed = None
@@ -194,6 +204,7 @@ class Dispatcher:
_task_id = task.id
_mail_db = db_path
_disp = self
def _mail_on_checks_passed():
nonlocal _mail_marked_working
if not _disp._mail_auto_working(_task_id, _mail_db):
@@ -203,8 +214,9 @@ class Dispatcher:
# 构建 spawn message
message = self._build_spawn_message(task, agent_id, project_config,
mode=decision.get("mode", ""),
spawn_type=action_type or "executor")
mode=decision.get(
"mode", ""),
spawn_type=action_type or "executor")
# v2.7.2: on_complete 只含业务逻辑,不含 counter.release
# counter.release 由 spawn_full_agent 内部的 wrapped_on_complete 保证
@@ -218,14 +230,17 @@ class Dispatcher:
def _mail_on_complete(aid, outcome):
# 幻觉门控:检查是否有回复,自动标 done/failed
try:
_dispatcher._mail_auto_complete(_task_id, aid, _mail_db, _must_haves)
_dispatcher._mail_auto_complete(
_task_id, aid, _mail_db, _must_haves, outcome=outcome)
except Exception as e:
logger.error("Mail %s: on_complete error: %s", _task_id, e)
logger.error(
"Mail %s: on_complete error: %s", _task_id, e)
on_complete = _mail_on_complete
else:
# #02: Task 路径也加 on_complete(幻觉门控)
_task_id = task.id
_task_db = Path(project_config["db_path"]) if project_config and "db_path" in project_config else None
_task_db = Path(
project_config["db_path"]) if project_config and "db_path" in project_config else None
_dispatcher = self
_is_review = action_type == "review"
@@ -239,10 +254,12 @@ class Dispatcher:
try:
# #07.2: 统一 crash 回退——executor 和 review 都回退 current_agent
if outcome in ROLLBACK_CURRENT_AGENT_OUTCOMES and _task_db:
_dispatcher._rollback_current_agent(_task_db, _task_id, aid)
_dispatcher._rollback_current_agent(
_task_db, _task_id, aid)
if _is_review:
if _task_db and outcome in ("completed", "session_revived"):
if _task_db and outcome in (
"completed", "session_revived"):
# #09: 读 verdict 决定后续动作
conn = get_connection(_task_db)
try:
@@ -254,14 +271,18 @@ class Dispatcher:
conn.close()
if review and review["verdict"] == "approved":
_dispatcher._mark_task_status(_task_db, _task_id, "done")
logger.info("Task %s: review approved, marking done", _task_id)
_dispatcher._mark_task_status(
_task_db, _task_id, "done")
logger.info(
"Task %s: review approved, marking done", _task_id)
else:
# 非 approved → @mention 被审 agentassignee,非 current_agent
# 非 approved → @mention 被审
# agentassignee,非 current_agent
verdict_str = review["verdict"] if review else "未知"
conn2 = get_connection(_task_db)
try:
task_row = conn2.execute("SELECT assignee FROM tasks WHERE id=?", (_task_id,)).fetchone()
task_row = conn2.execute(
"SELECT assignee FROM tasks WHERE id=?", (_task_id,)).fetchone()
finally:
conn2.close()
@@ -269,18 +290,21 @@ class Dispatcher:
from src.blackboard.blackboard import Blackboard
bb = Blackboard(_task_db)
bb.add_comment(_task_id, "daemon",
f"@{task_row['assignee']} 审查结论: {verdict_str},请查看详情并决定接受或反驳",
comment_type="review")
f"@{task_row['assignee']} 审查结论: {verdict_str},请查看详情并决定接受或反驳",
comment_type="review")
logger.info("Task %s: review verdict=%s, notified assignee=%s",
_task_id, verdict_str, task_row["assignee"] if task_row else "?")
# 不标 done,保持 review 状态
else:
logger.warning("Task %s: review agent %s (%s), NOT marking done", _task_id, aid, outcome)
logger.warning(
"Task %s: review agent %s (%s), NOT marking done", _task_id, aid, outcome)
else:
# executor: 三信号验证 → 标 review
_dispatcher._task_auto_complete(_task_id, _task_db)
_dispatcher._task_auto_complete(
_task_id, _task_db)
except Exception as e:
logger.error("Task %s: on_complete error: %s", _task_id, e)
logger.error(
"Task %s: on_complete error: %s", _task_id, e)
on_complete = _task_on_complete
session_id = await self.spawner.spawn_full_agent(
@@ -289,7 +313,8 @@ class Dispatcher:
task_id=task.id,
on_complete=on_complete,
use_main_session=True, # #02: 统一投递到 main session
task_db_path=Path(project_config["db_path"]) if project_config and "db_path" in project_config else None,
task_db_path=Path(
project_config["db_path"]) if project_config and "db_path" in project_config else None,
on_checks_passed=on_checks_passed,
)
@@ -312,9 +337,14 @@ class Dispatcher:
else:
log_level = logger.debug
detail_msg = f"Agent busy: {reason}"
log_level("Dispatch skipped %s for task %s: %s", agent_id, task.id, detail_msg)
log_level(
"Dispatch skipped %s for task %s: %s",
agent_id,
task.id,
detail_msg)
# on_checks_passed 未执行(check 失败在它之前),working 未标,无需回退
self._record_routing(task, decision, "skipped", detail_msg, _routing_db)
self._record_routing(
task, decision, "skipped", detail_msg, _routing_db)
return {
"level": level.value,
"agent_id": agent_id,
@@ -326,7 +356,8 @@ class Dispatcher:
# on_checks_passed 已执行但 subprocess 失败 → 回退 working → pending
if _mail_marked_working:
self._mail_revert_to_pending(task.id, db_path)
self._record_routing(task, decision, "error", str(e), _routing_db)
self._record_routing(
task, decision, "error", str(e), _routing_db)
return {
"level": level.value,
"agent_id": agent_id,
@@ -385,9 +416,16 @@ class Dispatcher:
def _build_delegate_prompt(self, task: Task,
project_config: Optional[Dict]) -> str:
"""构建 delegate 模式的 prompt(协调员分配任务)"""
api_host = getattr(self.spawner, 'api_host', '127.0.0.1') if self.spawner else '127.0.0.1'
api_port = getattr(self.spawner, 'api_port', 8083) if self.spawner else 8083
project_id = project_config.get("project_id", "") if project_config else ""
api_host = getattr(
self.spawner,
'api_host',
'127.0.0.1') if self.spawner else '127.0.0.1'
api_port = getattr(
self.spawner,
'api_port',
8083) if self.spawner else 8083
project_id = project_config.get(
"project_id", "") if project_config else ""
return f"""你是任务协调员。请分析以下任务,决定最合适的执行者并分配。
@@ -478,7 +516,8 @@ class Dispatcher:
# ── Legacy 兼容(deprecated ──
def _legacy_decide(self, task: Task, action_type: str = "") -> Dict[str, Any]:
def _legacy_decide(
self, task: Task, action_type: str = "") -> Dict[str, Any]:
"""旧版三级决策树(兼容过渡用)"""
LOCAL_ACTIONS = frozenset({
"L1_guardrail", "format_check",
@@ -518,7 +557,8 @@ class Dispatcher:
return registered[0]
return "pangtong-fujunshi"
async def _legacy_dispatch(self, task, action_type="", project_config=None):
async def _legacy_dispatch(
self, task, action_type="", project_config=None):
"""旧版 dispatch(兼容过渡用)
v2.7.2: counter acquire/release 移到 spawn_full_agent 内部
@@ -541,15 +581,19 @@ class Dispatcher:
# NOTE: _legacy_dispatch 仅在 router=None 时触发,当前配置不会进入。
# Mail 永远走 dispatch() 主路径(on_checks_passed 方案),不走此路径。
# 如果未来 legacy 路径被启用,需同步 on_checks_passed 逻辑。
is_mail_legacy = project_config.get("project_id") == "_mail" if project_config else False
is_mail_legacy = project_config.get(
"project_id") == "_mail" if project_config else False
if is_mail_legacy:
db_path_legacy = Path(project_config["db_path"]) if project_config and "db_path" in project_config else None
if not db_path_legacy or not self._mail_auto_working(task.id, db_path_legacy):
db_path_legacy = Path(
project_config["db_path"]) if project_config and "db_path" in project_config else None
if not db_path_legacy or not self._mail_auto_working(
task.id, db_path_legacy):
return {"level": level.value, "agent_id": agent_id,
"session_id": None, "status": "error",
"reason": "mail_auto_working_failed"}
if hasattr(self.spawner, 'build_spawn_message') and project_config:
if hasattr(self.spawner,
'build_spawn_message') and project_config:
retry_ctx = self._build_retry_context(task)
message = self.spawner.build_spawn_message(
task_id=task.id, title=task.title,
@@ -576,9 +620,11 @@ class Dispatcher:
def _mail_oc_legacy(aid, outcome):
try:
_disp._mail_auto_complete(_t_id, aid, _m_db, _m_mh)
_disp._mail_auto_complete(
_t_id, aid, _m_db, _m_mh, outcome=outcome)
except Exception as e:
logger.error("Mail %s: legacy on_complete error: %s", _t_id, e)
logger.error(
"Mail %s: legacy on_complete error: %s", _t_id, e)
on_complete_legacy = _mail_oc_legacy
session_id = await self.spawner.spawn_full_agent(
@@ -586,14 +632,16 @@ class Dispatcher:
task_id=task.id,
on_complete=on_complete_legacy,
use_main_session=True, # #02: 统一投递到 main session
task_db_path=Path(project_config["db_path"]) if project_config and "db_path" in project_config else None,
task_db_path=Path(
project_config["db_path"]) if project_config and "db_path" in project_config else None,
)
return {"level": level.value, "agent_id": agent_id,
"session_id": session_id, "status": "dispatched",
"reason": decision["reason"]}
except AgentBusyError as e:
reason = getattr(e, 'reason', 'busy')
detail_msg = f"Session busy: {reason}" if reason.startswith("session_") else f"Agent busy: {reason}"
detail_msg = f"Session busy: {reason}" if reason.startswith(
"session_") else f"Agent busy: {reason}"
return {"level": level.value, "agent_id": agent_id,
"session_id": None, "status": "skipped",
"reason": detail_msg}
@@ -618,9 +666,11 @@ class Dispatcher:
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
if not row:
logger.warning("Mail %s: cannot mark working (task not found)", task_id)
logger.warning(
"Mail %s: cannot mark working (task not found)", task_id)
return False
if row["status"] not in ("pending", "claimed"):
logger.warning("Mail %s: cannot mark working (status=%s, expected pending/claimed)",
@@ -631,7 +681,10 @@ class Dispatcher:
(task_id,),
)
conn.commit()
logger.info("Mail %s: auto-marked working (system, was %s)", task_id, row["status"])
logger.info(
"Mail %s: auto-marked working (system, was %s)",
task_id,
row["status"])
return True
finally:
conn.close()
@@ -645,30 +698,40 @@ class Dispatcher:
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
if row and row["status"] == "working":
conn.execute(
"UPDATE tasks SET status='pending', updated_at=datetime('now') WHERE id=?",
(task_id,),
)
conn.commit()
logger.info("Mail %s: reverted working → pending (spawn failed)", task_id)
logger.info(
"Mail %s: reverted working → pending (spawn failed)", task_id)
else:
logger.debug("Mail %s: skip revert (status=%s, expected working)", task_id, row["status"] if row else "not_found")
logger.debug(
"Mail %s: skip revert (status=%s, expected working)",
task_id,
row["status"] if row else "not_found")
finally:
conn.close()
except Exception as e:
logger.error("Mail %s: failed to revert to pending: %s", task_id, e)
logger.error(
"Mail %s: failed to revert to pending: %s",
task_id,
e)
def _mail_auto_complete(self, task_id: str, agent_id: str,
db_path: Path, must_haves: str) -> None:
db_path: Path, must_haves: str, outcome=None) -> None:
"""Mail 任务:on_complete 后自动标 done/failed(含幻觉门控)"""
try:
# 解析 performative
performative = "request"
try:
meta = json.loads(must_haves) if must_haves else {}
performative = meta.get("performative", meta.get("type", "request"))
performative = meta.get(
"performative", meta.get(
"type", "request"))
except Exception:
pass
@@ -677,13 +740,15 @@ class Dispatcher:
has_reply = self._mail_check_reply(task_id, db_path)
if not has_reply:
# F3: 立刻标 failed(不等 ticker 30 分钟)
logger.error("Mail %s: no reply found, marking failed (no_reply_found)", task_id)
logger.error(
"Mail %s: no reply found, marking failed (no_reply_found)", task_id)
for attempt in range(3):
try:
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
if not row:
return
if row["status"] == "working":
@@ -697,19 +762,35 @@ class Dispatcher:
json.dumps({"reason": "no_reply_found"}, ensure_ascii=False)),
)
conn.commit()
logger.info("Mail %s: marked failed (no_reply_found)", task_id)
logger.info(
"Mail %s: marked failed (no_reply_found)", task_id)
# Mail 失败通知:通知发件人
try:
from src.daemon.mail_notify import notify_mail_failed
notify_mail_failed(db_path, task_id, "no_reply_found")
notify_mail_failed(
db_path, task_id, "no_reply_found")
except Exception as ne:
logger.warning("Mail %s: failed to send no_reply_found notification: %s", task_id, ne)
logger.warning(
"Mail %s: failed to send no_reply_found notification: %s", task_id, ne)
return
finally:
conn.close()
except Exception as e:
logger.warning("Mail %s: failed attempt %d: %s", task_id, attempt + 1, e)
logger.error("Mail %s: all 3 failed attempts failed, leaving for ticker", task_id)
logger.warning(
"Mail %s: failed attempt %d: %s", task_id, attempt + 1, e)
logger.error(
"Mail %s: all 3 failed attempts failed, leaving for ticker", task_id)
return
# inform 类型:只对成功 outcome 标 done,失败 outcome 留 working 等 ticker 重投
# Task 路径不受此 bug 影响(走 _task_auto_complete 独立逻辑)
if performative == "inform":
INFORM_DONE_OUTCOMES = {"completed", "claimed", "no_reply"}
if outcome not in INFORM_DONE_OUTCOMES:
logger.info(
"Mail %s: inform outcome=%s, skip auto-done",
task_id,
outcome)
return
# 标 done(重试 3 次)
@@ -718,7 +799,8 @@ class Dispatcher:
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
if not row:
return
if row["status"] == "working":
@@ -733,9 +815,15 @@ class Dispatcher:
finally:
conn.close()
except Exception as e:
logger.warning("Mail %s: done attempt %d failed: %s", task_id, attempt + 1, e)
logger.warning(
"Mail %s: done attempt %d failed: %s",
task_id,
attempt + 1,
e)
# 3 次都失败,留 working 等 ticker 超时兜底
logger.error("Mail %s: all 3 done attempts failed, leaving for ticker", task_id)
logger.error(
"Mail %s: all 3 done attempts failed, leaving for ticker",
task_id)
except Exception as e:
logger.error("Mail %s: auto-complete error: %s", task_id, e)
@@ -780,7 +868,9 @@ class Dispatcher:
logger.info("Task %s: verify passed, marking review", task_id)
self._mark_task_status(db_path, task_id, "review")
else:
logger.info("Task %s: verify not passed (no signal), leaving working", task_id)
logger.info(
"Task %s: verify not passed (no signal), leaving working",
task_id)
except Exception as e:
logger.error("Task %s: auto-complete error: %s", task_id, e)
@@ -815,7 +905,8 @@ class Dispatcher:
logger.error("Task %s: verify error: %s", task_id, e)
return True
def _rollback_current_agent(self, db_path: Path, task_id: str, agent_id: str) -> None:
def _rollback_current_agent(
self, db_path: Path, task_id: str, agent_id: str) -> None:
"""#07.2: crash 后回退 current_agent 到 assignee,避免 exclude_current 卡死"""
try:
conn = get_connection(db_path)
@@ -829,11 +920,18 @@ class Dispatcher:
conn.commit()
finally:
conn.close()
logger.info("Task %s: rolled back current_agent from %s to assignee", task_id, agent_id)
logger.info(
"Task %s: rolled back current_agent from %s to assignee",
task_id,
agent_id)
except Exception as e:
logger.warning("Task %s: failed to rollback current_agent: %s", task_id, e)
logger.warning(
"Task %s: failed to rollback current_agent: %s",
task_id,
e)
def _mark_task_status(self, db_path: Path, task_id: str, status: str) -> None:
def _mark_task_status(self, db_path: Path,
task_id: str, status: str) -> None:
"""更新任务状态 + 写审计事件"""
try:
conn = get_connection(db_path)
@@ -849,7 +947,8 @@ class Dispatcher:
)
conn.execute(
"INSERT INTO events (task_id, agent, event_type, payload) VALUES (?, 'dispatcher', 'status_change', ?)",
(task_id, f'{{"from": "{old_status}", "to": "{status}", "source": "auto_complete"}}'),
(task_id,
f'{{"from": "{old_status}", "to": "{status}", "source": "auto_complete"}}'),
)
conn.commit()
finally:
@@ -858,7 +957,7 @@ class Dispatcher:
logger.error("Task %s: mark status error: %s", task_id, e)
@staticmethod
def _check_crash_limit(task_id: str, db_path: pathlib.Path, limit: int = 3,
def _check_crash_limit(task_id: str, db_path: Path, limit: int = 3,
window_minutes: int = 30) -> bool:
"""v2.8.1 Fix-3c: 检查 task 最近 window_minutes 内的 crash 次数是否超限。
+3 -3
View File
@@ -14,7 +14,7 @@ import logging
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional
logger = logging.getLogger("moziplus-v2.experience")
@@ -68,7 +68,7 @@ class Experience:
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> Experience:
return cls(**{k: v for k, v in data.items() if k != "id"},
experience_id=data.get("id"))
experience_id=data.get("id"))
class ExperienceStore:
@@ -284,7 +284,7 @@ class ExperienceDistiller:
all_tags.append(task_type)
results = self.store.search(tags=all_tags if all_tags else None,
query=query, limit=limit)
query=query, limit=limit)
# 按置信度排序
results.sort(key=lambda e: e.confidence, reverse=True)
+16 -6
View File
@@ -4,7 +4,7 @@ from __future__ import annotations
import logging
import re
from dataclasses import dataclass, field
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional
@@ -38,7 +38,9 @@ class GuardrailEngine:
data = yaml.safe_load(f)
self.rules = data.get("rules", [])
self.settings = data.get("settings", {"enabled": True})
logger.info("Loaded %d guardrail rules from %s", len(self.rules), config_path)
logger.info(
"Loaded %d guardrail rules from %s", len(
self.rules), config_path)
def check_task(self, task: Any) -> List[GuardrailViolation]:
"""检查 Task 是否触犯安全红线(调度前调用)"""
@@ -95,7 +97,8 @@ class GuardrailEngine:
return violations
def check_token_usage(self, token_count: int) -> Optional[GuardrailViolation]:
def check_token_usage(
self, token_count: int) -> Optional[GuardrailViolation]:
"""检查 Token 消耗是否超标"""
if not self.settings.get("enabled", True):
return None
@@ -103,7 +106,10 @@ class GuardrailEngine:
for rule in self.rules:
if rule["id"] != "high_token_usage":
continue
threshold = rule.get("triggers", [{}])[0].get("token_threshold", 100000)
threshold = rule.get(
"triggers", [
{}])[0].get(
"token_threshold", 100000)
if token_count > threshold:
return GuardrailViolation(
rule_id=rule["id"],
@@ -114,7 +120,8 @@ class GuardrailEngine:
)
return None
def check_consecutive_failure(self, failure_count: int) -> Optional[GuardrailViolation]:
def check_consecutive_failure(
self, failure_count: int) -> Optional[GuardrailViolation]:
"""检查连续失败次数"""
if not self.settings.get("enabled", True):
return None
@@ -122,7 +129,10 @@ class GuardrailEngine:
for rule in self.rules:
if rule["id"] != "consecutive_failure":
continue
threshold = rule.get("triggers", [{}])[0].get("consecutive_failures", 3)
threshold = rule.get(
"triggers", [
{}])[0].get(
"consecutive_failures", 3)
if failure_count >= threshold:
return GuardrailViolation(
rule_id=rule["id"],
+11 -6
View File
@@ -9,9 +9,9 @@ from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import Any, Dict, Optional
from typing import Any, Dict
from src.blackboard.db import get_connection, init_db
from src.blackboard.db import get_connection
from src.blackboard.queries import Queries
logger = logging.getLogger("moziplus-v2.health")
@@ -41,7 +41,7 @@ class HealthChecker:
{"healthy": bool, "zombie": bool, "stale_ticks": int,
"alert_written": bool, "resolved": bool}
"""
db_key = str(db_path)
str(db_path)
result: Dict[str, Any] = {
"healthy": True,
"zombie": False,
@@ -58,7 +58,8 @@ class HealthChecker:
# 用 event count 变化判断是否有真实变更
conn = queries._conn()
try:
total_events = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
conn.execute(
"SELECT COUNT(*) FROM events").fetchone()[0]
non_tick_events = conn.execute(
"SELECT COUNT(*) FROM events WHERE event_type != 'daemon_tick' "
"AND event_type != 'agent_zombie_detected'"
@@ -85,7 +86,8 @@ class HealthChecker:
self._stale_ticks[project_id] = stale
result["stale_ticks"] = stale
if stale >= self.zombie_threshold and not self._alerted.get(project_id):
if stale >= self.zombie_threshold and not self._alerted.get(
project_id):
# 写告警
self._write_alert(db_path, project_id, tick_num, stale)
self._alerted[project_id] = True
@@ -126,7 +128,10 @@ class HealthChecker:
conn.commit()
finally:
conn.close()
logger.warning("Zombie detected: %s (stale=%d)", project_id, stale_ticks)
logger.warning(
"Zombie detected: %s (stale=%d)",
project_id,
stale_ticks)
def _write_resolution(self, db_path: Path, project_id: str,
tick_num: int) -> None:
+6 -5
View File
@@ -15,7 +15,6 @@ from __future__ import annotations
import asyncio
import json
import logging
import os
from pathlib import Path
from typing import Any, Callable, Coroutine, Dict, List, Optional
@@ -28,7 +27,8 @@ class InboxWatcher:
def __init__(
self,
inbox_path: Path,
process_callback: Optional[Callable[[Dict[str, Any]], Coroutine[Any, Any, None]]] = None,
process_callback: Optional[Callable[[
Dict[str, Any]], Coroutine[Any, Any, None]]] = None,
watch_interval: float = 1.0,
):
"""
@@ -57,7 +57,7 @@ class InboxWatcher:
self._running = True
self._task = asyncio.create_task(self._loop())
logger.info("Inbox watcher started (path=%s, interval=%.1fs)",
self.inbox_path, self.watch_interval)
self.inbox_path, self.watch_interval)
async def stop(self) -> None:
"""停止监听"""
@@ -69,7 +69,7 @@ class InboxWatcher:
except asyncio.CancelledError:
pass
logger.info("Inbox watcher stopped (processed=%d, errors=%d)",
self._total_processed, self._total_errors)
self._total_processed, self._total_errors)
@property
def is_running(self) -> bool:
@@ -160,7 +160,8 @@ class InboxWatcher:
line_no, type(event).__name__)
self._total_errors += 1
except json.JSONDecodeError:
logger.warning("Inbox line %d: invalid JSON, skipping", line_no)
logger.warning(
"Inbox line %d: invalid JSON, skipping", line_no)
self._total_errors += 1
return events
+14 -5
View File
@@ -50,7 +50,9 @@ def notify_mail_failed(db_path: Path, original_mail_id: str,
bb = Blackboard(db_path)
original = bb.get_task(original_mail_id)
if not original:
logger.warning("notify_mail_failed: original mail %s not found", original_mail_id)
logger.warning(
"notify_mail_failed: original mail %s not found",
original_mail_id)
return
# 解析原邮件元数据
@@ -58,7 +60,9 @@ def notify_mail_failed(db_path: Path, original_mail_id: str,
# 防递归:系统通知邮件失败不再发通知
if meta.get("system_notify"):
logger.info("Mail %s: system notify mail failed, skipping recursive notification", original_mail_id)
logger.info(
"Mail %s: system notify mail failed, skipping recursive notification",
original_mail_id)
return
# 获取发件人(优先 assigned_byfallback must_haves.from
@@ -67,7 +71,9 @@ def notify_mail_failed(db_path: Path, original_mail_id: str,
title = original.title or ""
if not from_agent:
logger.warning("notify_mail_failed: cannot determine sender for mail %s", original_mail_id)
logger.warning(
"notify_mail_failed: cannot determine sender for mail %s",
original_mail_id)
return
# 发件人不是有效 Agent(如 system)→ 通知庞统代处理,不触发广播
@@ -108,7 +114,10 @@ def notify_mail_failed(db_path: Path, original_mail_id: str,
)
bb.create_task(notify_task)
logger.info("Mail %s: sent failure notification to %s (original_sender=%s, reason=%s, notify_id=%s)",
original_mail_id, target_agent, from_agent, reason, notify_id)
original_mail_id, target_agent, from_agent, reason, notify_id)
except Exception as e:
logger.warning("notify_mail_failed: failed to send notification for mail %s: %s", original_mail_id, e)
logger.warning(
"notify_mail_failed: failed to send notification for mail %s: %s",
original_mail_id,
e)
+15 -11
View File
@@ -8,15 +8,12 @@ from __future__ import annotations
import json
import logging
import re
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
from typing import Any, Callable, Dict, List, Optional
from src.blackboard.models import Task
from src.blackboard.operations import Blackboard
from src.blackboard.queries import Queries
logger = logging.getLogger("moziplus-v2.review")
@@ -151,12 +148,14 @@ class ReviewPipeline:
) -> ReviewResult:
"""Step 2: 格式合规"""
if not outputs:
return ReviewResult("format", ReviewVerdict.FAIL, 0.0, "No outputs")
return ReviewResult(
"format", ReviewVerdict.FAIL, 0.0, "No outputs")
issues = []
for out in outputs:
# output.md 必须存在且非空
if out.get("type") == "markdown" or out.get("path", "").endswith(".md"):
if out.get("type") == "markdown" or out.get(
"path", "").endswith(".md"):
content = out.get("content", "")
if not content and out.get("path"):
try:
@@ -167,7 +166,8 @@ class ReviewPipeline:
issues.append(f"Output too short: {out.get('path', '?')}")
# 结论 JSON 必须有效
if out.get("type") == "json" or out.get("path", "").endswith(".json"):
if out.get("type") == "json" or out.get(
"path", "").endswith(".json"):
content = out.get("content", "")
if not content and out.get("path"):
try:
@@ -177,7 +177,8 @@ class ReviewPipeline:
try:
data = json.loads(content)
if not isinstance(data, dict):
issues.append(f"JSON not a dict: {out.get('path', '?')}")
issues.append(
f"JSON not a dict: {out.get('path', '?')}")
except (json.JSONDecodeError, TypeError):
issues.append(f"Invalid JSON: {out.get('path', '?')}")
@@ -194,7 +195,8 @@ class ReviewPipeline:
) -> ReviewResult:
"""Step 3: 内容质量(自定义检查)"""
if not outputs:
return ReviewResult("quality", ReviewVerdict.FAIL, 0.0, "No outputs")
return ReviewResult(
"quality", ReviewVerdict.FAIL, 0.0, "No outputs")
suggestions = []
total_score = 0.0
@@ -215,7 +217,8 @@ class ReviewPipeline:
avg = 1.0 # 无自定义检查默认通过
verdict = ReviewVerdict.PASS if avg >= 0.6 else ReviewVerdict.FAIL
return ReviewResult("quality", verdict, round(avg, 2), suggestions=suggestions)
return ReviewResult("quality", verdict, round(
avg, 2), suggestions=suggestions)
def _determine_gate(
self, task: Task, results: List[ReviewResult]
@@ -329,6 +332,7 @@ class RebuttalManager:
return 0
try:
observations = self.bb.get_observations(task_id=task_id)
return sum(1 for o in observations if "Rebuttal round" in (o.body or ""))
return sum(
1 for o in observations if "Rebuttal round" in (o.body or ""))
except Exception:
return 0
+11 -5
View File
@@ -107,7 +107,8 @@ class AgentRouter:
# ── 快速路径 2: retry → 原执行者 ──
if action_type == "retry":
current = task_info.get("current_agent") or task_info.get("assignee")
current = task_info.get(
"current_agent") or task_info.get("assignee")
if current and current in self.agent_profiles:
return RouteDecision(
agent_id=current,
@@ -119,7 +120,8 @@ class AgentRouter:
# ── Mode B: Agent 声明式交接 ──
next_cap = task_info.get("next_capability")
if next_cap and self._validate_capability(next_cap):
current = task_info.get("current_agent") or task_info.get("assignee")
current = task_info.get(
"current_agent") or task_info.get("assignee")
exclude = {current} if current else set()
matched = self._match_capability(next_cap, exclude)
if matched:
@@ -129,7 +131,9 @@ class AgentRouter:
mode="agent_handoff",
latency_ms=int((time.monotonic() - start) * 1000),
)
logger.info("next_capability '%s' no match, delegate to coordinator", next_cap)
logger.info(
"next_capability '%s' no match, delegate to coordinator",
next_cap)
# ── 快速路径 3: 生命周期流转查表 ──
lifecycle = self.LIFECYCLE_CAPABILITY.get(action_type)
@@ -140,7 +144,8 @@ class AgentRouter:
exclude_current = lifecycle.get("exclude_current", False)
exclude = set()
if exclude_current:
current = task_info.get("current_agent") or task_info.get("assignee")
current = task_info.get(
"current_agent") or task_info.get("assignee")
if current:
exclude.add(current)
matched = self._match_capability(cap, exclude)
@@ -154,7 +159,8 @@ class AgentRouter:
# ── 快速路径 4: 有 assignee 且非生命周期流转 ──
assignee = task_info.get("assignee")
if assignee and assignee in self.agent_profiles and action_type not in ("review", "escalation"):
if assignee and assignee in self.agent_profiles and action_type not in (
"review", "escalation"):
return RouteDecision(
agent_id=assignee,
reason=f"Direct assignee: {assignee}",
+1 -2
View File
@@ -10,12 +10,11 @@ from __future__ import annotations
import json
import logging
import re
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger("moziplus-v2.skill")
+175 -81
View File
@@ -15,7 +15,7 @@ from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from src.blackboard.db import get_connection, init_db
from src.blackboard.db import get_connection
logger = logging.getLogger("moziplus-v2.spawner")
@@ -163,9 +163,12 @@ class AgentBusyError(Exception):
#07: reason 字段区分具体原因,便于 dispatcher 层区分处理。
"""
def __init__(self, agent_id: str, reason: str = "busy", detail: Optional[dict] = None):
def __init__(self, agent_id: str, reason: str = "busy",
detail: Optional[dict] = None):
self.agent_id = agent_id
self.reason = reason # counter_blocked / session_locked / session_running / session_compacting / session_stuck
# counter_blocked / session_locked / session_running / session_compacting / session_stuck
self.reason = reason
self.detail = detail or {}
super().__init__(f"{agent_id}: {reason}")
@@ -277,11 +280,15 @@ class AgentSpawner:
# mail 任务用精简模板
if project_id == "_mail":
return self._build_mail_prompt(task_id, title, description, must_haves, agent_id)
return self._build_mail_prompt(
task_id, title, description, must_haves, agent_id)
# 走 BootstrapBuilder 新路径
if self.bootstrap_builder and task is not None:
role_map = {"executor": "executor", "review": "reviewer", "discussion": "planner"}
role_map = {
"executor": "executor",
"review": "reviewer",
"discussion": "planner"}
role = role_map.get(spawn_type, "executor")
bootstrap_prompt = self.bootstrap_builder.build_for_task(
task=task,
@@ -293,13 +300,14 @@ class AgentSpawner:
# 无 BootstrapBuilder 或无 task 对象 → 最小 fallback
# 只保留任务上下文 + API 操作指令
logger.warning("No BootstrapBuilder or task object, using minimal fallback")
logger.warning(
"No BootstrapBuilder or task object, using minimal fallback")
return self._build_minimal_fallback(
task_id, title, description, must_haves,
project_id, agent_id)
def _build_minimal_fallback(self, task_id, title, description, must_haves,
project_id, agent_id):
project_id, agent_id):
"""最小 fallback:只有任务上下文 + API 指令"""
task_section = f"""## 任务
{title}
@@ -311,7 +319,7 @@ class AgentSpawner:
return task_section + "\n\n---\n\n" + api_section
def _build_api_section(self, project_id: str, task_id: str,
agent_id: str) -> str:
agent_id: str) -> str:
"""构建 API 回写操作指令(BootstrapBuilder 模式下补充)"""
# mail 任务直接 done,不走 review
success_status = '"done"' if project_id == "_mail" else '"review"'
@@ -337,8 +345,8 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
"""
def _build_discussion_prompt(self, task_id: str, title: str,
description: str, must_haves: str,
project_id: str, agent_id: str) -> str:
description: str, must_haves: str,
project_id: str, agent_id: str) -> str:
"""构建讨论类 spawn prompt(§3.3 框架 + Boids)"""
goal_snapshot = description or title
constraints = must_haves or "(无特殊约束)"
@@ -368,7 +376,8 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
if not self.guardrails:
return "无特殊限制"
try:
return "".join(r.get("name", r.get("rule_id", "")) for r in self.guardrails.rules[:6])
return "".join(r.get("name", r.get("rule_id", ""))
for r in self.guardrails.rules[:6])
except Exception:
return "无特殊限制"
@@ -379,9 +388,8 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
return router.agent_profiles.get(agent_id)
return None
def _build_mail_prompt(self, task_id: str, title: str, description: str,
must_haves: str, agent_id: str) -> str:
must_haves: str, agent_id: str) -> str:
"""构建 Mail 专用精简模板"""
# 解析 must_haves 获取 from 和 performative
from_agent = agent_id
@@ -389,7 +397,9 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
try:
meta = json.loads(must_haves) if must_haves else {}
from_agent = meta.get("from", agent_id)
performative = meta.get("performative", meta.get("type", "request"))
performative = meta.get(
"performative", meta.get(
"type", "request"))
except Exception:
pass
@@ -472,7 +482,9 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
self._revive_session(agent_id)
elif pre_state.get("status") == "running" and not pre_state.get("lock_pid_alive"):
# status=running 但 lock PID 已死 → 假死,revive
logger.warning("Phase 0: %s status=running but lock PID dead, reviving", agent_id)
logger.warning(
"Phase 0: %s status=running but lock PID dead, reviving",
agent_id)
self._revive_session(agent_id)
# Phase 1: Counter acquire(互斥锁)
@@ -487,12 +499,15 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
if use_main_session:
session_state = self._check_session_state(agent_id)
logger.info("Phase 2 session check for %s: status=%s lock_pid=%s lock_pid_alive=%s compact=%s",
agent_id, session_state.get('status'), session_state.get('lock_pid'),
agent_id, session_state.get(
'status'), session_state.get('lock_pid'),
session_state.get('lock_pid_alive'), session_state.get('recent_compact'))
blockers = []
if session_state.get("lock_pid_alive") and not session_state.get("lock_expired"):
blockers.append(("session_locked", session_state.get("lock_pid")))
if session_state.get(
"lock_pid_alive") and not session_state.get("lock_expired"):
blockers.append(
("session_locked", session_state.get("lock_pid")))
if session_state.get("status") == "running":
if session_state.get("lock_pid_alive"):
# 真 running:外部进程占用
@@ -515,7 +530,8 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
# Phase 2.5: 假死修复(status=running + lock PID 死 → revive → 重检)
# 此场景应被 Phase 0 提前修复,这里做兜底
if session_state.get("status") == "running" and not session_state.get("lock_pid_alive"):
if session_state.get("status") == "running" and not session_state.get(
"lock_pid_alive"):
logger.warning("Phase 2.5: %s status=running + lock dead (should be caught in Phase 0), reviving",
agent_id)
self._revive_session(agent_id)
@@ -538,7 +554,10 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
raise
if self.dry_run:
logger.info("[DRY RUN] Would spawn agent %s (session=%s)", agent_id, _sid_key)
logger.info(
"[DRY RUN] Would spawn agent %s (session=%s)",
agent_id,
_sid_key)
self._register_session(_sid_key, agent_id, task_id, pid=None)
return _sid_key
@@ -554,7 +573,8 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
if asyncio.iscoroutine(result):
await result
except Exception:
logger.warning("Business on_complete failed for %s", aid, exc_info=True)
logger.warning(
"Business on_complete failed for %s", aid, exc_info=True)
cmd = [
"openclaw", "agent",
@@ -575,7 +595,7 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
stderr=asyncio.subprocess.PIPE,
)
self._register_session(session_id, agent_id, task_id, proc.pid,
broadcast_task_ids=broadcast_task_ids)
broadcast_task_ids=broadcast_task_ids)
logger.info("Spawned agent %s (session=%s, pid=%d)",
agent_id, session_id, proc.pid)
@@ -593,7 +613,11 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
if self.counter:
self.counter.release(agent_id, _sid_key)
logger.exception("Failed to spawn agent %s", agent_id)
self._record_attempt(task_id, agent_id, "spawn_failed", error=str(e))
self._record_attempt(
task_id,
agent_id,
"spawn_failed",
error=str(e))
raise
async def spawn_subagent(
@@ -609,7 +633,9 @@ curl -X POST http://{self.api_host}:{self.api_port}/api/projects/{project_id}/ta
session_id = str(uuid.uuid4())
if self.dry_run:
logger.info("[DRY RUN] Would spawn subagent (session=%s)", session_id)
logger.info(
"[DRY RUN] Would spawn subagent (session=%s)",
session_id)
self._register_session(session_id, "subagent", task_id, pid=None)
return session_id
@@ -729,10 +755,16 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
agent_id, session_id, json_result)
# 查任务实际状态
task_status = self._get_task_status(db_path, task_id) if task_id else None
task_status = self._get_task_status(
db_path, task_id) if task_id else None
# 分类
cls = self._classify_outcome(exit_code, json_result, stderr_text, task_status, stdout_text)
cls = self._classify_outcome(
exit_code,
json_result,
stderr_text,
task_status,
stdout_text)
outcome = cls["outcome"]
# 更新 session 状态
@@ -761,17 +793,21 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
agent_id, session_id, outcome, exit_code, task_status)
# 广播反馈追踪(Phase 1 bug fix)
if task_id == "broadcast" and hasattr(self, '_ticker') and self._ticker:
if task_id == "broadcast" and hasattr(
self, '_ticker') and self._ticker:
# 广播任务:从 session 信息取真实 task_id 列表,逐一回调 tracker
sess_info = self._sessions.get(session_id or "main", {})
bt_ids = sess_info.get("broadcast_task_ids") or []
# 广播场景一律标 no_reply:Agent 只 claim 一个任务,
# 其余任务的 tracker 不能被 claimed 清除
for real_task_id in bt_ids:
self._ticker.record_broadcast_response(real_task_id, agent_id, "no_reply")
self._ticker.record_broadcast_response(
real_task_id, agent_id, "no_reply")
elif task_id and hasattr(self, '_ticker') and self._ticker:
outcome_str = "claimed" if cls.get("status") == "ok" else "no_reply"
self._ticker.record_broadcast_response(task_id, agent_id, outcome_str)
outcome_str = "claimed" if cls.get(
"status") == "ok" else "no_reply"
self._ticker.record_broadcast_response(
task_id, agent_id, outcome_str)
if cls["should_retry"]:
# cooldown: 新增的可恢复场景(A14/A15/A16/A8/A10)
@@ -848,13 +884,26 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
# A8(gateway_unreachable), A11(lock_conflict),
# A10(compact_failed), A12(agent_error)
# v2.8.1 Fix-3a: crash 类 outcome 设 cooldown,给 agent session 恢复时间
if outcome in ("crashed", "compact_failed", "process_crash", "session_stuck",
"compact_hanging", "agent_error", "compact_interrupted") and self.counter:
if outcome == "crashed" and self.counter:
self.counter.set_cooldown(agent_id, seconds=60)
logger.info(
"Crash cooldown set for %s: 60s (outcome=%s)",
agent_id,
outcome)
elif outcome in ("compact_failed", "process_crash", "session_stuck",
"compact_hanging", "agent_error", "compact_interrupted") and self.counter:
self.counter.set_cooldown(agent_id, seconds=300) # 5 分钟
logger.info("Crash/error cooldown set for %s: 300s (outcome=%s)", agent_id, outcome)
logger.info(
"Error cooldown set for %s: 300s (outcome=%s)",
agent_id,
outcome)
# F1: 不可恢复 outcome → 立刻标 failed + 写黑板
if outcome in ("auth_failed", "agent_error") and db_path and task_id:
logger.error("Task %s: unrecoverable outcome=%s, marking failed immediately", task_id, outcome)
if outcome in ("auth_failed",
"agent_error") and db_path and task_id:
logger.error(
"Task %s: unrecoverable outcome=%s, marking failed immediately",
task_id,
outcome)
self._mark_task(db_path, task_id, "failed", {
"reason": outcome,
"stderr_preview": (stderr_text or "")[:500],
@@ -878,13 +927,16 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
except Exception:
pass
stderr_text = b"".join(stderr_chunks).decode("utf-8", errors="replace")
# stderr collected but not used in this handler
# (kept for potential future diagnostics)
b"".join(stderr_chunks).decode("utf-8", errors="replace")
# 检查 session 状态
state = self._check_session_state(agent_id)
# B1: 假死 - 先复活,连续假死 ≥2 次再 failed
if state.get("status") == "running" and not state.get("lock_pid_alive", True):
if state.get("status") == "running" and not state.get(
"lock_pid_alive", True):
# 假死计数
stuck_count = self._stuck_counts.get(task_id, 0) + 1
self._stuck_counts[task_id] = stuck_count
@@ -910,7 +962,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
await self._do_on_complete_async(on_complete, agent_id, "session_revived")
else:
# 复活失败 → 标 failed
logger.error("Agent %s revive failed, marking failed", agent_id)
logger.error(
"Agent %s revive failed, marking failed", agent_id)
self._mark_task(db_path, task_id, "failed",
{"reason": "revive_failed", "stuck_count": stuck_count,
"diagnostics": state})
@@ -991,7 +1044,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
"SELECT status FROM tasks WHERE id=?", (task_id,)
).fetchone()
# Bug-6 fix: pending 不是终态
if row and row["status"] in ("done", "failed", "cancelled", "review"):
if row and row["status"] in (
"done", "failed", "cancelled", "review"):
logger.info("Retry skip: task %s already %s (agent=%s)",
task_id, row["status"], agent_id)
# on_complete = wrapped_on_complete,会 release counter
@@ -1000,7 +1054,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
finally:
conn.close()
except Exception:
logger.warning("Retry status check failed for %s, proceeding", task_id)
logger.warning(
"Retry status check failed for %s, proceeding", task_id)
# 直接读写 tasks 表的 retry_count
if retry_field == "retry_count" and db_path and task_id:
@@ -1020,7 +1075,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
finally:
conn.close()
except Exception:
logger.exception("Failed to update retry_count for task %s", task_id)
logger.exception(
"Failed to update retry_count for task %s", task_id)
count = 1
else:
retry_counts = self._get_retry_counts(db_path, task_id)
@@ -1104,7 +1160,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
"""
text = stdout_text.strip()
if not text:
return {"status": None, "summary": None, "fallback_used": False, "fallback_reason": None, "payloads": []}
return {"status": None, "summary": None, "fallback_used": False,
"fallback_reason": None, "payloads": []}
try:
data = json.loads(text)
except json.JSONDecodeError:
@@ -1116,7 +1173,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
except json.JSONDecodeError:
continue
else:
return {"status": None, "summary": None, "fallback_used": False, "fallback_reason": None, "payloads": []}
return {"status": None, "summary": None, "fallback_used": False,
"fallback_reason": None, "payloads": []}
# 从 data.result.meta.executionTrace 取 fallback 信息
result = data.get("result", {})
@@ -1132,7 +1190,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
}
@staticmethod
def _get_task_status(db_path: Optional[Path], task_id: Optional[str]) -> Optional[str]:
def _get_task_status(
db_path: Optional[Path], task_id: Optional[str]) -> Optional[str]:
"""查任务实际 API 状态"""
if not db_path or not task_id:
return None
@@ -1149,7 +1208,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
return None
@staticmethod
def _get_task_info(db_path: Optional[Path], task_id: Optional[str]) -> Optional[dict]:
def _get_task_info(db_path: Optional[Path],
task_id: Optional[str]) -> Optional[dict]:
"""查任务基本信息"""
if not db_path or not task_id:
return None
@@ -1157,7 +1217,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
conn = get_connection(db_path)
try:
row = conn.execute(
"SELECT id, title, status FROM tasks WHERE id=?", (task_id,)
"SELECT id, title, status FROM tasks WHERE id=?", (
task_id,)
).fetchone()
if not row:
return None
@@ -1189,7 +1250,9 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
sessions[main_key] = main_session
with open(sessions_path, "w") as f:
json.dump(sessions, f, indent=2)
logger.info("Revived %s: sessions.json status changed running→idle", agent_id)
logger.info(
"Revived %s: sessions.json status changed running→idle",
agent_id)
# #07 O4: 同时清理残留 lock 文件
sf = main_session.get("sessionFile", "")
if sf:
@@ -1197,7 +1260,10 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
if lock_path.exists():
try:
lock_path.unlink()
logger.info("Cleaned stale lock for %s: %s", agent_id, lock_path.name)
logger.info(
"Cleaned stale lock for %s: %s",
agent_id,
lock_path.name)
except Exception:
pass
return True
@@ -1206,7 +1272,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
return False
@staticmethod
def _check_recent_compaction_jsonl(session_file: str, window_seconds: int = 900) -> bool:
def _check_recent_compaction_jsonl(
session_file: str, window_seconds: int = 900) -> bool:
"""v2.8.2 Fix-2: 读 session jsonl 末尾,检查是否有 window_seconds 内的 compaction 记录。
compactionCheckpoints 更可靠:Gateway 每次完成 compact 必然在 jsonl 末尾追加记录,
@@ -1216,7 +1283,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
实测 50KB 在长对话中不够compact 记录被推出窗口导致漏检
正常扫描量不变从尾部往前扫遇到超过 15min timestamp break
"""
if not session_file or not pathlib.Path(session_file).exists():
if not session_file or not Path(session_file).exists():
return False
try:
from datetime import datetime, timezone
@@ -1238,7 +1305,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
ts = obj.get("timestamp", "")
if ts:
try:
ct = datetime.fromisoformat(ts.replace("Z", "+00:00"))
ct = datetime.fromisoformat(
ts.replace("Z", "+00:00"))
if (now - ct).total_seconds() < window_seconds:
return True
except (ValueError, TypeError):
@@ -1262,7 +1330,11 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
v2.8.1: compact 检测改用 session jsonl 末尾扫描(Fix-1),
替代失效的 compactionCheckpoints 检测
"""
result = {"status": "unknown", "lock_pid": None, "lock_pid_alive": False, "recent_compact": False}
result = {
"status": "unknown",
"lock_pid": None,
"lock_pid_alive": False,
"recent_compact": False}
sessions_path = Path(os.environ.get(
"OPENCLAW_HOME", str(Path.home() / ".openclaw")
)) / "agents" / agent_id / "sessions" / "sessions.json"
@@ -1301,8 +1373,10 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
created_at_str = lock_data.get("createdAt", "")
if created_at_str:
from datetime import datetime as _dt, timezone as _tz
created_dt = _dt.fromisoformat(created_at_str.replace("Z", "+00:00"))
elapsed = (_dt.now(_tz.utc) - created_dt).total_seconds()
created_dt = _dt.fromisoformat(
created_at_str.replace("Z", "+00:00"))
elapsed = (_dt.now(_tz.utc) -
created_dt).total_seconds()
if elapsed > 1800: # 30 minutes
result["lock_pid_alive"] = False
result["lock_expired"] = True
@@ -1315,8 +1389,10 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
# v2.8.1 Fix-1: compact 检测改用 session jsonl 末尾扫描
# 只在 agent 非空闲时才扫描(减少不必要 I/O)
if result["status"] not in ("done", "idle", "unknown", None) and sf:
result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(sf)
if result["status"] not in (
"done", "idle", "unknown", None) and sf:
result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(
sf)
except Exception:
pass
return result
@@ -1361,45 +1437,53 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
# A15/A16: stderr 含 network/compact 关键字 → 可恢复
if stderr_text:
stderr_lower = stderr_text.lower()
if any(kw in stderr_lower for kw in ["econnrefused", "etimedout", "gateway closed", "econnreset"]):
if any(kw in stderr_lower for kw in [
"econnrefused", "etimedout", "gateway closed", "econnreset"]):
return {"outcome": "gateway_unreachable", "should_retry": True,
"retry_field": "retry_count", "cooldown_seconds": 60}
if any(kw in stderr_lower for kw in ["compaction-diag", "context-overflow"]):
if any(kw in stderr_lower for kw in [
"compaction-diag", "context-overflow"]):
return {"outcome": "compact_interrupted", "should_retry": True,
"retry_field": "retry_count", "cooldown_seconds": 60}
# A17: 真正的 crash → 保持 working,ticker 兜底
return {"outcome": "crashed", "should_retry": False, "original": "process_crash"}
return {"outcome": "crashed", "should_retry": False,
"original": "process_crash"}
# stdout 为空但 exit=0:可能是正常完成但 --json 没输出
# 查任务状态判断
# A13 revised: stdout 为空但 exit=0 → 信任进程退出码,视为正常完成
# 实测发现 openclaw session=None + exit=0 是正常场景(inform 通知等)
# 旧逻辑按 task_status 区分,非终态判 agent_error → 导致 inform 邮件永不标 done
if status is None and not stdout_text.strip() and exit_code == 0:
terminal_statuses = {"done", "review"}
if task_status in terminal_statuses:
return {"outcome": "completed", "should_retry": False}
return {"outcome": "agent_error", "should_retry": False}
return {"outcome": "completed", "should_retry": False}
# A7-A12: status=error → 不续杯,stderr 辅助分类
if status == "error":
stderr_lower = stderr_text.lower()
if any(kw in stderr_lower for kw in ["401", "403", "unauthorized", "auth"]):
if any(kw in stderr_lower for kw in [
"401", "403", "unauthorized", "auth"]):
return {"outcome": "auth_failed", "should_retry": False}
if any(kw in stderr_lower for kw in ["econnrefused", "etimedout", "gateway closed", "econnreset"]):
if any(kw in stderr_lower for kw in [
"econnrefused", "etimedout", "gateway closed", "econnreset"]):
return {"outcome": "gateway_unreachable", "should_retry": True,
"retry_field": "retry_count", "cooldown_seconds": 60}
if any(kw in stderr_lower for kw in ["rate_limit", "500", "503", "api error"]):
if any(kw in stderr_lower for kw in [
"rate_limit", "500", "503", "api error"]):
return {"outcome": "api_error", "should_retry": False}
if any(kw in stderr_lower for kw in ["compaction-diag", "context-overflow"]):
if any(kw in stderr_lower for kw in [
"compaction-diag", "context-overflow"]):
return {"outcome": "compact_failed", "should_retry": False}
if any(kw in stderr_lower for kw in ["lock", "busy", "concurrent", "lane task error"]):
if any(kw in stderr_lower for kw in [
"lock", "busy", "concurrent", "lane task error"]):
return {"outcome": "lock_conflict", "should_retry": True,
"retry_field": "retry_count", "cooldown_seconds": 60}
return {"outcome": "agent_error", "should_retry": False}
# 兜底:status 未知值
return {"outcome": "agent_error", "should_retry": False, "original": "unknown_status"}
return {"outcome": "agent_error",
"should_retry": False, "original": "unknown_status"}
@staticmethod
def _get_retry_counts(db_path: Optional[Path], task_id: Optional[str]) -> dict:
def _get_retry_counts(
db_path: Optional[Path], task_id: Optional[str]) -> dict:
"""从最新 task_attempt 的 metadata 读计数器"""
defaults = {"retry_count": 0, "connect_retry_count": 0,
"api_retry_count": 0, "lock_retry_count": 0,
@@ -1425,7 +1509,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
return defaults
def _update_retry_counts(self, db_path: Optional[Path],
task_id: Optional[str], counts: dict):
task_id: Optional[str], counts: dict):
"""将 retry counts 写回最新 task_attempt 的 metadata"""
if not db_path or not task_id:
return
@@ -1439,7 +1523,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
(task_id,)
).fetchone()
if row:
meta = json.loads(row["metadata"]) if row["metadata"] else {}
meta = json.loads(
row["metadata"]) if row["metadata"] else {}
meta.update(counts)
conn.execute(
"UPDATE task_attempts SET metadata=? WHERE rowid=?",
@@ -1449,7 +1534,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
finally:
conn.close()
except Exception:
logger.exception("Failed to update retry counts for task %s", task_id)
logger.exception(
"Failed to update retry counts for task %s", task_id)
def _mark_task(self, db_path: Optional[Path], task_id: Optional[str],
status: str, detail: Optional[dict] = None):
@@ -1467,7 +1553,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
if detail:
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task_id, "daemon", status, json.dumps(detail, ensure_ascii=False))
(task_id, "daemon", status, json.dumps(
detail, ensure_ascii=False))
)
conn.commit()
finally:
@@ -1485,10 +1572,13 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
from src.blackboard.operations import Blackboard
bb = Blackboard(db_path)
cid = bb.add_comment(task_id, "daemon",
f"@pangtong-fujunshi 任务执行失败: {reason},请评估是否需要介入",
comment_type="system")
f"@pangtong-fujunshi 任务执行失败: {reason},请评估是否需要介入",
comment_type="system")
bb.record_mentions(cid, task_id, ["pangtong-fujunshi"])
logger.info("Task %s: failure notified pangtong via comment+mention (reason=%s)", task_id, reason)
logger.info(
"Task %s: failure notified pangtong via comment+mention (reason=%s)",
task_id,
reason)
except Exception as e:
logger.warning("Task %s: failed to notify: %s", task_id, e)
except Exception:
@@ -1517,7 +1607,10 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
if asyncio.iscoroutine(result):
await result
except Exception:
logger.warning("on_complete callback failed for %s", agent_id, exc_info=True)
logger.warning(
"on_complete callback failed for %s",
agent_id,
exc_info=True)
def _register_session(
self,
@@ -1595,7 +1688,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
def get_session_by_agent(self, agent_id: str) -> Optional[Dict[str, Any]]:
"""v2.7.2: 根据 agent_id 获取活跃 session 信息(用于进程存活性检查)"""
for sid, info in self._sessions.items():
if info.get("agent_id") == agent_id and info.get("status") == "running":
if info.get("agent_id") == agent_id and info.get(
"status") == "running":
return info
return None
+3 -5
View File
@@ -9,14 +9,11 @@ from __future__ import annotations
import asyncio
import json
import logging
import subprocess
import uuid
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set
from typing import Any, Dict, List, Optional
from src.blackboard.models import Event
logger = logging.getLogger("moziplus-v2.sse")
@@ -52,7 +49,8 @@ class SSEEvent:
"""格式化为 SSE 协议文本"""
lines = [f"id: {self.id}"]
lines.append(f"event: {self.event_type}")
lines.append(f"data: {json.dumps(self.data, ensure_ascii=False, default=str)}")
lines.append(
f"data: {json.dumps(self.data, ensure_ascii=False, default=str)}")
return "\n".join(lines) + "\n\n"
+199 -96
View File
@@ -21,7 +21,6 @@ from dataclasses import dataclass, field as dc_field
from src.blackboard.operations import Blackboard
from src.blackboard.db import get_connection
from src.blackboard.models import Task
from src.daemon.spawner import AgentBusyError
from src.blackboard.queries import Queries
from src.blackboard.registry import ProjectRegistry
@@ -32,9 +31,11 @@ class BroadcastRound:
"""追踪单个任务的广播状态"""
task_id: str
notified_agents: set = dc_field(default_factory=set) # 已 spawn 过的 Agent
responded_agents: set = dc_field(default_factory=set) # 已返回反馈的 Agent(含 NO_REPLY
responded_agents: set = dc_field(
default_factory=set) # 已返回反馈的 Agent(含 NO_REPLY
round_number: int = 0 # 当前第几轮(0=未开始,1=第1轮)
logger = logging.getLogger("moziplus-v2.ticker")
@@ -46,7 +47,8 @@ class Ticker:
registry: ProjectRegistry,
tick_interval: float = 30.0,
max_ticks: Optional[int] = None,
on_tick_complete: Optional[Callable[[], Coroutine[Any, Any, None]]] = None,
on_tick_complete: Optional[Callable[[],
Coroutine[Any, Any, None]]] = None,
dispatcher: Optional[Any] = None,
spawner: Optional[Any] = None,
max_dispatch_per_tick: int = 3,
@@ -194,7 +196,10 @@ class Ticker:
pr = await self._tick_project(project_id, project_info)
results["projects"][project_id] = pr
except Exception as e:
logger.exception("Tick %d project %s error", tick_num, project_id)
logger.exception(
"Tick %d project %s error",
tick_num,
project_id)
results["projects"][project_id] = {"error": str(e)}
# 虚拟项目 _general:不在 registry 但需要调度
@@ -223,7 +228,10 @@ class Ticker:
logger.exception("Tick %d _mail error", tick_num)
results["projects"]["_mail"] = {"error": str(e)}
logger.debug("Tick %d complete: %d projects", tick_num, len(active_projects))
logger.debug(
"Tick %d complete: %d projects",
tick_num,
len(active_projects))
if self.on_tick_complete:
try:
@@ -314,7 +322,8 @@ class Ticker:
# 8. 健康检查(僵尸检测)
if self.health_checker:
try:
self.health_checker.check(project_id, db_path, self._tick_count)
self.health_checker.check(
project_id, db_path, self._tick_count)
except Exception as e:
logger.warning("HealthChecker error for %s: %s", project_id, e)
@@ -335,7 +344,8 @@ class Ticker:
task_id=t.id, task_title=t.title, task_type=t.task_type
)
except Exception as e:
logger.warning("ExperienceDistiller error for %s: %s", project_id, e)
logger.warning(
"ExperienceDistiller error for %s: %s", project_id, e)
# 10. 扫描后状态
result["summary_after"] = queries.task_summary()
@@ -375,7 +385,8 @@ class Ticker:
(computed, pid),
)
refreshed.append(pid)
logger.info("Parent %s status aggregated: → %s", pid, computed)
logger.info(
"Parent %s status aggregated: → %s", pid, computed)
if refreshed:
conn.commit()
@@ -391,7 +402,7 @@ class Ticker:
MAX_ROUNDS = 5 # §4.5 防无限循环
async def _check_round_complete(self, db_path: Path,
project_id: str) -> List[str]:
project_id: str) -> List[str]:
"""检测 parent task 下所有 sub task 终态 → spawn 庞统 review
流程§4.4
@@ -462,7 +473,7 @@ class Ticker:
"Round %d review spawned for parent %s (subs: %s)",
new_round, parent_id, summary
)
except Exception as e:
except Exception:
logger.exception("Round check error for parent %s", parent_id)
return reviewed
@@ -531,9 +542,9 @@ Parent Task ID: {parent_task.id}
"""
async def _spawn_pangtong_review(self, parent_task,
review_prompt: str,
project_id: str,
new_round: int = 0) -> bool:
review_prompt: str,
project_id: str,
new_round: int = 0) -> bool:
"""Spawn 庞统进行 review
流程
@@ -543,7 +554,7 @@ Parent Task ID: {parent_task.id}
"""
try:
agent_id = "pangtong-fujunshi"
session_id = f"review-{parent_task.id}-r{new_round}"
f"review-{parent_task.id}-r{new_round}"
# 构造 on_complete 回调:解析庞统结论,更新 parent 状态
async def _on_review_complete(aid: str, outcome: str):
@@ -555,7 +566,8 @@ Parent Task ID: {parent_task.id}
latest_meta = None
latest_time = ""
for sid, sess in self.spawner._sessions.items():
if sess.get("agent_id") == agent_id and sess.get("meta"):
if sess.get(
"agent_id") == agent_id and sess.get("meta"):
t = sess.get("completed_at", "")
if t > latest_time:
latest_time = t
@@ -586,8 +598,10 @@ Parent Task ID: {parent_task.id}
self._set_parent_reviewing(parent_task.id, project_id)
return True
return False
except Exception as e:
logger.exception("Failed to spawn pangtong review for %s", parent_task.id)
except Exception:
logger.exception(
"Failed to spawn pangtong review for %s",
parent_task.id)
return False
def _set_parent_reviewing(self, parent_id: str, project_id: str):
@@ -603,14 +617,14 @@ Parent Task ID: {parent_task.id}
(parent_id,))
conn.commit()
logger.info("Parent %s → reviewing (round review in progress)",
parent_id)
parent_id)
finally:
conn.close()
except Exception:
logger.exception("Failed to set parent %s to reviewing", parent_id)
def _handle_review_conclusion(self, parent_id: str, project_id: str,
review_text: str, round_num: int):
review_text: str, round_num: int):
"""解析庞统 review 结论,更新 parent 状态
review_text 是庞统回复的文本 spawner session meta payloads 拼接
@@ -619,7 +633,8 @@ Parent Task ID: {parent_task.id}
conn = get_connection(db_path)
try:
# 解析 GOAL_ACHIEVED
is_achieved = bool(review_text and "GOAL_ACHIEVED" in review_text.upper())
is_achieved = bool(
review_text and "GOAL_ACHIEVED" in review_text.upper())
if is_achieved:
# Goal 达成 → parent 最终完成
@@ -649,7 +664,9 @@ Parent Task ID: {parent_task.id}
"(round %d, subs=%d)",
parent_id, round_num, sub_count)
except Exception:
logger.exception("Failed to handle review conclusion for %s", parent_id)
logger.exception(
"Failed to handle review conclusion for %s",
parent_id)
# 安全恢复:reviewing → working
try:
conn.execute("BEGIN IMMEDIATE")
@@ -675,7 +692,7 @@ Parent Task ID: {parent_task.id}
MENTION_MAX_RETRIES = 5
async def _process_mentions(self, db_path: Path,
project_id: str) -> List[str]:
project_id: str) -> List[str]:
"""扫描 pending mentions → spawn 被 @ 的 Agent
流程§3.4
@@ -687,7 +704,8 @@ Parent Task ID: {parent_task.id}
return []
bb = Blackboard(db_path)
mentions = bb.get_pending_mentions(max_retries=self.MENTION_MAX_RETRIES)
mentions = bb.get_pending_mentions(
max_retries=self.MENTION_MAX_RETRIES)
if not mentions:
return []
@@ -751,27 +769,32 @@ Parent Task ID: {parent_task.id}
if new_review and new_review["verdict"] == "approved":
_ticker._transition_status(
get_connection(rdb_path), _t_id, "done",
get_connection(
rdb_path), _t_id, "done",
agent="daemon",
detail={"reason": "rebuttal_approved"})
logger.info("Rebuttal: task %s approved after rebuttal", _t_id)
logger.info(
"Rebuttal: task %s approved after rebuttal", _t_id)
else:
# 仍非 approved → @mention assignee
verdict_str = new_review["verdict"] if new_review else "未知"
rconn2 = get_connection(rdb_path)
try:
t_row = rconn2.execute("SELECT assignee FROM tasks WHERE id=?", (_t_id,)).fetchone()
t_row = rconn2.execute(
"SELECT assignee FROM tasks WHERE id=?", (_t_id,)).fetchone()
finally:
rconn2.close()
if t_row and t_row["assignee"]:
from src.blackboard.blackboard import Blackboard
bb2 = Blackboard(rdb_path)
bb2.add_comment(_t_id, "daemon",
f"@{t_row['assignee']} 审查结论: {verdict_str},请查看详情并决定接受或反驳",
comment_type="review")
logger.info("Rebuttal: task %s still %s after rebuttal", _t_id, verdict_str)
f"@{t_row['assignee']} 审查结论: {verdict_str},请查看详情并决定接受或反驳",
comment_type="review")
logger.info(
"Rebuttal: task %s still %s after rebuttal", _t_id, verdict_str)
except Exception:
logger.exception("Rebuttal on_complete failed for task %s", _t_id)
logger.exception(
"Rebuttal on_complete failed for task %s", _t_id)
result = await self.spawner.spawn_full_agent(
agent_id=agent_id,
@@ -794,22 +817,30 @@ Parent Task ID: {parent_task.id}
for item in items:
bb.mark_mention_notified(item["id"])
processed.append(agent_id)
logger.info("Mention spawn success: %s (%d mentions)", agent_id, len(items))
logger.info(
"Mention spawn success: %s (%d mentions)",
agent_id,
len(items))
else:
# spawn 返回 None(其他原因)→ 递增 retry_count
for item in items:
bb.mark_mention_retry(item["id"])
logger.warning("Mention spawn failed: %s, retrying next tick", agent_id)
logger.warning(
"Mention spawn failed: %s, retrying next tick", agent_id)
except AgentBusyError:
# Agent 忙,不递增 retry_count,等下次 tick 自然重试
logger.info("Mention spawn skipped: %s busy, will retry next tick", agent_id)
logger.info(
"Mention spawn skipped: %s busy, will retry next tick",
agent_id)
except Exception as e:
logger.exception("Mention processing error for agent %s", agent_id)
except Exception:
logger.exception(
"Mention processing error for agent %s", agent_id)
for item in items:
try:
if item.get("retry_count", 0) >= self.MENTION_MAX_RETRIES - 1:
if item.get("retry_count",
0) >= self.MENTION_MAX_RETRIES - 1:
bb.mark_mention_failed(item["id"])
else:
bb.mark_mention_retry(item["id"])
@@ -822,8 +853,14 @@ Parent Task ID: {parent_task.id}
mention_lines: List[str],
project_id: str) -> str:
"""#03: @mention prompt(身份注入)"""
api_host = getattr(self.spawner, 'api_host', '127.0.0.1') if self.spawner else '127.0.0.1'
api_port = getattr(self.spawner, 'api_port', 8083) if self.spawner else 8083
api_host = getattr(
self.spawner,
'api_host',
'127.0.0.1') if self.spawner else '127.0.0.1'
api_port = getattr(
self.spawner,
'api_port',
8083) if self.spawner else 8083
api_base = f"http://{api_host}:{api_port}/api"
# 获取 Agent 专长
@@ -899,7 +936,8 @@ Parent Task ID: {parent_task.id}
from datetime import datetime
conn.execute("BEGIN IMMEDIATE")
row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
if not row:
return False
old_status = row["status"]
@@ -938,7 +976,8 @@ Parent Task ID: {parent_task.id}
event_type = "daemon_tick"
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task_id, agent, event_type, json.dumps({"from": old_status, "to": new_status, **(detail or {})})),
(task_id, agent, event_type, json.dumps(
{"from": old_status, "to": new_status, **(detail or {})})),
)
conn.commit()
return True
@@ -948,7 +987,7 @@ Parent Task ID: {parent_task.id}
# ------------------------------------------------------------------
async def _dispatch_pending(self, db_path: Path,
project_id: str) -> List[str]:
project_id: str) -> List[str]:
"""扫描 pending 任务并调度
v3.0: 两条路径
@@ -978,9 +1017,12 @@ Parent Task ID: {parent_task.id}
try:
result = await self.dispatcher.dispatch(
task,
project_config={"project_id": project_id, "db_path": db_path},
project_config={
"project_id": project_id,
"db_path": db_path},
)
if result["status"] == "dispatched" and result["level"] in ("full", "escalate"):
if result["status"] == "dispatched" and result["level"] in (
"full", "escalate"):
conn = get_connection(db_path)
try:
# [v2.7.1] Mail 已在 dispatcher 中标 working,跳过 claimed
@@ -1073,7 +1115,8 @@ Parent Task ID: {parent_task.id}
detail={"reason": "no_taker_after_3_broadcasts",
"round_number": self._broadcast_tracker.get(t.id).round_number if self._broadcast_tracker.get(t.id) else 0},
)
logger.warning("Escalated %s: no taker after 3 broadcast rounds", t.id)
logger.warning(
"Escalated %s: no taker after 3 broadcast rounds", t.id)
self._broadcast_tracker.pop(t.id, None)
finally:
conn.close()
@@ -1083,7 +1126,8 @@ Parent Task ID: {parent_task.id}
idle_agents = self._get_idle_agents()
if not idle_agents:
logger.warning("No idle agents for broadcast, skipping (capacity issue)")
logger.warning(
"No idle agents for broadcast, skipping (capacity issue)")
return []
task_ids = [t.id for t in broadcastable]
@@ -1114,7 +1158,8 @@ Parent Task ID: {parent_task.id}
spawned = []
for agent_id in idle_agents:
prompt = self._build_claim_prompt(agent_id, broadcastable, project_id)
prompt = self._build_claim_prompt(
agent_id, broadcastable, project_id)
try:
session_id = await self.spawner.spawn_full_agent(
agent_id=agent_id,
@@ -1128,7 +1173,8 @@ Parent Task ID: {parent_task.id}
spawned.append(session_id)
# 记录已通知的 Agent
for t in broadcastable:
self._broadcast_tracker[t.id].notified_agents.add(agent_id)
self._broadcast_tracker[t.id].notified_agents.add(
agent_id)
except AgentBusyError:
logger.debug("Broadcast skip %s: busy", agent_id)
except Exception:
@@ -1139,8 +1185,14 @@ Parent Task ID: {parent_task.id}
def _build_claim_prompt(self, agent_id: str, tasks: list,
project_id: str) -> str:
"""#03: 广播认领 prompt(身份+专长注入)"""
api_host = getattr(self.spawner, 'api_host', '127.0.0.1') if self.spawner else '127.0.0.1'
api_port = getattr(self.spawner, 'api_port', 8083) if self.spawner else 8083
api_host = getattr(
self.spawner,
'api_host',
'127.0.0.1') if self.spawner else '127.0.0.1'
api_port = getattr(
self.spawner,
'api_port',
8083) if self.spawner else 8083
api_base = f"http://{api_host}:{api_port}/api"
# 获取 Agent 专长
@@ -1195,7 +1247,8 @@ Parent Task ID: {parent_task.id}
@property
def counter(self):
"""从 Dispatcher 获取 counter"""
return getattr(self.dispatcher, 'counter', None) if self.dispatcher else None
return getattr(self.dispatcher, 'counter',
None) if self.dispatcher else None
@staticmethod
def _is_pid_alive(pid: int) -> bool:
@@ -1207,7 +1260,8 @@ Parent Task ID: {parent_task.id}
except (ProcessLookupError, PermissionError):
return False
def record_broadcast_response(self, task_id: str, agent_id: str, outcome: str):
def record_broadcast_response(
self, task_id: str, agent_id: str, outcome: str):
"""记录 Agent 对广播任务的反馈(Spawner 调用的公共 API"""
tracker = self._broadcast_tracker.get(task_id)
if not tracker:
@@ -1228,7 +1282,8 @@ Parent Task ID: {parent_task.id}
def _get_all_agent_ids(self) -> List[str]:
"""获取所有配置的 Agent ID"""
if self.dispatcher and hasattr(self.dispatcher, 'router') and self.dispatcher.router:
if self.dispatcher and hasattr(
self.dispatcher, 'router') and self.dispatcher.router:
return list(self.dispatcher.router.agent_profiles.keys())
return []
@@ -1237,12 +1292,13 @@ Parent Task ID: {parent_task.id}
if not self.counter:
return []
# agent_profiles 在 Router 初始化时从 config 填充,是完整 Agent 列表
all_agents = list(self.dispatcher.router.agent_profiles.keys()) if self.dispatcher else []
all_agents = list(
self.dispatcher.router.agent_profiles.keys()) if self.dispatcher else []
active = self.counter.active_agents
return [aid for aid in all_agents if active.get(aid, 0) == 0]
async def _dispatch_reviews(self, db_path: Path,
project_id: str) -> List[str]:
project_id: str) -> List[str]:
"""扫描 review 状态任务,检查是否有产出,调度审查 Agent"""
# mail 任务不走 review 流程,直接跳过
if project_id == "_mail":
@@ -1291,7 +1347,9 @@ Parent Task ID: {parent_task.id}
result = await self.dispatcher.dispatch(
task,
action_type="review",
project_config={"project_id": project_id, "db_path": db_path},
project_config={
"project_id": project_id,
"db_path": db_path},
)
if result["status"] == "dispatched":
dispatched.append(task.id)
@@ -1344,7 +1402,7 @@ Parent Task ID: {parent_task.id}
)
reclaimed.append(task.id)
logger.warning("Escalated %s: no taker after %d broadcasts",
task.id, retry_count)
task.id, retry_count)
finally:
conn.close()
else:
@@ -1375,8 +1433,10 @@ Parent Task ID: {parent_task.id}
working = queries.tasks_by_status("working")
for task in working:
# #07.2: crash_limit 统一检查(比超时更严重的信号)
if self.dispatcher and hasattr(self.dispatcher, '_check_crash_limit'):
if self.dispatcher._check_crash_limit(task.id, db_path, limit=3, window_minutes=30):
if self.dispatcher and hasattr(
self.dispatcher, '_check_crash_limit'):
if self.dispatcher._check_crash_limit(
task.id, db_path, limit=3, window_minutes=30):
conn = get_connection(db_path)
try:
self._transition_status(
@@ -1388,7 +1448,8 @@ Parent Task ID: {parent_task.id}
finally:
conn.close()
reclaimed.append(task.id)
logger.error("Task %s: executor crash limit (3/30m), marking failed", task.id)
logger.error(
"Task %s: executor crash limit (3/30m), marking failed", task.id)
continue
# #07.3 ACT-1: updated_at fallback 覆盖 mail auto-working(无 started_at/claimed_at
@@ -1400,7 +1461,8 @@ Parent Task ID: {parent_task.id}
# per-task timeout: deadline 优先,否则用默认值
if task.deadline:
deadline_time = datetime.fromisoformat(task.deadline)
timeout_minutes = (deadline_time - start_time).total_seconds() / 60.0
timeout_minutes = (
deadline_time - start_time).total_seconds() / 60.0
if timeout_minutes < 1:
timeout_minutes = self.default_task_timeout_minutes
else:
@@ -1423,7 +1485,7 @@ Parent Task ID: {parent_task.id}
if ok:
reclaimed.append(task.id)
logger.info("Mail %s: ticker recheck found reply, marked done (%.1fm)",
task.id, elapsed)
task.id, elapsed)
finally:
conn.close()
continue
@@ -1440,15 +1502,17 @@ Parent Task ID: {parent_task.id}
if ok:
reclaimed.append(task.id)
logger.warning("Task %s timed out (working %.1fm > %.1fm)",
task.id, elapsed, timeout_minutes)
task.id, elapsed, timeout_minutes)
finally:
conn.close()
except (ValueError, TypeError):
pass
# v2.7.2: 进程存活性检查 — counter 占用但进程已死的兜底
if self.spawner and self.counter and hasattr(self.counter, "active_agents"):
for agent_id in list(self.counter.active_agents.keys()) if hasattr(self.counter, "active_agents") else []:
if self.spawner and self.counter and hasattr(
self.counter, "active_agents"):
for agent_id in list(self.counter.active_agents.keys()) if hasattr(
self.counter, "active_agents") else []:
session_info = self.spawner.get_session_by_agent(agent_id)
if not session_info:
continue
@@ -1465,20 +1529,24 @@ Parent Task ID: {parent_task.id}
conn = get_connection(db_path)
try:
current_row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task_id_check,)
"SELECT status FROM tasks WHERE id=?", (
task_id_check,)
).fetchone()
if current_row and current_row["status"] == "review":
logger.info("Task %s in review, keeping status (process dead)", task_id_check)
logger.info(
"Task %s in review, keeping status (process dead)", task_id_check)
else:
self._transition_status(
conn, task_id_check, "pending",
agent="daemon",
detail={"reason": "process_dead", "pid": pid},
detail={
"reason": "process_dead", "pid": pid},
)
finally:
conn.close()
except Exception:
logger.exception("Failed to handle process dead for task %s", task_id_check)
logger.exception(
"Failed to handle process dead for task %s", task_id_check)
# #07.2: Fix-3b 已删除。review 超时/crash 统一由 process_dead + _check_timeouts 处理
@@ -1497,16 +1565,20 @@ Parent Task ID: {parent_task.id}
finally:
conn.close()
except Exception as e:
logger.error("Mail %s: ticker reply check error: %s", original_task_id, e)
logger.error(
"Mail %s: ticker reply check error: %s",
original_task_id,
e)
return True # 保守:查询失败假设有回复
def _check_recent_routing(self, db_path: Path, task_id: str,
action_type: str) -> bool:
action_type: str) -> bool:
"""检查最近 5 分钟内是否已 dispatch 过指定类型的路由(防重复)"""
try:
conn = get_connection(db_path)
try:
# 检查是否有 from_status=review 的 dispatched 记录(防止重复 review dispatch
# 检查是否有 from_status=review 的 dispatched 记录(防止重复 review
# dispatch
if action_type == "review":
row = conn.execute(
"SELECT COUNT(*) as cnt FROM routing_decisions "
@@ -1537,17 +1609,22 @@ Parent Task ID: {parent_task.id}
NON_TERMINAL = {"claimed", "working", "review", "reviewing"}
projects = self.registry.list_projects()
recovery_report = {"projects": {}, "total_recovered": 0, "total_noop": 0}
recovery_report = {
"projects": {},
"total_recovered": 0,
"total_noop": 0}
# 收集所有需要扫描的项目(registry + 虚拟项目)
project_dirs = {}
for project_id, project_info in projects.items():
if project_info.get("status") == "active":
project_dirs[project_id] = self.registry.root / project_id / "blackboard.db"
project_dirs[project_id] = self.registry.root / \
project_id / "blackboard.db"
# 虚拟项目
for virtual_id in ("_general", "_mail"):
virtual_db = Path(self.registry.root) / virtual_id / "blackboard.db"
virtual_db = Path(self.registry.root) / \
virtual_id / "blackboard.db"
if virtual_db.exists() and virtual_id not in project_dirs:
project_dirs[virtual_id] = virtual_db
@@ -1567,25 +1644,28 @@ Parent Task ID: {parent_task.id}
old_pid = self._current_project_id
self._current_project_id = project_id
try:
recovered, noop_count = self._recover_project(db_path, NON_TERMINAL)
recovered, noop_count = self._recover_project(
db_path, NON_TERMINAL)
if recovered:
recovery_report["projects"][project_id] = recovered
recovery_report["total_recovered"] += len(recovered)
recovery_report["total_noop"] += noop_count
except Exception:
logger.exception("Startup recovery failed for project %s", project_id)
logger.exception(
"Startup recovery failed for project %s", project_id)
finally:
self._current_project_id = old_pid
if recovery_report["total_recovered"] > 0:
logger.info("Startup recovery: %d tasks recovered across %d projects",
recovery_report["total_recovered"],
len(recovery_report["projects"]))
recovery_report["total_recovered"],
len(recovery_report["projects"]))
elif recovery_report["total_noop"] > 0:
logger.info("Startup recovery: %d tasks kept as-is (no recovery needed)",
recovery_report["total_noop"])
recovery_report["total_noop"])
else:
logger.info("Startup recovery: no non-terminal tasks found, clean start")
logger.info(
"Startup recovery: no non-terminal tasks found, clean start")
return recovery_report
@@ -1608,10 +1688,13 @@ Parent Task ID: {parent_task.id}
for task in rows:
try:
action = self._determine_recovery_action(conn, task, status, db_path)
action = self._determine_recovery_action(
conn, task, status, db_path)
if action:
self._execute_recovery(conn, task["id"], action, db_path)
recovered.append({"task_id": task["id"], "from": status, "action": action})
self._execute_recovery(
conn, task["id"], action, db_path)
recovered.append(
{"task_id": task["id"], "from": status, "action": action})
else:
# 审计:保持原状的任务也记录事件
noop_count += 1
@@ -1622,14 +1705,15 @@ Parent Task ID: {parent_task.id}
)
conn.commit()
except Exception:
logger.exception("Startup recovery failed for task %s", task["id"])
logger.exception(
"Startup recovery failed for task %s", task["id"])
finally:
conn.close()
return recovered, noop_count
def _determine_recovery_action(self, conn, task, status: str,
db_path: Path) -> Optional[str]:
db_path: Path) -> Optional[str]:
"""根据黑板线索决定恢复动作,返回 None 表示不需要干预"""
task_id = task["id"]
@@ -1700,7 +1784,8 @@ Parent Task ID: {parent_task.id}
# 无审查结论 → 保持 reviewticker 自然会 dispatch reviewer
return None
def _execute_recovery(self, conn, task_id: str, action: str, db_path: Path):
def _execute_recovery(self, conn, task_id: str,
action: str, db_path: Path):
"""执行恢复动作"""
# 获取原始状态(用于审计)
orig_row = conn.execute(
@@ -1712,17 +1797,22 @@ Parent Task ID: {parent_task.id}
self._transition_status(
conn, task_id, "pending",
agent="daemon",
detail={"reason": "startup_recovery", "original_status": orig_status},
detail={
"reason": "startup_recovery",
"original_status": orig_status},
)
# 清空 current_agent(常规推 pending,无特定 agent 接手)
conn.execute("UPDATE tasks SET current_agent=NULL WHERE id=?", (task_id,))
conn.execute(
"UPDATE tasks SET current_agent=NULL WHERE id=?", (task_id,))
conn.commit()
elif action == "push_to_pending_keep_agent":
self._transition_status(
conn, task_id, "pending",
agent="daemon",
detail={"reason": "startup_recovery", "original_status": orig_status},
detail={
"reason": "startup_recovery",
"original_status": orig_status},
)
# 保留 current_agent,让同一 agent 重新接手
conn.commit()
@@ -1731,7 +1821,9 @@ Parent Task ID: {parent_task.id}
self._transition_status(
conn, task_id, "review",
agent="daemon",
detail={"reason": "startup_recovery", "original_status": "working"},
detail={
"reason": "startup_recovery",
"original_status": "working"},
)
conn.commit()
@@ -1739,7 +1831,9 @@ Parent Task ID: {parent_task.id}
self._transition_status(
conn, task_id, "done",
agent="daemon",
detail={"reason": "startup_recovery", "original_status": orig_status},
detail={
"reason": "startup_recovery",
"original_status": orig_status},
)
conn.commit()
@@ -1747,22 +1841,30 @@ Parent Task ID: {parent_task.id}
self._transition_status(
conn, task_id, "failed",
agent="daemon",
detail={"reason": "startup_recovery", "original_status": orig_status},
detail={
"reason": "startup_recovery",
"original_status": orig_status},
)
conn.commit()
# 记录恢复审计事件
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, ?, ?)",
(task_id, "daemon", "startup_recovery", json.dumps({"action": action}))
(task_id, "daemon", "startup_recovery",
json.dumps({"action": action}))
)
conn.commit()
logger.info("Recovery: task %s%s (action=%s)", task_id, action, action)
logger.info(
"Recovery: task %s%s (action=%s)",
task_id,
action,
action)
def _find_pre_reviewing_status(self, conn, task_id: str) -> str:
"""查 events 表找到 reviewing 之前的状态(done 或 failed"""
# _transition_status 写入 event_type=f"task_{new_status}"detail 用 from/to
# _transition_status 写入 event_type=f"task_{new_status}"detail 用
# from/to
rows = conn.execute(
"""SELECT detail FROM events
WHERE task_id=? AND event_type='task_reviewing'
@@ -1773,7 +1875,8 @@ Parent Task ID: {parent_task.id}
for event in rows:
try:
detail = json.loads(event["detail"])
# _transition_status detail 格式: {"from": old_status, "to": new_status, ...}
# _transition_status detail 格式: {"from": old_status, "to":
# new_status, ...}
prev = detail.get("from") or detail.get("old_status")
if prev in ("done", "failed"):
return prev
+1
View File
@@ -57,6 +57,7 @@ export interface V2Task {
estimated_duration_minutes: number | null;
escalated: number;
archived: number; // v2.8: 归档标记
resumed_from: string | null; // v2.8: 续杯来源
// API 聚合字段
comments_count?: number;
outputs_count?: number;
+30 -14
View File
@@ -1,6 +1,13 @@
"""v2.6 主入口 - FastAPI + Daemon ticker 共享 asyncio event loop"""
from __future__ import annotations
from src.api.toolchain_routes import router as toolchain_router
from src.api.mail_routes import router as mail_router
from src.api.sse_routes import router as sse_router
from src.api.project_routes import router as project_router
from src.api.daemon_routes import router as daemon_router
from src.api.checkpoint_routes import router as checkpoint_router
from src.api.blackboard_routes import router as blackboard_router
import logging
from contextlib import asynccontextmanager
@@ -131,7 +138,8 @@ async def lifespan(app: FastAPI):
counter = ActiveAgentCounter(
max_global=daemon_config.get("max_global_agents", 5),
max_per_session=daemon_config.get("max_per_session", 1),
max_concurrent_sessions=daemon_config.get("max_concurrent_sessions", 3),
max_concurrent_sessions=daemon_config.get(
"max_concurrent_sessions", 3),
default_cooldown_seconds=daemon_config.get("cooldown_seconds", 120),
)
# BootstrapBuilderL2 四段式引擎注入层,v2.1)
@@ -181,7 +189,10 @@ async def lifespan(app: FastAPI):
spawner=spawner,
counter=counter,
db_path=default_db_path,
guardrails=GuardrailEngine(config_path=Path(__file__).parent.parent / "config" / "guardrails.yaml"),
guardrails=GuardrailEngine(
config_path=Path(__file__).parent.parent /
"config" /
"guardrails.yaml"),
)
# ── 集成模块 ──
@@ -191,7 +202,7 @@ async def lifespan(app: FastAPI):
)
# ExperienceDistiller(经验自动蒸馏)
experience_config = config.get("experience", {})
config.get("experience", {})
experience_distiller = ExperienceDistiller(
store=ExperienceStore(store_path=DATA_ROOT / "experiences.jsonl"),
)
@@ -252,13 +263,6 @@ app.add_middleware(
# API 路由注册
# ---------------------------------------------------------------------------
from src.api.blackboard_routes import router as blackboard_router
from src.api.checkpoint_routes import router as checkpoint_router
from src.api.daemon_routes import router as daemon_router
from src.api.project_routes import router as project_router
from src.api.sse_routes import router as sse_router
from src.api.mail_routes import router as mail_router
from src.api.toolchain_routes import router as toolchain_router
app.include_router(blackboard_router)
app.include_router(checkpoint_router)
@@ -268,6 +272,17 @@ app.include_router(sse_router)
app.include_router(mail_router)
app.include_router(toolchain_router)
# ---------------------------------------------------------------------------
# 健康检查端点
# ---------------------------------------------------------------------------
@app.get("/api/healthz")
async def healthz():
"""轻量级健康检查,无需认证"""
return {"status": "ok"}
# ---------------------------------------------------------------------------
# 兼容端点
# ---------------------------------------------------------------------------
@@ -289,16 +304,17 @@ async def list_projects_compat():
DIST_DIR = Path(__file__).parent / "frontend" / "dist"
if DIST_DIR.exists():
# v3.1: 缓存策略 - HTML 不缓存(确保新版本生效),JS/CSS 长缓存(Vite content hash 已处理)
import mimetypes
_static_app = StaticFiles(directory=str(DIST_DIR), html=True)
class CachedStaticFiles:
"""包装 StaticFiles,添加 Cache-Control 头"""
def __init__(self, app):
self._app = app
async def __call__(self, scope, receive, send):
original_send = send
async def patched_send(message):
if message.get("type") == "http.response.start":
headers = dict(message.get("headers", []))
@@ -310,5 +326,5 @@ if DIST_DIR.exists():
message["headers"] = list(headers.items())
await original_send(message)
await self._app(scope, receive, patched_send)
app.mount("/", CachedStaticFiles(_static_app), name="frontend")
-1
View File
@@ -10,7 +10,6 @@ from __future__ import annotations
import os
from pathlib import Path
from typing import Optional
def get_data_root() -> Path:
+15
View File
@@ -55,6 +55,21 @@ def client_with_isolation(isolated_data_root):
# ── E2E gate ──
def pytest_collection_modifyitems(config, items):
if not os.environ.get("RUN_INTEGRATION"):
skip_reason = "needs RUN_INTEGRATION=1"
remaining = []
deselected = []
for item in items:
if "integration" in item.keywords or "e2e" in item.keywords:
deselected.append(item)
else:
remaining.append(item)
if deselected:
config.hook.pytest_deselected(items=deselected)
items[:] = remaining
skip_no_integration = pytest.mark.skipif(
not os.environ.get("RUN_INTEGRATION"),
reason="Set RUN_INTEGRATION=1 to run E2E tests against real daemon",
+1 -1
View File
@@ -123,7 +123,7 @@ class TestClassifyNoJsonExit0:
def test_task_status_pending(self):
result = Spawner._classify_outcome(0, {}, "", "pending", "")
assert result["outcome"] == "agent_error"
assert result["outcome"] == "completed"
assert result["should_retry"] is False