| """FFmpeg video stitching, clip splitting/shuffling, lyrics overlay. |
| |
| Takes generated video clips (one per 4-beat segment), splits each into |
| two halves, shuffles them with a distance constraint, builds a timeline |
| with dynamic pacing (4-beat cuts before the drop, 2-beat after), overlays |
| audio and lyrics text. |
| """ |
|
|
| import json |
| import random |
| import subprocess |
| import tempfile |
| from pathlib import Path |
|
|
|
|
| def _get_audio_path(run_dir: Path) -> Path: |
| """Find the original audio file one level above the run directory.""" |
| song_dir = run_dir.parent |
| for ext in [".wav", ".mp3", ".flac", ".m4a"]: |
| candidates = list(song_dir.glob(f"*{ext}")) |
| if candidates: |
| return candidates[0] |
| raise FileNotFoundError(f"No audio file found in {song_dir}") |
|
|
|
|
def _get_clip_duration(clip_path: Path) -> float:
    """Return the duration of *clip_path* in seconds, as reported by ffprobe."""
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "csv=p=0",
        str(clip_path),
    ]
    probe = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return float(probe.stdout.strip())
|
|
|
|
def _get_clip_fps(clip_path: Path) -> float:
    """Return the frame rate of the first video stream in *clip_path*.

    ffprobe reports r_frame_rate as a rational "num/den" string
    (e.g. "30000/1001"); it is converted to a float here.
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=r_frame_rate",
        "-of", "csv=p=0",
        str(clip_path),
    ]
    probe = subprocess.run(cmd, capture_output=True, text=True, check=True)
    numerator, denominator = probe.stdout.strip().split("/")
    return int(numerator) / int(denominator)
|
|
|
|
def _trim_clip(clip_path: Path, start: float, duration: float, output_path: Path):
    """Re-encode a window of *clip_path* into *output_path*.

    Seeks to *start* seconds, keeps *duration* seconds, re-encodes with
    libx264, and strips the audio track (-an).
    """
    subprocess.run(
        [
            "ffmpeg", "-y",
            "-ss", f"{start:.3f}",
            "-i", str(clip_path),
            "-t", f"{duration:.3f}",
            "-c:v", "libx264", "-preset", "fast",
            "-an",
            str(output_path),
        ],
        check=True,
        capture_output=True,
    )
|
|
|
|
| |
| |
| |
|
|
| |
| _KB_ZOOM = 0.45 |
|
|
| KEN_BURNS_EFFECTS = [ |
| "zoom_in", |
| "zoom_out", |
| ] |
|
|
|
|
| def _ken_burns_filter( |
| effect: str, n_frames: int, width: int, height: int, |
| ) -> str: |
| """Build an FFmpeg filter for a smooth Ken Burns zoom effect on video. |
| |
| Upscales the video 4x before applying zoompan with d=1 (one output |
| frame per input frame), then scales back to original size. The 4x |
| upscale makes integer rounding in zoompan negligible, eliminating |
| visible jitter. |
| """ |
| z = _KB_ZOOM |
| N = max(n_frames, 1) |
| W, H = width, height |
| |
| UP = 8 |
| UW, UH = W * UP, H * UP |
|
|
| if effect == "zoom_in": |
| zoom_expr = f"1+{z}*on/{N}" |
| elif effect == "zoom_out": |
| zoom_expr = f"1+{z}-{z}*on/{N}" |
| else: |
| return f"scale={W}:{H}" |
|
|
| return ( |
| f"scale={UW}:{UH}:flags=lanczos," |
| f"zoompan=z='{zoom_expr}':" |
| f"x='iw/2-(iw/zoom/2)':y='ih/2-(ih/zoom/2)':" |
| f"d=1:s={UW}x{UH}," |
| f"scale={W}:{H}:flags=lanczos" |
| ) |
|
|
|
|
def _get_clip_dimensions(clip_path: Path) -> tuple[int, int]:
    """Return (width, height) in pixels of the first video stream."""
    cmd = [
        "ffprobe", "-v", "error", "-select_streams", "v:0",
        "-show_entries", "stream=width,height",
        "-of", "csv=s=x:p=0", str(clip_path),
    ]
    probe = subprocess.run(cmd, capture_output=True, text=True, check=True)
    width, height = probe.stdout.strip().split("x")
    return int(width), int(height)
|
|
|
|
def _split_clip(clip_path: Path, clip_id: int) -> dict:
    """Register a clip's two halves without pre-splitting.

    The "first" half plays from the start, the "second" half plays from
    the end (offset back by the slot duration at trim time). This makes
    the two halves maximally different — no fixed midpoint split.

    Returns dict with the original path and full duration for each half.
    """
    full_duration = _get_clip_duration(clip_path)
    return dict(
        clip_id=clip_id,
        first=clip_path,
        second=clip_path,
        first_duration=full_duration,
        second_duration=full_duration,
    )
|
|
|
|
| def _build_sub_segments(segments: list[dict], drop_time: float | None) -> list[dict]: |
| """Build the final timeline of sub-segments. |
| |
| Before the drop: one slot per 4-beat segment. |
| After the drop: each 4-beat segment splits into two 2-beat slots |
| using the beat timestamps stored in the segment. |
| """ |
| sub_segments = [] |
|
|
| for seg in segments: |
| beats = seg.get("beats", [seg["start"], seg["end"]]) |
| is_after_drop = drop_time is not None and seg["start"] >= drop_time |
|
|
| if is_after_drop and len(beats) >= 3: |
| |
| mid_idx = len(beats) // 2 |
| mid_time = beats[mid_idx] |
|
|
| sub_segments.append({ |
| "start": seg["start"], |
| "end": mid_time, |
| "duration": round(mid_time - seg["start"], 3), |
| "lyrics": seg.get("lyrics", ""), |
| "parent_segment": seg["segment"], |
| }) |
| sub_segments.append({ |
| "start": mid_time, |
| "end": seg["end"], |
| "duration": round(seg["end"] - mid_time, 3), |
| "lyrics": "", |
| "parent_segment": seg["segment"], |
| }) |
| else: |
| |
| sub_segments.append({ |
| "start": seg["start"], |
| "end": seg["end"], |
| "duration": seg["duration"], |
| "lyrics": seg.get("lyrics", ""), |
| "parent_segment": seg["segment"], |
| }) |
|
|
| return sub_segments |
|
|
|
|
| def _shuffle_with_distance(pool: list[tuple], n_slots: int) -> list[tuple]: |
| """Select n_slots sub-clips maximising clip diversity and spacing. |
| |
| Shuffles clip IDs once, then repeats that order to fill all slots. |
| First pass uses "first" halves, second pass uses "second" halves. |
| Same clip is always exactly n_clips positions apart — maximum spacing. |
| |
| Each item is (clip_id, half_label, path, duration). |
| """ |
| by_clip: dict[int, list[tuple]] = {} |
| for item in pool: |
| by_clip.setdefault(item[0], []).append(item) |
|
|
| clip_ids = list(by_clip.keys()) |
| random.shuffle(clip_ids) |
|
|
| |
| result = [] |
| cycle = 0 |
| while len(result) < n_slots: |
| for cid in clip_ids: |
| if len(result) >= n_slots: |
| break |
| halves = by_clip[cid] |
| |
| half_idx = cycle % len(halves) |
| result.append(halves[half_idx]) |
| cycle += 1 |
|
|
| return result |
|
|
|
|
| |
# Font display name → bundled .ttf filename for the lyrics overlay.
FONTS = {
    "Bebas Neue": "BebasNeue-Regular.ttf",
    "Teko": "Teko-Bold.ttf",
    "Russo One": "RussoOne-Regular.ttf",
    "Staatliches": "Staatliches-Regular.ttf",
}

# Defaults used when the caller does not pick a font/color.
DEFAULT_FONT = "Bebas Neue"
DEFAULT_FONT_COLOR = "#FFF7D4"

# fonts/ directory one level above this module's directory.
_FONTS_DIR = Path(__file__).resolve().parent.parent / "fonts"
|
|
|
|
def font_names() -> list[str]:
    """Return the display names of all bundled fonts."""
    return [*FONTS]
|
|
|
|
def _get_font_path(font_name: str) -> Path:
    """Map a font display name to its bundled .ttf file path.

    Unknown names silently fall back to the default font.
    """
    if font_name not in FONTS:
        font_name = DEFAULT_FONT
    return _FONTS_DIR / FONTS[font_name]
|
|
|
|
| _SPOTIFY_BADGE = Path(__file__).resolve().parent.parent / "assets" / "spotify_badge.png" |
|
|
|
|
def _add_lyrics_overlay(
    video_path: Path,
    segments: list[dict],
    output_path: Path,
    audio_offset: float,
    font_name: str = DEFAULT_FONT,
    font_color: str = DEFAULT_FONT_COLOR,
    cover_art: Path | None = None,
    drop_time: float | None = None,
    song_name: str = "",
):
    """Add lyrics text and optional cover art overlay using FFmpeg filters.

    Args:
        video_path: Input video (already muxed with audio).
        segments: Segment dicts; per-word timings are read from each
            segment's "words" list (dicts with "word", "start", "end").
        output_path: Destination file for the rendered video.
        audio_offset: Song-time of the video's first frame; word and drop
            timestamps are shifted by this amount into video time.
        font_name: Display name resolved via _get_font_path.
        font_color: FFmpeg drawtext color string.
        cover_art: Optional cover image; only used together with drop_time.
        drop_time: Song-time of the drop; lyrics stop there and the
            cover/badge/title overlay starts there.
        song_name: Text for the '"<name>" out now!' title.
    """
    font_path = _get_font_path(font_name)

    # Lyrics are suppressed from the drop onwards when a cover overlay
    # will take over the screen.
    lyrics_cutoff = None
    if cover_art is not None and drop_time is not None:
        lyrics_cutoff = drop_time

    # Flatten per-segment word timings into one list, dropping empty words
    # and anything at/after the cutoff (words spanning it are clamped).
    all_words = []
    for seg in segments:
        for word_info in seg.get("words", []):
            word = word_info["word"].strip().lower()
            if not word:
                continue
            w_start = word_info["start"]
            w_end = word_info["end"]
            if lyrics_cutoff is not None and w_start >= lyrics_cutoff:
                continue
            if lyrics_cutoff is not None and w_end > lyrics_cutoff:
                w_end = lyrics_cutoff
            all_words.append({"word": word, "start": w_start, "end": w_end})

    # Close small gaps (< 0.5s) between consecutive words by meeting in the
    # middle, so the text doesn't flicker off between words.
    gap_threshold = 0.5
    for i in range(len(all_words) - 1):
        gap = all_words[i + 1]["start"] - all_words[i]["end"]
        if 0 < gap < gap_threshold:
            mid = all_words[i]["end"] + gap / 2
            all_words[i]["end"] = mid
            all_words[i + 1]["start"] = mid

    # One centered drawtext filter per word, enabled only during its window.
    # Apostrophes are swapped for the typographic U+2019 — presumably to
    # sidestep FFmpeg's single-quote escaping inside filter strings.
    drawtext_filters = []
    for w in all_words:
        escaped = (w["word"]
                   .replace("\\", "\\\\")
                   .replace("'", "\u2019")
                   .replace('"', '\\"')
                   .replace(":", "\\:")
                   .replace("%", "%%")
                   .replace("[", "\\[")
                   .replace("]", "\\]"))

        # Shift song-time word timings into video-relative time.
        start = w["start"] - audio_offset
        end = w["end"] - audio_offset

        drawtext_filters.append(
            f"drawtext=text='{escaped}'"
            f":fontfile='{font_path}'"
            f":fontsize=36"
            f":fontcolor={font_color}"
            f":x=(w-text_w)/2:y=(h-text_h)/2"
            f":enable='between(t,{start:.3f},{end:.3f})'"
        )

    has_cover = cover_art is not None and drop_time is not None
    has_lyrics = len(drawtext_filters) > 0

    # Nothing to draw: pass the input through with a stream copy.
    if not has_cover and not has_lyrics:
        subprocess.run([
            "ffmpeg", "-y", "-i", str(video_path),
            "-c", "copy", str(output_path),
        ], check=True, capture_output=True)
        return

    if has_cover:
        # All cover-related elements appear from the drop onwards.
        drop_start = drop_time - audio_offset
        enable = f"enable='gte(t,{drop_start:.3f})'"

        # Overlay geometry in pixels: cover art height, vertical nudge of
        # the art off true center, and Spotify badge height.
        art_h = 270
        art_y_offset = 10
        badge_h = 56

        # Probe the video height to position the art block vertically.
        vid_h = int(subprocess.run([
            "ffprobe", "-v", "error", "-select_streams", "v:0",
            "-show_entries", "stream=height", "-of", "csv=p=0",
            str(video_path),
        ], capture_output=True, text=True, check=True).stdout.strip())
        art_center = vid_h / 2 + art_y_offset
        art_top = art_center - art_h / 2
        art_bottom = art_center + art_h / 2

        # The final render is center-cropped to 9:16 (see assemble), so
        # compute that visible region to keep badge and title inside it.
        sq_side = vid_h * 9 / 16
        sq_top = (vid_h - sq_side) / 2
        sq_bottom = (vid_h + sq_side) / 2

        # Badge centered between the top of the visible region and the art.
        badge_center_y = (sq_top + art_top) / 2
        badge_y = int(badge_center_y - badge_h / 2)

        # Title centered between the art bottom and the visible-region bottom.
        title_center_y = int((art_bottom + sq_bottom) / 2)

        art_overlay_y = int(art_center - art_h / 2)

        # Filter graph: scale art (input 1) and badge (input 2), then
        # overlay both onto the video, gated by the drop enable.
        parts = [
            f"[1:v]scale=-2:{art_h}:flags=lanczos[art]",
            f"[2:v]scale=-2:{badge_h}:flags=lanczos[badge]",
            f"[0:v][art]overlay=(W-w)/2:{art_overlay_y}:{enable}[v1]",
            f"[v1][badge]overlay=(W-w)/2:{badge_y}:{enable}",
        ]

        # Title text, escaped the same way as the lyric words.
        title_escaped = (song_name
                         .replace("\\", "\\\\")
                         .replace("'", "\u2019")
                         .replace('"', '\\"')
                         .replace(":", "\\:")
                         .replace("%", "%%"))
        title_text = f'\\"{title_escaped}\\" out now!'.lower()
        parts[-1] += (
            f",drawtext=text='{title_text}'"
            f":fontfile='{font_path}'"
            f":fontsize=40"
            f":fontcolor={font_color}"
            f":x=(w-text_w)/2:y={title_center_y}-text_h/2"
            f":{enable}"
        )

        # Lyric drawtexts chain onto the final filter stage.
        if has_lyrics:
            parts[-1] += "," + ",".join(drawtext_filters)
        filter_chain = ";".join(parts)

        cmd = [
            "ffmpeg", "-y",
            "-i", str(video_path),
            "-i", str(cover_art),
            "-i", str(_SPOTIFY_BADGE),
            "-filter_complex", filter_chain,
            "-c:v", "libx264", "-preset", "fast",
            "-c:a", "copy",
            str(output_path),
        ]
        subprocess.run(cmd, check=True, capture_output=True)
    else:
        # Lyrics only: a simple -vf drawtext chain, no extra inputs.
        filter_chain = ",".join(drawtext_filters)
        subprocess.run([
            "ffmpeg", "-y",
            "-i", str(video_path),
            "-vf", filter_chain,
            "-c:v", "libx264", "-preset", "fast",
            "-c:a", "copy",
            str(output_path),
        ], check=True, capture_output=True)
|
|
|
|
def assemble(
    run_dir: str | Path,
    audio_path: str | Path | None = None,
    font_name: str = DEFAULT_FONT,
    font_color: str = DEFAULT_FONT_COLOR,
    cover_art: str | Path | None = None,
) -> Path:
    """Assemble final video with dynamic pacing, clip shuffling, and lyrics.

    Args:
        run_dir: Run directory containing clips/, segments.json, drop.json.
        audio_path: Path to the original audio. Auto-detected if None.
        font_name: Display name of the font for lyrics overlay.
        font_color: Hex color for lyrics text (e.g. '#FFF7D4').
        cover_art: Path to cover art image. Overlayed from the drop onwards.

    Returns:
        Path to the final video file.
    """
    run_dir = Path(run_dir)
    clips_dir = run_dir / "clips"
    output_dir = run_dir / "output"
    output_dir.mkdir(parents=True, exist_ok=True)

    with open(run_dir / "segments.json") as f:
        segments = json.load(f)

    # Optional drop marker: switches pacing from 4-beat to 2-beat slots.
    # NOTE(review): assumes drop.json always carries a non-null "drop_time";
    # a null value would make the format call below raise — confirm upstream.
    drop_time = None
    drop_path = run_dir / "drop.json"
    if drop_path.exists():
        with open(drop_path) as f:
            drop_time = json.load(f).get("drop_time")
        print(f" Drop at {drop_time:.3f}s")
    else:
        print(" No drop detected — using uniform pacing")

    if audio_path is None:
        audio_path = _get_audio_path(run_dir)
    audio_path = Path(audio_path)

    # Register each segment's clip as two usable halves (no file splitting).
    sub_clips = []
    for seg in segments:
        idx = seg["segment"]
        clip_path = clips_dir / f"clip_{idx:03d}.mp4"
        if not clip_path.exists():
            print(f" Warning: {clip_path.name} not found, skipping")
            continue

        halves = _split_clip(clip_path, idx)
        sub_clips.append((idx, "first", halves["first"], halves["first_duration"]))
        sub_clips.append((idx, "second", halves["second"], halves["second_duration"]))
        print(f" Registered {clip_path.name} ({halves['first_duration']:.1f}s)")

    if not sub_clips:
        raise FileNotFoundError(f"No clips found in {clips_dir}")

    # Build the slot timeline: 4-beat slots pre-drop, 2-beat post-drop.
    sub_segments = _build_sub_segments(segments, drop_time)
    print(f" Timeline: {len(sub_segments)} slots "
          f"({len([s for s in sub_segments if s['duration'] < 1.5])} fast cuts)")

    # Assign a sub-clip to every slot, spacing out reuse of the same clip.
    assigned = _shuffle_with_distance(sub_clips.copy(), n_slots=len(sub_segments))

    # FPS/resolution are probed from the first assigned clip only —
    # presumably all generated clips share them; verify upstream.
    fps = _get_clip_fps(assigned[0][2])
    print(f" Source FPS: {fps}")

    trimmed_dir = run_dir / "clips_trimmed"
    trimmed_dir.mkdir(exist_ok=True)
    trimmed_paths = []

    clip_width, clip_height = _get_clip_dimensions(assigned[0][2])
    print(f" Clip resolution: {clip_width}x{clip_height}")

    # Frame-accurate pacing: track cumulative frames against the cumulative
    # target time so per-slot rounding errors never accumulate into drift.
    cumulative_frames = 0
    cumulative_target = 0.0

    for i, (sub_seg, (clip_id, half, clip_path, clip_dur)) in enumerate(
        zip(sub_segments, assigned)
    ):
        slot_dur = sub_seg["duration"]
        cumulative_target += min(slot_dur, clip_dur)
        target_frame = round(cumulative_target * fps)
        n_frames = max(1, target_frame - cumulative_frames)
        cumulative_frames = target_frame

        # "second" halves play from the end of the clip (seek back by the
        # slot duration); "first" halves play from the start.
        if half == "second":
            ss = max(0, clip_dur - slot_dur)
        else:
            ss = 0

        # Alternate Ken Burns zoom directions across consecutive slots.
        effect = KEN_BURNS_EFFECTS[i % len(KEN_BURNS_EFFECTS)]
        vf = _ken_burns_filter(effect, n_frames, clip_width, clip_height)

        # Render the slot: seek, take exactly n_frames, apply the zoom,
        # force a constant frame rate, drop audio.
        trimmed_path = trimmed_dir / f"slot_{i:03d}.mp4"
        cmd = [
            "ffmpeg", "-y",
            "-ss", f"{ss:.3f}",
            "-i", str(clip_path),
            "-frames:v", str(n_frames),
            "-vf", vf,
            "-c:v", "libx264", "-preset", "fast",
            "-r", str(int(fps)),
            "-an",
            str(trimmed_path),
        ]
        subprocess.run(cmd, check=True, capture_output=True)
        trimmed_paths.append(trimmed_path)
        actual_dur = n_frames / fps
        print(f" Slot {i}: clip {clip_id} ({half}, ss={ss:.1f}s, {effect}) → "
              f"{n_frames}f/{actual_dur:.3f}s (target {slot_dur:.3f}s)")

    # Concatenate all slot files losslessly via the concat demuxer.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".txt", delete=False, dir=str(run_dir)
    ) as f:
        for p in trimmed_paths:
            f.write(f"file '{p.resolve()}'\n")
        concat_list = f.name

    concat_path = output_dir / "video_only.mp4"
    subprocess.run([
        "ffmpeg", "-y",
        "-f", "concat", "-safe", "0",
        "-i", concat_list,
        "-c", "copy",
        str(concat_path),
    ], check=True, capture_output=True)

    # Mux in the song audio starting at the first segment's timestamp,
    # trimmed to the rendered video's exact duration.
    audio_start = segments[0]["start"]
    video_duration = cumulative_frames / fps

    with_audio_path = output_dir / "with_audio.mp4"
    subprocess.run([
        "ffmpeg", "-y",
        "-i", str(concat_path),
        "-ss", f"{audio_start:.3f}",
        "-i", str(audio_path),
        "-t", f"{video_duration:.3f}",
        "-c:v", "copy",
        "-c:a", "aac", "-b:a", "192k",
        "-map", "0:v:0", "-map", "1:a:0",
        "-shortest",
        str(with_audio_path),
    ], check=True, capture_output=True)

    # Burn in lyrics and (optionally) the cover art / badge / title overlay.
    overlay_path = output_dir / "with_overlay.mp4"
    cover_path = Path(cover_art) if cover_art else None
    song_name = run_dir.parent.name
    _add_lyrics_overlay(with_audio_path, segments, overlay_path, audio_start,
                        font_name=font_name, font_color=font_color,
                        cover_art=cover_path, drop_time=drop_time,
                        song_name=song_name)

    # Center-crop to 9:16 portrait; width forced even for libx264.
    final_path = output_dir / "final.mp4"
    subprocess.run([
        "ffmpeg", "-y",
        "-i", str(overlay_path),
        "-vf", "crop=2*floor(ih*9/16/2):ih:(iw-2*floor(ih*9/16/2))/2:0",
        "-c:v", "libx264", "-preset", "fast",
        "-c:a", "copy",
        str(final_path),
    ], check=True, capture_output=True)

    # Remove the temporary concat list.
    Path(concat_list).unlink(missing_ok=True)

    print(f"\nFinal video: {final_path}")
    print(f" Duration: {video_duration:.2f}s")
    print(f" Slots: {len(sub_segments)} ({len(segments)} original segments)")
    return final_path
|
|
|
|
def run(
    run_dir: str | Path,
    font_name: str = DEFAULT_FONT,
    font_color: str = DEFAULT_FONT_COLOR,
    cover_art: str | Path | None = None,
) -> Path:
    """Assemble final video from clips + audio.

    Thin wrapper around assemble() that announces progress first.

    Args:
        run_dir: Run directory (e.g. data/Gone/run_001/).
        font_name: Display name of the font for lyrics overlay.
        font_color: Hex color for lyrics text.
        cover_art: Path to cover art image (optional).

    Returns:
        Path to final video.
    """
    print("Assembling final video...")
    return assemble(
        run_dir,
        font_name=font_name,
        font_color=font_color,
        cover_art=cover_art,
    )
|
|
|
|
| if __name__ == "__main__": |
| import sys |
|
|
| if len(sys.argv) < 2: |
| print("Usage: python -m src.assembler <run_dir>") |
| print(" e.g. python -m src.assembler data/Gone/run_001") |
| sys.exit(1) |
|
|
| run(sys.argv[1]) |
|
|