import gc
import logging
from collections import deque
from copy import deepcopy
from datetime import timedelta
from time import time

import numpy as np
import onnxruntime
from pydub import AudioSegment
from silero_vad import get_speech_timestamps, load_silero_vad

from config import VAD_MODEL_PATH
| |
|
class AdaptiveSilenceController:
    """Derive a silence threshold (ms) from recently observed durations.

    The controller remembers the 20 most recent speech durations and the 20
    most recent silence durations and combines their averages with a base
    value. The result is always clamped to ``[min_ms, max_ms]``.
    """

    def __init__(self, base_silence_ms=120, min_ms=50, max_ms=600):
        """
        Args:
            base_silence_ms: Base threshold; also used as the average when no
                observation has been recorded yet.
            min_ms: Lower clamp for the returned threshold.
            max_ms: Upper clamp for the returned threshold.
        """
        self.base = base_silence_ms
        self.min = min_ms
        self.max = max_ms
        # Bounded histories: appending beyond 20 entries drops the oldest one.
        self.recent_silences = deque(maxlen=20)
        self.recent_speeches = deque(maxlen=20)

    def update_silence(self, duration_ms):
        """Record one observed silence duration (ms)."""
        self.recent_silences.append(duration_ms)

    def update_speech(self, duration_ms):
        """Record one observed speech duration (ms)."""
        self.recent_speeches.append(duration_ms)

    def get_adaptive_silence_ms(self):
        """Return the current threshold as an ``int`` within ``[min, max]``.

        The base value is scaled by 0.5 when the average speech duration is
        below 300, by 0.8 when it is below 600, and left unscaled otherwise;
        30% of the average silence duration is then added.
        """
        # With no history the base value stands in for both averages.
        avg_speech = np.mean(self.recent_speeches) if self.recent_speeches else self.base
        avg_silence = np.mean(self.recent_silences) if self.recent_silences else self.base

        if avg_speech < 300:
            speed_factor = 0.5
        elif avg_speech < 600:
            speed_factor = 0.8
        else:
            speed_factor = 1.0

        # Routine statistics belong at DEBUG level; lazy %-args avoid building
        # the message when the level is disabled.
        logging.debug("Avg speech :%s, Avg silence: %s", avg_speech, avg_silence)

        adaptive = self.base * speed_factor + 0.3 * avg_silence
        return int(max(self.min, min(self.max, adaptive)))
