#!/usr/bin/env python3
"""
Experience Pattern Scanner for OpenClaw Agent JSONL files.

Scans 6 experience patterns from conversation history:
  1. correction    - User corrects agent
  2. trial_error   - Repeated failures then success
  3. success       - Complex task (5+ tool_use) with no correction
  4. collaboration - Mail/messaging between agents
  5. decision      - User hesitation/confirmation requests
  6. experience    - Agent declares lessons/tips

Output: JSON fragment index with context windows.
"""

# Provenance: docs/research/distill-scan-simayi.py, added by commit
# 80addaf4ec4a0210a606d8d087253168be33ace5 ("auto-sync: 2026-05-26 23:31:56").

import json
import os
import re
import sys
import glob
import argparse
import time
from pathlib import Path
from collections import defaultdict

# ── Signal patterns ──────────────────────────────────────────────────────────

CORRECTION_SIGNALS = [
    r'不对', r'错了', r'不要', r'停(?!止|顿|留|靠|滞|车|泊|放|歇)',
    r'我说的不是', r'不是这个意思', r'重新来', r'别这样', r'不要这样',
    r'你再看看', r'\bno\b', r'\bwrong\b', r'\bstop\b', r'\bnot what I meant\b',
]

DECISION_SIGNALS = [
    r'先不要', r'方案', r'等等', r'确认', r'等等看', r'再说',
    r'先看看', r'确认一下', r'你是说', r'你的意思是',
]

EXPERIENCE_SIGNALS = [
    r'以后应该', r'这个方法好', r'经验是', r'教训是',
    r'下次注意', r'注意要', r'记住', r'一定要', r'千万不要', r'重要提示',
]

COLLAB_TOOLS = {'sanguo_mail', 'send_message', 'sessions_send'}
COLLAB_TEXT = ['send_message', 'sanguo_mail', 'Sanguo Mail']

ERROR_INDICATORS = ['error', 'Error', 'ERROR', 'failed', 'Failed', 'FAILED',
                    'exception', 'Exception', 'Traceback', '不存在', '失败']

MAX_TEXT_SNIPPET = 500


# ── Helpers ──────────────────────────────────────────────────────────────────

def truncate(text: str, limit: int = MAX_TEXT_SNIPPET) -> str:
    """Return *text* stripped, flattened to one line and cut to *limit* chars.

    Newlines become spaces. When the flattened text is longer than *limit*,
    the first *limit* characters are kept and "..." is appended. Falsy input
    yields an empty string.
    """
    if not text:
        return ""
    flat = text.strip().replace('\n', ' ')
    if len(flat) > limit:
        return flat[:limit] + "..."
    return flat


