transcript / gemini_transcript.py
rsnarsna
fix: Refactor polling configuration to use a list of dictionaries for accurate retry attempts; add 'innertubex' to requirements
cce56a7
Raw
History Blame Contribute Delete
29.5 kB
#!/usr/bin/env python3
from __future__ import annotations
import os
import re
import sys
import json
import logging
import time
from typing import Optional, List
from urllib.parse import urlparse, parse_qs
from google import genai # pip install google-genai
from google.genai import types
from youtube_transcript_api import (
YouTubeTranscriptApi,
TranscriptsDisabled,
NoTranscriptFound,
)
# ============================================================================
# CONFIG
# ============================================================================
# ── API Keys ────────────────────────────────────────────────────────────────
# SECURITY: credentials are read from the environment only. API keys used to
# be hard-coded in this section; anything that was ever committed here must be
# treated as compromised and revoked/rotated with the provider.
#   GEMINI_API_KEY  - key passed to the Gemini client
#   YT_API_KEY      - YouTube API key
# GEMINI_KEY is kept only as a backward-compatible alias for importers.
GEMINI_KEY = os.getenv("GEMINI_API_KEY", "")
GEMINI_API_KEY = GEMINI_KEY
YT_API_KEY = os.getenv("YT_API_KEY", "")

# Default model list; GeminiSummarizer walks it in order until one responds.
GEMINI_MODELS = [
    "gemini-2.5-flash",
    "gemini-2.5-flash-lite",
    "gemini-2.5-pro",
]
# ── FIX: Use a LIST of dicts, not a dict.
# A plain dict with duplicate keys like "attempt_3" silently drops all
# but the last definition, collapsing 13 intended attempts down to 4.
# A list preserves every entry in order.
def _polling_attempt(wait_before: int, description: str) -> dict:
return {"wait_before": wait_before, "description": description}
POLLING_CONFIG: list[dict] = [
_polling_attempt(0, "Immediate attempt on trigger"),
_polling_attempt(300, "Retry after 5 minutes"),
_polling_attempt(900, "Retry after 15 minutes (30 min total)"),
_polling_attempt(900, "Retry after 15 minutes (45 min total)"),
_polling_attempt(900, "Retry after 15 minutes (1 hr total)"),
_polling_attempt(900, "Retry after 15 minutes (1 hr 15 min total)"),
_polling_attempt(900, "Retry after 15 minutes (1 hr 30 min total)"),
_polling_attempt(900, "Retry after 15 minutes (1 hr 45 min total)"),
_polling_attempt(900, "Retry after 15 minutes (2 hr total)"),
_polling_attempt(900, "Retry after 15 minutes (2 hr 15 min total)"),
_polling_attempt(900, "Retry after 15 minutes (2 hr 30 min total)"),
_polling_attempt(900, "Retry after 15 minutes (2 hr 45 min total)"),
_polling_attempt(900, "Retry after 15 minutes (3 hr total)"),
]
# System instruction sent with every Gemini request. The model is asked to
# emit two parts separated by a line holding exactly five exclamation marks;
# GeminiSummarizer._split() relies on that separator to tell the summary
# from the Q&A section.
SYSTEM_PROMPT = """
You are an expert content summarizer and educator.
Produce the full output containing exactly two parts separated by a line with only 5 exclamation marks:
!!!!!
--- PART 1: SUMMARY ---
Write a detailed, well-structured summary of the entire content.
Use the following structure:
## Overview
A 3-5 sentence high-level overview of the entire content.
## Key Topics Covered
List the main topics discussed, each with a brief explanation.
## Detailed Summary
A thorough section-by-section breakdown of the content in the order it was presented.
Use subheadings for each major section or topic shift.
## Key Takeaways
A bullet list of the most important insights, facts, or conclusions from the content.
---
!!!!!
--- PART 2: Q&A ---
Generate a comprehensive Q&A section based on the content.
Format each entry exactly like this:
Q1: [First question]
Answer: [Detailed answer]
Q2: [Second question]
Answer: [Detailed answer]
Q3: [Third question]
Answer: [Detailed answer]
... and so on until all important questions are covered.
Rules:
- Number every question and answer with matching numbers (Q1/A1, Q2/A2, etc.)
- Each answer must be detailed and self-contained
- Cover all major topics, concepts, facts, and insights from the content
- Minimum 10 Q&A pairs, more if the content is rich
- Do NOT use bullet points inside answers — write in full sentences
---
"""
# ============================================================================
# LOGGING
# ============================================================================
# Configure root logging once at import time: INFO and above, rendered as
# "timestamp | level | message".
logging.basicConfig(
    format="%(asctime)s | %(levelname)s | %(message)s",
    level=logging.INFO,
)

