Files
2026-05-26 23:28:54 +08:00

614 lines
22 KiB
Python

#!/usr/bin/env python3
"""
Experience Pattern Scanner for OpenClaw Agent JSONL files.
Scans 6 experience patterns from conversation history:
1. correction - User corrects agent
2. trial_error - Repeated failures then success
3. success - Complex task (5+ tool_use) with no correction
4. collaboration- Mail/messaging between agents
5. decision - User hesitation/confirmation requests
6. experience - Agent declares lessons/tips
Output: JSON fragment index with context windows.
"""
import json
import os
import re
import sys
import glob
import argparse
import time
from pathlib import Path
from collections import defaultdict
# ── Signal patterns ──────────────────────────────────────────────────────────
# Regexes suggesting the *user* is correcting the agent (only user turns are
# checked against these). The lookahead after 停 excludes the listed 停X
# compound words.
CORRECTION_SIGNALS = [
    # Chinese
    r'不对',
    r'错了',
    r'不要',
    r'停(?!止|顿|留|靠|滞|车|泊|放|歇)',
    r'我说的不是',
    r'不是这个意思',
    r'重新来',
    r'别这样',
    r'不要这样',
    r'你再看看',
    # English (word-bounded)
    r'\bno\b',
    r'\bwrong\b',
    r'\bstop\b',
    r'\bnot what I meant\b',
]

# Regexes suggesting the user is hesitating or asking for confirmation.
DECISION_SIGNALS = [
    r'先不要',
    r'方案',
    r'等等',
    r'确认',
    r'等等看',
    r'再说',
    r'先看看',
    r'确认一下',
    r'你是说',
    r'你的意思是',
]

# Regexes suggesting the *assistant* is stating a lesson or tip.
EXPERIENCE_SIGNALS = [
    r'以后应该',
    r'这个方法好',
    r'经验是',
    r'教训是',
    r'下次注意',
    r'注意要',
    r'记住',
    r'一定要',
    r'千万不要',
    r'重要提示',
]

# Tool names whose invocation counts as agent-to-agent collaboration.
COLLAB_TOOLS = {'sanguo_mail', 'send_message', 'sessions_send'}
# Plain substrings that mark a message's text as collaboration-related.
COLLAB_TEXT = ['send_message', 'sanguo_mail', 'Sanguo Mail']

# Substrings that flag a tool result as a failure. They are used as plain,
# case-sensitive substring tests, so e.g. "no errors" also counts.
ERROR_INDICATORS = [
    'error', 'Error', 'ERROR',
    'failed', 'Failed', 'FAILED',
    'exception', 'Exception',
    'Traceback',
    '不存在',
    '失败',
]

# Maximum number of characters kept in a text snippet (see truncate()).
MAX_TEXT_SNIPPET = 500
# ── Helpers ──────────────────────────────────────────────────────────────────
def truncate(text: str, limit: int = MAX_TEXT_SNIPPET) -> str:
    """Return *text* as a single trimmed line of at most *limit* characters.

    Surrounding whitespace is stripped and every newline becomes a space. If
    the result is longer than *limit*, the first *limit* characters are kept
    and "..." is appended (so the result may be ``limit + 3`` long). Empty or
    ``None`` input gives "".
    """
    if not text:
        return ""
    one_line = text.strip().replace('\n', ' ')
    return one_line if len(one_line) <= limit else one_line[:limit] + "..."
