Spaces:
Sleeping
Sleeping
import logging
import os
import re
import shutil
import tempfile
import time
import uuid
from functools import wraps
from pathlib import Path
from typing import Optional

import gradio as gr
import modal
import yt_dlp
from dotenv import load_dotenv
from openai import OpenAI
# Load environment variables from .env before reading any secrets below.
load_dotenv()
# Remote Modal handles: media preprocessing function, ASR service class, and
# the shared volume used to stage user uploads for remote processing.
process_media_remotely = modal.Function.from_name("clipscript-processing-service", "process_media")
asr_handle = modal.Cls.from_name("clipscript-asr-service", "ASR")
upload_volume = modal.Volume.from_name("clipscript-uploads", create_if_missing=True)
# OpenRouter model id used for blog-post generation.
llm = "deepseek/deepseek-r1-0528:free"
api_key = os.environ.get("OPENROUTER_API_KEY")
def retry_on_rate_limit(max_retries: int = 5, base_delay: float = 2.0):
    """Decorator factory: retry the wrapped call with exponential backoff on rate limits.

    Args:
        max_retries: Maximum number of attempts before giving up.
        base_delay: Initial sleep in seconds; doubled after each rate-limited attempt.

    Rate limits are detected via an HTTP 429 status on the exception's
    ``response`` attribute, or '429' / 'rate limit' in the message text.
    Any other exception propagates immediately.
    """
    def decorator(func):
        @wraps(func)  # fix: `wraps` was imported but never applied, losing the wrapped function's metadata
        def wrapper(*args, **kwargs):
            delay = base_delay
            for _ in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    # Check for 429 status code in different ways
                    status_code = getattr(getattr(e, 'response', None), 'status_code', None)
                    if status_code == 429 or '429' in str(e) or 'rate limit' in str(e).lower():
                        logging.warning(f"Rate limit hit. Retrying in {delay:.1f} seconds...")
                        time.sleep(delay)
                        delay *= 2
                    else:
                        raise
            raise Exception("Max retries exceeded due to rate limits or other persistent errors.")
        return wrapper
    return decorator
def extract_youtube_video_id(url: str) -> Optional[str]:
    """Extract the YouTube video ID from various YouTube URL formats.

    Handles watch, youtu.be, embed, and /v/ style URLs.

    Returns:
        The video ID, or None when the URL does not match any known format
        (the previous ``-> str`` annotation was wrong: None was returned on miss).
    """
    patterns = [
        r'(?:youtube\.com\/watch\?v=|youtu\.be\/|youtube\.com\/embed\/|youtube\.com\/v\/)([^&\n?#]+)',
        r'youtube\.com\/watch\?.*v=([^&\n?#]+)',
    ]
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    return None
def get_youtube_thumbnail_url(video_id: str) -> str:
    """Return the max-resolution thumbnail URL for the given YouTube video ID."""
    return "https://img.youtube.com/vi/{}/maxresdefault.jpg".format(video_id)
