#!/usr/bin/env python3
"""
Universal Media Transcriber

Supports: YouTube, YouTube Music, Spotify, Direct Audio/Video URLs
Blazing fast: uses native captions when available, falls back to faster-whisper
"""

import os
import sys
import re
import json
import time
import shutil
import hashlib
import argparse
import tempfile
import threading
import subprocess
from pathlib import Path
from datetime import timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse, parse_qs

# Add current directory to PATH so local ffmpeg.exe can be found.
# PATH may be unset in stripped-down environments, and membership must be
# tested per entry (a substring test matches unrelated longer paths).
script_dir = str(Path(__file__).parent.absolute())
_current_path = os.environ.get("PATH", "")
if script_dir not in _current_path.split(os.pathsep):
    os.environ["PATH"] = script_dir + (os.pathsep + _current_path if _current_path else "")

# ─────────────────────────────────────────────────────────────────────────────
# AUTO-INSTALLER — installs missing deps silently on first run
# ─────────────────────────────────────────────────────────────────────────────
# Maps importable module name -> pip distribution name.
REQUIRED = {
    "yt_dlp": "yt-dlp",
    "youtube_transcript_api": "youtube-transcript-api",
    "faster_whisper": "faster-whisper",
    "rich": "rich",
    "spotdl": "spotdl",
    "requests": "requests",
}


def ensure_deps():
    """Install any package from ``REQUIRED`` whose module cannot be imported.

    Raises:
        subprocess.CalledProcessError: if pip fails to install the packages.
    """
    import importlib

    missing = []
    for module, pkg in REQUIRED.items():
        try:
            __import__(module)
        except ImportError:
            missing.append(pkg)
    if not missing:
        return

    print(f"[setup] Installing: {', '.join(missing)} ...")
    pip_install = [sys.executable, "-m", "pip", "install", "--quiet"]
    try:
        subprocess.check_call(pip_install + ["--break-system-packages"] + missing)
    except subprocess.CalledProcessError:
        # Older pip releases reject the unknown --break-system-packages flag;
        # retry without it before giving up.
        subprocess.check_call(pip_install + missing)
    # Make freshly installed packages visible to the imports that follow.
    importlib.invalidate_caches()
    print("[setup] Done. Reloading...\n")


ensure_deps()

# ─────────────────────────────────────────────────────────────────────────────
# IMPORTS (after install)
# ─────────────────────────────────────────────────────────────────────────────
import yt_dlp
import requests
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn
from rich.panel import Panel
from rich.table import Table
from rich import print as rprint
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound

console = Console()

# ─────────────────────────────────────────────────────────────────────────────
# CONSTANTS & CONFIG
# ─────────────────────────────────────────────────────────────────────────────
WHISPER_MODEL = "base"                  # tiny / base / small / medium / large-v3
WHISPER_DEVICE = "auto"                 # auto / cpu / cuda
WHISPER_THREADS = os.cpu_count() or 1   # use all cores (cpu_count() may be None)
AUDIO_FORMAT = "mp3"
MAX_WORKERS = 4                         # parallel jobs
CACHE_DIR = Path.home() / ".transcriber_cache"
CACHE_DIR.mkdir(exist_ok=True)

# Language preference order for YouTube captions
LANG_PREF = ["en", "en-US", "en-GB", "en-AU", "en-CA", "en-IN", "en-IE", "en-NZ",
             "en-PH", "en-ZA", "en-orig", "a.en"]

# File extensions treated as directly downloadable media.
_MEDIA_EXTENSIONS = (
    ".mp3", ".mp4", ".wav", ".ogg", ".flac", ".m4a",
    ".webm", ".aac", ".opus", ".mkv", ".avi", ".mov",
)

_YOUTUBE_ID_RE = re.compile(r"(?:v=|youtu\.be/|embed/|shorts/|live/)([A-Za-z0-9_-]{11})")