# Module-wide logger shared by every stage of the pipeline.
logger = logging.getLogger("gemini_pipeline")
# ============================================================================
# HELPERS
# ============================================================================
def _format_duration(seconds: int) -> str:
if seconds < 60:
return f"{seconds}s"
if seconds < 3600:
return f"{seconds // 60}m"
h = seconds // 3600
m = (seconds % 3600) // 60
return f"{h}h {m}m" if m else f"{h}h"
# ============================================================================
# SUBTITLE PARSERS
# ============================================================================
def _parse_vtt(content: str) -> str:
"""
Parse WebVTT subtitle content into clean plain text.
Strips headers, timestamps, position metadata, and deduplicates
consecutive identical lines (VTT scrolling captions repeat text).
"""
lines = content.splitlines()
text_lines: list[str] = []
prev_line = ""
for line in lines:
stripped = line.strip()
if not stripped:
continue
if stripped.startswith("WEBVTT"):
continue
if re.match(r"^(Kind:|Language:|Style|NOTE)", stripped, re.IGNORECASE):
continue
if re.match(r"^\d{2}:\d{2}[:\.]\d{2}[\.:]\d{3}\s*-->\s*\d{2}:\d{2}", stripped):
continue
if re.match(r"^(position:|align:|line:|size:)", stripped, re.IGNORECASE):
continue
if stripped.isdigit():
continue
cleaned = re.sub(r"<[^>]+>", "", stripped).strip()
if not cleaned:
continue
if cleaned != prev_line:
text_lines.append(cleaned)
prev_line = cleaned
return " ".join(text_lines)
def _parse_srt(content: str) -> str:
"""
Parse SRT subtitle content into clean plain text.
Strips sequence numbers and timing lines.
"""
lines = content.splitlines()
text_lines: list[str] = []
for line in lines:
stripped = line.strip()
if not stripped:
continue
if stripped.isdigit():
continue
if re.match(r"^\d{2}:\d{2}:\d{2}[,.]\d{3}\s*-->\s*\d{2}:\d{2}", stripped):
continue
cleaned = re.sub(r"<[^>]+>", "", stripped).strip()
if cleaned:
text_lines.append(cleaned)
return " ".join(text_lines)
def fetch_video_title(video_id: str) -> str:
    """Fetch a YouTube video title via the oEmbed endpoint — no API key needed.

    The title is made filename-safe: the characters ``\\ / * ? : " < > |``
    are removed, runs of whitespace become a single underscore, and the
    result is cut to 80 characters.

    Args:
        video_id: YouTube video ID.

    Returns:
        The sanitized title, or ``video_id`` itself when the lookup fails
        for any reason or the sanitized title is empty. This is a
        best-effort helper and never raises.
    """
    try:
        import urllib.request
        from urllib.parse import urlencode

        # The watch URL is itself a query-string value, so it has to be
        # percent-encoded; otherwise its own "?" and "&" are ambiguous.
        query = urlencode(
            {
                "url": f"https://www.youtube.com/watch?v={video_id}",
                "format": "json",
            }
        )
        url = f"https://www.youtube.com/oembed?{query}"
        with urllib.request.urlopen(url, timeout=10) as resp:
            data = json.loads(resp.read().decode())
        title = data.get("title", "")
        safe = re.sub(r'[\\/*?:"<>|]', "", title)
        safe = re.sub(r"\s+", "_", safe.strip())
        return safe[:80] or video_id
    except Exception as exc:
        # Deliberate best-effort: a missing title must not stop the pipeline.
        logger.debug("Could not fetch title for %s: %s", video_id, exc)
        return video_id
