"""Async job worker queue."""
import json
import logging
import os
import threading
import time
import uuid
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from queue import Empty, Queue
from typing import Callable, Dict, Optional

from app.config import settings

logger = logging.getLogger(__name__)

@dataclass
class Job:
    """Metadata record for a single async job.

    All timestamps are ISO-8601 strings so the record round-trips through
    JSON unchanged via to_dict()/save_metadata()/load_metadata().
    """

    job_id: str
    status: str  # one of: queued, processing, completed, failed
    input_path: str
    output_path: str
    model: str
    tile_size: Optional[int] = None
    tile_pad: Optional[int] = None
    outscale: Optional[float] = None
    created_at: str = ''
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    processing_time_seconds: Optional[float] = None
    error: Optional[str] = None

    def __post_init__(self) -> None:
        # Stamp creation time only when not supplied, so a record loaded
        # from disk keeps its original timestamp.
        if not self.created_at:
            # Timezone-aware now(); datetime.utcnow() is deprecated
            # (Python 3.12+) and produces naive timestamps.
            self.created_at = datetime.now(timezone.utc).isoformat()

    def to_dict(self) -> dict:
        """Convert to a JSON-serializable dictionary."""
        return asdict(self)

    def save_metadata(self) -> None:
        """Persist this job as <jobs_dir>/<job_id>/metadata.json."""
        job_dir = os.path.join(settings.jobs_dir, self.job_id)
        os.makedirs(job_dir, exist_ok=True)

        metadata_path = os.path.join(job_dir, 'metadata.json')
        with open(metadata_path, 'w') as f:
            json.dump(self.to_dict(), f, indent=2)

    @classmethod
    def load_metadata(cls, job_id: str) -> Optional['Job']:
        """Load a job from disk.

        Returns:
            The Job, or None if the metadata file is missing or unreadable.
        """
        metadata_path = os.path.join(settings.jobs_dir, job_id, 'metadata.json')
        if not os.path.exists(metadata_path):
            return None

        try:
            with open(metadata_path, 'r') as f:
                data = json.load(f)
            # TypeError covers a schema mismatch (unexpected/missing keys
            # passed to the constructor); anything else should propagate
            # rather than be silently swallowed.
            return cls(**data)
        except (OSError, json.JSONDecodeError, TypeError) as e:
            logger.error(f'Failed to load job metadata: {e}')
            return None
class WorkerQueue:
    """Thread pool worker queue for processing jobs."""

    def __init__(self, worker_func: Callable, num_workers: int = 2):
        """
        Initialize worker queue.

        Args:
            worker_func: Function to process jobs (job: Job) -> None
            num_workers: Number of worker threads
        """
        self.queue: Queue = Queue()
        self.worker_func = worker_func
        self.num_workers = num_workers
        self.workers = []
        self.running = False
        # In-memory job registry; guarded by self.lock because worker
        # threads and API callers access it concurrently.
        self.jobs: Dict[str, Job] = {}
        self.lock = threading.Lock()

    def start(self) -> None:
        """Start worker threads (idempotent)."""
        if self.running:
            return

        self.running = True
        for _ in range(self.num_workers):
            worker = threading.Thread(target=self._worker_loop, daemon=True)
            worker.start()
            self.workers.append(worker)

        logger.info(f'Started {self.num_workers} worker threads')

    def stop(self, timeout: int = 10) -> None:
        """Stop worker threads gracefully.

        Args:
            timeout: Seconds to wait (per thread) for a clean exit.
        """
        self.running = False

        # One sentinel per worker so every blocked get() wakes up and exits.
        for _ in range(self.num_workers):
            self.queue.put(None)

        # Bounded wait for workers to finish their current job.
        for worker in self.workers:
            worker.join(timeout=timeout)

        logger.info('Worker threads stopped')

    def submit_job(
        self,
        input_path: str,
        output_path: str,
        model: str,
        tile_size: Optional[int] = None,
        tile_pad: Optional[int] = None,
        outscale: Optional[float] = None,
    ) -> str:
        """
        Submit a job for processing.

        Returns: job_id
        """
        job_id = str(uuid.uuid4())
        job = Job(
            job_id=job_id,
            status='queued',
            input_path=input_path,
            output_path=output_path,
            model=model,
            tile_size=tile_size,
            tile_pad=tile_pad,
            outscale=outscale,
        )

        with self.lock:
            self.jobs[job_id] = job

        # Persist before enqueueing so the job is visible on disk even if
        # a worker picks it up immediately.
        job.save_metadata()
        self.queue.put(job)
        logger.info(f'Job submitted: {job_id}')

        return job_id

    def get_job(self, job_id: str) -> Optional[Job]:
        """Get job by ID, or None if unknown."""
        with self.lock:
            return self.jobs.get(job_id)

    def get_all_jobs(self) -> Dict[str, Job]:
        """Get a snapshot copy of all jobs."""
        with self.lock:
            return dict(self.jobs)

    def _worker_loop(self) -> None:
        """Worker thread main loop: pull jobs until a stop sentinel arrives."""
        logger.info('Worker thread started')

        while self.running:
            try:
                job = self.queue.get(timeout=1)
            except Empty:
                # No work within the poll interval is normal; loop again so
                # we also notice self.running being cleared.
                continue

            if job is None:  # Stop sentinel from stop()
                self.queue.task_done()
                break

            try:
                self._process_job(job)
            except Exception:
                # _process_job handles job failures itself; this guards the
                # worker thread against unexpected errors (e.g. metadata I/O)
                # instead of silently swallowing them or dying.
                logger.exception('Unexpected error while processing job %s',
                                 job.job_id)
            finally:
                self.queue.task_done()

        logger.info('Worker thread stopped')

    def _process_job(self, job: Job) -> None:
        """Process a single job, tracking status transitions and timing."""
        try:
            with self.lock:
                job.status = 'processing'
                job.started_at = datetime.now(timezone.utc).isoformat()
                self.jobs[job.job_id] = job

            job.save_metadata()

            start_time = time.time()
            self.worker_func(job)
            job.processing_time_seconds = time.time() - start_time

            with self.lock:
                job.status = 'completed'
                job.completed_at = datetime.now(timezone.utc).isoformat()
                self.jobs[job.job_id] = job

            logger.info(f'Job completed: {job.job_id} ({job.processing_time_seconds:.2f}s)')
        except Exception as e:
            logger.error(f'Job failed: {job.job_id}: {e}', exc_info=True)
            with self.lock:
                job.status = 'failed'
                job.error = str(e)
                job.completed_at = datetime.now(timezone.utc).isoformat()
                self.jobs[job.job_id] = job

        # Persist the terminal state (completed or failed).
        job.save_metadata()
# Global instance (lazy singleton; created on first get_worker_queue call)
_worker_queue: Optional[WorkerQueue] = None


def get_worker_queue(worker_func: Optional[Callable] = None, num_workers: int = 2) -> WorkerQueue:
    """Get or create the global worker queue.

    Args:
        worker_func: Job-processing callable; required on the first call,
            ignored on subsequent calls.
        num_workers: Worker thread count; only honored when the queue is
            first created.

    Returns:
        The singleton WorkerQueue.

    Raises:
        ValueError: If called before initialization without a worker_func.
    """
    global _worker_queue
    if _worker_queue is None:
        if worker_func is None:
            raise ValueError('worker_func required for first initialization')
        _worker_queue = WorkerQueue(worker_func, num_workers)
    return _worker_queue