Files

116 lines
3.7 KiB
Python
Raw Permalink Normal View History

import logging
import queue
import threading
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
# Module-level logger, named after this module so log records can be
# filtered or configured per module.
logger = logging.getLogger(__name__)
class ApiJobStatus(str, Enum):
    """Lifecycle states of an ``ApiJob``.

    The ``str`` mixin makes every member compare equal to its plain string
    value (for example ``ApiJobStatus.pending == 'pending'``).
    """

    # Initial state of a newly created job (the ApiJob default).
    pending = 'pending'
    # Set by the worker thread when it picks the job up.
    processing = 'processing'
    # Set by the worker thread when the job finished successfully.
    completed = 'completed'
    # Set by the worker thread when processing did not succeed; the error
    # text is stored on the job.
    failed = 'failed'
    # Set by WorkerQueue.delete_job for jobs that were pending or processing.
    cancelled = 'cancelled'
@dataclass
class ApiJob:
    """Mutable record describing one job tracked by ``WorkerQueue``.

    Only ``job_id`` is required; every other field has a default.
    """

    # Identifier supplied by the caller; used as the key in the queue's
    # job table.
    job_id: str
    # Current lifecycle state; a new job starts as pending.
    status: ApiJobStatus = ApiJobStatus.pending
    # Creation time, taken from datetime.now() (naive local time).
    created_at: datetime = field(default_factory=datetime.now)
    # Time of the last status change made by the worker; None until then.
    updated_at: Optional[datetime] = None
    # Error text recorded by the worker when the job fails.
    error: Optional[str] = None
    # Copied from args['output_path'] by the worker when the job completes.
    output_path: Optional[str] = None
    # Arguments handed to the processing function; a fresh dict per job.
    args: Dict[str, Any] = field(default_factory=dict)
