From 3c2c0f317514aeccfeb5292fb0a033ffbb3bbee9 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Thu, 11 Jun 2026 23:57:09 +0800 Subject: [PATCH] =?UTF-8?q?fix(spawner):=20=C2=A724=20v4=20compact?= =?UTF-8?q?=E6=A3=80=E6=B5=8B=20-=20trajectory=20prompt.submitted=20?= =?UTF-8?q?=E6=9B=BF=E6=8D=A2=20gateway=20rotation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/design/24-compact-detection-fix.md | 292 ++++++++++++++---------- src/daemon/spawner.py | 118 +++++++++- tests/test_spawner_compact.py | 201 +++++++++++----- 3 files changed, 426 insertions(+), 185 deletions(-) diff --git a/docs/design/24-compact-detection-fix.md b/docs/design/24-compact-detection-fix.md index a27aa54..86ab53e 100644 --- a/docs/design/24-compact-detection-fix.md +++ b/docs/design/24-compact-detection-fix.md @@ -1,10 +1,11 @@ # §24 — Compact 检测方案修正 -> 状态:v3(rotation-only),待实施 +> 状态:v4(trajectory prompt.submitted),待实施 > 作者:庞统 > 日期:2026-06-11 > 框架:基于 §07 Spawner Acquire-First -> 评审:仲达 3 轮评审(v1 trajectory → v2 gateway precheck → v3 rotation-only) +> 评审:仲达 4 轮评审(v1 trajectory → v2 gateway precheck → v3 rotation-only → v4 prompt.submitted) +> 备选方案:B(内存 flag + sessions.json status),见 §2B ## 1. 问题 @@ -18,150 +19,186 @@ 同时 Gateway 触发 compact 时先把 session 标为 `done`,所以 `status=running + lock_pid_alive` 检查也无效。14:02:11 实际状态:`status=done lock_pid_alive=False compact=False`——三个检查全部漏过。 -## 2. 方案:Rotation-Only 检测(v3) +## 2. 方案 A:Trajectory prompt.submitted 检测(v4,主选方案) -### 2.1 核心洞察(仲达 v2 评审) +### 2.1 方案演进 -v2 方案依赖 `[context-overflow-precheck]` route=compact 作为开始标志。但实测数据: +| 版本 | 方案 | 问题 | +|------|------|------| +| v1 | trajectory jsonl 间接推断 | trajectoryPath 不可用,需多文件 | +| v2 | gateway precheck 开始标志 | 覆盖率仅 30%,post-compact retry 无开始标志 | +| v3 | rotation-only + 120s 窗口 | 120s 覆盖不了多轮 compact loop(实测 pangtong 13:59→14:50 共 5 轮 rotation,总耗时 ~51 分钟,PR #36 已合并但无法覆盖) | +| **v4** | **trajectory prompt.submitted** | **源码+数据双重验证,仲达背靠背确认** | -| Agent | Rotation 事件 | 有 Precheck | 无 Precheck | -|-------|:---:|:---:|:---:| -| pangtong | 7 | 3 | 4 | -| simayi | 3 | 0 | 3 | +### 2.2 核心洞察 -**10 次 compact 只有 3 次有 precheck,覆盖率 30%。** 原因:post-compact retry 触发的后续 compact 不经过 precheck 日志路径。 +**源码证据**(`selection-But6hGR0.js` L14040-14085): -**结论**:开始标志不可靠。反转检测逻辑——只用可靠的 rotation 事件作为信号。 +```javascript +if (preemptiveCompaction?.shouldCompact) { + skipPromptSubmission = true; // ← compact 时跳过 prompt.submitted +} +if (!skipPromptSubmission) { + trajectoryRecorder?.recordEvent("prompt.submitted", { ... }); +} +``` -### 2.2 Rotation 事件 +当 context-overflow 触发 compact 时,Gateway 跳过 `prompt.submitted` 事件。 +正常 turn 一定有 `prompt.submitted`。 -Gateway 日志中 `[compaction] rotated active transcript after compaction (sessionKey=...)` 事件: -- **100% 覆盖率**:全天 10 次 compact 全部有 rotation 事件 -- **含 sessionKey**:可以精确匹配目标 session -- **JSON 格式**:易解析 +**仲达背靠背验证**:`skipPromptSubmission` 有 7 条路径(不只 compact),但仲达指出: +**检测目标不是"是否在 compact",而是"session 是否处于正常状态"。** +所有跳过 prompt.submitted 的场景(compact/timeout/hook block/session 结束) +都是不应该 spawn ticker 的状态,误判方向安全。 + +**实测数据**(仲达背靠背重新验证,2026-06-11): +- pangtong 39 个 turn:34 有 prompt.submitted(正常),5 无 + - 4 个 tool loop 子迭代(compactionCount=0, <1s, gateway 无 compact 事件) + - 1 个 context-overflow precheck 触发 compact +- simayi 24 个 turn:23 有,1 无(tool-result truncation succeeded) +- 合计 6/63 = ~9.5% 无 prompt.submitted,其中真正 compact 仅 1 例 +- **所有无 prompt.submitted 的场景都是不应 spawn ticker 的状态**,方向安全 ### 2.3 检测逻辑 ``` -1. 读 gateway 日志(当天 + 昨天尾部) -2. 按目标 sessionKey 过滤 compact 相关事件 -3. 从后往前找最后一条 rotation 事件: - a. 如果 rotation 事件在窗口内(< 120s)→ compact=True - (刚完成一轮 compact,可能还在 post-compact retry 循环中) - b. 无 rotation 事件或超出时间窗口 → compact=False - -**注意:此方案仅检查 rotation 事件,不检查 model.completed 等其他事件。** -这是有意为之的保守策略:不检查正常 turn 事件意味着 compact 完成后的 -120s 内都可能被误判为 compact 进行中,但误判代价低(仅 skip 一轮 ticker), -宁可多拦也不漏放。 +1. 构造 trajectory jsonl 路径:{sessionFile}.trajectory.jsonl +2. 读文件尾部,按 session.started 分组找最后一个完整 turn +3. 如果该 turn 有 prompt.submitted → 正常 turn → 不 skip +4. 如果该 turn 有 prompt.skipped → 空白 prompt → 不 skip +5. 如果两者都无 → 非正常状态 → skip ticker +6. 超过 30min 没有新事件 → 兜底放行 ``` -**为什么 rotation + 时间窗口就够了?** -- compact 后 Gateway 会 retry prompt -- 如果 retry 又触发 overflow → 又一轮 compact → 又一个 rotation 事件 -- 如果 retry 成功 → 正常 turn → 新的 session.started / model.completed 事件 -- 所以「最近一个事件是 rotation 且时间很近」= compact 循环还在进行 +**为什么不需要 gateway 日志?** +- trajectory jsonl 已经包含了完整的 turn 生命周期 +- prompt.submitted 是 turn 级别的标志,不需要匹配开始/结束 +- 不需要维护跨 tick 的内存状态 -### 2.4 时间窗口选择 +### 2.4 为什么不用 session jsonl 的 `type: "compaction"` 事件? -compact 通常耗时 1-10 分钟。post-compact retry 如果又触发 compact,间隔通常 <60 秒。 +每轮 compact 结束,session jsonl 确实会写入 `type: "compaction"` 摘要事件。 +但 compact 后 Gateway 会 rotate transcript(创建新 session file), +compaction 事件写在**旧 session jsonl** 里(变成 .reset 文件), +当前 main session 指向的 jsonl 中没有这些事件。 -- **窗口太短(如 30s)**:可能漏掉 compact 结束后正在 retry 但还没触发下一轮的场景 -- **窗口太长(如 900s)**:compact 完成后正常工作很久了还误判 -- **推荐 120s**:compact 循环中两次 rotation 间隔通常 <60s,120s 有足够余量 +这就是现有 `_check_recent_compaction_jsonl` 检测不到的根本原因。 -误判代价低(skip 一轮 ticker),所以宁可多拦也不漏放。 +## 2B. 备选方案 B:内存 flag + sessions.json status -## 3. 改动范围 +如果方案 A 在实际使用中不够,可补充方案 B。 + +``` +1. gateway 日志发现 rotation 或 precheck → 设置内存 flag: compacting=True +2. 每个 ticker 检查: + - flag=True + sessions.json status=running → 清 flag(compact 结束) + - flag=True + 超过 30min → 清 flag(兜底放行) + - flag=True → skip ticker +3. daemon 重启会丢失 flag(可接受,重启后状态已刷新) +``` + +**优点**:精确检测 compact 结束(status 恢复 running) +**缺点**:需要维护内存状态、依赖两个数据源、daemon 重启丢失状态 +**触发条件**:仅在方案 A 实际运行中发现不足时实施 + +## 3. 改动范围(方案 A) | 文件 | 改动 | 行数估计 | |------|------|---------| -| `spawner.py` | 新增 `_check_compact_in_progress_gateway()` | ~40 行 | +| `spawner.py` | 新增 `_check_compact_in_progress_trajectory()` | ~50 行 | | `spawner.py` | `_check_session_state()` 调用新方法,替换旧方法 | ~5 行 | -| `spawner.py` | 日志路径配置化 | ~5 行 | -| `docs/design/07-spawner-acquire-first.md` | §4.5 O5 更新 | ~10 行 | -| `docs/design/24-compact-detection-fix.md` | 本文档 | 已有 | +| `tests/test_spawner_compact.py` | 更新单元测试 | ~30 行 | -**总计 ~60 行代码改动。** +**总计 ~85 行代码改动。** -## 4. 实现细节 +## 4. 实现细节(方案 A) ### 4.1 核心方法 ```python -def _check_compact_in_progress_gateway(self, session_key: str, window_seconds: int = 120) -> bool: - """检查 gateway 日志,判断指定 session 是否刚完成 compact(可能在 retry 循环中)。 +def _check_compact_in_progress_trajectory(self, session_file: str, timeout_minutes: int = 30) -> bool: + """检查 trajectory jsonl 尾部,判断 session 是否处于非正常状态。 - 检测逻辑:如果目标 session 最近一个事件是 rotation 且在窗口内,视为 compact 进行中。 + 检测逻辑:最后一个完整 turn 没有 prompt.submitted → 非正常状态 → skip ticker。 + 覆盖:compact、timeout、hook block、session 结束等所有非正常状态。 """ - log_paths = self._get_recent_gateway_logs() - if not log_paths: + traj_path = f"{session_file}.trajectory.jsonl" + if not os.path.exists(traj_path): return False - now = datetime.now(timezone.utc) - window_start = now - timedelta(seconds=window_seconds) + # 读尾部 500KB + with open(traj_path, 'rb') as f: + f.seek(0, 2) + size = f.tell() + f.seek(max(0, size - 500 * 1024)) + tail_lines = f.readlines() - last_rotation_time = None - - for log_path in log_paths: - if not os.path.exists(log_path): + # 按 session.started 分组,找最后一个完整 turn + last_turn_events = [] + current_turn = [] + for raw_line in tail_lines: + try: + obj = json.loads(raw_line) + except (json.JSONDecodeError, ValueError): continue - with open(log_path, 'rb') as f: - # 读尾部 2MB - f.seek(0, 2) - size = f.tell() - f.seek(max(0, size - 2 * 1024 * 1024)) - - for raw_line in f: - try: - obj = json.loads(raw_line) - except (json.JSONDecodeError, ValueError): - continue - - msg = obj.get("message", "") - ts_str = obj.get("time", "") - - # 只看包含目标 sessionKey 的事件 - if session_key not in msg: - continue - - # rotation 事件 - if "[compaction] rotated active transcript" in msg: - try: - event_time = datetime.fromisoformat(ts_str) - if last_rotation_time is None or event_time > last_rotation_time: - last_rotation_time = event_time - except (ValueError, TypeError): - continue + event_type = obj.get("type", "") + + if event_type == "session.started": + if current_turn: + last_turn_events = current_turn + current_turn = [obj] + else: + current_turn.append(obj) - if last_rotation_time is not None: - return last_rotation_time >= window_start + if current_turn: + last_turn_events = current_turn - return False + if not last_turn_events: + return False + + # 30min 兜底:最后一个事件超过 30min → 放行 + last_ts = None + for evt in reversed(last_turn_events): + ts = evt.get("ts") or evt.get("timestamp") + if ts: + last_ts = ts + break + + if last_ts: + try: + from datetime import datetime, timezone + # trajectory 时间是 ISO UTC + if last_ts.endswith('Z'): + last_dt = datetime.fromisoformat(last_ts.replace('Z', '+00:00')) + else: + last_dt = datetime.fromisoformat(last_ts) + age = datetime.now(timezone.utc) - last_dt + if age.total_seconds() > timeout_minutes * 60: + return False # 超时放行 + except (ValueError, TypeError): + pass + + # 检查最后一个 turn 是否有 prompt.submitted + has_prompt_submitted = any( + evt.get("type") == "prompt.submitted" for evt in last_turn_events + ) + has_prompt_skipped = any( + evt.get("type") == "prompt.skipped" for evt in last_turn_events + ) + + if has_prompt_submitted or has_prompt_skipped: + return False # 正常 turn + + # 既无 submitted 也无 skipped → 非正常状态 → skip + return True ``` -### 4.2 日志路径 +### 4.2 Phase 2 集成 ```python -def _get_recent_gateway_logs(self) -> list: - """获取当天和昨天的 gateway 日志路径""" - log_dir = os.environ.get("OPENCLAW_LOG_DIR", "/tmp/openclaw") - today = datetime.now().strftime("%Y-%m-%d") - yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") - paths = [] - for d in [today, yesterday]: - p = os.path.join(log_dir, f"openclaw-{d}.log") - if os.path.exists(p): - paths.append(p) - return paths -``` - -### 4.3 Phase 2 集成 - -```python -# 在 _check_session_state 中,不依赖 status,直接检查 -compact = self._check_compact_in_progress_gateway(session_key) +# 在 _check_session_state 中替换旧方法 +compact = self._check_compact_in_progress_trajectory(session_file) if not compact: compact = self._check_recent_compaction_jsonl(...) # fallback @@ -169,25 +206,41 @@ if compact: blockers.append(("session_compacting", None)) ``` +### 4.3 trajectory 路径构造 + +trajectory jsonl 路径 = `{sessionFile}.trajectory.jsonl`,其中 sessionFile 来自 sessions.json。 + +实测验证: +- `~/.openclaw/agents/pangtong-fujunshi/sessions/745b35bb-...-e8e8988d.jsonl` +- → trajectory: `~/.openclaw/agents/pangtong-fujunshi/sessions/745b35bb-...-e8e8988d.trajectory.jsonl` + ## 5. 边界情况 -| 边界情况 | 处理 | -|---------|------| -| 日志文件不存在 | 返回 False(fallback 到旧方法) | -| 跨天 compact | 同时检查昨天日志尾部 | -| compact 失败(无 rotation) | rotation 事件不会出现 → 检测不到 → 回退到旧方法 | -| 误判(compact 完成后正常工作中) | 时间窗口 120s 内可能被误判,但代价低(skip 一轮 ticker)。不检查正常 turn 事件,是保守策略 | +| 边界情况 | 处理 | 误判方向 | +|---------|------|----------| +| trajectory 不存在 | 返回 False(fallback) | 安全 | +| tool loop 子迭代 | 无 prompt.submitted → skip | 保守但安全(~8%) | +| timeout turn | 无 prompt.submitted → skip | 安全(timeout 也不该 spawn) | +| hook block | 无 prompt.submitted → skip | 安全 | +| truncation 成功 | 无 prompt.submitted → skip | 安全(后面会 retry) | +| session 结束空 turn | 无 prompt.submitted → skip | 安全 | +| 空白 prompt | 有 prompt.skipped → 不 skip | 正确区分 | +| 30min 无新事件 | 兜底放行 | 防死锁 | +| compact 后 transcript rotate | 读当前 sessionFile 对应的 trajectory | 路径正确 | +| budget compact | 有 prompt.submitted → 不 skip | 正确(budget compact 不阻止 spawn) | ## 6. 测试验证 -### 6.1 单元测试 +### 6.1 单元测试(更新 test_spawner_compact.py) -- `_check_compact_in_progress_gateway`: - - rotation 事件在窗口内 → True - - rotation 事件超出窗口 → False - - 无 rotation 事件 → False - - 日志不存在 → False - - sessionKey 不匹配 → False +- `_check_compact_in_progress_trajectory`: + - 正常 turn(有 prompt.submitted)→ False + - compact turn(无 prompt.submitted)→ True + - 空白 prompt(有 prompt.skipped)→ False + - 超过 30min 兜底 → False + - trajectory 不存在 → False + - 空 trajectory → False + - 多 turn 尾部只看最后一个 → 正确 ### 6.2 集成验证 @@ -202,4 +255,5 @@ if compact: - **v1**:trajectory jsonl 间接推断 → 仲达指出 trajectoryPath 不可用、需多文件等 3 个问题 - **v2**:gateway 日志 precheck 开始标志 → 仲达指出开始标志覆盖率仅 30%,建议 rotation-only -- **v3**:rotation-only(当前版本)→ 仲达已确认方向,待代码实现后再审 +- **v3**:rotation-only + 120s 窗口 → 合并 PR #36,但实测 51 分钟 compact loop 无法覆盖 +- **v4**:trajectory prompt.submitted → 仲达背靠背验证(源码 7 条 skipPromptSubmission 路径 + 实际数据 ~8% 假阳性但方向安全)→ 修正检测目标为"session 是否正常" diff --git a/src/daemon/spawner.py b/src/daemon/spawner.py index c8cd170..bd117eb 100644 --- a/src/daemon/spawner.py +++ b/src/daemon/spawner.py @@ -1297,6 +1297,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ logger.exception("Failed to revive %s", agent_id) return False + @staticmethod + # deprecated: §24 v3, 保留供方案 B 备选 @staticmethod def _get_recent_gateway_logs() -> list: """获取当天和昨天的 gateway 日志路径。 @@ -1316,6 +1318,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ paths.append(p) return paths + @staticmethod + # deprecated: §24 v3, 保留供方案 B 备选 @staticmethod def _check_compact_in_progress_gateway( session_key: str, window_seconds: int = 120) -> bool: @@ -1495,14 +1499,13 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ except Exception: pass - # §24 v3: compact 检测优先用 gateway 日志 rotation 事件 - # 旧方法 _check_recent_compaction_jsonl 作为 fallback + # §24 v4: compact 检测优先用 trajectory prompt.submitted + # fallback: _check_recent_compaction_jsonl (v2.8.2) # 重要:compact 进行中时 status=done,所以不能按 status 过滤 # 只跳过 idle/unknown(完全没有活动过的 session) - if result["status"] not in ("idle", "unknown", None): - session_key = f"agent:{agent_id}:main" - result["recent_compact"] = AgentSpawner._check_compact_in_progress_gateway( - session_key) + if result["status"] not in ("idle", "unknown", None) and sf: + result["recent_compact"] = AgentSpawner._check_compact_in_progress_trajectory( + sf) if not result["recent_compact"] and sf: result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl( sf) @@ -1510,6 +1513,109 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ pass return result + @staticmethod + def _check_compact_in_progress_trajectory( + session_file: str, timeout_minutes: int = 30) -> bool: + """§24 v4: 检查 trajectory jsonl 尾部,判断 session 是否处于非正常状态。 + + 检测逻辑:最后一个完整 turn 没有 prompt.submitted/skipped → 非正常 → skip。 + 覆盖:compact、timeout、hook block、session 结束等所有非正常状态。 + + Returns: + True = 非正常状态(skip ticker) + False = 正常(不 skip)或超时兜底放行 + """ + if not session_file: + return False + traj_path = f"{session_file}.trajectory.jsonl" + if not os.path.exists(traj_path): + return False + + try: + from datetime import datetime as _dt, timezone as _tz, timedelta + + # 读尾部 500KB + with open(traj_path, "rb") as f: + f.seek(0, 2) + size = f.tell() + f.seek(max(0, size - 500 * 1024)) + tail = f.read().decode("utf-8", errors="replace") + + if not tail.strip(): + return False + + # 解析所有有效行 + events = [] + for line in tail.splitlines(): + line = line.strip() + if not line: + continue + try: + obj = json.loads(line) + events.append(obj) + except (json.JSONDecodeError, ValueError): + continue + + if not events: + return False + + # 按 session.started 分组找 turn + # 每个 turn 以 session.started 开始 + turns = [] + current_turn = [] + for evt in events: + if evt.get("type") == "session.started": + if current_turn: + turns.append(current_turn) + current_turn = [evt] + else: + current_turn.append(evt) + if current_turn: + turns.append(current_turn) + + if not turns: + return False + + # 检查最后一个完整 turn(包含 session.started) + last_turn = turns[-1] + turn_types = {evt.get("type") for evt in last_turn} + + # 有 prompt.submitted 或 prompt.skipped → 正常 turn + if "prompt.submitted" in turn_types or "prompt.skipped" in turn_types: + return False + + # 非正常状态 → 检查超时兜底 + # 找最后一个有 ts 的事件 + last_ts = None + for evt in reversed(events): + ts_str = evt.get("ts") + if ts_str: + try: + last_ts = _dt.fromisoformat( + ts_str.replace("Z", "+00:00")) + if last_ts.tzinfo is None: + last_ts = last_ts.replace(tzinfo=_tz.utc) + except (ValueError, TypeError): + continue + break + + if last_ts is None: + # 没有 ts 信息,无法判断超时 → 非正常 → skip + return True + + now = _dt.now(_tz.utc) + elapsed = (now - last_ts).total_seconds() + if elapsed > timeout_minutes * 60: + logger.debug("Trajectory last event %.0fs ago > %dm, fallback pass", + elapsed, timeout_minutes) + return False # 兜底放行 + + return True # 非正常状态且未超时 + + except Exception as e: + logger.debug("_check_compact_in_progress_trajectory error: %s", e) + return False + @staticmethod def _classify_outcome(exit_code: int, json_result: dict, stderr_text: str, task_status: Optional[str], stdout_text: str = "") -> dict: diff --git a/tests/test_spawner_compact.py b/tests/test_spawner_compact.py index dec7dad..28b38aa 100644 --- a/tests/test_spawner_compact.py +++ b/tests/test_spawner_compact.py @@ -1,11 +1,10 @@ -"""单元测试:§24 v3 rotation-only compact 检测 +"""单元测试:§24 v4 trajectory prompt.submitted compact 检测 -测试 _get_recent_gateway_logs 和 _check_compact_in_progress_gateway。 -用 tmp_path 构造 mock gateway 日志文件。 +测试 _check_compact_in_progress_trajectory 方法。 +用 tmp_path 构造 mock trajectory jsonl 文件。 """ import json -import os from datetime import datetime, timedelta, timezone from pathlib import Path @@ -16,77 +15,159 @@ from src.daemon.spawner import AgentSpawner # ── helpers ── -_SESSION_KEY = "agent:pangtong-fujunshi:main" -_TODAY_STR = datetime.now().strftime("%Y-%m-%d") +_SESSION_ID = "sess-abc123" -def _make_rotation_event(session_key: str, ts: datetime) -> dict: - """构造一条 rotation 日志事件""" - return { - "time": ts.isoformat(), - "message": f"[compaction] rotated active transcript after compaction (sessionKey={session_key})", - } +def _make_trajectory_event(event_type: str, ts: str = None, **kwargs) -> dict: + """构造 trajectory jsonl 事件""" + obj = {"type": event_type} + if ts: + obj["ts"] = ts + obj.update(kwargs) + return obj -def _make_other_event(session_key: str, ts: datetime, msg: str = "something else") -> dict: - """构造一条普通日志事件""" - return { - "time": ts.isoformat(), - "message": f"{msg} (sessionKey={session_key})", - } +def _write_trajectory(tmp_path: Path, session_id: str, turns: list[list[dict]]): + """写入 trajectory jsonl,按 turns 分组。 + + 每个 turn 是一个 list of events。 + 自动在每组前加 session.started(如果该 turn 没有的话)。 + """ + traj_file = tmp_path / f"{session_id}.trajectory.jsonl" + with open(traj_file, "w") as f: + for turn_events in turns: + # 如果 turn 第一个事件不是 session.started,自动加一个 + if not turn_events or turn_events[0].get("type") != "session.started": + started = _make_trajectory_event( + "session.started", + ts=turn_events[0].get("ts") if turn_events else None, + ) + f.write(json.dumps(started, ensure_ascii=False) + "\n") + for evt in turn_events: + f.write(json.dumps(evt, ensure_ascii=False) + "\n") -def _write_log(tmp_path: Path, date_str: str, lines: list[dict]): - """写 mock 日志文件""" - log_file = tmp_path / f"openclaw-{date_str}.log" - with open(log_file, "w") as f: - for obj in lines: - f.write(json.dumps(obj, ensure_ascii=False) + "\n") +def _utc_now_str() -> str: + """返回当前 UTC 时间的 ISO 字符串(带 Z 后缀)""" + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.") + \ + f"{datetime.now(timezone.utc).microsecond // 1000:03d}Z" -@pytest.fixture(autouse=True) -def _set_log_dir(tmp_path, monkeypatch): - """每个测试自动设置 OPENCLAW_LOG_DIR 到 tmp_path""" - monkeypatch.setenv("OPENCLAW_LOG_DIR", str(tmp_path)) +def _utc_past_str(minutes_ago: int) -> str: + """返回过去 N 分钟的 UTC ISO 字符串""" + ts = datetime.now(timezone.utc) - timedelta(minutes=minutes_ago) + return ts.strftime("%Y-%m-%dT%H:%M:%S.") + \ + f"{ts.microsecond // 1000:03d}Z" # ── 测试用例 ── -class TestCheckCompactInProgress: - """§24 v3: _check_compact_in_progress_gateway 单元测试""" +class TestCheckCompactInProgressTrajectory: + """§24 v4: _check_compact_in_progress_trajectory 单元测试""" - def test_rotation_within_window_returns_true(self, tmp_path): - """TC1: rotation 事件在窗口内 → True""" - now = datetime.now(timezone.utc) - recent = now - timedelta(seconds=30) - _write_log(tmp_path, _TODAY_STR, [_make_rotation_event(_SESSION_KEY, recent)]) - assert AgentSpawner._check_compact_in_progress_gateway(_SESSION_KEY) is True + def test_tc1_normal_turn_with_submitted_returns_false(self, tmp_path): + """TC1: 正常 turn(有 prompt.submitted)→ False""" + now = _utc_now_str() + turns = [[ + _make_trajectory_event("session.started", ts=now), + _make_trajectory_event("context.compiled", ts=now), + _make_trajectory_event("prompt.submitted", ts=now), + _make_trajectory_event("model.completed", ts=now), + ]] + _write_trajectory(tmp_path, _SESSION_ID, turns) + session_file = str(tmp_path / _SESSION_ID) + assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is False - def test_rotation_outside_window_returns_false(self, tmp_path): - """TC2: rotation 事件超出窗口 → False""" - now = datetime.now(timezone.utc) - old = now - timedelta(seconds=200) - _write_log(tmp_path, _TODAY_STR, [_make_rotation_event(_SESSION_KEY, old)]) - assert AgentSpawner._check_compact_in_progress_gateway(_SESSION_KEY) is False + def test_tc2_compact_turn_returns_true(self, tmp_path): + """TC2: compact turn(无 prompt.submitted,有 context.compiled + model.completed)→ True""" + now = _utc_now_str() + turns = [[ + _make_trajectory_event("session.started", ts=now), + _make_trajectory_event("context.compiled", ts=now), + _make_trajectory_event("model.completed", ts=now), + ]] + _write_trajectory(tmp_path, _SESSION_ID, turns) + session_file = str(tmp_path / _SESSION_ID) + assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is True - def test_no_rotation_event_returns_false(self, tmp_path): - """TC3: 无 rotation 事件 → False""" - now = datetime.now(timezone.utc) - _write_log(tmp_path, _TODAY_STR, [ - _make_other_event(_SESSION_KEY, now, "model.completed"), - ]) - assert AgentSpawner._check_compact_in_progress_gateway(_SESSION_KEY) is False + def test_tc3_skipped_prompt_returns_false(self, tmp_path): + """TC3: 空白 prompt(有 prompt.skipped)→ False""" + now = _utc_now_str() + turns = [[ + _make_trajectory_event("session.started", ts=now), + _make_trajectory_event("context.compiled", ts=now), + _make_trajectory_event("prompt.skipped", ts=now), + _make_trajectory_event("model.completed", ts=now), + ]] + _write_trajectory(tmp_path, _SESSION_ID, turns) + session_file = str(tmp_path / _SESSION_ID) + assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is False - def test_log_file_not_exists_returns_false(self, tmp_path): - """TC4: 日志文件不存在 → False""" - # tmp_path 为空目录,无日志文件 - assert AgentSpawner._check_compact_in_progress_gateway(_SESSION_KEY) is False + def test_tc4_timeout_fallback_returns_false(self, tmp_path): + """TC4: 超过 30min 兜底 → False""" + old = _utc_past_str(35) + turns = [[ + _make_trajectory_event("session.started", ts=old), + _make_trajectory_event("context.compiled", ts=old), + _make_trajectory_event("model.completed", ts=old), + ]] + _write_trajectory(tmp_path, _SESSION_ID, turns) + session_file = str(tmp_path / _SESSION_ID) + assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is False - def test_session_key_mismatch_returns_false(self, tmp_path): - """TC5: sessionKey 不匹配 → False""" - now = datetime.now(timezone.utc) - recent = now - timedelta(seconds=10) - other_key = "agent:simayi-challenger:main" - _write_log(tmp_path, _TODAY_STR, [_make_rotation_event(other_key, recent)]) - assert AgentSpawner._check_compact_in_progress_gateway(_SESSION_KEY) is False + def test_tc5_trajectory_not_exists_returns_false(self, tmp_path): + """TC5: trajectory 不存在 → False""" + session_file = str(tmp_path / "nonexistent-session") + assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is False + + def test_tc6_empty_trajectory_returns_false(self, tmp_path): + """TC6: 空 trajectory → False""" + traj_file = tmp_path / f"{_SESSION_ID}.trajectory.jsonl" + traj_file.write_text("") + session_file = str(tmp_path / _SESSION_ID) + assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is False + + def test_tc7_multi_turn_last_normal_returns_false(self, tmp_path): + """TC7: 多 turn 尾部只看最后一个(最后一个正常但之前有 compact)→ False""" + old = _utc_past_str(10) + now = _utc_now_str() + turn1 = [ + _make_trajectory_event("session.started", ts=old), + _make_trajectory_event("context.compiled", ts=old), + _make_trajectory_event("model.completed", ts=old), # compact turn, no prompt + ] + turn2 = [ + _make_trajectory_event("session.started", ts=now), + _make_trajectory_event("prompt.submitted", ts=now), + _make_trajectory_event("model.completed", ts=now), # normal turn + ] + _write_trajectory(tmp_path, _SESSION_ID, [turn1, turn2]) + session_file = str(tmp_path / _SESSION_ID) + assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is False + + def test_tc8_multi_turn_last_abnormal_returns_true(self, tmp_path): + """TC8: 多 turn 尾部最后一个非正常 → True""" + old = _utc_past_str(5) + now = _utc_now_str() + turn1 = [ + _make_trajectory_event("session.started", ts=old), + _make_trajectory_event("prompt.submitted", ts=old), + _make_trajectory_event("model.completed", ts=old), # normal turn + ] + turn2 = [ + _make_trajectory_event("session.started", ts=now), + _make_trajectory_event("context.compiled", ts=now), + _make_trajectory_event("model.completed", ts=now), # compact turn, no prompt + ] + _write_trajectory(tmp_path, _SESSION_ID, [turn1, turn2]) + session_file = str(tmp_path / _SESSION_ID) + assert AgentSpawner._check_compact_in_progress_trajectory(session_file) is True + + def test_tc9_null_session_file_returns_false(self): + """TC9: session_file 为空字符串 → False""" + assert AgentSpawner._check_compact_in_progress_trajectory("") is False + + def test_tc10_none_session_file_returns_false(self): + """TC10: session_file 为 None → False""" + assert AgentSpawner._check_compact_in_progress_trajectory(None) is False