| |
|
| | """
|
| | Universal Media Transcriber
|
| | Supports: YouTube, YouTube Music, Spotify, Direct Audio/Video URLs
|
| | Blazing fast: uses native captions when available, falls back to faster-whisper
|
| | """
|
| |
|
| | import os
|
| | import sys
|
| | import re
|
| | import json
|
| | import time
|
| | import shutil
|
| | import hashlib
|
| | import argparse
|
| | import tempfile
|
| | import subprocess
|
| | from pathlib import Path
|
| | from datetime import timedelta
|
| | from concurrent.futures import ThreadPoolExecutor, as_completed
|
| | from urllib.parse import urlparse, parse_qs
|
| |
|
| |
|
# Make sure binaries shipped next to this script (e.g. ffmpeg) win PATH lookup.
# Use .get() so a missing/unset PATH cannot raise KeyError.
script_dir = str(Path(__file__).parent.absolute())
if script_dir not in os.environ.get("PATH", ""):
    os.environ["PATH"] = script_dir + os.pathsep + os.environ.get("PATH", "")
|
| |
|
| |
|
| |
|
| |
|
| |
|
# Runtime dependencies: maps importable module name -> pip package name.
# ensure_deps() probes the keys with __import__ and pip-installs the values.
REQUIRED = {
    "yt_dlp": "yt-dlp",
    "youtube_transcript_api": "youtube-transcript-api",
    "faster_whisper": "faster-whisper",
    "rich": "rich",
    "spotdl": "spotdl",
    "requests": "requests",
}
|
| |
|
def ensure_deps():
    """Probe every entry in REQUIRED and pip-install whatever is absent."""

    def _absent(module: str) -> bool:
        # __import__ keeps this check free of extra dependencies at startup.
        try:
            __import__(module)
        except ImportError:
            return True
        return False

    missing = [pkg for module, pkg in REQUIRED.items() if _absent(module)]
    if not missing:
        return
    print(f"[setup] Installing: {', '.join(missing)} ...")
    subprocess.check_call(
        [sys.executable, "-m", "pip", "install", "--quiet", "--break-system-packages"] + missing
    )
    print("[setup] Done. Reloading...\n")


ensure_deps()
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | import yt_dlp
|
| | import requests
|
| | from rich.console import Console
|
| | from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn
|
| | from rich.panel import Panel
|
| | from rich.table import Table
|
| | from rich import print as rprint
|
| | from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
|
| |
|
# Shared Rich console: all status/log output in this script goes through it.
console = Console()
|
| |
|
| |
|
| |
|
| |
|
| |
|
# --- Transcription / download settings --------------------------------------
WHISPER_MODEL = "base"            # default faster-whisper model size
WHISPER_DEVICE = "auto"           # "auto": cuda when torch sees a GPU, else cpu
WHISPER_THREADS = os.cpu_count()  # threads handed to faster-whisper
AUDIO_FORMAT = "mp3"              # target codec for extracted audio
MAX_WORKERS = 4                   # default parallel transcription jobs
CACHE_DIR = Path.home() / ".transcriber_cache"  # per-user transcript cache
CACHE_DIR.mkdir(exist_ok=True)

