| | """ |
| | PlotWeaver Audiobook Generator |
| | English β Hausa Translation + TTS with Timestamps + Emotions |
| | |
| | Optimized for fast startup on HuggingFace Spaces. |
| | """ |
| |
|
| | import gradio as gr |
| | import torch |
| | import numpy as np |
| | import tempfile |
| | import re |
| | from pathlib import Path |
| | from datetime import timedelta |
| | from typing import List, Tuple, Dict |
| |
|
| | |
| | import fitz |
| | from docx import Document |
| |
|
| | import scipy.io.wavfile as wavfile |
| | from scipy import signal |
| |
|
| | |
| | |
| | |
# Model identifiers and language codes for the translation + TTS pipeline.
NLLB_MODEL = "facebook/nllb-200-distilled-600M"  # English->Hausa translator
TTS_MODEL = "facebook/mms-tts-hau"               # Hausa text-to-speech (VITS)
SRC_LANG = "eng_Latn"    # NLLB source language code
TGT_LANG = "hau_Latn"    # NLLB target language code
SAMPLE_RATE = 16000      # MMS-TTS output sample rate (Hz)
MAX_CHUNK_LENGTH = 200   # soft cap on characters per TTS chunk

# Per-emotion audio post-processing factors applied after TTS synthesis:
# "pitch" and "speed" are resampling factors, "energy" is an amplitude gain.
# NOTE(review): the emoji values appear mojibake-garbled in this file —
# confirm against the original encoding.
EMOTION_SETTINGS = {
    "joy": {"pitch": 1.15, "speed": 1.10, "energy": 1.2, "emoji": "π"},
    "sadness": {"pitch": 0.90, "speed": 0.85, "energy": 0.8, "emoji": "π’"},
    "anger": {"pitch": 1.10, "speed": 1.15, "energy": 1.4, "emoji": "π "},
    "fear": {"pitch": 1.20, "speed": 1.20, "energy": 1.1, "emoji": "π¨"},
    "surprise": {"pitch": 1.25, "speed": 1.05, "energy": 1.3, "emoji": "π²"},
    "neutral": {"pitch": 1.00, "speed": 1.00, "energy": 1.0, "emoji": "π"},
}

# Keyword lists driving naive emotion detection on the ENGLISH source text.
# Some entries are stems ("frustrat", "irritat") matched by substring.
EMOTION_KEYWORDS = {
    "joy": ["happy", "joy", "excited", "wonderful", "great", "love", "beautiful", "amazing", "fantastic", "delighted", "pleased", "glad", "cheerful", "celebrate", "laugh", "smile"],
    "sadness": ["sad", "sorry", "unfortunately", "loss", "grief", "tears", "cry", "mourn", "depressed", "heartbroken", "tragic", "miserable", "lonely", "pain", "suffer"],
    "anger": ["angry", "furious", "outraged", "hate", "frustrat", "annoyed", "mad", "rage", "hostile", "bitter", "resent", "irritat", "violent", "fight", "attack"],
    "fear": ["afraid", "fear", "scared", "terrified", "worried", "anxious", "panic", "horror", "dread", "nervous", "frighten", "danger", "threat", "alarm"],
    "surprise": ["surprised", "amazed", "astonished", "shocked", "unexpected", "wow", "incredible", "unbelievable", "sudden", "remarkable", "stunning"],
}

# Lazy-loaded model cache, populated on first use by get_translation_model()
# and get_tts_model() so that HF Spaces startup stays fast.
_models = {}
| |
|
| | |
| | |
| | |
def extract_text_from_pdf(file_path: str) -> str:
    """Extract all page text from a PDF.

    Args:
        file_path: Path to the PDF file.

    Returns:
        Concatenated page text, stripped of surrounding whitespace.
    """
    doc = fitz.open(file_path)
    try:
        # Single join instead of repeated += concatenation; the trailing
        # strip() makes the result identical to the per-page "\n" append.
        return "\n".join(page.get_text() for page in doc).strip()
    finally:
        # Always release the document, even if get_text() raises.
        doc.close()
