| import tempfile | |
| import unittest | |
| from pathlib import Path | |
| import numpy as np | |
| import soundfile as sf | |
| from infer.cover_pipeline import CoverPipeline | |
| def _rms(audio: np.ndarray) -> float: | |
| return float(np.sqrt(np.mean(np.square(audio)) + 1e-12)) | |
| def _preemphasis_rms(audio: np.ndarray) -> float: | |
| audio = np.asarray(audio, dtype=np.float32).reshape(-1) | |
| if audio.size == 0: | |
| return 0.0 | |
| residual = np.empty_like(audio) | |
| residual[0] = audio[0] | |
| residual[1:] = audio[1:] - 0.97 * audio[:-1] | |
| return _rms(residual) | |
| def _fade(length: int) -> np.ndarray: | |
| env = np.ones(length, dtype=np.float32) | |
| ramp = max(1, min(length // 4, 512)) | |
| env[:ramp] = np.linspace(0.0, 1.0, ramp, dtype=np.float32) | |
| env[-ramp:] = np.linspace(1.0, 0.0, ramp, dtype=np.float32) | |
| return env | |
class EchoTailGateTests(unittest.TestCase):
    """Tests for ``CoverPipeline._compute_echo_tail_sample_gain``."""

    def test_loud_echo_tail_is_suppressed_when_dereverb_removes_it(self):
        """A loud echo that is absent from the dereverbed track gets gated.

        The "original" signal holds a 330 Hz burst (0.20-0.72 s, amplitude
        0.24) plus a half-amplitude copy of it 0.96 s later (1.16-1.68 s).
        The "dereverbed" signal holds only the first burst. The returned
        gain must stay high over the direct burst and drop over the echo.
        """
        sr = 48000
        omega = 2.0 * np.pi * 330.0
        t = np.arange(int(2.4 * sr), dtype=np.float32) / sr

        direct_mask = (t >= 0.20) & (t < 0.72)
        echo_mask = (t >= 1.16) & (t < 1.68)

        # Direct burst and its delayed, quieter repeat; both are faded at
        # the edges with _fade.
        direct = 0.24 * np.sin(omega * t[direct_mask])
        direct *= _fade(direct.size)
        echo = 0.12 * np.sin(omega * (t[echo_mask] - 0.96))
        echo *= _fade(echo.size)

        original = np.zeros_like(t)
        original[direct_mask] = direct
        original[echo_mask] = echo

        dereverbed = np.zeros_like(t)
        dereverbed[direct_mask] = direct

        gain, gated_frames, total_frames = CoverPipeline._compute_echo_tail_sample_gain(
            original=original,
            dereverbed=dereverbed,
            sr=sr,
        )

        # Inspect the interior of each burst (0.34-0.58 s and 1.28-1.54 s).
        # NOTE(review): assumes `gain` is indexed per sample - confirm
        # against CoverPipeline.
        gain_over_direct = gain[int(0.34 * sr): int(0.58 * sr)]
        gain_over_echo = gain[int(1.28 * sr): int(1.54 * sr)]

        self.assertGreater(total_frames, 0)
        self.assertGreater(gated_frames, 0)
        self.assertGreater(float(np.percentile(gain_over_direct, 5)), 0.85)
        self.assertLess(
            float(np.mean(gain_over_echo)),
            0.55,
            "loud echo tails should be reduced even when they are not quiet",
        )
class BreathCleanupTests(unittest.TestCase):
    """Tests for the source-guided breath/transition cleanup passes."""

    def test_low_energy_recovery_breath_loses_tonal_hf_without_body_loss(self):
        """Cleanup removes tonal HF from a quiet breath but keeps the body.

        Both tracks share a 260 Hz "body" tone (0.25-0.90 s, amplitude 0.15)
        and a faded 620 Hz "breath" burst (1.15-1.45 s, amplitude 0.012).
        Only the converted track carries an extra 5200 Hz tone (amplitude
        0.009) on top of the breath. After the two cleanup passes run on
        the written WAV files, the pre-emphasised RMS of the breath window
        must fall below 80 % of its previous value while the body window
        keeps more than 96 % of its RMS.
        """
        sr = 48000
        two_pi = 2.0 * np.pi
        t = np.arange(int(2.0 * sr), dtype=np.float32) / sr

        def window(signal, start_s, stop_s):
            # Slice *signal* between two timestamps given in seconds.
            return signal[int(start_s * sr): int(stop_s * sr)]

        body = (t >= 0.25) & (t < 0.90)
        breath = (t >= 1.15) & (t < 1.45)

        voiced = 0.15 * np.sin(two_pi * 260.0 * t[body])
        breath_body = 0.012 * np.sin(two_pi * 620.0 * t[breath])
        breath_body *= _fade(breath_body.size)
        hf_tone = 0.009 * np.sin(two_pi * 5200.0 * t[breath])

        source = np.zeros_like(t)
        source[body] = voiced
        source[breath] = breath_body

        converted = np.zeros_like(t)
        converted[body] = voiced
        converted[breath] = breath_body + hf_tone

        with tempfile.TemporaryDirectory() as tmp_dir:
            workdir = Path(tmp_dir)
            source_path = workdir / "source.wav"
            converted_path = workdir / "converted.wav"
            sf.write(source_path, source, sr)
            sf.write(converted_path, converted, sr)

            pipeline = CoverPipeline(device="cpu")
            # Read the file back so "before" reflects what is stored on disk.
            before, _ = sf.read(converted_path)

            # NOTE(review): both passes are presumed to rewrite the
            # converted file in place - the test re-reads it afterwards.
            src_arg = str(source_path)
            dst_arg = str(converted_path)
            pipeline._apply_source_breath_cleanup(src_arg, dst_arg)
            pipeline._apply_source_transition_cleanup(src_arg, dst_arg)

            after, out_sr = sf.read(converted_path)

        self.assertEqual(out_sr, sr)

        breath_before = window(before, 1.20, 1.40)
        breath_after = window(after, 1.20, 1.40)
        body_before = window(before, 0.36, 0.76)
        body_after = window(after, 0.36, 0.76)

        self.assertLess(_preemphasis_rms(breath_after), _preemphasis_rms(breath_before) * 0.80)
        self.assertGreater(_rms(body_after), _rms(body_before) * 0.96)
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()