| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| """ PyTorch ResNet model.""" |
|
|
| from typing import Optional |
|
|
| import torch |
| import torch.utils.checkpoint |
| from torch import Tensor, nn |
| from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss |
|
|
| from transformers.activations import ACT2FN |
| from transformers.modeling_outputs import ( |
| BackboneOutput, |
| BaseModelOutputWithNoAttention, |
| BaseModelOutputWithPoolingAndNoAttention, |
| ImageClassifierOutputWithNoAttention, |
| ) |
| from transformers.modeling_utils import BackboneMixin, PreTrainedModel |
| from transformers.utils import ( |
| add_code_sample_docstrings, |
| add_start_docstrings, |
| add_start_docstrings_to_model_forward, |
| logging, |
| replace_return_docstrings, |
| ) |
| from transformers import ResNetConfig |
|
|
|
|
# Module-level logger obtained from the `transformers` logging helper.
logger = logging.get_logger(__name__)


# Names passed as `config_class` / `processor_class` to the docstring decorators below.
_CONFIG_FOR_DOC = "ResNetConfig"
_FEAT_EXTRACTOR_FOR_DOC = "AutoImageProcessor"


# Checkpoint and expected output shape used by the code sample attached to `ResNetModel.forward`.
_CHECKPOINT_FOR_DOC = "microsoft/resnet-50"
_EXPECTED_OUTPUT_SHAPE = [1, 2048, 7, 7]


# Checkpoint and expected output used by the code sample attached to the image classification `forward`.
_IMAGE_CLASS_CHECKPOINT = "microsoft/resnet-50"
_IMAGE_CLASS_EXPECTED_OUTPUT = "tiger cat"


# List of checkpoint identifiers; not referenced elsewhere in the visible part of this file.
RESNET_PRETRAINED_MODEL_ARCHIVE_LIST = [
    "microsoft/resnet-50",
]
|
|
|
|
class ResNetConvLayer(nn.Module):
    """
    Building block that chains a 2D convolution, a batch normalization and an activation.

    Args:
        in_channels (`int`):
            Number of channels of the incoming feature map.
        out_channels (`int`):
            Number of channels produced by the convolution.
        kernel_size (`int`, *optional*, defaults to 3):
            Side of the square convolution kernel. The padding is set to `kernel_size // 2`.
        stride (`int`, *optional*, defaults to 1):
            Stride of the convolution.
        activation (`str` or `None`, *optional*, defaults to `"relu"`):
            Key looked up in `ACT2FN`. Passing `None` disables the activation (an `nn.Identity` is used instead).
    """

    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        kernel_size: int = 3,
        stride: int = 1,
        activation: Optional[str] = "relu",
    ):
        super().__init__()
        # The convolution carries no bias term; it is directly followed by the batch normalization.
        self.convolution = nn.Conv2d(
            in_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=kernel_size // 2, bias=False
        )
        self.normalization = nn.BatchNorm2d(out_channels)
        # `None` is a supported value: callers use it to get a conv + norm block without non-linearity.
        self.activation = ACT2FN[activation] if activation is not None else nn.Identity()

    def forward(self, input: Tensor) -> Tensor:
        """Apply convolution, normalization and activation, in that order, and return the result."""
        hidden_state = self.convolution(input)
        hidden_state = self.normalization(hidden_state)
        hidden_state = self.activation(hidden_state)
        return hidden_state
|
|
|
|
class ResNetEmbeddings(nn.Module):
    """
    ResNet stem: one `7x7` stride-2 `ResNetConvLayer` followed by a `3x3` stride-2 max pooling.

    The number of input channels expected by the stem is read from `config.num_channels` and enforced in `forward`.
    """

    def __init__(self, config: ResNetConfig):
        super().__init__()
        self.embedder = ResNetConvLayer(
            config.num_channels,
            config.embedding_size,
            kernel_size=7,
            stride=2,
            activation=config.hidden_act,
        )
        self.pooler = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        # Remembered so that `forward` can validate the channel dimension of its input.
        self.num_channels = config.num_channels

    def forward(self, pixel_values: Tensor) -> Tensor:
        """
        Embed `pixel_values` with the stem convolution and pool the result.

        Raises:
            ValueError: If dimension 1 of `pixel_values` differs from the configured number of channels.
        """
        if pixel_values.shape[1] != self.num_channels:
            raise ValueError(
                "Make sure that the channel dimension of the pixel values match with the one set in the configuration."
            )
        return self.pooler(self.embedder(pixel_values))
