Files

106 lines
3.3 KiB
Python
Raw Permalink Normal View History

import json
import logging
import uuid
from typing import List, Optional
from fastapi import APIRouter, File, Form, HTTPException, UploadFile
from fastapi.responses import FileResponse
from app.schemas.jobs import JobCreateResponse, JobDeleteResponse, JobStatus, JobStatusResponse
from app.schemas.process import ProcessingOptions
from app.services import facefusion_bridge, file_manager
from app.services.worker import worker_queue
logger = logging.getLogger(__name__)
router = APIRouter(prefix='/api/v1/jobs', tags=['jobs'])
@router.post('', response_model=JobCreateResponse)
async def create_job(
target: UploadFile = File(...),
source: Optional[List[UploadFile]] = File(None),
options: Optional[str] = Form(None),
):
"""Create an async processing job."""
job_id = str(uuid.uuid4())
request_dir = file_manager.create_request_dir()
try:
parsed_options = None
if options:
try:
parsed_options = json.loads(options)
ProcessingOptions(**parsed_options)
except (json.JSONDecodeError, Exception) as e:
raise HTTPException(status_code=422, detail=f'Invalid options: {e}')
target_path = await file_manager.save_upload(target, request_dir)
source_paths = []
if source:
source_paths = await file_manager.save_uploads(source, request_dir)
output_path = file_manager.generate_output_path(target_path)
args = facefusion_bridge.build_args_from_options(
source_paths=source_paths,
target_path=target_path,
output_path=output_path,
options=parsed_options,
)
worker_queue.submit(job_id, args)
return JobCreateResponse(job_id=job_id, status=JobStatus.pending)
except HTTPException:
raise
except Exception as e:
file_manager.cleanup_directory(request_dir)
logger.error(f'Job creation failed: {e}')
raise HTTPException(status_code=500, detail=str(e))
@router.get('/{job_id}', response_model=JobStatusResponse)
async def get_job_status(job_id: str):
"""Get job status."""
job = worker_queue.get_job(job_id)
if not job:
raise HTTPException(status_code=404, detail='Job not found')
return JobStatusResponse(
job_id=job.job_id,
status=JobStatus(job.status.value),
created_at=job.created_at,
updated_at=job.updated_at,
error=job.error,
)
@router.get('/{job_id}/result')
async def get_job_result(job_id: str):
"""Download job result. Only available when job is completed."""
job = worker_queue.get_job(job_id)
if not job:
raise HTTPException(status_code=404, detail='Job not found')
if job.status != 'completed':
raise HTTPException(status_code=409, detail=f'Job status is {job.status}, not completed')
if not job.output_path:
raise HTTPException(status_code=500, detail='No output file')
return FileResponse(
path=job.output_path,
media_type='application/octet-stream',
)
@router.delete('/{job_id}', response_model=JobDeleteResponse)
async def delete_job(job_id: str):
"""Cancel/delete a job."""
deleted = worker_queue.delete_job(job_id)
if not deleted:
raise HTTPException(status_code=404, detail='Job not found')
return JobDeleteResponse(job_id=job_id, deleted=True)