460 lines
17 KiB
Python
460 lines
17 KiB
Python
|
|
import logging
|
||
|
|
import os
|
||
|
|
import sys
|
||
|
|
import threading
|
||
|
|
from typing import Any, Dict, List, Optional
|
||
|
|
|
||
|
|
from app.config import settings
|
||
|
|
|
||
|
|
# Module-level logger for this bridge module.
logger = logging.getLogger(__name__)

# Set to True at the end of initialize(); later initialize() calls return early.
_initialized = False
# Held by process_sync() and force_download_models() for their whole body.
_lock = threading.Lock()
|
||
|
|
|
||
|
|
# Job-level keys handed to job_store.register_job_keys() in initialize()
# (mirrors the job_keys registered by program.py).
JOB_KEYS = [
    # paths
    'config_path',
    'temp_path',
    'jobs_path',
    # patterns
    'source_pattern',
    'target_pattern',
    'output_pattern',
    # download
    'download_providers',
    'download_scope',
    # execution
    'execution_device_ids',
    'execution_providers',
    'execution_thread_count',
    # memory
    'video_memory_strategy',
    'system_memory_limit',
    # misc
    'log_level',
    'halt_on_error',
]
|
||
|
|
|
||
|
|
# Step-level keys handed to job_store.register_step_keys() in initialize()
# (mirrors the step_keys registered by program.py and the processor modules).
STEP_KEYS = [
    # paths
    'source_paths',
    'target_path',
    'output_path',
    # face detector
    'face_detector_model',
    'face_detector_size',
    'face_detector_margin',
    'face_detector_angles',
    'face_detector_score',
    # face landmarker
    'face_landmarker_model',
    'face_landmarker_score',
    # face selector
    'face_selector_mode',
    'face_selector_order',
    'face_selector_gender',
    'face_selector_race',
    'face_selector_age_start',
    'face_selector_age_end',
    'reference_face_position',
    'reference_face_distance',
    'reference_frame_number',
    # face masker
    'face_occluder_model',
    'face_parser_model',
    'face_mask_types',
    'face_mask_areas',
    'face_mask_regions',
    'face_mask_blur',
    'face_mask_padding',
    # voice extractor
    'voice_extractor_model',
    # frame extraction
    'trim_frame_start',
    'trim_frame_end',
    'temp_frame_format',
    'keep_temp',
    # output creation
    'output_image_quality',
    'output_image_scale',
    'output_audio_encoder',
    'output_audio_quality',
    'output_audio_volume',
    'output_video_encoder',
    'output_video_preset',
    'output_video_quality',
    'output_video_scale',
    'output_video_fps',
    # processors
    'processors',
    # processor-specific: face_swapper
    'face_swapper_model',
    'face_swapper_pixel_boost',
    'face_swapper_weight',
    # processor-specific: face_enhancer
    'face_enhancer_model',
    'face_enhancer_blend',
    'face_enhancer_weight',
    # processor-specific: face_editor
    'face_editor_model',
    'face_editor_eyebrow_direction',
    'face_editor_eye_gaze_horizontal',
    'face_editor_eye_gaze_vertical',
    'face_editor_eye_open_ratio',
    'face_editor_lip_open_ratio',
    'face_editor_mouth_grim',
    'face_editor_mouth_pout',
    'face_editor_mouth_purse',
    'face_editor_mouth_smile',
    'face_editor_mouth_position_horizontal',
    'face_editor_mouth_position_vertical',
    'face_editor_head_pitch',
    'face_editor_head_yaw',
    'face_editor_head_roll',
    # processor-specific: lip_syncer
    'lip_syncer_model',
    'lip_syncer_weight',
    # processor-specific: age_modifier
    'age_modifier_model',
    'age_modifier_direction',
    # processor-specific: expression_restorer
    'expression_restorer_model',
    'expression_restorer_factor',
    'expression_restorer_areas',
    # processor-specific: frame_enhancer
    'frame_enhancer_model',
    'frame_enhancer_blend',
    # processor-specific: frame_colorizer
    'frame_colorizer_model',
    'frame_colorizer_blend',
    'frame_colorizer_size',
    # processor-specific: background_remover
    'background_remover_model',
    'background_remover_color',
    # processor-specific: deep_swapper
    'deep_swapper_model',
    'deep_swapper_morph',
    # processor-specific: face_debugger
    'face_debugger_items',
]
|
||
|
|
|
||
|
|
|
||
|
|
def _find_submodule_root() -> str:
    """Locate the directory that contains the ``facefusion`` package.

    Two locations are probed, in this order:

    * development checkout: ``{project_root}/facefusion/``
    * Docker image: ``/app/facefusion-src/``

    A location qualifies only if it holds a ``facefusion`` sub-directory.

    Returns:
        The first qualifying location.

    Raises:
        RuntimeError: If neither location qualifies.
    """
    # The project root is taken to be three directory levels above this file.
    project_root = __file__
    for _ in range(3):
        project_root = os.path.dirname(project_root)

    search_roots = (
        os.path.join(project_root, 'facefusion'),  # dev: project_root/facefusion/
        '/app/facefusion-src',  # Docker
    )
    match = next(
        (root for root in search_roots if os.path.isdir(os.path.join(root, 'facefusion'))),
        None,
    )
    if match is None:
        raise RuntimeError('FaceFusion submodule not found')
    return match