| |
|
def extract_text_from_docx(file_path: str) -> str:
    """Extract text from DOCX with multiple fallback methods.

    Tries, in order: raw XML parsing of word/document.xml, python-docx,
    then PyMuPDF. Raises ValueError when every method fails.
    """
    import zipfile
    import xml.etree.ElementTree as ET

    # Method 1: read word/document.xml straight out of the DOCX zip and
    # collect every text-run element. Works even when python-docx chokes.
    try:
        with zipfile.ZipFile(file_path, 'r') as z:
            if 'word/document.xml' in z.namelist():
                xml_content = z.read('word/document.xml')
                tree = ET.fromstring(xml_content)

                texts = []
                for elem in tree.iter():
                    # Match namespaced '{...}t' tags as well as bare 't'.
                    if elem.tag.endswith('}t') or elem.tag == 't':
                        if elem.text:
                            texts.append(elem.text)

                # NOTE(review): runs are joined without separators, so
                # paragraph boundaries are lost in this path.
                text = ''.join(texts)
                if text.strip():
                    return text
    except Exception as e:
        print(f"XML extraction failed: {e}")

    # Method 2: python-docx paragraph extraction (keeps paragraph breaks).
    try:
        doc = Document(file_path)
        text = "\n".join([para.text for para in doc.paragraphs if para.text.strip()])
        if text.strip():
            return text
    except Exception as e:
        print(f"python-docx failed: {e}")

    # Method 3: PyMuPDF can sometimes open DOCX containers directly.
    try:
        doc = fitz.open(file_path)
        text = ""
        for page in doc:
            text += page.get_text() + "\n"
        doc.close()
        if text.strip():
            return text.strip()
    except Exception as e:
        print(f"PyMuPDF failed: {e}")

    raise ValueError("Could not extract text from this DOCX file. Please convert to PDF or TXT.")
| |
|
def extract_text_from_doc(file_path: str) -> str:
    """Extract text from old .doc format using PyMuPDF.

    Falls back to scraping printable-ASCII runs out of the OLE
    'WordDocument' stream via olefile; raises ValueError when both
    approaches fail.
    """
    # Attempt 1: PyMuPDF handles some legacy .doc files directly.
    try:
        doc = fitz.open(file_path)
        text = ""
        for page in doc:
            text += page.get_text() + "\n"
        doc.close()
        if text.strip():
            return text.strip()
    except Exception as e:
        print(f"PyMuPDF .doc failed: {e}")

    # Attempt 2: crude scrape of the OLE compound document.
    try:
        import olefile
        ole = olefile.OleFileIO(file_path)

        if ole.exists('WordDocument'):
            stream = ole.openstream('WordDocument')
            data = stream.read()

            # Collect runs of printable ASCII bytes (0x20-0x7E); any other
            # byte terminates the current run. Crude, but recovers most of
            # the body text from the binary stream.
            text_parts = []
            current_text = []
            for byte in data:
                if 32 <= byte < 127:
                    current_text.append(chr(byte))
                elif current_text:
                    text_parts.append(''.join(current_text))
                    current_text = []
            if current_text:
                text_parts.append(''.join(current_text))

            # Drop tiny fragments (3 chars or fewer) — mostly binary noise.
            text = ' '.join([t for t in text_parts if len(t) > 3])
            ole.close()

            if text.strip():
                return text.strip()
    except ImportError:
        print("olefile not installed")
    except Exception as e:
        print(f"olefile failed: {e}")

    raise ValueError("Cannot read this .doc file. Please convert to .docx, .pdf, or .txt format.\n\nTip: Open in Microsoft Word or LibreOffice and 'Save As' a different format.")
| |
|
def extract_text(file_path: str) -> str:
    """Extract text from uploaded file.

    Dispatches on file extension; raises ValueError for anything other
    than .pdf, .docx, .doc, or .txt.
    """
    ext = Path(file_path).suffix.lower()

    # Extension -> extractor dispatch table for the document formats.
    extractors = {
        ".pdf": extract_text_from_pdf,
        ".docx": extract_text_from_docx,
        ".doc": extract_text_from_doc,
    }
    extractor = extractors.get(ext)
    if extractor is not None:
        return extractor(file_path)

    if ext == ".txt":
        # Plain text is read directly, tolerating bad bytes.
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            return f.read()

    raise ValueError(f"Unsupported format: {ext}. Please use PDF, DOCX, DOC, or TXT.")