# OpenAI-compatible client pointed at OpenRouter; used for all LLM completions.
client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=api_key,
)
def download_and_convert_youtube_audio(url: str) -> str:
    """
    Downloads audio from a YouTube URL and converts it to a 16kHz mono WAV file.
    Uses a temporary directory for all intermediate files, ensuring cleanup.
    Returns the path to the final temporary WAV file (caller must delete it).
    """
    temp_dir = tempfile.mkdtemp()
    cookie_file_path = None
    try:
        # Check for YouTube cookies in secrets and write to a temporary file
        youtube_cookies = os.environ.get("YOUTUBE_COOKIES")
        if youtube_cookies:
            # Use NamedTemporaryFile to handle the file creation and cleanup
            with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as tmp_cookie_file:
                tmp_cookie_file.write(youtube_cookies)
                cookie_file_path = tmp_cookie_file.name
            print("Using YouTube cookies from secrets.")
        output_tmpl = os.path.join(temp_dir, "audio.%(ext)s")
        ydl_opts = {
            "format": "bestaudio/best",
            "outtmpl": output_tmpl,
            "postprocessors": [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'wav',
            }],
            # ffmpeg args force 16kHz mono, the format expected by the ASR service.
            # NOTE(review): yt-dlp's postprocessor_args is usually keyed by the
            # postprocessor name ('FFmpegExtractAudio') — confirm 'extractaudio'
            # is honored by the installed yt-dlp version.
            'postprocessor_args': {
                'extractaudio': ['-ar', '16000', '-ac', '1']
            },
            "quiet": True,
        }
        # Add cookiefile to options if it exists
        if cookie_file_path:
            ydl_opts['cookiefile'] = cookie_file_path
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
        # Find the downloaded .wav file
        downloaded_files = list(Path(temp_dir).glob("*.wav"))
        if not downloaded_files:
            raise FileNotFoundError("yt-dlp failed to create a WAV file. The video might be protected or unavailable.")
        # Move the final file to a new temporary location so we can clean up the directory
        source_path = downloaded_files[0]
        fd, dest_path = tempfile.mkstemp(suffix=".wav")
        # mkstemp returns an open descriptor; close it so the path can be reused by shutil.move.
        os.close(fd)
        shutil.move(source_path, dest_path)
        return dest_path
    finally:
        # Clean up the cookie file if it was created
        if cookie_file_path and os.path.exists(cookie_file_path):
            os.remove(cookie_file_path)
        shutil.rmtree(temp_dir)
def handle_transcription(file, url):
    """Transcribe an uploaded file or a URL via the remote Modal services.

    YouTube URLs are downloaded/converted locally, then the raw audio bytes
    are sent to the ASR service; other URLs and file uploads are processed
    entirely remotely. Returns the transcript text, or an "Error: ..." string.
    """
    if not file and not (url and url.strip()):
        gr.Warning("Please upload a file or enter a URL.")
        return "Error: Please upload a file or enter a URL."
    gr.Info("Starting secure transcription... This might take a moment.")
    try:
        result = None
        if url and url.strip():
            video_id = extract_youtube_video_id(url)
            if video_id:
                converted_wav_path = None
                try:
                    print(f"Detected YouTube URL. Processing locally: {url}")
                    converted_wav_path = download_and_convert_youtube_audio(url)
                    # Read audio bytes and call ASR service
                    with open(converted_wav_path, "rb") as f:
                        audio_bytes = f.read()
                    print("Sending audio bytes to ASR service.")
                    result = asr_handle().transcribe.remote(audio_bytes=audio_bytes)
                finally:
                    # Clean up the converted temp WAV regardless of outcome.
                    if converted_wav_path and os.path.exists(converted_wav_path):
                        os.remove(converted_wav_path)
            else:
                # Process other URLs remotely and securely.
                print(f"Sending URL to Modal for processing: {url}")
                result = process_media_remotely.remote(url=url)
        elif file is not None:
            # For file uploads: stage the file on the shared Modal volume under
            # a unique ID, then trigger remote processing with that ID.
            upload_id = f"upload-{uuid.uuid4()}"
            print(f"Uploading file to Modal volume with ID: {upload_id}")
            with upload_volume.batch_upload() as batch:
                batch.put_file(file, upload_id)
            print(f"Sending upload ID to Modal for processing: {upload_id}")
            result = process_media_remotely.remote(upload_id=upload_id)
        if result.get("error"):
            return f"Error from ASR service: {result['error']}"
        return result["text"]
    except Exception as e:
        print(f"An error occurred: {e}")
        # fix: the duplicate os.remove(file) that lived here was redundant —
        # the finally block below already performs the identical guarded cleanup.
        return f"Error: {str(e)}"
    finally:
        # Gradio's gr.File widget creates a temporary file. We should clean it up.
        if file and os.path.exists(file):
            os.remove(file)
