| """ | |
| Sahel-Voice-Lab — Internal Edition (Phase 2: Voice Output) | |
| Stack (100% non-Meta): | |
| STT : openai/whisper-large-v3-turbo | |
| LLM : Qwen/Qwen2.5-72B-Instruct (or LLM_MODEL_ID env var) | |
| TTS : MALIBA-AI/bambara-tts (Bambara) | ous-sow/fula-tts (Fula, after training) | |
| Store: HF Dataset ous-sow/sahel-agri-feedback → vocabulary.jsonl | |
| Flow: | |
| 1. User presses Push-to-Talk → records audio | |
| 2. Whisper transcribes to text | |
| 3. MemoryManager injects current vocabulary into Gemma's system prompt | |
| 4. Gemma returns structured JSON: | |
| teaching → MemoryManager.add_word_pair() → push to Hub | |
| question → answer using vocabulary | |
| conversation → natural reply | |
| 5. UI shows Gemma's reply + last 5 learned words | |
| """ | |
from __future__ import annotations

import logging
import os
import sys
import threading
from pathlib import Path

import gradio as gr

logger = logging.getLogger(__name__)

ROOT = Path(__file__).parent
sys.path.insert(0, str(ROOT))
# ── Env ───────────────────────────────────────────────────────────────────────
HF_TOKEN = os.environ.get("HF_TOKEN")
FEEDBACK_REPO_ID = os.environ.get("FEEDBACK_REPO_ID", "ous-sow/sahel-agri-feedback")
WHISPER_MODEL_ID = os.environ.get("WHISPER_MODEL_ID", "openai/whisper-large-v3-turbo")
LLM_MODEL_ID = os.environ.get("LLM_MODEL_ID", "Qwen/Qwen2.5-72B-Instruct")

LANGUAGE_NAMES = {
    "bam": "Bambara",
    "ful": "Fula / Pular",
    "fr": "French",
    "en": "English",
}
# ── Singletons ────────────────────────────────────────────────────────────────
from src.memory.memory_manager import MemoryManager
from src.llm.gemma_client import GemmaClient
from src.tts.waxal_tts import WaxalTTSEngine
from src.tts.voice_cloner import VoiceCloner
from src.voice.speaker_profiles import SpeakerProfileManager
from src.engine.stt_processor import (
    transcribe_with_confidence,
    LOW_CONFIDENCE_THRESHOLD,
    CONFUSION_PROMPT,
)
from src.engine.curiosity import CuriosityEngine

_memory = MemoryManager(repo_id=FEEDBACK_REPO_ID, hf_token=HF_TOKEN)
_gemma = GemmaClient(model_id=LLM_MODEL_ID, hf_token=HF_TOKEN)
_tts = WaxalTTSEngine()
_voice_cloner = VoiceCloner()
_speaker_profiles = SpeakerProfileManager()
_curiosity = CuriosityEngine(interval=5)

# Whisper — loaded lazily in background
_whisper_model = None
_whisper_processor = None
_whisper_lock = threading.Lock()
_whisper_status = "not loaded"
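# NOTE: _whisper_lock serialises both the lazy-load kick-off (_ensure_whisper)
# and inference (_transcribe), since the single model instance is shared
# across Gradio request threads.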
# ── Whisper loading ───────────────────────────────────────────────────────────
def _do_load_whisper() -> None:
    global _whisper_model, _whisper_processor, _whisper_status
    import torch  # noqa: F401 — imported up front so a missing torch fails fast
    from transformers import WhisperProcessor, WhisperForConditionalGeneration

    _whisper_status = "loading…"
    try:
        _whisper_processor = WhisperProcessor.from_pretrained(
            WHISPER_MODEL_ID, token=HF_TOKEN
        )
        _whisper_model = WhisperForConditionalGeneration.from_pretrained(
            WHISPER_MODEL_ID, token=HF_TOKEN
        )
        _whisper_model.eval()
        _whisper_status = f"ready ({WHISPER_MODEL_ID})"
    except Exception as exc:
        _whisper_status = f"error: {exc}"


def _ensure_whisper() -> str:
    """Kick off the background load if needed; return the current status."""
    global _whisper_status
    with _whisper_lock:
        if _whisper_model is None and "loading" not in _whisper_status:
            _whisper_status = "loading…"
            threading.Thread(target=_do_load_whisper, daemon=True).start()
    return _whisper_status


def _whisper_status_label() -> str:
    s = _ensure_whisper()
    if "ready" in s:
        return f"🟢 STT {s}"
    if "loading" in s:
        return f"🟡 STT {s}"
    if "error" in s:
        return f"🔴 STT {s}"
    return f"⚪ STT {s}"
def _transcribe(audio_path: str, language_hint: str) -> tuple[str, float]:
    """
    Run Whisper STT with confidence scoring.

    Returns (text, avg_logprob). avg_logprob < LOW_CONFIDENCE_THRESHOLD → confused.
    """
    if _whisper_model is None:
        return "", 0.0

    import librosa

    audio_np, _ = librosa.load(audio_path, sr=16_000, mono=True)

    # Whisper has no Bambara/Fula tokens — skip forced language for those
    if language_hint in ("bam", "ful"):
        forced_ids = None
    else:
        try:
            forced_ids = _whisper_processor.get_decoder_prompt_ids(
                language=language_hint, task="transcribe"
            )
        except Exception:
            forced_ids = None

    with _whisper_lock:
        text, avg_logprob = transcribe_with_confidence(
            audio_np,
            _whisper_model,
            _whisper_processor,
            forced_ids,
        )
    return text, avg_logprob
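# Note on the score: avg_logprob (as computed by transcribe_with_confidence in
# src/engine/stt_processor.py) is a mean log-probability over decoded tokens,
# so it is ≤ 0 in practice and closer to 0 means more confident. Turns scoring
# below LOW_CONFIDENCE_THRESHOLD are rerouted to CONFUSION_PROMPT in
# process_audio() below.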
# ── Core pipeline ─────────────────────────────────────────────────────────────
def _run_llm_and_tts(
    transcript: str,
    lang_code: str,
    history: list,
    source_label: str,
    active_se=None,
) -> tuple:
    """
    Shared core: Gemma → memory update → TTS → optional voice cloning.

    Returns: (history, recent_words_md, status_msg, audio_tuple_or_None)

    active_se: OpenVoice V2 tone-color SE (numpy array) to clone into, or None
    for the base VITS voice.
    """
    # 1. Ask Gemma (with vocabulary context)
    vocab_ctx = _memory.get_vocabulary_context()
    llm_result = _gemma.chat(transcript, vocab_ctx)
    intent = llm_result.get("intent", "conversation")
    response = llm_result.get("response", "…")

    # 2. Persist teaching intent to memory
    if intent == "teaching":
        word = llm_result.get("word", transcript)
        lang = llm_result.get("language", lang_code)
        trans = llm_result.get("translation", "")
        trans_l = llm_result.get("translation_language", "en")
        if word and trans:
            _memory.add_word_pair(word, lang, trans, trans_l, source="user_taught")

    # 3. TTS → optional voice cloning
    audio_out = None
    tts_result = _tts.synthesize(response, lang_code)
    if tts_result is not None:
        audio_np, sr = tts_result
        if active_se is not None:
            cloned = _voice_cloner.convert(audio_np, sr, active_se)
            if cloned is not None:
                audio_np, sr = cloned
        audio_out = WaxalTTSEngine.audio_to_gradio(audio_np, sr)

    # 4. Update chat history
    history = list(history or [])
    history.append(
        {"role": "user", "content": f"[{LANGUAGE_NAMES.get(lang_code, lang_code)}] {transcript}"}
    )
    history.append({"role": "assistant", "content": response})

    # 5. Curiosity check — every 5 interactions, ask about a vocabulary gap
    curiosity_q = _curiosity.maybe_ask(_memory, _gemma)
    if curiosity_q:
        history.append({"role": "assistant", "content": f"🌱 {curiosity_q}"})

    tts_status = "" if audio_out else " (TTS not available for this language yet)"
    status_msg = {
        "teaching": f"✅ Word learned and saved!{tts_status}",
        "question": f"💬 Answered from vocabulary.{tts_status}",
        "conversation": f"💬 Replied.{tts_status}",
        "error": "⚠️ LLM error.",
    }.get(intent, f"💬 Replied.{tts_status}")

    return history, _render_recent_words(), status_msg, audio_out
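# Illustrative shape of llm_result as consumed above — the authoritative schema
# lives in src/llm/gemma_client.py; the field names here simply mirror the
# .get() calls in step 2:
#   {
#       "intent": "teaching",
#       "word": "sanji",
#       "language": "bam",
#       "translation": "rain",
#       "translation_language": "en",
#       "response": "Got it — 'sanji' is Bambara for 'rain'.",
#   }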
def process_audio(
    audio_path,
    language_label: str,
    voice_mode: str,
    history: list,
) -> tuple:
    """
    Full pipeline: audio → speaker ID → Whisper STT → Gemma → TTS → voice clone.

    Returns: (history, recent_words_md, status_msg, audio_out)
    """
    try:
        if audio_path is None:
            return history, _render_recent_words(), "⚠️ No audio recorded.", None

        lang_code = _label_to_code(language_label)
        status = _ensure_whisper()
        if _whisper_model is None:
            return history, _render_recent_words(), f"⏳ {status} — wait a moment and try again.", None

        # Load audio for speaker ID (_transcribe re-reads the file for STT)
        import librosa

        audio_np, _ = librosa.load(audio_path, sr=16_000, mono=True)

        # ── Speaker identification (Task 1) ───────────────────────────────
        uid, _ = _speaker_profiles.identify_or_create(audio_np)

        # Extract OpenVoice SE and update the user's profile
        if uid is not None:
            ov_se = _voice_cloner.extract_se(audio_np, 16_000)
            if ov_se is not None:
                _speaker_profiles.update_ov_embedding(uid, ov_se)

        # ── Select target SE based on mode (Task 3) ───────────────────────
        if voice_mode == "Individual" and uid is not None:
            active_se = _speaker_profiles.get_openvoice_se(uid)
        else:
            active_se = _speaker_profiles.get_collective_embedding()

        # ── Transcription with confidence scoring ─────────────────────────
        transcript, avg_logprob = _transcribe(audio_path, lang_code)
        if not transcript:
            return history, _render_recent_words(), "⚠️ Could not transcribe audio.", None
        if avg_logprob < LOW_CONFIDENCE_THRESHOLD:
            logger.info(
                "Low STT confidence (avg_logprob=%.3f) — switching to confusion prompt",
                avg_logprob,
            )
            transcript = CONFUSION_PROMPT

        return _run_llm_and_tts(transcript, lang_code, history, "voice", active_se)
    except Exception as exc:
        logger.exception("process_audio error")
        return history, _render_recent_words(), f"❌ Error: {exc}", None
def process_text(text: str, language_label: str, voice_mode: str, history: list) -> tuple:
    """Text input path — Gemma → TTS → optional voice clone."""
    try:
        if not text.strip():
            return history, _render_recent_words(), "⚠️ Please type something.", None
        lang_code = _label_to_code(language_label)
        # Text has no speaker signal — use Collective in both modes as fallback
        active_se = _speaker_profiles.get_collective_embedding()
        return _run_llm_and_tts(text.strip(), lang_code, history, "text", active_se)
    except Exception as exc:
        logger.exception("process_text error")
        return history, _render_recent_words(), f"❌ Error: {exc}", None
# ── Helpers ───────────────────────────────────────────────────────────────────
LANGUAGE_CHOICES = ["Bambara (bam)", "Fula (ful)", "French (fr)", "English (en)"]


def _label_to_code(label: str) -> str:
    mapping = {
        "Bambara (bam)": "bam",
        "Fula (ful)": "ful",
        "French (fr)": "fr",
        "English (en)": "en",
    }
    return mapping.get(label, "bam")


def _render_recent_words() -> str:
    recent = _memory.get_recent(5)
    if not recent:
        return (
            "_No words learned yet. Start teaching me! "
            "Say something like: **'I ni ce means hello in Bambara'**_"
        )
    lines = ["### 📖 Last 5 words learned\n"]
    for e in reversed(recent):
        lang = LANGUAGE_NAMES.get(e.get("language", "?"), e.get("language", "?"))
        word = e.get("word", "")
        tr = e.get("translation", "")
        tr_l = e.get("translation_language", "")
        lines.append(f"**{word}** `[{lang}]` → {tr} `({tr_l})`")
    return "\n\n".join(lines)
# ── UI ────────────────────────────────────────────────────────────────────────
def build_ui() -> gr.Blocks:
    with gr.Blocks(title="Sahel-Voice-Lab", theme=gr.themes.Soft()) as demo:
        gr.Markdown(
            "# 🌍 Sahel-Voice-Lab — Internal Edition\n"
            "**Phase 2 · Voice Output** \n"
            "Teach me Bambara and Fula — I will remember every word you share."
        )
        with gr.Row():
            # ── Left column: input + voice output ─────────────────────────
            with gr.Column(scale=2):
                def _full_status() -> str:
                    stt = _whisper_status_label()
                    tts = _tts.get_status()
                    bam = "🟢" if tts["bam"] == "ready" else ("🟡" if "not" in tts["bam"] else "🔴")
                    ful = "🟢" if tts["ful"] == "ready" else ("🟡" if "not" in tts["ful"] else "🔴")
                    spk = _speaker_profiles.get_status()
                    cln = "🟢 Cloner" if _voice_cloner._ready else (
                        "🔴 Cloner" if _voice_cloner._error else "🟡 Cloner")
                    return f"{stt} | TTS Bambara {bam} | TTS Fula {ful}\n{spk} | {cln}"

                status_box = gr.Textbox(
                    value=_full_status(),
                    label="System status",
                    interactive=False,
                    max_lines=2,
                )
                status_timer = gr.Timer(value=4)
                status_timer.tick(fn=_full_status, outputs=status_box)

                language_dd = gr.Dropdown(
                    choices=LANGUAGE_CHOICES,
                    value="Bambara (bam)",
                    label="Language you are speaking",
                )
                voice_mode_radio = gr.Radio(
                    choices=["Individual", "Collective"],
                    value="Individual",
                    label="Voice Mode",
                    info=(
                        "Individual — respond in the voice of the last speaker detected. "
                        "Collective — blend all known voices into one shared voice."
                    ),
                )
                with gr.Tab("🎙️ Push-to-Talk"):
                    audio_input = gr.Audio(
                        sources=["microphone"],
                        type="filepath",
                        label="Hold to record — release to send",
                    )
                    talk_btn = gr.Button("▶ Send Recording", variant="primary", size="lg")
                with gr.Tab("⌨️ Type instead"):
                    text_input = gr.Textbox(
                        lines=3,
                        placeholder=(
                            "Type a message or teach me a word.\n"
                            "Examples:\n"
                            "  'I ni ce means hello in Bambara'\n"
                            "  'Jam waali veut dire bonjour en Fula'\n"
                            "  'How do you say rain in Bambara?'"
                        ),
                        label="Message",
                    )
                    text_btn = gr.Button("▶ Send", variant="primary")

                action_status = gr.Textbox(
                    label="Last action", interactive=False, max_lines=1
                )
                # Voice response output
                audio_output = gr.Audio(
                    label="🔊 Voice response",
                    autoplay=True,
                    interactive=False,
                    visible=True,
                )
                gr.Markdown(
                    "**Teaching tips:**\n"
                    "- *'I ni ce means hello in Bambara'*\n"
                    "- *'Jam waali veut dire bonjour en Fula'*\n"
                    "- *'How do you say rain in Bambara?'*\n\n"
                    "Every new word is saved to the Hub automatically.\n\n"
                    "**TTS note:** Bambara voice is ready. "
                    "Fula voice requires running `notebooks/train_fula_tts.ipynb` on Kaggle first."
                )
            # ── Right column: memory + chat ────────────────────────────────
            with gr.Column(scale=3):
                recent_words = gr.Markdown(value=_render_recent_words())
                gr.Markdown("---")
                chatbot = gr.Chatbot(
                    label="Conversation",
                    height=420,
                    type="messages",
                    bubble_full_width=False,
                )
                clear_btn = gr.Button("🗑️ Clear conversation", size="sm", variant="secondary")
        # ── Wiring ─────────────────────────────────────────────────────────
        history_state = gr.State([])

        talk_btn.click(
            fn=process_audio,
            inputs=[audio_input, language_dd, voice_mode_radio, history_state],
            outputs=[history_state, recent_words, action_status, audio_output],
        ).then(
            fn=lambda h: h,
            inputs=[history_state],
            outputs=[chatbot],
        )
        text_btn.click(
            fn=process_text,
            inputs=[text_input, language_dd, voice_mode_radio, history_state],
            outputs=[history_state, recent_words, action_status, audio_output],
        ).then(
            fn=lambda h: (h, ""),
            inputs=[history_state],
            outputs=[chatbot, text_input],
        )
        text_input.submit(
            fn=process_text,
            inputs=[text_input, language_dd, voice_mode_radio, history_state],
            outputs=[history_state, recent_words, action_status, audio_output],
        ).then(
            fn=lambda h: (h, ""),
            inputs=[history_state],
            outputs=[chatbot, text_input],
        )
        clear_btn.click(
            fn=lambda: ([], _render_recent_words(), "", None),
            outputs=[history_state, recent_words, action_status, audio_output],
        ).then(fn=lambda: [], outputs=[chatbot])

    return demo
# ── Entry point ───────────────────────────────────────────────────────────────
# Load vocabulary at startup (background — non-blocking for the UI)
threading.Thread(target=_memory.load, daemon=True).start()
# Begin loading Whisper immediately
_ensure_whisper()
# Preload TTS models in background
_tts.preload()
# Preload speaker identification (SpeechBrain ECAPA-TDNN)
_speaker_profiles.preload()
# Preload voice cloner (OpenVoice V2) — gracefully degrades if unavailable
_voice_cloner.preload()
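# Minimal local run (illustrative — the variable names match the env reads at
# the top of this file; the app.py filename is the usual Spaces convention):
#
#   # .env
#   HF_TOKEN=hf_xxxxxxxxxxxxxxxx           # optional; without it Hub push is disabled
#   FEEDBACK_REPO_ID=ous-sow/sahel-agri-feedback
#   WHISPER_MODEL_ID=openai/whisper-large-v3-turbo
#   LLM_MODEL_ID=Qwen/Qwen2.5-72B-Instruct
#
#   python app.py   # then open http://localhost:7860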
if __name__ == "__main__":
    from dotenv import load_dotenv

    load_dotenv()

    # Re-read env after .env is loaded, then patch the already-built singletons
    HF_TOKEN = os.environ.get("HF_TOKEN")
    FEEDBACK_REPO_ID = os.environ.get("FEEDBACK_REPO_ID", "ous-sow/sahel-agri-feedback")
    WHISPER_MODEL_ID = os.environ.get("WHISPER_MODEL_ID", "openai/whisper-large-v3-turbo")
    LLM_MODEL_ID = os.environ.get("LLM_MODEL_ID", "Qwen/Qwen2.5-72B-Instruct")
    _memory._hf_token = HF_TOKEN
    _memory._repo_id = FEEDBACK_REPO_ID
    _gemma._hf_token = HF_TOKEN

    print(f"STT model : {WHISPER_MODEL_ID}")
    print(f"LLM model : {LLM_MODEL_ID}")
    print(f"Store     : {FEEDBACK_REPO_ID}")
    print(f"HF_TOKEN  : {'set' if HF_TOKEN else 'NOT SET — Hub push disabled'}")
    print()

    demo = build_ui()
    demo.launch(
        server_port=7860,
        inbrowser=False,
        share=False,
        show_api=False,
        ssr_mode=False,
    )