diff --git a/arty.yml b/arty.yml index 475f049..64cc4c3 100644 --- a/arty.yml +++ b/arty.yml @@ -327,6 +327,7 @@ scripts: echo "Services configured:" echo " - comfyui: ComfyUI server (port 8188) - autostart enabled" echo " - orchestrator: Model orchestrator (port 9000) - autostart disabled" + echo " - webdav-sync: WebDAV output sync service - autostart enabled" echo "" echo "To start: supervisord -c /workspace/supervisord.conf" echo "To manage: supervisorctl status" @@ -678,6 +679,13 @@ scripts: services/orchestrator/status: supervisorctl -c /workspace/supervisord.conf status ai-services:orchestrator services/orchestrator/logs: supervisorctl -c /workspace/supervisord.conf tail -f ai-services:orchestrator + # WebDAV Sync service + services/webdav-sync/start: supervisorctl -c /workspace/supervisord.conf start ai-services:webdav-sync + services/webdav-sync/stop: supervisorctl -c /workspace/supervisord.conf stop ai-services:webdav-sync + services/webdav-sync/restart: supervisorctl -c /workspace/supervisord.conf restart ai-services:webdav-sync + services/webdav-sync/status: supervisorctl -c /workspace/supervisord.conf status ai-services:webdav-sync + services/webdav-sync/logs: supervisorctl -c /workspace/supervisord.conf tail -f ai-services:webdav-sync + # # Health Checks # diff --git a/model-orchestrator/requirements.txt b/model-orchestrator/requirements.txt index 794b4af..fa6e4d3 100644 --- a/model-orchestrator/requirements.txt +++ b/model-orchestrator/requirements.txt @@ -4,3 +4,5 @@ httpx==0.25.1 docker==6.1.3 pyyaml==6.0.1 pydantic==2.5.0 +watchdog==3.0.0 +webdavclient3==3.14.6 diff --git a/scripts/comfyui_webdav_sync.py b/scripts/comfyui_webdav_sync.py new file mode 100644 index 0000000..7973b99 --- /dev/null +++ b/scripts/comfyui_webdav_sync.py @@ -0,0 +1,267 @@ +#!/usr/bin/env python3 +""" +ComfyUI WebDAV Output Sync Service + +Monitors ComfyUI's output directory and automatically uploads new files +to HiDrive WebDAV storage. + +Usage: + python3 comfyui_webdav_sync.py + +Environment Variables: + WEBDAV_URL: WebDAV server URL (default: https://webdav.hidrive.ionos.com/) + WEBDAV_USERNAME: WebDAV username + WEBDAV_PASSWORD: WebDAV password + WEBDAV_REMOTE_PATH: Remote directory path (default: /users/valknar/Pictures/AI/ComfyUI) + COMFYUI_OUTPUT_DIR: Local directory to watch (default: /workspace/ComfyUI/output) + SYNC_DELAY: Seconds to wait after file write before upload (default: 2) +""" + +import os +import sys +import time +import logging +from pathlib import Path +from typing import Set +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler, FileCreatedEvent, FileModifiedEvent +from webdav3.client import Client + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(sys.stdout) + ] +) +logger = logging.getLogger('comfyui-webdav-sync') + +# Configuration from environment variables +WEBDAV_URL = os.getenv('WEBDAV_URL') +WEBDAV_USERNAME = os.getenv('WEBDAV_USERNAME') +WEBDAV_PASSWORD = os.getenv('WEBDAV_PASSWORD') +WEBDAV_REMOTE_PATH = os.getenv('WEBDAV_REMOTE_PATH') +COMFYUI_OUTPUT_DIR = os.getenv('COMFYUI_OUTPUT_DIR', '/workspace/ComfyUI/output') +SYNC_DELAY = int(os.getenv('SYNC_DELAY', '2')) # Wait 2 seconds after file write +RETRY_ATTEMPTS = int(os.getenv('RETRY_ATTEMPTS', '3')) +RETRY_DELAY = int(os.getenv('RETRY_DELAY', '5')) + +# Validate required credentials +if not all([WEBDAV_URL, WEBDAV_USERNAME, WEBDAV_PASSWORD, WEBDAV_REMOTE_PATH]): + logger.error("Missing required WebDAV environment variables!") + logger.error("Required: WEBDAV_URL, WEBDAV_USERNAME, WEBDAV_PASSWORD, WEBDAV_REMOTE_PATH") + sys.exit(1) + + +class WebDAVClient: + """WebDAV client wrapper with retry logic""" + + def __init__(self): + options = { + 'webdav_hostname': WEBDAV_URL, + 'webdav_login': WEBDAV_USERNAME, + 'webdav_password': WEBDAV_PASSWORD, + 'webdav_timeout': 300, # 5 minutes for large files + } + self.client = Client(options) + self._ensure_remote_directory() + + def _ensure_remote_directory(self): + """Ensure the remote directory exists""" + try: + if not self.client.check(WEBDAV_REMOTE_PATH): + logger.info(f"Creating remote directory: {WEBDAV_REMOTE_PATH}") + # Create parent directories recursively + parts = Path(WEBDAV_REMOTE_PATH).parts + current = '' + for part in parts: + if not part or part == '/': + continue + current = f"{current}/{part}" + if not self.client.check(current): + self.client.mkdir(current) + logger.info(f"✓ Remote directory ready: {WEBDAV_REMOTE_PATH}") + except Exception as e: + logger.error(f"Failed to create remote directory: {e}") + raise + + def upload_file(self, local_path: str, remote_path: str) -> bool: + """Upload a file with retry logic""" + for attempt in range(1, RETRY_ATTEMPTS + 1): + try: + # Ensure parent directory exists + remote_dir = str(Path(remote_path).parent) + if not self.client.check(remote_dir): + self.client.mkdir(remote_dir) + + # Upload file + logger.info(f"[{attempt}/{RETRY_ATTEMPTS}] Uploading {Path(local_path).name} -> {remote_path}") + self.client.upload_sync(remote_path=remote_path, local_path=local_path) + + # Verify upload + if self.client.check(remote_path): + file_size = os.path.getsize(local_path) + logger.info(f"✓ Upload successful: {Path(local_path).name} ({file_size:,} bytes)") + return True + else: + logger.warning(f"Upload verification failed for {remote_path}") + + except Exception as e: + logger.error(f"Upload attempt {attempt} failed: {e}") + if attempt < RETRY_ATTEMPTS: + logger.info(f"Retrying in {RETRY_DELAY} seconds...") + time.sleep(RETRY_DELAY) + + logger.error(f"✗ Failed to upload {local_path} after {RETRY_ATTEMPTS} attempts") + return False + + +class ComfyUIOutputHandler(FileSystemEventHandler): + """Handles file system events in ComfyUI output directory""" + + def __init__(self, webdav_client: WebDAVClient): + self.webdav_client = webdav_client + self.pending_files: Set[str] = set() # Files waiting for write completion + self.uploaded_files: Set[str] = set() # Track uploaded files + self.watch_dir = Path(COMFYUI_OUTPUT_DIR) + + def on_created(self, event): + """Handle file creation events""" + if event.is_directory: + return + + file_path = event.src_path + + # Ignore temp files and hidden files + if self._should_ignore(file_path): + return + + logger.info(f"New file detected: {Path(file_path).name}") + self.pending_files.add(file_path) + + # Schedule upload after delay (to ensure file write is complete) + self._schedule_upload(file_path) + + def on_modified(self, event): + """Handle file modification events""" + if event.is_directory: + return + + file_path = event.src_path + + # Ignore if already uploaded or should be ignored + if file_path in self.uploaded_files or self._should_ignore(file_path): + return + + # Add to pending if not already there + if file_path not in self.pending_files: + logger.info(f"Modified file detected: {Path(file_path).name}") + self.pending_files.add(file_path) + self._schedule_upload(file_path) + + def _should_ignore(self, file_path: str) -> bool: + """Check if file should be ignored""" + name = Path(file_path).name + + # Ignore hidden files, temp files, and partial downloads + if name.startswith('.') or name.endswith('.tmp') or name.endswith('.part'): + return True + + return False + + def _schedule_upload(self, file_path: str): + """Schedule file upload after ensuring write is complete""" + def upload_when_ready(): + time.sleep(SYNC_DELAY) # Wait for file write to complete + + # Verify file still exists and size is stable + if not os.path.exists(file_path): + logger.warning(f"File disappeared: {file_path}") + self.pending_files.discard(file_path) + return + + # Check if file size is stable (not being written) + size1 = os.path.getsize(file_path) + time.sleep(0.5) + size2 = os.path.getsize(file_path) + + if size1 != size2: + logger.info(f"File still being written: {Path(file_path).name}") + # Reschedule + self._schedule_upload(file_path) + return + + # Upload file + self._upload_file(file_path) + + # Run in background thread + import threading + threading.Thread(target=upload_when_ready, daemon=True).start() + + def _upload_file(self, local_path: str): + """Upload file to WebDAV""" + try: + # Calculate relative path from watch directory + rel_path = Path(local_path).relative_to(self.watch_dir) + remote_path = f"{WEBDAV_REMOTE_PATH}/{rel_path}".replace('\\', '/') + + # Upload + success = self.webdav_client.upload_file(local_path, remote_path) + + # Track result + self.pending_files.discard(local_path) + if success: + self.uploaded_files.add(local_path) + + except Exception as e: + logger.error(f"Failed to upload {local_path}: {e}") + self.pending_files.discard(local_path) + + +def main(): + """Main entry point""" + logger.info("=" * 80) + logger.info("ComfyUI WebDAV Output Sync Service") + logger.info("=" * 80) + logger.info(f"Watch directory: {COMFYUI_OUTPUT_DIR}") + logger.info(f"WebDAV URL: {WEBDAV_URL}") + logger.info(f"Remote path: {WEBDAV_REMOTE_PATH}") + logger.info(f"Sync delay: {SYNC_DELAY}s") + logger.info("=" * 80) + + # Verify watch directory exists + if not os.path.exists(COMFYUI_OUTPUT_DIR): + logger.error(f"Watch directory does not exist: {COMFYUI_OUTPUT_DIR}") + logger.info(f"Creating watch directory...") + os.makedirs(COMFYUI_OUTPUT_DIR, exist_ok=True) + + # Initialize WebDAV client + try: + webdav_client = WebDAVClient() + except Exception as e: + logger.error(f"Failed to initialize WebDAV client: {e}") + sys.exit(1) + + # Create event handler and observer + event_handler = ComfyUIOutputHandler(webdav_client) + observer = Observer() + observer.schedule(event_handler, COMFYUI_OUTPUT_DIR, recursive=True) + + # Start watching + observer.start() + logger.info("✓ Service started - watching for new files...") + + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + logger.info("Shutting down...") + observer.stop() + + observer.join() + logger.info("Service stopped") + + +if __name__ == '__main__': + main() diff --git a/supervisord.conf b/supervisord.conf index 8303703..a872e3f 100644 --- a/supervisord.conf +++ b/supervisord.conf @@ -56,6 +56,23 @@ environment=HF_HOME="/workspace/huggingface_cache",HF_TOKEN="%(ENV_HF_TOKEN)s" priority=200 stopwaitsecs=30 +# ComfyUI WebDAV Sync Service +[program:webdav-sync] +command=python3 /workspace/ai/scripts/comfyui_webdav_sync.py +directory=/workspace/ai +autostart=true +autorestart=true +startretries=3 +stderr_logfile=/workspace/logs/webdav-sync.err.log +stdout_logfile=/workspace/logs/webdav-sync.out.log +stdout_logfile_maxbytes=50MB +stdout_logfile_backups=10 +stderr_logfile_maxbytes=50MB +stderr_logfile_backups=10 +environment=WEBDAV_URL="%(ENV_WEBDAV_URL)s",WEBDAV_USERNAME="%(ENV_WEBDAV_USERNAME)s",WEBDAV_PASSWORD="%(ENV_WEBDAV_PASSWORD)s",WEBDAV_REMOTE_PATH="%(ENV_WEBDAV_REMOTE_PATH)s",COMFYUI_OUTPUT_DIR="/workspace/ComfyUI/output" +priority=150 +stopwaitsecs=10 + [group:ai-services] -programs=comfyui,orchestrator +programs=comfyui,orchestrator,webdav-sync priority=999