import torch import torch.nn as nn import torch.nn.functional as F from transformers import PreTrainedModel from transformers.modeling_outputs import MaskedLMOutput, BaseModelOutput from .configuration_mybert import MyBertConfig def _build_rope_cache(head_dim, max_seq_len, base=10000.0): inv_freq = 1.0 / (base ** (torch.arange(0, head_dim, 2, dtype=torch.float32) / head_dim)) t = torch.arange(max_seq_len, dtype=torch.float32) freqs = torch.outer(t, inv_freq) emb = torch.cat((freqs, freqs), dim=-1) return emb.cos(), emb.sin() def _rotate_half(x): x1, x2 = x.chunk(2, dim=-1) return torch.cat((-x2, x1), dim=-1) def _apply_rope(q, k, cos, sin): cos = cos.to(q.dtype).unsqueeze(0).unsqueeze(0) sin = sin.to(q.dtype).unsqueeze(0).unsqueeze(0) q_rot = (q * cos) + (_rotate_half(q) * sin) k_rot = (k * cos) + (_rotate_half(k) * sin) return q_rot, k_rot class MyBertEmbeddings(nn.Module): def __init__(self, config): super().__init__() self.word_embeddings = nn.Embedding( config.vocab_size, config.hidden_size, padding_idx=config.pad_token_id ) self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps) self.dropout = nn.Dropout(config.hidden_dropout_prob) def forward(self, input_ids): x = self.word_embeddings(input_ids) x = self.LayerNorm(x) x = self.dropout(x) return x class MyBertSelfAttention(nn.Module): def __init__(self, config): super().__init__() self.num_attention_heads = config.num_attention_heads self.attention_head_size = config.hidden_size // config.num_attention_heads self.all_head_size = config.hidden_size self.query = nn.Linear(config.hidden_size, self.all_head_size) self.key = nn.Linear(config.hidden_size, self.all_head_size) self.value = nn.Linear(config.hidden_size, self.all_head_size) self.dropout_prob = config.attention_probs_dropout_prob def forward(self, hidden_states, attention_mask=None, cos=None, sin=None): q = self.query(hidden_states) k = self.key(hidden_states) v = self.value(hidden_states) new_shape = q.size()[:-1] + (self.num_attention_heads, self.attention_head_size) q = q.view(*new_shape).transpose(1, 2) k = k.view(*new_shape).transpose(1, 2) v = v.view(*new_shape).transpose(1, 2) if cos is not None and sin is not None: q, k = _apply_rope(q, k, cos, sin) context = F.scaled_dot_product_attention( q, k, v, attn_mask=attention_mask, dropout_p=self.dropout_prob if self.training else 0.0, is_causal=False, ) context = context.transpose(1, 2).contiguous() new_context_shape = context.size()[:-2] + (self.all_head_size,) return context.view(*new_context_shape) class MyBertSelfOutput(nn.Module): def __init__(self, config): super().__init__() self.dense = nn.Linear(config.hidden_size, config.hidden_size) self.dropout = nn.Dropout(config.hidden_dropout_prob) def forward(self, hidden_states): return self.dropout(self.dense(hidden_states)) class MyBertAttention(nn.Module): def __init__(self, config): super().__init__() self.self = MyBertSelfAttention(config) self.output = MyBertSelfOutput(config) def forward(self, hidden_states, attention_mask=None, cos=None, sin=None): self_outputs = self.self(hidden_states, attention_mask, cos, sin) return self.output(self_outputs) class MyBertIntermediate(nn.Module): def __init__(self, config): super().__init__() self.dense = nn.Linear(config.hidden_size, config.intermediate_size) self.intermediate_act_fn = nn.GELU() def forward(self, hidden_states): return self.intermediate_act_fn(self.dense(hidden_states)) class MyBertOutput(nn.Module): def __init__(self, config): super().__init__() self.dense = nn.Linear(config.intermediate_size, config.hidden_size) self.dropout = nn.Dropout(config.hidden_dropout_prob) def forward(self, hidden_states): return self.dropout(self.dense(hidden_states)) class MyBertLayer(nn.Module): def __init__(self, config): super().__init__() self.attention_layernorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps) self.attention = MyBertAttention(config) self.ffn_layernorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps) self.intermediate = MyBertIntermediate(config) self.output = MyBertOutput(config) def forward(self, hidden_states, attention_mask=None, cos=None, sin=None): normed = self.attention_layernorm(hidden_states) attention_output = self.attention(normed, attention_mask, cos, sin) hidden_states = hidden_states + attention_output normed = self.ffn_layernorm(hidden_states) intermediate_out = self.intermediate(normed) layer_output = self.output(intermediate_out) hidden_states = hidden_states + layer_output return hidden_states class MyBertEncoder(nn.Module): def __init__(self, config): super().__init__() self.layer = nn.ModuleList([MyBertLayer(config) for _ in range(config.num_hidden_layers)]) self.final_layernorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps) def forward(self, hidden_states, attention_mask=None, cos=None, sin=None): for layer_module in self.layer: hidden_states = layer_module(hidden_states, attention_mask, cos, sin) return self.final_layernorm(hidden_states) class MyBertPredictionHeadTransform(nn.Module): def __init__(self, config): super().__init__() self.dense = nn.Linear(config.hidden_size, config.hidden_size) self.transform_act_fn = nn.GELU() self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps) def forward(self, hidden_states): hidden_states = self.dense(hidden_states) hidden_states = self.transform_act_fn(hidden_states) hidden_states = self.LayerNorm(hidden_states) return hidden_states class MyBertLMPredictionHead(nn.Module): def __init__(self, config): super().__init__() self.transform = MyBertPredictionHeadTransform(config) self.decoder = nn.Linear(config.hidden_size, config.vocab_size, bias=True) def forward(self, hidden_states): hidden_states = self.transform(hidden_states) hidden_states = self.decoder(hidden_states) return hidden_states class MyBertOnlyMLMHead(nn.Module): def __init__(self, config): super().__init__() self.predictions = MyBertLMPredictionHead(config) def forward(self, sequence_output): return self.predictions(sequence_output) class MyBertPreTrainedModel(PreTrainedModel): config_class = MyBertConfig base_model_prefix = "mybert" supports_gradient_checkpointing = False _no_split_modules = ["MyBertLayer"] def _init_weights(self, module): std = self.config.initializer_range if isinstance(module, nn.Linear): module.weight.data.normal_(mean=0.0, std=std) if module.bias is not None: module.bias.data.zero_() elif isinstance(module, nn.Embedding): module.weight.data.normal_(mean=0.0, std=std) if module.padding_idx is not None: module.weight.data[module.padding_idx].zero_() elif isinstance(module, nn.LayerNorm): module.bias.data.zero_() module.weight.data.fill_(1.0) class MyBertModel(MyBertPreTrainedModel): def __init__(self, config): super().__init__(config) self.embeddings = MyBertEmbeddings(config) self.encoder = MyBertEncoder(config) self.post_init() def get_input_embeddings(self): return self.embeddings.word_embeddings def set_input_embeddings(self, value): self.embeddings.word_embeddings = value def forward(self, input_ids=None, attention_mask=None, return_dict=True, **kwargs): _, T = input_ids.shape head_dim = self.config.hidden_size // self.config.num_attention_heads cos, sin = _build_rope_cache(head_dim, T, self.config.rope_theta) cos = cos.to(device=input_ids.device, dtype=self.embeddings.word_embeddings.weight.dtype) sin = sin.to(device=input_ids.device, dtype=self.embeddings.word_embeddings.weight.dtype) attn_mask = None if attention_mask is not None: attn_mask = attention_mask.bool()[:, None, None, :] hidden = self.embeddings(input_ids) sequence_output = self.encoder(hidden, attn_mask, cos, sin) if not return_dict: return (sequence_output,) return BaseModelOutput(last_hidden_state=sequence_output) class MyBertForMaskedLM(MyBertPreTrainedModel): _tied_weights_keys = { "cls.predictions.decoder.weight": "mybert.embeddings.word_embeddings.weight", } def __init__(self, config): super().__init__(config) self.mybert = MyBertModel(config) self.cls = MyBertOnlyMLMHead(config) self.post_init() def get_output_embeddings(self): return self.cls.predictions.decoder def set_output_embeddings(self, new_embeddings): self.cls.predictions.decoder = new_embeddings def forward(self, input_ids=None, attention_mask=None, labels=None, return_dict=True, **kwargs): outputs = self.mybert(input_ids=input_ids, attention_mask=attention_mask, return_dict=True) sequence_output = outputs.last_hidden_state prediction_scores = self.cls(sequence_output) loss = None if labels is not None: loss = F.cross_entropy( prediction_scores.view(-1, self.config.vocab_size), labels.view(-1), ignore_index=-100, ) if not return_dict: output = (prediction_scores,) return ((loss,) + output) if loss is not None else output return MaskedLMOutput(loss=loss, logits=prediction_scores)