Spaces:
Running on Zero
Running on Zero
| import os | |
| import hashlib | |
| import re | |
| from pathlib import Path | |
| import shutil | |
| from typing import Sequence, Mapping, Any, Union | |
| import gradio as gr | |
# Module-level cache filled by build_preprocessor_model_map().
# None means "not built yet"; the builder stores a dict here (possibly empty).
PREPROCESSOR_MODEL_MAP = None
# Module-level cache filled by build_preprocessor_parameter_map().
# None means "not built yet"; the builder stores a dict here.
PREPROCESSOR_PARAMETER_MAP = None
def save_uploaded_file_with_hash(file_obj: Any, target_dir: str) -> str:
    """Store an uploaded file in *target_dir* under a content-derived name.

    The stored name is ``<sha256 hex digest><lower-cased source extension>``,
    so uploading the same bytes twice maps to the same name and the second
    copy is skipped.

    Args:
        file_obj: The upload. Either an object whose ``name`` attribute is the
            path of the file on disk, or the path itself (``str`` or
            ``os.PathLike``). Any falsy value means "nothing was uploaded".
        target_dir: Directory that receives the file; created if missing.

    Returns:
        The hashed file name (not the full path), or ``""`` when *file_obj*
        is falsy.

    Raises:
        OSError: If the source cannot be read or the destination cannot be
            written.
    """
    import tempfile  # function-scope import: only this helper needs it

    if not file_obj:
        return ""

    # Accept a bare path as well as a file-like wrapper exposing ``.name``.
    if isinstance(file_obj, (str, os.PathLike)):
        source_path = os.fsdecode(file_obj)
    else:
        source_path = file_obj.name

    # Hash in fixed-size chunks so large uploads are never fully in memory.
    digest = hashlib.sha256()
    with open(source_path, "rb") as source:
        for chunk in iter(lambda: source.read(65536), b""):
            digest.update(chunk)

    _, extension = os.path.splitext(source_path)
    hashed_filename = f"{digest.hexdigest()}{extension.lower()}"
    dest_path = os.path.join(target_dir, hashed_filename)
    os.makedirs(target_dir, exist_ok=True)

    if os.path.exists(dest_path):
        print(f"ℹ️ File already exists (deduplicated): {dest_path}")
        return hashed_filename

    # Existence of ``dest_path`` is treated as proof that the content is
    # complete, so never expose a half-written file under that name: copy to
    # a unique temporary file in the same directory, then rename into place.
    fd, partial_path = tempfile.mkstemp(dir=target_dir, suffix=".part")
    os.close(fd)
    try:
        shutil.copy(source_path, partial_path)
        os.replace(partial_path, dest_path)
    finally:
        # Only still present when the copy or the rename failed.
        if os.path.exists(partial_path):
            os.remove(partial_path)

    print(f"✅ Saved uploaded file as: {dest_path}")
    return hashed_filename
def get_value_at_index(obj: Union[Sequence, Mapping], index: int) -> Any:
    """Return ``obj[index]``, falling back to ``obj["result"][index]``.

    Args:
        obj: A sequence, or a mapping that may hold its values under the
            ``"result"`` key.
        index: Position (or key) to look up.

    Returns:
        The value found, or ``None`` when neither lookup succeeds.
    """
    try:
        return obj[index]
    except (KeyError, IndexError):
        pass

    try:
        return obj["result"][index]
    except (KeyError, IndexError, TypeError):
        # TypeError covers a plain sequence (which cannot be indexed by the
        # string "result") and a "result" entry that is not subscriptable.
        return None