def add_transcript_to_chat(transcript: str):
    """Validate the transcript and reset the chat display.

    The transcript is never shown in the chat (it is passed to the LLM
    separately), so an empty history is returned in both branches.
    """
    if not transcript.startswith("Error"):
        gr.Info("Transcript ready! Generating blog post...")
        return []
    gr.Error("Transcription failed. Please check the logs.")
    return []
def user_chat(user_message: str, history: list):
    """Append the user's message to a copy of the chat history; clear the input box."""
    updated = list(history)
    updated.append({"role": "user", "content": user_message})
    return "", updated
def _stream_chat_response(history: list, system_prompt: str, transcript: str = None):
    """Stream an LLM completion into the chat history, yielding after each chunk.

    When `transcript` is given it is injected as a hidden first user message,
    so the model sees it without it ever appearing in the displayed chat.
    Mutates `history` in place (appends a growing assistant message) and
    yields it after every content delta for live UI updates.
    """
    if not history and not transcript:
        # Don't do anything if there's no history and no transcript
        return
    if transcript and transcript.startswith("Error"):
        # Failed transcription upstream — nothing to generate from.
        return
    # Include transcript as first user message if provided, but don't display it
    messages = [{"role": "system", "content": system_prompt}]
    if transcript:
        messages.append({"role": "user", "content": transcript})
    messages.extend(history)
    stream = client.chat.completions.create(
        model=llm,
        messages=messages,
        stream=True
    )
    history.append({"role": "assistant", "content": ""})
    response_content = ""
    for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            response_content += content
            history[-1]["content"] = response_content
            yield history
def generate_blog_post(history: list, transcript: str, context: str):
    """Stream the initial blog post generated from the transcript (plus optional
    user-supplied context) into the chat history."""
    system_prompt = """You are an expert blog writer and editor. Your task is to transform a raw video transcription into a well-structured, engaging, and publish-ready blog post in Markdown format.
Core Mandate: Erase the Video Origin
This is a critical function. The reader must not know the content came from a video.
Eliminate all video-specific language: Remove phrases like "in this video," "thanks for watching," "as you can see here," "welcome to the channel," etc.
Scrub all platform calls-to-action: No "like and subscribe," "hit the bell icon," or "comment below."
Remove sponsor messages and ads: Completely omit any sponsor mentions.
Rephrase visual references: Convert "look at this screen" to a description of the information itself (e.g., "The data reveals that...").
Content & Formatting Rules:
Title: Create a compelling, SEO-friendly H1 title.
Structure: Use ## for main headings and ### for subheadings to create a logical flow.
Readability: Use short paragraphs, bulleted/numbered lists, and bolding for key terms.
Refine Prose: Convert conversational speech into clean, professional writing.
Remove all filler words (um, uh, like, you know).
Fix grammar and consolidate rambling sentences.
Flow: Start with a strong introduction and end with a concise summary or conclusion.
Your output must be a complete, polished article in Markdown."""
    # Combine transcript with additional context if provided
    full_transcript = transcript
    if context and context.strip():
        full_transcript = f"{transcript}\n\n--- Additional Context ---\n{context.strip()}\n\nThis is some additional context relevant to the transcription above."
    yield from _stream_chat_response(history, system_prompt, full_transcript)
def bot_chat(history: list):
    """Stream a follow-up refinement of the blog post into the chat history."""
    system_prompt = "You are a helpful assistant that helps refine a blog post created from an audio transcript. The user will provide instructions for changes and you will return only the updated blog post."
    for updated_history in _stream_chat_response(history, system_prompt):
        yield updated_history
def update_thumbnail_display(url: str):
    """Show the YouTube thumbnail when the URL parses as a video; hide it otherwise."""
    hidden = gr.update(visible=False, value=None)
    if not url or not url.strip():
        return hidden
    video_id = extract_youtube_video_id(url)
    if not video_id:
        return hidden
    return gr.update(visible=True, value=get_youtube_thumbnail_url(video_id))
