| | from smolagents.tools import Tool |
| | from typing import Optional, Union, Dict, Any |
| | import os |
| | import time |
| | import requests |
| | import io |
| | from PIL import Image |
| | from pytubefix import YouTube |
| | import docx |
| | from docx.shared import Pt, RGBColor, Inches |
| | from docx.enum.text import WD_PARAGRAPH_ALIGNMENT |
| | import google.generativeai as genai |
| | from dotenv import load_dotenv |
| |
|
| | |
# Load environment variables (e.g. GEMINI_API_KEY) from a local .env file.
load_dotenv()
| |
|
class TranscriptSummarizer(Tool):
    """Summarize a transcript with Google's Gemini model and optionally attach
    an illustration generated through the Hugging Face inference API.
    """

    description = "Summarizes a transcript and generates blog content using Google's Gemini model for summarization and Hugging Face API for image generation."
    name = "transcript_summarizer"
    inputs = {
        'transcript': {'type': 'string', 'description': 'The transcript to summarize.'},
        'language': {'type': 'string', 'description': 'The language of the transcript.', 'nullable': True}
    }
    output_type = "string"

    # ISO-639-1 codes spelled out so the Gemini prompt can name the language.
    # Class-level so the table is built once, not on every summarize call.
    LANGUAGE_NAMES = {
        'en': 'English', 'hi': 'Hindi', 'es': 'Spanish', 'fr': 'French',
        'de': 'German', 'it': 'Italian', 'ja': 'Japanese', 'ko': 'Korean',
        'pt': 'Portuguese', 'ru': 'Russian', 'zh': 'Chinese', 'ar': 'Arabic',
        'bn': 'Bengali', 'ta': 'Tamil', 'te': 'Telugu', 'mr': 'Marathi',
        'gu': 'Gujarati', 'kn': 'Kannada', 'ml': 'Malayalam', 'pa': 'Punjabi',
        'ur': 'Urdu',
    }

    def __init__(self, *args, hf_api_key: str = None, gemini_api_key: str = None, model_name: str = 'gemini-2.0-flash', **kwargs):
        """
        Args:
            hf_api_key: Hugging Face API token used for image generation.
            gemini_api_key: Gemini API key; falls back to the GEMINI_API_KEY
                environment variable when not supplied.
            model_name: Gemini model identifier to instantiate.
        """
        super().__init__(*args, **kwargs)

        self.gemini_api_key = gemini_api_key or os.getenv("GEMINI_API_KEY")
        self.model_name = model_name

        if self.gemini_api_key:
            genai.configure(api_key=self.gemini_api_key)
            self.gemini_model = genai.GenerativeModel(self.model_name)
        else:
            # Leave the model unset; forward() reports the missing key.
            self.gemini_model = None

        self.api_url = "https://api-inference.huggingface.co/models/ZB-Tech/Text-to-Image"
        self.hf_api_key = hf_api_key
        self.headers = {"Authorization": f"Bearer {self.hf_api_key}"}

    def query_image_api(self, payload):
        """POST *payload* to the HF inference endpoint and return the raw bytes.

        Bug fixes vs. the original: a request timeout is set (the call could
        previously hang forever), and non-2xx responses now raise
        requests.HTTPError instead of silently returning error JSON that was
        then handed to PIL. forward() already degrades gracefully on both.
        """
        response = requests.post(
            self.api_url, headers=self.headers, json=payload, timeout=60
        )
        response.raise_for_status()
        return response.content

    def summarize_with_gemini(self, text, language='en', max_tokens=1000):
        """Use Gemini to summarize *text*, answering in the transcript's own language.

        Args:
            text: The text to summarize.
            language: ISO-639-1 code (or free-form name) of the text's language.
            max_tokens: Cap for the model's output length.

        Returns:
            The summary text produced by the model.
        """
        # Unknown codes fall through unchanged so the prompt still names them.
        language_name = self.LANGUAGE_NAMES.get(language, language)

        prompt = f"""
Please summarize the following transcript in a concise but comprehensive way.
Focus on the main points and key information.

IMPORTANT: The transcript is in {language_name}. Please provide the summary in the SAME LANGUAGE ({language_name}).
Do not translate to any other language. Keep the summary in the original language of the transcript.

Transcript:
{text}
"""

        generation_config = {
            "temperature": 0.4,
            "top_p": 0.95,
            "top_k": 40,
            "max_output_tokens": max_tokens,
        }

        response = self.gemini_model.generate_content(
            prompt,
            generation_config=generation_config
        )

        return response.text

    def forward(self, transcript: str, language: str = 'en') -> str:
        """Summarize *transcript* and, best-effort, append a generated image path.

        Returns the summary text, optionally suffixed with an "Image URL: <path>"
        line, or a human-readable error string.
        """
        try:
            if not self.hf_api_key:
                return "Hugging Face API key is required for image generation. Please provide it in the input field."

            if not self.gemini_model:
                return "Gemini API key is required for summarization. Please provide it in the input field."

            transcript_length = len(transcript)

            if transcript_length < 100:
                return "Transcript is too short to summarize."

            if transcript_length > 30000:
                # Summarize in chunks to stay inside the model's context window,
                # then re-summarize the concatenation if it is still too long.
                chunk_size = 25000
                chunk_summaries = [
                    self.summarize_with_gemini(transcript[i:i + chunk_size], language=language, max_tokens=1000)
                    for i in range(0, transcript_length, chunk_size)
                ]

                combined_summary = "\n\n".join(chunk_summaries)
                if len(combined_summary) > 25000:
                    full_summary = self.summarize_with_gemini(combined_summary, language=language, max_tokens=2000)
                else:
                    full_summary = combined_summary
            else:
                full_summary = self.summarize_with_gemini(transcript, language=language, max_tokens=2000)

            # Image generation is best-effort: any failure below returns the
            # plain summary rather than surfacing an error to the caller.
            try:
                key_entities = full_summary.split()[:15]
                image_prompt = f"Generate an image related to: {' '.join(key_entities)}, cartoon style"
                image_bytes = self.query_image_api({"inputs": image_prompt})

                # Tiny payloads are almost certainly not a real image.
                if not image_bytes or len(image_bytes) < 100:
                    print("Warning: Received invalid or empty image response")
                    return full_summary

                try:
                    image = Image.open(io.BytesIO(image_bytes))

                    image_folder = "Image"
                    # exist_ok avoids the check-then-create race of the original.
                    os.makedirs(image_folder, exist_ok=True)
                    image_url = os.path.join(image_folder, f"image_{int(time.time())}.jpg")
                    image.save(image_url)

                    return f"{full_summary}\n\nImage URL: {image_url}"
                except Exception as img_error:
                    print(f"Error processing image: {str(img_error)}")
                    return full_summary
            except Exception as img_gen_error:
                print(f"Error generating image: {str(img_gen_error)}")
                return full_summary
        except Exception as e:
            return f"An unexpected error occurred: {str(e)}"
