| | """The Sampling class serves as a helper module for retrieving subject model data""" |
| | from abc import ABC, abstractmethod |
| |
|
| | import os |
| | import gc |
| | import time |
| |
|
| | import numpy as np |
| | import torch |
| | import torch.nn as nn |
| | import torch.optim as optim |
| | from singleVis.utils import * |
| | |
| | |
| | |
| | |
| | |
| |
|
| | from scipy.special import softmax |
| | import torch |
| | from torch import nn |
| | from torch.nn import functional as F |
| |
|
class VAE(nn.Module):
    """Small fully connected variational autoencoder.

    The encoder sends an ``input_dim``-wide vector through one ReLU hidden
    layer to the mean and log-variance of a diagonal Gaussian over a
    ``latent_dim``-wide code. The decoder mirrors it and ends in a sigmoid,
    so reconstructions lie in ``[0, 1]``.

    Args:
        input_dim: width of each flattened input vector.
        hidden_dim: width of the hidden layer in encoder and decoder.
        latent_dim: width of the latent code.
    """

    def __init__(self, input_dim, hidden_dim, latent_dim):
        super().__init__()
        # Remembered so that forward() flattens to the configured width
        # rather than to a fixed constant.
        self.input_dim = input_dim
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc21 = nn.Linear(hidden_dim, latent_dim)  # mean head
        self.fc22 = nn.Linear(hidden_dim, latent_dim)  # log-variance head
        self.fc3 = nn.Linear(latent_dim, hidden_dim)
        self.fc4 = nn.Linear(hidden_dim, input_dim)

    def encode(self, x):
        """Return ``(mu, logvar)`` of the approximate posterior for ``x``."""
        h1 = F.relu(self.fc1(x))
        return self.fc21(h1), self.fc22(h1)

    def reparameterize(self, mu, logvar):
        """Draw ``mu + eps * std`` with ``eps ~ N(0, I)`` and ``std = exp(logvar / 2)``."""
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z):
        """Map latent codes ``z`` back to input space through a sigmoid."""
        h3 = F.relu(self.fc3(z))
        return torch.sigmoid(self.fc4(h3))

    def forward(self, x):
        """Encode, sample and decode ``x``.

        Returns:
            ``(reconstruction, mu, logvar)``.
        """
        mu, logvar = self.encode(x.view(-1, self.input_dim))
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar
| |
|
| |
|
| | """ |
| | DataContainer module |
| | 1. calculate information entropy for single sample and subset |
| | 2. sample informative subset |
| | """ |
class DataGenerationAbstractClass(ABC):
    """Base class for data generators; it only records its construction arguments."""

    def __init__(self, data_provider, epoch):
        """Keep the data provider and epoch, and tag the instance as ``"abstract"``."""
        self.data_provider, self.epoch = data_provider, epoch
        self.mode = "abstract"
| | |
| | |
| | |
| | |
| |
|
| | class DataGeneration(DataGenerationAbstractClass): |
def __init__(self, model, data_provider, epoch, device):
    """Store the collaborators used by the generation routines.

    Args:
        model: subject network that the methods of this class evaluate and
            load checkpoints into.
        data_provider: object exposing labels, representations and
            ``content_path``.
        epoch: epoch whose checkpoint and labels are used.
        device: torch device on which the model and tensors are placed.
    """
    # The base-class initialiser is not invoked here, so no ``mode``
    # attribute is created on the instance.
    self.model, self.DEVICE = model, device
    self.data_provider, self.epoch = data_provider, epoch
| | |
def generate_adversarial_example(self, input_data, target, epsilon):
    """Perturb ``input_data`` by one signed-gradient (FGSM-style) step.

    The cross-entropy between the model output and ``target`` is
    differentiated with respect to the input, and every input element is
    shifted by ``epsilon`` in the direction of the gradient's sign.

    Args:
        input_data: batch of model inputs. It is not modified; the gradient
            is taken on a private copy.
        target: tensor holding one class index, broadcast over the batch.
        epsilon: step size multiplied with the sign of the gradient.

    Returns:
        A detached tensor shaped like ``input_data``, located on
        ``self.DEVICE``.
    """
    self.model.to(self.DEVICE)
    self.model.eval()

    # Differentiate through a fresh leaf copy. Flagging the caller's tensor
    # with requires_grad would mutate it, and because ``.grad`` accumulates,
    # a second call on the same tensor would mix the previous call's
    # gradient into the new one.
    perturbable = input_data.detach().clone().to(self.DEVICE).requires_grad_(True)

    target = target.to(self.DEVICE).expand(perturbable.size(0))
    loss = nn.CrossEntropyLoss()(self.model(perturbable), target)

    # Only the gradient with respect to the input is needed; asking for
    # exactly that leaves the model parameters' ``.grad`` fields alone.
    (gradient,) = torch.autograd.grad(loss, perturbable)

    # NOTE(review): adding the signed gradient *raises* the loss against
    # ``target``; a targeted attack would normally step the other way.
    # The direction is kept as in the original -- confirm the intent.
    adversarial_example = perturbable + epsilon * gradient.sign()

    # Detach so callers do not keep the autograd graph alive.
    return adversarial_example.detach()