# ============================================================================
# YOUTUBE TRANSCRIPT FETCHER
# ============================================================================
class YouTubeTranscriptFetcher:
    """
    Fetches a YouTube transcript using a multi-tier fallback strategy:

    Tier 1: youtube_transcript_api (fast, works for most public videos)
    Tier 2: yt-dlp (robust, handles auto-generated + manual subs)
    Tier 3: YouTube Data API v3 (only for videos the user owns)

    ``run()`` returns a ``(transcript_text, extraction_method)`` tuple.
    """

    # Hosts whose URLs carry the ID in the path or in the ``v`` query parameter.
    _YOUTUBE_HOSTS = (
        "youtube.com",
        "www.youtube.com",
        "m.youtube.com",
        "music.youtube.com",
        "youtube-nocookie.com",
        "www.youtube-nocookie.com",
    )
    # First path segments that are followed directly by the video ID.
    _PATH_ID_PREFIXES = ("live", "shorts", "embed")

    def __init__(
        self,
        youtube_url: str,
        languages: Optional[List[str]] = None,
        polling_config: Optional[list[dict]] = None,  # a list, not a dict
        google_creds=None,
    ):
        """
        Args:
            youtube_url: Any supported YouTube video URL.
            languages: Preferred transcript languages, most preferred first.
                Defaults to English variants.
            polling_config: Retry schedule; each entry needs the keys
                ``"wait_before"`` (seconds) and ``"description"``.
                Defaults to ``POLLING_CONFIG``.
            google_creds: OAuth credentials for Tier 3; the tier is skipped
                when this is falsy.

        Raises:
            ValueError: If no video ID can be extracted from ``youtube_url``.
        """
        self.youtube_url = youtube_url
        self.languages = languages or ["en", "en-US", "en-GB"]
        self.polling_config = polling_config if polling_config is not None else POLLING_CONFIG
        self.video_id = self._extract_video_id(youtube_url)
        self.api = YouTubeTranscriptApi()
        self.google_creds = google_creds

    @staticmethod
    def _extract_video_id(url: str) -> str:
        """Return the video ID contained in ``url``.

        Handles ``youtu.be/<id>``, ``/watch?v=<id>`` and
        ``/live|shorts|embed/<id>`` on the known YouTube hosts.

        Raises:
            ValueError: If the host is not a YouTube host, or the URL holds
                no (non-empty) video ID.
        """
        parsed = urlparse(url)
        host = parsed.hostname
        if host == "youtu.be":
            video_id = parsed.path.strip("/").split("/")[0]
        elif host in YouTubeTranscriptFetcher._YOUTUBE_HOSTS:
            path_parts = parsed.path.strip("/").split("/")
            if (
                path_parts[0] in YouTubeTranscriptFetcher._PATH_ID_PREFIXES
                and len(path_parts) >= 2
            ):
                video_id = path_parts[1]
            else:
                video_id = parse_qs(parsed.query).get("v", [""])[0]
        else:
            raise ValueError(f"Unsupported YouTube URL: {url}")
        if not video_id:
            # Covers e.g. "https://youtu.be/" which used to yield "".
            raise ValueError(f"Could not extract video ID from URL: {url}")
        return video_id

    def _fetch_once(self) -> str:
        """Tier 1: fetch via youtube_transcript_api and return the joined text.

        Tries the preferred languages first; when none exists, falls back to
        the first transcript the video offers. Errors from the library
        propagate to the caller.
        """
        try:
            transcript = self.api.fetch(self.video_id, languages=self.languages)
        except NoTranscriptFound:
            logger.info("[Tier 1] Requested languages %s not found. Finding first available transcript...", self.languages)
            # Same instance-based API as fetch() above (the static
            # list_transcripts() helper belongs to the older interface).
            transcript_list = self.api.list(self.video_id)
            first_transcript = next(iter(transcript_list), None)
            if first_transcript is None:
                # Nothing to fall back to: surface the original error.
                raise
            logger.info("[Tier 1] Falling back to language: %s", first_transcript.language_code)
            transcript = first_transcript.fetch()
        return " ".join(item.text for item in transcript)

    def _try_all_tiers(self) -> tuple[str, str]:
        """
        Try all transcript extraction tiers in order.

        Returns (transcript_text, method_used) on first success.
        Raises RuntimeError (listing every tier's failure) if all tiers fail.
        """
        errors: list[str] = []

        # ── Tier 1: youtube_transcript_api ──
        try:
            text = self._fetch_once()
            logger.info("[Tier 1] ✅ youtube_transcript_api succeeded — %d chars", len(text))
            return text, "youtube_transcript_api"
        except TranscriptsDisabled as e:
            errors.append(f"Tier1(TranscriptsDisabled): {e}")
            logger.warning("[Tier 1] Transcripts disabled, trying fallbacks...")
        except Exception as e:
            errors.append(f"Tier1: {e}")
            logger.warning("[Tier 1] Failed: %s", e)

        # ── Tier 2: yt-dlp ──
        try:
            text = YtDlpTranscriptFetcher(
                self.video_id, languages=self.languages
            ).fetch()
            logger.info("[Tier 2] ✅ yt-dlp succeeded — %d chars", len(text))
            return text, "yt-dlp"
        except Exception as e:
            errors.append(f"Tier2(yt-dlp): {e}")
            logger.warning("[Tier 2] Failed: %s", e)

        # ── Tier 3: YouTube Data API v3 (owned videos only) ──
        if self.google_creds:
            try:
                text = YouTubeApiTranscriptFetcher(
                    self.video_id, self.google_creds, languages=self.languages
                ).fetch()
                logger.info("[Tier 3] ✅ YouTube Data API v3 succeeded — %d chars", len(text))
                return text, "youtube_data_api_v3"
            except Exception as e:
                errors.append(f"Tier3(YT-API): {e}")
                logger.warning("[Tier 3] Failed: %s", e)
        else:
            errors.append("Tier3: Skipped (no OAuth credentials)")
            logger.info("[Tier 3] Skipped — no Google OAuth credentials provided.")

        raise RuntimeError(
            f"All transcript tiers failed for video {self.video_id}. "
            f"Details: {'; '.join(errors)}"
        )

    def run(self) -> tuple[str, str]:
        """
        Fetch transcript with polling retry and multi-tier fallback.

        Each polling attempt first sleeps for its ``wait_before`` seconds and
        then tries all tiers. Returns (transcript_text, extraction_method).

        Raises:
            RuntimeError: When every polling attempt has failed; the last
                attempt's error is attached as the cause.
        """
        attempts = self.polling_config
        total = len(attempts)
        last_error: Optional[Exception] = None
        logger.info("Video ID : %s", self.video_id)
        logger.info("Polling attempts : %d", total)

        for idx, config in enumerate(attempts, start=1):
            wait_before = config["wait_before"]
            description = config["description"]
            if wait_before > 0:
                logger.info(
                    "[%d/%d] %s — waiting %s before retry...",
                    idx, total, description,
                    _format_duration(wait_before),
                )
                time.sleep(wait_before)
            logger.info(
                "[%d/%d] %s — trying all transcript tiers...",
                idx, total, description,
            )
            try:
                text, method = self._try_all_tiers()
                logger.info(
                    "[%d/%d] ✅ Transcript fetched via %s — %d characters",
                    idx, total, method, len(text),
                )
                return text, method
            except KeyboardInterrupt:
                logger.warning("Interrupted by user.")
                raise
            except Exception as e:
                last_error = e
                logger.warning("[%d/%d] All tiers failed: %s", idx, total, e)
                if idx < total:
                    # idx is 1-based and the list is 0-based, so attempts[idx]
                    # is the NEXT entry.
                    next_cfg = attempts[idx]
                    logger.info(
                        "[%d/%d] Will retry in %s (%s)",
                        idx, total,
                        _format_duration(next_cfg["wait_before"]),
                        next_cfg["description"],
                    )
                else:
                    logger.error("All %d polling attempts exhausted.", total)

        # Report the time actually waited for the configured schedule rather
        # than a fixed figure that is only right for one particular config.
        total_wait = sum(cfg["wait_before"] for cfg in attempts)
        raise RuntimeError(
            f"Transcript not available after {total} attempts "
            f"(~{_format_duration(total_wait)}). "
            f"Video ID: {self.video_id}"
        ) from last_error