| |
|
class YouTubeTranscriptExtractor(Tool):
    """Extract and clean the caption track of a YouTube video via pytubefix."""

    description = "Extracts the transcript from a YouTube video."
    name = "youtube_transcript_extractor"
    inputs = {'video_url': {'type': 'string', 'description': 'The URL of the YouTube video.'}}
    output_type = "string"

    def __init__(self, *args, **kwargs):
        # Bug fix: the original never called Tool.__init__, leaving the tool
        # only partially initialized; delegate to the base class first.
        super().__init__(*args, **kwargs)
        self.is_initialized = False

    def forward(self, video_url: str) -> str:
        """Fetch the video's captions and return them in the framework's
        "LANGUAGE:<code>||<text>" envelope; errors are reported in the same
        envelope so downstream parsing never breaks.
        """
        try:
            yt = YouTube(video_url)
            lang = 'en'

            try:
                if 'en' in yt.captions:
                    transcript = yt.captions['en'].generate_srt_captions()
                    lang = 'en'
                else:
                    # Fall back to the first available caption track, if any
                    # (hoisted: the original called .all() twice).
                    available = yt.captions.all()
                    if len(available) > 0:
                        caption = available[0]
                        transcript = caption.generate_srt_captions()
                        lang = caption.code
                    else:
                        return f"LANGUAGE:{lang}||No transcript available for this video."
            except StopIteration:
                return f"LANGUAGE:{lang}||No transcript available for this video."
            except Exception as e:
                return f"LANGUAGE:{lang}||An unexpected error occurred while accessing captions: {str(e)}"

            # Strip SRT sequence numbers and "start --> end" timestamp lines,
            # keeping only the spoken text (each kept line keeps its newline).
            cleaned_transcript = "".join(
                f"{line}\n"
                for line in transcript.splitlines()
                if not line.strip().isdigit() and "-->" not in line
            )

            print(f"Transcript language detected: {lang}")
            print("Transcript sample: ", cleaned_transcript[:200] + "..." if len(cleaned_transcript) > 200 else cleaned_transcript)

            return f"LANGUAGE:{lang}||{cleaned_transcript}"
        except Exception as e:
            return f"LANGUAGE:en||An unexpected error occurred: {str(e)}"
