Spaces:
Sleeping
Sleeping
rsnarsna
fix: Refactor polling configuration to use a list of dictionaries for accurate retry attempts; add 'innertubex' to requirements
cce56a7 | #!/usr/bin/env python3 | |
| from __future__ import annotations | |
| import os | |
| import re | |
| import sys | |
| import json | |
| import logging | |
| import time | |
| from typing import Optional, List | |
| from urllib.parse import urlparse, parse_qs | |
| from google import genai # pip install google-genai | |
| from google.genai import types | |
| from youtube_transcript_api import ( | |
| YouTubeTranscriptApi, | |
| TranscriptsDisabled, | |
| NoTranscriptFound, | |
| ) | |
| # ============================================================================ | |
| # CONFIG | |
| # ============================================================================ | |
# -- API keys -----------------------------------------------------------------
# SECURITY: credentials must never live in source control. The keys that were
# previously hard-coded here were published together with this file and should
# be treated as compromised (revoke / rotate them). Keys are now taken from the
# environment only; an empty string means "not configured".
GEMINI_KEY = ""  # name kept for backward compatibility; no embedded secret
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", GEMINI_KEY)
YT_API_KEY = os.getenv("YT_API_KEY", "")

# Gemini model names; GeminiSummarizer uses this list when it is not given one
# and tries the models in the listed order.
GEMINI_MODELS = [
    "gemini-2.5-flash",
    "gemini-2.5-flash-lite",
    "gemini-2.5-pro",
]
# Polling schedule, kept as an ordered LIST of dicts (one entry per attempt).
# A dict keyed by attempt name silently keeps only the last value for any
# repeated key, which would collapse the schedule; a list preserves every
# entry in order.
def _polling_attempt(wait_before: int, description: str) -> dict:
    """Build one schedule entry.

    ``wait_before`` is the number of seconds to sleep before the attempt and
    ``description`` is the label logged for it.
    """
    return dict(wait_before=wait_before, description=description)


# NOTE(review): the "(N total)" labels run 10 minutes ahead of the cumulative
# wait (5 min + 15 min is 20 min at the third attempt, not 30) -- confirm
# whether the waits or the labels reflect the intended schedule.
POLLING_CONFIG: list[dict] = [
    _polling_attempt(delay_seconds, label)
    for delay_seconds, label in (
        (0, "Immediate attempt on trigger"),
        (300, "Retry after 5 minutes"),
        (900, "Retry after 15 minutes (30 min total)"),
        (900, "Retry after 15 minutes (45 min total)"),
        (900, "Retry after 15 minutes (1 hr total)"),
        (900, "Retry after 15 minutes (1 hr 15 min total)"),
        (900, "Retry after 15 minutes (1 hr 30 min total)"),
        (900, "Retry after 15 minutes (1 hr 45 min total)"),
        (900, "Retry after 15 minutes (2 hr total)"),
        (900, "Retry after 15 minutes (2 hr 15 min total)"),
        (900, "Retry after 15 minutes (2 hr 30 min total)"),
        (900, "Retry after 15 minutes (2 hr 45 min total)"),
        (900, "Retry after 15 minutes (3 hr total)"),
    )
]
# System instruction sent to Gemini with every transcript (see
# GeminiSummarizer._call_api). It asks for two parts separated by a line that
# contains only "!!!!!"; GeminiSummarizer._split relies on that separator line
# to cut the response into summary and Q&A.
# NOTE(review): the "β" in the last rule looks like a mis-encoded dash, and the
# rules mention "Q1/A1" while the format example uses "Answer:" -- confirm the
# intended wording before changing this runtime string.
SYSTEM_PROMPT = """
You are an expert content summarizer and educator.
Produce the full output containing exactly two parts separated by a line with only 5 exclamation marks:
!!!!!
--- PART 1: SUMMARY ---
Write a detailed, well-structured summary of the entire content.
Use the following structure:
## Overview
A 3-5 sentence high-level overview of the entire content.
## Key Topics Covered
List the main topics discussed, each with a brief explanation.
## Detailed Summary
A thorough section-by-section breakdown of the content in the order it was presented.
Use subheadings for each major section or topic shift.
## Key Takeaways
A bullet list of the most important insights, facts, or conclusions from the content.
---
!!!!!
--- PART 2: Q&A ---
Generate a comprehensive Q&A section based on the content.
Format each entry exactly like this:
Q1: [First question]
Answer: [Detailed answer]
Q2: [Second question]
Answer: [Detailed answer]
Q3: [Third question]
Answer: [Detailed answer]
and so on until all important questions are covered.
Rules:
- Number every question and answer with matching numbers (Q1/A1, Q2/A2, etc.)
- Each answer must be detailed and self-contained
- Cover all major topics, concepts, facts, and insights from the content
- Minimum 10 Q&A pairs, more if the content is rich
- Do NOT use bullet points inside answers β write in full sentences
---
"""
| # ============================================================================ | |
| # LOGGING | |
| # ============================================================================ | |
# Root logging setup: INFO and above, rendered as "time | level | message".
logging.basicConfig(
    format="%(asctime)s | %(levelname)s | %(message)s",
    level=logging.INFO,
)
# Module-wide logger shared by every component of the pipeline.
logger = logging.getLogger("gemini_pipeline")
| # ============================================================================ | |
| # HELPERS | |
| # ============================================================================ | |
def _format_duration(seconds: int) -> str:
    """Render a duration in seconds as a short label.

    Below one minute the value is shown in seconds ("45s"); below one hour in
    whole minutes ("5m", leftover seconds are dropped); otherwise in hours,
    followed by the minutes only when they are non-zero ("1h", "1h 30m").
    """
    if seconds < 60:
        return f"{seconds}s"

    whole_minutes = seconds // 60
    if seconds < 3600:
        return f"{whole_minutes}m"

    hours, leftover_minutes = divmod(whole_minutes, 60)
    if leftover_minutes:
        return f"{hours}h {leftover_minutes}m"
    return f"{hours}h"
