| |
| |
| |
|
|
| |
| import io |
| import librosa |
| import numpy as np |
| import scipy.signal |
|
|
|
|
| def extract_audio_features(audio_input, sr=88200, from_bytes=False): |
| try: |
| if from_bytes: |
| y, sr = load_audio_from_bytes(audio_input, sr) |
| else: |
| y, sr = load_and_preprocess_audio(audio_input, sr) |
| except Exception as e: |
| print(f"Loading as WAV failed: {e}\nFalling back to PCM loading.") |
| y = load_pcm_audio_from_bytes(audio_input) |
| |
| frame_length = int(0.01667 * sr) |
| hop_length = frame_length // 2 |
| min_frames = 9 |
|
|
| num_frames = (len(y) - frame_length) // hop_length + 1 |
|
|
| if num_frames < min_frames: |
| print(f"Audio file is too short: {num_frames} frames, required: {min_frames} frames") |
| return None, None |
|
|
| combined_features = extract_and_combine_features(y, sr, frame_length, hop_length) |
| |
| return combined_features, y |
|
|
| def extract_and_combine_features(y, sr, frame_length, hop_length, include_autocorr=True): |
| |
| all_features = [] |
| mfcc_features = extract_mfcc_features(y, sr, frame_length, hop_length) |
| all_features.append(mfcc_features) |
|
|
| if include_autocorr: |
| autocorr_features = extract_autocorrelation_features( |
| y, sr, frame_length, hop_length |
| ) |
| all_features.append(autocorr_features) |
| |
| combined_features = np.hstack(all_features) |
|
|
| return combined_features |
|
|
|
|
| def extract_mfcc_features(y, sr, frame_length, hop_length, num_mfcc=23): |
| mfcc_features = extract_overlapping_mfcc(y, sr, num_mfcc, frame_length, hop_length) |
| reduced_mfcc_features = reduce_features(mfcc_features) |
| return reduced_mfcc_features.T |
|
|
| def cepstral_mean_variance_normalization(mfcc): |
| mean = np.mean(mfcc, axis=1, keepdims=True) |
| std = np.std(mfcc, axis=1, keepdims=True) |
| return (mfcc - mean) / (std + 1e-10) |
|
|
|
|
| def extract_overlapping_mfcc(chunk, sr, num_mfcc, frame_length, hop_length, include_deltas=True, include_cepstral=True, threshold=1e-5): |
| mfcc = librosa.feature.mfcc(y=chunk, sr=sr, n_mfcc=num_mfcc, n_fft=frame_length, hop_length=hop_length) |
| if include_cepstral: |
| mfcc = cepstral_mean_variance_normalization(mfcc) |
|
|
| if include_deltas: |
| delta_mfcc = librosa.feature.delta(mfcc) |
| delta2_mfcc = librosa.feature.delta(mfcc, order=2) |
| combined_mfcc = np.vstack([mfcc, delta_mfcc, delta2_mfcc]) |
| return combined_mfcc |
| else: |
| return mfcc |
|
|
|
|
| def reduce_features(features): |
| num_frames = features.shape[1] |
| paired_frames = features[:, :num_frames // 2 * 2].reshape(features.shape[0], -1, 2) |
| reduced_frames = paired_frames.mean(axis=2) |
| |
| if num_frames % 2 == 1: |
| last_frame = features[:, -1].reshape(-1, 1) |
| reduced_final_features = np.hstack((reduced_frames, last_frame)) |
| else: |
| reduced_final_features = reduced_frames |
| |
| return reduced_final_features |
|
|
|
|
|
|
| def extract_overlapping_autocorr(y, sr, frame_length, hop_length, num_autocorr_coeff=187, pad_signal=True, padding_mode="reflect", trim_padded=False): |
| if pad_signal: |
| pad = frame_length // 2 |
| y_padded = np.pad(y, pad_width=pad, mode=padding_mode) |
| else: |
| y_padded = y |
|
|
| frames = librosa.util.frame(y_padded, frame_length=frame_length, hop_length=hop_length) |
| if pad_signal and trim_padded: |
| num_frames = frames.shape[1] |
| start_indices = np.arange(num_frames) * hop_length |
| valid_idx = np.where((start_indices >= pad) & (start_indices + frame_length <= len(y) + pad))[0] |
| frames = frames[:, valid_idx] |
|
|
| frames = frames - np.mean(frames, axis=0, keepdims=True) |
| hann_window = np.hanning(frame_length) |
| windowed_frames = frames * hann_window[:, np.newaxis] |
|
|
| autocorr_list = [] |
| for frame in windowed_frames.T: |
| full_corr = np.correlate(frame, frame, mode='full') |
| mid = frame_length - 1 |
| |
| wanted = full_corr[mid: mid + num_autocorr_coeff + 1] |
| |
| if wanted[0] != 0: |
| wanted = wanted / wanted[0] |
| autocorr_list.append(wanted) |
|
|
| |
| autocorr_features = np.array(autocorr_list).T |
| |
| autocorr_features = autocorr_features[1:, :] |
|
|
| autocorr_features = fix_edge_frames_autocorr(autocorr_features) |
| |
| return autocorr_features |
|
|
|
|
| def fix_edge_frames_autocorr(autocorr_features, zero_threshold=1e-7): |
| """If the first or last frame is near all-zero, replicate from adjacent frames.""" |
| |
| if np.all(np.abs(autocorr_features[:, 0]) < zero_threshold): |
| autocorr_features[:, 0] = autocorr_features[:, 1] |
| |
| if np.all(np.abs(autocorr_features[:, -1]) < zero_threshold): |
| autocorr_features[:, -1] = autocorr_features[:, -2] |
| return autocorr_features |
|
|
| def extract_autocorrelation_features( |
| y, sr, frame_length, hop_length, include_deltas=False |
| ): |
| """ |
| Extract autocorrelation features, optionally with deltas/delta-deltas, |
| then align with the MFCC frame count, reduce, and handle first/last frames. |
| """ |
| autocorr_features = extract_overlapping_autocorr( |
| y, sr, frame_length, hop_length |
| ) |
| |
| if include_deltas: |
| autocorr_features = compute_autocorr_with_deltas(autocorr_features) |
|
|
| autocorr_features_reduced = reduce_features(autocorr_features) |
|
|
| return autocorr_features_reduced.T |
|
|
|
|
| def compute_autocorr_with_deltas(autocorr_base): |
| delta_ac = librosa.feature.delta(autocorr_base) |
| delta2_ac = librosa.feature.delta(autocorr_base, order=2) |
| combined_autocorr = np.vstack([autocorr_base, delta_ac, delta2_ac]) |
| return combined_autocorr |
|
|
| def load_and_preprocess_audio(audio_path, sr=88200): |
| y, sr = load_audio(audio_path, sr) |
| if sr != 88200: |
| y = librosa.resample(y, orig_sr=sr, target_sr=88200) |
| sr = 88200 |
| |
| max_val = np.max(np.abs(y)) |
| if max_val > 0: |
| y = y / max_val |
|
|
| return y, sr |
|
|
| def load_audio(audio_path, sr=88200): |
| y, sr = librosa.load(audio_path, sr=sr) |
| print(f"Loaded audio file '{audio_path}' with sample rate {sr}") |
| return y, sr |
|
|
| def load_audio_from_bytes(audio_bytes, sr=88200): |
| audio_file = io.BytesIO(audio_bytes) |
| y, sr = librosa.load(audio_file, sr=sr) |
| |
| max_val = np.max(np.abs(y)) |
| if max_val > 0: |
| y = y / max_val |
|
|
| return y, sr |
|
|
| def load_audio_file_from_memory(audio_bytes, sr=88200): |
| """Load audio from memory bytes.""" |
| y, sr = librosa.load(io.BytesIO(audio_bytes), sr=sr) |
| print(f"Loaded audio data with sample rate {sr}") |
| |
| max_val = np.max(np.abs(y)) |
| if max_val > 0: |
| y = y / max_val |
|
|
| return y, sr |
|
|
|
|
|
|
|
|
| def load_pcm_audio_from_bytes(audio_bytes, sr=22050, channels=1, sample_width=2): |
| """ |
| Load raw PCM bytes into a normalized numpy array and upsample to 88200 Hz. |
| Assumes little-endian, 16-bit PCM data. |
| """ |
| |
| if sample_width == 2: |
| dtype = np.int16 |
| max_val = 32768.0 |
| else: |
| raise ValueError("Unsupported sample width") |
| |
| |
| data = np.frombuffer(audio_bytes, dtype=dtype) |
| |
| |
| if channels > 1: |
| data = data.reshape(-1, channels) |
| |
| |
| y = data.astype(np.float32) / max_val |
| |
| |
| target_sr = 88200 |
| if sr != target_sr: |
| |
| num_samples = int(len(y) * target_sr / sr) |
| if channels > 1: |
| |
| y_resampled = np.zeros((num_samples, channels), dtype=np.float32) |
| for ch in range(channels): |
| y_resampled[:, ch] = scipy.signal.resample(y[:, ch], num_samples) |
| else: |
| y_resampled = scipy.signal.resample(y, num_samples) |
| y = y_resampled |
| sr = target_sr |
|
|
| return y |
|
|