ENABLE_AUDIO = False  # Set to True to show audio checkbox (adds ~4h on CPU)
# NOTE: keep ENABLE_AUDIO on line 1 -- the audio checkbox label in the UI tells
# users to "edit L1 app.py". Because of that, the description string below is a
# plain string expression rather than the module docstring.
"""LTX 2.3 CPU Space -- 10Eros + cond_safe distill LoRA via ComfyUI GGUF.

Path C: 10Eros fine-tune (Q3_K_M GGUF) + cond_safe distill 1.1 LoRA.
Abliterated Gemma-3-12B text encoder.
Free HF CPU Space (18 GB RAM).
"""
import json
import os
import re
import shutil
import subprocess
import sys
import tempfile
import time
import uuid
from pathlib import Path

COMFY = Path("/app/ComfyUI")
MODELS = COMFY / "models"
OUTPUT = COMFY / "output"

# Address of the headless ComfyUI server started by _ensure_comfy().
_COMFY_HOST = "127.0.0.1"
_COMFY_PORT = 8188
_COMFY_HTTP = f"http://{_COMFY_HOST}:{_COMFY_PORT}"
_COMFY_WS = f"ws://{_COMFY_HOST}:{_COMFY_PORT}"

# Files fetched from the Hugging Face Hub on first run.
# NOTE(review): WORKFLOW_TEMPLATE node "53" loads
# "LTX23_audio_vae_bf16.safetensors", which is not listed here -- the audio
# path presumably needs that file provided some other way; confirm.
DOWNLOAD_MANIFEST = [
    {
        "repo": "vantagewithai/LTX2.3-10Eros-GGUF",
        "file": "10Eros_v1-Q3_K_M.gguf",
        "dest": MODELS / "diffusion_models" / "10Eros_v1-Q3_K_M.gguf",
        "label": "10Eros DiT Q3_K_M (10.4 GB)",
    },
    {
        "repo": "mradermacher/gemma-3-12b-it-qat-abliterated-GGUF",
        "file": "gemma-3-12b-it-qat-abliterated.Q3_K_M.gguf",
        "dest": MODELS / "text_encoders" / "gemma-3-12b-it-qat-abliterated.Q3_K_M.gguf",
        "label": "Gemma-3-12B abliterated Q3_K_M (5.6 GB)",
    },
    {
        "repo": "Kijai/LTX2.3_comfy",
        "file": "text_encoders/ltx-2.3_text_projection_bf16.safetensors",
        "dest": MODELS / "text_encoders" / "ltx-2.3_text_projection_bf16.safetensors",
        "label": "Text projection (2.2 GB)",
    },
    {
        "repo": "Kijai/LTX2.3_comfy",
        "file": "vae/taeltx2_3.safetensors",
        "dest": MODELS / "vae" / "taeltx2_3.safetensors",
        "label": "Tiny VAE (22 MB)",
    },
    {
        "repo": "Kijai/LTX2.3_comfy",
        "file": "vae/LTX23_video_vae_bf16.safetensors",
        "dest": MODELS / "vae" / "LTX23_video_vae_bf16.safetensors",
        "label": "Full video VAE (1.4 GB)",
    },
    {
        "repo": "TenStrip/LTX2.3_Distilled_Lora_1.1_Experiments",
        "file": "ltx-2.3-22b-distilled-lora-1.1_fro90_ceil72_condsafe.safetensors",
        "dest": MODELS / "loras" / "ltx-2.3-22b-distilled-lora-1.1_fro90_ceil72_condsafe.safetensors",
        "label": "cond_safe distill LoRA (662 MB)",
    },
]