| # ============================================================================ | |
| # SUBTITLE PARSERS | |
| # ============================================================================ | |
def _parse_vtt(content: str) -> str:
    """
    Parse WebVTT subtitle content into clean plain text.

    Dropped lines: blanks, the ``WEBVTT`` signature, ``Kind:`` / ``Language:``
    / ``Style:`` header lines, ``STYLE`` / ``NOTE`` block markers, cue timing
    lines, stand-alone cue-setting lines (``position:``, ``align:``, ``line:``,
    ``size:``) and purely numeric lines (cue identifiers). Inline tags such as
    ``<c>`` are removed from the remaining lines, and a line identical to the
    previously kept one is skipped, because scrolling captions repeat text.

    Args:
        content: Raw text of a ``.vtt`` file.

    Returns:
        The caption lines joined with single spaces ("" when nothing is left).
    """
    # Header lines that end in a colon; matched case-insensitively.
    header_re = re.compile(r"^(Kind:|Language:|Style:)", re.IGNORECASE)
    # Block markers are upper-case and must be a whole word, so ordinary
    # caption text such as "Note that ..." or "Style matters" is kept.
    block_marker_re = re.compile(r"^(STYLE|NOTE)(\s|$)")
    # The hours field is optional in WebVTT ("00:05.000 --> 00:08.000").
    timing_re = re.compile(
        r"^(?:\d+:)?\d{2}:\d{2}[.:,]\d{3}\s*-->\s*(?:\d+:)?\d{2}:\d{2}"
    )
    cue_setting_re = re.compile(r"^(position:|align:|line:|size:)", re.IGNORECASE)
    tag_re = re.compile(r"<[^>]+>")

    text_lines: list[str] = []
    prev_line = ""
    for line in content.splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("WEBVTT"):
            continue
        if header_re.match(stripped) or block_marker_re.match(stripped):
            continue
        if timing_re.match(stripped) or cue_setting_re.match(stripped):
            continue
        if stripped.isdigit():
            continue
        cleaned = tag_re.sub("", stripped).strip()
        if not cleaned:
            continue
        if cleaned != prev_line:
            text_lines.append(cleaned)
            prev_line = cleaned
    return " ".join(text_lines)
def _parse_srt(content: str) -> str:
    """
    Turn SRT subtitle content into one plain-text string.

    Blank lines, purely numeric lines (sequence numbers) and timing lines are
    discarded; inline tags are removed from what remains and the surviving
    lines are joined with single spaces.
    """
    timing_line = re.compile(r"^\d{2}:\d{2}:\d{2}[,.]\d{3}\s*-->\s*\d{2}:\d{2}")

    def _captions():
        # Yield the cleaned text of every line that carries caption content.
        for raw in content.splitlines():
            candidate = raw.strip()
            is_noise = (
                not candidate
                or candidate.isdigit()
                or timing_line.match(candidate) is not None
            )
            if is_noise:
                continue
            caption = re.sub(r"<[^>]+>", "", candidate).strip()
            if caption:
                yield caption

    return " ".join(_captions())