| |
|
| | |
| | |
| | |
def get_translation_model():
    """Lazily load and cache the NLLB-200 translation model + tokenizer.

    Returns:
        Tuple of (model, tokenizer); the model is moved to GPU when
        CUDA is available and put in eval mode.
    """
    if "nllb" not in _models:
        from transformers import AutoModelForSeq2SeqLM, AutoTokenizer

        # NOTE: original log strings were mojibake-garbled/line-split;
        # restored as plain ASCII.
        print("Loading NLLB-200...")
        tokenizer = AutoTokenizer.from_pretrained(NLLB_MODEL, src_lang=SRC_LANG)
        # fp16 halves GPU memory, but fp16 generate() on CPU is slow and
        # can be numerically unstable — keep fp32 when there is no CUDA.
        dtype = torch.float16 if torch.cuda.is_available() else torch.float32
        model = AutoModelForSeq2SeqLM.from_pretrained(NLLB_MODEL, torch_dtype=dtype)

        if torch.cuda.is_available():
            model = model.cuda()
        model.eval()

        _models["nllb"] = (model, tokenizer)
        print("NLLB-200 loaded")

    return _models["nllb"]
| |
|
def get_tts_model():
    """Lazily load and cache the MMS-TTS Hausa model + tokenizer.

    Returns:
        Tuple of (model, tokenizer); the model is moved to GPU when
        CUDA is available and put in eval mode.
    """
    if "tts" not in _models:
        from transformers import VitsModel, AutoTokenizer

        # NOTE: original log strings were mojibake-garbled/line-split;
        # restored as plain ASCII.
        print("Loading MMS-TTS Hausa...")
        model = VitsModel.from_pretrained(TTS_MODEL)
        tokenizer = AutoTokenizer.from_pretrained(TTS_MODEL)

        if torch.cuda.is_available():
            model = model.cuda()
        model.eval()

        _models["tts"] = (model, tokenizer)
        print("MMS-TTS loaded")

    return _models["tts"]
| |
|
| | |
| | |
| | |
def detect_emotion(text: str) -> str:
    """Detect emotion from English text using keyword matching.

    Counts keyword hits per emotion, adds punctuation/casing heuristics,
    and returns the top-scoring emotion, or "neutral" when nothing hits.
    """
    lowered = text.lower()

    # One point per keyword present (substring match supports stems).
    scores = {
        emotion: sum(1 for keyword in keywords if keyword in lowered)
        for emotion, keywords in EMOTION_KEYWORDS.items()
    }

    # Punctuation and casing heuristics.
    if text.count('!') >= 2:
        scores["joy"] += 1
        scores["surprise"] += 1
    if text.count('?') >= 2:
        scores["surprise"] += 1
    if len(text) > 10 and text.isupper():
        scores["anger"] += 1

    # Highest score wins; ties resolve in EMOTION_KEYWORDS insertion order.
    best = max(scores, key=scores.get)
    return best if scores[best] > 0 else "neutral"
| |
|
| | |
| | |
| | |
def apply_emotion_to_audio(audio: np.ndarray, emotion: str, sample_rate: int = SAMPLE_RATE) -> np.ndarray:
    """Apply emotion effects to audio (pitch, speed, energy).

    Unknown emotions fall back to neutral settings; neutral audio is
    returned untouched. (`sample_rate` is currently unused by the body.)
    """
    settings = EMOTION_SETTINGS.get(emotion, EMOTION_SETTINGS["neutral"])

    # Neutral passes straight through.
    if emotion == "neutral":
        return audio

    # NOTE(review): plain resampling shifts pitch and duration together;
    # a true pitch shift would resample back to the original length.
    # The pitch factor is applied first, then the speed factor, matching
    # the original processing order.
    for factor in (settings["pitch"], settings["speed"]):
        if factor != 1.0:
            audio = signal.resample(audio, int(len(audio) / factor))

    # Amplitude gain for the emotion's energy level.
    audio = audio * settings["energy"]

    # Soft limit to keep the signal out of clipping range.
    peak = np.max(np.abs(audio))
    if peak > 0.95:
        audio = audio * (0.95 / peak)

    return audio