| |
|
def gen(self, epsilon=0.2, sample_ratio=0.1, num_classes=10):
    """Build adversarial samples for every ordered pair of distinct classes.

    For each class a fraction ``sample_ratio`` of its training samples is
    drawn without replacement, and for every other class an adversarial
    batch is produced with ``generate_adversarial_example``. All samples
    are stacked, passed through the feature function of ``self.epoch``
    via ``batch_run``, and the result is saved as
    ``<content_path>/Model/Epoch_<epoch>/adv_representation.npy``.

    Args:
        epsilon: perturbation step forwarded to the attack.
        sample_ratio: fraction of each class's samples that is attacked.
        num_classes: number of classes; labels ``0 .. num_classes - 1`` are
            iterated. Defaults to the previously hard-coded 10.

    Returns:
        ``(adversarial_samples, data_representation)``: a list with one
        tensor per adversarial sample, and whatever ``batch_run`` returns
        for the stacked samples.
    """
    labels = self.data_provider.train_labels(self.epoch)

    training_data_path = os.path.join(self.data_provider.content_path, "Training_data")
    training_data = torch.load(
        os.path.join(training_data_path, "training_dataset_data.pth"),
        map_location="cpu",
    ).to(self.DEVICE)

    # Load this epoch's checkpoint *before* attacking, so the adversarial
    # samples are crafted against the same weights that later embed them.
    repr_model = self.feature_function(self.epoch)

    adversarial_samples = []
    for label in range(num_classes):
        indices = np.where(labels == label)[0]
        sample_size = int(len(indices) * sample_ratio)
        sampled_indices = np.random.choice(indices, size=sample_size, replace=False)
        # Plain indexing already yields a new tensor on the right device;
        # the legacy ``torch.Tensor(...)`` constructor is not needed.
        sampled_data = training_data[sampled_indices].float()
        print("sampeled data:{}".format(len(sampled_data)))
        for target_label in range(num_classes):
            if target_label == label:
                continue
            target = torch.tensor([target_label])
            # Every attack gets its own copy so gradient state from one
            # target can never leak into the next.
            adversarial_example = self.generate_adversarial_example(
                sampled_data.detach().clone(), target, epsilon
            )
            print("generating class {} 's adversary sampes for target{}, num of adv{}".format(label, target_label, len(adversarial_example)))
            # Store detached rows so the returned list holds no autograd graph.
            adversarial_samples.extend(adversarial_example.detach())

    adversarial_samples_torch = torch.stack(adversarial_samples)
    print("adversarial_samples_torch", adversarial_samples_torch.shape)
    data_representation = batch_run(repr_model, adversarial_samples_torch)

    np.save(
        os.path.join(
            self.data_provider.content_path,
            "Model",
            "Epoch_{}".format(self.epoch),
            "adv_representation.npy",
        ),
        data_representation,
    )

    return adversarial_samples, data_representation
