import logging import queue import threading from dataclasses import dataclass, field from datetime import datetime from enum import Enum from typing import Any, Callable, Dict, List, Optional logger = logging.getLogger(__name__) class ApiJobStatus(str, Enum): pending = 'pending' processing = 'processing' completed = 'completed' failed = 'failed' cancelled = 'cancelled' @dataclass class ApiJob: job_id: str status: ApiJobStatus = ApiJobStatus.pending created_at: datetime = field(default_factory=datetime.now) updated_at: Optional[datetime] = None error: Optional[str] = None output_path: Optional[str] = None args: Dict[str, Any] = field(default_factory=dict) class WorkerQueue: def __init__(self) -> None: self._queue: queue.Queue[str] = queue.Queue() self._jobs: Dict[str, ApiJob] = {} self._lock = threading.Lock() self._thread: Optional[threading.Thread] = None self._running = False self._process_fn: Optional[Callable[[Dict[str, Any]], bool]] = None def start(self, process_fn: Callable[[Dict[str, Any]], bool]) -> None: self._process_fn = process_fn self._running = True self._thread = threading.Thread(target=self._worker_loop, daemon=True, name='ff-worker') self._thread.start() logger.info('Worker thread started') def stop(self) -> None: self._running = False self._queue.put('') # sentinel to unblock if self._thread: self._thread.join(timeout=5) logger.info('Worker thread stopped') def submit(self, job_id: str, args: Dict[str, Any]) -> ApiJob: job = ApiJob(job_id=job_id, args=args) with self._lock: self._jobs[job_id] = job self._queue.put(job_id) return job def get_job(self, job_id: str) -> Optional[ApiJob]: with self._lock: return self._jobs.get(job_id) def delete_job(self, job_id: str) -> bool: with self._lock: job = self._jobs.get(job_id) if not job: return False if job.status == ApiJobStatus.processing: job.status = ApiJobStatus.cancelled from app.services.facefusion_bridge import stop_processing stop_processing() elif job.status == ApiJobStatus.pending: job.status = ApiJobStatus.cancelled self._jobs.pop(job_id, None) return True def list_jobs(self) -> List[ApiJob]: with self._lock: return list(self._jobs.values()) def _worker_loop(self) -> None: while self._running: try: job_id = self._queue.get(timeout=1) except queue.Empty: continue if not job_id: # sentinel break with self._lock: job = self._jobs.get(job_id) if not job or job.status == ApiJobStatus.cancelled: continue job.status = ApiJobStatus.processing job.updated_at = datetime.now() try: self._process_fn(job.args) with self._lock: job.status = ApiJobStatus.completed job.updated_at = datetime.now() job.output_path = job.args.get('output_path') logger.info(f'Job {job_id} completed') except Exception as e: with self._lock: job.status = ApiJobStatus.failed job.updated_at = datetime.now() job.error = str(e) logger.error(f'Job {job_id} failed: {e}') worker_queue = WorkerQueue()