| |
|
def add_pause(duration_ms: int = 300) -> np.ndarray:
    """Generate silence for pauses between sentences."""
    # Convert the millisecond duration to a sample count at SAMPLE_RATE.
    return np.zeros(int(SAMPLE_RATE * duration_ms / 1000))
| |
|
| | |
| | |
| | |
def translate_text(text: str) -> str:
    """Translate English to Hausa.

    Splits on sentence-ending punctuation, translates each sentence with
    NLLB-200 (beam search, 4 beams, 256-token cap), and rejoins with
    spaces. Note: process_document() inlines an equivalent loop so it can
    report per-sentence progress.
    """
    model, tokenizer = get_translation_model()
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # Sentence-level translation keeps each input within the token limit.
    sentences = re.split(r'(?<=[.!?])\s+', text)
    translated = []

    # Token id that forces NLLB to decode into Hausa.
    tgt_lang_id = tokenizer.convert_tokens_to_ids(TGT_LANG)

    with torch.no_grad():
        for sentence in sentences:
            if not sentence.strip():
                continue

            inputs = tokenizer(sentence, return_tensors="pt", truncation=True, max_length=256)
            if device == "cuda":
                inputs = {k: v.cuda() for k, v in inputs.items()}

            outputs = model.generate(
                **inputs,
                forced_bos_token_id=tgt_lang_id,
                max_length=256,
                num_beams=4,
            )

            translated.append(tokenizer.decode(outputs[0], skip_special_tokens=True))

    return " ".join(translated)
| |
|
| | |
| | |
| | |
def split_text(text: str, max_len: int = MAX_CHUNK_LENGTH) -> List[str]:
    """Split text into TTS-friendly chunks of at most ~max_len characters.

    Sentences are packed greedily into chunks. A single sentence longer
    than max_len (e.g. unpunctuated text) is hard-split on word
    boundaries, so no chunk greatly exceeds the limit — previously such a
    sentence became one oversized chunk.
    """
    sentences = re.split(r'(?<=[.!?])\s+', text)
    chunks, current = [], ""

    for s in sentences:
        # Pathological sentences get pre-split before greedy packing.
        pieces = [s] if len(s) <= max_len else _split_long_sentence(s, max_len)
        for piece in pieces:
            if len(current) + len(piece) <= max_len:
                current += piece + " "
            else:
                if current:
                    chunks.append(current.strip())
                current = piece + " "
    if current:
        chunks.append(current.strip())

    return chunks


def _split_long_sentence(sentence: str, max_len: int) -> List[str]:
    """Break an over-long sentence into <= max_len pieces on whitespace.

    A single word longer than max_len still becomes its own piece.
    """
    pieces, current = [], ""
    for word in sentence.split():
        if current and len(current) + 1 + len(word) > max_len:
            pieces.append(current)
            current = word
        else:
            current = word if not current else current + " " + word
    if current:
        pieces.append(current)
    # Whitespace-only input: fall back to the sentence itself.
    return pieces or [sentence]
| |
|
def generate_audio(text: str) -> Tuple[np.ndarray, List[dict]]:
    """Generate audio with timestamps.

    Returns:
        (waveform, timestamps): waveform is a float numpy array at
        SAMPLE_RATE (one second of silence when no chunk produced audio);
        timestamps is a list of {start, end, text} dicts.

    Note: the visible Gradio wiring calls process_document(), which runs
    its own TTS loop with emotion effects and pauses; this helper has
    neither.
    """
    model, tokenizer = get_tts_model()
    device = "cuda" if torch.cuda.is_available() else "cpu"

    chunks = split_text(text)
    audio_segments = []
    timestamps = []
    current_time = 0.0  # running playback offset in seconds

    with torch.no_grad():
        for chunk in chunks:
            if not chunk.strip():
                continue

            inputs = tokenizer(chunk, return_tensors="pt")
            if device == "cuda":
                inputs = {k: v.cuda() for k, v in inputs.items()}

            # VITS returns a (1, samples) waveform tensor.
            audio = model(**inputs).waveform.squeeze().cpu().numpy()
            audio_segments.append(audio)

            duration = len(audio) / SAMPLE_RATE
            timestamps.append({
                "start": format_time(current_time),
                "end": format_time(current_time + duration),
                "text": chunk
            })
            current_time += duration

    return np.concatenate(audio_segments) if audio_segments else np.zeros(SAMPLE_RATE), timestamps
