Compare commits
14 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 5d24183c14 | |||
| 7b32994c75 | |||
| 40efa1c623 | |||
| f4fc941bd1 | |||
| c6a0567161 | |||
| 3f5b3619c8 | |||
| e9bbcf41c9 | |||
| 2c612baa04 | |||
| 98d17292b0 | |||
| fe541f6c89 | |||
| ddc1c7285a | |||
| 3f71f53e4a | |||
| 3c2c0f3175 | |||
| 95a8abca96 |
@@ -838,7 +838,7 @@ Agent spawn 走生产 openclaw 的决策理由:
|
||||
| CI 标准门控 → 代码审查 | **CI 通过后 daemon Webhook 转发 Mail 给审查者** | Mail 通知司马懿 Review | Gitea Webhook `pull_request` → daemon Webhook 模块 → Mail API |
|
||||
| 代码审查 → 修改(不通过) | 审查者提交 Review 意见 | daemon Webhook 转发 Mail 通知改动者(附 Review 摘要) | Gitea Webhook `pull_request_review` → daemon Webhook 模块 → Mail API |
|
||||
| 代码审查 → Merge(通过) | **审查者点 Approve** | daemon Webhook 转发 Mail 通知改动者 merge | Gitea Webhook `pull_request_review` → daemon Webhook 模块 → Mail API |
|
||||
| Merge → 部署 | **merge 到 main 自动触发** | 无需通知(自动化) | Gitea Actions `on: push: branches: [main]` |
|
||||
| Merge → 部署 | **merge 到 main 自动触发** | Mail 通知 PR 作者合并完成(PR #38) | Gitea Actions `on: push: branches: [main]` |
|
||||
| 部署 → E2E | **部署 job 成功后触发 E2E job** | E2E 结果评论到 merge commit | Gitea Actions `needs: [deploy]` |
|
||||
| E2E/部署 → Issue关闭 | 庞统或改动者手动确认后关闭 | Issue 关闭通知关注者 | Gitea API `PATCH /repos/{owner}/{repo}/issues/{id}` state=closed |
|
||||
| CI失败 → Issue评论 | **CI 失败自动评论** → daemon Webhook 转发 Mail 通知改动者 | 评论到关联 Issue + Mail 推送 Agent | Gitea Actions `if: failure()` 写 PR评论 → daemon Webhook 监听 `issue_comment` → Mail |
|
||||
@@ -1303,7 +1303,7 @@ Layer 3: Mail 执行层(Agent 接口)
|
||||
| `push` | 代码推送 | commit hash, 分支, 作者 | 不需要转发(Actions 自动处理) |
|
||||
| `pull_request` (opened) | PR 创建 | PR ID, 标题, 分支, 作者 | → Mail 通知司马懿 Review |
|
||||
| `pull_request_review` (submitted) | Review 提交 | PR ID, 审查者, 结论(APPROVE/REQUEST_CHANGES), 评论 | → Mail 通知张飞 Review 结果 |
|
||||
| `pull_request` (closed/merged) | PR 合并 | PR ID, 合并 commit | 不需要转发(Actions 自动触发 deploy) |
|
||||
| `pull_request` (closed/merged) | PR 合并 | PR ID, 合并 commit | Mail 通知 PR 作者合并完成(PR #38 恢复) |
|
||||
| `issue_comment` | PR/Issue 评论 | 评论者, 内容 | CI workflow 写的失败评论 → 转发 Mail |
|
||||
| `issues` (opened+assigned) | Issue 创建/指派 | Issue ID, 标题, 被指派人 | → Mail 通知开发者 |
|
||||
| `release` | Release 创建 | tag, 名称 | 触发完整 CI+部署 |
|
||||
@@ -1336,8 +1336,8 @@ async def handle_gitea_webhook(event: dict, x_gitea_event: str = Header(...), x_
|
||||
pr_author = to_agent_id(event["pull_request"]["user"]["login"])
|
||||
await send_mail(to="simayi-challenger", title=f"Review 请求: PR #{event['number']}", ...)
|
||||
elif action == "closed" and event["pull_request"]["merged"]:
|
||||
# merge 不需要通知,Actions 自动处理
|
||||
pass
|
||||
# PR #38: 通知 PR 作者合并完成
|
||||
await _handle_pr_closed(event)
|
||||
|
||||
elif x_gitea_event == "pull_request_review":
|
||||
state = event["review"]["state"]
|
||||
|
||||
@@ -119,3 +119,28 @@
|
||||
- 姜维第一次分析给出了错误根因(Gitea 双 notifier),第二次深入调查后自我纠正
|
||||
- 庞统把姜维的第一次结论当事实汇报给主公,没有标注"这是姜维的调查结论,尚未独立验证"
|
||||
- **改进**:SOUL.md 新增规则——推测 vs 事实显式标注、引用他人结论时标注来源、结论被推翻时及时更正
|
||||
|
||||
---
|
||||
|
||||
## PR #38 新增场景(synchronize fallback + merge 通知)
|
||||
|
||||
> 2026-06-12 新增,对应 PR #38 的设计变更
|
||||
|
||||
### 步骤 9:synchronize fallback ✅
|
||||
- 操作:创建 PR(无 review 历史)→ push 新 commit 到 PR 分支
|
||||
- 触发事件:`pull_request` (synchronize)
|
||||
- 预期:`simayi-challenger`(默认 reviewer)收到"请重新 review" Mail
|
||||
- 验证点:
|
||||
- PR 无 review 历史时,`_fetch_latest_reviewer()` 返回 None → fallback 到 `simayi-challenger`
|
||||
- Mail to 正确(默认 reviewer 而非跳过通知)
|
||||
- 模板使用 `review_updated.md`
|
||||
|
||||
### 步骤 10:merge 通知 ✅
|
||||
- 操作:PR 通过 Review 后 merge
|
||||
- 触发事件:`pull_request` (closed) + `merged=true`
|
||||
- 预期:PR 作者收到"PR 已合并" Mail
|
||||
- 验证点:
|
||||
- Mail to 正确(PR 作者)
|
||||
- `merged_by` 字段正确提取(payload `merged_by` → fallback `sender`)
|
||||
- 模板使用 `review_merged.md`
|
||||
- 纯 closed(非 merged)不触发通知
|
||||
|
||||
@@ -28,6 +28,7 @@
|
||||
|---|---|---|---|---|
|
||||
| E1 | PR 更新(push 新 commit)→ 通知 reviewer | `pull_request.synchronize` | **高** | review 驳回→修改→重 review 的关键闭环 |
|
||||
| ~~E2~~ | ~~PR 合并通知~~ | ~~已删除~~ | ~~—~~ | ~~和 §22 CD 成功通知重叠,已删~~ |
|
||||
| E2 | PR 合并 → 通知 PR 作者 | `pull_request` (closed+merged) | **高** | PR #38 恢复:CD 通知语义不同(部署状态 vs 合并信息),文档 PR 无 CD 流程仍需通知 |
|
||||
| E3 | Review 评论(COMMENTED)→ 通知 PR 作者 | `pull_request_review` (COMMENTED) | 中 | reviewer 讨论提问,作者应知道 |
|
||||
| E4 | PR 上普通评论 → 通知相关人 | `issue_comment` (on PR) | 低 | 非关键路径 |
|
||||
|
||||
@@ -64,17 +65,42 @@ async def _handle_pull_request(payload: Dict[str, Any]) -> None:
|
||||
新增 `_handle_pr_synchronize`:
|
||||
1. 从 payload 取 PR 信息(number、title、author、head sha)
|
||||
2. 查询最近一次 review(Gitea API `GET /repos/{owner}/{repo}/pulls/{number}/reviews`)取 reviewer
|
||||
3. 如果没有 review 记录(首次 push 后 reviewer 还没 review),跳过(opened 事件已经通知过了)
|
||||
3. 如果没有 review 记录(`_fetch_latest_reviewer()` 返回 None),fallback 到默认 reviewer `simayi-challenger`,而非跳过通知(PR #38 改动:确保无 review 历史时也能通知默认审查者)
|
||||
4. 渲染 `review_updated.md` 模板,发送 Mail 给 reviewer
|
||||
|
||||
**关键设计决策**:
|
||||
- 不用 `requested_reviewers`(可能为空),用最近 review 的提交者
|
||||
- 只在有 review 历史时才通知(避免 opened + synchronize 重复通知)
|
||||
- 无 review 历史时 fallback 到默认 reviewer `simayi-challenger`(PR #38:避免 opened + synchronize 间隔较短时 reviewer 未收到任何通知)
|
||||
- Mail from 用 `system`
|
||||
|
||||
### ~~Handler 2:PR 合并通知~~ — 已删除
|
||||
### Handler 2:`_handle_pr_closed`(PR 合并通知)— PR #38 恢复
|
||||
|
||||
> 司马懿 review 指出与 §22 CD 成功通知重叠。CD 成功通知已隐含合并信息,无需单独发 merged 通知。
|
||||
**触发**:`pull_request` 事件 + `action=closed` + `merged=true`
|
||||
|
||||
**通知对象**:PR 作者
|
||||
|
||||
**实现**:
|
||||
|
||||
修改 `_handle_pull_request` 的 action 分发,新增 `closed` 分支:
|
||||
|
||||
```python
|
||||
async def _handle_pull_request(payload: Dict[str, Any]) -> None:
|
||||
action = payload.get("action", "")
|
||||
if action == "opened":
|
||||
await _handle_pr_opened(payload)
|
||||
elif action == "synchronize":
|
||||
await _handle_pr_synchronize(payload)
|
||||
elif action == "closed" and payload.get("pull_request", {}).get("merged"):
|
||||
await _handle_pr_closed(payload)
|
||||
# 其他 action 忽略
|
||||
```
|
||||
|
||||
新增 `_handle_pr_closed`:
|
||||
1. 从 payload 取 PR 信息(number、title、merged_by)
|
||||
2. `merged_by` 优先从 `payload["pull_request"]["merged_by"]` 取,若为空则 fallback 到 `payload["sender"]`(PR #38:兼容不同 Gitea 版本和 merge 方式)
|
||||
3. 渲染 `review_merged.md` 模板,发送 Mail 给 PR 作者
|
||||
|
||||
**恢复说明**:此前因与 §22 CD 成功通知重叠而删除。但实际场景中 CD 通知发的是部署状态,PR 作者更关心的是"谁帮我 merge 了"这个信息,两者语义不同。且 CD 流程不一定每次都触发(如文档 PR),merge 通知仍需独立存在。(PR #38 恢复)
|
||||
|
||||
### 新增 Handler 3:review COMMENTED 处理
|
||||
|
||||
@@ -101,7 +127,7 @@ async def _handle_pull_request(payload: Dict[str, Any]) -> None:
|
||||
| 模板文件 | 变量 | 说明 |
|
||||
|---|---|---|
|
||||
| `review_updated.md` | repo, pr_number, pr_title, pr_author, branch, new_sha, reviewer | PR 有新 commit,请重新 review |
|
||||
| ~~`pr_merged.md`~~ | ~~已删除~~ | ~~—~~ |
|
||||
| `review_merged.md` | repo, pr_number, pr_title, pr_author, merged_by | PR 已合并,通知作者(PR #38 恢复) |
|
||||
| `review_comment.md` | repo, pr_number, pr_title, reviewer, comment_body | reviewer 提交了评论 |
|
||||
|
||||
### `_EVENT_HANDLERS` 无需改动
|
||||
@@ -122,9 +148,9 @@ async def _handle_pull_request(payload: Dict[str, Any]) -> None:
|
||||
|
||||
| 文件 | 改动 |
|
||||
|---|---|
|
||||
| `src/api/toolchain_routes.py` | 修改 `_handle_pull_request`(扩展 action 分发)+ 新增 `_handle_pr_synchronize` + 修改 `_handle_pull_request_review`(支持 COMMENTED) |
|
||||
| `src/api/toolchain_routes.py` | 修改 `_handle_pull_request`(扩展 action 分发 + closed 分支)+ 新增 `_handle_pr_synchronize` + `_handle_pr_closed` + 修改 `_handle_pull_request_review`(支持 COMMENTED) |
|
||||
| `templates/toolchain/review_updated.md` | 新增 |
|
||||
| ~~`templates/toolchain/pr_merged.md`~~ | ~~已删除~~ |
|
||||
| `templates/toolchain/review_merged.md` | 新增(PR #38 恢复) |
|
||||
| `templates/toolchain/review_comment.md` | 新增 |
|
||||
| `src/daemon/toolchain_templates.py` | `_TEMPLATE_MAP` 新增 3 个映射 |
|
||||
| `docs/design/23-toolchain-pr-lifecycle.md` | 本文档 |
|
||||
@@ -134,8 +160,9 @@ async def _handle_pull_request(payload: Dict[str, Any]) -> None:
|
||||
在 `sanguo/moziplus-v2` 测试仓库上 E2E 验证:
|
||||
|
||||
1. **synchronize**:创建 PR → review 驳回 → push 新 commit → 验证 reviewer 收到"请重新 review" Mail
|
||||
~~2. merged~~:已删除
|
||||
3. **COMMENTED**:review 提交纯评论 → 验证 PR 作者收到通知
|
||||
2. **synchronize fallback**(PR #38):创建 PR → push commit(无 review 历史)→ 验证默认 reviewer (`simayi-challenger`) 收到通知
|
||||
3. **merge 通知**(PR #38 恢复):PR merge → 验证 PR 作者收到合并通知 Mail
|
||||
4. **COMMENTED**:review 提交纯评论 → 验证 PR 作者收到通知
|
||||
|
||||
## 风险评估
|
||||
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
# §24 — Compact 检测方案修正
|
||||
|
||||
> 状态:v3(rotation-only),待实施
|
||||
> 状态:v4(trajectory prompt.submitted),待实施
|
||||
> 作者:庞统
|
||||
> 日期:2026-06-11
|
||||
> 框架:基于 §07 Spawner Acquire-First
|
||||
> 评审:仲达 3 轮评审(v1 trajectory → v2 gateway precheck → v3 rotation-only)
|
||||
> 评审:仲达 4 轮评审(v1 trajectory → v2 gateway precheck → v3 rotation-only → v4 prompt.submitted)
|
||||
> 备选方案:B(内存 flag + sessions.json status),见 §2B
|
||||
|
||||
## 1. 问题
|
||||
|
||||
@@ -18,150 +19,186 @@
|
||||
|
||||
同时 Gateway 触发 compact 时先把 session 标为 `done`,所以 `status=running + lock_pid_alive` 检查也无效。14:02:11 实际状态:`status=done lock_pid_alive=False compact=False`——三个检查全部漏过。
|
||||
|
||||
## 2. 方案:Rotation-Only 检测(v3)
|
||||
## 2. 方案 A:Trajectory prompt.submitted 检测(v4,主选方案)
|
||||
|
||||
### 2.1 核心洞察(仲达 v2 评审)
|
||||
### 2.1 方案演进
|
||||
|
||||
v2 方案依赖 `[context-overflow-precheck]` route=compact 作为开始标志。但实测数据:
|
||||
| 版本 | 方案 | 问题 |
|
||||
|------|------|------|
|
||||
| v1 | trajectory jsonl 间接推断 | trajectoryPath 不可用,需多文件 |
|
||||
| v2 | gateway precheck 开始标志 | 覆盖率仅 30%,post-compact retry 无开始标志 |
|
||||
| v3 | rotation-only + 120s 窗口 | 120s 覆盖不了多轮 compact loop(实测 pangtong 13:59→14:50 共 5 轮 rotation,总耗时 ~51 分钟,PR #36 已合并但无法覆盖) |
|
||||
| **v4** | **trajectory prompt.submitted** | **源码+数据双重验证,仲达背靠背确认** |
|
||||
|
||||
| Agent | Rotation 事件 | 有 Precheck | 无 Precheck |
|
||||
|-------|:---:|:---:|:---:|
|
||||
| pangtong | 7 | 3 | 4 |
|
||||
| simayi | 3 | 0 | 3 |
|
||||
### 2.2 核心洞察
|
||||
|
||||
**10 次 compact 只有 3 次有 precheck,覆盖率 30%。** 原因:post-compact retry 触发的后续 compact 不经过 precheck 日志路径。
|
||||
**源码证据**(`selection-But6hGR0.js` L14040-14085):
|
||||
|
||||
**结论**:开始标志不可靠。反转检测逻辑——只用可靠的 rotation 事件作为信号。
|
||||
```javascript
|
||||
if (preemptiveCompaction?.shouldCompact) {
|
||||
skipPromptSubmission = true; // ← compact 时跳过 prompt.submitted
|
||||
}
|
||||
if (!skipPromptSubmission) {
|
||||
trajectoryRecorder?.recordEvent("prompt.submitted", { ... });
|
||||
}
|
||||
```
|
||||
|
||||
### 2.2 Rotation 事件
|
||||
当 context-overflow 触发 compact 时,Gateway 跳过 `prompt.submitted` 事件。
|
||||
正常 turn 一定有 `prompt.submitted`。
|
||||
|
||||
Gateway 日志中 `[compaction] rotated active transcript after compaction (sessionKey=...)` 事件:
|
||||
- **100% 覆盖率**:全天 10 次 compact 全部有 rotation 事件
|
||||
- **含 sessionKey**:可以精确匹配目标 session
|
||||
- **JSON 格式**:易解析
|
||||
**仲达背靠背验证**:`skipPromptSubmission` 有 7 条路径(不只 compact),但仲达指出:
|
||||
**检测目标不是"是否在 compact",而是"session 是否处于正常状态"。**
|
||||
所有跳过 prompt.submitted 的场景(compact/timeout/hook block/session 结束)
|
||||
都是不应该 spawn ticker 的状态,误判方向安全。
|
||||
|
||||
**实测数据**(仲达背靠背重新验证,2026-06-11):
|
||||
- pangtong 39 个 turn:34 有 prompt.submitted(正常),5 无
|
||||
- 4 个 tool loop 子迭代(compactionCount=0, <1s, gateway 无 compact 事件)
|
||||
- 1 个 context-overflow precheck 触发 compact
|
||||
- simayi 24 个 turn:23 有,1 无(tool-result truncation succeeded)
|
||||
- 合计 6/63 = ~9.5% 无 prompt.submitted,其中真正 compact 仅 1 例
|
||||
- **所有无 prompt.submitted 的场景都是不应 spawn ticker 的状态**,方向安全
|
||||
|
||||
### 2.3 检测逻辑
|
||||
|
||||
```
|
||||
1. 读 gateway 日志(当天 + 昨天尾部)
|
||||
2. 按目标 sessionKey 过滤 compact 相关事件
|
||||
3. 从后往前找最后一条 rotation 事件:
|
||||
a. 如果 rotation 事件在窗口内(< 120s)→ compact=True
|
||||
(刚完成一轮 compact,可能还在 post-compact retry 循环中)
|
||||
b. 无 rotation 事件或超出时间窗口 → compact=False
|
||||
|
||||
**注意:此方案仅检查 rotation 事件,不检查 model.completed 等其他事件。**
|
||||
这是有意为之的保守策略:不检查正常 turn 事件意味着 compact 完成后的
|
||||
120s 内都可能被误判为 compact 进行中,但误判代价低(仅 skip 一轮 ticker),
|
||||
宁可多拦也不漏放。
|
||||
1. 构造 trajectory jsonl 路径:{sessionFile}.trajectory.jsonl
|
||||
2. 读文件尾部,按 session.started 分组找最后一个完整 turn
|
||||
3. 如果该 turn 有 prompt.submitted → 正常 turn → 不 skip
|
||||
4. 如果该 turn 有 prompt.skipped → 空白 prompt → 不 skip
|
||||
5. 如果两者都无 → 非正常状态 → skip ticker
|
||||
6. 超过 30min 没有新事件 → 兜底放行
|
||||
```
|
||||
|
||||
**为什么 rotation + 时间窗口就够了?**
|
||||
- compact 后 Gateway 会 retry prompt
|
||||
- 如果 retry 又触发 overflow → 又一轮 compact → 又一个 rotation 事件
|
||||
- 如果 retry 成功 → 正常 turn → 新的 session.started / model.completed 事件
|
||||
- 所以「最近一个事件是 rotation 且时间很近」= compact 循环还在进行
|
||||
**为什么不需要 gateway 日志?**
|
||||
- trajectory jsonl 已经包含了完整的 turn 生命周期
|
||||
- prompt.submitted 是 turn 级别的标志,不需要匹配开始/结束
|
||||
- 不需要维护跨 tick 的内存状态
|
||||
|
||||
### 2.4 时间窗口选择
|
||||
### 2.4 为什么不用 session jsonl 的 `type: "compaction"` 事件?
|
||||
|
||||
compact 通常耗时 1-10 分钟。post-compact retry 如果又触发 compact,间隔通常 <60 秒。
|
||||
每轮 compact 结束,session jsonl 确实会写入 `type: "compaction"` 摘要事件。
|
||||
但 compact 后 Gateway 会 rotate transcript(创建新 session file),
|
||||
compaction 事件写在**旧 session jsonl** 里(变成 .reset 文件),
|
||||
当前 main session 指向的 jsonl 中没有这些事件。
|
||||
|
||||
- **窗口太短(如 30s)**:可能漏掉 compact 结束后正在 retry 但还没触发下一轮的场景
|
||||
- **窗口太长(如 900s)**:compact 完成后正常工作很久了还误判
|
||||
- **推荐 120s**:compact 循环中两次 rotation 间隔通常 <60s,120s 有足够余量
|
||||
这就是现有 `_check_recent_compaction_jsonl` 检测不到的根本原因。
|
||||
|
||||
误判代价低(skip 一轮 ticker),所以宁可多拦也不漏放。
|
||||
## 2B. 备选方案 B:内存 flag + sessions.json status
|
||||
|
||||
## 3. 改动范围
|
||||
如果方案 A 在实际使用中不够,可补充方案 B。
|
||||
|
||||
```
|
||||
1. gateway 日志发现 rotation 或 precheck → 设置内存 flag: compacting=True
|
||||
2. 每个 ticker 检查:
|
||||
- flag=True + sessions.json status=running → 清 flag(compact 结束)
|
||||
- flag=True + 超过 30min → 清 flag(兜底放行)
|
||||
- flag=True → skip ticker
|
||||
3. daemon 重启会丢失 flag(可接受,重启后状态已刷新)
|
||||
```
|
||||
|
||||
**优点**:精确检测 compact 结束(status 恢复 running)
|
||||
**缺点**:需要维护内存状态、依赖两个数据源、daemon 重启丢失状态
|
||||
**触发条件**:仅在方案 A 实际运行中发现不足时实施
|
||||
|
||||
## 3. 改动范围(方案 A)
|
||||
|
||||
| 文件 | 改动 | 行数估计 |
|
||||
|------|------|---------|
|
||||
| `spawner.py` | 新增 `_check_compact_in_progress_gateway()` | ~40 行 |
|
||||
| `spawner.py` | 新增 `_check_compact_in_progress_trajectory()` | ~50 行 |
|
||||
| `spawner.py` | `_check_session_state()` 调用新方法,替换旧方法 | ~5 行 |
|
||||
| `spawner.py` | 日志路径配置化 | ~5 行 |
|
||||
| `docs/design/07-spawner-acquire-first.md` | §4.5 O5 更新 | ~10 行 |
|
||||
| `docs/design/24-compact-detection-fix.md` | 本文档 | 已有 |
|
||||
| `tests/test_spawner_compact.py` | 更新单元测试 | ~30 行 |
|
||||
|
||||
**总计 ~60 行代码改动。**
|
||||
**总计 ~85 行代码改动。**
|
||||
|
||||
## 4. 实现细节
|
||||
## 4. 实现细节(方案 A)
|
||||
|
||||
### 4.1 核心方法
|
||||
|
||||
```python
|
||||
def _check_compact_in_progress_gateway(self, session_key: str, window_seconds: int = 120) -> bool:
|
||||
"""检查 gateway 日志,判断指定 session 是否刚完成 compact(可能在 retry 循环中)。
|
||||
def _check_compact_in_progress_trajectory(self, session_file: str, timeout_minutes: int = 30) -> bool:
|
||||
"""检查 trajectory jsonl 尾部,判断 session 是否处于非正常状态。
|
||||
|
||||
检测逻辑:如果目标 session 最近一个事件是 rotation 且在窗口内,视为 compact 进行中。
|
||||
检测逻辑:最后一个完整 turn 没有 prompt.submitted → 非正常状态 → skip ticker。
|
||||
覆盖:compact、timeout、hook block、session 结束等所有非正常状态。
|
||||
"""
|
||||
log_paths = self._get_recent_gateway_logs()
|
||||
if not log_paths:
|
||||
traj_path = f"{session_file}.trajectory.jsonl"
|
||||
if not os.path.exists(traj_path):
|
||||
return False
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
window_start = now - timedelta(seconds=window_seconds)
|
||||
# 读尾部 500KB
|
||||
with open(traj_path, 'rb') as f:
|
||||
f.seek(0, 2)
|
||||
size = f.tell()
|
||||
f.seek(max(0, size - 500 * 1024))
|
||||
tail_lines = f.readlines()
|
||||
|
||||
last_rotation_time = None
|
||||
|
||||
for log_path in log_paths:
|
||||
if not os.path.exists(log_path):
|
||||
# 按 session.started 分组,找最后一个完整 turn
|
||||
last_turn_events = []
|
||||
current_turn = []
|
||||
for raw_line in tail_lines:
|
||||
try:
|
||||
obj = json.loads(raw_line)
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
continue
|
||||
|
||||
with open(log_path, 'rb') as f:
|
||||
# 读尾部 2MB
|
||||
f.seek(0, 2)
|
||||
size = f.tell()
|
||||
f.seek(max(0, size - 2 * 1024 * 1024))
|
||||
|
||||
for raw_line in f:
|
||||
try:
|
||||
obj = json.loads(raw_line)
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
continue
|
||||
|
||||
msg = obj.get("message", "")
|
||||
ts_str = obj.get("time", "")
|
||||
|
||||
# 只看包含目标 sessionKey 的事件
|
||||
if session_key not in msg:
|
||||
continue
|
||||
|
||||
# rotation 事件
|
||||
if "[compaction] rotated active transcript" in msg:
|
||||
try:
|
||||
event_time = datetime.fromisoformat(ts_str)
|
||||
if last_rotation_time is None or event_time > last_rotation_time:
|
||||
last_rotation_time = event_time
|
||||
except (ValueError, TypeError):
|
||||
continue
|
||||
event_type = obj.get("type", "")
|
||||
|
||||
if event_type == "session.started":
|
||||
if current_turn:
|
||||
last_turn_events = current_turn
|
||||
current_turn = [obj]
|
||||
else:
|
||||
current_turn.append(obj)
|
||||
|
||||
if last_rotation_time is not None:
|
||||
return last_rotation_time >= window_start
|
||||
if current_turn:
|
||||
last_turn_events = current_turn
|
||||
|
||||
return False
|
||||
if not last_turn_events:
|
||||
return False
|
||||
|
||||
# 30min 兜底:最后一个事件超过 30min → 放行
|
||||
last_ts = None
|
||||
for evt in reversed(last_turn_events):
|
||||
ts = evt.get("ts") or evt.get("timestamp")
|
||||
if ts:
|
||||
last_ts = ts
|
||||
break
|
||||
|
||||
if last_ts:
|
||||
try:
|
||||
from datetime import datetime, timezone
|
||||
# trajectory 时间是 ISO UTC
|
||||
if last_ts.endswith('Z'):
|
||||
last_dt = datetime.fromisoformat(last_ts.replace('Z', '+00:00'))
|
||||
else:
|
||||
last_dt = datetime.fromisoformat(last_ts)
|
||||
age = datetime.now(timezone.utc) - last_dt
|
||||
if age.total_seconds() > timeout_minutes * 60:
|
||||
return False # 超时放行
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
|
||||
# 检查最后一个 turn 是否有 prompt.submitted
|
||||
has_prompt_submitted = any(
|
||||
evt.get("type") == "prompt.submitted" for evt in last_turn_events
|
||||
)
|
||||
has_prompt_skipped = any(
|
||||
evt.get("type") == "prompt.skipped" for evt in last_turn_events
|
||||
)
|
||||
|
||||
if has_prompt_submitted or has_prompt_skipped:
|
||||
return False # 正常 turn
|
||||
|
||||
# 既无 submitted 也无 skipped → 非正常状态 → skip
|
||||
return True
|
||||
```
|
||||
|
||||
### 4.2 日志路径
|
||||
### 4.2 Phase 2 集成
|
||||
|
||||
```python
|
||||
def _get_recent_gateway_logs(self) -> list:
|
||||
"""获取当天和昨天的 gateway 日志路径"""
|
||||
log_dir = os.environ.get("OPENCLAW_LOG_DIR", "/tmp/openclaw")
|
||||
today = datetime.now().strftime("%Y-%m-%d")
|
||||
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
|
||||
paths = []
|
||||
for d in [today, yesterday]:
|
||||
p = os.path.join(log_dir, f"openclaw-{d}.log")
|
||||
if os.path.exists(p):
|
||||
paths.append(p)
|
||||
return paths
|
||||
```
|
||||
|
||||
### 4.3 Phase 2 集成
|
||||
|
||||
```python
|
||||
# 在 _check_session_state 中,不依赖 status,直接检查
|
||||
compact = self._check_compact_in_progress_gateway(session_key)
|
||||
# 在 _check_session_state 中替换旧方法
|
||||
compact = self._check_compact_in_progress_trajectory(session_file)
|
||||
if not compact:
|
||||
compact = self._check_recent_compaction_jsonl(...) # fallback
|
||||
|
||||
@@ -169,25 +206,41 @@ if compact:
|
||||
blockers.append(("session_compacting", None))
|
||||
```
|
||||
|
||||
### 4.3 trajectory 路径构造
|
||||
|
||||
trajectory jsonl 路径 = `{sessionFile}.trajectory.jsonl`,其中 sessionFile 来自 sessions.json。
|
||||
|
||||
实测验证:
|
||||
- `~/.openclaw/agents/pangtong-fujunshi/sessions/745b35bb-...-e8e8988d.jsonl`
|
||||
- → trajectory: `~/.openclaw/agents/pangtong-fujunshi/sessions/745b35bb-...-e8e8988d.trajectory.jsonl`
|
||||
|
||||
## 5. 边界情况
|
||||
|
||||
| 边界情况 | 处理 |
|
||||
|---------|------|
|
||||
| 日志文件不存在 | 返回 False(fallback 到旧方法) |
|
||||
| 跨天 compact | 同时检查昨天日志尾部 |
|
||||
| compact 失败(无 rotation) | rotation 事件不会出现 → 检测不到 → 回退到旧方法 |
|
||||
| 误判(compact 完成后正常工作中) | 时间窗口 120s 内可能被误判,但代价低(skip 一轮 ticker)。不检查正常 turn 事件,是保守策略 |
|
||||
| 边界情况 | 处理 | 误判方向 |
|
||||
|---------|------|----------|
|
||||
| trajectory 不存在 | 返回 False(fallback) | 安全 |
|
||||
| tool loop 子迭代 | 无 prompt.submitted → skip | 保守但安全(~8%) |
|
||||
| timeout turn | 无 prompt.submitted → skip | 安全(timeout 也不该 spawn) |
|
||||
| hook block | 无 prompt.submitted → skip | 安全 |
|
||||
| truncation 成功 | 无 prompt.submitted → skip | 安全(后面会 retry) |
|
||||
| session 结束空 turn | 无 prompt.submitted → skip | 安全 |
|
||||
| 空白 prompt | 有 prompt.skipped → 不 skip | 正确区分 |
|
||||
| 30min 无新事件 | 兜底放行 | 防死锁 |
|
||||
| compact 后 transcript rotate | 读当前 sessionFile 对应的 trajectory | 路径正确 |
|
||||
| budget compact | 有 prompt.submitted → 不 skip | 正确(budget compact 不阻止 spawn) |
|
||||
|
||||
## 6. 测试验证
|
||||
|
||||
### 6.1 单元测试
|
||||
### 6.1 单元测试(更新 test_spawner_compact.py)
|
||||
|
||||
- `_check_compact_in_progress_gateway`:
|
||||
- rotation 事件在窗口内 → True
|
||||
- rotation 事件超出窗口 → False
|
||||
- 无 rotation 事件 → False
|
||||
- 日志不存在 → False
|
||||
- sessionKey 不匹配 → False
|
||||
- `_check_compact_in_progress_trajectory`:
|
||||
- 正常 turn(有 prompt.submitted)→ False
|
||||
- compact turn(无 prompt.submitted)→ True
|
||||
- 空白 prompt(有 prompt.skipped)→ False
|
||||
- 超过 30min 兜底 → False
|
||||
- trajectory 不存在 → False
|
||||
- 空 trajectory → False
|
||||
- 多 turn 尾部只看最后一个 → 正确
|
||||
|
||||
### 6.2 集成验证
|
||||
|
||||
@@ -202,4 +255,5 @@ if compact:
|
||||
|
||||
- **v1**:trajectory jsonl 间接推断 → 仲达指出 trajectoryPath 不可用、需多文件等 3 个问题
|
||||
- **v2**:gateway 日志 precheck 开始标志 → 仲达指出开始标志覆盖率仅 30%,建议 rotation-only
|
||||
- **v3**:rotation-only(当前版本)→ 仲达已确认方向,待代码实现后再审
|
||||
- **v3**:rotation-only + 120s 窗口 → 合并 PR #36,但实测 51 分钟 compact loop 无法覆盖
|
||||
- **v4**:trajectory prompt.submitted → 仲达背靠背验证(源码 7 条 skipPromptSubmission 路径 + 实际数据 ~8% 假阳性但方向安全)→ 修正检测目标为"session 是否正常"
|
||||
|
||||
@@ -259,13 +259,12 @@ def _repo_fullname(payload: Dict[str, Any]) -> str:
|
||||
|
||||
|
||||
async def _handle_pull_request(payload: Dict[str, Any]) -> None:
|
||||
"""处理 pull_request 事件:opened → 通知 reviewer;synchronize → 通知 reviewer 重新 review。"""
|
||||
"""处理 pull_request 事件:opened → 通知 reviewer;closed → merge 通知。"""
|
||||
action = payload.get("action", "")
|
||||
if action == "opened":
|
||||
await _handle_pr_opened(payload)
|
||||
elif action == "synchronize":
|
||||
await _handle_pr_synchronize(payload)
|
||||
# 其他 action 忽略
|
||||
elif action == "closed":
|
||||
await _handle_pr_closed(payload)
|
||||
|
||||
|
||||
async def _handle_pr_opened(payload: Dict[str, Any]) -> None:
|
||||
@@ -434,9 +433,9 @@ async def _handle_pr_synchronize(payload: Dict[str, Any]) -> None:
|
||||
# 查询最近 review 的提交者
|
||||
reviewer = await _fetch_latest_reviewer(repo, pr_number)
|
||||
if not reviewer:
|
||||
# 没有 review 历史,跳过(opened 事件已经通知过)
|
||||
logger.debug("No review history for PR #%s, skipping synchronize notification", pr_number)
|
||||
return
|
||||
# 没有已有 review 历史,fallback 到默认 reviewer
|
||||
reviewer = "simayi-challenger"
|
||||
logger.info("No review history for PR #%s, using default reviewer %s", pr_number, reviewer)
|
||||
|
||||
text = render_template("review_updated", {
|
||||
"repo": repo,
|
||||
@@ -451,6 +450,38 @@ async def _handle_pr_synchronize(payload: Dict[str, Any]) -> None:
|
||||
_send_mail(reviewer, title, text)
|
||||
|
||||
|
||||
async def _handle_pr_closed(payload: Dict[str, Any]) -> None:
|
||||
"""PR closed → 如果 merged,通知 PR 作者。"""
|
||||
pr = payload.get("pull_request")
|
||||
if not pr or not isinstance(pr, dict):
|
||||
return
|
||||
|
||||
# 只处理 merged 的 PR
|
||||
if not pr.get("merged", False):
|
||||
return
|
||||
|
||||
repo = _repo_fullname(payload)
|
||||
pr_number = pr.get("number", 0)
|
||||
pr_title = pr.get("title", "")
|
||||
pr_author = pr.get("user", {}).get("login", "unknown")
|
||||
# merged_by 可能不在 payload 中,fallback 到 sender
|
||||
merged_by = (
|
||||
pr.get("merged_by", {}).get("login", "")
|
||||
or payload.get("sender", {}).get("login", "unknown")
|
||||
)
|
||||
|
||||
text = render_template("review_merged", {
|
||||
"repo": repo,
|
||||
"pr_number": str(pr_number),
|
||||
"pr_title": pr_title,
|
||||
"pr_author": pr_author,
|
||||
"merged_by": merged_by,
|
||||
})
|
||||
|
||||
title = f"PR 已合并: {pr_title} ({repo}#{pr_number})"
|
||||
_send_mail(pr_author, title, text)
|
||||
|
||||
|
||||
async def _handle_issues(payload: Dict[str, Any]) -> None:
|
||||
"""处理 issues 事件:assigned → 通知被指派人;opened+部署失败 → 通知运维。"""
|
||||
action = payload.get("action", "")
|
||||
@@ -562,10 +593,12 @@ async def _handle_issue_comment(payload: Dict[str, Any]) -> None:
|
||||
|
||||
_EVENT_HANDLERS: Dict[str, Any] = {
|
||||
"pull_request": _handle_pull_request,
|
||||
"pull_request_sync": _handle_pr_synchronize, # Gitea: PR branch push 是独立事件类型
|
||||
"pull_request_review": _handle_pull_request_review,
|
||||
"pull_request_review_approved": _handle_pull_request_review,
|
||||
"pull_request_review_rejected": _handle_pull_request_review,
|
||||
"pull_request_review_comment": _handle_pull_request_review,
|
||||
"pull_request_comment": _handle_pull_request_review, # Gitea: review comment 独立事件类型
|
||||
# Gitea v1.23.4 实际发出的 review 子事件(无 _review_ 中间段)
|
||||
"pull_request_approved": _handle_pull_request_review,
|
||||
"pull_request_rejected": _handle_pull_request_review,
|
||||
@@ -620,9 +653,11 @@ async def gitea_webhook(
|
||||
return Response(status_code=200, content="duplicate")
|
||||
|
||||
# 4. 查找 handler
|
||||
action = payload.get("action", "")
|
||||
logger.info("[WEBHOOK] event=%s action=%s delivery=%s", x_gitea_event, action, x_gitea_delivery)
|
||||
handler = _EVENT_HANDLERS.get(x_gitea_event or "")
|
||||
if not handler:
|
||||
logger.debug("Unhandled event type: %s", x_gitea_event)
|
||||
logger.info("[WEBHOOK] Unhandled event type: %s", x_gitea_event)
|
||||
return Response(status_code=200,
|
||||
content=f"unhandled event: {x_gitea_event}")
|
||||
|
||||
|
||||
+110
-6
@@ -1297,6 +1297,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
|
||||
logger.exception("Failed to revive %s", agent_id)
|
||||
return False
|
||||
|
||||
# deprecated: §24 v3, 保留供方案 B 备选
|
||||
@staticmethod
|
||||
def _get_recent_gateway_logs() -> list:
|
||||
"""获取当天和昨天的 gateway 日志路径。
|
||||
@@ -1316,6 +1317,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
|
||||
paths.append(p)
|
||||
return paths
|
||||
|
||||
# deprecated: §24 v3, 保留供方案 B 备选
|
||||
@staticmethod
|
||||
def _check_compact_in_progress_gateway(
|
||||
session_key: str, window_seconds: int = 120) -> bool:
|
||||
@@ -1495,14 +1497,13 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# §24 v3: compact 检测优先用 gateway 日志 rotation 事件
|
||||
# 旧方法 _check_recent_compaction_jsonl 作为 fallback
|
||||
# §24 v4: compact 检测优先用 trajectory prompt.submitted
|
||||
# fallback: _check_recent_compaction_jsonl (v2.8.2)
|
||||
# 重要:compact 进行中时 status=done,所以不能按 status 过滤
|
||||
# 只跳过 idle/unknown(完全没有活动过的 session)
|
||||
if result["status"] not in ("idle", "unknown", None):
|
||||
session_key = f"agent:{agent_id}:main"
|
||||
result["recent_compact"] = AgentSpawner._check_compact_in_progress_gateway(
|
||||
session_key)
|
||||
if result["status"] not in ("idle", "unknown", None) and sf:
|
||||
result["recent_compact"] = AgentSpawner._check_compact_in_progress_trajectory(
|
||||
sf)
|
||||
if not result["recent_compact"] and sf:
|
||||
result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(
|
||||
sf)
|
||||
@@ -1510,6 +1511,109 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
|
||||
pass
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def _check_compact_in_progress_trajectory(
|
||||
session_file: str, timeout_minutes: int = 30) -> bool:
|
||||
"""§24 v4: 检查 trajectory jsonl 尾部,判断 session 是否处于非正常状态。
|
||||
|
||||
检测逻辑:最后一个完整 turn 没有 prompt.submitted/skipped → 非正常 → skip。
|
||||
覆盖:compact、timeout、hook block、session 结束等所有非正常状态。
|
||||
|
||||
Returns:
|
||||
True = 非正常状态(skip ticker)
|
||||
False = 正常(不 skip)或超时兜底放行
|
||||
"""
|
||||
if not session_file:
|
||||
return False
|
||||
traj_path = f"{session_file}.trajectory.jsonl"
|
||||
if not os.path.exists(traj_path):
|
||||
return False
|
||||
|
||||
try:
|
||||
from datetime import datetime as _dt, timezone as _tz
|
||||
|
||||
# 读尾部 500KB
|
||||
with open(traj_path, "rb") as f:
|
||||
f.seek(0, 2)
|
||||
size = f.tell()
|
||||
f.seek(max(0, size - 500 * 1024))
|
||||
tail = f.read().decode("utf-8", errors="replace")
|
||||
|
||||
if not tail.strip():
|
||||
return False
|
||||
|
||||
# 解析所有有效行
|
||||
events = []
|
||||
for line in tail.splitlines():
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
try:
|
||||
obj = json.loads(line)
|
||||
events.append(obj)
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
continue
|
||||
|
||||
if not events:
|
||||
return False
|
||||
|
||||
# 按 session.started 分组找 turn
|
||||
# 每个 turn 以 session.started 开始
|
||||
turns = []
|
||||
current_turn = []
|
||||
for evt in events:
|
||||
if evt.get("type") == "session.started":
|
||||
if current_turn:
|
||||
turns.append(current_turn)
|
||||
current_turn = [evt]
|
||||
else:
|
||||
current_turn.append(evt)
|
||||
if current_turn:
|
||||
turns.append(current_turn)
|
||||
|
||||
if not turns:
|
||||
return False
|
||||
|
||||
# 检查最后一个完整 turn(包含 session.started)
|
||||
last_turn = turns[-1]
|
||||
turn_types = {evt.get("type") for evt in last_turn}
|
||||
|
||||
# 有 prompt.submitted 或 prompt.skipped → 正常 turn
|
||||
if "prompt.submitted" in turn_types or "prompt.skipped" in turn_types:
|
||||
return False
|
||||
|
||||
# 非正常状态 → 检查超时兜底
|
||||
# 找最后一个有 ts 的事件
|
||||
last_ts = None
|
||||
for evt in reversed(events):
|
||||
ts_str = evt.get("ts")
|
||||
if ts_str:
|
||||
try:
|
||||
last_ts = _dt.fromisoformat(
|
||||
ts_str.replace("Z", "+00:00"))
|
||||
if last_ts.tzinfo is None:
|
||||
last_ts = last_ts.replace(tzinfo=_tz.utc)
|
||||
except (ValueError, TypeError):
|
||||
continue
|
||||
break
|
||||
|
||||
if last_ts is None:
|
||||
# 没有 ts 信息,无法判断超时 → 非正常 → skip
|
||||
return True
|
||||
|
||||
now = _dt.now(_tz.utc)
|
||||
elapsed = (now - last_ts).total_seconds()
|
||||
if elapsed > timeout_minutes * 60:
|
||||
logger.debug("Trajectory last event %.0fs ago > %dm, fallback pass",
|
||||
elapsed, timeout_minutes)
|
||||
return False # 兜底放行
|
||||
|
||||
return True # 非正常状态且未超时
|
||||
|
||||
except Exception as e:
|
||||
logger.debug("_check_compact_in_progress_trajectory error: %s", e)
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _classify_outcome(exit_code: int, json_result: dict, stderr_text: str,
|
||||
task_status: Optional[str], stdout_text: str = "") -> dict:
|
||||
|
||||
@@ -23,6 +23,7 @@ _TEMPLATE_MAP: Dict[str, str] = {
|
||||
"deploy_failure": "deploy_failure.md",
|
||||
"review_updated": "review_updated.md",
|
||||
"review_comment": "review_comment.md",
|
||||
"review_merged": "review_merged.md",
|
||||
}
|
||||
|
||||
# 模板缓存
|
||||
|
||||
@@ -0,0 +1,8 @@
|
||||
## PR 已合并 ✅
|
||||
|
||||
**仓库**: {repo}
|
||||
**PR #{pr_number}**: {pr_title}
|
||||
**作者**: @{pr_author}
|
||||
**合并者**: @{merged_by}
|
||||
|
||||
PR 已成功合并到主分支。
|
||||
+152
-60
@@ -1,11 +1,10 @@
|
||||
"""单元测试:§24 v3 rotation-only compact 检测
|
||||
"""单元测试:§24 v4 trajectory prompt.submitted compact 检测
|
||||
|
||||
测试 _get_recent_gateway_logs 和 _check_compact_in_progress_gateway。
|
||||
用 tmp_path 构造 mock gateway 日志文件。
|
||||
测试 _check_compact_in_progress_trajectory 方法。
|
||||
用 tmp_path 构造 mock trajectory jsonl 文件。
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
|
||||
@@ -16,77 +15,170 @@ from src.daemon.spawner import AgentSpawner
|
||||
|
||||
# ── helpers ──
|
||||
|
||||
_SESSION_KEY = "agent:pangtong-fujunshi:main"
|
||||
_TODAY_STR = datetime.now().strftime("%Y-%m-%d")
|
||||
_SESSION_ID = "sess-abc123"
|
||||
|
||||
|
||||
def _make_rotation_event(session_key: str, ts: datetime) -> dict:
|
||||
"""构造一条 rotation 日志事件"""
|
||||
return {
|
||||
"time": ts.isoformat(),
|
||||
"message": f"[compaction] rotated active transcript after compaction (sessionKey={session_key})",
|
||||
}
|
||||
def _make_trajectory_event(event_type: str, ts: str = None, **kwargs) -> dict:
|
||||
"""构造 trajectory jsonl 事件"""
|
||||
obj = {"type": event_type}
|
||||
if ts:
|
||||
obj["ts"] = ts
|
||||
obj.update(kwargs)
|
||||
return obj
|
||||
|
||||
|
||||
def _make_other_event(session_key: str, ts: datetime, msg: str = "something else") -> dict:
|
||||
"""构造一条普通日志事件"""
|
||||
return {
|
||||
"time": ts.isoformat(),
|
||||
"message": f"{msg} (sessionKey={session_key})",
|
||||
}
|
||||
def _write_trajectory(tmp_path: Path, session_id: str, turns: list[list[dict]]):
|
||||
"""写入 trajectory jsonl,按 turns 分组。
|
||||
|
||||
每个 turn 是一个 list of events。
|
||||
自动在每组前加 session.started(如果该 turn 没有的话)。
|
||||
"""
|
||||
traj_file = tmp_path / f"{session_id}.trajectory.jsonl"
|
||||
with open(traj_file, "w") as f:
|
||||
for turn_events in turns:
|
||||
# 如果 turn 第一个事件不是 session.started,自动加一个
|
||||
if not turn_events or turn_events[0].get("type") != "session.started":
|
||||
started = _make_trajectory_event(
|
||||
"session.started",
|
||||
ts=turn_events[0].get("ts") if turn_events else None,
|
||||
)
|
||||
f.write(json.dumps(started, ensure_ascii=False) + "\n")
|
||||
for evt in turn_events:
|
||||
f.write(json.dumps(evt, ensure_ascii=False) + "\n")
|
||||
|
||||
|
||||
def _write_log(tmp_path: Path, date_str: str, lines: list[dict]):
|
||||
"""写 mock 日志文件"""
|
||||
log_file = tmp_path / f"openclaw-{date_str}.log"
|
||||
with open(log_file, "w") as f:
|
||||
for obj in lines:
|
||||
f.write(json.dumps(obj, ensure_ascii=False) + "\n")
|
||||
def _utc_now_str() -> str:
|
||||
"""返回当前 UTC 时间的 ISO 字符串(带 Z 后缀)"""
|
||||
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.") + \
|
||||
f"{datetime.now(timezone.utc).microsecond // 1000:03d}Z"
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _set_log_dir(tmp_path, monkeypatch):
|
||||
"""每个测试自动设置 OPENCLAW_LOG_DIR 到 tmp_path"""
|
||||
monkeypatch.setenv("OPENCLAW_LOG_DIR", str(tmp_path))
|
||||
def _utc_past_str(minutes_ago: int) -> str:
|
||||
"""返回过去 N 分钟的 UTC ISO 字符串"""
|
||||
ts = datetime.now(timezone.utc) - timedelta(minutes=minutes_ago)
|
||||
return ts.strftime("%Y-%m-%dT%H:%M:%S.") + \
|
||||
f"{ts.microsecond // 1000:03d}Z"
|
||||
|
||||
|
||||
# ── 测试用例 ──
|
||||
|
||||
|
||||
class TestCheckCompactInProgress:
|
||||
"""§24 v3: _check_compact_in_progress_gateway 单元测试"""
|
||||
class TestCheckCompactInProgressTrajectory:
|
||||
"""§24 v4: _check_compact_in_progress_trajectory 单元测试"""
|
||||
|
||||
def test_rotation_within_window_returns_true(self, tmp_path):
|
||||
"""TC1: rotation 事件在窗口内 → True"""
|
||||
now = datetime.now(timezone.utc)
|
||||
recent = now - timedelta(seconds=30)
|
||||
_write_log(tmp_path, _TODAY_STR, [_make_rotation_event(_SESSION_KEY, recent)])
|
||||
assert AgentSpawner._check_compact_in_progress_gateway(_SESSION_KEY) is True
|
||||
def test_tc1_normal_turn_with_submitted_returns_false(self, tmp_path):
|
||||
"""TC1: 正常 turn(有 prompt.submitted)→ False"""
|
||||
now = _utc_now_str()
|
||||
turns = [[
|
||||
_make_trajectory_event("session.started", ts=now),
|
||||
_make_trajectory_event("context.compiled", ts=now),
|
||||
_make_trajectory_event("prompt.submitted", ts=now),
|
||||
_make_trajectory_event("model.completed", ts=now),
|
||||
]]
|
||||
_write_trajectory(tmp_path, _SESSION_ID, turns)
|
||||
session_file = str(tmp_path / _SESSION_ID)
|
||||
assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is False
|
||||
|
||||
def test_rotation_outside_window_returns_false(self, tmp_path):
|
||||
"""TC2: rotation 事件超出窗口 → False"""
|
||||
now = datetime.now(timezone.utc)
|
||||
old = now - timedelta(seconds=200)
|
||||
_write_log(tmp_path, _TODAY_STR, [_make_rotation_event(_SESSION_KEY, old)])
|
||||
assert AgentSpawner._check_compact_in_progress_gateway(_SESSION_KEY) is False
|
||||
def test_tc2_compact_turn_returns_true(self, tmp_path):
|
||||
"""TC2: compact turn(无 prompt.submitted,有 context.compiled + model.completed)→ True"""
|
||||
now = _utc_now_str()
|
||||
turns = [[
|
||||
_make_trajectory_event("session.started", ts=now),
|
||||
_make_trajectory_event("context.compiled", ts=now),
|
||||
_make_trajectory_event("model.completed", ts=now),
|
||||
]]
|
||||
_write_trajectory(tmp_path, _SESSION_ID, turns)
|
||||
session_file = str(tmp_path / _SESSION_ID)
|
||||
assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is True
|
||||
|
||||
def test_no_rotation_event_returns_false(self, tmp_path):
|
||||
"""TC3: 无 rotation 事件 → False"""
|
||||
now = datetime.now(timezone.utc)
|
||||
_write_log(tmp_path, _TODAY_STR, [
|
||||
_make_other_event(_SESSION_KEY, now, "model.completed"),
|
||||
])
|
||||
assert AgentSpawner._check_compact_in_progress_gateway(_SESSION_KEY) is False
|
||||
def test_tc3_skipped_prompt_returns_false(self, tmp_path):
|
||||
"""TC3: 空白 prompt(有 prompt.skipped)→ False"""
|
||||
now = _utc_now_str()
|
||||
turns = [[
|
||||
_make_trajectory_event("session.started", ts=now),
|
||||
_make_trajectory_event("context.compiled", ts=now),
|
||||
_make_trajectory_event("prompt.skipped", ts=now),
|
||||
_make_trajectory_event("model.completed", ts=now),
|
||||
]]
|
||||
_write_trajectory(tmp_path, _SESSION_ID, turns)
|
||||
session_file = str(tmp_path / _SESSION_ID)
|
||||
assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is False
|
||||
|
||||
def test_log_file_not_exists_returns_false(self, tmp_path):
|
||||
"""TC4: 日志文件不存在 → False"""
|
||||
# tmp_path 为空目录,无日志文件
|
||||
assert AgentSpawner._check_compact_in_progress_gateway(_SESSION_KEY) is False
|
||||
def test_tc4_timeout_fallback_returns_false(self, tmp_path):
|
||||
"""TC4: 超过 30min 兜底 → False"""
|
||||
old = _utc_past_str(35)
|
||||
turns = [[
|
||||
_make_trajectory_event("session.started", ts=old),
|
||||
_make_trajectory_event("context.compiled", ts=old),
|
||||
_make_trajectory_event("model.completed", ts=old),
|
||||
]]
|
||||
_write_trajectory(tmp_path, _SESSION_ID, turns)
|
||||
session_file = str(tmp_path / _SESSION_ID)
|
||||
assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is False
|
||||
|
||||
def test_session_key_mismatch_returns_false(self, tmp_path):
|
||||
"""TC5: sessionKey 不匹配 → False"""
|
||||
now = datetime.now(timezone.utc)
|
||||
recent = now - timedelta(seconds=10)
|
||||
other_key = "agent:simayi-challenger:main"
|
||||
_write_log(tmp_path, _TODAY_STR, [_make_rotation_event(other_key, recent)])
|
||||
assert AgentSpawner._check_compact_in_progress_gateway(_SESSION_KEY) is False
|
||||
def test_tc5_trajectory_not_exists_returns_false(self, tmp_path):
|
||||
"""TC5: trajectory 不存在 → False"""
|
||||
session_file = str(tmp_path / "nonexistent-session")
|
||||
assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is False
|
||||
|
||||
def test_tc6_empty_trajectory_returns_false(self, tmp_path):
|
||||
"""TC6: 空 trajectory → False"""
|
||||
traj_file = tmp_path / f"{_SESSION_ID}.trajectory.jsonl"
|
||||
traj_file.write_text("")
|
||||
session_file = str(tmp_path / _SESSION_ID)
|
||||
assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is False
|
||||
|
||||
def test_tc7_multi_turn_last_normal_returns_false(self, tmp_path):
|
||||
"""TC7: 多 turn 尾部只看最后一个(最后一个正常但之前有 compact)→ False"""
|
||||
old = _utc_past_str(10)
|
||||
now = _utc_now_str()
|
||||
turn1 = [
|
||||
_make_trajectory_event("session.started", ts=old),
|
||||
_make_trajectory_event("context.compiled", ts=old),
|
||||
_make_trajectory_event("model.completed", ts=old), # compact turn, no prompt
|
||||
]
|
||||
turn2 = [
|
||||
_make_trajectory_event("session.started", ts=now),
|
||||
_make_trajectory_event("prompt.submitted", ts=now),
|
||||
_make_trajectory_event("model.completed", ts=now), # normal turn
|
||||
]
|
||||
_write_trajectory(tmp_path, _SESSION_ID, [turn1, turn2])
|
||||
session_file = str(tmp_path / _SESSION_ID)
|
||||
assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is False
|
||||
|
||||
def test_tc8_multi_turn_last_abnormal_returns_true(self, tmp_path):
|
||||
"""TC8: 多 turn 尾部最后一个非正常 → True"""
|
||||
old = _utc_past_str(5)
|
||||
now = _utc_now_str()
|
||||
turn1 = [
|
||||
_make_trajectory_event("session.started", ts=old),
|
||||
_make_trajectory_event("prompt.submitted", ts=old),
|
||||
_make_trajectory_event("model.completed", ts=old), # normal turn
|
||||
]
|
||||
turn2 = [
|
||||
_make_trajectory_event("session.started", ts=now),
|
||||
_make_trajectory_event("context.compiled", ts=now),
|
||||
_make_trajectory_event("model.completed", ts=now), # compact turn, no prompt
|
||||
]
|
||||
_write_trajectory(tmp_path, _SESSION_ID, [turn1, turn2])
|
||||
session_file = str(tmp_path / _SESSION_ID)
|
||||
assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is True
|
||||
|
||||
def test_tc9_null_session_file_returns_false(self):
|
||||
"""TC9: session_file 为空字符串 → False"""
|
||||
assert AgentSpawner._check_compact_in_progress_trajectory("") is False
|
||||
|
||||
def test_tc10_none_session_file_returns_false(self):
|
||||
"""TC10: session_file 为 None → False"""
|
||||
assert AgentSpawner._check_compact_in_progress_trajectory(None) is False
|
||||
|
||||
def test_tc11_events_without_ts_returns_true(self, tmp_path):
|
||||
"""TC11: 事件有 type 但无 ts 字段 → 无法判断超时 → True(skip)"""
|
||||
turns = [[
|
||||
_make_trajectory_event("session.started"), # 无 ts
|
||||
_make_trajectory_event("context.compiled"), # 无 ts
|
||||
_make_trajectory_event("model.completed"), # 无 ts
|
||||
]]
|
||||
_write_trajectory(tmp_path, _SESSION_ID, turns)
|
||||
session_file = str(tmp_path / _SESSION_ID)
|
||||
assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is True
|
||||
|
||||
Reference in New Issue
Block a user