# TestingwithNeg / app(brokewithnewLoRAmethod).py
# Uploaded by: dagloop5
# Commit f070756 (verified): Rename app.py to app(brokewithnewLoRAmethod).py
import os
import subprocess
import sys
# Disable torch.compile / dynamo before any torch import
os.environ.update({"TORCH_COMPILE_DISABLE": "1", "TORCHDYNAMO_DISABLE": "1"})

# Install xformers for memory-efficient attention.
# check=False: a failed install is tolerated here.
subprocess.run(
    [sys.executable, "-m", "pip", "install", "xformers==0.0.32.post2", "--no-build-isolation"],
    check=False,
)

# Clone LTX-2 repo and install packages
LTX_REPO_URL = "https://github.com/Lightricks/LTX-2.git"
LTX_REPO_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LTX-2")
LTX_COMMIT = "ae855f8538843825f9015a419cf4ba5edaf5eec2"  # known working commit with decode_video

# Only a fresh checkout is cloned and pinned; an existing directory is reused as-is.
if not os.path.exists(LTX_REPO_DIR):
    print(f"Cloning {LTX_REPO_URL}...")
    subprocess.run(["git", "clone", LTX_REPO_URL, LTX_REPO_DIR], check=True)
    subprocess.run(["git", "checkout", LTX_COMMIT], cwd=LTX_REPO_DIR, check=True)

_LTX_CORE_DIR = os.path.join(LTX_REPO_DIR, "packages", "ltx-core")
_LTX_PIPELINES_DIR = os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines")

print("Installing ltx-core and ltx-pipelines from cloned repo...")
_pip_install_cmd = [
    sys.executable, "-m", "pip", "install", "--force-reinstall", "--no-deps",
    "-e", _LTX_CORE_DIR,
    "-e", _LTX_PIPELINES_DIR,
]
subprocess.run(_pip_install_cmd, check=True)

# Make the freshly installed sources importable in this process.
# Each path is inserted at index 0, so ltx-core ends up ahead of ltx-pipelines.
sys.path.insert(0, os.path.join(_LTX_PIPELINES_DIR, "src"))
sys.path.insert(0, os.path.join(_LTX_CORE_DIR, "src"))
import logging
import random
import tempfile
from pathlib import Path
import gc
import hashlib
import torch
# Belt-and-braces on top of the TORCH_COMPILE_DISABLE / TORCHDYNAMO_DISABLE
# environment variables set before the torch import: also switch dynamo off
# through its config object and make it suppress its own errors.
torch._dynamo.config.suppress_errors = True
torch._dynamo.config.disable = True
import spaces
import gradio as gr
import numpy as np
from huggingface_hub import hf_hub_download, snapshot_download
from safetensors.torch import load_file, save_file
from safetensors import safe_open
import json
import requests
from ltx_core.components.diffusion_steps import EulerDiffusionStep
from ltx_core.components.guiders import MultiModalGuider, MultiModalGuiderParams
from ltx_core.components.noisers import GaussianNoiser
from ltx_core.model.audio_vae import encode_audio as vae_encode_audio
from ltx_core.model.audio_vae import decode_audio as vae_decode_audio
from ltx_core.model.upsampler import upsample_video
from ltx_core.model.video_vae import TilingConfig, get_video_chunks_number, decode_video as vae_decode_video
from ltx_core.quantization import QuantizationPolicy
from ltx_core.types import Audio, AudioLatentShape, VideoPixelShape
from ltx_pipelines.distilled import DistilledPipeline
from ltx_pipelines.utils import euler_denoising_loop
from ltx_pipelines.utils.args import ImageConditioningInput
from ltx_pipelines.utils.constants import DISTILLED_SIGMA_VALUES, STAGE_2_DISTILLED_SIGMA_VALUES
from ltx_pipelines.utils.helpers import (
cleanup_memory,
combined_image_conditionings,
denoise_video_only,
encode_prompts,
simple_denoising_func,
multi_modal_guider_denoising_func,
)
from ltx_pipelines.utils.media_io import decode_audio_from_file, encode_video
from ltx_core.loader.primitives import LoraPathStrengthAndSDOps, StateDict, LoraStateDictWithStrength
from ltx_core.loader.sd_ops import LTXV_LORA_COMFY_RENAMING_MAP
from ltx_core.loader.fuse_loras import apply_loras
from safetensors import safe_open
# Force-patch xformers attention into the LTX attention module.
# The patch is best-effort: any failure importing xformers is reported and the
# attention module keeps whatever implementation it already had.
from ltx_core.model.transformer import attention as _attn_mod

print(f"[ATTN] Before patch: memory_efficient_attention={_attn_mod.memory_efficient_attention}")
try:
    from xformers.ops import memory_efficient_attention as _mea

    setattr(_attn_mod, "memory_efficient_attention", _mea)
    print(f"[ATTN] After patch: memory_efficient_attention={_attn_mod.memory_efficient_attention}")
except Exception as patch_error:
    print(f"[ATTN] xformers patch FAILED: {type(patch_error).__name__}: {patch_error}")
