| | |
| | import math |
| | import numbers |
| | import torch |
| | import torch.nn as nn |
| | import torch.nn.functional as F |
| | import torchvision.transforms as transforms |
| | import numpy as np |
| | import torchvision.datasets as datasets |
| | from util.feature_extraction_utils import feature_extractor |
| | from backbone.model_irse import IR_50, IR_152 |
| | from backbone.model_resnet import ResNet_50, ResNet_152 |
| |
|
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Module-level torchvision transform instances (tensor and PIL-image
# conversion), created once at import time.
tensor_transform = transforms.ToTensor()
pil_transform = transforms.ToPILImage()
| |
|
| |
|
class ImageFolderWithPaths(datasets.ImageFolder):
    """ImageFolder variant whose items also carry the image file path.

    Extends ``torchvision.datasets.ImageFolder``: every item is the tuple
    returned by the parent class with the file path appended as the last
    element.
    """

    def __getitem__(self, index):
        # Let the parent class build the regular item first.
        base_item = super().__getitem__(index)
        # First element of the ``self.imgs`` entry for this index is the path.
        image_path = self.imgs[index][0]
        return base_item + (image_path,)
| |
|
| |
|
class GaussianSmoothing(nn.Module):
    """
    Apply gaussian smoothing on a
    1d, 2d or 3d tensor. Filtering is performed separately for each channel
    in the input using a depthwise convolution. Before filtering, the input
    is reflect-padded by ``int(k / 2)`` on both sides of every filtered
    dimension (``k`` being that dimension's kernel size), so odd kernel
    sizes preserve the input size.
    Arguments:
        channels (int): Number of channels of the input tensors. Output will
            have this number of channels as well.
        kernel_size (int, sequence): Size of the gaussian kernel.
        sigma (float, sequence): Standard deviation of the gaussian kernel.
        dim (int, optional): The number of dimensions of the data.
            Default value is 2 (spatial).
    Raises:
        RuntimeError: If ``dim`` is not 1, 2 or 3.
    """

    # Depthwise convolution function for each supported dimensionality.
    _CONVOLUTIONS = {1: F.conv1d, 2: F.conv2d, 3: F.conv3d}

    def __init__(self, channels, kernel_size, sigma, dim=2):
        super(GaussianSmoothing, self).__init__()
        # Reject unsupported dimensionalities before doing any work.
        if dim not in self._CONVOLUTIONS:
            raise RuntimeError(
                "Only 1, 2 and 3 dimensions are supported. Received {}.".format(dim)
            )
        if isinstance(kernel_size, numbers.Number):
            kernel_size = [kernel_size] * dim
        if isinstance(sigma, numbers.Number):
            sigma = [sigma] * dim

        # Coordinate grids, one per kernel dimension. This is the same layout
        # as ``torch.meshgrid(..., indexing="ij")`` but built with view/expand,
        # so it neither triggers the warning newer PyTorch emits when
        # ``indexing`` is omitted nor needs a keyword older versions reject.
        axes = [torch.arange(size, dtype=torch.float32) for size in kernel_size]
        full_shape = [axis.numel() for axis in axes]
        meshgrids = []
        for position, axis in enumerate(axes):
            view_shape = [1] * len(axes)
            view_shape[position] = -1
            meshgrids.append(axis.view(view_shape).expand(full_shape))

        # The gaussian kernel is the product of the
        # gaussian function of each dimension.
        kernel = 1
        for size, std, mgrid in zip(kernel_size, sigma, meshgrids):
            mean = (size - 1) / 2
            kernel *= (
                1
                / (std * math.sqrt(2 * math.pi))
                * torch.exp(-(((mgrid - mean) / std) ** 2) / 2)
            )

        # Make sure the sum of values in the gaussian kernel equals 1.
        kernel = kernel / torch.sum(kernel)

        # Reshape to a depthwise convolutional weight: one filter per channel.
        kernel = kernel.view(1, 1, *kernel.size())
        kernel = kernel.repeat(channels, *[1] * (kernel.dim() - 1))

        self.register_buffer("weight", kernel)
        self.groups = channels
        self.conv = self._CONVOLUTIONS[dim]

        # F.pad takes (before, after) pairs starting from the LAST dimension,
        # so walk the kernel sizes in reverse. Each dimension gets its own
        # padding instead of reusing kernel_size[0] for a fixed 2d layout.
        padding = []
        for size in reversed(kernel_size):
            half = int(size / 2)
            padding.extend((half, half))
        self._padding = tuple(padding)
        # Kept so existing code that reads ``pad_size`` continues to work.
        self.pad_size = int(kernel_size[0] / 2)

    def forward(self, input):
        """
        Apply gaussian filter to input.
        Arguments:
            input (torch.Tensor): Input to apply gaussian filter on.
        Returns:
            filtered (torch.Tensor): Filtered output.
        """
        input = F.pad(input, self._padding, mode="reflect")
        return self.conv(input, weight=self.weight, groups=self.groups)
| |
|
| |
|
class dim_reduction(nn.Module):
    """Project inputs with a fixed matrix ``V`` via ``torch.matmul(input, V)``.

    ``V`` is stored as a plain attribute (not registered as a parameter or
    buffer); it is converted to the input's device and dtype on every
    forward call.
    Arguments:
        V (torch.Tensor): Matrix the inputs are right-multiplied by.
    """

    def __init__(self, V):
        super(dim_reduction, self).__init__()
        self.V = V

    def forward(self, input):
        """
        Arguments:
            input (torch.Tensor): Tensor to project.
        Returns:
            torch.Tensor: ``torch.matmul(input, V)``.
        """
        # Match dtype as well as device: torch.matmul refuses mixed dtypes,
        # e.g. a float64 V (such as one created from a float64 numpy array)
        # with float32 inputs.
        projection = self.V.to(device=input.device, dtype=input.dtype)
        return torch.matmul(input, projection)
| |
|
| |
|
def get_ensemble(
    models,
    sigma_gf,
    kernel_size_gf,
    combination,
    V_reduction,
    warp=False,
    theta_warp=None,
):
    """Build one or two feature-extraction pipelines per model.

    Every pipeline is wrapped in ``nn.DataParallel`` and moved to CUDA when
    available (CPU otherwise). Its stages, in order, are:
      * a shared 3-channel ``GaussianSmoothing`` (only when ``sigma_gf`` is
        not None),
      * a ``feature_extractor`` built around the model,
      * ``dim_reduction(V_reduction[i])`` (only when ``V_reduction`` is not
        None).
    When ``sigma_gf`` is not None and ``combination`` is truthy, each model
    additionally gets a second pipeline without the smoothing stage, placed
    right after its smoothed one. ``combination`` has no effect when
    ``sigma_gf`` is None.
    Arguments:
        models (sequence): Models to wrap.
        sigma_gf: Sigma passed to ``GaussianSmoothing``, or None to disable
            smoothing.
        kernel_size_gf: Kernel size passed to ``GaussianSmoothing``.
        combination: Whether to also add the unsmoothed pipeline per model.
        V_reduction (sequence, optional): Per-model matrices for
            ``dim_reduction``, indexed like ``models``; None to skip it.
        warp: Forwarded to ``feature_extractor``.
        theta_warp: Forwarded to ``feature_extractor``.
    Returns:
        list: The pipelines, in model order.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def build_pipeline(model, index, smoothing):
        # Assemble the stages in execution order.
        stages = []
        if smoothing is not None:
            stages.append(smoothing)
        stages.append(
            feature_extractor(model=model, warp=warp, theta_warp=theta_warp)
        )
        if V_reduction is not None:
            stages.append(dim_reduction(V_reduction[index]))
        # A lone extractor is wrapped directly, without an nn.Sequential.
        pipeline = stages[0] if len(stages) == 1 else nn.Sequential(*stages)
        return nn.DataParallel(pipeline).to(device)

    # One smoothing module is shared by every smoothed pipeline.
    smoothing = None
    if sigma_gf is not None:
        smoothing = GaussianSmoothing(3, kernel_size_gf, sigma_gf)

    ensemble = []
    for index, model in enumerate(models):
        ensemble.append(build_pipeline(model, index, smoothing))
        if smoothing is not None and combination:
            ensemble.append(build_pipeline(model, index, None))
    return ensemble
| |
|
| |
|
def extract_features(imgs, feature_extractor_ensemble, dim):
    """Run every extractor of the ensemble on ``imgs`` and collect the outputs.

    Arguments:
        imgs (torch.Tensor): Batch passed unchanged to each extractor.
        feature_extractor_ensemble (sequence): Callables applied to ``imgs``.
        dim (int): Size of the last axis of the result.
    Returns:
        torch.Tensor: Tensor created with ``torch.zeros`` of shape
        ``(imgs.shape[0], len(feature_extractor_ensemble), dim)`` whose
        ``[:, i, :]`` slice holds the output of extractor ``i``.
    """
    batch_size = imgs.shape[0]
    num_extractors = len(feature_extractor_ensemble)
    features = torch.zeros(batch_size, num_extractors, dim)
    for slot, extractor in enumerate(feature_extractor_ensemble):
        features[:, slot] = extractor(imgs)
    return features
| |
|
| |
|
def prepare_models(
    model_backbones,
    input_size,
    model_roots,
    kernel_size_attack,
    sigma_attack,
    combination,
    using_subspace,
    V_reduction_root,
):
    """Instantiate the attack backbones and load their checkpoints.

    Arguments:
        model_backbones (sequence of str): Backbone names; each must be one
            of "IR_50", "IR_152", "ResNet_50", "ResNet_152".
        input_size: Passed to each backbone constructor.
        model_roots (sequence): Checkpoint locations, indexed like
            ``model_backbones``, given to
            ``torch.hub.load_state_dict_from_url``.
        kernel_size_attack: Not used by this function.
        sigma_attack: Not used by this function.
        combination: Not used by this function.
        using_subspace: When truthy, load one reduction matrix per backbone.
        V_reduction_root (sequence): Paths given to ``np.load``, indexed like
            ``model_backbones``; only read when ``using_subspace`` is truthy.
    Returns:
        tuple: ``(models_attack, V_reduction, dim)``. ``V_reduction`` is a
        list of tensors and ``dim`` is ``V_reduction[0].shape[1]`` when
        ``using_subspace`` is truthy; otherwise they are ``None`` and ``512``.
    Raises:
        KeyError: If a backbone name is not one of the supported names.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Map names to constructors rather than to pre-built instances: only the
    # requested backbones are created, and a name listed more than once gets
    # a separate model per entry instead of one shared instance whose weights
    # the later checkpoint would overwrite.
    backbone_factories = {
        "IR_50": IR_50,
        "IR_152": IR_152,
        "ResNet_50": ResNet_50,
        "ResNet_152": ResNet_152,
    }

    print("Loading Attack Backbone Checkpoint '{}'".format(model_roots))
    print("=" * 20)

    models_attack = []
    for i, backbone_name in enumerate(model_backbones):
        model = backbone_factories[backbone_name](input_size)
        state_dict = torch.hub.load_state_dict_from_url(
            model_roots[i], map_location=device, progress=True
        )
        model.load_state_dict(state_dict)
        models_attack.append(model)

    if using_subspace:
        V_reduction = [
            torch.tensor(np.load(V_reduction_root[i]))
            for i in range(len(model_backbones))
        ]
        dim = V_reduction[0].shape[1]
    else:
        V_reduction = None
        dim = 512

    return models_attack, V_reduction, dim
| |
|
| |
|
def prepare_data(
    query_data_root, target_data_root, freq, batch_size, warp=False, theta_warp=None
):
    """Build the query, gallery and (optional) target data loaders.

    The image folder at ``query_data_root`` is split by index: every
    ``freq``-th sample starting at index 0 goes to the query set and all
    remaining samples go to the gallery set.
    Arguments:
        query_data_root: Root directory given to ``datasets.ImageFolder`` for
            the query/gallery data.
        target_data_root: Root directory for the target data, or None.
        freq (int): Step between consecutive query indices.
        batch_size (int): Batch size of every returned loader.
        warp: Not used by this function.
        theta_warp: Not used by this function.
    Returns:
        tuple: ``(query_loader, gallery_loader, target_loader)``;
        ``target_loader`` is None when ``target_data_root`` is None.
    """
    data = datasets.ImageFolder(query_data_root, tensor_transform)

    num_samples = len(data)
    query_range = range(0, num_samples, freq)
    subset_query = list(query_range)
    # Membership tests on a range are constant time for ints, unlike a list
    # scan, so the split stays linear in the dataset size.
    subset_gallery = [idx for idx in range(num_samples) if idx not in query_range]
    query_set = torch.utils.data.Subset(data, subset_query)
    gallery_set = torch.utils.data.Subset(data, subset_gallery)

    if target_data_root is not None:
        target_data = datasets.ImageFolder(target_data_root, tensor_transform)
        target_loader = torch.utils.data.DataLoader(target_data, batch_size=batch_size)
    else:
        target_loader = None

    query_loader = torch.utils.data.DataLoader(query_set, batch_size=batch_size)
    gallery_loader = torch.utils.data.DataLoader(gallery_set, batch_size=batch_size)

    return query_loader, gallery_loader, target_loader
| |
|
| |
|
def prepare_dir_vec(dir_vec_extractor, imgs, dim, combination):
    """Extract features for ``imgs`` as a detached CPU tensor.

    Arguments:
        dir_vec_extractor (sequence): Extractors passed to ``extract_features``.
        imgs (torch.Tensor): Input batch.
        dim (int): Feature size passed to ``extract_features``.
        combination: When truthy, every entry along axis 1 is repeated twice
            in place (``repeat_interleave`` with 2 repeats on dimension 1).
    Returns:
        torch.Tensor: The detached CPU features, optionally repeated.
    """
    features = extract_features(imgs, dir_vec_extractor, dim)
    dir_vec = features.detach().cpu()
    if not combination:
        return dir_vec
    return torch.repeat_interleave(dir_vec, 2, 1)
| |
|