def build_preprocessor_model_map():
    """Build (once) and return the preprocessor-name -> model-files map.

    Every ``*.py`` file except ``__init__.py`` in
    ``./custom_nodes/comfyui_controlnet_aux/node_wrappers/`` (relative to the
    current working directory) is scanned as text for:

    * the display names listed in its ``NODE_DISPLAY_NAME_MAPPINGS`` dict, and
    * the file names passed as string literals to ``from_pretrained(...)``.

    Each model file name is paired with a Hugging Face repo id chosen from the
    file name; entries from the hand-maintained ``manual_map`` (keyed by the
    wrapper file's stem) are added as well. All models found in a wrapper file
    are attributed to every display name declared in that file.

    The result is stored in the module-level ``PREPROCESSOR_MODEL_MAP`` and
    later calls return that cached object without rescanning.

    Returns:
        dict mapping display name -> sorted list of unique
        ``(repo_id, filename)`` tuples. Display names with no models are
        omitted. An empty dict is returned (and cached) when the wrappers
        directory does not exist.
    """
    global PREPROCESSOR_MODEL_MAP
    if PREPROCESSOR_MODEL_MAP is not None:
        return PREPROCESSOR_MODEL_MAP

    print("--- Building ControlNet Preprocessor model map ---")

    # Models that cannot be discovered from ``from_pretrained`` string
    # literals, keyed by wrapper file stem.
    manual_map = {
        "dwpose": [
            ("yzd-v/DWPose", "yolox_l.onnx"),
            ("yzd-v/DWPose", "dw-ll_ucoco_384.onnx"),
            ("hr16/UnJIT-DWPose", "dw-ll_ucoco.onnx"),
            ("hr16/DWPose-TorchScript-BatchSize5", "dw-ll_ucoco_384_bs5.torchscript.pt"),
            ("hr16/DWPose-TorchScript-BatchSize5", "rtmpose-m_ap10k_256_bs5.torchscript.pt"),
            ("hr16/yolo-nas-fp16", "yolo_nas_l_fp16.onnx"),
            ("hr16/yolo-nas-fp16", "yolo_nas_m_fp16.onnx"),
            ("hr16/yolo-nas-fp16", "yolo_nas_s_fp16.onnx"),
        ],
        "densepose": [
            ("LayerNorm/DensePose-TorchScript-with-hint-image", "densepose_r50_fpn_dl.torchscript"),
            ("LayerNorm/DensePose-TorchScript-with-hint-image", "densepose_r101_fpn_dl.torchscript"),
        ],
    }

    wrappers_dir = Path("./custom_nodes/comfyui_controlnet_aux/node_wrappers/")
    if not wrappers_dir.exists():
        print("⚠️ ControlNet AUX wrappers directory not found. Cannot build model map.")
        PREPROCESSOR_MODEL_MAP = {}
        return PREPROCESSOR_MODEL_MAP

    # The dict body is captured first and then split into pairs, so that every
    # "ClassName": "Display Name" entry is seen, not just the first one.
    mappings_re = re.compile(r'NODE_DISPLAY_NAME_MAPPINGS\s*=\s*\{(.*?)\}', re.DOTALL)
    pair_re = re.compile(r'["\'](.*?)["\']\s*:\s*["\'](.*?)["\']')
    pretrained_re = re.compile(r"from_pretrained\s*\(\s*(?:filename=)?\s*f?[\"']([^\"']+)[\"']")

    temp_map = {}
    for wrapper_file in wrappers_dir.glob("*.py"):
        if wrapper_file.name == "__init__.py":
            continue
        content = wrapper_file.read_text(encoding="utf-8")

        display_names = [
            display_name
            for dict_body in mappings_re.findall(content)
            for _, display_name in pair_re.findall(dict_body)
        ]
        if not display_names:
            continue

        # Models are a property of the wrapper file, so compute them once.
        file_models = list(manual_map.get(wrapper_file.stem, []))
        for model_filename in pretrained_re.findall(content):
            repo_id = "lllyasviel/Annotators"
            if "depth_anything" in model_filename and "v2" in model_filename:
                repo_id = "LiheYoung/Depth-Anything-V2"
            elif "depth_anything" in model_filename:
                repo_id = "LiheYoung/Depth-Anything"
            elif "diffusion_edge" in model_filename:
                repo_id = "hr16/Diffusion-Edge"
            file_models.append((repo_id, model_filename))

        for display_name in display_names:
            temp_map.setdefault(display_name, []).extend(file_models)

    # De-duplicate and sort for a stable result; drop names without models.
    PREPROCESSOR_MODEL_MAP = {
        name: sorted(set(models)) for name, models in temp_map.items() if models
    }
    print("✅ ControlNet Preprocessor model map built.")
    return PREPROCESSOR_MODEL_MAP
