#!/usr/bin/env python3 from __future__ import annotations import os import re import sys import json import logging import time from typing import Optional, List from urllib.parse import urlparse, parse_qs from google import genai # pip install google-genai from google.genai import types from youtube_transcript_api import ( YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound, ) # ============================================================================ # CONFIG # ============================================================================ GEMINI_KEY = "AIzaSyCNz5wQAyJ65kNRkwr0-1A-_Z6-lQzdcyc" # ── API Keys ──────────────────────────────────────────────────────────────── GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", GEMINI_KEY) YT_API_KEY = os.getenv("YT_API_KEY", "AIzaSyASnhRpV-YQQb4xvoggWIEm8nvrujerEos") GEMINI_MODELS = [ "gemini-2.5-flash", "gemini-2.5-flash-lite", "gemini-2.5-pro", ] # ── FIX: Use a LIST of dicts, not a dict. # A plain dict with duplicate keys like "attempt_3" silently drops all # but the last definition, collapsing 13 intended attempts down to 4. # A list preserves every entry in order. def _polling_attempt(wait_before: int, description: str) -> dict: return {"wait_before": wait_before, "description": description} POLLING_CONFIG: list[dict] = [ _polling_attempt(0, "Immediate attempt on trigger"), _polling_attempt(300, "Retry after 5 minutes"), _polling_attempt(900, "Retry after 15 minutes (30 min total)"), _polling_attempt(900, "Retry after 15 minutes (45 min total)"), _polling_attempt(900, "Retry after 15 minutes (1 hr total)"), _polling_attempt(900, "Retry after 15 minutes (1 hr 15 min total)"), _polling_attempt(900, "Retry after 15 minutes (1 hr 30 min total)"), _polling_attempt(900, "Retry after 15 minutes (1 hr 45 min total)"), _polling_attempt(900, "Retry after 15 minutes (2 hr total)"), _polling_attempt(900, "Retry after 15 minutes (2 hr 15 min total)"), _polling_attempt(900, "Retry after 15 minutes (2 hr 30 min total)"), _polling_attempt(900, "Retry after 15 minutes (2 hr 45 min total)"), _polling_attempt(900, "Retry after 15 minutes (3 hr total)"), ] SYSTEM_PROMPT = """ You are an expert content summarizer and educator. Produce the full output containing exactly two parts separated by a line with only 5 exclamation marks: !!!!! --- PART 1: SUMMARY --- Write a detailed, well-structured summary of the entire content. Use the following structure: ## Overview A 3-5 sentence high-level overview of the entire content. ## Key Topics Covered List the main topics discussed, each with a brief explanation. ## Detailed Summary A thorough section-by-section breakdown of the content in the order it was presented. Use subheadings for each major section or topic shift. ## Key Takeaways A bullet list of the most important insights, facts, or conclusions from the content. --- !!!!! --- PART 2: Q&A --- Generate a comprehensive Q&A section based on the content. Format each entry exactly like this: Q1: [First question] Answer: [Detailed answer] Q2: [Second question] Answer: [Detailed answer] Q3: [Third question] Answer: [Detailed answer] ... and so on until all important questions are covered. Rules: - Number every question and answer with matching numbers (Q1/A1, Q2/A2, etc.) - Each answer must be detailed and self-contained - Cover all major topics, concepts, facts, and insights from the content - Minimum 10 Q&A pairs, more if the content is rich - Do NOT use bullet points inside answers — write in full sentences --- """ # ============================================================================ # LOGGING # ============================================================================ logging.basicConfig( level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s", ) logger = logging.getLogger("gemini_pipeline") # ============================================================================ # HELPERS # ============================================================================ def _format_duration(seconds: int) -> str: if seconds < 60: return f"{seconds}s" if seconds < 3600: return f"{seconds // 60}m" h = seconds // 3600 m = (seconds % 3600) // 60 return f"{h}h {m}m" if m else f"{h}h" # ============================================================================ # SUBTITLE PARSERS # ============================================================================ def _parse_vtt(content: str) -> str: """ Parse WebVTT subtitle content into clean plain text. Strips headers, timestamps, position metadata, and deduplicates consecutive identical lines (VTT scrolling captions repeat text). """ lines = content.splitlines() text_lines: list[str] = [] prev_line = "" for line in lines: stripped = line.strip() if not stripped: continue if stripped.startswith("WEBVTT"): continue if re.match(r"^(Kind:|Language:|Style|NOTE)", stripped, re.IGNORECASE): continue if re.match(r"^\d{2}:\d{2}[:\.]\d{2}[\.:]\d{3}\s*-->\s*\d{2}:\d{2}", stripped): continue if re.match(r"^(position:|align:|line:|size:)", stripped, re.IGNORECASE): continue if stripped.isdigit(): continue cleaned = re.sub(r"<[^>]+>", "", stripped).strip() if not cleaned: continue if cleaned != prev_line: text_lines.append(cleaned) prev_line = cleaned return " ".join(text_lines) def _parse_srt(content: str) -> str: """ Parse SRT subtitle content into clean plain text. Strips sequence numbers and timing lines. """ lines = content.splitlines() text_lines: list[str] = [] for line in lines: stripped = line.strip() if not stripped: continue if stripped.isdigit(): continue if re.match(r"^\d{2}:\d{2}:\d{2}[,.]\d{3}\s*-->\s*\d{2}:\d{2}", stripped): continue cleaned = re.sub(r"<[^>]+>", "", stripped).strip() if cleaned: text_lines.append(cleaned) return " ".join(text_lines) def fetch_video_title(video_id: str) -> str: """Fetch YouTube video title via oembed — no API key needed.""" try: import urllib.request url = ( f"https://www.youtube.com/oembed" f"?url=https://www.youtube.com/watch?v={video_id}&format=json" ) with urllib.request.urlopen(url, timeout=10) as resp: data = json.loads(resp.read().decode()) title = data.get("title", "") safe = re.sub(r'[\\/*?:"<>|]', "", title) safe = re.sub(r"\s+", "_", safe.strip()) return safe[:80] or video_id except Exception: return video_id # ============================================================================ # YOUTUBE TRANSCRIPT FETCHER # ============================================================================ class YouTubeTranscriptFetcher: """ Fetches a YouTube transcript using a multi-tier fallback strategy: Tier 1: youtube_transcript_api (fast, works for most public videos) Tier 2: yt-dlp (robust, handles auto-generated + manual subs) Tier 3: YouTube Data API v3 (only for videos the user owns) Returns (transcript_text, extraction_method) tuple. """ def __init__( self, youtube_url: str, languages: Optional[List[str]] = None, polling_config: list[dict] = None, # ← list, not dict google_creds = None, ): self.youtube_url = youtube_url self.languages = languages or ["en", "en-US", "en-GB"] self.polling_config = polling_config if polling_config is not None else POLLING_CONFIG self.video_id = self._extract_video_id(youtube_url) self.api = YouTubeTranscriptApi() self.google_creds = google_creds @staticmethod def _extract_video_id(url: str) -> str: parsed = urlparse(url) if parsed.hostname == "youtu.be": return parsed.path.lstrip("/").split("?")[0] if parsed.hostname in ("youtube.com", "www.youtube.com", "m.youtube.com"): path_parts = parsed.path.strip("/").split("/") if path_parts[0] in ("live", "shorts", "embed") and len(path_parts) >= 2: return path_parts[1].split("?")[0] params = parse_qs(parsed.query) if "v" in params: return params["v"][0] raise ValueError(f"Could not extract video ID from URL: {url}") raise ValueError(f"Unsupported YouTube URL: {url}") def _fetch_once(self) -> str: try: transcript = self.api.fetch(self.video_id, languages=self.languages) except NoTranscriptFound: logger.info("[Tier 1] Requested languages %s not found. Finding first available transcript...", self.languages) transcript_list = YouTubeTranscriptApi.list_transcripts(self.video_id) first_transcript = next(iter(transcript_list)) logger.info("[Tier 1] Falling back to language: %s", first_transcript.language_code) transcript = first_transcript.fetch() return " ".join(item.text for item in transcript) def _try_all_tiers(self) -> tuple[str, str]: """ Try all transcript extraction tiers in order. Returns (transcript_text, method_used) on first success. Raises RuntimeError if all tiers fail. """ errors: list[str] = [] # ── Tier 1: youtube_transcript_api ── try: text = self._fetch_once() logger.info("[Tier 1] ✅ youtube_transcript_api succeeded — %d chars", len(text)) return text, "youtube_transcript_api" except TranscriptsDisabled as e: errors.append(f"Tier1(TranscriptsDisabled): {e}") logger.warning("[Tier 1] Transcripts disabled, trying fallbacks...") except Exception as e: errors.append(f"Tier1: {e}") logger.warning("[Tier 1] Failed: %s", e) # ── Tier 2: yt-dlp ── try: text = YtDlpTranscriptFetcher( self.video_id, languages=self.languages ).fetch() logger.info("[Tier 2] ✅ yt-dlp succeeded — %d chars", len(text)) return text, "yt-dlp" except Exception as e: errors.append(f"Tier2(yt-dlp): {e}") logger.warning("[Tier 2] Failed: %s", e) # ── Tier 3: YouTube Data API v3 (owned videos only) ── if self.google_creds: try: text = YouTubeApiTranscriptFetcher( self.video_id, self.google_creds, languages=self.languages ).fetch() logger.info("[Tier 3] ✅ YouTube Data API v3 succeeded — %d chars", len(text)) return text, "youtube_data_api_v3" except Exception as e: errors.append(f"Tier3(YT-API): {e}") logger.warning("[Tier 3] Failed: %s", e) else: errors.append("Tier3: Skipped (no OAuth credentials)") logger.info("[Tier 3] Skipped — no Google OAuth credentials provided.") raise RuntimeError( f"All transcript tiers failed for video {self.video_id}. " f"Details: {'; '.join(errors)}" ) def run(self) -> tuple[str, str]: """ Fetch transcript with polling retry and multi-tier fallback. On each polling attempt, all tiers are tried before waiting. Returns (transcript_text, extraction_method). """ # ── FIX: polling_config is now a list, so len() and enumeration work correctly. attempts = self.polling_config total = len(attempts) logger.info("Video ID : %s", self.video_id) logger.info("Polling attempts : %d", total) for idx, config in enumerate(attempts, start=1): wait_before = config["wait_before"] description = config["description"] if wait_before > 0: logger.info( "[%d/%d] %s — waiting %s before retry...", idx, total, description, _format_duration(wait_before), ) time.sleep(wait_before) logger.info( "[%d/%d] %s — trying all transcript tiers...", idx, total, description, ) try: text, method = self._try_all_tiers() logger.info( "[%d/%d] ✅ Transcript fetched via %s — %d characters", idx, total, method, len(text), ) return text, method except KeyboardInterrupt: logger.warning("Interrupted by user.") raise except Exception as e: logger.warning("[%d/%d] All tiers failed: %s", idx, total, e) if idx < total: next_cfg = attempts[idx] # idx is 1-based, list is 0-based → next item logger.info( "[%d/%d] Will retry in %s (%s)", idx, total, _format_duration(next_cfg["wait_before"]), next_cfg["description"], ) else: logger.error("All %d polling attempts exhausted.", total) raise RuntimeError( f"Transcript not available after {total} attempts (~3 hours). " f"Video ID: {self.video_id}" ) # ============================================================================ # TIER 2 — yt-dlp PYTHON API SUBTITLE FETCHER # ============================================================================ class YtDlpTranscriptFetcher: """ Tier 2 fallback — uses yt-dlp's Python API (no subprocess). Extracts subtitle URLs from video metadata via extract_info(), then fetches content in-memory via HTTP. Handles both manual and auto-generated captions. """ PREFERRED_FORMATS = ["vtt", "srt", "srv1", "srv2", "srv3", "ttml"] def __init__(self, video_id: str, languages: Optional[List[str]] = None): self.video_id = video_id self.languages = languages or ["en", "en-US", "en-GB"] def _find_subtitle_url(self, manual_subs: dict, auto_subs: dict) -> tuple[str, str]: for subs_dict in (manual_subs, auto_subs): if not subs_dict: continue for lang in self.languages: if lang not in subs_dict: continue tracks = subs_dict[lang] if not tracks: continue for fmt in self.PREFERRED_FORMATS: for track in tracks: if track.get("ext") == fmt and track.get("url"): return track["url"], fmt for track in tracks: if track.get("url"): return track["url"], track.get("ext", "vtt") logger.info("[yt-dlp] Preferred languages %s not found. Falling back to any available language.", self.languages) for subs_dict in (manual_subs, auto_subs): if not subs_dict: continue for fmt in self.PREFERRED_FORMATS: for lang, tracks in subs_dict.items(): if not tracks: continue for track in tracks: if track.get("ext") == fmt and track.get("url"): logger.info("[yt-dlp] Falling back to language: %s", lang) return track["url"], fmt for lang, tracks in subs_dict.items(): if not tracks: continue for track in tracks: if track.get("url"): logger.info("[yt-dlp] Falling back to language: %s", lang) return track["url"], track.get("ext", "vtt") raise RuntimeError( f"No subtitles found in yt-dlp metadata for video: {self.video_id}" ) def fetch(self) -> str: import yt_dlp import requests as _requests logger.info("[yt-dlp] Attempting in-memory subtitle extraction for %s", self.video_id) url = f"https://www.youtube.com/watch?v={self.video_id}" ydl_opts = { "skip_download": True, "quiet": True, "no_warnings": True, "noplaylist": True, "extract_flat": False, } try: with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=False) except Exception as e: raise RuntimeError(f"yt-dlp extract_info failed: {e}") if not info: raise RuntimeError("yt-dlp returned empty info dict.") manual_subs = info.get("subtitles") or {} auto_subs = info.get("automatic_captions") or {} logger.info( "[yt-dlp] Found %d manual sub tracks, %d auto-caption tracks", len(manual_subs), len(auto_subs), ) sub_url, sub_fmt = self._find_subtitle_url(manual_subs, auto_subs) logger.info("[yt-dlp] Fetching subtitle content (format=%s)", sub_fmt) try: resp = _requests.get(sub_url, timeout=30) resp.raise_for_status() raw_content = resp.text except Exception as e: raise RuntimeError(f"Failed to fetch subtitle from URL: {e}") if not raw_content.strip(): raise RuntimeError("Subtitle URL returned empty content.") if sub_fmt in ("vtt",): text = _parse_vtt(raw_content) elif sub_fmt in ("srt",): text = _parse_srt(raw_content) else: text = re.sub(r"<[^>]+>", "", raw_content) text = re.sub(r"\s+", " ", text).strip() if not text.strip(): raise RuntimeError( f"yt-dlp subtitle content was empty after parsing (format={sub_fmt})." ) logger.info( "[yt-dlp] ✅ Transcript extracted — %d characters (format=%s)", len(text), sub_fmt, ) return text # ============================================================================ # TIER 3 — YOUTUBE DATA API v3 CAPTIONS FETCHER # ============================================================================ class YouTubeApiTranscriptFetcher: """ Fallback fetcher using the official YouTube Data API v3. ⚠️ Only works for videos the authenticated user OWNS. Requires OAuth credentials with youtube.force-ssl scope. """ def __init__(self, video_id: str, credentials, languages: Optional[List[str]] = None): self.video_id = video_id self.credentials = credentials self.languages = languages or ["en", "en-US", "en-GB"] def fetch(self) -> str: if self.credentials is None: raise RuntimeError("No OAuth credentials provided for YouTube API.") logger.info("[YT-API] Attempting captions download for %s", self.video_id) try: from googleapiclient.discovery import build as yt_build youtube = yt_build( "youtube", "v3", credentials=self.credentials, cache_discovery=False, ) captions_response = youtube.captions().list( part="snippet", videoId=self.video_id, ).execute() items = captions_response.get("items", []) if not items: raise RuntimeError( f"No caption tracks found for video {self.video_id}" ) caption_id = None for lang in self.languages: for item in items: snippet = item.get("snippet", {}) if snippet.get("language", "") == lang: if snippet.get("trackKind") != "ASR": caption_id = item["id"] logger.info( "[YT-API] Found manual caption: lang=%s, id=%s", lang, caption_id, ) break if caption_id: break if not caption_id: for lang in self.languages: for item in items: if item.get("snippet", {}).get("language", "") == lang: caption_id = item["id"] logger.info( "[YT-API] Using caption (any kind): lang=%s, id=%s", lang, caption_id, ) break if caption_id: break if not caption_id: available = [i["snippet"]["language"] for i in items] if available: logger.info("[YT-API] Preferred languages %s not found. Falling back to any available language.", self.languages) caption_id = items[0]["id"] lang = items[0]["snippet"]["language"] logger.info("[YT-API] Falling back to language: %s, id=%s", lang, caption_id) else: raise RuntimeError( f"No caption tracks found for video {self.video_id}" ) caption_content = youtube.captions().download( id=caption_id, tfmt="srt", ).execute() if isinstance(caption_content, bytes): caption_content = caption_content.decode("utf-8") text = _parse_srt(caption_content) if not text.strip(): raise RuntimeError("YouTube API caption download returned empty text.") logger.info( "[YT-API] ✅ Transcript extracted — %d characters", len(text) ) return text except ImportError: raise RuntimeError( "google-api-python-client is not installed. " "Cannot use YouTube Data API v3 fallback." ) except Exception as e: err_str = str(e) if "403" in err_str or "Forbidden" in err_str: raise RuntimeError( f"YouTube API returned 403 Forbidden — you can only " f"download captions for videos you own. Error: {err_str}" ) raise # ============================================================================ # GEMINI SUMMARIZER # ============================================================================ class GeminiSummarizer: """ Sends transcript to Gemini and returns (summary, qa, model_used). No files are written to disk. """ MAX_RETRIES = 5 BASE_WAIT = 10 MAX_WAIT = 120 RETRYABLE = ["503", "502", "500", "UNAVAILABLE", "SERVICE_UNAVAILABLE"] SKIP_TO_NEXT = ["429", "RESOURCE_EXHAUSTED", "quota", "404", "NOT_FOUND"] def __init__( self, api_key: str = GEMINI_API_KEY, models: list = None, ): self.client = genai.Client(api_key=api_key) self.models = models or GEMINI_MODELS def _call_api(self, transcript: str) -> tuple[str, str]: overall_last_error = None for model in self.models: logger.info("── Trying model: %s", model) wait = self.BASE_WAIT last_err = None for attempt in range(1, self.MAX_RETRIES + 1): try: logger.info(" [%d/%d] Sending request...", attempt, self.MAX_RETRIES) response = self.client.models.generate_content( model=model, contents=transcript, config=types.GenerateContentConfig( system_instruction=SYSTEM_PROMPT, ), ) logger.info( "✅ Response received from: %s (attempt %d)", model, attempt, ) return response.text, model except Exception as e: err = str(e) last_err = e if any(k in err for k in self.SKIP_TO_NEXT): logger.warning( " [%d/%d] %s — quota/not-found, skipping to next model.", attempt, self.MAX_RETRIES, model, ) break elif any(k in err for k in self.RETRYABLE): if attempt < self.MAX_RETRIES: logger.warning( " [%d/%d] %s — transient error. Retrying in %ds...", attempt, self.MAX_RETRIES, model, wait, ) time.sleep(wait) wait = min(wait * 2, self.MAX_WAIT) else: logger.warning( " [%d/%d] %s — max retries reached, trying next model.", attempt, self.MAX_RETRIES, model, ) else: logger.error( " [%d/%d] %s — unhandled error: %s", attempt, self.MAX_RETRIES, model, err, ) raise overall_last_error = last_err raise RuntimeError( f"All models and retries exhausted. Last error: {overall_last_error}" ) @staticmethod def _split(full_text: str) -> tuple[str, str]: for pattern in (r"^\s*!{5}\s*$", r"^\s*!{3}\s*$"): parts = re.split(pattern, full_text, flags=re.MULTILINE) if len(parts) >= 2: return parts[0].strip(), "".join(parts[1:]).strip() return full_text.strip(), "" def run(self, transcript: str) -> tuple[str, str, str]: full, model_used = self._call_api(transcript) summary, qa = self._split(full) logger.info("✅ Summarization complete — model: %s", model_used) return summary, qa, model_used # ============================================================================ # PIPELINE # ============================================================================ class TranscriptSummaryPipeline: """ Orchestrates fetch → summarize. All data flows in memory — no disk I/O. Supports multi-tier fallback for transcript extraction. """ def __init__( self, youtube_url: str, languages: Optional[List[str]] = None, polling_config: list[dict] = None, # ← list, not dict google_creds = None, ): self.youtube_url = youtube_url self.fetcher = YouTubeTranscriptFetcher( youtube_url=youtube_url, languages=languages, polling_config=polling_config, google_creds=google_creds, ) self.summarizer = GeminiSummarizer() self.video_id = self.fetcher.video_id self.video_title = fetch_video_title(self.video_id) def run(self) -> dict: logger.info("=== Pipeline started ===") logger.info("Video title : %s", self.video_title) transcript, extraction_method = self.fetcher.run() summary, qa, model = self.summarizer.run(transcript) logger.info( "=== Pipeline complete | model: %s | extraction: %s ===", model, extraction_method, ) return { "video_id": self.video_id, "video_title": self.video_title, "model_used": model, "extraction_method": extraction_method, "summary": summary, "qa": qa, "transcript": transcript, } # ============================================================================ # CLI # ============================================================================ def main(): if len(sys.argv) < 2: print("Usage: python gemini_transcript.py ", file=sys.stderr) sys.exit(1) pipeline = TranscriptSummaryPipeline( youtube_url=sys.argv[1], languages=["en", "en-US", "en-GB"], ) result = pipeline.run() for key, value in result.items(): if key not in ("summary", "qa", "transcript"): print(f"{key}: {value}") if __name__ == "__main__": main()