def extract_text_from_content(content) -> str:
    """Extract plain text from message content.

    *content* may be a plain string (returned unchanged) or a list of content
    blocks. For a list, the ``text`` of every ``{"type": "text"}`` dict block
    is joined with single spaces; non-dict blocks, non-text blocks and text
    values that are not strings (e.g. ``null``) are ignored. Any other input
    yields "".
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return ""
    parts = []
    for block in content:
        if not isinstance(block, dict) or block.get('type') != 'text':
            continue
        text = block.get('text', '')
        # A null/non-string payload would make ' '.join raise TypeError.
        if isinstance(text, str):
            parts.append(text)
    return ' '.join(parts)
def extract_tool_info(role: str, content) -> tuple:
    """Return ``(tool_names, has_error)`` for one v3 message.

    ``tool_names`` lists the ``name`` of every ``toolCall`` / ``tool_use``
    dict block in *content* (v3 uses ``toolCall``; ``tool_use`` is accepted
    too). ``has_error`` is only evaluated for ``role == 'toolResult'``: it is
    True when the result text (a string, or the concatenated string ``text``
    of ``{"type": "text"}`` blocks) contains any entry of
    ``ERROR_INDICATORS``.

    NOTE: the error test is a plain substring match, so text such as
    "0 errors" also counts as an error.
    """
    tool_uses = []
    if isinstance(content, list):
        tool_uses = [
            block.get('name', '')
            for block in content
            if isinstance(block, dict)
            and block.get('type', '') in ('toolCall', 'tool_use')
        ]

    has_error = False
    if role == 'toolResult':
        if isinstance(content, str):
            result_text = content
        elif isinstance(content, list):
            # Skip null/non-string text payloads instead of raising on concat.
            result_text = ''.join(
                sub['text']
                for sub in content
                if isinstance(sub, dict)
                and sub.get('type') == 'text'
                and isinstance(sub.get('text'), str)
            )
        else:
            result_text = ''
        has_error = any(e in result_text for e in ERROR_INDICATORS)
    return tool_uses, has_error
# Template blocks to strip before pattern matching (avoid false positives)
_TEMPLATE_BLOCKS = [
    re.compile(r'<gate-rules>.*?</gate-rules>', re.DOTALL),
    re.compile(r'## \u72b6\u6001\u673a.*?```', re.DOTALL),  # state machine diagrams
]


def strip_templates(text: str) -> str:
    """Remove every span matched by ``_TEMPLATE_BLOCKS`` from *text*.

    Covers ``<gate-rules>...</gate-rules>`` sections and "## 状态机" (state
    machine) sections up to the next triple backtick.
    """
    for pat in _TEMPLATE_BLOCKS:
        text = pat.sub('', text)
    return text


def match_signals(text: str, patterns: list) -> bool:
    """Return True if any regex in *patterns* matches *text*.

    Template blocks are stripped first (see strip_templates). String patterns
    are matched case-insensitively so English word signals also fire on
    sentence-initial capitals ("No, ...", "Stop"); the flag does not affect
    CJK patterns. Precompiled patterns are used with their own flags.
    Empty *text*, or text that is empty after stripping, never matches.
    """
    if not text:
        return False
    cleaned = strip_templates(text)
    if not cleaned:
        return False
    for pat in patterns:
        if isinstance(pat, str):
            hit = re.search(pat, cleaned, re.IGNORECASE)
        else:
            # re.search rejects a flags argument for compiled patterns.
            hit = re.search(pat, cleaned)
        if hit:
            return True
    return False
def is_collab(text: str, tool_names: list) -> bool:
    """Return True if a message looks like agent-to-agent collaboration.

    That is the case when *text* contains any ``COLLAB_TEXT`` substring, or
    when any name in *tool_names* is one of ``COLLAB_TOOLS``. The text test
    runs first and short-circuits the tool test.
    """
    mentions_collab = any(marker in text for marker in COLLAB_TEXT)
    return mentions_collab or any(name in COLLAB_TOOLS for name in tool_names)
# ── Parsed Message ───────────────────────────────────────────────────────────
class ParsedMsg:
    """One conversation turn normalised from a session JSONL file.

    Attributes (set in ``__init__``):
        role: message role string as produced by the parsers.
        text: full plain text of the turn.
        timestamp: timestamp value copied from the source line.
        tool_names: list of tool names invoked in this turn.
        has_error: whether an error was attributed to this turn.
        raw_text_snippet: ``truncate(text)``, a one-line preview.
        idx: sequence number assigned by the parser.
    """

    __slots__ = (
        'role',
        'text',
        'timestamp',
        'tool_names',
        'has_error',
        'raw_text_snippet',
        'idx',
    )

    def __init__(self, role, text, timestamp, tool_names, has_error, idx):
        self.role, self.text, self.timestamp = role, text, timestamp
        self.tool_names = tool_names
        self.has_error = has_error
        # Preview computed once up front; used for fragment output and context.
        self.raw_text_snippet = truncate(text)
        self.idx = idx
# ── File parsers ─────────────────────────────────────────────────────────────
def parse_v3_jsonl(filepath: str) -> list:
    """Parse a standard OpenClaw JSONL v3 session file into ParsedMsg turns.

    Only JSON-object lines with ``type == 'message'`` whose ``message.role``
    is ``user``, ``assistant`` or ``toolResult`` are considered.
    ``toolResult`` entries are not emitted as turns: when a tool result looks
    like an error, the immediately preceding assistant turn gets
    ``has_error = True`` instead.

    Blank lines, undecodable JSON, and lines that are valid JSON but not
    objects (or whose ``message`` is not an object) are skipped. Any other
    error while reading is reported to stderr, and the turns parsed up to
    that point are returned.

    Args:
        filepath: Path to the ``.jsonl`` file.

    Returns:
        List of ParsedMsg whose ``idx`` equals their position in the list.
    """
    messages = []
    try:
        with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    obj = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # A valid JSON line may still be a list/string/number; skip it
                # rather than letting .get() abort the rest of the file.
                if not isinstance(obj, dict) or obj.get('type') != 'message':
                    continue
                msg = obj.get('message')
                if not isinstance(msg, dict):
                    continue
                role = msg.get('role', '')
                if role not in ('user', 'assistant', 'toolResult'):
                    continue
                content = msg.get('content', '')
                tool_names, has_error = extract_tool_info(role, content)

                if role == 'toolResult':
                    # Tool results are metadata, not turns; only carry their
                    # error status over to the assistant turn that caused them.
                    if has_error and messages and messages[-1].role == 'assistant':
                        messages[-1].has_error = True
                    continue

                text = extract_text_from_content(content)
                ts = obj.get('timestamp', '')
                messages.append(
                    ParsedMsg(role, text, ts, tool_names, has_error, len(messages)))
    except Exception as e:
        # Best-effort per file: report and keep whatever was parsed so far.
        print(f" [WARN] Error reading {filepath}: {e}", file=sys.stderr)
    return messages
def parse_trajectory_jsonl(filepath: str) -> list:
    """Parse an OpenClaw trajectory JSONL file into ParsedMsg turns.

    Only JSON-object lines with ``traceSchema == 'openclaw-trajectory'`` are
    used:

    * ``user_message`` / ``assistant_message`` -> a new turn whose text comes
      from ``data.content`` (a string or a list of content blocks).
    * ``tool_call`` -> ``data.name`` is appended to the preceding assistant
      turn's ``tool_names``.
    * ``tool_result`` -> if its text contains an ``ERROR_INDICATORS`` entry,
      the preceding assistant turn is marked ``has_error`` (the same rule
      parse_v3_jsonl applies).

    Tool events with no preceding assistant turn are dropped. Malformed lines
    are skipped. Other read errors are reported to stderr, and the turns
    parsed so far are returned.

    Returns:
        List of ParsedMsg whose ``idx`` equals their position in the list.
    """
    messages = []
    try:
        with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    obj = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(obj, dict) or obj.get('traceSchema') != 'openclaw-trajectory':
                    continue
                etype = obj.get('type', '')
                data = obj.get('data')
                if not isinstance(data, dict):
                    # e.g. "data": null — treat as empty instead of crashing on .get().
                    data = {}
                ts = obj.get('ts', '')
                last = messages[-1] if messages else None

                if etype in ('user_message', 'assistant_message'):
                    role = 'user' if etype == 'user_message' else 'assistant'
                    text = extract_text_from_content(data.get('content', ''))
                    messages.append(ParsedMsg(role, text, ts, [], False, len(messages)))
                elif last is None or last.role != 'assistant':
                    # Tool events (or unknown types) with no assistant turn to attach to.
                    continue
                elif etype == 'tool_call':
                    last.tool_names.append(data.get('name', ''))
                elif etype == 'tool_result':
                    rc = data.get('content', '')
                    if isinstance(rc, str):
                        result_text = rc
                    elif isinstance(rc, list):
                        result_text = ''.join(
                            sub['text']
                            for sub in rc
                            if isinstance(sub, dict)
                            and sub.get('type') == 'text'
                            and isinstance(sub.get('text'), str)
                        )
                    else:
                        result_text = ''
                    if any(e in result_text for e in ERROR_INDICATORS):
                        last.has_error = True
    except Exception as e:
        # Best-effort per file: report and keep whatever was parsed so far.
        print(f" [WARN] Error reading {filepath}: {e}", file=sys.stderr)
    return messages
def detect_and_parse(filepath: str) -> list:
    """Auto-detect a session file's JSONL flavour and parse it.

    The first line that decodes to a JSON object decides the format: if it
    has ``traceSchema == 'openclaw-trajectory'``, the file is parsed with
    parse_trajectory_jsonl, otherwise with parse_v3_jsonl. Blank lines,
    undecodable lines and non-object JSON lines before it are skipped, just
    as the parsers themselves skip them.

    Returns:
        The parsed ParsedMsg list, or ``[]`` if the file holds no JSON object
        or cannot be read (a warning is printed to stderr in that case).
    """
    is_trajectory = False
    try:
        with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    obj = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(obj, dict):
                    continue
                is_trajectory = obj.get('traceSchema') == 'openclaw-trajectory'
                break
            else:
                # No JSON object anywhere in the file.
                return []
    except OSError as e:
        print(f" [WARN] Error reading {filepath}: {e}", file=sys.stderr)
        return []
    if is_trajectory:
        return parse_trajectory_jsonl(filepath)
    return parse_v3_jsonl(filepath)
# ── Pattern scanners ─────────────────────────────────────────────────────────
def get_context(messages: list, center_idx: int, before: int = 3, after: int = 3) -> tuple:
    """Return text snippets surrounding the message whose ``idx`` is *center_idx*.

    *before* and *after* are counted in conversational turns and converted to
    messages at roughly two messages per turn. Up to ``before * 2`` preceding
    and ``after * 2`` following snippets are therefore returned; a value
    <= 0 yields an empty side.

    Assumes *messages* is ordered by ascending ``idx`` (as the parsers in this
    module produce): the preceding side stops at the first message whose
    ``idx`` is not below *center_idx*.

    Returns:
        ``(context_before, context_after)`` — lists of ``raw_text_snippet``
        strings, each in message order.
    """
    max_before = max(before, 0) * 2
    max_after = max(after, 0) * 2

    preceding = []
    for msg in messages:
        if msg.idx >= center_idx:
            break
        preceding.append(msg.raw_text_snippet)
    # Explicit zero check: preceding[-0:] would return the whole list.
    context_before = preceding[-max_before:] if max_before else []

    context_after = []
    if max_after:
        for msg in messages:
            if msg.idx > center_idx:
                context_after.append(msg.raw_text_snippet)
                if len(context_after) == max_after:
                    break
    return context_before, context_after
def get_tools_in_context(messages: list, center_idx: int, window: int = 6) -> list:
    """Collect the distinct tool names used by messages near *center_idx*.

    A message is near when ``abs(msg.idx - center_idx) <= window * 2``
    (*window* is in turns, at roughly two messages per turn).

    Returns:
        Unique tool names in first-seen order. Using an insertion-ordered
        dict rather than a set keeps the output identical across runs, since
        set iteration order for strings varies with hash randomization.
    """
    span = window * 2
    return list(dict.fromkeys(
        name
        for msg in messages
        if msg.tool_names and abs(msg.idx - center_idx) <= span
        for name in msg.tool_names
    ))
def scan_correction(messages: list) -> list:
    """Pattern ①: user turns containing a correction signal.

    Each user message matching ``CORRECTION_SIGNALS`` yields one fragment
    with its surrounding context and the tools used nearby.
    """
    found = []
    for m in messages:
        if m.role != 'user' or not match_signals(m.text, CORRECTION_SIGNALS):
            continue
        before, after = get_context(messages, m.idx)
        found.append({
            'mode': 'correction',
            'timestamp': m.timestamp,
            'trigger_message': m.raw_text_snippet,
            'trigger_role': 'user',
            'context_before': before,
            'context_after': after,
            'tool_calls_in_context': get_tools_in_context(messages, m.idx),
            'summary': "用户纠正了 Agent",
        })
    return found
def scan_trial_error(messages: list) -> list:
    """Pattern ②: 3+ consecutive failing turns followed by a successful agent turn.

    A streak is a run of consecutive messages with ``has_error`` set (the
    parsers attach tool-result errors to the assistant turn that caused
    them). The streak ends at the next message without an error. It is
    reported only if it has at least 3 errors AND the ending message is an
    assistant turn: a user turn interrupting the streak is not counted as
    the agent succeeding. A streak still open at the end of the list is not
    reported.

    Each fragment's context is centred on the first message of the streak,
    and its trigger is the successful assistant turn.
    """
    fragments = []
    streak = 0
    streak_start = None  # first message of the current error streak

    for msg in messages:
        if msg.has_error:
            if streak == 0:
                streak_start = msg
            streak += 1
            continue

        if streak >= 3 and msg.role == 'assistant':
            ctx_before, ctx_after = get_context(messages, streak_start.idx)
            tools = get_tools_in_context(messages, streak_start.idx, window=streak + 2)
            fragments.append({
                'mode': 'trial_error',
                'timestamp': streak_start.timestamp,
                'trigger_message': msg.raw_text_snippet,
                'trigger_role': msg.role,
                'context_before': ctx_before,
                'context_after': ctx_after,
                # Order-preserving dedupe keeps the output deterministic.
                'tool_calls_in_context': list(dict.fromkeys(tools)),
                'summary': f"Agent 经过 {streak} 次试错后成功",
            })
        # Any non-error message ends the current streak.
        streak = 0
        streak_start = None
    return fragments
def scan_success(messages: list) -> list:
    """Pattern ③: a complex session (5+ tool calls) with no user correction.

    Returns at most one fragment, anchored on the session's last assistant
    turn. Sessions with fewer than 4 messages, any user turn matching
    ``CORRECTION_SIGNALS``, fewer than 5 tool calls in total, or no assistant
    turn at all yield ``[]``.
    """
    # Too short to represent a complex task.
    if len(messages) < 4:
        return []
    # A single correction anywhere disqualifies the whole session.
    if any(m.role == 'user' and match_signals(m.text, CORRECTION_SIGNALS)
           for m in messages):
        return []
    tool_total = sum(len(m.tool_names) for m in messages)
    if tool_total < 5:
        return []
    final = next((m for m in reversed(messages) if m.role == 'assistant'), None)
    if final is None:
        return []
    before, after = get_context(messages, final.idx)
    nearby_tools = get_tools_in_context(messages, final.idx)
    fragment = {
        'mode': 'success',
        'timestamp': final.timestamp,
        'trigger_message': final.raw_text_snippet,
        'trigger_role': 'assistant',
        'context_before': before,
        'context_after': after,
        'tool_calls_in_context': nearby_tools,
        'summary': f"复杂任务成功完成,共使用 {tool_total} 次 tool_call,无用户纠正",
    }
    return [fragment]
def scan_collaboration(messages: list) -> list:
    """Pattern ④: turns that involve agent collaboration via mail/messaging.

    Any message (user or assistant) for which is_collab() is true yields a
    fragment. The summary names the speaker: 用户 for user turns, Agent
    otherwise.
    """
    fragments = []
    for m in messages:
        if not is_collab(m.text, m.tool_names):
            continue
        speaker = '用户' if m.role == 'user' else 'Agent'
        before, after = get_context(messages, m.idx)
        fragments.append({
            'mode': 'collaboration',
            'timestamp': m.timestamp,
            'trigger_message': m.raw_text_snippet,
            'trigger_role': m.role,
            'context_before': before,
            'context_after': after,
            'tool_calls_in_context': get_tools_in_context(messages, m.idx),
            'summary': f"{speaker} 触发了协作通信",
        })
    return fragments
def scan_decision(messages: list) -> list:
    """Pattern ⑤: user turns expressing hesitation or asking for confirmation.

    Each user message matching ``DECISION_SIGNALS`` yields one fragment.
    """
    hits = [m for m in messages
            if m.role == 'user' and match_signals(m.text, DECISION_SIGNALS)]
    fragments = []
    for m in hits:
        before, after = get_context(messages, m.idx)
        nearby_tools = get_tools_in_context(messages, m.idx)
        fragments.append({
            'mode': 'decision',
            'timestamp': m.timestamp,
            'trigger_message': m.raw_text_snippet,
            'trigger_role': 'user',
            'context_before': before,
            'context_after': after,
            'tool_calls_in_context': nearby_tools,
            'summary': "用户表达了决策犹豫或需要确认",
        })
    return fragments
def scan_experience(messages: list) -> list:
    """Pattern ⑥: assistant turns that state a lesson or tip.

    Each assistant message matching ``EXPERIENCE_SIGNALS`` yields one
    fragment.
    """
    fragments = []
    for m in messages:
        is_lesson = m.role == 'assistant' and match_signals(m.text, EXPERIENCE_SIGNALS)
        if not is_lesson:
            continue
        before, after = get_context(messages, m.idx)
        fragments.append({
            'mode': 'experience',
            'timestamp': m.timestamp,
            'trigger_message': m.raw_text_snippet,
            'trigger_role': 'assistant',
            'context_before': before,
            'context_after': after,
            'tool_calls_in_context': get_tools_in_context(messages, m.idx),
            'summary': "Agent 声明了经验/教训",
        })
    return fragments
# ── Main scanner ─────────────────────────────────────────────────────────────
# Mode names in the order used for the summary counts in scan results.
MODE_NAMES = [
    'correction',
    'trial_error',
    'success',
    'collaboration',
    'decision',
    'experience',
]

# Scanners run by scan_file, in this order. Each takes the parsed message
# list and returns a list of fragment dicts.
SCANNERS = [scan_correction, scan_trial_error, scan_success,
            scan_collaboration, scan_decision, scan_experience]
def scan_file(filepath: str) -> tuple:
    """Run every scanner in ``SCANNERS`` over one JSONL session file.

    Returns:
        ``(fragments, message_count)``: the fragment dicts from all scanners,
        in ``SCANNERS`` order, and the number of parsed turns. A file that
        parses to no turns yields ``([], 0)``.
    """
    messages = detect_and_parse(filepath)
    if not messages:
        return [], 0
    fragments = [frag for scanner in SCANNERS for frag in scanner(messages)]
    return fragments, len(messages)
def _collect_session_files(dirpath: str, limit: int = 0) -> list:
    """Return the sorted main-session ``*.jsonl`` paths in *dirpath* (capped at *limit* when > 0)."""
    # glob.escape stops '[', '*' or '?' in the directory name itself from being
    # interpreted as wildcards; only the '*.jsonl' part is a pattern.
    pattern = os.path.join(glob.escape(dirpath), '*.jsonl')
    files = []
    for path in glob.glob(pattern):
        name = os.path.basename(path)
        # Trajectory and checkpoint side files are not main session files.
        if '.trajectory.' in name or '.checkpoint.' in name:
            continue
        files.append(path)
    files.sort()
    if limit > 0:
        files = files[:limit]
    return files


def scan_directory(dirpath: str, limit: int = 0) -> dict:
    """Scan every main-session ``*.jsonl`` file in *dirpath* for all patterns.

    Files whose name contains ``.trajectory.`` or ``.checkpoint.`` are
    skipped. Files are processed in sorted order, and *limit* > 0 caps how
    many are scanned. Each fragment receives a sequential ``id``
    (``frag_0001``, ...) and the ``source_file`` basename it came from.
    Progress is printed every 50 files and after the last one.

    Returns:
        ``{'scan_stats': {...}, 'fragments': [...]}``, where ``mode_counts``
        in the stats lists every mode in ``MODE_NAMES``.
    """
    target_files = _collect_session_files(dirpath, limit)
    total_files = len(target_files)
    print(f"Found {total_files} session files to scan")

    all_fragments = []
    total_messages = 0
    mode_counts = defaultdict(int)
    start = time.perf_counter()

    for done, filepath in enumerate(target_files, start=1):
        frags, msg_count = scan_file(filepath)
        total_messages += msg_count
        basename = os.path.basename(filepath)
        for frag in frags:
            # Sequential id, unique across all scanned files.
            frag['id'] = f"frag_{len(all_fragments) + 1:04d}"
            frag['source_file'] = basename
            all_fragments.append(frag)
            mode_counts[frag['mode']] += 1
        if done % 50 == 0 or done == total_files:
            elapsed = time.perf_counter() - start
            rate = done / elapsed if elapsed > 0 else 0
            print(f" [{done}/{total_files}] {rate:.1f} files/s | "
                  f"{len(all_fragments)} fragments found | "
                  f"{msg_count} msgs in current file")

    elapsed = time.perf_counter() - start
    print(f"\nScan complete: {total_files} files, {total_messages} messages, "
          f"{len(all_fragments)} fragments in {elapsed:.1f}s")

    return {
        'scan_stats': {
            'total_files': total_files,
            'total_messages': total_messages,
            'total_fragments': len(all_fragments),
            'scan_duration_seconds': round(elapsed, 1),
            'mode_counts': {m: mode_counts.get(m, 0) for m in MODE_NAMES},
        },
        'fragments': all_fragments,
    }
def main():
    """Command-line entry point: parse arguments, scan, and write the JSON result.

    ``--dir`` and ``--output`` have ``~`` expanded. The output's parent
    directory is created if needed; a bare filename is written to the
    current directory.
    """
    parser = argparse.ArgumentParser(description='Scan OpenClaw JSONL for experience patterns')
    parser.add_argument('--dir', default=os.path.expanduser(
        '~/.openclaw/agents/pangtong-fujunshi/sessions/'),
        help='Directory containing JSONL files')
    parser.add_argument('--limit', type=int, default=0,
                        help='Limit number of files to scan (0 = all)')
    parser.add_argument('--output', default=os.path.expanduser(
        '~/.openclaw/sanguo_projects/sanguo_moziplus_v2/docs/research/distill-scan-pangtong-result.json'),
        help='Output JSON path')
    args = parser.parse_args()

    # Quoted command-line paths reach us with '~' unexpanded.
    scan_dir = os.path.expanduser(args.dir)
    output_path = os.path.expanduser(args.output)

    print(f"Scanning directory: {scan_dir}")
    if args.limit:
        print(f"Limit: {args.limit} files (test mode)")
    print(f"Output: {output_path}")
    print()

    result = scan_directory(scan_dir, limit=args.limit)

    # Write output. os.makedirs('') raises FileNotFoundError, so only create a
    # parent directory when the path actually has one.
    out_dir = os.path.dirname(output_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(result, f, ensure_ascii=False, indent=2)

    stats = result['scan_stats']
    print(f"\nResults written to: {output_path}")
    print(f" Total fragments: {stats['total_fragments']}")
    for mode, count in stats['mode_counts'].items():
        print(f" {mode}: {count}")
if __name__ == "__main__":
    # Run the CLI only when executed as a script, not when imported.
    main()