Spaces:
Running
Running
| ENABLE_AUDIO = False # Set to True to show audio checkbox (adds ~4h on CPU) | |
| """LTX 2.3 CPU Space -- 10Eros + cond_safe distill LoRA via ComfyUI GGUF. | |
| Path C: 10Eros fine-tune (Q3_K_M GGUF) + cond_safe distill 1.1 LoRA. | |
| Abliterated Gemma-3-12B text encoder. Free HF CPU Space (18 GB RAM). | |
| """ | |
| import json | |
| import os | |
| import re | |
| import shutil | |
| import subprocess | |
| import sys | |
| import tempfile | |
| import time | |
| import uuid | |
| from pathlib import Path | |
| COMFY = Path("/app/ComfyUI") | |
| MODELS = COMFY / "models" | |
| OUTPUT = COMFY / "output" | |
| DOWNLOAD_MANIFEST = [ | |
| { | |
| "repo": "vantagewithai/LTX2.3-10Eros-GGUF", | |
| "file": "10Eros_v1-Q3_K_M.gguf", | |
| "dest": MODELS / "diffusion_models" / "10Eros_v1-Q3_K_M.gguf", | |
| "label": "10Eros DiT Q3_K_M (10.4 GB)", | |
| }, | |
| { | |
| "repo": "mradermacher/gemma-3-12b-it-qat-abliterated-GGUF", | |
| "file": "gemma-3-12b-it-qat-abliterated.Q3_K_M.gguf", | |
| "dest": MODELS / "text_encoders" / "gemma-3-12b-it-qat-abliterated.Q3_K_M.gguf", | |
| "label": "Gemma-3-12B abliterated Q3_K_M (5.6 GB)", | |
| }, | |
| { | |
| "repo": "Kijai/LTX2.3_comfy", | |
| "file": "text_encoders/ltx-2.3_text_projection_bf16.safetensors", | |
| "dest": MODELS / "text_encoders" / "ltx-2.3_text_projection_bf16.safetensors", | |
| "label": "Text projection (2.2 GB)", | |
| }, | |
| { | |
| "repo": "Kijai/LTX2.3_comfy", | |
| "file": "vae/taeltx2_3.safetensors", | |
| "dest": MODELS / "vae" / "taeltx2_3.safetensors", | |
| "label": "Tiny VAE (22 MB)", | |
| }, | |
| { | |
| "repo": "Kijai/LTX2.3_comfy", | |
| "file": "vae/LTX23_video_vae_bf16.safetensors", | |
| "dest": MODELS / "vae" / "LTX23_video_vae_bf16.safetensors", | |
| "label": "Full video VAE (1.4 GB)", | |
| }, | |
| { | |
| "repo": "TenStrip/LTX2.3_Distilled_Lora_1.1_Experiments", | |
| "file": "ltx-2.3-22b-distilled-lora-1.1_fro90_ceil72_condsafe.safetensors", | |
| "dest": MODELS / "loras" / "ltx-2.3-22b-distilled-lora-1.1_fro90_ceil72_condsafe.safetensors", | |
| "label": "cond_safe distill LoRA (662 MB)", | |
| }, | |
| ] | |
| WORKFLOW_TEMPLATE = { | |
| "1": { | |
| "class_type": "UnetLoaderGGUF", | |
| "inputs": {"unet_name": "10Eros_v1-Q3_K_M.gguf"}, | |
| }, | |
| "2": { | |
| "class_type": "DualCLIPLoaderGGUF", | |
| "inputs": { | |
| "clip_name1": "gemma-3-12b-it-qat-abliterated.Q3_K_M.gguf", | |
| "clip_name2": "ltx-2.3_text_projection_bf16.safetensors", | |
| "type": "ltxv", | |
| }, | |
| }, | |
| "3": { | |
| "class_type": "LoraLoaderModelOnly", | |
| "inputs": { | |
| "model": ["1", 0], | |
| "lora_name": "ltx-2.3-22b-distilled-lora-1.1_fro90_ceil72_condsafe.safetensors", | |
| "strength_model": 0.6, | |
| }, | |
| }, | |
| "40": { | |
| "class_type": "CLIPTextEncode", | |
| "inputs": {"text": "__PROMPT__", "clip": ["2", 0]}, | |
| }, | |
| "41": { | |
| "class_type": "CLIPTextEncode", | |
| "inputs": {"text": "blurry, oversaturated, low resolution, distorted", "clip": ["2", 0]}, | |
| }, | |
| "4": { | |
| "class_type": "LTXVConditioning", | |
| "inputs": {"positive": ["40", 0], "negative": ["41", 0], "frame_rate": 24}, | |
| }, | |
| "5": { | |
| "class_type": "EmptyLTXVLatentVideo", | |
| "inputs": {"width": 512, "height": 320, "length": 49, "batch_size": 1}, | |
| }, | |
| "50": { | |
| "class_type": "LTXVEmptyLatentAudio", | |
| "inputs": {"audio_vae": ["53", 0], "frame_rate": 24, "frames_number": 49, "batch_size": 1}, | |
| }, | |
| "51": { | |
| "class_type": "LTXVConcatAVLatent", | |
| "inputs": {"video_latent": ["5", 0], "audio_latent": ["50", 0]}, | |
| }, | |
| "7": { | |
| "class_type": "CFGGuider", | |
| "inputs": { | |
| "model": ["3", 0], | |
| "positive": ["4", 0], | |
| "negative": ["4", 1], | |
| "cfg": 1.0, | |
| }, | |
| }, | |
| "8": { | |
| "class_type": "LTXVScheduler", | |
| "inputs": {"steps": 8, "max_shift": 2.05, "base_shift": 0.95, "stretch": True, "terminal": 0.1}, | |
| }, | |
| "9": { | |
| "class_type": "KSamplerSelect", | |
| "inputs": {"sampler_name": "euler_ancestral_cfg_pp"}, | |
| }, | |
| "10": { | |
| "class_type": "RandomNoise", | |
| "inputs": {"noise_seed": 42}, | |
| }, | |
| "11": { | |
| "class_type": "SamplerCustomAdvanced", | |
| "inputs": { | |
| "noise": ["10", 0], | |
| "guider": ["7", 0], | |
| "sampler": ["9", 0], | |
| "sigmas": ["8", 0], | |
| "latent_image": ["51", 0], | |
| }, | |
| }, | |
| "52": { | |
| "class_type": "LTXVSeparateAVLatent", | |
| "inputs": {"av_latent": ["11", 0]}, | |
| }, | |
| "12": { | |
| "class_type": "VAELoader", | |
| "inputs": {"vae_name": "taeltx2_3.safetensors"}, | |
| }, | |
| "13": { | |
| "class_type": "VAEDecode", | |
| "inputs": {"samples": ["52", 0], "vae": ["12", 0]}, | |
| }, | |
| "53": { | |
| "class_type": "VAELoaderKJ", | |
| "inputs": {"vae_name": "LTX23_audio_vae_bf16.safetensors", "device": "main_device", "dtype": "bf16", "weight_dtype": "bf16"}, | |
| }, | |
| "54": { | |
| "class_type": "LTXVAudioVAEDecode", | |
| "inputs": {"audio_latent": ["52", 1], "vae": ["53", 0]}, | |
| }, | |
| "14": { | |
| "class_type": "SaveAnimatedWEBP", | |
| "inputs": { | |
| "images": ["13", 0], | |
| "filename_prefix": "ltx_output", | |
| "fps": 24.0, | |
| "lossless": False, | |
| "quality": 80, | |
| "method": "default", | |
| }, | |
| }, | |
| } | |
| NODE_LABELS = { | |
| "1": "Loading DiT GGUF", | |
| "2": "Loading Gemma+Projection", | |
| "3": "Applying distill LoRA", | |
| "4": "Encoding text", | |
| "5": "Creating video latent", | |
| "7": "Building guider", | |
| "8": "Computing schedule", | |
| "9": "Selecting sampler", | |
| "10": "Generating noise", | |
| "11": "Diffusion", | |
| "12": "Loading VAE", | |
| "13": "Decoding video", | |
| "14": "Saving output", | |
| "50": "Creating audio latent", | |
| "51": "Merging AV latents", | |
| "52": "Separating AV", | |
| "53": "Loading audio VAE", | |
| "54": "Decoding audio", | |
| "20": "Loading image", | |
| "21": "Preprocessing image", | |
| "22": "I2V conditioning", | |
| "30": "Applying user LoRA", | |
| "40": "Encoding prompt", | |
| "41": "Encoding negative", | |
| } | |
| def _download_models(progress_cb=None): | |
| from huggingface_hub import hf_hub_download | |
| for i, m in enumerate(DOWNLOAD_MANIFEST): | |
| dest = Path(m["dest"]) | |
| if dest.exists(): | |
| continue | |
| label = m["label"] | |
| if progress_cb: | |
| progress_cb((i / len(DOWNLOAD_MANIFEST)), desc=f"Downloading {label} (cache miss)...") | |
| print(f"[download] {label} from {m['repo']}/{m['file']}", flush=True) | |
| cached = hf_hub_download(repo_id=m["repo"], filename=m["file"]) | |
| dest.parent.mkdir(parents=True, exist_ok=True) | |
| try: | |
| os.symlink(cached, str(dest)) | |
| except OSError: | |
| shutil.copy2(cached, str(dest)) | |
| if progress_cb: | |
| progress_cb(1.0, desc="Models ready") | |
| _comfy_proc = None | |
| def _ensure_comfy(): | |
| global _comfy_proc | |
| if _comfy_proc is not None and _comfy_proc.poll() is None: | |
| return | |
| print("[comfy] Starting ComfyUI headless (--cpu)...", flush=True) | |
| _comfy_proc = subprocess.Popen( | |
| [ | |
| sys.executable, "-u", str(COMFY / "main.py"), | |
| "--cpu", | |
| "--listen", "127.0.0.1", | |
| "--port", "8188", | |
| "--dont-print-server", | |
| "--force-fp32", | |
| "--cache-none", | |
| ], | |
| cwd=str(COMFY), | |
| stdout=sys.stdout, | |
| stderr=sys.stderr, | |
| ) | |
| import urllib.request | |
| for attempt in range(120): | |
| time.sleep(2) | |
| try: | |
| urllib.request.urlopen("http://127.0.0.1:8188/system_stats", timeout=2) | |
| print("[comfy] Server ready", flush=True) | |
| return | |
| except Exception: | |
| pass | |
| raise RuntimeError("ComfyUI failed to start within 240s") | |
| def _search_hf_loras(query: str) -> list[str]: | |
| if not query or len(query) < 2: | |
| query = "ltx 2.3 lora" | |
| try: | |
| from huggingface_hub import HfApi | |
| api = HfApi() | |
| results = list(api.list_models(search=query, limit=15)) | |
| return [m.id for m in results if m.id] | |
| except Exception: | |
| return [] | |
| def _resolve_lora_files(repo_id: str) -> list[str]: | |
| if not repo_id or "/" not in repo_id: | |
| return [] | |
| try: | |
| from huggingface_hub import HfApi | |
| api = HfApi() | |
| files = api.list_repo_files(repo_id) | |
| return [f for f in files if f.endswith(".safetensors") and "lora" in f.lower()] | |
| except Exception: | |
| return [] | |
| _ic_lora_cache: dict[str, bool] = {} | |
| def _is_ic_lora(lora_path: str) -> bool: | |
| if not lora_path: | |
| return False | |
| if lora_path in _ic_lora_cache: | |
| return _ic_lora_cache[lora_path] | |
| result = _detect_ic_lora(lora_path) | |
| _ic_lora_cache[lora_path] = result | |
| return result | |
| def _detect_ic_lora(lora_path: str) -> bool: | |
| if re.search(r"ic[-_]?lora", lora_path, re.IGNORECASE): | |
| return True | |
| local = MODELS / "loras" / lora_path | |
| if local.exists(): | |
| try: | |
| return _check_safetensors_header(str(local)) | |
| except Exception: | |
| return False | |
| if "/" in lora_path: | |
| parts = lora_path.split("/") | |
| if len(parts) >= 3: | |
| repo_id = f"{parts[0]}/{parts[1]}" | |
| filename = "/".join(parts[2:]) | |
| try: | |
| from huggingface_hub import hf_hub_download | |
| cached = hf_hub_download(repo_id=repo_id, filename=filename) | |
| return _check_safetensors_header(cached) | |
| except Exception: | |
| pass | |
| return False | |
| def _check_safetensors_header(filepath: str) -> bool: | |
| with open(filepath, "rb") as f: | |
| header_size = int.from_bytes(f.read(8), "little") | |
| if header_size > 10_000_000: | |
| return False | |
| header_json = f.read(header_size).decode("utf-8", errors="ignore") | |
| return "reference_downscale_factor" in header_json | |
| def _download_user_lora(repo_id: str, filename: str) -> str | None: | |
| if not repo_id or not filename: | |
| return None | |
| from huggingface_hub import hf_hub_download | |
| lora_dir = MODELS / "loras" | |
| lora_dir.mkdir(parents=True, exist_ok=True) | |
| local_name = f"{repo_id.replace('/', '_')}_{filename.replace('/', '_')}" | |
| dest = lora_dir / local_name | |
| if dest.exists(): | |
| return local_name | |
| try: | |
| token = os.environ.get("HF_TOKEN") | |
| cached = hf_hub_download(repo_id=repo_id, filename=filename, token=token) | |
| try: | |
| os.symlink(cached, str(dest)) | |
| except OSError: | |
| shutil.copy2(cached, str(dest)) | |
| return local_name | |
| except Exception as e: | |
| print(f"[lora] Failed to download {repo_id}/{filename}: {e}", flush=True) | |
| return None | |
| def _build_workflow(prompt: str, steps: int, duration_sec: float, seed: int, | |
| img_name: str | None = None, user_lora: str | None = None, | |
| lora_strength: float = 0.6, vid_w: int | None = None, | |
| vid_h: int | None = None, enable_audio: bool = True) -> dict: | |
| wf = json.loads(json.dumps(WORKFLOW_TEMPLATE)) | |
| wf["40"]["inputs"]["text"] = prompt | |
| frames = max(9, int(duration_sec * 24) + 1) | |
| wf["5"]["inputs"]["length"] = frames | |
| if not enable_audio: | |
| for n in ["49", "50", "51", "52", "53", "54"]: | |
| wf.pop(n, None) | |
| wf["11"]["inputs"]["latent_image"] = ["5", 0] | |
| wf["13"]["inputs"]["samples"] = ["11", 0] | |
| else: | |
| wf["50"]["inputs"]["frames_number"] = frames | |
| if vid_w and vid_h: | |
| wf["5"]["inputs"]["width"] = vid_w | |
| wf["5"]["inputs"]["height"] = vid_h | |
| wf["8"]["inputs"]["steps"] = steps | |
| wf["10"]["inputs"]["noise_seed"] = seed | |
| model_source = "3" | |
| is_ic = _is_ic_lora(user_lora) if user_lora else False | |
| if user_lora and is_ic: | |
| wf["30"] = { | |
| "class_type": "LTXICLoRALoaderModelOnly", | |
| "inputs": { | |
| "model": [model_source, 0], | |
| "lora_name": user_lora, | |
| "strength_model": lora_strength, | |
| }, | |
| } | |
| model_source = "30" | |
| wf["7"]["inputs"]["model"] = [model_source, 0] | |
| elif user_lora: | |
| wf["30"] = { | |
| "class_type": "LoraLoaderModelOnly", | |
| "inputs": { | |
| "model": [model_source, 0], | |
| "lora_name": user_lora, | |
| "strength_model": lora_strength, | |
| }, | |
| } | |
| model_source = "30" | |
| wf["7"]["inputs"]["model"] = [model_source, 0] | |
| if img_name: | |
| wf["20"] = { | |
| "class_type": "LoadImage", | |
| "inputs": {"image": img_name}, | |
| } | |
| if not is_ic: | |
| wf["12"]["inputs"]["vae_name"] = "LTX23_video_vae_bf16.safetensors" | |
| wf["25"] = { | |
| "class_type": "ImageScale", | |
| "inputs": { | |
| "image": ["20", 0], | |
| "upscale_method": "lanczos", | |
| "width": wf["5"]["inputs"]["width"], | |
| "height": wf["5"]["inputs"]["height"], | |
| "crop": "center", | |
| }, | |
| } | |
| wf["21"] = { | |
| "class_type": "LTXVPreprocess", | |
| "inputs": {"image": ["25", 0], "img_compression": 18}, | |
| } | |
| wf["22"] = { | |
| "class_type": "LTXVImgToVideoInplace", | |
| "inputs": { | |
| "latent": ["5", 0], | |
| "vae": ["12", 0], | |
| "image": ["21", 0], | |
| "strength": 0.7, | |
| "bypass": False, | |
| "use_slerp": False, | |
| }, | |
| } | |
| if "51" in wf: | |
| wf["51"]["inputs"]["video_latent"] = ["22", 0] | |
| else: | |
| wf["11"]["inputs"]["latent_image"] = ["22", 0] | |
| if is_ic and img_name: | |
| wf["12"]["inputs"]["vae_name"] = "LTX23_video_vae_bf16.safetensors" | |
| wf["25"] = { | |
| "class_type": "ImageScale", | |
| "inputs": { | |
| "image": ["20", 0], | |
| "upscale_method": "lanczos", | |
| "width": wf["5"]["inputs"]["width"], | |
| "height": wf["5"]["inputs"]["height"], | |
| "crop": "center", | |
| }, | |
| } | |
| wf["31"] = { | |
| "class_type": "LTXAddVideoICLoRAGuide", | |
| "inputs": { | |
| "positive": ["4", 0], | |
| "negative": ["4", 1], | |
| "vae": ["12", 0], | |
| "latent": ["5", 0], | |
| "image": ["25", 0], | |
| "frame_idx": 0, | |
| "strength": 1.0, | |
| "latent_downscale_factor": ["30", 1], | |
| "crop": "disabled", | |
| "use_tiled_encode": False, | |
| "tile_size": 512, | |
| "tile_overlap": 64, | |
| }, | |
| } | |
| wf["7"]["inputs"]["positive"] = ["31", 0] | |
| wf["7"]["inputs"]["negative"] = ["31", 1] | |
| wf["11"]["inputs"]["latent_image"] = ["31", 2] | |
| return wf | |
| def _submit_and_poll(workflow: dict, status_cb=None, timeout: int = 21600) -> str | None: | |
| import urllib.request | |
| import websocket | |
| client_id = str(uuid.uuid4()) | |
| payload = json.dumps({"prompt": workflow, "client_id": client_id}).encode() | |
| req = urllib.request.Request( | |
| "http://127.0.0.1:8188/prompt", | |
| data=payload, | |
| headers={"Content-Type": "application/json"}, | |
| ) | |
| resp = urllib.request.urlopen(req, timeout=30) | |
| resp_data = json.loads(resp.read()) | |
| pid = resp_data.get("prompt_id", client_id) | |
| t0 = time.time() | |
| current_step = 0 | |
| max_steps = 0 | |
| current_label = "Queued" | |
| def _status_line(): | |
| elapsed = int(time.time() - t0) | |
| m, s = divmod(elapsed, 60) | |
| if max_steps > 0: | |
| return f"[{current_step}/{max_steps}] {m}m{s:02d}s: {current_label}" | |
| return f"{m}m{s:02d}s: {current_label}" | |
| ws = websocket.WebSocket() | |
| ws.settimeout(timeout) | |
| ws.connect(f"ws://127.0.0.1:8188/ws?clientId={client_id}") | |
| try: | |
| while time.time() - t0 < timeout: | |
| try: | |
| raw = ws.recv() | |
| if not raw: | |
| continue | |
| msg = json.loads(raw) | |
| except websocket.WebSocketTimeoutException: | |
| break | |
| except Exception: | |
| continue | |
| msg_type = msg.get("type", "") | |
| data = msg.get("data", {}) | |
| if msg_type == "executing": | |
| node_id = data.get("node") | |
| if node_id is None: | |
| current_label = "Complete" | |
| if status_cb: | |
| status_cb(_status_line()) | |
| break | |
| current_label = NODE_LABELS.get(str(node_id), f"Node {node_id}") | |
| if status_cb: | |
| status_cb(_status_line()) | |
| elif msg_type == "progress": | |
| current_step = data.get("value", 0) | |
| max_steps = data.get("max", 0) | |
| node_id = str(data.get("node", "11")) | |
| current_label = f"{NODE_LABELS.get(node_id, 'Step')} {current_step}/{max_steps}" | |
| if status_cb: | |
| status_cb(_status_line()) | |
| elif msg_type == "execution_error": | |
| err = data.get("exception_message", "Unknown error") | |
| current_label = f"Error: {err[:100]}" | |
| if status_cb: | |
| status_cb(_status_line()) | |
| ws.close() | |
| return None | |
| finally: | |
| try: | |
| ws.close() | |
| except Exception: | |
| pass | |
| video_path = None | |
| audio_path = None | |
| try: | |
| hist = urllib.request.urlopen(f"http://127.0.0.1:8188/history/{pid}", timeout=10) | |
| hdata = json.loads(hist.read()) | |
| if pid in hdata: | |
| outputs = hdata[pid].get("outputs", {}) | |
| for node_id, out in outputs.items(): | |
| for key in ("images", "gifs"): | |
| if key in out: | |
| for item in out[key]: | |
| fpath = OUTPUT / item.get("subfolder", "") / item["filename"] | |
| if fpath.exists() and not video_path: | |
| video_path = str(fpath) | |
| if "audio" in out: | |
| for item in out["audio"]: | |
| fpath = OUTPUT / item.get("subfolder", "") / item["filename"] | |
| if fpath.exists() and not audio_path: | |
| audio_path = str(fpath) | |
| except Exception: | |
| pass | |
| return video_path, audio_path | |
| def generate(prompt, duration_sec, steps, seed, image_path=None, | |
| user_lora_file=None, lora_strength=0.6, enable_audio=False, progress=None): | |
| import gradio as gr | |
| if not prompt.strip(): | |
| raise gr.Error("Prompt cannot be empty") | |
| status_lines = ["Initializing..."] | |
| if progress: | |
| progress(0.0, desc="Checking models...") | |
| _download_models(progress) | |
| if progress: | |
| progress(0.15, desc="Starting ComfyUI...") | |
| _ensure_comfy() | |
| img_name = None | |
| img_w, img_h = None, None | |
| if image_path: | |
| comfy_input = COMFY / "input" | |
| comfy_input.mkdir(parents=True, exist_ok=True) | |
| img_name = f"input_{uuid.uuid4().hex[:8]}.png" | |
| from PIL import Image as PILImage | |
| pil_img = PILImage.open(image_path) | |
| pil_img.save(str(comfy_input / img_name)) | |
| w, h = pil_img.size | |
| scale = 512 / max(w, h) | |
| img_w = int(w * scale) // 32 * 32 | |
| img_h = int(h * scale) // 32 * 32 | |
| img_w = max(img_w, 64) | |
| img_h = max(img_h, 64) | |
| mode = "I2V" if img_name else "T2V" | |
| if progress: | |
| progress(0.2, desc=f"{mode}: {steps} steps, {duration_sec}s clip...") | |
| def _on_status(line): | |
| status_lines[0] = line | |
| print(f"[status] {line}", flush=True) | |
| wf = _build_workflow( | |
| prompt, int(steps), float(duration_sec), int(seed), | |
| img_name=img_name, user_lora=user_lora_file, | |
| lora_strength=float(lora_strength), | |
| vid_w=img_w, vid_h=img_h, | |
| enable_audio=enable_audio, | |
| ) | |
| poll_result = _submit_and_poll(wf, status_cb=_on_status) | |
| if poll_result is None: | |
| raise gr.Error(f"Generation failed: {status_lines[0]}") | |
| result_video, result_audio = poll_result | |
| if result_video is None: | |
| raise gr.Error(f"Generation failed: {status_lines[0]}") | |
| result = result_video | |
| out_dir = Path(tempfile.mkdtemp()) | |
| out_path = out_dir / "output.mp4" | |
| try: | |
| from PIL import Image as PILImage | |
| import cv2 | |
| import numpy as np | |
| img = PILImage.open(result) | |
| frames = [] | |
| try: | |
| while True: | |
| frames.append(np.array(img.convert("RGB"))) | |
| img.seek(img.tell() + 1) | |
| except EOFError: | |
| pass | |
| if frames: | |
| h, w = frames[0].shape[:2] | |
| w2, h2 = w + (w % 2), h + (h % 2) | |
| fourcc = cv2.VideoWriter_fourcc(*"mp4v") | |
| writer = cv2.VideoWriter(str(out_path), fourcc, 24, (w2, h2)) | |
| for f in frames: | |
| bgr = cv2.cvtColor(f, cv2.COLOR_RGB2BGR) | |
| if bgr.shape[1] != w2 or bgr.shape[0] != h2: | |
| bgr = cv2.copyMakeBorder(bgr, 0, h2 - h, 0, w2 - w, cv2.BORDER_CONSTANT) | |
| writer.write(bgr) | |
| writer.release() | |
| h264_path = out_dir / "output_h264.mp4" | |
| rc = subprocess.run( | |
| ["ffmpeg", "-y", "-i", str(out_path), "-c:v", "libx264", | |
| "-pix_fmt", "yuv420p", "-r", "24", str(h264_path)], | |
| capture_output=True, timeout=120, | |
| ) | |
| if rc.returncode == 0 and h264_path.exists(): | |
| out_path.unlink() | |
| h264_path.rename(out_path) | |
| print(f"[output] Converted {len(frames)} frames to mp4 (h264: {'ok' if rc.returncode == 0 else 'fallback mp4v'})", flush=True) | |
| if result_audio and Path(result_audio).exists(): | |
| av_path = out_dir / "output_av.mp4" | |
| av_rc = subprocess.run( | |
| ["ffmpeg", "-y", "-i", str(out_path), "-i", result_audio, | |
| "-c:v", "copy", "-c:a", "aac", "-shortest", str(av_path)], | |
| capture_output=True, timeout=120, | |
| ) | |
| if av_rc.returncode == 0 and av_path.exists(): | |
| out_path.unlink() | |
| av_path.rename(out_path) | |
| print("[output] Merged audio into mp4", flush=True) | |
| except Exception as e: | |
| print(f"[output] mp4 conversion failed: {e}, returning webp", flush=True) | |
| out_path = out_dir / "output.webp" | |
| shutil.copy2(result, out_path) | |
| elapsed = status_lines[0].split(":")[0] if ":" in status_lines[0] else "?" | |
| lora_info = f" | LoRA: {user_lora_file}" if user_lora_file else "" | |
| return str(out_path), f"Done {elapsed} | {mode} | {steps} steps | {duration_sec}s | seed {int(seed)}{lora_info}" | |
| def health() -> str: | |
| import psutil | |
| mem = psutil.virtual_memory() | |
| return ( | |
| f"LTX 2.3 CPU Space | " | |
| f"RAM {mem.used // (1024**3)}/{mem.total // (1024**3)} GB | " | |
| f"ComfyUI {'running' if _comfy_proc and _comfy_proc.poll() is None else 'stopped'}" | |
| ) | |
| import gradio as gr | |
| import random | |
| _all_lora_choices = [] | |
| _lora_state = {"mode": "search"} | |
| def _on_lora_interact(value): | |
| if not value or len(value) < 2: | |
| repos = _search_hf_loras("ltx 2.3 lora") | |
| return gr.update(choices=repos, value=None) | |
| if value.endswith(".safetensors"): | |
| return gr.update(value=value) | |
| if "/" in value: | |
| parts = value.split("/") | |
| if len(parts) >= 2: | |
| repo_id = f"{parts[0]}/{parts[1]}" | |
| files = _resolve_lora_files(repo_id) | |
| if not files: | |
| try: | |
| from huggingface_hub import HfApi | |
| files = [f for f in HfApi().list_repo_files(repo_id) if f.endswith(".safetensors")] | |
| except Exception: | |
| files = [] | |
| choices = [f"{repo_id}/{f}" for f in files] | |
| if len(choices) == 1: | |
| return gr.update(choices=choices, value=choices[0]) | |
| return gr.update(choices=choices, value=None) | |
| repos = _search_hf_loras(value) | |
| return gr.update(choices=repos, value=None) | |
| def _prepare_user_lora(lora_path, progress=None): | |
| if not lora_path or "/" not in lora_path: | |
| return None | |
| lora_path = re.sub(r"^https?://huggingface\.co/", "", lora_path) | |
| lora_path = re.sub(r"/blob/main/", "/", lora_path) | |
| lora_path = re.sub(r"/resolve/main/", "/", lora_path) | |
| parts = lora_path.split("/") | |
| if len(parts) < 3: | |
| return None | |
| repo_id = f"{parts[0]}/{parts[1]}" | |
| filename = "/".join(parts[2:]) | |
| if progress: | |
| progress(0.1, desc=f"Downloading LoRA from {repo_id}...") | |
| return _download_user_lora(repo_id, filename) | |
| with gr.Blocks(title="LTX 2.3 CPU") as demo: | |
| gr.Markdown( | |
| "**[LTX 2.3](https://huggingface.co/Lightricks/LTX-2.3) CPU** 2s clip takes ~74 min (up to 321m w/ LoRA + I2V), `cond_safe` distill 1.1 + Sulphur-2 merge = [10Eros](https://huggingface.co/TenStrip/LTX2.3-10Eros). *4experimental~2be kinda patient..*" | |
| ) | |
| with gr.Row(equal_height=False): | |
| with gr.Column(scale=1): | |
| prompt_in = gr.Textbox( | |
| label="Prompt", lines=3, | |
| placeholder="A woman walking through a neon-lit Tokyo alley at night, cinematic", | |
| ) | |
| image_in = gr.Image(label="First frame (optional, I2V)", type="filepath", height=180) | |
| with gr.Accordion("LoRA (optional, up to 9)", open=False): | |
| lora_picker = gr.Dropdown( | |
| label="LoRA (select to add, click X to remove)", | |
| info="Type to search HF, paste URL or user/repo/lora.safetensors", | |
| choices=[], | |
| value=[], | |
| multiselect=True, | |
| allow_custom_value=True, | |
| interactive=True, | |
| ) | |
| lora_strength = gr.Slider(0.0, 1.5, value=0.6, step=0.05, label="LoRA strength (all)") | |
| with gr.Row(): | |
| audio_in = gr.Checkbox( | |
| label="Enable audio (+4h, duplicate & edit L1 app.py)", | |
| value=False, interactive=ENABLE_AUDIO | |
| ) | |
| duration_in = gr.Slider(1.0, 4.0, value=2.0, step=0.5, label="Duration (s)") | |
| steps_in = gr.Slider(4, 16, value=8, step=1, label="Steps") | |
| seed_in = gr.Number(label="Seed", value=-1, precision=0) | |
| run_btn = gr.Button("Generate Video", variant="primary") | |
| with gr.Column(scale=1): | |
| video_out = gr.Video(label="Output", height=300) | |
| status_out = gr.Textbox(label="Status", interactive=False) | |
| def _on_lora_pick(selected_values): | |
| global _all_lora_choices | |
| selected = list(selected_values) if selected_values else [] | |
| print(f"[lora] pick: {selected}", flush=True) | |
| valid = [v for v in selected if "/" in v] | |
| search_terms = [v for v in selected if "/" not in v and v.strip()] | |
| if search_terms: | |
| query = " ".join(search_terms) | |
| repos = _search_hf_loras(query) | |
| resolved = [] | |
| for repo in repos[:8]: | |
| try: | |
| from huggingface_hub import HfApi | |
| files = [f for f in HfApi().list_repo_files(repo) if f.endswith(".safetensors")] | |
| for f in files: | |
| resolved.append(f"{repo}/{f}") | |
| except Exception: | |
| resolved.append(repo) | |
| for r in resolved: | |
| if r not in _all_lora_choices: | |
| _all_lora_choices.append(r) | |
| print(f"[lora] search '{query}': {len(resolved)} new, {len(_all_lora_choices)} total", flush=True) | |
| return gr.update(choices=_all_lora_choices, value=valid[:9]) | |
| if len(valid) > 9: | |
| valid = valid[:9] | |
| return gr.update(choices=_all_lora_choices, value=valid) | |
| _POPULAR_LORAS = [ | |
| "Phr00t/LTX2-Rapid-Merges/LORAs/povnsfw-v3-complete.safetensors", | |
| "Phr00t/LTX2-Rapid-Merges/LORAs/phr00t-povnsfw-v1.safetensors", | |
| ] | |
| def _init_loras(): | |
| global _all_lora_choices | |
| for p in _POPULAR_LORAS: | |
| if p not in _all_lora_choices: | |
| _all_lora_choices.append(p) | |
| repos = _search_hf_loras("ltx 2.3 lora") | |
| for repo in repos[:12]: | |
| try: | |
| from huggingface_hub import HfApi | |
| files = [f for f in HfApi().list_repo_files(repo) if f.endswith(".safetensors")] | |
| for f in files: | |
| path = f"{repo}/{f}" | |
| if path not in _all_lora_choices: | |
| _all_lora_choices.append(path) | |
| except Exception: | |
| if repo not in _all_lora_choices: | |
| _all_lora_choices.append(repo) | |
| print(f"[lora] init: {len(repos)} repos -> {len(_all_lora_choices)} files", flush=True) | |
| return gr.update(choices=_all_lora_choices) | |
| lora_picker.input(fn=_on_lora_pick, inputs=[lora_picker], outputs=[lora_picker]) | |
| demo.load(fn=_init_loras, outputs=[lora_picker]) | |
| def _resolve_lora_entry(entry): | |
| if entry.endswith(".safetensors"): | |
| return entry | |
| if "/" in entry: | |
| parts = entry.split("/") | |
| if len(parts) >= 2: | |
| repo_id = f"{parts[0]}/{parts[1]}" | |
| try: | |
| from huggingface_hub import HfApi | |
| files = [f for f in HfApi().list_repo_files(repo_id) if f.endswith(".safetensors")] | |
| if files: | |
| return f"{repo_id}/{files[0]}" | |
| except Exception: | |
| pass | |
| return None | |
| def _gen(prompt, image, lora_list, lora_str, enable_audio, dur, steps, seed, progress=gr.Progress()): | |
| if seed < 0: | |
| seed = random.randint(0, 2**31) | |
| lora_files = [] | |
| if lora_list: | |
| for lp in lora_list[:9]: | |
| resolved = _resolve_lora_entry(lp) if lp else None | |
| if resolved: | |
| local = _prepare_user_lora(resolved, progress) | |
| if local: | |
| lora_files.append(local) | |
| first_lora = lora_files[0] if lora_files else None | |
| return generate(prompt, dur, steps, seed, image_path=image, | |
| user_lora_file=first_lora, lora_strength=lora_str, | |
| enable_audio=bool(enable_audio), progress=progress) | |
| run_btn.click( | |
| fn=_gen, | |
| inputs=[prompt_in, image_in, lora_picker, lora_strength, audio_in, duration_in, steps_in, seed_in], | |
| outputs=[video_out, status_out], | |
| api_name="generate", | |
| ) | |
| gr.Button(visible=False).click(fn=health, outputs=[gr.Textbox(visible=False)], api_name="health") | |
| demo.queue(default_concurrency_limit=1) | |
| if __name__ == "__main__": | |
| demo.launch(server_name="0.0.0.0", server_port=7860, theme="Taithrah/Minimal") | |