import os
import random
import uuid
from typing import Dict, Any, List

import gradio as gr
import numpy as np
from PIL import Image, ImageChops

from core.settings import INPUT_DIR
from utils.app_utils import (
    sanitize_filename,
    get_lora_path,
    get_embedding_path,
    ensure_controlnet_model_downloaded,
    ensure_ipadapter_models_downloaded,
    _ensure_model_downloaded,
    ensure_sd3_ipadapter_models_downloaded,
    get_vae_path,
)


def _save_temp_image(image, prefix: str, temp_files: List[str]) -> str:
    """Write *image* as a uniquely named PNG inside INPUT_DIR.

    The full path is appended to *temp_files* before saving, so a partially
    written file is still tracked if ``save`` fails. Returns the file's
    basename.
    """
    os.makedirs(INPUT_DIR, exist_ok=True)
    # A UUID avoids the silent overwrites that a 4-digit random suffix allows
    # when several images (or concurrent requests) share the same prefix.
    temp_path = os.path.join(INPUT_DIR, f"{prefix}_{uuid.uuid4().hex}.png")
    temp_files.append(temp_path)
    image.save(temp_path, "PNG")
    return os.path.basename(temp_path)


def _discard_temp_files(paths: List[str]) -> None:
    """Best-effort removal of temp files after preprocessing has failed."""
    for path in paths:
        try:
            os.remove(path)
        except OSError:
            # The file may never have been created; the original error is
            # what the caller needs to see, so removal failures are ignored.
            pass


def _resolve_model_filename(source, model_id, fetch_path, kind: str, progress):
    """Turn a (source, id) pair into a model filename.

    "File" sources are passed through ``sanitize_filename``; "Civitai" sources
    are resolved with *fetch_path* and reduced to the basename of the returned
    local path. Any other source yields ``None``.

    Raises:
        gr.Error: if *fetch_path* returns no local path.
    """
    if source == "File":
        return sanitize_filename(model_id)
    if source == "Civitai":
        local_path, status = fetch_path(
            source, model_id, os.environ.get("CIVITAI_API_KEY", ""), progress
        )
        if not local_path:
            raise gr.Error(f"Failed to prepare {kind} {model_id}: {status}")
        return os.path.basename(local_path)
    return None