# Root logger at INFO so library progress messages are visible in the Space logs.
logging.getLogger().setLevel(logging.INFO)
# Largest value of a signed 32-bit integer; upper bound used when drawing random seeds.
MAX_SEED = np.iinfo(np.int32).max
# Default positive prompt.
DEFAULT_PROMPT = (
"An astronaut hatches from a fragile egg on the surface of the Moon, "
"the shell cracking and peeling apart in gentle low-gravity motion. "
"Fine lunar dust lifts and drifts outward with each movement, floating "
"in slow arcs before settling back onto the ground."
)
# Default negative prompt.
DEFAULT_NEGATIVE_PROMPT = (
"worst quality, inconsistent motion, blurry, jittery, distorted, "
"deformed, artifacts, text, watermark, logo, frame, border, "
"low resolution, pixelated, unnatural, fake, CGI, cartoon"
)
# Frames per second used when converting a duration in seconds to a frame count.
DEFAULT_FRAME_RATE = 24.0
# Resolution presets: (width, height), keyed first by quality tier and then by aspect ratio.
RESOLUTIONS = {
"high": {"16:9": (1536, 1024), "9:16": (1024, 1536), "1:1": (1024, 1024)},
"low": {"16:9": (768, 512), "9:16": (512, 768), "1:1": (768, 768)},
}
class LTX23DistilledA2VPipeline:
    """Standalone pipeline with optional audio conditioning — no parent class.

    Generation runs in two stages: stage 1 denoises at half the requested
    width/height with guided denoising (positive + negative contexts), the
    video latent is spatially upsampled, and stage 2 refines at the requested
    resolution using the positive contexts only. Both video and audio are
    decoded and returned.
    """

    def __init__(
        self,
        distilled_checkpoint_path: str,
        spatial_upsampler_path: str,
        gemma_root: str,
        loras: tuple,
        quantization: QuantizationPolicy | None = None,
    ):
        """Create the model ledger and pipeline components.

        Args:
            distilled_checkpoint_path: Path passed to ``ModelLedger`` as ``checkpoint_path``.
            spatial_upsampler_path: Path to the spatial upsampler weights.
            gemma_root: Root directory of the Gemma text-encoder snapshot.
            loras: LoRA specifications forwarded unchanged to ``ModelLedger``.
            quantization: Optional quantization policy forwarded to ``ModelLedger``.
        """
        # Imported lazily, as in the original implementation.
        from ltx_pipelines.utils import ModelLedger
        from ltx_pipelines.utils.types import PipelineComponents

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.dtype = torch.bfloat16
        self.model_ledger = ModelLedger(
            dtype=self.dtype,
            device=self.device,
            checkpoint_path=distilled_checkpoint_path,
            gemma_root_path=gemma_root,
            spatial_upsampler_path=spatial_upsampler_path,
            loras=loras,
            quantization=quantization,
        )
        self.pipeline_components = PipelineComponents(
            dtype=self.dtype,
            device=self.device,
        )

    def _synchronize(self) -> None:
        """Wait for pending CUDA work; a no-op when the pipeline runs on CPU."""
        # __init__ falls back to CPU when CUDA is unavailable, and
        # torch.cuda.synchronize() would raise in that case.
        if self.device.type == "cuda":
            torch.cuda.synchronize()

    def _encode_conditioning_audio(self, audio_path: str, num_frames: int, frame_rate: float):
        """Encode ``audio_path`` into an audio latent whose dim 2 matches the clip duration.

        The audio is read for ``num_frames / frame_rate`` seconds, encoded with
        the audio VAE, then trimmed or zero-padded along dimension 2 to the
        frame count reported by ``AudioLatentShape.from_duration``.

        Raises:
            ValueError: If no audio stream could be decoded from the file.
        """
        video_duration = num_frames / frame_rate
        decoded_audio = decode_audio_from_file(audio_path, self.device, 0.0, video_duration)
        if decoded_audio is None:
            raise ValueError(f"Could not extract audio stream from {audio_path}")

        latent = vae_encode_audio(decoded_audio, self.model_ledger.audio_encoder())
        audio_shape = AudioLatentShape.from_duration(batch=1, duration=video_duration, channels=8, mel_bins=16)
        expected_frames = audio_shape.frames
        actual_frames = latent.shape[2]

        if actual_frames > expected_frames:
            latent = latent[:, :, :expected_frames, :]
        elif actual_frames < expected_frames:
            pad = torch.zeros(
                latent.shape[0],
                latent.shape[1],
                expected_frames - actual_frames,
                latent.shape[3],
                device=latent.device,
                dtype=latent.dtype,
            )
            latent = torch.cat([latent, pad], dim=2)
        return latent

    def __call__(
        self,
        prompt: str,
        negative_prompt: str,
        seed: int,
        height: int,
        width: int,
        num_frames: int,
        frame_rate: float,
        video_guider_params: MultiModalGuiderParams,
        audio_guider_params: MultiModalGuiderParams,
        images: list[ImageConditioningInput],
        audio_path: str | None = None,
        tiling_config: TilingConfig | None = None,
        enhance_prompt: bool = False,
    ):
        """Generate a video and its audio track.

        Args:
            prompt: Positive text prompt.
            negative_prompt: Negative text prompt, used by the stage-1 guiders.
            seed: Seed for the torch generator that drives noise and decoding.
            height: Output height in pixels (stage 1 uses ``height // 2``).
            width: Output width in pixels (stage 1 uses ``width // 2``).
            num_frames: Number of video frames to generate.
            frame_rate: Frames per second of the output.
            video_guider_params: Guidance parameters for the video modality.
            audio_guider_params: Guidance parameters for the audio modality.
            images: Image conditionings; the first one (if any) is also passed
                to prompt enhancement.
            audio_path: Optional audio file used as the initial audio latent.
            tiling_config: Optional tiling configuration for video decoding.
            enhance_prompt: Whether the first (positive) prompt is enhanced.

        Returns:
            ``(decoded_video, decoded_audio_output)`` as returned by the video
            and audio VAE decode helpers.

        Raises:
            ValueError: If ``audio_path`` is given but no audio stream can be read.
        """
        print(prompt)
        generator = torch.Generator(device=self.device).manual_seed(seed)
        noiser = GaussianNoiser(generator=generator)
        stepper = EulerDiffusionStep()
        dtype = self.dtype

        ctx_p, ctx_n = encode_prompts(
            [prompt, negative_prompt],
            self.model_ledger,
            enhance_first_prompt=enhance_prompt,
            enhance_prompt_image=images[0].path if images else None,
        )
        v_context_p, a_context_p = ctx_p.video_encoding, ctx_p.audio_encoding
        v_context_n, a_context_n = ctx_n.video_encoding, ctx_n.audio_encoding

        # ── Audio encoding (only for conditioning, not output generation) ──
        encoded_audio_latent = None
        if audio_path is not None:
            encoded_audio_latent = self._encode_conditioning_audio(audio_path, num_frames, frame_rate)

        video_encoder = self.model_ledger.video_encoder()
        transformer = self.model_ledger.transformer()
        stage_1_sigmas = torch.tensor(DISTILLED_SIGMA_VALUES, device=self.device)

        def stage1_denoising_loop(sigmas, video_state, audio_state, stepper):
            # Stage 1: guided denoising with per-modality negative contexts.
            return euler_denoising_loop(
                sigmas=sigmas,
                video_state=video_state,
                audio_state=audio_state,
                stepper=stepper,
                denoise_fn=multi_modal_guider_denoising_func(
                    video_guider=MultiModalGuider(
                        params=video_guider_params,
                        negative_context=v_context_n,
                    ),
                    audio_guider=MultiModalGuider(
                        params=audio_guider_params,
                        negative_context=a_context_n,
                    ),
                    v_context=v_context_p,
                    a_context=a_context_p,
                    transformer=transformer,
                ),
            )

        def stage2_denoising_loop(sigmas, video_state, audio_state, stepper):
            # Stage 2: plain denoising on the positive contexts only.
            return euler_denoising_loop(
                sigmas=sigmas,
                video_state=video_state,
                audio_state=audio_state,
                stepper=stepper,
                denoise_fn=simple_denoising_func(
                    video_context=v_context_p,
                    audio_context=a_context_p,
                    transformer=transformer,
                ),
            )

        # ── Stage 1: Half resolution ──
        stage_1_output_shape = VideoPixelShape(
            batch=1,
            frames=num_frames,
            width=width // 2,
            height=height // 2,
            fps=frame_rate,
        )
        stage_1_conditionings = combined_image_conditionings(
            images=images,
            height=stage_1_output_shape.height,
            width=stage_1_output_shape.width,
            video_encoder=video_encoder,
            dtype=dtype,
            device=self.device,
        )

        # Use denoise_audio_video so audio is ALWAYS generated
        from ltx_pipelines.utils import denoise_audio_video

        video_state, audio_state = denoise_audio_video(
            output_shape=stage_1_output_shape,
            conditionings=stage_1_conditionings,
            noiser=noiser,
            sigmas=stage_1_sigmas,
            stepper=stepper,
            denoising_loop_fn=stage1_denoising_loop,
            components=self.pipeline_components,
            dtype=dtype,
            device=self.device,
            initial_audio_latent=encoded_audio_latent,
        )
        self._synchronize()
        cleanup_memory()

        # ── Upscaling ──
        upscaled_video_latent = upsample_video(
            latent=video_state.latent[:1],
            video_encoder=video_encoder,
            upsampler=self.model_ledger.spatial_upsampler(),
        )

        # ── Stage 2: Full resolution ──
        stage_2_sigmas = torch.tensor(STAGE_2_DISTILLED_SIGMA_VALUES, device=self.device)
        stage_2_output_shape = VideoPixelShape(
            batch=1, frames=num_frames, width=width, height=height, fps=frame_rate
        )
        stage_2_conditionings = combined_image_conditionings(
            images=images,
            height=stage_2_output_shape.height,
            width=stage_2_output_shape.width,
            video_encoder=video_encoder,
            dtype=dtype,
            device=self.device,
        )
        video_state, audio_state = denoise_audio_video(
            output_shape=stage_2_output_shape,
            conditionings=stage_2_conditionings,
            noiser=noiser,
            sigmas=stage_2_sigmas,
            stepper=stepper,
            denoising_loop_fn=stage2_denoising_loop,
            components=self.pipeline_components,
            dtype=dtype,
            device=self.device,
            noise_scale=stage_2_sigmas[0],
            initial_video_latent=upscaled_video_latent,
            initial_audio_latent=audio_state.latent,
        )
        self._synchronize()

        # Drop the local references before decoding so cleanup_memory() can
        # release anything that is no longer referenced elsewhere.
        del transformer
        del video_encoder
        cleanup_memory()

        # ── Decode both video and audio ──
        decoded_video = vae_decode_video(
            video_state.latent,
            self.model_ledger.video_decoder(),
            tiling_config,
            generator,
        )
        decoded_audio_output = vae_decode_audio(
            audio_state.latent,
            self.model_ledger.audio_decoder(),
            self.model_ledger.vocoder(),
        )
        return decoded_video, decoded_audio_output
