| """ | |
| Audiobook Generator - English Source to Multi-Language Audio | |
| Powered by: | |
| - Qwen3.5-Omni-Plus (preset voices, 36 languages) | |
| - Qwen3-TTS-VC (voice cloning, 10 languages) | |
| - YourVoic API (1000+ emotional voices, 93+ languages including Arabic, Swahili, Indian languages) | |
| Deploy as a Hugging Face Space: | |
| 1. Create a new Space (SDK: Gradio) | |
| 2. Upload app.py and requirements.txt | |
| 3. Add secrets: DASHSCOPE_API_KEY (required), YOURVOIC_API_KEY (optional) | |
| """ | |
import os
import base64
import json
import pathlib
import shutil
import struct
import subprocess
import tempfile
import re

import gradio as gr
import requests as http_requests
from openai import OpenAI

try:
    import pypdf
    HAS_PYPDF = True
except ImportError:
    HAS_PYPDF = False

try:
    import docx
    HAS_DOCX = True
except ImportError:
    HAS_DOCX = False

# ==========================================
# CONFIGURATION
# ==========================================
OMNI_MODEL = "qwen3.5-omni-plus"
TTS_VC_MODEL = "qwen3-tts-vc-2026-01-22"
VOICE_CLONE_MODEL = "qwen-voice-enrollment"

DASHSCOPE_BASE_URL = "https://dashscope-intl.aliyuncs.com/compatible-mode/v1"
DASHSCOPE_API_URL = "https://dashscope-intl.aliyuncs.com/api/v1"
VOICE_CLONE_URL = f"{DASHSCOPE_API_URL}/services/audio/tts/customization"
TTS_SYNTHESIS_URL = f"{DASHSCOPE_API_URL}/services/aigc/multimodal-generation/generation"

# YourVoic API
YOURVOIC_TTS_URL = "https://yourvoic.com/api/v1/tts/generate"
YOURVOIC_VOICES_URL = "https://yourvoic.com/api/v1/voices"

MAX_CHARS_PER_CHUNK = 1500

# ==========================================
# LANGUAGES - split by engine
# ==========================================
# "engine": "qwen" = Qwen Preset + Clone, "yourvoic" = YourVoic only
LANGUAGES = {
    # -- Qwen core (11 languages: the 10 voice-clone languages + Arabic) --
    "English": {"code": "en", "engine": "qwen", "yourvoic": "en-US"},
    "Chinese (Mandarin)": {"code": "zh", "engine": "qwen", "yourvoic": "zh-CN"},
    "Japanese": {"code": "ja", "engine": "qwen", "yourvoic": "ja-JP"},
    "Korean": {"code": "ko", "engine": "qwen", "yourvoic": "ko-KR"},
    "German": {"code": "de", "engine": "qwen", "yourvoic": "de-DE"},
    "French": {"code": "fr", "engine": "qwen", "yourvoic": "fr-FR"},
    "Russian": {"code": "ru", "engine": "qwen", "yourvoic": "ru-RU"},
    "Portuguese": {"code": "pt", "engine": "qwen", "yourvoic": "pt-BR"},
    "Spanish": {"code": "es", "engine": "qwen", "yourvoic": "es-ES"},
    "Italian": {"code": "it", "engine": "qwen", "yourvoic": "it-IT"},
    "Arabic": {"code": "ar", "engine": "qwen", "yourvoic": "ar-SA"},
    # -- YourVoic: African languages --
    "Afrikaans": {"code": "af", "engine": "yourvoic", "yourvoic": "af-ZA"},
    "Amharic": {"code": "am", "engine": "yourvoic", "yourvoic": "am-ET"},
    "Swahili": {"code": "sw", "engine": "yourvoic", "yourvoic": "sw-KE"},
    # -- YourVoic: Indian languages --
    "Hindi": {"code": "hi", "engine": "yourvoic", "yourvoic": "hi-IN"},
    "Bengali": {"code": "bn", "engine": "yourvoic", "yourvoic": "bn-IN"},
    "Marathi": {"code": "mr", "engine": "yourvoic", "yourvoic": "mr-IN"},
    "Telugu": {"code": "te", "engine": "yourvoic", "yourvoic": "te-IN"},
    "Tamil": {"code": "ta", "engine": "yourvoic", "yourvoic": "ta-IN"},
    "Gujarati": {"code": "gu", "engine": "yourvoic", "yourvoic": "gu-IN"},
    "Kannada": {"code": "kn", "engine": "yourvoic", "yourvoic": "kn-IN"},
    "Malayalam": {"code": "ml", "engine": "yourvoic", "yourvoic": "ml-IN"},
    "Punjabi": {"code": "pa", "engine": "yourvoic", "yourvoic": "pa-IN"},
}

# Qwen languages (for preset + clone)
QWEN_LANGUAGES = {k for k, v in LANGUAGES.items() if v["engine"] == "qwen"}

# Voice cloning only supports the original 10 languages (not Arabic)
VOICE_CLONE_LANGUAGES = {
    "English", "Chinese (Mandarin)", "Japanese", "Korean", "German",
    "French", "Russian", "Portuguese", "Spanish", "Italian",
}

# YourVoic languages
YOURVOIC_LANGUAGES = {k for k, v in LANGUAGES.items() if v["engine"] == "yourvoic"}

PRESET_VOICES = [
    "Cherry -- Sunny, friendly", "Serena -- Gentle, soft",
    "Jennifer -- Cinematic narrator", "Katerina -- Mature, rich rhythm",
    "Ethan -- Warm, energetic", "Ryan -- Dramatic, rhythmic",
    "Kai -- Soothing, calm", "Neil -- Precise, clear",
    "Lenn -- Rational, steady", "Eldric Sage -- Authoritative narrator",
    "Arthur -- Classic, mature", "Bella -- Elegant, warm",
    "Vivian -- Professional, clear", "Seren -- Calm, measured",
    "Dolce -- Sweet, melodic", "Bellona -- Strong, commanding",
    "Vincent -- Rich, theatrical", "Andre -- Deep, resonant",
    "Mia -- Young, versatile", "Aiden -- Young, lively",
]

# YourVoic voices mapped by language.
# Only include CONFIRMED working voice names.
YOURVOIC_VOICE_MAP = {
    # African - Peter works as a universal voice
    "Afrikaans": ["Peter", "Sarah"],
    "Amharic": ["Peter", "Sarah"],
    "Swahili": ["Peter", "Sarah"],
    # Indian - confirmed working
    "Hindi": ["Rahul", "Deepika", "Aditya"],
    "Bengali": ["Sneha", "Aryan"],
    "Marathi": ["Anjali", "Rohan"],
    "Telugu": ["Arjun", "Lakshmi"],
    "Tamil": ["Priya", "Kumar"],
    "Gujarati": ["Rahul", "Meera"],
    "Kannada": ["Divya", "Karthik"],
    "Malayalam": ["Nikhil", "Ammu"],
    "Punjabi": ["Vikram", "Simran"],
    # English fallback
    "English": ["Peter", "Sarah", "Caleb"],
}

# Build the voice dropdown choices from the map
YOURVOIC_VOICES = []
for lang, voices in YOURVOIC_VOICE_MAP.items():
    for v in voices:
        entry = f"{v} -- {lang}"
        if entry not in YOURVOIC_VOICES:
            YOURVOIC_VOICES.append(entry)

def get_yourvoic_voice_for_language(language, selected_voice):
    """Return a valid voice name for the given language.

    Falls back to an API lookup for languages without confirmed voices."""
    voice_name = get_voice_name(selected_voice)
    valid_voices = YOURVOIC_VOICE_MAP.get(language, [])
    # If the selected voice is confirmed valid for this language, use it
    if voice_name in valid_voices:
        return voice_name
    # If we have confirmed voices for this language, use the first one
    if valid_voices:
        return valid_voices[0]
    # No confirmed voices - query the API (returns a list of names, or None)
    yourvoic_lang = LANGUAGES.get(language, {}).get("yourvoic", "en-US")
    api_voices = _fetch_yourvoic_voice(yourvoic_lang)
    if api_voices:
        return api_voices[0]
    return "Peter"  # ultimate fallback

# Cache for API-fetched voices
_yourvoic_voice_cache = {}

def _fetch_yourvoic_voice(yourvoic_lang, model="aura-prime"):
    """Query the YourVoic /v1/voices endpoint for valid voices for a language + model.

    Returns a deduplicated list of voice names, or None if the lookup fails."""
    cache_key = f"{yourvoic_lang}:{model}"
    if cache_key in _yourvoic_voice_cache:
        return _yourvoic_voice_cache[cache_key]
    yv_key = os.environ.get("YOURVOIC_API_KEY", "")
    if not yv_key:
        return None
    # Try with the model parameter first, then without
    for url_params in [
        f"?language={yourvoic_lang}&model={model}",
        f"?language={yourvoic_lang}",
    ]:
        try:
            resp = http_requests.get(
                f"{YOURVOIC_VOICES_URL}{url_params}",
                headers={"X-API-Key": yv_key},
                timeout=15,
            )
            print(f"[YourVoic] Voices API {url_params}: status={resp.status_code}")
            if resp.status_code == 200:
                data = resp.json()
                voices = data if isinstance(data, list) else data.get("voices", data.get("data", []))
                if voices and isinstance(voices[0], dict):
                    # Collect candidate voice names to try
                    all_names = []
                    for v in voices[:10]:  # first 10
                        for field in ["id", "name", "voice_id", "voice"]:
                            if field in v and v[field]:
                                all_names.append(str(v[field]))
                                break
                    if all_names:
                        # Deduplicate preserving order
                        seen = set()
                        unique = [x for x in all_names if not (x in seen or seen.add(x))]
                        print(f"[YourVoic] Available voices for {yourvoic_lang}: {unique[:5]}")
                        _yourvoic_voice_cache[cache_key] = unique
                        return unique
        except Exception as e:
            print(f"[YourVoic] Voice lookup failed for {yourvoic_lang}: {e}")
    return None

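# Assumed /v1/voices response shapes, inferred from the parsing above (the
# real YourVoic schema may differ; this is not taken from their docs):
#   [{"name": "Peter", ...}, ...]             # bare list of voice objects
#   {"voices": [{"id": "Peter", ...}, ...]}   # wrapped under "voices"/"data"
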
def generate_speech_yourvoic_with_retry(client, text, voice, yv_model, emotion, language, lang_config,
                                        translate, api_key, chunk_index, output_dir):
    """Wrapper that tries multiple voice names if the first one fails."""
    yourvoic_lang = lang_config.get("yourvoic", "en-US")
    # Build the list of candidate voices:
    candidates = []
    # 1. Hardcoded voices for this language
    hardcoded = YOURVOIC_VOICE_MAP.get(language, [])
    candidates.extend(hardcoded)
    # 2. The user-selected voice (inserted at the front, so it is tried first)
    user_voice = get_voice_name(voice)
    if user_voice not in candidates:
        candidates.insert(0, user_voice)
    # 3. Universal English voices (work for many languages, e.g. Swahili)
    for universal in ["Peter", "Sarah", "Caleb"]:
        if universal not in candidates:
            candidates.append(universal)
    # 4. API-fetched voices last
    api_voices = _fetch_yourvoic_voice(yourvoic_lang, yv_model)
    if api_voices:
        for av in api_voices:
            if av not in candidates:
                candidates.append(av)
    # Deduplicate preserving order
    seen = set()
    candidates = [x for x in candidates if not (x in seen or seen.add(x))]
    # Try each candidate until one works
    for i, candidate_voice in enumerate(candidates[:8]):  # try up to 8
        print(f"[YourVoic] Trying voice '{candidate_voice}' for {language} (attempt {i+1})")
        wav_path, transcript, error = generate_speech_yourvoic(
            client, text, candidate_voice, yv_model, emotion, language, lang_config,
            translate, api_key, chunk_index, output_dir,
        )
        if wav_path:
            # Cache this working voice for future chunks
            if not YOURVOIC_VOICE_MAP.get(language):
                YOURVOIC_VOICE_MAP[language] = [candidate_voice]
            elif candidate_voice not in YOURVOIC_VOICE_MAP[language]:
                YOURVOIC_VOICE_MAP[language].insert(0, candidate_voice)
            return wav_path, transcript, None
        if error and "Invalid voice name" not in str(error):
            # Non-voice error (credits, etc.) - don't try more voices
            return None, transcript, error
    return None, text, f"No valid voice found for {language}. This language may not be supported on your plan. Tried: {candidates[:8]}"

YOURVOIC_MODELS = [
    "aura-prime -- Balanced quality and speed (recommended)",
    "aura-lite -- Fast, good for previews",
    "aura-max -- Premium quality (paid plans only)",
    "rapid-max -- Fast with good quality",
    "rapid-flash -- Fastest, real-time apps",
]

YOURVOIC_EMOTIONS = [
    "neutral", "friendly", "hopeful", "cheerful", "sad",
    "excited", "angry", "terrified", "shouting", "whispering",
]

def get_voice_name(label):
    """Strip the ' -- description' suffix from a dropdown label."""
    return label.split("--")[0].strip()

def get_yourvoic_model(label):
    return label.split("--")[0].strip()

# ==========================================
# AUDIO HELPERS
# ==========================================
def base64_to_wav(b64_data, output_path):
    """Wrap base64-encoded raw 24 kHz mono 16-bit PCM in a minimal WAV header."""
    audio_bytes = base64.b64decode(b64_data)
    sr, nc, bps = 24000, 1, 16   # sample rate, channel count, bits per sample
    br = sr * nc * bps // 8      # byte rate
    ba = nc * bps // 8           # block align
    ds = len(audio_bytes)        # data chunk size
    with open(output_path, "wb") as f:
        f.write(b"RIFF")
        f.write(struct.pack("<I", 36 + ds))
        f.write(b"WAVE")
        f.write(b"fmt ")
        f.write(struct.pack("<I", 16))  # fmt chunk size
        f.write(struct.pack("<H", 1))   # format 1 = PCM
        f.write(struct.pack("<H", nc))
        f.write(struct.pack("<I", sr))
        f.write(struct.pack("<I", br))
        f.write(struct.pack("<H", ba))
        f.write(struct.pack("<H", bps))
        f.write(b"data")
        f.write(struct.pack("<I", ds))
        f.write(audio_bytes)

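# An equivalent sketch using the stdlib `wave` module, under the same
# 24 kHz mono 16-bit assumptions as the manual header above:
#   import wave
#   with wave.open(output_path, "wb") as w:
#       w.setnchannels(1)
#       w.setsampwidth(2)      # 16-bit = 2 bytes per sample
#       w.setframerate(24000)
#       w.writeframes(audio_bytes)
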
def concatenate_wavs(wav_files, output_path):
    if not wav_files:
        return
    if len(wav_files) == 1:
        shutil.copy2(wav_files[0], output_path)
        return
    # ffmpeg concat demuxer: list the input files, then stream-copy
    list_file = output_path + ".txt"
    with open(list_file, "w") as f:
        for wav in wav_files:
            f.write(f"file '{wav}'\n")
    subprocess.run(
        ["ffmpeg", "-y", "-f", "concat", "-safe", "0",
         "-i", list_file, "-c", "copy", output_path],
        capture_output=True, check=True,
    )
    os.remove(list_file)

def generate_silence(duration_sec, output_path):
    subprocess.run(
        ["ffmpeg", "-y", "-f", "lavfi", "-i", "anullsrc=r=24000:cl=mono",
         "-t", str(duration_sec), "-acodec", "pcm_s16le", output_path],
        capture_output=True, check=True,
    )

# ==========================================
# DOCUMENT EXTRACTION
# ==========================================
def extract_text_from_file(filepath):
    ext = os.path.splitext(filepath)[1].lower()
    if ext == ".pdf":
        if not HAS_PYPDF:
            raise gr.Error("pypdf not installed.")
        reader = pypdf.PdfReader(filepath)
        return "\n\n".join(p.extract_text().strip() for p in reader.pages if p.extract_text())
    elif ext in (".docx", ".doc"):
        if ext == ".doc":
            raise gr.Error("Legacy .doc is not supported. Please save as .docx or .pdf.")
        if not HAS_DOCX:
            raise gr.Error("python-docx not installed.")
        doc = docx.Document(filepath)
        return "\n\n".join(p.text.strip() for p in doc.paragraphs if p.text.strip())
    else:
        with open(filepath, "r", encoding="utf-8", errors="replace") as f:
            return f.read()

# ==========================================
# TEXT SPLITTING
# ==========================================
def split_text_into_chunks(text, max_chars=MAX_CHARS_PER_CHUNK):
    text = text.strip()
    if not text:
        return []
    if len(text) <= max_chars:
        return [text]
    chunks, paragraphs, current = [], re.split(r"\n\s*\n", text), ""
    for para in paragraphs:
        para = para.strip()
        if not para:
            continue
        if len(current) + len(para) + 2 <= max_chars:
            current = (current + "\n\n" + para).strip()
        else:
            if current:
                chunks.append(current)
            if len(para) > max_chars:
                sentences = re.split(r"(?<=[.!?])\s+", para)
                current = ""
                for s in sentences:
                    if len(current) + len(s) + 1 <= max_chars:
                        current = (current + " " + s).strip()
                    else:
                        if current:
                            chunks.append(current)
                        current = s
            else:
                current = para
    if current:
        chunks.append(current)
    return chunks

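# Example behavior: the text is split on blank lines first; a paragraph longer
# than max_chars is further split on sentence boundaries ([.!?] followed by
# whitespace). A single sentence longer than max_chars is kept whole rather
# than truncated, so chunks can occasionally exceed the limit.
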
# ==========================================
# VOICE CLONING (Qwen)
# ==========================================
def prepare_clone_audio(audio_path):
    """Normalize the voice sample to 24 kHz mono PCM, trimmed to at most 60 s."""
    result = subprocess.run(
        ["ffprobe", "-v", "quiet", "-show_entries", "format=duration",
         "-of", "default=noprint_wrappers=1:nokey=1", audio_path],
        capture_output=True, text=True,
    )
    duration = float(result.stdout.strip())
    if duration < 10:
        raise ValueError(f"Audio too short ({duration:.1f}s). Need at least 10 seconds.")
    tmp = audio_path + "_prepared.wav"
    if duration <= 60:
        subprocess.run(["ffmpeg", "-y", "-i", audio_path, "-ar", "24000", "-ac", "1",
                        "-acodec", "pcm_s16le", tmp], capture_output=True, check=True)
    else:
        # Take a 60 s window starting a few seconds in
        start = min(5, duration - 60)
        subprocess.run(["ffmpeg", "-y", "-ss", str(start), "-t", "60", "-i", audio_path,
                        "-ar", "24000", "-ac", "1", "-acodec", "pcm_s16le", tmp],
                       capture_output=True, check=True)
    return tmp

def clone_voice(audio_path, api_key):
    prepared = prepare_clone_audio(audio_path)
    b64 = base64.b64encode(pathlib.Path(prepared).read_bytes()).decode()
    try:
        os.remove(prepared)
    except OSError:
        pass
    resp = http_requests.post(VOICE_CLONE_URL, json={
        "model": VOICE_CLONE_MODEL,
        "input": {
            "action": "create", "target_model": TTS_VC_MODEL,
            "preferred_name": "audiobook_voice",
            "audio": {"data": f"data:audio/wav;base64,{b64}"},
        },
    }, headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}, timeout=60)
    if resp.status_code != 200:
        raise RuntimeError(f"Voice clone failed: {resp.text[:300]}")
    return resp.json()["output"]["voice"]

# ==========================================
# TRANSLATION (Qwen text-only)
# ==========================================
def translate_text(client, text, target_language, lang_config):
    # lang_config is accepted for signature consistency with the TTS helpers;
    # it is not needed for translation itself.
    response = client.chat.completions.create(
        model=OMNI_MODEL, modalities=["text"],
        messages=[
            {"role": "system", "content": f"Translate English to {target_language}. Output ONLY the translation."},
            {"role": "user", "content": f"Translate:\n\n{text}"},
        ],
    )
    return response.choices[0].message.content.strip()

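# e.g. translate_text(client, "Good morning.", "French", LANGUAGES["French"])
# should return something like "Bonjour." (exact output depends on the model).
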
# ==========================================
# TTS MODE 1: PRESET VOICE (Qwen Omni)
# ==========================================
def generate_speech_preset(client, text, voice, language, lang_config, translate, chunk_index, output_dir):
    output_wav = os.path.join(output_dir, f"chunk_{chunk_index:04d}.wav")
    if translate and language != "English":
        sys_prompt = (f"Translate English to {language} "
                      f"and narrate expressively. Respond ONLY with spoken {language} narration.")
        user_text = f"Translate into {language} and narrate:\n\n{text}"
    else:
        sys_prompt = "Narrate expressively as an audiobook. Respond ONLY with narration."
        user_text = f"Narrate:\n\n{text}"
    try:
        completion = client.chat.completions.create(
            model=OMNI_MODEL,
            messages=[{"role": "system", "content": sys_prompt}, {"role": "user", "content": user_text}],
            modalities=["text", "audio"], audio={"voice": voice, "format": "wav"},
            stream=True, stream_options={"include_usage": True},
        )
        audio_parts, text_parts = [], []
        for event in completion:
            if not event.choices:
                continue
            delta = event.choices[0].delta
            if hasattr(delta, "content") and delta.content:
                text_parts.append(delta.content)
            if hasattr(delta, "audio") and delta.audio:
                if isinstance(delta.audio, dict) and "data" in delta.audio:
                    audio_parts.append(delta.audio["data"])
                elif hasattr(delta.audio, "data") and delta.audio.data:
                    audio_parts.append(delta.audio.data)
        transcript = "".join(text_parts)
        if audio_parts:
            base64_to_wav("".join(audio_parts), output_wav)
            return output_wav, transcript
        return None, "No audio received"
    except Exception as e:
        return None, str(e)

# ==========================================
# TTS MODE 2: CLONED VOICE (Qwen TTS-VC)
# ==========================================
def generate_speech_cloned(client, text, voice_id, language, lang_config, translate, api_key, chunk_index, output_dir):
    output_wav = os.path.join(output_dir, f"vc_chunk_{chunk_index:04d}.wav")
    final_text = text
    if translate and language != "English":
        final_text = translate_text(client, text, language, lang_config)
    # Map UI language names to the language_type values the TTS API expects
    lang_map = {
        "English": "English", "Chinese (Mandarin)": "Chinese", "Japanese": "Japanese",
        "Korean": "Korean", "German": "German", "French": "French",
        "Russian": "Russian", "Portuguese": "Portuguese", "Spanish": "Spanish", "Italian": "Italian",
    }
    resp = http_requests.post(TTS_SYNTHESIS_URL, json={
        "model": TTS_VC_MODEL,
        "input": {"text": final_text, "voice": voice_id, "language_type": lang_map.get(language, "English")},
    }, headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}, timeout=120)
    if resp.status_code != 200:
        return None, final_text, f"TTS failed ({resp.status_code})"
    audio_url = resp.json().get("output", {}).get("audio", {}).get("url")
    if audio_url:
        audio_resp = http_requests.get(audio_url, timeout=120)
        with open(output_wav, "wb") as f:
            f.write(audio_resp.content)
        return output_wav, final_text, None
    return None, final_text, "No audio URL"

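# Success payload shape assumed by the parsing above:
#   {"output": {"audio": {"url": "https://..."}}}
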
# ==========================================
# TTS MODE 3: YOURVOIC (emotional voices, 93+ languages)
# ==========================================
def generate_speech_yourvoic(client, text, voice, yv_model, emotion, language, lang_config, translate,
                             api_key, chunk_index, output_dir):
    """Generate speech via the YourVoic API: translation through Qwen, then TTS through YourVoic."""
    output_file = os.path.join(output_dir, f"yv_chunk_{chunk_index:04d}.mp3")
    # Translate if needed
    final_text = text
    transcript = text
    if translate and language != "English":
        try:
            ds_key = os.environ.get("DASHSCOPE_API_KEY", "")
            if ds_key and client:
                final_text = translate_text(client, text, language, lang_config)
                transcript = final_text
        except Exception as e:
            print(f"[YourVoic] Translation failed, using English: {e}")
    # Build the request - the voice name is passed in directly (already resolved by the caller)
    yourvoic_lang = lang_config.get("yourvoic", "en-US")
    print(f"[YourVoic] Language: {language}, voice: {voice}")
    payload = {
        "text": final_text,
        "voice": voice,
        "language": yourvoic_lang,
        "model": yv_model,
        "speed": 0.9,
    }
    # Add emotion if not neutral
    if emotion and emotion != "neutral":
        payload["emotion"] = emotion
    headers = {
        "X-API-Key": api_key,
        "Content-Type": "application/json",
    }
    try:
        resp = http_requests.post(YOURVOIC_TTS_URL, json=payload, headers=headers, timeout=120)
        print(f"[YourVoic] Chunk {chunk_index}: status={resp.status_code}, size={len(resp.content)} bytes")
        if resp.status_code != 200:
            error_msg = resp.text[:200]
            print(f"[YourVoic] Error: {error_msg}")
            return None, transcript, f"YourVoic API error ({resp.status_code}): {error_msg}"
        # The response is either JSON (containing an audio URL) or direct audio bytes
        content_type = resp.headers.get("Content-Type", "")
        if "application/json" in content_type:
            data = resp.json()
            audio_url = data.get("audio_url") or data.get("url")
            if audio_url:
                audio_resp = http_requests.get(audio_url, timeout=120)
                with open(output_file, "wb") as f:
                    f.write(audio_resp.content)
            else:
                return None, transcript, f"No audio URL in response: {json.dumps(data)[:200]}"
        else:
            # Direct audio bytes
            with open(output_file, "wb") as f:
                f.write(resp.content)
        # Convert MP3 to WAV for consistent concatenation
        output_wav = output_file.replace(".mp3", ".wav")
        subprocess.run(
            ["ffmpeg", "-y", "-i", output_file, "-ar", "24000", "-ac", "1",
             "-acodec", "pcm_s16le", output_wav],
            capture_output=True, check=True,
        )
        return output_wav, transcript, None
    except Exception as e:
        return None, transcript, str(e)

# ==========================================
# MAIN PIPELINE
# ==========================================
def generate_audiobook(text_input, file_input, target_language, voice_mode,
                       preset_voice_label, clone_audio, yourvoic_voice_label,
                       yourvoic_model_label, yourvoic_emotion,
                       add_pauses, progress=gr.Progress()):
    # Resolve text
    if file_input is not None:
        progress(0.02, desc="Extracting text from document...")
        text = extract_text_from_file(file_input)
    elif text_input and text_input.strip():
        text = text_input.strip()
    else:
        raise gr.Error("Please provide text or upload a file.")
    if len(text) < 10:
        raise gr.Error("Text is too short.")

    ds_key = os.environ.get("DASHSCOPE_API_KEY", "")
    yv_key = os.environ.get("YOURVOIC_API_KEY", "")
    lang_config = LANGUAGES[target_language]
    lang_engine = lang_config["engine"]
    use_clone = voice_mode == "Clone a Voice"
    use_yourvoic = voice_mode == "YourVoic (Emotional AI)"
    translate = target_language != "English"

    # Auto-correct the engine if the language requires it. A YourVoic-only
    # language forces the YourVoic engine; the reverse (YourVoic chosen for a
    # Qwen language) is allowed, since Qwen languages also work on YourVoic.
    if lang_engine == "yourvoic" and not use_yourvoic:
        use_yourvoic = True
        use_clone = False

    # Validate keys
    if use_yourvoic:
        if not yv_key:
            raise gr.Error("YOURVOIC_API_KEY not set. Add it in Settings > Secrets. Get one at yourvoic.com/api/user")
        if translate and not ds_key:
            raise gr.Error("DASHSCOPE_API_KEY needed for translation. Add it in Settings > Secrets.")
    else:
        if not ds_key:
            raise gr.Error("DASHSCOPE_API_KEY not set. Add it in Settings > Secrets.")

    client = OpenAI(api_key=ds_key, base_url=DASHSCOPE_BASE_URL) if ds_key else None
    tmp_dir = tempfile.mkdtemp(prefix="audiobook_")

    # Voice cloning setup
    cloned_voice_id = None
    if use_clone:
        if clone_audio is None:
            raise gr.Error("Upload a voice sample for cloning.")
        if target_language not in VOICE_CLONE_LANGUAGES:
            raise gr.Error(f"Voice cloning supports: {', '.join(sorted(VOICE_CLONE_LANGUAGES))}")
        progress(0.03, desc="Cloning voice...")
        cloned_voice_id = clone_voice(clone_audio, ds_key)

    try:
        progress(0.08, desc="Splitting text...")
        chunks = split_text_into_chunks(text)
        total_chunks = len(chunks)
        total_chars = sum(len(c) for c in chunks)
        audio_files, all_transcripts = [], []
        silence_path = os.path.join(tmp_dir, "silence.wav")
        if add_pauses:
            generate_silence(1.5, silence_path)

        for i, chunk in enumerate(chunks):
            frac = 0.10 + 0.78 * (i / total_chunks)
            progress(frac, desc=f"Narrating chunk {i+1}/{total_chunks}...")
            wav_path, transcript, error = None, None, None
            if use_yourvoic:
                yv_voice = yourvoic_voice_label
                yv_model = get_yourvoic_model(yourvoic_model_label)
                wav_path, transcript, error = generate_speech_yourvoic_with_retry(
                    client, chunk, yv_voice, yv_model, yourvoic_emotion,
                    target_language, lang_config, translate,
                    yv_key, i, tmp_dir,
                )
            elif use_clone:
                wav_path, transcript, error = generate_speech_cloned(
                    client, chunk, cloned_voice_id, target_language,
                    lang_config, translate, ds_key, i, tmp_dir,
                )
            else:
                voice = get_voice_name(preset_voice_label)
                wav_path, transcript = generate_speech_preset(
                    client, chunk, voice, target_language,
                    lang_config, translate, i, tmp_dir,
                )
                error = None if wav_path else transcript
            if wav_path:
                audio_files.append(wav_path)
            else:
                # Keep timing intact: substitute 2 s of silence for a failed chunk
                all_transcripts.append(f"Chunk {i+1} failed: {error}")
                fail_sil = os.path.join(tmp_dir, f"fail_{i:04d}.wav")
                generate_silence(2.0, fail_sil)
                audio_files.append(fail_sil)
            if transcript and "failed" not in str(transcript).lower():
                all_transcripts.append(transcript)
            if add_pauses and i < total_chunks - 1 and audio_files:
                audio_files.append(silence_path)

        if not audio_files:
            raise gr.Error("No audio was generated.")
        progress(0.90, desc="Assembling audiobook...")
        final_audio = os.path.join(tmp_dir, "audiobook.wav")
        concatenate_wavs(audio_files, final_audio)
        progress(0.95, desc="Converting to MP3...")
        final_mp3 = os.path.join(tmp_dir, "audiobook.mp3")
        subprocess.run(
            ["ffmpeg", "-y", "-i", final_audio, "-codec:a", "libmp3lame",
             "-b:a", "128k", "-ar", "24000", "-ac", "1", final_mp3],
            capture_output=True, check=True,
        )
        progress(1.0, desc="Done!")
        audio_size = os.path.getsize(final_mp3) / (1024 * 1024)

        if use_yourvoic:
            voice_info = f"YourVoic: {yourvoic_voice_label} ({yourvoic_emotion})"
            mode_info = f"YourVoic API ({yourvoic_model_label})"
        elif use_clone:
            voice_info = f"Cloned (ID: {cloned_voice_id[:20]}...)"
            mode_info = "Qwen3-TTS-VC"
        else:
            voice_info = preset_voice_label
            mode_info = "Qwen3.5-Omni-Plus"
        stats = (
            f"**Audiobook Generated!**\n\n"
            f"- **Source:** {total_chars:,} characters in {total_chunks} chunks\n"
            f"- **Language:** {target_language}\n"
            f"- **Voice:** {voice_info}\n"
            f"- **Engine:** {mode_info}\n"
            f"- **File size:** {audio_size:.1f} MB\n"
        )
        transcript_text = "\n\n---\n\n".join(all_transcripts) if all_transcripts else ""
        return final_mp3, stats, transcript_text
    except gr.Error:
        raise
    except Exception as e:
        raise gr.Error(f"Pipeline error: {str(e)}")

# ==========================================
# GRADIO UI
# ==========================================
SAMPLE_TEXT = """Chapter 1: The Beginning

The old lighthouse stood at the edge of the world, or so it seemed to the girl who had lived in its shadow all her life. Each morning, she would climb the winding iron staircase to the lamp room, counting exactly one hundred and forty-seven steps, and watch the sun rise from the sea like a great golden coin.

"One day," she whispered to the seagulls, "I'll follow that sun to wherever it goes."

Her name was Elena, and she was seventeen years old. She had hair the color of dark honey and eyes that changed with the weather - grey in storms, green in sunlight.

The lighthouse keeper, her grandfather, was a man of few words but many stories. He kept them locked away like treasures in a chest, only bringing them out on winter nights when the storms howled outside.

"Tell me about the ships," Elena would say, curling up in the worn armchair by the fire.

And he would smile - that slow, careful smile that seemed to cost him something each time - and begin."""

DESCRIPTION = """
# Audiobook Generator
### English Text to Multi-Language Audiobook
"""

# Build the language dropdown: Qwen languages first, then YourVoic languages
lang_choices = []
for name in LANGUAGES:
    if LANGUAGES[name]["engine"] == "qwen":
        lang_choices.append(name)
for name in ["Afrikaans", "Amharic", "Swahili",
             "Hindi", "Bengali", "Marathi", "Telugu", "Tamil",
             "Gujarati", "Kannada", "Malayalam", "Punjabi"]:
    if name in LANGUAGES:
        lang_choices.append(name)

def clean_language_name(choice):
    return choice.strip()

def auto_select_engine(language_name):
    """Auto-select the right voice engine based on language."""
    if language_name in LANGUAGES:
        return LANGUAGES[language_name]["engine"]
    return "qwen"

def on_language_change(lang_choice):
    """Auto-switch the visible controls based on the language's engine."""
    lang = clean_language_name(lang_choice)
    engine = auto_select_engine(lang)
    if engine == "yourvoic":
        return (
            gr.update(visible=False),  # preset_voice
            gr.update(visible=True),   # yv_voice
            gr.update(visible=True),   # yv_model
            gr.update(visible=True),   # yv_emotion
            gr.update(value="Engine: YourVoic (1000+ emotional voices)"),  # engine_label
            gr.update(visible=False, value=False),  # use_clone - hide and uncheck
            gr.update(visible=False),  # clone_audio
            gr.update(visible=False),  # clone_info
        )
    else:
        return (
            gr.update(visible=True),   # preset_voice
            gr.update(visible=False),  # yv_voice
            gr.update(visible=False),  # yv_model
            gr.update(visible=False),  # yv_emotion
            gr.update(value="Engine: Qwen3.5-Omni-Plus (translate + speak)"),  # engine_label
            gr.update(visible=True),   # use_clone - show
            gr.update(visible=False),  # clone_audio (hidden until the checkbox is checked)
            gr.update(visible=False),  # clone_info
        )

def on_clone_toggle(use_clone):
    """Show/hide the clone controls."""
    if use_clone:
        return gr.update(visible=True), gr.update(visible=True)
    return gr.update(visible=False), gr.update(visible=False)

def generate_wrapper(text_input, file_input, language_choice, use_clone,
                     preset_voice, clone_audio, yv_voice, yv_model, yv_emotion,
                     add_pauses, progress=gr.Progress()):
    language = clean_language_name(language_choice)
    engine = auto_select_engine(language)
    # Build the voice_mode string for the pipeline
    if use_clone:
        voice_mode = "Clone a Voice"
    elif engine == "yourvoic":
        voice_mode = "YourVoic (Emotional AI)"
    else:
        voice_mode = "Preset Voice"
    return generate_audiobook(
        text_input, file_input, language, voice_mode,
        preset_voice, clone_audio, yv_voice, yv_model, yv_emotion,
        add_pauses, progress,
    )

with gr.Blocks(title="Audiobook Generator") as demo:
    gr.Markdown(DESCRIPTION)
    with gr.Row():
        with gr.Column(scale=1):
            text_input = gr.Textbox(label="English Text", placeholder="Paste your English text here...",
                                    lines=10, max_lines=25)
            file_input = gr.File(label="Or Upload (.txt, .md, .pdf, .docx)",
                                 file_types=[".txt", ".md", ".text", ".pdf", ".docx", ".doc"], type="filepath")
            sample_btn = gr.Button("Load Sample Text", variant="secondary", size="sm")
            target_lang = gr.Dropdown(choices=lang_choices, value="English", label="Target Language",
                                      info="The right voice engine is selected automatically based on language.")
            engine_label = gr.Markdown(value="Engine: Qwen3.5-Omni-Plus (translate + speak)")
            # Qwen preset voice (visible for Qwen languages)
            preset_voice = gr.Dropdown(choices=PRESET_VOICES, value="Jennifer -- Cinematic narrator",
                                       label="Narrator Voice", visible=True)
            # YourVoic controls (visible for YourVoic languages)
            yv_voice = gr.Dropdown(choices=YOURVOIC_VOICES, value="Rahul -- Hindi",
                                   label="YourVoic Voice", visible=False, allow_custom_value=True,
                                   info="Auto-matched to your language. Type a custom name if needed.")
            yv_model = gr.Dropdown(choices=YOURVOIC_MODELS, value="aura-prime -- Balanced quality and speed (recommended)",
                                   label="YourVoic Model", visible=False)
            yv_emotion = gr.Dropdown(choices=YOURVOIC_EMOTIONS, value="friendly",
                                     label="Emotion Style", visible=False,
                                     info="Add emotional expression to the narration")
            # Voice cloning toggle (optional, Qwen languages only)
            use_clone = gr.Checkbox(value=False, label="Use Voice Cloning (Qwen, 10 languages only)",
                                    info="Clone a voice from an audio sample instead of using a preset")
            clone_audio = gr.Audio(label="Voice Sample (10s-3min)", type="filepath", visible=False)
            clone_info = gr.Markdown(
                value="> 10-180s of clear speech, no background noise. Supports: EN, ZH, JA, KO, DE, FR, RU, PT, ES, IT.",
                visible=False,
            )
            add_pauses = gr.Checkbox(value=True, label="Add pauses between sections", info="1.5s silence between chunks")
            generate_btn = gr.Button("Generate Audiobook", variant="primary", size="lg")
        with gr.Column(scale=1):
            audio_output = gr.Audio(label="Generated Audiobook", type="filepath")
            stats_output = gr.Markdown(label="Generation Stats")
            with gr.Accordion("Translation / Narration Transcript", open=False):
                transcript_output = gr.Markdown()

    # Events
    sample_btn.click(fn=lambda: SAMPLE_TEXT, outputs=text_input)
    target_lang.change(
        fn=on_language_change, inputs=target_lang,
        outputs=[preset_voice, yv_voice, yv_model, yv_emotion, engine_label,
                 use_clone, clone_audio, clone_info],
    )
    use_clone.change(fn=on_clone_toggle, inputs=use_clone, outputs=[clone_audio, clone_info])
    generate_btn.click(
        fn=generate_wrapper,
        inputs=[text_input, file_input, target_lang, use_clone,
                preset_voice, clone_audio, yv_voice, yv_model, yv_emotion, add_pauses],
        outputs=[audio_output, stats_output, transcript_output],
    )
    gr.Markdown("---\n")

if __name__ == "__main__":
    demo.launch()