| """ |
| services/streaming.py — Production-grade parallel TTS streamer |
| |
| FIX-ISSUE4 (Natural, slow, small-chunk TTS): |
| The previous code used character-count thresholds that produced large |
| sentence-level chunks (25–65 chars), causing buffered, robotic-feeling |
| speech with a burst of audio at once. |
| |
| New behaviour: |
| • Flush at word boundaries (2–3 words) for voice-like pacing. |
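| • Flush thresholds depend on the TTS backend: a word-boundary flush is |
| allowed from 10 chars (first chunk) / 18 chars (later chunks) with |
| Edge-TTS, and from 8 / 14 chars with ElevenLabs — roughly 2–3 average |
| Bengali/English words. |
| • Hard limits (Edge-TTS 30 / 40 chars, ElevenLabs 18 / 28 chars) force a |
| flush so no chunk grows too large. |
| • Sentence-ending punctuation (.!?।॥ or a newline) flushes as soon as the |
| chunk has reached the minimum size, giving natural pause points. |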
| • The TTS rate is slightly faster than neutral in tts.py for a more |
| conversational pace. |
| |
| Result: audio arrives in small, fast, overlapping synthesis tasks, |
| giving a low-latency, smooth, natural speech feel. |
| |
| FIX-BUG5 (TOCTOU race in stream_audio) — preserved from previous version. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import re |
| from dataclasses import dataclass, field |
| from typing import AsyncGenerator |
|
|
| from services.tts import text_to_speech_stream, USE_ELEVENLABS, EDGE_VOICE |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
# Flush thresholds, in characters, chosen per TTS backend.
#   *_MIN  - smallest buffer that may be flushed at a sentence/word boundary
#   *_HARD - buffer length that forces a flush wherever it falls
# The FIRST_* pair is used until the first chunk of a response is flushed.
if USE_ELEVENLABS:
    _backend_label = "ElevenLabs"
    FIRST_FLUSH_MIN, FIRST_FLUSH_HARD = 8, 18
    SUBSEQUENT_FLUSH_MIN, SUBSEQUENT_FLUSH_HARD = 14, 28
else:
    _backend_label = "Edge-TTS"
    FIRST_FLUSH_MIN, FIRST_FLUSH_HARD = 10, 30
    SUBSEQUENT_FLUSH_MIN, SUBSEQUENT_FLUSH_HARD = 18, 40

# Import-time banner so logs show which backend/threshold pair is active.
print(f"[Streamer] TTS backend: {_backend_label} | chunk: {SUBSEQUENT_FLUSH_MIN}–{SUBSEQUENT_FLUSH_HARD} chars")
|
|
# Cleaned chunks shorter than this are dropped instead of being synthesised.
MIN_CHARS = 2
# Buffer-ending characters treated as sentence ends by the flush heuristics.
SENTENCE_BOUNDARIES = frozenset({".", "!", "?", "।", "॥", "\n"})
# Buffer-ending characters treated as clause breaks by the flush heuristics.
CLAUSE_BOUNDARIES = frozenset({",", ";", ":", "—", "–"})
# Unique end-of-audio marker put on an _AudioSlot queue.
_SENTINEL = object()
|
|
# Digit character -> Bengali word, used to read phone numbers digit by digit.
# ASCII, Bengali and Arabic-Indic digits all map onto the same ten words;
# keys are inserted script by script in that order, 0 through 9 each.
_DIGIT_WORDS = {
    digit: word
    for script in ("0123456789", "০১২৩৪৫৬৭৮৯", "٠١٢٣٤٥٦٧٨٩")
    for digit, word in zip(
        script,
        ("শূন্য", "এক", "দুই", "তিন", "চার", "পাঁচ", "ছয়", "সাত", "আট", "নয়"),
    )
}
|
|
|
|
def _spoken_phone_text(text: str) -> str:
    """Spell out phone-number-like digit runs one digit at a time.

    A candidate run starts with ``+`` or a digit, ends with a digit, and in
    between holds only digits, whitespace, parentheses, dots and hyphens
    (at least 10 characters overall). Runs containing fewer than 10 digits
    known to ``_DIGIT_WORDS`` are left untouched. Otherwise the run is
    replaced by those digits' words joined with spaces, dropping everything
    else in it, including a leading ``+``. A space is added where the run
    was glued to neighbouring text, except after an opening bracket or quote
    and before closing brackets, quotes or common punctuation. Finally every
    run of 2+ spaces/tabs in the result is collapsed to one space.

    Falsy input returns ``""``.
    """
    if not text:
        return ""

    opening = "([<{\"'"        # no space is inserted after these
    closing = ")]>.,!?;:}\"'"  # no space is inserted before these

    def _speak(match: re.Match[str]) -> str:
        run = match.group(0)
        words = [_DIGIT_WORDS[ch] for ch in run if ch in _DIGIT_WORDS]
        if len(words) < 10:
            return run
        start, end = match.span()
        before = text[max(start - 1, 0):start]  # "" at the start of text
        after = text[end:end + 1]               # "" at the end of text
        spoken = " ".join(words)
        if before and not (before.isspace() or before in opening):
            spoken = " " + spoken
        if after and not (after.isspace() or after in closing):
            spoken += " "
        return spoken

    replaced = re.sub(
        r"[+\d০-৯٠-٩][\d০-৯٠-٩\s().\-]{8,}[\d০-৯٠-٩]", _speak, text
    )
    return re.sub(r"[ \t]{2,}", " ", replaced)
|
|
|
|
def _clean_for_tts(text: str) -> str:
    """Strip markup from one streamed text chunk and normalise it for TTS.

    Removes short bracketed stage tags such as ``[laughs]`` (and the halves
    of such a tag split across chunks), ``*`` emphasis, ``#`` heading marks,
    bullet and numbered-list markers, and backticks. It then collapses blank
    lines and runs of spaces/tabs, and spells long phone-like digit runs out
    via ``_spoken_phone_text``. Leading/trailing newlines, carriage returns
    and tabs are trimmed; spaces are kept.

    The input is a *fragment* of the LLM output, not a whole line, so list
    markers are only removed where a line really starts:
      - at the start of the chunk when it is unindented, or indented by a
        tab or by 2+ spaces;
      - right after a newline.
    A chunk that begins with exactly one space continues a line
    (``ParallelTTSStreamer`` carries one space across a word-boundary flush),
    so a leading ``500.``, ``1.5`` or ``-5`` there is content, not a marker.
    A marker must also be followed by whitespace or the end of the chunk,
    which keeps decimals (``1.5``) and negative numbers (``-5``) intact.
    """
    # Where a line really starts inside a chunk (see docstring).
    line_start = r"(?:\A(?:\t|[ \t]{2,})?|(?<=\n)[ \t]*)"

    # Short bracketed tags, plus tag halves cut off by a chunk boundary.
    text = re.sub(r"(?:(?<=^)|(?<=\s))\[[^\[\]\n]{1,24}\](?=\s|$)", "", text)
    text = re.sub(r"(?:(?<=^)|(?<=\s))\[[A-Za-z]{2,16}(?=\s|$)", "", text)
    text = re.sub(r"(?:(?<=^)|(?<=\s))[A-Za-z]{2,16}\](?=\s|$)", "", text)
    text = re.sub(r"\*{1,3}", "", text)
    text = re.sub(r"#+\s*", "", text)
    # List markers ("- ", "• ", "1. ", "2) ") at real line starts only.
    text = re.sub(line_start + r"[-•](?=\s|\Z)\s*", "", text)
    text = re.sub(line_start + r"[\d০-৯]+[.)](?=\s|\Z)\s*", "", text)
    text = re.sub(r"`+", "", text)
    text = re.sub(r"\n{2,}", "\n", text)

    text = re.sub(r"[ \t]{2,}", " ", text)
    text = _spoken_phone_text(text)

    return text.strip("\n\r\t")
|
|
|
|
def _flush_reason(buffer: str, first_chunk: bool) -> str | None:
    """Say whether ``buffer`` should be flushed to TTS now, and why.

    Uses the FIRST_* thresholds when ``first_chunk`` is true, otherwise the
    SUBSEQUENT_* ones. Rules are tried in order and the first match wins:
      "hard"     - length has reached the hard limit;
      "sentence" - ends in a SENTENCE_BOUNDARIES char and has >= min chars;
      "clause"   - ends in a CLAUSE_BOUNDARIES char and has >= 70% of hard;
      "space"    - ends in a space and has >= min chars (callers use this
                   reason to carry the word gap into the next chunk).
    Returns None when no rule matches, and always for an empty buffer.
    """
    size = len(buffer)
    if not size:
        return None

    if first_chunk:
        minimum, hard = FIRST_FLUSH_MIN, FIRST_FLUSH_HARD
    else:
        minimum, hard = SUBSEQUENT_FLUSH_MIN, SUBSEQUENT_FLUSH_HARD

    last = buffer[-1]
    rules = (
        ("hard", size >= hard),
        ("sentence", last in SENTENCE_BOUNDARIES and size >= minimum),
        ("clause", last in CLAUSE_BOUNDARIES and size >= hard * 0.70),
        ("space", last == " " and size >= minimum),
    )
    return next((reason for reason, hit in rules if hit), None)
|
|
|
|
def _should_flush(buffer: str, first_chunk: bool) -> bool:
    """Return True when ``buffer`` is due for a flush.

    Boolean convenience wrapper around ``_flush_reason``. It delegates rather
    than re-implementing the same threshold rules, so the two can never
    disagree. An empty buffer never flushes.
    """
    return _flush_reason(buffer, first_chunk) is not None
|
|
|
|
@dataclass
class _AudioSlot:
    """Ordered audio pipe for one text chunk.

    Synthesised audio is put on ``queue`` and terminated by ``_SENTINEL``.
    ``index`` is the slot's position in the streamer's ordering. ``done`` is
    set to True when the sentinel is enqueued.
    """

    index: int
    queue: asyncio.Queue = field(default_factory=asyncio.Queue)
    done: bool = False

    def mark_done(self) -> None:
        """Terminate the slot after its audio has been produced."""
        self.done = True
        self.queue.put_nowait(_SENTINEL)

    def mark_error(self) -> None:
        """Terminate the slot early (error/cancel); currently same as mark_done."""
        self.mark_done()
|
|
|
|
class ParallelTTSStreamer:
    """Turn a stream of LLM tokens into ordered, parallel-synthesised TTS audio.

    Tokens passed to :meth:`add_token` are buffered and cut into short text
    chunks (rules in ``_flush_reason``). Every chunk gets an ``_AudioSlot``
    and its own synthesis task, so several chunks are synthesised at once.
    Meanwhile :meth:`stream_audio` yields the audio strictly in slot order.

    Call :meth:`flush` when the LLM is finished, :meth:`cancel` to abort,
    and :meth:`reset` before reusing the instance.
    """

    # Seconds since the last flush after which add_token flushes a buffer of
    # at least flush_min chars even without a natural boundary.
    _STALL_FLUSH_SECONDS = 0.8
    # How long stream_audio waits for a new slot before re-checking state.
    _SLOT_WAIT_SECONDS = 30.0

    def __init__(self, voice: str | None = None) -> None:
        self.voice = voice                   # passed to text_to_speech_stream
        self.buffer = ""                     # text not yet sent to TTS
        self._cancelled = False
        self._first_chunk = True             # selects the FIRST_* thresholds
        self._carry_space = False            # next buffer starts with " "
        self._slot_index = 0
        self._slots: list[_AudioSlot] = []
        self._slots_lock = asyncio.Lock()
        self._tasks: list[asyncio.Task] = []
        self._llm_done = asyncio.Event()     # set by flush() / cancel()
        self._slot_added = asyncio.Event()   # wakes a waiting stream_audio
        self._last_flush_t: float = 0.0      # loop.time() of the last flush
        self._last_token_t: float = 0.0      # loop.time() of the last token

    async def add_token(self, token: str) -> None:
        """Append one LLM token and schedule a TTS chunk if a flush is due.

        Empty tokens, and any token after cancel(), are ignored.
        """
        if not token or self._cancelled:
            return
        now = asyncio.get_running_loop().time()
        self._last_token_t = now

        # The buffer may hold a single space carried over from a word-boundary
        # flush; don't let the token's own leading whitespace double it.
        if self.buffer == " " and token[:1].isspace():
            token = token.lstrip()
        self.buffer += token

        reason = _flush_reason(self.buffer, self._first_chunk)
        if reason is not None:
            self._first_chunk = False
            self._carry_space = reason == "space"
            await self._schedule_chunk()
            self._last_flush_t = now
            return

        # Stall flush: no natural boundary yet, but nothing has been flushed
        # for a while. Until the first flush _last_flush_t is 0.0, so this
        # normally fires as soon as flush_min chars are buffered.
        flush_min = FIRST_FLUSH_MIN if self._first_chunk else SUBSEQUENT_FLUSH_MIN
        if len(self.buffer) < flush_min:
            return
        if now - self._last_flush_t < self._STALL_FLUSH_SECONDS:
            return
        self._first_chunk = False
        # Cut right after the last space so the word still being streamed
        # stays buffered instead of being split across two TTS requests.
        # Index 0 can only be the carried space, so it does not count. With
        # no usable space the whole buffer is sent.
        cut = self.buffer.rfind(" ")
        if cut > 0:
            self._carry_space = True
            await self._schedule_chunk(upto=cut + 1)
        else:
            self._carry_space = False
            await self._schedule_chunk()
        self._last_flush_t = now

    async def _schedule_chunk(self, upto: int | None = None) -> None:
        """Send buffered text to a new audio slot and start synthesising it.

        ``upto`` limits the chunk to ``self.buffer[:upto]`` and keeps the rest
        buffered; ``None`` sends the whole buffer. The buffer is updated
        before the first await: it becomes the kept text, prefixed by one
        space when ``_carry_space`` is set. Chunks whose cleaned text is
        shorter than MIN_CHARS are dropped. After cancel() the buffer is
        simply discarded.
        """
        if self._cancelled:
            self.buffer = ""
            return
        if upto is None:
            raw, rest = self.buffer, ""
        else:
            raw, rest = self.buffer[:upto], self.buffer[upto:]
        self.buffer = (" " if self._carry_space else "") + rest
        self._carry_space = False

        text = _clean_for_tts(raw)
        if len(text) < MIN_CHARS:
            return
        async with self._slots_lock:
            slot = _AudioSlot(index=self._slot_index)
            self._slot_index += 1
            self._slots.append(slot)
            self._slot_added.set()
            task = asyncio.create_task(self._synthesise(text, slot))
            self._tasks.append(task)
            task.add_done_callback(
                lambda t: self._tasks.remove(t) if t in self._tasks else None
            )

    async def _synthesise(self, text: str, slot: _AudioSlot) -> None:
        """Stream TTS audio for ``text`` into ``slot.queue``.

        Once the body runs, the slot is always terminated: by mark_error on
        the early-cancel path, or by mark_done in ``finally``. TTS errors are
        logged and end the slot early instead of propagating. Task
        cancellation is re-raised so the task is reported as cancelled.
        """
        if self._cancelled:
            slot.mark_error()
            return
        try:
            async for chunk in text_to_speech_stream(text, voice=self.voice):
                if self._cancelled:
                    break
                await slot.queue.put(chunk)
        except asyncio.CancelledError:
            raise  # never swallow cancellation; finally still closes the slot
        except Exception as exc:
            print(f"[Streamer] TTS error for '{text[:50]}': {exc}")
        finally:
            slot.mark_done()

    async def flush(self) -> None:
        """Send any remaining buffered text to TTS and mark the LLM input done."""
        if self.buffer.strip():
            await self._schedule_chunk()
        self._llm_done.set()
        # Wake a stream_audio parked waiting for a slot. Without this it only
        # notices the end of input when its wait times out (30 s).
        self._slot_added.set()

    async def cancel(self) -> None:
        """Abort synthesis: cancel in-flight tasks and unblock stream_audio.

        Every slot still open gets its sentinel, so a consumer blocked on a
        slot queue returns. Both events are set, so a waiting stream_audio
        wakes up and finishes.
        """
        self._cancelled = True
        tasks = list(self._tasks)
        self._tasks.clear()
        for t in tasks:
            t.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        async with self._slots_lock:
            for slot in self._slots:
                if not slot.done:
                    slot.mark_error()
        self._llm_done.set()
        self._slot_added.set()

    async def stream_audio(self) -> AsyncGenerator[bytes, None]:
        """
        Deliver TTS audio chunks in slot order.

        Ends once flush() or cancel() has been called and every slot created
        so far has been drained. Audio read after cancel() is discarded.

        FIX-BUG5 — double-check pattern eliminates TOCTOU race:
          1. clear() the event
          2. Re-check slot list under lock (slot may have been added between
             previous check and clear())
          3. Only then wait() — so we never miss a newly-added slot
        """
        delivered = 0
        while True:
            async with self._slots_lock:
                slot = self._slots[delivered] if delivered < len(self._slots) else None

            if slot is None:
                if self._llm_done.is_set():
                    async with self._slots_lock:
                        exhausted = delivered >= len(self._slots)
                    if exhausted:
                        break
                    continue

                self._slot_added.clear()
                async with self._slots_lock:
                    have_new = delivered < len(self._slots)
                if have_new:
                    continue
                try:
                    await asyncio.wait_for(
                        self._slot_added.wait(), timeout=self._SLOT_WAIT_SECONDS
                    )
                except asyncio.TimeoutError:
                    if not self._llm_done.is_set():
                        print("[Streamer] Timeout waiting for TTS slot (continuing)…")
                # The top of the loop decides whether to deliver or stop, so
                # slots added in the meantime are never skipped.
                continue

            while True:
                item = await slot.queue.get()
                if item is _SENTINEL:
                    break
                if not self._cancelled:
                    yield item
            delivered += 1

    def reset(self) -> None:
        """Return the streamer to its freshly constructed state.

        Synthesis tasks still running for the previous response are cancelled
        and its open slots terminated. Otherwise they would keep calling the
        TTS backend for audio nobody will play.
        """
        for task in list(self._tasks):
            task.cancel()
        # A task cancelled before it starts never reaches its finally block,
        # so close its slot here for anyone still reading it.
        for slot in self._slots:
            if not slot.done:
                slot.mark_error()
        self._cancelled = False
        self._first_chunk = True
        self._carry_space = False
        self.buffer = ""
        self._slot_index = 0
        self._slots.clear()
        self._tasks.clear()
        self._llm_done.clear()
        self._slot_added.clear()
        self._last_flush_t = 0.0
        self._last_token_t = 0.0
|
|