def fetch_video_title(video_id: str) -> str:
    """
    Look up a video's title through YouTube's oEmbed endpoint (no API key).

    The title is made filename-friendly: the characters ``\\ / * ? : " < > |``
    are removed, whitespace runs become underscores and the result is cut to
    80 characters. Best effort: on any failure, or when the cleaned title is
    empty, ``video_id`` itself is returned.
    """
    try:
        import urllib.request

        oembed_url = (
            "https://www.youtube.com/oembed"
            f"?url=https://www.youtube.com/watch?v={video_id}&format=json"
        )
        with urllib.request.urlopen(oembed_url, timeout=10) as response:
            payload = json.loads(response.read().decode())

        raw_title = payload.get("title", "")
        without_reserved = re.sub(r'[\\/*?:"<>|]', "", raw_title)
        underscored = re.sub(r"\s+", "_", without_reserved.strip())
        shortened = underscored[:80]
        return shortened if shortened else video_id
    except Exception:
        # Deliberate best effort: the title is cosmetic, never fail for it.
        return video_id
| # ============================================================================ | |
| # YOUTUBE TRANSCRIPT FETCHER | |
| # ============================================================================ | |
class YouTubeTranscriptFetcher:
    """
    Fetch the transcript of a YouTube video, retrying on a polling schedule.

    Every polling attempt walks a tiered fallback chain:
      Tier 1: youtube_transcript_api
      Tier 2: yt-dlp (YtDlpTranscriptFetcher)
      Tier 3: YouTube Data API v3 (YouTubeApiTranscriptFetcher); only tried
              when Google OAuth credentials were supplied

    ``run()`` returns a ``(transcript_text, extraction_method)`` tuple.
    """

    def __init__(
        self,
        youtube_url: str,
        languages: Optional[List[str]] = None,
        polling_config: Optional[list[dict]] = None,  # list of attempts, not a dict
        google_creds=None,
    ):
        """
        Args:
            youtube_url: URL of the video (watch, youtu.be, live, shorts, embed).
            languages: Preferred language codes, in priority order. Defaults to
                ``["en", "en-US", "en-GB"]``.
            polling_config: Ordered list of ``{"wait_before", "description"}``
                dicts; defaults to the module-level ``POLLING_CONFIG``.
            google_creds: Optional credentials object handed to Tier 3.

        Raises:
            ValueError: If no video ID can be extracted from ``youtube_url``.
        """
        self.youtube_url = youtube_url
        self.languages = languages or ["en", "en-US", "en-GB"]
        self.polling_config = polling_config if polling_config is not None else POLLING_CONFIG
        self.video_id = self._extract_video_id(youtube_url)
        self.api = YouTubeTranscriptApi()
        self.google_creds = google_creds

    # Must be a staticmethod: it takes no ``self`` but is called through the
    # instance in ``__init__``; without the decorator that call raises
    # TypeError because the instance is passed as an extra argument.
    @staticmethod
    def _extract_video_id(url: str) -> str:
        """
        Extract the video ID from a YouTube URL.

        Supports ``youtu.be/<id>``, ``/watch?v=<id>`` and the path forms
        ``/live/<id>``, ``/shorts/<id>``, ``/embed/<id>`` on youtube.com,
        www.youtube.com and m.youtube.com.

        Raises:
            ValueError: For other hosts or when the URL carries no video ID.
        """
        parsed = urlparse(url)
        if parsed.hostname == "youtu.be":
            video_id = parsed.path.lstrip("/").split("?")[0]
            if not video_id:
                raise ValueError(f"Could not extract video ID from URL: {url}")
            return video_id
        if parsed.hostname in ("youtube.com", "www.youtube.com", "m.youtube.com"):
            path_parts = parsed.path.strip("/").split("/")
            if path_parts[0] in ("live", "shorts", "embed") and len(path_parts) >= 2:
                return path_parts[1].split("?")[0]
            params = parse_qs(parsed.query)
            if "v" in params:
                return params["v"][0]
            raise ValueError(f"Could not extract video ID from URL: {url}")
        raise ValueError(f"Unsupported YouTube URL: {url}")

    def _fetch_once(self) -> str:
        """
        Tier 1: fetch the transcript through youtube_transcript_api.

        Tries the preferred languages first; when none of them exists, falls
        back to the first transcript the video offers. Returns the snippet
        texts joined with spaces.
        """
        try:
            transcript = self.api.fetch(self.video_id, languages=self.languages)
        except NoTranscriptFound:
            logger.info("[Tier 1] Requested languages %s not found. Finding first available transcript...", self.languages)
            # Use the same instance API as fetch() above instead of the legacy
            # static YouTubeTranscriptApi.list_transcripts().
            transcript_list = self.api.list(self.video_id)
            first_transcript = next(iter(transcript_list), None)
            if first_transcript is None:
                raise RuntimeError(
                    f"No transcripts listed for video {self.video_id}"
                ) from None
            logger.info("[Tier 1] Falling back to language: %s", first_transcript.language_code)
            transcript = first_transcript.fetch()
        return " ".join(item.text for item in transcript)

    def _try_all_tiers(self) -> tuple[str, str]:
        """
        Try all transcript extraction tiers in order.

        Returns (transcript_text, method_used) on first success.
        Raises RuntimeError, listing every tier's failure, if all tiers fail.
        """
        errors: list[str] = []

        # -- Tier 1: youtube_transcript_api --
        try:
            text = self._fetch_once()
            logger.info("[Tier 1] β youtube_transcript_api succeeded β %d chars", len(text))
            return text, "youtube_transcript_api"
        except TranscriptsDisabled as e:
            errors.append(f"Tier1(TranscriptsDisabled): {e}")
            logger.warning("[Tier 1] Transcripts disabled, trying fallbacks...")
        except Exception as e:
            errors.append(f"Tier1: {e}")
            logger.warning("[Tier 1] Failed: %s", e)

        # -- Tier 2: yt-dlp --
        try:
            text = YtDlpTranscriptFetcher(
                self.video_id, languages=self.languages
            ).fetch()
            logger.info("[Tier 2] β yt-dlp succeeded β %d chars", len(text))
            return text, "yt-dlp"
        except Exception as e:
            errors.append(f"Tier2(yt-dlp): {e}")
            logger.warning("[Tier 2] Failed: %s", e)

        # -- Tier 3: YouTube Data API v3 (needs OAuth credentials) --
        if self.google_creds:
            try:
                text = YouTubeApiTranscriptFetcher(
                    self.video_id, self.google_creds, languages=self.languages
                ).fetch()
                logger.info("[Tier 3] β YouTube Data API v3 succeeded β %d chars", len(text))
                return text, "youtube_data_api_v3"
            except Exception as e:
                errors.append(f"Tier3(YT-API): {e}")
                logger.warning("[Tier 3] Failed: %s", e)
        else:
            errors.append("Tier3: Skipped (no OAuth credentials)")
            logger.info("[Tier 3] Skipped β no Google OAuth credentials provided.")

        raise RuntimeError(
            f"All transcript tiers failed for video {self.video_id}. "
            f"Details: {'; '.join(errors)}"
        )

    def run(self) -> tuple[str, str]:
        """
        Fetch transcript with polling retry and multi-tier fallback.

        For each entry of ``polling_config`` the method sleeps ``wait_before``
        seconds (when positive) and then tries every tier.

        Returns:
            (transcript_text, extraction_method) from the first attempt that
            succeeds.

        Raises:
            RuntimeError: When every polling attempt has failed.
        """
        attempts = self.polling_config
        total = len(attempts)
        logger.info("Video ID : %s", self.video_id)
        logger.info("Polling attempts : %d", total)
        for idx, config in enumerate(attempts, start=1):
            wait_before = config["wait_before"]
            description = config["description"]
            if wait_before > 0:
                logger.info(
                    "[%d/%d] %s β waiting %s before retry...",
                    idx, total, description,
                    _format_duration(wait_before),
                )
                time.sleep(wait_before)
            logger.info(
                "[%d/%d] %s β trying all transcript tiers...",
                idx, total, description,
            )
            try:
                text, method = self._try_all_tiers()
                logger.info(
                    "[%d/%d] β Transcript fetched via %s β %d characters",
                    idx, total, method, len(text),
                )
                return text, method
            except KeyboardInterrupt:
                logger.warning("Interrupted by user.")
                raise
            except Exception as e:
                logger.warning("[%d/%d] All tiers failed: %s", idx, total, e)
                if idx < total:
                    # idx is 1-based, the list is 0-based -> this is the next item
                    next_cfg = attempts[idx]
                    logger.info(
                        "[%d/%d] Will retry in %s (%s)",
                        idx, total,
                        _format_duration(next_cfg["wait_before"]),
                        next_cfg["description"],
                    )
                else:
                    logger.error("All %d polling attempts exhausted.", total)

        # Report the wait that was actually configured rather than a fixed
        # "~3 hours", which is wrong for any custom schedule.
        total_wait = sum(cfg["wait_before"] for cfg in attempts)
        raise RuntimeError(
            f"Transcript not available after {total} attempts "
            f"(~{_format_duration(total_wait)} of waiting). "
            f"Video ID: {self.video_id}"
        )
