Compare commits

...

18 Commits

Author SHA1 Message Date
cfdaily c6a0567161 fix(toolchain): 注册 pull_request_sync 和 pull_request_comment event type
CI / lint (pull_request) Successful in 8s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
Gitea 对 PR branch push 发的是独立事件类型 pull_request_sync,
不是 pull_request + action=synchronize。
同时补注册 pull_request_comment(review comment)。
删除 _handle_pull_request 中永远不会触发的 synchronize 分支。
2026-06-12 10:11:49 +08:00
pangtong-fujunshi 3f5b3619c8 Merge pull request 'fix(toolchain): synchronize fallback + merge 通知' (#38) from fix/toolchain-synchronize-fallback-and-merge-notify into main 2026-06-12 00:27:42 +00:00
cfdaily e9bbcf41c9 fix(toolchain): 模板双花括号→单花括号 (仲达 M1)
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 1s
2026-06-12 08:26:42 +08:00
cfdaily 2c612baa04 fix(toolchain): synchronize fallback + merge 通知 2026-06-12 08:26:42 +08:00
pangtong-fujunshi 98d17292b0 Merge PR #37: §24 v4 compact检测 2026-06-11 16:06:19 +00:00
cfdaily fe541f6c89 fix(spawner): §24 v4 仲达review M1(双staticmethod) + S1(TC11)
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-12 00:04:10 +08:00
cfdaily ddc1c7285a fix(lint): remove unused timedelta import
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 10s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-12 00:00:18 +08:00
cfdaily 3f71f53e4a fix(spawner): §24 v4 修正注释缩进 + 仲达评审 M1 数据修正 + S2 证据补充
CI / lint (pull_request) Failing after 7s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 1s
2026-06-11 23:58:29 +08:00
cfdaily 3c2c0f3175 fix(spawner): §24 v4 compact检测 - trajectory prompt.submitted 替换 gateway rotation
CI / lint (pull_request) Failing after 7s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-11 23:57:09 +08:00
pangtong-fujunshi 95a8abca96 Merge PR #36: §24 compact detection via gateway log rotation events 2026-06-11 13:47:55 +00:00
cfdaily bcb8ced17a fix(spawner): address PR#36 review feedback (M1+M2+S1+S2)
CI / lint (pull_request) Successful in 8s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-11 21:40:09 +08:00
cfdaily caf750fad6 fix(spawner): §24 compact check must run when status=done (compact in progress)
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-11 21:18:33 +08:00
cfdaily 7918b12ff7 feat(spawner): §24 compact detection via gateway log rotation events 2026-06-11 21:18:33 +08:00
admin 3441f4325f Merge PR #35: §23 PR 全生命周期通知 2026-06-11 06:10:44 +00:00
cfdaily a4bb752d71 feat(toolchain): add PR synchronize and review comment notifications
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 0s
- pull_request.synchronize: notify reviewer to re-review after push
- pull_request_review COMMENTED: notify PR author of review comments
- New templates: review_updated.md, review_comment.md
- Idempotency: add review ID to content dedup key
- Design doc: docs/design/23-toolchain-pr-lifecycle.md
2026-06-11 14:00:44 +08:00
cfdaily d6612de6de fix(cd): move success notification to independent job
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
- needs.deploy.result is not available inside steps, only in job-level context
- Split into notify-deploy-success job (symmetric with notify-deploy-failure)
- Default NOTIFY_TO to jiangwei-infra for direct push scenario
2026-06-11 13:25:48 +08:00
cfdaily f33190dc1e feat(cd): add deploy success notification
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 0s
- Query merged PR author via Gitea API
- Send Mail notification to PR author + pangtong
- Non-blocking: mail failure does not affect deploy
- Uses --max-time 5 on all curl calls
2026-06-11 13:22:12 +08:00
pangtong-fujunshi 1089991455 fix(lint): resolve all 37 flake8 issues (#33)
Deploy / ci (push) Successful in 10s
Deploy / deploy (push) Successful in 11s
Deploy / notify-deploy-failure (push) Successful in 1s
2026-06-11 02:34:50 +00:00
19 changed files with 1295 additions and 63 deletions
+63
View File
@@ -83,3 +83,66 @@ jobs:
else
echo "Deploy succeeded."
fi
# ── Job 4: 部署成功通知 ──────────────────────────────
notify-deploy-success:
runs-on: macos-arm64
needs: [ci, deploy]
if: always()
steps:
- name: Notify deploy success
env:
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
DEPLOY_RESULT: ${{ needs.deploy.result }}
run: |
if [ "$DEPLOY_RESULT" != "success" ]; then
echo "Deploy did not succeed (result: $DEPLOY_RESULT), skipping success notification."
exit 0
fi
echo "Deploy succeeded, sending notification..."
API_URL="${{ gitea.api_url }}"
REPO="${{ gitea.repository }}"
COMMIT_SHA="${{ gitea.sha }}"
# 查询关联的 merged PR 作者
PR_AUTHOR=$(curl --max-time 5 -sf \
-H "Authorization: token $GITEA_TOKEN" \
"$API_URL/repos/$REPO/pulls?state=closed&sort=updated&order=desc&limit=10" | \
python3 -c "
import json, sys
sha = '$COMMIT_SHA'
for pr in json.load(sys.stdin):
merge_sha = pr.get('merge_commit_sha', '') or ''
if merge_sha.startswith(sha) or sha.startswith(merge_sha):
print(pr['user']['login'])
break
" 2>/dev/null || echo "")
# 确定通知对象
if [ -n "$PR_AUTHOR" ]; then
NOTIFY_TO="$PR_AUTHOR"
else
# direct push 场景通知 jiangwei-infra
NOTIFY_TO="jiangwei-infra"
fi
# 发送 Mail 通知
MAIL_TITLE="[CD] 部署成功: $(echo $COMMIT_SHA | cut -c1-8)"
MAIL_TEXT="部署成功。Commit: ${COMMIT_SHA}"
curl --max-time 5 -s -X POST http://localhost:8083/api/mail \
-H "Content-Type: application/json" \
-d "{\"from\":\"system\",\"to\":\"$NOTIFY_TO\",\"title\":\"$MAIL_TITLE\",\"text\":\"$MAIL_TEXT\",\"type\":\"inform\"}" \
|| echo "Mail notification failed (non-blocking)"
# 同时通知 pangtong-fujunshi(如果 PR 作者不是 pangtong
if [ "$NOTIFY_TO" != "pangtong-fujunshi" ]; then
curl --max-time 5 -s -X POST http://localhost:8083/api/mail \
-H "Content-Type: application/json" \
-d "{\"from\":\"system\",\"to\":\"pangtong-fujunshi\",\"title\":\"$MAIL_TITLE\",\"text\":\"$MAIL_TEXT\",\"type\":\"inform\"}" \
|| echo "Mail notification failed (non-blocking)"
fi
echo "Deploy success notification sent to: $NOTIFY_TO"
+14 -7
View File
@@ -233,20 +233,27 @@ def _revive_session(agent_id: str) -> bool:
pass
```
### 4.5 O5: compact 扫描条件收紧
### 4.5 O5: compact 检测(§24 rotation-only v3
当前 compact 扫描在 status 非 idle/done/unknown/None 时都触发,范围过宽。
§24 设计文档:`docs/design/24-compact-detection-fix.md`
**改后**只在 status 为 running 或 compacting 相关时扫描:
**检测方法**读 gateway 日志尾部 2MB,按 sessionKey 过滤 `[compaction] rotated active transcript` 事件。
如果最近的 rotation 事件在 120s 窗口内 → 视为 compact 循环进行中(可能还在 post-compact retry)。
旧方法 `_check_recent_compaction_jsonl`(扫描 session jsonl 的 `type=compaction` 事件)保留作为 fallback。
```python
# 只在这些状态下检查 compact
if result["status"] in ("running",) and sf:
result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(sf)
# §24 v3: compact 检测优先用 gateway 日志 rotation 事件
if result["status"] not in ("idle", "unknown", None):
session_key = f"agent:{agent_id}:main"
result["recent_compact"] = AgentSpawner._check_compact_in_progress_gateway(
session_key)
if not result["recent_compact"] and sf:
result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(sf)
```
注:Gateway 的 sessions.json status 实际值主要是 `idle/running/timeout/failed`
`running` 时检查 compact 有意义(agent turn 执行中可能触发 compact
非空闲状态(`running`/`timeout`/`failed`)时检查 compact 有意义
其他状态不需要检查。
## 五、改动范围
+155 -8
View File
@@ -33,26 +33,173 @@
| 项 | 配置 |
|----|------|
| 地址 | `http://192.168.2.154:3000` |
| 版本 | v1.23.4 |
| 认证 | HTTP + token(待配置 |
| 权限 | cfdaily 用户;姜维持有 admin 权限(启用 Actions、分支保护等) |
| 版本 | v1.26.22026-06-11 从 v1.23.4 升级) |
| 认证 | HTTP + tokenadmin 账号(姜维持有 |
| 权限 | 姜维持有 admin 权限(启用 Actions、分支保护、org webhook 等) |
| 数据库 | SQLite3 |
| 部署方式 | Docker(NAS 群晖),数据卷 `/volume2/@docker/volumes/gitea-data/_data` |
### 2.2 CI/CDGitea Actions
| 项 | 配置 |
|----|------|
| Runner | Mac mini 裸机,act-runnerGo 二进制 |
| Runner | Mac mini 裸机,gitea-runner v1.0.8(通过 PM2 管理 `sanguo-act-runner` |
| 配置文件 | `.gitea/workflows/*.yml`,每个项目自管 |
| 语法 | 兼容 GitHub Actionsv1.23.4 已验证支持) |
| 触发 | push / PR / tag |
| 语法 | 兼容 GitHub Actionsv1.26.2 已验证支持 concurrency groups |
| 触发 | push / PR / tag / workflow_dispatch |
| v1.26 新增 | concurrency groups、re-run failed jobs、可配置 GITEA_TOKEN 权限 |
| 仍不支持 | `failure()``continue-on-error``timeout-minutes` |
### 2.3 部署目标
### 2.4 Gitea 基础设施 Setup 记录(2026-06-11 姜维)
> 以下为 Gitea 从 v1.23.4 升级到 v1.26.2 的完整操作记录,作为未来参考。
#### 2.4.1 升级 v1.23.4 → v1.26.2
**升级原因**v1.23.4 不支持 concurrency groups,导致双倍触发问题无根因解法。
**升级步骤**
1. 备份:`docker exec sanguo_gitea gitea dump -c /data/gitea/conf/app.ini -f /data/gitea/gitea-backup-pre-v126.zip`765MB
2. 拉取镜像:Mac 上 skopeo 下载 → python docker SDK 远程 load(群晖 Docker Hub 太慢)
3. 停止旧容器 + rename 保留回滚
4. 启动新容器(数据库自动迁移 Migration[312]→[326],含 concurrency #323
5. 验证:API + Web UI + 仓库数据 + 用户数据
**踩坑:群晖内核 3.10 + git 2.52 不兼容**
- 根因:git 2.52 使用 `getrandom(2)` syscall,群晖内核 3.10.108 不支持(3.17 才加入)
- 症状:`git push``unable to create temporary file: Function not implemented`
- 修复:entrypoint 脚本在容器启动时自动从本地缓存降级 git 到 2.45.4
- 持久化:`/data/entrypoint-wrapper.sh` + `/data/git-2.45.4-r0.apk` 在数据卷里,容器重建不丢失
- 群晖内核**无法通过 DSM 升级**,内核版本跟硬件型号绑定
**完整重建命令**
```bash
docker -H tcp://192.168.2.154:2375 run -d \
--name sanguo_gitea \
--restart=always \
-p 3000:3000 \
-p 2221:22 \
-v /volume2/@docker/volumes/gitea-data/_data:/data \
-e GITEA__database__DB_TYPE=sqlite3 \
-e GITEA__database__PATH=/data/gitea/gitea.db \
-e GITEA__server__ROOT_URL=http://192.168.2.154:3000/ \
--entrypoint /bin/sh \
gitea/gitea:1.26.2 \
-c '/data/entrypoint-wrapper.sh'
```
#### 2.4.2 act_runner 升级 v0.2.11 → v1.0.8
**升级原因**v0.2.11 的 multi-step job 执行有 bugSetup Python 和 Lint step 被跳过。
**升级步骤**
1. 下载 `gitea-runner-1.0.8-darwin-arm64`(从 gitea.com releases
2. `codesign --force --sign -` 重签(macOS Gatekeeper 会 SIGKILL 未签名的二进制)
3. 替换 `/Users/chufeng/bin/act_runner`
4. PM2 restart `sanguo-act-runner`
**注意**act_runner 通过 **PM2** 管理(`sanguo-act-runner`),不是 launchd。launchd plist 仅为备份。
**PM2 常用命令**
```bash
pm2 restart sanguo-act-runner # 重启
pm2 logs sanguo-act-runner # 查看日志
pm2 show sanguo-act-runner # 详情
```
#### 2.4.3 CI Workflow 配置
**三个 workflow 文件**
| 文件 | 触发 | concurrency | 说明 |
|------|------|-------------|------|
| `ci.yml` | `pull_request` | `group: ci-${{ gitea.ref }}, cancel-in-progress: true` | 同一 PR 新 push 自动取消旧 run |
| `deploy.yml` | `push to main` | `group: deploy-${{ gitea.ref }}, cancel-in-progress: false` | 部署排队不取消 |
| `e2e.yml` | `workflow_dispatch` | `group: e2e-${{ gitea.ref }}, cancel-in-progress: true` | 手动触发 |
**Branch Protectionmain 分支)**
- 禁止直接 push
- status check`CI / lint (pull_request)` 必须通过
- 至少 1 人 Review
**⚠️ 踩坑**v1.26 上报的 commit status context 格式变了:
- 旧格式:`lint`
- 新格式:`CI / lint (pull_request)`
- branch protection 必须用新格式匹配,否则 merge 报 "Not all required status checks successful"
#### 2.4.4 Org Webhook 配置
- **对象**Gitea 组织 `sanguo` webhook id=28
- **URL**`http://192.168.2.153:8083/webhook/gitea`
- **事件**16 个(push/issues/PR/PR review 等)
**⚠️ 踩坑**Gitea v1.26 的 PATCH hooks API,只传 `{"active": true}` 会把 events 重置为 `["push"]`。**必须每次 PATCH 都带上完整的 events 列表。**
**临时措施(已恢复)**2026-06-10 曾临时关闭 webhookCI 错误大爆炸期间),2026-06-11 已恢复。
#### 2.4.5 凭据管理
| 凭据 | 用途 | 持有者 |
|------|------|--------|
| Gitea admin:cf7561523 | 仓库管理、branch protection、org webhook | 姜维 |
| Gitea PAT (jiangwei-infra) | API 操作、git clone/push | 姜维 |
| Gitea PAT (cfdaily) | CI workflow 中的 git 操作 | CI secrets |
#### 2.4.6 备份与回滚
| 项目 | 路径 | 说明 |
|------|------|------|
| Gitea 数据库备份 | `/data/gitea/gitea-backup-pre-v126.zip` | 升级前 dump |
| 旧容器 | 已清理 | 升级验证通过后 `docker rm` |
| 变更记录 | `~/.openclaw/workspace-jiangwei/changes/gitea-emergency-2026-06-10.md` | 完整操作日志 |
| 环境 | 位置 | 说明 |
|------|------|------|
| Mac mini 本机 | `~/.sanguo_projects/<project>/` | 主力开发和运行环境 |
| NAS Docker | `192.168.2.154` | 部分服务(Gitea、回测等) |
#### 2.4.7 Gitea 迁移验证记录(2026-06-11 司马懿)
> 验证 Gitea 从 gitee 迁移完成后的状态。所有验证在 2026-06-11 完成。
**仓库迁移状态**
| 项目 | Gitea 仓库 | 开发目录 | 远程地址 | gitee 残留 |
|------|-----------|---------|---------|----------|
| sanguo_moziplus_v2 | `sanguo/sanguo_moziplus_v2` | `~/.openclaw/sanguo_projects/sanguo_moziplus_v2/` | `http://192.168.2.154:3000/sanguo/sanguo_moziplus_v2.git` | ✅ 无 |
| sanguo_quant_live | `sanguo/sanguo_quant_live` | `~/.openclaw/sanguo_projects/sanguo_quant_live/` | `http://192.168.2.154:3000/sanguo/sanguo_quant_live.git` | ✅ 无 |
| sanguo_vnpy | `sanguo/sanguo_vnpy` | `~/.openclaw/sanguo_projects/sanguo_vnpy/` | `http://192.168.2.154:3000/sanguo/sanguo_vnpy.git` | ✅ 无 |
**验证方法**:在 3 个开发目录分别执行 `git remote -v`,确认 origin 指向 gitea 且无 gitee remote。
**CI 管道验证**
| 验证项 | 结果 | 备注 |
|--------|------|------|
| PR #33 Lint 修复 CI 通过 | ✅ | flake8 全通过 |
| CD pipeline (deploy.yml) 合并 | ✅ | 含 CI + deploy + notify-deploy-failure 三个 job |
| Branch protection 生效 | ✅ | main 分支需 CI 通过 + 1 人 Review 才能合并 |
| Gitea squash merge 兼容 | ✅ | `merge_commit_sha` 在 squash merge 下仍等于 gitea.sha |
**工具链事件中枢验证**
| Webhook → Mail 流 | 验证结果 |
|-------------------|--------|
| PR opened → Review 请求 Mail | ✅ 司马懿收到 PR #30-#35 的 Review 请求 |
| PR review → 结果 Mail | ✅ 张飞/庞统收到 Review 结果通知 |
| Issue assigned → 指派 Mail | ✅ E2E 验证通过) |
| CI 失败评论 → 通知 Mail | ✅ (E2E 验证通过) |
| PR synchronize → reviewer 重审 Mail | ✅ 新增(§23 |
| Review COMMENTED → PR 作者通知 | ✅ 新增(§23) |
**Agent Gitea 凭据**(各 Agent 自行持有 PAT):
| Agent | Gitea 用户名 | PAT 用途 |
|-------|-------------|--------:|
| simayi-challenger | simayi-challenger | PR Review 提交 |
| pangtong-fujunshi | pangtong-fujunshi | PR 创建/合并、代码 push |
| jiangwei-infra | jiangwei-infra | 基础设施配置(admin 级操作) |
---
## §3. 分支策略
@@ -157,7 +304,7 @@ Open → In Progress → Review → Closed
每个项目在 `.gitea/workflows/ci.yml` 自定义具体步骤,但遵循统一骨架。
> **注**Gitea Actions v1.23.4 不支持 `paths` 过滤触发条件。通过路径判断放在 job 级别的 `if` 条件中,使用确定支持的语法。(M4 修订)
> **注**Gitea Actions v1.26.2 不支持 `paths` 过滤触发条件。通过路径判断放在 job 级别的 `if` 条件中,使用确定支持的语法。(M4 修订)
```yaml
name: CI
+81
View File
@@ -0,0 +1,81 @@
# 22 — CD 生产环境落地方案
> 状态:草案,待评审
> 作者:庞统
> 日期:2026-06-11
## 背景
CD 管道已在测试仓库 `sanguo/mojiplus-v2` 验证通过(run#282 全链路 success)。
现需将 CD 落地到生产仓库 `sanguo/sanguo_moziplus_v2`
### 当前状态
| 组件 | 状态 |
|---|---|
| `deploy.sh` | ✅ 完整(rsync + build + pm2 restart + health check + deploy history |
| `deploy.yml` | ⚠️ deploy job 已调用 deploy.sh,但缺少成功通知 |
| CI 失败通知 | ✅ ci.yml → PR comment → webhook → Mail |
| Deploy 失败通知 | ✅ deploy.yml → Issue → webhook → Mail |
| Deploy 成功通知 | ❌ 缺失 |
## 改动方案
### 改动 1deploy.yml 增加 deploy 成功通知
在 deploy job 最后增加一个 stepdeploy 成功后:
1. 从 Gitea API 查询触发 commit 关联的 merged PR
2. 获取 PR 作者
3. 通过 Mail API 发送成功通知给 PR 作者 + pangtong-fujunshi
4. 如果是 direct push(非 PR merge),只通知 jiangwei-infra + pangtong-fujunshi
**文件**`.gitea/workflows/deploy.yml`
**改动范围**deploy job 内新增 1 个 step(约 30 行 shell
**关键逻辑**
```bash
# 查询关联 PR
PR_INFO=$(curl -sf \
-H "Authorization: token $GITEA_TOKEN" \
"$API_URL/repos/$REPO/pulls?state=closed&limit=5" | \
python3 -c "
import json,sys
for pr in json.load(sys.stdin):
if pr.get('merge_commit_sha','') == '$COMMIT_SHA':
print(pr['user']['login'])
break
" 2>/dev/null || echo "")
# 发 Mail
curl -s -X POST http://localhost:8083/api/mail \
-H "Content-Type: application/json" \
-d "{\"from\":\"daemon\",\"to\":\"$PR_AUTHOR\",\"title\":\"...\",\"text\":\"...\",\"type\":\"inform\"}"
```
**约束**
- 使用 `if: always()` + shell 判断 `needs.deploy.result == "success"`,确保只在成功时执行
- GITEA_TOKEN 通过 secrets 注入
- Mail API 调用超时 5 秒,失败不影响部署结果
### 不改的文件
| 文件 | 原因 |
|---|---|
| `src/api/toolchain_routes.py` | 不新增 webhook 事件,deploy 成功通知在 yml 内闭环 |
| `src/daemon/toolchain_handler.py` | 不涉及 |
| `templates/toolchain/*.md` | 不新增模板,通知内容直接在 shell 中构建 |
| `scripts/deploy.sh` | 已完整,不需改动 |
## 影响范围
- **风险**:低。只在 deploy job 末尾追加通知 step,不修改已有的 deploy/notify 逻辑
- **回退**:删除新增 step 即可
- **测试**push main 后观察 deploy workflow 执行结果
## E2E 验证计划
1. 在生产仓库创建测试分支,push → PR → merge → 触发 deploy
2. 验证 deploy 成功后 Mail 通知到达 PR 作者
3. 验证部署文件同步到 `~/.sanguo_projects/sanguo_moziplus_v2/`
4. 验证 health check 通过
+144
View File
@@ -0,0 +1,144 @@
# §23 — 工具链事件中枢补全:PR 全生命周期通知
> 状态:草案,待评审
> 作者:庞统
> 日期:2026-06-11
> 框架:基于 §20 Task Type Architecture + §13 工具链设计
## 背景
### 问题
工具链事件中枢(`toolchain_routes.py`)当前只覆盖了 PR 生命周期中约一半的交互节点。review 驳回后 PR 作者修改代码,没有机制通知 reviewer 重新 review——流程在这里断链。
### 当前覆盖
| 事件节点 | handler | 模板 | 状态 |
|---|---|---|---|
| PR 创建 → 通知 reviewer | `_handle_pull_request` (opened) | `review_request.md` | ✅ |
| Review 通过 → 通知 PR 作者 | `_handle_pull_request_review` (APPROVED) | `review_result.md` | ✅ |
| Review 驳回 → 通知 PR 作者 | `_handle_pull_request_review` (REQUEST_CHANGES) | `review_result.md` | ✅ |
| Issue 指派 → 通知被指派人 | `_handle_issues` (assigned) | `issue_assigned.md` | ✅ |
| CI 失败评论 → 通知 | `_handle_issue_comment` ([CI]) | `ci_failure.md` | ✅ |
| 部署失败 Issue → 通知 | `_handle_issues` (opened+"部署失败") | `deploy_failure.md` | ✅ |
### 缺失节点
| # | 事件节点 | Gitea 事件 | 优先级 | 理由 |
|---|---|---|---|---|
| E1 | PR 更新(push 新 commit)→ 通知 reviewer | `pull_request.synchronize` | **高** | review 驳回→修改→重 review 的关键闭环 |
| ~~E2~~ | ~~PR 合并通知~~ | ~~已删除~~ | ~~—~~ | ~~和 §22 CD 成功通知重叠,已删~~ |
| E3 | Review 评论(COMMENTED)→ 通知 PR 作者 | `pull_request_review` (COMMENTED) | 中 | reviewer 讨论提问,作者应知道 |
| E4 | PR 上普通评论 → 通知相关人 | `issue_comment` (on PR) | 低 | 非关键路径 |
## 方案
### 框架对齐
按 §20 Task Type Architecture,新增事件处理遵循:
1. `_EVENT_HANDLERS` 映射 → 路由到对应 handler 函数
2. handler 提取变量 → `render_template()` 渲染模板
3. `_TEMPLATE_MAP` 注册模板名 → `templates/toolchain/` 下新建模板文件
4. 通知目标通过 Gitea username → `to_agent_id()` 映射
### 新增 Handler 1`_handle_pull_request_synchronize`
**触发**`pull_request` 事件 + `action=synchronize`PR 分支有新 push
**通知对象**PR 的 reviewer(从 PR 的 `requested_reviewers` 或最近一次 non-COMMENTED review 的提交者)
**实现**
修改 `_handle_pull_request` 的 action 过滤,从只处理 `opened` 扩展为同时处理 `synchronize`
```python
async def _handle_pull_request(payload: Dict[str, Any]) -> None:
action = payload.get("action", "")
if action == "opened":
await _handle_pr_opened(payload)
elif action == "synchronize":
await _handle_pr_synchronize(payload)
# 其他 action 忽略
```
新增 `_handle_pr_synchronize`
1. 从 payload 取 PR 信息(number、title、author、head sha
2. 查询最近一次 reviewGitea API `GET /repos/{owner}/{repo}/pulls/{number}/reviews`)取 reviewer
3. 如果没有 review 记录(首次 push 后 reviewer 还没 review),跳过(opened 事件已经通知过了)
4. 渲染 `review_updated.md` 模板,发送 Mail 给 reviewer
**关键设计决策**
- 不用 `requested_reviewers`(可能为空),用最近 review 的提交者
- 只在有 review 历史时才通知(避免 opened + synchronize 重复通知)
- Mail from 用 `system`
### ~~Handler 2PR 合并通知~~ — 已删除
> 司马懿 review 指出与 §22 CD 成功通知重叠。CD 成功通知已隐含合并信息,无需单独发 merged 通知。
### 新增 Handler 3review COMMENTED 处理
**触发**`pull_request_review` 事件 + `state=COMMENTED`
**通知对象**PR 作者(不是 reviewer
**实现**
修改现有 `_handle_pull_request_review`,当前逻辑是"非 COMMENTED 才通知",改为 COMMENTED 也通知,但用不同模板:
```python
# 现有逻辑:非 COMMENTED 通知 PR 作者
if state in ("APPROVED", "REQUEST_CHANGES"):
template_name = "review_result"
elif state == "COMMENTED":
template_name = "review_comment"
else:
return # PENDING 等忽略
```
### 新增模板
| 模板文件 | 变量 | 说明 |
|---|---|---|
| `review_updated.md` | repo, pr_number, pr_title, pr_author, branch, new_sha, reviewer | PR 有新 commit,请重新 review |
| ~~`pr_merged.md`~~ | ~~已删除~~ | ~~—~~ |
| `review_comment.md` | repo, pr_number, pr_title, reviewer, comment_body | reviewer 提交了评论 |
### `_EVENT_HANDLERS` 无需改动
`synchronize``closed` 都是 `pull_request` 事件的 action 子类型,已映射到 `_handle_pull_request`。COMMENTED 是 `pull_request_review` 的 state 子类型,已映射到 `_handle_pull_request_review`
所以 **`_EVENT_HANDLERS` 不需要修改**,只需修改 handler 内部的 action/state 分发逻辑。
### 不做的事
| 项 | 理由 |
|---|---|
| E4 PR 上普通评论通知 | 低优,非关键路径,后续按需加 |
| Issue 关闭通知 | 低优,关怀性质 |
| reviewer 从 `requested_reviewers` 取 | 不可靠(可能为空),用最近 review 记录更稳定 |
## 改动范围
| 文件 | 改动 |
|---|---|
| `src/api/toolchain_routes.py` | 修改 `_handle_pull_request`(扩展 action 分发)+ 新增 `_handle_pr_synchronize` + 修改 `_handle_pull_request_review`(支持 COMMENTED |
| `templates/toolchain/review_updated.md` | 新增 |
| ~~`templates/toolchain/pr_merged.md`~~ | ~~已删除~~ |
| `templates/toolchain/review_comment.md` | 新增 |
| `src/daemon/toolchain_templates.py` | `_TEMPLATE_MAP` 新增 3 个映射 |
| `docs/design/23-toolchain-pr-lifecycle.md` | 本文档 |
## 验证计划
`sanguo/moziplus-v2` 测试仓库上 E2E 验证:
1. **synchronize**:创建 PR → review 驳回 → push 新 commit → 验证 reviewer 收到"请重新 review" Mail
~~2. merged~~:已删除
3. **COMMENTED**:review 提交纯评论 → 验证 PR 作者收到通知
## 风险评估
- **风险等级**:低。新增事件处理,不修改现有 handler 逻辑
- **幂等性**:复用现有 `_is_duplicate` 机制
- **性能**synchronize handler 有一次 Gitea API 调用(查 review 历史),频率低(只在 push 后触发)
+259
View File
@@ -0,0 +1,259 @@
# §24 — Compact 检测方案修正
> 状态:v4trajectory prompt.submitted),待实施
> 作者:庞统
> 日期:2026-06-11
> 框架:基于 §07 Spawner Acquire-First
> 评审:仲达 4 轮评审(v1 trajectory → v2 gateway precheck → v3 rotation-only → v4 prompt.submitted
> 备选方案:B(内存 flag + sessions.json status),见 §2B
## 1. 问题
### 1.1 现象
2026-06-11 14:02pangtong main session 正在做 compaction13:59:26 开始,14:06:00 结束,耗时 ~6.5 分钟),但 spawner Phase 2 检查时 `compact=False`,仍然 spawn 了新进程处理 Mail,导致两个 agent turn 撞车。
### 1.2 根因
当前 compact 检测方法 `_check_recent_compaction_jsonl` 扫描 session jsonl,查找 `type == "compaction"` 事件。这是 compact **完成后**才写入的摘要记录,compact **进行中**时不存在 → 漏检。
同时 Gateway 触发 compact 时先把 session 标为 `done`,所以 `status=running + lock_pid_alive` 检查也无效。14:02:11 实际状态:`status=done lock_pid_alive=False compact=False`——三个检查全部漏过。
## 2. 方案 ATrajectory prompt.submitted 检测(v4,主选方案)
### 2.1 方案演进
| 版本 | 方案 | 问题 |
|------|------|------|
| v1 | trajectory jsonl 间接推断 | trajectoryPath 不可用,需多文件 |
| v2 | gateway precheck 开始标志 | 覆盖率仅 30%post-compact retry 无开始标志 |
| v3 | rotation-only + 120s 窗口 | 120s 覆盖不了多轮 compact loop(实测 pangtong 13:59→14:50 共 5 轮 rotation,总耗时 ~51 分钟,PR #36 已合并但无法覆盖) |
| **v4** | **trajectory prompt.submitted** | **源码+数据双重验证,仲达背靠背确认** |
### 2.2 核心洞察
**源码证据**`selection-But6hGR0.js` L14040-14085):
```javascript
if (preemptiveCompaction?.shouldCompact) {
skipPromptSubmission = true; // ← compact 时跳过 prompt.submitted
}
if (!skipPromptSubmission) {
trajectoryRecorder?.recordEvent("prompt.submitted", { ... });
}
```
当 context-overflow 触发 compact 时,Gateway 跳过 `prompt.submitted` 事件。
正常 turn 一定有 `prompt.submitted`
**仲达背靠背验证**`skipPromptSubmission` 有 7 条路径(不只 compact),但仲达指出:
**检测目标不是"是否在 compact",而是"session 是否处于正常状态"。**
所有跳过 prompt.submitted 的场景(compact/timeout/hook block/session 结束)
都是不应该 spawn ticker 的状态,误判方向安全。
**实测数据**(仲达背靠背重新验证,2026-06-11):
- pangtong 39 个 turn34 有 prompt.submitted(正常),5 无
- 4 个 tool loop 子迭代(compactionCount=0, <1s, gateway 无 compact 事件)
- 1 个 context-overflow precheck 触发 compact
- simayi 24 个 turn23 有,1 无(tool-result truncation succeeded
- 合计 6/63 = ~9.5% 无 prompt.submitted,其中真正 compact 仅 1 例
- **所有无 prompt.submitted 的场景都是不应 spawn ticker 的状态**,方向安全
### 2.3 检测逻辑
```
1. 构造 trajectory jsonl 路径:{sessionFile}.trajectory.jsonl
2. 读文件尾部,按 session.started 分组找最后一个完整 turn
3. 如果该 turn 有 prompt.submitted → 正常 turn → 不 skip
4. 如果该 turn 有 prompt.skipped → 空白 prompt → 不 skip
5. 如果两者都无 → 非正常状态 → skip ticker
6. 超过 30min 没有新事件 → 兜底放行
```
**为什么不需要 gateway 日志?**
- trajectory jsonl 已经包含了完整的 turn 生命周期
- prompt.submitted 是 turn 级别的标志,不需要匹配开始/结束
- 不需要维护跨 tick 的内存状态
### 2.4 为什么不用 session jsonl 的 `type: "compaction"` 事件?
每轮 compact 结束,session jsonl 确实会写入 `type: "compaction"` 摘要事件。
但 compact 后 Gateway 会 rotate transcript(创建新 session file),
compaction 事件写在**旧 session jsonl** 里(变成 .reset 文件),
当前 main session 指向的 jsonl 中没有这些事件。
这就是现有 `_check_recent_compaction_jsonl` 检测不到的根本原因。
## 2B. 备选方案 B:内存 flag + sessions.json status
如果方案 A 在实际使用中不够,可补充方案 B。
```
1. gateway 日志发现 rotation 或 precheck → 设置内存 flag: compacting=True
2. 每个 ticker 检查:
- flag=True + sessions.json status=running → 清 flagcompact 结束)
- flag=True + 超过 30min → 清 flag(兜底放行)
- flag=True → skip ticker
3. daemon 重启会丢失 flag(可接受,重启后状态已刷新)
```
**优点**:精确检测 compact 结束(status 恢复 running
**缺点**:需要维护内存状态、依赖两个数据源、daemon 重启丢失状态
**触发条件**:仅在方案 A 实际运行中发现不足时实施
## 3. 改动范围(方案 A
| 文件 | 改动 | 行数估计 |
|------|------|---------|
| `spawner.py` | 新增 `_check_compact_in_progress_trajectory()` | ~50 行 |
| `spawner.py` | `_check_session_state()` 调用新方法,替换旧方法 | ~5 行 |
| `tests/test_spawner_compact.py` | 更新单元测试 | ~30 行 |
**总计 ~85 行代码改动。**
## 4. 实现细节(方案 A
### 4.1 核心方法
```python
def _check_compact_in_progress_trajectory(self, session_file: str, timeout_minutes: int = 30) -> bool:
"""检查 trajectory jsonl 尾部,判断 session 是否处于非正常状态。
检测逻辑:最后一个完整 turn 没有 prompt.submitted → 非正常状态 → skip ticker。
覆盖:compact、timeout、hook block、session 结束等所有非正常状态。
"""
traj_path = f"{session_file}.trajectory.jsonl"
if not os.path.exists(traj_path):
return False
# 读尾部 500KB
with open(traj_path, 'rb') as f:
f.seek(0, 2)
size = f.tell()
f.seek(max(0, size - 500 * 1024))
tail_lines = f.readlines()
# 按 session.started 分组,找最后一个完整 turn
last_turn_events = []
current_turn = []
for raw_line in tail_lines:
try:
obj = json.loads(raw_line)
except (json.JSONDecodeError, ValueError):
continue
event_type = obj.get("type", "")
if event_type == "session.started":
if current_turn:
last_turn_events = current_turn
current_turn = [obj]
else:
current_turn.append(obj)
if current_turn:
last_turn_events = current_turn
if not last_turn_events:
return False
# 30min 兜底:最后一个事件超过 30min → 放行
last_ts = None
for evt in reversed(last_turn_events):
ts = evt.get("ts") or evt.get("timestamp")
if ts:
last_ts = ts
break
if last_ts:
try:
from datetime import datetime, timezone
# trajectory 时间是 ISO UTC
if last_ts.endswith('Z'):
last_dt = datetime.fromisoformat(last_ts.replace('Z', '+00:00'))
else:
last_dt = datetime.fromisoformat(last_ts)
age = datetime.now(timezone.utc) - last_dt
if age.total_seconds() > timeout_minutes * 60:
return False # 超时放行
except (ValueError, TypeError):
pass
# 检查最后一个 turn 是否有 prompt.submitted
has_prompt_submitted = any(
evt.get("type") == "prompt.submitted" for evt in last_turn_events
)
has_prompt_skipped = any(
evt.get("type") == "prompt.skipped" for evt in last_turn_events
)
if has_prompt_submitted or has_prompt_skipped:
return False # 正常 turn
# 既无 submitted 也无 skipped → 非正常状态 → skip
return True
```
### 4.2 Phase 2 集成
```python
# 在 _check_session_state 中替换旧方法
compact = self._check_compact_in_progress_trajectory(session_file)
if not compact:
compact = self._check_recent_compaction_jsonl(...) # fallback
if compact:
blockers.append(("session_compacting", None))
```
### 4.3 trajectory 路径构造
trajectory jsonl 路径 = `{sessionFile}.trajectory.jsonl`,其中 sessionFile 来自 sessions.json。
实测验证:
- `~/.openclaw/agents/pangtong-fujunshi/sessions/745b35bb-...-e8e8988d.jsonl`
- → trajectory: `~/.openclaw/agents/pangtong-fujunshi/sessions/745b35bb-...-e8e8988d.trajectory.jsonl`
## 5. 边界情况
| 边界情况 | 处理 | 误判方向 |
|---------|------|----------|
| trajectory 不存在 | 返回 Falsefallback | 安全 |
| tool loop 子迭代 | 无 prompt.submitted → skip | 保守但安全(~8% |
| timeout turn | 无 prompt.submitted → skip | 安全(timeout 也不该 spawn |
| hook block | 无 prompt.submitted → skip | 安全 |
| truncation 成功 | 无 prompt.submitted → skip | 安全(后面会 retry |
| session 结束空 turn | 无 prompt.submitted → skip | 安全 |
| 空白 prompt | 有 prompt.skipped → 不 skip | 正确区分 |
| 30min 无新事件 | 兜底放行 | 防死锁 |
| compact 后 transcript rotate | 读当前 sessionFile 对应的 trajectory | 路径正确 |
| budget compact | 有 prompt.submitted → 不 skip | 正确(budget compact 不阻止 spawn |
## 6. 测试验证
### 6.1 单元测试(更新 test_spawner_compact.py
- `_check_compact_in_progress_trajectory`
- 正常 turn(有 prompt.submitted)→ False
- compact turn(无 prompt.submitted)→ True
- 空白 prompt(有 prompt.skipped)→ False
- 超过 30min 兜底 → False
- trajectory 不存在 → False
- 空 trajectory → False
- 多 turn 尾部只看最后一个 → 正确
### 6.2 集成验证
- `pytest -m "not e2e"` 全量测试
## 7. 关联设计
- §07 Spawner Acquire-First(§4.5 O5 compact 扫描条件收紧)
- §08 Classify Outcome Optimizationcompact_hanging 处理)
## 8. 评审记录
- **v1**trajectory jsonl 间接推断 → 仲达指出 trajectoryPath 不可用、需多文件等 3 个问题
- **v2**gateway 日志 precheck 开始标志 → 仲达指出开始标志覆盖率仅 30%,建议 rotation-only
- **v3**rotation-only + 120s 窗口 → 合并 PR #36,但实测 51 分钟 compact loop 无法覆盖
- **v4**trajectory prompt.submitted → 仲达背靠背验证(源码 7 条 skipPromptSubmission 路径 + 实际数据 ~8% 假阳性但方向安全)→ 修正检测目标为"session 是否正常"
+130 -9
View File
@@ -77,7 +77,8 @@ def _is_duplicate(event: str, delivery: str,
# 取 body 或 content,优先 bodywebhookNotifier 格式)
content = review.get("body", "") or review.get("content", "")
content_hash = hashlib.sha256(content.encode()).hexdigest()[:16]
content_key = f"content:{event}:{pr_num}:{sender}:{content_hash}"
review_id = review.get("id", "")
content_key = f"content:{event}:{pr_num}:{sender}:{review_id}:{content_hash}"
if content_key in _delivery_cache:
logger.info(
"Content-based duplicate detected: %s PR#%s by %s",
@@ -258,11 +259,16 @@ def _repo_fullname(payload: Dict[str, Any]) -> str:
async def _handle_pull_request(payload: Dict[str, Any]) -> None:
"""处理 pull_request 事件:opened → 通知 simayi-challenger"""
"""处理 pull_request 事件:opened → 通知 reviewerclosed → merge 通知"""
action = payload.get("action", "")
if action != "opened":
return
if action == "opened":
await _handle_pr_opened(payload)
elif action == "closed":
await _handle_pr_closed(payload)
async def _handle_pr_opened(payload: Dict[str, Any]) -> None:
"""PR opened → 通知 simayi-challenger。"""
pr = payload.get("pull_request")
if not pr or not isinstance(pr, dict):
logger.warning(
@@ -327,10 +333,6 @@ async def _handle_pull_request_review(payload: Dict[str, Any]) -> None:
}
state = type_map.get(review_type, "")
# 只通知 APPROVED 和 REQUEST_CHANGES,跳过 COMMENTED 和其他状态
if state == "COMMENTED":
return
repo = _repo_fullname(payload)
pr_number = pr.get("number", 0)
pr_title = pr.get("title", "")
@@ -347,6 +349,23 @@ async def _handle_pull_request_review(payload: Dict[str, Any]) -> None:
"unknown")
review_body = review.get("body", "") or review.get("content", "(无评论)")
if state == "COMMENTED":
# Review 评论 → 通知 PR 作者
review_body = review.get("body", "") or review.get("content", "(无评论)")
reviewer = review.get("user", {}).get("login", "") or payload.get("sender", {}).get("login", "unknown")
text = render_template("review_comment", {
"repo": repo,
"pr_number": str(pr_number),
"pr_title": pr_title,
"reviewer": reviewer,
"comment_body": review_body,
})
title = f"Review 评论: {pr_title} ({repo}#{pr_number})"
_send_mail(pr_author, title, text)
return
result_map = {"APPROVED": "通过 ✓", "REQUEST_CHANGES": "驳回 ✗"}
if state not in result_map:
return
@@ -365,6 +384,104 @@ async def _handle_pull_request_review(payload: Dict[str, Any]) -> None:
_send_mail(pr_author, title, text)
async def _fetch_latest_reviewer(repo: str, pr_number: int) -> str:
"""查询 PR 最近一次非 PENDING review 的提交者。
Returns:
reviewer login 或空字符串
"""
if not _GITEA_TOKEN:
return ""
url = f"{_GITEA_BASE}/repos/{repo}/pulls/{pr_number}/reviews"
headers = {"Authorization": f"token {_GITEA_TOKEN}"}
try:
async with httpx.AsyncClient(timeout=5.0) as client:
resp = await client.get(url, headers=headers)
resp.raise_for_status()
reviews = resp.json()
# 取最后一个非 PENDING 的 review 的 user
for review in reversed(reviews):
state = review.get("state", "")
if state in ("APPROVED", "REQUEST_CHANGES", "COMMENTED"):
user = review.get("user", {})
return user.get("login", "")
except Exception as e:
logger.warning("Failed to fetch reviews for %s#%d: %s", repo, pr_number, e)
return ""
async def _handle_pr_synchronize(payload: Dict[str, Any]) -> None:
"""PR 更新(新 push)→ 通知 reviewer 重新 review。
查询最近一次 review 的提交者作为通知目标
只在有 review 历史时才通知避免和 opened 重复
"""
pr = payload.get("pull_request")
if not pr or not isinstance(pr, dict):
return
repo = _repo_fullname(payload)
pr_number = pr.get("number", 0)
pr_title = pr.get("title", "")
pr_author = pr.get("user", {}).get("login", "unknown")
new_sha = pr.get("head", {}).get("sha", "unknown")[:12]
# 查询最近 review 的提交者
reviewer = await _fetch_latest_reviewer(repo, pr_number)
if not reviewer:
# 没有已有 review 历史,fallback 到默认 reviewer
reviewer = "simayi-challenger"
logger.info("No review history for PR #%s, using default reviewer %s", pr_number, reviewer)
text = render_template("review_updated", {
"repo": repo,
"pr_number": str(pr_number),
"pr_title": pr_title,
"pr_author": pr_author,
"new_sha": new_sha,
"reviewer": reviewer,
})
title = f"PR 更新: {pr_title} ({repo}#{pr_number})"
_send_mail(reviewer, title, text)
async def _handle_pr_closed(payload: Dict[str, Any]) -> None:
"""PR closed → 如果 merged,通知 PR 作者。"""
pr = payload.get("pull_request")
if not pr or not isinstance(pr, dict):
return
# 只处理 merged 的 PR
if not pr.get("merged", False):
return
repo = _repo_fullname(payload)
pr_number = pr.get("number", 0)
pr_title = pr.get("title", "")
pr_author = pr.get("user", {}).get("login", "unknown")
# merged_by 可能不在 payload 中,fallback 到 sender
merged_by = (
pr.get("merged_by", {}).get("login", "")
or payload.get("sender", {}).get("login", "unknown")
)
text = render_template("review_merged", {
"repo": repo,
"pr_number": str(pr_number),
"pr_title": pr_title,
"pr_author": pr_author,
"merged_by": merged_by,
})
title = f"PR 已合并: {pr_title} ({repo}#{pr_number})"
_send_mail(pr_author, title, text)
async def _handle_issues(payload: Dict[str, Any]) -> None:
"""处理 issues 事件:assigned → 通知被指派人;opened+部署失败 → 通知运维。"""
action = payload.get("action", "")
@@ -476,10 +593,12 @@ async def _handle_issue_comment(payload: Dict[str, Any]) -> None:
_EVENT_HANDLERS: Dict[str, Any] = {
"pull_request": _handle_pull_request,
"pull_request_sync": _handle_pr_synchronize, # Gitea: PR branch push 是独立事件类型
"pull_request_review": _handle_pull_request_review,
"pull_request_review_approved": _handle_pull_request_review,
"pull_request_review_rejected": _handle_pull_request_review,
"pull_request_review_comment": _handle_pull_request_review,
"pull_request_comment": _handle_pull_request_review, # Gitea: review comment 独立事件类型
# Gitea v1.23.4 实际发出的 review 子事件(无 _review_ 中间段)
"pull_request_approved": _handle_pull_request_review,
"pull_request_rejected": _handle_pull_request_review,
@@ -534,9 +653,11 @@ async def gitea_webhook(
return Response(status_code=200, content="duplicate")
# 4. 查找 handler
action = payload.get("action", "")
logger.info("[WEBHOOK] event=%s action=%s delivery=%s", x_gitea_event, action, x_gitea_delivery)
handler = _EVENT_HANDLERS.get(x_gitea_event or "")
if not handler:
logger.debug("Unhandled event type: %s", x_gitea_event)
logger.info("[WEBHOOK] Unhandled event type: %s", x_gitea_event)
return Response(status_code=200,
content=f"unhandled event: {x_gitea_event}")
+21 -21
View File
@@ -10,7 +10,7 @@ from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from src.daemon.prompt_composer import PromptContext, PromptComposer, PromptSection
from src.daemon.prompt_composer import PromptContext, PromptSection
from src.blackboard.db import get_connection
logger = logging.getLogger("moziplus-v2.handler")
@@ -28,46 +28,46 @@ class VerifyResult:
class BaseTaskHandler:
"""所有 task type handler 的基类。
职责L2 引擎注入层的业务逻辑prompt 构建完成验证状态标记
不管进程生命周期exit 分类重试决策这些归 spawner
"""
# crash 类 outcome(进程级异常,需要 rollback)
CRASH_OUTCOMES = frozenset({
"crashed", "compact_failed", "process_crash",
"session_stuck", "compact_hanging",
})
task_type: str = ""
virtual_project: Optional[str] = None
display_name: str = "" # 中文展示名(ticker 扫描日志用)
# === 子类必须实现 ===
def build_prompt(self, context: PromptContext) -> str:
"""构建 L2 prompt(通过 PromptComposer 拼 section)。子类实现。"""
raise NotImplementedError
def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult:
"""验证任务完成质量。每个 handler 自己的验证逻辑。子类实现。"""
raise NotImplementedError
def target_success_status(self) -> str:
"""验证通过后的目标状态。task='review', mail/toolchain='done'"""
return "review"
def get_sections(self) -> list[PromptSection]:
"""返回此 handler 的 prompt section 列表。子类实现。"""
return []
# === 基类提供统一流程 ===
def pre_spawn(self, task_id: str, db_path: Path) -> bool:
"""spawn 前业务准备。默认 True。
mail/toolchain override auto_working"""
return True
def post_complete(self, task_id: str, agent_id: str,
outcome: str, db_path: Path) -> None:
"""spawn 完成后的业务处理。统一 4 步流程:
@@ -80,10 +80,10 @@ class BaseTaskHandler:
if outcome in self.CRASH_OUTCOMES:
self._rollback_current_agent(db_path, task_id, agent_id)
return
# 2. verify
result = self.verify_completion(task_id, db_path)
# 3. mark
if result.passed:
self._mark_task_status(db_path, task_id, self.target_success_status())
@@ -92,20 +92,20 @@ class BaseTaskHandler:
else:
# 4. notify
self.on_failure(task_id, agent_id, db_path, result)
def on_failure(self, task_id: str, agent_id: str,
db_path: Path, verify: VerifyResult) -> None:
"""验证失败处理。默认:标 failed。子类可 override。"""
self._mark_task_status(db_path, task_id, "failed")
logger.info("Task %s: verify failed (%s), marked failed",
task_id, verify.reason)
task_id, verify.reason)
def check_completion(self, task_id: str, db_path: Path) -> bool:
"""ticker 级别的完成检查。默认:False。"""
return False
# === 内部工具方法 ===
def _rollback_current_agent(self, db_path: Path, task_id: str, agent_id: str) -> None:
"""crash 后回退 current_agent → assignee,避免 exclude_current 卡死。
dispatcher._rollback_current_agent 迁移"""
@@ -126,7 +126,7 @@ class BaseTaskHandler:
except Exception as e:
logger.warning("Task %s: failed to rollback current_agent: %s",
task_id, e)
def _mark_task_status(self, db_path: Path, task_id: str, status: str) -> None:
"""更新任务状态 + 写审计事件(带 3 次重试,防 SQLite DB 锁)。"""
for attempt in range(3):
@@ -157,7 +157,7 @@ class BaseTaskHandler:
logger.warning("Handler: mark %s%s attempt %d failed: %s",
task_id, status, attempt + 1, e)
logger.error("Handler: mark %s%s all 3 attempts failed", task_id, status)
def _auto_mark_working(self, task_id: str, db_path: Path) -> bool:
"""pending → workingmail/toolchain 通用)。"""
try:
+3 -3
View File
@@ -7,7 +7,6 @@ from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import Dict, Optional
from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult
from src.daemon.prompt_composer import PromptComposer, PromptContext
@@ -15,6 +14,7 @@ from src.blackboard.db import get_connection
logger = logging.getLogger("moziplus-v2.handler.mail")
class MailHandler(BaseTaskHandler):
"""Mail 任务 handler。"""
@@ -65,7 +65,7 @@ class MailHandler(BaseTaskHandler):
"""request 验证失败 → 标 failed + 通知发件人"""
self._mark_task_status(db_path, task_id, "failed")
logger.info("Mail %s: request verify failed (%s), marked failed",
task_id, verify.reason)
task_id, verify.reason)
# 通知发件人
try:
@@ -95,7 +95,7 @@ class MailHandler(BaseTaskHandler):
def _check_reply(self, task_id: str, db_path: Path) -> bool:
"""检查是否已回复(查 tasks 表找 in_reply_to 回复邮件)
dispatcher._mail_check_reply 迁移
Mail 回复机制创建新 taskmust_haves JSON 中包含 in_reply_to = original_task_id
不能查 comments 回复邮件是独立的 task不是 comment
+1 -1
View File
@@ -6,7 +6,7 @@ prompt_composer.py — PromptSection Protocol + PromptContext + PromptComposer
import logging
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Protocol, runtime_checkable
from typing import Dict, List, Optional, Protocol, runtime_checkable
logger = logging.getLogger("moziplus-v2.prompt_composer")
+197 -7
View File
@@ -16,11 +16,10 @@ from pathlib import Path
from typing import Any, Dict, List, Optional
from src.blackboard.db import get_connection
from src.daemon.task_type_registry import TaskTypeRegistry
logger = logging.getLogger("moziplus-v2.spawner")
from src.daemon.task_type_registry import TaskTypeRegistry
# ── Prompt 模板 ──
@@ -1298,6 +1297,90 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
logger.exception("Failed to revive %s", agent_id)
return False
# deprecated: §24 v3, 保留供方案 B 备选
@staticmethod
def _get_recent_gateway_logs() -> list:
"""获取当天和昨天的 gateway 日志路径。
日志路径通过 OPENCLAW_LOG_DIR 环境变量配置默认 /tmp/openclaw
文件名格式openclaw-{YYYY-MM-DD}.log
"""
from datetime import timedelta
log_dir = os.environ.get("OPENCLAW_LOG_DIR", "/tmp/openclaw")
now_local = datetime.now()
today = now_local.strftime("%Y-%m-%d")
yesterday = (now_local - timedelta(days=1)).strftime("%Y-%m-%d")
paths = []
for d in [today, yesterday]:
p = os.path.join(log_dir, f"openclaw-{d}.log")
if os.path.exists(p):
paths.append(p)
return paths
# deprecated: §24 v3, 保留供方案 B 备选
@staticmethod
def _check_compact_in_progress_gateway(
session_key: str, window_seconds: int = 120) -> bool:
"""§24 v3 rotation-only: 检查 gateway 日志,判断指定 session 是否刚完成 compact。
检测逻辑读日志尾部 2MB按目标 sessionKey 过滤
找最后一个 rotation 事件如果在窗口内 compact 可能仍在 retry 循环中
"""
from datetime import datetime as _dt, timezone as _tz, timedelta
log_paths = AgentSpawner._get_recent_gateway_logs()
if not log_paths:
return False
now = _dt.now(_tz.utc)
window_start = now - timedelta(seconds=window_seconds)
last_rotation_time = None
for log_path in log_paths:
if not os.path.exists(log_path):
continue
try:
with open(log_path, "rb") as f:
f.seek(0, 2)
size = f.tell()
f.seek(max(0, size - 2 * 1024 * 1024))
tail = f.read().decode("utf-8", errors="replace")
except Exception:
continue
for line in tail.splitlines():
if not line.strip():
continue
try:
obj = json.loads(line)
except (json.JSONDecodeError, ValueError):
continue
msg = obj.get("message", "")
# 只看包含目标 sessionKey 的事件
if session_key not in msg:
continue
# rotation 事件
if "[compaction] rotated active transcript" in msg:
ts_str = obj.get("time", "")
if ts_str:
try:
event_time = _dt.fromisoformat(
ts_str.replace("Z", "+00:00"))
# timezone-aware: normalize to UTC
if event_time.tzinfo is None:
event_time = event_time.replace(tzinfo=_tz.utc)
if last_rotation_time is None or event_time > last_rotation_time:
last_rotation_time = event_time
except (ValueError, TypeError):
continue
if last_rotation_time is not None:
return last_rotation_time >= window_start
return False
@staticmethod
def _check_recent_compaction_jsonl(
session_file: str, window_seconds: int = 900) -> bool:
@@ -1414,16 +1497,123 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
except Exception:
pass
# v2.8.1 Fix-1: compact 检测改用 session jsonl 末尾扫描
# 只在 agent 非空闲时才扫描(减少不必要 I/O)
if result["status"] not in (
"done", "idle", "unknown", None) and sf:
result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(
# §24 v4: compact 检测优先用 trajectory prompt.submitted
# fallback: _check_recent_compaction_jsonl (v2.8.2)
# 重要:compact 进行中时 status=done,所以不能按 status 过滤
# 只跳过 idle/unknown(完全没有活动过的 session)
if result["status"] not in ("idle", "unknown", None) and sf:
result["recent_compact"] = AgentSpawner._check_compact_in_progress_trajectory(
sf)
if not result["recent_compact"] and sf:
result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(
sf)
except Exception:
pass
return result
@staticmethod
def _check_compact_in_progress_trajectory(
session_file: str, timeout_minutes: int = 30) -> bool:
"""§24 v4: 检查 trajectory jsonl 尾部,判断 session 是否处于非正常状态。
检测逻辑最后一个完整 turn 没有 prompt.submitted/skipped 非正常 skip
覆盖compacttimeouthook blocksession 结束等所有非正常状态
Returns:
True = 非正常状态skip ticker
False = 正常 skip或超时兜底放行
"""
if not session_file:
return False
traj_path = f"{session_file}.trajectory.jsonl"
if not os.path.exists(traj_path):
return False
try:
from datetime import datetime as _dt, timezone as _tz
# 读尾部 500KB
with open(traj_path, "rb") as f:
f.seek(0, 2)
size = f.tell()
f.seek(max(0, size - 500 * 1024))
tail = f.read().decode("utf-8", errors="replace")
if not tail.strip():
return False
# 解析所有有效行
events = []
for line in tail.splitlines():
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
events.append(obj)
except (json.JSONDecodeError, ValueError):
continue
if not events:
return False
# 按 session.started 分组找 turn
# 每个 turn 以 session.started 开始
turns = []
current_turn = []
for evt in events:
if evt.get("type") == "session.started":
if current_turn:
turns.append(current_turn)
current_turn = [evt]
else:
current_turn.append(evt)
if current_turn:
turns.append(current_turn)
if not turns:
return False
# 检查最后一个完整 turn(包含 session.started
last_turn = turns[-1]
turn_types = {evt.get("type") for evt in last_turn}
# 有 prompt.submitted 或 prompt.skipped → 正常 turn
if "prompt.submitted" in turn_types or "prompt.skipped" in turn_types:
return False
# 非正常状态 → 检查超时兜底
# 找最后一个有 ts 的事件
last_ts = None
for evt in reversed(events):
ts_str = evt.get("ts")
if ts_str:
try:
last_ts = _dt.fromisoformat(
ts_str.replace("Z", "+00:00"))
if last_ts.tzinfo is None:
last_ts = last_ts.replace(tzinfo=_tz.utc)
except (ValueError, TypeError):
continue
break
if last_ts is None:
# 没有 ts 信息,无法判断超时 → 非正常 → skip
return True
now = _dt.now(_tz.utc)
elapsed = (now - last_ts).total_seconds()
if elapsed > timeout_minutes * 60:
logger.debug("Trajectory last event %.0fs ago > %dm, fallback pass",
elapsed, timeout_minutes)
return False # 兜底放行
return True # 非正常状态且未超时
except Exception as e:
logger.debug("_check_compact_in_progress_trajectory error: %s", e)
return False
@staticmethod
def _classify_outcome(exit_code: int, json_result: dict, stderr_text: str,
task_status: Optional[str], stdout_text: str = "") -> dict:
+3 -3
View File
@@ -7,7 +7,7 @@ from __future__ import annotations
import logging
import os
from pathlib import Path
from typing import Dict, List, Optional
from typing import Dict, Optional
from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult
from src.daemon.prompt_composer import PromptComposer, PromptContext
@@ -182,7 +182,7 @@ class TaskConstraintsSection:
class TaskHandler(BaseTaskHandler):
"""黑板标准任务 handler。
- verify: 三信号检查output / comment / terminal status
- 成功 review
- 失败 保持 working ticker 重试
@@ -198,7 +198,7 @@ class TaskHandler(BaseTaskHandler):
def post_complete(self, task_id: str, agent_id: str,
outcome: str, db_path: Path) -> None:
"""Task on_complete:区分 executor 和 review。
executor: 基类统一流程crash verify mark review
review: handle_review_complete verdict done/keep review
"""
+1 -1
View File
@@ -9,7 +9,7 @@ from __future__ import annotations
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Protocol, runtime_checkable
from typing import TYPE_CHECKING, Dict, Optional, Protocol, runtime_checkable
if TYPE_CHECKING:
from src.daemon.prompt_composer import PromptContext
+3 -3
View File
@@ -38,13 +38,13 @@ class ToolchainContextSection:
return render_template(event_type, variables)
# fallback:通用事件描述
lines = [f"## 工具链事件", f""]
lines = ["## 工具链事件", ""]
lines.append(f"- **事件类型**: {event_type or '未知'}")
if event_data:
lines.append(f"- **事件详情**:")
lines.append("- **事件详情**:")
for key, value in event_data.items():
lines.append(f" - {key}: {value}")
lines.append(f"")
lines.append("")
return "\n".join(lines)
def should_include(self, context: PromptContext) -> bool:
+3
View File
@@ -21,6 +21,9 @@ _TEMPLATE_MAP: Dict[str, str] = {
"issue_assigned": "issue_assigned.md",
"ci_failure": "ci_failure.md",
"deploy_failure": "deploy_failure.md",
"review_updated": "review_updated.md",
"review_comment": "review_comment.md",
"review_merged": "review_merged.md",
}
# 模板缓存
+9
View File
@@ -0,0 +1,9 @@
Review 评论
PR: http://192.168.2.154:3000/{repo}/pulls/{pr_number}
标题: {pr_title}
评论者: {reviewer}
{comment_body}
请查看评论并回复或修改代码。
+8
View File
@@ -0,0 +1,8 @@
## PR 已合并 ✅
**仓库**: {repo}
**PR #{pr_number}**: {pr_title}
**作者**: @{pr_author}
**合并者**: @{merged_by}
PR 已成功合并到主分支。
+16
View File
@@ -0,0 +1,16 @@
PR 有新提交,请重新 Review
PR: http://192.168.2.154:3000/{repo}/pulls/{pr_number}
标题: {pr_title}
作者: {pr_author}
新 commit: {new_sha}
您之前已审查过此 PR,作者已根据反馈更新了代码。请重新 Review。
流程:
1. 读取 PR diffGitea API: GET /repos/{repo}/pulls/{pr_number}.diff
2. 重点检查上次 Review 意见的修改部分
3. 提交 ReviewGitea API: POST /repos/{repo}/pulls/{pr_number}/reviews
4. 提交后改动者会自动收到通知
完成后回复此 Mail 确认。
+184
View File
@@ -0,0 +1,184 @@
"""单元测试:§24 v4 trajectory prompt.submitted compact 检测
测试 _check_compact_in_progress_trajectory 方法
tmp_path 构造 mock trajectory jsonl 文件
"""
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path
import pytest
from src.daemon.spawner import AgentSpawner
# ── helpers ──
_SESSION_ID = "sess-abc123"
def _make_trajectory_event(event_type: str, ts: str = None, **kwargs) -> dict:
"""构造 trajectory jsonl 事件"""
obj = {"type": event_type}
if ts:
obj["ts"] = ts
obj.update(kwargs)
return obj
def _write_trajectory(tmp_path: Path, session_id: str, turns: list[list[dict]]):
"""写入 trajectory jsonl,按 turns 分组。
每个 turn 是一个 list of events
自动在每组前加 session.started如果该 turn 没有的话
"""
traj_file = tmp_path / f"{session_id}.trajectory.jsonl"
with open(traj_file, "w") as f:
for turn_events in turns:
# 如果 turn 第一个事件不是 session.started,自动加一个
if not turn_events or turn_events[0].get("type") != "session.started":
started = _make_trajectory_event(
"session.started",
ts=turn_events[0].get("ts") if turn_events else None,
)
f.write(json.dumps(started, ensure_ascii=False) + "\n")
for evt in turn_events:
f.write(json.dumps(evt, ensure_ascii=False) + "\n")
def _utc_now_str() -> str:
"""返回当前 UTC 时间的 ISO 字符串(带 Z 后缀)"""
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.") + \
f"{datetime.now(timezone.utc).microsecond // 1000:03d}Z"
def _utc_past_str(minutes_ago: int) -> str:
"""返回过去 N 分钟的 UTC ISO 字符串"""
ts = datetime.now(timezone.utc) - timedelta(minutes=minutes_ago)
return ts.strftime("%Y-%m-%dT%H:%M:%S.") + \
f"{ts.microsecond // 1000:03d}Z"
# ── 测试用例 ──
class TestCheckCompactInProgressTrajectory:
"""§24 v4: _check_compact_in_progress_trajectory 单元测试"""
def test_tc1_normal_turn_with_submitted_returns_false(self, tmp_path):
"""TC1: 正常 turn(有 prompt.submitted)→ False"""
now = _utc_now_str()
turns = [[
_make_trajectory_event("session.started", ts=now),
_make_trajectory_event("context.compiled", ts=now),
_make_trajectory_event("prompt.submitted", ts=now),
_make_trajectory_event("model.completed", ts=now),
]]
_write_trajectory(tmp_path, _SESSION_ID, turns)
session_file = str(tmp_path / _SESSION_ID)
assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is False
def test_tc2_compact_turn_returns_true(self, tmp_path):
"""TC2: compact turn(无 prompt.submitted,有 context.compiled + model.completed)→ True"""
now = _utc_now_str()
turns = [[
_make_trajectory_event("session.started", ts=now),
_make_trajectory_event("context.compiled", ts=now),
_make_trajectory_event("model.completed", ts=now),
]]
_write_trajectory(tmp_path, _SESSION_ID, turns)
session_file = str(tmp_path / _SESSION_ID)
assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is True
def test_tc3_skipped_prompt_returns_false(self, tmp_path):
"""TC3: 空白 prompt(有 prompt.skipped)→ False"""
now = _utc_now_str()
turns = [[
_make_trajectory_event("session.started", ts=now),
_make_trajectory_event("context.compiled", ts=now),
_make_trajectory_event("prompt.skipped", ts=now),
_make_trajectory_event("model.completed", ts=now),
]]
_write_trajectory(tmp_path, _SESSION_ID, turns)
session_file = str(tmp_path / _SESSION_ID)
assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is False
def test_tc4_timeout_fallback_returns_false(self, tmp_path):
"""TC4: 超过 30min 兜底 → False"""
old = _utc_past_str(35)
turns = [[
_make_trajectory_event("session.started", ts=old),
_make_trajectory_event("context.compiled", ts=old),
_make_trajectory_event("model.completed", ts=old),
]]
_write_trajectory(tmp_path, _SESSION_ID, turns)
session_file = str(tmp_path / _SESSION_ID)
assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is False
def test_tc5_trajectory_not_exists_returns_false(self, tmp_path):
"""TC5: trajectory 不存在 → False"""
session_file = str(tmp_path / "nonexistent-session")
assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is False
def test_tc6_empty_trajectory_returns_false(self, tmp_path):
"""TC6: 空 trajectory → False"""
traj_file = tmp_path / f"{_SESSION_ID}.trajectory.jsonl"
traj_file.write_text("")
session_file = str(tmp_path / _SESSION_ID)
assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is False
def test_tc7_multi_turn_last_normal_returns_false(self, tmp_path):
"""TC7: 多 turn 尾部只看最后一个(最后一个正常但之前有 compact)→ False"""
old = _utc_past_str(10)
now = _utc_now_str()
turn1 = [
_make_trajectory_event("session.started", ts=old),
_make_trajectory_event("context.compiled", ts=old),
_make_trajectory_event("model.completed", ts=old), # compact turn, no prompt
]
turn2 = [
_make_trajectory_event("session.started", ts=now),
_make_trajectory_event("prompt.submitted", ts=now),
_make_trajectory_event("model.completed", ts=now), # normal turn
]
_write_trajectory(tmp_path, _SESSION_ID, [turn1, turn2])
session_file = str(tmp_path / _SESSION_ID)
assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is False
def test_tc8_multi_turn_last_abnormal_returns_true(self, tmp_path):
"""TC8: 多 turn 尾部最后一个非正常 → True"""
old = _utc_past_str(5)
now = _utc_now_str()
turn1 = [
_make_trajectory_event("session.started", ts=old),
_make_trajectory_event("prompt.submitted", ts=old),
_make_trajectory_event("model.completed", ts=old), # normal turn
]
turn2 = [
_make_trajectory_event("session.started", ts=now),
_make_trajectory_event("context.compiled", ts=now),
_make_trajectory_event("model.completed", ts=now), # compact turn, no prompt
]
_write_trajectory(tmp_path, _SESSION_ID, [turn1, turn2])
session_file = str(tmp_path / _SESSION_ID)
assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is True
def test_tc9_null_session_file_returns_false(self):
"""TC9: session_file 为空字符串 → False"""
assert AgentSpawner._check_compact_in_progress_trajectory("") is False
def test_tc10_none_session_file_returns_false(self):
"""TC10: session_file 为 None → False"""
assert AgentSpawner._check_compact_in_progress_trajectory(None) is False
def test_tc11_events_without_ts_returns_true(self, tmp_path):
"""TC11: 事件有 type 但无 ts 字段 → 无法判断超时 → True(skip)"""
turns = [[
_make_trajectory_event("session.started"), # 无 ts
_make_trajectory_event("context.compiled"), # 无 ts
_make_trajectory_event("model.completed"), # 无 ts
]]
_write_trajectory(tmp_path, _SESSION_ID, turns)
session_file = str(tmp_path / _SESSION_ID)
assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is True