# Model repos
LTX_MODEL_REPO = "Lightricks/LTX-2.3"
GEMMA_REPO = "Lightricks/gemma-3-12b-it-qat-q4_0-unquantized"

# Download model checkpoints
_BANNER = "=" * 80
print(_BANNER)
print("Downloading LTX-2.3 distilled model + Gemma...")
print(_BANNER)

# LoRA cache directory and currently-applied key
LORA_CACHE_DIR = Path("lora_cache")
LORA_CACHE_DIR.mkdir(exist_ok=True)
current_lora_key: str | None = None

weights_dir = Path("weights")
weights_dir.mkdir(exist_ok=True)

checkpoint_path = hf_hub_download(
    repo_id=LTX_MODEL_REPO,
    filename="ltx-2.3-22b-distilled-1.1.safetensors",
    local_dir=str(weights_dir),
    local_dir_use_symlinks=False,
)
spatial_upsampler_path = hf_hub_download(
    repo_id=LTX_MODEL_REPO,
    filename="ltx-2.3-spatial-upscaler-x2-1.1.safetensors",
)
gemma_root = snapshot_download(repo_id=GEMMA_REPO)

# LoRA repo + download the requested LoRA adapters
LORA_REPO = "dagloop5/LoRA"


def _fetch_lora(filename: str) -> str:
    """Download one adapter file from LORA_REPO and return its local path."""
    return hf_hub_download(repo_id=LORA_REPO, filename=filename)


print(_BANNER)
print(f"Downloading LoRA adapters from {LORA_REPO}...")
print(_BANNER)

# Trailing comments list the prompt trigger phrases noted by the author.
pose_lora_path = _fetch_lora("LTX2_3_NSFW_furry_concat_v2.safetensors")
general_lora_path = _fetch_lora("LTX2.3_reasoning_I2V_V3.safetensors")
motion_lora_path = _fetch_lora("motion_helper.safetensors")
dreamlay_lora_path = _fetch_lora("DR34ML4Y_LTXXX_PREVIEW_RC1.safetensors")  # m15510n4ry, bl0wj0b, d0ubl3_bj, d0gg1e, c0wg1rl
mself_lora_path = _fetch_lora("Furry Hyper Masturbation - LTX-2 I2V v1.safetensors")  # Hyperfap
dramatic_lora_path = _fetch_lora("LTX-2.3 - Orgasm.safetensors")  # "[He | She] is having am orgasm." (am or an?)
fluid_lora_path = _fetch_lora("cr3ampi3_animation_i2v_ltx2_v1.0.safetensors")  # cr3ampi3 animation., missionary animation, doggystyle bouncy animation, double penetration animation
liquid_lora_path = _fetch_lora("liquid_wet_dr1pp_ltx2_v1.0_scaled.safetensors")  # wet dr1pp
demopose_lora_path = _fetch_lora("clapping-cheeks-audio-v001-alpha.safetensors")
voice_lora_path = _fetch_lora("hentai_voice_ltx23.safetensors")
realism_lora_path = _fetch_lora("FurryenhancerLTX2.3V1.215.safetensors")
transition_lora_path = _fetch_lora("LTX-2_takerpov_lora_v1.2.safetensors")  # takerpov1, taker pov

# Report where every adapter landed, in download order.
for _label, _path in (
    ("Pose", pose_lora_path),
    ("General", general_lora_path),
    ("Motion", motion_lora_path),
    ("Dreamlay", dreamlay_lora_path),
    ("Mself", mself_lora_path),
    ("Dramatic", dramatic_lora_path),
    ("Fluid", fluid_lora_path),
    ("Liquid", liquid_lora_path),
    ("Demopose", demopose_lora_path),
    ("Voice", voice_lora_path),
    ("Realism", realism_lora_path),
    ("Transition", transition_lora_path),
):
    print(f"{_label} LoRA: {_path}")

print(f"Checkpoint: {checkpoint_path}")
print(f"Spatial upsampler: {spatial_upsampler_path}")
print(f"[Gemma] Root ready: {gemma_root}")

# The pipeline is built without LoRAs; they are fused into the transformer
# later, on demand, by the LoRA preparation handler.
pipeline = LTX23DistilledA2VPipeline(
    distilled_checkpoint_path=checkpoint_path,
    spatial_upsampler_path=spatial_upsampler_path,
    gemma_root=gemma_root,
    loras=[],
    quantization=QuantizationPolicy.fp8_cast(),
)
def _make_lora_key(pose_strength: float, general_strength: float, motion_strength: float, dreamlay_strength: float, mself_strength: float, dramatic_strength: float, fluid_strength: float, liquid_strength: float, demopose_strength: float, voice_strength: float, realism_strength: float, transition_strength: float) -> tuple[str, str]:
    """Build a cache key identifying one combination of LoRA strengths.

    Each strength is converted to float and rounded to two decimals, paired
    with its adapter path as ``"<path>:<strength>"``, and the pairs are joined
    with ``"|"`` in a fixed order.

    Returns:
        ``(key, key_str)`` where ``key_str`` is the readable joined string and
        ``key`` is its SHA-256 hex digest.
    """
    path_strength_pairs = (
        (pose_lora_path, pose_strength),
        (general_lora_path, general_strength),
        (motion_lora_path, motion_strength),
        (dreamlay_lora_path, dreamlay_strength),
        (mself_lora_path, mself_strength),
        (dramatic_lora_path, dramatic_strength),
        (fluid_lora_path, fluid_strength),
        (liquid_lora_path, liquid_strength),
        (demopose_lora_path, demopose_strength),
        (voice_lora_path, voice_strength),
        (realism_lora_path, realism_strength),
        (transition_lora_path, transition_strength),
    )
    key_str = "|".join(
        f"{path}:{round(float(strength), 2)}" for path, strength in path_strength_pairs
    )
    digest = hashlib.sha256(key_str.encode("utf-8")).hexdigest()
    return digest, key_str