| # ============================================================================ | |
| # TIER 2 β yt-dlp PYTHON API SUBTITLE FETCHER | |
| # ============================================================================ | |
class YtDlpTranscriptFetcher:
    """
    Tier 2 fallback: subtitle extraction through yt-dlp's Python API.

    ``extract_info()`` is used to read the subtitle track metadata (manual
    subtitles and automatic captions); the chosen track is then downloaded
    over HTTP and parsed in memory.
    """

    # Formats in order of preference when a language offers several tracks.
    PREFERRED_FORMATS = ["vtt", "srt", "srv1", "srv2", "srv3", "ttml"]

    def __init__(self, video_id: str, languages: Optional[List[str]] = None):
        """
        Args:
            video_id: YouTube video ID.
            languages: Preferred language codes, in priority order. Defaults
                to ``["en", "en-US", "en-GB"]``.
        """
        self.video_id = video_id
        self.languages = languages or ["en", "en-US", "en-GB"]

    def _find_subtitle_url(self, manual_subs: dict, auto_subs: dict) -> tuple[str, str]:
        """
        Choose a subtitle track and return ``(url, format)``.

        Manual subtitles are searched before automatic captions. Within each,
        the preferred languages are tried in order, taking the best format per
        ``PREFERRED_FORMATS`` or, failing that, any track with a URL. If no
        preferred language exists, any language is accepted, again favouring
        the preferred formats.

        Raises:
            RuntimeError: When no track with a URL exists at all.
        """
        # Pass 1: preferred languages.
        for subs_dict in (manual_subs, auto_subs):
            if not subs_dict:
                continue
            for lang in self.languages:
                tracks = subs_dict.get(lang)
                if not tracks:
                    continue
                for fmt in self.PREFERRED_FORMATS:
                    for track in tracks:
                        if track.get("ext") == fmt and track.get("url"):
                            return track["url"], fmt
                for track in tracks:
                    if track.get("url"):
                        return track["url"], track.get("ext", "vtt")

        # Pass 2: any language (format takes priority over language order).
        logger.info("[yt-dlp] Preferred languages %s not found. Falling back to any available language.", self.languages)
        for subs_dict in (manual_subs, auto_subs):
            if not subs_dict:
                continue
            for fmt in self.PREFERRED_FORMATS:
                for lang, tracks in subs_dict.items():
                    if not tracks:
                        continue
                    for track in tracks:
                        if track.get("ext") == fmt and track.get("url"):
                            logger.info("[yt-dlp] Falling back to language: %s", lang)
                            return track["url"], fmt
            for lang, tracks in subs_dict.items():
                if not tracks:
                    continue
                for track in tracks:
                    if track.get("url"):
                        logger.info("[yt-dlp] Falling back to language: %s", lang)
                        return track["url"], track.get("ext", "vtt")
        raise RuntimeError(
            f"No subtitles found in yt-dlp metadata for video: {self.video_id}"
        )

    @staticmethod
    def _strip_markup(raw: str) -> str:
        """
        Reduce a tag-based subtitle document to plain text.

        Tags are replaced by a space, not removed, so that adjacent cues such
        as ``<text>hello</text><text>world</text>`` do not fuse into one word.
        Entities are decoded after tag removal, so an escaped ``&lt;`` is never
        mistaken for markup. Whitespace runs collapse to single spaces.
        """
        import html

        without_tags = re.sub(r"<[^>]+>", " ", raw)
        return re.sub(r"\s+", " ", html.unescape(without_tags)).strip()

    def fetch(self) -> str:
        """
        Download and parse the best available subtitle track.

        Returns:
            The transcript as plain text.

        Raises:
            RuntimeError: If metadata extraction fails, no subtitle track is
                found, the download fails, or the parsed text is empty.
        """
        # Local imports: these packages are only needed when Tier 2 runs.
        import yt_dlp
        import requests as _requests

        logger.info("[yt-dlp] Attempting in-memory subtitle extraction for %s", self.video_id)
        url = f"https://www.youtube.com/watch?v={self.video_id}"
        ydl_opts = {
            "skip_download": True,
            "quiet": True,
            "no_warnings": True,
            "noplaylist": True,
            "extract_flat": False,
        }
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=False)
        except Exception as e:
            raise RuntimeError(f"yt-dlp extract_info failed: {e}") from e
        if not info:
            raise RuntimeError("yt-dlp returned empty info dict.")

        manual_subs = info.get("subtitles") or {}
        auto_subs = info.get("automatic_captions") or {}
        logger.info(
            "[yt-dlp] Found %d manual sub tracks, %d auto-caption tracks",
            len(manual_subs), len(auto_subs),
        )
        sub_url, sub_fmt = self._find_subtitle_url(manual_subs, auto_subs)

        logger.info("[yt-dlp] Fetching subtitle content (format=%s)", sub_fmt)
        try:
            resp = _requests.get(sub_url, timeout=30)
            resp.raise_for_status()
            raw_content = resp.text
        except Exception as e:
            raise RuntimeError(f"Failed to fetch subtitle from URL: {e}") from e
        if not raw_content.strip():
            raise RuntimeError("Subtitle URL returned empty content.")

        if sub_fmt == "vtt":
            text = _parse_vtt(raw_content)
        elif sub_fmt == "srt":
            text = _parse_srt(raw_content)
        else:
            # Every other format is handled generically as tagged text.
            text = self._strip_markup(raw_content)
        if not text.strip():
            raise RuntimeError(
                f"yt-dlp subtitle content was empty after parsing (format={sub_fmt})."
            )
        logger.info(
            "[yt-dlp] β Transcript extracted β %d characters (format=%s)",
            len(text), sub_fmt,
        )
        return text
