fix(spawner): compact 检测 v5 — gateway log 开始标记 + jsonl 结束标记配对
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s

- 新增 _find_compact_start_in_gateway_log: 检测 overflow/timeout/precheck 三种开始标记
- 新增 _check_compaction_finished_in_jsonl: 检测 jsonl compaction entry 作为结束标记
- 重写 _check_session_state compact 检测逻辑: 开始+结束配对
- 无开始标记 (threshold/manual) 不阻塞,靠 counter+lock+status 保护
- 超时兜底 15 分钟保留
- 旧方法标记 deprecated 保留
- 427 passed
This commit is contained in:
cfdaily
2026-06-13 00:27:39 +08:00
parent 3b7ecaf446
commit 36ba629b69
+173 -10
View File
@@ -1317,7 +1317,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
paths.append(p)
return paths
# deprecated: §24 v3, 保留供方案 B 备选
# deprecated: §24 v3, 保留供方案 B 备选(旧 rotation 结束标记检测,已被 v5 取代)
@staticmethod
def _check_compact_in_progress_gateway(
session_key: str, window_seconds: int = 120) -> bool:
@@ -1381,6 +1381,159 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
return False
# ─── v5: compact 开始标记检测(gateway log)+ 结束标记检测(jsonl) ───
@staticmethod
def _find_compact_start_in_gateway_log(
agent_id: str, window_seconds: int = 900) -> Optional[str]:
"""v5: 检查 gateway 日志,找最近的 compact 开始标记。
开始标记匹配规则(message 字段包含以下任一):
- "attempting auto-compaction" overflow 路径)
- "[timeout-compaction]" 且包含 "attempting" timeout 路径)
- "[context-overflow-precheck]""route=compact_then_truncate" precheck 路径)
同时需要包含目标 agent 的 sessionKey(如 agent:simayi-challenger:main)。
超时兜底:开始标记超过 window_seconds 自动忽略。
返回最近一个开始标记的 UTC ISO 时间字符串(带 Z 后缀),或 None。
"""
from datetime import datetime as _dt, timezone as _tz, timedelta
log_paths = AgentSpawner._get_recent_gateway_logs()
if not log_paths:
return None
session_key = f"agent:{agent_id}:main"
now = _dt.now(_tz.utc)
window_start = now - timedelta(seconds=window_seconds)
latest_start_time = None # type: Optional[_dt]
latest_start_str = None # type: Optional[str]
for log_path in log_paths:
if not os.path.exists(log_path):
continue
try:
with open(log_path, "rb") as f:
f.seek(0, 2)
size = f.tell()
f.seek(max(0, size - 2 * 1024 * 1024))
tail = f.read().decode("utf-8", errors="replace")
except Exception:
continue
for line in tail.splitlines():
if not line.strip():
continue
try:
obj = json.loads(line)
except (json.JSONDecodeError, ValueError):
continue
msg = obj.get("message", "")
if session_key not in msg:
continue
# 检测三种开始标记
is_start = False
# overflow: "attempting auto-compaction"
if "attempting auto-compaction" in msg:
is_start = True
# timeout: "[timeout-compaction]" + "attempting"
elif "[timeout-compaction]" in msg and "attempting" in msg:
is_start = True
# precheck: "[context-overflow-precheck]" + "route=compact_then_truncate"
elif ("[context-overflow-precheck]" in msg
and "route=compact_then_truncate" in msg):
is_start = True
if not is_start:
continue
# 解析时间
ts_str = obj.get("time", "")
if not ts_str:
continue
try:
event_time = _dt.fromisoformat(
ts_str.replace("Z", "+00:00"))
if event_time.tzinfo is None:
event_time = event_time.replace(tzinfo=_tz.utc)
else:
# 确保 UTC
event_time = event_time.astimezone(_tz.utc)
except (ValueError, TypeError):
continue
# 超时兜底:超过窗口的忽略
if event_time < window_start:
continue
if latest_start_time is None or event_time > latest_start_time:
latest_start_time = event_time
latest_start_str = event_time.strftime(
"%Y-%m-%dT%H:%M:%S.") + f"{event_time.microsecond:06d}" + "Z"
return latest_start_str
@staticmethod
def _check_compaction_finished_in_jsonl(
session_file: str, after_time: str) -> bool:
"""v5: 检查 jsonl 是否有 after_time 之后的 compaction entry。
有 → compact 已完成 → True
没有 → compact 可能仍在进行 → False
after_time 格式:UTC ISO(如 2026-06-12T10:25:27.581Z)。
jsonl timestamp 格式也是 UTC ISO。
"""
if not session_file or not Path(session_file).exists():
return False
try:
from datetime import datetime as _dt, timezone as _tz
after_dt = _dt.fromisoformat(after_time.replace("Z", "+00:00"))
if after_dt.tzinfo is None:
after_dt = after_dt.replace(tzinfo=_tz.utc)
with open(session_file, "rb") as sf:
sf.seek(0, 2)
size = sf.tell()
sf.seek(max(0, size - 1048576))
tail = sf.read().decode("utf-8", errors="replace")
for line in reversed(tail.splitlines()):
if not line.strip():
continue
try:
obj = json.loads(line)
except (json.JSONDecodeError, ValueError):
continue
if obj.get("type") == "compaction":
ts = obj.get("timestamp", "")
if ts:
try:
ct = _dt.fromisoformat(ts.replace("Z", "+00:00"))
if ct.tzinfo is None:
ct = ct.replace(tzinfo=_tz.utc)
if ct >= after_dt:
return True
except (ValueError, TypeError):
pass
# 遇到早于 after_time 的 entry → 不需要继续往前扫
ts = obj.get("timestamp", "")
if ts:
try:
ct = _dt.fromisoformat(ts.replace("Z", "+00:00"))
if ct.tzinfo is None:
ct = ct.replace(tzinfo=_tz.utc)
if ct < after_dt:
break
except (ValueError, TypeError):
pass
return False
except Exception:
return False
@staticmethod
def _check_recent_compaction_jsonl(
session_file: str, window_seconds: int = 900) -> bool:
@@ -1497,16 +1650,26 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
except Exception:
pass
# §24 v4: compact 检测优先用 trajectory prompt.submitted
# fallback: _check_recent_compaction_jsonl (v2.8.2)
# 重要:compact 进行中时 status=done,所以不能按 status 过滤
# 只跳过 idle/unknown(完全没有活动过的 session)
# §24 v5: compact 检测 = gateway log 开始标记 + jsonl 结束标记配对
# 旧方法 (_check_compact_in_progress_trajectory, _check_recent_compaction_jsonl)
# 保留为 deprecated 但不再调用。
#
# 逻辑:
# 1. 查 gateway log 最近的 compact 开始标记(overflow/timeout/precheck
# 2. 有开始标记 → 查 jsonl 是否有对应的 compaction entry(结束标记)
# 3. 有开始无结束 → 阻塞(recent_compact=True
# 4. 有开始有结束 → 放行
# 5. 无开始标记 → threshold/manual 静默触发,靠 counter+lock+status 保护
# 6. 超时兜底:开始标记超过 15 分钟自动忽略
if result["status"] not in ("idle", "unknown", None) and sf:
result["recent_compact"] = AgentSpawner._check_compact_in_progress_trajectory(
sf)
if not result["recent_compact"] and sf:
result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl(
sf)
compact_start = AgentSpawner._find_compact_start_in_gateway_log(agent_id)
if compact_start:
finished = AgentSpawner._check_compaction_finished_in_jsonl(sf, compact_start)
if not finished:
# 有开始标记且未完成 → 阻塞
result["recent_compact"] = True
# 如果已完成 → recent_compact 保持 False(放行)
# 没有开始标记 → threshold/manual 静默触发,不阻塞
except Exception:
pass
return result