# ============================================================================
# TIER 2 — yt-dlp PYTHON API SUBTITLE FETCHER
# ============================================================================
class YtDlpTranscriptFetcher:
    """
    Tier 2 fallback — uses yt-dlp's Python API (no subprocess).

    Extracts subtitle URLs from video metadata via extract_info(),
    then fetches the subtitle content in memory via HTTP.
    Handles both manual and auto-generated captions.
    """

    # Subtitle formats in order of preference.
    PREFERRED_FORMATS = ["vtt", "srt", "srv1", "srv2", "srv3", "ttml"]

    def __init__(self, video_id: str, languages: Optional[List[str]] = None):
        """
        Args:
            video_id: YouTube video ID.
            languages: Preferred subtitle languages, most preferred first.
                Defaults to English variants.
        """
        self.video_id = video_id
        self.languages = languages or ["en", "en-US", "en-GB"]

    @staticmethod
    def _url_for_format(tracks, fmt: str) -> Optional[str]:
        """Return the URL of the first track in ``tracks`` with extension ``fmt``."""
        for track in tracks or ():
            if track.get("ext") == fmt and track.get("url"):
                return track["url"]
        return None

    @staticmethod
    def _first_with_url(tracks) -> Optional[tuple[str, str]]:
        """Return ``(url, ext)`` of the first track that has a URL at all."""
        for track in tracks or ():
            if track.get("url"):
                return track["url"], track.get("ext", "vtt")
        return None

    def _pick_track(self, tracks) -> Optional[tuple[str, str]]:
        """Pick the best track of one language: preferred formats, then any."""
        for fmt in self.PREFERRED_FORMATS:
            url = self._url_for_format(tracks, fmt)
            if url:
                return url, fmt
        return self._first_with_url(tracks)

    def _find_subtitle_url(self, manual_subs: dict, auto_subs: dict) -> tuple[str, str]:
        """Choose a subtitle track and return ``(url, format)``.

        Manual subtitles are searched before automatic captions. Within each,
        the preferred languages are tried in order. If none is present, any
        language is accepted, taking the most preferred format available.

        Raises:
            RuntimeError: If no track with a URL exists at all.
        """
        sources = [subs for subs in (manual_subs, auto_subs) if subs]

        # Pass 1: preferred languages only.
        for subs in sources:
            for lang in self.languages:
                picked = self._pick_track(subs.get(lang))
                if picked:
                    return picked

        # Pass 2: any language.
        logger.info("[yt-dlp] Preferred languages %s not found. Falling back to any available language.", self.languages)
        for subs in sources:
            for fmt in self.PREFERRED_FORMATS:
                for lang, tracks in subs.items():
                    url = self._url_for_format(tracks, fmt)
                    if url:
                        logger.info("[yt-dlp] Falling back to language: %s", lang)
                        return url, fmt
            for lang, tracks in subs.items():
                picked = self._first_with_url(tracks)
                if picked:
                    logger.info("[yt-dlp] Falling back to language: %s", lang)
                    return picked

        raise RuntimeError(
            f"No subtitles found in yt-dlp metadata for video: {self.video_id}"
        )

    @staticmethod
    def _markup_to_text(raw_content: str) -> str:
        """Reduce an XML-style subtitle payload (srv1/2/3, ttml, ...) to text.

        Tags are replaced by a space rather than deleted, so adjacent
        elements on one line do not fuse into a single word. Character
        entities are decoded afterwards, so escaped "<" / ">" in the caption
        text are never mistaken for tags.
        """
        import html

        without_tags = re.sub(r"<[^>]+>", " ", raw_content)
        return re.sub(r"\s+", " ", html.unescape(without_tags)).strip()

    def fetch(self) -> str:
        """Download and parse the best available subtitle track.

        Returns:
            The transcript as plain text.

        Raises:
            RuntimeError: If metadata extraction fails, no subtitle track is
                found, the download fails, or the parsed text is empty.
        """
        import yt_dlp
        import requests as _requests

        logger.info("[yt-dlp] Attempting in-memory subtitle extraction for %s", self.video_id)
        url = f"https://www.youtube.com/watch?v={self.video_id}"
        ydl_opts = {
            "skip_download": True,
            "quiet": True,
            "no_warnings": True,
            "noplaylist": True,
            "extract_flat": False,
        }
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=False)
        except Exception as e:
            raise RuntimeError(f"yt-dlp extract_info failed: {e}") from e
        if not info:
            raise RuntimeError("yt-dlp returned empty info dict.")

        manual_subs = info.get("subtitles") or {}
        auto_subs = info.get("automatic_captions") or {}
        logger.info(
            "[yt-dlp] Found %d manual sub tracks, %d auto-caption tracks",
            len(manual_subs), len(auto_subs),
        )
        sub_url, sub_fmt = self._find_subtitle_url(manual_subs, auto_subs)

        logger.info("[yt-dlp] Fetching subtitle content (format=%s)", sub_fmt)
        try:
            resp = _requests.get(sub_url, timeout=30)
            resp.raise_for_status()
            raw_content = resp.text
        except Exception as e:
            raise RuntimeError(f"Failed to fetch subtitle from URL: {e}") from e
        if not raw_content.strip():
            raise RuntimeError("Subtitle URL returned empty content.")

        if sub_fmt == "vtt":
            text = _parse_vtt(raw_content)
        elif sub_fmt == "srt":
            text = _parse_srt(raw_content)
        else:
            text = self._markup_to_text(raw_content)
        if not text.strip():
            raise RuntimeError(
                f"yt-dlp subtitle content was empty after parsing (format={sub_fmt})."
            )
        logger.info(
            "[yt-dlp] ✅ Transcript extracted — %d characters (format=%s)",
            len(text), sub_fmt,
        )
        return text