def _prepare_pipeline_inputs(
    ui_inputs: Dict[str, Any],
    progress: gr.Progress,
    workflow_model_type: str,
    temp_files_to_clean: List[str],
) -> Dict[str, Any]:
    """Do the actual preprocessing for ``process_pipeline_inputs``.

    Every temp file written is recorded in *temp_files_to_clean* (owned by the
    caller) so it can be removed even when this function raises.
    """
    task_type = ui_inputs['task_type']

    # --- LoRAs: flat list of (source, id, scale, file) groups ---------------
    lora_data = ui_inputs.get('lora_data', [])
    active_loras_for_gpu, active_loras_for_meta = [], []
    if lora_data:
        sources, ids, scales, files = lora_data[0::4], lora_data[1::4], lora_data[2::4], lora_data[3::4]
        for source, lora_id, scale, _ in zip(sources, ids, scales, files):
            if not (scale > 0 and lora_id and lora_id.strip()):
                continue
            lora_filename = _resolve_model_filename(source, lora_id, get_lora_path, "LoRA", progress)
            if lora_filename:
                active_loras_for_gpu.append({
                    "lora_name": lora_filename,
                    "strength_model": scale,
                    "strength_clip": scale,
                })
                active_loras_for_meta.append(f"{source} {lora_id}:{scale}")

    # --- Denoise strength depends on the task --------------------------------
    ui_inputs['denoise'] = 1.0
    if task_type == 'img2img':
        ui_inputs['denoise'] = ui_inputs.get('img2img_denoise', 0.7)
    elif task_type == 'hires_fix':
        ui_inputs['denoise'] = ui_inputs.get('hires_denoise', 0.55)
    elif task_type == 'inpaint':
        ui_inputs['denoise'] = ui_inputs.get('inpaint_denoise', 1.0)

    # Created up front for every task type, as the original code did.
    os.makedirs(INPUT_DIR, exist_ok=True)

    # --- Task input image ------------------------------------------------------
    if task_type == 'img2img':
        input_image_pil = ui_inputs.get('img2img_image')
        if not input_image_pil:
            raise gr.Error("Please upload an image for Image-to-Image.")
        ui_inputs['input_image'] = _save_temp_image(input_image_pil, "temp_input", temp_files_to_clean)
        ui_inputs['width'] = input_image_pil.width
        ui_inputs['height'] = input_image_pil.height

    elif task_type == 'inpaint':
        inpaint_dict = ui_inputs.get('inpaint_image_dict')
        if not inpaint_dict or not inpaint_dict.get('background') or not inpaint_dict.get('layers'):
            raise gr.Error("Inpainting requires an input image and a drawn mask.")

        background_img = inpaint_dict['background'].convert("RGBA")
        # Union of all drawn layers: per-pixel maximum of each layer's last band.
        composite_mask_pil = Image.new('L', background_img.size, 0)
        for layer in inpaint_dict['layers']:
            if layer:
                layer_alpha = layer.split()[-1]
                composite_mask_pil = ImageChops.lighter(composite_mask_pil, layer_alpha)

        # The inverted mask becomes the alpha channel of the saved composite
        # (255 - value per pixel, same result as the former numpy expression).
        inverted_mask_alpha = ImageChops.invert(composite_mask_pil)
        r, g, b, _ = background_img.split()
        composite_image_with_mask = Image.merge('RGBA', [r, g, b, inverted_mask_alpha])

        ui_inputs['input_image'] = _save_temp_image(
            composite_image_with_mask, "temp_inpaint_composite", temp_files_to_clean
        )
        ui_inputs.pop('inpaint_mask', None)

    elif task_type == 'outpaint':
        input_image_pil = ui_inputs.get('outpaint_image')
        if not input_image_pil:
            raise gr.Error("Please upload an image for Outpainting.")
        ui_inputs['input_image'] = _save_temp_image(input_image_pil, "temp_input", temp_files_to_clean)
        ui_inputs['megapixels'] = 0.25
        ui_inputs['grow_mask_by'] = ui_inputs.get('feathering', 10)

    elif task_type == 'hires_fix':
        input_image_pil = ui_inputs.get('hires_image')
        if not input_image_pil:
            raise gr.Error("Please upload an image for Hires Fix.")
        ui_inputs['input_image'] = _save_temp_image(input_image_pil, "temp_input", temp_files_to_clean)

    # --- Embeddings: flat list of (source, id, file) groups -------------------
    embedding_data = ui_inputs.get('embedding_data', [])
    embedding_filenames = []
    if embedding_data:
        emb_sources, emb_ids, emb_files = embedding_data[0::3], embedding_data[1::3], embedding_data[2::3]
        for source, emb_id, _ in zip(emb_sources, emb_ids, emb_files):
            if not (emb_id and emb_id.strip()):
                continue
            emb_filename = _resolve_model_filename(source, emb_id, get_embedding_path, "Embedding", progress)
            if emb_filename:
                embedding_filenames.append(emb_filename)

    if embedding_filenames:
        embedding_prompt_text = " ".join(f"embedding:{f}" for f in embedding_filenames)
        if ui_inputs['positive_prompt']:
            ui_inputs['positive_prompt'] = f"{ui_inputs['positive_prompt']}, {embedding_prompt_text}"
        else:
            ui_inputs['positive_prompt'] = embedding_prompt_text

    # --- ControlNet: interleaved groups of 5, fields 0/3/4 are used ------------
    controlnet_data = ui_inputs.get('controlnet_data', [])
    active_controlnets = []
    if controlnet_data:
        cn_images, _, _, cn_strengths, cn_filepaths = [controlnet_data[i::5] for i in range(5)]
        for i, (cn_image, cn_strength, cn_filepath) in enumerate(zip(cn_images, cn_strengths, cn_filepaths)):
            if cn_image and cn_strength > 0 and cn_filepath and cn_filepath != "None":
                ensure_controlnet_model_downloaded(cn_filepath, progress)
                active_controlnets.append({
                    "image": _save_temp_image(cn_image, f"temp_cn_{i}", temp_files_to_clean),
                    "strength": cn_strength,
                    "start_percent": 0.0,
                    "end_percent": 1.0,
                    "control_net_name": cn_filepath,
                })

    # --- Anima ControlNet-LLLite: interleaved groups of 7 ----------------------
    anima_controlnet_lllite_data = ui_inputs.get('anima_controlnet_lllite_data', [])
    active_anima_controlnets = []
    if anima_controlnet_lllite_data:
        cn_images, _, _, cn_strengths, cn_filepaths, cn_starts, cn_ends = [
            anima_controlnet_lllite_data[i::7] for i in range(7)
        ]
        units = zip(cn_images, cn_strengths, cn_filepaths, cn_starts, cn_ends)
        for i, (cn_image, cn_strength, cn_filepath, cn_start, cn_end) in enumerate(units):
            if cn_image and cn_strength > 0 and cn_filepath and cn_filepath != "None":
                _ensure_model_downloaded(cn_filepath, progress)
                active_anima_controlnets.append({
                    "image": _save_temp_image(cn_image, f"temp_anima_cn_{i}", temp_files_to_clean),
                    "strength": cn_strength,
                    "start_percent": cn_start,
                    "end_percent": cn_end,
                    "control_net_name": cn_filepath,
                })

    # --- DiffSynth ControlNet: interleaved groups of 5 -------------------------
    diffsynth_controlnet_data = ui_inputs.get('diffsynth_controlnet_data', [])
    active_diffsynth_controlnets = []
    if diffsynth_controlnet_data:
        cn_images, _, _, cn_strengths, cn_filepaths = [diffsynth_controlnet_data[i::5] for i in range(5)]
        for i, (cn_image, cn_strength, cn_filepath) in enumerate(zip(cn_images, cn_strengths, cn_filepaths)):
            if cn_image and cn_strength > 0 and cn_filepath and cn_filepath != "None":
                ensure_controlnet_model_downloaded(cn_filepath, progress)
                active_diffsynth_controlnets.append({
                    "image": _save_temp_image(cn_image, f"temp_diffsynth_cn_{i}", temp_files_to_clean),
                    "strength": cn_strength,
                    "control_net_name": cn_filepath,
                })

    # --- IPAdapter: three contiguous per-unit slices + 5 trailing settings -----
    ipadapter_data = ui_inputs.get('ipadapter_data', [])
    active_ipadapters = []
    if ipadapter_data:
        num_ipa_units = (len(ipadapter_data) - 5) // 3
        (final_preset, final_weight, final_lora_strength,
         final_embeds_scaling, final_combine_method) = ipadapter_data[-5:]
        ipa_images, ipa_weights, ipa_lora_strengths = [
            ipadapter_data[i * num_ipa_units:(i + 1) * num_ipa_units] for i in range(3)
        ]
        for i, (ipa_image, ipa_weight, ipa_lora_strength) in enumerate(
            zip(ipa_images, ipa_weights, ipa_lora_strengths)
        ):
            if ipa_image and ipa_weight > 0 and final_preset:
                active_ipadapters.append({
                    "image": _save_temp_image(ipa_image, f"temp_ipa_{i}", temp_files_to_clean),
                    "preset": final_preset,
                    "weight": ipa_weight,
                    "lora_strength": ipa_lora_strength,
                })

        if active_ipadapters:
            # Units are only active when final_preset is truthy, so it is the
            # single preset that needs downloading.
            ensure_ipadapter_models_downloaded(final_preset, progress)
            model_type_key = 'sd15' if workflow_model_type == 'sd15' else 'sdxl'
            # Trailing marker entry carrying the shared settings.
            active_ipadapters.append({
                'is_final_settings': True,
                'model_type': model_type_key,
                'final_preset': final_preset,
                'final_weight': final_weight,
                'final_lora_strength': final_lora_strength,
                'final_embeds_scaling': final_embeds_scaling,
                'final_combine_method': final_combine_method,
            })

    # --- Flux.1 IPAdapter: four contiguous slices -------------------------------
    flux1_ipadapter_data = ui_inputs.get('flux1_ipadapter_data', [])
    active_flux1_ipadapters = []
    if flux1_ipadapter_data:
        num_units = len(flux1_ipadapter_data) // 4
        f_images, f_weights, f_starts, f_ends = [
            flux1_ipadapter_data[i * num_units:(i + 1) * num_units] for i in range(4)
        ]
        flux1_models_ready = False
        for i, (f_image, f_weight, f_start, f_end) in enumerate(zip(f_images, f_weights, f_starts, f_ends)):
            if not (f_image and f_weight > 0):
                continue
            if not flux1_models_ready:
                # Model preparation does not depend on the unit, so it runs
                # once, on the first active unit only.
                _ensure_model_downloaded("ip-adapter.bin", progress)
                from huggingface_hub import snapshot_download
                progress(0.5, desc="Caching HF SigLIP model...")
                snapshot_download(
                    repo_id="google/siglip-so400m-patch14-384",
                    allow_patterns=["*.json", "*.safetensors", "*.txt"],
                    ignore_patterns=["*.msgpack", "*.h5", "*.bin"],
                )
                flux1_models_ready = True
            active_flux1_ipadapters.append({
                "image": _save_temp_image(f_image, f"temp_fipa_{i}", temp_files_to_clean),
                "weight": f_weight,
                "start_percent": f_start,
                "end_percent": f_end,
            })

    # --- SD3 IPAdapter: four contiguous slices ----------------------------------
    sd3_ipadapter_data = ui_inputs.get('sd3_ipadapter_chain', [])
    active_sd3_ipadapters = []
    if sd3_ipadapter_data:
        num_units = len(sd3_ipadapter_data) // 4
        s_images, s_weights, s_starts, s_ends = [
            sd3_ipadapter_data[i * num_units:(i + 1) * num_units] for i in range(4)
        ]
        sd3_ipa_downloaded = False
        for i, (s_image, s_weight, s_start, s_end) in enumerate(zip(s_images, s_weights, s_starts, s_ends)):
            if not (s_image and s_weight > 0):
                continue
            if not sd3_ipa_downloaded:
                ensure_sd3_ipadapter_models_downloaded(progress)
                sd3_ipa_downloaded = True
            active_sd3_ipadapters.append({
                "image": _save_temp_image(s_image, f"temp_s3ipa_{i}", temp_files_to_clean),
                "weight": s_weight,
                "start_percent": s_start,
                "end_percent": s_end,
            })

    # --- Style images: two contiguous slices ------------------------------------
    style_data = ui_inputs.get('style_data', [])
    active_styles = []
    if style_data:
        num_units = len(style_data) // 2
        st_images = style_data[0:num_units]
        st_strengths = style_data[num_units:2 * num_units]
        style_model_ready = False
        for i, (st_image, st_strength) in enumerate(zip(st_images, st_strengths)):
            if not (st_image and st_strength > 0):
                continue
            if not style_model_ready:
                _ensure_model_downloaded("sigclip_vision_patch14_384.safetensors", progress)
                style_model_ready = True
            active_styles.append({
                "image": _save_temp_image(st_image, f"temp_style_{i}", temp_files_to_clean),
                "strength": st_strength,
            })

    # --- Reference images --------------------------------------------------------
    reference_latent_data = ui_inputs.get('reference_latent_data', [])
    active_reference_latents = []
    if reference_latent_data:
        active_reference_latents = [
            _save_temp_image(img, "temp_ref", temp_files_to_clean)
            for img in reference_latent_data if img
        ]

    hidream_o1_reference_data = ui_inputs.get('hidream_o1_reference_data', [])
    active_hidream_o1_reference = []
    if hidream_o1_reference_data:
        active_hidream_o1_reference = [
            _save_temp_image(img, "temp_ho1_ref", temp_files_to_clean)
            for img in hidream_o1_reference_data if img
        ]

    # --- VAE override --------------------------------------------------------------
    vae_source = ui_inputs.get('vae_source')
    vae_id = ui_inputs.get('vae_id')
    # A missing or blank id is skipped for every source, matching the LoRA and
    # embedding handling, instead of being handed to sanitize_filename.
    if vae_source and vae_source != "None" and vae_id and vae_id.strip():
        vae_name_override = _resolve_model_filename(vae_source, vae_id, get_vae_path, "VAE", progress)
        if vae_name_override:
            ui_inputs['vae_name'] = vae_name_override

    # --- Regional conditioning: six contiguous slices ------------------------------
    conditioning_data = ui_inputs.get('conditioning_data', [])
    active_conditioning = []
    if conditioning_data:
        num_units = len(conditioning_data) // 6
        prompts, widths, heights, xs, ys, strengths = [
            conditioning_data[i * num_units:(i + 1) * num_units] for i in range(6)
        ]
        active_conditioning = [
            {
                "prompt": prompt,
                "width": int(width),
                "height": int(height),
                "x": int(x),
                "y": int(y),
                "strength": float(strength),
            }
            for prompt, width, height, x, y, strength in zip(prompts, widths, heights, xs, ys, strengths)
            if prompt and prompt.strip()
        ]

    return {
        "active_loras_for_gpu": active_loras_for_gpu,
        "active_loras_for_meta": active_loras_for_meta,
        "active_controlnets": active_controlnets,
        "active_anima_controlnets": active_anima_controlnets,
        "active_diffsynth_controlnets": active_diffsynth_controlnets,
        "active_ipadapters": active_ipadapters,
        "active_flux1_ipadapters": active_flux1_ipadapters,
        "active_sd3_ipadapters": active_sd3_ipadapters,
        "active_styles": active_styles,
        "active_reference_latents": active_reference_latents,
        "active_hidream_o1_reference": active_hidream_o1_reference,
        "active_conditioning": active_conditioning,
        "temp_files_to_clean": temp_files_to_clean,
    }


