| | """ |
| | Audio processing utilities for temporal reasoning dataset generation. |
| | """ |
| |
|
| | import os |
| | import random |
| | from pathlib import Path |
| | from typing import Dict, List, Optional, Tuple, Union |
| |
|
| | import numpy as np |
| | from pydub import AudioSegment |
| |
|
| | try: |
| | import pyloudnorm as pyln |
| | PYLOUDNORM_AVAILABLE = True |
| | except ImportError: |
| | PYLOUDNORM_AVAILABLE = False |
| |
|
| | from .logger import setup_logger |
| |
|
| | logger = setup_logger(__name__) |
| |
|
| |
|
def get_lufs_loudness(audio: AudioSegment) -> float:
    """
    Calculate integrated LUFS loudness (perceived loudness) of an audio segment.

    LUFS (Loudness Units Full Scale) is the broadcast standard for measuring
    perceived loudness. It accounts for human hearing sensitivity to different
    frequencies using K-weighting.

    Args:
        audio: Input audio segment (pydub AudioSegment)

    Returns:
        Loudness in LUFS (negative values, typically -70 to 0)
        Returns dBFS if pyloudnorm is not available (fallback)
    """
    if not PYLOUDNORM_AVAILABLE:
        logger.warning("pyloudnorm not available, falling back to dBFS")
        return audio.dBFS

    # Interleaved integer PCM samples -> numpy array.
    samples = np.array(audio.get_array_of_samples())

    # De-interleave into (frames, channels). Generalized from the original
    # stereo-only check: audio with more than 2 channels would otherwise be
    # fed to the meter as interleaved mono and measured incorrectly.
    if audio.channels > 1:
        samples = samples.reshape((-1, audio.channels))

    # Scale integer PCM to float in [-1.0, 1.0) based on sample width.
    if audio.sample_width == 1:
        # NOTE(review): this mapping assumes 8-bit samples in 0..255; pydub
        # exposes 8-bit data through a signed array type, so the actual range
        # should be confirmed — if samples are -128..127 the plain /128.0
        # scaling (without the -1.0 offset) would be correct.
        samples = samples.astype(np.float64) / 128.0 - 1.0
    elif audio.sample_width == 2:
        samples = samples.astype(np.float64) / 32768.0
    elif audio.sample_width == 4:
        samples = samples.astype(np.float64) / 2147483648.0
    else:
        # Unknown width: fall back to 16-bit scaling as a best effort.
        samples = samples.astype(np.float64) / 32768.0

    # BS.1770 loudness meter configured for the clip's sample rate.
    meter = pyln.Meter(audio.frame_rate)

    try:
        loudness = meter.integrated_loudness(samples)
        # pyloudnorm reports -inf for digital silence; clamp to the
        # conventional -70 LUFS floor so callers always get a finite value.
        if np.isinf(loudness):
            loudness = -70.0
        return loudness
    except Exception as e:
        logger.warning(f"LUFS measurement failed: {e}, falling back to dBFS")
        return audio.dBFS
| |
|
| |
|
def normalize_to_lufs(audio: AudioSegment, target_lufs: float = -23.0) -> AudioSegment:
    """
    Normalize an audio segment to a target perceived loudness (LUFS).

    Perceived-loudness matching is preferable to plain dBFS normalization when
    comparing different sound types, because LUFS is K-weighted for human
    hearing sensitivity.

    Args:
        audio: Input audio segment
        target_lufs: Target loudness level in LUFS (default: -23 LUFS, EBU R128 standard)

    Returns:
        Loudness-normalized audio segment
    """
    # Without pyloudnorm, approximate the result with a dBFS-based gain match.
    if not PYLOUDNORM_AVAILABLE:
        logger.warning("pyloudnorm not available, falling back to dBFS normalization")
        change_db = target_lufs - audio.dBFS
        return audio.apply_gain(change_db)

    # Measure, then apply the gain that closes the gap to the target.
    current_lufs = get_lufs_loudness(audio)
    normalized = audio.apply_gain(target_lufs - current_lufs)
    logger.debug(f"Normalized LUFS: {current_lufs:.2f} -> {get_lufs_loudness(normalized):.2f} LUFS")
    return normalized
