import os
import torch
import numpy as np
import kaldiio
import librosa
import torchaudio
import torchaudio.compliance.kaldi as Kaldi
from torch.nn.utils.rnn import pad_sequence
import onnxruntime as ort

try:
    from funasr.download.file import download_from_url
except ImportError:
    print("funasr is not installed; if you infer from a URL, please install it first.")

import subprocess
from subprocess import CalledProcessError, run


def is_ffmpeg_installed():
    try:
        output = subprocess.check_output(["ffmpeg", "-version"], stderr=subprocess.STDOUT)
        return "ffmpeg version" in output.decode("utf-8")
    except (subprocess.CalledProcessError, FileNotFoundError):
        return False


use_ffmpeg = False
if is_ffmpeg_installed():
    use_ffmpeg = True
else:
    print(
        "Notice: ffmpeg is not installed, so torchaudio is used to load audio.\n"
        "If you want to use the ffmpeg backend to load audio, please install it first:"
        "\n\tsudo apt install ffmpeg  # ubuntu"
        "\n\t# brew install ffmpeg  # mac"
    )


def load_audio_text_image_video(
    data_or_path_or_list,
    fs: int = 16000,
    audio_fs: int = 16000,
    data_type="sound",
    tokenizer=None,
    **kwargs,
):
    if isinstance(data_or_path_or_list, (list, tuple)):
        if data_type is not None and isinstance(data_type, (list, tuple)):
            data_types = [data_type] * len(data_or_path_or_list)
            data_or_path_or_list_ret = [[] for d in data_type]
            for i, (data_type_i, data_or_path_or_list_i) in enumerate(
                zip(data_types, data_or_path_or_list)
            ):
                for j, (data_type_j, data_or_path_or_list_j) in enumerate(
                    zip(data_type_i, data_or_path_or_list_i)
                ):
                    data_or_path_or_list_j = load_audio_text_image_video(
                        data_or_path_or_list_j,
                        fs=fs,
                        audio_fs=audio_fs,
                        data_type=data_type_j,
                        tokenizer=tokenizer,
                        **kwargs,
                    )
                    data_or_path_or_list_ret[j].append(data_or_path_or_list_j)
            return data_or_path_or_list_ret
        else:
            return [
                load_audio_text_image_video(
                    audio, fs=fs, audio_fs=audio_fs, data_type=data_type, **kwargs
                )
                for audio in data_or_path_or_list
            ]
    if isinstance(data_or_path_or_list, str) and data_or_path_or_list.startswith(
        "http"
    ):  # download url to local file
        data_or_path_or_list = download_from_url(data_or_path_or_list)
    if isinstance(data_or_path_or_list, str) and os.path.exists(data_or_path_or_list):  # local file
        if data_type is None or data_type in ["sound", "kaldi_ark_or_sound"]:
            if kwargs.get("use_ffmpeg", False):
                data_or_path_or_list = _load_audio_ffmpeg(data_or_path_or_list, sr=fs)
                data_or_path_or_list = torch.from_numpy(
                    data_or_path_or_list
                ).squeeze()  # [n_samples,]
                audio_fs = fs  # ffmpeg already resampled to the target rate
            else:
                try:
                    data_or_path_or_list, audio_fs = torchaudio.load(data_or_path_or_list)
                    if kwargs.get("reduce_channels", True):
                        data_or_path_or_list = data_or_path_or_list.mean(0)
                except Exception:
                    # fall back to ffmpeg when torchaudio cannot decode the file
                    data_or_path_or_list = _load_audio_ffmpeg(data_or_path_or_list, sr=fs)
                    data_or_path_or_list = torch.from_numpy(
                        data_or_path_or_list
                    ).squeeze()  # [n_samples,]
                    audio_fs = fs  # ffmpeg already resampled to the target rate
        elif data_type == "text" and tokenizer is not None:
            data_or_path_or_list = tokenizer.encode(data_or_path_or_list)
        elif data_type == "image":  # TODO
            pass
        elif data_type == "video":  # TODO
            pass
        # if data_in is a file or url, set is_final=True
        if "cache" in kwargs:
            kwargs["cache"]["is_final"] = True
            kwargs["cache"]["is_streaming_input"] = False
    elif isinstance(data_or_path_or_list, str) and data_type == "text" and tokenizer is not None:
        data_or_path_or_list = tokenizer.encode(data_or_path_or_list)
    elif isinstance(data_or_path_or_list, np.ndarray):  # audio sample point
        data_or_path_or_list = torch.from_numpy(data_or_path_or_list).squeeze()  # [n_samples,]
    elif isinstance(data_or_path_or_list, str) and data_type in ["kaldi_ark", "kaldi_ark_or_sound", "sound"]:
        if len(data_or_path_or_list.split()) == 2:
            data_or_path_or_list, audio_fs = data_or_path_or_list.split()
            audio_fs = int(audio_fs)
        data_mat = kaldiio.load_mat(data_or_path_or_list)
        if isinstance(data_mat, tuple):
            audio_fs, mat = data_mat
        else:
            mat = data_mat
        if mat.dtype == "int16":
            mat = mat.astype(np.float32)
            mat = mat / (2 ** 15)  # full-scale int16 is 2**15, matching load_bytes below
        elif mat.dtype == "int32":
            mat = mat.astype(np.float32)
            mat = mat / (2 ** 31)  # full-scale int32 is 2**31
        if mat.ndim == 2:
            mat = mat[:, 0]  # keep the first channel
        data_or_path_or_list = torch.from_numpy(mat)
    elif isinstance(data_or_path_or_list, bytes):  # audio bytes
        data_or_path_or_list = load_bytes(data_or_path_or_list)
    else:
        print(f"unsupported data type: {data_or_path_or_list}, returning raw data")
    if audio_fs != fs and data_type != "text":
        resampler = torchaudio.transforms.Resample(audio_fs, fs, dtype=data_or_path_or_list.dtype)
        data_or_path_or_list = resampler(data_or_path_or_list[None, :])[0, :]
    return data_or_path_or_list
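

def _example_load_audio():
    # Hedged usage sketch (not part of the original module): load a local wav
    # as a 16 kHz mono float tensor. "example.wav" is a placeholder path, not
    # a file that ships with this repo.
    waveform = load_audio_text_image_video("example.wav", fs=16000, data_type="sound")
    print(waveform.shape, waveform.dtype)  # torch.Size([n_samples]) torch.float32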


class FBank(object):
    def __init__(
        self,
        n_mels,
        sample_rate,
        mean_nor: bool = False,
    ):
        self.n_mels = n_mels
        self.sample_rate = sample_rate
        self.mean_nor = mean_nor

    def __call__(self, wav, dither=0):
        sr = 16000
        assert sr == self.sample_rate  # only 16 kHz input is supported
        if len(wav.shape) == 1:
            wav = wav.unsqueeze(0)
        if wav.shape[0] > 1:
            wav = torch.mean(wav, dim=0, keepdim=True)  # down-mix to mono
        assert len(wav.shape) == 2 and wav.shape[0] == 1, wav.shape
        feat = Kaldi.fbank(
            wav, num_mel_bins=self.n_mels, sample_frequency=sr, dither=dither
        )
        # feat: [T, N]
        if self.mean_nor:
            feat = feat - feat.mean(0, keepdim=True)  # per-utterance mean normalization
        return feat
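

def _example_fbank():
    # Hedged usage sketch (not part of the original module): run one second of
    # random 16 kHz audio through the FBank front-end defined above.
    fbank = FBank(n_mels=80, sample_rate=16000, mean_nor=True)
    wav = torch.randn(16000)  # 1 s of fake audio
    feat = fbank(wav)  # [T, 80]; ~98 frames with Kaldi's default 25 ms / 10 ms framing
    print(feat.shape)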


class OnnxModel(object):
    def __init__(self, pretrained_model):
        session_options = ort.SessionOptions()
        self.model = ort.InferenceSession(pretrained_model, session_options)
        self.input_name = self.model.get_inputs()[0].name
        self.output_name = self.model.get_outputs()[0].name
        self.feature_extractor = FBank(n_mels=80, sample_rate=16000, mean_nor=True)

    def __call__(self, wav):
        feat = self.feature_extractor(torch.as_tensor(wav))
        feat = feat.float().unsqueeze(0).numpy()  # add batch dim: [1, T, 80]
        emb = self.model.run([self.output_name], {self.input_name: feat})[0]
        return emb


def extract_campp_xvec(
    wav_path: str = "",
    target_sr: int = 16000,
    **kwargs,
):
    wav, sr = librosa.load(wav_path, dtype=np.float32, sr=target_sr, mono=True)
    if sr != target_sr:  # defensive; librosa.load already resamples to target_sr
        wav = librosa.resample(wav, orig_sr=sr, target_sr=target_sr)
    onnx_path = kwargs.get("xvec_model", None)
    assert onnx_path is not None, "pass the ONNX model path via xvec_model="
    model = OnnxModel(onnx_path)
    xvec = model(wav)
    return xvec
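

def _example_extract_campp_xvec():
    # Hedged usage sketch (not part of the original module): both paths below
    # are placeholders; "campplus.onnx" stands in for whatever speaker-embedding
    # ONNX export the caller passes via the xvec_model keyword.
    xvec = extract_campp_xvec("speaker.wav", xvec_model="campplus.onnx")
    print(xvec.shape)  # (1, embedding_dim), depending on the exported model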


def load_bytes(input):
    # interpret the raw bytes as 16-bit little-endian PCM
    middle_data = np.frombuffer(input, dtype=np.int16)
    middle_data = np.asarray(middle_data)
    if middle_data.dtype.kind not in "iu":
        raise TypeError("'middle_data' must be an array of integers")
    dtype = np.dtype("float32")
    if dtype.kind != "f":
        raise TypeError("'dtype' must be a floating point type")
    i = np.iinfo(middle_data.dtype)
    abs_max = 2 ** (i.bits - 1)
    offset = i.min + abs_max
    # scale the integer samples to float32 in [-1.0, 1.0)
    array = ((middle_data.astype(dtype) - offset) / abs_max).astype(np.float32)
    return array
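

def _example_load_bytes():
    # Hedged usage sketch (not part of the original module): round-trip a
    # synthetic 16-bit PCM buffer through load_bytes.
    pcm = (np.sin(2 * np.pi * 440 * np.arange(16000) / 16000) * 32767).astype(np.int16)
    wav = load_bytes(pcm.tobytes())
    print(wav.dtype, float(wav.min()), float(wav.max()))  # float32, values in [-1.0, 1.0)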


def extract_fbank(data, data_len=None, data_type: str = "sound", frontend=None, **kwargs):
    if isinstance(data, np.ndarray):
        data = torch.from_numpy(data)
        if len(data.shape) < 2:
            data = data[None, :]  # data: [batch, N]
        data_len = [data.shape[1]] if data_len is None else data_len
    elif isinstance(data, torch.Tensor):
        if len(data.shape) < 2:
            data = data[None, :]  # data: [batch, N]
        data_len = [data.shape[1]] if data_len is None else data_len
    elif isinstance(data, (list, tuple)):
        data_list, data_len = [], []
        for data_i in data:
            if isinstance(data_i, np.ndarray):
                data_i = torch.from_numpy(data_i)
            data_list.append(data_i)
            data_len.append(data_i.shape[0])
        data = pad_sequence(data_list, batch_first=True)  # data: [batch, N]
    data, data_len = frontend(data, data_len, **kwargs)
    if isinstance(data_len, (list, tuple)):
        data_len = torch.tensor([data_len])
    return data.to(torch.float32), data_len.to(torch.int32)
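

def _example_extract_fbank():
    # Hedged usage sketch (not part of the original module): extract_fbank
    # delegates feature computation to a `frontend` callable that returns
    # (features, feature_lengths). This toy frontend wraps the FBank class
    # above and is only a placeholder.
    def toy_frontend(batch, batch_len, **kwargs):
        fbank = FBank(n_mels=80, sample_rate=16000, mean_nor=True)
        feats = torch.stack([fbank(wav) for wav in batch])  # [B, T, 80]
        return feats, torch.tensor([f.shape[0] for f in feats])

    feats, feat_lens = extract_fbank(
        [torch.randn(16000), torch.randn(16000)], frontend=toy_frontend
    )
    print(feats.shape, feat_lens)  # torch.Size([2, T, 80]) tensor([T, T], dtype=torch.int32)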


def _load_audio_ffmpeg(file: str, sr: int = 16000):
    """
    Open an audio file and read as mono waveform, resampling as necessary.

    Parameters
    ----------
    file: str
        The audio file to open
    sr: int
        The sample rate to resample the audio if necessary

    Returns
    -------
    A NumPy array containing the audio waveform, in float32 dtype.
    """
    # This launches a subprocess to decode audio while down-mixing
    # and resampling as necessary. Requires the ffmpeg CLI in PATH.
    # fmt: off
    cmd = [
        "ffmpeg",
        "-nostdin",
        "-threads", "0",
        "-i", file,
        "-f", "s16le",
        "-ac", "1",
        "-acodec", "pcm_s16le",
        "-ar", str(sr),
        "-",
    ]
    # fmt: on
    try:
        out = run(cmd, capture_output=True, check=True).stdout
    except CalledProcessError as e:
        raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e
    return np.frombuffer(out, np.int16).flatten().astype(np.float32) / 32768.0
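

def _example_load_audio_ffmpeg():
    # Hedged usage sketch (not part of the original module): decode any
    # ffmpeg-readable file to 16 kHz mono float32. "example.mp3" is a
    # placeholder path, and the ffmpeg CLI must be on PATH.
    wav = _load_audio_ffmpeg("example.mp3", sr=16000)
    print(wav.shape, wav.dtype)  # (n_samples,) float32, values in [-1.0, 1.0]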