import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import PreTrainedModel
from transformers.modeling_outputs import MaskedLMOutput, BaseModelOutput

from .configuration_mybert import MyBertConfig


def _build_rope_cache(head_dim, max_seq_len, base=10000.0):
    """Precompute cos/sin tables for rotary position embeddings (RoPE).

    Returns two tensors of shape (max_seq_len, head_dim).
    """
    inv_freq = 1.0 / (base ** (torch.arange(0, head_dim, 2, dtype=torch.float32) / head_dim))
    t = torch.arange(max_seq_len, dtype=torch.float32)
    freqs = torch.outer(t, inv_freq)
    # Repeat the frequencies so the table covers the full head dimension.
    emb = torch.cat((freqs, freqs), dim=-1)
    return emb.cos(), emb.sin()


def _rotate_half(x):
    # Swap and negate the two halves of the last dimension: (x1, x2) -> (-x2, x1).
    x1, x2 = x.chunk(2, dim=-1)
    return torch.cat((-x2, x1), dim=-1)


def _apply_rope(q, k, cos, sin):
    # Broadcast the (seq_len, head_dim) tables over the batch and head dimensions.
    cos = cos.to(q.dtype).unsqueeze(0).unsqueeze(0)
    sin = sin.to(q.dtype).unsqueeze(0).unsqueeze(0)
    q_rot = (q * cos) + (_rotate_half(q) * sin)
    k_rot = (k * cos) + (_rotate_half(k) * sin)
    return q_rot, k_rot
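

# Shape reference for the RoPE helpers above, as they are used by MyBertSelfAttention:
#   q, k     : (batch, num_heads, seq_len, head_dim)
#   cos, sin : (seq_len, head_dim), broadcast to (1, 1, seq_len, head_dim)
# so the same per-position rotation is applied across all heads without extra copies.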


class MyBertEmbeddings(nn.Module):
    """Token embeddings only; positional information is injected via RoPE in attention."""

    def __init__(self, config):
        super().__init__()
        self.word_embeddings = nn.Embedding(
            config.vocab_size, config.hidden_size, padding_idx=config.pad_token_id
        )
        self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, input_ids):
        x = self.word_embeddings(input_ids)
        x = self.LayerNorm(x)
        x = self.dropout(x)
        return x


class MyBertSelfAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.num_attention_heads = config.num_attention_heads
        self.attention_head_size = config.hidden_size // config.num_attention_heads
        self.all_head_size = config.hidden_size
        self.query = nn.Linear(config.hidden_size, self.all_head_size)
        self.key = nn.Linear(config.hidden_size, self.all_head_size)
        self.value = nn.Linear(config.hidden_size, self.all_head_size)
        self.dropout_prob = config.attention_probs_dropout_prob

    def forward(self, hidden_states, attention_mask=None, cos=None, sin=None):
        q = self.query(hidden_states)
        k = self.key(hidden_states)
        v = self.value(hidden_states)
        # (batch, seq, hidden) -> (batch, heads, seq, head_dim)
        new_shape = q.size()[:-1] + (self.num_attention_heads, self.attention_head_size)
        q = q.view(*new_shape).transpose(1, 2)
        k = k.view(*new_shape).transpose(1, 2)
        v = v.view(*new_shape).transpose(1, 2)
        if cos is not None and sin is not None:
            q, k = _apply_rope(q, k, cos, sin)
        # Bidirectional (non-causal) attention via PyTorch's fused SDPA kernel.
        context = F.scaled_dot_product_attention(
            q, k, v,
            attn_mask=attention_mask,
            dropout_p=self.dropout_prob if self.training else 0.0,
            is_causal=False,
        )
        # (batch, heads, seq, head_dim) -> (batch, seq, hidden)
        context = context.transpose(1, 2).contiguous()
        new_context_shape = context.size()[:-2] + (self.all_head_size,)
        return context.view(*new_context_shape)


class MyBertSelfOutput(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, hidden_states):
        # Output projection only; the residual connection is applied in MyBertLayer.
        return self.dropout(self.dense(hidden_states))


class MyBertAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.self = MyBertSelfAttention(config)
        self.output = MyBertSelfOutput(config)

    def forward(self, hidden_states, attention_mask=None, cos=None, sin=None):
        self_outputs = self.self(hidden_states, attention_mask, cos, sin)
        return self.output(self_outputs)


class MyBertIntermediate(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.intermediate_size)
        self.intermediate_act_fn = nn.GELU()

    def forward(self, hidden_states):
        return self.intermediate_act_fn(self.dense(hidden_states))


class MyBertOutput(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.intermediate_size, config.hidden_size)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, hidden_states):
        return self.dropout(self.dense(hidden_states))


class MyBertLayer(nn.Module):
    """Pre-LayerNorm transformer block: LN -> attention -> residual, then LN -> FFN -> residual."""

    def __init__(self, config):
        super().__init__()
        self.attention_layernorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
        self.attention = MyBertAttention(config)
        self.ffn_layernorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
        self.intermediate = MyBertIntermediate(config)
        self.output = MyBertOutput(config)

    def forward(self, hidden_states, attention_mask=None, cos=None, sin=None):
        normed = self.attention_layernorm(hidden_states)
        attention_output = self.attention(normed, attention_mask, cos, sin)
        hidden_states = hidden_states + attention_output
        normed = self.ffn_layernorm(hidden_states)
        intermediate_out = self.intermediate(normed)
        layer_output = self.output(intermediate_out)
        hidden_states = hidden_states + layer_output
        return hidden_states


class MyBertEncoder(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.layer = nn.ModuleList([MyBertLayer(config) for _ in range(config.num_hidden_layers)])
        self.final_layernorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)

    def forward(self, hidden_states, attention_mask=None, cos=None, sin=None):
        for layer_module in self.layer:
            hidden_states = layer_module(hidden_states, attention_mask, cos, sin)
        # Final LayerNorm on the last hidden states, as is standard for pre-LN stacks.
        return self.final_layernorm(hidden_states)


class MyBertPredictionHeadTransform(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.transform_act_fn = nn.GELU()
        self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)

    def forward(self, hidden_states):
        hidden_states = self.dense(hidden_states)
        hidden_states = self.transform_act_fn(hidden_states)
        hidden_states = self.LayerNorm(hidden_states)
        return hidden_states


class MyBertLMPredictionHead(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.transform = MyBertPredictionHeadTransform(config)
        # The decoder weight can be tied to the input embeddings by MyBertForMaskedLM.
        self.decoder = nn.Linear(config.hidden_size, config.vocab_size, bias=True)

    def forward(self, hidden_states):
        hidden_states = self.transform(hidden_states)
        hidden_states = self.decoder(hidden_states)
        return hidden_states


class MyBertOnlyMLMHead(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.predictions = MyBertLMPredictionHead(config)

    def forward(self, sequence_output):
        return self.predictions(sequence_output)


class MyBertPreTrainedModel(PreTrainedModel):
    config_class = MyBertConfig
    base_model_prefix = "mybert"
    supports_gradient_checkpointing = False
    _no_split_modules = ["MyBertLayer"]

    def _init_weights(self, module):
        std = self.config.initializer_range
        if isinstance(module, nn.Linear):
            module.weight.data.normal_(mean=0.0, std=std)
            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.Embedding):
            module.weight.data.normal_(mean=0.0, std=std)
            if module.padding_idx is not None:
                module.weight.data[module.padding_idx].zero_()
        elif isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)


class MyBertModel(MyBertPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.embeddings = MyBertEmbeddings(config)
        self.encoder = MyBertEncoder(config)

        self.post_init()

    def get_input_embeddings(self):
        return self.embeddings.word_embeddings

    def set_input_embeddings(self, value):
        self.embeddings.word_embeddings = value

    def forward(self, input_ids=None, attention_mask=None, return_dict=True, **kwargs):
        _, T = input_ids.shape
        head_dim = self.config.hidden_size // self.config.num_attention_heads
        # Build the RoPE tables for the current sequence length and move them to the
        # embedding weight's device/dtype.
        cos, sin = _build_rope_cache(head_dim, T, self.config.rope_theta)
        cos = cos.to(device=input_ids.device, dtype=self.embeddings.word_embeddings.weight.dtype)
        sin = sin.to(device=input_ids.device, dtype=self.embeddings.word_embeddings.weight.dtype)

        # SDPA expects a boolean mask where True means "attend"; broadcast the padding
        # mask over the head and query dimensions.
        attn_mask = None
        if attention_mask is not None:
            attn_mask = attention_mask.bool()[:, None, None, :]

        hidden = self.embeddings(input_ids)
        sequence_output = self.encoder(hidden, attn_mask, cos, sin)
        if not return_dict:
            return (sequence_output,)
        return BaseModelOutput(last_hidden_state=sequence_output)


class MyBertForMaskedLM(MyBertPreTrainedModel):
    # Maps the MLM decoder weight onto the input embedding weight for weight tying.
    _tied_weights_keys = {
        "cls.predictions.decoder.weight": "mybert.embeddings.word_embeddings.weight",
    }

    def __init__(self, config):
        super().__init__(config)
        self.mybert = MyBertModel(config)
        self.cls = MyBertOnlyMLMHead(config)
        self.post_init()

    def get_output_embeddings(self):
        return self.cls.predictions.decoder

    def set_output_embeddings(self, new_embeddings):
        self.cls.predictions.decoder = new_embeddings

    def forward(self, input_ids=None, attention_mask=None, labels=None, return_dict=True, **kwargs):
        outputs = self.mybert(input_ids=input_ids, attention_mask=attention_mask, return_dict=True)
        sequence_output = outputs.last_hidden_state
        prediction_scores = self.cls(sequence_output)

        loss = None
        if labels is not None:
            # Standard MLM loss; positions labeled -100 are ignored.
            loss = F.cross_entropy(
                prediction_scores.view(-1, self.config.vocab_size),
                labels.view(-1),
                ignore_index=-100,
            )

        if not return_dict:
            output = (prediction_scores,)
            return ((loss,) + output) if loss is not None else output

        return MaskedLMOutput(loss=loss, logits=prediction_scores)
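

if __name__ == "__main__":
    # Minimal smoke-test sketch, not part of the library API. It assumes that
    # MyBertConfig accepts the keyword arguments below and provides sensible
    # defaults for the remaining fields used above (pad_token_id, rope_theta,
    # layer_norm_eps, dropout probabilities, initializer_range).
    config = MyBertConfig(
        vocab_size=1000,
        hidden_size=64,
        num_hidden_layers=2,
        num_attention_heads=4,
        intermediate_size=128,
    )
    model = MyBertForMaskedLM(config)
    input_ids = torch.randint(0, config.vocab_size, (2, 16))
    attention_mask = torch.ones(2, 16, dtype=torch.long)
    out = model(input_ids=input_ids, attention_mask=attention_mask)
    print(out.logits.shape)  # expected: (2, 16, config.vocab_size)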