# ─────────────────────────────────────────────────────────────────────────────
# URL DETECTION
# ─────────────────────────────────────────────────────────────────────────────
def detect_source(url: str) -> str:
    """Classify *url*.

    Returns:
        One of ``youtube | youtube_music | spotify | audio | unknown``.
    """
    parsed = urlparse(url)
    # .hostname is lower-cased and excludes any port / credentials.
    host = (parsed.hostname or "").removeprefix("www.")
    if host in ("youtube.com", "youtu.be", "m.youtube.com"):
        return "youtube"
    if host == "music.youtube.com":
        return "youtube_music"
    if host in ("open.spotify.com", "spotify.com"):
        return "spotify"
    # Check the path as well as the raw URL so "file.mp3?token=..." is recognised.
    if parsed.path.lower().endswith(_MEDIA_EXTENSIONS) or url.lower().endswith(_MEDIA_EXTENSIONS):
        return "audio"

    # Try to detect by content-type via HEAD. This is a best-effort probe:
    # network or URL errors simply mean "unknown".
    try:
        response = requests.head(url, timeout=5, allow_redirects=True)
    except (requests.RequestException, ValueError):
        return "unknown"
    content_type = response.headers.get("content-type", "")
    if "audio" in content_type or "video" in content_type:
        return "audio"
    return "unknown"


def extract_youtube_id(url: str) -> str | None:
    """Extract the 11-character video ID from a YouTube URL, or None if absent."""
    match = _YOUTUBE_ID_RE.search(url)
    return match.group(1) if match else None


def extract_spotify_type(url: str) -> tuple[str, str]:
    """Returns (type, id) e.g. ('track', 'abc123'); ('unknown', '') if no match."""
    match = re.search(
        r"spotify\.com/(track|album|playlist|episode|show)/([A-Za-z0-9]+)", url
    )
    if match:
        return match.group(1), match.group(2)
    return "unknown", ""


# ─────────────────────────────────────────────────────────────────────────────
# CACHE
# ─────────────────────────────────────────────────────────────────────────────
def cache_key(url: str) -> str:
    """Return the hex MD5 digest of *url* (used as a file name, not for security)."""
    return hashlib.md5(url.encode()).hexdigest()


def _cache_path(url: str) -> Path:
    """Return the cache file path for *url*."""
    return CACHE_DIR / f"{cache_key(url)}.txt"


def cache_get(url: str) -> str | None:
    """Return the cached transcript for *url*, or None if there is none."""
    path = _cache_path(url)
    if path.exists():
        return path.read_text(encoding="utf-8")
    return None


def cache_set(url: str, text: str):
    """Store *text* as the cached transcript for *url*."""
    _cache_path(url).write_text(text, encoding="utf-8")


# ─────────────────────────────────────────────────────────────────────────────
# WHISPER ENGINE (lazy-loaded, singleton)
# ─────────────────────────────────────────────────────────────────────────────
_whisper_model = None
# Batch jobs run in a thread pool; the lock stops several workers from each
# loading their own copy of the model at the same time.
_whisper_lock = threading.Lock()


def get_whisper():
    """Return the shared WhisperModel, loading it on first use."""
    global _whisper_model
    if _whisper_model is not None:
        return _whisper_model
    with _whisper_lock:
        if _whisper_model is None:
            from faster_whisper import WhisperModel

            device = WHISPER_DEVICE
            if device == "auto":
                try:
                    import torch
                    device = "cuda" if torch.cuda.is_available() else "cpu"
                except ImportError:
                    device = "cpu"
            console.log(f"[cyan]Loading Whisper [{WHISPER_MODEL}] on {device}...[/cyan]")
            compute = "float16" if device == "cuda" else "int8"
            _whisper_model = WhisperModel(
                WHISPER_MODEL,
                device=device,
                compute_type=compute,
                num_workers=WHISPER_THREADS,
                cpu_threads=WHISPER_THREADS,
            )
    return _whisper_model


def _format_timestamp(seconds) -> str:
    """Format a second offset as a zero-padded ``HH:MM:SS`` string."""
    return str(timedelta(seconds=int(seconds))).zfill(8)


def transcribe_audio_file(audio_path: str, lang: str | None = None) -> str:
    """Transcribe a local audio file with faster-whisper.

    Args:
        audio_path: Path of the audio file to transcribe.
        lang: Optional language code passed to Whisper; when omitted the
            ``language`` option is not set.

    Returns:
        The full transcript, one ``[HH:MM:SS] text`` line per segment.
    """
    model = get_whisper()
    opts = dict(
        beam_size=5,
        word_timestamps=False,
        vad_filter=True,
        vad_parameters=dict(min_silence_duration_ms=500),
    )
    if lang:
        opts["language"] = lang
    segments, _info = model.transcribe(audio_path, **opts)
    return "\n".join(
        f"[{_format_timestamp(seg.start)}] {seg.text.strip()}" for seg in segments
    )