# Base ComfyUI API-format graph (node id -> node spec). _build_workflow() deep
# copies this and patches prompt, size, seed, LoRA and image nodes into it.
WORKFLOW_TEMPLATE = {
    "1": {
        "class_type": "UnetLoaderGGUF",
        "inputs": {"unet_name": "10Eros_v1-Q3_K_M.gguf"},
    },
    "2": {
        "class_type": "DualCLIPLoaderGGUF",
        "inputs": {
            "clip_name1": "gemma-3-12b-it-qat-abliterated.Q3_K_M.gguf",
            "clip_name2": "ltx-2.3_text_projection_bf16.safetensors",
            "type": "ltxv",
        },
    },
    "3": {
        "class_type": "LoraLoaderModelOnly",
        "inputs": {
            "model": ["1", 0],
            "lora_name": "ltx-2.3-22b-distilled-lora-1.1_fro90_ceil72_condsafe.safetensors",
            "strength_model": 0.6,
        },
    },
    "40": {
        "class_type": "CLIPTextEncode",
        "inputs": {"text": "__PROMPT__", "clip": ["2", 0]},
    },
    "41": {
        "class_type": "CLIPTextEncode",
        "inputs": {"text": "blurry, oversaturated, low resolution, distorted", "clip": ["2", 0]},
    },
    "4": {
        "class_type": "LTXVConditioning",
        "inputs": {"positive": ["40", 0], "negative": ["41", 0], "frame_rate": 24},
    },
    "5": {
        "class_type": "EmptyLTXVLatentVideo",
        "inputs": {"width": 512, "height": 320, "length": 49, "batch_size": 1},
    },
    "50": {
        "class_type": "LTXVEmptyLatentAudio",
        "inputs": {"audio_vae": ["53", 0], "frame_rate": 24, "frames_number": 49, "batch_size": 1},
    },
    "51": {
        "class_type": "LTXVConcatAVLatent",
        "inputs": {"video_latent": ["5", 0], "audio_latent": ["50", 0]},
    },
    "7": {
        "class_type": "CFGGuider",
        "inputs": {
            "model": ["3", 0],
            "positive": ["4", 0],
            "negative": ["4", 1],
            "cfg": 1.0,
        },
    },
    "8": {
        "class_type": "LTXVScheduler",
        "inputs": {"steps": 8, "max_shift": 2.05, "base_shift": 0.95, "stretch": True, "terminal": 0.1},
    },
    "9": {
        "class_type": "KSamplerSelect",
        "inputs": {"sampler_name": "euler_ancestral_cfg_pp"},
    },
    "10": {
        "class_type": "RandomNoise",
        "inputs": {"noise_seed": 42},
    },
    "11": {
        "class_type": "SamplerCustomAdvanced",
        "inputs": {
            "noise": ["10", 0],
            "guider": ["7", 0],
            "sampler": ["9", 0],
            "sigmas": ["8", 0],
            "latent_image": ["51", 0],
        },
    },
    "52": {
        "class_type": "LTXVSeparateAVLatent",
        "inputs": {"av_latent": ["11", 0]},
    },
    "12": {
        "class_type": "VAELoader",
        "inputs": {"vae_name": "taeltx2_3.safetensors"},
    },
    "13": {
        "class_type": "VAEDecode",
        "inputs": {"samples": ["52", 0], "vae": ["12", 0]},
    },
    "53": {
        "class_type": "VAELoaderKJ",
        "inputs": {
            "vae_name": "LTX23_audio_vae_bf16.safetensors",
            "device": "main_device",
            "dtype": "bf16",
            "weight_dtype": "bf16",
        },
    },
    "54": {
        "class_type": "LTXVAudioVAEDecode",
        "inputs": {"audio_latent": ["52", 1], "vae": ["53", 0]},
    },
    "14": {
        "class_type": "SaveAnimatedWEBP",
        "inputs": {
            "images": ["13", 0],
            "filename_prefix": "ltx_output",
            "fps": 24.0,
            "lossless": False,
            "quality": 80,
            "method": "default",
        },
    },
}

# Human-readable status text per workflow node id (see _submit_and_poll).
NODE_LABELS = {
    "1": "Loading DiT GGUF",
    "2": "Loading Gemma+Projection",
    "3": "Applying distill LoRA",
    "4": "Encoding text",
    "5": "Creating video latent",
    "7": "Building guider",
    "8": "Computing schedule",
    "9": "Selecting sampler",
    "10": "Generating noise",
    "11": "Diffusion",
    "12": "Loading VAE",
    "13": "Decoding video",
    "14": "Saving output",
    "50": "Creating audio latent",
    "51": "Merging AV latents",
    "52": "Separating AV",
    "53": "Loading audio VAE",
    "54": "Decoding audio",
    "20": "Loading image",
    "21": "Preprocessing image",
    "22": "I2V conditioning",
    "25": "Scaling image",
    "30": "Applying user LoRA",
    "31": "IC-LoRA guide",
    "40": "Encoding prompt",
    "41": "Encoding negative",
}


def _link_or_copy(cached: str, dest: Path) -> None:
    """Expose the hub-cached file *cached* at *dest*.

    A symlink is tried first; if that fails (``OSError``) the file is copied.
    """
    dest.parent.mkdir(parents=True, exist_ok=True)
    if dest.is_symlink() and not dest.exists():
        # A dangling link makes os.symlink fail with FileExistsError and makes
        # copy2 write *through* the dead link, so remove it first.
        dest.unlink()
    try:
        os.symlink(cached, str(dest))
    except OSError:
        shutil.copy2(cached, str(dest))


def _download_models(progress_cb=None):
    """Download every DOWNLOAD_MANIFEST entry whose ``dest`` is missing.

    Args:
        progress_cb: Optional callable ``(fraction, desc=...)`` used to report
            progress before each download and once at the end.
    """
    from huggingface_hub import hf_hub_download

    total = len(DOWNLOAD_MANIFEST)
    for i, m in enumerate(DOWNLOAD_MANIFEST):
        dest = Path(m["dest"])
        if dest.exists():
            continue
        label = m["label"]
        if progress_cb:
            progress_cb((i / total), desc=f"Downloading {label} (cache miss)...")
        print(f"[download] {label} from {m['repo']}/{m['file']}", flush=True)
        cached = hf_hub_download(repo_id=m["repo"], filename=m["file"])
        _link_or_copy(cached, dest)
    if progress_cb:
        progress_cb(1.0, desc="Models ready")


_comfy_proc = None