| # ============================================================================ | |
| # TIER 3 β YOUTUBE DATA API v3 CAPTIONS FETCHER | |
| # ============================================================================ | |
class YouTubeApiTranscriptFetcher:
    """
    Fallback fetcher using the official YouTube Data API v3 captions endpoints.

    WARNING: per the original author's note this only works for videos owned
    by the authenticated user and needs OAuth credentials with the
    ``youtube.force-ssl`` scope; a 403 from the API is reported accordingly.
    """

    def __init__(self, video_id: str, credentials, languages: Optional[List[str]] = None):
        """
        Args:
            video_id: YouTube video ID.
            credentials: Credentials object passed to the API client builder.
            languages: Preferred language codes, in priority order. Defaults
                to ``["en", "en-US", "en-GB"]``.
        """
        self.video_id = video_id
        self.credentials = credentials
        self.languages = languages or ["en", "en-US", "en-GB"]

    def _select_caption(self, items: list) -> tuple[str, str, str]:
        """
        Pick a caption track from a non-empty ``captions.list`` result.

        Returns ``(caption_id, language, how)`` where ``how`` is:
          "manual"   - preferred language, track kind other than "ASR"
          "any_kind" - preferred language, any track kind
          "fallback" - no preferred language available; first listed track
        """
        def _snippet(item: dict) -> dict:
            # Tolerate items that carry no snippet at all.
            return item.get("snippet") or {}

        for lang in self.languages:
            for item in items:
                snippet = _snippet(item)
                if snippet.get("language", "") == lang and snippet.get("trackKind") != "ASR":
                    return item["id"], lang, "manual"
        for lang in self.languages:
            for item in items:
                if _snippet(item).get("language", "") == lang:
                    return item["id"], lang, "any_kind"
        first = items[0]
        return first["id"], _snippet(first).get("language", ""), "fallback"

    def fetch(self) -> str:
        """
        Download the selected caption track as SRT and return its plain text.

        Raises:
            RuntimeError: When no credentials were given, the API client
                library is missing, the video lists no caption tracks, the API
                answers 403/Forbidden, or the downloaded text is empty.
            Exception: Any other API error is re-raised unchanged.
        """
        if self.credentials is None:
            raise RuntimeError("No OAuth credentials provided for YouTube API.")
        logger.info("[YT-API] Attempting captions download for %s", self.video_id)
        try:
            from googleapiclient.discovery import build as yt_build

            youtube = yt_build(
                "youtube", "v3",
                credentials=self.credentials,
                cache_discovery=False,
            )
            captions_response = youtube.captions().list(
                part="snippet",
                videoId=self.video_id,
            ).execute()
            items = captions_response.get("items", [])
            if not items:
                raise RuntimeError(
                    f"No caption tracks found for video {self.video_id}"
                )

            # items is non-empty here, so a track is always selected.
            caption_id, lang, how = self._select_caption(items)
            if how == "manual":
                logger.info(
                    "[YT-API] Found manual caption: lang=%s, id=%s",
                    lang, caption_id,
                )
            elif how == "any_kind":
                logger.info(
                    "[YT-API] Using caption (any kind): lang=%s, id=%s",
                    lang, caption_id,
                )
            else:
                logger.info("[YT-API] Preferred languages %s not found. Falling back to any available language.", self.languages)
                logger.info("[YT-API] Falling back to language: %s, id=%s", lang, caption_id)

            caption_content = youtube.captions().download(
                id=caption_id,
                tfmt="srt",
            ).execute()
            if isinstance(caption_content, bytes):
                caption_content = caption_content.decode("utf-8")
            text = _parse_srt(caption_content)
            if not text.strip():
                raise RuntimeError("YouTube API caption download returned empty text.")
            logger.info(
                "[YT-API] β Transcript extracted β %d characters", len(text)
            )
            return text
        except ImportError as exc:
            raise RuntimeError(
                "google-api-python-client is not installed. "
                "Cannot use YouTube Data API v3 fallback."
            ) from exc
        except Exception as e:
            err_str = str(e)
            if "403" in err_str or "Forbidden" in err_str:
                raise RuntimeError(
                    f"YouTube API returned 403 Forbidden β you can only "
                    f"download captions for videos you own. Error: {err_str}"
                ) from e
            raise