| |
|
| | |
class OnnxWrapper():
    """Stateful wrapper around an ONNX Runtime session for chunked inference.

    Between calls the wrapper keeps the state tensor returned by the model and
    the trailing samples of the previous chunk (the "context"), and feeds both
    into the next inference.
    """

    def __init__(self, path, force_onnx_cpu=False):
        """Create the inference session.

        Args:
            path: Path to the ONNX model file.
            force_onnx_cpu: When true and ``CPUExecutionProvider`` is
                available, the session is restricted to that provider.
        """
        opts = onnxruntime.SessionOptions()
        opts.inter_op_num_threads = 1
        opts.intra_op_num_threads = 1

        if force_onnx_cpu and 'CPUExecutionProvider' in onnxruntime.get_available_providers():
            self.session = onnxruntime.InferenceSession(path, providers=['CPUExecutionProvider'], sess_options=opts)
        else:
            self.session = onnxruntime.InferenceSession(path, sess_options=opts)

        self.reset_states()
        self.sample_rates = [16000]

    def _validate_input(self, x: np.ndarray, sr: int):
        """Normalise a chunk to shape ``(batch, samples)`` at a supported rate.

        A 1-D input gets a leading batch axis. Rates that are an integer
        multiple of 16000 are decimated by plain striding down to 16000.

        Returns:
            The ``(x, sr)`` pair after normalisation.

        Raises:
            ValueError: If the input has more than two dimensions, the rate is
                unsupported, or the chunk is empty / too short for the rate.
        """
        if x.ndim == 1:
            x = x[None]
        if x.ndim > 2:
            raise ValueError(f"Too many dimensions for input audio chunk {x.ndim}")

        if sr != 16000 and (sr % 16000 == 0):
            step = sr // 16000
            x = x[:, ::step]
            sr = 16000

        if sr not in self.sample_rates:
            raise ValueError(f"Supported sampling rates: {self.sample_rates} (or multiple of 16000)")
        # An empty chunk must be reported as "too short" instead of surfacing
        # as a ZeroDivisionError from the ratio below.
        if x.shape[1] == 0 or sr / x.shape[1] > 31.25:
            raise ValueError("Input audio chunk is too short")

        return x, sr

    def reset_states(self, batch_size=1):
        """Zero the model state and forget the context and last-call metadata."""
        self._state = np.zeros((2, batch_size, 128)).astype(np.float32)
        # An empty context marks "no previous chunk"; __call__ re-creates it.
        self._context = np.zeros(0)
        self._last_sr = 0
        self._last_batch_size = 0

    def __call__(self, x, sr: int):
        """Run the model on exactly one window and return its output.

        Args:
            x: One chunk, 1-D or ``(batch, samples)``; after validation it must
                hold 512 samples at 16000 Hz (256 otherwise).
            sr: Sampling rate of ``x``.

        Returns:
            The first output of the ONNX session for this chunk.

        Raises:
            ValueError: If validation fails or the window size is wrong.
        """
        x, sr = self._validate_input(x, sr)
        num_samples = 512 if sr == 16000 else 256

        if x.shape[-1] != num_samples:
            raise ValueError(
                f"Provided number of samples is {x.shape[-1]} (Supported values: 256 for 8000 sample rate, 512 for 16000)")

        if sr not in (8000, 16000):
            raise ValueError(f"Unsupported sampling rate: {sr}")

        batch_size = x.shape[0]
        context_size = 64 if sr == 16000 else 32

        # Start from a clean state on the first call and whenever the sampling
        # rate or the batch size differs from the previous call.
        first_call = not self._last_batch_size
        sr_changed = bool(self._last_sr) and self._last_sr != sr
        batch_changed = bool(self._last_batch_size) and self._last_batch_size != batch_size
        if first_call or sr_changed or batch_changed:
            self.reset_states(batch_size)

        if not len(self._context):
            self._context = np.zeros((batch_size, context_size)).astype(np.float32)

        # The model sees the tail of the previous chunk followed by this chunk.
        x = np.concatenate([self._context, x], axis=1)
        ort_inputs = {'input': x, 'state': self._state, 'sr': np.array(sr, dtype='int64')}
        out, state = self.session.run(None, ort_inputs)
        self._state = state

        self._context = x[..., -context_size:]
        self._last_sr = sr
        self._last_batch_size = batch_size

        return out

    def audio_forward(self, audio: np.ndarray, sr: int):
        """Run the model over a whole signal, window by window.

        The state is reset first; the signal is zero-padded on the right to a
        whole number of windows.

        Returns:
            The per-window outputs concatenated along axis 1.
        """
        outs = []
        x, sr = self._validate_input(audio, sr)
        self.reset_states()
        num_samples = 512 if sr == 16000 else 256

        remainder = x.shape[1] % num_samples
        if remainder:
            pad_num = num_samples - remainder
            x = np.pad(x, ((0, 0), (0, pad_num)), 'constant', constant_values=(0.0, 0.0))

        for i in range(0, x.shape[1], num_samples):
            wavs_batch = x[:, i:i + num_samples]
            outs.append(self.__call__(wavs_batch, sr))

        return np.concatenate(outs, axis=1)
