diff --git a/src/daemon/spawner.py b/src/daemon/spawner.py index aa121e8..12c0e96 100644 --- a/src/daemon/spawner.py +++ b/src/daemon/spawner.py @@ -1317,7 +1317,7 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ paths.append(p) return paths - # deprecated: §24 v3, 保留供方案 B 备选 + # deprecated: §24 v3, 保留供方案 B 备选(旧 rotation 结束标记检测,已被 v5 取代) @staticmethod def _check_compact_in_progress_gateway( session_key: str, window_seconds: int = 120) -> bool: @@ -1381,6 +1381,159 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ return False + # ─── v5: compact 开始标记检测(gateway log)+ 结束标记检测(jsonl) ─── + + @staticmethod + def _find_compact_start_in_gateway_log( + agent_id: str, window_seconds: int = 900) -> Optional[str]: + """v5: 检查 gateway 日志,找最近的 compact 开始标记。 + + 开始标记匹配规则(message 字段包含以下任一): + - "attempting auto-compaction" (overflow 路径) + - "[timeout-compaction]" 且包含 "attempting" (timeout 路径) + - "[context-overflow-precheck]" 且 "route=compact_then_truncate" (precheck 路径) + + 同时需要包含目标 agent 的 sessionKey(如 agent:simayi-challenger:main)。 + 超时兜底:开始标记超过 window_seconds 自动忽略。 + + 返回最近一个开始标记的 UTC ISO 时间字符串(带 Z 后缀),或 None。 + """ + from datetime import datetime as _dt, timezone as _tz, timedelta + log_paths = AgentSpawner._get_recent_gateway_logs() + if not log_paths: + return None + + session_key = f"agent:{agent_id}:main" + now = _dt.now(_tz.utc) + window_start = now - timedelta(seconds=window_seconds) + + latest_start_time = None # type: Optional[_dt] + latest_start_str = None # type: Optional[str] + + for log_path in log_paths: + if not os.path.exists(log_path): + continue + try: + with open(log_path, "rb") as f: + f.seek(0, 2) + size = f.tell() + f.seek(max(0, size - 2 * 1024 * 1024)) + tail = f.read().decode("utf-8", errors="replace") + except Exception: + continue + + for line in tail.splitlines(): + if not line.strip(): + continue + try: + obj = json.loads(line) + except (json.JSONDecodeError, ValueError): + continue + + msg = obj.get("message", "") + if session_key not in msg: + continue + + # 检测三种开始标记 + is_start = False + # overflow: "attempting auto-compaction" + if "attempting auto-compaction" in msg: + is_start = True + # timeout: "[timeout-compaction]" + "attempting" + elif "[timeout-compaction]" in msg and "attempting" in msg: + is_start = True + # precheck: "[context-overflow-precheck]" + "route=compact_then_truncate" + elif ("[context-overflow-precheck]" in msg + and "route=compact_then_truncate" in msg): + is_start = True + + if not is_start: + continue + + # 解析时间 + ts_str = obj.get("time", "") + if not ts_str: + continue + try: + event_time = _dt.fromisoformat( + ts_str.replace("Z", "+00:00")) + if event_time.tzinfo is None: + event_time = event_time.replace(tzinfo=_tz.utc) + else: + # 确保 UTC + event_time = event_time.astimezone(_tz.utc) + except (ValueError, TypeError): + continue + + # 超时兜底:超过窗口的忽略 + if event_time < window_start: + continue + + if latest_start_time is None or event_time > latest_start_time: + latest_start_time = event_time + latest_start_str = event_time.strftime( + "%Y-%m-%dT%H:%M:%S.") + f"{event_time.microsecond:06d}" + "Z" + + return latest_start_str + + @staticmethod + def _check_compaction_finished_in_jsonl( + session_file: str, after_time: str) -> bool: + """v5: 检查 jsonl 是否有 after_time 之后的 compaction entry。 + + 有 → compact 已完成 → True + 没有 → compact 可能仍在进行 → False + + after_time 格式:UTC ISO(如 2026-06-12T10:25:27.581Z)。 + jsonl timestamp 格式也是 UTC ISO。 + """ + if not session_file or not Path(session_file).exists(): + return False + try: + from datetime import datetime as _dt, timezone as _tz + after_dt = _dt.fromisoformat(after_time.replace("Z", "+00:00")) + if after_dt.tzinfo is None: + after_dt = after_dt.replace(tzinfo=_tz.utc) + + with open(session_file, "rb") as sf: + sf.seek(0, 2) + size = sf.tell() + sf.seek(max(0, size - 1048576)) + tail = sf.read().decode("utf-8", errors="replace") + + for line in reversed(tail.splitlines()): + if not line.strip(): + continue + try: + obj = json.loads(line) + except (json.JSONDecodeError, ValueError): + continue + if obj.get("type") == "compaction": + ts = obj.get("timestamp", "") + if ts: + try: + ct = _dt.fromisoformat(ts.replace("Z", "+00:00")) + if ct.tzinfo is None: + ct = ct.replace(tzinfo=_tz.utc) + if ct >= after_dt: + return True + except (ValueError, TypeError): + pass + # 遇到早于 after_time 的 entry → 不需要继续往前扫 + ts = obj.get("timestamp", "") + if ts: + try: + ct = _dt.fromisoformat(ts.replace("Z", "+00:00")) + if ct.tzinfo is None: + ct = ct.replace(tzinfo=_tz.utc) + if ct < after_dt: + break + except (ValueError, TypeError): + pass + return False + except Exception: + return False + @staticmethod def _check_recent_compaction_jsonl( session_file: str, window_seconds: int = 900) -> bool: @@ -1497,16 +1650,26 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_ except Exception: pass - # §24 v4: compact 检测优先用 trajectory prompt.submitted - # fallback: _check_recent_compaction_jsonl (v2.8.2) - # 重要:compact 进行中时 status=done,所以不能按 status 过滤 - # 只跳过 idle/unknown(完全没有活动过的 session) + # §24 v5: compact 检测 = gateway log 开始标记 + jsonl 结束标记配对 + # 旧方法 (_check_compact_in_progress_trajectory, _check_recent_compaction_jsonl) + # 保留为 deprecated 但不再调用。 + # + # 逻辑: + # 1. 查 gateway log 最近的 compact 开始标记(overflow/timeout/precheck) + # 2. 有开始标记 → 查 jsonl 是否有对应的 compaction entry(结束标记) + # 3. 有开始无结束 → 阻塞(recent_compact=True) + # 4. 有开始有结束 → 放行 + # 5. 无开始标记 → threshold/manual 静默触发,靠 counter+lock+status 保护 + # 6. 超时兜底:开始标记超过 15 分钟自动忽略 if result["status"] not in ("idle", "unknown", None) and sf: - result["recent_compact"] = AgentSpawner._check_compact_in_progress_trajectory( - sf) - if not result["recent_compact"] and sf: - result["recent_compact"] = AgentSpawner._check_recent_compaction_jsonl( - sf) + compact_start = AgentSpawner._find_compact_start_in_gateway_log(agent_id) + if compact_start: + finished = AgentSpawner._check_compaction_finished_in_jsonl(sf, compact_start) + if not finished: + # 有开始标记且未完成 → 阻塞 + result["recent_compact"] = True + # 如果已完成 → recent_compact 保持 False(放行) + # 没有开始标记 → threshold/manual 静默触发,不阻塞 except Exception: pass return result