# Caption languages tried in order of preference. NOTE(review): "a.en"
# presumably targets auto-generated English tracks -- confirm the code is
# accepted by youtube-transcript-api's find_transcript().
LANG_PREF = ["en", "en-US", "en-GB", "en-AU", "en-CA", "en-IN", "en-IE", "en-NZ", "en-PH", "en-ZA", "en-orig", "a.en"]
|
| |
|
| |
|
| |
|
| |
|
| |
|
def detect_source(url: str) -> str:
    """Classify *url* by provider.

    Returns one of: ``youtube`` | ``youtube_music`` | ``spotify`` |
    ``audio`` | ``unknown``.
    """
    parsed = urlparse(url)
    # removeprefix() only trims a leading "www." — replace() would also
    # corrupt hosts that merely contain "www." somewhere in the middle.
    host = parsed.netloc.lower().removeprefix("www.")

    if host in ("youtube.com", "youtu.be", "m.youtube.com"):
        return "youtube"
    if host in ("music.youtube.com",):
        return "youtube_music"
    if host in ("open.spotify.com", "spotify.com"):
        return "spotify"

    # FIX: test the URL *path* suffix so "/clip.mp3?sig=abc" still matches;
    # a plain url.endswith() check fails as soon as a query string follows.
    suffix = Path(parsed.path).suffix.lower()
    if suffix in {
        ".mp3", ".mp4", ".wav", ".ogg", ".flac", ".m4a", ".webm",
        ".aac", ".opus", ".mkv", ".avi", ".mov",
    }:
        return "audio"

    # Last resort: one cheap HEAD request to sniff the content type.
    try:
        r = requests.head(url, timeout=5, allow_redirects=True)
        ct = r.headers.get("content-type", "")
        if "audio" in ct or "video" in ct:
            return "audio"
    except Exception:
        # Network sniffing is best-effort only; never let it break detection.
        pass
    return "unknown"
|
| |
|
| |
|
| | def extract_youtube_id(url: str) -> str | None:
|
| | """Extract video ID from any YouTube URL format."""
|
| | patterns = [
|
| | r"(?:v=|youtu\.be/|embed/|shorts/)([A-Za-z0-9_-]{11})",
|
| | ]
|
| | for p in patterns:
|
| | m = re.search(p, url)
|
| | if m:
|
| | return m.group(1)
|
| | return None
|
| |
|
| |
|
def extract_spotify_type(url: str) -> tuple[str, str]:
    """Return (resource_type, resource_id), e.g. ('track', 'abc123').

    Unrecognized URLs yield ('unknown', '').
    """
    found = re.search(r"spotify\.com/(track|album|playlist|episode|show)/([A-Za-z0-9]+)", url)
    if found is None:
        return "unknown", ""
    return found.group(1), found.group(2)
|
| |
|
| |
|
| |
|
| |
|
| |
|
def cache_key(url: str) -> str:
    """Stable cache-file stem for *url* (md5 hex digest; not security-sensitive)."""
    digest = hashlib.md5(url.encode())
    return digest.hexdigest()
|
| |
|
def cache_get(url: str) -> str | None:
    """Return the cached transcript for *url*, or None on a cache miss."""
    cached = CACHE_DIR / f"{cache_key(url)}.txt"
    return cached.read_text(encoding="utf-8") if cached.exists() else None
|
| |
|
def cache_set(url: str, text: str):
    """Persist *text* as the cached transcript for *url*."""
    (CACHE_DIR / f"{cache_key(url)}.txt").write_text(text, encoding="utf-8")
|
| |
|
| |
|
| |
|
| |
|
| |
|
# Lazily built singleton so the (slow) model load happens at most once.
_whisper_model = None


def get_whisper():
    """Return the shared faster-whisper model, loading it on first use."""
    global _whisper_model
    if _whisper_model is not None:
        return _whisper_model

    from faster_whisper import WhisperModel

    device = WHISPER_DEVICE
    if device == "auto":
        # Prefer CUDA when torch is installed and reports a GPU; else CPU.
        try:
            import torch
            device = "cuda" if torch.cuda.is_available() else "cpu"
        except ImportError:
            device = "cpu"

    console.log(f"[cyan]Loading Whisper [{WHISPER_MODEL}] on {device}...[/cyan]")
    # float16 for GPU speed; int8 quantization keeps CPU inference snappy.
    compute = "float16" if device == "cuda" else "int8"
    _whisper_model = WhisperModel(WHISPER_MODEL, device=device, compute_type=compute,
                                  num_workers=WHISPER_THREADS, cpu_threads=WHISPER_THREADS)
    return _whisper_model