| |
|
| |
|
| | class VADIteratorOnnx: |
| | def __init__(self, |
| | threshold: float = 0.5, |
| | sampling_rate: int = 16000, |
| | min_silence_duration_ms: int = 100, |
| | max_speech_duration_s: float = float('inf'), |
| | speech_pad_ms: int = 30 |
| | ): |
| | self.model = OnnxWrapper(VAD_MODEL_PATH, True) |
| | self.threshold = threshold |
| | self.sampling_rate = sampling_rate |
| |
|
| | if sampling_rate not in [8000, 16000]: |
| | raise ValueError('VADIterator does not support sampling rates other than [8000, 16000]') |
| |
|
| | self.min_silence_samples = sampling_rate * min_silence_duration_ms / 1000 |
| | |
| | self.speech_pad_samples = sampling_rate * speech_pad_ms / 1000 |
| | self.reset_states() |
| |
|
| | def reset_states(self): |
| |
|
| | self.model.reset_states() |
| | self.triggered = False |
| | self.temp_end = 0 |
| | self.current_sample = 0 |
| | self.start = 0 |
| |
|
| | def __call__(self, x: np.ndarray, return_seconds=False): |
| | """ |
| | x: np.ndarray |
| | audio chunk (see examples in repo) |
| | |
| | return_seconds: bool (default - False) |
| | whether return timestamps in seconds (default - samples) |
| | """ |
| |
|
| | window_size_samples = 512 if self.sampling_rate == 16000 else 256 |
| | x = x[:window_size_samples] |
| | if len(x) < window_size_samples: |
| | x = np.pad(x, ((0, 0), (0, window_size_samples - len(x))), 'constant', constant_values=0.0) |
| |
|
| | self.current_sample += window_size_samples |
| |
|
| | speech_prob = self.model(x, self.sampling_rate)[0,0] |
| |
|
| |
|
| | if (speech_prob >= self.threshold) and self.temp_end: |
| | self.temp_end = 0 |
| |
|
| | if (speech_prob >= self.threshold) and not self.triggered: |
| | self.triggered = True |
| | |
| | speech_start = max(0, self.current_sample - self.speech_pad_samples - window_size_samples) |
| | self.start = speech_start |
| | return {'start': int(speech_start) if not return_seconds else round(speech_start / self.sampling_rate, 1)} |
| |
|
| | |
| | |
| | |
| | |
| | |
| |
|
| | if (speech_prob < self.threshold - 0.15) and self.triggered: |
| | if not self.temp_end: |
| | self.temp_end = self.current_sample |
| | if self.current_sample - self.temp_end < self.min_silence_samples: |
| | return None |
| | else: |
| | |
| | speech_end = self.temp_end + self.speech_pad_samples - window_size_samples |
| | self.temp_end = 0 |
| | self.triggered = False |
| | return {'end': int(speech_end) if not return_seconds else round(speech_end / self.sampling_rate, 1)} |
| |
|
| | return None |
| |
|
| |
|
| |
|
| |
|
class FixedVADIterator(VADIteratorOnnx):
    """VAD iterator that accepts audio of any length per call.

    Incoming samples are buffered and handed to the parent iterator in
    512-sample windows. When one call covers several voiced segments, the
    result carries the start of the first segment and the end of the last one
    (or no end when the last segment is still open).
    """

    def reset_states(self):
        """Reset the parent state and drop any buffered samples."""
        super().reset_states()
        self.buffer = np.array([], dtype=np.float32)

    def __call__(self, x, return_seconds=False):
        """Buffer ``x``, process every complete window and merge the events.

        Returns:
            A dict with ``'start'`` and/or ``'end'``, or ``None`` when no event
            occurred.
        """
        self.buffer = np.append(self.buffer, x)
        merged = None
        while len(self.buffer) >= 512:
            event = super().__call__(self.buffer[:512], return_seconds=return_seconds)
            self.buffer = self.buffer[512:]
            if merged is None:
                merged = event
                continue
            if event is None:
                continue
            if 'end' in event:
                # Keep the most recent end.
                merged['end'] = event['end']
            if 'start' in event and 'end' in merged:
                # A new segment began after an earlier end: drop that end so
                # the segments are reported as one.
                del merged['end']
        if merged == {}:
            return None
        return merged
