""" Sahel-Voice-Lab — Internal Edition (Phase 2: Voice Output) Stack (100% non-Meta): STT : openai/whisper-large-v3-turbo LLM : Qwen/Qwen2.5-72B-Instruct (or LLM_MODEL_ID env var) TTS : MALIBA-AI/bambara-tts (Bambara) | ous-sow/fula-tts (Fula, after training) Store: HF Dataset ous-sow/sahel-agri-feedback → vocabulary.jsonl Flow: 1. User presses Push-to-Talk → records audio 2. Whisper transcribes to text 3. MemoryManager injects current vocabulary into Gemma's system prompt 4. Gemma returns structured JSON: teaching → MemoryManager.add_word_pair() → push to Hub question → answer using vocabulary conversation → natural reply 5. UI shows Gemma's reply + last 5 learned words """ from __future__ import annotations import logging import os import sys import threading from pathlib import Path logger = logging.getLogger(__name__) import gradio as gr ROOT = Path(__file__).parent sys.path.insert(0, str(ROOT)) # ── Env ─────────────────────────────────────────────────────────────────────── HF_TOKEN = os.environ.get("HF_TOKEN") FEEDBACK_REPO_ID = os.environ.get("FEEDBACK_REPO_ID", "ous-sow/sahel-agri-feedback") WHISPER_MODEL_ID = os.environ.get("WHISPER_MODEL_ID", "openai/whisper-large-v3-turbo") LLM_MODEL_ID = os.environ.get("LLM_MODEL_ID", "Qwen/Qwen2.5-72B-Instruct") LANGUAGE_NAMES = { "bam": "Bambara", "ful": "Fula / Pular", "fr": "French", "en": "English", } # ── Singletons ──────────────────────────────────────────────────────────────── from src.memory.memory_manager import MemoryManager from src.llm.gemma_client import GemmaClient from src.tts.waxal_tts import WaxalTTSEngine from src.tts.voice_cloner import VoiceCloner from src.voice.speaker_profiles import SpeakerProfileManager from src.engine.stt_processor import ( transcribe_with_confidence, LOW_CONFIDENCE_THRESHOLD, CONFUSION_PROMPT, ) from src.engine.curiosity import CuriosityEngine _memory = MemoryManager(repo_id=FEEDBACK_REPO_ID, hf_token=HF_TOKEN) _gemma = GemmaClient(model_id=LLM_MODEL_ID, hf_token=HF_TOKEN) _tts = WaxalTTSEngine() _voice_cloner = VoiceCloner() _speaker_profiles = SpeakerProfileManager() _curiosity = CuriosityEngine(interval=5) # Whisper — loaded lazily in background _whisper_model = None _whisper_processor = None _whisper_lock = threading.Lock() _whisper_status = "not loaded" # ── Whisper loading ─────────────────────────────────────────────────────────── def _do_load_whisper() -> None: global _whisper_model, _whisper_processor, _whisper_status import torch try: from transformers.models.whisper import WhisperProcessor, WhisperForConditionalGeneration except ImportError: from transformers.models.whisper.processing_whisper import WhisperProcessor from transformers.models.whisper.modeling_whisper import WhisperForConditionalGeneration _whisper_status = "loading…" try: _whisper_processor = WhisperProcessor.from_pretrained( WHISPER_MODEL_ID, token=HF_TOKEN ) _whisper_model = WhisperForConditionalGeneration.from_pretrained( WHISPER_MODEL_ID, token=HF_TOKEN ) _whisper_model.eval() _whisper_status = f"ready ({WHISPER_MODEL_ID})" except Exception as exc: _whisper_status = f"error: {exc}" def _ensure_whisper() -> str: global _whisper_status with _whisper_lock: if _whisper_model is None and "loading" not in _whisper_status: _whisper_status = "loading…" threading.Thread(target=_do_load_whisper, daemon=True).start() return _whisper_status def _whisper_status_label() -> str: s = _ensure_whisper() if "ready" in s: return f"🟢 STT {s}" if "loading" in s: return f"🟡 STT {s}" if "error" in s: return f"🔴 STT {s}" return f"⚪ STT {s}" def _transcribe(audio_path: str, language_hint: str) -> tuple[str, float]: """ Run Whisper STT with confidence scoring. Returns (text, avg_logprob). avg_logprob < LOW_CONFIDENCE_THRESHOLD → confused. """ if _whisper_model is None: return "", 0.0 import librosa audio_np, _ = librosa.load(audio_path, sr=16_000, mono=True) # Whisper has no Bambara/Fula tokens — skip forced language for those if language_hint in ("bam", "ful"): forced_ids = None else: try: forced_ids = _whisper_processor.get_decoder_prompt_ids( language=language_hint, task="transcribe" ) except Exception: forced_ids = None with _whisper_lock: text, avg_logprob = transcribe_with_confidence( audio_np, _whisper_model, _whisper_processor, forced_ids, ) return text, avg_logprob # ── Core pipeline ───────────────────────────────────────────────────────────── def _run_llm_and_tts( transcript: str, lang_code: str, history: list, source_label: str, active_se=None, ) -> tuple: """ Shared core: Gemma → memory update → TTS → optional voice cloning. Returns: (history, recent_words_md, status_msg, audio_tuple_or_None) active_se: OpenVoice V2 tone-color SE (numpy array) to clone into, or None for the base VITS voice. """ # 1. Ask Gemma (with vocabulary context) vocab_ctx = _memory.get_vocabulary_context() llm_result = _gemma.chat(transcript, vocab_ctx) intent = llm_result.get("intent", "conversation") response = llm_result.get("response", "…") # 2. Persist teaching intent to memory if intent == "teaching": word = llm_result.get("word", transcript) lang = llm_result.get("language", lang_code) trans = llm_result.get("translation", "") trans_l = llm_result.get("translation_language", "en") if word and trans: _memory.add_word_pair(word, lang, trans, trans_l, source="user_taught") # 3. TTS → optional voice cloning audio_out = None tts_result = _tts.synthesize(response, lang_code) if tts_result is not None: audio_np, sr = tts_result if active_se is not None: cloned = _voice_cloner.convert(audio_np, sr, active_se) if cloned is not None: audio_np, sr = cloned audio_out = WaxalTTSEngine.audio_to_gradio(audio_np, sr) # 4. Update chat history history = list(history or []) history.append({"role": "user", "content": f"[{LANGUAGE_NAMES.get(lang_code, lang_code)}] {transcript}"}) history.append({"role": "assistant", "content": response}) # 5. Curiosity check — every 5 interactions, ask about a vocabulary gap curiosity_q = _curiosity.maybe_ask(_memory, _gemma) if curiosity_q: history.append({"role": "assistant", "content": f"🌱 {curiosity_q}"}) tts_status = "" if audio_out else " (TTS not available for this language yet)" status_msg = { "teaching": f"✅ Word learned and saved!{tts_status}", "question": f"💬 Answered from vocabulary.{tts_status}", "conversation": f"💬 Replied.{tts_status}", "error": "⚠️ LLM error.", }.get(intent, f"💬 Replied.{tts_status}") return history, _render_recent_words(), status_msg, audio_out def process_audio( audio_path, language_label: str, voice_mode: str, history: list, ) -> tuple: """ Full pipeline: audio → speaker ID → Whisper STT → Gemma → TTS → voice clone. Returns: (history, recent_words_md, status_msg, audio_out) """ try: if audio_path is None: return history, _render_recent_words(), "⚠️ No audio recorded.", None lang_code = _label_to_code(language_label) status = _ensure_whisper() if _whisper_model is None: return history, _render_recent_words(), f"⏳ {status} — wait a moment and try again.", None # Load audio once — used for both speaker ID and STT import librosa audio_np, _ = librosa.load(audio_path, sr=16_000, mono=True) # ── Speaker identification (Task 1) ─────────────────────────────────── uid, _ = _speaker_profiles.identify_or_create(audio_np) # Extract OpenVoice SE and update the user's profile if uid is not None: ov_se = _voice_cloner.extract_se(audio_np, 16_000) if ov_se is not None: _speaker_profiles.update_ov_embedding(uid, ov_se) # ── Select target SE based on mode (Task 3) ─────────────────────────── if voice_mode == "Individual" and uid is not None: active_se = _speaker_profiles.get_openvoice_se(uid) else: active_se = _speaker_profiles.get_collective_embedding() # ── Transcription with confidence scoring ───────────────────────────── transcript, avg_logprob = _transcribe(audio_path, lang_code) if not transcript: return history, _render_recent_words(), "⚠️ Could not transcribe audio.", None if avg_logprob < LOW_CONFIDENCE_THRESHOLD: logger.info( "Low STT confidence (avg_logprob=%.3f) — switching to confusion prompt", avg_logprob, ) transcript = CONFUSION_PROMPT return _run_llm_and_tts(transcript, lang_code, history, "voice", active_se) except Exception as exc: logger.exception("process_audio error") return history, _render_recent_words(), f"❌ Error: {exc}", None def process_text(text: str, language_label: str, voice_mode: str, history: list) -> tuple: """Text input path — Gemma → TTS → optional voice clone.""" try: if not text.strip(): return history, _render_recent_words(), "⚠️ Please type something.", None lang_code = _label_to_code(language_label) # Text has no speaker signal — use Collective in both modes as fallback active_se = _speaker_profiles.get_collective_embedding() return _run_llm_and_tts(text.strip(), lang_code, history, "text", active_se) except Exception as exc: logger.exception("process_text error") return history, _render_recent_words(), f"❌ Error: {exc}", None # ── Helpers ─────────────────────────────────────────────────────────────────── LANGUAGE_CHOICES = ["Bambara (bam)", "Fula (ful)", "French (fr)", "English (en)"] def _label_to_code(label: str) -> str: mapping = { "Bambara (bam)": "bam", "Fula (ful)": "ful", "French (fr)": "fr", "English (en)": "en", } return mapping.get(label, "bam") def _render_recent_words() -> str: recent = _memory.get_recent(5) if not recent: return "_No words learned yet. Start teaching me! Say something like: **'I ni ce means hello in Bambara'**_" lines = ["### 📖 Last 5 words learned\n"] for e in reversed(recent): lang = LANGUAGE_NAMES.get(e.get("language", "?"), e.get("language", "?")) word = e.get("word", "") tr = e.get("translation", "") tr_l = e.get("translation_language", "") lines.append(f"**{word}** `[{lang}]` → {tr} `({tr_l})`") return "\n\n".join(lines) # ── UI ──────────────────────────────────────────────────────────────────────── def build_ui() -> gr.Blocks: with gr.Blocks(title="Sahel-Voice-Lab", theme=gr.themes.Soft()) as demo: gr.Markdown( "# 🌍 Sahel-Voice-Lab — Internal Edition\n" "**Phase 1 · The Memory Loop** \n" "Teach me Bambara and Fula — I will remember every word you share." ) with gr.Row(): # ── Left column: input + voice output ──────────────────────────── with gr.Column(scale=2): def _full_status() -> str: stt = _whisper_status_label() tts = _tts.get_status() bam = "🟢" if tts["bam"] == "ready" else ("🟡" if "not" in tts["bam"] else "🔴") ful = "🟢" if tts["ful"] == "ready" else ("🟡" if "not" in tts["ful"] else "🔴") spk = _speaker_profiles.get_status() cln = "🟢 Cloner" if _voice_cloner._ready else ( "🔴 Cloner" if _voice_cloner._error else "🟡 Cloner") return f"{stt} | TTS Bambara {bam} | TTS Fula {ful}\n{spk} | {cln}" status_box = gr.Textbox( value=_full_status(), label="System status", interactive=False, max_lines=2, ) status_timer = gr.Timer(value=4) status_timer.tick(fn=_full_status, outputs=status_box) language_dd = gr.Dropdown( choices=LANGUAGE_CHOICES, value="Bambara (bam)", label="Language you are speaking", ) voice_mode_radio = gr.Radio( choices=["Individual", "Collective"], value="Individual", label="Voice Mode", info=( "Individual — respond in the voice of the last speaker detected. " "Collective — blend all known voices into one shared voice." ), ) with gr.Tab("🎙️ Push-to-Talk"): audio_input = gr.Audio( sources=["microphone"], type="filepath", label="Hold to record — release to send", ) talk_btn = gr.Button("▶ Send Recording", variant="primary", size="lg") with gr.Tab("⌨️ Type instead"): text_input = gr.Textbox( lines=3, placeholder=( "Type a message or teach me a word.\n" "Examples:\n" " 'I ni ce means hello in Bambara'\n" " 'Jam waali veut dire bonjour en Fula'\n" " 'How do you say rain in Bambara?'" ), label="Message", ) text_btn = gr.Button("▶ Send", variant="primary") action_status = gr.Textbox( label="Last action", interactive=False, max_lines=1 ) # Voice response output audio_output = gr.Audio( label="🔊 Voice response", autoplay=True, interactive=False, visible=True, ) gr.Markdown( "**Teaching tips:**\n" "- *'I ni ce means hello in Bambara'*\n" "- *'Jam waali veut dire bonjour en Fula'*\n" "- *'How do you say rain in Bambara?'*\n\n" "Every new word is saved to the Hub automatically.\n\n" "**TTS note:** Bambara voice is ready. " "Fula voice requires running `notebooks/train_fula_tts.ipynb` on Kaggle first." ) # ── Right column: memory + chat ─────────────────────────────────── with gr.Column(scale=3): recent_words = gr.Markdown(value=_render_recent_words()) gr.Markdown("---") chatbot = gr.Chatbot( label="Conversation", height=420, type="messages", bubble_full_width=False, ) clear_btn = gr.Button("🗑️ Clear conversation", size="sm", variant="secondary") # ── Wiring ──────────────────────────────────────────────────────────── history_state = gr.State([]) talk_btn.click( fn=process_audio, inputs=[audio_input, language_dd, voice_mode_radio, history_state], outputs=[history_state, recent_words, action_status, audio_output], ).then( fn=lambda h: h, inputs=[history_state], outputs=[chatbot], ) text_btn.click( fn=process_text, inputs=[text_input, language_dd, voice_mode_radio, history_state], outputs=[history_state, recent_words, action_status, audio_output], ).then( fn=lambda h: (h, ""), inputs=[history_state], outputs=[chatbot, text_input], ) text_input.submit( fn=process_text, inputs=[text_input, language_dd, voice_mode_radio, history_state], outputs=[history_state, recent_words, action_status, audio_output], ).then( fn=lambda h: (h, ""), inputs=[history_state], outputs=[chatbot, text_input], ) clear_btn.click( fn=lambda: ([], _render_recent_words(), "", None), outputs=[history_state, recent_words, action_status, audio_output], ).then(fn=lambda: [], outputs=[chatbot]) return demo # ── Entry point ─────────────────────────────────────────────────────────────── # Load vocabulary at startup (background — non-blocking for the UI) threading.Thread(target=_memory.load, daemon=True).start() # Begin loading Whisper immediately _ensure_whisper() # Preload TTS models in background _tts.preload() # Preload speaker identification (SpeechBrain ECAPA-TDNN) _speaker_profiles.preload() # Preload voice cloner (OpenVoice V2) — gracefully degrades if unavailable _voice_cloner.preload() if __name__ == "__main__": from dotenv import load_dotenv load_dotenv() HF_TOKEN = os.environ.get("HF_TOKEN") FEEDBACK_REPO_ID = os.environ.get("FEEDBACK_REPO_ID", "ous-sow/sahel-agri-feedback") WHISPER_MODEL_ID = os.environ.get("WHISPER_MODEL_ID", "openai/whisper-large-v3-turbo") LLM_MODEL_ID = os.environ.get("LLM_MODEL_ID", "Qwen/Qwen2.5-72B-Instruct") _memory._hf_token = HF_TOKEN _memory._repo_id = FEEDBACK_REPO_ID _gemma._hf_token = HF_TOKEN print(f"STT model : {WHISPER_MODEL_ID}") print(f"LLM model : {LLM_MODEL_ID}") print(f"Store : {FEEDBACK_REPO_ID}") print(f"HF_TOKEN : {'set' if HF_TOKEN else 'NOT SET — Hub push disabled'}") print() demo = build_ui() demo.launch( server_port=7860, inbrowser=False, share=False, show_api=False, ssr_mode=False, )