| | |
def gen_specific_class_adv(self, epsilon=0.2, sample_ratio=0.1, from_label=1, target_label=2):
    """Build adversarial samples from one source class towards one target class.

    A fraction ``sample_ratio`` of the training samples labelled
    ``from_label`` is drawn without replacement, attacked once with
    ``target_label`` as the target, and passed through the feature
    function of ``self.epoch`` via ``batch_run``. Nothing is written to disk.

    Args:
        epsilon: perturbation step forwarded to the attack.
        sample_ratio: fraction of the source class that is attacked.
        from_label: label of the samples to perturb.
        target_label: label handed to the attack as its target.

    Returns:
        ``(adversarial_samples, data_representation)``: a list with one
        tensor per adversarial sample, and whatever ``batch_run`` returns
        for the stacked samples.
    """
    labels = self.data_provider.train_labels(self.epoch)

    training_data_path = os.path.join(self.data_provider.content_path, "Training_data")
    training_data = torch.load(
        os.path.join(training_data_path, "training_dataset_data.pth"),
        map_location="cpu",
    ).to(self.DEVICE)

    # Load this epoch's checkpoint *before* attacking, so the adversarial
    # samples are crafted against the same weights that later embed them.
    repr_model = self.feature_function(self.epoch)

    indices = np.where(labels == from_label)[0]
    sample_size = int(len(indices) * sample_ratio)
    sampled_indices = np.random.choice(indices, size=sample_size, replace=False)
    # Plain indexing already yields a new tensor on the right device; the
    # legacy ``torch.Tensor(...)`` constructor is not needed.
    sampled_data = training_data[sampled_indices].float()
    print("sampeled data:{}".format(len(sampled_data)))

    target = torch.tensor([target_label])
    # Hand the attack a private copy so it cannot alter ``sampled_data``.
    adversarial_example = self.generate_adversarial_example(
        sampled_data.detach().clone(), target, epsilon
    )
    print("generating class {} 's adversary sampes for target{}, num of adv{}".format(from_label, target_label, len(adversarial_example)))

    # One list entry per sample, detached so no autograd graph is kept.
    adversarial_samples = list(adversarial_example.detach())

    adversarial_samples_torch = torch.stack(adversarial_samples)
    print("adversarial_samples_torch", adversarial_samples_torch.shape)
    data_representation = batch_run(repr_model, adversarial_samples_torch)

    return adversarial_samples, data_representation
| | |
| | |
def feature_function(self, epoch):
    """Load the checkpoint of ``epoch`` into the model and return its feature extractor.

    The state dict is read from
    ``<content_path>/Model/Epoch_<epoch>/subject_model.pth`` onto the CPU,
    loaded into ``self.model``, and the model is then moved to
    ``self.DEVICE`` and switched to eval mode.

    Returns:
        The ``feature`` attribute of the model.
    """
    checkpoint = os.path.join(
        self.data_provider.content_path,
        "Model",
        "{}_{:d}".format('Epoch', epoch),
        "subject_model.pth",
    )
    state = torch.load(checkpoint, map_location=torch.device("cpu"))
    self.model.load_state_dict(state)
    self.model = self.model.to(self.DEVICE)
    self.model.eval()
    return self.model.feature
| |
|
def vae_loss(self, recon_x, x, mu, logvar):
    """Return the summed VAE objective: reconstruction BCE plus KL divergence.

    Args:
        recon_x: reconstruction with values in ``[0, 1]``; its last
            dimension fixes the width ``x`` is flattened to.
        x: original batch, reshaped to ``(-1, recon_x.size(-1))``.
        mu: mean of the approximate posterior.
        logvar: log-variance of the approximate posterior.

    Returns:
        Scalar tensor ``BCE + KLD``, both summed over all elements.
    """
    # Follow the reconstruction's width rather than a fixed 512, so the
    # loss works for any input dimensionality.
    flat_target = x.view(-1, recon_x.size(-1))
    reconstruction_term = F.binary_cross_entropy(recon_x, flat_target, reduction='sum')
    # KL divergence between N(mu, exp(logvar)) and the standard normal.
    divergence_term = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return reconstruction_term + divergence_term
| | |
def generate_by_VAE(self, num_epochs=20, batch_size=32, num_points=100):
    """Train a 2-D-latent VAE on the epoch's representations and decode a latent grid.

    The VAE is fitted with Adam on ``train_representation(self.epoch)``.
    The training data is then encoded, a square grid is laid over the
    range of the latent means (padded by 2 % per side), and every grid
    point is decoded.

    Args:
        num_epochs: number of passes over the data (previously fixed at 20).
        batch_size: mini-batch size (previously fixed at 32).
        num_points: grid resolution per axis (previously fixed at 100).

    Returns:
        Tensor of ``num_points ** 2`` decoded samples on the data
        provider's device.
    """
    device = self.data_provider.DEVICE
    train_data = self.data_provider.train_representation(self.epoch)
    data_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, shuffle=True)
    # Size the network from the data instead of assuming 512 features.
    # assumes train_data is a 2-D (samples, features) array -- TODO confirm
    vae = VAE(train_data.shape[1], 256, 2).to(device)
    optimizer = optim.Adam(vae.parameters())

    vae.train()
    for epoch in range(num_epochs):
        loss = None
        for data in data_loader:
            # Cast to float32 so the batch matches the VAE's parameters
            # even when the stored representation uses another dtype.
            data = data.to(device).float()
            optimizer.zero_grad()
            recon_batch, mu, logvar = vae(data)
            loss = self.vae_loss(recon_batch, data, mu, logvar)
            loss.backward()
            optimizer.step()

        # Report the last batch's loss; skipped when the loader was empty.
        if loss is not None:
            print(f'Epoch {epoch}, Loss: {loss.item()}')

    vae.eval()
    with torch.no_grad():
        encoded = torch.as_tensor(train_data, dtype=torch.float32, device=device)
        mu, _ = vae.encode(encoded)
        mu = mu.cpu().numpy()

    ebd_min = np.min(mu, axis=0)
    ebd_max = np.max(mu, axis=0)
    ebd_extent = ebd_max - ebd_min
    x_min, y_min = ebd_min - 0.02 * ebd_extent
    x_max, y_max = ebd_max + 0.02 * ebd_extent
    # Use one common range for both axes so the grid is square.
    lower = min(x_min, y_min)
    upper = max(x_max, y_max)

    axis_values = np.linspace(lower, upper, num_points)
    x_grid, y_grid = np.meshgrid(axis_values, axis_values)
    z_grid = np.column_stack([x_grid.ravel(), y_grid.ravel()])

    with torch.no_grad():
        z = torch.tensor(z_grid).to(device).float()
        samples = vae.decode(z)

    return samples