|
| |
|
| |
|
def transcribe_audio_file(audio_path: str, lang: str | None = None) -> str:
    """Transcribe a local audio file with faster-whisper.

    Args:
        audio_path: Path of the audio file on disk.
        lang: Optional language code; when None the model auto-detects.
              (FIX: annotation was ``str`` despite the ``None`` default.)

    Returns:
        The full transcript, one ``[HH:MM:SS] text`` line per segment.
    """
    model = get_whisper()
    # VAD skips long silences, which noticeably speeds up decoding.
    opts = dict(beam_size=5, word_timestamps=False, vad_filter=True,
                vad_parameters=dict(min_silence_duration_ms=500))
    if lang:
        opts["language"] = lang
    segments, info = model.transcribe(audio_path, **opts)
    lines = []
    for seg in segments:
        # zfill(8) pads "0:00:05" to "00:00:05" for column-aligned stamps.
        ts = str(timedelta(seconds=int(seg.start))).zfill(8)
        lines.append(f"[{ts}] {seg.text.strip()}")
    return "\n".join(lines)
|
| |
|
| |
|
| |
|
| |
|
| |
|
def fetch_youtube_captions(video_id: str) -> str | None:
    """Fetch native YouTube captions for *video_id* — no audio download.

    Returns the "[HH:MM:SS] text" transcript, or None when captions are
    disabled, missing, or the lookup fails for any other reason.
    """
    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)

        # Walk the language preference list; first match wins.
        transcript = None
        for lang in LANG_PREF:
            try:
                transcript = transcript_list.find_transcript([lang])
            except Exception:
                continue
            break
        if transcript is None:
            # No preferred language available — take whatever track exists.
            transcript = next(iter(transcript_list))

        rendered = []
        for entry in transcript.fetch():
            stamp = str(timedelta(seconds=int(entry["start"]))).zfill(8)
            rendered.append(f"[{stamp}] {entry['text'].strip()}")
        return "\n".join(rendered)
    except (TranscriptsDisabled, NoTranscriptFound):
        return None
    except Exception as exc:
        # Anything else (network, API change) is logged but non-fatal:
        # the caller falls back to Whisper.
        console.log(f"[yellow]Caption fetch warning: {exc}[/yellow]")
        return None
|
| |
|
| |
|
def download_audio_yt(url: str, out_dir: str) -> str:
    """Download best-quality audio with yt-dlp and return the local file path."""
    options = {
        "format": "bestaudio/best",
        "outtmpl": os.path.join(out_dir, "%(id)s.%(ext)s"),
        # Re-encode to the configured audio format at a modest bitrate.
        "postprocessors": [{
            "key": "FFmpegExtractAudio",
            "preferredcodec": AUDIO_FORMAT,
            "preferredquality": "128",
        }],
        "quiet": True,
        "no_warnings": True,
        # Parallel fragment fetches speed up DASH/HLS downloads.
        "concurrent_fragment_downloads": 8,
    }
    with yt_dlp.YoutubeDL(options) as downloader:
        info = downloader.extract_info(url, download=True)
    video_id = info.get("id", "audio")
    return os.path.join(out_dir, f"{video_id}.{AUDIO_FORMAT}")
|
| |
|
| |
|
def get_video_metadata(url: str) -> dict:
    """Fetch title/uploader/duration/etc. via yt-dlp without downloading media."""
    options = {"quiet": True, "no_warnings": True, "skip_download": True}
    try:
        with yt_dlp.YoutubeDL(options) as ydl:
            info = ydl.extract_info(url, download=False)
        return {
            "title": info.get("title", "Unknown"),
            "uploader": info.get("uploader", "Unknown"),
            "duration": info.get("duration", 0),
            "description": info.get("description", ""),
            "upload_date": info.get("upload_date", ""),
        }
    except Exception:
        # Metadata is best-effort; callers only rely on these minimal keys.
        return {"title": "Unknown", "uploader": "Unknown", "duration": 0}