# ============================================================================
# TIER 3 — YOUTUBE DATA API v3 CAPTIONS FETCHER
# ============================================================================
class YouTubeApiTranscriptFetcher:
    """
    Fallback fetcher using the official YouTube Data API v3.

    ⚠️ Only works for videos the authenticated user OWNS.
    Requires OAuth credentials with youtube.force-ssl scope.
    """

    def __init__(self, video_id: str, credentials, languages: Optional[List[str]] = None):
        """
        Args:
            video_id: YouTube video ID.
            credentials: OAuth credentials handed to the API client.
            languages: Preferred caption languages, most preferred first.
                Defaults to English variants.
        """
        self.video_id = video_id
        self.credentials = credentials
        self.languages = languages or ["en", "en-US", "en-GB"]

    @staticmethod
    def _select_caption(items: list, languages: list) -> tuple[str, str, str]:
        """Choose a caption track from a non-empty ``captions.list`` result.

        Preference order:
          1. a non-ASR track in a preferred language   -> kind "manual"
          2. any track in a preferred language         -> kind "any"
          3. the first track, whatever its language    -> kind "fallback"

        Returns:
            ``(caption_id, language, kind)``.
        """
        for manual_only in (True, False):
            for lang in languages:
                for item in items:
                    snippet = item.get("snippet", {})
                    if snippet.get("language", "") != lang:
                        continue
                    if manual_only and snippet.get("trackKind") == "ASR":
                        continue
                    return item["id"], lang, "manual" if manual_only else "any"
        first = items[0]
        return first["id"], first.get("snippet", {}).get("language", ""), "fallback"

    def fetch(self) -> str:
        """Download the best matching caption track and return it as text.

        Raises:
            RuntimeError: If credentials or the client library are missing,
                the video has no caption tracks, the API answers 403, or the
                downloaded captions are empty. Other API errors propagate
                unchanged.
        """
        if self.credentials is None:
            raise RuntimeError("No OAuth credentials provided for YouTube API.")
        logger.info("[YT-API] Attempting captions download for %s", self.video_id)

        try:
            from googleapiclient.discovery import build as yt_build
        except ImportError as e:
            raise RuntimeError(
                "google-api-python-client is not installed. "
                "Cannot use YouTube Data API v3 fallback."
            ) from e

        try:
            youtube = yt_build(
                "youtube", "v3",
                credentials=self.credentials,
                cache_discovery=False,
            )
            captions_response = youtube.captions().list(
                part="snippet",
                videoId=self.video_id,
            ).execute()
            items = captions_response.get("items", [])
            if not items:
                raise RuntimeError(
                    f"No caption tracks found for video {self.video_id}"
                )

            caption_id, lang, kind = self._select_caption(items, self.languages)
            if kind == "manual":
                logger.info(
                    "[YT-API] Found manual caption: lang=%s, id=%s",
                    lang, caption_id,
                )
            elif kind == "any":
                logger.info(
                    "[YT-API] Using caption (any kind): lang=%s, id=%s",
                    lang, caption_id,
                )
            else:
                logger.info("[YT-API] Preferred languages %s not found. Falling back to any available language.", self.languages)
                logger.info("[YT-API] Falling back to language: %s, id=%s", lang, caption_id)

            caption_content = youtube.captions().download(
                id=caption_id,
                tfmt="srt",
            ).execute()
            if isinstance(caption_content, bytes):
                caption_content = caption_content.decode("utf-8")
            text = _parse_srt(caption_content)
            if not text.strip():
                raise RuntimeError("YouTube API caption download returned empty text.")
            logger.info(
                "[YT-API] ✅ Transcript extracted — %d characters", len(text)
            )
            return text
        except Exception as e:
            err_str = str(e)
            if "403" in err_str or "Forbidden" in err_str:
                # The API only lets owners download captions; say so plainly.
                raise RuntimeError(
                    f"YouTube API returned 403 Forbidden — you can only "
                    f"download captions for videos you own. Error: {err_str}"
                ) from e
            raise
