- Add flux_image_gen.py manifold function for Flux.1 Schnell - Auto-mount functions via Docker volume (./functions:/app/backend/data/functions:ro) - Add comprehensive setup guide in FLUX_SETUP.md - Update CLAUDE.md with Flux integration documentation - Infrastructure as code approach - no manual import needed 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
159 lines
4.9 KiB
Python
159 lines
4.9 KiB
Python
"""
|
|
title: Flux Image Generator
|
|
author: Valknar
|
|
version: 1.0.0
|
|
license: MIT
|
|
description: Generate images using Flux.1 Schnell via LiteLLM
|
|
requirements: requests, pydantic
|
|
"""
|
|
|
|
import os
|
|
import base64
|
|
import json
|
|
import requests
|
|
from typing import Generator
|
|
from pydantic import BaseModel, Field
|
|
|
|
|
|
class Pipe:
    """
    Flux image generation function for Open WebUI.

    Routes image generation requests to LiteLLM → Orchestrator → RunPod Flux.
    The prompt is taken from the chat messages, posted to the LiteLLM
    ``/images/generations`` endpoint, and the returned base64 image is
    embedded in a markdown reply.
    """

    class Valves(BaseModel):
        """Configuration valves for the image generation function."""

        LITELLM_API_BASE: str = Field(
            default="http://litellm:4000/v1",
            description="LiteLLM API base URL",
        )
        LITELLM_API_KEY: str = Field(
            default="dummy",
            description="LiteLLM API key (not required for internal use)",
        )
        DEFAULT_MODEL: str = Field(
            default="flux-schnell",
            description="Default model to use for image generation",
        )
        DEFAULT_SIZE: str = Field(
            default="1024x1024",
            description="Default image size",
        )
        TIMEOUT: int = Field(
            default=120,
            description="Request timeout in seconds",
        )

    def __init__(self):
        """Register the function as a manifold and load default valves."""
        self.type = "manifold"
        self.id = "flux_image_gen"
        self.name = "Flux"
        self.valves = self.Valves()

    def pipes(self):
        """Return available models."""
        return [
            {
                "id": "flux-schnell",
                "name": "Flux.1 Schnell (4-5s)",
            }
        ]

    def pipe(self, body: dict) -> Generator[str, None, None]:
        """
        Generate an image via the LiteLLM images endpoint.

        Args:
            body: Request body containing model, messages, etc.

        Yields:
            A single JSON string shaped like an OpenAI chat completion. On
            success its content is markdown embedding the generated image;
            on any failure its content is an ``Error: ...`` message.
        """
        try:
            messages = body.get("messages", [])
            if not messages:
                yield self._error_response("No messages provided")
                return

            prompt = self._extract_prompt(messages)
            if not prompt:
                yield self._error_response("No prompt provided")
                return

            image_request = {
                "model": self._resolve_model(body),
                "prompt": prompt,
                "size": body.get("size", self.valves.DEFAULT_SIZE),
                "n": 1,
                "response_format": "b64_json",
            }

            response = requests.post(
                f"{self.valves.LITELLM_API_BASE}/images/generations",
                json=image_request,
                headers={
                    "Content-Type": "application/json",
                    "Authorization": f"Bearer {self.valves.LITELLM_API_KEY}",
                },
                timeout=self.valves.TIMEOUT,
            )

            if response.status_code != 200:
                yield self._error_response(
                    f"Image generation failed: {response.status_code} - {response.text}"
                )
                return

            result = response.json()

            # Guard against a non-object body as well as a missing/empty list.
            data = result.get("data") if isinstance(result, dict) else None
            if not data:
                yield self._error_response("No image data in response")
                return

            image_data = data[0].get("b64_json")
            if not image_data:
                yield self._error_response("No base64 image data in response")
                return

            # Embed the image itself as a data URI; without this the reply
            # would only echo the prompt and discard the generated image.
            # NOTE(review): the MIME type is assumed to be PNG — confirm
            # against what the backend actually returns.
            image_markdown = (
                f"![Generated Image](data:image/png;base64,{image_data})"
                f"\n\n**Prompt:** {prompt}"
            )

            yield self._completion(image_markdown)

        except requests.Timeout:
            yield self._error_response(f"Request timed out after {self.valves.TIMEOUT}s")
        except requests.RequestException as e:
            yield self._error_response(f"Request failed: {str(e)}")
        except Exception as e:
            # Top-level boundary: always answer with a chat message rather
            # than letting the generator raise into the caller.
            yield self._error_response(f"Unexpected error: {str(e)}")

    @staticmethod
    def _extract_prompt(messages: list) -> str:
        """Return the text of the last user message (or last message if none)."""
        user_messages = [m for m in messages if m.get("role") == "user"]
        message = user_messages[-1] if user_messages else messages[-1]
        content = message.get("content", "")

        # Content may be a list of typed parts; keep only the text parts.
        if isinstance(content, list):
            content = " ".join(
                part.get("text", "")
                for part in content
                if isinstance(part, dict) and part.get("type") == "text"
            )

        return content.strip() if isinstance(content, str) else ""

    def _resolve_model(self, body: dict) -> str:
        """Return the model id to send upstream, without this function's prefix."""
        model = body.get("model") or self.valves.DEFAULT_MODEL
        prefix = f"{self.id}."
        # Only strip our own "<function id>." prefix so plain ids pass through.
        if isinstance(model, str) and model.startswith(prefix):
            model = model[len(prefix):]
        return model

    @staticmethod
    def _completion(content: str) -> str:
        """Serialize a single assistant message in OpenAI chat-completion shape."""
        return json.dumps({
            "choices": [{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": content,
                },
                "finish_reason": "stop",
            }]
        })

    def _error_response(self, error_message: str) -> str:
        """Generate error response in OpenAI format."""
        return self._completion(f"Error: {error_message}")
|