|
||
|
|
|
||
|
|
|
||
|
|
def _setup_sys_path() -> None:
    """Put the FaceFusion submodule root at the front of ``sys.path``.

    Does nothing when the root is already listed, so repeated calls do not
    add duplicate entries.
    """
    root = _find_submodule_root()
    if root in sys.path:
        return
    sys.path.insert(0, root)
|
||
|
|
|
||
|
|
|
||
|
|
def _find_config_path() -> str:
    """Return the path to ``facefusion.ini``.

    The submodule root is checked first, then the project root. If the file
    exists in neither place, the project-root path is returned anyway.
    """
    submodule_root = _find_submodule_root()
    project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))

    candidates = [
        os.path.join(base_dir, 'facefusion.ini')
        for base_dir in (submodule_root, project_root)
    ]
    # The last candidate is the project-root path, which doubles as the fallback.
    return next(
        (candidate for candidate in candidates if os.path.isfile(candidate)),
        candidates[-1],
    )
|
||
|
|
|
||
|
|
|
||
|
|
def _setup_models_symlink() -> None:
    """Make ``<submodule>/.assets/models`` a symlink to ``settings.models_dir``.

    Both the ``.assets`` directory and the models target directory are created
    if missing. A symlink that already points at the target is kept, a symlink
    pointing elsewhere is replaced, and a real directory at the link location
    is left untouched.
    """
    assets_dir = os.path.join(_find_submodule_root(), '.assets')
    os.makedirs(assets_dir, exist_ok=True)

    models_link = os.path.join(assets_dir, 'models')
    models_target = settings.models_dir
    os.makedirs(models_target, exist_ok=True)

    is_link = os.path.islink(models_link)
    if is_link and os.readlink(models_link) == models_target:
        # Already linked to the right place.
        return
    if not is_link and os.path.isdir(models_link):
        # Real dir exists - leave it alone in dev mode
        return
    if is_link:
        # Stale link pointing somewhere else: drop it before re-linking.
        os.unlink(models_link)

    os.symlink(models_target, models_link)
    logger.info(f'Symlinked {models_link} -> {models_target}')
|
||
|
|
|
||
|
|
|
||
|
|
def initialize() -> None:
    """Prepare FaceFusion for use; only the first call does any work.

    Steps, in order: extend ``sys.path``, set up the models symlink, register
    the job and step keys, seed ``state_manager`` with the API defaults,
    initialise the job system in ``settings.jobs_dir`` and initialise the
    FaceFusion logger. The ``_initialized`` flag is set only after every step
    has succeeded.

    Raises:
        RuntimeError: If ``job_manager.init_jobs`` reports failure.
    """
    global _initialized
    if _initialized:
        return

    # The facefusion package is only importable once sys.path is extended.
    _setup_sys_path()
    _setup_models_symlink()

    from facefusion import state_manager
    from facefusion.jobs import job_manager, job_store

    # Register all keys (replicating program.py side-effects)
    job_store.register_job_keys(JOB_KEYS)
    job_store.register_step_keys(STEP_KEYS)

    # Seed state with the API defaults
    for state_key, state_value in _build_defaults().items():
        state_manager.init_item(state_key, state_value)

    # Job system
    os.makedirs(settings.jobs_dir, exist_ok=True)
    jobs_ready = job_manager.init_jobs(settings.jobs_dir)
    if not jobs_ready:
        raise RuntimeError('Failed to initialize FaceFusion job system')

    # FaceFusion's own logger
    from facefusion import logger as ff_logger
    ff_logger.init(settings.log_level)

    _initialized = True
    logger.info('FaceFusion bridge initialized')