|
|
|
|
class ResNetShortCut(nn.Module):
    """
    Projection applied on the residual branch: a bias-free `1x1` convolution followed by a batch normalization.

    It maps the residual from `in_channels` to `out_channels`, and downsamples it spatially when `stride` is larger
    than 1 (the default is `stride=2`).
    """

    def __init__(self, in_channels: int, out_channels: int, stride: int = 2):
        super().__init__()
        self.convolution = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=1,
            stride=stride,
            bias=False,
        )
        self.normalization = nn.BatchNorm2d(out_channels)

    def forward(self, input: Tensor) -> Tensor:
        """Project `input` with the convolution, then normalize it."""
        return self.normalization(self.convolution(input))
|
|
|
|
class ResNetBasicLayer(nn.Module):
    """
    Basic residual layer: two `3x3` `ResNetConvLayer`s on the main branch, added to a shortcut branch.

    The shortcut is a `ResNetShortCut` projection when the channel count changes or `stride != 1`, and an identity
    otherwise. `activation` (a key of `ACT2FN`) is applied after the two branches are summed.
    """

    def __init__(self, in_channels: int, out_channels: int, stride: int = 1, activation: str = "relu"):
        super().__init__()
        if in_channels != out_channels or stride != 1:
            # Shapes differ between input and output: the residual must be projected to match.
            self.shortcut = ResNetShortCut(in_channels, out_channels, stride=stride)
        else:
            self.shortcut = nn.Identity()
        # NOTE(review): `activation` is not forwarded to the first inner conv layer, which therefore uses
        # `ResNetConvLayer`'s default — confirm this is intended.
        first_conv = ResNetConvLayer(in_channels, out_channels, stride=stride)
        # No activation on the last conv: the non-linearity is applied after the residual sum.
        second_conv = ResNetConvLayer(out_channels, out_channels, activation=None)
        self.layer = nn.Sequential(first_conv, second_conv)
        self.activation = ACT2FN[activation]

    def forward(self, hidden_state):
        """Return `activation(layer(hidden_state) + shortcut(hidden_state))`."""
        transformed = self.layer(hidden_state)
        # In-place accumulation on the freshly computed main-branch output.
        transformed += self.shortcut(hidden_state)
        return self.activation(transformed)
|
|
|
|
class ResNetBottleNeckLayer(nn.Module):
    """
    Bottleneck residual layer: a `1x1`, a `3x3` and a `1x1` `ResNetConvLayer` on the main branch, added to a
    shortcut branch.

    The first `1x1` convolution maps the input to `out_channels // reduction` channels so that the `3x3` convolution
    (which carries `stride`) runs on fewer channels. The last `1x1` convolution maps the reduced features to
    `out_channels`. The shortcut is a `ResNetShortCut` projection when the channel count changes or `stride != 1`,
    and an identity otherwise.
    """

    def __init__(
        self, in_channels: int, out_channels: int, stride: int = 1, activation: str = "relu", reduction: int = 4
    ):
        super().__init__()
        bottleneck_channels = out_channels // reduction
        if in_channels != out_channels or stride != 1:
            # Shapes differ between input and output: the residual must be projected to match.
            self.shortcut = ResNetShortCut(in_channels, out_channels, stride=stride)
        else:
            self.shortcut = nn.Identity()
        # NOTE(review): `activation` is not forwarded to the first two inner conv layers, which therefore use
        # `ResNetConvLayer`'s default — confirm this is intended.
        squeeze = ResNetConvLayer(in_channels, bottleneck_channels, kernel_size=1)
        spatial = ResNetConvLayer(bottleneck_channels, bottleneck_channels, stride=stride)
        # No activation on the last conv: the non-linearity is applied after the residual sum.
        expand = ResNetConvLayer(bottleneck_channels, out_channels, kernel_size=1, activation=None)
        self.layer = nn.Sequential(squeeze, spatial, expand)
        self.activation = ACT2FN[activation]

    def forward(self, hidden_state):
        """Return `activation(layer(hidden_state) + shortcut(hidden_state))`."""
        transformed = self.layer(hidden_state)
        # In-place accumulation on the freshly computed main-branch output.
        transformed += self.shortcut(hidden_state)
        return self.activation(transformed)