def extract_text_from_content(content) -> str:
    """Extract plain text from message content (list of blocks or string).

    A string is returned unchanged. For a list, the 'text' values of every
    dict block whose 'type' is 'text' are joined with single spaces. Any
    other input yields an empty string.
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return ""
    parts = []
    for block in content:
        if isinstance(block, dict) and block.get('type') == 'text':
            text = block.get('text', '')
            # A null/non-string 'text' would make str.join raise; treat as empty.
            parts.append(text if isinstance(text, str) else '')
    return ' '.join(parts)


def _result_has_error(content) -> bool:
    """Return True if tool-result *content* contains any ERROR_INDICATORS entry.

    *content* may be a string or a list of blocks; only 'text' blocks of a
    list are inspected. Other shapes are treated as empty text.
    """
    if isinstance(content, str):
        result_text = content
    elif isinstance(content, list):
        pieces = []
        for sub in content:
            if isinstance(sub, dict) and sub.get('type') == 'text':
                text = sub.get('text', '')
                pieces.append(text if isinstance(text, str) else '')
        result_text = ''.join(pieces)
    else:
        result_text = ''
    return any(indicator in result_text for indicator in ERROR_INDICATORS)


def extract_tool_info(role: str, content) -> tuple:
    """Extract tool names and whether there are tool_results with errors.

    v3 format: toolCall blocks in assistant messages, toolResult as separate role.

    Returns:
        (tool_uses, has_error): the names of all 'toolCall' / 'tool_use'
        blocks found in *content*, and True only when *role* is 'toolResult'
        and its text contains one of ERROR_INDICATORS.
    """
    tool_uses = []
    if isinstance(content, list):
        for block in content:
            if not isinstance(block, dict):
                continue
            # v3 uses 'toolCall', some formats use 'tool_use'
            if block.get('type', '') in ('toolCall', 'tool_use'):
                name = block.get('name', '')
                # Keep names as strings so they can be sorted/deduplicated later.
                tool_uses.append(name if isinstance(name, str) else '')

    has_error = role == 'toolResult' and _result_has_error(content)
    return tool_uses, has_error


# Template blocks to strip before pattern matching (avoid false positives).
# NOTE(review): this list previously started with re.compile(r'.*?', re.DOTALL).
# That pattern looks like it lost its opening/closing delimiters; as written it
# matches every character, so substituting it with '' erased the whole text and
# no signal could ever match. It is dropped here; restore the intended
# delimited pattern if it is known.
_TEMPLATE_BLOCKS = [
    re.compile(r'## \u72b6\u6001\u673a.*?```', re.DOTALL),  # state machine diagrams
]


def strip_templates(text: str) -> str:
    """Remove common template blocks that cause false positives."""
    for pat in _TEMPLATE_BLOCKS:
        text = pat.sub('', text)
    return text


def match_signals(text: str, patterns: list) -> bool:
    """Return True if any regex in *patterns* matches *text* after template stripping."""
    if not text:
        return False
    cleaned = strip_templates(text)
    if not cleaned:
        return False
    return any(re.search(pat, cleaned) for pat in patterns)


def is_collab(text: str, tool_names: list) -> bool:
    """Return True if *text* mentions a COLLAB_TEXT marker or a COLLAB_TOOLS tool was used."""
    if any(marker in text for marker in COLLAB_TEXT):
        return True
    return any(name in COLLAB_TOOLS for name in tool_names)


# ── Parsed Message ───────────────────────────────────────────────────────────

class ParsedMsg:
    """One conversation message normalised from either JSONL format."""

    __slots__ = ('role', 'text', 'timestamp', 'tool_names', 'has_error',
                 'raw_text_snippet', 'idx')

    def __init__(self, role, text, timestamp, tool_names, has_error, idx):
        self.role = role
        self.text = text
        self.timestamp = timestamp
        self.tool_names = tool_names
        self.has_error = has_error
        # Truncated single-line copy of the text used in output fragments.
        self.raw_text_snippet = truncate(text)
        self.idx = idx


# ── File parsers ─────────────────────────────────────────────────────────────

def _iter_json_objects(lines):
    """Yield each JSON object (dict) decoded from an iterable of JSONL lines.

    Blank lines, lines that are not valid JSON, and JSON values that are not
    objects are skipped so one bad record cannot abort a whole file.
    """
    for raw in lines:
        raw = raw.strip()
        if not raw:
            continue
        try:
            obj = json.loads(raw)
        except json.JSONDecodeError:
            continue
        if isinstance(obj, dict):
            yield obj


def _attach_error(messages: list) -> None:
    """Flag the most recent message as failed, but only if it is an assistant turn."""
    if messages and messages[-1].role == 'assistant':
        messages[-1].has_error = True


def parse_v3_jsonl(filepath: str) -> list:
    """Parse standard OpenClaw JSONL v3 format.

    Returns a list of ParsedMsg for 'user' and 'assistant' messages, with
    idx equal to the position in the list. 'toolResult' records are not
    turns: an erroring one only flags the preceding assistant message.
    Read failures are reported on stderr and whatever was parsed so far is
    returned.
    """
    messages = []
    try:
        with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
            for obj in _iter_json_objects(f):
                if obj.get('type') != 'message':
                    continue
                msg = obj.get('message')
                if not isinstance(msg, dict):
                    continue
                role = msg.get('role', '')
                if role not in ('user', 'assistant', 'toolResult'):
                    continue
                content = msg.get('content', '')
                tool_names, has_error = extract_tool_info(role, content)
                if role == 'toolResult':
                    # Metadata, not a turn: still track errors for trial_error.
                    if has_error:
                        _attach_error(messages)
                    continue
                messages.append(ParsedMsg(
                    role,
                    extract_text_from_content(content),
                    obj.get('timestamp', ''),
                    tool_names,
                    has_error,
                    len(messages),
                ))
    except Exception as e:
        # Best-effort batch scan: report and keep what was parsed so far.
        print(f" [WARN] Error reading {filepath}: {e}", file=sys.stderr)
    return messages


def parse_trajectory_jsonl(filepath: str) -> list:
    """Parse OpenClaw trajectory JSONL format.

    Only records whose 'traceSchema' is 'openclaw-trajectory' are used.
    'user_message' / 'assistant_message' events become ParsedMsg entries;
    'tool_call' names and erroring 'tool_result' events are attached to the
    preceding assistant message (and ignored when there is none).
    """
    messages = []
    try:
        with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
            for obj in _iter_json_objects(f):
                if obj.get('traceSchema') != 'openclaw-trajectory':
                    continue
                etype = obj.get('type', '')
                data = obj.get('data')
                if not isinstance(data, dict):
                    data = {}

                if etype in ('user_message', 'assistant_message'):
                    role = 'user' if etype == 'user_message' else 'assistant'
                    text = extract_text_from_content(data.get('content', ''))
                    messages.append(ParsedMsg(
                        role, text, obj.get('ts', ''), [], False, len(messages)))

                elif etype == 'tool_call':
                    name = data.get('name', '')
                    if messages and messages[-1].role == 'assistant':
                        messages[-1].tool_names.append(
                            name if isinstance(name, str) else '')

                elif etype == 'tool_result':
                    # Same rule as the v3 parser: only assistant turns can fail.
                    if _result_has_error(data.get('content', '')):
                        _attach_error(messages)
    except Exception as e:
        # Best-effort batch scan: report and keep what was parsed so far.
        print(f" [WARN] Error reading {filepath}: {e}", file=sys.stderr)
    return messages


def detect_and_parse(filepath: str) -> list:
    """Auto-detect JSONL type and parse accordingly.

    The format is decided by the first decodable JSON object in the file:
    a 'traceSchema' of 'openclaw-trajectory' selects the trajectory parser,
    anything else the v3 parser. Returns [] when the file cannot be read or
    contains no JSON object.
    """
    try:
        with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
            first = next(_iter_json_objects(f), None)
    except OSError as e:
        print(f" [WARN] Error reading {filepath}: {e}", file=sys.stderr)
        return []
    if first is None:
        return []
    if first.get('traceSchema') == 'openclaw-trajectory':
        return parse_trajectory_jsonl(filepath)
    return parse_v3_jsonl(filepath)


# ── Pattern scanners ─────────────────────────────────────────────────────────

def get_context(messages: list, center_idx: int, before: int = 3, after: int = 3) -> tuple:
    """Get context window around a message index (by message count, not turns).

    Returns (context_before, context_after): text snippets of up to
    ``before * 2`` messages with idx below *center_idx* and up to
    ``after * 2`` messages with idx above it (a "turn" is taken as roughly
    two messages). A non-positive window size yields an empty list.
    """
    earlier = [m.raw_text_snippet for m in messages if m.idx < center_idx]
    later = [m.raw_text_snippet for m in messages if m.idx > center_idx]

    n_before = before * 2
    n_after = after * 2
    # Guard the zero case explicitly: earlier[-0:] would be the whole list.
    context_before = earlier[-n_before:] if n_before > 0 else []
    context_after = later[:n_after] if n_after > 0 else []
    return context_before, context_after


def get_tools_in_context(messages: list, center_idx: int, window: int = 6) -> list:
    """Collect tool names used near center_idx.

    Considers messages whose idx is within ``window * 2`` of *center_idx* and
    returns their distinct tool names in sorted order, so that the output is
    reproducible between runs.
    """
    reach = window * 2
    names = set()
    for msg in messages:
        if msg.tool_names and abs(msg.idx - center_idx) <= reach:
            names.update(msg.tool_names)
    return sorted(names)


def _make_fragment(messages: list, center_idx: int, mode: str, timestamp,
                   trigger_msg, summary: str, window: int = 6) -> dict:
    """Build one output fragment centred on *center_idx* and triggered by *trigger_msg*."""
    ctx_before, ctx_after = get_context(messages, center_idx)
    return {
        'mode': mode,
        'timestamp': timestamp,
        'trigger_message': trigger_msg.raw_text_snippet,
        'trigger_role': trigger_msg.role,
        'context_before': ctx_before,
        'context_after': ctx_after,
        'tool_calls_in_context': get_tools_in_context(messages, center_idx, window),
        'summary': summary,
    }


def scan_correction(messages: list) -> list:
    """Pattern ①: User correction signals."""
    return [
        _make_fragment(messages, msg.idx, 'correction', msg.timestamp, msg,
                       "用户纠正了 Agent")
        for msg in messages
        if msg.role == 'user' and match_signals(msg.text, CORRECTION_SIGNALS)
    ]


def scan_trial_error(messages: list) -> list:
    """Pattern ②: 3+ consecutive errors then success.

    A streak is a run of consecutive messages flagged has_error; the first
    unflagged message after a run of three or more ends it and becomes the
    fragment's trigger. A streak still open at the end of the session is not
    reported.

    NOTE(review): any unflagged message ends a streak, including a user
    turn; whether only assistant messages should count as the "success" is
    left as in the original behaviour.
    """
    fragments = []
    streak_first = None  # first message of the current error streak
    error_streak = 0

    for msg in messages:
        if msg.has_error:
            if error_streak == 0:
                streak_first = msg
            error_streak += 1
            continue
        if error_streak >= 3:
            fragments.append(_make_fragment(
                messages, streak_first.idx, 'trial_error',
                streak_first.timestamp, msg,
                f"Agent 经过 {error_streak} 次试错后成功",
                window=error_streak + 2,
            ))
        error_streak = 0
        streak_first = None

    return fragments


def scan_success(messages: list) -> list:
    """Pattern ③: Complex task (5+ tool_use in a session) with no correction.

    Returns at most one fragment, centred on the last assistant message.
    """
    # Need at least 4 messages to be meaningful
    if len(messages) < 4:
        return []

    # A single user correction disqualifies the whole session.
    if any(m.role == 'user' and match_signals(m.text, CORRECTION_SIGNALS)
           for m in messages):
        return []

    all_tools = [name for m in messages for name in m.tool_names]
    if len(all_tools) < 5:
        return []

    # Use the last assistant message as the completion point
    last_asst = next((m for m in reversed(messages) if m.role == 'assistant'), None)
    if not last_asst:
        return []

    return [_make_fragment(
        messages, last_asst.idx, 'success', last_asst.timestamp, last_asst,
        f"复杂任务成功完成,共使用 {len(all_tools)} 次 tool_call,无用户纠正",
    )]


def scan_collaboration(messages: list) -> list:
    """Pattern ④: Agent collaboration via mail/messaging."""
    return [
        _make_fragment(
            messages, msg.idx, 'collaboration', msg.timestamp, msg,
            f"{'用户' if msg.role == 'user' else 'Agent'} 触发了协作通信",
        )
        for msg in messages
        if is_collab(msg.text, msg.tool_names)
    ]


def scan_decision(messages: list) -> list:
    """Pattern ⑤: User decision/hesitation signals."""
    return [
        _make_fragment(messages, msg.idx, 'decision', msg.timestamp, msg,
                       "用户表达了决策犹豫或需要确认")
        for msg in messages
        if msg.role == 'user' and match_signals(msg.text, DECISION_SIGNALS)
    ]


def scan_experience(messages: list) -> list:
    """Pattern ⑥: Agent declares lessons/tips."""
    return [
        _make_fragment(messages, msg.idx, 'experience', msg.timestamp, msg,
                       "Agent 声明了经验/教训")
        for msg in messages
        if msg.role == 'assistant' and match_signals(msg.text, EXPERIENCE_SIGNALS)
    ]


# ── Main scanner ─────────────────────────────────────────────────────────────

MODE_NAMES = ['correction', 'trial_error', 'success', 'collaboration', 'decision', 'experience']

SCANNERS = [
    scan_correction,
    scan_trial_error,
    scan_success,
    scan_collaboration,
    scan_decision,
    scan_experience,
]


def scan_file(filepath: str) -> tuple:
    """Scan a single JSONL file for all patterns.

    Returns:
        (fragments, message_count); ([], 0) when nothing could be parsed.
    """
    messages = detect_and_parse(filepath)
    if not messages:
        return [], 0

    all_fragments = []
    for scanner in SCANNERS:
        all_fragments.extend(scanner(messages))
    return all_fragments, len(messages)


def scan_directory(dirpath: str, limit: int = 0) -> dict:
    """Scan all .jsonl files (excluding trajectory/checkpoint) in directory.

    Args:
        dirpath: directory whose ``*.jsonl`` files are scanned (not recursive).
        limit: scan only the first *limit* files in sorted order; 0 means all.

    Returns:
        A dict with 'scan_stats' (file/message/fragment counts, duration and
        per-mode counts) and 'fragments' (each tagged with 'id' and
        'source_file').
    """
    # Only main session files: skip trajectory and checkpoint side files.
    target_files = sorted(
        path for path in glob.glob(os.path.join(dirpath, '*.jsonl'))
        if '.trajectory.' not in os.path.basename(path)
        and '.checkpoint.' not in os.path.basename(path)
    )
    if limit > 0:
        target_files = target_files[:limit]

    total_files = len(target_files)
    print(f"Found {total_files} session files to scan")

    all_fragments = []
    total_messages = 0
    mode_counts = defaultdict(int)

    start = time.time()
    for i, filepath in enumerate(target_files):
        frags, msg_count = scan_file(filepath)
        total_messages += msg_count

        basename = os.path.basename(filepath)
        for frag in frags:
            # Ids are 1-based and continue across files.
            frag['id'] = f"frag_{len(all_fragments) + 1:04d}"
            frag['source_file'] = basename
            all_fragments.append(frag)
            mode_counts[frag['mode']] += 1

        # Progress line every 50 files and after the last one.
        if (i + 1) % 50 == 0 or i == total_files - 1:
            elapsed = time.time() - start
            rate = (i + 1) / elapsed if elapsed > 0 else 0
            print(f" [{i+1}/{total_files}] {rate:.1f} files/s | "
                  f"{len(all_fragments)} fragments found | "
                  f"{msg_count} msgs in current file")

    elapsed = time.time() - start
    print(f"\nScan complete: {total_files} files, {total_messages} messages, "
          f"{len(all_fragments)} fragments in {elapsed:.1f}s")

    return {
        'scan_stats': {
            'total_files': total_files,
            'total_messages': total_messages,
            'total_fragments': len(all_fragments),
            'scan_duration_seconds': round(elapsed, 1),
            'mode_counts': {m: mode_counts.get(m, 0) for m in MODE_NAMES},
        },
        'fragments': all_fragments,
    }


def main():
    """Parse command-line options, scan the directory and write the JSON result."""
    parser = argparse.ArgumentParser(description='Scan OpenClaw JSONL for experience patterns')
    parser.add_argument('--dir', default=os.path.expanduser(
        '~/.openclaw/agents/simayi-challenger/sessions/'),
        help='Directory containing JSONL files')
    parser.add_argument('--limit', type=int, default=0,
                        help='Limit number of files to scan (0 = all)')
    parser.add_argument('--output', default=os.path.expanduser(
        '~/.openclaw/sanguo_projects/sanguo_moziplus_v2/docs/research/distill-scan-simayi-result.json'),
        help='Output JSON path')
    args = parser.parse_args()

    print(f"Scanning directory: {args.dir}")
    if args.limit:
        print(f"Limit: {args.limit} files (test mode)")
    print(f"Output: {args.output}")
    print()

    result = scan_directory(args.dir, limit=args.limit)

    # Write output. A bare filename has an empty dirname, which makedirs rejects.
    out_dir = os.path.dirname(args.output)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    with open(args.output, 'w', encoding='utf-8') as f:
        json.dump(result, f, ensure_ascii=False, indent=2)

    print(f"\nResults written to: {args.output}")
    print(f" Total fragments: {result['scan_stats']['total_fragments']}")
    for mode, count in result['scan_stats']['mode_counts'].items():
        print(f" {mode}: {count}")


if __name__ == '__main__':
    main()