# ─────────────────────────────────────────────────────────────────────────────
# YOUTUBE / YOUTUBE MUSIC — caption-first, whisper fallback
# ─────────────────────────────────────────────────────────────────────────────
def _list_transcripts(video_id: str):
    """Return the transcript list for *video_id* across library versions."""
    # youtube-transcript-api >= 1.0 exposes an instance method ``list``;
    # earlier releases only have the static ``list_transcripts``.
    if callable(getattr(YouTubeTranscriptApi, "list", None)):
        return YouTubeTranscriptApi().list(video_id)
    return YouTubeTranscriptApi.list_transcripts(video_id)


def fetch_youtube_captions(video_id: str) -> str | None:
    """Try to get native captions (instant, no download).

    Returns:
        The captions as ``[HH:MM:SS] text`` lines, or None when captions are
        disabled, missing, or could not be fetched.
    """
    try:
        transcript_list = _list_transcripts(video_id)
        # English variants first, in LANG_PREF order
        transcript = None
        for lang in LANG_PREF:
            try:
                transcript = transcript_list.find_transcript([lang])
                break
            except Exception:
                continue
        if transcript is None:
            # grab whatever is first
            transcript = next(iter(transcript_list))
        fetched = transcript.fetch()
        # Newer releases return snippet objects; to_raw_data() yields the
        # same list of {"text", "start", ...} dicts older releases returned.
        entries = fetched.to_raw_data() if hasattr(fetched, "to_raw_data") else fetched
        return "\n".join(
            f"[{_format_timestamp(e['start'])}] {e['text'].strip()}" for e in entries
        )
    except (TranscriptsDisabled, NoTranscriptFound):
        return None
    except Exception as exc:
        console.log(f"[yellow]Caption fetch warning: {exc}[/yellow]")
        return None


def download_audio_yt(url: str, out_dir: str) -> str:
    """Download audio from YouTube/YouTube Music using yt-dlp. Returns file path."""
    ydl_opts = {
        "format": "bestaudio/best",
        "outtmpl": os.path.join(out_dir, "%(id)s.%(ext)s"),
        "postprocessors": [{
            "key": "FFmpegExtractAudio",
            "preferredcodec": AUDIO_FORMAT,
            "preferredquality": "128",
        }],
        "quiet": True,
        "no_warnings": True,
        "concurrent_fragment_downloads": 8,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
    video_id = (info or {}).get("id", "audio")
    expected = os.path.join(out_dir, f"{video_id}.{AUDIO_FORMAT}")
    if not os.path.exists(expected):
        # The reported id does not always match the written file name;
        # fall back to whatever converted file is present.
        candidates = sorted(Path(out_dir).glob(f"*.{AUDIO_FORMAT}"))
        if candidates:
            return str(candidates[0])
    return expected


def get_video_metadata(url: str) -> dict:
    """Get title, uploader, duration without downloading.

    Returns:
        A dict with keys ``title``, ``uploader``, ``duration``,
        ``description`` and ``upload_date``; placeholder values are used for
        anything yt-dlp could not provide.
    """
    defaults = {
        "title": "Unknown",
        "uploader": "Unknown",
        "duration": 0,
        "description": "",
        "upload_date": "",
    }
    ydl_opts = {"quiet": True, "no_warnings": True, "skip_download": True}
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)
    except Exception as exc:
        # Metadata is cosmetic; never let it abort the transcription.
        console.log(f"[yellow]Metadata lookup warning: {exc}[/yellow]")
        return dict(defaults)
    if not info:
        return dict(defaults)
    # "or" also replaces keys that are present but set to None.
    return {key: info.get(key) or default for key, default in defaults.items()}