# Gradio Interface: two-column layout (inputs | transcript), then context box,
# chatbot, and a follow-up chat input. Event wiring at the bottom.
theme = gr.themes.Ocean()
with gr.Blocks(title="ClipScript", theme=theme) as demo:
    gr.Markdown("# 🎬➡️📝 ClipScript: Video-to-Blog Transformer", elem_classes="hero-title")
    gr.Markdown("### Upload an audio file, or provide a YouTube/direct URL *of any size*.")
    with gr.Row():
        # Column 1: File input, URL input, and thumbnail
        with gr.Column(scale=1):
            file_input = gr.File(label="Upload any audio file (Recommended)", type="filepath", height=200, file_types=["audio", ".webm", ".mp3", ".mp4", ".m4a", ".ogg", ".wav"])
            with gr.Row():
                with gr.Column():
                    url_input = gr.Textbox(
                        label="YouTube or Direct Audio URL",
                        placeholder="youtube.com/watch?v=... OR xyz.com/audio.mp3",
                        scale=2
                    )
                    # YouTube thumbnail display (hidden until a valid URL is typed)
                    thumbnail_display = gr.Image(
                        label="Thumbnail",
                        visible=False,
                        height=100,
                        show_download_button=False,
                        interactive=False,
                        scale=2
                    )
        # Column 2: Transcript view
        with gr.Column(scale=2):
            transcript_output = gr.Textbox(label="Transcription POWERED by Modal Labs", lines=12, interactive=True, show_copy_button=True)
    transcribe_button = gr.Button("Blogify", variant="primary")
    gr.Markdown("---")
    # Add Context section
    context_input = gr.Textbox(
        label="Additional Context",
        placeholder="Enter any additional context, code, articles, or any references that relate to the video content...",
        lines=5,
        interactive=True
    )
    chatbot = gr.Chatbot(
        label="Blog Post", type="messages", height=500, show_copy_all_button=True, show_copy_button=True, show_share_button=True
    )
    chat_input = gr.Textbox(
        label="Your message",
        placeholder="Refine the blog post or ask for changes...",
        container=False,
    )
    clear_button = gr.ClearButton([chat_input, chatbot])

    # Event handlers to disable/enable inputs based on usage
    # (file upload and URL are mutually exclusive input paths)
    def on_file_upload(file):
        # Choosing a file locks the URL box and hides any stale thumbnail.
        if file is not None:
            return gr.update(interactive=False), gr.update(visible=False, value=None)
        else:
            return gr.update(interactive=True), gr.update(visible=False, value=None)

    def on_url_change(url):
        # Typing a URL locks the file widget and tries to show the thumbnail.
        if url and url.strip():
            thumbnail_update = update_thumbnail_display(url)
            return gr.update(interactive=False), thumbnail_update
        else:
            return gr.update(interactive=True), gr.update(visible=False, value=None)

    file_input.change(fn=on_file_upload, inputs=file_input, outputs=[url_input, thumbnail_display])
    url_input.change(fn=on_url_change, inputs=url_input, outputs=[file_input, thumbnail_display])
    # Chained events for blog generation:
    # transcribe -> reset file widget -> validate transcript -> stream blog post
    (
        transcribe_button.click(
            fn=handle_transcription,
            inputs=[file_input, url_input],
            outputs=transcript_output,
        )
        .then(
            fn=lambda: gr.update(value=None, interactive=True),
            outputs=file_input,
            queue=False,
        )
        .then(
            fn=add_transcript_to_chat,
            inputs=transcript_output,
            outputs=chatbot,
            queue=False,
        )
        .then(fn=generate_blog_post, inputs=[chatbot, transcript_output, context_input], outputs=chatbot)
    )
    # Event handler for follow-up chat
    chat_input.submit(
        fn=user_chat,
        inputs=[chat_input, chatbot],
        outputs=[chat_input, chatbot],
        queue=False,
    ).then(fn=bot_chat, inputs=chatbot, outputs=chatbot)

if __name__ == "__main__":
    demo.launch()