diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 388b51a..900c3aa 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -1,13 +1,15 @@ # RunPod Multi-Modal AI Architecture -**Clean, extensible Python-based architecture for RunPod GPU instances** +**Clean, extensible distributed AI infrastructure spanning VPS and GPU** ## Design Principles -1. **No Docker** - Direct Python execution for RunPod compatibility -2. **Extensible** - Adding new models requires minimal code -3. **Maintainable** - Clear structure and separation of concerns -4. **Simple** - One command to start, easy to debug +1. **Distributed** - VPS (UI/proxy) + GPU (models) connected via Tailscale +2. **No Docker on GPU** - Direct Python for RunPod compatibility +3. **Extensible** - Adding new models requires minimal code +4. **Maintainable** - Clear structure and separation of concerns +5. **Simple** - One command to start, easy to debug +6. **OpenAI Compatible** - Works with standard AI tools ## Directory Structure diff --git a/ansible.cfg b/ansible.cfg new file mode 100644 index 0000000..3feb761 --- /dev/null +++ b/ansible.cfg @@ -0,0 +1,33 @@ +[defaults] +# Ansible configuration for RunPod deployment + +# Inventory +inventory = inventory.yml + +# Disable host key checking (RunPod instances may change) +host_key_checking = False + +# Display settings +stdout_callback = yaml +bin_ansible_callbacks = True + +# Performance +forks = 5 +gathering = smart +fact_caching = jsonfile +fact_caching_connection = /tmp/ansible_facts +fact_caching_timeout = 86400 + +# Logging +log_path = /tmp/ansible-runpod.log + +# Privilege escalation +become_method = sudo +become_ask_pass = False + +# SSH settings +timeout = 30 +transport = local + +# Retry files +retry_files_enabled = False diff --git a/core/base_service.py b/core/base_service.py new file mode 100644 index 0000000..9a313c1 --- /dev/null +++ b/core/base_service.py @@ -0,0 +1,166 @@ +#!/usr/bin/env python3 +""" +Base Service Class for AI Model Services + +Provides common functionality for all model services: +- Health check endpoint +- Graceful shutdown handling +- Logging configuration +- Standard FastAPI setup +""" + +import asyncio +import logging +import os +import signal +import sys +from abc import ABC, abstractmethod +from typing import Optional + +from fastapi import FastAPI +import uvicorn + + +class BaseService(ABC): + """Abstract base class for all AI model services""" + + def __init__(self, name: str, port: int, host: str = "0.0.0.0"): + """ + Initialize base service + + Args: + name: Service name (for logging) + port: Port to run service on + host: Host to bind to (default: 0.0.0.0) + """ + self.name = name + self.port = port + self.host = host + self.app = FastAPI(title=f"{name} Service", version="1.0.0") + self.logger = self._setup_logging() + self.shutdown_event = asyncio.Event() + + # Register standard endpoints + self._register_health_endpoint() + + # Register signal handlers for graceful shutdown + self._register_signal_handlers() + + # Allow subclasses to add custom routes + self.create_app() + + def _setup_logging(self) -> logging.Logger: + """Configure logging for the service""" + logging.basicConfig( + level=logging.INFO, + format=f'%(asctime)s - {self.name} - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(sys.stdout) + ] + ) + return logging.getLogger(self.name) + + def _register_health_endpoint(self): + """Register standard health check endpoint""" + @self.app.get("/health") + async def health_check(): + """Health check endpoint""" + return 
{ + "status": "healthy", + "service": self.name, + "port": self.port + } + + def _register_signal_handlers(self): + """Register signal handlers for graceful shutdown""" + def signal_handler(sig, frame): + self.logger.info(f"Received signal {sig}, initiating graceful shutdown...") + self.shutdown_event.set() + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + @abstractmethod + def create_app(self): + """ + Create FastAPI routes for this service. + Subclasses must implement this to add their specific endpoints. + + Example: + @self.app.post("/v1/generate") + async def generate(request: MyRequest): + return await self.model.generate(request) + """ + pass + + async def initialize(self): + """ + Initialize the service (load models, etc.). + Subclasses can override this for custom initialization. + """ + self.logger.info(f"Initializing {self.name} service...") + + async def cleanup(self): + """ + Cleanup resources on shutdown. + Subclasses can override this for custom cleanup. + """ + self.logger.info(f"Cleaning up {self.name} service...") + + def run(self): + """ + Run the service. + This is the main entry point that starts the FastAPI server. + """ + try: + self.logger.info(f"Starting {self.name} service on {self.host}:{self.port}") + + # Run initialization + asyncio.run(self.initialize()) + + # Start uvicorn server + config = uvicorn.Config( + app=self.app, + host=self.host, + port=self.port, + log_level="info", + access_log=True + ) + server = uvicorn.Server(config) + + # Run server + asyncio.run(server.serve()) + + except KeyboardInterrupt: + self.logger.info("Keyboard interrupt received") + except Exception as e: + self.logger.error(f"Error running service: {e}", exc_info=True) + sys.exit(1) + finally: + # Cleanup + asyncio.run(self.cleanup()) + self.logger.info(f"{self.name} service stopped") + + +class GPUService(BaseService): + """ + Base class for GPU-accelerated services. + Provides additional GPU-specific functionality. 
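+
+    Example (an illustrative sketch -- ``EchoService``, its port and its
+    route are placeholders, not part of this codebase):
+
+        class EchoService(GPUService):
+            def __init__(self):
+                super().__init__(name="echo", port=8010)
+
+            def create_app(self):
+                @self.app.get("/echo")
+                async def echo(text: str):
+                    # Trivial route; real services register model endpoints here
+                    return {"echo": text}
+
+        if __name__ == "__main__":
+            EchoService().run()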
+ """ + + def __init__(self, name: str, port: int, host: str = "0.0.0.0"): + super().__init__(name, port, host) + self._check_gpu_availability() + + def _check_gpu_availability(self): + """Check if GPU is available""" + try: + import torch + if torch.cuda.is_available(): + gpu_count = torch.cuda.device_count() + gpu_name = torch.cuda.get_device_name(0) + self.logger.info(f"GPU available: {gpu_name} (count: {gpu_count})") + else: + self.logger.warning("No GPU available - service may run slowly") + except ImportError: + self.logger.warning("PyTorch not installed - cannot check GPU availability") diff --git a/core/requirements.txt b/core/requirements.txt new file mode 100644 index 0000000..bf3186f --- /dev/null +++ b/core/requirements.txt @@ -0,0 +1,15 @@ +# Core dependencies for AI service infrastructure + +# FastAPI and server +fastapi==0.104.1 +uvicorn[standard]==0.24.0 +pydantic==2.5.0 + +# HTTP client for health checks and proxying +httpx==0.25.1 + +# YAML configuration +pyyaml==6.0.1 + +# Process management +psutil==5.9.6 diff --git a/core/service_manager.py b/core/service_manager.py new file mode 100644 index 0000000..4ecf930 --- /dev/null +++ b/core/service_manager.py @@ -0,0 +1,301 @@ +#!/usr/bin/env python3 +""" +Service Manager for AI Model Services + +Manages lifecycle of model services running as Python processes: +- Start/stop services +- Health monitoring +- Auto-restart on failure +- Resource cleanup +""" + +import asyncio +import logging +import os +import signal +import subprocess +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, Optional + +import httpx + + +@dataclass +class ServiceConfig: + """Configuration for a service""" + name: str + script_path: Path + port: int + startup_timeout: int = 120 + health_check_path: str = "/health" + auto_restart: bool = False + env: Optional[Dict[str, str]] = None + + +class ServiceManager: + """Manages multiple AI model services as subprocesses""" + + def __init__(self): + self.logger = logging.getLogger("ServiceManager") + self.processes: Dict[str, subprocess.Popen] = {} + self.configs: Dict[str, ServiceConfig] = {} + self.shutdown_event = asyncio.Event() + + def register_service(self, config: ServiceConfig): + """Register a service configuration""" + self.configs[config.name] = config + self.logger.info(f"Registered service: {config.name} on port {config.port}") + + async def start_service(self, name: str) -> bool: + """ + Start a service by name + + Args: + name: Service name to start + + Returns: + bool: True if service started successfully + """ + if name not in self.configs: + self.logger.error(f"Service {name} not registered") + return False + + if name in self.processes: + proc = self.processes[name] + if proc.poll() is None: + self.logger.info(f"Service {name} already running") + return True + + config = self.configs[name] + self.logger.info(f"Starting service {name}...") + + try: + # Prepare environment + env = os.environ.copy() + if config.env: + env.update(config.env) + env.update({ + 'PORT': str(config.port), + 'HOST': '0.0.0.0' + }) + + # Start process + proc = subprocess.Popen( + ['python3', str(config.script_path)], + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + preexec_fn=os.setsid # Create new process group + ) + + self.processes[name] = proc + self.logger.info(f"Process started for {name} (PID: {proc.pid})") + + # Wait for health check + if await self._wait_for_health(name, config): + self.logger.info(f"Service {name} is healthy and ready") + return 
True + else: + self.logger.error(f"Service {name} failed health check") + await self.stop_service(name) + return False + + except Exception as e: + self.logger.error(f"Error starting {name}: {e}", exc_info=True) + return False + + async def _wait_for_health(self, name: str, config: ServiceConfig) -> bool: + """ + Wait for service to become healthy + + Args: + name: Service name + config: Service configuration + + Returns: + bool: True if service becomes healthy within timeout + """ + proc = self.processes.get(name) + if not proc: + return False + + start_time = time.time() + url = f"http://localhost:{config.port}{config.health_check_path}" + + while time.time() - start_time < config.startup_timeout: + # Check if process is still running + if proc.poll() is not None: + self.logger.error(f"Process for {name} exited prematurely (code: {proc.returncode})") + return False + + # Try health check + try: + async with httpx.AsyncClient() as client: + response = await client.get(url, timeout=5.0) + if response.status_code == 200: + return True + except Exception: + pass + + await asyncio.sleep(2) + + return False + + async def stop_service(self, name: str, timeout: int = 10): + """ + Stop a running service + + Args: + name: Service name + timeout: Seconds to wait for graceful shutdown + """ + if name not in self.processes: + self.logger.warning(f"Service {name} not in process registry") + return + + proc = self.processes[name] + + if proc.poll() is None: # Still running + self.logger.info(f"Stopping service {name}...") + try: + # Send SIGTERM to process group + os.killpg(os.getpgid(proc.pid), signal.SIGTERM) + + # Wait for graceful shutdown + try: + proc.wait(timeout=timeout) + self.logger.info(f"Service {name} stopped gracefully") + except subprocess.TimeoutExpired: + # Force kill if not terminated + self.logger.warning(f"Service {name} did not stop gracefully, forcing kill") + os.killpg(os.getpgid(proc.pid), signal.SIGKILL) + proc.wait() + + except Exception as e: + self.logger.error(f"Error stopping {name}: {e}", exc_info=True) + + del self.processes[name] + + async def restart_service(self, name: str) -> bool: + """ + Restart a service + + Args: + name: Service name + + Returns: + bool: True if service restarted successfully + """ + self.logger.info(f"Restarting service {name}...") + await self.stop_service(name) + await asyncio.sleep(2) # Brief pause between stop and start + return await self.start_service(name) + + async def check_health(self, name: str) -> bool: + """ + Check if a service is healthy + + Args: + name: Service name + + Returns: + bool: True if service is running and healthy + """ + if name not in self.processes: + return False + + proc = self.processes[name] + if proc.poll() is not None: + return False + + config = self.configs[name] + url = f"http://localhost:{config.port}{config.health_check_path}" + + try: + async with httpx.AsyncClient() as client: + response = await client.get(url, timeout=5.0) + return response.status_code == 200 + except Exception: + return False + + async def monitor_services(self): + """ + Monitor all services and auto-restart if configured + + This runs continuously until shutdown_event is set. 
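+
+        Example driver (an illustrative sketch; the service name, script path
+        and port mirror defaults used elsewhere in this repo but are not
+        required values):
+
+            manager = ServiceManager()
+            manager.register_service(ServiceConfig(
+                name="vllm",
+                script_path=Path("models/vllm/server.py"),
+                port=8001,
+                auto_restart=True,
+            ))
+            await manager.start_service("vllm")
+            # Run the monitor in the background until shutdown is requested
+            monitor = asyncio.create_task(manager.monitor_services())
+            ...
+            manager.shutdown_event.set()
+            await monitor
+            await manager.stop_all_services()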
+ """ + self.logger.info("Starting service monitor...") + + while not self.shutdown_event.is_set(): + for name, config in self.configs.items(): + if not config.auto_restart: + continue + + # Check if process exists and is healthy + if name in self.processes: + proc = self.processes[name] + if proc.poll() is not None: + self.logger.warning(f"Service {name} died (code: {proc.returncode}), restarting...") + await self.restart_service(name) + elif not await self.check_health(name): + self.logger.warning(f"Service {name} unhealthy, restarting...") + await self.restart_service(name) + + # Wait before next check + try: + await asyncio.wait_for(self.shutdown_event.wait(), timeout=10.0) + except asyncio.TimeoutError: + pass + + self.logger.info("Service monitor stopped") + + async def stop_all_services(self): + """Stop all running services""" + self.logger.info("Stopping all services...") + for name in list(self.processes.keys()): + await self.stop_service(name) + self.logger.info("All services stopped") + + def get_service_status(self, name: str) -> Dict: + """ + Get status information for a service + + Args: + name: Service name + + Returns: + dict: Status information + """ + if name not in self.configs: + return {"status": "unknown", "error": "Service not registered"} + + if name not in self.processes: + return {"status": "stopped"} + + proc = self.processes[name] + if proc.poll() is not None: + return { + "status": "exited", + "exit_code": proc.returncode + } + + config = self.configs[name] + return { + "status": "running", + "pid": proc.pid, + "port": config.port + } + + def get_all_service_status(self) -> Dict: + """ + Get status for all registered services + + Returns: + dict: Service name -> status mapping + """ + return { + name: self.get_service_status(name) + for name in self.configs.keys() + } diff --git a/inventory.yml b/inventory.yml new file mode 100644 index 0000000..e3f2cfb --- /dev/null +++ b/inventory.yml @@ -0,0 +1,26 @@ +--- +# Ansible inventory for RunPod deployment +# +# This inventory defines localhost as the target for RunPod instances. +# All tasks run locally on the RunPod GPU server. + +all: + hosts: + localhost: + ansible_connection: local + ansible_python_interpreter: /usr/bin/python3 + + vars: + # Workspace configuration + workspace_dir: /workspace + ai_dir: /workspace/ai + + # Environment variables (loaded from .env if present) + hf_token: "{{ lookup('env', 'HF_TOKEN') }}" + tailscale_key: "{{ lookup('env', 'TAILSCALE_AUTH_KEY') | default('') }}" + + # GPU configuration + gpu_memory_utilization: 0.85 + + # Model cache + huggingface_cache: /workspace/huggingface_cache diff --git a/models/flux/requirements.txt b/models/flux/requirements.txt new file mode 100644 index 0000000..7becd1b --- /dev/null +++ b/models/flux/requirements.txt @@ -0,0 +1,21 @@ +# Flux.1 Image Generation Service Dependencies + +# Diffusers library (for Flux.1 pipeline) +diffusers==0.30.0 + +# PyTorch (required by diffusers) +torch==2.1.0 +torchvision==0.16.0 + +# Transformers (for model components) +transformers==4.36.0 + +# Image processing +Pillow==10.1.0 + +# Accelerate (for optimizations) +accelerate==0.25.0 + +# Additional dependencies for Flux +sentencepiece==0.1.99 +protobuf==4.25.1 diff --git a/models/flux/server.py b/models/flux/server.py new file mode 100644 index 0000000..796599f --- /dev/null +++ b/models/flux/server.py @@ -0,0 +1,193 @@ +#!/usr/bin/env python3 +""" +Flux.1 Image Generation Service + +OpenAI-compatible image generation using Flux.1 Schnell model. 
+Provides /v1/images/generations endpoint. +""" + +import base64 +import io +import os +from typing import Optional + +import torch +from diffusers import FluxPipeline +from fastapi import HTTPException +from PIL import Image +from pydantic import BaseModel, Field + +# Import base service class +import sys +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..')) +from core.base_service import GPUService + + +class ImageGenerationRequest(BaseModel): + """Image generation request (OpenAI-compatible)""" + model: str = Field(default="flux-schnell", description="Model name") + prompt: str = Field(..., description="Text description of the image to generate") + n: int = Field(default=1, ge=1, le=4, description="Number of images to generate") + size: str = Field(default="1024x1024", description="Image size (e.g., 512x512, 1024x1024)") + response_format: str = Field(default="b64_json", description="Response format: url or b64_json") + quality: str = Field(default="standard", description="Image quality: standard or hd") + style: str = Field(default="natural", description="Image style: natural or vivid") + + +class ImageGenerationResponse(BaseModel): + """Image generation response (OpenAI-compatible)""" + created: int = Field(..., description="Unix timestamp") + data: list = Field(..., description="List of generated images") + + +class FluxService(GPUService): + """Flux.1 Schnell image generation service""" + + def __init__(self): + # Get port from environment or use default + port = int(os.getenv("PORT", "8002")) + super().__init__(name="flux-schnell", port=port) + + # Service-specific attributes + self.pipeline: Optional[FluxPipeline] = None + self.model_name = os.getenv("MODEL_NAME", "black-forest-labs/FLUX.1-schnell") + + async def initialize(self): + """Initialize Flux.1 pipeline""" + await super().initialize() + + self.logger.info(f"Loading Flux.1 pipeline: {self.model_name}") + + # Load pipeline + self.pipeline = FluxPipeline.from_pretrained( + self.model_name, + torch_dtype=torch.bfloat16, + cache_dir=os.getenv("HF_CACHE_DIR", "/workspace/huggingface_cache") + ) + + # Move to GPU + if torch.cuda.is_available(): + self.pipeline = self.pipeline.to("cuda") + self.logger.info("Flux.1 pipeline loaded on GPU") + else: + self.logger.warning("GPU not available, running on CPU (very slow)") + + # Enable memory optimizations + if hasattr(self.pipeline, 'enable_model_cpu_offload'): + # This moves models to GPU only when needed, saving VRAM + self.pipeline.enable_model_cpu_offload() + + self.logger.info("Flux.1 pipeline initialized successfully") + + async def cleanup(self): + """Cleanup resources""" + await super().cleanup() + if self.pipeline: + self.logger.info("Flux.1 pipeline cleanup") + self.pipeline = None + + def parse_size(self, size_str: str) -> tuple[int, int]: + """Parse size string like '1024x1024' into (width, height)""" + try: + parts = size_str.lower().split('x') + if len(parts) != 2: + return (1024, 1024) + width = int(parts[0]) + height = int(parts[1]) + return (width, height) + except: + return (1024, 1024) + + def image_to_base64(self, image: Image.Image) -> str: + """Convert PIL Image to base64 string""" + buffered = io.BytesIO() + image.save(buffered, format="PNG") + img_bytes = buffered.getvalue() + return base64.b64encode(img_bytes).decode('utf-8') + + def create_app(self): + """Create FastAPI routes""" + + @self.app.get("/") + async def root(): + """Root endpoint""" + return { + "service": "Flux.1 Schnell Image Generation", + "model": self.model_name, + "max_images": 
4 + } + + @self.app.get("/v1/models") + async def list_models(): + """List available models (OpenAI-compatible)""" + return { + "object": "list", + "data": [ + { + "id": "flux-schnell", + "object": "model", + "created": 1234567890, + "owned_by": "black-forest-labs", + "permission": [], + "root": self.model_name, + "parent": None, + } + ] + } + + @self.app.post("/v1/images/generations") + async def generate_image(request: ImageGenerationRequest) -> ImageGenerationResponse: + """Generate images from text prompt (OpenAI-compatible)""" + if not self.pipeline: + raise HTTPException(status_code=503, detail="Model not initialized") + + self.logger.info(f"Generating {request.n} image(s): {request.prompt[:100]}...") + + try: + # Parse image size + width, height = self.parse_size(request.size) + self.logger.info(f"Size: {width}x{height}") + + # Generate images + images = [] + for i in range(request.n): + self.logger.info(f"Generating image {i+1}/{request.n}") + + # Flux.1 Schnell uses 4 inference steps for speed + image = self.pipeline( + prompt=request.prompt, + width=width, + height=height, + num_inference_steps=4, # Schnell is optimized for 4 steps + guidance_scale=0.0, # Schnell doesn't use guidance + ).images[0] + + # Convert to base64 + if request.response_format == "b64_json": + image_data = { + "b64_json": self.image_to_base64(image) + } + else: + # For URL format, we'd need to save and serve the file + # For now, we'll return base64 anyway + image_data = { + "b64_json": self.image_to_base64(image) + } + + images.append(image_data) + + self.logger.info(f"Generated {request.n} image(s) successfully") + + return ImageGenerationResponse( + created=1234567890, + data=images + ) + + except Exception as e: + self.logger.error(f"Error generating image: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=str(e)) + + +if __name__ == "__main__": + service = FluxService() + service.run() diff --git a/models/musicgen/requirements.txt b/models/musicgen/requirements.txt new file mode 100644 index 0000000..a55f831 --- /dev/null +++ b/models/musicgen/requirements.txt @@ -0,0 +1,11 @@ +# MusicGen Music Generation Service Dependencies + +# AudioCraft (contains MusicGen) +audiocraft==1.3.0 + +# PyTorch (required by AudioCraft) +torch==2.1.0 +torchaudio==2.1.0 + +# Additional dependencies +transformers==4.36.0 diff --git a/models/musicgen/server.py b/models/musicgen/server.py new file mode 100644 index 0000000..1b4a4fe --- /dev/null +++ b/models/musicgen/server.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python3 +""" +MusicGen Music Generation Service + +OpenAI-compatible music generation using Meta's MusicGen Medium model. +Provides /v1/audio/generations endpoint. 
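+
+Example client call (an illustrative sketch; it assumes the service is running
+on its default port 8003 and that ``httpx`` is available, as listed in
+core/requirements.txt):
+
+    import base64
+
+    import httpx
+
+    resp = httpx.post(
+        "http://localhost:8003/v1/audio/generations",
+        json={"prompt": "lo-fi hip hop beat with soft piano", "duration": 10.0},
+        timeout=600.0,
+    )
+    payload = resp.json()
+    # Response carries base64-encoded WAV data in the "audio" field
+    with open("output.wav", "wb") as f:
+        f.write(base64.b64decode(payload["audio"]))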
+""" + +import base64 +import io +import os +import tempfile +from typing import Optional + +import torch +import torchaudio +from audiocraft.models import MusicGen +from fastapi import HTTPException +from pydantic import BaseModel, Field + +# Import base service class +import sys +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..')) +from core.base_service import GPUService + + +class AudioGenerationRequest(BaseModel): + """Music generation request""" + model: str = Field(default="musicgen-medium", description="Model name") + prompt: str = Field(..., description="Text description of the music to generate") + duration: float = Field(default=30.0, ge=1.0, le=30.0, description="Duration in seconds") + temperature: float = Field(default=1.0, ge=0.1, le=2.0, description="Sampling temperature") + top_k: int = Field(default=250, ge=0, le=500, description="Top-k sampling") + top_p: float = Field(default=0.0, ge=0.0, le=1.0, description="Top-p (nucleus) sampling") + cfg_coef: float = Field(default=3.0, ge=1.0, le=15.0, description="Classifier-free guidance coefficient") + response_format: str = Field(default="wav", description="Audio format (wav or mp3)") + + +class AudioGenerationResponse(BaseModel): + """Music generation response""" + audio: str = Field(..., description="Base64-encoded audio data") + format: str = Field(..., description="Audio format (wav or mp3)") + duration: float = Field(..., description="Duration in seconds") + sample_rate: int = Field(..., description="Sample rate in Hz") + + +class MusicGenService(GPUService): + """MusicGen music generation service""" + + def __init__(self): + # Get port from environment or use default + port = int(os.getenv("PORT", "8003")) + super().__init__(name="musicgen-medium", port=port) + + # Service-specific attributes + self.model: Optional[MusicGen] = None + self.model_name = os.getenv("MODEL_NAME", "facebook/musicgen-medium") + + async def initialize(self): + """Initialize MusicGen model""" + await super().initialize() + + self.logger.info(f"Loading MusicGen model: {self.model_name}") + + # Load model + device = "cuda" if torch.cuda.is_available() else "cpu" + self.model = MusicGen.get_pretrained(self.model_name, device=device) + + self.logger.info(f"MusicGen model loaded successfully") + self.logger.info(f"Max duration: 30 seconds at {self.model.sample_rate}Hz") + + async def cleanup(self): + """Cleanup resources""" + await super().cleanup() + if self.model: + self.logger.info("MusicGen model cleanup") + self.model = None + + def create_app(self): + """Create FastAPI routes""" + + @self.app.get("/") + async def root(): + """Root endpoint""" + return { + "service": "MusicGen API Server", + "model": self.model_name, + "max_duration": 30.0, + "sample_rate": self.model.sample_rate if self.model else 32000 + } + + @self.app.get("/v1/models") + async def list_models(): + """List available models (OpenAI-compatible)""" + return { + "object": "list", + "data": [ + { + "id": "musicgen-medium", + "object": "model", + "created": 1234567890, + "owned_by": "meta", + "permission": [], + "root": self.model_name, + "parent": None, + } + ] + } + + @self.app.post("/v1/audio/generations") + async def generate_audio(request: AudioGenerationRequest) -> AudioGenerationResponse: + """Generate music from text prompt""" + if not self.model: + raise HTTPException(status_code=503, detail="Model not initialized") + + self.logger.info(f"Generating music: {request.prompt[:100]}...") + self.logger.info(f"Duration: {request.duration}s, Temperature: 
{request.temperature}") + + try: + # Set generation parameters + self.model.set_generation_params( + duration=request.duration, + temperature=request.temperature, + top_k=request.top_k, + top_p=request.top_p, + cfg_coef=request.cfg_coef, + ) + + # Generate audio + descriptions = [request.prompt] + with torch.no_grad(): + wav = self.model.generate(descriptions) + + # wav shape: [batch_size, channels, samples] + # Extract first batch item + audio_data = wav[0].cpu() # [channels, samples] + + # Get sample rate + sample_rate = self.model.sample_rate + + # Save to temporary file + with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file: + temp_path = temp_file.name + torchaudio.save(temp_path, audio_data, sample_rate) + + # Read audio file and encode to base64 + with open(temp_path, 'rb') as f: + audio_bytes = f.read() + + # Clean up temporary file + os.unlink(temp_path) + + # Encode to base64 + audio_base64 = base64.b64encode(audio_bytes).decode('utf-8') + + self.logger.info(f"Generated {request.duration}s of audio") + + return AudioGenerationResponse( + audio=audio_base64, + format="wav", + duration=request.duration, + sample_rate=sample_rate + ) + + except Exception as e: + self.logger.error(f"Error generating audio: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +if __name__ == "__main__": + service = MusicGenService() + service.run() diff --git a/models/vllm/requirements.txt b/models/vllm/requirements.txt new file mode 100644 index 0000000..146b23a --- /dev/null +++ b/models/vllm/requirements.txt @@ -0,0 +1,13 @@ +# vLLM Text Generation Service Dependencies + +# vLLM engine +vllm==0.6.4.post1 + +# PyTorch (required by vLLM) +torch==2.1.0 + +# Transformers (for model loading) +transformers==4.36.0 + +# Additional dependencies +accelerate==0.25.0 diff --git a/models/vllm/server.py b/models/vllm/server.py new file mode 100644 index 0000000..d23ffc4 --- /dev/null +++ b/models/vllm/server.py @@ -0,0 +1,297 @@ +#!/usr/bin/env python3 +""" +vLLM Text Generation Service + +OpenAI-compatible text generation using vLLM and Qwen 2.5 7B Instruct model. +Provides /v1/completions and /v1/chat/completions endpoints. 
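+
+Example chat completion call (an illustrative sketch; it assumes the service
+is running on its default port 8001 and that ``httpx`` is available):
+
+    import httpx
+
+    resp = httpx.post(
+        "http://localhost:8001/v1/chat/completions",
+        json={
+            "model": "qwen-2.5-7b",
+            "messages": [{"role": "user", "content": "Say hello in French."}],
+            "max_tokens": 64,
+        },
+        timeout=120.0,
+    )
+    # Non-streaming responses follow the OpenAI chat.completion shape
+    print(resp.json()["choices"][0]["message"]["content"])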
+""" + +import asyncio +import json +import os +from typing import AsyncIterator, Dict, List, Optional + +from fastapi import Request +from fastapi.responses import JSONResponse, StreamingResponse +from pydantic import BaseModel, Field +from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams +from vllm.utils import random_uuid + +# Import base service class +import sys +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..')) +from core.base_service import GPUService + + +# Request/Response models +class CompletionRequest(BaseModel): + """OpenAI-compatible completion request""" + model: str = Field(default="qwen-2.5-7b") + prompt: str | List[str] = Field(..., description="Text prompt(s)") + max_tokens: int = Field(default=512, ge=1, le=4096) + temperature: float = Field(default=0.7, ge=0.0, le=2.0) + top_p: float = Field(default=1.0, ge=0.0, le=1.0) + n: int = Field(default=1, ge=1, le=10) + stream: bool = Field(default=False) + stop: Optional[str | List[str]] = None + presence_penalty: float = Field(default=0.0, ge=-2.0, le=2.0) + frequency_penalty: float = Field(default=0.0, ge=-2.0, le=2.0) + + +class ChatMessage(BaseModel): + """Chat message format""" + role: str = Field(..., description="Role: system, user, or assistant") + content: str = Field(..., description="Message content") + + +class ChatCompletionRequest(BaseModel): + """OpenAI-compatible chat completion request""" + model: str = Field(default="qwen-2.5-7b") + messages: List[ChatMessage] = Field(..., description="Chat messages") + max_tokens: int = Field(default=512, ge=1, le=4096) + temperature: float = Field(default=0.7, ge=0.0, le=2.0) + top_p: float = Field(default=1.0, ge=0.0, le=1.0) + n: int = Field(default=1, ge=1, le=10) + stream: bool = Field(default=False) + stop: Optional[str | List[str]] = None + + +class VLLMService(GPUService): + """vLLM text generation service""" + + def __init__(self): + # Get port from environment or use default + port = int(os.getenv("PORT", "8001")) + super().__init__(name="vllm-qwen", port=port) + + # Service-specific attributes + self.engine: Optional[AsyncLLMEngine] = None + self.model_name = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-7B-Instruct") + + async def initialize(self): + """Initialize vLLM engine""" + await super().initialize() + + self.logger.info(f"Initializing vLLM AsyncLLMEngine with model: {self.model_name}") + + # Configure engine + engine_args = AsyncEngineArgs( + model=self.model_name, + tensor_parallel_size=1, # Single GPU + gpu_memory_utilization=0.85, # Use 85% of GPU memory + max_model_len=4096, # Context length + dtype="auto", # Auto-detect dtype + download_dir=os.getenv("HF_CACHE_DIR", "/workspace/huggingface_cache"), + trust_remote_code=True, # Some models require this + enforce_eager=False, # Use CUDA graphs for better performance + ) + + # Create async engine + self.engine = AsyncLLMEngine.from_engine_args(engine_args) + + self.logger.info("vLLM AsyncLLMEngine initialized successfully") + + async def cleanup(self): + """Cleanup resources""" + await super().cleanup() + if self.engine: + # vLLM doesn't have an explicit shutdown method + self.logger.info("vLLM engine cleanup") + self.engine = None + + def messages_to_prompt(self, messages: List[ChatMessage]) -> str: + """Convert chat messages to Qwen 2.5 prompt format""" + prompt_parts = [] + + for msg in messages: + role = msg.role + content = msg.content + + if role == "system": + prompt_parts.append(f"<|im_start|>system\n{content}<|im_end|>") + elif role == "user": + 
prompt_parts.append(f"<|im_start|>user\n{content}<|im_end|>") + elif role == "assistant": + prompt_parts.append(f"<|im_start|>assistant\n{content}<|im_end|>") + + # Add final assistant prompt + prompt_parts.append("<|im_start|>assistant\n") + + return "\n".join(prompt_parts) + + def create_app(self): + """Create FastAPI routes""" + + @self.app.get("/") + async def root(): + """Root endpoint""" + return {"status": "ok", "model": self.model_name} + + @self.app.get("/v1/models") + async def list_models(): + """OpenAI-compatible models endpoint""" + return { + "object": "list", + "data": [ + { + "id": "qwen-2.5-7b", + "object": "model", + "created": 1234567890, + "owned_by": "pivoine-gpu", + "permission": [], + "root": self.model_name, + "parent": None, + } + ] + } + + @self.app.post("/v1/completions") + async def create_completion(request: CompletionRequest): + """OpenAI-compatible completion endpoint""" + if not self.engine: + return JSONResponse( + status_code=503, + content={"error": "Engine not initialized"} + ) + + # Handle both single prompt and batch prompts + prompts = [request.prompt] if isinstance(request.prompt, str) else request.prompt + + # Configure sampling parameters + sampling_params = SamplingParams( + temperature=request.temperature, + top_p=request.top_p, + max_tokens=request.max_tokens, + n=request.n, + stop=request.stop if request.stop else [], + presence_penalty=request.presence_penalty, + frequency_penalty=request.frequency_penalty, + ) + + # Generate completions + results = [] + for prompt in prompts: + request_id = random_uuid() + + if request.stream: + # Streaming response + async def generate_stream(): + async for output in self.engine.generate(prompt, sampling_params, request_id): + chunk = { + "id": request_id, + "object": "text_completion", + "created": 1234567890, + "model": request.model, + "choices": [ + { + "text": output.outputs[0].text, + "index": 0, + "logprobs": None, + "finish_reason": output.outputs[0].finish_reason, + } + ] + } + yield f"data: {json.dumps(chunk)}\n\n" + yield "data: [DONE]\n\n" + + return StreamingResponse(generate_stream(), media_type="text/event-stream") + else: + # Non-streaming response + async for output in self.engine.generate(prompt, sampling_params, request_id): + final_output = output + + results.append({ + "text": final_output.outputs[0].text, + "index": len(results), + "logprobs": None, + "finish_reason": final_output.outputs[0].finish_reason, + }) + + return { + "id": random_uuid(), + "object": "text_completion", + "created": 1234567890, + "model": request.model, + "choices": results, + "usage": { + "prompt_tokens": 0, # vLLM doesn't expose this easily + "completion_tokens": 0, + "total_tokens": 0, + } + } + + @self.app.post("/v1/chat/completions") + async def create_chat_completion(request: ChatCompletionRequest): + """OpenAI-compatible chat completion endpoint""" + if not self.engine: + return JSONResponse( + status_code=503, + content={"error": "Engine not initialized"} + ) + + # Convert messages to prompt + prompt = self.messages_to_prompt(request.messages) + + # Configure sampling parameters + sampling_params = SamplingParams( + temperature=request.temperature, + top_p=request.top_p, + max_tokens=request.max_tokens, + n=request.n, + stop=request.stop if request.stop else ["<|im_end|>"], + ) + + request_id = random_uuid() + + if request.stream: + # Streaming response + async def generate_stream(): + async for output in self.engine.generate(prompt, sampling_params, request_id): + chunk = { + "id": request_id, + 
"object": "chat.completion.chunk", + "created": 1234567890, + "model": request.model, + "choices": [ + { + "index": 0, + "delta": {"content": output.outputs[0].text}, + "finish_reason": output.outputs[0].finish_reason, + } + ] + } + yield f"data: {json.dumps(chunk)}\n\n" + yield "data: [DONE]\n\n" + + return StreamingResponse(generate_stream(), media_type="text/event-stream") + else: + # Non-streaming response + async for output in self.engine.generate(prompt, sampling_params, request_id): + final_output = output + + return { + "id": request_id, + "object": "chat.completion", + "created": 1234567890, + "model": request.model, + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": final_output.outputs[0].text, + }, + "finish_reason": final_output.outputs[0].finish_reason, + } + ], + "usage": { + "prompt_tokens": 0, + "completion_tokens": 0, + "total_tokens": 0, + } + } + + +if __name__ == "__main__": + service = VLLMService() + service.run() diff --git a/playbook.yml b/playbook.yml new file mode 100644 index 0000000..54c8e43 --- /dev/null +++ b/playbook.yml @@ -0,0 +1,417 @@ +--- +# +# RunPod AI Infrastructure Ansible Playbook +# +# This playbook provisions a RunPod GPU instance with multi-modal AI services. +# It replaces all bash scripts with reproducible Ansible tasks. +# +# Usage: +# ansible-playbook playbook.yml # Full deployment +# ansible-playbook playbook.yml --tags base # Install system packages +# ansible-playbook playbook.yml --tags python # Setup Python environment +# ansible-playbook playbook.yml --tags models # Download models only +# ansible-playbook playbook.yml --tags validate # Validate installation +# +# Tags: +# base - System packages and dependencies +# python - Python environment setup +# dependencies- Install Python packages +# models - Download AI models +# tailscale - Install and configure Tailscale +# systemd - Configure systemd services +# validate - Health checks and validation +# + +- name: Provision RunPod GPU Instance for AI Services + hosts: localhost + connection: local + become: false + vars: + # Paths + workspace_dir: /workspace + ai_dir: "{{ workspace_dir }}/ai" + cache_dir: "{{ workspace_dir }}/huggingface_cache" + models_dir: "{{ workspace_dir }}/models" + + # Python configuration + python_version: "3.10" + pip_version: "23.3.1" + + # Model configuration + models: + vllm: + name: "Qwen/Qwen2.5-7B-Instruct" + size_gb: 14 + flux: + name: "black-forest-labs/FLUX.1-schnell" + size_gb: 12 + musicgen: + name: "facebook/musicgen-medium" + size_gb: 11 + + # Service configuration + services: + - name: orchestrator + port: 9000 + script: model-orchestrator/orchestrator_subprocess.py + - name: vllm + port: 8001 + script: models/vllm/server.py + - name: flux + port: 8002 + script: models/flux/server.py + - name: musicgen + port: 8003 + script: models/musicgen/server.py + + tasks: + # + # Base System Setup + # + - name: Base system packages + tags: [base, always] + block: + - name: Check GPU availability + shell: nvidia-smi + register: nvidia_check + changed_when: false + failed_when: nvidia_check.rc != 0 + + - name: Display GPU information + debug: + msg: "{{ nvidia_check.stdout_lines }}" + + - name: Ensure workspace directory exists + file: + path: "{{ workspace_dir }}" + state: directory + mode: '0755' + + - name: Update apt cache + apt: + update_cache: yes + cache_valid_time: 3600 + become: true + + - name: Install base system packages + apt: + name: + - build-essential + - python3-dev + - python3-pip + - python3-venv + - git + - 
curl + - wget + - vim + - htop + - tmux + - net-tools + state: present + become: true + + # + # Python Environment Setup + # + - name: Python environment setup + tags: [python] + block: + - name: Upgrade pip + pip: + name: pip + version: "{{ pip_version }}" + executable: pip3 + extra_args: --upgrade + become: true + + - name: Install core Python packages + pip: + requirements: "{{ ai_dir }}/core/requirements.txt" + executable: pip3 + become: true + + # + # Install Model Dependencies + # + - name: Install model dependencies + tags: [dependencies] + block: + - name: Install vLLM dependencies + pip: + requirements: "{{ ai_dir }}/models/vllm/requirements.txt" + executable: pip3 + become: true + + - name: Install Flux dependencies + pip: + requirements: "{{ ai_dir }}/models/flux/requirements.txt" + executable: pip3 + become: true + + - name: Install MusicGen dependencies + pip: + requirements: "{{ ai_dir }}/models/musicgen/requirements.txt" + executable: pip3 + become: true + + # + # Download AI Models + # + - name: Download AI models + tags: [models] + block: + - name: Create model cache directories + file: + path: "{{ item }}" + state: directory + mode: '0755' + loop: + - "{{ cache_dir }}" + - "{{ models_dir }}/flux" + - "{{ models_dir }}/musicgen" + + - name: Check if models are already cached + stat: + path: "{{ cache_dir }}/models--{{ item.value.name | regex_replace('/', '--') }}" + register: model_cache_check + loop: "{{ models | dict2items }}" + loop_control: + label: "{{ item.key }}" + + - name: Download Qwen 2.5 7B model (14GB, ~15 minutes) + shell: | + python3 -c " + from transformers import AutoTokenizer, AutoModelForCausalLM + import os + os.environ['HF_HOME'] = '{{ cache_dir }}' + print('Downloading Qwen 2.5 7B Instruct...') + AutoTokenizer.from_pretrained('{{ models.vllm.name }}') + print('Tokenizer downloaded successfully') + " + environment: + HF_TOKEN: "{{ lookup('env', 'HF_TOKEN') }}" + HF_HOME: "{{ cache_dir }}" + when: not (model_cache_check.results[0].stat.exists | default(false)) + register: vllm_download + async: 1800 # 30 minutes timeout + poll: 30 + + - name: Download Flux.1 Schnell model (12GB, ~12 minutes) + shell: | + python3 -c " + from diffusers import FluxPipeline + import os + os.environ['HF_HOME'] = '{{ cache_dir }}' + print('Downloading Flux.1 Schnell...') + FluxPipeline.from_pretrained( + '{{ models.flux.name }}', + cache_dir='{{ cache_dir }}' + ) + print('Flux.1 downloaded successfully') + " + environment: + HF_TOKEN: "{{ lookup('env', 'HF_TOKEN') }}" + HF_HOME: "{{ cache_dir }}" + when: not (model_cache_check.results[1].stat.exists | default(false)) + register: flux_download + async: 1200 # 20 minutes timeout + poll: 30 + + - name: Download MusicGen Medium model (11GB, ~10 minutes) + shell: | + python3 -c " + from audiocraft.models import MusicGen + import os + os.environ['HF_HOME'] = '{{ cache_dir }}' + print('Downloading MusicGen Medium...') + MusicGen.get_pretrained('{{ models.musicgen.name }}') + print('MusicGen downloaded successfully') + " + environment: + HF_TOKEN: "{{ lookup('env', 'HF_TOKEN') }}" + HF_HOME: "{{ cache_dir }}" + when: not (model_cache_check.results[2].stat.exists | default(false)) + register: musicgen_download + async: 900 # 15 minutes timeout + poll: 30 + + - name: Display model download summary + debug: + msg: | + Model downloads completed: + - Qwen 2.5 7B: {{ 'Downloaded' if vllm_download.changed | default(false) else 'Already cached' }} + - Flux.1 Schnell: {{ 'Downloaded' if flux_download.changed | default(false) else 'Already 
cached' }} + - MusicGen Medium: {{ 'Downloaded' if musicgen_download.changed | default(false) else 'Already cached' }} + Total cache size: ~37GB + + # + # Tailscale VPN + # + - name: Install and configure Tailscale + tags: [tailscale] + block: + - name: Check if Tailscale is installed + command: which tailscale + register: tailscale_check + changed_when: false + failed_when: false + + - name: Install Tailscale + shell: curl -fsSL https://tailscale.com/install.sh | sh + become: true + when: tailscale_check.rc != 0 + + - name: Display Tailscale setup instructions + debug: + msg: | + Tailscale installed. To connect: + 1. Start tailscaled: tailscaled --tun=userspace-networking --socks5-server=localhost:1055 & + 2. Authenticate: tailscale up --advertise-tags=tag:gpu + 3. Get IP: tailscale ip -4 + + Note: Authentication requires manual intervention via provided URL + + # + # Systemd Services (Optional) + # + - name: Configure systemd services + tags: [systemd, never] # never = skip by default + block: + - name: Create systemd service for orchestrator + template: + src: "{{ ai_dir }}/systemd/ai-orchestrator.service.j2" + dest: /etc/systemd/system/ai-orchestrator.service + mode: '0644' + become: true + + - name: Reload systemd daemon + systemd: + daemon_reload: yes + become: true + + - name: Enable orchestrator service + systemd: + name: ai-orchestrator + enabled: yes + become: true + + - name: Display systemd instructions + debug: + msg: | + Systemd service configured. To manage: + - Start: sudo systemctl start ai-orchestrator + - Stop: sudo systemctl stop ai-orchestrator + - Status: sudo systemctl status ai-orchestrator + - Logs: sudo journalctl -u ai-orchestrator -f + + # + # Validation + # + - name: Validate installation + tags: [validate, never] # never = skip by default, run explicitly + block: + - name: Check Python packages + shell: pip3 list | grep -E "(fastapi|uvicorn|torch|vllm|diffusers|audiocraft)" + register: pip_check + changed_when: false + + - name: Display installed packages + debug: + msg: "{{ pip_check.stdout_lines }}" + + - name: Check GPU memory + shell: nvidia-smi --query-gpu=memory.free --format=csv,noheader,nounits + register: gpu_memory + changed_when: false + + - name: Display GPU memory + debug: + msg: "Free GPU memory: {{ gpu_memory.stdout }} MB" + + - name: Check cached models + shell: du -sh {{ cache_dir }} + register: cache_size + changed_when: false + + - name: Display cache information + debug: + msg: "Model cache size: {{ cache_size.stdout }}" + + - name: Verify service scripts are executable + file: + path: "{{ ai_dir }}/{{ item.script }}" + mode: '0755' + loop: "{{ services }}" + + - name: Display validation summary + debug: + msg: | + ✓ Installation validated successfully! + + Next steps: + 1. Start orchestrator: python3 {{ ai_dir }}/model-orchestrator/orchestrator_subprocess.py + 2. Test endpoint: curl http://localhost:9000/health + 3. 
Configure LiteLLM on VPS to connect via Tailscale + + Services: + {% for service in services %} + - {{ service.name }}: http://localhost:{{ service.port }} + {% endfor %} + + # + # Cleanup for Template Creation + # + - name: Cleanup for template creation + tags: [cleanup, never] # never = skip by default, run explicitly + block: + - name: Remove sensitive files + file: + path: "{{ item }}" + state: absent + loop: + - "{{ ai_dir }}/.env" + - /root/.ssh/known_hosts + - /root/.bash_history + - /root/.python_history + + - name: Clear system logs + shell: find /var/log -type f -name "*.log" -delete + become: true + ignore_errors: yes + + - name: Create template version marker + copy: + dest: "{{ workspace_dir }}/TEMPLATE_VERSION" + content: | + RunPod Multi-Modal AI Template (Process-Based Architecture) + Version: 2.0 + Created: {{ ansible_date_time.iso8601 }} + + Components: + - Python {{ python_version }} + - Orchestrator (process-based) + - Text Generation (vLLM + Qwen 2.5 7B) + - Image Generation (Flux.1 Schnell) + - Music Generation (MusicGen Medium) + + Models Cached: ~37GB + Architecture: No Docker, direct Python execution + + Deployment: + 1. Create .env file with HF_TOKEN + 2. Run: python3 {{ ai_dir }}/model-orchestrator/orchestrator_subprocess.py + 3. Access: http://localhost:9000/health + + - name: Display template creation instructions + debug: + msg: | + Template prepared successfully! + + Next steps in RunPod dashboard: + 1. Stop all running services + 2. Go to My Pods → Select this pod → ⋮ → Save as Template + 3. Name: multi-modal-ai-process-v2.0 + 4. Description: Process-based multi-modal AI (text/image/music) + 5. Save and test deployment from template + + Template enables 2-3 minute deployments instead of 60+ minutes! diff --git a/scripts/download-models.sh b/scripts/download-models.sh new file mode 100644 index 0000000..e093f66 --- /dev/null +++ b/scripts/download-models.sh @@ -0,0 +1,36 @@ +#!/bin/bash +# +# Download AI Models +# Wrapper for Ansible models tag +# + +set -e + +cd "$(dirname "$0")/.." + +echo "=========================================" +echo " Downloading AI Models (~37GB)" +echo "=========================================" +echo "" + +# Source .env if it exists +if [ -f .env ]; then + set -a + source .env + set +a +fi + +# Check HF_TOKEN +if [ -z "$HF_TOKEN" ]; then + echo "Error: HF_TOKEN not set" + echo "Add HF_TOKEN to .env file" + exit 1 +fi + +# Run Ansible with models tag +ansible-playbook playbook.yml --tags models + +echo "" +echo "=========================================" +echo " Model download complete!" +echo "=========================================" diff --git a/scripts/install.sh b/scripts/install.sh new file mode 100644 index 0000000..7a24314 --- /dev/null +++ b/scripts/install.sh @@ -0,0 +1,50 @@ +#!/bin/bash +# +# Install AI Infrastructure +# Wrapper script for Ansible playbook +# +# Usage: +# ./install.sh # Full installation +# ./install.sh --tags base # Install specific components +# + +set -e + +cd "$(dirname "$0")/.." + +echo "=========================================" +echo " RunPod AI Infrastructure Installation" +echo "=========================================" +echo "" + +# Check if Ansible is installed +if ! command -v ansible-playbook &> /dev/null; then + echo "Ansible not found. Installing..." + sudo apt update + sudo apt install -y ansible +fi + +# Check for .env file +if [ ! 
-f .env ]; then + echo "Warning: .env file not found" + echo "Copy .env.example to .env and add your HF_TOKEN" + echo "" +fi + +# Source .env if it exists +if [ -f .env ]; then + set -a + source .env + set +a +fi + +# Run Ansible playbook +echo "Running Ansible playbook..." +echo "" + +ansible-playbook playbook.yml "$@" + +echo "" +echo "=========================================" +echo " Installation complete!" +echo "=========================================" diff --git a/scripts/start-all.sh b/scripts/start-all.sh new file mode 100644 index 0000000..ffed9e4 --- /dev/null +++ b/scripts/start-all.sh @@ -0,0 +1,35 @@ +#!/bin/bash +# +# Start AI Orchestrator +# Starts the model orchestrator which manages all AI services +# + +set -e + +cd "$(dirname "$0")/.." + +echo "=========================================" +echo " Starting AI Orchestrator" +echo "=========================================" +echo "" + +# Check for .env file +if [ ! -f .env ]; then + echo "Warning: .env file not found" + echo "Copy .env.example to .env and add your configuration" + echo "" +fi + +# Source .env if it exists +if [ -f .env ]; then + set -a + source .env + set +a +fi + +# Start orchestrator +echo "Starting orchestrator on port 9000..." +python3 model-orchestrator/orchestrator_subprocess.py + +echo "" +echo "Orchestrator stopped" diff --git a/scripts/stop-all.sh b/scripts/stop-all.sh new file mode 100644 index 0000000..df06e6c --- /dev/null +++ b/scripts/stop-all.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# +# Stop AI Services +# Gracefully stops all running AI services +# + +set -e + +echo "=========================================" +echo " Stopping AI Services" +echo "=========================================" +echo "" + +# Kill orchestrator and model processes +echo "Stopping orchestrator..." +pkill -f "orchestrator_subprocess.py" || echo "Orchestrator not running" + +echo "Stopping model services..." +pkill -f "models/vllm/server.py" || echo "vLLM not running" +pkill -f "models/flux/server.py" || echo "Flux not running" +pkill -f "models/musicgen/server.py" || echo "MusicGen not running" + +echo "" +echo "All services stopped"
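
Usage sketch (illustrative): with the services started, e.g. via scripts/start-all.sh, the Flux image endpoint can be exercised as below. Port 8002 is the service default from models/flux/server.py; the prompt and output filename are arbitrary.

    import base64

    import httpx

    resp = httpx.post(
        "http://localhost:8002/v1/images/generations",
        json={"prompt": "a watercolor fox in a snowy forest", "size": "1024x1024"},
        timeout=600.0,
    )
    # The response data list holds base64-encoded PNG images
    b64 = resp.json()["data"][0]["b64_json"]
    with open("fox.png", "wb") as f:
        f.write(base64.b64decode(b64))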