def transcribe_youtube(url: str, force_whisper: bool = False) -> dict:
    """Full pipeline for YouTube / YouTube Music.

    Args:
        url: The video URL.
        force_whisper: Skip the native-caption lookup and always run Whisper.

    Returns:
        A result dict with keys ``url``, ``source``, ``method``, ``meta`` and
        ``transcript``.
    """
    video_id = extract_youtube_id(url)
    meta = get_video_metadata(url)
    transcript_text = None
    method = "unknown"

    # Without a real video id a caption request can only fail, so skip it.
    if not force_whisper and video_id:
        console.log(f"[cyan]Trying native captions for[/cyan] [bold]{meta['title']}[/bold]")
        transcript_text = fetch_youtube_captions(video_id)
        if transcript_text:
            method = "native_captions"
            console.log("[green]✓ Got captions instantly (no download needed)[/green]")

    if transcript_text is None:
        console.log("[yellow]No captions → downloading audio for Whisper...[/yellow]")
        with tempfile.TemporaryDirectory() as tmpdir:
            audio_path = download_audio_yt(url, tmpdir)
            console.log(f"[cyan]Transcribing with Whisper [{WHISPER_MODEL}]...[/cyan]")
            transcript_text = transcribe_audio_file(audio_path)
        method = f"whisper_{WHISPER_MODEL}"

    return {
        "url": url,
        "source": "youtube",
        "method": method,
        "meta": meta,
        "transcript": transcript_text,
    }


# ─────────────────────────────────────────────────────────────────────────────
# SPOTIFY
# ─────────────────────────────────────────────────────────────────────────────
def transcribe_spotify(url: str) -> dict:
    """Download Spotify track/episode then transcribe.

    Raises:
        RuntimeError: if spotdl produced no audio files.
    """
    sp_type, sp_id = extract_spotify_type(url)

    # Podcasts/episodes: yt-dlp can handle them sometimes
    if sp_type == "episode":
        console.log("[cyan]Spotify episode — trying yt-dlp...[/cyan]")
        try:
            with tempfile.TemporaryDirectory() as tmpdir:
                audio_path = download_audio_yt(url, tmpdir)
                meta = get_video_metadata(url)
                transcript_text = transcribe_audio_file(audio_path)
            return {
                "url": url,
                "source": "spotify_episode",
                "method": f"whisper_{WHISPER_MODEL}",
                "meta": meta,
                "transcript": transcript_text,
            }
        except Exception as e:
            # Deliberate best-effort: fall through to spotdl below.
            console.log(f"[yellow]yt-dlp failed for Spotify episode: {e}[/yellow]")

    # Music tracks / albums / playlists → spotdl
    console.log("[cyan]Spotify music — downloading via spotdl...[/cyan]")
    with tempfile.TemporaryDirectory() as tmpdir:
        result = subprocess.run(
            [sys.executable, "-m", "spotdl", url,
             "--output", tmpdir,
             "--format", "mp3",
             "--bitrate", "128k",
             "--print-errors"],
            capture_output=True,
            text=True,
        )
        # find downloaded files
        audio_files = list(Path(tmpdir).glob("*.mp3")) + list(Path(tmpdir).glob("*.m4a"))
        if not audio_files:
            raise RuntimeError(f"spotdl produced no files.\n{result.stderr}")

        transcripts = []
        for af in sorted(audio_files):
            console.log(f"[cyan]Transcribing:[/cyan] {af.name}")
            t = transcribe_audio_file(str(af))
            transcripts.append(f"=== {af.stem} ===\n{t}")

    return {
        "url": url,
        "source": f"spotify_{sp_type}",
        "method": f"spotdl+whisper_{WHISPER_MODEL}",
        "meta": {"title": f"Spotify {sp_type.title()}", "uploader": "Spotify"},
        "transcript": "\n\n".join(transcripts),
    }


# ─────────────────────────────────────────────────────────────────────────────
# DIRECT AUDIO / VIDEO URL
# ─────────────────────────────────────────────────────────────────────────────
def transcribe_direct_audio(url: str) -> dict:
    """Download a direct audio/video file and transcribe.

    Returns:
        A result dict with keys ``url``, ``source``, ``method``, ``meta`` and
        ``transcript``.

    Raises:
        RuntimeError: if yt-dlp left no file in the temporary directory.
    """
    console.log(f"[cyan]Downloading direct audio:[/cyan] {url}")
    with tempfile.TemporaryDirectory() as tmpdir:
        ydl_opts = {
            "outtmpl": os.path.join(tmpdir, "audio.%(ext)s"),
            "quiet": True,
            "no_warnings": True,
            "concurrent_fragment_downloads": 8,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True)
        # "or" also covers a title key that is present but None.
        title = (info or {}).get("title") or Path(url).stem

        audio_files = list(Path(tmpdir).iterdir())
        if not audio_files:
            raise RuntimeError("No file downloaded")
        audio_path = str(audio_files[0])

        console.log(f"[cyan]Transcribing:[/cyan] {Path(audio_path).name}")
        transcript_text = transcribe_audio_file(audio_path)

    return {
        "url": url,
        "source": "audio",
        "method": f"whisper_{WHISPER_MODEL}",
        "meta": {"title": title, "uploader": "Direct"},
        "transcript": transcript_text,
    }


