Initial Real-ESRGAN API project setup

This commit is contained in:
Developer
2026-02-16 19:56:25 +01:00
commit 0e59652575
34 changed files with 3668 additions and 0 deletions

View File

@@ -0,0 +1,126 @@
"""File management utilities."""
import logging
import os
import shutil
import time
import uuid
from typing import List, Tuple

from fastapi import UploadFile

from app.config import settings
logger = logging.getLogger(__name__)
def ensure_directories() -> None:
    """Create every directory the application relies on, if missing.

    Idempotent: directories that already exist are left untouched.
    """
    required = (
        settings.upload_dir,
        settings.output_dir,
        settings.models_dir,
        settings.temp_dir,
        settings.jobs_dir,
    )
    for directory in required:
        os.makedirs(directory, exist_ok=True)
        logger.info(f'Directory ensured: {directory}')
def create_request_dir() -> str:
    """Create and return a fresh, uniquely named upload sub-directory.

    The directory name is a random UUID4 string, so concurrent requests
    cannot collide.
    """
    unique_name = str(uuid.uuid4())
    target = os.path.join(settings.upload_dir, unique_name)
    os.makedirs(target, exist_ok=True)
    return target
async def save_upload(file: UploadFile, directory: str) -> str:
    """Stream one uploaded file into *directory* and return the saved path.

    The file is stored under a random UUID4 name; the client's original
    extension is preserved, falling back to '.jpg' when absent.
    """
    original_ext = os.path.splitext(file.filename or '')[1] or '.jpg'
    destination = os.path.join(directory, f'{uuid.uuid4()}{original_ext}')
    chunk_size = 1024 * 1024
    with open(destination, 'wb') as out:
        # Read in 1 MiB chunks so large uploads never sit fully in memory.
        chunk = await file.read(chunk_size)
        while chunk:
            out.write(chunk)
            chunk = await file.read(chunk_size)
    logger.debug(f'File saved: {destination}')
    return destination
async def save_uploads(files: List[UploadFile], directory: str) -> List[str]:
    """Persist each uploaded file via save_upload; return saved paths in order."""
    return [await save_upload(upload, directory) for upload in files]
def generate_output_path(input_path: str, suffix: str = '_upscaled') -> str:
    """Map an input image path to its path in the output directory.

    Example: '/tmp/x/cat.png' -> '<output_dir>/cat_upscaled.png'.
    """
    stem, extension = os.path.splitext(os.path.basename(input_path))
    return os.path.join(settings.output_dir, f'{stem}{suffix}{extension}')
def cleanup_directory(directory: str) -> None:
    """Recursively delete *directory* if it exists; removal errors are suppressed."""
    if not os.path.isdir(directory):
        return
    shutil.rmtree(directory, ignore_errors=True)
    logger.debug(f'Cleaned up directory: {directory}')
def cleanup_file(filepath: str) -> None:
    """Delete a single regular file if it exists; directories are ignored."""
    if not os.path.isfile(filepath):
        return
    os.remove(filepath)
    logger.debug(f'Cleaned up file: {filepath}')
def get_directory_size_mb(directory: str) -> float:
    """Return the total size of all files under *directory*, in MiB.

    Walks the tree recursively. Files that cannot be stat-ed (vanished
    mid-walk, broken symlinks, permission errors) are skipped, matching the
    tolerant OSError handling used elsewhere in this module.

    Args:
        directory: Root directory to measure; a missing directory yields 0.0.

    Returns:
        Cumulative size of every readable file, as a float in MiB.
    """
    total_bytes = 0
    for dirpath, _dirnames, filenames in os.walk(directory):
        for filename in filenames:
            filepath = os.path.join(dirpath, filename)
            try:
                # EAFP: a single stat call instead of the racy
                # exists()-then-getsize() pair, which could still raise if
                # the file disappears between the two calls.
                total_bytes += os.path.getsize(filepath)
            except OSError:
                continue
    return total_bytes / (1024 * 1024)
def list_model_files() -> List[Tuple[str, str, int]]:
    """Return (name, path, size_bytes) for every model weight file.

    Scans settings.models_dir for .pth/.onnx/.pt/.safetensors files, sorted
    by filename. Entries whose size cannot be read are logged and skipped.
    """
    models_dir = settings.models_dir
    if not os.path.isdir(models_dir):
        return []
    weight_extensions = ('.pth', '.onnx', '.pt', '.safetensors')
    found = []
    for entry in sorted(os.listdir(models_dir)):
        if not entry.endswith(weight_extensions):
            continue
        full_path = os.path.join(models_dir, entry)
        try:
            found.append((entry, full_path, os.path.getsize(full_path)))
        except OSError:
            logger.warning(f'Could not get size of model: {full_path}')
    return found
def cleanup_old_jobs(hours: int = 24) -> int:
    """Remove job directories older than *hours*; return how many were removed.

    Age is judged by directory mtime. Entries whose mtime cannot be read
    (e.g. removed concurrently by another worker) are skipped silently —
    this is a best-effort housekeeping task.

    Args:
        hours: Age threshold; directories last modified before
            now - hours are deleted.

    Returns:
        Number of job directories removed.
    """
    # `time` is imported at module level now (was a function-local import).
    cutoff_time = time.time() - hours * 3600
    cleaned = 0
    if not os.path.isdir(settings.jobs_dir):
        return cleaned
    for item in os.listdir(settings.jobs_dir):
        item_path = os.path.join(settings.jobs_dir, item)
        if not os.path.isdir(item_path):
            continue
        try:
            # Keep the try body minimal: only the stat can reasonably raise.
            expired = os.path.getmtime(item_path) < cutoff_time
        except OSError:
            continue
        if expired:
            cleanup_directory(item_path)
            cleaned += 1
    if cleaned > 0:
        logger.info(f'Cleaned up {cleaned} old job directories')
    return cleaned