| |
|
| | |
def interpolate_samples(self, sample1, sample2, t):
    """Blend two samples: weight ``t`` on ``sample1`` and ``1 - t`` on ``sample2``."""
    first_part = t * sample1
    second_part = (1 - t) * sample2
    return first_part + second_part
| |
|
def select_samples_from_different_classes(self, X, labels):
    """Pick one random sample pair for every unordered pair of distinct classes.

    Args:
        X: array of samples, indexable with a boolean mask over its first axis.
        labels: class label of each row of ``X``.

    Returns:
        List of ``(sample1, sample2)`` tuples, ordered by the sorted class
        pairs ``(c_i, c_j)`` with ``i < j``; ``sample1`` belongs to ``c_i``
        and ``sample2`` to ``c_j``. Empty when fewer than two classes exist.
    """
    labels = np.asarray(labels)
    classes = np.unique(labels)
    # Split the data once per class instead of re-masking inside the pair loop.
    per_class = [X[labels == cls] for cls in classes]

    selected_samples = []
    for i, first_group in enumerate(per_class):
        for second_group in per_class[i + 1:]:
            # Draw order (first class, then second) matches the original,
            # so seeded runs give the same pairs.
            sample1 = first_group[np.random.choice(first_group.shape[0])]
            sample2 = second_group[np.random.choice(second_group.shape[0])]
            selected_samples.append((sample1, sample2))
    return selected_samples
def get_conf(self, epoch, interpolated_X):
    """Return, per sample, the largest softmax probability of the provider's prediction.

    Args:
        epoch: epoch passed through to ``data_provider.get_pred``.
        interpolated_X: samples passed through to ``data_provider.get_pred``.
    """
    raw_predictions = self.data_provider.get_pred(epoch, interpolated_X)
    probabilities = softmax(raw_predictions, axis=1)
    return probabilities.max(axis=1)
| |
|
def generate_interpolated_samples(self, X, labels, get_conf, num_interpolations_per_bin):
    """Collect interpolated samples grouped by the confidence they receive.

    One sample pair per pair of classes is selected up front. Batches of
    100 random convex combinations of those pairs are then scored with
    ``get_conf`` and sorted into bins with upper edges 0.6, 0.7, 0.8 and
    0.9. A sample goes to the first bin whose edge exceeds its confidence,
    and samples scoring 0.9 or more are discarded.

    Args:
        X: samples handed to ``select_samples_from_different_classes``.
        labels: class label of each sample in ``X``.
        get_conf: callable ``(epoch, batch) -> confidences``.
        num_interpolations_per_bin: minimum number of samples per bin.

    Returns:
        Dict mapping each bin's upper edge to its list of samples. Bins
        may hold more than the requested minimum.

    Note:
        The loop only stops once every bin is full, so it does not
        terminate if some bin never receives a sample.
    """
    selected_samples = self.select_samples_from_different_classes(X, labels)

    confidence_bins = np.linspace(0.5, 1, 6)[1:-1]
    interpolated_X = {upper_edge: [] for upper_edge in confidence_bins}

    while min(len(bucket) for bucket in interpolated_X.values()) < num_interpolations_per_bin:
        batch_samples = []
        for _ in range(100):
            sample1, sample2 = selected_samples[np.random.choice(len(selected_samples))]
            t = np.random.rand()
            batch_samples.append(self.interpolate_samples(sample1, sample2, t))

        # The instance stores its epoch as ``self.epoch``; there is no
        # ``iteration`` attribute on this class.
        confidences = get_conf(self.epoch, np.array(batch_samples))
        for sample, confidence in zip(batch_samples, confidences):
            for upper_edge in confidence_bins:
                if confidence < upper_edge:
                    interpolated_X[upper_edge].append(sample)
                    break

    return interpolated_X