| |
|
def format_time(seconds: float) -> str:
    """Format a non-negative duration in seconds as HH:MM:SS.mmm."""
    whole = int(seconds)
    millis = int((seconds % 1) * 1000)
    hours, remainder = divmod(whole, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}.{millis:03d}"
| |
|
| | |
| | |
| | |
MAX_CHARS = 10000  # demo cap on processed input size (characters)


def process_document(file, enable_emotions=True, progress=gr.Progress()):
    """Full pipeline: document -> emotion analysis -> Hausa translation ->
    expressive TTS -> audiobook WAV with timestamps.

    Args:
        file: Gradio file object; file.name is the uploaded temp path.
        enable_emotions: when False, every sentence is treated as neutral.
        progress: Gradio progress reporter (default instance is the
            standard Gradio injection idiom).

    Returns:
        Tuple (audio_path, transcript_markdown, timestamps_text, status);
        audio_path is None whenever processing fails.
    """

    if file is None:
        return None, "", "", "β οΈ Please upload a document"

    try:
        # 1) Extract raw text from the uploaded file.
        progress(0.05, desc="π Extracting text...")
        full_text = extract_text(file.name)

        if not full_text or not full_text.strip():
            return None, "", "", "β οΈ No text found in document"

        # 2) Cap input length so demo runs stay bounded.
        original_length = len(full_text)
        if original_length > MAX_CHARS:
            text = full_text[:MAX_CHARS]
            truncated_msg = f"\n\nβ οΈ Text truncated from {original_length:,} to {MAX_CHARS:,} characters for demo."
        else:
            text = full_text
            truncated_msg = ""

        # 3) Sentence segmentation on ., !, ? boundaries.
        sentences = re.split(r'(?<=[.!?])\s+', text)
        sentences = [s.strip() for s in sentences if s.strip()]
        total_sentences = len(sentences)

        # 4) Per-sentence emotion detection (on the ENGLISH text).
        progress(0.08, desc="π Analyzing emotions...")
        sentence_emotions = []
        for sentence in sentences:
            emotion = detect_emotion(sentence) if enable_emotions else "neutral"
            sentence_emotions.append(emotion)

        # Tally for the summary line shown to the user.
        emotion_counts = {}
        for e in sentence_emotions:
            emotion_counts[e] = emotion_counts.get(e, 0) + 1

        # 5) Translate sentence-by-sentence with NLLB (progress 10%-45%).
        progress(0.1, desc=f"π Translating {total_sentences} sentences...")
        translated_sentences = []

        model, tokenizer = get_translation_model()
        device = "cuda" if torch.cuda.is_available() else "cpu"
        tgt_lang_id = tokenizer.convert_tokens_to_ids(TGT_LANG)  # forces Hausa output

        with torch.no_grad():
            for i, sentence in enumerate(sentences):
                if not sentence.strip():
                    continue

                prog = 0.1 + (0.35 * (i / total_sentences))
                emotion_emoji = EMOTION_SETTINGS[sentence_emotions[i]]["emoji"]
                progress(prog, desc=f"π Translating {i+1}/{total_sentences} {emotion_emoji}")

                inputs = tokenizer(sentence, return_tensors="pt", truncation=True, max_length=256)
                if device == "cuda":
                    inputs = {k: v.cuda() for k, v in inputs.items()}

                outputs = model.generate(
                    **inputs,
                    forced_bos_token_id=tgt_lang_id,
                    max_length=256,
                    num_beams=4,
                )

                translated_sentences.append(tokenizer.decode(outputs[0], skip_special_tokens=True))

        translated = " ".join(translated_sentences)

        # 6) TTS over the Hausa chunks (progress 45%-90%).
        progress(0.45, desc="ποΈ Generating expressive audio...")

        tts_model, tts_tokenizer = get_tts_model()
        audio_segments = []
        timestamps = []
        current_time = 0.0  # running playback offset in seconds

        hausa_chunks = split_text(translated)
        total_chunks = len(hausa_chunks)

        # Map sentence emotions onto Hausa chunks by estimating how many
        # chunks each translated sentence occupies.
        # NOTE(review): heuristic alignment — chunking runs on the
        # re-joined translation, so chunk boundaries need not follow
        # sentence boundaries and emotions can land on neighbouring chunks.
        chunk_emotions = []
        chunk_idx = 0
        for i, emotion in enumerate(sentence_emotions):
            if i < len(sentences):
                sentence_len = len(translated_sentences[i]) if i < len(translated_sentences) else 100
                chunks_per_sentence = max(1, sentence_len // MAX_CHUNK_LENGTH + 1)
                for _ in range(chunks_per_sentence):
                    if chunk_idx < total_chunks:
                        chunk_emotions.append(emotion)
                        chunk_idx += 1

        # Pad with neutral if the estimate fell short of the chunk count.
        while len(chunk_emotions) < total_chunks:
            chunk_emotions.append("neutral")

        with torch.no_grad():
            for i, chunk in enumerate(hausa_chunks):
                if not chunk.strip():
                    continue

                emotion = chunk_emotions[i] if i < len(chunk_emotions) else "neutral"
                emotion_emoji = EMOTION_SETTINGS[emotion]["emoji"]

                prog = 0.45 + (0.45 * (i / total_chunks))
                progress(prog, desc=f"ποΈ Generating audio {i+1}/{total_chunks} {emotion_emoji}")

                inputs = tts_tokenizer(chunk, return_tensors="pt")
                if device == "cuda":
                    inputs = {k: v.cuda() for k, v in inputs.items()}

                audio = tts_model(**inputs).waveform.squeeze().cpu().numpy()

                # Pitch/speed/energy post-processing for non-neutral chunks.
                if enable_emotions and emotion != "neutral":
                    audio = apply_emotion_to_audio(audio, emotion)

                audio_segments.append(audio)

                # 200 ms silence between chunks (also after the last one).
                audio_segments.append(add_pause(200))

                duration = len(audio) / SAMPLE_RATE
                timestamps.append({
                    "start": format_time(current_time),
                    "end": format_time(current_time + duration),
                    "text": chunk,
                    "emotion": emotion,
                    "emoji": emotion_emoji
                })
                # +0.2 s keeps timestamps in sync with the inserted pause.
                current_time += duration + 0.2

        if not audio_segments:
            return None, "", "", "β No audio generated"

        full_audio = np.concatenate(audio_segments)

        # Peak-normalize to 0.9 for headroom before int16 conversion.
        max_val = np.max(np.abs(full_audio))
        if max_val > 0:
            full_audio = full_audio * (0.9 / max_val)

        # 7) Write 16-bit PCM WAV (delete=False so Gradio can serve it).
        progress(0.95, desc="πΎ Saving audiobook...")
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            wavfile.write(f.name, SAMPLE_RATE, (full_audio * 32767).astype(np.int16))
            audio_path = f.name

        # Human-readable per-chunk timestamp listing.
        timestamps_text = "\n".join([
            f"[{t['start']} β {t['end']}] {t['emoji']} [{t['emotion'].upper()}] {t['text']}"
            for t in timestamps
        ])

        # Duration as M:SS for the status/stats lines.
        audio_duration = len(full_audio) / SAMPLE_RATE
        duration_str = f"{int(audio_duration // 60)}:{int(audio_duration % 60):02d}"

        # Emotion tally sorted by descending frequency.
        emotion_summary = " | ".join([
            f"{EMOTION_SETTINGS[e]['emoji']} {e}: {c}"
            for e, c in sorted(emotion_counts.items(), key=lambda x: -x[1])
        ])

        transcript = f"""## Original (English)
{text[:1000]}{'...' if len(text) > 1000 else ''}{truncated_msg}

## Translation (Hausa)
{translated}

---
π **Stats**: {len(text):,} chars β {len(translated):,} chars | π΅ Duration: {duration_str}

π **Emotions detected**: {emotion_summary}
"""

        # NOTE(review): the status strings below were mojibake-garbled and
        # split across lines in this file; rejoined here with the garbled
        # character preserved — confirm the original emoji.
        progress(1.0, desc="β Done!")
        return audio_path, transcript, timestamps_text, f"β Audiobook generated! Duration: {duration_str} | π Emotions: {len([e for e in sentence_emotions if e != 'neutral'])} expressive segments"

    except Exception as e:
        # Top-level boundary: surface any failure as a status message.
        import traceback
        traceback.print_exc()
        return None, "", "", f"β Error: {str(e)}"
| |
|
| | |
| | |
| | |
# ---------------------------------------------------------------------------
# Gradio UI: upload + emotion toggle on the left, audio player with
# transcript/timestamp tabs on the right. Built at import time so HF Spaces
# can serve `demo` directly. (Label emoji appear mojibake-garbled in this
# file and are preserved as-is.)
# ---------------------------------------------------------------------------
with gr.Blocks(
    title="PlotWeaver Audiobook",
    theme=gr.themes.Soft(primary_hue="orange"),
) as demo:

    # Page header.
    gr.HTML("""
    <div style="text-align: center; margin-bottom: 1rem;">
        <h1>π§ PlotWeaver Audiobook Generator</h1>
        <p><strong>English β Hausa</strong> | Powered by NLLB-200 + MMS-TTS</p>
        <p style="color: #666;">β¨ Now with Emotional Expression!</p>
    </div>
    """)

    with gr.Row():
        # Left column: inputs, action button, status, and help text.
        with gr.Column(scale=1):
            file_input = gr.File(
                label="π Upload Document",
                file_types=[".pdf", ".docx", ".doc", ".txt"],
                type="filepath"
            )

            emotion_toggle = gr.Checkbox(
                label="π Enable Emotional Expression",
                value=True,
                info="Adds emotion to voice based on text sentiment"
            )

            btn = gr.Button("π Generate Audiobook", variant="primary", size="lg")
            status = gr.Textbox(label="Status", interactive=False)

            gr.Markdown("""
            ### How it works
            1. Upload English document (PDF, DOCX, DOC, TXT)
            2. AI **detects emotions** in text
            3. Translates to Hausa with NLLB-200
            4. TTS generates **expressive audio**
            5. Download audiobook with timestamps

            ---
            ### π Emotions Detected
            - π **Joy** - Higher pitch, faster pace
            - π’ **Sadness** - Lower pitch, slower pace
            - π **Anger** - Intense, louder
            - π¨ **Fear** - Faster, higher pitch
            - π² **Surprise** - Excited tone
            - π **Neutral** - Normal speech

            ---
            β±οΈ **Processing**: ~1-2 min per page
            """)

        # Right column: generated audio plus transcript/timestamps tabs.
        with gr.Column(scale=2):
            audio_out = gr.Audio(label="π§ Hausa Audiobook (with Emotions)")
            with gr.Tabs():
                with gr.Tab("π Transcript"):
                    transcript = gr.Markdown()
                with gr.Tab("β±οΈ Timestamps + Emotions"):
                    timestamps = gr.Textbox(lines=12, interactive=False)

    # Footer.
    gr.HTML("""<div style="text-align: center; padding: 1rem; background: #f8f9fa; border-radius: 8px; margin-top: 1rem;">
        <strong>PlotWeaver</strong> - AI for African Languages | π Expressive Audiobooks
    </div>""")

    # Wire the button to the full pipeline.
    btn.click(
        process_document,
        [file_input, emotion_toggle],
        [audio_out, transcript, timestamps, status]
    )
| |
|
| | |
| | |
| | |
| | if __name__ == "__main__": |
| | demo.launch() |