|
|
|
|
class ResNetStage(nn.Module):
    """
    A ResNet stage composed by stacked layers.

    Args:
        config ([`ResNetConfig`]):
            Configuration; `config.layer_type` selects the layer class and `config.hidden_act` the activation.
        in_channels (`int`):
            Number of input channels of the first layer of the stage.
        out_channels (`int`):
            Number of output channels of every layer of the stage.
        stride (`int`, *optional*, defaults to 2):
            Stride given to the first layer of the stage only.
        depth (`int`, *optional*, defaults to 2):
            Total number of layers in the stage.
    """

    def __init__(
        self,
        config: ResNetConfig,
        in_channels: int,
        out_channels: int,
        stride: int = 2,
        depth: int = 2,
    ):
        super().__init__()

        # Bottleneck layers when requested by the config, basic layers for any other `layer_type` value.
        layer = ResNetBottleNeckLayer if config.layer_type == "bottleneck" else ResNetBasicLayer

        self.layers = nn.Sequential(
            # Only the first layer changes the channel count and receives `stride`.
            layer(in_channels, out_channels, stride=stride, activation=config.hidden_act),
            *[layer(out_channels, out_channels, activation=config.hidden_act) for _ in range(depth - 1)],
        )

    def forward(self, input: Tensor) -> Tensor:
        """Run `input` through every layer of the stage, then add 1 to the result and print a message."""
        hidden_state = input
        for layer in self.layers:
            hidden_state = layer(hidden_state)
        # NOTE(review): custom additions on top of the plain layer stack — the output is shifted by 1 and a
        # message is written to stdout on every call. Confirm both are intended, and that they are meant to run
        # once per stage (after the loop) rather than once per layer.
        hidden_state = hidden_state + 1
        print("having fun in my custom code")
        return hidden_state
|
|
|
|
class ResNetEncoder(nn.Module):
    """
    Stack of `ResNetStage`s built from `config.hidden_sizes` and `config.depths`.

    The first stage maps `config.embedding_size` channels to `config.hidden_sizes[0]`; each following stage maps one
    entry of `config.hidden_sizes` to the next one.
    """

    def __init__(self, config: ResNetConfig):
        super().__init__()
        self.stages = nn.ModuleList([])
        # Depending on `downsample_in_first_stage`, the first stage may or may not downsample its input.
        first_stride = 2 if config.downsample_in_first_stage else 1
        self.stages.append(
            ResNetStage(
                config,
                config.embedding_size,
                config.hidden_sizes[0],
                stride=first_stride,
                depth=config.depths[0],
            )
        )
        # Consecutive pairs of hidden sizes give the (in, out) channels of the remaining stages.
        remaining = zip(config.hidden_sizes, config.hidden_sizes[1:], config.depths[1:])
        for in_channels, out_channels, depth in remaining:
            self.stages.append(ResNetStage(config, in_channels, out_channels, depth=depth))

    def forward(
        self, hidden_state: Tensor, output_hidden_states: bool = False, return_dict: bool = True
    ) -> BaseModelOutputWithNoAttention:
        """
        Run `hidden_state` through every stage.

        When `output_hidden_states` is truthy, the input of each stage and the final output are collected in a
        tuple. Returns a `BaseModelOutputWithNoAttention` when `return_dict` is truthy, otherwise a plain tuple
        holding the last hidden state and, if collected, the tuple of hidden states.
        """
        collected = () if output_hidden_states else None

        for stage_module in self.stages:
            if output_hidden_states:
                # Record the input of the stage before it is transformed.
                collected += (hidden_state,)
            hidden_state = stage_module(hidden_state)

        if output_hidden_states:
            # Record the output of the last stage as well.
            collected += (hidden_state,)

        if return_dict:
            return BaseModelOutputWithNoAttention(
                last_hidden_state=hidden_state,
                hidden_states=collected,
            )
        return tuple(item for item in (hidden_state, collected) if item is not None)
