import torch
import torch.nn as nn
from typing import Optional, Tuple, Union

from transformers.modeling_utils import PreTrainedModel
from transformers.utils import logging

from configuration_pdeeppp import PDeepPPConfig


logger = logging.get_logger(__name__)


class SelfAttentionGlobalFeatures(nn.Module):
    """Self-attention block that mixes global context into the input embeddings."""

    def __init__(self, config):
        super().__init__()
        self.self_attention = nn.MultiheadAttention(
            embed_dim=config.input_size,
            num_heads=config.num_heads,
            batch_first=True,
        )
        self.fc1 = nn.Linear(config.input_size, config.hidden_size)
        self.fc2 = nn.Linear(config.hidden_size, config.output_size)
        self.layer_norm = nn.LayerNorm(config.input_size)
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, x):
        # x: (batch, seq_len, input_size)
        attn_output, _ = self.self_attention(x, x, x)
        # Residual connection around the attention block, then project to output_size.
        x = self.layer_norm(x + attn_output)
        x = self.fc1(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x


class TransConv1d(nn.Module):
    """Self-attention feature extractor followed by a stack of Transformer encoder layers."""

    def __init__(self, config):
        super().__init__()
        self.self_attention_global_features = SelfAttentionGlobalFeatures(config)
        self.transformer_encoder = nn.TransformerEncoderLayer(
            d_model=config.output_size,
            nhead=config.num_heads,
            dim_feedforward=config.hidden_size * 2,
            dropout=config.dropout,
            batch_first=True,
        )
        self.transformer = nn.TransformerEncoder(
            self.transformer_encoder,
            num_layers=config.num_transformer_layers,
        )
        self.fc1 = nn.Linear(config.output_size, config.output_size)
        self.fc2 = nn.Linear(config.output_size, config.output_size)
        self.layer_norm = nn.LayerNorm(config.output_size)

    def forward(self, x):
        x = self.self_attention_global_features(x)
        x = self.transformer(x)
        x = self.fc1(x)
        # Residual connection is taken after fc1 and applied around fc2.
        residual = x
        x = self.fc2(x)
        x = self.layer_norm(x + residual)
        return x


class PosCNN(nn.Module):
    """1D-convolutional branch with an optional learned positional encoding."""

    def __init__(self, config, use_position_encoding=True):
        super().__init__()
        self.use_position_encoding = use_position_encoding
        self.conv1d = nn.Conv1d(
            in_channels=config.input_size,
            out_channels=64,
            kernel_size=3,
            padding=1,
        )
        self.relu = nn.ReLU()
        self.global_pooling = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(64, config.output_size)

        if self.use_position_encoding:
            # Learned positional encoding over the 64 conv channels;
            # config.input_size doubles as the maximum supported sequence length.
            self.position_encoding = nn.Parameter(torch.zeros(64, config.input_size))

    def forward(self, x):
        # (batch, seq_len, input_size) -> (batch, input_size, seq_len) for Conv1d.
        x = x.permute(0, 2, 1)
        x = self.conv1d(x)
        x = self.relu(x)

        if self.use_position_encoding:
            seq_len = x.size(2)
            pos_encoding = self.position_encoding[:, :seq_len].unsqueeze(0)
            x = x + pos_encoding

        x = self.global_pooling(x)
        x = x.squeeze(-1)
        x = self.fc(x)
        return x


class PDeepPPPreTrainedModel(PreTrainedModel):
| | """ |
| | 抽象基类,包含所有PDeepPP模型所需的方法 |
| | """ |
| | config_class = PDeepPPConfig |
| | base_model_prefix = "PDeepPP" |
| | supports_gradient_checkpointing = True |
| | |
| | def _init_weights(self, module): |
| | """初始化权重""" |
| | if isinstance(module, nn.Linear): |
| | module.weight.data.normal_(mean=0.0, std=0.02) |
| | if module.bias is not None: |
| | module.bias.data.zero_() |
| | elif isinstance(module, nn.LayerNorm): |
| | module.bias.data.zero_() |
| | module.weight.data.fill_(1.0) |
| |
|
| | class PDeepPPModel(PDeepPPPreTrainedModel): |
    def __init__(self, config):
        super().__init__(config)
        self.config = config

        # Two parallel feature extractors: a Transformer branch and a CNN branch.
        self.transformer = TransConv1d(config)
        self.cnn = PosCNN(config)
        # Fusion head that maps the concatenated branch features to a single logit.
        self.cnn_layers = nn.Sequential(
            nn.Conv1d(config.output_size * 2, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveMaxPool1d(1),
            nn.Dropout(config.dropout / 2),
            nn.Conv1d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveMaxPool1d(1),
            nn.Dropout(config.dropout / 2),
            nn.Flatten(),
            nn.Linear(64, 1),
        )

        self.post_init()

    def forward(
        self,
        input_embeds=None,
        labels=None,
        return_dict=None,
    ):
        r"""
        labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
            Labels for computing the classification loss.

        Returns:
            dict or tuple: the output format depends on the `return_dict` argument.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        # Run both branches and fuse their features along the channel dimension.
        transformer_output = self.transformer(input_embeds)   # (batch, seq_len, output_size)
        cnn_output = self.cnn(input_embeds)                   # (batch, output_size)
        cnn_output = cnn_output.unsqueeze(1).expand(-1, transformer_output.size(1), -1)
        combined = torch.cat([transformer_output, cnn_output], dim=2)
        combined = combined.permute(0, 2, 1)                  # (batch, 2 * output_size, seq_len)
        logits = self.cnn_layers(combined).squeeze(1)         # (batch,)

        loss = None
        if labels is not None:
            loss_fct = nn.BCEWithLogitsLoss()
            loss = loss_fct(logits, labels.float())

            # Entropy-based regularization on the predicted probabilities,
            # blended with the BCE loss via config.lambda_.
            probs = torch.sigmoid(logits)
            ent = -(probs * torch.log(probs + 1e-12)
                    + (1 - probs) * torch.log(1 - probs + 1e-12)).mean()
            cond_ent = -(probs * torch.log(probs + 1e-12)).mean()
            reg_loss = self.config.lambda_ * ent - self.config.lambda_ * cond_ent

            loss = self.config.lambda_ * loss + (1 - self.config.lambda_) * reg_loss

        if return_dict:
            return {
                "loss": loss,
                "logits": logits,
            }
        else:
            return (loss, logits) if loss is not None else logits


PDeepPPModel.register_for_auto_class("AutoModel")
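

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only). It assumes PDeepPPConfig accepts
# the hyperparameters referenced throughout this file (input_size, hidden_size,
# output_size, num_heads, dropout, num_transformer_layers, lambda_) as keyword
# arguments; the values below are placeholders, not those of any released
# checkpoint.
if __name__ == "__main__":
    config = PDeepPPConfig(
        input_size=1280,
        hidden_size=512,
        output_size=128,
        num_heads=8,
        dropout=0.1,
        num_transformer_layers=2,
        lambda_=0.5,
    )
    model = PDeepPPModel(config)

    # Pre-computed residue embeddings: (batch, sequence_length, input_size).
    embeds = torch.randn(2, 33, config.input_size)
    labels = torch.tensor([0, 1])

    outputs = model(input_embeds=embeds, labels=labels, return_dict=True)
    print(outputs["loss"].item(), outputs["logits"].shape)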