|
| |
|
| |
|
def transcribe_youtube(url: str, force_whisper: bool = False) -> dict:
    """Transcribe a YouTube / YouTube Music URL.

    Fast path: native captions (instant, no download). Fallback (or when
    *force_whisper* is set): download audio, run faster-whisper.
    """
    video_id = extract_youtube_id(url) or "unknown"
    info = get_video_metadata(url)

    text = None
    how = "unknown"

    if not force_whisper:
        console.log(f"[cyan]Trying native captions for[/cyan] [bold]{info['title']}[/bold]")
        text = fetch_youtube_captions(video_id)
        if text:
            how = "native_captions"
            console.log("[green]β Got captions instantly (no download needed)[/green]")

    if text is None:
        console.log("[yellow]No captions β downloading audio for Whisper...[/yellow]")
        with tempfile.TemporaryDirectory() as tmpdir:
            audio_path = download_audio_yt(url, tmpdir)
            console.log(f"[cyan]Transcribing with Whisper [{WHISPER_MODEL}]...[/cyan]")
            text = transcribe_audio_file(audio_path)
        how = f"whisper_{WHISPER_MODEL}"

    return {
        "url": url,
        "source": "youtube",
        "method": how,
        "meta": info,
        "transcript": text,
    }
|
| |
|
| |
|
| |
|
| |
|
| |
|
def transcribe_spotify(url: str) -> dict:
    """Download a Spotify track/album/playlist/episode, then transcribe it."""
    sp_type, sp_id = extract_spotify_type(url)

    # Podcast episodes sometimes resolve directly through yt-dlp.
    if sp_type == "episode":
        console.log("[cyan]Spotify episode β trying yt-dlp...[/cyan]")
        try:
            with tempfile.TemporaryDirectory() as workdir:
                audio_file = download_audio_yt(url, workdir)
                info = get_video_metadata(url)
                text = transcribe_audio_file(audio_file)
            return {
                "url": url,
                "source": "spotify_episode",
                "method": f"whisper_{WHISPER_MODEL}",
                "meta": info,
                "transcript": text,
            }
        except Exception as e:
            # Fall through to the spotdl path below.
            console.log(f"[yellow]yt-dlp failed for Spotify episode: {e}[/yellow]")

    # Music content goes through spotdl.
    console.log("[cyan]Spotify music β downloading via spotdl...[/cyan]")
    with tempfile.TemporaryDirectory() as workdir:
        proc = subprocess.run(
            [sys.executable, "-m", "spotdl", url, "--output", workdir,
             "--format", "mp3", "--bitrate", "128k", "--print-errors"],
            capture_output=True, text=True
        )

        # spotdl's exit code is unreliable; judge success by produced files.
        found = list(Path(workdir).glob("*.mp3")) + list(Path(workdir).glob("*.m4a"))
        if not found:
            raise RuntimeError(f"spotdl produced no files.\n{proc.stderr}")

        pieces = []
        for track in sorted(found):
            console.log(f"[cyan]Transcribing:[/cyan] {track.name}")
            t = transcribe_audio_file(str(track))
            pieces.append(f"=== {track.stem} ===\n{t}")

    return {
        "url": url,
        "source": f"spotify_{sp_type}",
        "method": f"spotdl+whisper_{WHISPER_MODEL}",
        "meta": {"title": f"Spotify {sp_type.title()}", "uploader": "Spotify"},
        "transcript": "\n\n".join(pieces),
    }