| |
|
class TranscriptToDocx(Tool):
    """Create or append to a DOCX document holding a video's summary,
    optional generated image, and full transcript.
    """

    description = "Creates or updates a DOCX file with YouTube transcript and summary."
    name = "transcript_to_docx"
    inputs = {
        'transcript': {'type': 'string', 'description': 'The transcript to include in the document.'},
        'summary': {'type': 'string', 'description': 'The summary to include in the document.'},
        'video_title': {'type': 'string', 'description': 'The title of the YouTube video.'},
        'image_path': {'type': 'string', 'description': 'Path to the image to include in the document.', 'nullable': True},
        'existing_docx_path': {'type': 'string', 'description': 'Path to an existing DOCX file to update.', 'nullable': True}
    }
    output_type = "string"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # All generated documents land in this folder.
        self.docx_folder = "Documents"
        if not os.path.exists(self.docx_folder):
            os.makedirs(self.docx_folder)

    def forward(self, transcript: str, summary: str, video_title: str, image_path: Optional[str] = None, existing_docx_path: Optional[str] = None) -> str:
        """Build the document and return the saved file path, or an error string.

        Args:
            transcript: Full transcript text for the "Full Transcript" section.
            summary: Summary text for the "Summary" section.
            video_title: Video title; used for the heading and the filename.
            image_path: Optional image to embed (best-effort).
            existing_docx_path: Existing DOCX to append to; a page break
                separates the new entry from prior content.
        """
        try:
            if existing_docx_path and os.path.exists(existing_docx_path):
                doc = docx.Document(existing_docx_path)
                # Start the new entry on a fresh page.
                doc.add_paragraph().add_run().add_break(docx.enum.text.WD_BREAK.PAGE)
            else:
                doc = docx.Document()
                doc.core_properties.title = f"YouTube Transcript: {video_title}"
                doc.core_properties.author = "YouTube Transcript Tool"

            title = doc.add_heading(video_title, level=1)
            title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER

            doc.add_heading("Summary", level=2)
            doc.add_paragraph(summary)

            if image_path and os.path.exists(image_path):
                try:
                    doc.add_picture(image_path, width=Inches(6))
                    caption = doc.add_paragraph("Generated image based on transcript content")
                    caption.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
                    caption.runs[0].italic = True
                except Exception as img_error:
                    # Image embedding is best-effort; the document is still useful.
                    print(f"Error adding image to document: {str(img_error)}")

            doc.add_heading("Full Transcript", level=2)
            doc.add_paragraph(transcript)

            # Build a filesystem-safe filename from the title. Bug fix: a title
            # with no usable characters previously produced the filename ".docx".
            safe_title = ''.join(c for c in video_title if c.isalnum() or c in ' _-')
            safe_title = safe_title.replace(' ', '_')
            if not safe_title:
                safe_title = f"youtube_transcript_{int(time.time())}"

            output_path = os.path.join(self.docx_folder, f"{safe_title}.docx")

            try:
                doc.save(output_path)
                print(f"Document saved successfully at: {output_path}")
                return output_path
            except Exception as save_error:
                error_msg = f"Error saving document: {str(save_error)}"
                print(error_msg)
                # Retry once with a timestamped name (e.g. title had characters
                # the filesystem rejects).
                try:
                    fallback_path = os.path.join(self.docx_folder, f"youtube_transcript_{int(time.time())}.docx")
                    doc.save(fallback_path)
                    print(f"Document saved with fallback name at: {fallback_path}")
                    return fallback_path
                except Exception:
                    # Bug fix: was a bare `except:`, which also swallowed
                    # SystemExit/KeyboardInterrupt.
                    return error_msg
        except Exception as e:
            return f"An error occurred while creating the DOCX file: {str(e)}"
| |
|