# =============================================================================
# LoRA Cache (In-Memory) - Ultra-Fast In-Place Application
# =============================================================================
# In-memory caches to avoid redundant disk I/O
# Both caches are plain dicts with no eviction; entries live for the process lifetime.
LORA_SD_CACHE: dict[str, StateDict] = {} # lora_path -> loaded StateDict
FUSED_CACHE: dict[str, dict] = {} # cache key -> fused state dict (CPU)
# Key (see _make_lora_key) of the LoRA combination currently applied to the
# transformer, or None if none has been applied yet.
# NOTE(review): this name is also initialised to None earlier in the file,
# next to LORA_CACHE_DIR; this second assignment just resets it again.
current_lora_key: str | None = None
def load_lora_into_cache(lora_path: str) -> StateDict:
    """Return the LoRA at ``lora_path`` as a CPU ``StateDict``, reading the file at most once.

    The first call for a path reads every tensor from the safetensors file and
    stores the result in ``LORA_SD_CACHE``; later calls return the cached
    object.
    """
    cached = LORA_SD_CACHE.get(lora_path)
    if cached is not None:
        return cached

    file_name = os.path.basename(lora_path)
    print(f"[LoRA] Loading {file_name} into memory cache...")

    # safe_open gives per-tensor access to the file; every tensor is loaded to CPU.
    with safe_open(lora_path, framework="pt", device="cpu") as handle:
        tensors = {name: handle.get_tensor(name) for name in handle.keys()}

    total_bytes = sum(tensor.nbytes for tensor in tensors.values())
    dtypes = {tensor.dtype for tensor in tensors.values()}
    state_dict = StateDict(
        sd=tensors,
        device=torch.device("cpu"),
        size=total_bytes,
        dtype=dtypes,
    )
    LORA_SD_CACHE[lora_path] = state_dict
    print(f"[LoRA] Cached {len(tensors)} tensors from {file_name}")
    return state_dict
def _rename_lora_keys_for_base_model(lora_sd: StateDict, base_keys: set[str]) -> StateDict:
    """Re-prefix LoRA keys so they line up with the base model's key namespace.

    For every key:
      1. a leading ``diffusion_model.`` is removed, and
      2. ``velocity_model.`` is prepended unless already present.

    Example:
        ``diffusion_model.transformer_blocks.0.attn1.to_k.lora_A.weight``
        becomes
        ``velocity_model.transformer_blocks.0.attn1.to_k.lora_A.weight``.

    The ``.lora_A.weight`` / ``.lora_B.weight`` suffixes are deliberately kept.
    Rewriting both of them to ``.weight`` (as an earlier version did) maps the
    A and B matrices of a layer onto the same dictionary key, so one silently
    overwrites the other and the low-rank pair can no longer be multiplied.
    NOTE(review): this assumes ``apply_loras`` looks adapters up as
    ``<base key without ".weight">.lora_A.weight`` / ``.lora_B.weight`` —
    confirm against the pinned LTX-2 commit.

    Args:
        lora_sd: LoRA state dict as loaded from disk.
        base_keys: Keys of the base model. Currently unused; kept so existing
            callers do not need to change.

    Returns:
        A new ``StateDict`` holding the same tensors under the renamed keys;
        ``device``, ``size`` and ``dtype`` are copied from ``lora_sd``.
    """
    source_prefix = "diffusion_model."
    target_prefix = "velocity_model."

    renamed_sd = {}
    for key, tensor in lora_sd.sd.items():
        new_key = key.removeprefix(source_prefix)
        if not new_key.startswith(target_prefix):
            new_key = target_prefix + new_key
        renamed_sd[new_key] = tensor

    return StateDict(
        sd=renamed_sd,
        device=lora_sd.device,
        size=lora_sd.size,
        dtype=lora_sd.dtype,
    )
def _lora_target_weight_key(lora_key: str) -> str:
    """Map a LoRA tensor key to the base ``.weight`` key it would modify."""
    for suffix in (".lora_A.weight", ".lora_B.weight"):
        if lora_key.endswith(suffix):
            return lora_key[: -len(suffix)] + ".weight"
    return lora_key


def _log_lora_key_mapping(original_sd: StateDict, renamed_sd: StateDict, base_key_set: set[str]) -> None:
    """Print a short before/after sample of the key renaming (diagnostics only)."""
    original_keys = list(original_sd.sd.keys())[:3]
    renamed_keys = list(renamed_sd.sd.keys())[:3]
    print(f"[LoRA DEBUG] Before: {original_keys}")
    print(f"[LoRA DEBUG] After: {renamed_keys}")
    if not renamed_keys:
        # Empty LoRA file: nothing to sample.
        print("[LoRA DEBUG] LoRA state dict is empty")
        return

    # Check if the weight targeted by the first renamed key exists in the base model
    sample_target = _lora_target_weight_key(renamed_keys[0])
    exists_in_base = sample_target in base_key_set
    print(f"[LoRA DEBUG] Sample renamed key exists in base? {exists_in_base}")
    if not exists_in_base:
        print("[LoRA DEBUG] Checking for similar base keys...")
        prefix = sample_target.rsplit(".", 1)[0]  # Remove .weight suffix
        similar = [k for k in base_key_set if k.startswith(prefix[:30])]
        print(f"[LoRA DEBUG] Similar base keys: {similar[:3]}")


def build_fused_state_dict(
    base_transformer,
    lora_configs: list[tuple[str, float]],
    progress_callback=None,
) -> dict[str, torch.Tensor]:
    """Fuse multiple LoRAs into a single state dict ready for load_state_dict().

    The fusion itself is delegated to LTX's ``apply_loras``.

    Args:
        base_transformer: Object whose ``state_dict()`` yields the weights to
            fuse onto (only ``state_dict()`` is called on it).
        lora_configs: List of (lora_path, strength) tuples for non-zero LoRAs.
        progress_callback: Optional callback(step, desc) for progress updates.

    Returns:
        Dictionary of fused weights ready for load_state_dict(). With an empty
        ``lora_configs`` this is a clone of the base state dict.
    """
    if not lora_configs:
        # No LoRAs - return base transformer state dict
        return {k: v.clone() for k, v in base_transformer.state_dict().items()}

    if progress_callback:
        progress_callback(0.1, "Loading LoRA state dicts into memory")

    base_dict = base_transformer.state_dict()
    base_key_set = set(base_dict)
    print(f"[LoRA DEBUG] Total base model keys: {len(base_key_set)}")

    # Step 1: Load all LoRA state dicts, rename their keys, and pair each with
    # its strength. Without the append below apply_loras receives an empty list
    # and returns the base weights unchanged.
    lora_sd_with_strengths = []
    for lora_path, strength in lora_configs:
        sd = load_lora_into_cache(lora_path)
        sd_renamed = _rename_lora_keys_for_base_model(sd, base_key_set)
        lora_sd_with_strengths.append(LoraStateDictWithStrength(sd_renamed, float(strength)))
        _log_lora_key_mapping(sd, sd_renamed, base_key_set)

    if progress_callback:
        progress_callback(0.3, "Extracting base transformer state dict")

    # Step 2: Wrap the base weights (moved to CPU) in a StateDict
    base_sd = StateDict(
        sd={k: v.detach().cpu().contiguous() for k, v in base_dict.items()},
        device=torch.device("cpu"),
        size=sum(v.nbytes for v in base_dict.values()),
        dtype={v.dtype for v in base_dict.values()},
    )

    if progress_callback:
        progress_callback(0.5, "Fusing LoRAs with base weights (CPU)")

    # Step 3: Fuse using LTX's apply_loras function
    fused_sd = apply_loras(
        model_sd=base_sd,
        lora_sd_and_strengths=lora_sd_with_strengths,
        dtype=torch.bfloat16,
    )

    if progress_callback:
        progress_callback(0.9, "Extracting fused state dict")

    # Step 4: Return the fused state dict as a plain dict (for load_state_dict)
    return fused_sd.sd
