614 lines
22 KiB
Python
614 lines
22 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Experience Pattern Scanner for OpenClaw Agent JSONL files.
|
|
|
|
Scans 6 experience patterns from conversation history:
|
|
1. correction - User corrects agent
|
|
2. trial_error - Repeated failures then success
|
|
3. success - Complex task (5+ tool_use) with no correction
|
|
4. collaboration- Mail/messaging between agents
|
|
5. decision - User hesitation/confirmation requests
|
|
6. experience - Agent declares lessons/tips
|
|
|
|
Output: JSON fragment index with context windows.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import glob
|
|
import argparse
|
|
import time
|
|
from pathlib import Path
|
|
from collections import defaultdict
|
|
|
|
# ── Signal patterns ──────────────────────────────────────────────────────────
|
|
|
|
# Regexes that mark a user message as a correction of the agent.
CORRECTION_SIGNALS = [
    r'不对',
    r'错了',
    r'不要',
    # The negative lookahead excludes a fixed set of following characters.
    r'停(?!止|顿|留|靠|滞|车|泊|放|歇)',
    r'我说的不是',
    r'不是这个意思',
    r'重新来',
    r'别这样',
    r'不要这样',
    r'你再看看',
    r'\bno\b',
    r'\bwrong\b',
    r'\bstop\b',
    r'\bnot what I meant\b',
]

# Regexes that mark a user message as hesitation or a confirmation request.
DECISION_SIGNALS = [
    r'先不要',
    r'方案',
    r'等等',
    r'确认',
    r'等等看',
    r'再说',
    r'先看看',
    r'确认一下',
    r'你是说',
    r'你的意思是',
]

# Regexes that mark an assistant message as stating a lesson or tip.
EXPERIENCE_SIGNALS = [
    r'以后应该',
    r'这个方法好',
    r'经验是',
    r'教训是',
    r'下次注意',
    r'注意要',
    r'记住',
    r'一定要',
    r'千万不要',
    r'重要提示',
]

# Tool names whose use counts as inter-agent collaboration.
COLLAB_TOOLS = {
    'sanguo_mail',
    'send_message',
    'sessions_send',
}

# Plain substrings whose presence in message text counts as collaboration.
COLLAB_TEXT = [
    'send_message',
    'sanguo_mail',
    'Sanguo Mail',
]

# Case-sensitive substrings that flag a tool result as an error.
ERROR_INDICATORS = [
    'error',
    'Error',
    'ERROR',
    'failed',
    'Failed',
    'FAILED',
    'exception',
    'Exception',
    'Traceback',
    '不存在',
    '失败',
]
|
|
|
|
# Default upper bound, in characters, for text snippets.
MAX_TEXT_SNIPPET = 500


def truncate(text: str, limit: int = MAX_TEXT_SNIPPET) -> str:
    """Return *text* as a single-line snippet of at most *limit* characters.

    Surrounding whitespace is stripped and newlines are replaced by spaces.
    When the flattened text is longer than *limit*, it is cut and "..." is
    appended. Falsy input (``None`` or an empty string) yields "".
    """
    if not text:
        return ""
    flattened = text.strip().replace('\n', ' ')
    return flattened if len(flattened) <= limit else flattened[:limit] + "..."