|
|
|
|
class ResNetPreTrainedModel(PreTrainedModel):
    """
    Base class of the ResNet models in this file. It declares the configuration class and naming conventions used
    by `PreTrainedModel`, and defines how the weights of the sub-modules are initialized.
    """

    config_class = ResNetConfig
    base_model_prefix = "resnet"
    main_input_name = "pixel_values"
    supports_gradient_checkpointing = True

    def _init_weights(self, module):
        """Initialize `module`: Kaiming-normal for convolutions, weight 1 / bias 0 for batch and group norms."""
        if isinstance(module, nn.Conv2d):
            nn.init.kaiming_normal_(module.weight, mode="fan_out", nonlinearity="relu")
            return
        if isinstance(module, (nn.BatchNorm2d, nn.GroupNorm)):
            nn.init.constant_(module.weight, 1)
            nn.init.constant_(module.bias, 0)

    def _set_gradient_checkpointing(self, module, value=False):
        """Set the `gradient_checkpointing` attribute of `module` to `value` when it is a `ResNetEncoder`."""
        # NOTE(review): nothing in the visible part of this file reads `gradient_checkpointing` — confirm the
        # flag has an effect before relying on `supports_gradient_checkpointing`.
        if not isinstance(module, ResNetEncoder):
            return
        module.gradient_checkpointing = value
|
|
|
|
# Text handed to `add_start_docstrings` for the model classes defined below.
RESNET_START_DOCSTRING = r"""
    This model is a PyTorch [torch.nn.Module](https://pytorch.org/docs/stable/nn.html#torch.nn.Module) subclass. Use it
    as a regular PyTorch Module and refer to the PyTorch documentation for all matter related to general usage and
    behavior.

    Parameters:
        config ([`ResNetConfig`]): Model configuration class with all the parameters of the model.
            Initializing with a config file does not load the weights associated with the model, only the
            configuration. Check out the [`~PreTrainedModel.from_pretrained`] method to load the model weights.
"""


# Text handed to `add_start_docstrings_to_model_forward` for the `forward` methods defined below.
RESNET_INPUTS_DOCSTRING = r"""
    Args:
        pixel_values (`torch.FloatTensor` of shape `(batch_size, num_channels, height, width)`):
            Pixel values. Pixel values can be obtained using [`AutoImageProcessor`]. See
            [`AutoImageProcessor.__call__`] for details.

        output_hidden_states (`bool`, *optional*):
            Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for
            more detail.
        return_dict (`bool`, *optional*):
            Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple.
"""
|
|
|
|
@add_start_docstrings(
    "The bare ResNet model outputting raw features without any specific head on top.",
    RESNET_START_DOCSTRING,
)
class ResNetModel(ResNetPreTrainedModel):
    # Stem (`embedder`) + stack of stages (`encoder`) + global average pooling (`pooler`).
    # Documentation is kept in comments: the decorators above and below build the docstrings.

    def __init__(self, config):
        super().__init__(config)
        self.config = config
        self.embedder = ResNetEmbeddings(config)
        self.encoder = ResNetEncoder(config)
        # Pools every feature map down to a single spatial position.
        self.pooler = nn.AdaptiveAvgPool2d((1, 1))
        # Initialize weights and apply final processing.
        self.post_init()

    @add_start_docstrings_to_model_forward(RESNET_INPUTS_DOCSTRING)
    @add_code_sample_docstrings(
        processor_class=_FEAT_EXTRACTOR_FOR_DOC,
        checkpoint=_CHECKPOINT_FOR_DOC,
        output_type=BaseModelOutputWithPoolingAndNoAttention,
        config_class=_CONFIG_FOR_DOC,
        modality="vision",
        expected_output=_EXPECTED_OUTPUT_SHAPE,
    )
    def forward(
        self, pixel_values: Tensor, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None
    ) -> BaseModelOutputWithPoolingAndNoAttention:
        # Arguments left to `None` fall back to the values stored in the configuration.
        if output_hidden_states is None:
            output_hidden_states = self.config.output_hidden_states
        if return_dict is None:
            return_dict = self.config.use_return_dict

        stem_output = self.embedder(pixel_values)
        encoder_outputs = self.encoder(
            stem_output, output_hidden_states=output_hidden_states, return_dict=return_dict
        )

        # Index 0 is the last hidden state for both the tuple and the model-output form.
        last_hidden_state = encoder_outputs[0]
        pooled_output = self.pooler(last_hidden_state)

        if return_dict:
            return BaseModelOutputWithPoolingAndNoAttention(
                last_hidden_state=last_hidden_state,
                pooler_output=pooled_output,
                hidden_states=encoder_outputs.hidden_states,
            )
        # Tuple form: the pooled output is inserted right after the last hidden state.
        return (last_hidden_state, pooled_output) + encoder_outputs[1:]
