- Full RAG system for OpenClaw agents - Semantic search across chat history, code, docs, skills - ChromaDB integration (all-MiniLM-L6-v2 embeddings) - Automatic AI context retrieval - Ingest pipelines for sessions, workspace, skills - Python API and CLI interfaces - Document management (add, delete, stats, reset)
289 lines
8.7 KiB
Python
289 lines
8.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
RAG Session Ingestor - Load all session transcripts into vector store
|
|
Fixed to handle OpenClaw session event format
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import List, Dict, Any
|
|
|
|
import sys
|
|
sys.path.insert(0, str(Path(__file__).parent))
|
|
|
|
from rag_system import RAGSystem
|
|
|
|
|
|
def parse_jsonl(file_path: Path) -> List[Dict]:
|
|
"""
|
|
Parse OpenClaw session JSONL format
|
|
|
|
Session files contain:
|
|
- Line 1: Session metadata (type: "session")
|
|
- Lines 2+: Events including messages, toolCalls, etc.
|
|
"""
|
|
messages = []
|
|
|
|
try:
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
for line_num, line in enumerate(f, 1):
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
|
|
try:
|
|
event = json.loads(line)
|
|
|
|
# Skip session metadata line
|
|
if line_num == 1 and event.get('type') == 'session':
|
|
continue
|
|
|
|
# Extract message events only
|
|
if event.get('type') == 'message':
|
|
msg_obj = event.get('message', {})
|
|
|
|
messages.append({
|
|
'role': msg_obj.get('role'),
|
|
'content': msg_obj.get('content'),
|
|
'timestamp': event.get('timestamp'),
|
|
'id': event.get('id'),
|
|
'sessionKey': event.get('sessionKey') # Not usually here, but check
|
|
})
|
|
|
|
except json.JSONDecodeError as e:
|
|
continue
|
|
except Exception as e:
|
|
print(f"❌ Error reading {file_path}: {e}")
|
|
|
|
return messages
|
|
|
|
|
|
def extract_session_key(file_name: str) -> str:
|
|
"""Extract session key from filename"""
|
|
return file_name.replace('.jsonl', '')
|
|
|
|
|
|
def extract_session_metadata(session_data: List[Dict], session_key: str) -> Dict:
|
|
"""Extract metadata from session messages"""
|
|
if not session_data:
|
|
return {}
|
|
|
|
first_msg = session_data[0]
|
|
last_msg = session_data[-1]
|
|
|
|
return {
|
|
"start_time": first_msg.get("timestamp"),
|
|
"end_time": last_msg.get("timestamp"),
|
|
"total_messages": len(session_data),
|
|
"has_system": any(msg.get("role") == "system" for msg in session_data),
|
|
"has_user": any(msg.get("role") == "user" for msg in session_data),
|
|
"has_assistant": any(msg.get("role") == "assistant" for msg in session_data),
|
|
}
|
|
|
|
|
|
def format_content(content) -> str:
|
|
"""
|
|
Format message content from OpenClaw format to text
|
|
|
|
Content can be:
|
|
- String
|
|
- List of dicts with 'type' field (text, thinking, toolCall, toolResult)
|
|
"""
|
|
if isinstance(content, str):
|
|
return content
|
|
|
|
if isinstance(content, list):
|
|
texts = []
|
|
|
|
for item in content:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
|
|
item_type = item.get('type', '')
|
|
|
|
if item_type == 'text':
|
|
texts.append(item.get('text', ''))
|
|
elif item_type == 'thinking':
|
|
# Skip reasoning, usually not useful for RAG
|
|
# texts.append(f"[Reasoning: {item.get('thinking', '')[:200]}]")
|
|
pass
|
|
elif item_type == 'toolCall':
|
|
tool_name = item.get('name', 'unknown')
|
|
args = str(item.get('arguments', ''))[:100]
|
|
texts.append(f"[Tool: {tool_name}({args})]")
|
|
elif item_type == 'toolResult':
|
|
result = str(item.get('text', item.get('result', ''))).strip()
|
|
# Truncate large tool results
|
|
if len(result) > 500:
|
|
result = result[:500] + "..."
|
|
texts.append(f"[Tool Result: {result}]")
|
|
|
|
return "\n".join(texts)
|
|
|
|
return str(content)[:500]
|
|
|
|
|
|
def chunk_messages(
|
|
messages: List[Dict],
|
|
context_window: int = 20,
|
|
overlap: int = 5
|
|
) -> List[Dict]:
|
|
"""
|
|
Chunk messages for better retrieval
|
|
|
|
Args:
|
|
messages: List of message objects
|
|
context_window: Messages per chunk
|
|
overlap: Message overlap between chunks
|
|
|
|
Returns:
|
|
List of {"text": str, "metadata": dict} chunks
|
|
"""
|
|
chunks = []
|
|
|
|
for i in range(0, len(messages), context_window - overlap):
|
|
chunk_messages = messages[i:i + context_window]
|
|
|
|
# Build text from messages
|
|
text_parts = []
|
|
|
|
for msg in chunk_messages:
|
|
role = msg.get("role", "unknown")
|
|
content = msg.get("content", "")
|
|
|
|
# Format content
|
|
text = format_content(content)
|
|
|
|
if text.strip():
|
|
text_parts.append(f"{role.upper()}: {text}")
|
|
|
|
text = "\n\n".join(text_parts)
|
|
|
|
# Don't add empty chunks
|
|
if not text.strip():
|
|
continue
|
|
|
|
# Metadata
|
|
metadata = {
|
|
"type": "session",
|
|
"source": str(chunk_messages[0].get("sessionKey") or chunk_messages[0].get("id") or session_key),
|
|
"chunk_index": int(i // (context_window - overlap)),
|
|
"chunk_start_time": str(chunk_messages[0].get("timestamp") or ""),
|
|
"chunk_end_time": str(chunk_messages[-1].get("timestamp") or ""),
|
|
"message_count": int(len(chunk_messages)),
|
|
"ingested_at": datetime.now().isoformat(),
|
|
"date": str(chunk_messages[0].get("timestamp") or datetime.now().isoformat())
|
|
}
|
|
|
|
chunks.append({
|
|
"text": text,
|
|
"metadata": metadata
|
|
})
|
|
|
|
return chunks
|
|
|
|
|
|
def ingest_sessions(
|
|
sessions_dir: str = None,
|
|
collection_name: str = "openclaw_knowledge",
|
|
chunk_size: int = 20,
|
|
chunk_overlap: int = 5
|
|
):
|
|
"""
|
|
Ingest all session transcripts into RAG system
|
|
|
|
Args:
|
|
sessions_dir: Directory containing session jsonl files
|
|
collection_name: Name of the ChromaDB collection
|
|
chunk_size: Messages per chunk
|
|
chunk_overlap: Message overlap between chunks
|
|
"""
|
|
if sessions_dir is None:
|
|
sessions_dir = os.path.expanduser("~/.openclaw/agents/main/sessions")
|
|
|
|
sessions_path = Path(sessions_dir)
|
|
|
|
if not sessions_path.exists():
|
|
print(f"❌ Sessions directory not found: {sessions_path}")
|
|
return
|
|
|
|
print(f"🔍 Finding session files in: {sessions_path}")
|
|
|
|
jsonl_files = list(sessions_path.glob("*.jsonl"))
|
|
|
|
if not jsonl_files:
|
|
print(f"⚠️ No jsonl files found in {sessions_path}")
|
|
return
|
|
|
|
print(f"✅ Found {len(jsonl_files)} session files\n")
|
|
|
|
rag = RAGSystem(collection_name=collection_name)
|
|
|
|
total_chunks = 0
|
|
total_messages = 0
|
|
skipped_empty = 0
|
|
|
|
for jsonl_file in sorted(jsonl_files):
|
|
session_key = extract_session_key(jsonl_file.name)
|
|
|
|
print(f"\n📄 Processing: {jsonl_file.name}")
|
|
|
|
messages = parse_jsonl(jsonl_file)
|
|
|
|
if not messages:
|
|
print(f" ⚠️ No messages, skipping")
|
|
skipped_empty += 1
|
|
continue
|
|
|
|
total_messages += len(messages)
|
|
|
|
# Extract session metadata
|
|
session_metadata = extract_session_metadata(messages, session_key)
|
|
print(f" Messages: {len(messages)}")
|
|
|
|
# Chunk messages
|
|
chunks = chunk_messages(messages, chunk_size, chunk_overlap)
|
|
|
|
if not chunks:
|
|
print(f" ⚠️ No valid chunks, skipping")
|
|
skipped_empty += 1
|
|
continue
|
|
|
|
print(f" Chunks: {len(chunks)}")
|
|
|
|
# Add to RAG
|
|
try:
|
|
ids = rag.add_documents_batch(chunks, batch_size=50)
|
|
total_chunks += len(chunks)
|
|
print(f" ✅ Indexed {len(chunks)} chunks")
|
|
except Exception as e:
|
|
print(f" ❌ Error: {e}")
|
|
|
|
# Summary
|
|
print(f"\n📊 Summary:")
|
|
print(f" Sessions processed: {len(jsonl_files)}")
|
|
print(f" Skipped (empty): {skipped_empty}")
|
|
print(f" Total messages: {total_messages}")
|
|
print(f" Total chunks indexed: {total_chunks}")
|
|
|
|
stats = rag.get_stats()
|
|
print(f" Total documents in collection: {stats['total_documents']}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(description="Ingest OpenClaw session transcripts into RAG")
|
|
parser.add_argument("--sessions-dir", help="Path to sessions directory (default: ~/.openclaw/agents/main/sessions)")
|
|
parser.add_argument("--chunk-size", type=int, default=20, help="Messages per chunk (default: 20)")
|
|
parser.add_argument("--chunk-overlap", type=int, default=5, help="Message overlap (default: 5)")
|
|
|
|
args = parser.parse_args()
|
|
|
|
ingest_sessions(
|
|
sessions_dir=args.sessions_dir,
|
|
chunk_size=args.chunk_size,
|
|
chunk_overlap=args.chunk_overlap
|
|
) |