| |
|
class VadV2:
    """Buffering front-end for ``VADIteratorOnnx`` that returns speech audio.

    Chunks are appended to an internal buffer; when the iterator reports that
    a segment ended, the padded segment is cut from the buffer and returned
    together with its start/end time in seconds.
    """

    def __init__(self,
                 threshold: float = 0.5,
                 sampling_rate: int = 16000,
                 min_silence_duration_ms: int = 100,
                 speech_pad_ms: int = 30,
                 max_speech_duration_s: float = float('inf')):
        """
        Args:
            threshold: Speech probability threshold passed to the iterator.
            sampling_rate: Sampling rate of the incoming audio.
            min_silence_duration_ms: Silence required to close a segment.
            speech_pad_ms: Padding kept around each returned segment; must not
                exceed ``min_silence_duration_ms``.
            max_speech_duration_s: Stored (in samples) as
                ``max_speech_samples``; may be infinite.
        """
        assert speech_pad_ms <= min_silence_duration_ms, "speech_pad_ms should be less than min_silence_duration_ms"

        self.vad_iterator = VADIteratorOnnx(threshold, sampling_rate, min_silence_duration_ms, max_speech_duration_s)
        self.speech_pad_samples = int(sampling_rate * speech_pad_ms / 1000)
        self.sampling_rate = sampling_rate
        self.audio_buffer = np.array([], dtype=np.float32)
        self.start = 0
        self.end = 0
        # Absolute sample index of audio_buffer[0].
        self.offset = 0
        self.max_speech_samples = self._max_speech_samples(sampling_rate, max_speech_duration_s)

        # Number of consecutive calls that produced no start/end event.
        self.silence_chunk_size = 0
        # Number of 512-sample chunks contained in 60 seconds of audio.
        self.silence_chunk_threshold = 60 / (512 / self.sampling_rate)

    @staticmethod
    def _max_speech_samples(sampling_rate, max_speech_duration_s):
        """Convert a duration to samples, keeping an infinite limit infinite."""
        limit = sampling_rate * max_speech_duration_s
        # int(inf) raises OverflowError, so leave an unbounded limit as a float.
        return limit if np.isinf(limit) else int(limit)

    def _trim_buffer_to_pad(self):
        """Drop everything but the last ``speech_pad_samples`` buffered samples."""
        excess = len(self.audio_buffer) - self.speech_pad_samples
        if excess <= 0:
            return
        self.offset += excess
        # Slice from the front: buffer[-0:] would keep the whole buffer when
        # the padding is zero.
        self.audio_buffer = self.audio_buffer[excess:]

    def reset(self):
        """Clear the buffer, positions and counters and reset the iterator."""
        self.audio_buffer = np.array([], dtype=np.float32)
        self.start = 0
        self.end = 0
        self.offset = 0
        self.silence_chunk_size = 0
        self.vad_iterator.reset_states()

    def __call__(self, x: np.ndarray = None):
        """Feed one chunk, or flush when called without an argument.

        Args:
            x: Audio chunk to process. ``None`` flushes: whatever is buffered
                from the current start position is returned and the state is
                reset.

        Returns:
            ``{"start": seconds, "end": seconds, "audio": samples}`` when a
            segment is available, otherwise ``None``.
        """
        # NOTE(review): start == 0 doubles as "no speech seen yet", so a
        # segment whose start is reported as sample 0 is treated as absent.
        if x is None:
            result = None
            if self.start:
                start = max(self.offset, self.start - self.speech_pad_samples)
                end = self.offset + len(self.audio_buffer)
                result = {
                    "start": round(start / self.sampling_rate, 1),
                    "end": round(end / self.sampling_rate, 1),
                    "audio": self.audio_buffer[start - self.offset: end - self.offset],
                }
            self.reset()
            return result

        # np.append always builds a new array, so no extra copy of x is needed.
        self.audio_buffer = np.append(self.audio_buffer, x)

        event = self.vad_iterator(x)
        if event is not None:
            self.silence_chunk_size = 0
            if 'start' in event:
                self.start = event['start']
            if 'end' in event:
                self.end = event['end']
        else:
            self.silence_chunk_size += 1

        # Before any speech start, retain only the padding tail.
        if self.start == 0:
            self._trim_buffer_to_pad()

        # After a long run of calls without events, shrink the buffer as well.
        if self.silence_chunk_size >= self.silence_chunk_threshold:
            self._trim_buffer_to_pad()
            self.silence_chunk_size = 0

        if self.end > self.start:
            start = max(self.offset, self.start - self.speech_pad_samples)
            end = self.end + self.speech_pad_samples
            audio_data = self.audio_buffer[start - self.offset: end - self.offset]
            # Keep what follows the unpadded end so it can pad the next segment.
            self.audio_buffer = self.audio_buffer[self.end - self.offset:]
            self.offset = self.end
            self.start = self.end
            self.end = 0
            return {
                "start": round(start / self.sampling_rate, 1),
                "end": round(end / self.sampling_rate, 1),
                "audio": audio_data,
            }
        return None