|
||
|
|
|
||
|
|
|
||
|
|
def _build_defaults() -> Dict[str, Any]:
    """Build a fresh dict of default FaceFusion arguments.

    Values come partly from ``settings`` and partly from literals below. A new
    dict (with new list values) is produced on every call. ``initialize()``
    uses it to seed the state and ``build_args_from_options()`` uses it as the
    base that request options override.
    """
    defaults: Dict[str, Any] = {}

    # command and paths
    defaults.update({
        'command': 'headless-run',
        'config_path': _find_config_path(),
        'temp_path': settings.temp_dir,
        'jobs_path': settings.jobs_dir,
        'source_paths': None,
        'target_path': None,
        'output_path': None,
        'source_pattern': None,
        'target_pattern': None,
        'output_pattern': None,
    })

    # face detector
    defaults.update({
        'face_detector_model': settings.face_detector_model,
        'face_detector_size': '640x640',
        'face_detector_margin': [0, 0, 0, 0],
        'face_detector_angles': [0],
        'face_detector_score': 0.5,
    })

    # face landmarker
    defaults.update({
        'face_landmarker_model': '2dfan4',
        'face_landmarker_score': 0.5,
    })

    # face selector
    defaults.update({
        'face_selector_mode': 'reference',
        'face_selector_order': 'large-small',
        'face_selector_age_start': None,
        'face_selector_age_end': None,
        'face_selector_gender': None,
        'face_selector_race': None,
        'reference_face_position': 0,
        'reference_face_distance': 0.3,
        'reference_frame_number': 0,
    })

    # face masker
    defaults.update({
        'face_occluder_model': 'xseg_1',
        'face_parser_model': 'bisenet_resnet_34',
        'face_mask_types': ['box'],
        'face_mask_areas': ['upper-face', 'lower-face', 'mouth'],
        'face_mask_regions': [
            'skin', 'left-eyebrow', 'right-eyebrow', 'left-eye', 'right-eye',
            'glasses', 'nose', 'mouth', 'upper-lip', 'lower-lip',
        ],
        'face_mask_blur': 0.3,
        'face_mask_padding': [0, 0, 0, 0],
    })

    # voice extractor
    defaults['voice_extractor_model'] = 'kim_vocal_2'

    # frame extraction
    defaults.update({
        'trim_frame_start': None,
        'trim_frame_end': None,
        'temp_frame_format': 'png',
        'keep_temp': False,
    })

    # output creation
    defaults.update({
        'output_image_quality': 80,
        'output_image_scale': 1.0,
        'output_audio_encoder': None,
        'output_audio_quality': 80,
        'output_audio_volume': 100,
        'output_video_encoder': None,
        'output_video_preset': 'veryfast',
        'output_video_quality': 80,
        'output_video_scale': 1.0,
        'output_video_fps': None,
    })

    # processors
    defaults['processors'] = ['face_swapper']

    # uis (not used in API but needed by apply_args)
    defaults.update({
        'open_browser': False,
        'ui_layouts': None,
        'ui_workflow': 'instant_runner',
    })

    # execution
    defaults.update({
        'execution_device_ids': [0],
        'execution_providers': settings.get_execution_providers(),
        'execution_thread_count': settings.execution_thread_count,
    })

    # download
    defaults.update({
        'download_providers': settings.get_download_providers(),
        'download_scope': settings.download_scope,
    })

    # memory
    defaults.update({
        'video_memory_strategy': settings.video_memory_strategy,
        'system_memory_limit': 0,
    })

    # misc
    defaults.update({
        'log_level': settings.log_level,
        'halt_on_error': False,
    })

    # benchmark (not used but apply_args references them) and jobs
    defaults.update(dict.fromkeys((
        'benchmark_mode',
        'benchmark_resolutions',
        'benchmark_cycle_count',
        'job_id',
        'job_status',
        'step_index',
    )))

    # processor-specific defaults
    defaults.update({
        'face_swapper_model': 'hyperswap_1a_256',
        'face_swapper_pixel_boost': None,
        'face_swapper_weight': 0.5,
        'face_enhancer_model': 'gfpgan_1.4',
        'face_enhancer_blend': 80,
        'face_enhancer_weight': 0.5,
        'face_editor_model': 'live_portrait',
    })
    # Every face_editor tuning value defaults to None.
    defaults.update(dict.fromkeys((
        'face_editor_eyebrow_direction',
        'face_editor_eye_gaze_horizontal',
        'face_editor_eye_gaze_vertical',
        'face_editor_eye_open_ratio',
        'face_editor_lip_open_ratio',
        'face_editor_mouth_grim',
        'face_editor_mouth_pout',
        'face_editor_mouth_purse',
        'face_editor_mouth_smile',
        'face_editor_mouth_position_horizontal',
        'face_editor_mouth_position_vertical',
        'face_editor_head_pitch',
        'face_editor_head_yaw',
        'face_editor_head_roll',
    )))
    defaults.update({
        'lip_syncer_model': 'wav2lip_96',
        'lip_syncer_weight': None,
        'age_modifier_model': 'styleganex_age',
        'age_modifier_direction': 0,
        'expression_restorer_model': 'live_portrait',
        'expression_restorer_factor': 80,
        'expression_restorer_areas': None,
        'frame_enhancer_model': 'span_kendata_1x',
        'frame_enhancer_blend': 80,
        'frame_colorizer_model': 'ddcolor',
        'frame_colorizer_blend': 80,
        'frame_colorizer_size': '256x256',
        'background_remover_model': 'isnet_general_use',
        'background_remover_color': None,
        'deep_swapper_model': None,
        'deep_swapper_morph': None,
        'face_debugger_items': None,
    })

    return defaults
