| from typing import Union |
|
|
| import numpy as np |
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| from einops import rearrange |
| from torch.nn.utils import weight_norm |
|
|
class FactorizedVectorQuantize(nn.Module):
    """Vector quantizer that performs the codebook lookup in a projected space.

    The input is projected from ``dim`` to ``codebook_dim`` channels (skipped
    when the two sizes are equal), each time step is replaced by its nearest
    codebook entry, and the result is projected back to ``dim`` channels.
    Nearest-neighbour search is done on L2-normalized vectors, but the
    vectors returned are the raw (un-normalized) codebook entries.

    Parameters
    ----------
    dim : int
        Channel size of the tensors passed to and returned by ``forward``.
    codebook_size : int
        Number of entries in the codebook.
    codebook_dim : int
        Channel size of each codebook entry.
    commitment : float
        Weight applied to the commitment term of the training loss.
    **kwargs
        Accepted and ignored.
    """

    def __init__(self, dim, codebook_size, codebook_dim, commitment, **kwargs):
        super().__init__()
        self.codebook_size = codebook_size
        self.codebook_dim = codebook_dim
        self.commitment = commitment

        if dim != self.codebook_dim:
            # NOTE: the function-style weight_norm is kept deliberately; the
            # parametrization-based replacement stores its parameters under
            # different names.
            self.in_proj = weight_norm(nn.Linear(dim, self.codebook_dim))
            self.out_proj = weight_norm(nn.Linear(self.codebook_dim, dim))
        else:
            self.in_proj = nn.Identity()
            self.out_proj = nn.Identity()
        self._codebook = nn.Embedding(codebook_size, self.codebook_dim)

    @property
    def codebook(self):
        """The ``nn.Embedding`` holding the codebook entries."""
        return self._codebook

    def forward(self, z):
        """Quantize the input tensor using the codebook.

        Parameters
        ----------
        z : Tensor[B x D x T]

        Returns
        -------
        Tensor[B x D x T]
            Quantized representation of the input, after the output
            projection. Gradients flow straight through the quantization
            step to the projected input.
        Tensor[B x T]
            Codebook indices (quantized discrete representation of input).
        Tensor[B]
            Per-sample loss. In training mode this is the commitment loss
            (scaled by ``commitment``) plus the codebook loss; otherwise it
            is all zeros with the dtype and device of ``z``.
        """
        # The projections act on the last axis, so apply them channel-last
        # and switch straight back to channel-first.
        z_e = self.in_proj(z.transpose(1, 2)).transpose(1, 2)
        z_q, indices = self.decode_latents(z_e)

        if self.training:
            # Pulls the projected input towards its (frozen) codebook entry.
            commitment_loss = (
                F.mse_loss(z_e, z_q.detach(), reduction="none").mean([1, 2])
                * self.commitment
            )
            # Pulls the codebook entry towards the (frozen) projected input.
            codebook_loss = F.mse_loss(
                z_q, z_e.detach(), reduction="none"
            ).mean([1, 2])
            commit_loss = commitment_loss + codebook_loss
        else:
            # new_zeros keeps the dtype (and device) of the input, so the
            # placeholder matches the training-mode loss for non-float32
            # inputs.
            commit_loss = z.new_zeros(z.shape[0])

        # Straight-through estimator: the forward value is the codebook
        # entry, the backward pass treats quantization as the identity.
        z_q = z_e + (z_q - z_e).detach()

        z_q = self.out_proj(z_q.transpose(1, 2)).transpose(1, 2)
        return z_q, indices, commit_loss

    def vq2emb(self, vq, proj=True):
        """Map codebook indices to embeddings in channel-last layout.

        Parameters
        ----------
        vq : Tensor
            Integer codebook indices.
        proj : bool
            When true, apply the output projection to the looked-up entries.

        Returns
        -------
        Tensor
            Looked-up entries with the channel axis last.
        """
        emb = self.embed_code(vq)
        if proj:
            emb = self.out_proj(emb)
        return emb

    def get_emb(self):
        """Return the codebook weight matrix (``codebook_size x codebook_dim``)."""
        return self.codebook.weight

    def embed_code(self, embed_id):
        """Look up raw codebook entries for ``embed_id``; channel axis is last."""
        return F.embedding(embed_id, self.codebook.weight)

    def decode_code(self, embed_id):
        """Look up codebook entries for ``Tensor[B x T]`` indices as ``B x D x T``."""
        return self.embed_code(embed_id).transpose(1, 2)

    def decode_latents(self, latents):
        """Find the nearest codebook entry for every time step.

        Parameters
        ----------
        latents : Tensor[B x D x T]
            Projected latents with ``D == codebook_dim``.

        Returns
        -------
        Tensor[B x D x T]
            Raw (un-normalized) codebook entries selected for each step.
        Tensor[B x T]
            Indices of the selected entries.
        """
        batch, channels, steps = latents.shape
        # One row per (batch, time) position.
        encodings = latents.transpose(1, 2).reshape(batch * steps, channels)
        codebook = self.codebook.weight

        # L2-normalize both sides so the distance ranks entries by direction.
        encodings = F.normalize(encodings)
        codebook = F.normalize(codebook)

        # Squared euclidean distance between every encoding and every entry.
        dist = (
            encodings.pow(2).sum(1, keepdim=True)
            - 2 * encodings @ codebook.t()
            + codebook.pow(2).sum(1, keepdim=True).t()
        )
        indices = dist.argmin(dim=1).view(batch, steps)
        z_q = self.decode_code(indices)
        return z_q, indices