@@ -1,12 +1,54 @@
|
||||
# §24 — Compact 检测方案修正
|
||||
|
||||
> 状态:v4(trajectory prompt.submitted),待实施
|
||||
> 状态:**v5 已实现**(gateway log + jsonl 配对)
|
||||
> 作者:庞统
|
||||
> 日期:2026-06-11
|
||||
> 日期:2026-06-11(v4),2026-06-13(v5)
|
||||
> 框架:基于 §07 Spawner Acquire-First
|
||||
> 评审:仲达 4 轮评审(v1 trajectory → v2 gateway precheck → v3 rotation-only → v4 prompt.submitted)
|
||||
> 评审:仲达 4+2 轮评审
|
||||
> 备选方案:B(内存 flag + sessions.json status),见 §2B
|
||||
|
||||
---
|
||||
|
||||
## 0. v5 方案(已实现)
|
||||
|
||||
### 0.1 方案概述
|
||||
|
||||
**gateway log 开始标记(precheck `route=compact_then_truncate`)+ jsonl 结束标记(`type: "compaction"` entry)配对**。
|
||||
|
||||
- **开始标记**:扫描 gateway 日志,找含目标 agent sessionKey 且 `route=compact_then_truncate` 的 precheck 日志行,提取时间戳。
|
||||
- **结束标记**:扫描 session jsonl,找开始时间之后的 `type: "compaction"` entry。
|
||||
- **判定逻辑**:有开始无结束 → compact 进行中 → skip ticker;有开始有结束 → compact 已完成 → 不 skip。
|
||||
- **超时兜底**:开始标记超过 15 分钟仍未结束 → 自动忽略(防止死锁)。
|
||||
|
||||
### 0.2 三种 Compact 触发路径分析
|
||||
|
||||
Gateway 的 compact 有多种触发路径,日志表现不同:
|
||||
|
||||
| 触发路径 | 有开始标记? | 有 sessionKey? | 有 compaction 结束标记? | 检测策略 |
|
||||
|---------|------------|---------------|----------------------|--------|
|
||||
| **overflow** | 有(`attempting auto-compaction`) | ❌ 不含 | 有 | 依赖 precheck 覆盖 |
|
||||
| **timeout** | 有(`[timeout-compaction]` + `attempting`) | ❌ 推测不含 | 有 | 依赖 precheck 覆盖 |
|
||||
| **precheck** | 有(`[context-overflow-precheck]` + `route=compact_then_truncate`) | ✅ 含 | 有 | **直接检测** |
|
||||
| **threshold** | 无(静默执行) | — | 有 | counter+lock+status 保护 |
|
||||
| **manual** | 无(静默执行) | — | 有 | counter+lock+status 保护 |
|
||||
|
||||
### 0.3 为什么只依赖 precheck 标记
|
||||
|
||||
1. **overflow/timeout 标记不含 sessionKey**:实测证实 overflow 标记(`context overflow detected; attempting auto-compaction for zhipu/glm-5.1`)不包含 `agent:xxx:main` 格式的 sessionKey,被前置 `session_key not in msg` 过滤跳过,是死代码。
|
||||
2. **precheck 总在 overflow 之前触发**:同一 compact 事件中,precheck `route=compact_then_truncate` 先检测到,overflow 是 fallback。所以 precheck 已覆盖 overflow 场景。
|
||||
3. **threshold/manual 无开始标记**:这两种是静默执行,没有 gateway 日志标记。它们依赖 counter+lock+status 三重保护(见 §07),不需要 gateway 日志检测。
|
||||
|
||||
> **注意**:`route=truncate_tool_results_only` 的 precheck 不触发 compact 检测,只有 `route=compact_then_truncate` 才触发。
|
||||
|
||||
### 0.4 超时兜底
|
||||
|
||||
15 分钟超时窗口:如果 compact 开始标记超过 15 分钟仍无结束标记,自动忽略该开始标记。这覆盖了:
|
||||
- daemon 重启后残留的开始标记
|
||||
- 极端长时间的 compact(正常 compact 通常 < 7 分钟)
|
||||
- 日志轮转导致的结束标记丢失
|
||||
|
||||
---
|
||||
|
||||
## 1. 问题
|
||||
|
||||
### 1.1 现象
|
||||
@@ -257,3 +299,6 @@ trajectory jsonl 路径 = `{sessionFile}.trajectory.jsonl`,其中 sessionFile
|
||||
- **v2**:gateway 日志 precheck 开始标志 → 仲达指出开始标志覆盖率仅 30%,建议 rotation-only
|
||||
- **v3**:rotation-only + 120s 窗口 → 合并 PR #36,但实测 51 分钟 compact loop 无法覆盖
|
||||
- **v4**:trajectory prompt.submitted → 仲达背靠背验证(源码 7 条 skipPromptSubmission 路径 + 实际数据 ~8% 假阳性但方向安全)→ 修正检测目标为"session 是否正常"
|
||||
- **v5**:gateway log(precheck 开始标记)+ jsonl(compaction 结束标记)配对 → 仲达评审通过后实现,PR #48 Review 驳回 M1/M2 修正后合并
|
||||
|
||||
> ⚠️ **v4 已 deprecated**。v4 的 trajectory prompt.submitted 方案未实施,最终实施的是 v5。v4 的分析(skipPromptSubmission 路径、实测数据)仍有参考价值。
|
||||
|
||||
+168
-10
@@ -1317,7 +1317,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
|
||||
paths.append(p)
|
||||
return paths
|
||||
|
||||
# deprecated: §24 v3, 保留供方案 B 备选
|
||||
# deprecated: §24 v3, 保留供方案 B 备选(旧 rotation 结束标记检测,已被 v5 取代)
|
||||
@staticmethod
|
||||
def _check_compact_in_progress_gateway(
|
||||
session_key: str, window_seconds: int = 120) -> bool:
|
||||
@@ -1381,6 +1381,154 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
|
||||
|
||||
return False
|
||||
|
||||
# ─── v5: compact 开始标记检测(gateway log)+ 结束标记检测(jsonl) ───
|
||||
|
||||
@staticmethod
|
||||
def _find_compact_start_in_gateway_log(
|
||||
agent_id: str, window_seconds: int = 900) -> Optional[str]:
|
||||
"""v5: 检查 gateway 日志,找最近的 compact 开始标记。
|
||||
|
||||
只检测 precheck 路径:message 含 "[context-overflow-precheck]" 且
|
||||
"route=compact_then_truncate"。原因:
|
||||
- overflow 标记("attempting auto-compaction")不含 sessionKey,
|
||||
被 `session_key not in msg` 前置过滤跳过,是死代码。
|
||||
- timeout 标记推测同理不含 sessionKey。
|
||||
- precheck 标记含 sessionKey 且实测总在 overflow 之前触发(同一 compact
|
||||
事件,precheck 先检测到,overflow 是 fallback),所以 precheck 已覆盖
|
||||
overflow 场景。
|
||||
- threshold/manual 触发的 compact 无开始标记(静默执行),依赖
|
||||
counter+lock+status 保护,不需要 gateway 日志检测。
|
||||
|
||||
超时兜底:开始标记超过 window_seconds(默认 15 分钟)自动忽略。
|
||||
|
||||
返回最近一个开始标记的 UTC ISO 时间字符串(带 Z 后缀),或 None。
|
||||
"""
|
||||
from datetime import datetime as _dt, timezone as _tz, timedelta
|
||||
log_paths = AgentSpawner._get_recent_gateway_logs()
|
||||
if not log_paths:
|
||||
return None
|
||||
|
||||
session_key = f"agent:{agent_id}:main"
|
||||
now = _dt.now(_tz.utc)
|
||||
window_start = now - timedelta(seconds=window_seconds)
|
||||
|
||||
latest_start_time = None # type: Optional[_dt]
|
||||
latest_start_str = None # type: Optional[str]
|
||||
|
||||
for log_path in log_paths:
|
||||
if not os.path.exists(log_path):
|
||||
continue
|
||||
try:
|
||||
with open(log_path, "rb") as f:
|
||||
f.seek(0, 2)
|
||||
size = f.tell()
|
||||
f.seek(max(0, size - 2 * 1024 * 1024))
|
||||
tail = f.read().decode("utf-8", errors="replace")
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
for line in tail.splitlines():
|
||||
if not line.strip():
|
||||
continue
|
||||
try:
|
||||
obj = json.loads(line)
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
continue
|
||||
|
||||
msg = obj.get("message", "")
|
||||
if session_key not in msg:
|
||||
continue
|
||||
|
||||
# 只检测 precheck 路径:route=compact_then_truncate
|
||||
# overflow/timeout 标记不含 sessionKey,被前置过滤跳过(死代码),已删除
|
||||
if ("[context-overflow-precheck]" not in msg
|
||||
or "route=compact_then_truncate" not in msg):
|
||||
continue
|
||||
|
||||
# 解析时间
|
||||
ts_str = obj.get("time", "")
|
||||
if not ts_str:
|
||||
continue
|
||||
try:
|
||||
event_time = _dt.fromisoformat(
|
||||
ts_str.replace("Z", "+00:00"))
|
||||
if event_time.tzinfo is None:
|
||||
event_time = event_time.replace(tzinfo=_tz.utc)
|
||||
else:
|
||||
# 确保 UTC
|
||||
event_time = event_time.astimezone(_tz.utc)
|
||||
except (ValueError, TypeError):
|
||||
continue
|
||||
|
||||
# 超时兜底:超过窗口的忽略
|
||||
if event_time < window_start:
|
||||
continue
|
||||
|
||||
if latest_start_time is None or event_time > latest_start_time:
|
||||
latest_start_time = event_time
|
||||
latest_start_str = event_time.strftime(
|
||||
"%Y-%m-%dT%H:%M:%S.") + f"{event_time.microsecond:06d}" + "Z"
|
||||
|
||||
return latest_start_str
|
||||
|
||||
@staticmethod
|
||||
def _check_compaction_finished_in_jsonl(
|
||||
session_file: str, after_time: str) -> bool:
|
||||
"""v5: 检查 jsonl 是否有 after_time 之后的 compaction entry。
|
||||
|
||||
有 → compact 已完成 → True
|
||||
没有 → compact 可能仍在进行 → False
|
||||
|
||||
after_time 格式:UTC ISO(如 2026-06-12T10:25:27.581Z)。
|
||||
jsonl timestamp 格式也是 UTC ISO。
|
||||
"""
|
||||
if not session_file or not Path(session_file).exists():
|
||||
return False
|
||||
try:
|
||||
from datetime import datetime as _dt, timezone as _tz
|
||||
after_dt = _dt.fromisoformat(after_time.replace("Z", "+00:00"))
|
||||
if after_dt.tzinfo is None:
|
||||
after_dt = after_dt.replace(tzinfo=_tz.utc)
|
||||
|
||||
with open(session_file, "rb") as sf:
|
||||
sf.seek(0, 2)
|
||||
size = sf.tell()
|
||||
sf.seek(max(0, size - 1048576))
|
||||
tail = sf.read().decode("utf-8", errors="replace")
|
||||
|
||||
for line in reversed(tail.splitlines()):
|
||||
if not line.strip():
|
||||
continue
|
||||
try:
|
||||
obj = json.loads(line)
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
continue
|
||||
if obj.get("type") == "compaction":
|
||||
ts = obj.get("timestamp", "")
|
||||
if ts:
|
||||
try:
|
||||
ct = _dt.fromisoformat(ts.replace("Z", "+00:00"))
|
||||
if ct.tzinfo is None:
|
||||
ct = ct.replace(tzinfo=_tz.utc)
|
||||
if ct >= after_dt:
|
||||
return True
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
# 遇到早于 after_time 的 entry → 不需要继续往前扫
|
||||
ts = obj.get("timestamp", "")
|
||||
if ts:
|
||||
try:
|
||||
ct = _dt.fromisoformat(ts.replace("Z", "+00:00"))
|
||||
if ct.tzinfo is None:
|
||||
ct = ct.replace(tzinfo=_tz.utc)
|
||||
if ct < after_dt:
|
||||
break
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
return False
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _check_recent_compaction_jsonl(
|
||||
session_file: str, window_seconds: int = 900) -> bool:
|
||||
@@ -1497,16 +1645,26 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# §24 v4: compact 检测优先用 trajectory prompt.submitted
|
||||
# fallback: _check_recent_compaction_jsonl (v2.8.2)
|
||||
# 重要:compact 进行中时 status=done,所以不能按 status 过滤
|
||||
# 只跳过 idle/unknown(完全没有活动过的 session)
|
||||
# §24 v5: compact 检测 = gateway log 开始标记 + jsonl 结束标记配对
|
||||
# 旧方法 (_check_compact_in_progress_trajectory, _check_recent_compaction_jsonl)
|
||||
# 保留为 deprecated 但不再调用。
|
||||
#
|
||||
# 逻辑:
|
||||
# 1. 查 gateway log 最近的 compact 开始标记(overflow/timeout/precheck)
|
||||
# 2. 有开始标记 → 查 jsonl 是否有对应的 compaction entry(结束标记)
|
||||
# 3. 有开始无结束 → 阻塞(recent_compact=True)
|
||||
# 4. 有开始有结束 → 放行
|
||||
# 5. 无开始标记 → threshold/manual 静默触发,靠 counter+lock+status 保护
|
||||
# 6. 超时兜底:开始标记超过 15 分钟自动忽略
|
||||
if result["status"] not in ("idle", "unknown", None) and sf:
|
||||
result["recent_compact"] = AgentSpawner._check_compact_in_progress_trajectory(
|
||||
sf)
|
||||
if not result["recent_compact"] and sf:
|
||||
result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(
|
||||
sf)
|
||||
compact_start = AgentSpawner._find_compact_start_in_gateway_log(agent_id)
|
||||
if compact_start:
|
||||
finished = AgentSpawner._check_compaction_finished_in_jsonl(sf, compact_start)
|
||||
if not finished:
|
||||
# 有开始标记且未完成 → 阻塞
|
||||
result["recent_compact"] = True
|
||||
# 如果已完成 → recent_compact 保持 False(放行)
|
||||
# 没有开始标记 → threshold/manual 静默触发,不阻塞
|
||||
except Exception:
|
||||
pass
|
||||
return result
|
||||
|
||||
Reference in New Issue
Block a user