Files
facefusion-api/app/services/worker.py
Sebastian Krüger 800edc08ea Initial commit: FaceFusion REST API
FastAPI wrapper around FaceFusion v3.5.3 submodule with:
- Sync and async (job-based) processing endpoints
- FaceFusion bridge with manual key registration and Lock-serialized processing
- Multi-target Dockerfile (CPU + CUDA GPU)
- Docker Compose configs for dev, prod-cpu, and prod-gpu
- Gitea CI/CD workflow with dual image builds
- All 11 FaceFusion processors supported via options API

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 12:58:33 +01:00

116 lines
3.7 KiB
Python

import logging
import queue
import threading
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class ApiJobStatus(str, Enum):
    """Lifecycle states of an API job.

    The ``str`` mixin makes every member compare equal to its plain string
    value, e.g. ``ApiJobStatus.pending == 'pending'``.
    """

    # Default state of a freshly created job.
    pending = 'pending'
    # Set by the worker loop right before the processing function is called.
    processing = 'processing'
    # Set by the worker loop after the processing function returned.
    completed = 'completed'
    # Set by the worker loop when the processing function raised.
    failed = 'failed'
    # Set when a pending or processing job is deleted.
    cancelled = 'cancelled'
@dataclass
class ApiJob:
    """Mutable record describing one submitted job and its current state."""

    # Identifier supplied by the caller when the job is created.
    job_id: str

    # Current lifecycle state; a new record starts out as pending.
    status: ApiJobStatus = ApiJobStatus.pending

    # Naive local timestamp taken when the record is instantiated.
    created_at: datetime = field(default_factory=datetime.now)

    # Timestamp of the most recent status change; None until one happens.
    updated_at: Optional[datetime] = None

    # Error text for a failed job; None otherwise.
    error: Optional[str] = None

    # Output location of a completed job; None otherwise.
    output_path: Optional[str] = None

    # Arguments handed to the processing function (fresh dict per instance).
    args: Dict[str, Any] = field(default_factory=dict)