def _ensure_comfy():
    """Start the headless ComfyUI server if it is not already running.

    Polls ``/system_stats`` every 2 s for up to 240 s.

    Raises:
        RuntimeError: If the server process exits during startup or does not
            answer within 240 s.
    """
    global _comfy_proc
    if _comfy_proc is not None and _comfy_proc.poll() is None:
        return
    print("[comfy] Starting ComfyUI headless (--cpu)...", flush=True)
    _comfy_proc = subprocess.Popen(
        [
            sys.executable, "-u", str(COMFY / "main.py"),
            "--cpu",
            "--listen", _COMFY_HOST,
            "--port", str(_COMFY_PORT),
            "--dont-print-server",
            "--force-fp32",
            "--cache-none",
        ],
        cwd=str(COMFY),
        stdout=sys.stdout,
        stderr=sys.stderr,
    )
    import urllib.request

    for _ in range(120):
        time.sleep(2)
        if _comfy_proc.poll() is not None:
            # No point waiting out the full window once the child has died.
            raise RuntimeError(
                f"ComfyUI exited during startup (code {_comfy_proc.returncode})"
            )
        try:
            with urllib.request.urlopen(f"{_COMFY_HTTP}/system_stats", timeout=2):
                pass
        except Exception:
            # Server not accepting connections yet; keep polling.
            continue
        print("[comfy] Server ready", flush=True)
        return
    raise RuntimeError("ComfyUI failed to start within 240s")


def _list_safetensors(repo_id: str) -> list[str]:
    """Return the ``.safetensors`` file paths in hub repo *repo_id*.

    Hub/network errors propagate to the caller.
    """
    from huggingface_hub import HfApi

    return [f for f in HfApi().list_repo_files(repo_id) if f.endswith(".safetensors")]


def _search_hf_loras(query: str) -> list[str]:
    """Search the hub for models matching *query* and return up to 15 repo ids.

    Queries shorter than two characters fall back to ``"ltx 2.3 lora"``.
    Returns an empty list on any error.
    """
    if not query or len(query) < 2:
        query = "ltx 2.3 lora"
    try:
        from huggingface_hub import HfApi

        api = HfApi()
        results = list(api.list_models(search=query, limit=15))
        return [m.id for m in results if m.id]
    except Exception:
        return []


def _resolve_lora_files(repo_id: str) -> list[str]:
    """Return ``.safetensors`` files in *repo_id* whose path contains "lora".

    Returns an empty list for a malformed repo id or on any error.
    """
    if not repo_id or "/" not in repo_id:
        return []
    try:
        return [f for f in _list_safetensors(repo_id) if "lora" in f.lower()]
    except Exception:
        return []


# Memo of _detect_ic_lora results, keyed by the lora path string.
_ic_lora_cache: dict[str, bool] = {}


def _is_ic_lora(lora_path: str) -> bool:
    """Cached wrapper around _detect_ic_lora(); falsy paths return False."""
    if not lora_path:
        return False
    if lora_path not in _ic_lora_cache:
        _ic_lora_cache[lora_path] = _detect_ic_lora(lora_path)
    return _ic_lora_cache[lora_path]


def _detect_ic_lora(lora_path: str) -> bool:
    """Decide whether *lora_path* is treated as an IC-LoRA.

    Checked in order: the name matches ``ic[-_]?lora``; a local file under
    ``models/loras`` whose safetensors header passes
    _check_safetensors_header(); a ``user/repo/file`` hub path whose downloaded
    header passes the same check. Any failure yields False.
    """
    if re.search(r"ic[-_]?lora", lora_path, re.IGNORECASE):
        return True
    local = MODELS / "loras" / lora_path
    if local.exists():
        try:
            return _check_safetensors_header(str(local))
        except OSError:
            return False
    if "/" in lora_path:
        parts = lora_path.split("/")
        if len(parts) >= 3:
            repo_id = f"{parts[0]}/{parts[1]}"
            filename = "/".join(parts[2:])
            try:
                from huggingface_hub import hf_hub_download

                cached = hf_hub_download(repo_id=repo_id, filename=filename)
                return _check_safetensors_header(cached)
            except Exception:
                pass
    return False


def _check_safetensors_header(filepath: str) -> bool:
    """Return True if the file's header mentions ``reference_downscale_factor``.

    Reads an 8-byte little-endian length followed by that many header bytes;
    headers declared larger than 10 MB are rejected without being read.
    """
    with open(filepath, "rb") as f:
        header_size = int.from_bytes(f.read(8), "little")
        if header_size > 10_000_000:
            return False
        header_json = f.read(header_size).decode("utf-8", errors="ignore")
    return "reference_downscale_factor" in header_json


def _download_user_lora(repo_id: str, filename: str) -> str | None:
    """Fetch a user-selected LoRA into ``models/loras``.

    Returns:
        The local file name (``/`` in repo and filename replaced by ``_``), or
        None if an argument is empty or the download fails.
    """
    if not repo_id or not filename:
        return None
    from huggingface_hub import hf_hub_download

    lora_dir = MODELS / "loras"
    lora_dir.mkdir(parents=True, exist_ok=True)
    local_name = f"{repo_id.replace('/', '_')}_{filename.replace('/', '_')}"
    dest = lora_dir / local_name
    if dest.exists():
        return local_name
    try:
        token = os.environ.get("HF_TOKEN")
        cached = hf_hub_download(repo_id=repo_id, filename=filename, token=token)
        _link_or_copy(cached, dest)
        return local_name
    except Exception as e:
        print(f"[lora] Failed to download {repo_id}/{filename}: {e}", flush=True)
        return None


