From 36ba629b693ff4f2714b9cf5d88c716448098143 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Sat, 13 Jun 2026 00:27:39 +0800 Subject: [PATCH 1/2] =?UTF-8?q?fix(spawner):=20compact=20=E6=A3=80?= =?UTF-8?q?=E6=B5=8B=20v5=20=E2=80=94=20gateway=20log=20=E5=BC=80=E5=A7=8B?= =?UTF-8?q?=E6=A0=87=E8=AE=B0=20+=20jsonl=20=E7=BB=93=E6=9D=9F=E6=A0=87?= =?UTF-8?q?=E8=AE=B0=E9=85=8D=E5=AF=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 _find_compact_start_in_gateway_log: 检测 overflow/timeout/precheck 三种开始标记 - 新增 _check_compaction_finished_in_jsonl: 检测 jsonl compaction entry 作为结束标记 - 重写 _check_session_state compact 检测逻辑: 开始+结束配对 - 无开始标记 (threshold/manual) 不阻塞,靠 counter+lock+status 保护 - 超时兜底 15 分钟保留 - 旧方法标记 deprecated 保留 - 427 passed --- src/daemon/spawner.py | 183 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 173 insertions(+), 10 deletions(-) diff --git a/src/daemon/spawner.py b/src/daemon/spawner.py index aa121e8..12c0e96 100644 --- a/src/daemon/spawner.py +++ b/src/daemon/spawner.py @@ -1317,7 +1317,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ paths.append(p) return paths - # deprecated: §24 v3, 保留供方案 B 备选 + # deprecated: §24 v3, 保留供方案 B 备选(旧 rotation 结束标记检测,已被 v5 取代) @staticmethod def _check_compact_in_progress_gateway( session_key: str, window_seconds: int = 120) -> bool: @@ -1381,6 +1381,159 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ return False + # ─── v5: compact 开始标记检测(gateway log)+ 结束标记检测(jsonl) ─── + + @staticmethod + def _find_compact_start_in_gateway_log( + agent_id: str, window_seconds: int = 900) -> Optional[str]: + """v5: 检查 gateway 日志,找最近的 compact 开始标记。 + + 开始标记匹配规则(message 字段包含以下任一): + - "attempting auto-compaction" (overflow 路径) + - "[timeout-compaction]" 且包含 "attempting" (timeout 路径) + - "[context-overflow-precheck]" 且 "route=compact_then_truncate" (precheck 路径) + + 同时需要包含目标 agent 的 sessionKey(如 agent:simayi-challenger:main)。 + 超时兜底:开始标记超过 window_seconds 自动忽略。 + + 返回最近一个开始标记的 UTC ISO 时间字符串(带 Z 后缀),或 None。 + """ + from datetime import datetime as _dt, timezone as _tz, timedelta + log_paths = AgentSpawner._get_recent_gateway_logs() + if not log_paths: + return None + + session_key = f"agent:{agent_id}:main" + now = _dt.now(_tz.utc) + window_start = now - timedelta(seconds=window_seconds) + + latest_start_time = None # type: Optional[_dt] + latest_start_str = None # type: Optional[str] + + for log_path in log_paths: + if not os.path.exists(log_path): + continue + try: + with open(log_path, "rb") as f: + f.seek(0, 2) + size = f.tell() + f.seek(max(0, size - 2 * 1024 * 1024)) + tail = f.read().decode("utf-8", errors="replace") + except Exception: + continue + + for line in tail.splitlines(): + if not line.strip(): + continue + try: + obj = json.loads(line) + except (json.JSONDecodeError, ValueError): + continue + + msg = obj.get("message", "") + if session_key not in msg: + continue + + # 检测三种开始标记 + is_start = False + # overflow: "attempting auto-compaction" + if "attempting auto-compaction" in msg: + is_start = True + # timeout: "[timeout-compaction]" + "attempting" + elif "[timeout-compaction]" in msg and "attempting" in msg: + is_start = True + # precheck: "[context-overflow-precheck]" + "route=compact_then_truncate" + elif ("[context-overflow-precheck]" in msg + and "route=compact_then_truncate" in msg): + is_start = True + + if not is_start: + continue + + # 解析时间 + ts_str = obj.get("time", "") + if not ts_str: + continue + try: + event_time = _dt.fromisoformat( + ts_str.replace("Z", "+00:00")) + if event_time.tzinfo is None: + event_time = event_time.replace(tzinfo=_tz.utc) + else: + # 确保 UTC + event_time = event_time.astimezone(_tz.utc) + except (ValueError, TypeError): + continue + + # 超时兜底:超过窗口的忽略 + if event_time < window_start: + continue + + if latest_start_time is None or event_time > latest_start_time: + latest_start_time = event_time + latest_start_str = event_time.strftime( + "%Y-%m-%dT%H:%M:%S.") + f"{event_time.microsecond:06d}" + "Z" + + return latest_start_str + + @staticmethod + def _check_compaction_finished_in_jsonl( + session_file: str, after_time: str) -> bool: + """v5: 检查 jsonl 是否有 after_time 之后的 compaction entry。 + + 有 → compact 已完成 → True + 没有 → compact 可能仍在进行 → False + + after_time 格式:UTC ISO(如 2026-06-12T10:25:27.581Z)。 + jsonl timestamp 格式也是 UTC ISO。 + """ + if not session_file or not Path(session_file).exists(): + return False + try: + from datetime import datetime as _dt, timezone as _tz + after_dt = _dt.fromisoformat(after_time.replace("Z", "+00:00")) + if after_dt.tzinfo is None: + after_dt = after_dt.replace(tzinfo=_tz.utc) + + with open(session_file, "rb") as sf: + sf.seek(0, 2) + size = sf.tell() + sf.seek(max(0, size - 1048576)) + tail = sf.read().decode("utf-8", errors="replace") + + for line in reversed(tail.splitlines()): + if not line.strip(): + continue + try: + obj = json.loads(line) + except (json.JSONDecodeError, ValueError): + continue + if obj.get("type") == "compaction": + ts = obj.get("timestamp", "") + if ts: + try: + ct = _dt.fromisoformat(ts.replace("Z", "+00:00")) + if ct.tzinfo is None: + ct = ct.replace(tzinfo=_tz.utc) + if ct >= after_dt: + return True + except (ValueError, TypeError): + pass + # 遇到早于 after_time 的 entry → 不需要继续往前扫 + ts = obj.get("timestamp", "") + if ts: + try: + ct = _dt.fromisoformat(ts.replace("Z", "+00:00")) + if ct.tzinfo is None: + ct = ct.replace(tzinfo=_tz.utc) + if ct < after_dt: + break + except (ValueError, TypeError): + pass + return False + except Exception: + return False + @staticmethod def _check_recent_compaction_jsonl( session_file: str, window_seconds: int = 900) -> bool: @@ -1497,16 +1650,26 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ except Exception: pass - # §24 v4: compact 检测优先用 trajectory prompt.submitted - # fallback: _check_recent_compaction_jsonl (v2.8.2) - # 重要:compact 进行中时 status=done,所以不能按 status 过滤 - # 只跳过 idle/unknown(完全没有活动过的 session) + # §24 v5: compact 检测 = gateway log 开始标记 + jsonl 结束标记配对 + # 旧方法 (_check_compact_in_progress_trajectory, _check_recent_compaction_jsonl) + # 保留为 deprecated 但不再调用。 + # + # 逻辑: + # 1. 查 gateway log 最近的 compact 开始标记(overflow/timeout/precheck) + # 2. 有开始标记 → 查 jsonl 是否有对应的 compaction entry(结束标记) + # 3. 有开始无结束 → 阻塞(recent_compact=True) + # 4. 有开始有结束 → 放行 + # 5. 无开始标记 → threshold/manual 静默触发,靠 counter+lock+status 保护 + # 6. 超时兜底:开始标记超过 15 分钟自动忽略 if result["status"] not in ("idle", "unknown", None) and sf: - result["recent_compact"] = AgentSpawner._check_compact_in_progress_trajectory( - sf) - if not result["recent_compact"] and sf: - result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl( - sf) + compact_start = AgentSpawner._find_compact_start_in_gateway_log(agent_id) + if compact_start: + finished = AgentSpawner._check_compaction_finished_in_jsonl(sf, compact_start) + if not finished: + # 有开始标记且未完成 → 阻塞 + result["recent_compact"] = True + # 如果已完成 → recent_compact 保持 False(放行) + # 没有开始标记 → threshold/manual 静默触发,不阻塞 except Exception: pass return result From 6c6e884ce30060bac07d45254824b5c7f40012b5 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Sat, 13 Jun 2026 07:01:54 +0800 Subject: [PATCH 2/2] =?UTF-8?q?fix(spawner):=20Review=20M1/M2=20=E2=80=94?= =?UTF-8?q?=20=E5=88=A0=E9=99=A4=20overflow/timeout=20=E6=AD=BB=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=EF=BC=8C=E5=8F=AA=E4=BF=9D=E7=95=99=20precheck?= =?UTF-8?q?=EF=BC=9B=E6=9B=B4=E6=96=B0=E8=AE=BE=E8=AE=A1=E6=96=87=E6=A1=A3?= =?UTF-8?q?=20v5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit M1: overflow/timeout 标记不含 sessionKey,被前置过滤跳过是死代码。 precheck 总在 overflow 之前触发且含 sessionKey,已覆盖 overflow 场景。 删除 overflow/timeout 分支,只保留 precheck route=compact_then_truncate。 M2: §24 设计文档新增 v5 章节描述(方案概述、三种触发路径分析、为什么只依赖 precheck)。 --- docs/design/24-compact-detection-fix.md | 51 +++++++++++++++++++++++-- src/daemon/spawner.py | 35 ++++++++--------- 2 files changed, 63 insertions(+), 23 deletions(-) diff --git a/docs/design/24-compact-detection-fix.md b/docs/design/24-compact-detection-fix.md index 86ab53e..431febf 100644 --- a/docs/design/24-compact-detection-fix.md +++ b/docs/design/24-compact-detection-fix.md @@ -1,12 +1,54 @@ # §24 — Compact 检测方案修正 -> 状态:v4(trajectory prompt.submitted),待实施 +> 状态:**v5 已实现**(gateway log + jsonl 配对) > 作者:庞统 -> 日期:2026-06-11 +> 日期:2026-06-11(v4),2026-06-13(v5) > 框架:基于 §07 Spawner Acquire-First -> 评审:仲达 4 轮评审(v1 trajectory → v2 gateway precheck → v3 rotation-only → v4 prompt.submitted) +> 评审:仲达 4+2 轮评审 > 备选方案:B(内存 flag + sessions.json status),见 §2B +--- + +## 0. v5 方案(已实现) + +### 0.1 方案概述 + +**gateway log 开始标记(precheck `route=compact_then_truncate`)+ jsonl 结束标记(`type: "compaction"` entry)配对**。 + +- **开始标记**:扫描 gateway 日志,找含目标 agent sessionKey 且 `route=compact_then_truncate` 的 precheck 日志行,提取时间戳。 +- **结束标记**:扫描 session jsonl,找开始时间之后的 `type: "compaction"` entry。 +- **判定逻辑**:有开始无结束 → compact 进行中 → skip ticker;有开始有结束 → compact 已完成 → 不 skip。 +- **超时兜底**:开始标记超过 15 分钟仍未结束 → 自动忽略(防止死锁)。 + +### 0.2 三种 Compact 触发路径分析 + +Gateway 的 compact 有多种触发路径,日志表现不同: + +| 触发路径 | 有开始标记? | 有 sessionKey? | 有 compaction 结束标记? | 检测策略 | +|---------|------------|---------------|----------------------|--------| +| **overflow** | 有(`attempting auto-compaction`) | ❌ 不含 | 有 | 依赖 precheck 覆盖 | +| **timeout** | 有(`[timeout-compaction]` + `attempting`) | ❌ 推测不含 | 有 | 依赖 precheck 覆盖 | +| **precheck** | 有(`[context-overflow-precheck]` + `route=compact_then_truncate`) | ✅ 含 | 有 | **直接检测** | +| **threshold** | 无(静默执行) | — | 有 | counter+lock+status 保护 | +| **manual** | 无(静默执行) | — | 有 | counter+lock+status 保护 | + +### 0.3 为什么只依赖 precheck 标记 + +1. **overflow/timeout 标记不含 sessionKey**:实测证实 overflow 标记(`context overflow detected; attempting auto-compaction for zhipu/glm-5.1`)不包含 `agent:xxx:main` 格式的 sessionKey,被前置 `session_key not in msg` 过滤跳过,是死代码。 +2. **precheck 总在 overflow 之前触发**:同一 compact 事件中,precheck `route=compact_then_truncate` 先检测到,overflow 是 fallback。所以 precheck 已覆盖 overflow 场景。 +3. **threshold/manual 无开始标记**:这两种是静默执行,没有 gateway 日志标记。它们依赖 counter+lock+status 三重保护(见 §07),不需要 gateway 日志检测。 + +> **注意**:`route=truncate_tool_results_only` 的 precheck 不触发 compact 检测,只有 `route=compact_then_truncate` 才触发。 + +### 0.4 超时兜底 + +15 分钟超时窗口:如果 compact 开始标记超过 15 分钟仍无结束标记,自动忽略该开始标记。这覆盖了: +- daemon 重启后残留的开始标记 +- 极端长时间的 compact(正常 compact 通常 < 7 分钟) +- 日志轮转导致的结束标记丢失 + +--- + ## 1. 问题 ### 1.1 现象 @@ -257,3 +299,6 @@ trajectory jsonl 路径 = `{sessionFile}.trajectory.jsonl`,其中 sessionFile - **v2**:gateway 日志 precheck 开始标志 → 仲达指出开始标志覆盖率仅 30%,建议 rotation-only - **v3**:rotation-only + 120s 窗口 → 合并 PR #36,但实测 51 分钟 compact loop 无法覆盖 - **v4**:trajectory prompt.submitted → 仲达背靠背验证(源码 7 条 skipPromptSubmission 路径 + 实际数据 ~8% 假阳性但方向安全)→ 修正检测目标为"session 是否正常" +- **v5**:gateway log(precheck 开始标记)+ jsonl(compaction 结束标记)配对 → 仲达评审通过后实现,PR #48 Review 驳回 M1/M2 修正后合并 + +> ⚠️ **v4 已 deprecated**。v4 的 trajectory prompt.submitted 方案未实施,最终实施的是 v5。v4 的分析(skipPromptSubmission 路径、实测数据)仍有参考价值。 diff --git a/src/daemon/spawner.py b/src/daemon/spawner.py index 12c0e96..a83170f 100644 --- a/src/daemon/spawner.py +++ b/src/daemon/spawner.py @@ -1388,13 +1388,18 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ agent_id: str, window_seconds: int = 900) -> Optional[str]: """v5: 检查 gateway 日志,找最近的 compact 开始标记。 - 开始标记匹配规则(message 字段包含以下任一): - - "attempting auto-compaction" (overflow 路径) - - "[timeout-compaction]" 且包含 "attempting" (timeout 路径) - - "[context-overflow-precheck]" 且 "route=compact_then_truncate" (precheck 路径) + 只检测 precheck 路径:message 含 "[context-overflow-precheck]" 且 + "route=compact_then_truncate"。原因: + - overflow 标记("attempting auto-compaction")不含 sessionKey, + 被 `session_key not in msg` 前置过滤跳过,是死代码。 + - timeout 标记推测同理不含 sessionKey。 + - precheck 标记含 sessionKey 且实测总在 overflow 之前触发(同一 compact + 事件,precheck 先检测到,overflow 是 fallback),所以 precheck 已覆盖 + overflow 场景。 + - threshold/manual 触发的 compact 无开始标记(静默执行),依赖 + counter+lock+status 保护,不需要 gateway 日志检测。 - 同时需要包含目标 agent 的 sessionKey(如 agent:simayi-challenger:main)。 - 超时兜底:开始标记超过 window_seconds 自动忽略。 + 超时兜底:开始标记超过 window_seconds(默认 15 分钟)自动忽略。 返回最近一个开始标记的 UTC ISO 时间字符串(带 Z 后缀),或 None。 """ @@ -1434,20 +1439,10 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ if session_key not in msg: continue - # 检测三种开始标记 - is_start = False - # overflow: "attempting auto-compaction" - if "attempting auto-compaction" in msg: - is_start = True - # timeout: "[timeout-compaction]" + "attempting" - elif "[timeout-compaction]" in msg and "attempting" in msg: - is_start = True - # precheck: "[context-overflow-precheck]" + "route=compact_then_truncate" - elif ("[context-overflow-precheck]" in msg - and "route=compact_then_truncate" in msg): - is_start = True - - if not is_start: + # 只检测 precheck 路径:route=compact_then_truncate + # overflow/timeout 标记不含 sessionKey,被前置过滤跳过(死代码),已删除 + if ("[context-overflow-precheck]" not in msg + or "route=compact_then_truncate" not in msg): continue # 解析时间