class WorkerQueue:
    """Run submitted jobs one at a time on a single background thread.

    Job records live in an in-memory dict guarded by ``self._lock``; job ids
    are handed to the worker thread through a FIFO ``queue.Queue``. The
    callable passed to :meth:`start` does the actual work for each job.
    """

    def __init__(self) -> None:
        """Create an idle queue; no thread runs until :meth:`start` is called."""
        # ``None`` entries are wake-up markers posted by stop(); every other
        # entry is a job id. Using None (not '') keeps '' usable as a job id.
        self._queue: queue.Queue[Optional[str]] = queue.Queue()
        self._jobs: Dict[str, ApiJob] = {}
        self._lock = threading.Lock()
        self._thread: Optional[threading.Thread] = None
        self._running = False
        self._process_fn: Optional[Callable[[Dict[str, Any]], bool]] = None

    def start(self, process_fn: Callable[[Dict[str, Any]], bool]) -> None:
        """Start the daemon worker thread.

        Args:
            process_fn: Called with a job's ``args`` dict for every job. It
                signals failure by raising or by returning ``False``.

        If a worker thread is already alive, only ``process_fn`` is replaced;
        no second thread is spawned, so jobs keep running one at a time.
        """
        self._process_fn = process_fn
        self._running = True
        if self._thread is not None and self._thread.is_alive():
            logger.warning('Worker thread already running')
            return
        self._thread = threading.Thread(target=self._worker_loop, daemon=True, name='ff-worker')
        self._thread.start()
        logger.info('Worker thread started')

    def stop(self) -> None:
        """Ask the worker thread to exit and wait up to 5 seconds for it.

        Jobs still waiting in the queue are not processed. Does nothing
        beyond clearing the running flag when no thread was ever started.
        """
        self._running = False
        worker = self._thread
        if worker is None:
            return
        # Wake the worker immediately instead of waiting for its poll timeout.
        self._queue.put(None)
        worker.join(timeout=5)
        if worker.is_alive():
            # join() timed out, typically because a job is still being processed.
            logger.warning('Worker thread did not stop within 5 seconds')
            return
        self._thread = None
        logger.info('Worker thread stopped')

    def submit(self, job_id: str, args: Dict[str, Any]) -> ApiJob:
        """Register a new pending job and enqueue it for processing.

        Args:
            job_id: Key under which the job is stored; an existing record
                with the same id is replaced.
            args: Arguments later passed to the processing function.

        Returns:
            The newly created job record.
        """
        job = ApiJob(job_id=job_id, args=args)
        with self._lock:
            self._jobs[job_id] = job
        self._queue.put(job_id)
        return job

    def get_job(self, job_id: str) -> Optional[ApiJob]:
        """Return the stored record for ``job_id`` (not a copy), or ``None``."""
        with self._lock:
            return self._jobs.get(job_id)

    def delete_job(self, job_id: str) -> bool:
        """Remove a job, cancelling it first if it has not finished.

        A processing job is marked cancelled and the FaceFusion bridge is
        asked to stop; a pending job is only marked cancelled, and the worker
        skips it once its id is dequeued.

        Returns:
            ``True`` if the job existed and was removed, ``False`` otherwise.
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return False
            if job.status == ApiJobStatus.processing:
                job.status = ApiJobStatus.cancelled
                # Imported lazily, as in the original code (presumably to avoid
                # an import cycle or eager FaceFusion loading - TODO confirm).
                from app.services.facefusion_bridge import stop_processing
                # Called while holding the lock so the worker cannot move on to
                # the next job before the stop request has been issued.
                stop_processing()
            elif job.status == ApiJobStatus.pending:
                job.status = ApiJobStatus.cancelled
            self._jobs.pop(job_id, None)
            return True

    def list_jobs(self) -> List[ApiJob]:
        """Return a new list holding every currently stored job record."""
        with self._lock:
            return list(self._jobs.values())

    def _worker_loop(self) -> None:
        """Thread target: dequeue job ids and process them until stopped."""
        while self._running:
            try:
                job_id = self._queue.get(timeout=1)
            except queue.Empty:
                continue
            if job_id is None:
                # Wake-up marker from stop(). Let the loop condition decide, so
                # a stale marker cannot kill a worker that was restarted.
                continue
            with self._lock:
                job = self._jobs.get(job_id)
                if job is None or job.status == ApiJobStatus.cancelled:
                    # Deleted or cancelled while it was still waiting.
                    continue
                job.status = ApiJobStatus.processing
                job.updated_at = datetime.now()
            self._run_job(job_id, job)

    def _run_job(self, job_id: str, job: ApiJob) -> None:
        """Execute one job and record its outcome on the job record."""
        error: Optional[str] = None
        caught: Optional[BaseException] = None
        output_path: Optional[str] = None
        try:
            # An explicit False return means failure; the result used to be
            # ignored, so such jobs were reported as completed.
            if self._process_fn(job.args) is False:
                error = 'Processing function reported failure'
            else:
                output_path = job.args.get('output_path')
        except Exception as exc:
            # Worker-thread boundary: record the failure and keep the thread alive.
            error = str(exc)
            caught = exc

        with self._lock:
            # delete_job() may have cancelled the job while it was running;
            # that status must not be overwritten.
            was_cancelled = job.status == ApiJobStatus.cancelled
            if not was_cancelled:
                job.updated_at = datetime.now()
                if error is None:
                    job.status = ApiJobStatus.completed
                    job.output_path = output_path
                else:
                    job.status = ApiJobStatus.failed
                    job.error = error

        if was_cancelled:
            logger.info('Job %s cancelled', job_id)
        elif error is None:
            logger.info('Job %s completed', job_id)
        else:
            # exc_info attaches the traceback when an exception was caught.
            logger.error('Job %s failed: %s', job_id, error, exc_info=caught)
# Module-level WorkerQueue instance. Constructing it does not start the worker
# thread; that only happens when its start() method is called.
worker_queue = WorkerQueue()