| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| from typing import List, Optional, Tuple, Union |
|
|
| import paddle |
|
|
| from ...pipeline_utils import AudioPipelineOutput, DiffusionPipeline |
| from ...utils import logging |
|
|
# Module-level logger; used by DanceDiffusionPipeline.__call__ to report when the requested audio length is padded.
| logger = logging.get_logger(__name__) |
|
|
|
|
class DanceDiffusionPipeline(DiffusionPipeline):
    r"""
    Pipeline for unconditional audio generation with a 1D U-Net and a scheduler.

    This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods the
    library implements for all the pipelines (such as downloading or saving, running on a particular device, etc.)

    Parameters:
        unet ([`UNet1DModel`]): U-Net architecture to denoise the audio sample.
        scheduler ([`SchedulerMixin`]):
            A scheduler to be used in combination with `unet` to denoise the audio sample. Can be one of
            [`IPNDMScheduler`].
    """

    def __init__(self, unet, scheduler):
        super().__init__()
        self.register_modules(unet=unet, scheduler=scheduler)

    @paddle.no_grad()
    def __call__(
        self,
        batch_size: int = 1,
        num_inference_steps: int = 100,
        generator: Optional[Union[paddle.Generator, List[paddle.Generator]]] = None,
        audio_length_in_s: Optional[float] = None,
        return_dict: bool = True,
    ) -> Union[AudioPipelineOutput, Tuple]:
        r"""
        Generate a batch of audio samples by iteratively denoising random noise.

        Args:
            batch_size (`int`, *optional*, defaults to 1):
                The number of audio samples to generate.
            num_inference_steps (`int`, *optional*, defaults to 100):
                The number of denoising steps. More denoising steps usually lead to a higher quality audio sample at
                the expense of slower inference.
            generator (`paddle.Generator`, *optional*):
                One or a list of paddle generator(s) to make generation deterministic. When a list is given, its
                length must equal `batch_size`.
            audio_length_in_s (`float`, *optional*, defaults to `self.unet.config.sample_size/self.unet.config.sample_rate`):
                The length of the generated audio sample in seconds. Note that the output of the pipeline, *i.e.*
                `sample_size`, will be `audio_length_in_s` * `self.unet.config.sample_rate`.
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether or not to return a [`~pipeline_utils.AudioPipelineOutput`] instead of a plain tuple.

        Returns:
            [`~pipeline_utils.AudioPipelineOutput`] or `tuple`: [`~pipeline_utils.AudioPipelineOutput`] if
            `return_dict` is True, otherwise a `tuple`. When returning a tuple, the first element is the array of
            generated audio samples.

        Raises:
            ValueError: If `audio_length_in_s` yields fewer than `3 * 2 ** len(self.unet.up_blocks)` samples, or if
                a list of generators is passed whose length differs from `batch_size`.
        """
        # Read every model hyper-parameter from the config so access is consistent throughout the call.
        unet_config = self.unet.config
        sample_rate = unet_config.sample_rate

        if audio_length_in_s is None:
            audio_length_in_s = unet_config.sample_size / sample_rate

        sample_size = audio_length_in_s * sample_rate

        # The sample length is halved once per resolution level, so it must be large enough
        # and a multiple of the overall down-scaling factor.
        down_scale_factor = 2 ** len(self.unet.up_blocks)
        if sample_size < 3 * down_scale_factor:
            raise ValueError(
                f"{audio_length_in_s} is too small. Make sure it's bigger or equal to"
                f" {3 * down_scale_factor / sample_rate}."
            )

        # Remember the requested length: the padded output is trimmed back to it at the end.
        original_sample_size = int(sample_size)
        if sample_size % down_scale_factor != 0:
            # Round up to the next multiple of the down-scaling factor.
            sample_size = ((audio_length_in_s * sample_rate) // down_scale_factor + 1) * down_scale_factor
            logger.info(
                f"{audio_length_in_s} is increased to {sample_size / sample_rate} so that it can be handled"
                f" by the model. It will be cut to {original_sample_size / sample_rate} after the denoising"
                " process."
            )
        sample_size = int(sample_size)

        dtype = self.unet.dtype
        shape = [batch_size, unet_config.in_channels, sample_size]
        if isinstance(generator, list) and len(generator) != batch_size:
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )

        if isinstance(generator, list):
            # One generator per sample: draw each sample's noise separately, then stack along the batch axis.
            single_shape = [1] + shape[1:]
            noise_per_sample = [paddle.randn(single_shape, generator=gen, dtype=dtype) for gen in generator]
            audio = paddle.concat(noise_per_sample, axis=0)
        else:
            audio = paddle.randn(shape, generator=generator, dtype=dtype)

        # Set the step values and cast them to the model dtype before feeding them to the U-Net.
        self.scheduler.set_timesteps(num_inference_steps)
        self.scheduler.timesteps = self.scheduler.timesteps.cast(dtype)

        for t in self.progress_bar(self.scheduler.timesteps):
            # 1. Run the U-Net on the current sample at timestep t.
            model_output = self.unet(audio, t).sample

            # 2. Let the scheduler compute the sample for the previous timestep.
            audio = self.scheduler.step(model_output, t, audio).prev_sample

        audio = audio.clip(-1, 1).cast("float32").cpu().numpy()

        # Drop the samples that were only added to satisfy the down-scaling constraint.
        audio = audio[:, :, :original_sample_size]

        if not return_dict:
            return (audio,)

        return AudioPipelineOutput(audios=audio)
|
|