FastAPI wrapper around FaceFusion v3.5.3 submodule with: - Sync and async (job-based) processing endpoints - FaceFusion bridge with manual key registration and Lock-serialized processing - Multi-target Dockerfile (CPU + CUDA GPU) - Docker Compose configs for dev, prod-cpu, and prod-gpu - Gitea CI/CD workflow with dual image builds - All 11 FaceFusion processors supported via options API Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
116 lines
3.7 KiB
Python
116 lines
3.7 KiB
Python
import logging
|
|
import queue
|
|
import threading
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
from typing import Any, Callable, Dict, List, Optional
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ApiJobStatus(str, Enum):
    """Lifecycle states a job can be in.

    Members are also ``str`` instances, so each one compares equal to its
    plain string value.
    """

    pending = "pending"
    processing = "processing"
    completed = "completed"
    failed = "failed"
    cancelled = "cancelled"
|
|
|
|
|
|
@dataclass
class ApiJob:
    """Record describing one queued processing job.

    Only ``job_id`` is required. Every other field has a default, so a newly
    created job is pending and carries no error, output path or update time.
    """

    # Identifier the job is stored and looked up under.
    job_id: str
    # Current lifecycle state; new records start out pending.
    status: ApiJobStatus = field(default=ApiJobStatus.pending)
    # Naive local time captured when the record is instantiated.
    created_at: datetime = field(default_factory=datetime.now)
    # Time of the most recent recorded update; None until one is recorded.
    updated_at: Optional[datetime] = field(default=None)
    # Error text for a failed job; None otherwise.
    error: Optional[str] = field(default=None)
    # Output location once known; None until then.
    output_path: Optional[str] = field(default=None)
    # Free-form job arguments keyed by name; a fresh dict per instance.
    args: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
class WorkerQueue:
    """In-memory job queue drained by a single background thread.

    Jobs are registered with ``submit`` and handed, one at a time and in
    FIFO order, to the processing callable given to ``start``. Job records
    are kept in a dict guarded by ``self._lock``.
    """

    # Private stop marker placed on the queue to wake the worker. A dedicated
    # object is used so that no job id a caller submits (including an empty
    # string) can ever be mistaken for it.
    _STOP = object()

    def __init__(self) -> None:
        """Create an idle queue; no thread exists until ``start`` is called."""
        self._queue: queue.Queue[Any] = queue.Queue()
        self._jobs: Dict[str, ApiJob] = {}
        self._lock = threading.Lock()
        self._thread: Optional[threading.Thread] = None
        self._running = False
        self._process_fn: Optional[Callable[[Dict[str, Any]], bool]] = None

    def start(self, process_fn: Callable[[Dict[str, Any]], bool]) -> None:
        """Install *process_fn* and make sure a worker thread is running.

        Args:
            process_fn: Callable invoked with each job's ``args`` dict.

        If a worker thread is already alive, only *process_fn* is replaced;
        a second thread is not spawned, so ``stop`` can always reach the one
        worker that exists.
        """
        self._process_fn = process_fn
        self._running = True
        if self._thread is not None and self._thread.is_alive():
            logger.warning('Worker thread already running')
            return
        self._thread = threading.Thread(target=self._worker_loop, daemon=True, name='ff-worker')
        self._thread.start()
        logger.info('Worker thread started')

    def stop(self) -> None:
        """Ask the worker to exit and wait up to five seconds for it."""
        self._running = False
        worker = self._thread
        if worker is not None and worker.is_alive():
            # Wake the worker if it is blocked waiting for a job. Nothing is
            # enqueued when no worker is alive, so no stale marker is left
            # behind for a later start().
            self._queue.put(self._STOP)
            worker.join(timeout=5)
            if worker.is_alive():
                # join() timed out, most likely because a job is still running.
                logger.warning('Worker thread did not stop within 5 seconds')
                return
        logger.info('Worker thread stopped')

    def submit(self, job_id: str, args: Dict[str, Any]) -> ApiJob:
        """Register a new pending job and queue it for processing.

        Args:
            job_id: Key the job is stored under. An existing record with the
                same id is replaced.
            args: Arguments later passed to the processing callable.

        Returns:
            The newly created job record.
        """
        job = ApiJob(job_id=job_id, args=args)
        with self._lock:
            self._jobs[job_id] = job
        self._queue.put(job_id)
        return job

    def get_job(self, job_id: str) -> Optional[ApiJob]:
        """Return the job stored under *job_id*, or None if there is none."""
        with self._lock:
            return self._jobs.get(job_id)

    def delete_job(self, job_id: str) -> bool:
        """Remove a job, cancelling it first if it is pending or processing.

        Returns:
            True if a job was found and removed, False if *job_id* is unknown.
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return False
            if job.status == ApiJobStatus.processing:
                job.status = ApiJobStatus.cancelled
                # Issued while holding the lock: the worker needs this lock
                # to mark any job as processing, so it cannot move on to a
                # different job before the stop request has been sent.
                from app.services.facefusion_bridge import stop_processing
                stop_processing()
            elif job.status == ApiJobStatus.pending:
                job.status = ApiJobStatus.cancelled
            self._jobs.pop(job_id, None)
            return True

    def list_jobs(self) -> List[ApiJob]:
        """Return a snapshot list of all currently stored jobs."""
        with self._lock:
            return list(self._jobs.values())

    def _worker_loop(self) -> None:
        """Thread body: take job ids off the queue and process them."""
        while self._running:
            try:
                # The timeout lets the loop notice _running going False even
                # if no stop marker arrives.
                job_id = self._queue.get(timeout=1)
            except queue.Empty:
                continue

            if job_id is self._STOP:
                # Re-evaluate the loop condition instead of breaking. During
                # stop() _running is already False, so the loop ends; a stale
                # marker from an earlier stop() is simply discarded.
                continue

            with self._lock:
                job = self._jobs.get(job_id)
                if job is None or job.status == ApiJobStatus.cancelled:
                    # Deleted or cancelled while still waiting in the queue.
                    continue
                job.status = ApiJobStatus.processing
                job.updated_at = datetime.now()

            error: Optional[str] = None
            failure: Optional[BaseException] = None
            output_path: Optional[str] = None
            try:
                # process_fn is annotated as returning bool. An explicit False
                # is recorded as a failure; any other result (including None)
                # counts as success.
                if self._process_fn(job.args) is False:
                    error = 'Processing function reported failure'
                else:
                    output_path = job.args.get('output_path')
            except Exception as e:
                error = str(e)
                failure = e

            with self._lock:
                if job.status != ApiJobStatus.processing:
                    # delete_job() cancelled the job while it was running.
                    # Keep that status rather than overwriting it.
                    logger.info('Job %s was cancelled during processing', job_id)
                    continue
                job.updated_at = datetime.now()
                if error is None:
                    job.status = ApiJobStatus.completed
                    job.output_path = output_path
                else:
                    job.status = ApiJobStatus.failed
                    job.error = error

            if error is None:
                logger.info(f'Job {job_id} completed')
            else:
                # exc_info attaches the traceback when an exception was raised.
                logger.error('Job %s failed: %s', job_id, error, exc_info=failure)
|
|
|
|
|
|
# Shared module-level queue instance. Constructing it does not start the
# worker thread; that only happens when start() is called.
worker_queue = WorkerQueue()
|