| |
|
| |
|
class SileroVADProcessor:
    """
    Detect voice activity with Silero VAD and extract the voiced parts of
    an audio signal.
    """

    def __init__(self,
                 activate_threshold=0.5,
                 fusion_threshold=0.3,
                 min_speech_duration=0.25,
                 max_speech_duration=20,
                 min_silence_duration=250,
                 sample_rate=16000,
                 ort_providers=None,
                 use_gpu_fp16=False):
        """
        Initialize the SileroVADProcessor.
        Args:
            activate_threshold (float): Threshold for voice activity detection
            fusion_threshold (float): Maximum gap between segments that are merged (seconds)
            min_speech_duration (float): Minimum duration of speech to be considered valid (seconds)
            max_speech_duration (float): Maximum duration of speech (seconds)
            min_silence_duration (int): Minimum silence duration (ms)
            sample_rate (int): Sample rate of the audio (8000 or 16000 Hz)
            ort_providers (list): ONNX Runtime providers for acceleration
            use_gpu_fp16 (bool): Load audio as float16 instead of float32
        """
        self.activate_threshold = activate_threshold
        self.fusion_threshold = fusion_threshold
        self.min_speech_duration = min_speech_duration
        self.max_speech_duration = max_speech_duration
        self.min_silence_duration = min_silence_duration
        self.sample_rate = sample_rate
        self.ort_providers = ort_providers if ort_providers else []
        # load_audio reads this flag; it must exist on every instance.
        self.use_gpu_fp16 = use_gpu_fp16

        self.logger = logging.getLogger(__name__)

        self._init_onnx_session()
        self.silero_vad = load_silero_vad(onnx=True)

    def _init_onnx_session(self):
        """Build ONNX Runtime session options and keep them on the instance.

        No session is created here; the options are stored in
        ``self.session_opts`` so they are not silently discarded.
        """
        session_opts = onnxruntime.SessionOptions()
        session_opts.log_severity_level = 3
        session_opts.inter_op_num_threads = 0
        session_opts.intra_op_num_threads = 0
        session_opts.enable_cpu_mem_arena = True
        session_opts.execution_mode = onnxruntime.ExecutionMode.ORT_SEQUENTIAL
        session_opts.graph_optimization_level = onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL

        session_opts.add_session_config_entry("session.intra_op.allow_spinning", "1")
        session_opts.add_session_config_entry("session.inter_op.allow_spinning", "1")
        session_opts.add_session_config_entry("session.set_denormal_as_zero", "1")

        self.session_opts = session_opts

    def load_audio(self, audio_path):
        """
        Load an audio file as mono at ``self.sample_rate`` and scale it.
        The converted ``AudioSegment`` is kept in ``self.audio_segment``.
        Args:
            audio_path (str): Path to the audio file
        Returns:
            numpy.ndarray: Audio samples as a float array
        """
        self.logger.info("Loading audio from %s", audio_path)
        audio_segment = AudioSegment.from_file(audio_path)
        audio_segment = audio_segment.set_channels(1).set_frame_rate(self.sample_rate)

        # getattr keeps this working for instances created without __init__.
        dtype = np.float16 if getattr(self, "use_gpu_fp16", False) else np.float32
        # The factor is approximately 1/32768, i.e. it maps 16-bit integer
        # samples to roughly [-1, 1) — assumes 16-bit input, TODO confirm.
        audio_array = np.array(audio_segment.get_array_of_samples(), dtype=dtype) * 0.000030517578

        self.audio_segment = audio_segment
        return audio_array

    @property
    def model(self):
        """The loaded Silero VAD model."""
        return self.silero_vad

    def process_timestamps(self, timestamps):
        """
        Drop segments shorter than ``min_speech_duration`` and merge segments
        whose gap is at most ``fusion_threshold``.
        Args:
            timestamps (list): List of (start, end) tuples
        Returns:
            list: Processed list of (start, end) tuples
        """
        fused = []
        for start, end in timestamps:
            if (end - start) < self.min_speech_duration:
                continue
            if fused and (start - fused[-1][1] <= self.fusion_threshold):
                fused[-1] = (fused[-1][0], end)
            else:
                fused.append((start, end))
        # One pass is enough: every gap left between neighbours is already
        # larger than the threshold, so a second pass cannot merge anything.
        return fused

    def format_time(self, seconds):
        """
        Convert seconds to VTT time format 'hh:mm:ss.mmm'.
        Args:
            seconds (float): Time in seconds
        Returns:
            str: Formatted time string
        """
        # Round to whole milliseconds first; truncating the fractional part
        # turns e.g. 1.001 s into ".000" because of float representation.
        total_ms = int(round(float(seconds) * 1000))
        hours, remainder = divmod(total_ms, 3_600_000)
        minutes, remainder = divmod(remainder, 60_000)
        secs, milliseconds = divmod(remainder, 1000)
        return f"{hours:02}:{minutes:02}:{secs:02}.{milliseconds:03}"

    def detect_speech(self, audio: np.ndarray):
        """
        Run VAD on an audio signal to detect speech segments.
        Args:
            audio: Audio samples handed to ``get_speech_timestamps``
        Returns:
            list: List of processed timestamps as (start, end) tuples in seconds
        """
        self.logger.info("Starting VAD process")
        # `time` is the function imported via `from time import time`.
        start_time = time()

        raw_timestamps = get_speech_timestamps(
            audio,
            model=self.silero_vad,
            threshold=self.activate_threshold,
            max_speech_duration_s=self.max_speech_duration,
            min_speech_duration_ms=int(self.min_speech_duration * 1000),
            min_silence_duration_ms=self.min_silence_duration,
            return_seconds=True
        )

        timestamps = [(item['start'], item['end']) for item in raw_timestamps]
        processed_timestamps = self.process_timestamps(timestamps)

        # Only the local reference is dropped; the caller's array is untouched.
        del audio
        gc.collect()

        self.logger.info("VAD completed in %.3f seconds", time() - start_time)
        return processed_timestamps

    # NOTE(review): the `def` line of this method was missing from the file;
    # the name and signature are reconstructed from its docstring and body.
    def save_timestamps(self, timestamps, output_prefix):
        """
        Save timestamps in both second and sample indices formats.
        Args:
            timestamps (list): List of (start, end) tuples
            output_prefix (str): Prefix for output files
        """
        seconds_path = f"{output_prefix}_timestamps_second.txt"
        with open(seconds_path, "w", encoding='UTF-8') as file:
            self.logger.info("Saving timestamps in seconds format")
            for start, end in timestamps:
                s_time = self.format_time(start)
                e_time = self.format_time(end)
                file.write(f"{s_time} --> {e_time}\n")

        indices_path = f"{output_prefix}_timestamps_indices.txt"
        with open(indices_path, "w", encoding='UTF-8') as file:
            self.logger.info("Saving timestamps in indices format")
            for start, end in timestamps:
                file.write(f"{int(start * self.sample_rate)} --> {int(end * self.sample_rate)}\n")

        self.logger.info("Timestamps saved to %s and %s", seconds_path, indices_path)

    def extract_speech_segments(self, audio_segment, timestamps):
        """
        Cut the given speech segments out of the audio and concatenate them.
        Args:
            audio_segment: Audio samples (a numpy array, or an object with a
                ``numpy()`` method)
            timestamps (list): List of (start, end) tuples in seconds
        Returns:
            numpy.ndarray: The concatenated speech samples
        """
        if isinstance(audio_segment, np.ndarray):
            samples = audio_segment
        elif hasattr(audio_segment, "numpy"):
            samples = audio_segment.numpy()
        else:
            samples = np.asarray(audio_segment)

        total = len(samples)
        # Seeding with an empty float32 array keeps the result well-defined
        # when there are no segments.
        pieces = [np.array([], dtype=np.float32)]
        for start, end in timestamps:
            # Timestamps are seconds and the data is indexed per sample, so
            # convert with the sample rate (not to milliseconds).
            first = int(start * self.sample_rate)
            last = min(int(end * self.sample_rate), total)
            pieces.append(np.ravel(samples[first:last]))

        return np.concatenate(pieces)

    def process_audio(self, audio_array: np.ndarray):
        """
        Complete processing pipeline: detect speech, then extract it.
        Returns:
            tuple: (timestamps, concatenated speech samples)
        """
        timestamps = self.detect_speech(audio_array)
        combined_speech = self.extract_speech_segments(audio_array, timestamps)
        return timestamps, combined_speech
