| import numpy as np |
| import re |
| import struct |
| import lameenc |
|
|
def split_text_into_sentences(text: str, min_chunk_size: int = 150, split_pattern=r'\n+'):
    """Split *text* into sentence-aligned chunks.

    The text is whitespace-normalized (every whitespace run, including
    newlines, becomes a single space) and cut after each run of ``. ? ! : ;``
    that is followed by whitespace or the end of the text.  The first sentence
    is always returned as its own chunk; the following sentences are joined
    with single spaces until the accumulated chunk is at least
    ``min_chunk_size`` characters long.

    Args:
        text: Input text.  Falsy input yields an empty list.
        min_chunk_size: Minimum character length a chunk (other than the
            first and the trailing one) must reach before it is emitted.
        split_pattern: Accepted for API compatibility; the current
            implementation does not use it.

    Returns:
        A list of chunk strings in reading order.
    """
    if not text:
        return []

    normalized = re.sub(r'\s+', ' ', text).strip()

    # The capturing group keeps the punctuation runs as separate pieces, so
    # the pieces alternate between sentence bodies and their terminators.
    pieces = re.split(r'([.?!:;]+)(?=\s|$)', normalized)
    is_terminator = re.compile(r'^[.?!:;]+$').match

    sentences = []
    pending = []
    for piece in pieces:
        if not piece.strip():
            continue
        pending.append(piece)
        if is_terminator(piece):
            sentence = "".join(pending).strip()
            if sentence:
                sentences.append(sentence)
            pending = []

    # Text after the last terminator (or text with no terminator at all).
    leftover = "".join(pending).strip()
    if leftover:
        sentences.append(leftover)

    if not sentences:
        return []

    # The opening sentence is emitted alone, regardless of its length.
    opening, *remaining = sentences
    chunks = [opening]

    group = []
    group_length = 0
    for sentence in remaining:
        # Account for the joining space when the group already has content.
        group_length += len(sentence) + (1 if group else 0)
        group.append(sentence)
        if group_length >= min_chunk_size:
            chunks.append(" ".join(group))
            group = []
            group_length = 0

    if group:
        chunks.append(" ".join(group))

    return chunks
|
|
def create_wav_header(sample_rate: int, channels: int = 1, bits_per_sample: int = 16):
    """Build a 44-byte RIFF/WAVE header for uncompressed PCM audio.

    Both the RIFF chunk size and the ``data`` chunk size are written as
    ``0xFFFFFFFF`` rather than a real byte count.

    Args:
        sample_rate: Samples per second.
        channels: Number of audio channels.
        bits_per_sample: Bit depth of each sample.

    Returns:
        The header as ``bytes``.
    """
    bytes_per_frame = channels * bits_per_sample // 8
    bytes_per_second = sample_rate * channels * bits_per_sample // 8
    size_placeholder = 0xFFFFFFFF

    # '<' selects little-endian with no padding, so the fields are laid out
    # back to back exactly in the order listed.
    return struct.pack(
        '<4sI4s4sIHHIIHH4sI',
        b'RIFF',
        size_placeholder,
        b'WAVE',
        b'fmt ',
        16,                 # size of the fmt chunk payload
        1,                  # audio format tag (1 = PCM)
        channels,
        sample_rate,
        bytes_per_second,
        bytes_per_frame,
        bits_per_sample,
        b'data',
        size_placeholder,
    )
|
|
def float_to_pcm16(audio_array):
    """Convert float audio samples to little-endian signed 16-bit PCM bytes.

    Multi-dimensional input is flattened in row-major order.  Samples are
    clipped to ``[-1.0, 1.0]``, scaled by 32767 and truncated toward zero.
    NaN samples are replaced by 0.0 (silence) before conversion.

    Args:
        audio_array: Array-like of float samples (anything ``np.asarray``
            accepts).

    Returns:
        The samples as ``bytes``, two bytes per sample, little-endian.
    """
    samples = np.asarray(audio_array)
    if samples.ndim > 1:
        samples = samples.ravel()

    # NaN survives np.clip and has no defined integer value, so casting it to
    # int16 would yield platform-dependent garbage; map it to silence first.
    samples = np.nan_to_num(samples)
    samples = np.clip(samples, -1.0, 1.0)

    # '<i2' pins the byte order to little-endian (the order RIFF/WAVE uses)
    # instead of relying on the host's native order.
    return (samples * 32767).astype('<i2').tobytes()
|
|
def create_mp3_encoder(sample_rate=44100, channels=1, bit_rate=128, quality=5):
    """Create a ``lameenc.Encoder`` and apply the given settings to it.

    Args:
        sample_rate: Passed to ``set_in_sample_rate``.
        channels: Passed to ``set_channels``.
        bit_rate: Passed to ``set_bit_rate`` (presumably kbit/s; see the
            lameenc documentation).
        quality: Passed to ``set_quality`` (valid range is defined by
            lameenc; see its documentation).

    Returns:
        The configured encoder instance.
    """
    mp3_encoder = lameenc.Encoder()

    # Settings are applied in a fixed order: bit rate, input sample rate,
    # channel count, then quality.
    settings = (
        (mp3_encoder.set_bit_rate, bit_rate),
        (mp3_encoder.set_in_sample_rate, sample_rate),
        (mp3_encoder.set_channels, channels),
        (mp3_encoder.set_quality, quality),
    )
    for apply_setting, value in settings:
        apply_setting(value)

    return mp3_encoder
|
|
|
|
def float_to_mp3(audio_array, encoder):
    """Convert float audio samples to int16 PCM and encode them as MP3.

    Multi-dimensional input is flattened in row-major order.  Samples are
    clipped to ``[-1.0, 1.0]``, scaled by 32767 and truncated toward zero.
    NaN samples are replaced by 0.0 (silence) before conversion.

    Args:
        audio_array: Array-like of float samples (anything ``np.asarray``
            accepts).
        encoder: Object whose ``encode(pcm_bytes)`` method returns the encoded
            data, e.g. the encoder built by ``create_mp3_encoder``.

    Returns:
        Whatever ``encoder.encode`` produced for this block of samples,
        converted to ``bytes``.
    """
    samples = np.asarray(audio_array)
    if samples.ndim > 1:
        samples = samples.ravel()

    # NaN survives np.clip and has no defined integer value, so casting it to
    # int16 would yield platform-dependent garbage; map it to silence first.
    samples = np.nan_to_num(samples)
    samples = np.clip(samples, -1.0, 1.0)
    pcm = (samples * 32767).astype(np.int16)

    # NOTE(review): only encode() is called here; if the encoder buffers
    # trailing frames, the caller presumably has to flush it - confirm.
    encoded = encoder.encode(pcm.tobytes())
    return bytes(encoded)