|
| |
|
| |
|
| |
|
| |
|
| |
|
def transcribe_direct_audio(url: str) -> dict:
    """Download a direct audio/video link (via yt-dlp) and run Whisper on it."""
    console.log(f"[cyan]Downloading direct audio:[/cyan] {url}")
    with tempfile.TemporaryDirectory() as workdir:
        options = {
            "outtmpl": os.path.join(workdir, "audio.%(ext)s"),
            "quiet": True,
            "no_warnings": True,
            "concurrent_fragment_downloads": 8,
        }
        with yt_dlp.YoutubeDL(options) as ydl:
            info = ydl.extract_info(url, download=True)
        # Prefer the extractor's title; fall back to the URL's file stem.
        title = info.get("title", Path(url).stem) if info else Path(url).stem

        downloaded = list(Path(workdir).iterdir())
        if not downloaded:
            raise RuntimeError("No file downloaded")
        target = str(downloaded[0])
        console.log(f"[cyan]Transcribing:[/cyan] {Path(target).name}")
        text = transcribe_audio_file(target)

    return {
        "url": url,
        "source": "audio",
        "method": f"whisper_{WHISPER_MODEL}",
        "meta": {"title": title, "uploader": "Direct"},
        "transcript": text,
    }
|
| |
|
| |
|
| |
|
| |
|
| |
|
def expand_playlist(url: str) -> list[str]:
    """Return the individual video URLs of a playlist/album/channel.

    Non-playlist URLs (and any extraction failure) yield ``[url]``, so the
    caller can always iterate the result. FIX: the original could fall out
    of the ``try`` without returning when the extracted info had no
    "entries" key, implicitly yielding None and crashing the caller's
    ``extend``; the single-item fallback is now explicit. Also guards
    against ``extract_info`` returning None.
    """
    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "extract_flat": True,   # list entries without resolving each video
        "skip_download": True,
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)
        if info and "entries" in info:
            urls = []
            for e in info["entries"]:
                if e and e.get("url"):
                    urls.append(e["url"])
                elif e and e.get("id"):
                    # Flat extraction may give only IDs; rebuild watch URLs.
                    urls.append(f"https://www.youtube.com/watch?v={e['id']}")
            return urls
    except Exception as exc:
        console.log(f"[yellow]Playlist expansion warning: {exc}[/yellow]")
    # Single video, or anything unexpected: treat as a one-item "playlist".
    return [url]
|
| |
|
| |
|
| |
|
| |
|
| |
|
def transcribe_url(url: str, force_whisper: bool = False, use_cache: bool = True) -> dict:
    """Dispatch *url* to the matching transcription pipeline, with caching."""
    url = url.strip()

    # Cache first — a hit skips all network and model work.
    if use_cache:
        cached = cache_get(url)
        if cached:
            console.log(f"[green]β Cache hit:[/green] {url[:60]}")
            return {"url": url, "source": "cache", "method": "cache",
                    "meta": {"title": "Cached"}, "transcript": cached}

    kind = detect_source(url)
    console.log(f"[bold blue]Source detected:[/bold blue] {kind} β {url[:70]}")

    if kind in ("youtube", "youtube_music"):
        outcome = transcribe_youtube(url, force_whisper=force_whisper)
    elif kind == "spotify":
        outcome = transcribe_spotify(url)
    elif kind == "audio":
        outcome = transcribe_direct_audio(url)
    else:
        # Unknown host: yt-dlp's generic extractor handles many sites.
        console.log("[yellow]Unknown source β trying yt-dlp generic handler...[/yellow]")
        outcome = transcribe_direct_audio(url)

    if use_cache:
        cache_set(url, outcome["transcript"])

    return outcome
|
| |
|
| |
|
| |
|
| |
|
| |
|
def format_transcript(result: dict, include_header: bool = True) -> str:
    """Render one transcription result as plain text.

    When *include_header* is true, a metadata banner (title, uploader,
    duration, source, method, URL) is prepended to the transcript.
    """
    body = result["transcript"] + "\n"
    if not include_header:
        return body

    meta = result.get("meta", {})
    duration = meta.get("duration", 0)
    dur_str = str(timedelta(seconds=int(duration))) if duration else "N/A"
    rule = "=" * 70
    header = (
        f"{rule}\n"
        f"TITLE : {meta.get('title', 'Unknown')}\n"
        f"UPLOADER : {meta.get('uploader', 'Unknown')}\n"
        f"DURATION : {dur_str}\n"
        f"SOURCE : {result.get('source','')}\n"
        f"METHOD : {result.get('method', 'unknown')}\n"
        f"URL : {result.get('url', '')}\n"
        f"{rule}\n\n"
    )
    return header + body