| | |
def inter_gen(self, num_pairs=2000):
    """Generate confidence-binned interpolated samples for ``self.epoch`` and save them.

    Args:
        num_pairs: minimum number of interpolated samples requested per
            confidence bin.

    Returns:
        The samples of all bins concatenated in ascending bin order. The
        same array is written to
        ``<content_path>/Model/Epoch_<epoch>/interpolated_X.npy``.
    """
    # The provider exposes these as methods taking an epoch; they have to
    # be called, not passed along as bound methods.
    train_data = self.data_provider.train_representation(self.epoch)
    labels = self.data_provider.train_labels(self.epoch)

    interpolated_X_div = self.generate_interpolated_samples(train_data, labels, self.get_conf, num_pairs)
    confidence_bins = np.linspace(0.5, 1, 6)[1:-1]
    interpolated_X = np.concatenate([np.array(interpolated_X_div[upper_edge]) for upper_edge in confidence_bins])

    # The epoch is stored as ``self.epoch``; this class has no ``iteration``.
    np.save(
        os.path.join(
            self.data_provider.content_path,
            "Model",
            "Epoch_{}".format(self.epoch),
            "interpolated_X.npy",
        ),
        interpolated_X,
    )
    return interpolated_X
| | |
| |
|
| | |
def gen_more_boundary_mixed_up(self, l_bound=0.6, num_adv_eg=6000, name='border_centers_1.npy'):
    """Generate border points for ``self.epoch`` and save their representations.

    The training inputs are loaded, scored with the model via
    ``batch_run``, and handed to ``get_border_points``. The resulting
    border points are passed through the epoch's feature function and the
    result is saved as ``<content_path>/Model/Epoch_<epoch>/<name>``.

    Args:
        l_bound: forwarded to ``get_border_points`` as ``l_bound``.
        num_adv_eg: forwarded to ``get_border_points`` as ``num_adv_eg``.
        name: file name of the saved array.

    Returns:
        Whatever ``batch_run`` returns for the border points.
    """
    training_data_path = os.path.join(self.data_provider.content_path, "Training_data")
    training_data = torch.load(
        os.path.join(training_data_path, "training_dataset_data.pth"),
        map_location="cpu",
    ).to(self.DEVICE)

    # Load the epoch's checkpoint first (this also moves the model to
    # ``self.DEVICE``), so that confidences, border search and
    # representations all use the same weights.
    repr_model = self.feature_function(self.epoch)

    confs = batch_run(self.model, training_data)
    preds = np.argmax(confs, axis=1).squeeze()

    print("border_points generating...")
    border_points, _, _ = get_border_points(
        model=self.model,
        input_x=training_data,
        confs=confs,
        predictions=preds,
        device=self.DEVICE,
        l_bound=l_bound,
        num_adv_eg=num_adv_eg,
        lambd=0.05,
        verbose=0,
    )

    border_points = border_points.to(self.DEVICE)
    border_centers = batch_run(repr_model, border_points)

    model_path = os.path.join(self.data_provider.content_path, "Model")
    location = os.path.join(model_path, "Epoch_{:d}".format(self.epoch), name)
    print("border_points saving...")
    np.save(location, border_centers)

    return border_centers
| | |
def get_near_epoch_border(self, n_epoch):
    """Re-embed the border points stored for ``n_epoch`` with the model of ``self.epoch``.

    Reads ``<content_path>/Model/Epoch_<n_epoch>/ori_border_centers.npy``,
    converts it to a tensor on ``self.DEVICE``, and runs it through the
    feature function of ``self.epoch`` via ``batch_run``.

    Returns:
        Whatever ``batch_run`` returns for those points.
    """
    stored_file = os.path.join(
        self.data_provider.content_path,
        "Model",
        "Epoch_{:d}".format(n_epoch),
        "ori_border_centers.npy",
    )
    stored_points = torch.Tensor(np.load(stored_file)).to(self.DEVICE)
    extractor = self.feature_function(self.epoch)
    return batch_run(extractor, stored_points)
| |
|
| | |
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | |
| |
|
| |
|