# Pristine (pre-LoRA) transformer weights, captured on CPU the first time LoRAs
# are prepared. Every fusion and every reset starts from this snapshot so that
# successive LoRA combinations never stack on top of each other.
# NOTE(review): this assumes nothing else has modified the transformer weights
# before the first call to on_prepare_loras_click — confirm.
_BASE_WEIGHTS_SNAPSHOT: dict | None = None


class _BaseWeightsView:
    """Expose a weight snapshot through the ``state_dict()`` call used by build_fused_state_dict."""

    def __init__(self, weights):
        self._weights = weights

    def state_dict(self):
        return self._weights


def _get_base_weights_snapshot(transformer):
    """Return the CPU snapshot of the un-fused weights, capturing it on first use."""
    global _BASE_WEIGHTS_SNAPSHOT
    if _BASE_WEIGHTS_SNAPSHOT is None:
        print("[LoRA] Capturing pristine base weights (one-time CPU snapshot)...")
        # copy=True guarantees the snapshot never aliases the live parameters,
        # even when the transformer already lives on CPU.
        _BASE_WEIGHTS_SNAPSHOT = {
            name: tensor.detach().to("cpu", copy=True)
            for name, tensor in transformer.state_dict().items()
        }
    return _BASE_WEIGHTS_SNAPSHOT


def _load_weights_into(transformer, weights):
    """Move ``weights`` to the transformer's device and load them non-strictly."""
    target_device = next(transformer.parameters()).device
    weights_on_device = {k: v.to(target_device) for k, v in weights.items()}
    return transformer.load_state_dict(weights_on_device, strict=False)


def on_prepare_loras_click(
    pose_strength: float,
    general_strength: float,
    motion_strength: float,
    dreamlay_strength: float,
    mself_strength: float,
    dramatic_strength: float,
    fluid_strength: float,
    liquid_strength: float,
    demopose_strength: float,
    voice_strength: float,
    realism_strength: float,
    transition_strength: float,
    progress=gr.Progress(track_tqdm=True),
):
    """Apply the requested LoRA combination to the preloaded transformer.

    Called when the user clicks the 'Prepare LoRA Cache' button (not on slider
    change). Steps:
      1. Return early if this exact combination is already applied.
      2. Capture (once) a CPU snapshot of the un-fused base weights.
      3. With no active LoRA, restore the base weights from that snapshot.
      4. Otherwise reuse a fused state dict from FUSED_CACHE, or build one by
         fusing the active LoRAs onto the *base snapshot* and cache it.
      5. Load the fused weights into the transformer.

    Fusing onto the snapshot rather than the live weights is what keeps
    consecutive combinations independent: the live transformer may already
    contain a previously fused combination.

    Returns:
        A human-readable status string starting with "✓" on success or "✗" on
        failure.
    """
    global current_lora_key

    # Debug: Verify transformer consistency
    ledger_transformer = ledger.transformer()
    pipeline_transformer = pipeline.model_ledger.transformer()
    print(f"[LoRA DEBUG] ledger.transformer() id: {id(ledger_transformer)}")
    print(f"[LoRA DEBUG] pipeline.model_ledger.transformer() id: {id(pipeline_transformer)}")
    print(f"[LoRA DEBUG] Same object? {ledger_transformer is pipeline_transformer}")
    print(f"[LoRA DEBUG] _transformer id: {id(_transformer)}")

    # Compute the cache key for this combination of strengths
    key, _ = _make_lora_key(
        pose_strength, general_strength, motion_strength, dreamlay_strength,
        mself_strength, dramatic_strength, fluid_strength, liquid_strength,
        demopose_strength, voice_strength, realism_strength, transition_strength,
    )

    # Already applied with these exact strengths? Nothing to do.
    if current_lora_key == key:
        return "✓ LoRAs already applied with current strengths"

    progress(0.0, desc="Starting LoRA preparation")

    # Build the list of active (non-zero) LoRAs
    lora_entries = [
        (pose_lora_path, pose_strength, "Anthro Enhancer"),
        (general_lora_path, general_strength, "Reasoning Enhancer"),
        (motion_lora_path, motion_strength, "Anthro Posing"),
        (dreamlay_lora_path, dreamlay_strength, "Dreamlay"),
        (mself_lora_path, mself_strength, "Mself"),
        (dramatic_lora_path, dramatic_strength, "Dramatic"),
        (fluid_lora_path, fluid_strength, "Fluid Helper"),
        (liquid_lora_path, liquid_strength, "Liquid Helper"),
        (demopose_lora_path, demopose_strength, "Audio Helper"),
        (voice_lora_path, voice_strength, "Voice Helper"),
        (realism_lora_path, realism_strength, "Anthro Realism"),
        (transition_lora_path, transition_strength, "POV"),
    ]
    active_loras = []
    for path, strength, name in lora_entries:
        if float(strength) != 0.0:
            active_loras.append((path, float(strength)))
            print(f"[LoRA] Active: {name} = {strength}")

    transformer = ledger.transformer()

    # The snapshot must exist before any weights are overwritten below.
    try:
        base_weights = _get_base_weights_snapshot(transformer)
    except Exception as e:
        print(f"[LoRA] Could not snapshot base weights: {e}")
        return f"✗ LoRA application failed: {e}"

    if not active_loras:
        # No LoRAs selected - restore the pristine base weights (a previous
        # call may have fused LoRAs into the live transformer).
        print("[LoRA] No LoRAs selected, resetting to base model weights")
        try:
            _load_weights_into(transformer, base_weights)
            current_lora_key = key
            progress(1.0, desc="Done")
            return "✓ Reset to base model (no LoRAs active)"
        except Exception as e:
            return f"✗ Reset failed: {e}"

    # Check in-memory cache for this strength combination
    if key in FUSED_CACHE:
        print(f"[LoRA] Using cached fused state for: {key[:16]}...")
        fused_state = FUSED_CACHE[key]
        progress(0.85, desc="Using cached fused state")
    else:
        # Need to build the fused state dict (the expensive part)
        print(f"[LoRA] Building new fused state dict for {len(active_loras)} LoRA(s)...")

        # Progress callback that maps to Gradio's progress tracker
        def progress_cb(step, desc):
            progress(0.1 + step * 0.8, desc=desc)

        # Fuse onto the pristine snapshot, not the (possibly already fused) live weights.
        fused_state = build_fused_state_dict(
            _BaseWeightsView(base_weights), active_loras, progress_cb
        )
        # Cache the fused state for future reuse (keyed by strength combination)
        FUSED_CACHE[key] = fused_state
        print(f"[LoRA] Cached fused state for: {key[:16]}...")

    # Apply fused state to transformer
    progress(0.92, desc="Applying fused weights to transformer")
    try:
        missing, unexpected = _load_weights_into(transformer, fused_state)
        if missing:
            print(f"[LoRA] Warning: {len(missing)} keys not found in fused state")
        if unexpected:
            print(f"[LoRA] Warning: {len(unexpected)} unexpected keys in fused state")
        current_lora_key = key
        progress(1.0, desc="Done")
        return f"✓ Applied {len(active_loras)} LoRA(s) successfully"
    except Exception as e:
        import traceback

        print(f"[LoRA] Apply failed: {e}")
        print(traceback.format_exc())
        # Best effort: try to restore transformer to GPU on error
        try:
            transformer = ledger.transformer()
            if next(transformer.parameters()).device.type == "cpu" and torch.cuda.is_available():
                transformer.to("cuda")
        except Exception as restore_error:
            print(f"[LoRA] Could not move transformer back to GPU: {restore_error}")
        return f"✗ LoRA application failed: {e}"
