RoPERT-MLM-mini / modeling_mybert.py
kd13's picture
Update modeling_mybert.py
95ffa1d verified
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import PreTrainedModel
from transformers.modeling_outputs import MaskedLMOutput, BaseModelOutput
from .configuration_mybert import MyBertConfig
def _build_rope_cache(head_dim, max_seq_len, base=10000.0):
inv_freq = 1.0 / (base ** (torch.arange(0, head_dim, 2, dtype=torch.float32) / head_dim))
t = torch.arange(max_seq_len, dtype=torch.float32)
freqs = torch.outer(t, inv_freq)
emb = torch.cat((freqs, freqs), dim=-1)
return emb.cos(), emb.sin()
def _rotate_half(x):
x1, x2 = x.chunk(2, dim=-1)
return torch.cat((-x2, x1), dim=-1)
def _apply_rope(q, k, cos, sin):
cos = cos.to(q.dtype).unsqueeze(0).unsqueeze(0)
sin = sin.to(q.dtype).unsqueeze(0).unsqueeze(0)
q_rot = (q * cos) + (_rotate_half(q) * sin)
k_rot = (k * cos) + (_rotate_half(k) * sin)
return q_rot, k_rot
class MyBertEmbeddings(nn.Module):
def __init__(self, config):
super().__init__()
self.word_embeddings = nn.Embedding(
config.vocab_size, config.hidden_size, padding_idx=config.pad_token_id
)
self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
self.dropout = nn.Dropout(config.hidden_dropout_prob)
def forward(self, input_ids):
x = self.word_embeddings(input_ids)
x = self.LayerNorm(x)
x = self.dropout(x)
return x
class MyBertSelfAttention(nn.Module):
def __init__(self, config):
super().__init__()
self.num_attention_heads = config.num_attention_heads
self.attention_head_size = config.hidden_size // config.num_attention_heads
self.all_head_size = config.hidden_size
self.query = nn.Linear(config.hidden_size, self.all_head_size)
self.key = nn.Linear(config.hidden_size, self.all_head_size)
self.value = nn.Linear(config.hidden_size, self.all_head_size)
self.dropout_prob = config.attention_probs_dropout_prob
def forward(self, hidden_states, attention_mask=None, cos=None, sin=None):
q = self.query(hidden_states)
k = self.key(hidden_states)
v = self.value(hidden_states)
new_shape = q.size()[:-1] + (self.num_attention_heads, self.attention_head_size)
q = q.view(*new_shape).transpose(1, 2)
k = k.view(*new_shape).transpose(1, 2)
v = v.view(*new_shape).transpose(1, 2)
if cos is not None and sin is not None:
q, k = _apply_rope(q, k, cos, sin)
context = F.scaled_dot_product_attention(
q, k, v,
attn_mask=attention_mask,
dropout_p=self.dropout_prob if self.training else 0.0,
is_causal=False,
)
context = context.transpose(1, 2).contiguous()
new_context_shape = context.size()[:-2] + (self.all_head_size,)
return context.view(*new_context_shape)
class MyBertSelfOutput(nn.Module):
def __init__(self, config):
super().__init__()
self.dense = nn.Linear(config.hidden_size, config.hidden_size)
self.dropout = nn.Dropout(config.hidden_dropout_prob)
def forward(self, hidden_states):
return self.dropout(self.dense(hidden_states))
class MyBertAttention(nn.Module):
def __init__(self, config):
super().__init__()
self.self = MyBertSelfAttention(config)
self.output = MyBertSelfOutput(config)
def forward(self, hidden_states, attention_mask=None, cos=None, sin=None):
self_outputs = self.self(hidden_states, attention_mask, cos, sin)
return self.output(self_outputs)
class MyBertIntermediate(nn.Module):
def __init__(self, config):
super().__init__()
self.dense = nn.Linear(config.hidden_size, config.intermediate_size)
self.intermediate_act_fn = nn.GELU()
def forward(self, hidden_states):
return self.intermediate_act_fn(self.dense(hidden_states))
class MyBertOutput(nn.Module):
def __init__(self, config):
super().__init__()
self.dense = nn.Linear(config.intermediate_size, config.hidden_size)
self.dropout = nn.Dropout(config.hidden_dropout_prob)
def forward(self, hidden_states):
return self.dropout(self.dense(hidden_states))
class MyBertLayer(nn.Module):
def __init__(self, config):
super().__init__()
self.attention_layernorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
self.attention = MyBertAttention(config)
self.ffn_layernorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
self.intermediate = MyBertIntermediate(config)
self.output = MyBertOutput(config)
def forward(self, hidden_states, attention_mask=None, cos=None, sin=None):
normed = self.attention_layernorm(hidden_states)
attention_output = self.attention(normed, attention_mask, cos, sin)
hidden_states = hidden_states + attention_output
normed = self.ffn_layernorm(hidden_states)
intermediate_out = self.intermediate(normed)
layer_output = self.output(intermediate_out)
hidden_states = hidden_states + layer_output
return hidden_states
class MyBertEncoder(nn.Module):
def __init__(self, config):
super().__init__()
self.layer = nn.ModuleList([MyBertLayer(config) for _ in range(config.num_hidden_layers)])
self.final_layernorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
def forward(self, hidden_states, attention_mask=None, cos=None, sin=None):
for layer_module in self.layer:
hidden_states = layer_module(hidden_states, attention_mask, cos, sin)
return self.final_layernorm(hidden_states)
class MyBertPredictionHeadTransform(nn.Module):
def __init__(self, config):
super().__init__()
self.dense = nn.Linear(config.hidden_size, config.hidden_size)
self.transform_act_fn = nn.GELU()
self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
def forward(self, hidden_states):
hidden_states = self.dense(hidden_states)
hidden_states = self.transform_act_fn(hidden_states)
hidden_states = self.LayerNorm(hidden_states)
return hidden_states
class MyBertLMPredictionHead(nn.Module):
def __init__(self, config):
super().__init__()
self.transform = MyBertPredictionHeadTransform(config)
self.decoder = nn.Linear(config.hidden_size, config.vocab_size, bias=True)
def forward(self, hidden_states):
hidden_states = self.transform(hidden_states)
hidden_states = self.decoder(hidden_states)
return hidden_states
class MyBertOnlyMLMHead(nn.Module):
def __init__(self, config):
super().__init__()
self.predictions = MyBertLMPredictionHead(config)
def forward(self, sequence_output):
return self.predictions(sequence_output)
class MyBertPreTrainedModel(PreTrainedModel):
config_class = MyBertConfig
base_model_prefix = "mybert"
supports_gradient_checkpointing = False
_no_split_modules = ["MyBertLayer"]
def _init_weights(self, module):
std = self.config.initializer_range
if isinstance(module, nn.Linear):
module.weight.data.normal_(mean=0.0, std=std)
if module.bias is not None:
module.bias.data.zero_()
elif isinstance(module, nn.Embedding):
module.weight.data.normal_(mean=0.0, std=std)
if module.padding_idx is not None:
module.weight.data[module.padding_idx].zero_()
elif isinstance(module, nn.LayerNorm):
module.bias.data.zero_()
module.weight.data.fill_(1.0)
class MyBertModel(MyBertPreTrainedModel):
def __init__(self, config):
super().__init__(config)
self.embeddings = MyBertEmbeddings(config)
self.encoder = MyBertEncoder(config)
self.post_init()
def get_input_embeddings(self):
return self.embeddings.word_embeddings
def set_input_embeddings(self, value):
self.embeddings.word_embeddings = value
def forward(self, input_ids=None, attention_mask=None, return_dict=True, **kwargs):
_, T = input_ids.shape
head_dim = self.config.hidden_size // self.config.num_attention_heads
cos, sin = _build_rope_cache(head_dim, T, self.config.rope_theta)
cos = cos.to(device=input_ids.device, dtype=self.embeddings.word_embeddings.weight.dtype)
sin = sin.to(device=input_ids.device, dtype=self.embeddings.word_embeddings.weight.dtype)
attn_mask = None
if attention_mask is not None:
attn_mask = attention_mask.bool()[:, None, None, :]
hidden = self.embeddings(input_ids)
sequence_output = self.encoder(hidden, attn_mask, cos, sin)
if not return_dict:
return (sequence_output,)
return BaseModelOutput(last_hidden_state=sequence_output)
class MyBertForMaskedLM(MyBertPreTrainedModel):
_tied_weights_keys = {
"cls.predictions.decoder.weight": "mybert.embeddings.word_embeddings.weight",
}
def __init__(self, config):
super().__init__(config)
self.mybert = MyBertModel(config)
self.cls = MyBertOnlyMLMHead(config)
self.post_init()
def get_output_embeddings(self):
return self.cls.predictions.decoder
def set_output_embeddings(self, new_embeddings):
self.cls.predictions.decoder = new_embeddings
def forward(self, input_ids=None, attention_mask=None, labels=None, return_dict=True, **kwargs):
outputs = self.mybert(input_ids=input_ids, attention_mask=attention_mask, return_dict=True)
sequence_output = outputs.last_hidden_state
prediction_scores = self.cls(sequence_output)
loss = None
if labels is not None:
loss = F.cross_entropy(
prediction_scores.view(-1, self.config.vocab_size),
labels.view(-1),
ignore_index=-100,
)
if not return_dict:
output = (prediction_scores,)
return ((loss,) + output) if loss is not None else output
return MaskedLMOutput(loss=loss, logits=prediction_scores)