| |
|
| |
|
class AudioProcessor:
    """Handles audio loading, processing, and concatenation."""

    def __init__(
        self,
        crossfade_duration: int = 500,
        silence_duration: int = 1000,
        with_silence: bool = True,
        normalize: bool = False,
        normalize_target_dBFS: float = -20.0,
        synthetic_silence_path: Optional[str] = None
    ):
        """
        Initialize the audio processor.

        Args:
            crossfade_duration: Duration of crossfade in milliseconds
            silence_duration: Duration of silence between clips in milliseconds
            with_silence: Whether to add silence between clips
            normalize: Whether to normalize audio levels
            normalize_target_dBFS: Target dBFS level for normalization
            synthetic_silence_path: Path to synthetic silence audio files
        """
        self.crossfade_duration = crossfade_duration
        self.silence_duration = silence_duration
        self.with_silence = with_silence
        self.normalize = normalize
        self.normalize_target_dBFS = normalize_target_dBFS
        self.synthetic_silence_path = synthetic_silence_path
        # Cache of silence segments keyed by duration in ms, so repeated gap
        # sizes don't re-load/re-generate silence (see get_silence()).
        self._silence_cache: Dict[int, AudioSegment] = {}

    def load_audio(self, audio_path: str) -> AudioSegment:
        """
        Load an audio file.

        Args:
            audio_path: Path to the audio file

        Returns:
            Loaded audio segment

        Raises:
            Exception: Re-raises whatever pydub/ffmpeg raised on failure,
                after logging it.
        """
        try:
            # format="wav" is forced — callers are expected to pass WAV files.
            audio = AudioSegment.from_file(audio_path, format="wav")
            logger.debug(f"Loaded audio: {audio_path}, duration: {len(audio)}ms")
            return audio
        except Exception as e:
            logger.error(f"Error loading audio {audio_path}: {e}")
            raise

    def normalize_audio(self, audio: AudioSegment, target_dBFS: Optional[float] = None) -> AudioSegment:
        """
        Normalize audio to a target dBFS level.

        Args:
            audio: Input audio segment
            target_dBFS: Target dBFS level (uses default if None)

        Returns:
            Normalized audio segment
        """
        if target_dBFS is None:
            target_dBFS = self.normalize_target_dBFS

        # Uniform gain shift so the average level lands on the target.
        change_in_dBFS = target_dBFS - audio.dBFS
        normalized = audio.apply_gain(change_in_dBFS)
        logger.debug(f"Normalized audio: {audio.dBFS:.2f} dBFS -> {normalized.dBFS:.2f} dBFS")
        return normalized

    def adjust_volume(self, audio: AudioSegment, volume_db: float) -> AudioSegment:
        """
        Adjust audio volume by a specific dB amount.

        Args:
            audio: Input audio segment
            volume_db: Volume adjustment in dB (positive = louder, negative = quieter)

        Returns:
            Volume-adjusted audio segment
        """
        adjusted = audio.apply_gain(volume_db)
        logger.debug(f"Adjusted volume by {volume_db} dB: {audio.dBFS:.2f} -> {adjusted.dBFS:.2f} dBFS")
        return adjusted

    def get_silence(self, duration: Optional[int] = None) -> AudioSegment:
        """
        Get a silence audio segment, using synthetic silence if available.

        Note: results are cached per duration, so for synthetic silence the
        same randomly-chosen file is reused for every gap of that duration.

        Args:
            duration: Duration in milliseconds (uses default if None)

        Returns:
            Silence audio segment
        """
        if duration is None:
            duration = self.silence_duration

        # Fast path: reuse a previously built silence of the same length.
        if duration in self._silence_cache:
            return self._silence_cache[duration]

        # Prefer "synthetic" silence (e.g. room tone) when a directory of
        # WAV files was configured and contains candidates.
        if self.synthetic_silence_path and os.path.exists(self.synthetic_silence_path):
            silence_files = list(Path(self.synthetic_silence_path).glob("*.wav"))
            if silence_files:
                silence = self.load_audio(str(random.choice(silence_files)))
                # Loop the file if it's shorter than the requested duration,
                # then trim to the exact length.
                if len(silence) < duration:
                    repetitions = (duration // len(silence)) + 1
                    silence = silence * repetitions
                silence = silence[:duration]
                self._silence_cache[duration] = silence
                logger.debug(f"Using synthetic silence: {duration}ms")
                return silence

        # Fallback: digitally generated (pure) silence.
        silence = AudioSegment.silent(duration=duration)
        self._silence_cache[duration] = silence
        logger.debug(f"Using pure silence: {duration}ms")
        return silence

    def concatenate_audios(
        self,
        audio_list: List[AudioSegment],
        normalize_each: bool = False,
        volume_adjustments: Optional[List[float]] = None
    ) -> AudioSegment:
        """
        Concatenate multiple audio segments with crossfade and optional silence.

        Normalization only happens when BOTH normalize_each and the
        processor-level self.normalize flag are set.

        Args:
            audio_list: List of audio segments to concatenate
            normalize_each: Whether to normalize each audio before concatenation
            volume_adjustments: Optional list of volume adjustments (in dB) for each audio

        Returns:
            Concatenated audio segment

        Raises:
            ValueError: If audio_list is empty.
        """
        if not audio_list:
            raise ValueError("audio_list cannot be empty")

        # Single clip: apply per-clip processing and return as-is.
        if len(audio_list) == 1:
            audio = audio_list[0]
            if normalize_each and self.normalize:
                audio = self.normalize_audio(audio)
            if volume_adjustments and len(volume_adjustments) > 0:
                audio = self.adjust_volume(audio, volume_adjustments[0])
            return audio

        # Seed the result with the (processed) first clip.
        merged = audio_list[0]
        if normalize_each and self.normalize:
            merged = self.normalize_audio(merged)
        if volume_adjustments and len(volume_adjustments) > 0:
            merged = self.adjust_volume(merged, volume_adjustments[0])

        # Append remaining clips; i tracks position in the original list so
        # volume_adjustments stays aligned.
        for i, audio in enumerate(audio_list[1:], start=1):
            current = audio
            if normalize_each and self.normalize:
                current = self.normalize_audio(current)
            if volume_adjustments and len(volume_adjustments) > i:
                current = self.adjust_volume(current, volume_adjustments[i])

            # Crossfade into the inter-clip silence (not into the next clip),
            # so each clip still starts cleanly after the gap.
            if self.with_silence:
                silence = self.get_silence()
                merged = merged.append(silence, crossfade=self.crossfade_duration)

            merged = merged.append(current, crossfade=0)

        logger.debug(f"Concatenated {len(audio_list)} audio segments, total duration: {len(merged)}ms")
        return merged

    def concatenate_audio_files(
        self,
        audio_paths: List[str],
        output_path: str,
        normalize_each: bool = False,
        volume_adjustments: Optional[List[float]] = None,
        target_durations: Optional[List[float]] = None
    ) -> Tuple[AudioSegment, dict]:
        """
        Load, concatenate, and save multiple audio files.

        Args:
            audio_paths: List of paths to audio files
            output_path: Path to save the concatenated audio
            normalize_each: Whether to normalize each audio before concatenation
            volume_adjustments: Optional list of volume adjustments (in dB) for each audio
            target_durations: Optional list of target durations (in seconds) for each clip

        Returns:
            Tuple of (concatenated audio segment, metadata dict)
        """
        # Load each file, optionally forcing it to its target duration.
        audio_segments = []
        for i, path in enumerate(audio_paths):
            audio = self.load_audio(path)

            if target_durations and i < len(target_durations):
                target_ms = int(target_durations[i] * 1000)
                # NOTE(review): trim_or_repeat_audio is not defined in this
                # visible portion of the module — presumably defined later in
                # the file; verify it exists before relying on this path.
                audio = trim_or_repeat_audio(audio, target_ms)
                logger.debug(f"Adjusted clip {i} to {len(audio)}ms (target: {target_ms}ms)")

            audio_segments.append(audio)

        # Join everything using the configured silence/crossfade behavior.
        merged = self.concatenate_audios(audio_segments, normalize_each, volume_adjustments)

        # Ensure the output directory exists, then export as WAV.
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        merged.export(str(output_path), format="wav")
        logger.info(f"Saved concatenated audio: {output_path}")

        # Metadata describing the composition of the output file.
        metadata = {
            "output_path": str(output_path),
            "source_files": audio_paths,
            "num_sources": len(audio_paths),
            "total_duration_ms": len(merged),
            "total_duration_s": len(merged) / 1000.0,
            "individual_durations_ms": [len(a) for a in audio_segments],
            "individual_durations_s": [len(a) / 1000.0 for a in audio_segments],
            "target_durations_s": target_durations if target_durations else [],
            "volume_adjustments_db": volume_adjustments if volume_adjustments else []
        }

        return merged, metadata
| |
|
| |
|
def generate_sample_durations_for_task(
    task_duration_hours: float,
    min_clip_duration: float,
    max_clip_duration: float
) -> list:
    """
    Generate sample durations that exactly fill the target task duration.

    Algorithm:
    1. Start with remaining = total_seconds
    2. While remaining >= min_clip_duration:
       - Sample d ~ Uniform(min, min(max, remaining))
       - Append d to durations list
       - Subtract d from remaining
    3. Return shuffled list of durations

    This ensures:
    - Total of all durations ≈ task_duration (within min_clip_duration tolerance)
    - Each duration is uniformly sampled within valid range
    - No overshoot of target duration

    Args:
        task_duration_hours: Total duration for the task in hours
        min_clip_duration: Minimum duration per clip in seconds
        max_clip_duration: Maximum duration per clip in seconds

    Returns:
        List of sample durations in seconds (shuffled). Empty when the target
        duration is shorter than min_clip_duration.
    """
    task_duration_seconds = task_duration_hours * 3600
    remaining = task_duration_seconds
    durations = []

    while remaining >= min_clip_duration:
        # Never sample past what is left to fill (no overshoot of the target).
        effective_max = min(max_clip_duration, remaining)

        if effective_max < min_clip_duration:
            break

        d = random.uniform(min_clip_duration, effective_max)
        durations.append(d)
        remaining -= d

    # Shuffle so clip lengths are not ordered by generation time.
    random.shuffle(durations)

    total_duration = sum(durations)
    logger.info(f"Task duration target: {task_duration_hours}h ({task_duration_seconds:.1f}s)")
    logger.info(f"Generated {len(durations)} sample durations, total: {total_duration:.1f}s")
    if durations:
        # Guarded: min()/max() raise and the mean divides by zero on an empty
        # list (which happens when the target is shorter than min_clip_duration).
        logger.info(f"Duration range: [{min(durations):.1f}s, {max(durations):.1f}s], "
                    f"mean: {total_duration/len(durations):.1f}s")
    else:
        logger.warning("Target duration shorter than min_clip_duration; no durations generated")
    if task_duration_seconds > 0:
        # Guarded against ZeroDivisionError for a zero-length task.
        logger.info(f"Unused remainder: {remaining:.1f}s ({remaining/task_duration_seconds*100:.2f}%)")

    return durations
| |
|
| |
|
def calculate_num_samples_for_task(
    task_duration_hours: float,
    min_clip_duration: float,
    max_clip_duration: float
) -> int:
    """
    Estimate how many samples are needed to cover the task duration.

    DEPRECATED: prefer generate_sample_durations_for_task() for exact duration
    filling. This average-based estimate is kept for backward compatibility.

    Args:
        task_duration_hours: Total duration for the task in hours
        min_clip_duration: Minimum duration per clip in seconds
        max_clip_duration: Maximum duration per clip in seconds

    Returns:
        Estimated number of samples to generate (always at least 1)
    """
    # Divide the total time by the midpoint of the clip-duration range.
    total_seconds = task_duration_hours * 3600
    mean_clip_seconds = (min_clip_duration + max_clip_duration) / 2
    estimated = int(total_seconds / mean_clip_seconds)

    logger.info(f"Task duration: {task_duration_hours}h ({total_seconds}s)")
    logger.info(f"Avg clip duration: {mean_clip_seconds}s (min: {min_clip_duration}s, max: {max_clip_duration}s)")
    logger.info(f"Calculated number of samples: {estimated}")

    # Never return zero — downstream code expects at least one sample.
    return max(1, estimated)
| |
|
| |
|
def generate_single_clip_duration(
    min_duration: float,
    max_duration: float
) -> float:
    """
    Draw one clip duration uniformly at random from [min_duration, max_duration].

    Args:
        min_duration: Minimum duration in seconds
        max_duration: Maximum duration in seconds

    Returns:
        Random duration in seconds
    """
    low, high = min_duration, max_duration
    return random.uniform(low, high)
| |
|
| |
|
def concatenate_to_target_duration(
    base_audio: AudioSegment,
    target_duration_seconds: float,
    crossfade_ms: int = 0
) -> AudioSegment:
    """
    Concatenate a base audio clip to reach target duration.

    This takes a 5-second ESC-50 clip and repeats it to create a longer clip.

    Args:
        base_audio: Original 5s audio segment
        target_duration_seconds: Target duration in seconds
        crossfade_ms: Crossfade between repetitions in milliseconds

    Returns:
        Audio segment of target duration
    """
    target_duration_ms = int(target_duration_seconds * 1000)
    base_duration_ms = len(base_audio)

    # Already long enough: just trim.
    if target_duration_ms <= base_duration_ms:
        return base_audio[:target_duration_ms]

    # Repeat until the result covers the target, then trim. A length-driven
    # loop is used instead of a precomputed repetition count because each
    # crossfaded join shortens the result by crossfade_ms — the old count
    # ((target // base) + 1) could undershoot the target when crossfade_ms > 0.
    result = base_audio
    while len(result) < target_duration_ms:
        before_ms = len(result)
        if crossfade_ms > 0:
            result = result.append(base_audio, crossfade=crossfade_ms)
        else:
            result = result + base_audio
        # Safety: if the crossfade swallows the entire appended clip the loop
        # would never terminate; bail out with what we have.
        if len(result) <= before_ms:
            logger.warning(
                f"concatenate_to_target_duration: crossfade ({crossfade_ms}ms) "
                f"prevents growth past {len(result)}ms; returning short result"
            )
            break

    return result[:target_duration_ms]
| |
|
| |
|
def set_random_seed(seed: int):
    """Seed both Python's and NumPy's RNGs so runs are reproducible."""
    random.seed(seed)
    np.random.seed(seed)
    logger.info(f"Random seed set to: {seed}")
| |
|
| |
|
def get_max_clip_num_to_be_joined(
    target_duration_seconds: float,
    source_clip_duration_seconds: float,
    min_silence_ms: int = 100
) -> Tuple[int, float]:
    """
    Calculate the maximum number of source clips that fit in the target duration.

    Pipeline: pick dataset -> pick class -> pick audio clip -> get duration ->
    concatenate clips to reach target duration -> modulo to get num clips ->
    inserting silences randomly based on remainder.

    Args:
        target_duration_seconds: Target total duration in seconds
        source_clip_duration_seconds: Duration of each source clip (e.g., 5s for ESC-50)
        min_silence_ms: Minimum silence between clips in milliseconds

    Returns:
        Tuple of (num_clips_needed, remainder_seconds_for_silences)
        - num_clips_needed: How many source clips to concatenate (always >= 1)
        - remainder_seconds_for_silences: Extra time to distribute as random silences

    Example (with default min_silence_ms=100):
        target=30s, source=5s -> (5, 4.6)
            6 clips would need 30s of audio + 0.5s of mandatory silence > 30s,
            so one clip is dropped; 25s audio + 0.4s silence leaves 4.6s.
        target=32s, source=5s -> (6, 1.5)
            30s of audio + 0.5s of mandatory silence leaves 1.5s extra.
    """
    target_ms = target_duration_seconds * 1000
    source_ms = source_clip_duration_seconds * 1000

    # Upper bound ignoring silences; keep at least one clip even when a
    # single clip is longer than the target (the caller trims in that case).
    num_clips = int(target_ms // source_ms)
    num_clips = max(1, num_clips)

    clips_duration_ms = num_clips * source_ms

    # Mandatory silence budget: one gap between each consecutive pair of clips.
    num_gaps = max(0, num_clips - 1)
    min_total_silence_ms = num_gaps * min_silence_ms

    # Drop clips until the audio plus mandatory silences fits in the target.
    while num_clips > 1 and (clips_duration_ms + min_total_silence_ms) > target_ms:
        num_clips -= 1
        clips_duration_ms = num_clips * source_ms
        num_gaps = num_clips - 1
        min_total_silence_ms = num_gaps * min_silence_ms

    # Whatever time is left over is handed back for random extra silences.
    remainder_ms = target_ms - clips_duration_ms - min_total_silence_ms
    remainder_seconds = max(0, remainder_ms / 1000.0)

    logger.debug(
        f"get_max_clip_num: target={target_duration_seconds}s, source={source_clip_duration_seconds}s "
        f"-> {num_clips} clips, {remainder_seconds:.3f}s remainder for extra silences"
    )

    return num_clips, remainder_seconds
| |
|
| |
|
def build_clip_sequence_with_silences(
    audio_segments: List[AudioSegment],
    target_duration_seconds: float,
    min_silence_ms: int = 100,
    max_extra_silence_per_gap_ms: int = 500,
    crossfade_ms: int = 0
) -> AudioSegment:
    """
    Build a final audio clip by concatenating segments with guaranteed silences.

    Ensures:
    1. All clips are joined with at least min_silence_ms between them
    2. Any remainder duration is distributed as random extra silences in gaps
    3. Final duration matches target_duration_seconds exactly

    Args:
        audio_segments: List of audio segments to concatenate
        target_duration_seconds: Target total duration in seconds
        min_silence_ms: Minimum silence between each pair of clips (always inserted)
        max_extra_silence_per_gap_ms: Maximum extra silence to add per gap
        crossfade_ms: Crossfade duration in ms (applied when joining)

    Returns:
        Concatenated audio segment of exact target duration

    Raises:
        ValueError: If audio_segments is empty.
    """
    if not audio_segments:
        raise ValueError("audio_segments cannot be empty")

    target_ms = int(target_duration_seconds * 1000)

    # Single segment: no gaps exist, so trim or loop the clip to the target.
    if len(audio_segments) == 1:
        audio = audio_segments[0]
        if len(audio) >= target_ms:
            return audio[:target_ms]
        else:
            # Shorter than target: repeat the clip (no silences inserted here).
            return concatenate_to_target_duration(audio, target_duration_seconds, crossfade_ms)

    # Total audio time and the number of gaps between consecutive clips.
    total_audio_ms = sum(len(seg) for seg in audio_segments)
    num_gaps = len(audio_segments) - 1

    # Mandatory silence budget (min_silence_ms per gap, always inserted).
    min_total_silence_ms = num_gaps * min_silence_ms

    # Leftover time that can be sprinkled as extra random silences.
    available_extra_ms = target_ms - total_audio_ms - min_total_silence_ms

    if available_extra_ms < 0:
        # Clips + mandatory silences already overflow the target; keep the
        # mandatory silences and trim the tail of the final result instead.
        logger.warning(
            f"Clips too long for target duration. Total audio: {total_audio_ms}ms, "
            f"target: {target_ms}ms. Will trim final result."
        )
        available_extra_ms = 0

    # Randomly split the leftover time across the gaps (bounded per gap).
    extra_silences_ms = distribute_remainder_as_silences(
        available_extra_ms,
        num_gaps,
        max_extra_silence_per_gap_ms
    )

    # Stitch: clip + (mandatory + extra) silence + clip + ...
    result = audio_segments[0]

    for i, audio in enumerate(audio_segments[1:]):
        # Gap i sits between segment i and segment i+1.
        gap_silence_ms = min_silence_ms + extra_silences_ms[i]

        silence = AudioSegment.silent(duration=gap_silence_ms)

        # Crossfade only into the silence (and only if it fits inside the
        # gap), so the next clip always starts cleanly after the gap.
        if crossfade_ms > 0 and crossfade_ms < gap_silence_ms:
            result = result.append(silence, crossfade=crossfade_ms)
            result = result.append(audio, crossfade=0)
        else:
            result = result + silence + audio

    # Force the exact target duration: trim overshoot or pad with silence.
    if len(result) > target_ms:
        result = result[:target_ms]
    elif len(result) < target_ms:
        padding = AudioSegment.silent(duration=target_ms - len(result))
        result = result + padding

    logger.debug(
        f"Built clip sequence: {len(audio_segments)} segments, "
        f"final duration: {len(result)}ms (target: {target_ms}ms)"
    )

    return result
| |
|
| |
|
def distribute_remainder_as_silences(
    remainder_ms: float,
    num_gaps: int,
    max_per_gap_ms: int = 500
) -> List[int]:
    """
    Split a time remainder into random per-gap extra silences.

    Each gap receives a share roughly proportional to a random weight,
    capped at max_per_gap_ms; any leftover is topped up greedily across
    gaps that still have headroom.

    Args:
        remainder_ms: Total extra time to distribute (in ms)
        num_gaps: Number of gaps between clips
        max_per_gap_ms: Maximum extra silence per gap

    Returns:
        List of extra silence durations (in ms) for each gap
    """
    if num_gaps <= 0:
        return []

    remainder_ms = int(max(0, remainder_ms))
    if remainder_ms == 0:
        return [0] * num_gaps

    # One random weight per gap drives the proportional split.
    gap_weights = [random.random() for _ in range(num_gaps)]
    total_weight = sum(gap_weights)
    if total_weight == 0:
        # Degenerate draw: fall back to equal weights.
        gap_weights = [1.0] * num_gaps
        total_weight = num_gaps

    extra_silences: List[int] = []
    remaining = remainder_ms
    last_gap = num_gaps - 1

    for idx, weight in enumerate(gap_weights):
        if idx == last_gap:
            # Last gap absorbs whatever is left (capped).
            share = min(remaining, max_per_gap_ms)
        else:
            share = int(remainder_ms * (weight / total_weight))
            share = min(share, max_per_gap_ms, remaining)
        extra_silences.append(share)
        remaining -= share
        # Re-normalize over the weights not yet consumed.
        total_weight -= weight

    # Greedy top-up pass: pour leftover into gaps with headroom, in order.
    # (If every gap is saturated the leftover is silently dropped.)
    if remaining > 0:
        for idx in range(num_gaps):
            if remaining <= 0:
                break
            headroom = max_per_gap_ms - extra_silences[idx]
            if headroom > 0:
                top_up = min(remaining, headroom)
                extra_silences[idx] += top_up
                remaining -= top_up

    logger.debug(f"Distributed {remainder_ms}ms across {num_gaps} gaps: {extra_silences}")

    return extra_silences
| |
|
| |
|
def repeat_clips_to_fill_duration(
    source_audios: List[AudioSegment],
    source_categories: List[str],
    target_duration_seconds: float,
    source_clip_duration_seconds: float = 5.0,
    min_silence_ms: int = 100
) -> Tuple[List[AudioSegment], List[str], int]:
    """
    Cycle through the unique sources until enough clips fill the target duration.

    Sources are repeated round-robin, so every unique source appears and
    repetitions stay proportional across sources.

    Args:
        source_audios: List of unique source audio segments
        source_categories: List of category names corresponding to source_audios
        target_duration_seconds: Target total duration
        source_clip_duration_seconds: Duration of each source clip
        min_silence_ms: Minimum silence between clips

    Returns:
        Tuple of (expanded_audio_list, expanded_categories, num_clips)

    Raises:
        ValueError: If source_audios is empty.
    """
    # How many clips fit in the target (accounting for mandatory gaps).
    num_clips, remainder = get_max_clip_num_to_be_joined(
        target_duration_seconds,
        source_clip_duration_seconds,
        min_silence_ms
    )

    num_sources = len(source_audios)
    if num_sources == 0:
        raise ValueError("source_audios cannot be empty")

    # Round-robin index pattern: 0, 1, ..., n-1, 0, 1, ...
    picks = [clip_idx % num_sources for clip_idx in range(num_clips)]
    expanded_audios = [source_audios[p] for p in picks]
    expanded_categories = [source_categories[p] for p in picks]

    logger.debug(
        f"Repeated {num_sources} sources to {num_clips} clips for "
        f"{target_duration_seconds}s target duration"
    )

    return expanded_audios, expanded_categories, num_clips
| |
|
| |
|
def build_consecutive_sources_for_count_task(
    source_audios: List[AudioSegment],
    source_categories: List[str],
    target_duration_seconds: float,
    source_clip_duration_seconds: float = 5.0,
    min_silence_between_sources_ms: int = 100,
    max_extra_silence_per_gap_ms: int = 500,
    crossfade_within_source_ms: int = 50
) -> Tuple[AudioSegment, List[str], dict]:
    """
    Build audio for COUNT task with consecutive same-class clips.

    For count task, same-class clips must be consecutive (AAA BBB CCC) so they
    are perceived as ONE sound source. Silences are only inserted BETWEEN
    different classes, not within same-class repetitions.

    Pipeline: pick classes -> for each class concatenate clips consecutively ->
    insert silences only between different classes -> distribute remainder

    Args:
        source_audios: List of unique source audio segments (one per class)
        source_categories: List of category names
        target_duration_seconds: Target total duration
        source_clip_duration_seconds: Duration of each source clip
        min_silence_between_sources_ms: Minimum silence between different sources
        max_extra_silence_per_gap_ms: Max extra silence per gap for remainder distribution
        crossfade_within_source_ms: Small crossfade within same-source repetitions

    Returns:
        Tuple of (final_audio, category_sequence, metadata_dict)

    Raises:
        ValueError: If source_audios is empty.
    """
    target_ms = int(target_duration_seconds * 1000)
    source_ms = int(source_clip_duration_seconds * 1000)  # NOTE(review): unused in this function
    num_sources = len(source_audios)

    if num_sources == 0:
        raise ValueError("source_audios cannot be empty")

    # How many source clips fit in the target, given the mandatory gaps.
    num_clips, remainder_seconds = get_max_clip_num_to_be_joined(
        target_duration_seconds,
        source_clip_duration_seconds,
        min_silence_between_sources_ms
    )

    # Every source must appear at least once, even if that overflows the
    # target — the overflow is trimmed at the end.
    if num_sources > num_clips:
        logger.warning(
            f"More sources ({num_sources}) than clips that fit ({num_clips}). "
            f"Each source needs at least 1 clip, so output may exceed target duration. "
            f"Consider capping n_unique_audios <= max_clips in task_count.py"
        )

        num_clips = num_sources

    # Distribute clips across sources as evenly as possible: each source gets
    # base_reps, and the first extra_reps sources get one more.
    base_reps = num_clips // num_sources
    extra_reps = num_clips % num_sources

    repetitions_per_source = []
    for i in range(num_sources):
        reps = base_reps + (1 if i < extra_reps else 0)
        repetitions_per_source.append(reps)

    # Shuffle so the extra repetition is not always assigned to the same
    # leading sources.
    random.shuffle(repetitions_per_source)

    # Build one contiguous block per source: same-class clips are joined
    # back-to-back (with a small crossfade) so they sound like one source.
    source_blocks = []
    category_sequence = []

    for i, (audio, category, reps) in enumerate(zip(source_audios, source_categories, repetitions_per_source)):
        if reps == 0:
            continue

        # First repetition, then append (reps - 1) more copies.
        block = audio
        for _ in range(reps - 1):
            if crossfade_within_source_ms > 0:
                block = block.append(audio, crossfade=crossfade_within_source_ms)
            else:
                block = block + audio

        source_blocks.append(block)
        category_sequence.append(category)

    # Silences go only between blocks of DIFFERENT sources.
    num_gaps = len(source_blocks) - 1

    if num_gaps <= 0:
        # A single block: nothing to separate.
        final_audio = source_blocks[0]
    else:
        # Mandatory silence budget plus leftover time for extra silences.
        total_blocks_ms = sum(len(block) for block in source_blocks)
        min_total_silence_ms = num_gaps * min_silence_between_sources_ms

        available_extra_ms = target_ms - total_blocks_ms - min_total_silence_ms
        available_extra_ms = max(0, available_extra_ms)

        # Randomly split the leftover across the gaps (bounded per gap).
        extra_silences = distribute_remainder_as_silences(
            available_extra_ms,
            num_gaps,
            max_extra_silence_per_gap_ms
        )

        # Stitch: block + (mandatory + extra) silence + block + ...
        final_audio = source_blocks[0]
        for i, block in enumerate(source_blocks[1:]):
            gap_silence_ms = min_silence_between_sources_ms + extra_silences[i]
            silence = AudioSegment.silent(duration=gap_silence_ms)
            final_audio = final_audio + silence + block

    # Force the exact target duration: trim overshoot or pad with silence.
    if len(final_audio) > target_ms:
        final_audio = final_audio[:target_ms]
    elif len(final_audio) < target_ms:
        padding = AudioSegment.silent(duration=target_ms - len(final_audio))
        final_audio = final_audio + padding

    # repetitions_per_source was shuffled above; the zip below pairs each
    # category with the rep count actually used for that source's block.
    metadata = {
        'num_unique_sources': num_sources,
        'total_clips': num_clips,
        'ordering_mode': 'consecutive',
        'repetitions_per_source': dict(zip(source_categories, repetitions_per_source)),
        'target_duration_ms': target_ms,
        'actual_duration_ms': len(final_audio),
        'num_gaps_between_sources': num_gaps
    }

    logger.debug(
        f"Count task (consecutive): {num_sources} sources, {num_clips} total clips, "
        f"reps={repetitions_per_source}, duration={len(final_audio)}ms"
    )

    return final_audio, category_sequence, metadata
| |
|
| |
|
def build_random_order_for_count_task(
    source_audios: List[AudioSegment],
    source_categories: List[str],
    target_duration_seconds: float,
    source_clip_duration_seconds: float = 5.0,
    min_silence_ms: int = 100,
    max_extra_silence_per_gap_ms: int = 500
) -> Tuple[AudioSegment, List[str], dict]:
    """
    Build audio for COUNT task with RANDOM ordering of clips.

    Clips from different sources are shuffled randomly (A B A C B A C...).
    This tests whether the model can recognize recurring sounds as the same source.
    Silences are inserted between ALL clips (same or different source).

    Pipeline:
    1. Calculate total clips needed
    2. Distribute clips across sources
    3. Create expanded list with all clip instances
    4. Shuffle randomly
    5. Insert silences between ALL clips
    6. Distribute remainder as extra random silences

    Args:
        source_audios: List of unique source audio segments (one per class)
        source_categories: List of category names
        target_duration_seconds: Target total duration
        source_clip_duration_seconds: Duration of each source clip
        min_silence_ms: Minimum silence between ALL clips
        max_extra_silence_per_gap_ms: Max extra silence per gap

    Returns:
        Tuple of (final_audio, clip_sequence, metadata_dict)

    Raises:
        ValueError: If source_audios is empty.
    """
    target_ms = int(target_duration_seconds * 1000)
    source_ms = int(source_clip_duration_seconds * 1000)  # NOTE(review): unused in this function
    num_sources = len(source_audios)

    if num_sources == 0:
        raise ValueError("source_audios cannot be empty")

    # Step 1: how many source clips fit in the target, given mandatory gaps.
    num_clips, remainder_seconds = get_max_clip_num_to_be_joined(
        target_duration_seconds,
        source_clip_duration_seconds,
        min_silence_ms
    )

    # Every source must appear at least once, even if that overflows the
    # target — build_clip_sequence_with_silences trims the overshoot.
    if num_sources > num_clips:
        logger.warning(
            f"More sources ({num_sources}) than clips that fit ({num_clips}). "
            f"Each source needs at least 1 clip, so output may exceed target duration. "
            f"Consider capping n_unique_audios <= max_clips in task_count.py"
        )

        num_clips = num_sources

    # Step 2: distribute clips as evenly as possible across the sources.
    base_reps = num_clips // num_sources
    extra_reps = num_clips % num_sources

    repetitions_per_source = []
    for i in range(num_sources):
        reps = base_reps + (1 if i < extra_reps else 0)
        repetitions_per_source.append(reps)

    # Step 3: expand into one (audio, category) entry per clip instance.
    expanded_clips = []
    for audio, category, reps in zip(source_audios, source_categories, repetitions_per_source):
        for _ in range(reps):
            expanded_clips.append((audio, category))

    # Step 4: randomize the playback order across all clip instances.
    random.shuffle(expanded_clips)

    # Unzip into parallel lists (audio order == category order).
    shuffled_audios = [clip[0] for clip in expanded_clips]
    clip_sequence = [clip[1] for clip in expanded_clips]

    # Steps 5-6: join with mandatory silences between ALL clips and spread
    # the remaining time as random extra silences; exact-duration enforced.
    final_audio = build_clip_sequence_with_silences(
        shuffled_audios,
        target_duration_seconds,
        min_silence_ms=min_silence_ms,
        max_extra_silence_per_gap_ms=max_extra_silence_per_gap_ms,
        crossfade_ms=0
    )

    # clip_sequence records the shuffled order so callers can derive labels.
    metadata = {
        'num_unique_sources': num_sources,
        'total_clips': len(expanded_clips),
        'ordering_mode': 'random',
        'repetitions_per_source': dict(zip(source_categories, repetitions_per_source)),
        'clip_sequence': clip_sequence,
        'target_duration_ms': target_ms,
        'actual_duration_ms': len(final_audio),
        'num_gaps': len(expanded_clips) - 1
    }

    logger.debug(
        f"Count task (random): {num_sources} sources, {len(expanded_clips)} clips, "
        f"sequence={clip_sequence[:5]}..., duration={len(final_audio)}ms"
    )

    return final_audio, clip_sequence, metadata
| |
|
| |
|
def build_count_task_audio(
    source_audios: List[AudioSegment],
    source_categories: List[str],
    target_duration_seconds: float,
    ordering_mode: str = "random",
    source_clip_duration_seconds: float = 5.0,
    min_silence_ms: int = 100,
    max_extra_silence_per_gap_ms: int = 500,
    crossfade_within_source_ms: int = 50
) -> Tuple[AudioSegment, List[str], dict]:
    """
    Build audio for the COUNT task, dispatching on the ordering strategy.

    Args:
        source_audios: List of unique source audio segments (one per class)
        source_categories: List of category names
        target_duration_seconds: Target total duration
        ordering_mode: "random" or "consecutive"
            - "random": Clips shuffled (A B A C B A C) - tests sound recognition
            - "consecutive": Same-source grouped (AAA BBB CCC) - easier
        source_clip_duration_seconds: Duration of each source clip
        min_silence_ms: Minimum silence between clips
        max_extra_silence_per_gap_ms: Max extra silence per gap
        crossfade_within_source_ms: Crossfade for consecutive mode only

    Returns:
        Tuple of (final_audio, clip_sequence, metadata_dict)
    """
    # Arguments shared by both builder strategies, in positional order.
    common_args = (
        source_audios,
        source_categories,
        target_duration_seconds,
        source_clip_duration_seconds,
        min_silence_ms,
        max_extra_silence_per_gap_ms,
    )

    # Any mode other than "consecutive" falls through to random ordering.
    if ordering_mode != "consecutive":
        return build_random_order_for_count_task(*common_args)

    return build_consecutive_sources_for_count_task(
        *common_args,
        crossfade_within_source_ms,
    )
| |
|
| |
|
| | |
| | |
| | |
| |
|
def calculate_duration_slot_distribution(
    target_total_duration_s: float,
    effective_durations: Dict[str, float],
    target_category: str,
    question_type: str,
    multiplier_longest: float = 1.5,
    multiplier_shortest: float = 0.5,
    min_silence_between_sources_ms: int = 100
) -> Tuple[Dict[str, int], bool, Dict]:
    """
    Calculate how many repetitions each source gets for the duration task.

    For "longest": each background gets exactly one slot and the target is
    repeated to fill the remaining time; the gap constraint is
    target_duration >= max_background_duration * multiplier_longest.
    For "shortest" (and any other question_type value): the target gets
    exactly one slot and each background is repeated until it plays at least
    target_duration / multiplier_shortest; leftover time is then distributed
    greedily as extra background repetitions.

    Args:
        target_total_duration_s: Target total audio duration in seconds
        effective_durations: Dict mapping category -> effective duration in seconds
        target_category: The category that should be longest/shortest
        question_type: "longest" or "shortest"
        multiplier_longest: target >= max_background * this
        multiplier_shortest: target <= min_background * this
        min_silence_between_sources_ms: Minimum silence between different sources

    Returns:
        Tuple of (slot_distribution, gap_satisfied, metadata)
        slot_distribution: Dict mapping category -> number of repetitions
        gap_satisfied: Whether the duration gap constraint is met
        metadata: Additional info about the calculation
    """
    categories = list(effective_durations.keys())
    n_sources = len(categories)

    if n_sources < 2:
        # Degenerate case: nothing to compare against, just fill the time.
        reps = max(1, int(target_total_duration_s / effective_durations[target_category]))
        return {target_category: reps}, True, {'note': 'single_source'}

    # Reserve the mandatory inter-source silences before budgeting audio.
    total_silence_s = (n_sources - 1) * min_silence_between_sources_ms / 1000.0
    available_for_audio_s = target_total_duration_s - total_silence_s

    background_categories = [c for c in categories if c != target_category]

    if question_type == "longest":
        # Backgrounds get one slot each; the target fills what remains.
        background_duration_s = sum(effective_durations[c] for c in background_categories)

        remaining_for_target_s = available_for_audio_s - background_duration_s
        target_duration_per_rep = effective_durations[target_category]

        # At least one repetition even if the budget is already exhausted.
        target_reps = max(1, int(remaining_for_target_s / target_duration_per_rep))
        actual_target_duration = target_reps * target_duration_per_rep

        # Gap check: target must clearly dominate the longest background.
        max_background_duration = max(effective_durations[c] for c in background_categories)
        required_target_duration = max_background_duration * multiplier_longest
        gap_satisfied = actual_target_duration >= required_target_duration

        slot_distribution = {c: 1 for c in background_categories}
        slot_distribution[target_category] = target_reps

        metadata = {
            'available_for_audio_s': available_for_audio_s,
            'background_duration_s': background_duration_s,
            'remaining_for_target_s': remaining_for_target_s,
            'target_reps': target_reps,
            'actual_target_duration_s': actual_target_duration,
            'max_background_duration_s': max_background_duration,
            'required_target_duration_s': required_target_duration,
            'multiplier_used': multiplier_longest
        }

    else:
        # SHORTEST: the target plays once; every background must play longer.
        target_duration_s = effective_durations[target_category]

        remaining_for_backgrounds_s = available_for_audio_s - target_duration_s

        slot_distribution = {target_category: 1}

        # Each background must play at least this long to satisfy the gap.
        min_background_required = target_duration_s / multiplier_shortest

        background_reps = {}
        for cat in background_categories:
            eff_dur = effective_durations[cat]
            # int(...) + 1 rounds up (and deliberately overshoots on exact
            # division) so the background strictly exceeds the requirement.
            min_reps = max(1, int(min_background_required / eff_dur) + 1)
            background_reps[cat] = min_reps

        total_background_needed = sum(
            background_reps[c] * effective_durations[c]
            for c in background_categories
        )

        if total_background_needed <= remaining_for_backgrounds_s:
            # Spend leftover time on extra background repetitions,
            # round-robin across categories until nothing more fits.
            extra_available = remaining_for_backgrounds_s - total_background_needed

            while extra_available > 0:
                added_any = False
                for cat in background_categories:
                    eff_dur = effective_durations[cat]
                    if extra_available >= eff_dur:
                        background_reps[cat] += 1
                        extra_available -= eff_dur
                        added_any = True
                if not added_any:
                    break

        # The required minimum may not have fit into the remaining budget;
        # keep the computed reps either way and let the final check below
        # decide whether the gap constraint actually holds.
        slot_distribution.update(background_reps)

        # Verify the gap constraint against the actual assembled durations.
        actual_durations = {
            cat: slot_distribution[cat] * effective_durations[cat]
            for cat in categories
        }
        min_background_actual = min(
            actual_durations[c] for c in background_categories
        )

        gap_satisfied = actual_durations[target_category] <= min_background_actual * multiplier_shortest

        metadata = {
            'available_for_audio_s': available_for_audio_s,
            'target_duration_s': target_duration_s,
            'remaining_for_backgrounds_s': remaining_for_backgrounds_s,
            'min_background_required_s': min_background_required,
            'actual_durations_s': actual_durations,
            'min_background_actual_s': min_background_actual,
            'multiplier_used': multiplier_shortest
        }

    return slot_distribution, gap_satisfied, metadata
| |
|
| |
|
def build_duration_task_audio(
    source_audio_lists: Dict[str, List[AudioSegment]],
    slot_distribution: Dict[str, int],
    effective_durations: Dict[str, float],
    target_total_duration_s: float,
    min_silence_between_sources_ms: int = 100,
    max_extra_silence_per_gap_ms: int = 500,
    crossfade_within_source_ms: int = 50
) -> Tuple[AudioSegment, List[str], Dict]:
    """
    Build audio for DURATION task with consecutive ordering per source.

    Structure: [SourceA × n] + silence + [SourceB × m] + silence + ...
    Order of sources is randomized to avoid patterns.

    Args:
        source_audio_lists: Dict mapping category -> list of audio segments
        slot_distribution: Dict mapping category -> number of repetitions
        effective_durations: Dict mapping category -> effective duration per clip
            (currently unused here; kept for interface compatibility)
        target_total_duration_s: Target total duration
        min_silence_between_sources_ms: Min silence between different sources
        max_extra_silence_per_gap_ms: Max extra silence per gap
        crossfade_within_source_ms: Crossfade between same-source repetitions

    Returns:
        Tuple of (final_audio, category_sequence, metadata)
    """
    categories = list(slot_distribution.keys())

    # Randomize source order so the target is not always in the same slot.
    random.shuffle(categories)

    # Build one contiguous block per source (repetitions joined with a
    # short crossfade so they sound continuous).
    source_blocks = []
    category_sequence = []
    actual_durations = {}

    for category in categories:
        reps = slot_distribution[category]
        audio_list = source_audio_lists[category]

        if reps == 0:
            continue

        block = audio_list[0]
        for i in range(1, reps):
            # Cycle through the available clips if reps > len(audio_list).
            next_clip = audio_list[i % len(audio_list)]

            # Crossfade only when both sides are longer than the overlap.
            if (crossfade_within_source_ms > 0
                    and len(block) > crossfade_within_source_ms
                    and len(next_clip) > crossfade_within_source_ms):
                block = block.append(next_clip, crossfade=crossfade_within_source_ms)
            else:
                block = block + next_clip

        source_blocks.append((category, block))
        category_sequence.extend([category] * reps)
        actual_durations[category] = len(block) / 1000.0

    # Work out how much extra silence can be spread across the gaps.
    total_audio_ms = sum(len(block) for _, block in source_blocks)
    num_gaps = len(source_blocks) - 1
    min_total_silence_ms = num_gaps * min_silence_between_sources_ms

    target_ms = int(target_total_duration_s * 1000)
    available_extra_ms = target_ms - total_audio_ms - min_total_silence_ms

    if available_extra_ms > 0 and num_gaps > 0:
        extra_silences = distribute_remainder_as_silences(
            available_extra_ms,
            num_gaps,
            max_extra_silence_per_gap_ms
        )
    else:
        extra_silences = [0] * max(num_gaps, 1)

    # Concatenate blocks with silences. Timestamps are measured from the
    # assembled audio itself so that crossfades cannot cause drift.
    source_timestamps = []

    first_cat, first_block = source_blocks[0]
    final_audio = first_block
    source_timestamps.append((first_cat, 0, len(first_block)))

    for i, (cat, block) in enumerate(source_blocks[1:]):
        gap_silence_ms = min_silence_between_sources_ms + extra_silences[i]
        silence = AudioSegment.silent(duration=gap_silence_ms)

        # Fade the previous block out into the gap when the gap is long
        # enough to absorb the fade; cap the fade length at 500 ms.
        crossfade_ms = min(500, gap_silence_ms)
        if (0 < crossfade_ms < gap_silence_ms
                and len(final_audio) > crossfade_ms
                and len(block) > crossfade_ms):
            final_audio = final_audio.append(silence, crossfade=crossfade_ms)
        else:
            final_audio = final_audio + silence

        # BUGFIX: take the block's start from the actual assembled length.
        # The old code used previous_end + gap_silence_ms, which ignored the
        # crossfade overlap and drifted later with every faded gap.
        start_ms = len(final_audio)
        final_audio = final_audio.append(block, crossfade=0)
        end_ms = len(final_audio)
        source_timestamps.append((cat, start_ms, end_ms))

    # Force the exact target duration by trimming or zero-padding the tail.
    # NOTE(review): trimming can leave the last timestamp extending past
    # target_ms — presumably acceptable for metadata; confirm with callers.
    if len(final_audio) > target_ms:
        final_audio = final_audio[:target_ms]
    elif len(final_audio) < target_ms:
        padding = AudioSegment.silent(duration=target_ms - len(final_audio))
        final_audio = final_audio + padding

    # Human-readable summary like "dog 0.0s-5.0s (5.0s), cat 5.6s-8.6s (3.0s)".
    timestamp_parts = []
    for cat, start_ms, end_ms in source_timestamps:
        start_s = round(start_ms / 1000.0, 2)
        end_s = round(end_ms / 1000.0, 2)
        duration_s = round((end_ms - start_ms) / 1000.0, 2)
        timestamp_parts.append(f"{cat} {start_s}s-{end_s}s ({duration_s}s)")
    timestamp_string = ", ".join(timestamp_parts)

    metadata = {
        'source_order': [cat for cat, _ in source_blocks],
        'slot_distribution': slot_distribution,
        'actual_durations_s': actual_durations,
        'total_audio_ms': total_audio_ms,
        'num_gaps': num_gaps,
        'final_duration_ms': len(final_audio),
        'source_timestamps': source_timestamps,
        'timestamp_string': timestamp_string
    }

    logger.debug(
        f"Duration task audio: {len(source_blocks)} sources, "
        f"order={metadata['source_order']}, duration={len(final_audio)}ms"
    )

    return final_audio, category_sequence, metadata
| |
|