|
|
|
|
|
|
def extract_text_from_content(content) -> str:
    """Extract plain text from message content.

    Args:
        content: Either a string (returned unchanged) or a list of content
            blocks. Any other type yields an empty string.

    Returns:
        The ``text`` fields of all ``{'type': 'text', ...}`` dict blocks,
        joined with single spaces. Blocks of other types and non-dict
        entries are ignored.
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return ""

    parts = []
    for block in content:
        if not isinstance(block, dict) or block.get('type') != 'text':
            continue
        text = block.get('text', '')
        # A JSON ``null`` (or any non-string) text would make ``join`` raise
        # TypeError; skip such blocks instead of failing.
        if isinstance(text, str):
            parts.append(text)
    return ' '.join(parts)
|
|
|
|
|
|
def extract_tool_info(role: str, content) -> tuple:
    """Return ``(tool_names, has_error)`` for one message.

    Tool names are collected from ``toolCall`` / ``tool_use`` blocks when
    *content* is a list, whatever the role. The error flag can only be True
    for ``toolResult`` messages, when their text contains one of
    ERROR_INDICATORS.
    """
    called_tools = []
    if isinstance(content, list):
        called_tools = [
            item.get('name', '')
            for item in content
            if isinstance(item, dict)
            and item.get('type', '') in ('toolCall', 'tool_use')
        ]

    # Only tool results are inspected for error markers.
    if role != 'toolResult':
        return called_tools, False

    if isinstance(content, str):
        result_text = content
    elif isinstance(content, list):
        result_text = ''.join(
            item.get('text', '')
            for item in content
            if isinstance(item, dict) and item.get('type') == 'text'
        )
    else:
        result_text = ''

    failed = any(marker in result_text for marker in ERROR_INDICATORS)
    return called_tools, failed
|
|
|
|
|
|
# Boilerplate sections removed before signal matching so that their wording
# does not trigger false positives.
_TEMPLATE_BLOCKS = [
    # Everything between <gate-rules> and </gate-rules>, across newlines.
    re.compile(r'<gate-rules>.*?</gate-rules>', re.DOTALL),
    # State machine diagrams: from the heading up to the next ``` fence.
    re.compile(r'## \u72b6\u6001\u673a.*?```', re.DOTALL),
]


def strip_templates(text: str) -> str:
    """Return *text* with every _TEMPLATE_BLOCKS match deleted."""
    cleaned = text
    for template_re in _TEMPLATE_BLOCKS:
        cleaned = template_re.sub('', cleaned)
    return cleaned
|
|
|
|
|
|
def match_signals(text: str, patterns: list) -> bool:
    """Report whether any regex in *patterns* matches *text*.

    Template blocks are stripped first. Empty text, or text that is empty
    after stripping, never matches.
    """
    if not text:
        return False
    searchable = strip_templates(text)
    return bool(searchable) and any(
        re.search(signal, searchable) for signal in patterns
    )
|
|
|
|
|
|
def is_collab(text: str, tool_names: list) -> bool:
    """Report whether a message involves inter-agent communication.

    True when *text* contains any COLLAB_TEXT substring, or when any entry
    of *tool_names* is in COLLAB_TOOLS.
    """
    mentioned_in_text = any(marker in text for marker in COLLAB_TEXT)
    return mentioned_in_text or any(name in COLLAB_TOOLS for name in tool_names)
|
|
|
|
|
|
# ── Parsed Message ───────────────────────────────────────────────────────────
|
|
|
|
class ParsedMsg:
    """A single conversation message produced by the file parsers."""

    __slots__ = (
        'role',              # role string passed in by the parser
        'text',              # full extracted message text
        'timestamp',         # timestamp value taken from the source record
        'tool_names',        # list of tool names attached to this message
        'has_error',         # set when a tool result for this message looked like an error
        'raw_text_snippet',  # truncated single-line form of ``text``
        'idx',               # sequence number assigned by the parser
    )

    def __init__(self, role, text, timestamp, tool_names, has_error, idx):
        """Store the given fields and derive the snippet from *text*."""
        self.idx = idx
        self.role = role
        self.timestamp = timestamp
        self.tool_names = tool_names
        self.has_error = has_error
        self.text = text
        self.raw_text_snippet = truncate(text)
|
|
|
|
|
|
# ── File parsers ─────────────────────────────────────────────────────────────
|
|
|
|
def parse_v3_jsonl(filepath: str) -> list:
    """Parse a standard OpenClaw JSONL v3 session file.

    Only records with ``type == 'message'`` and a role of ``user``,
    ``assistant`` or ``toolResult`` are considered. ``toolResult`` records
    do not become messages of their own: when one looks like an error, the
    flag is set on the immediately preceding assistant message.

    Blank lines, undecodable lines, JSON values that are not objects and
    records whose ``message`` is not an object are skipped individually, so
    a single malformed line does not discard the rest of the file.

    Args:
        filepath: Path of the JSONL file to read.

    Returns:
        List of ParsedMsg in file order, with ``idx`` equal to the list
        position. On an unexpected error a warning is printed to stderr and
        the messages parsed so far are returned.
    """
    messages = []
    try:
        with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    obj = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # Valid JSON is not necessarily an object (e.g. a bare list).
                if not isinstance(obj, dict) or obj.get('type') != 'message':
                    continue
                msg = obj.get('message')
                if not isinstance(msg, dict):
                    continue
                role = msg.get('role', '')
                if role not in ('user', 'assistant', 'toolResult'):
                    continue

                content = msg.get('content', '')
                tool_names, has_error = extract_tool_info(role, content)

                if role == 'toolResult':
                    # Tool results are metadata, not turns; only their error
                    # state is kept, for the trial_error pattern.
                    if has_error and messages and messages[-1].role == 'assistant':
                        messages[-1].has_error = True
                    continue

                text = extract_text_from_content(content)
                messages.append(ParsedMsg(
                    role, text, obj.get('timestamp', ''),
                    tool_names, has_error, len(messages),
                ))
    except Exception as e:  # best-effort: keep whatever was parsed
        print(f" [WARN] Error reading {filepath}: {e}", file=sys.stderr)
    return messages
|
|
|
|
|
|
def parse_trajectory_jsonl(filepath: str) -> list:
    """Parse an OpenClaw trajectory JSONL file.

    Only records with ``traceSchema == 'openclaw-trajectory'`` are used:

    * ``user_message`` / ``assistant_message`` become ParsedMsg entries.
    * ``tool_call`` appends the tool name to the last message when that
      message is an assistant message; otherwise it is dropped.
    * ``tool_result`` sets ``has_error`` on the last message when its text
      contains one of ERROR_INDICATORS.

    Blank lines, undecodable lines and JSON values that are not objects are
    skipped individually. A ``data`` field that is not an object is treated
    as empty.

    Args:
        filepath: Path of the JSONL file to read.

    Returns:
        List of ParsedMsg in file order, with ``idx`` equal to the list
        position. On an unexpected error a warning is printed to stderr and
        the messages parsed so far are returned.
    """
    messages = []
    try:
        with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    obj = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(obj, dict):
                    continue
                if obj.get('traceSchema') != 'openclaw-trajectory':
                    continue

                etype = obj.get('type', '')
                data = obj.get('data', {})
                if not isinstance(data, dict):
                    data = {}
                ts = obj.get('ts', '')

                if etype in ('user_message', 'assistant_message'):
                    role = 'user' if etype == 'user_message' else 'assistant'
                    # Handles str and list content; any other type gives ''.
                    text = extract_text_from_content(data.get('content', ''))
                    messages.append(
                        ParsedMsg(role, text, ts, [], False, len(messages)))

                elif etype == 'tool_call':
                    if messages and messages[-1].role == 'assistant':
                        messages[-1].tool_names.append(data.get('name', ''))

                elif etype == 'tool_result':
                    rc = data.get('content', '')
                    if isinstance(rc, str):
                        result_text = rc
                    elif isinstance(rc, list):
                        result_text = ''.join(
                            sub.get('text', '')
                            for sub in rc
                            if isinstance(sub, dict) and sub.get('type') == 'text'
                        )
                    else:
                        result_text = ''
                    # NOTE(review): unlike the v3 parser, the flag goes on the
                    # last message whatever its role; confirm this is intended.
                    if messages and any(e in result_text for e in ERROR_INDICATORS):
                        messages[-1].has_error = True
    except Exception as e:  # best-effort: keep whatever was parsed
        print(f" [WARN] Error reading {filepath}: {e}", file=sys.stderr)
    return messages
|
|
|
|
|
|
def detect_and_parse(filepath: str) -> list:
    """Auto-detect the JSONL flavour of *filepath* and parse it.

    The format is decided by the first line that decodes to a JSON object:
    ``traceSchema == 'openclaw-trajectory'`` selects the trajectory parser,
    anything else the v3 parser. Blank, undecodable and non-object lines
    are skipped during detection, in line with what both parsers tolerate.

    Args:
        filepath: Path of the JSONL file to read.

    Returns:
        The parsed message list, or ``[]`` when the file holds no JSON
        object or cannot be read (a warning is printed to stderr then).
    """
    is_trajectory = None
    try:
        with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    obj = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(obj, dict):
                    continue
                is_trajectory = obj.get('traceSchema') == 'openclaw-trajectory'
                break
    except Exception as e:  # best-effort: an unreadable file yields no messages
        print(f" [WARN] Error reading {filepath}: {e}", file=sys.stderr)
        return []

    if is_trajectory is None:
        return []
    # Parse only after the detection handle is closed.
    parser = parse_trajectory_jsonl if is_trajectory else parse_v3_jsonl
    return parser(filepath)
|
|
|
|
|
|
# ── Pattern scanners ─────────────────────────────────────────────────────────
|
|
|
|
def get_context(messages: list, center_idx: int, before: int = 3, after: int = 3) -> tuple:
    """Return the text snippets surrounding the message with ``idx == center_idx``.

    *before* and *after* are counted in turns. A turn is taken to be about
    two messages, so up to ``before * 2`` preceding and ``after * 2``
    following snippets are returned. Values of zero or less give an empty
    list on that side.

    Args:
        messages: Objects exposing ``idx`` and ``raw_text_snippet``, ordered
            by ascending ``idx``.
        center_idx: ``idx`` of the message the window is centred on.
        before: Number of turns to include before the centre.
        after: Number of turns to include after the centre.

    Returns:
        ``(context_before, context_after)``, two lists of snippets in
        conversation order. The centre message itself is in neither.
    """
    max_before = max(before, 0) * 2
    max_after = max(after, 0) * 2

    context_before = []
    for msg in messages:
        if msg.idx >= center_idx:
            break
        context_before.append(msg.raw_text_snippet)

    context_after = [
        msg.raw_text_snippet for msg in messages if msg.idx > center_idx
    ]

    # Slice from an explicit start offset: ``lst[-0:]`` is the WHOLE list,
    # so a negative-index slice would return everything when before == 0.
    start = max(len(context_before) - max_before, 0)
    return context_before[start:], context_after[:max_after]
|
|
|
|
|
|
def get_tools_in_context(messages: list, center_idx: int, window: int = 6) -> list:
    """Collect the distinct tool names used near *center_idx*.

    A message counts as "near" when its ``idx`` is within ``window * 2`` of
    *center_idx*, in either direction.

    Args:
        messages: Objects exposing ``idx`` and ``tool_names``.
        center_idx: ``idx`` the window is centred on.
        window: Half-width of the window in turns (two messages per turn).

    Returns:
        Unique tool names in order of first appearance. The order is
        deterministic, so repeated scans produce identical output.
    """
    reach = window * 2
    seen = {}
    for msg in messages:
        if msg.tool_names and abs(msg.idx - center_idx) <= reach:
            for name in msg.tool_names:
                # A dict keeps insertion order; set iteration order for
                # strings varies between interpreter runs.
                seen.setdefault(name, None)
    return list(seen)
|
|
|
|
|
|
def scan_correction(messages: list) -> list:
    """Pattern 1: user messages that match a correction signal.

    Returns one fragment dict per matching user message, carrying the
    surrounding context and the tools used nearby.
    """
    found = []
    for candidate in messages:
        if candidate.role != 'user':
            continue
        if not match_signals(candidate.text, CORRECTION_SIGNALS):
            continue
        preceding, following = get_context(messages, candidate.idx)
        fragment = {
            'mode': 'correction',
            'timestamp': candidate.timestamp,
            'trigger_message': candidate.raw_text_snippet,
            'trigger_role': 'user',
            'context_before': preceding,
            'context_after': following,
            'tool_calls_in_context': get_tools_in_context(messages, candidate.idx),
            'summary': "用户纠正了 Agent",
        }
        found.append(fragment)
    return found
|
|
|
|
|
|
def scan_trial_error(messages: list) -> list:
    """Pattern 2: three or more consecutive errored messages, then one without error.

    Messages are walked in order. A run of messages with ``has_error`` set
    forms a streak; the first message without an error ends it. When the
    streak is at least 3 long, a fragment is emitted whose trigger is the
    message that ended the streak and whose context is centred on the
    streak's first message. A streak still open at the end of the
    conversation produces nothing.

    Args:
        messages: Parsed messages of one session, in conversation order.

    Returns:
        List of fragment dicts with ``mode == 'trial_error'``.
    """
    fragments = []
    error_streak = 0
    streak_start = None  # first message of the current error streak

    for msg in messages:
        if msg.has_error:
            if error_streak == 0:
                streak_start = msg
            error_streak += 1
            continue

        # NOTE(review): the message that ends a streak is not checked to be an
        # assistant message, so a user message can be reported as the
        # "success" trigger; confirm whether that is intended.
        if error_streak >= 3:
            ctx_before, ctx_after = get_context(messages, streak_start.idx)
            tools = get_tools_in_context(
                messages, streak_start.idx, window=error_streak + 2)
            fragments.append({
                'mode': 'trial_error',
                # Read from the message object itself rather than indexing
                # the list by ``idx``.
                'timestamp': streak_start.timestamp,
                'trigger_message': msg.raw_text_snippet,
                'trigger_role': msg.role,
                'context_before': ctx_before,
                'context_after': ctx_after,
                'tool_calls_in_context': tools,
                'summary': f"Agent 经过 {error_streak} 次试错后成功"
            })
        error_streak = 0

    return fragments
|
|
|
|
|
|
def scan_success(messages: list) -> list:
    """Pattern 3: a session with 5+ tool calls and no user correction.

    Yields at most one fragment per session, anchored on the last assistant
    message. Sessions with fewer than 4 messages, with any user correction,
    with fewer than 5 tool calls, or with no assistant message yield none.
    """
    # Very short sessions are not considered meaningful.
    if len(messages) < 4:
        return []

    # Any user correction disqualifies the whole session.
    for m in messages:
        if m.role == 'user' and match_signals(m.text, CORRECTION_SIGNALS):
            return []

    tool_call_total = sum(len(m.tool_names) for m in messages)
    if tool_call_total < 5:
        return []

    # The final assistant message stands in for the completion point.
    final_reply = next(
        (m for m in reversed(messages) if m.role == 'assistant'), None)
    if not final_reply:
        return []

    preceding, following = get_context(messages, final_reply.idx)
    nearby_tools = get_tools_in_context(messages, final_reply.idx)

    fragment = {
        'mode': 'success',
        'timestamp': final_reply.timestamp,
        'trigger_message': final_reply.raw_text_snippet,
        'trigger_role': 'assistant',
        'context_before': preceding,
        'context_after': following,
        'tool_calls_in_context': nearby_tools,
        'summary': f"复杂任务成功完成,共使用 {tool_call_total} 次 tool_call,无用户纠正",
    }
    return [fragment]
|
|
|
|
|
|
def scan_collaboration(messages: list) -> list:
    """Pattern 4: messages that involve mail/messaging between agents.

    A message of any role qualifies when ``is_collab`` accepts its text or
    tool names. Returns one fragment dict per qualifying message.
    """
    found = []
    for candidate in messages:
        if not is_collab(candidate.text, candidate.tool_names):
            continue
        preceding, following = get_context(messages, candidate.idx)
        actor = '用户' if candidate.role == 'user' else 'Agent'
        fragment = {
            'mode': 'collaboration',
            'timestamp': candidate.timestamp,
            'trigger_message': candidate.raw_text_snippet,
            'trigger_role': candidate.role,
            'context_before': preceding,
            'context_after': following,
            'tool_calls_in_context': get_tools_in_context(messages, candidate.idx),
            'summary': f"{actor} 触发了协作通信",
        }
        found.append(fragment)
    return found
|
|
|
|
|
|
def scan_decision(messages: list) -> list:
    """Pattern 5: user messages that match a decision/hesitation signal.

    Returns one fragment dict per matching user message, carrying the
    surrounding context and the tools used nearby.
    """
    found = []
    for candidate in messages:
        if candidate.role != 'user':
            continue
        if not match_signals(candidate.text, DECISION_SIGNALS):
            continue
        preceding, following = get_context(messages, candidate.idx)
        fragment = {
            'mode': 'decision',
            'timestamp': candidate.timestamp,
            'trigger_message': candidate.raw_text_snippet,
            'trigger_role': 'user',
            'context_before': preceding,
            'context_after': following,
            'tool_calls_in_context': get_tools_in_context(messages, candidate.idx),
            'summary': "用户表达了决策犹豫或需要确认",
        }
        found.append(fragment)
    return found
|
|
|
|
|
|
def scan_experience(messages: list) -> list:
    """Pattern 6: assistant messages that match an experience/lesson signal.

    Returns one fragment dict per matching assistant message, carrying the
    surrounding context and the tools used nearby.
    """
    found = []
    for candidate in messages:
        if candidate.role != 'assistant':
            continue
        if not match_signals(candidate.text, EXPERIENCE_SIGNALS):
            continue
        preceding, following = get_context(messages, candidate.idx)
        fragment = {
            'mode': 'experience',
            'timestamp': candidate.timestamp,
            'trigger_message': candidate.raw_text_snippet,
            'trigger_role': 'assistant',
            'context_before': preceding,
            'context_after': following,
            'tool_calls_in_context': get_tools_in_context(messages, candidate.idx),
            'summary': "Agent 声明了经验/教训",
        }
        found.append(fragment)
    return found
|
|
|
|
|
|
# ── Main scanner ─────────────────────────────────────────────────────────────
|
|
|
|
# Every fragment ``mode`` value, listed in the same order as SCANNERS.
MODE_NAMES = [
    'correction',
    'trial_error',
    'success',
    'collaboration',
    'decision',
    'experience',
]

# Pattern scanners; each takes the parsed messages of one session and
# returns a list of fragment dicts.
SCANNERS = [
    scan_correction,
    scan_trial_error,
    scan_success,
    scan_collaboration,
    scan_decision,
    scan_experience,
]
|
|
|
|
|
|
def scan_file(filepath: str) -> tuple:
    """Scan a single JSONL file for all patterns.

    Args:
        filepath: Path of the session file to parse and scan.

    Returns:
        ``(fragments, message_count)``: the fragments produced by every
        scanner in SCANNERS, in scanner order, and the number of parsed
        messages. ``([], 0)`` when the file yields no messages.
    """
    messages = detect_and_parse(filepath)
    if not messages:
        return [], 0

    all_fragments = []
    for scanner in SCANNERS:
        all_fragments.extend(scanner(messages))
    return all_fragments, len(messages)
|
|
|
|
|
|
def scan_directory(dirpath: str, limit: int = 0) -> dict:
    """Scan the ``*.jsonl`` session files directly inside *dirpath*.

    Files whose name contains ``.trajectory.`` or ``.checkpoint.`` are
    skipped. The rest are processed in sorted path order, each fragment is
    given a sequential ``id`` and its ``source_file``, and progress is
    printed every 50 files and after the last one.

    Args:
        dirpath: Directory to search (not recursive).
        limit: When greater than 0, scan only the first *limit* files.

    Returns:
        Dict with ``scan_stats`` (file, message and fragment totals, the
        duration, and per-mode counts for every name in MODE_NAMES) and
        ``fragments`` (all fragment dicts).
    """
    # Escape the directory so characters such as '[' in the path are taken
    # literally instead of as glob syntax.
    pattern = os.path.join(glob.escape(dirpath), '*.jsonl')
    target_files = sorted(
        path for path in glob.glob(pattern)
        if '.trajectory.' not in os.path.basename(path)
        and '.checkpoint.' not in os.path.basename(path)
    )
    if limit > 0:
        target_files = target_files[:limit]

    total_files = len(target_files)
    print(f"Found {total_files} session files to scan")

    all_fragments = []
    total_messages = 0
    mode_counts = defaultdict(int)

    start = time.time()
    for i, filepath in enumerate(target_files):
        frags, msg_count = scan_file(filepath)
        total_messages += msg_count

        basename = os.path.basename(filepath)
        for frag in frags:
            # Ids are 1-based and run across the whole scan.
            frag['id'] = f"frag_{len(all_fragments) + 1:04d}"
            frag['source_file'] = basename
            all_fragments.append(frag)
            mode_counts[frag['mode']] += 1

        if (i + 1) % 50 == 0 or i == total_files - 1:
            elapsed = time.time() - start
            rate = (i + 1) / elapsed if elapsed > 0 else 0
            print(f" [{i+1}/{total_files}] {rate:.1f} files/s | "
                  f"{len(all_fragments)} fragments found | "
                  f"{msg_count} msgs in current file")

    elapsed = time.time() - start
    print(f"\nScan complete: {total_files} files, {total_messages} messages, "
          f"{len(all_fragments)} fragments in {elapsed:.1f}s")

    return {
        'scan_stats': {
            'total_files': total_files,
            'total_messages': total_messages,
            'total_fragments': len(all_fragments),
            'scan_duration_seconds': round(elapsed, 1),
            'mode_counts': {m: mode_counts.get(m, 0) for m in MODE_NAMES},
        },
        'fragments': all_fragments,
    }
|
|
|
|
|
|
def main():
    """Command-line entry point: scan a session directory and write the JSON index.

    Options:
        --dir     Directory containing the JSONL session files.
        --limit   Maximum number of files to scan (0 = all).
        --output  Path of the JSON result file; missing parent directories
                  are created.
    """
    parser = argparse.ArgumentParser(description='Scan OpenClaw JSONL for experience patterns')
    parser.add_argument('--dir', default=os.path.expanduser(
        '~/.openclaw/agents/pangtong-fujunshi/sessions/'),
        help='Directory containing JSONL files')
    parser.add_argument('--limit', type=int, default=0,
                        help='Limit number of files to scan (0 = all)')
    parser.add_argument('--output', default=os.path.expanduser(
        '~/.openclaw/sanguo_projects/sanguo_moziplus_v2/docs/research/distill-scan-pangtong-result.json'),
        help='Output JSON path')
    args = parser.parse_args()

    print(f"Scanning directory: {args.dir}")
    if args.limit:
        print(f"Limit: {args.limit} files (test mode)")
    print(f"Output: {args.output}")
    print()

    result = scan_directory(args.dir, limit=args.limit)

    # Write output. A bare filename has an empty dirname, and
    # os.makedirs('') raises FileNotFoundError, so create the parent
    # directory only when there is one.
    output_dir = os.path.dirname(args.output)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with open(args.output, 'w', encoding='utf-8') as f:
        json.dump(result, f, ensure_ascii=False, indent=2)

    print(f"\nResults written to: {args.output}")
    print(f" Total fragments: {result['scan_stats']['total_fragments']}")
    for mode, count in result['scan_stats']['mode_counts'].items():
        print(f" {mode}: {count}")


if __name__ == '__main__':
    main()
|