# ============================================================================
# GEMINI SUMMARIZER
# ============================================================================
class GeminiSummarizer:
    """
    Sends transcript to Gemini and returns (summary, qa, model_used).
    No files are written to disk.
    """

    MAX_RETRIES = 5   # attempts per model for transient errors
    BASE_WAIT = 10    # seconds before the first retry; doubled after each
    MAX_WAIT = 120    # upper bound for the retry delay, in seconds
    # Substrings of the error text that mark a transient failure (retry).
    RETRYABLE = ["503", "502", "500", "UNAVAILABLE", "SERVICE_UNAVAILABLE"]
    # Substrings that mean this model cannot help (move on to the next one).
    SKIP_TO_NEXT = ["429", "RESOURCE_EXHAUSTED", "quota", "404", "NOT_FOUND"]

    def __init__(
        self,
        api_key: Optional[str] = None,
        models: Optional[list] = None,
    ):
        """
        Args:
            api_key: Gemini API key. When omitted, the module-level
                ``GEMINI_API_KEY`` is used; it is looked up here rather than
                bound as a default value, so the key does not appear in the
                function signature.
            models: Model names to try in order. Defaults to ``GEMINI_MODELS``.
        """
        if api_key is None:
            api_key = GEMINI_API_KEY
        self.client = genai.Client(api_key=api_key)
        self.models = models or GEMINI_MODELS

    def _call_api(self, transcript: str) -> tuple[str, str]:
        """Send the transcript to the first model that answers.

        Per model: transient errors are retried with exponential backoff,
        quota/not-found errors and empty responses skip to the next model,
        and any other error is re-raised immediately.

        Returns:
            ``(response_text, model_name)``.

        Raises:
            RuntimeError: When every model has been exhausted.
        """
        overall_last_error = None
        for model in self.models:
            logger.info("── Trying model: %s", model)
            wait = self.BASE_WAIT
            last_err = None
            for attempt in range(1, self.MAX_RETRIES + 1):
                try:
                    logger.info(" [%d/%d] Sending request...", attempt, self.MAX_RETRIES)
                    response = self.client.models.generate_content(
                        model=model,
                        contents=transcript,
                        config=types.GenerateContentConfig(
                            system_instruction=SYSTEM_PROMPT,
                        ),
                    )
                    text = response.text
                except Exception as e:
                    err = str(e)
                    last_err = e
                    if any(k in err for k in self.SKIP_TO_NEXT):
                        logger.warning(
                            " [%d/%d] %s — quota/not-found, skipping to next model.",
                            attempt, self.MAX_RETRIES, model,
                        )
                        break
                    elif any(k in err for k in self.RETRYABLE):
                        if attempt < self.MAX_RETRIES:
                            logger.warning(
                                " [%d/%d] %s — transient error. Retrying in %ds...",
                                attempt, self.MAX_RETRIES, model, wait,
                            )
                            time.sleep(wait)
                            wait = min(wait * 2, self.MAX_WAIT)
                        else:
                            logger.warning(
                                " [%d/%d] %s — max retries reached, trying next model.",
                                attempt, self.MAX_RETRIES, model,
                            )
                    else:
                        logger.error(
                            " [%d/%d] %s — unhandled error: %s",
                            attempt, self.MAX_RETRIES, model, err,
                        )
                        raise
                else:
                    if text:
                        logger.info(
                            "✅ Response received from: %s (attempt %d)",
                            model, attempt,
                        )
                        return text, model
                    # No text came back; returning it would crash _split(),
                    # so treat it as a failure of this model.
                    last_err = RuntimeError(f"Empty response from model {model}")
                    logger.warning(
                        " [%d/%d] %s — empty response, skipping to next model.",
                        attempt, self.MAX_RETRIES, model,
                    )
                    break
            overall_last_error = last_err
        raise RuntimeError(
            f"All models and retries exhausted. Last error: {overall_last_error}"
        ) from overall_last_error

    @staticmethod
    def _split(full_text: str) -> tuple[str, str]:
        """Split the model output into ``(summary, qa)``.

        The separator is a line holding only ``!!!!!`` (or ``!!!`` as a
        lenient fallback). Empty segments are ignored, so a separator line
        emitted before the first part does not yield an empty summary.
        The first non-empty segment is the summary and the rest form the
        Q&A. Without any separator the whole text is the summary.
        """
        for pattern in (r"^\s*!{5}\s*$", r"^\s*!{3}\s*$"):
            parts = re.split(pattern, full_text, flags=re.MULTILINE)
            if len(parts) < 2:
                continue
            segments = [part.strip() for part in parts if part.strip()]
            if not segments:
                return "", ""
            return segments[0], "\n\n".join(segments[1:])
        return full_text.strip(), ""

    def run(self, transcript: str) -> tuple[str, str, str]:
        """Summarize ``transcript`` and return ``(summary, qa, model_used)``."""
        full, model_used = self._call_api(transcript)
        summary, qa = self._split(full)
        logger.info("✅ Summarization complete — model: %s", model_used)
        return summary, qa, model_used