| # ============================================================================ | |
| # GEMINI SUMMARIZER | |
| # ============================================================================ | |
class GeminiSummarizer:
    """
    Send a transcript to Gemini and return ``(summary, qa, model_used)``.

    Models are tried in order. Transient errors are retried on the same model
    with a doubling wait; quota / not-found errors and empty responses move
    straight on to the next model. Nothing is written to disk.
    """

    MAX_RETRIES = 5   # attempts per model
    BASE_WAIT = 10    # seconds before the first retry
    MAX_WAIT = 120    # upper bound for the doubling wait
    # Substrings of the error text that mark a transient failure (retry).
    RETRYABLE = ["503", "502", "500", "UNAVAILABLE", "SERVICE_UNAVAILABLE"]
    # Substrings that make retrying the same model pointless (next model).
    SKIP_TO_NEXT = ["429", "RESOURCE_EXHAUSTED", "quota", "404", "NOT_FOUND"]

    def __init__(
        self,
        api_key: str = GEMINI_API_KEY,
        models: Optional[list] = None,
    ):
        """
        Args:
            api_key: Gemini API key; defaults to the module-level value.
            models: Model names to try, in order; defaults to GEMINI_MODELS.
        """
        self.client = genai.Client(api_key=api_key)
        self.models = models or GEMINI_MODELS

    def _call_api(self, transcript: str) -> tuple[str, str]:
        """
        Request the summary, walking through models and retries.

        Returns:
            (response_text, model_name) of the first usable response.

        Raises:
            RuntimeError: When every model has been exhausted.
            Exception: An error matching neither RETRYABLE nor SKIP_TO_NEXT is
                logged and re-raised immediately.
        """
        overall_last_error = None
        for model in self.models:
            logger.info("ββ Trying model: %s", model)
            wait = self.BASE_WAIT
            last_err = None
            for attempt in range(1, self.MAX_RETRIES + 1):
                try:
                    logger.info(" [%d/%d] Sending request...", attempt, self.MAX_RETRIES)
                    response = self.client.models.generate_content(
                        model=model,
                        contents=transcript,
                        config=types.GenerateContentConfig(
                            system_instruction=SYSTEM_PROMPT,
                        ),
                    )
                    text = response.text
                    if not text:
                        # A response without text would crash the splitter;
                        # treat it as a failure of this model instead.
                        last_err = RuntimeError(
                            f"Model {model} returned an empty response"
                        )
                        logger.warning(
                            " [%d/%d] %s returned no text, trying next model.",
                            attempt, self.MAX_RETRIES, model,
                        )
                        break
                    logger.info(
                        "β Response received from: %s (attempt %d)",
                        model, attempt,
                    )
                    return text, model
                except Exception as e:
                    err = str(e)
                    last_err = e
                    if any(k in err for k in self.SKIP_TO_NEXT):
                        logger.warning(
                            " [%d/%d] %s β quota/not-found, skipping to next model.",
                            attempt, self.MAX_RETRIES, model,
                        )
                        break
                    elif any(k in err for k in self.RETRYABLE):
                        if attempt < self.MAX_RETRIES:
                            logger.warning(
                                " [%d/%d] %s β transient error. Retrying in %ds...",
                                attempt, self.MAX_RETRIES, model, wait,
                            )
                            time.sleep(wait)
                            wait = min(wait * 2, self.MAX_WAIT)
                        else:
                            logger.warning(
                                " [%d/%d] %s β max retries reached, trying next model.",
                                attempt, self.MAX_RETRIES, model,
                            )
                    else:
                        logger.error(
                            " [%d/%d] %s β unhandled error: %s",
                            attempt, self.MAX_RETRIES, model, err,
                        )
                        raise
            overall_last_error = last_err
        raise RuntimeError(
            f"All models and retries exhausted. Last error: {overall_last_error}"
        )

    # Must be a staticmethod: it takes no ``self`` but is called through the
    # instance in ``run``; without the decorator that call raises TypeError.
    @staticmethod
    def _split(full_text: str) -> tuple[str, str]:
        """
        Split the model output into ``(summary, qa)``.

        The separator is a line holding only ``!!!!!`` (or, as a fallback,
        ``!!!``). Empty sections are ignored, so a response that starts with
        the separator still yields the summary as the first part. Any sections
        after the summary are joined into the Q&A text. Without a separator
        the whole text is the summary and the Q&A is empty.
        """
        for pattern in (r"^\s*!{5}\s*$", r"^\s*!{3}\s*$"):
            parts = re.split(pattern, full_text, flags=re.MULTILINE)
            if len(parts) < 2:
                continue
            sections = [part.strip() for part in parts if part.strip()]
            if not sections:
                return "", ""
            return sections[0], "\n\n".join(sections[1:])
        return full_text.strip(), ""

    def run(self, transcript: str) -> tuple[str, str, str]:
        """Summarize ``transcript`` and return ``(summary, qa, model_used)``."""
        full, model_used = self._call_api(transcript)
        summary, qa = self._split(full)
        logger.info("β Summarization complete β model: %s", model_used)
        return summary, qa, model_used