def _build_workflow(prompt: str, steps: int, duration_sec: float, seed: int,
                    img_name: str | None = None,
                    user_lora: str | None = None,
                    lora_strength: float = 0.6,
                    vid_w: int | None = None,
                    vid_h: int | None = None,
                    enable_audio: bool = True) -> dict:
    """Build the ComfyUI prompt graph for one generation.

    Args:
        prompt: Positive prompt text (node "40").
        steps: Scheduler step count (node "8").
        duration_sec: Clip length; converted to ``duration * 24 + 1`` frames
            with a minimum of 9.
        seed: Noise seed (node "10").
        img_name: File name inside ComfyUI's input dir used as first frame /
            IC guide image, or None for text-to-video.
        user_lora: Local LoRA file name chained after the distill LoRA.
        lora_strength: ``strength_model`` for the user LoRA.
        vid_w, vid_h: Output size; applied only when both are truthy.
        enable_audio: When False, the audio nodes are dropped and the sampler
            and decoder are wired to the video-only latent.

    Returns:
        A new dict; WORKFLOW_TEMPLATE is not modified.
    """
    wf = json.loads(json.dumps(WORKFLOW_TEMPLATE))  # cheap deep copy
    wf["40"]["inputs"]["text"] = prompt

    frames = max(9, int(duration_sec * 24) + 1)
    wf["5"]["inputs"]["length"] = frames

    if not enable_audio:
        for n in ["49", "50", "51", "52", "53", "54"]:
            wf.pop(n, None)
        wf["11"]["inputs"]["latent_image"] = ["5", 0]
        wf["13"]["inputs"]["samples"] = ["11", 0]
    else:
        wf["50"]["inputs"]["frames_number"] = frames

    if vid_w and vid_h:
        wf["5"]["inputs"]["width"] = vid_w
        wf["5"]["inputs"]["height"] = vid_h

    wf["8"]["inputs"]["steps"] = steps
    wf["10"]["inputs"]["noise_seed"] = seed

    is_ic = _is_ic_lora(user_lora) if user_lora else False

    if user_lora:
        # Chain the user LoRA after the distill LoRA (node "3"); only the
        # loader class differs between the IC and regular case.
        wf["30"] = {
            "class_type": "LTXICLoRALoaderModelOnly" if is_ic else "LoraLoaderModelOnly",
            "inputs": {
                "model": ["3", 0],
                "lora_name": user_lora,
                "strength_model": lora_strength,
            },
        }
        wf["7"]["inputs"]["model"] = ["30", 0]

    if not img_name:
        return wf

    wf["20"] = {
        "class_type": "LoadImage",
        "inputs": {"image": img_name},
    }
    # Both image paths switch node "12" from the tiny VAE to the full VAE.
    wf["12"]["inputs"]["vae_name"] = "LTX23_video_vae_bf16.safetensors"
    wf["25"] = {
        "class_type": "ImageScale",
        "inputs": {
            "image": ["20", 0],
            "upscale_method": "lanczos",
            "width": wf["5"]["inputs"]["width"],
            "height": wf["5"]["inputs"]["height"],
            "crop": "center",
        },
    }

    if is_ic:
        wf["31"] = {
            "class_type": "LTXAddVideoICLoRAGuide",
            "inputs": {
                "positive": ["4", 0],
                "negative": ["4", 1],
                "vae": ["12", 0],
                "latent": ["5", 0],
                "image": ["25", 0],
                "frame_idx": 0,
                "strength": 1.0,
                "latent_downscale_factor": ["30", 1],
                "crop": "disabled",
                "use_tiled_encode": False,
                "tile_size": 512,
                "tile_overlap": 64,
            },
        }
        wf["7"]["inputs"]["positive"] = ["31", 0]
        wf["7"]["inputs"]["negative"] = ["31", 1]
        video_latent = ["31", 2]
    else:
        wf["21"] = {
            "class_type": "LTXVPreprocess",
            "inputs": {"image": ["25", 0], "img_compression": 18},
        }
        wf["22"] = {
            "class_type": "LTXVImgToVideoInplace",
            "inputs": {
                "latent": ["5", 0],
                "vae": ["12", 0],
                "image": ["21", 0],
                "strength": 0.7,
                "bypass": False,
                "use_slerp": False,
            },
        }
        video_latent = ["22", 0]

    # With audio enabled the sampler consumes the AV concat (node "51") and
    # node "52" splits its output again, so the conditioned video latent must
    # enter through "51"; otherwise it feeds the sampler directly.
    if "51" in wf:
        wf["51"]["inputs"]["video_latent"] = video_latent
    else:
        wf["11"]["inputs"]["latent_image"] = video_latent

    return wf