|
||
|
|
|
||
|
|
|
||
|
|
def build_args_from_options(
    source_paths: List[str],
    target_path: str,
    output_path: str,
    options: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Turn API request options into a complete FaceFusion args dict.

    Starts from ``_build_defaults()``, sets the three paths, then applies any
    overrides found in ``options``.

    Args:
        source_paths: Value stored under ``'source_paths'``.
        target_path: Value stored under ``'target_path'``.
        output_path: Value stored under ``'output_path'``.
        options: Optional overrides. Recognised top-level keys are
            ``'processors'``, ``'execution_providers'``,
            ``'execution_thread_count'`` and ``'video_memory_strategy'``.
            Recognised nested sections are ``'face_detector'``,
            ``'face_selector'``, ``'output'`` and one section per processor
            name; a nested field ``f`` of section ``s`` is stored as ``s_f``.
            Unlisted keys and fields are ignored.

    Returns:
        The merged args dict.
    """
    args = dict(_build_defaults())
    args.update(
        source_paths=source_paths,
        target_path=target_path,
        output_path=output_path,
    )

    if not options:
        return args

    def copy_section(section_name, field_names):
        # Copy the listed fields of one nested section into args as '<section>_<field>'.
        section = options.get(section_name)
        if not section:
            return
        for field_name in field_names:
            if field_name in section:
                args[f'{section_name}_{field_name}'] = section[field_name]

    if 'processors' in options:
        args['processors'] = options['processors']

    # Face detector options
    copy_section('face_detector', ('model', 'size', 'score'))

    # Face selector options
    copy_section(
        'face_selector',
        ('mode', 'order', 'gender', 'race', 'age_start', 'age_end'),
    )

    # Output options
    copy_section(
        'output',
        (
            'image_quality', 'image_scale', 'video_encoder', 'video_preset',
            'video_quality', 'video_scale', 'video_fps', 'audio_encoder',
            'audio_quality', 'audio_volume',
        ),
    )

    # Execution overrides (top-level keys, copied under the same name)
    for top_level_key in ('execution_providers', 'execution_thread_count', 'video_memory_strategy'):
        if top_level_key in options:
            args[top_level_key] = options[top_level_key]

    # Per-processor options
    processor_fields = {
        'face_swapper': ['model', 'pixel_boost', 'weight'],
        'face_enhancer': ['model', 'blend', 'weight'],
        'face_editor': [
            'model', 'eyebrow_direction', 'eye_gaze_horizontal', 'eye_gaze_vertical',
            'eye_open_ratio', 'lip_open_ratio', 'mouth_grim', 'mouth_pout',
            'mouth_purse', 'mouth_smile', 'mouth_position_horizontal',
            'mouth_position_vertical', 'head_pitch', 'head_yaw', 'head_roll',
        ],
        'lip_syncer': ['model', 'weight'],
        'age_modifier': ['model', 'direction'],
        'expression_restorer': ['model', 'factor', 'areas'],
        'frame_enhancer': ['model', 'blend'],
        'frame_colorizer': ['model', 'blend', 'size'],
        'background_remover': ['model', 'color'],
        'deep_swapper': ['model', 'morph'],
        'face_debugger': ['items'],
    }
    for processor_name, field_names in processor_fields.items():
        copy_section(processor_name, field_names)

    return args
|
||
|
|
|
||
|
|
|
||
|
|
def process_sync(args: Dict[str, Any]) -> bool:
    """Run one FaceFusion job synchronously and block until it finishes.

    The module-level ``_lock`` is acquired here for the whole run. Callers
    must NOT already hold ``_lock``: it is a plain ``threading.Lock``, which
    is not reentrant, so a second acquisition from the same thread would
    deadlock.

    Args:
        args: Complete FaceFusion args dict (for example the result of
            ``build_args_from_options()``). It is applied to the state and
            reduced to step args for the job.

    Returns:
        The truthy result of ``job_runner.run_job``; a falsy result raises.

    Raises:
        RuntimeError: If the job cannot be created, the step cannot be added,
            the job cannot be submitted, or the job run reports failure.
    """
    with _lock:
        # Imported lazily: the facefusion package is only importable after
        # sys.path has been extended.
        from facefusion import state_manager
        from facefusion.args import apply_args, reduce_step_args
        from facefusion.core import process_step
        from facefusion.jobs import job_helper, job_manager, job_runner

        # Apply args to state
        apply_args(args, state_manager.set_item)

        # Create and run job
        job_id = job_helper.suggest_job_id('api')
        step_args = reduce_step_args(args)

        if not job_manager.create_job(job_id):
            raise RuntimeError(f'Failed to create job {job_id}')
        if not job_manager.add_step(job_id, step_args):
            raise RuntimeError(f'Failed to add step to job {job_id}')
        if not job_manager.submit_job(job_id):
            raise RuntimeError(f'Failed to submit job {job_id}')

        success = job_runner.run_job(job_id, process_step)
        if not success:
            raise RuntimeError(f'Job {job_id} failed')

        return success
|
||
|
|
|
||
|
|
|
||
|
|
def get_available_processors() -> List[str]:
    """List processor names, one per file under ``facefusion/processors/modules``.

    Each name is ``get_file_name`` applied to a path returned by
    ``resolve_file_paths``.
    """
    _setup_sys_path()
    from facefusion.filesystem import get_file_name, resolve_file_paths

    module_paths = resolve_file_paths('facefusion/processors/modules')
    return list(map(get_file_name, module_paths))
|
||
|
|
|
||
|
|
|
||
|
|
def get_execution_providers() -> List[str]:
    """Return whatever ``facefusion.execution.get_available_execution_providers`` reports."""
    _setup_sys_path()
    from facefusion.execution import get_available_execution_providers

    providers = get_available_execution_providers()
    return providers
|
||
|
|
|
||
|
|
|
||
|
|
def force_download_models(processors: Optional[List[str]] = None) -> bool:
    """Download the models of the common modules and the chosen processors.

    Runs with the module-level ``_lock`` held for its whole duration.

    Args:
        processors: Processor names to cover. ``None`` means every file found
            under ``facefusion/processors/modules``.

    Returns:
        ``False`` as soon as a hash or source download reports failure,
        ``True`` once every model has been handled.
    """
    with _lock:
        from facefusion import (
            content_analyser,
            face_classifier,
            face_detector,
            face_landmarker,
            face_masker,
            face_recognizer,
            state_manager,
            voice_extractor,
        )
        from facefusion.download import conditional_download_hashes, conditional_download_sources
        from facefusion.filesystem import get_file_name, resolve_file_paths
        from facefusion.processors.core import get_processors_modules

        shared_modules = [
            content_analyser,
            face_classifier,
            face_detector,
            face_landmarker,
            face_masker,
            face_recognizer,
            voice_extractor,
        ]

        if processors is not None:
            processor_names = processors
        else:
            processor_names = [
                get_file_name(path)
                for path in resolve_file_paths('facefusion/processors/modules')
            ]

        for module in shared_modules + get_processors_modules(processor_names):
            # Modules without a static model set have nothing to download.
            if not hasattr(module, 'create_static_model_set'):
                continue
            model_set = module.create_static_model_set(state_manager.get_item('download_scope'))
            for model in model_set.values():
                hash_set = model.get('hashes')
                source_set = model.get('sources')
                # Only models that declare both hashes and sources are downloaded.
                if not (hash_set and source_set):
                    continue
                # Hashes are fetched first; sources are attempted only if that succeeded.
                if not (conditional_download_hashes(hash_set) and conditional_download_sources(source_set)):
                    return False
        return True
|
||
|
|
|
||
|
|
|
||
|
|
def stop_processing() -> None:
    """Ask FaceFusion to stop by calling ``process_manager.stop()``.

    ``_lock`` is not taken here, so this can be called while ``process_sync()``
    is running on another thread.
    """
    _setup_sys_path()
    from facefusion import process_manager as ff_process_manager

    ff_process_manager.stop()
|