| # ============================================================================ | |
| # PIPELINE | |
| # ============================================================================ | |
class TranscriptSummaryPipeline:
    """
    Glue between transcript fetching and summarization.

    Builds a YouTubeTranscriptFetcher and a GeminiSummarizer, looks up the
    video title once at construction time, and hands results around in memory
    only.
    """

    def __init__(
        self,
        youtube_url: str,
        languages: Optional[List[str]] = None,
        polling_config: list[dict] = None,  # a list of attempts, not a dict
        google_creds = None,
    ):
        """Create the fetcher and summarizer and resolve the video title."""
        fetcher = YouTubeTranscriptFetcher(
            youtube_url=youtube_url,
            languages=languages,
            polling_config=polling_config,
            google_creds=google_creds,
        )
        self.youtube_url = youtube_url
        self.fetcher = fetcher
        self.summarizer = GeminiSummarizer()
        self.video_id = fetcher.video_id
        self.video_title = fetch_video_title(self.video_id)

    def run(self) -> dict:
        """
        Fetch the transcript, summarize it and return everything as a dict
        with the keys video_id, video_title, model_used, extraction_method,
        summary, qa and transcript.
        """
        logger.info("=== Pipeline started ===")
        logger.info("Video title : %s", self.video_title)

        transcript, extraction_method = self.fetcher.run()
        summary, qa, model = self.summarizer.run(transcript)

        logger.info(
            "=== Pipeline complete | model: %s | extraction: %s ===",
            model, extraction_method,
        )
        result = {
            "video_id": self.video_id,
            "video_title": self.video_title,
            "model_used": model,
            "extraction_method": extraction_method,
        }
        result["summary"] = summary
        result["qa"] = qa
        result["transcript"] = transcript
        return result
| # ============================================================================ | |
| # CLI | |
| # ============================================================================ | |
def main():
    """
    Command-line entry point: run the pipeline for the URL in ``argv[1]``.

    Prints a usage line to stderr and exits with status 1 when no URL is
    given. After the run only the short metadata fields are printed; the
    summary, Q&A and transcript are not.
    """
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: python gemini_transcript.py <youtube_url>", file=sys.stderr)
        sys.exit(1)

    result = TranscriptSummaryPipeline(
        youtube_url=cli_args[0],
        languages=["en", "en-US", "en-GB"],
    ).run()

    bulky_fields = ("summary", "qa", "transcript")
    for key, value in result.items():
        if key in bulky_fields:
            continue
        print(f"{key}: {value}")


if __name__ == "__main__":
    main()