"""File management utilities.""" import logging import os import shutil import uuid from typing import List, Tuple from fastapi import UploadFile from app.config import settings logger = logging.getLogger(__name__) def ensure_directories() -> None: """Ensure all required directories exist.""" for path in (settings.upload_dir, settings.output_dir, settings.models_dir, settings.temp_dir, settings.jobs_dir): os.makedirs(path, exist_ok=True) logger.info(f'Directory ensured: {path}') def create_request_dir() -> str: """Create a unique request directory.""" request_id = str(uuid.uuid4()) request_dir = os.path.join(settings.upload_dir, request_id) os.makedirs(request_dir, exist_ok=True) return request_dir async def save_upload(file: UploadFile, directory: str) -> str: """Save uploaded file to directory.""" ext = os.path.splitext(file.filename or '')[1] or '.jpg' filename = f'{uuid.uuid4()}{ext}' filepath = os.path.join(directory, filename) with open(filepath, 'wb') as f: while chunk := await file.read(1024 * 1024): f.write(chunk) logger.debug(f'File saved: {filepath}') return filepath async def save_uploads(files: List[UploadFile], directory: str) -> List[str]: """Save multiple uploaded files to directory.""" paths = [] for file in files: path = await save_upload(file, directory) paths.append(path) return paths def generate_output_path(input_path: str, suffix: str = '_upscaled') -> str: """Generate output path for processed image.""" base, ext = os.path.splitext(input_path) name = os.path.basename(base) filename = f'{name}{suffix}{ext}' return os.path.join(settings.output_dir, filename) def cleanup_directory(directory: str) -> None: """Remove directory and all contents.""" if os.path.isdir(directory): shutil.rmtree(directory, ignore_errors=True) logger.debug(f'Cleaned up directory: {directory}') def cleanup_file(filepath: str) -> None: """Remove a file.""" if os.path.isfile(filepath): os.remove(filepath) logger.debug(f'Cleaned up file: {filepath}') def get_directory_size_mb(directory: str) -> float: """Get total size of directory in MB.""" total = 0 for dirpath, dirnames, filenames in os.walk(directory): for f in filenames: fp = os.path.join(dirpath, f) if os.path.exists(fp): total += os.path.getsize(fp) return total / (1024 * 1024) def list_model_files() -> List[Tuple[str, str, int]]: """Return list of (name, path, size_bytes) for all .pth/.onnx files in models dir.""" models = [] models_dir = settings.models_dir if not os.path.isdir(models_dir): return models for name in sorted(os.listdir(models_dir)): if name.endswith(('.pth', '.onnx', '.pt', '.safetensors')): path = os.path.join(models_dir, name) try: size = os.path.getsize(path) models.append((name, path, size)) except OSError: logger.warning(f'Could not get size of model: {path}') return models def cleanup_old_jobs(hours: int = 24) -> int: """Clean up old job directories (older than specified hours).""" import time cutoff_time = time.time() - (hours * 3600) cleaned = 0 if not os.path.isdir(settings.jobs_dir): return cleaned for item in os.listdir(settings.jobs_dir): item_path = os.path.join(settings.jobs_dir, item) if os.path.isdir(item_path): try: if os.path.getmtime(item_path) < cutoff_time: cleanup_directory(item_path) cleaned += 1 except OSError: pass if cleaned > 0: logger.info(f'Cleaned up {cleaned} old job directories') return cleaned