# Preload all models for ZeroGPU tensor packing.
# Every component is built once here, at import time, and the ledger's factory
# methods are then replaced so that later calls return these same instances.
print("Preloading all models (including Gemma and audio components)...")
ledger = pipeline.model_ledger
# Save the original factory methods so we can rebuild individual components later.
# These are bound callables on ledger that will call the builder when invoked.
_orig_transformer_factory = ledger.transformer
_orig_video_encoder_factory = ledger.video_encoder
_orig_video_decoder_factory = ledger.video_decoder
_orig_audio_encoder_factory = ledger.audio_encoder
_orig_audio_decoder_factory = ledger.audio_decoder
_orig_vocoder_factory = ledger.vocoder
_orig_spatial_upsampler_factory = ledger.spatial_upsampler
_orig_text_encoder_factory = ledger.text_encoder
_orig_gemma_embeddings_factory = ledger.gemma_embeddings_processor
# Call the original factories once to create the cached instances we will serve by default.
_transformer = _orig_transformer_factory()
_video_encoder = _orig_video_encoder_factory()
_video_decoder = _orig_video_decoder_factory()
_audio_encoder = _orig_audio_encoder_factory()
_audio_decoder = _orig_audio_decoder_factory()
_vocoder = _orig_vocoder_factory()
_spatial_upsampler = _orig_spatial_upsampler_factory()
_text_encoder = _orig_text_encoder_factory()
_embeddings_processor = _orig_gemma_embeddings_factory()
# Replace ledger methods with lightweight lambdas that return the cached instances.
# We keep the original factories above so we can call them later to rebuild components.
# Each lambda reads its module-level variable at call time, so rebinding e.g.
# _transformer later changes what ledger.transformer() returns.
ledger.transformer = lambda: _transformer
ledger.video_encoder = lambda: _video_encoder
ledger.video_decoder = lambda: _video_decoder
ledger.audio_encoder = lambda: _audio_encoder
ledger.audio_decoder = lambda: _audio_decoder
ledger.vocoder = lambda: _vocoder
ledger.spatial_upsampler = lambda: _spatial_upsampler
ledger.text_encoder = lambda: _text_encoder
ledger.gemma_embeddings_processor = lambda: _embeddings_processor
print("All models preloaded (including Gemma text encoder and audio encoder)!")
# End of the preload block.
print("=" * 80)
print("Pipeline ready!")
print("=" * 80)
def log_memory(tag: str):
    """Print allocated, peak, free and total CUDA memory in GB, labelled with ``tag``.

    Does nothing when CUDA is not available.
    """
    if not torch.cuda.is_available():
        return

    bytes_per_gb = 1024**3
    allocated = torch.cuda.memory_allocated() / bytes_per_gb
    peak = torch.cuda.max_memory_allocated() / bytes_per_gb
    free, total = torch.cuda.mem_get_info()
    free_gb = free / bytes_per_gb
    total_gb = total / bytes_per_gb
    print(f"[VRAM {tag}] allocated={allocated:.2f}GB peak={peak:.2f}GB free={free_gb:.2f}GB total={total_gb:.2f}GB")
def detect_aspect_ratio(image) -> str:
    """Return the preset aspect ratio ("16:9", "9:16" or "1:1") closest to ``image``.

    Two kinds of input are recognised:
      * array-like objects exposing ``shape`` with at least two entries, read
        as ``(height, width, ...)``;
      * objects exposing ``size`` as a 2-item ``(width, height)`` tuple/list.

    ``shape`` is checked first because numpy arrays also have a ``size``
    attribute, but it is a plain integer and cannot be unpacked.

    Returns "16:9" when ``image`` is None, when its dimensions cannot be
    determined, or when either dimension is not positive.
    """
    default = "16:9"
    if image is None:
        return default

    shape = getattr(image, "shape", None)
    if shape is not None and len(shape) >= 2:
        h, w = shape[:2]
    else:
        size = getattr(image, "size", None)
        if not isinstance(size, (tuple, list)) or len(size) != 2:
            return default
        w, h = size

    # Guard against empty images: avoids ZeroDivisionError and meaningless ratios.
    if w <= 0 or h <= 0:
        return default

    ratio = w / h
    candidates = {"16:9": 16 / 9, "9:16": 9 / 16, "1:1": 1.0}
    return min(candidates, key=lambda name: abs(ratio - candidates[name]))
def _resolution_updates(first_image, last_image, high_res):
    """Return Gradio updates setting width and height to the matching preset."""
    # The first image decides the aspect ratio; the last image is the fallback.
    ref_image = first_image if first_image is not None else last_image
    aspect = detect_aspect_ratio(ref_image)
    tier = "high" if high_res else "low"
    w, h = RESOLUTIONS[tier][aspect]
    return gr.update(value=w), gr.update(value=h)


def on_image_upload(first_image, last_image, high_res):
    """Handler for an image upload: pick the preset resolution for the new image.

    Returns:
        ``(width_update, height_update)`` Gradio updates.
    """
    return _resolution_updates(first_image, last_image, high_res)


def on_highres_toggle(first_image, last_image, high_res):
    """Handler for the high-resolution toggle: re-pick the preset for the new tier.

    Returns:
        ``(width_update, height_update)`` Gradio updates.
    """
    return _resolution_updates(first_image, last_image, high_res)
def get_gpu_duration(
    first_image,
    last_image,
    input_audio,
    prompt: str,
    negative_prompt: str,
    duration: float,
    gpu_duration: float,
    enhance_prompt: bool = True,
    seed: int = 42,
    randomize_seed: bool = True,
    height: int = 1024,
    width: int = 1536,
    video_cfg_scale: float = 1.0,
    video_stg_scale: float = 0.0,
    video_rescale_scale: float = 0.45,
    video_a2v_scale: float = 3.0,
    audio_cfg_scale: float = 1.0,
    audio_stg_scale: float = 0.0,
    audio_rescale_scale: float = 1.0,
    audio_v2a_scale: float = 3.0,
    pose_strength: float = 0.0,
    general_strength: float = 0.0,
    motion_strength: float = 0.0,
    dreamlay_strength: float = 0.0,
    mself_strength: float = 0.0,
    dramatic_strength: float = 0.0,
    fluid_strength: float = 0.0,
    liquid_strength: float = 0.0,
    demopose_strength: float = 0.0,
    voice_strength: float = 0.0,
    realism_strength: float = 0.0,
    transition_strength: float = 0.0,
    progress=None,
):
    """Return the GPU time budget for one generation request.

    Passed as the ``duration`` callable of ``spaces.GPU`` on ``generate_video``.
    The signature has the same parameters as ``generate_video``; only
    ``gpu_duration`` is read and every other argument is ignored.

    Returns:
        ``gpu_duration`` converted with ``int()`` (truncated toward zero).
    """
    requested = int(gpu_duration)
    return requested