def _first_existing_output(items) -> str | None:
    """Return the path of the first history item that exists under OUTPUT."""
    for item in items:
        name = item.get("filename")
        if not name:
            continue
        fpath = OUTPUT / item.get("subfolder", "") / name
        if fpath.exists():
            return str(fpath)
    return None


def _collect_outputs(pid: str) -> tuple[str | None, str | None]:
    """Look up prompt *pid* in ComfyUI's history; return (video, audio) paths.

    Best effort: any failure is logged and whatever was found so far (possibly
    ``(None, None)``) is returned.
    """
    import urllib.request

    video_path = None
    audio_path = None
    try:
        with urllib.request.urlopen(f"{_COMFY_HTTP}/history/{pid}", timeout=10) as hist:
            hdata = json.loads(hist.read())
        outputs = hdata[pid].get("outputs", {}) if pid in hdata else {}
        for out in outputs.values():
            for key in ("images", "gifs"):
                if video_path is None and key in out:
                    video_path = _first_existing_output(out[key])
            if audio_path is None and "audio" in out:
                audio_path = _first_existing_output(out["audio"])
    except Exception as exc:
        print(f"[history] Could not read outputs for {pid}: {exc}", flush=True)
    return video_path, audio_path


def _submit_and_poll(workflow: dict, status_cb=None,
                     timeout: int = 21600) -> tuple[str | None, str | None] | None:
    """Queue *workflow* on ComfyUI and follow it over the websocket.

    Args:
        workflow: Prompt graph as produced by _build_workflow().
        status_cb: Optional callable receiving a one-line status string on
            every node change / progress tick.
        timeout: Overall deadline in seconds, also used as socket timeout.

    Returns:
        ``(video_path, audio_path)`` (either may be None) once execution
        finishes, times out, or the connection drops; ``None`` if ComfyUI
        reported an ``execution_error``.

    Raises:
        Whatever ``urllib`` / ``websocket`` raise while connecting or
        submitting the prompt.
    """
    import urllib.request
    import websocket

    client_id = str(uuid.uuid4())
    t0 = time.time()
    current_step = 0
    max_steps = 0
    current_label = "Queued"

    def _status_line():
        elapsed = int(time.time() - t0)
        m, s = divmod(elapsed, 60)
        if max_steps > 0:
            return f"[{current_step}/{max_steps}] {m}m{s:02d}s: {current_label}"
        return f"{m}m{s:02d}s: {current_label}"

    def _notify():
        if status_cb:
            status_cb(_status_line())

    # Connect before submitting so no early "executing" message is missed.
    ws = websocket.WebSocket()
    ws.settimeout(timeout)
    ws.connect(f"{_COMFY_WS}/ws?clientId={client_id}")
    try:
        payload = json.dumps({"prompt": workflow, "client_id": client_id}).encode()
        req = urllib.request.Request(
            f"{_COMFY_HTTP}/prompt",
            data=payload,
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(req, timeout=30) as resp:
            resp_data = json.loads(resp.read())
        pid = resp_data.get("prompt_id", client_id)

        while time.time() - t0 < timeout:
            try:
                raw = ws.recv()
            except websocket.WebSocketTimeoutException:
                break
            except (websocket.WebSocketException, OSError) as exc:
                # A closed socket fails on every recv(); retrying would spin
                # until the deadline. Stop and let the history lookup decide.
                current_label = f"Error: connection lost ({exc})"
                _notify()
                break
            if not raw:
                continue
            try:
                msg = json.loads(raw)
            except ValueError:
                # Not JSON (or not decodable); skip the frame.
                continue
            if not isinstance(msg, dict):
                continue

            msg_type = msg.get("type", "")
            data = msg.get("data") or {}

            if msg_type == "executing":
                node_id = data.get("node")
                if node_id is None:
                    # node == None marks the end of execution.
                    current_label = "Complete"
                    _notify()
                    break
                current_label = NODE_LABELS.get(str(node_id), f"Node {node_id}")
                _notify()

            elif msg_type == "progress":
                current_step = data.get("value", 0)
                max_steps = data.get("max", 0)
                node_id = str(data.get("node", "11"))
                current_label = f"{NODE_LABELS.get(node_id, 'Step')} {current_step}/{max_steps}"
                _notify()

            elif msg_type == "execution_error":
                err = data.get("exception_message", "Unknown error")
                current_label = f"Error: {str(err)[:100]}"
                _notify()
                return None
    finally:
        try:
            ws.close()
        except Exception:
            pass

    return _collect_outputs(pid)


def _stage_input_image(image_path) -> tuple[str, int, int]:
    """Copy the user image into ComfyUI's input dir as PNG.

    Returns:
        ``(file_name, width, height)`` where the size is the image scaled so
        its longer side is 512, floored to multiples of 32, at least 64.
    """
    from PIL import Image as PILImage

    comfy_input = COMFY / "input"
    comfy_input.mkdir(parents=True, exist_ok=True)
    img_name = f"input_{uuid.uuid4().hex[:8]}.png"
    with PILImage.open(image_path) as pil_img:
        pil_img.save(str(comfy_input / img_name))
        w, h = pil_img.size
    scale = 512 / max(w, h)
    img_w = max(int(w * scale) // 32 * 32, 64)
    img_h = max(int(h * scale) // 32 * 32, 64)
    return img_name, img_w, img_h


def _run_ffmpeg(args: list[str]) -> bool:
    """Run ``ffmpeg -y <args>`` (120 s limit); return True on exit code 0.

    A missing binary or a timeout is logged and reported as False instead of
    raising, so callers can keep the file they already have.
    """
    try:
        rc = subprocess.run(["ffmpeg", "-y", *args], capture_output=True, timeout=120)
    except (OSError, subprocess.TimeoutExpired) as exc:
        print(f"[output] ffmpeg not usable: {exc}", flush=True)
        return False
    return rc.returncode == 0


def _webp_to_mp4(webp_path: str, audio_path: str | None, out_dir: Path) -> Path:
    """Convert the animated WEBP at *webp_path* into ``out_dir/output.mp4``.

    Frames are written with OpenCV (mp4v), then re-encoded to H.264 via ffmpeg
    when possible, and finally muxed with *audio_path* if that file exists.

    Raises:
        RuntimeError: If the OpenCV writer cannot be opened. Image or OpenCV
            errors propagate as well.
    """
    from PIL import Image as PILImage
    import cv2
    import numpy as np

    out_path = out_dir / "output.mp4"

    frames = []
    with PILImage.open(webp_path) as img:
        try:
            while True:
                frames.append(np.array(img.convert("RGB")))
                img.seek(img.tell() + 1)
        except EOFError:
            pass  # ran past the last frame

    h, w = frames[0].shape[:2]
    # Pad odd dimensions up to even ones (yuv420p needs even sizes).
    w2, h2 = w + (w % 2), h + (h % 2)
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter(str(out_path), fourcc, 24, (w2, h2))
    if not writer.isOpened():
        raise RuntimeError("cv2.VideoWriter could not open the output file")
    try:
        for f in frames:
            bgr = cv2.cvtColor(f, cv2.COLOR_RGB2BGR)
            if bgr.shape[1] != w2 or bgr.shape[0] != h2:
                bgr = cv2.copyMakeBorder(bgr, 0, h2 - h, 0, w2 - w, cv2.BORDER_CONSTANT)
            writer.write(bgr)
    finally:
        writer.release()

    h264_path = out_dir / "output_h264.mp4"
    h264_ok = _run_ffmpeg(
        ["-i", str(out_path), "-c:v", "libx264", "-pix_fmt", "yuv420p", "-r", "24", str(h264_path)]
    )
    if h264_ok and h264_path.exists():
        out_path.unlink()
        h264_path.rename(out_path)
    print(f"[output] Converted {len(frames)} frames to mp4 (h264: {'ok' if h264_ok else 'fallback mp4v'})", flush=True)

    if audio_path and Path(audio_path).exists():
        av_path = out_dir / "output_av.mp4"
        av_ok = _run_ffmpeg(
            ["-i", str(out_path), "-i", audio_path, "-c:v", "copy", "-c:a", "aac", "-shortest", str(av_path)]
        )
        if av_ok and av_path.exists():
            out_path.unlink()
            av_path.rename(out_path)
            print("[output] Merged audio into mp4", flush=True)

    return out_path


def generate(prompt, duration_sec, steps, seed, image_path=None, user_lora_file=None, lora_strength=0.6, enable_audio=False, progress=None):
    """Run one generation end to end.

    Downloads models if needed, makes sure ComfyUI is up, builds and submits
    the workflow, then converts the resulting WEBP to mp4 (falling back to the
    raw WEBP if conversion fails).

    Args:
        prompt: Prompt text; must not be blank.
        duration_sec: Clip duration in seconds.
        steps: Sampling steps.
        seed: Noise seed.
        image_path: Optional first-frame image (switches to I2V).
        user_lora_file: Optional local LoRA file name under ``models/loras``.
        lora_strength: Strength for the user LoRA.
        enable_audio: Keep the audio branch of the workflow.
        progress: Optional callable ``(fraction, desc=...)``.

    Returns:
        ``(output_path, summary_text)``.

    Raises:
        gr.Error: On an empty prompt or when generation produced no video.
    """
    import gradio as gr

    if not prompt or not prompt.strip():
        raise gr.Error("Prompt cannot be empty")

    status_lines = ["Initializing..."]

    if progress:
        progress(0.0, desc="Checking models...")
    _download_models(progress)

    if progress:
        progress(0.15, desc="Starting ComfyUI...")
    _ensure_comfy()

    img_name = None
    img_w, img_h = None, None
    if image_path:
        img_name, img_w, img_h = _stage_input_image(image_path)

    mode = "I2V" if img_name else "T2V"
    if progress:
        progress(0.2, desc=f"{mode}: {steps} steps, {duration_sec}s clip...")

    def _on_status(line):
        status_lines[0] = line
        print(f"[status] {line}", flush=True)

    wf = _build_workflow(
        prompt, int(steps), float(duration_sec), int(seed),
        img_name=img_name,
        user_lora=user_lora_file, lora_strength=float(lora_strength),
        vid_w=img_w, vid_h=img_h,
        enable_audio=enable_audio,
    )
    poll_result = _submit_and_poll(wf, status_cb=_on_status)

    if poll_result is None:
        raise gr.Error(f"Generation failed: {status_lines[0]}")
    result_video, result_audio = poll_result
    if result_video is None:
        raise gr.Error(f"Generation failed: {status_lines[0]}")
    result = result_video

    out_dir = Path(tempfile.mkdtemp())
    try:
        out_path = _webp_to_mp4(result, result_audio, out_dir)
    except Exception as e:
        # Deliberate best effort: hand back the raw WEBP rather than fail.
        print(f"[output] mp4 conversion failed: {e}, returning webp", flush=True)
        out_path = out_dir / "output.webp"
        shutil.copy2(result, out_path)

    # Status lines look like "[8/8] 74m02s: Complete"; keep the part before ":".
    elapsed = status_lines[0].split(":")[0] if ":" in status_lines[0] else "?"
    lora_info = f" | LoRA: {user_lora_file}" if user_lora_file else ""
    return str(out_path), f"Done {elapsed} | {mode} | {steps} steps | {duration_sec}s | seed {int(seed)}{lora_info}"


def health() -> str:
    """Return a one-line summary of RAM usage and ComfyUI process state."""
    import psutil

    mem = psutil.virtual_memory()
    return (
        f"LTX 2.3 CPU Space | "
        f"RAM {mem.used // (1024**3)}/{mem.total // (1024**3)} GB | "
        f"ComfyUI {'running' if _comfy_proc and _comfy_proc.poll() is None else 'stopped'}"
    )


import gradio as gr
import random

# Accumulated dropdown choices ("user/repo/file.safetensors" or bare repo id).
_all_lora_choices = []
_lora_state = {"mode": "search"}


def _on_lora_interact(value):
    """Single-select dropdown handler: search, list a repo, or accept a file.

    Not wired to any event in this file; kept for compatibility.
    """
    if not value or len(value) < 2:
        repos = _search_hf_loras("ltx 2.3 lora")
        return gr.update(choices=repos, value=None)

    if value.endswith(".safetensors"):
        return gr.update(value=value)

    if "/" in value:
        parts = value.split("/")
        repo_id = f"{parts[0]}/{parts[1]}"
        files = _resolve_lora_files(repo_id)
        if not files:
            # No file with "lora" in its name; fall back to every safetensors.
            try:
                files = _list_safetensors(repo_id)
            except Exception:
                files = []
        choices = [f"{repo_id}/{f}" for f in files]
        if len(choices) == 1:
            return gr.update(choices=choices, value=choices[0])
        return gr.update(choices=choices, value=None)

    repos = _search_hf_loras(value)
    return gr.update(choices=repos, value=None)


def _prepare_user_lora(lora_path, progress=None):
    """Normalise a hub URL or ``user/repo/file`` path and download the LoRA.

    Returns:
        The local file name from _download_user_lora(), or None when the path
        does not have at least ``user/repo/file`` form or the download fails.
    """
    if not lora_path or "/" not in lora_path:
        return None
    lora_path = re.sub(r"^https?://huggingface\.co/", "", lora_path)
    lora_path = re.sub(r"/blob/main/", "/", lora_path)
    lora_path = re.sub(r"/resolve/main/", "/", lora_path)
    parts = lora_path.split("/")
    if len(parts) < 3:
        return None
    repo_id = f"{parts[0]}/{parts[1]}"
    filename = "/".join(parts[2:])
    if progress:
        progress(0.1, desc=f"Downloading LoRA from {repo_id}...")
    return _download_user_lora(repo_id, filename)


_POPULAR_LORAS = [
    "Phr00t/LTX2-Rapid-Merges/LORAs/povnsfw-v3-complete.safetensors",
    "Phr00t/LTX2-Rapid-Merges/LORAs/phr00t-povnsfw-v1.safetensors",
]


def _on_lora_pick(selected_values):
    """Multiselect handler: keep path-like entries, treat the rest as a search.

    Entries containing "/" stay selected (max 9). Entries without "/" are
    joined into a hub search whose results are added to the choice list.
    """
    global _all_lora_choices
    selected = list(selected_values) if selected_values else []
    print(f"[lora] pick: {selected}", flush=True)

    valid = [v for v in selected if "/" in v]
    search_terms = [v for v in selected if "/" not in v and v.strip()]

    if search_terms:
        query = " ".join(search_terms)
        repos = _search_hf_loras(query)
        resolved = []
        for repo in repos[:8]:
            try:
                resolved.extend(f"{repo}/{f}" for f in _list_safetensors(repo))
            except Exception:
                # Listing failed; offer the bare repo id instead.
                resolved.append(repo)
        for r in resolved:
            if r not in _all_lora_choices:
                _all_lora_choices.append(r)
        print(f"[lora] search '{query}': {len(resolved)} new, {len(_all_lora_choices)} total", flush=True)
        return gr.update(choices=_all_lora_choices, value=valid[:9])

    if len(valid) > 9:
        valid = valid[:9]
    return gr.update(choices=_all_lora_choices, value=valid)


def _init_loras():
    """Populate the LoRA dropdown on page load with defaults + a hub search."""
    global _all_lora_choices
    for p in _POPULAR_LORAS:
        if p not in _all_lora_choices:
            _all_lora_choices.append(p)
    repos = _search_hf_loras("ltx 2.3 lora")
    for repo in repos[:12]:
        try:
            for f in _list_safetensors(repo):
                path = f"{repo}/{f}"
                if path not in _all_lora_choices:
                    _all_lora_choices.append(path)
        except Exception:
            if repo not in _all_lora_choices:
                _all_lora_choices.append(repo)
    print(f"[lora] init: {len(repos)} repos -> {len(_all_lora_choices)} files", flush=True)
    return gr.update(choices=_all_lora_choices)


def _resolve_lora_entry(entry):
    """Turn a dropdown entry into ``user/repo/file.safetensors`` or None.

    File paths pass through; a bare ``user/repo`` resolves to the repo's first
    ``.safetensors`` file.
    """
    if entry.endswith(".safetensors"):
        return entry
    if "/" in entry:
        parts = entry.split("/")
        repo_id = f"{parts[0]}/{parts[1]}"
        try:
            files = _list_safetensors(repo_id)
            if files:
                return f"{repo_id}/{files[0]}"
        except Exception:
            pass
    return None


def _gen(prompt, image, lora_list, lora_str, enable_audio, dur, steps, seed, progress=gr.Progress()):
    """Click handler: pick a seed, download selected LoRAs, call generate().

    NOTE: every selected LoRA (up to 9) is downloaded, but only the first one
    that resolves is passed on to generate().
    """
    # The Number field yields None when cleared; treat that like -1 (random).
    if seed is None or seed < 0:
        seed = random.randint(0, 2**31)
    lora_files = []
    if lora_list:
        for lp in lora_list[:9]:
            resolved = _resolve_lora_entry(lp) if lp else None
            if resolved:
                local = _prepare_user_lora(resolved, progress)
                if local:
                    lora_files.append(local)
    first_lora = lora_files[0] if lora_files else None
    return generate(prompt, dur, steps, seed, image_path=image,
                    user_lora_file=first_lora, lora_strength=lora_str,
                    enable_audio=bool(enable_audio), progress=progress)


with gr.Blocks(title="LTX 2.3 CPU") as demo:
    gr.Markdown(
        "**[LTX 2.3](https://huggingface.co/Lightricks/LTX-2.3) CPU** 2s clip takes ~74 min (up to 321m w/ LoRA + I2V), `cond_safe` distill 1.1 + Sulphur-2 merge = "
        "[10Eros](https://huggingface.co/TenStrip/LTX2.3-10Eros). *4experimental~2be kinda patient..*"
    )
    with gr.Row(equal_height=False):
        with gr.Column(scale=1):
            prompt_in = gr.Textbox(
                label="Prompt", lines=3,
                placeholder="A woman walking through a neon-lit Tokyo alley at night, cinematic",
            )
            image_in = gr.Image(label="First frame (optional, I2V)", type="filepath", height=180)
            with gr.Accordion("LoRA (optional, up to 9)", open=False):
                lora_picker = gr.Dropdown(
                    label="LoRA (select to add, click X to remove)",
                    info="Type to search HF, paste URL or user/repo/lora.safetensors",
                    choices=[],
                    value=[],
                    multiselect=True,
                    allow_custom_value=True,
                    interactive=True,
                )
                lora_strength = gr.Slider(0.0, 1.5, value=0.6, step=0.05, label="LoRA strength (all)")
            # NOTE(review): the original indentation was lost; which widgets
            # sit inside this Row is assumed -- confirm against the live layout.
            with gr.Row():
                audio_in = gr.Checkbox(
                    label="Enable audio (+4h, duplicate & edit L1 app.py)",
                    value=False,
                    interactive=ENABLE_AUDIO
                )
                duration_in = gr.Slider(1.0, 4.0, value=2.0, step=0.5, label="Duration (s)")
                steps_in = gr.Slider(4, 16, value=8, step=1, label="Steps")
                seed_in = gr.Number(label="Seed", value=-1, precision=0)
            run_btn = gr.Button("Generate Video", variant="primary")
        with gr.Column(scale=1):
            video_out = gr.Video(label="Output", height=300)
            status_out = gr.Textbox(label="Status", interactive=False)

    lora_picker.input(fn=_on_lora_pick, inputs=[lora_picker], outputs=[lora_picker])
    demo.load(fn=_init_loras, outputs=[lora_picker])

    run_btn.click(
        fn=_gen,
        inputs=[prompt_in, image_in, lora_picker, lora_strength, audio_in, duration_in, steps_in, seed_in],
        outputs=[video_out, status_out],
        api_name="generate",
    )
    # Hidden button exists only to expose health() as an API endpoint.
    gr.Button(visible=False).click(fn=health, outputs=[gr.Textbox(visible=False)], api_name="health")

# One generation at a time.
demo.queue(default_concurrency_limit=1)

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860, theme="Taithrah/Minimal")