| |
|
| |
|
| |
|
class VadProcessor:
    """Run a ``VadV2`` detector over a buffer and collect the speech audio."""

    def __init__(
            self,
            prob_threshold=0.5,
            silence_s=0.2,
            cache_s=0.15,
            sr=16000
    ):
        """
        Args:
            prob_threshold: Speech probability threshold.
            silence_s: Minimum silence, in seconds, that closes a segment.
            cache_s: Padding, in seconds, kept around each segment.
            sr: Sampling rate of the audio.
        """
        self.prob_threshold = prob_threshold
        self.cache_s = cache_s
        self.sr = sr
        self.silence_s = silence_s

        # VadV2 takes both durations in milliseconds.
        silence_ms = self.silence_s * 1000
        pad_ms = self.cache_s * 1000
        self.vad = VadV2(self.prob_threshold, self.sr, silence_ms, pad_ms, max_speech_duration_s=15)

    def process_audio(self, audio_buffer: np.ndarray):
        """Feed the buffer in 512-sample chunks and join the returned audio.

        Returns:
            A 1-D array with the audio of every segment the detector returned,
            in order; empty when nothing was returned.
        """
        pieces = [np.array([], np.float32)]
        for begin in range(0, len(audio_buffer), 512):
            segment = self.vad(audio_buffer[begin:begin + 512])
            if segment:
                pieces.append(np.ravel(segment['audio']))
        return np.concatenate(pieces)
| |
|