fix(spawner): compact detection v5: pair gateway log start marker with jsonl end marker #48
@@ -1,12 +1,54 @@
|
|||||||
# §24: Compact Detection Scheme Revision
|
# §24: Compact Detection Scheme Revision
|
||||||
|
|
||||||
> Status: v4 (trajectory prompt.submitted), pending implementation
|
> Status: **v5 implemented** (gateway log + jsonl pairing)
|
||||||
> Author: 庞统
|
> Author: 庞统
|
||||||
> Date: 2026-06-11
|
> Date: 2026-06-11 (v4), 2026-06-13 (v5)
|
||||||
> Framework: based on §07 Spawner Acquire-First
|
> Framework: based on §07 Spawner Acquire-First
|
||||||
> Review: 4 rounds of review by 仲达 (v1 trajectory → v2 gateway precheck → v3 rotation-only → v4 prompt.submitted)
|
> Review: 4+2 rounds of review by 仲达
|
||||||
> Alternative: Plan B (in-memory flag + sessions.json status), see §2B
|
> Alternative: Plan B (in-memory flag + sessions.json status), see §2B
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 0. v5 Scheme (Implemented)
|
||||||
|
|
||||||
|
### 0.1 Overview
|
||||||
|
|
||||||
|
**Pair a gateway log start marker (precheck `route=compact_then_truncate`) with a jsonl end marker (a `type: "compaction"` entry).**
|
||||||
|
|
||||||
|
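- **Start marker**: scan the gateway log for precheck lines that contain the target agent's sessionKey and `route=compact_then_truncate`, and extract the timestamp.
- **End marker**: scan the session jsonl for a `type: "compaction"` entry at or after the start time.
- **Decision logic**: start without end → compact in progress → skip the ticker; start with end → compact finished → do not skip.
- **Timeout fallback**: a start marker that is more than 15 minutes old and still has no end marker is ignored automatically (prevents deadlock).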
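
The pairing rule above can be sketched as a small pure function. This is a minimal illustration rather than the shipped code: `compact_in_progress` and its parameters are hypothetical names, timestamps are assumed to be already parsed into timezone-aware `datetime` objects, and the 15-minute fallback is assumed to have been applied when the start marker was selected.

```python
from datetime import datetime, timezone
from typing import Iterable, Optional


def compact_in_progress(start: Optional[datetime],
                        compaction_times: Iterable[datetime]) -> bool:
    """Return True when a start marker has no matching end marker yet."""
    if start is None:
        # No start marker: nothing to pair, so the ticker is not skipped.
        return False
    # Any compaction entry at or after the start counts as the end marker.
    return not any(t >= start for t in compaction_times)


# Hypothetical sample timestamps, for illustration only.
start = datetime(2026, 6, 12, 10, 25, 27, tzinfo=timezone.utc)
before = datetime(2026, 6, 12, 10, 20, 0, tzinfo=timezone.utc)
after = datetime(2026, 6, 12, 10, 29, 0, tzinfo=timezone.utc)

still_running = compact_in_progress(start, [before])    # start, no end: skip the ticker
finished = compact_in_progress(start, [before, after])  # start and end: do not skip
no_marker = compact_in_progress(None, [after])          # no start: do not skip
```

Keeping the rule free of I/O makes it easy to test separately from the log and jsonl scanning.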
|
||||||
|
|
||||||
|
### 0.2 Analysis of Compact Trigger Paths
|
||||||
|
|
||||||
|
Gateway compaction can be triggered through several paths, each of which shows up differently in the logs:
|
||||||
|
|
||||||
|
| Trigger path | Start marker? | Contains sessionKey? | Compaction end marker? | Detection strategy |
|---------|------------|---------------|----------------------|--------|
| **overflow** | Yes (`attempting auto-compaction`) | ❌ No | Yes | Relies on precheck coverage |
| **timeout** | Yes (`[timeout-compaction]` + `attempting`) | ❌ Presumably no | Yes | Relies on precheck coverage |
| **precheck** | Yes (`[context-overflow-precheck]` + `route=compact_then_truncate`) | ✅ Yes | Yes | **Direct detection** |
| **threshold** | No (runs silently) | n/a | Yes | counter+lock+status protection |
| **manual** | No (runs silently) | n/a | Yes | counter+lock+status protection |
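
As a rough illustration of the table, the sketch below classifies a single gateway log message by the marker substrings listed there and reports whether it carries the sessionKey. `classify_start_marker` is a hypothetical helper, and the sample precheck message format is an assumption; only the overflow message text is taken from this document.

```python
from typing import Optional, Tuple

# Marker substrings taken from the table above.
PRECHECK_TAG = "[context-overflow-precheck]"
PRECHECK_ROUTE = "route=compact_then_truncate"
TIMEOUT_TAG = "[timeout-compaction]"
OVERFLOW_TEXT = "attempting auto-compaction"


def classify_start_marker(msg: str, session_key: str) -> Tuple[Optional[str], bool]:
    """Return (trigger_path, has_session_key) for one log message.

    trigger_path is None when the message carries no known start marker,
    which is how threshold/manual compactions appear in the log.
    """
    has_key = session_key in msg
    if PRECHECK_TAG in msg and PRECHECK_ROUTE in msg:
        return "precheck", has_key
    if TIMEOUT_TAG in msg:
        # Checked before overflow because timeout lines also contain "attempting".
        return "timeout", has_key
    if OVERFLOW_TEXT in msg:
        return "overflow", has_key
    return None, has_key


key = "agent:demo:main"  # hypothetical agent id
overflow = classify_start_marker(
    "context overflow detected; attempting auto-compaction for zhipu/glm-5.1", key)
precheck = classify_start_marker(
    # Assumed message layout, for illustration only.
    "[context-overflow-precheck] sessionKey=agent:demo:main route=compact_then_truncate",
    key)
```

Here `overflow` comes back as `("overflow", False)`: the marker is recognized but cannot be attributed to a session, which is why it cannot be used for per-agent detection.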
|
||||||
|
|
||||||
|
### 0.3 Why Only the Precheck Marker Is Used
|
||||||
|
|
||||||
|
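1. **overflow/timeout markers carry no sessionKey**: testing confirmed that the overflow marker (`context overflow detected; attempting auto-compaction for zhipu/glm-5.1`) does not contain a sessionKey in the `agent:xxx:main` format, so the preceding `session_key not in msg` filter skips it and any check for it would be dead code. The timeout marker is presumed to behave the same way.
2. **precheck always fires before overflow**: within the same compact event, precheck `route=compact_then_truncate` detects the condition first and overflow is the fallback, so precheck already covers the overflow case.
3. **threshold/manual have no start marker**: both run silently and leave no gateway log marker. They rely on the counter+lock+status triple protection (see §07) and need no gateway log detection.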
|
||||||
|
|
||||||
|
> **Note**: a precheck with `route=truncate_tool_results_only` does not trigger compact detection; only `route=compact_then_truncate` does.
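
The route gate in the note above could look like the following sketch. `precheck_route` and `triggers_compact_detection` are hypothetical names, and the `route=<name>` layout inside the message is an assumption drawn from the two route values mentioned in this document.

```python
import re
from typing import Optional

# Assumed layout: the route appears as "route=<identifier>" in the message.
ROUTE_RE = re.compile(r"route=([A-Za-z_]+)")


def precheck_route(msg: str) -> Optional[str]:
    """Extract the route from a precheck message, or None if not a precheck."""
    if "[context-overflow-precheck]" not in msg:
        return None
    match = ROUTE_RE.search(msg)
    return match.group(1) if match else None


def triggers_compact_detection(msg: str) -> bool:
    """Only the compact_then_truncate route counts as a compact start."""
    return precheck_route(msg) == "compact_then_truncate"


compacting = "[context-overflow-precheck] agent:demo:main route=compact_then_truncate"
truncating = "[context-overflow-precheck] agent:demo:main route=truncate_tool_results_only"
```

Comparing the extracted route for equality is slightly stricter than a plain substring test; either approach separates the two routes named here.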
|
||||||
|
|
||||||
|
### 0.4 Timeout Fallback
|
||||||
|
|
||||||
|
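15-minute timeout window: if a compact start marker has had no end marker for more than 15 minutes, that start marker is ignored automatically. This covers:
- stale start markers left over after a daemon restart
- extremely long compactions (a normal compact usually takes < 7 minutes)
- end markers lost to log rotation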
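
A minimal sketch of the window check, assuming start markers arrive as ISO timestamps with a trailing `Z`. `parse_utc` and `latest_live_start` are hypothetical helpers, not functions from the codebase.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable, Optional

WINDOW = timedelta(minutes=15)


def parse_utc(ts: str) -> datetime:
    """Parse an ISO timestamp with a trailing Z into an aware UTC datetime."""
    dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)


def latest_live_start(starts: Iterable[str], now: datetime) -> Optional[datetime]:
    """Return the newest start marker inside the window, or None if all are stale."""
    cutoff = now - WINDOW
    live = [t for t in map(parse_utc, starts) if t >= cutoff]
    return max(live) if live else None


# Hypothetical sample values.
now = datetime(2026, 6, 12, 10, 40, 0, tzinfo=timezone.utc)
stale = latest_live_start(["2026-06-12T10:20:00.000Z"], now)
live = latest_live_start(
    ["2026-06-12T10:20:00.000Z", "2026-06-12T10:30:00.000Z"], now)
```

With `now` at 10:40, the 10:20 marker is 20 minutes old and is dropped, so `stale` is `None`, while `live` resolves to the 10:30 marker.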
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
## 1. Problem
|
## 1. Problem
|
||||||
|
|
||||||
### 1.1 Symptoms
|
### 1.1 Symptoms
|
||||||
@@ -257,3 +299,6 @@ trajectory jsonl path = `{sessionFile}.trajectory.jsonl`, where sessionFile
|
|||||||
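- **v2**: gateway log precheck start marker → 仲达 pointed out that start-marker coverage was only 30% and recommended rotation-only
|
- **v2**: gateway log precheck start marker → 仲达 pointed out that start-marker coverage was only 30% and recommended rotation-only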
|
||||||
- **v3**: rotation-only + 120s window → merged as PR #36, but testing showed it could not cover a 51-minute compact loop
|
- **v3**: rotation-only + 120s window → merged as PR #36, but testing showed it could not cover a 51-minute compact loop
|
||||||
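- **v4**: trajectory prompt.submitted → back-to-back verification by 仲达 (7 skipPromptSubmission paths in the source + real data showing ~8% false positives, all in the safe direction) → detection target revised to "is the session healthy"
|
- **v4**: trajectory prompt.submitted → back-to-back verification by 仲达 (7 skipPromptSubmission paths in the source + real data showing ~8% false positives, all in the safe direction) → detection target revised to "is the session healthy"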
|
||||||
|
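- **v5**: gateway log (precheck start marker) + jsonl (compaction end marker) pairing → implemented after passing 仲达's review; the PR #48 review rejected M1/M2, and the PR was merged once they were fixed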
|
||||||
|
|
||||||
|
> ⚠️ **v4 is deprecated.** The v4 trajectory prompt.submitted scheme was never implemented; v5 is what shipped. The v4 analysis (skipPromptSubmission paths, measured data) is still useful as a reference.
|
||||||
|
|||||||
+168
-10
@@ -1317,7 +1317,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
|
|||||||
paths.append(p)
|
paths.append(p)
|
||||||
return paths
|
return paths
|
||||||
|
|
||||||
# deprecated: §24 v3, kept as a fallback for Plan B
|
# deprecated: §24 v3, kept as a fallback for Plan B (old rotation end-marker detection, superseded by v5)
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _check_compact_in_progress_gateway(
|
def _check_compact_in_progress_gateway(
|
||||||
session_key: str, window_seconds: int = 120) -> bool:
|
session_key: str, window_seconds: int = 120) -> bool:
|
||||||
@@ -1381,6 +1381,154 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
|
|||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
    # ─── v5: compact start-marker detection (gateway log) + end-marker detection (jsonl) ───

    @staticmethod
    def _find_compact_start_in_gateway_log(
            agent_id: str, window_seconds: int = 900) -> Optional[str]:
        """v5: scan the gateway log for the most recent compact start marker.

        Only the precheck path is detected: the message must contain
        "[context-overflow-precheck]" and "route=compact_then_truncate". Reasons:
        - The overflow marker ("attempting auto-compaction") carries no sessionKey,
          so the preceding `session_key not in msg` filter skips it; a check for
          it would be dead code.
        - The timeout marker is presumed to lack a sessionKey in the same way.
        - The precheck marker carries a sessionKey and, in testing, always fired
          before overflow (within one compact event, precheck detects first and
          overflow is the fallback), so precheck already covers the overflow case.
        - Compactions triggered by threshold/manual have no start marker (they run
          silently); they rely on counter+lock+status protection and need no
          gateway log detection.

        Timeout fallback: start markers older than window_seconds (default
        15 minutes) are ignored automatically.

        Returns the UTC ISO timestamp string (with a Z suffix) of the most recent
        start marker, or None.
        """
        from datetime import datetime as _dt, timezone as _tz, timedelta
        log_paths = AgentSpawner._get_recent_gateway_logs()
        if not log_paths:
            return None

        session_key = f"agent:{agent_id}:main"
        now = _dt.now(_tz.utc)
        window_start = now - timedelta(seconds=window_seconds)

        latest_start_time = None  # type: Optional[_dt]
        latest_start_str = None  # type: Optional[str]

        for log_path in log_paths:
            if not os.path.exists(log_path):
                continue
            try:
                with open(log_path, "rb") as f:
                    f.seek(0, 2)
                    size = f.tell()
                    f.seek(max(0, size - 2 * 1024 * 1024))
                    tail = f.read().decode("utf-8", errors="replace")
            except Exception:
                continue

            for line in tail.splitlines():
                if not line.strip():
                    continue
                try:
                    obj = json.loads(line)
                except (json.JSONDecodeError, ValueError):
                    continue

                msg = obj.get("message", "")
                if session_key not in msg:
                    continue

                # Only detect the precheck path: route=compact_then_truncate.
                # overflow/timeout markers carry no sessionKey and were skipped by
                # the filter above (dead code), so those checks were removed.
                if ("[context-overflow-precheck]" not in msg
                        or "route=compact_then_truncate" not in msg):
                    continue

                # Parse the timestamp.
                ts_str = obj.get("time", "")
                if not ts_str:
                    continue
                try:
                    event_time = _dt.fromisoformat(
                        ts_str.replace("Z", "+00:00"))
                    if event_time.tzinfo is None:
                        event_time = event_time.replace(tzinfo=_tz.utc)
                    else:
                        # Normalize to UTC.
                        event_time = event_time.astimezone(_tz.utc)
                except (ValueError, TypeError):
                    continue

                # Timeout fallback: ignore markers outside the window.
                if event_time < window_start:
                    continue

                if latest_start_time is None or event_time > latest_start_time:
                    latest_start_time = event_time
                    latest_start_str = event_time.strftime(
                        "%Y-%m-%dT%H:%M:%S.") + f"{event_time.microsecond:06d}" + "Z"

        return latest_start_str
|
||||||
|
|
||||||
|
    @staticmethod
    def _check_compaction_finished_in_jsonl(
            session_file: str, after_time: str) -> bool:
        """v5: check whether the jsonl has a compaction entry at or after after_time.

        Found → compact finished → True
        Not found → compact may still be in progress → False

        after_time format: UTC ISO (e.g. 2026-06-12T10:25:27.581Z).
        jsonl timestamps are also UTC ISO.
        """
        if not session_file or not Path(session_file).exists():
            return False
        try:
            from datetime import datetime as _dt, timezone as _tz
            after_dt = _dt.fromisoformat(after_time.replace("Z", "+00:00"))
            if after_dt.tzinfo is None:
                after_dt = after_dt.replace(tzinfo=_tz.utc)

            with open(session_file, "rb") as sf:
                sf.seek(0, 2)
                size = sf.tell()
                sf.seek(max(0, size - 1048576))
                tail = sf.read().decode("utf-8", errors="replace")

            for line in reversed(tail.splitlines()):
                if not line.strip():
                    continue
                try:
                    obj = json.loads(line)
                except (json.JSONDecodeError, ValueError):
                    continue
                if obj.get("type") == "compaction":
                    ts = obj.get("timestamp", "")
                    if ts:
                        try:
                            ct = _dt.fromisoformat(ts.replace("Z", "+00:00"))
                            if ct.tzinfo is None:
                                ct = ct.replace(tzinfo=_tz.utc)
                            if ct >= after_dt:
                                return True
                        except (ValueError, TypeError):
                            pass
                # Reached an entry earlier than after_time → no need to scan further back.
                ts = obj.get("timestamp", "")
                if ts:
                    try:
                        ct = _dt.fromisoformat(ts.replace("Z", "+00:00"))
                        if ct.tzinfo is None:
                            ct = ct.replace(tzinfo=_tz.utc)
                        if ct < after_dt:
                            break
                    except (ValueError, TypeError):
                        pass
            return False
        except Exception:
            return False
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _check_recent_compaction_jsonl(
|
def _check_recent_compaction_jsonl(
|
||||||
session_file: str, window_seconds: int = 900) -> bool:
|
session_file: str, window_seconds: int = 900) -> bool:
|
||||||
@@ -1497,16 +1645,26 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
|
|||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
# §24 v4: compact detection prefers trajectory prompt.submitted
|
# §24 v5: compact detection = gateway log start marker paired with jsonl end marker
|
||||||
# fallback: _check_recent_compaction_jsonl (v2.8.2)
|
# The old methods (_check_compact_in_progress_trajectory, _check_recent_compaction_jsonl)
|
||||||
# Important: status=done while a compact is in progress, so filtering by status is not possible
|
# are kept as deprecated but are no longer called.
|
||||||
# Only skip idle/unknown (sessions that have never had any activity)
|
#
|
||||||
|
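# Logic:
# 1. Find the most recent compact start marker in the gateway log (precheck only; overflow/timeout markers carry no sessionKey and are not matched)
# 2. Start marker found → check whether the jsonl has a matching compaction entry (end marker)
# 3. Start without end → block (recent_compact=True)
# 4. Start with end → allow
# 5. No start marker → threshold/manual silent trigger, protected by counter+lock+status
# 6. Timeout fallback: start markers older than 15 minutes are ignored automatically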
|
||||||
if result["status"] not in ("idle", "unknown", None) and sf:
|
if result["status"] not in ("idle", "unknown", None) and sf:
|
||||||
result["recent_compact"] = AgentSpawner._check_compact_in_progress_trajectory(
|
compact_start = AgentSpawner._find_compact_start_in_gateway_log(agent_id)
|
||||||
sf)
|
if compact_start:
|
||||||
if not result["recent_compact"] and sf:
|
finished = AgentSpawner._check_compaction_finished_in_jsonl(sf, compact_start)
|
||||||
result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(
|
if not finished:
|
||||||
sf)
|
# Start marker present and not finished → block
|
||||||
|
result["recent_compact"] = True
|
||||||
|
# If finished → recent_compact stays False (allow)
|
||||||
|
# No start marker → threshold/manual silent trigger, do not block
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
return result
|
return result
|
||||||
|
|||||||