runpod/comfyui/fix_workflows.py
commit 897dcb175a
Author: Sebastian Krüger
Date:   2025-11-22 20:49:27 +01:00

refactor: reorganize directory structure and remove hardcoded paths

Move comfyui and vllm out of models/ directory to top level for better
organization. Replace all hardcoded /workspace paths with relative paths
to make the configuration portable across different environments.

Changes:
- Move models/comfyui/ → comfyui/
- Move models/vllm/ → vllm/
- Remove models/ directory (empty)
- Update arty.yml: replace /workspace with environment variables
- Update supervisord.conf: use relative paths from /workspace/ai
- Update all script references to use new paths
- Maintain TQDM_DISABLE=1 to fix BrokenPipeError
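
For illustration, the supervisord change described above is of roughly this
shape (the section name and paths are illustrative, not the actual config):

    [program:comfyui]
    ; was: directory=/workspace/ai/comfyui
    directory=comfyui
    command=python3 main.py
    environment=TQDM_DISABLE="1"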

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

#!/usr/bin/env python3
"""
ComfyUI Workflow Schema Fixer
Fixes missing schema fields in ComfyUI workflow JSON files:
- Adds missing 'flags', 'order', 'mode', 'properties', 'size' fields to nodes
- Reconstructs 'inputs' and 'outputs' arrays from links
- Builds complete 'links' array
- Updates outdated node names
Usage:
python3 fix_workflows.py <workflow_directory>
"""
import json
import sys
from pathlib import Path
from typing import Dict, List
# Node name mapping (old → new)
NODE_NAME_MAPPING = {
'AnimateDiffLoader': 'AnimateDiffLoaderV1',
'VHSVideoCombine': 'VHS_VideoCombine',
'PreviewVideo': None, # Remove - use VHS_VideoCombine with preview enabled
'SaveVideo': None, # Remove - use VHS_VideoCombine
'IPAdapterApply': 'IPAdapter',
'IPAdapterApplyFace': 'IPAdapterFaceID',
'AudioSave': 'SaveAudio',
'AnimateDiffSampler': 'KSamplerAdvanced', # AnimateDiff uses standard KSampler
'ADE_AnimateDiffSampler': 'KSamplerAdvanced',
'SeedGenerator': 'ImpactInt', # Use Impact Pack integer node for seed generation
'BatchKSampler': 'KSampler', # Standard KSampler can handle batches
'ImageBatchToList': 'GetImageSize', # Placeholder - may need manual adjustment
}
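# Example: a node {'id': 5, 'type': 'VHSVideoCombine'} is renamed in place to
# 'VHS_VideoCombine', while nodes whose mapping is None (PreviewVideo,
# SaveVideo) are removed from the workflow entirely.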
# Default node sizes by category
NODE_SIZES = {
'Loader': {'0': 350, '1': 100},
'Sampler': {'0': 315, '1': 474},
'Encoder': {'0': 400, '1': 200},
'Default': {'0': 315, '1': 100},
}
def get_node_size(node_type: str) -> Dict[str, int]:
"""Get appropriate size for node based on type"""
    if 'Load' in node_type:
        return NODE_SIZES['Loader']
    elif 'Sampler' in node_type:
        return NODE_SIZES['Sampler']
    elif 'Encode' in node_type or 'CLIP' in node_type:
        return NODE_SIZES['Encoder']
    else:
        return NODE_SIZES['Default']
def fix_workflow(workflow_path: Path) -> bool:
"""Fix a single workflow file"""
print(f"\n{'='*60}")
print(f"Processing: {workflow_path.name}")
print(f"{'='*60}")
try:
with open(workflow_path, 'r') as f:
workflow = json.load(f)
except json.JSONDecodeError as e:
print(f"✗ ERROR: Invalid JSON - {e}")
return False
if 'nodes' not in workflow:
print(f"✗ ERROR: No 'nodes' key in workflow")
return False
nodes = workflow['nodes']
links = workflow.get('links', [])
# Track changes
changes = {
'added_flags': 0,
'added_order': 0,
'added_mode': 0,
'added_properties': 0,
'added_size': 0,
'added_inputs': 0,
'added_outputs': 0,
'updated_node_names': 0,
'removed_nodes': 0,
'added_last_link_id': 0,
'added_links': 0,
}
# Build link index for quick lookup
link_index = {}
for link in links:
if len(link) >= 6:
link_id, src_node_id, src_slot, tgt_node_id, tgt_slot, data_type = link[:6]
link_index[link_id] = {
'source': {'node_id': src_node_id, 'slot': src_slot},
'target': {'node_id': tgt_node_id, 'slot': tgt_slot},
'type': data_type
}
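    # Example: the link [7, 3, 0, 5, 1, "IMAGE"] is indexed as
    #   link_index[7] = {'source': {'node_id': 3, 'slot': 0},
    #                    'target': {'node_id': 5, 'slot': 1},
    #                    'type': 'IMAGE'}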
# Process each node
nodes_to_remove = []
for i, node in enumerate(nodes):
node_id = node.get('id')
node_type = node.get('type', '')
# Update node name if needed
if node_type in NODE_NAME_MAPPING:
new_name = NODE_NAME_MAPPING[node_type]
if new_name is None:
# Mark for removal
nodes_to_remove.append(i)
changes['removed_nodes'] += 1
print(f" Removing deprecated node {node_id}: {node_type}")
continue
else:
print(f" Updating node {node_id}: {node_type}{new_name}")
node['type'] = new_name
node_type = new_name
changes['updated_node_names'] += 1
# Add missing flags
if 'flags' not in node:
node['flags'] = {}
changes['added_flags'] += 1
# Add missing order (will recalculate later based on dependencies)
if 'order' not in node:
node['order'] = i # Temporary order
changes['added_order'] += 1
# Add missing mode (0 = execute, 4 = bypass)
if 'mode' not in node:
node['mode'] = 0
changes['added_mode'] += 1
# Add missing properties
if 'properties' not in node:
node['properties'] = {"Node name for S&R": node_type}
changes['added_properties'] += 1
# Add missing size
if 'size' not in node:
node['size'] = get_node_size(node_type)
changes['added_size'] += 1
        # Reconstruct inputs from links
        if 'inputs' not in node or not node['inputs']:
            slot_inputs = {}
            for link_id, link_data in link_index.items():
                if link_data['target']['node_id'] == node_id:
                    # This link targets this node. The real input name is not
                    # recoverable from the links array, so use a placeholder.
                    slot = link_data['target']['slot']
                    slot_inputs[slot] = {
                        'name': f'input_{slot}',
                        'type': link_data['type'],
                        'link': link_id
                    }
            if slot_inputs:
                # Keep inputs in slot order (each input slot takes one link)
                node['inputs'] = [slot_inputs[slot] for slot in sorted(slot_inputs)]
                changes['added_inputs'] += 1
# Reconstruct outputs from links
if 'outputs' not in node or not node['outputs']:
node_outputs = {}
for link_id, link_data in link_index.items():
if link_data['source']['node_id'] == node_id:
slot = link_data['source']['slot']
if slot not in node_outputs:
node_outputs[slot] = {
'name': f'output_{slot}',
'type': link_data['type'],
'links': [],
'slot_index': slot
}
node_outputs[slot]['links'].append(link_id)
            if node_outputs:
                # Emit outputs in slot order so slot_index matches the list position
                node['outputs'] = [node_outputs[slot] for slot in sorted(node_outputs)]
                changes['added_outputs'] += 1
# Remove deprecated nodes
for i in reversed(nodes_to_remove):
del nodes[i]
# Recalculate execution order based on dependencies
if changes['added_order'] > 0 or changes['removed_nodes'] > 0:
calculate_execution_order(nodes, link_index)
# Add missing links array
if 'links' not in workflow:
workflow['links'] = []
changes['added_links'] = 1
# Add missing last_link_id
if 'last_link_id' not in workflow:
# Calculate from existing links
max_link_id = 0
if workflow.get('links'):
for link in workflow['links']:
                if link:
max_link_id = max(max_link_id, link[0])
workflow['last_link_id'] = max_link_id
changes['added_last_link_id'] = 1
# Update workflow
workflow['nodes'] = nodes
    # Print summary; skip the header entirely when nothing changed
    total_changes = sum(changes.values())
    if total_changes == 0:
        print(" ✓ No changes needed - workflow already valid")
        return True
    print("\nChanges made:")
    for key, value in changes.items():
        if value > 0:
            print(f"  {key.replace('_', ' ').title()}: {value}")
# Save fixed workflow
try:
with open(workflow_path, 'w') as f:
json.dump(workflow, f, indent=2)
print(f"\n✓ Successfully fixed and saved workflow")
return True
except Exception as e:
print(f"\n✗ ERROR saving workflow: {e}")
return False
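# Example: fix_workflow(Path('workflows/txt2img.json')) prints a per-file
# change report and returns True once the repaired JSON has been written back
# in place (the path here is illustrative).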
def calculate_execution_order(nodes: List[Dict], link_index: Dict):
"""Calculate execution order based on node dependencies"""
# Build dependency graph
dependencies = {}
node_by_id = {node['id']: node for node in nodes}
for node in nodes:
node_id = node['id']
dependencies[node_id] = set()
# Find all nodes this node depends on (inputs)
for link_id, link_data in link_index.items():
if link_data['target']['node_id'] == node_id:
# This node depends on the source node
dependencies[node_id].add(link_data['source']['node_id'])
# Topological sort to determine execution order
visited = set()
order_counter = [0]
def visit(node_id):
if node_id in visited:
return
visited.add(node_id)
# Visit dependencies first
for dep_id in dependencies.get(node_id, []):
if dep_id in node_by_id: # Skip if dependency not in current nodes
visit(dep_id)
# Assign order
if node_id in node_by_id:
node_by_id[node_id]['order'] = order_counter[0]
order_counter[0] += 1
# Visit all nodes
for node_id in node_by_id.keys():
visit(node_id)
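# Example: given links 1 -> 2 and 2 -> 3, visit() assigns order 0, 1 and 2 to
# nodes 1, 2 and 3 respectively, so every node runs after its dependencies.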
def main():
if len(sys.argv) < 2:
print("Usage: python3 fix_workflows.py <workflow_directory>")
sys.exit(1)
workflow_dir = Path(sys.argv[1])
if not workflow_dir.exists():
print(f"Error: Directory {workflow_dir} does not exist")
sys.exit(1)
# Find all JSON files recursively
workflow_files = list(workflow_dir.rglob('*.json'))
if not workflow_files:
print(f"No workflow JSON files found in {workflow_dir}")
sys.exit(1)
print(f"\nFound {len(workflow_files)} workflow files")
# Process each workflow
success_count = 0
for workflow_path in sorted(workflow_files):
if fix_workflow(workflow_path):
success_count += 1
# Summary
print(f"\n{'='*60}")
print(f"SUMMARY")
print(f"{'='*60}")
print(f"Total workflows: {len(workflow_files)}")
print(f"Successfully fixed: {success_count}")
print(f"Failed: {len(workflow_files) - success_count}")
print(f"{'='*60}\n")
if __name__ == '__main__':
main()