# ─────────────────────────────────────────────────────────────────────────────
# PLAYLIST / BATCH EXPANSION
# ─────────────────────────────────────────────────────────────────────────────
def expand_playlist(url: str) -> list[str]:
    """Return list of individual video URLs from a playlist/album/channel.

    Falls back to ``[url]`` when the URL is not a playlist, when expansion
    fails, or when the playlist yields no usable entries.
    """
    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "extract_flat": True,
        "skip_download": True,
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)
        if info and "entries" in info:
            urls = []
            for entry in info["entries"]:
                if not entry:
                    continue
                if entry.get("url"):
                    urls.append(entry["url"])
                elif entry.get("id"):
                    urls.append(f"https://www.youtube.com/watch?v={entry['id']}")
            # An empty expansion would silently drop the user's URL.
            return urls or [url]
    except Exception as exc:
        console.log(f"[yellow]Playlist expansion warning: {exc}[/yellow]")
    return [url]


# ─────────────────────────────────────────────────────────────────────────────
# MAIN ROUTER
# ─────────────────────────────────────────────────────────────────────────────
def transcribe_url(url: str, force_whisper: bool = False, use_cache: bool = True) -> dict:
    """Route URL to the correct transcription pipeline.

    Args:
        url: Media URL; surrounding whitespace is stripped.
        force_whisper: Passed through to the YouTube pipeline.
        use_cache: Read from and write to the transcript cache.

    Returns:
        A result dict with keys ``url``, ``source``, ``method``, ``meta`` and
        ``transcript``.
    """
    url = url.strip()

    if use_cache:
        cached = cache_get(url)
        # Truthiness on purpose: an empty cached transcript is retried.
        if cached:
            console.log(f"[green]✓ Cache hit:[/green] {url[:60]}")
            return {
                "url": url,
                "source": "cache",
                "method": "cache",
                "meta": {"title": "Cached"},
                "transcript": cached,
            }

    source = detect_source(url)
    console.log(f"[bold blue]Source detected:[/bold blue] {source} → {url[:70]}")

    if source in ("youtube", "youtube_music"):
        result = transcribe_youtube(url, force_whisper=force_whisper)
    elif source == "spotify":
        result = transcribe_spotify(url)
    elif source == "audio":
        result = transcribe_direct_audio(url)
    else:
        # Try yt-dlp as generic fallback (handles 1000+ sites)
        console.log("[yellow]Unknown source — trying yt-dlp generic handler...[/yellow]")
        result = transcribe_direct_audio(url)

    if use_cache:
        cache_set(url, result["transcript"])

    return result


# ─────────────────────────────────────────────────────────────────────────────
# OUTPUT FORMATTING
# ─────────────────────────────────────────────────────────────────────────────
def format_transcript(result: dict, include_header: bool = True) -> str:
    """Render a result dict as text, optionally preceded by a metadata header.

    Args:
        result: Result dict; ``transcript`` is required, everything else is
            looked up with defaults.
        include_header: Prepend the title/uploader/duration/... banner.

    Returns:
        The header (if requested) followed by the transcript and a newline.
    """
    meta = result.get("meta", {})
    title = meta.get("title", "Unknown")
    uploader = meta.get("uploader", "Unknown")
    duration = meta.get("duration", 0)
    dur_str = str(timedelta(seconds=int(duration))) if duration else "N/A"
    method = result.get("method", "unknown")
    url = result.get("url", "")

    header = ""
    if include_header:
        rule = "=" * 70
        header = (
            f"{rule}\n"
            f"TITLE : {title}\n"
            f"UPLOADER : {uploader}\n"
            f"DURATION : {dur_str}\n"
            f"SOURCE : {result.get('source','')}\n"
            f"METHOD : {method}\n"
            f"URL : {url}\n"
            f"{rule}\n\n"
        )
    return header + result["transcript"] + "\n"


