| from torch.jit import Final |
| import torch.nn.functional as F |
| from itertools import repeat |
| import collections.abc |
|
|
| import torch |
| import torch.nn as nn |
|
|
| class Attention(nn.Module): |
| fast_attn: Final[bool] |
|
|
| def __init__( |
| self, |
| dim, |
| num_heads=8, |
| qkv_bias=False, |
| qk_norm=False, |
| attn_drop=0, |
| proj_drop=0, |
| norm_layer=nn.LayerNorm, |
| ): |
| super().__init__() |
| assert dim % num_heads == 0, "dim should be divisible by num_heads" |
| self.num_heads = num_heads |
| self.head_dim = dim // num_heads |
|
|
| self.scale = self.head_dim**-0.5 |
| self.fast_attn = hasattr( |
| torch.nn.functional, "scaled_dot_product_attention" |
| ) |
| assert self.fast_attn, "scaled_dot_product_attention Not implemented" |
|
|
| self.qkv = nn.Linear(dim, dim * 3, bias=qkv_bias) |
|
|
| self.q_norm = norm_layer(self.head_dim) if qk_norm else nn.Identity() |
| self.k_norm = norm_layer(self.head_dim) if qk_norm else nn.Identity() |
| self.attn_drop = nn.Dropout(attn_drop) |
|
|
| self.proj = nn.Linear(dim, dim) |
| self.proj_drop = nn.Dropout(proj_drop) |
|
|
| def forward(self, x, node_mask): |
| B, N, D = x.shape |
|
|
| |
| qkv = ( |
| self.qkv(x) |
| .reshape(B, N, 3, self.num_heads, self.head_dim) |
| .permute(2, 0, 3, 1, 4) |
| ) |
| q, k, v = qkv.unbind(0) |
| q, k = self.q_norm(q), self.k_norm(k) |
|
|
| attn_mask = (node_mask[:, None, :, None] & node_mask[:, None, None, :]).expand( |
| -1, self.num_heads, N, N |
| ) |
| extended_nodes = (attn_mask.sum(dim=-1) == 0) |
| attn_mask = attn_mask.clone() |
| attn_mask[extended_nodes] = True |
|
|
| x = F.scaled_dot_product_attention( |
| q, |
| k, |
| v, |
| dropout_p=self.attn_drop.p, |
| attn_mask=attn_mask, |
| ) |
|
|
| x = x.transpose(1, 2).reshape(B, N, -1) |
| |
| |
|
|
| x = self.proj(x) |
| x = self.proj_drop(x) |
|
|
| return x |
|
|
|
|
| class MLP(nn.Module): |
| def __init__( |
| self, |
| in_features, |
| hidden_features=None, |
| out_features=None, |
| act_layer=nn.GELU, |
| bias=True, |
| drop=0.0, |
| ): |
| super().__init__() |
| out_features = out_features or in_features |
| hidden_features = hidden_features or in_features |
| bias = to_2tuple(bias) |
| linear_layer = nn.Linear |
|
|
| self.fc1 = linear_layer(in_features, hidden_features, bias=bias[0]) |
| self.act = act_layer() |
| self.drop1 = nn.Dropout(drop) |
| self.fc2 = linear_layer(hidden_features, out_features, bias=bias[1]) |
|
|
| def forward(self, x): |
| x = self.fc1(x) |
| x = self.act(x) |
| x = self.drop1(x) |
| x = self.fc2(x) |
| return x |
|
|
|
|
| |
| def _ntuple(n): |
| def parse(x): |
| if isinstance(x, collections.abc.Iterable) and not isinstance(x, str): |
| return tuple(x) |
| return tuple(repeat(x, n)) |
|
|
| return parse |
|
|
|
|
| to_2tuple = _ntuple(2) |
|
|