# ============================================================================
# PIPELINE
# ============================================================================
class TranscriptSummaryPipeline:
    """
    End-to-end orchestration: fetch the transcript, then summarize it.

    Everything is passed along in memory; the pipeline itself writes no files.
    Transcript extraction is delegated to YouTubeTranscriptFetcher (multi-tier
    fallback) and summarization to GeminiSummarizer.
    """

    def __init__(
        self,
        youtube_url: str,
        languages: Optional[List[str]] = None,
        polling_config: list[dict] = None,  # a list of attempts, not a dict
        google_creds = None,
    ):
        """Build the fetcher and summarizer and look up the video title.

        All arguments are forwarded unchanged to YouTubeTranscriptFetcher.
        """
        self.youtube_url = youtube_url
        fetcher = YouTubeTranscriptFetcher(
            youtube_url=youtube_url,
            languages=languages,
            polling_config=polling_config,
            google_creds=google_creds,
        )
        self.fetcher = fetcher
        self.summarizer = GeminiSummarizer()
        # The fetcher has already parsed the URL; reuse its video ID.
        self.video_id = fetcher.video_id
        self.video_title = fetch_video_title(fetcher.video_id)

    def run(self) -> dict:
        """Run fetch + summarize and return every artifact in one dict.

        Keys, in order: video_id, video_title, model_used, extraction_method,
        summary, qa, transcript.
        """
        logger.info("=== Pipeline started ===")
        logger.info("Video title : %s", self.video_title)

        transcript, extraction_method = self.fetcher.run()
        summary, qa, model = self.summarizer.run(transcript)

        logger.info(
            "=== Pipeline complete | model: %s | extraction: %s ===",
            model, extraction_method,
        )
        return dict(
            video_id=self.video_id,
            video_title=self.video_title,
            model_used=model,
            extraction_method=extraction_method,
            summary=summary,
            qa=qa,
            transcript=transcript,
        )
# ============================================================================
# CLI
# ============================================================================
def main():
    """Command-line entry point: ``gemini_transcript.py <youtube_url>``.

    Runs the pipeline for the given URL and prints the short metadata fields
    of the result as ``key: value`` lines. The bulky fields (summary, qa,
    transcript) are not printed. Without a URL argument a usage line goes to
    stderr and the process exits with status 1.
    """
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: python gemini_transcript.py <youtube_url>", file=sys.stderr)
        sys.exit(1)

    result = TranscriptSummaryPipeline(
        youtube_url=cli_args[0],
        languages=["en", "en-US", "en-GB"],
    ).run()

    bulky_fields = ("summary", "qa", "transcript")
    for key, value in result.items():
        if key in bulky_fields:
            continue
        print(f"{key}: {value}")


if __name__ == "__main__":
    main()