def build_preprocessor_parameter_map():
    """Build (once) and return the preprocessor-name -> parameter-list map.

    Iterates ``NODE_CLASS_MAPPINGS`` from the ``nodes`` module and, for each
    node class that has ``INPUT_TYPES``, lives in a module whose name contains
    ``comfyui_controlnet_aux.node_wrappers``, and has a display name in
    ``NODE_DISPLAY_NAME_MAPPINGS``, records its required and optional inputs.
    The inputs ``image``, ``resolution`` and ``pose_kps`` are skipped, as are
    inputs whose spec is not a non-empty list/tuple.

    The result is stored in the module-level ``PREPROCESSOR_PARAMETER_MAP``;
    later calls return that cached object without rebuilding.

    Returns:
        dict mapping display name -> list of
        ``{"name": ..., "type": ..., "config": ...}`` dicts, where ``type`` is
        the first element of the input spec and ``config`` is the second
        element when it is a dict (otherwise ``{}``). Nodes with no remaining
        parameters are omitted.
    """
    global PREPROCESSOR_PARAMETER_MAP
    if PREPROCESSOR_PARAMETER_MAP is not None:
        return PREPROCESSOR_PARAMETER_MAP

    print("--- Building ControlNet Preprocessor parameter map ---")
    # Imported at call time rather than module import time.
    # NOTE(review): presumably because ComfyUI's `nodes` registry is only
    # populated after startup - confirm.
    from nodes import NODE_CLASS_MAPPINGS, NODE_DISPLAY_NAME_MAPPINGS

    skipped_inputs = {"image", "resolution", "pose_kps"}
    param_map = {}
    for class_name, node_class in NODE_CLASS_MAPPINGS.items():
        if not hasattr(node_class, "INPUT_TYPES"):
            continue
        if hasattr(node_class, "__module__") and "comfyui_controlnet_aux.node_wrappers" not in node_class.__module__:
            continue
        display_name = NODE_DISPLAY_NAME_MAPPINGS.get(class_name)
        if not display_name:
            continue

        # Best effort: one misbehaving node must not abort the whole scan.
        try:
            input_types = node_class.INPUT_TYPES()
            all_inputs = {**input_types.get("required", {}), **input_types.get("optional", {})}
            params = []
            for name, details in all_inputs.items():
                if name in skipped_inputs:
                    continue
                if not isinstance(details, (list, tuple)) or not details:
                    continue
                has_config = len(details) > 1 and isinstance(details[1], dict)
                params.append({
                    "name": name,
                    "type": details[0],
                    "config": details[1] if has_config else {},
                })
            if params:
                param_map[display_name] = params
        except Exception as e:
            print(f"⚠️ Could not parse parameters for {display_name}: {e}")

    PREPROCESSOR_PARAMETER_MAP = param_map
    print("✅ ControlNet Preprocessor parameter map built.")
    return PREPROCESSOR_PARAMETER_MAP
def print_welcome_message():
    """Print the startup banner: two 72-character ``=`` rules around the title lines."""
    rule = "=" * 72
    # Segments are concatenated as-is; each one carries its own line breaks.
    segments = (
        f"\n{rule}\n\n",
        " Welcome to ControlNet Preprocessors\n",
        " Based on comfyui_controlnet_aux\n\n",
        f"{rule}\n",
    )
    print("".join(segments))