def process_pipeline_inputs(ui_inputs: Dict[str, Any], progress: gr.Progress, workflow_model_type: str) -> Dict[str, Any]:
    """Convert raw UI values into the structures the generation workflow uses.

    Resolves LoRA / embedding / VAE references to filenames, writes every
    uploaded image (task input, ControlNet, IPAdapter, style, reference) to a
    uniquely named PNG in ``INPUT_DIR``, and triggers the model downloads the
    active units need.

    ``ui_inputs`` is modified in place: ``denoise`` is always set, and
    depending on the task and inputs ``input_image``, ``width``, ``height``,
    ``megapixels``, ``grow_mask_by``, ``positive_prompt`` and ``vae_name`` may
    be set, and ``inpaint_mask`` removed.

    Args:
        ui_inputs: UI values keyed by name; must contain ``task_type``.
        progress: Gradio progress callback, forwarded to the download helpers.
        workflow_model_type: ``'sd15'`` selects the SD1.5 IPAdapter model
            type; any other value is treated as ``'sdxl'``.

    Returns:
        A dict with the ``active_*`` lists for each feature and
        ``temp_files_to_clean``, the full paths of every temp file written.

    Raises:
        gr.Error: when a required task image is missing or a Civitai model
            cannot be prepared.
    """
    temp_files_to_clean: List[str] = []
    try:
        return _prepare_pipeline_inputs(ui_inputs, progress, workflow_model_type, temp_files_to_clean)
    except Exception:
        # On failure the caller never receives the list of temp files, so
        # remove them here rather than leaking them in INPUT_DIR.
        _discard_temp_files(temp_files_to_clean)
        raise