|
|
| from pathlib import Path |
| import shutil |
| from pydub import AudioSegment |
| import os |
| import traceback |
| from functools import reduce |
| import threading |
| import queue |
| import time |
| import sys |
| from . import common |
|
|
| from dotenv import load_dotenv |
| import os |
| if os.path.exists(".env"): |
| print("Loaded load_dotenv") |
| load_dotenv(".env") |
|
|
| class BaseTTS: |
| def __init__(self, type, stream_audio=False, setup_signals=True): |
| """Initialize BaseTTS with environment settings and configuration.""" |
| if os.getenv("USE_CPU_IF_POSSIBLE", None): |
| self.device = "cpu" |
| else: |
| self.device = "cuda" if common.is_gpu_available() else "cpu" |
| print(f'Using device:: {self.device}') |
| |
| os.environ["TORCH_USE_CUDA_DSA"] = "1" |
| os.environ["CUDA_LAUNCH_BLOCKING"] = "1" |
| os.environ["HF_HUB_TIMEOUT"] = "120" |
|
|
| base_dir = os.path.dirname(os.path.abspath(__file__)) |
|
|
| |
| self.content_file = Path("content.txt") |
| self.final_output_audio = "output_audio.wav" |
| self.final_output_timestamps = "output_timestamps.json" |
| self.temp_output_dir = Path("temp_audio_chunks") |
|
|
| if not self.temp_output_dir.exists(): |
| self.temp_output_dir.mkdir(parents=True, exist_ok=True) |
| |
| |
| self.default_voice_index = 8 |
| self.default_speed = 0.8 |
|
|
| self.voices = [ |
| None, |
| os.path.join(base_dir, 'voices/Main-4.wav'), |
| os.path.join(base_dir, 'voices/Ellen-TTS-10.wav'), |
| os.path.join(base_dir, 'voices/kratos(ambient)_en.wav'), |
| os.path.join(base_dir, 'voices/20250329-audio-american-male.wav'), |
| os.path.join(base_dir, 'voices/Ellen13y TTS-14.wav'), |
| os.path.join(base_dir, 'voices/Simple guy.wav'), |
| None, |
| os.path.join(base_dir, 'voices/bbc_news.wav'), |
| os.path.join(base_dir, 'voices/en_woman.wav'), |
| os.path.join(base_dir, 'voices/voice_preview_david castlemore - newsreader and educator.mp3'), |
| os.path.join(base_dir, 'voices/voice_preview_kelly - storytelling & motivational content.mp3'), |
| os.path.join(base_dir, 'voices/voice_preview_motivational coach - leader.mp3'), |
| os.path.join(base_dir, 'voices/voice_preview_sevan bomar - black motivational speaker.mp3'), |
| os.path.join(base_dir, 'voices/voice_preview_female news reader.mp3'), |
| ] |
| |
| |
| self.type = type |
| self.save_audio_file = True |
| self.stream_audio = stream_audio |
| |
| |
| self.audio_queue = queue.Queue() |
| self.is_streaming = False |
| self.stream_thread = None |
| self.sample_rate = 24000 |
| self.last_playing_audio_duration_seconds = 0.1 |
| |
| |
| self.text_queue = queue.Queue() |
| self.text_processing_thread = None |
| self.is_text_streaming = False |
| self.text_chunk_size = 10 |
| self.current_voice = None |
| self.current_speed = None |
| |
| |
| self.temp_feed_words = [] |
| |
| |
| self.emergency_stop = False |
| if setup_signals: |
| self.setup_signal_handler() |
|
|
| |
| |
| def cleanup_temp_files(self): |
| """Clean up temporary audio files.""" |
| if self.temp_output_dir.exists(): |
| shutil.rmtree(self.temp_output_dir) |
| if os.path.exists(self.final_output_audio): |
| os.remove(self.final_output_audio) |
| if os.path.exists(self.final_output_timestamps): |
| os.remove(self.final_output_timestamps) |
| print("Temporary files cleaned up") |
|
|
| def setup_output_directory(self): |
| """Create clean output directory for audio chunks.""" |
| if self.temp_output_dir.exists(): |
| shutil.rmtree(self.temp_output_dir) |
| self.temp_output_dir.mkdir(exist_ok=True) |
|
|
| def read_content_file(self): |
| """Read content from the content file.""" |
| with open(self.content_file, 'r', encoding='utf-8') as file: |
| return file.read().strip() |
|
|
| def validate_voice_index(self, args) -> str: |
| """Validate and return voice file path.""" |
| voice_index = self.default_voice_index |
| try: |
| voice_index = int(getattr(args, 'voice')) |
| if not 0 <= voice_index < len(self.voices): |
| print(f"Invalid voice index {voice_index}, using default voice") |
| voice_index = self.default_voice_index |
| except: |
| voice_index = self.default_voice_index |
|
|
| print(f"Voice Value: {self.voices[voice_index]}") |
| return self.voices[voice_index] |
|
|
| def validate_speed(self, args) -> float: |
| """Validate and return speed value.""" |
| speed_value = self.default_speed |
| try: |
| speed_value = float(getattr(args, 'speed')) |
| if speed_value <= 0: |
| print(f"Invalid speed {speed_value}, using default speed") |
| speed_value = self.default_speed |
| except: |
| speed_value = self.default_speed |
|
|
| print(f"Speed Value: {speed_value}") |
| return speed_value |
|
|
| def combine_audio_files(self, audio_files): |
| """Combine multiple audio files into one. |
| |
| Args: |
| audio_files: List of audio file paths to combine |
| |
| Returns: |
| True if successful, False otherwise |
| """ |
| if not audio_files: |
| raise ValueError("No audio files to combine") |
|
|
| print(f"Combining {len(audio_files)} audio files...") |
| combined = reduce( |
| lambda acc, file_name: acc + AudioSegment.from_wav(file_name), |
| audio_files, |
| AudioSegment.empty() |
| ) |
| |
| |
| combined.export(self.final_output_audio, format="wav") |
| print(f"Combined audio saved as {self.final_output_audio}") |
| return True |
|
|
| def split_sentences(self, text, max_chars=300): |
| """Common split method for all frameworks. Override in subclasses if needed. |
| |
| Args: |
| text (str): Text to split |
| max_chars (int): Maximum characters per chunk |
| |
| Returns: |
| list: List of text chunks |
| """ |
| words = text.split() |
| chunks = [] |
| current = "" |
| |
| for word in words: |
| test_chunk = current + " " + word if current else word |
| if len(test_chunk) <= max_chars: |
| current = test_chunk |
| else: |
| if current: |
| chunks.append(current) |
| current = word |
| |
| if current: |
| chunks.append(current) |
| |
| return chunks |
|
|
| def generate_chunk_audio_file(self, audio, chunk_index) -> Path: |
| import soundfile as sf |
| chunk_file = self.temp_output_dir / f"chunk_{chunk_index:04d}.wav" |
| sf.write(chunk_file, audio, self.sample_rate) |
| return chunk_file |
|
|
| |
| |
| def setup_signal_handler(self): |
| """Setup signal handler for Ctrl+C to stop everything immediately.""" |
| import signal |
| signal.signal(signal.SIGINT, self.emergency_stop_handler) |
| signal.signal(signal.SIGTERM, self.emergency_stop_handler) |
|
|
| def emergency_stop_handler(self, signum, frame): |
| """Handle Ctrl+C - stop everything immediately.""" |
| print("\nπ Emergency stop triggered! Stopping all operations...") |
| self.emergency_stop = True |
| |
| |
| try: |
| import sounddevice as sd |
| sd.stop() |
| except: |
| pass |
| |
| |
| self.force_stop_streaming() |
| self.force_stop_text_streaming() |
| |
| print("β
Emergency stop completed. Exiting...") |
| sys.exit(0) |
|
|
| def force_stop_streaming(self): |
| """Force stop audio streaming immediately without waiting.""" |
| if self.is_streaming: |
| self.is_streaming = False |
| |
| |
| try: |
| while not self.audio_queue.empty(): |
| self.audio_queue.get_nowait() |
| except: |
| pass |
| |
| |
| try: |
| self.audio_queue.put(None) |
| except: |
| pass |
| |
| print("π Audio streaming force stopped") |
|
|
| def force_stop_text_streaming(self): |
| """Force stop text streaming immediately without waiting.""" |
| if self.is_text_streaming: |
| self.is_text_streaming = False |
| |
| |
| try: |
| while not self.text_queue.empty(): |
| self.text_queue.get_nowait() |
| except: |
| pass |
| |
| |
| try: |
| self.text_queue.put(None) |
| except: |
| pass |
| |
| print("π Text streaming force stopped") |
|
|
| def check_emergency_stop(self): |
| """Check if emergency stop was triggered. Call this in loops.""" |
| if self.emergency_stop: |
| raise KeyboardInterrupt("Emergency stop triggered") |
|
|
| |
| |
| def _audio_stream_worker(self): |
| """Worker thread that plays audio chunks as they arrive.""" |
| while self.is_streaming and not self.emergency_stop: |
| try: |
| audio_data = self.audio_queue.get(timeout=0.1) |
| if audio_data is None or self.emergency_stop: |
| break |
|
|
| self.last_playing_audio_duration_seconds = len(audio_data) / self.sample_rate |
| |
| import sounddevice as sd |
| sd.play(audio_data, samplerate=self.sample_rate) |
| |
| |
| while sd.get_stream().active and not self.emergency_stop: |
| time.sleep(0.01) |
| |
| if self.emergency_stop: |
| sd.stop() |
| break |
| |
| except queue.Empty: |
| continue |
| except Exception as e: |
| if not self.emergency_stop: |
| print(f"Audio playback error: {e}") |
| break |
|
|
| def start_audio_streaming(self): |
| """Start the audio streaming thread.""" |
| try: |
| import sounddevice as sd |
| if not self.is_streaming and not self.emergency_stop: |
| self.is_streaming = True |
| self.stream_thread = threading.Thread(target=self._audio_stream_worker) |
| self.stream_thread.daemon = True |
| self.stream_thread.start() |
| print("π Audio streaming started") |
| except: |
| self.stream_audio = False |
| print("π No sounddevice available.") |
| pass |
|
|
| def stop_audio_streaming(self): |
| """Stop the audio streaming thread.""" |
| if self.is_streaming: |
| self.is_streaming = False |
| self.audio_queue.put(None) |
| if self.stream_thread: |
| self.stream_thread.join(timeout=2) |
| print("π Audio streaming stopped") |
|
|
| def queue_audio_for_streaming(self, audio_data, sample_rate=None): |
| """Queue audio data for streaming playback. |
| |
| Args: |
| audio_data: Audio data (numpy array, tensor, or file path) |
| sample_rate (int, optional): Sample rate of the audio data |
| """ |
| if self.is_streaming and not self.emergency_stop: |
| |
| processed_audio = self._prepare_audio_for_streaming(audio_data, sample_rate) |
| if processed_audio is not None: |
| self.audio_queue.put(processed_audio) |
| return len(processed_audio) / self.sample_rate |
| return 0 |
|
|
| def _prepare_audio_for_streaming(self, audio_data, sample_rate=None): |
| """Prepare audio data for streaming by converting to numpy array. |
| |
| Args: |
| audio_data: Raw audio data (numpy array, tensor, or file path) |
| sample_rate (int, optional): Sample rate of the audio data |
| |
| Returns: |
| numpy.ndarray: Audio data ready for streaming |
| """ |
| import numpy as np |
| |
| try: |
| |
| if isinstance(audio_data, np.ndarray): |
| |
| audio = audio_data.astype(np.float32) |
| |
| if audio.max() > 1.0 or audio.min() < -1.0: |
| audio = audio / np.max(np.abs(audio)) |
| return audio |
| |
| |
| elif hasattr(audio_data, 'cpu'): |
| audio = audio_data.cpu().numpy().astype(np.float32) |
| |
| if audio.max() > 1.0 or audio.min() < -1.0: |
| audio = audio / np.max(np.abs(audio)) |
| return audio |
| |
| |
| elif isinstance(audio_data, (str, Path)): |
| from scipy.io import wavfile |
| sr, audio = wavfile.read(str(audio_data)) |
| |
| if audio.dtype == np.int16: |
| audio = audio.astype(np.float32) / 32768.0 |
| elif audio.dtype == np.int32: |
| audio = audio.astype(np.float32) / 2147483648.0 |
| else: |
| audio = audio.astype(np.float32) |
| |
| |
| if sample_rate is None: |
| self.sample_rate = sr |
| |
| return audio |
| |
| else: |
| print(f"β οΈ Unsupported audio data type: {type(audio_data)}") |
| return None |
| |
| except Exception as e: |
| print(f"β Error preparing audio for streaming: {e}") |
| return None |
|
|
| def wait_for_audio_streaming_complete(self): |
| """Wait for all queued audio to finish playing.""" |
| time.sleep(0.5) |
| while not self.audio_queue.empty() and not self.emergency_stop: |
| time.sleep(0.1) |
|
|
| time.sleep(self.last_playing_audio_duration_seconds) |
|
|
| |
| |
| def _text_processing_worker(self): |
| """Worker thread that processes text chunks from the queue.""" |
| chunk_counter = 0 |
| |
| while self.is_text_streaming and not self.emergency_stop: |
| try: |
| text_chunk = self.text_queue.get(timeout=0.1) |
| if text_chunk is None or self.emergency_stop: |
| break |
| |
| if not text_chunk.strip(): |
| continue |
| |
| print(f"π Processing text chunk {chunk_counter + 1}: '{text_chunk[:50]}...'") |
| |
| |
| try: |
| audio_files = self.generate_audio_files( |
| text_chunk, |
| self.current_voice, |
| self.current_speed, |
| chunk_id=chunk_counter |
| ) |
| |
| if audio_files: |
| print(f"β
Generated audio for chunk {chunk_counter + 1}") |
| else: |
| print(f"β οΈ No audio generated for chunk {chunk_counter + 1}") |
| |
| except Exception as e: |
| print(f"β Error processing chunk {chunk_counter + 1}: {e}") |
| |
| chunk_counter += 1 |
| |
| except queue.Empty: |
| continue |
| except Exception as e: |
| if not self.emergency_stop: |
| print(f"Text processing error: {e}") |
| break |
| |
| print(f"π Text processing completed. Processed {chunk_counter} chunks.") |
|
|
| def start_text_streaming(self, voice, speed): |
| """Start the text processing streaming thread.""" |
| if not self.is_text_streaming and not self.emergency_stop: |
| self.current_voice = voice |
| self.current_speed = speed |
| self.is_text_streaming = True |
| self.text_processing_thread = threading.Thread(target=self._text_processing_worker) |
| self.text_processing_thread.daemon = True |
| self.text_processing_thread.start() |
| print("π Text streaming started") |
|
|
| def stop_text_streaming(self): |
| """Stop the text processing streaming thread.""" |
| if self.is_text_streaming: |
| self.is_text_streaming = False |
| self.text_queue.put(None) |
| if self.text_processing_thread: |
| self.text_processing_thread.join(timeout=5) |
| print("π Text streaming stopped") |
|
|
| def add_text_chunk(self, text_chunk): |
| """Add a text chunk to the processing queue. |
| |
| Args: |
| text_chunk (str): Text chunk to process |
| """ |
| if self.is_text_streaming and not self.emergency_stop and text_chunk.strip(): |
| |
| cleaned_chunk = text_chunk.strip() |
| if not any(cleaned_chunk.endswith(p) for p in ['.', '!', '?', ':', ';', ',']): |
| cleaned_chunk += '.' |
| |
| self.text_queue.put(cleaned_chunk) |
| print(f"π Queued text chunk: '{cleaned_chunk[:30]}...'") |
| else: |
| if not self.is_text_streaming: |
| print("β οΈ Text streaming not started. Call start_text_streaming() first.") |
|
|
| def add_text_by_words(self, text, words_per_chunk=None): |
| """Split text into word chunks and add to queue. |
| |
| Args: |
| text (str): Full text to split and queue |
| words_per_chunk (int, optional): Number of words per chunk. Uses self.text_chunk_size if None. |
| """ |
| if words_per_chunk is None: |
| words_per_chunk = self.text_chunk_size |
| |
| words = text.split() |
| |
| for i in range(0, len(words), words_per_chunk): |
| chunk = ' '.join(words[i:i + words_per_chunk]) |
| self.add_text_chunk(chunk) |
| |
| print(f"π Split text into {(len(words) + words_per_chunk - 1) // words_per_chunk} chunks of {words_per_chunk} words each") |
|
|
| def wait_for_text_processing_complete(self): |
| """Wait for all queued text chunks to be processed.""" |
| print("π Waiting for text processing to complete...") |
| while not self.text_queue.empty() and not self.emergency_stop: |
| time.sleep(0.1) |
| time.sleep(1) |
| print("π Text processing queue empty") |
|
|
| |
|
|
| def feed_text_chunk(self, text_chunk): |
| """Feed a single text chunk for processing with smart buffering. |
| |
| Args: |
| text_chunk (str): Text chunk to process |
| """ |
| |
| self.temp_feed_words.extend(text_chunk.split()) |
| |
| |
| all_words = " ".join(self.temp_feed_words) |
| sentences = self.split_sentences(all_words) |
| total_sentences = len(sentences) |
| |
| |
| for i, sentence in enumerate(sentences): |
| if i + 1 != total_sentences: |
| print(f"π Feeding chunk: {sentence}") |
| self.add_text_chunk(sentence) |
| |
| |
| self.temp_feed_words = sentences[-1].split() if sentences else [] |
|
|
| def flush_remaining_words(self): |
| """Flush any remaining words in the buffer. Call this when done feeding text.""" |
| if self.temp_feed_words: |
| chunk_text = " ".join(self.temp_feed_words) |
| print(f"π Flushing final chunk: {chunk_text}") |
| self.add_text_chunk(chunk_text) |
| self.temp_feed_words = [] |
|
|
| |
|
|
| def stream_real_time_text(self, args): |
| """Initialize streaming for real-time text input. |
| |
| Args: |
| args: Arguments containing voice, speed, etc. |
| """ |
| speed = self.validate_speed(args) |
| voice = self.validate_voice_index(args) |
| |
| |
| self.cleanup_temp_files() |
| self.setup_output_directory() |
| |
| |
| if self.stream_audio: |
| self.start_audio_streaming() |
| |
| self.start_text_streaming(voice, speed) |
| |
| print("π Real-time text streaming initialized!") |
| print("π Use feed_text_chunk() to add text incrementally") |
| print("π Use add_text_chunk() to add individual chunks") |
| print("π Use add_text_by_words() to split and add text automatically") |
| print("π Use stop_all_streaming() when done") |
|
|
| def stop_all_streaming(self): |
| """Stop all streaming operations and cleanup.""" |
| print("π Stopping all streaming operations...") |
| |
| |
| self.flush_remaining_words() |
| |
| |
| self.wait_for_text_processing_complete() |
| self.wait_for_audio_streaming_complete() |
| |
| |
| self.stop_text_streaming() |
| self.stop_audio_streaming() |
| |
| print("β
All streaming operations stopped") |
|
|
| |
|
|
| def start_streaming(self): |
| """Start audio streaming (backward compatibility).""" |
| self.start_audio_streaming() |
|
|
| def stop_streaming(self): |
| """Stop audio streaming (backward compatibility).""" |
| self.stop_audio_streaming() |
|
|
| def wait_for_streaming_complete(self): |
| """Wait for audio streaming to complete (backward compatibility).""" |
| self.wait_for_audio_streaming_complete() |
|
|
| |
| |
| def generate_audio_files(self, text: str, voice: str, speed: float, chunk_id: int = None): |
| """Generate audio files. To be implemented by subclasses. |
| |
| Args: |
| text (str): Text to convert to audio |
| voice (str): Voice file path |
| speed (float): Speed multiplier |
| chunk_id (int, optional): Unique identifier for this chunk (for streaming) |
| """ |
| raise NotImplementedError("Subclasses must implement generate_audio_files") |
|
|
| |
|
|
| def save_audio(self, args) -> bool: |
| """Generate and save complete audio file (batch mode). |
| |
| Args: |
| args: Arguments containing voice, speed, etc. |
| |
| Returns: |
| True if successful, False otherwise |
| """ |
| |
| text = self.read_content_file() |
| if not text: |
| raise ValueError("Warning: Content file is empty") |
|
|
| speed = self.validate_speed(args) |
| voice = self.validate_voice_index(args) |
| |
| |
| self.cleanup_temp_files() |
| |
| |
| self.setup_output_directory() |
| |
| |
| if self.stream_audio: |
| self.start_audio_streaming() |
|
|
| audio_files = self.generate_audio_files(text, voice, speed) |
| |
| if not audio_files: |
| raise ValueError("Error: No audio files generated") |
|
|
| |
| success = self.combine_audio_files(audio_files) |
|
|
| self.wait_for_audio_streaming_complete() |
| self.stop_audio_streaming() |
| |
| return success |