@spaces.GPU(duration=get_gpu_duration)
@torch.inference_mode()
def generate_video(
    first_image,
    last_image,
    input_audio,
    prompt: str,
    negative_prompt: str,
    duration: float,
    gpu_duration: float,
    enhance_prompt: bool = True,
    seed: int = 42,
    randomize_seed: bool = True,
    height: int = 1024,
    width: int = 1536,
    video_cfg_scale: float = 1.0,
    video_stg_scale: float = 0.0,
    video_rescale_scale: float = 0.45,
    video_a2v_scale: float = 3.0,
    audio_cfg_scale: float = 1.0,
    audio_stg_scale: float = 0.0,
    audio_rescale_scale: float = 1.0,
    audio_v2a_scale: float = 3.0,
    pose_strength: float = 0.0,
    general_strength: float = 0.0,
    motion_strength: float = 0.0,
    dreamlay_strength: float = 0.0,
    mself_strength: float = 0.0,
    dramatic_strength: float = 0.0,
    fluid_strength: float = 0.0,
    liquid_strength: float = 0.0,
    demopose_strength: float = 0.0,
    voice_strength: float = 0.0,
    realism_strength: float = 0.0,
    transition_strength: float = 0.0,
    progress=gr.Progress(track_tqdm=True),
):
    """Run the pipeline once and encode the result to an MP4 file.

    Args:
        first_image: Optional conditioning image for frame 0. Either an
            object with a ``save`` method or a filesystem path.
        last_image: Optional conditioning image for the final frame, same
            accepted forms as ``first_image``.
        input_audio: Forwarded to the pipeline as ``audio_path``.
        prompt: Positive text prompt.
        negative_prompt: Negative text prompt.
        duration: Requested clip length in seconds.
        gpu_duration: Not read here; consumed by ``get_gpu_duration`` via
            the ``spaces.GPU`` decorator.
        enhance_prompt: Forwarded to the pipeline.
        seed: Seed used when ``randomize_seed`` is false.
        randomize_seed: Draw a random seed in ``[0, MAX_SEED]`` instead.
        height: Output height, converted with ``int()``.
        width: Output width, converted with ``int()``.
        video_cfg_scale, video_stg_scale, video_rescale_scale,
        video_a2v_scale: Video guider parameters.
        audio_cfg_scale, audio_stg_scale, audio_rescale_scale,
        audio_v2a_scale: Audio guider parameters.
        pose_strength ... transition_strength: LoRA strengths. They are not
            read in this function body; they are accepted so the signature
            lines up with the Gradio ``inputs`` list.
        progress: Gradio progress tracker; not read directly here.

    Returns:
        ``(output_path, seed_used)`` on success, or ``(None, seed)`` when
        any exception is raised (the error and traceback are printed).
    """
    # Bind before the ``try`` so the error path can always return a seed,
    # even when the failure happens before the real seed is chosen.
    current_seed = seed
    try:
        torch.cuda.reset_peak_memory_stats()
        log_memory("start")

        current_seed = random.randint(0, MAX_SEED) if randomize_seed else int(seed)

        frame_rate = DEFAULT_FRAME_RATE
        num_frames = int(duration * frame_rate) + 1
        # Round (num_frames - 1) up to a multiple of 8, then add 1 back.
        num_frames = ((num_frames - 1 + 7) // 8) * 8 + 1

        print(f"Generating: {height}x{width}, {num_frames} frames ({duration}s), seed={current_seed}")

        images = []
        output_dir = Path("outputs")
        output_dir.mkdir(exist_ok=True)

        if first_image is not None:
            temp_first_path = output_dir / f"temp_first_{current_seed}.jpg"
            if hasattr(first_image, "save"):
                first_image.save(temp_first_path)
            else:
                # Not a saveable image object: treat the value as a path.
                temp_first_path = Path(first_image)
            images.append(ImageConditioningInput(path=str(temp_first_path), frame_idx=0, strength=1.0))

        if last_image is not None:
            temp_last_path = output_dir / f"temp_last_{current_seed}.jpg"
            if hasattr(last_image, "save"):
                last_image.save(temp_last_path)
            else:
                temp_last_path = Path(last_image)
            images.append(ImageConditioningInput(path=str(temp_last_path), frame_idx=num_frames - 1, strength=1.0))

        tiling_config = TilingConfig.default()
        video_chunks_number = get_video_chunks_number(num_frames, tiling_config)

        video_guider_params = MultiModalGuiderParams(
            cfg_scale=video_cfg_scale,
            stg_scale=video_stg_scale,
            rescale_scale=video_rescale_scale,
            modality_scale=video_a2v_scale,
            skip_step=0,
            stg_blocks=[],
        )

        audio_guider_params = MultiModalGuiderParams(
            cfg_scale=audio_cfg_scale,
            stg_scale=audio_stg_scale,
            rescale_scale=audio_rescale_scale,
            modality_scale=audio_v2a_scale,
            skip_step=0,
            stg_blocks=[],
        )

        log_memory("before pipeline call")

        video, audio = pipeline(
            prompt=prompt,
            negative_prompt=negative_prompt,
            seed=current_seed,
            height=int(height),
            width=int(width),
            num_frames=num_frames,
            frame_rate=frame_rate,
            video_guider_params=video_guider_params,
            audio_guider_params=audio_guider_params,
            images=images,
            audio_path=input_audio,
            tiling_config=tiling_config,
            enhance_prompt=enhance_prompt,
        )

        log_memory("after pipeline call")

        # tempfile.mktemp is deprecated and race-prone; create the file
        # atomically and keep it on disk for the encoder to overwrite.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_file:
            output_path = tmp_file.name

        encode_video(
            video=video,
            fps=frame_rate,
            audio=audio,
            output_path=output_path,
            video_chunks_number=video_chunks_number,
        )

        log_memory("after encode_video")
        return str(output_path), current_seed

    except Exception as e:
        import traceback
        log_memory("on error")
        print(f"Error: {str(e)}\n{traceback.format_exc()}")
        return None, current_seed
# =============================================================================
# Gradio UI
# =============================================================================
# Custom stylesheet for the app; handed to launch(css=css) in the
# __main__ guard at the bottom of the file.
css = """
.fillable {max-width: 1200px !important}
.progress-text {color: black}
"""
# Build the Gradio Blocks app: a left column of generation inputs, a right
# column with the output video and LoRA strength sliders, then event wiring.
with gr.Blocks(title="LTX-2.3 Distilled with LoRAs, Negative Prompting, and Advanced Settings") as demo:
gr.Markdown("# LTX-2.3 Two-Stage HQ Video Generation")
gr.Markdown(
"High-quality text/image-to-video with cached LoRA state + CFG guidance. "
"[[Model]](https://huggingface.co/Lightricks/LTX-2.3)"
)
with gr.Row():
# LEFT SIDE: Input Controls
with gr.Column():
# Optional conditioning frames, delivered to handlers as PIL images.
with gr.Row():
first_image = gr.Image(label="First Frame (Optional)", type="pil")
last_image = gr.Image(label="Last Frame (Optional)", type="pil")
prompt = gr.Textbox(
label="Prompt",
value="Make this image come alive with cinematic motion, smooth animation",
lines=3,
placeholder="Describe the motion and animation you want...",
)
negative_prompt = gr.Textbox(
label="Negative Prompt",
value="worst quality, inconsistent motion, blurry, jittery, distorted, deformed, artifacts, text, watermark, logo, frame, border, low resolution, pixelated, unnatural, fake, CGI, cartoon",
lines=2,
)
duration = gr.Slider(
label="Duration (seconds)",
minimum=1.0, maximum=30.0, value=10.0, step=0.1,
)
with gr.Row():
seed = gr.Number(label="Seed", value=42, precision=0, minimum=0, maximum=MAX_SEED)
randomize_seed = gr.Checkbox(label="Randomize Seed", value=True)
with gr.Row():
high_res = gr.Checkbox(label="High Resolution", value=True)
enhance_prompt = gr.Checkbox(label="Enhance Prompt", value=False)
# Width/Height are overwritten by on_image_upload / on_highres_toggle
# (see the event handlers below).
with gr.Row():
width = gr.Number(label="Width", value=1536, precision=0)
height = gr.Number(label="Height", value=1024, precision=0)
generate_btn = gr.Button("Generate Video", variant="primary", size="lg")
with gr.Accordion("Advanced Settings", open=False):
gr.Markdown("### Video Guidance Parameters")
with gr.Row():
video_cfg_scale = gr.Slider(
label="Video CFG Scale", minimum=1.0, maximum=10.0, value=1.0, step=0.1
)
video_stg_scale = gr.Slider(
label="Video STG Scale", minimum=0.0, maximum=2.0, value=0.0, step=0.1
)
with gr.Row():
video_rescale_scale = gr.Slider(
label="Video Rescale", minimum=0.0, maximum=2.0, value=0.45, step=0.1
)
video_a2v_scale = gr.Slider(
label="A2V Scale", minimum=0.0, maximum=5.0, value=3.0, step=0.1
)
gr.Markdown("### Audio Guidance Parameters")
with gr.Row():
audio_cfg_scale = gr.Slider(
label="Audio CFG Scale", minimum=1.0, maximum=15.0, value=1.0, step=0.1
)
audio_stg_scale = gr.Slider(
label="Audio STG Scale", minimum=0.0, maximum=2.0, value=0.0, step=0.1
)
with gr.Row():
audio_rescale_scale = gr.Slider(
label="Audio Rescale", minimum=0.0, maximum=2.0, value=1.0, step=0.1
)
audio_v2a_scale = gr.Slider(
label="V2A Scale", minimum=0.0, maximum=5.0, value=3.0, step=0.1
)
# Audio is passed to the handler as a file path (type="filepath").
with gr.Row():
input_audio = gr.Audio(label="Audio Input (Optional)", type="filepath")
# RIGHT SIDE: Output and LoRA
with gr.Column():
output_video = gr.Video(label="Generated Video", autoplay=False)
# Read by get_gpu_duration to set the per-call GPU time budget.
gpu_duration = gr.Slider(
label="ZeroGPU duration (seconds)",
minimum=30.0, maximum=240.0, value=90.0, step=1.0,
info="Increase for longer videos, higher resolution, or LoRA usage"
)
gr.Markdown("### LoRA Adapter Strengths")
gr.Markdown("Set to 0 to disable, then click 'Prepare LoRA Cache'")
# NOTE(review): several slider labels do not mirror their variable names
# (e.g. motion_strength is labelled "Anthro Posing", demopose_strength
# "Audio Helper", transition_strength "POV") — confirm the mapping against
# the LoRA definitions elsewhere in the file.
with gr.Row():
pose_strength = gr.Slider(label="Anthro Enhancer", minimum=0.0, maximum=2.0, value=0.0, step=0.01)
gr.Markdown("") # Spacer for alignment
with gr.Row():
general_strength = gr.Slider(label="Reasoning Enhancer", minimum=0.0, maximum=2.0, value=0.0, step=0.01)
motion_strength = gr.Slider(label="Anthro Posing", minimum=0.0, maximum=2.0, value=0.0, step=0.01)
with gr.Row():
dreamlay_strength = gr.Slider(label="Dreamlay", minimum=0.0, maximum=2.0, value=0.0, step=0.01)
mself_strength = gr.Slider(label="Mself", minimum=0.0, maximum=2.0, value=0.0, step=0.01)
with gr.Row():
dramatic_strength = gr.Slider(label="Dramatic", minimum=0.0, maximum=2.0, value=0.0, step=0.01)
fluid_strength = gr.Slider(label="Fluid Helper", minimum=0.0, maximum=2.0, value=0.0, step=0.01)
with gr.Row():
liquid_strength = gr.Slider(label="Liquid Helper", minimum=0.0, maximum=2.0, value=0.0, step=0.01)
demopose_strength = gr.Slider(label="Audio Helper", minimum=0.0, maximum=2.0, value=0.0, step=0.01)
with gr.Row():
voice_strength = gr.Slider(label="Voice Helper", minimum=0.0, maximum=2.0, value=0.0, step=0.01)
realism_strength = gr.Slider(label="Anthro Realism", minimum=0.0, maximum=2.0, value=0.0, step=0.01)
with gr.Row():
transition_strength = gr.Slider(label="POV", minimum=0.0, maximum=2.0, value=0.0, step=0.01)
gr.Markdown("") # Spacer for alignment
prepare_lora_btn = gr.Button("Prepare / Load LoRA Cache", variant="secondary")
lora_status = gr.Textbox(
label="LoRA Cache Status",
value="No LoRA state prepared yet.",
interactive=False,
)
# Event handlers
# Changing either frame image or the high-res checkbox recomputes the
# Width/Height fields.
first_image.change(fn=on_image_upload, inputs=[first_image, last_image, high_res], outputs=[width, height])
last_image.change(fn=on_image_upload, inputs=[first_image, last_image, high_res], outputs=[width, height])
high_res.change(fn=on_highres_toggle, inputs=[first_image, last_image, high_res], outputs=[width, height])
# on_prepare_loras_click is defined outside this section; it receives the
# twelve strength sliders and its result is shown in the status textbox.
prepare_lora_btn.click(
fn=on_prepare_loras_click,
inputs=[pose_strength, general_strength, motion_strength, dreamlay_strength,
mself_strength, dramatic_strength, fluid_strength, liquid_strength,
demopose_strength, voice_strength, realism_strength, transition_strength],
outputs=[lora_status],
)
# The inputs list is positional: its order must match generate_video's
# parameter order (note height comes before width). The second output
# writes the returned seed back into the Seed field.
generate_btn.click(
fn=generate_video,
inputs=[
first_image, last_image, input_audio, prompt, negative_prompt, duration, gpu_duration,
enhance_prompt, seed, randomize_seed, height, width,
video_cfg_scale, video_stg_scale, video_rescale_scale, video_a2v_scale,
audio_cfg_scale, audio_stg_scale, audio_rescale_scale, audio_v2a_scale,
pose_strength, general_strength, motion_strength,
dreamlay_strength, mself_strength, dramatic_strength, fluid_strength,
liquid_strength, demopose_strength, voice_strength, realism_strength,
transition_strength,
],
outputs=[output_video, seed],
)
if __name__ == "__main__":
    # Script entry point: turn on request queuing, then start the server
    # with the Citrus theme and the custom CSS defined above.
    queued_app = demo.queue()
    queued_app.launch(
        theme=gr.themes.Citrus(),
        css=css,
        mcp_server=False,
    )