""" PlotWeaver Audiobook Generator English ā Hausa Translation + TTS with Timestamps + Emotions Optimized for fast startup on HuggingFace Spaces. """ import gradio as gr import torch import numpy as np import tempfile import re from pathlib import Path from datetime import timedelta from typing import List, Tuple, Dict # Document processing import fitz # PyMuPDF from docx import Document import scipy.io.wavfile as wavfile from scipy import signal # ============================================ # CONFIGURATION # ============================================ NLLB_MODEL = "facebook/nllb-200-distilled-600M" TTS_MODEL = "facebook/mms-tts-hau" SRC_LANG = "eng_Latn" TGT_LANG = "hau_Latn" SAMPLE_RATE = 16000 MAX_CHUNK_LENGTH = 200 # Emotion settings (pitch_shift, speed_factor, energy_boost) EMOTION_SETTINGS = { "joy": {"pitch": 1.15, "speed": 1.10, "energy": 1.2, "emoji": "š"}, "sadness": {"pitch": 0.90, "speed": 0.85, "energy": 0.8, "emoji": "š¢"}, "anger": {"pitch": 1.10, "speed": 1.15, "energy": 1.4, "emoji": "š "}, "fear": {"pitch": 1.20, "speed": 1.20, "energy": 1.1, "emoji": "šØ"}, "surprise": {"pitch": 1.25, "speed": 1.05, "energy": 1.3, "emoji": "š²"}, "neutral": {"pitch": 1.00, "speed": 1.00, "energy": 1.0, "emoji": "š"}, } # Emotion keywords for detection EMOTION_KEYWORDS = { "joy": ["happy", "joy", "excited", "wonderful", "great", "love", "beautiful", "amazing", "fantastic", "delighted", "pleased", "glad", "cheerful", "celebrate", "laugh", "smile"], "sadness": ["sad", "sorry", "unfortunately", "loss", "grief", "tears", "cry", "mourn", "depressed", "heartbroken", "tragic", "miserable", "lonely", "pain", "suffer"], "anger": ["angry", "furious", "outraged", "hate", "frustrat", "annoyed", "mad", "rage", "hostile", "bitter", "resent", "irritat", "violent", "fight", "attack"], "fear": ["afraid", "fear", "scared", "terrified", "worried", "anxious", "panic", "horror", "dread", "nervous", "frighten", "danger", "threat", "alarm"], "surprise": ["surprised", "amazed", "astonished", "shocked", "unexpected", "wow", "incredible", "unbelievable", "sudden", "remarkable", "stunning"], } # Global model cache (lazy loaded) _models = {} # ============================================ # DOCUMENT EXTRACTION # ============================================ def extract_text_from_pdf(file_path: str) -> str: """Extract text from PDF.""" doc = fitz.open(file_path) text = "" for page in doc: text += page.get_text() + "\n" doc.close() return text.strip() def extract_text_from_docx(file_path: str) -> str: """Extract text from DOCX with multiple fallback methods.""" import zipfile import xml.etree.ElementTree as ET # Method 1: Direct XML extraction (most reliable) try: with zipfile.ZipFile(file_path, 'r') as z: if 'word/document.xml' in z.namelist(): xml_content = z.read('word/document.xml') tree = ET.fromstring(xml_content) # Extract all text nodes texts = [] for elem in tree.iter(): if elem.tag.endswith('}t') or elem.tag == 't': if elem.text: texts.append(elem.text) text = ''.join(texts) if text.strip(): return text except Exception as e: print(f"XML extraction failed: {e}") # Method 2: Try python-docx try: doc = Document(file_path) text = "\n".join([para.text for para in doc.paragraphs if para.text.strip()]) if text.strip(): return text except Exception as e: print(f"python-docx failed: {e}") # Method 3: Use PyMuPDF (can handle some docx too) try: doc = fitz.open(file_path) text = "" for page in doc: text += page.get_text() + "\n" doc.close() if text.strip(): return text.strip() except Exception as e: print(f"PyMuPDF failed: {e}") raise ValueError("Could not extract text from this DOCX file. Please convert to PDF or TXT.") def extract_text_from_doc(file_path: str) -> str: """Extract text from old .doc format using PyMuPDF.""" # PyMuPDF can open .doc files try: doc = fitz.open(file_path) text = "" for page in doc: text += page.get_text() + "\n" doc.close() if text.strip(): return text.strip() except Exception as e: print(f"PyMuPDF .doc failed: {e}") # Fallback: Try reading with olefile for OLE-based .doc try: import olefile ole = olefile.OleFileIO(file_path) # Try to find the WordDocument stream if ole.exists('WordDocument'): # Extract text from the document stream = ole.openstream('WordDocument') data = stream.read() # Simple text extraction (decode readable ASCII/UTF-8) text_parts = [] current_text = [] for byte in data: if 32 <= byte < 127: # Printable ASCII current_text.append(chr(byte)) elif current_text: text_parts.append(''.join(current_text)) current_text = [] if current_text: text_parts.append(''.join(current_text)) text = ' '.join([t for t in text_parts if len(t) > 3]) ole.close() if text.strip(): return text.strip() except ImportError: print("olefile not installed") except Exception as e: print(f"olefile failed: {e}") raise ValueError("Cannot read this .doc file. Please convert to .docx, .pdf, or .txt format.\n\nTip: Open in Microsoft Word or LibreOffice and 'Save As' a different format.") def extract_text(file_path: str) -> str: """Extract text from uploaded file.""" ext = Path(file_path).suffix.lower() if ext == ".pdf": return extract_text_from_pdf(file_path) elif ext == ".docx": return extract_text_from_docx(file_path) elif ext == ".doc": return extract_text_from_doc(file_path) elif ext == ".txt": with open(file_path, "r", encoding="utf-8", errors="ignore") as f: return f.read() else: raise ValueError(f"Unsupported format: {ext}. Please use PDF, DOCX, DOC, or TXT.") # ============================================ # LAZY MODEL LOADING # ============================================ def get_translation_model(): """Load translation model only when needed.""" if "nllb" not in _models: from transformers import AutoModelForSeq2SeqLM, AutoTokenizer print("š„ Loading NLLB-200...") tokenizer = AutoTokenizer.from_pretrained(NLLB_MODEL, src_lang=SRC_LANG) model = AutoModelForSeq2SeqLM.from_pretrained(NLLB_MODEL, torch_dtype=torch.float16) if torch.cuda.is_available(): model = model.cuda() model.eval() _models["nllb"] = (model, tokenizer) print("ā NLLB-200 loaded") return _models["nllb"] def get_tts_model(): """Load TTS model only when needed.""" if "tts" not in _models: from transformers import VitsModel, AutoTokenizer print("š„ Loading MMS-TTS Hausa...") model = VitsModel.from_pretrained(TTS_MODEL) tokenizer = AutoTokenizer.from_pretrained(TTS_MODEL) if torch.cuda.is_available(): model = model.cuda() model.eval() _models["tts"] = (model, tokenizer) print("ā MMS-TTS loaded") return _models["tts"] # ============================================ # EMOTION DETECTION # ============================================ def detect_emotion(text: str) -> str: """Detect emotion from English text using keyword matching.""" text_lower = text.lower() emotion_scores = {emotion: 0 for emotion in EMOTION_KEYWORDS} for emotion, keywords in EMOTION_KEYWORDS.items(): for keyword in keywords: if keyword in text_lower: emotion_scores[emotion] += 1 # Check for punctuation-based cues if text.count('!') >= 2: emotion_scores["joy"] += 1 emotion_scores["surprise"] += 1 if text.count('?') >= 2: emotion_scores["surprise"] += 1 if text.isupper() and len(text) > 10: emotion_scores["anger"] += 1 # Get highest scoring emotion max_emotion = max(emotion_scores, key=emotion_scores.get) if emotion_scores[max_emotion] > 0: return max_emotion return "neutral" # ============================================ # AUDIO EMOTION PROCESSING # ============================================ def apply_emotion_to_audio(audio: np.ndarray, emotion: str, sample_rate: int = SAMPLE_RATE) -> np.ndarray: """Apply emotion effects to audio (pitch, speed, energy).""" settings = EMOTION_SETTINGS.get(emotion, EMOTION_SETTINGS["neutral"]) # Skip processing for neutral if emotion == "neutral": return audio # 1. Pitch shift using resampling pitch_factor = settings["pitch"] if pitch_factor != 1.0: # Resample to change pitch new_length = int(len(audio) / pitch_factor) audio = signal.resample(audio, new_length) # 2. Speed adjustment (time stretch using resampling) speed_factor = settings["speed"] if speed_factor != 1.0: new_length = int(len(audio) / speed_factor) audio = signal.resample(audio, new_length) # 3. Energy/volume adjustment energy_factor = settings["energy"] audio = audio * energy_factor # Normalize to prevent clipping max_val = np.max(np.abs(audio)) if max_val > 0.95: audio = audio * (0.95 / max_val) return audio def add_pause(duration_ms: int = 300) -> np.ndarray: """Generate silence for pauses between sentences.""" num_samples = int(SAMPLE_RATE * duration_ms / 1000) return np.zeros(num_samples) # ============================================ # TRANSLATION # ============================================ def translate_text(text: str) -> str: """Translate English to Hausa.""" model, tokenizer = get_translation_model() device = "cuda" if torch.cuda.is_available() else "cpu" # Split into sentences sentences = re.split(r'(?<=[.!?])\s+', text) translated = [] tgt_lang_id = tokenizer.convert_tokens_to_ids(TGT_LANG) with torch.no_grad(): for sentence in sentences: if not sentence.strip(): continue inputs = tokenizer(sentence, return_tensors="pt", truncation=True, max_length=256) if device == "cuda": inputs = {k: v.cuda() for k, v in inputs.items()} outputs = model.generate( **inputs, forced_bos_token_id=tgt_lang_id, max_length=256, num_beams=4, ) translated.append(tokenizer.decode(outputs[0], skip_special_tokens=True)) return " ".join(translated) # ============================================ # TEXT-TO-SPEECH # ============================================ def split_text(text: str, max_len: int = MAX_CHUNK_LENGTH) -> List[str]: """Split text into TTS-friendly chunks.""" sentences = re.split(r'(?<=[.!?])\s+', text) chunks, current = [], "" for s in sentences: if len(current) + len(s) <= max_len: current += s + " " else: if current: chunks.append(current.strip()) current = s + " " if current: chunks.append(current.strip()) return chunks def generate_audio(text: str) -> Tuple[np.ndarray, List[dict]]: """Generate audio with timestamps.""" model, tokenizer = get_tts_model() device = "cuda" if torch.cuda.is_available() else "cpu" chunks = split_text(text) audio_segments = [] timestamps = [] current_time = 0.0 with torch.no_grad(): for chunk in chunks: if not chunk.strip(): continue inputs = tokenizer(chunk, return_tensors="pt") if device == "cuda": inputs = {k: v.cuda() for k, v in inputs.items()} audio = model(**inputs).waveform.squeeze().cpu().numpy() audio_segments.append(audio) duration = len(audio) / SAMPLE_RATE timestamps.append({ "start": format_time(current_time), "end": format_time(current_time + duration), "text": chunk }) current_time += duration return np.concatenate(audio_segments) if audio_segments else np.zeros(SAMPLE_RATE), timestamps def format_time(seconds: float) -> str: """Format as HH:MM:SS.mmm""" h, r = divmod(int(seconds), 3600) m, s = divmod(r, 60) ms = int((seconds % 1) * 1000) return f"{h:02d}:{m:02d}:{s:02d}.{ms:03d}" # ============================================ # MAIN PIPELINE # ============================================ MAX_CHARS = 10000 # Max characters to process (increase for longer files) def process_document(file, enable_emotions=True, progress=gr.Progress()): """Main pipeline: Document ā Translation ā TTS with Emotions ā Audiobook""" if file is None: return None, "", "", "ā ļø Please upload a document" try: # Extract text progress(0.05, desc="š Extracting text...") full_text = extract_text(file.name) if not full_text or not full_text.strip(): return None, "", "", "ā ļø No text found in document" # Limit text length with warning original_length = len(full_text) if original_length > MAX_CHARS: text = full_text[:MAX_CHARS] truncated_msg = f"\n\nā ļø Text truncated from {original_length:,} to {MAX_CHARS:,} characters for demo." else: text = full_text truncated_msg = "" # Split into sentences for batch processing sentences = re.split(r'(?<=[.!?])\s+', text) sentences = [s.strip() for s in sentences if s.strip()] total_sentences = len(sentences) # Detect emotions for each sentence progress(0.08, desc="š Analyzing emotions...") sentence_emotions = [] for sentence in sentences: emotion = detect_emotion(sentence) if enable_emotions else "neutral" sentence_emotions.append(emotion) # Count emotions emotion_counts = {} for e in sentence_emotions: emotion_counts[e] = emotion_counts.get(e, 0) + 1 # Translate in batches progress(0.1, desc=f"š Translating {total_sentences} sentences...") translated_sentences = [] model, tokenizer = get_translation_model() device = "cuda" if torch.cuda.is_available() else "cpu" tgt_lang_id = tokenizer.convert_tokens_to_ids(TGT_LANG) with torch.no_grad(): for i, sentence in enumerate(sentences): if not sentence.strip(): continue # Update progress prog = 0.1 + (0.35 * (i / total_sentences)) emotion_emoji = EMOTION_SETTINGS[sentence_emotions[i]]["emoji"] progress(prog, desc=f"š Translating {i+1}/{total_sentences} {emotion_emoji}") inputs = tokenizer(sentence, return_tensors="pt", truncation=True, max_length=256) if device == "cuda": inputs = {k: v.cuda() for k, v in inputs.items()} outputs = model.generate( **inputs, forced_bos_token_id=tgt_lang_id, max_length=256, num_beams=4, ) translated_sentences.append(tokenizer.decode(outputs[0], skip_special_tokens=True)) translated = " ".join(translated_sentences) # Generate audio with emotions progress(0.45, desc="šļø Generating expressive audio...") tts_model, tts_tokenizer = get_tts_model() audio_segments = [] timestamps = [] current_time = 0.0 # Split translated text for TTS hausa_chunks = split_text(translated) total_chunks = len(hausa_chunks) # Map chunks to emotions (approximate) chunk_emotions = [] chunk_idx = 0 for i, emotion in enumerate(sentence_emotions): # Estimate how many chunks per sentence if i < len(sentences): sentence_len = len(translated_sentences[i]) if i < len(translated_sentences) else 100 chunks_per_sentence = max(1, sentence_len // MAX_CHUNK_LENGTH + 1) for _ in range(chunks_per_sentence): if chunk_idx < total_chunks: chunk_emotions.append(emotion) chunk_idx += 1 # Fill remaining with neutral while len(chunk_emotions) < total_chunks: chunk_emotions.append("neutral") with torch.no_grad(): for i, chunk in enumerate(hausa_chunks): if not chunk.strip(): continue # Get emotion for this chunk emotion = chunk_emotions[i] if i < len(chunk_emotions) else "neutral" emotion_emoji = EMOTION_SETTINGS[emotion]["emoji"] # Update progress prog = 0.45 + (0.45 * (i / total_chunks)) progress(prog, desc=f"šļø Generating audio {i+1}/{total_chunks} {emotion_emoji}") inputs = tts_tokenizer(chunk, return_tensors="pt") if device == "cuda": inputs = {k: v.cuda() for k, v in inputs.items()} audio = tts_model(**inputs).waveform.squeeze().cpu().numpy() # Apply emotion effects if enable_emotions and emotion != "neutral": audio = apply_emotion_to_audio(audio, emotion) audio_segments.append(audio) # Add small pause between chunks audio_segments.append(add_pause(200)) duration = len(audio) / SAMPLE_RATE timestamps.append({ "start": format_time(current_time), "end": format_time(current_time + duration), "text": chunk, "emotion": emotion, "emoji": emotion_emoji }) current_time += duration + 0.2 # Include pause # Concatenate audio if not audio_segments: return None, "", "", "ā No audio generated" full_audio = np.concatenate(audio_segments) # Normalize final audio max_val = np.max(np.abs(full_audio)) if max_val > 0: full_audio = full_audio * (0.9 / max_val) # Save audio progress(0.95, desc="š¾ Saving audiobook...") with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f: wavfile.write(f.name, SAMPLE_RATE, (full_audio * 32767).astype(np.int16)) audio_path = f.name # Format timestamps with emotions timestamps_text = "\n".join([ f"[{t['start']} ā {t['end']}] {t['emoji']} [{t['emotion'].upper()}] {t['text']}" for t in timestamps ]) # Calculate audio duration audio_duration = len(full_audio) / SAMPLE_RATE duration_str = f"{int(audio_duration // 60)}:{int(audio_duration % 60):02d}" # Emotion summary emotion_summary = " | ".join([ f"{EMOTION_SETTINGS[e]['emoji']} {e}: {c}" for e, c in sorted(emotion_counts.items(), key=lambda x: -x[1]) ]) transcript = f"""## Original (English) {text[:1000]}{'...' if len(text) > 1000 else ''}{truncated_msg} ## Translation (Hausa) {translated} --- š **Stats**: {len(text):,} chars ā {len(translated):,} chars | šµ Duration: {duration_str} š **Emotions detected**: {emotion_summary} """ progress(1.0, desc="ā Done!") return audio_path, transcript, timestamps_text, f"ā Audiobook generated! Duration: {duration_str} | š Emotions: {len([e for e in sentence_emotions if e != 'neutral'])} expressive segments" except Exception as e: import traceback traceback.print_exc() return None, "", "", f"ā Error: {str(e)}" # ============================================ # GRADIO INTERFACE # ============================================ with gr.Blocks( title="PlotWeaver Audiobook", theme=gr.themes.Soft(primary_hue="orange"), ) as demo: gr.HTML("""
English ā Hausa | Powered by NLLB-200 + MMS-TTS
⨠Now with Emotional Expression!