auto-sync: 2026-05-26 23:25:52
This commit is contained in:
File diff suppressed because one or more lines are too long
@@ -0,0 +1,578 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Experience Pattern Scanner for OpenClaw Agent JSONL files.
|
||||
|
||||
Scans 6 experience patterns from conversation history:
|
||||
1. correction - User corrects agent
|
||||
2. trial_error - Repeated failures then success
|
||||
3. success - Complex task (5+ tool_use) with no correction
|
||||
4. collaboration- Mail/messaging between agents
|
||||
5. decision - User hesitation/confirmation requests
|
||||
6. experience - Agent declares lessons/tips
|
||||
|
||||
Output: JSON fragment index with context windows.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import glob
|
||||
import argparse
|
||||
import time
|
||||
from pathlib import Path
|
||||
from collections import defaultdict
|
||||
|
||||
# ── Signal patterns ──────────────────────────────────────────────────────────
|
||||
|
||||
CORRECTION_SIGNALS = [
|
||||
r'不对', r'错了', r'不要', r'停(?!止|顿|留|靠|滞|车|泊|放|歇)',
|
||||
r'我说的不是', r'不是这个意思', r'重新来', r'别这样', r'不要这样',
|
||||
r'你再看看', r'\bno\b', r'\bwrong\b', r'\bstop\b', r'\bnot what I meant\b',
|
||||
]
|
||||
|
||||
DECISION_SIGNALS = [
|
||||
r'先不要', r'方案', r'等等', r'确认', r'等等看', r'再说',
|
||||
r'先看看', r'确认一下', r'你是说', r'你的意思是',
|
||||
]
|
||||
|
||||
EXPERIENCE_SIGNALS = [
|
||||
r'以后应该', r'这个方法好', r'经验是', r'教训是',
|
||||
r'下次注意', r'注意要', r'记住', r'一定要', r'千万不要', r'重要提示',
|
||||
]
|
||||
|
||||
COLLAB_TOOLS = {'sanguo_mail', 'send_message', 'sessions_send'}
|
||||
COLLAB_TEXT = ['send_message', 'sanguo_mail', 'Sanguo Mail']
|
||||
|
||||
ERROR_INDICATORS = ['error', 'Error', 'ERROR', 'failed', 'Failed', 'FAILED',
|
||||
'exception', 'Exception', 'Traceback', '不存在', '失败']
|
||||
|
||||
MAX_TEXT_SNIPPET = 500
|
||||
|
||||
|
||||
# ── Helpers ──────────────────────────────────────────────────────────────────
|
||||
|
||||
def truncate(text: str, limit: int = MAX_TEXT_SNIPPET) -> str:
|
||||
if not text:
|
||||
return ""
|
||||
text = text.strip().replace('\n', ' ')
|
||||
if len(text) > limit:
|
||||
return text[:limit] + "..."
|
||||
return text
|
||||
|
||||
|
||||
def extract_text_from_content(content) -> str:
|
||||
"""Extract plain text from message content (list of blocks or string)."""
|
||||
if isinstance(content, str):
|
||||
return content
|
||||
if not isinstance(content, list):
|
||||
return ""
|
||||
parts = []
|
||||
for block in content:
|
||||
if isinstance(block, dict):
|
||||
if block.get('type') == 'text':
|
||||
parts.append(block.get('text', ''))
|
||||
return ' '.join(parts)
|
||||
|
||||
|
||||
def extract_tool_info(content) -> tuple:
|
||||
"""Extract tool names and whether there are tool_results with errors."""
|
||||
if not isinstance(content, list):
|
||||
return [], False
|
||||
tool_uses = []
|
||||
has_error = False
|
||||
for block in content:
|
||||
if not isinstance(block, dict):
|
||||
continue
|
||||
btype = block.get('type', '')
|
||||
if btype == 'tool_use':
|
||||
tool_uses.append(block.get('name', ''))
|
||||
elif btype == 'tool_result':
|
||||
result_text = ''
|
||||
rc = block.get('content', '')
|
||||
if isinstance(rc, str):
|
||||
result_text = rc
|
||||
elif isinstance(rc, list):
|
||||
for sub in rc:
|
||||
if isinstance(sub, dict) and sub.get('type') == 'text':
|
||||
result_text += sub.get('text', '')
|
||||
if any(e in result_text for e in ERROR_INDICATORS):
|
||||
has_error = True
|
||||
return tool_uses, has_error
|
||||
|
||||
|
||||
def match_signals(text: str, patterns: list) -> bool:
|
||||
if not text:
|
||||
return False
|
||||
for pat in patterns:
|
||||
if re.search(pat, text):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def is_collab(text: str, tool_names: list) -> bool:
|
||||
for t in COLLAB_TEXT:
|
||||
if t in text:
|
||||
return True
|
||||
for tn in tool_names:
|
||||
if tn in COLLAB_TOOLS:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
# ── Parsed Message ───────────────────────────────────────────────────────────
|
||||
|
||||
class ParsedMsg:
|
||||
__slots__ = ('role', 'text', 'timestamp', 'tool_names', 'has_error',
|
||||
'raw_text_snippet', 'idx')
|
||||
|
||||
def __init__(self, role, text, timestamp, tool_names, has_error, idx):
|
||||
self.role = role
|
||||
self.text = text
|
||||
self.timestamp = timestamp
|
||||
self.tool_names = tool_names
|
||||
self.has_error = has_error
|
||||
self.raw_text_snippet = truncate(text)
|
||||
self.idx = idx
|
||||
|
||||
|
||||
# ── File parsers ─────────────────────────────────────────────────────────────
|
||||
|
||||
def parse_v3_jsonl(filepath: str) -> list:
|
||||
"""Parse standard OpenClaw JSONL v3 format."""
|
||||
messages = []
|
||||
idx = 0
|
||||
try:
|
||||
with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
try:
|
||||
obj = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
if obj.get('type') != 'message':
|
||||
continue
|
||||
msg = obj.get('message', {})
|
||||
role = msg.get('role', '')
|
||||
if role not in ('user', 'assistant'):
|
||||
continue
|
||||
content = msg.get('content', '')
|
||||
text = extract_text_from_content(content)
|
||||
tool_names, has_error = extract_tool_info(content)
|
||||
ts = obj.get('timestamp', '')
|
||||
messages.append(ParsedMsg(role, text, ts, tool_names, has_error, idx))
|
||||
idx += 1
|
||||
except Exception as e:
|
||||
print(f" [WARN] Error reading {filepath}: {e}", file=sys.stderr)
|
||||
return messages
|
||||
|
||||
|
||||
def parse_trajectory_jsonl(filepath: str) -> list:
|
||||
"""Parse OpenClaw trajectory JSONL format."""
|
||||
messages = []
|
||||
idx = 0
|
||||
try:
|
||||
with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
try:
|
||||
obj = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
if obj.get('traceSchema') != 'openclaw-trajectory':
|
||||
continue
|
||||
etype = obj.get('type', '')
|
||||
data = obj.get('data', {})
|
||||
ts = obj.get('ts', '')
|
||||
|
||||
if etype == 'user_message':
|
||||
text = ''
|
||||
content = data.get('content', '')
|
||||
if isinstance(content, str):
|
||||
text = content
|
||||
elif isinstance(content, list):
|
||||
text = extract_text_from_content(content)
|
||||
messages.append(ParsedMsg('user', text, ts, [], False, idx))
|
||||
idx += 1
|
||||
|
||||
elif etype == 'assistant_message':
|
||||
text = ''
|
||||
content = data.get('content', '')
|
||||
if isinstance(content, str):
|
||||
text = content
|
||||
elif isinstance(content, list):
|
||||
text = extract_text_from_content(content)
|
||||
messages.append(ParsedMsg('assistant', text, ts, [], False, idx))
|
||||
idx += 1
|
||||
|
||||
elif etype == 'tool_call':
|
||||
tool_name = data.get('name', '')
|
||||
# Attach tool info to previous assistant or create entry
|
||||
if messages and messages[-1].role == 'assistant':
|
||||
messages[-1].tool_names.append(tool_name)
|
||||
# Don't increment idx for tool_call
|
||||
|
||||
elif etype == 'tool_result':
|
||||
result_text = ''
|
||||
rc = data.get('content', '')
|
||||
if isinstance(rc, str):
|
||||
result_text = rc
|
||||
elif isinstance(rc, list):
|
||||
for sub in rc:
|
||||
if isinstance(sub, dict) and sub.get('type') == 'text':
|
||||
result_text += sub.get('text', '')
|
||||
has_err = any(e in result_text for e in ERROR_INDICATORS)
|
||||
if has_err and messages:
|
||||
messages[-1].has_error = True
|
||||
|
||||
except Exception as e:
|
||||
print(f" [WARN] Error reading {filepath}: {e}", file=sys.stderr)
|
||||
return messages
|
||||
|
||||
|
||||
def detect_and_parse(filepath: str) -> list:
|
||||
"""Auto-detect JSONL type and parse accordingly."""
|
||||
# Read first non-empty line to detect format
|
||||
try:
|
||||
with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
obj = json.loads(line)
|
||||
if obj.get('traceSchema') == 'openclaw-trajectory':
|
||||
return parse_trajectory_jsonl(filepath)
|
||||
else:
|
||||
return parse_v3_jsonl(filepath)
|
||||
except Exception:
|
||||
return []
|
||||
return []
|
||||
|
||||
|
||||
# ── Pattern scanners ─────────────────────────────────────────────────────────
|
||||
|
||||
def get_context(messages: list, center_idx: int, before: int = 3, after: int = 3) -> tuple:
|
||||
"""Get context window around a message index (by message count, not turns)."""
|
||||
context_before = []
|
||||
context_after = []
|
||||
|
||||
# before: messages with idx < center_idx
|
||||
for msg in messages:
|
||||
if msg.idx < center_idx:
|
||||
context_before.append(msg.raw_text_snippet)
|
||||
elif msg.idx >= center_idx:
|
||||
break
|
||||
|
||||
# after: messages with idx > center_idx
|
||||
for msg in messages:
|
||||
if msg.idx > center_idx:
|
||||
context_after.append(msg.raw_text_snippet)
|
||||
|
||||
# Trim to window size
|
||||
context_before = context_before[-before * 2:] # *2 because each "turn" ≈ 2 msgs
|
||||
context_after = context_after[:after * 2]
|
||||
|
||||
return context_before, context_after
|
||||
|
||||
|
||||
def get_tools_in_context(messages: list, center_idx: int, window: int = 6) -> list:
|
||||
"""Collect tool names used near center_idx."""
|
||||
tools = []
|
||||
for msg in messages:
|
||||
if abs(msg.idx - center_idx) <= window * 2 and msg.tool_names:
|
||||
tools.extend(msg.tool_names)
|
||||
return list(set(tools))
|
||||
|
||||
|
||||
def scan_correction(messages: list) -> list:
|
||||
"""Pattern ①: User correction signals."""
|
||||
fragments = []
|
||||
for msg in messages:
|
||||
if msg.role == 'user' and match_signals(msg.text, CORRECTION_SIGNALS):
|
||||
ctx_before, ctx_after = get_context(messages, msg.idx)
|
||||
tools = get_tools_in_context(messages, msg.idx)
|
||||
fragments.append({
|
||||
'mode': 'correction',
|
||||
'timestamp': msg.timestamp,
|
||||
'trigger_message': msg.raw_text_snippet,
|
||||
'trigger_role': 'user',
|
||||
'context_before': ctx_before,
|
||||
'context_after': ctx_after,
|
||||
'tool_calls_in_context': tools,
|
||||
'summary': f"用户纠正了 Agent"
|
||||
})
|
||||
return fragments
|
||||
|
||||
|
||||
def scan_trial_error(messages: list) -> list:
|
||||
"""Pattern ②: 3+ consecutive errors then success."""
|
||||
fragments = []
|
||||
# Find sequences of tool_results with errors followed by success
|
||||
error_streak = 0
|
||||
streak_start_idx = -1
|
||||
streak_tools = []
|
||||
|
||||
for i, msg in enumerate(messages):
|
||||
if msg.has_error:
|
||||
if error_streak == 0:
|
||||
streak_start_idx = msg.idx
|
||||
error_streak += 1
|
||||
streak_tools.extend(msg.tool_names)
|
||||
else:
|
||||
if error_streak >= 3:
|
||||
# Check if the successful message is an assistant message
|
||||
ctx_before, ctx_after = get_context(messages, streak_start_idx)
|
||||
tools = get_tools_in_context(messages, streak_start_idx, window=error_streak + 2)
|
||||
trigger_msg = messages[i] if i < len(messages) else msg
|
||||
fragments.append({
|
||||
'mode': 'trial_error',
|
||||
'timestamp': messages[streak_start_idx].timestamp if streak_start_idx < len(messages) else '',
|
||||
'trigger_message': trigger_msg.raw_text_snippet,
|
||||
'trigger_role': trigger_msg.role,
|
||||
'context_before': ctx_before,
|
||||
'context_after': ctx_after,
|
||||
'tool_calls_in_context': list(set(tools)),
|
||||
'summary': f"Agent 经过 {error_streak} 次试错后成功"
|
||||
})
|
||||
error_streak = 0
|
||||
streak_tools = []
|
||||
|
||||
return fragments
|
||||
|
||||
|
||||
def scan_success(messages: list) -> list:
|
||||
"""Pattern ③: Complex task (5+ tool_use in a session) with no correction."""
|
||||
# First check if the session has any corrections
|
||||
has_correction = any(
|
||||
m.role == 'user' and match_signals(m.text, CORRECTION_SIGNALS)
|
||||
for m in messages
|
||||
)
|
||||
if has_correction:
|
||||
return []
|
||||
|
||||
# Count tool_use across all messages
|
||||
all_tools = []
|
||||
for m in messages:
|
||||
all_tools.extend(m.tool_names)
|
||||
|
||||
if len(all_tools) < 5:
|
||||
return []
|
||||
|
||||
# Find the richest assistant message (most tool_use)
|
||||
best_msg = None
|
||||
best_count = 0
|
||||
for m in messages:
|
||||
if m.role == 'assistant' and len(m.tool_names) > best_count:
|
||||
best_count = len(m.tool_names)
|
||||
best_msg = m
|
||||
|
||||
if not best_msg:
|
||||
return []
|
||||
|
||||
ctx_before, ctx_after = get_context(messages, best_msg.idx)
|
||||
tools = get_tools_in_context(messages, best_msg.idx)
|
||||
|
||||
return [{
|
||||
'mode': 'success',
|
||||
'timestamp': best_msg.timestamp,
|
||||
'trigger_message': best_msg.raw_text_snippet,
|
||||
'trigger_role': 'assistant',
|
||||
'context_before': ctx_before,
|
||||
'context_after': ctx_after,
|
||||
'tool_calls_in_context': tools,
|
||||
'summary': f"复杂任务成功完成,共使用 {len(all_tools)} 次 tool_call"
|
||||
}]
|
||||
|
||||
|
||||
def scan_collaboration(messages: list) -> list:
|
||||
"""Pattern ④: Agent collaboration via mail/messaging."""
|
||||
fragments = []
|
||||
for msg in messages:
|
||||
if is_collab(msg.text, msg.tool_names):
|
||||
ctx_before, ctx_after = get_context(messages, msg.idx)
|
||||
tools = get_tools_in_context(messages, msg.idx)
|
||||
fragments.append({
|
||||
'mode': 'collaboration',
|
||||
'timestamp': msg.timestamp,
|
||||
'trigger_message': msg.raw_text_snippet,
|
||||
'trigger_role': msg.role,
|
||||
'context_before': ctx_before,
|
||||
'context_after': ctx_after,
|
||||
'tool_calls_in_context': tools,
|
||||
'summary': f"{'用户' if msg.role == 'user' else 'Agent'} 触发了协作通信"
|
||||
})
|
||||
return fragments
|
||||
|
||||
|
||||
def scan_decision(messages: list) -> list:
|
||||
"""Pattern ⑤: User decision/hesitation signals."""
|
||||
fragments = []
|
||||
for msg in messages:
|
||||
if msg.role == 'user' and match_signals(msg.text, DECISION_SIGNALS):
|
||||
ctx_before, ctx_after = get_context(messages, msg.idx)
|
||||
tools = get_tools_in_context(messages, msg.idx)
|
||||
fragments.append({
|
||||
'mode': 'decision',
|
||||
'timestamp': msg.timestamp,
|
||||
'trigger_message': msg.raw_text_snippet,
|
||||
'trigger_role': 'user',
|
||||
'context_before': ctx_before,
|
||||
'context_after': ctx_after,
|
||||
'tool_calls_in_context': tools,
|
||||
'summary': f"用户表达了决策犹豫或需要确认"
|
||||
})
|
||||
return fragments
|
||||
|
||||
|
||||
def scan_experience(messages: list) -> list:
|
||||
"""Pattern ⑥: Agent declares lessons/tips."""
|
||||
fragments = []
|
||||
for msg in messages:
|
||||
if msg.role == 'assistant' and match_signals(msg.text, EXPERIENCE_SIGNALS):
|
||||
ctx_before, ctx_after = get_context(messages, msg.idx)
|
||||
tools = get_tools_in_context(messages, msg.idx)
|
||||
fragments.append({
|
||||
'mode': 'experience',
|
||||
'timestamp': msg.timestamp,
|
||||
'trigger_message': msg.raw_text_snippet,
|
||||
'trigger_role': 'assistant',
|
||||
'context_before': ctx_before,
|
||||
'context_after': ctx_after,
|
||||
'tool_calls_in_context': tools,
|
||||
'summary': f"Agent 声明了经验/教训"
|
||||
})
|
||||
return fragments
|
||||
|
||||
|
||||
# ── Main scanner ─────────────────────────────────────────────────────────────
|
||||
|
||||
MODE_NAMES = ['correction', 'trial_error', 'success', 'collaboration', 'decision', 'experience']
|
||||
|
||||
SCANNERS = [
|
||||
scan_correction,
|
||||
scan_trial_error,
|
||||
scan_success,
|
||||
scan_collaboration,
|
||||
scan_decision,
|
||||
scan_experience,
|
||||
]
|
||||
|
||||
|
||||
def scan_file(filepath: str) -> list:
|
||||
"""Scan a single JSONL file for all patterns."""
|
||||
messages = detect_and_parse(filepath)
|
||||
if not messages:
|
||||
return [], 0
|
||||
|
||||
all_fragments = []
|
||||
for scanner in SCANNERS:
|
||||
frags = scanner(messages)
|
||||
all_fragments.extend(frags)
|
||||
|
||||
return all_fragments, len(messages)
|
||||
|
||||
|
||||
def scan_directory(dirpath: str, limit: int = 0) -> dict:
|
||||
"""Scan all .jsonl files (excluding trajectory/checkpoint) in directory."""
|
||||
# Collect target files
|
||||
pattern = os.path.join(dirpath, '*.jsonl')
|
||||
all_files = glob.glob(pattern)
|
||||
|
||||
# Filter: only UUID.jsonl (main session files)
|
||||
target_files = []
|
||||
for f in all_files:
|
||||
basename = os.path.basename(f)
|
||||
# Skip trajectory and checkpoint
|
||||
if '.trajectory.' in basename or '.checkpoint.' in basename:
|
||||
continue
|
||||
# Only main session files (UUID.jsonl)
|
||||
if basename.endswith('.jsonl'):
|
||||
target_files.append(f)
|
||||
|
||||
target_files.sort()
|
||||
if limit > 0:
|
||||
target_files = target_files[:limit]
|
||||
|
||||
total_files = len(target_files)
|
||||
print(f"Found {total_files} session files to scan")
|
||||
|
||||
all_fragments = []
|
||||
total_messages = 0
|
||||
mode_counts = defaultdict(int)
|
||||
frag_counter = 0
|
||||
|
||||
start = time.time()
|
||||
for i, filepath in enumerate(target_files):
|
||||
frags, msg_count = scan_file(filepath)
|
||||
total_messages += msg_count
|
||||
|
||||
basename = os.path.basename(filepath)
|
||||
for frag in frags:
|
||||
frag_counter += 1
|
||||
frag['id'] = f"frag_{frag_counter:04d}"
|
||||
frag['source_file'] = basename
|
||||
all_fragments.append(frag)
|
||||
mode_counts[frag['mode']] += 1
|
||||
|
||||
if (i + 1) % 50 == 0 or i == total_files - 1:
|
||||
elapsed = time.time() - start
|
||||
rate = (i + 1) / elapsed if elapsed > 0 else 0
|
||||
print(f" [{i+1}/{total_files}] {rate:.1f} files/s | "
|
||||
f"{len(all_fragments)} fragments found | "
|
||||
f"{msg_count} msgs in current file")
|
||||
|
||||
elapsed = time.time() - start
|
||||
print(f"\nScan complete: {total_files} files, {total_messages} messages, "
|
||||
f"{len(all_fragments)} fragments in {elapsed:.1f}s")
|
||||
|
||||
result = {
|
||||
'scan_stats': {
|
||||
'total_files': total_files,
|
||||
'total_messages': total_messages,
|
||||
'total_fragments': len(all_fragments),
|
||||
'scan_duration_seconds': round(elapsed, 1),
|
||||
'mode_counts': {m: mode_counts.get(m, 0) for m in MODE_NAMES},
|
||||
},
|
||||
'fragments': all_fragments,
|
||||
}
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='Scan OpenClaw JSONL for experience patterns')
|
||||
parser.add_argument('--dir', default=os.path.expanduser(
|
||||
'~/.openclaw/agents/pangtong-fujunshi/sessions/'),
|
||||
help='Directory containing JSONL files')
|
||||
parser.add_argument('--limit', type=int, default=0,
|
||||
help='Limit number of files to scan (0 = all)')
|
||||
parser.add_argument('--output', default=os.path.expanduser(
|
||||
'~/.openclaw/sanguo_projects/sanguo_moziplus_v2/docs/research/distill-scan-pangtong-result.json'),
|
||||
help='Output JSON path')
|
||||
args = parser.parse_args()
|
||||
|
||||
print(f"Scanning directory: {args.dir}")
|
||||
if args.limit:
|
||||
print(f"Limit: {args.limit} files (test mode)")
|
||||
print(f"Output: {args.output}")
|
||||
print()
|
||||
|
||||
result = scan_directory(args.dir, limit=args.limit)
|
||||
|
||||
# Write output
|
||||
os.makedirs(os.path.dirname(args.output), exist_ok=True)
|
||||
with open(args.output, 'w', encoding='utf-8') as f:
|
||||
json.dump(result, f, ensure_ascii=False, indent=2)
|
||||
|
||||
print(f"\nResults written to: {args.output}")
|
||||
print(f" Total fragments: {result['scan_stats']['total_fragments']}")
|
||||
for mode, count in result['scan_stats']['mode_counts'].items():
|
||||
print(f" {mode}: {count}")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user