| | import gradio as gr |
| | from faster_whisper import WhisperModel |
| | from transformers import AutoTokenizer, AutoModelForCausalLM |
| | import requests |
| | import time |
| | import base64 |
| | import tempfile |
| | import os |
| | import logging |
| | from datetime import datetime |
| |
|
| | |
| | logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
| | logger = logging.getLogger(__name__) |
| |
|
| | |
| | logger.info("Loading Whisper model...") |
| | whisper_model = WhisperModel("tiny", device="cpu", compute_type="int8") |
| |
|
| | logger.info("Loading Qwen 0.5B (fastest model)...") |
| | model_name = "Qwen/Qwen2.5-0.5B-Instruct" |
| | tokenizer = AutoTokenizer.from_pretrained(model_name) |
| | model = AutoModelForCausalLM.from_pretrained( |
| | model_name, |
| | torch_dtype=torch.float32, |
| | device_map="cpu", |
| | low_cpu_mem_usage=True |
| | ) |
| |
|
| | logger.info("All models loaded!") |
| |
|
| | def search_web_google(query, max_results=3): |
| | """Use Google Custom Search API (free tier: 100 queries/day)""" |
| | logger.info(f"[SEARCH] Query: {query}") |
| | |
| | |
| | try: |
| | |
| | url = "https://www.googleapis.com/customsearch/v1" |
| | params = { |
| | 'q': query, |
| | 'num': max_results, |
| | 'key': os.getenv('GOOGLE_API_KEY', ''), |
| | 'cx': os.getenv('GOOGLE_CX', '') |
| | } |
| | |
| | |
| | searx_url = "https://searx.be/search" |
| | searx_params = { |
| | 'q': query, |
| | 'format': 'json', |
| | 'categories': 'general', |
| | 'language': 'en' |
| | } |
| | |
| | response = requests.get(searx_url, params=searx_params, timeout=5) |
| | |
| | if response.status_code == 200: |
| | data = response.json() |
| | results = data.get('results', []) |
| | |
| | context = "" |
| | for i, result in enumerate(results[:max_results], 1): |
| | title = result.get('title', '') |
| | content = result.get('content', '') |
| | context += f"\n[Source {i}] {title}\n{content}\n" |
| | logger.info(f"[SEARCH] Result {i}: {title[:50]}...") |
| | |
| | if context: |
| | logger.info(f"[SEARCH] Success - {len(results)} results") |
| | return context.strip() |
| | |
| | logger.warning("[SEARCH] No results from Searx") |
| | return "Unable to fetch current information. Please try a different question." |
| | |
| | except Exception as e: |
| | logger.error(f"[SEARCH] Error: {str(e)}") |
| | return f"Search unavailable: {str(e)}" |
| |
|
| | def transcribe_audio_base64(audio_base64): |
| | """Transcribe audio from base64""" |
| | logger.info("[PLUELY STT] Request received") |
| | try: |
| | audio_bytes = base64.b64decode(audio_base64) |
| | logger.info(f"[PLUELY STT] Audio size: {len(audio_bytes)} bytes") |
| | |
| | with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio: |
| | temp_audio.write(audio_bytes) |
| | temp_path = temp_audio.name |
| | |
| | segments, _ = whisper_model.transcribe(temp_path, language="en", beam_size=1) |
| | transcription = " ".join([seg.text for seg in segments]) |
| | os.unlink(temp_path) |
| | |
| | logger.info(f"[PLUELY STT] Success: {transcription[:50]}...") |
| | return {"text": transcription.strip()} |
| | |
| | except Exception as e: |
| | logger.error(f"[PLUELY STT] Error: {str(e)}") |
| | return {"error": str(e)} |
| |
|
| | def generate_answer(text_input): |
| | """Generate fast answer using search results""" |
| | logger.info(f"[PLUELY AI] Question: {text_input}") |
| | try: |
| | if not text_input or not text_input.strip(): |
| | return "No input provided" |
| | |
| | current_date = datetime.now().strftime("%B %d, %Y") |
| | |
| | |
| | logger.info("[PLUELY AI] Searching...") |
| | search_results = search_web_google(text_input, max_results=3) |
| | logger.info(f"[PLUELY AI] Search done ({len(search_results)} chars)") |
| | |
| | |
| | prompt = f"""Today is {current_date}. Answer based on these search results: |
| | |
| | {search_results} |
| | |
| | Question: {text_input} |
| | Answer (80-100 words):""" |
| |
|
| | logger.info("[PLUELY AI] Generating...") |
| | inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=1000) |
| | |
| | with torch.no_grad(): |
| | outputs = model.generate( |
| | **inputs, |
| | max_new_tokens=120, |
| | temperature=0.3, |
| | do_sample=True, |
| | top_p=0.9, |
| | pad_token_id=tokenizer.eos_token_id |
| | ) |
| | |
| | answer = tokenizer.decode(outputs[0][inputs['input_ids'].shape[1]:], skip_special_tokens=True).strip() |
| | |
| | logger.info(f"[PLUELY AI] Done ({len(answer)} chars)") |
| | return answer |
| | |
| | except Exception as e: |
| | logger.error(f"[PLUELY AI] Error: {str(e)}") |
| | return f"Error: {str(e)}" |
| |
|
| | def process_audio(audio_path, question_text): |
| | """Main pipeline""" |
| | start_time = time.time() |
| | logger.info("="*50) |
| | logger.info("[MAIN] New request") |
| | |
| | if audio_path: |
| | logger.info(f"[MAIN] Audio: {audio_path}") |
| | try: |
| | segments, _ = whisper_model.transcribe(audio_path, language="en", beam_size=1) |
| | question = " ".join([seg.text for seg in segments]) |
| | logger.info(f"[MAIN] Transcribed: {question}") |
| | except Exception as e: |
| | logger.error(f"[MAIN] Transcription failed: {str(e)}") |
| | return f"❌ Error: {str(e)}", 0.0 |
| | else: |
| | question = question_text |
| | logger.info(f"[MAIN] Text: {question}") |
| | |
| | if not question or not question.strip(): |
| | return "❌ No input", 0.0 |
| | |
| | transcription_time = time.time() - start_time |
| | |
| | |
| | search_start = time.time() |
| | search_web_google(question, max_results=3) |
| | search_time = time.time() - search_start |
| | |
| | |
| | llm_start = time.time() |
| | answer = generate_answer(question) |
| | llm_time = time.time() - llm_start |
| | |
| | total_time = time.time() - start_time |
| | time_emoji = "🟢" if total_time < 3.0 else "🟡" if total_time < 5.0 else "🔴" |
| | |
| | logger.info(f"[MAIN] Total: {total_time:.2f}s") |
| | logger.info("="*50) |
| | |
| | timing = f"\n\n{time_emoji} **Time:** Trans={transcription_time:.2f}s | Search={search_time:.2f}s | LLM={llm_time:.2f}s | **Total={total_time:.2f}s**" |
| | |
| | return answer + timing, total_time |
| |
|
| | def audio_handler(audio_path): |
| | return process_audio(audio_path, None) |
| |
|
| | def text_handler(text_input): |
| | return process_audio(None, text_input) |
| |
|
| | |
| | with gr.Blocks(title="Fast Q&A", theme=gr.themes.Soft()) as demo: |
| | gr.Markdown(""" |
| | # ⚡ Ultra-Fast Political Q&A |
| | **Search-grounded answers** - Qwen 0.5B + Searx |
| | """) |
| | |
| | with gr.Tab("🎙️ Audio"): |
| | with gr.Row(): |
| | with gr.Column(): |
| | audio_input = gr.Audio(sources=["microphone", "upload"], type="filepath", label="Audio") |
| | audio_submit = gr.Button("🚀 Submit", variant="primary", size="lg") |
| | with gr.Column(): |
| | audio_output = gr.Textbox(label="Answer", lines=8, show_copy_button=True) |
| | audio_time = gr.Number(label="Time (s)", precision=2) |
| | |
| | audio_submit.click(fn=audio_handler, inputs=[audio_input], outputs=[audio_output, audio_time], api_name="audio_query") |
| | |
| | with gr.Tab("✍️ Text"): |
| | with gr.Row(): |
| | with gr.Column(): |
| | text_input = gr.Textbox(label="Question", placeholder="Ask anything...", lines=3) |
| | text_submit = gr.Button("🚀 Submit", variant="primary", size="lg") |
| | with gr.Column(): |
| | text_output = gr.Textbox(label="Answer", lines=8, show_copy_button=True) |
| | text_time = gr.Number(label="Time (s)", precision=2) |
| | |
| | text_submit.click(fn=text_handler, inputs=[text_input], outputs=[text_output, text_time], api_name="text_query") |
| | |
| | gr.Examples( |
| | examples=[ |
| | ["Is internet shut down in Bareilly today?"], |
| | ["Who won 2024 US election?"], |
| | ["Current India inflation rate?"] |
| | ], |
| | inputs=text_input |
| | ) |
| | |
| | with gr.Tab("🔌 API"): |
| | gr.Markdown(""" |
| | ### Pluely Endpoints |
| | |
| | **STT:** `https://archcoder-basic-app.hf.space/call/transcribe_stt` |
| | **AI:** `https://archcoder-basic-app.hf.space/call/answer_ai` |
| | |
| | **Response Paths:** |
| | STT: `data[0].text` |
| | AI: `data[0]` |
| | """) |
| | |
| | with gr.Row(visible=False): |
| | stt_in = gr.Textbox() |
| | stt_out = gr.JSON() |
| | ai_in = gr.Textbox() |
| | ai_out = gr.Textbox() |
| | |
| | gr.Button("STT", visible=False).click(fn=transcribe_audio_base64, inputs=[stt_in], outputs=[stt_out], api_name="transcribe_stt") |
| | gr.Button("AI", visible=False).click(fn=generate_answer, inputs=[ai_in], outputs=[ai_out], api_name="answer_ai") |
| | |
| | gr.Markdown("🟢 < 3s | 🟡 3-5s | 🔴 > 5s") |
| |
|
| | if __name__ == "__main__": |
| | demo.queue(max_size=5) |
| | demo.launch() |
| |
|