|
|
|
|
@add_start_docstrings(
    """
    ResNet Model with an image classification head on top (a linear layer on top of the pooled features), e.g. for
    ImageNet.
    """,
    RESNET_START_DOCSTRING,
)
class ResNetCustomForImageClassification(ResNetPreTrainedModel):
    # `ResNetModel` backbone followed by a flatten + linear classifier on the pooled features.
    # Documentation is kept in comments: the decorators above and below build the docstrings.

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.resnet = ResNetModel(config)
        # Classification head; with no labels configured the flattened pooled features are returned as-is.
        if config.num_labels > 0:
            head = nn.Linear(config.hidden_sizes[-1], config.num_labels)
        else:
            head = nn.Identity()
        self.classifier = nn.Sequential(nn.Flatten(), head)
        # Initialize weights and apply final processing.
        self.post_init()

    @add_start_docstrings_to_model_forward(RESNET_INPUTS_DOCSTRING)
    @add_code_sample_docstrings(
        processor_class=_FEAT_EXTRACTOR_FOR_DOC,
        checkpoint=_IMAGE_CLASS_CHECKPOINT,
        output_type=ImageClassifierOutputWithNoAttention,
        config_class=_CONFIG_FOR_DOC,
        expected_output=_IMAGE_CLASS_EXPECTED_OUTPUT,
    )
    def forward(
        self,
        pixel_values: Optional[torch.FloatTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> ImageClassifierOutputWithNoAttention:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
            Labels for computing the image classification/regression loss. Indices should be in `[0, ...,
            config.num_labels - 1]`. If `config.num_labels > 1` a classification loss is computed (Cross-Entropy).
        """
        if return_dict is None:
            return_dict = self.config.use_return_dict

        outputs = self.resnet(pixel_values, output_hidden_states=output_hidden_states, return_dict=return_dict)
        # In tuple form the pooled output sits at index 1.
        pooled_output = outputs.pooler_output if return_dict else outputs[1]
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            config = self.config
            if config.problem_type is None:
                # Infer the problem type once from the number of labels and the label dtype; the result is
                # stored on the config and reused by later calls.
                if self.num_labels == 1:
                    config.problem_type = "regression"
                elif self.num_labels > 1 and labels.dtype in (torch.long, torch.int):
                    config.problem_type = "single_label_classification"
                else:
                    config.problem_type = "multi_label_classification"

            problem_type = config.problem_type
            if problem_type == "regression":
                criterion = MSELoss()
                if self.num_labels == 1:
                    loss = criterion(logits.squeeze(), labels.squeeze())
                else:
                    loss = criterion(logits, labels)
            elif problem_type == "single_label_classification":
                criterion = CrossEntropyLoss()
                loss = criterion(logits.view(-1, self.num_labels), labels.view(-1))
            elif problem_type == "multi_label_classification":
                criterion = BCEWithLogitsLoss()
                loss = criterion(logits, labels)

        if return_dict:
            return ImageClassifierOutputWithNoAttention(
                loss=loss, logits=logits, hidden_states=outputs.hidden_states
            )
        # Tuple form: drop the last hidden state and pooled output, and prepend the loss when there is one.
        output = (logits,) + outputs[2:]
        return output if loss is None else (loss,) + output
|
|
|
|
@add_start_docstrings(
    """
    ResNet backbone, to be used with frameworks like DETR and MaskFormer.
    """,
    RESNET_START_DOCSTRING,
)
class ResNetBackbone(ResNetPreTrainedModel, BackboneMixin):
    # Stem + encoder that exposes the hidden states of the stages named in `out_features` as feature maps.
    # Documentation is kept in comments: the decorators above and below build the docstrings.

    def __init__(self, config):
        super().__init__(config)

        self.stage_names = config.stage_names
        self.embedder = ResNetEmbeddings(config)
        self.encoder = ResNetEncoder(config)

        # Without an explicit selection only the last stage is exposed.
        if config.out_features is not None:
            self.out_features = config.out_features
        else:
            self.out_features = [self.stage_names[-1]]

        # Channel count per stage name: the first name maps to the stem, the others to `hidden_sizes` in order.
        channels_by_stage = {"stem": config.embedding_size}
        channels_by_stage.update(
            {stage: config.hidden_sizes[idx] for idx, stage in enumerate(self.stage_names[1:])}
        )
        self.out_feature_channels = channels_by_stage

        # Initialize weights and apply final processing.
        self.post_init()

    @property
    def channels(self):
        # Channel counts of the selected feature maps, in `out_features` order.
        return [self.out_feature_channels[name] for name in self.out_features]

    @add_start_docstrings_to_model_forward(RESNET_INPUTS_DOCSTRING)
    @replace_return_docstrings(output_type=BackboneOutput, config_class=_CONFIG_FOR_DOC)
    def forward(
        self, pixel_values: Tensor, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None
    ) -> BackboneOutput:
        """
        Returns:

        Examples:

        ```python
        >>> from transformers import AutoImageProcessor, AutoBackbone
        >>> import torch
        >>> from PIL import Image
        >>> import requests

        >>> url = "http://images.cocodataset.org/val2017/000000039769.jpg"
        >>> image = Image.open(requests.get(url, stream=True).raw)

        >>> processor = AutoImageProcessor.from_pretrained("microsoft/resnet-50")
        >>> model = AutoBackbone.from_pretrained(
        ...     "microsoft/resnet-50", out_features=["stage1", "stage2", "stage3", "stage4"]
        ... )

        >>> inputs = processor(image, return_tensors="pt")

        >>> outputs = model(**inputs)
        >>> feature_maps = outputs.feature_maps
        >>> list(feature_maps[-1].shape)
        [1, 2048, 7, 7]
        ```"""
        if return_dict is None:
            return_dict = self.config.use_return_dict
        if output_hidden_states is None:
            output_hidden_states = self.config.output_hidden_states

        stem_output = self.embedder(pixel_values)
        # Hidden states are always requested from the encoder: the feature maps are picked out of them.
        outputs = self.encoder(stem_output, output_hidden_states=True, return_dict=True)
        hidden_states = outputs.hidden_states

        # Keep the hidden states whose stage name was selected, in stage order.
        feature_maps = tuple(
            hidden_states[idx] for idx, stage in enumerate(self.stage_names) if stage in self.out_features
        )

        if return_dict:
            return BackboneOutput(
                feature_maps=feature_maps,
                hidden_states=outputs.hidden_states if output_hidden_states else None,
                attentions=None,
            )
        # Tuple form: the hidden states are only appended when the caller asked for them.
        if output_hidden_states:
            return (feature_maps, outputs.hidden_states)
        return (feature_maps,)
|
|
|
|