Files
docker-compose/ai/webui-export.py

247 lines
7.7 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
Export code blocks from Open WebUI chat conversations to local disk.
Usage:
# Export specific chat
python webui-export.py --chat-id <chat_id> --output-dir ./output
# Export all recent chats
python webui-export.py --all --output-dir ./output
# NOTE: a --watch mode (auto-export on new messages) is not implemented;
# the argument parser only accepts --chat-id and --all. Re-run with --all
# to pick up new messages.
"""
import argparse
import json
import os
import re
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
import requests
class OpenWebUIExporter:
    """Export code from Open WebUI chats via REST API.

    The exporter fetches chats over HTTP, pulls fenced markdown code blocks
    out of every message and writes each block to its own file, next to a
    ``.meta.json`` sidecar describing where the block came from.
    """

    # Fenced block: opening fence, info string (anything up to the end of the
    # line, not just ``\w`` characters, so "c++" or "python title=x" still
    # open a block), body, closing fence.
    _CODE_FENCE_RE = re.compile(r"```([^\n`]*)\n(.*?)```", re.DOTALL)

    # Characters that are invalid in file names on common platforms, plus
    # ASCII control characters.
    _INVALID_FILENAME_CHARS_RE = re.compile(r'[<>:"/\\|?*\x00-\x1f]')

    # Fence language tag (lower-case) -> file extension.
    _EXTENSIONS = {
        "python": ".py",
        "py": ".py",
        "javascript": ".js",
        "js": ".js",
        "typescript": ".ts",
        "ts": ".ts",
        "java": ".java",
        "c": ".c",
        "cpp": ".cpp",
        "c++": ".cpp",
        "csharp": ".cs",
        "cs": ".cs",
        "go": ".go",
        "rust": ".rs",
        "rs": ".rs",
        "ruby": ".rb",
        "rb": ".rb",
        "php": ".php",
        "swift": ".swift",
        "kotlin": ".kt",
        "kt": ".kt",
        "bash": ".sh",
        "shell": ".sh",
        "sh": ".sh",
        "zsh": ".sh",
        "sql": ".sql",
        "html": ".html",
        "css": ".css",
        "json": ".json",
        "yaml": ".yaml",
        "yml": ".yml",
        "xml": ".xml",
        "markdown": ".md",
        "md": ".md",
    }

    def __init__(
        self,
        base_url: str = "http://localhost:8080",
        api_key: Optional[str] = None,
        timeout: Optional[float] = 30.0,
    ):
        """
        Initialize exporter.

        :param base_url: Open WebUI base URL (default: http://localhost:8080)
        :param api_key: Optional API key for authentication
        :param timeout: Per-request timeout in seconds; ``None`` waits forever
        """
        self.base_url = base_url.rstrip("/")
        self.api_url = f"{self.base_url}/api/v1"
        self.timeout = timeout
        self.session = requests.Session()
        if api_key:
            self.session.headers["Authorization"] = f"Bearer {api_key}"

    def get_chats(self) -> List[Dict]:
        """Retrieve all user chats.

        :return: Decoded JSON body of ``GET {api_url}/chats/all``
        :raises requests.HTTPError: If the server answers with an error status
        """
        # Without a timeout a stalled server would hang the export forever.
        response = self.session.get(f"{self.api_url}/chats/all", timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def get_chat(self, chat_id: str) -> Dict:
        """Retrieve specific chat by ID.

        :param chat_id: Identifier of the chat to fetch
        :return: Decoded JSON body of ``GET {api_url}/chats/{chat_id}``
        :raises requests.HTTPError: If the server answers with an error status
        """
        response = self.session.get(f"{self.api_url}/chats/{chat_id}", timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def extract_code_blocks(self, chat: Dict) -> List[Dict]:
        """
        Extract code blocks from chat messages.

        Messages whose ``content`` is not a string (missing, ``None`` or a
        structured payload) are skipped instead of raising.

        :param chat: Chat object with messages under ``chat["chat"]["messages"]``
        :return: List of code blocks with metadata
        """
        code_blocks: List[Dict] = []
        chat_id = chat.get("id")
        # ``or`` also covers keys that are present but null/empty.
        chat_title = chat.get("title") or "Untitled"
        messages = (chat.get("chat") or {}).get("messages") or []

        for idx, message in enumerate(messages):
            if not isinstance(message, dict):
                continue
            content = message.get("content")
            if not isinstance(content, str):
                continue

            role = message.get("role", "unknown")
            timestamp = message.get("timestamp")
            if timestamp is None:
                timestamp = time.time()

            for match_idx, match in enumerate(self._CODE_FENCE_RE.finditer(content)):
                info, code = match.groups()
                # Only the first word of the info string names the language.
                info_words = info.split()
                code_blocks.append({
                    "chat_id": chat_id,
                    "chat_title": chat_title,
                    "message_index": idx,
                    "role": role,
                    "language": info_words[0] if info_words else "txt",
                    "code": code.strip(),
                    "block_index": match_idx,
                    "timestamp": timestamp,
                })
        return code_blocks

    def save_code_blocks(self, code_blocks: List[Dict], output_dir: Path):
        """
        Save code blocks to disk.

        Each block is written to
        ``<output_dir>/<sanitized chat title>/<message>-<block><ext>`` together
        with a ``<file>.meta.json`` sidecar.

        :param code_blocks: List of code blocks with metadata
        :param output_dir: Output directory
        """
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        for block in code_blocks:
            # Create subdirectory per chat.
            # NOTE: the directory is derived from the title only, so chats
            # sharing a title write into (and may overwrite) the same files.
            chat_dir = output_dir / self._sanitize_filename(block["chat_title"])
            chat_dir.mkdir(exist_ok=True)

            # Create filename: message_index-block_index.ext
            ext = self._get_extension(block["language"])
            filename = f"{block['message_index']:03d}-{block['block_index']:02d}{ext}"
            filepath = chat_dir / filename

            # Explicit UTF-8: the platform default encoding may not be able
            # to represent every character found in chat content.
            with open(filepath, "w", encoding="utf-8") as f:
                f.write(block["code"])

            # Save metadata next to the code file.
            meta_filepath = filepath.with_name(filepath.name + ".meta.json")
            with open(meta_filepath, "w", encoding="utf-8") as f:
                json.dump({
                    "chat_id": block["chat_id"],
                    "chat_title": block["chat_title"],
                    "message_index": block["message_index"],
                    "role": block["role"],
                    "language": block["language"],
                    "timestamp": block["timestamp"],
                    "timestamp_human": self._format_timestamp(block["timestamp"]),
                }, f, indent=2)
            print(f"✓ Saved: {filepath}")

    def export_chat(self, chat_id: str, output_dir: Path):
        """Export code blocks from specific chat.

        :param chat_id: Identifier of the chat to export
        :param output_dir: Directory the code blocks are written to
        """
        print(f"Fetching chat {chat_id}...")
        chat = self.get_chat(chat_id)
        code_blocks = self.extract_code_blocks(chat)
        print(f"Found {len(code_blocks)} code blocks")
        if code_blocks:
            self.save_code_blocks(code_blocks, output_dir)
            print(f"✓ Exported {len(code_blocks)} code blocks to {output_dir}")
        else:
            print("No code blocks found in chat")

    def export_all_chats(self, output_dir: Path):
        """Export code blocks from all chats.

        :param output_dir: Directory the code blocks are written to
        """
        print("Fetching all chats...")
        chats = self.get_chats()
        print(f"Found {len(chats)} chats")
        total_blocks = 0
        for chat in chats:
            code_blocks = self.extract_code_blocks(chat)
            if code_blocks:
                self.save_code_blocks(code_blocks, output_dir)
                total_blocks += len(code_blocks)
        print(f"✓ Exported {total_blocks} code blocks from {len(chats)} chats to {output_dir}")

    @staticmethod
    def _format_timestamp(timestamp) -> Optional[str]:
        """Return *timestamp* (epoch seconds) as a local ISO-8601 string.

        :return: The formatted time, or ``None`` when the value cannot be
            interpreted as a timestamp
        """
        try:
            return datetime.fromtimestamp(timestamp).isoformat()
        except (TypeError, ValueError, OverflowError, OSError):
            return None

    @staticmethod
    def _sanitize_filename(name: str) -> str:
        """Sanitize filename by removing invalid characters.

        The result is never empty and never ``.`` or ``..``, so joining it
        onto a directory cannot escape that directory.

        :param name: Raw name, e.g. a chat title
        :return: Name of at most 100 characters that is safe as one path segment
        """
        name = "" if name is None else str(name)
        # Remove or replace invalid characters
        name = OpenWebUIExporter._INVALID_FILENAME_CHARS_RE.sub("_", name)
        # Limit length, then drop surrounding whitespace and trailing dots
        # (which also reduces "." and ".." to an empty string).
        name = name[:100].strip().rstrip(". ")
        return name or "Untitled"

    @staticmethod
    def _get_extension(language: str) -> str:
        """Get file extension for language.

        :param language: Language tag from the code fence (case-insensitive)
        :return: Extension including the leading dot; ``.txt`` when unknown
        """
        return OpenWebUIExporter._EXTENSIONS.get((language or "").lower(), ".txt")
def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point.

    Parses the arguments, then exports either one chat (``--chat-id``) or all
    chats (``--all``). When both are given, ``--chat-id`` takes precedence.

    :param argv: Argument list to parse; ``None`` means ``sys.argv[1:]``
    :return: Process exit code: 0 on success, 1 on a usage or request error
    """
    # Local import: this function is the only user of sys in the script.
    import sys

    parser = argparse.ArgumentParser(
        description="Export code blocks from Open WebUI chats"
    )
    parser.add_argument(
        "--base-url",
        default="http://localhost:8080",
        help="Open WebUI base URL (default: http://localhost:8080)",
    )
    parser.add_argument(
        "--api-key",
        help="Optional API key for authentication",
    )
    parser.add_argument(
        "--chat-id",
        help="Export specific chat by ID",
    )
    parser.add_argument(
        "--all",
        action="store_true",
        help="Export all chats",
    )
    parser.add_argument(
        "--output-dir",
        default="./webui-exports",
        help="Output directory (default: ./webui-exports)",
    )
    args = parser.parse_args(argv)

    # Validate before building the exporter so a usage error has no side effects.
    if not (args.chat_id or args.all):
        parser.print_help()
        print("\nError: Specify either --chat-id or --all", file=sys.stderr)
        return 1

    exporter = OpenWebUIExporter(base_url=args.base_url, api_key=args.api_key)
    output_dir = Path(args.output_dir)
    try:
        if args.chat_id:
            exporter.export_chat(args.chat_id, output_dir)
        else:
            exporter.export_all_chats(output_dir)
    except requests.RequestException as exc:
        # Connection problems and HTTP error statuses are expected failure
        # modes of a CLI tool; report them briefly instead of a traceback.
        print(f"Error: request to {args.base_url} failed: {exc}", file=sys.stderr)
        return 1
    return 0
if __name__ == "__main__":
    # Raise SystemExit directly rather than calling exit(): that helper is
    # injected by the site module for interactive use and is missing when the
    # interpreter runs without site (e.g. ``python -S``).
    raise SystemExit(main())