|
| |
|
| |
|
def safe_filename(title: str) -> str:
    """Sanitize *title* for use as a filename (Windows-safe, max 80 chars)."""
    # Replace characters forbidden on Windows, then trim dots/spaces.
    cleaned = re.sub(r'[<>:"/\\|?*]', "_", title)
    cleaned = cleaned.strip(". ")[:80]
    if not cleaned:
        return "transcript"
    return cleaned
|
| |
|
| |
|
| |
|
| |
|
| |
|
def process_batch(urls: list[str], output_dir: Path, force_whisper: bool,
                  use_cache: bool, merge: bool, workers: int):
    """Transcribe *urls* in parallel and write the results under *output_dir*.

    Args:
        urls: Media URLs to process.
        output_dir: Directory for output .txt files (created if missing).
        force_whisper: Skip the native-caption fast path.
        use_cache: Consult/populate the transcript cache.
        merge: Write one combined file instead of one file per URL.
        workers: Thread-pool size for parallel transcription.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    results = []
    errors = []

    console.rule("[bold green]Universal Media Transcriber[/bold green]")
    console.print(f"[dim]URLs: {len(urls)} | Workers: {workers} | Model: {WHISPER_MODEL}[/dim]\n")

    def job(url):
        # Worker body: transcribe one URL and time it. Failures are turned
        # into {"error": ...} dicts so one bad URL never kills the batch.
        t0 = time.time()
        try:
            r = transcribe_url(url, force_whisper=force_whisper, use_cache=use_cache)
            r["elapsed"] = round(time.time() - t0, 1)
            return r
        except Exception as exc:
            return {"url": url, "error": str(exc), "elapsed": round(time.time() - t0, 1)}

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
        TimeElapsedColumn(),
        console=console,
    ) as progress:
        task = progress.add_task("Transcribing...", total=len(urls))
        with ThreadPoolExecutor(max_workers=workers) as pool:
            # future -> url mapping; results are collected as they complete.
            futures = {pool.submit(job, u): u for u in urls}
            for fut in as_completed(futures):
                result = fut.result()
                if "error" in result:
                    errors.append(result)
                    console.log(f"[red]β Error:[/red] {result['url'][:60]} β {result['error']}")
                else:
                    results.append(result)
                    console.log(f"[green]β[/green] {result['meta'].get('title','?')[:50]} [{result['elapsed']}s]")
                progress.advance(task)

    # Output: either one merged file or one file per successful result.
    if merge and results:
        merged_path = output_dir / "merged_transcript.txt"
        with open(merged_path, "w", encoding="utf-8") as f:
            for r in results:
                f.write(format_transcript(r))
                f.write("\n" + "β" * 70 + "\n\n")
        console.print(f"\n[bold green]β Merged transcript:[/bold green] {merged_path}")
    else:
        for r in results:
            title = r["meta"].get("title", "transcript")
            fname = safe_filename(title) + ".txt"
            out_path = output_dir / fname
            # On a name collision, append a short hash of the URL so two
            # videos with the same title don't overwrite each other.
            if out_path.exists():
                stem = out_path.stem
                out_path = output_dir / f"{stem}_{cache_key(r['url'])[:6]}.txt"
            out_path.write_text(format_transcript(r), encoding="utf-8")
            console.print(f"[green]β Saved:[/green] {out_path}")

    # Final summary table: one row per success, one per failure.
    table = Table(title="\n Summary", show_lines=True)
    table.add_column("Title", style="cyan", max_width=40)
    table.add_column("Method", style="magenta")
    table.add_column("Time", justify="right")
    table.add_column("Status", justify="center")

    for r in results:
        table.add_row(
            r["meta"].get("title", "?")[:38],
            r.get("method", "?"),
            f"{r['elapsed']}s",
            "[green]β[/green]",
        )
    for r in errors:
        table.add_row(r["url"][:38], "β", f"{r['elapsed']}s", "[red]β[/red]")

    console.print(table)
    console.print(f"\n[bold]Done:[/bold] {len(results)} ok, {len(errors)} failed β [dim]{output_dir}[/dim]")
|
| |
|
| |
|
| |
|
| |
|
| |
|
def main():
    """CLI entry point: parse arguments, collect/expand URLs, run the batch."""
    global WHISPER_MODEL
    parser = argparse.ArgumentParser(
        description=" Universal Media Transcriber β YouTube, Spotify, Audio & more",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python transcriber.py https://youtu.be/dQw4w9WgXcQ
  python transcriber.py URL1 URL2 URL3 --merge
  python transcriber.py --file urls.txt --output ./transcripts
  python transcriber.py https://open.spotify.com/track/... --whisper
  python transcriber.py https://youtu.be/... --model large-v3
  python transcriber.py --playlist https://youtube.com/playlist?list=...
"""
    )
    parser.add_argument("urls", nargs="*", help="One or more media URLs")
    parser.add_argument("--file", "-f", help="Text file with one URL per line")
    parser.add_argument("--output", "-o", default="./transcripts", help="Output directory (default: ./transcripts)")
    parser.add_argument("--merge", "-m", action="store_true", help="Merge all transcripts into one file")
    parser.add_argument("--whisper", "-w", action="store_true", help="Force Whisper (skip caption check)")
    parser.add_argument("--model", default=WHISPER_MODEL,
                        choices=["tiny", "base", "small", "medium", "large-v2", "large-v3"],
                        help="Whisper model size (default: base)")
    parser.add_argument("--workers", type=int, default=MAX_WORKERS, help="Parallel workers (default: 4)")
    parser.add_argument("--no-cache", action="store_true", help="Disable transcript cache")
    parser.add_argument("--playlist", action="store_true", help="Treat URL as playlist β expand all videos")
    parser.add_argument("--clear-cache", action="store_true", help="Clear the transcript cache and exit")

    args = parser.parse_args()

    # FIX: --model was parsed but never applied (the `global` declaration
    # existed with no assignment); honor the user's choice before any
    # transcription starts.
    WHISPER_MODEL = args.model

    if args.clear_cache:
        shutil.rmtree(CACHE_DIR, ignore_errors=True)
        CACHE_DIR.mkdir(exist_ok=True)
        console.print("[green]Cache cleared.[/green]")
        return

    # Collect URLs from argv plus an optional file; blank lines and
    # '#' comment lines in the file are skipped.
    all_urls = list(args.urls)
    if args.file:
        path = Path(args.file)
        if not path.exists():
            console.print(f"[red]File not found: {path}[/red]")
            sys.exit(1)
        lines = path.read_text().splitlines()
        all_urls += [l.strip() for l in lines if l.strip() and not l.startswith("#")]

    if not all_urls:
        parser.print_help()
        sys.exit(0)

    # Expand playlists. A lone URL is probed too, so pasting a playlist
    # link without --playlist still does the right thing.
    if args.playlist or len(all_urls) == 1:
        expanded = []
        for u in all_urls:
            exp = expand_playlist(u)
            if len(exp) > 1:
                console.log(f"[cyan]Playlist expanded:[/cyan] {len(exp)} items")
            expanded.extend(exp)
        all_urls = expanded

    # De-duplicate while preserving order (dicts keep insertion order).
    all_urls = list(dict.fromkeys(all_urls))

    process_batch(
        urls=all_urls,
        output_dir=Path(args.output),
        force_whisper=args.whisper,
        use_cache=not args.no_cache,
        merge=args.merge,
        # FIX: was the garbled keyword "wocd rkers=args.workers" (SyntaxError).
        workers=args.workers,
    )


if __name__ == "__main__":
    main()