class WorkerQueue:
    """FIFO job queue drained by a single background worker thread.

    ``submit`` stores an ``ApiJob`` and enqueues its id.  After ``start``,
    one daemon thread pops ids in submission order and runs the processing
    function on each job's ``args``, recording the outcome on the job.
    ``_lock`` guards the job table and every status transition.
    """

    # Wake-up marker pushed by stop().  It is compared by identity, so no
    # job id (not even an empty string) can be mistaken for it.
    _STOP: Any = object()

    def __init__(self) -> None:
        """Create an empty, stopped queue; no thread is started here."""
        self._queue: queue.Queue[Any] = queue.Queue()
        self._jobs: Dict[str, ApiJob] = {}
        self._lock = threading.Lock()
        self._thread: Optional[threading.Thread] = None
        self._running = False
        self._process_fn: Optional[Callable[[Dict[str, Any]], bool]] = None

    def start(self, process_fn: Callable[[Dict[str, Any]], bool]) -> None:
        """Start the worker thread that runs ``process_fn`` on queued jobs.

        Args:
            process_fn: Called with a job's ``args`` dict.  A truthy return
                value marks the job completed; a falsy return value or an
                exception marks it failed.

        If a worker thread is already alive, ``process_fn`` is still
        installed but no second thread is spawned, so jobs are never
        processed concurrently.
        """
        self._process_fn = process_fn
        self._running = True
        if self._thread is not None and self._thread.is_alive():
            logger.warning('Worker thread already running; not starting another')
            return
        self._thread = threading.Thread(
            target=self._worker_loop, daemon=True, name='ff-worker'
        )
        self._thread.start()
        logger.info('Worker thread started')

    def stop(self) -> None:
        """Ask the worker to exit and wait up to 5 seconds for it.

        A job that is currently being processed is not interrupted; the
        worker exits once that job returns.
        """
        self._running = False
        # Unblocks a worker that is waiting inside Queue.get().
        self._queue.put(self._STOP)
        worker = self._thread
        if worker is not None:
            worker.join(timeout=5)
            if worker.is_alive():
                logger.warning('Worker thread did not stop within 5 seconds')
                return
        logger.info('Worker thread stopped')

    def submit(self, job_id: str, args: Dict[str, Any]) -> ApiJob:
        """Register a new pending job and enqueue it for processing.

        Args:
            job_id: Key under which the job is stored.  An existing job
                with the same id is replaced in the job table.
            args: Arguments later passed to the processing function.

        Returns:
            The newly created ``ApiJob``.
        """
        job = ApiJob(job_id=job_id, args=args)
        with self._lock:
            self._jobs[job_id] = job
        self._queue.put(job_id)
        return job

    def get_job(self, job_id: str) -> Optional[ApiJob]:
        """Return the job stored under ``job_id``, or ``None`` if unknown."""
        with self._lock:
            return self._jobs.get(job_id)

    def delete_job(self, job_id: str) -> bool:
        """Cancel (if still pending or processing) and forget a job.

        Returns:
            ``True`` if the job existed and was removed, ``False`` otherwise.
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return False
            if job.status == ApiJobStatus.processing:
                job.status = ApiJobStatus.cancelled
                # Imported at call time rather than at module load.
                # NOTE(review): presumably to avoid an import cycle or a
                # heavy import on startup - confirm.
                from app.services.facefusion_bridge import stop_processing

                # Called while holding the lock: the worker needs the lock
                # to start its next job, so this cannot hit a different job.
                stop_processing()
            elif job.status == ApiJobStatus.pending:
                # The worker skips cancelled or missing jobs when it
                # dequeues their id.
                job.status = ApiJobStatus.cancelled
            self._jobs.pop(job_id, None)
            return True

    def list_jobs(self) -> List[ApiJob]:
        """Return a snapshot list of all jobs currently tracked."""
        with self._lock:
            return list(self._jobs.values())

    def _worker_loop(self) -> None:
        """Thread target: process queued jobs until ``_running`` is cleared."""
        while self._running:
            try:
                item = self._queue.get(timeout=1)
            except queue.Empty:
                continue
            if item is self._STOP:
                # The marker only wakes the loop; ``_running`` decides
                # whether to exit.  A stale marker left over from an earlier
                # stop() therefore cannot kill a restarted worker.
                continue
            with self._lock:
                job = self._jobs.get(item)
                if job is None or job.status == ApiJobStatus.cancelled:
                    continue
                job.status = ApiJobStatus.processing
                job.updated_at = datetime.now()
            self._run_job(job)

    def _run_job(self, job: ApiJob) -> None:
        """Run the processing function on ``job`` and record the outcome."""
        error: Optional[str] = None
        output_path: Optional[str] = None
        raised = False
        try:
            succeeded = bool(self._process_fn(job.args))
            output_path = job.args.get('output_path')
        except Exception as exc:
            # Boundary of the worker thread: any failure is recorded on the
            # job so that one bad job cannot terminate the worker.
            succeeded = False
            raised = True
            error = str(exc)
            logger.exception('Job %s failed: %s', job.job_id, exc)
        else:
            if not succeeded:
                error = 'Processing function returned a falsy result'

        with self._lock:
            # delete_job() may have cancelled the job while it was running;
            # that status must not be overwritten.
            still_processing = job.status == ApiJobStatus.processing
            if still_processing:
                job.updated_at = datetime.now()
                if succeeded:
                    job.status = ApiJobStatus.completed
                    job.output_path = output_path
                else:
                    job.status = ApiJobStatus.failed
                    job.error = error

        if not still_processing:
            logger.info('Job %s ended after cancellation; status kept', job.job_id)
        elif succeeded:
            logger.info('Job %s completed', job.job_id)
        elif not raised:
            # The exception case was already logged with its traceback.
            logger.error('Job %s failed: %s', job.job_id, error)
# Module-level queue instance. Constructing it does not start the worker
# thread; jobs are only processed after WorkerQueue.start() has been called
# with a processing function.
worker_queue = WorkerQueue()