def safe_filename(title: str) -> str:
    """Turn *title* into a file-system-safe name of at most 80 characters.

    Characters ``<>:"/\\|?*`` become ``_``; leading/trailing dots and spaces
    are removed. Returns ``"transcript"`` when nothing usable remains.
    """
    cleaned = re.sub(r'[<>:"/\\|?*]', "_", str(title or ""))
    # Strip again after truncating: the cut can expose a trailing space/dot.
    cleaned = cleaned.strip(". ")[:80].rstrip(". ")
    return cleaned or "transcript"


# ─────────────────────────────────────────────────────────────────────────────
# BATCH PROCESSING
# ─────────────────────────────────────────────────────────────────────────────
def _result_title(result: dict, default: str = "?") -> str:
    """Return the result's title as a string, or *default* if missing/None."""
    return str(result.get("meta", {}).get("title") or default)


def process_batch(urls: list[str], output_dir: Path, force_whisper: bool,
                  use_cache: bool, merge: bool, workers: int):
    """Transcribe *urls* in parallel, write the transcripts and print a summary.

    Args:
        urls: URLs to transcribe.
        output_dir: Directory for the output files (created if missing).
        force_whisper: Passed through to ``transcribe_url``.
        use_cache: Passed through to ``transcribe_url``.
        merge: Write one ``merged_transcript.txt`` instead of one file per URL.
        workers: Number of worker threads; values below 1 are treated as 1.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    console.rule("[bold green]Universal Media Transcriber[/bold green]")
    console.print(f"[dim]URLs: {len(urls)} | Workers: {workers} | Model: {WHISPER_MODEL}[/dim]\n")

    def job(url):
        # Never raises: failures come back as a dict with an "error" key.
        t0 = time.time()
        try:
            r = transcribe_url(url, force_whisper=force_whisper, use_cache=use_cache)
            r["elapsed"] = round(time.time() - t0, 1)
            return r
        except Exception as exc:
            return {"url": url, "error": str(exc), "elapsed": round(time.time() - t0, 1)}

    succeeded = []  # (input position, result)
    failed = []     # (input position, error result)

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
        TimeElapsedColumn(),
        console=console,
    ) as progress:
        task = progress.add_task("Transcribing...", total=len(urls))

        with ThreadPoolExecutor(max_workers=max(1, workers)) as pool:
            futures = {pool.submit(job, u): pos for pos, u in enumerate(urls)}
            for fut in as_completed(futures):
                result = fut.result()
                if "error" in result:
                    failed.append((futures[fut], result))
                    console.log(f"[red]✗ Error:[/red] {result['url'][:60]} → {result['error']}")
                else:
                    succeeded.append((futures[fut], result))
                    console.log(f"[green]✓[/green] {_result_title(result)[:50]} [{result['elapsed']}s]")
                progress.advance(task)

    # Jobs finish in arbitrary order; restore input order so merged
    # playlists and the summary read in sequence.
    results = [r for _, r in sorted(succeeded, key=lambda pair: pair[0])]
    errors = [r for _, r in sorted(failed, key=lambda pair: pair[0])]

    # ── Write files ─────────────────────────────────────────────────────────
    if merge and results:
        merged_path = output_dir / "merged_transcript.txt"
        with open(merged_path, "w", encoding="utf-8") as f:
            for r in results:
                f.write(format_transcript(r))
                f.write("\n" + "─" * 70 + "\n\n")
        console.print(f"\n[bold green]✓ Merged transcript:[/bold green] {merged_path}")
    else:
        for r in results:
            fname = safe_filename(_result_title(r, "transcript")) + ".txt"
            out_path = output_dir / fname
            # avoid collisions
            if out_path.exists():
                stem = out_path.stem
                out_path = output_dir / f"{stem}_{cache_key(r['url'])[:6]}.txt"
            out_path.write_text(format_transcript(r), encoding="utf-8")
            console.print(f"[green]✓ Saved:[/green] {out_path}")

    # ── Summary table ────────────────────────────────────────────────────────
    table = Table(title="\n Summary", show_lines=True)
    table.add_column("Title", style="cyan", max_width=40)
    table.add_column("Method", style="magenta")
    table.add_column("Time", justify="right")
    table.add_column("Status", justify="center")

    for r in results:
        table.add_row(
            _result_title(r)[:38],
            r.get("method", "?"),
            f"{r['elapsed']}s",
            "[green]✓[/green]",
        )
    for r in errors:
        table.add_row(r["url"][:38], "—", f"{r['elapsed']}s", "[red]✗[/red]")

    console.print(table)
    console.print(f"\n[bold]Done:[/bold] {len(results)} ok, {len(errors)} failed → [dim]{output_dir}[/dim]")


# ─────────────────────────────────────────────────────────────────────────────
# CLI
# ─────────────────────────────────────────────────────────────────────────────
def main():
    """Parse the command line, collect and expand URLs, then run the batch."""
    global WHISPER_MODEL

    parser = argparse.ArgumentParser(
        description=" Universal Media Transcriber — YouTube, Spotify, Audio & more",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python transcriber.py https://youtu.be/dQw4w9WgXcQ
  python transcriber.py URL1 URL2 URL3 --merge
  python transcriber.py --file urls.txt --output ./transcripts
  python transcriber.py https://open.spotify.com/track/... --whisper
  python transcriber.py https://youtu.be/... --model large-v3
  python transcriber.py --playlist https://youtube.com/playlist?list=...
""",
    )
    parser.add_argument("urls", nargs="*", help="One or more media URLs")
    parser.add_argument("--file", "-f", help="Text file with one URL per line")
    parser.add_argument("--output", "-o", default="./transcripts",
                        help="Output directory (default: ./transcripts)")
    parser.add_argument("--merge", "-m", action="store_true",
                        help="Merge all transcripts into one file")
    parser.add_argument("--whisper", "-w", action="store_true",
                        help="Force Whisper (skip caption check)")
    parser.add_argument("--model", default=WHISPER_MODEL,
                        choices=["tiny", "base", "small", "medium", "large-v2", "large-v3"],
                        help="Whisper model size (default: base)")
    parser.add_argument("--workers", type=int, default=MAX_WORKERS,
                        help="Parallel workers (default: 4)")
    parser.add_argument("--no-cache", action="store_true",
                        help="Disable transcript cache")
    parser.add_argument("--playlist", action="store_true",
                        help="Treat URL as playlist — expand all videos")
    parser.add_argument("--clear-cache", action="store_true",
                        help="Clear the transcript cache and exit")

    args = parser.parse_args()

    if args.workers < 1:
        parser.error("--workers must be at least 1")

    # Apply the requested model; the pipelines read the module-level value.
    WHISPER_MODEL = args.model

    if args.clear_cache:
        shutil.rmtree(CACHE_DIR, ignore_errors=True)
        CACHE_DIR.mkdir(exist_ok=True)
        console.print("[green]Cache cleared.[/green]")
        return

    # Collect URLs
    all_urls = list(args.urls)
    if args.file:
        path = Path(args.file)
        if not path.exists():
            console.print(f"[red]File not found: {path}[/red]")
            sys.exit(1)
        # utf-8-sig also accepts files saved with a BOM.
        for raw_line in path.read_text(encoding="utf-8-sig").splitlines():
            entry = raw_line.strip()
            # Strip before the comment test so indented "#" lines are skipped.
            if entry and not entry.startswith("#"):
                all_urls.append(entry)

    if not all_urls:
        parser.print_help()
        sys.exit(0)

    # Expand playlists
    if args.playlist or len(all_urls) == 1:
        expanded = []
        for u in all_urls:
            exp = expand_playlist(u)
            if len(exp) > 1:
                console.log(f"[cyan]Playlist expanded:[/cyan] {len(exp)} items")
            expanded.extend(exp)
        all_urls = expanded

    # Deduplicate preserving order
    all_urls = list(dict.fromkeys(all_urls))

    process_batch(
        urls=all_urls,
        output_dir=Path(args.output),
        force_whisper=args.whisper,
        use_cache=not args.no_cache,
        merge=args.merge,
        workers=args.workers,
    )


if __name__ == "__main__":
    main()