| | import torch |
| | import torch.nn as nn |
| | from torch.nn import init |
| | import functools |
| | from torch.optim import lr_scheduler |
| | import numpy as np |
| | import torch.nn.functional as F |
| | from torch.nn.modules.normalization import LayerNorm |
| | import os |
| | from torch.nn.utils import spectral_norm |
| | from torchvision import models |
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | def init_weights(net, init_type='normal', init_gain=0.02): |
| | """Initialize network weights. |
| | Parameters: |
| | net (network) -- network to be initialized |
| | init_type (str) -- the name of an initialization method: normal | xavier | kaiming | orthogonal |
| | init_gain (float) -- scaling factor for normal, xavier and orthogonal. |
| | We use 'normal' in the original pix2pix and CycleGAN paper. But xavier and kaiming might |
| | work better for some applications. Feel free to try yourself. |
| | """ |
| | def init_func(m): |
| | classname = m.__class__.__name__ |
| | if hasattr(m, 'weight') and (classname.find('Conv') != -1 or classname.find('Linear') != -1): |
| | if init_type == 'normal': |
| | init.normal_(m.weight.data, 0.0, init_gain) |
| | elif init_type == 'xavier': |
| | init.xavier_normal_(m.weight.data, gain=init_gain) |
| | elif init_type == 'kaiming': |
| | |
| | init.kaiming_normal_(m.weight.data, a=0.2, mode='fan_in', nonlinearity='leaky_relu') |
| | elif init_type == 'orthogonal': |
| | init.orthogonal_(m.weight.data, gain=init_gain) |
| | else: |
| | raise NotImplementedError('initialization method [%s] is not implemented' % init_type) |
| | if hasattr(m, 'bias') and m.bias is not None: |
| | init.constant_(m.bias.data, 0.0) |
| | elif classname.find('BatchNorm2d') != -1: |
| | init.normal_(m.weight.data, 1.0, init_gain) |
| | init.constant_(m.bias.data, 0.0) |
| |
|
| | print('initialize network with %s' % init_type) |
| | net.apply(init_func) |
| |
|
| |
|
| | def init_net(net, init_type='normal', init_gain=0.02, gpu_ids=[], init=True): |
| | """Initialize a network: 1. register CPU/GPU device (with multi-GPU support); 2. initialize the network weights |
| | Parameters: |
| | net (network) -- the network to be initialized |
| | init_type (str) -- the name of an initialization method: normal | xavier | kaiming | orthogonal |
| | gain (float) -- scaling factor for normal, xavier and orthogonal. |
| | gpu_ids (int list) -- which GPUs the network runs on: e.g., 0,1,2 |
| | Return an initialized network. |
| | """ |
| | if len(gpu_ids) > 0: |
| | assert(torch.cuda.is_available()) |
| | net.to(gpu_ids[0]) |
| | if init: |
| | init_weights(net, init_type, init_gain=init_gain) |
| | return net |
| |
|
| |
|
| | def get_scheduler(optimizer, opt): |
| | """Return a learning rate scheduler |
| | Parameters: |
| | optimizer -- the optimizer of the network |
| | opt (option class) -- stores all the experiment flags; needs to be a subclass of BaseOptionsοΌγ |
| | opt.lr_policy is the name of learning rate policy: linear | step | plateau | cosine |
| | For 'linear', we keep the same learning rate for the first <opt.niter> epochs |
| | and linearly decay the rate to zero over the next <opt.niter_decay> epochs. |
| | For other schedulers (step, plateau, and cosine), we use the default PyTorch schedulers. |
| | See https://pytorch.org/docs/stable/optim.html for more details. |
| | """ |
| | if opt.lr_policy == 'linear': |
| | def lambda_rule(epoch): |
| | lr_l = 1.0 - max(0, epoch + opt.epoch_count - opt.niter) / float(opt.niter_decay + 1) |
| | return lr_l |
| | scheduler = lr_scheduler.LambdaLR(optimizer, lr_lambda=lambda_rule) |
| | elif opt.lr_policy == 'step': |
| | scheduler = lr_scheduler.StepLR(optimizer, step_size=opt.lr_decay_iters, gamma=0.1) |
| | elif opt.lr_policy == 'plateau': |
| | scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.2, threshold=0.01, patience=5) |
| | elif opt.lr_policy == 'cosine': |
| | scheduler = lr_scheduler.CosineAnnealingLR(optimizer, T_max=opt.niter, eta_min=0) |
| | else: |
| | return NotImplementedError('learning rate policy [%s] is not implemented', opt.lr_policy) |
| | return scheduler |
| |
|
| | class LayerNormWarpper(nn.Module): |
| | def __init__(self, num_features): |
| | super(LayerNormWarpper, self).__init__() |
| | self.num_features = int(num_features) |
| |
|
| | def forward(self, x): |
| | x = nn.LayerNorm([self.num_features, x.size()[2], x.size()[3]], elementwise_affine=False).cuda()(x) |
| | return x |
| |
|
| | def get_norm_layer(norm_type='instance'): |
| | """Return a normalization layer |
| | Parameters: |
| | norm_type (str) -- the name of the normalization layer: batch | instance | none |
| | For BatchNorm, we use learnable affine parameters and track running statistics (mean/stddev). |
| | For InstanceNorm, we do not use learnable affine parameters. We do not track running statistics. |
| | """ |
| | if norm_type == 'batch': |
| | norm_layer = functools.partial(nn.BatchNorm2d, affine=True, track_running_stats=True) |
| | elif norm_type == 'instance': |
| | norm_layer = functools.partial(nn.InstanceNorm2d, affine=False, track_running_stats=False) |
| | elif norm_type == 'layer': |
| | norm_layer = functools.partial(LayerNormWarpper) |
| | elif norm_type == 'none': |
| | norm_layer = None |
| | else: |
| | raise NotImplementedError('normalization layer [%s] is not found' % norm_type) |
| | return norm_layer |
| |
|
| |
|
| | def get_non_linearity(layer_type='relu'): |
| | if layer_type == 'relu': |
| | nl_layer = functools.partial(nn.ReLU, inplace=True) |
| | elif layer_type == 'lrelu': |
| | nl_layer = functools.partial( |
| | nn.LeakyReLU, negative_slope=0.2, inplace=True) |
| | elif layer_type == 'elu': |
| | nl_layer = functools.partial(nn.ELU, inplace=True) |
| | elif layer_type == 'selu': |
| | nl_layer = functools.partial(nn.SELU, inplace=True) |
| | elif layer_type == 'prelu': |
| | nl_layer = functools.partial(nn.PReLU) |
| | else: |
| | raise NotImplementedError( |
| | 'nonlinearity activitation [%s] is not found' % layer_type) |
| | return nl_layer |
| |
|
| |
|
| | def define_G(input_nc, output_nc, nz, ngf, netG='unet_128', norm='batch', nl='relu', use_noise=False, |
| | use_dropout=False, init_type='xavier', init_gain=0.02, gpu_ids=[], where_add='input', upsample='bilinear'): |
| | net = None |
| | norm_layer = get_norm_layer(norm_type=norm) |
| | nl_layer = get_non_linearity(layer_type=nl) |
| | |
| |
|
| | if nz == 0: |
| | where_add = 'input' |
| |
|
| | if netG == 'unet_128' and where_add == 'input': |
| | net = G_Unet_add_input(input_nc, output_nc, nz, 7, ngf, norm_layer=norm_layer, nl_layer=nl_layer, use_noise=use_noise, |
| | use_dropout=use_dropout, upsample=upsample, device=gpu_ids) |
| | elif netG == 'unet_128_G' and where_add == 'input': |
| | net = G_Unet_add_input_G(input_nc, output_nc, nz, 7, ngf, norm_layer=norm_layer, nl_layer=nl_layer, use_noise=use_noise, |
| | use_dropout=use_dropout, upsample=upsample, device=gpu_ids) |
| | elif netG == 'unet_256' and where_add == 'input': |
| | net = G_Unet_add_input(input_nc, output_nc, nz, 8, ngf, norm_layer=norm_layer, nl_layer=nl_layer, use_noise=use_noise, |
| | use_dropout=use_dropout, upsample=upsample, device=gpu_ids) |
| | elif netG == 'unet_256_G' and where_add == 'input': |
| | net = G_Unet_add_input_G(input_nc, output_nc, nz, 8, ngf, norm_layer=norm_layer, nl_layer=nl_layer, use_noise=use_noise, |
| | use_dropout=use_dropout, upsample=upsample, device=gpu_ids) |
| | elif netG == 'unet_128' and where_add == 'all': |
| | net = G_Unet_add_all(input_nc, output_nc, nz, 7, ngf, norm_layer=norm_layer, nl_layer=nl_layer, use_noise=use_noise, |
| | use_dropout=use_dropout, upsample=upsample) |
| | elif netG == 'unet_256' and where_add == 'all': |
| | net = G_Unet_add_all(input_nc, output_nc, nz, 8, ngf, norm_layer=norm_layer, nl_layer=nl_layer, use_noise=use_noise, |
| | use_dropout=use_dropout, upsample=upsample) |
| | else: |
| | raise NotImplementedError('Generator model name [%s] is not recognized' % net) |
| | |
| | return init_net(net, init_type, init_gain, gpu_ids) |
| |
|
| |
|
| | def define_C(input_nc, output_nc, nz, ngf, netC='unet_128', norm='instance', nl='relu', |
| | use_dropout=False, init_type='normal', init_gain=0.02, gpu_ids=[], upsample='basic'): |
| | net = None |
| | norm_layer = get_norm_layer(norm_type=norm) |
| | nl_layer = get_non_linearity(layer_type=nl) |
| |
|
| | if netC == 'resnet_9blocks': |
| | net = ResnetGenerator(input_nc, output_nc, ngf, norm_layer=norm_layer, use_dropout=use_dropout, n_blocks=9) |
| | elif netC == 'resnet_6blocks': |
| | net = ResnetGenerator(input_nc, output_nc, ngf, norm_layer=norm_layer, use_dropout=use_dropout, n_blocks=6) |
| | elif netC == 'unet_128': |
| | net = G_Unet_add_input_C(input_nc, output_nc, 0, 7, ngf, norm_layer=norm_layer, nl_layer=nl_layer, |
| | use_dropout=use_dropout, upsample=upsample) |
| | elif netC == 'unet_256': |
| | net = G_Unet_add_input(input_nc, output_nc, 0, 8, ngf, norm_layer=norm_layer, nl_layer=nl_layer, |
| | use_dropout=use_dropout, upsample=upsample) |
| | elif netC == 'unet_32': |
| | net = G_Unet_add_input(input_nc, output_nc, 0, 5, ngf, norm_layer=norm_layer, nl_layer=nl_layer, |
| | use_dropout=use_dropout, upsample=upsample) |
| | else: |
| | raise NotImplementedError('Generator model name [%s] is not recognized' % net) |
| |
|
| | return init_net(net, init_type, init_gain, gpu_ids) |
| |
|
| |
|
| | def define_D(input_nc, ndf, netD, norm='batch', nl='lrelu', init_type='xavier', init_gain=0.02, num_Ds=1, gpu_ids=[]): |
| | net = None |
| | norm_layer = get_norm_layer(norm_type=norm) |
| | nl = 'lrelu' |
| | nl_layer = get_non_linearity(layer_type=nl) |
| |
|
| | if netD == 'basic_128': |
| | net = D_NLayers(input_nc, ndf, n_layers=2, norm_layer=norm_layer, nl_layer=nl_layer) |
| | elif netD == 'basic_256': |
| | net = D_NLayers(input_nc, ndf, n_layers=3, norm_layer=norm_layer, nl_layer=nl_layer) |
| | elif netD == 'basic_128_multi': |
| | net = D_NLayersMulti(input_nc=input_nc, ndf=ndf, n_layers=2, norm_layer=norm_layer, num_D=num_Ds, nl_layer=nl_layer) |
| | elif netD == 'basic_256_multi': |
| | net = D_NLayersMulti(input_nc=input_nc, ndf=ndf, n_layers=3, norm_layer=norm_layer, num_D=num_Ds, nl_layer=nl_layer) |
| | else: |
| | raise NotImplementedError('Discriminator model name [%s] is not recognized' % net) |
| | return init_net(net, init_type, init_gain, gpu_ids) |
| |
|
| |
|
| | def define_E(input_nc, output_nc, ndf, netE, norm='batch', nl='lrelu', |
| | init_type='xavier', init_gain=0.02, gpu_ids=[], vaeLike=False): |
| | net = None |
| | norm_layer = get_norm_layer(norm_type=norm) |
| | nl = 'lrelu' |
| | nl_layer = get_non_linearity(layer_type=nl) |
| | if netE == 'resnet_128': |
| | net = E_ResNet(input_nc, output_nc, ndf, n_blocks=4, norm_layer=norm_layer, |
| | nl_layer=nl_layer, vaeLike=vaeLike) |
| | elif netE == 'resnet_256': |
| | net = E_ResNet(input_nc, output_nc, ndf, n_blocks=5, norm_layer=norm_layer, |
| | nl_layer=nl_layer, vaeLike=vaeLike) |
| | elif netE == 'conv_128': |
| | net = E_NLayers(input_nc, output_nc, ndf, n_layers=4, norm_layer=norm_layer, |
| | nl_layer=nl_layer, vaeLike=vaeLike) |
| | elif netE == 'conv_256': |
| | net = E_NLayers(input_nc, output_nc, ndf, n_layers=5, norm_layer=norm_layer, |
| | nl_layer=nl_layer, vaeLike=vaeLike) |
| | else: |
| | raise NotImplementedError('Encoder model name [%s] is not recognized' % net) |
| |
|
| | return init_net(net, init_type, init_gain, gpu_ids, False) |
| |
|
| |
|
| | class ResnetGenerator(nn.Module): |
| | def __init__(self, input_nc, output_nc, ngf=64, n_downsampling=3, norm_layer=None, use_dropout=False, n_blocks=6, padding_type='replicate'): |
| | assert(n_blocks >= 0) |
| | super(ResnetGenerator, self).__init__() |
| | self.input_nc = input_nc |
| | self.output_nc = output_nc |
| | self.ngf = ngf |
| | if type(norm_layer) == functools.partial: |
| | use_bias = norm_layer.func != nn.BatchNorm2d |
| | else: |
| | use_bias = norm_layer != nn.BatchNorm2d |
| |
|
| | model = [nn.ReplicationPad2d(3), |
| | nn.Conv2d(input_nc, ngf, kernel_size=7, padding=0, |
| | bias=use_bias)] |
| | if norm_layer is not None: |
| | model += [norm_layer(ngf)] |
| | model += [nn.ReLU(True)] |
| |
|
| | |
| | for i in range(n_downsampling): |
| | mult = 2**i |
| | model += [nn.ReplicationPad2d(1),nn.Conv2d(ngf * mult, ngf * mult * 2, kernel_size=3, |
| | stride=2, padding=0, bias=use_bias)] |
| | |
| | |
| | if norm_layer is not None: |
| | model += [norm_layer(ngf * mult * 2)] |
| | model += [nn.ReLU(True)] |
| |
|
| | mult = 2**n_downsampling |
| | for i in range(n_blocks): |
| | model += [ResnetBlock(ngf * mult, padding_type=padding_type, norm_layer=norm_layer, use_dropout=use_dropout, use_bias=use_bias)] |
| |
|
| | for i in range(n_downsampling): |
| | mult = 2**(n_downsampling - i) |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | model += upsampleLayer(ngf * mult, int(ngf * mult / 2), upsample='bilinear', padding_type=padding_type) |
| | if norm_layer is not None: |
| | model += [norm_layer(int(ngf * mult / 2))] |
| | model += [nn.ReLU(True)] |
| | model +=[nn.ReplicationPad2d(1), |
| | nn.Conv2d(int(ngf * mult / 2), int(ngf * mult / 2), kernel_size=3, padding=0)] |
| | if norm_layer is not None: |
| | model += [norm_layer(ngf * mult / 2)] |
| | model += [nn.ReLU(True)] |
| | model += [nn.ReplicationPad2d(3)] |
| | model += [nn.Conv2d(ngf, output_nc, kernel_size=7, padding=0)] |
| | |
| |
|
| | self.model = nn.Sequential(*model) |
| |
|
| | def forward(self, input): |
| | return self.model(input) |
| |
|
| |
|
| | |
| | class ResnetBlock(nn.Module): |
| | def __init__(self, dim, padding_type, norm_layer, use_dropout, use_bias): |
| | super(ResnetBlock, self).__init__() |
| | self.conv_block = self.build_conv_block(dim, padding_type, norm_layer, use_dropout, use_bias) |
| |
|
| | def build_conv_block(self, dim, padding_type, norm_layer, use_dropout, use_bias): |
| | conv_block = [] |
| | p = 0 |
| | if padding_type == 'reflect': |
| | conv_block += [nn.ReflectionPad2d(1)] |
| | elif padding_type == 'replicate': |
| | conv_block += [nn.ReplicationPad2d(1)] |
| | elif padding_type == 'zero': |
| | p = 1 |
| | else: |
| | raise NotImplementedError('padding [%s] is not implemented' % padding_type) |
| |
|
| | conv_block += [nn.Conv2d(dim, dim, kernel_size=3, padding=p, bias=use_bias)] |
| | if norm_layer is not None: |
| | conv_block += [norm_layer(dim)] |
| | conv_block += [nn.ReLU(True)] |
| | |
| | |
| |
|
| | p = 0 |
| | if padding_type == 'reflect': |
| | conv_block += [nn.ReflectionPad2d(1)] |
| | elif padding_type == 'replicate': |
| | conv_block += [nn.ReplicationPad2d(1)] |
| | elif padding_type == 'zero': |
| | p = 1 |
| | else: |
| | raise NotImplementedError('padding [%s] is not implemented' % padding_type) |
| | conv_block += [nn.Conv2d(dim, dim, kernel_size=3, padding=p, bias=use_bias)] |
| | if norm_layer is not None: |
| | conv_block += [norm_layer(dim)] |
| |
|
| | return nn.Sequential(*conv_block) |
| |
|
| | def forward(self, x): |
| | out = x + self.conv_block(x) |
| | return out |
| |
|
| |
|
| | class D_NLayersMulti(nn.Module): |
| | def __init__(self, input_nc, ndf=64, n_layers=3, |
| | norm_layer=nn.BatchNorm2d, num_D=1, nl_layer=None): |
| | super(D_NLayersMulti, self).__init__() |
| | |
| | self.num_D = num_D |
| | self.nl_layer=nl_layer |
| | if num_D == 1: |
| | layers = self.get_layers(input_nc, ndf, n_layers, norm_layer) |
| | self.model = nn.Sequential(*layers) |
| | else: |
| | layers = self.get_layers(input_nc, ndf, n_layers, norm_layer) |
| | self.add_module("model_0", nn.Sequential(*layers)) |
| | self.down = nn.functional.interpolate |
| | for i in range(1, num_D): |
| | ndf_i = int(round(ndf / (2**i))) |
| | layers = self.get_layers(input_nc, ndf_i, n_layers, norm_layer) |
| | self.add_module("model_%d" % i, nn.Sequential(*layers)) |
| |
|
| | def get_layers(self, input_nc, ndf=64, n_layers=3, norm_layer=nn.BatchNorm2d): |
| | kw = 3 |
| | padw = 1 |
| | sequence = [spectral_norm(nn.Conv2d(input_nc, ndf, kernel_size=kw, |
| | stride=2, padding=padw)), nn.LeakyReLU(0.2, True)] |
| |
|
| | nf_mult = 1 |
| | nf_mult_prev = 1 |
| | for n in range(1, n_layers): |
| | nf_mult_prev = nf_mult |
| | nf_mult = min(2**n, 8) |
| | sequence += [spectral_norm(nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult, |
| | kernel_size=kw, stride=2, padding=padw))] |
| | if norm_layer: |
| | sequence += [norm_layer(ndf * nf_mult)] |
| |
|
| | sequence += [self.nl_layer()] |
| |
|
| | nf_mult_prev = nf_mult |
| | nf_mult = min(2**n_layers, 8) |
| | sequence += [spectral_norm(nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult, |
| | kernel_size=kw, stride=1, padding=padw))] |
| | if norm_layer: |
| | sequence += [norm_layer(ndf * nf_mult)] |
| | sequence += [self.nl_layer()] |
| |
|
| | sequence += [spectral_norm(nn.Conv2d(ndf * nf_mult, 1, |
| | kernel_size=kw, stride=1, padding=padw))] |
| |
|
| | return sequence |
| |
|
| | def forward(self, input): |
| | if self.num_D == 1: |
| | return self.model(input) |
| | result = [] |
| | down = input |
| | for i in range(self.num_D): |
| | model = getattr(self, "model_%d" % i) |
| | result.append(model(down)) |
| | if i != self.num_D - 1: |
| | down = self.down(down, scale_factor=0.5, mode='bilinear') |
| | return result |
| |
|
| | class D_NLayers(nn.Module): |
| | """Defines a PatchGAN discriminator""" |
| |
|
| | def __init__(self, input_nc, ndf=64, n_layers=3, norm_layer=nn.BatchNorm2d): |
| | """Construct a PatchGAN discriminator |
| | Parameters: |
| | input_nc (int) -- the number of channels in input images |
| | ndf (int) -- the number of filters in the last conv layer |
| | n_layers (int) -- the number of conv layers in the discriminator |
| | norm_layer -- normalization layer |
| | """ |
| | super(D_NLayers, self).__init__() |
| | if type(norm_layer) == functools.partial: |
| | use_bias = norm_layer.func != nn.BatchNorm2d |
| | else: |
| | use_bias = norm_layer != nn.BatchNorm2d |
| |
|
| | kw = 3 |
| | padw = 1 |
| | sequence = [nn.Conv2d(input_nc, ndf, kernel_size=kw, stride=2, padding=padw), nn.LeakyReLU(0.2, True)] |
| | nf_mult = 1 |
| | nf_mult_prev = 1 |
| | for n in range(1, n_layers): |
| | nf_mult_prev = nf_mult |
| | nf_mult = min(2 ** n, 8) |
| | sequence += [ |
| | nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult, kernel_size=kw, stride=2, padding=padw, bias=use_bias), |
| | norm_layer(ndf * nf_mult), |
| | nn.LeakyReLU(0.2, True) |
| | ] |
| |
|
| | nf_mult_prev = nf_mult |
| | nf_mult = min(2 ** n_layers, 8) |
| | sequence += [ |
| | nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult, kernel_size=kw, stride=1, padding=padw, bias=use_bias), |
| | norm_layer(ndf * nf_mult), |
| | nn.LeakyReLU(0.2, True) |
| | ] |
| |
|
| | sequence += [nn.Conv2d(ndf * nf_mult, 1, kernel_size=kw, stride=1, padding=padw)] |
| | self.model = nn.Sequential(*sequence) |
| |
|
| | def forward(self, input): |
| | """Standard forward.""" |
| | return self.model(input) |
| |
|
| |
|
| | class G_Unet_add_input(nn.Module): |
| | def __init__(self, input_nc, output_nc, nz, num_downs, ngf=64, |
| | norm_layer=None, nl_layer=None, use_dropout=False, use_noise=False, |
| | upsample='basic', device=0): |
| | super(G_Unet_add_input, self).__init__() |
| | self.nz = nz |
| | max_nchn = 8 |
| | noise = [] |
| | for i in range(num_downs+1): |
| | if use_noise: |
| | noise.append(True) |
| | else: |
| | noise.append(False) |
| |
|
| | |
| | |
| | unet_block = UnetBlock_A(ngf * max_nchn, ngf * max_nchn, ngf * max_nchn, noise=noise[num_downs-1], |
| | innermost=True, norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) |
| | for i in range(num_downs - 5): |
| | unet_block = UnetBlock_A(ngf * max_nchn, ngf * max_nchn, ngf * max_nchn, unet_block, noise[num_downs-i-3], |
| | norm_layer=norm_layer, nl_layer=nl_layer, use_dropout=use_dropout, upsample=upsample) |
| | unet_block = UnetBlock_A(ngf * 4, ngf * 4, ngf * max_nchn, unet_block, noise[2], |
| | norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) |
| | unet_block = UnetBlock_A(ngf * 2, ngf * 2, ngf * 4, unet_block, noise[1], |
| | norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) |
| | unet_block = UnetBlock_A(ngf, ngf, ngf * 2, unet_block, noise[0], |
| | norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) |
| | unet_block = UnetBlock_A(input_nc + nz, output_nc, ngf, unet_block, None, |
| | outermost=True, norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) |
| |
|
| | self.model = unet_block |
| |
|
| | def forward(self, x, z=None): |
| | if self.nz > 0: |
| | z_img = z.view(z.size(0), z.size(1), 1, 1).expand( |
| | z.size(0), z.size(1), x.size(2), x.size(3)) |
| | x_with_z = torch.cat([x, z_img], 1) |
| | else: |
| | x_with_z = x |
| |
|
| |
|
| | return torch.tanh(self.model(x_with_z)) |
| | |
| |
|
| | class G_Unet_add_input_G(nn.Module): |
| | def __init__(self, input_nc, output_nc, nz, num_downs, ngf=64, |
| | norm_layer=None, nl_layer=None, use_dropout=False, use_noise=False, |
| | upsample='basic', device=0): |
| | super(G_Unet_add_input_G, self).__init__() |
| | self.nz = nz |
| | max_nchn = 8 |
| | noise = [] |
| | for i in range(num_downs+1): |
| | if use_noise: |
| | noise.append(True) |
| | else: |
| | noise.append(False) |
| | |
| | |
| | unet_block = UnetBlock_G(ngf * max_nchn, ngf * max_nchn, ngf * max_nchn, noise=False, |
| | innermost=True, norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) |
| | for i in range(num_downs - 5): |
| | unet_block = UnetBlock_G(ngf * max_nchn, ngf * max_nchn, ngf * max_nchn, unet_block, noise=False, |
| | norm_layer=norm_layer, nl_layer=nl_layer, use_dropout=use_dropout, upsample=upsample) |
| | unet_block = UnetBlock_G(ngf * 4, ngf * 4, ngf * max_nchn, unet_block, noise[2], |
| | norm_layer=norm_layer, nl_layer=nl_layer, upsample='basic') |
| | unet_block = UnetBlock_G(ngf * 2, ngf * 2, ngf * 4, unet_block, noise[1], |
| | norm_layer=norm_layer, nl_layer=nl_layer, upsample='basic') |
| | unet_block = UnetBlock_G(ngf, ngf, ngf * 2, unet_block, noise[0], |
| | norm_layer=norm_layer, nl_layer=nl_layer, upsample='basic') |
| | unet_block = UnetBlock_G(input_nc + nz, output_nc, ngf, unet_block, None, |
| | outermost=True, norm_layer=norm_layer, nl_layer=nl_layer, upsample='basic') |
| |
|
| | self.model = unet_block |
| |
|
| | def forward(self, x, z=None): |
| | if self.nz > 0: |
| | z_img = z.view(z.size(0), z.size(1), 1, 1).expand( |
| | z.size(0), z.size(1), x.size(2), x.size(3)) |
| | x_with_z = torch.cat([x, z_img], 1) |
| | else: |
| | x_with_z = x |
| |
|
| | |
| | return self.model(x_with_z) |
| |
|
| | class G_Unet_add_input_C(nn.Module): |
| | def __init__(self, input_nc, output_nc, nz, num_downs, ngf=64, |
| | norm_layer=None, nl_layer=None, use_dropout=False, use_noise=False, |
| | upsample='basic', device=0): |
| | super(G_Unet_add_input_C, self).__init__() |
| | self.nz = nz |
| | max_nchn = 8 |
| | |
| | |
| | unet_block = UnetBlock(ngf * max_nchn, ngf * max_nchn, ngf * max_nchn, noise=False, |
| | innermost=True, norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) |
| | for i in range(num_downs - 5): |
| | unet_block = UnetBlock(ngf * max_nchn, ngf * max_nchn, ngf * max_nchn, unet_block, noise=False, |
| | norm_layer=norm_layer, nl_layer=nl_layer, use_dropout=use_dropout, upsample=upsample) |
| | unet_block = UnetBlock(ngf * 4, ngf * 4, ngf * max_nchn, unet_block, noise=False, |
| | norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) |
| | unet_block = UnetBlock(ngf * 2, ngf * 2, ngf * 4, unet_block, noise=False, |
| | norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) |
| | unet_block = UnetBlock(ngf, ngf, ngf * 2, unet_block, noise=False, |
| | norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) |
| | unet_block = UnetBlock(input_nc + nz, output_nc, ngf, unet_block, noise=False, |
| | outermost=True, norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) |
| |
|
| | self.model = unet_block |
| |
|
| | def forward(self, x, z=None): |
| | if self.nz > 0: |
| | z_img = z.view(z.size(0), z.size(1), 1, 1).expand( |
| | z.size(0), z.size(1), x.size(2), x.size(3)) |
| | x_with_z = torch.cat([x, z_img], 1) |
| | else: |
| | x_with_z = x |
| |
|
| | |
| | return self.model(x_with_z) |
| |
|
| | def upsampleLayer(inplanes, outplanes, kw=1, upsample='basic', padding_type='replicate'): |
| | |
| | if upsample == 'basic': |
| | upconv = [nn.ConvTranspose2d(inplanes, outplanes, kernel_size=4, stride=2, padding=1)] |
| | elif upsample == 'bilinear' or upsample == 'nearest' or upsample == 'linear': |
| | upconv = [nn.Upsample(scale_factor=2, mode=upsample, align_corners=True), |
| | |
| | nn.Conv2d(inplanes, outplanes, kernel_size=1, stride=1, padding=0)] |
| | |
| | |
| | |
| | else: |
| | raise NotImplementedError( |
| | 'upsample layer [%s] not implemented' % upsample) |
| | return upconv |
| |
|
| | class UnetBlock_G(nn.Module): |
| | def __init__(self, input_nc, outer_nc, inner_nc, |
| | submodule=None, noise=None, outermost=False, innermost=False, |
| | norm_layer=None, nl_layer=None, use_dropout=False, upsample='basic', padding_type='replicate'): |
| | super(UnetBlock_G, self).__init__() |
| | self.outermost = outermost |
| | p = 0 |
| | downconv = [] |
| | if padding_type == 'reflect': |
| | downconv += [nn.ReflectionPad2d(1)] |
| | elif padding_type == 'replicate': |
| | downconv += [nn.ReplicationPad2d(1)] |
| | elif padding_type == 'zero': |
| | p = 1 |
| | else: |
| | raise NotImplementedError( |
| | 'padding [%s] is not implemented' % padding_type) |
| |
|
| | downconv += [nn.Conv2d(input_nc, inner_nc, |
| | kernel_size=3, stride=2, padding=p)] |
| | |
| | downrelu = nn.LeakyReLU(0.2, True) |
| | downnorm = norm_layer(inner_nc) if norm_layer is not None else None |
| | uprelu = nl_layer() |
| | uprelu2 = nl_layer() |
| | uppad = nn.ReplicationPad2d(1) |
| | upnorm = norm_layer(outer_nc) if norm_layer is not None else None |
| | upnorm2 = norm_layer(outer_nc) if norm_layer is not None else None |
| | self.noiseblock = ApplyNoise(outer_nc) |
| | self.noise = noise |
| |
|
| | if outermost: |
| | upconv = upsampleLayer(inner_nc * 2, inner_nc, upsample=upsample, padding_type=padding_type) |
| | uppad = nn.ReplicationPad2d(3) |
| | upconv2 = nn.Conv2d(inner_nc, outer_nc, kernel_size=7, padding=0) |
| | down = downconv |
| | up = [uprelu] + upconv |
| | if upnorm is not None: |
| | up += [norm_layer(inner_nc)] |
| | |
| | |
| | |
| | |
| | |
| | |
| | up +=[uprelu2, uppad, upconv2] |
| | model = down + [submodule] + up |
| | elif innermost: |
| | upconv = upsampleLayer(inner_nc, outer_nc, upsample=upsample, padding_type=padding_type) |
| | upconv2 = nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p) |
| | down = [downrelu] + downconv |
| | up = [uprelu] + upconv |
| | if upnorm is not None: |
| | up += [upnorm] |
| | up += [uprelu2, uppad, upconv2] |
| | if upnorm2 is not None: |
| | up += [upnorm2] |
| | model = down + up |
| | else: |
| | upconv = upsampleLayer(inner_nc * 2, outer_nc, upsample=upsample, padding_type=padding_type) |
| | upconv2 = nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p) |
| | down = [downrelu] + downconv |
| | if downnorm is not None: |
| | down += [downnorm] |
| | up = [uprelu] + upconv |
| | if upnorm is not None: |
| | up += [upnorm] |
| | up += [uprelu2, uppad, upconv2] |
| | if upnorm2 is not None: |
| | up += [upnorm2] |
| |
|
| | if use_dropout: |
| | model = down + [submodule] + up + [nn.Dropout(0.5)] |
| | else: |
| | model = down + [submodule] + up |
| |
|
| | self.model = nn.Sequential(*model) |
| |
|
| | def forward(self, x): |
| | if self.outermost: |
| | return self.model(x) |
| | else: |
| | x2 = self.model(x) |
| | if self.noise: |
| | x2 = self.noiseblock(x2, self.noise) |
| | return torch.cat([x2, x], 1) |
| |
|
| |
|
| | class UnetBlock(nn.Module): |
| | def __init__(self, input_nc, outer_nc, inner_nc, |
| | submodule=None, noise=None, outermost=False, innermost=False, |
| | norm_layer=None, nl_layer=None, use_dropout=False, upsample='basic', padding_type='replicate'): |
| | super(UnetBlock, self).__init__() |
| | self.outermost = outermost |
| | p = 0 |
| | downconv = [] |
| | if padding_type == 'reflect': |
| | downconv += [nn.ReflectionPad2d(1)] |
| | elif padding_type == 'replicate': |
| | downconv += [nn.ReplicationPad2d(1)] |
| | elif padding_type == 'zero': |
| | p = 1 |
| | else: |
| | raise NotImplementedError( |
| | 'padding [%s] is not implemented' % padding_type) |
| |
|
| | downconv += [nn.Conv2d(input_nc, inner_nc, |
| | kernel_size=3, stride=2, padding=p)] |
| | |
| | downrelu = nn.LeakyReLU(0.2, True) |
| | downnorm = norm_layer(inner_nc) if norm_layer is not None else None |
| | uprelu = nl_layer() |
| | uprelu2 = nl_layer() |
| | uppad = nn.ReplicationPad2d(1) |
| | upnorm = norm_layer(outer_nc) if norm_layer is not None else None |
| | upnorm2 = norm_layer(outer_nc) if norm_layer is not None else None |
| | self.noiseblock = ApplyNoise(outer_nc) |
| | self.noise = noise |
| |
|
| | if outermost: |
| | upconv = upsampleLayer(inner_nc * 2, outer_nc, upsample=upsample, padding_type=padding_type) |
| | upconv2 = nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p) |
| | down = downconv |
| | up = [uprelu] + upconv |
| | if upnorm is not None: |
| | up += [upnorm] |
| | up +=[uprelu2, uppad, upconv2] |
| | model = down + [submodule] + up |
| | elif innermost: |
| | upconv = upsampleLayer(inner_nc, outer_nc, upsample=upsample, padding_type=padding_type) |
| | upconv2 = nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p) |
| | down = [downrelu] + downconv |
| | up = [uprelu] + upconv |
| | if upnorm is not None: |
| | up += [upnorm] |
| | up += [uprelu2, uppad, upconv2] |
| | if upnorm2 is not None: |
| | up += [upnorm2] |
| | model = down + up |
| | else: |
| | upconv = upsampleLayer(inner_nc * 2, outer_nc, upsample=upsample, padding_type=padding_type) |
| | upconv2 = nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p) |
| | down = [downrelu] + downconv |
| | if downnorm is not None: |
| | down += [downnorm] |
| | up = [uprelu] + upconv |
| | if upnorm is not None: |
| | up += [upnorm] |
| | up += [uprelu2, uppad, upconv2] |
| | if upnorm2 is not None: |
| | up += [upnorm2] |
| |
|
| | if use_dropout: |
| | model = down + [submodule] + up + [nn.Dropout(0.5)] |
| | else: |
| | model = down + [submodule] + up |
| |
|
| | self.model = nn.Sequential(*model) |
| |
|
| | def forward(self, x): |
| | if self.outermost: |
| | return self.model(x) |
| | else: |
| | x2 = self.model(x) |
| | if self.noise: |
| | x2 = self.noiseblock(x2, self.noise) |
| | return torch.cat([x2, x], 1) |
| |
|
| | |
| | |
| | |
| | class UnetBlock_A(nn.Module): |
| | def __init__(self, input_nc, outer_nc, inner_nc, |
| | submodule=None, noise=None, outermost=False, innermost=False, |
| | norm_layer=None, nl_layer=None, use_dropout=False, upsample='basic', padding_type='replicate'): |
| | super(UnetBlock_A, self).__init__() |
| | self.outermost = outermost |
| | p = 0 |
| | downconv = [] |
| | if padding_type == 'reflect': |
| | downconv += [nn.ReflectionPad2d(1)] |
| | elif padding_type == 'replicate': |
| | downconv += [nn.ReplicationPad2d(1)] |
| | elif padding_type == 'zero': |
| | p = 1 |
| | else: |
| | raise NotImplementedError( |
| | 'padding [%s] is not implemented' % padding_type) |
| |
|
| | downconv += [spectral_norm(nn.Conv2d(input_nc, inner_nc, |
| | kernel_size=3, stride=2, padding=p))] |
| | |
| | downrelu = nn.LeakyReLU(0.2, True) |
| | downnorm = norm_layer(inner_nc) if norm_layer is not None else None |
| | uprelu = nl_layer() |
| | uprelu2 = nl_layer() |
| | uppad = nn.ReplicationPad2d(1) |
| | upnorm = norm_layer(outer_nc) if norm_layer is not None else None |
| | upnorm2 = norm_layer(outer_nc) if norm_layer is not None else None |
| | self.noiseblock = ApplyNoise(outer_nc) |
| | self.noise = noise |
| |
|
| | if outermost: |
| | upconv = upsampleLayer(inner_nc * 1, outer_nc, upsample=upsample, padding_type=padding_type) |
| | upconv2 = spectral_norm(nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p)) |
| | down = downconv |
| | up = [uprelu] + upconv |
| | if upnorm is not None: |
| | up += [upnorm] |
| | up +=[uprelu2, uppad, upconv2] |
| | model = down + [submodule] + up |
| | elif innermost: |
| | upconv = upsampleLayer(inner_nc, outer_nc, upsample=upsample, padding_type=padding_type) |
| | upconv2 = spectral_norm(nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p)) |
| | down = [downrelu] + downconv |
| | up = [uprelu] + upconv |
| | if upnorm is not None: |
| | up += [upnorm] |
| | up += [uprelu2, uppad, upconv2] |
| | if upnorm2 is not None: |
| | up += [upnorm2] |
| | model = down + up |
| | else: |
| | upconv = upsampleLayer(inner_nc * 1, outer_nc, upsample=upsample, padding_type=padding_type) |
| | upconv2 = spectral_norm(nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p)) |
| | down = [downrelu] + downconv |
| | if downnorm is not None: |
| | down += [downnorm] |
| | up = [uprelu] + upconv |
| | if upnorm is not None: |
| | up += [upnorm] |
| | up += [uprelu2, uppad, upconv2] |
| | if upnorm2 is not None: |
| | up += [upnorm2] |
| |
|
| | if use_dropout: |
| | model = down + [submodule] + up + [nn.Dropout(0.5)] |
| | else: |
| | model = down + [submodule] + up |
| |
|
| | self.model = nn.Sequential(*model) |
| |
|
| | def forward(self, x): |
| | if self.outermost: |
| | return self.model(x) |
| | else: |
| | x2 = self.model(x) |
| | if self.noise: |
| | x2 = self.noiseblock(x2, self.noise) |
| | if x2.shape[-1]==x.shape[-1]: |
| | return x2 + x |
| | else: |
| | x2 = F.interpolate(x2, x.shape[2:]) |
| | return x2 + x |
| |
|
| |
|
| | class E_ResNet(nn.Module): |
| | def __init__(self, input_nc=3, output_nc=1, ndf=64, n_blocks=4, |
| | norm_layer=None, nl_layer=None, vaeLike=False): |
| | super(E_ResNet, self).__init__() |
| | self.vaeLike = vaeLike |
| | max_ndf = 4 |
| | conv_layers = [ |
| | nn.Conv2d(input_nc, ndf, kernel_size=3, stride=2, padding=1, bias=True)] |
| | for n in range(1, n_blocks): |
| | input_ndf = ndf * min(max_ndf, n) |
| | output_ndf = ndf * min(max_ndf, n + 1) |
| | conv_layers += [BasicBlock(input_ndf, |
| | output_ndf, norm_layer, nl_layer)] |
| | conv_layers += [nl_layer(), nn.AdaptiveAvgPool2d(4)] |
| | if vaeLike: |
| | self.fc = nn.Sequential(*[nn.Linear(output_ndf * 16, output_nc)]) |
| | self.fcVar = nn.Sequential(*[nn.Linear(output_ndf * 16, output_nc)]) |
| | else: |
| | self.fc = nn.Sequential(*[nn.Linear(output_ndf * 16, output_nc)]) |
| | self.conv = nn.Sequential(*conv_layers) |
| |
|
| | def forward(self, x): |
| | x_conv = self.conv(x) |
| | conv_flat = x_conv.view(x.size(0), -1) |
| | output = self.fc(conv_flat) |
| | if self.vaeLike: |
| | outputVar = self.fcVar(conv_flat) |
| | return output, outputVar |
| | else: |
| | return output |
| | return output |
| |
|
| |
|
| | |
| | |
| | |
| | |
| | class G_Unet_add_all(nn.Module): |
| | def __init__(self, input_nc, output_nc, nz, num_downs, ngf=64, |
| | norm_layer=None, nl_layer=None, use_dropout=False, use_noise=False, upsample='basic'): |
| | super(G_Unet_add_all, self).__init__() |
| | self.nz = nz |
| | self.mapping = G_mapping(self.nz, self.nz, 512, normalize_latents=False, lrmul=1) |
| | self.truncation_psi = 0 |
| | self.truncation_cutoff = 0 |
| |
|
| | |
| | |
| | num_layers = int(np.log2(512)) * 2 - 2 |
| | |
| | self.noise_inputs = [] |
| | for layer_idx in range(num_layers): |
| | res = layer_idx // 2 + 2 |
| | shape = [1, 1, 2 ** res, 2 ** res] |
| | self.noise_inputs.append(torch.randn(*shape).to("cuda")) |
| |
|
| | |
| | unet_block = UnetBlock_with_z(ngf * 8, ngf * 8, ngf * 8, nz, submodule=None, innermost=True, |
| | norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) |
| | unet_block = UnetBlock_with_z(ngf * 8, ngf * 8, ngf * 8, nz, submodule=unet_block, |
| | norm_layer=norm_layer, nl_layer=nl_layer, use_dropout=use_dropout, upsample=upsample) |
| | for i in range(num_downs - 6): |
| | unet_block = UnetBlock_with_z(ngf * 8, ngf * 8, ngf * 8, nz, submodule=unet_block, |
| | norm_layer=norm_layer, nl_layer=nl_layer, use_dropout=use_dropout, upsample=upsample) |
| | unet_block = UnetBlock_with_z(ngf * 4, ngf * 4, ngf * 8, nz, submodule=unet_block, |
| | norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) |
| | unet_block = UnetBlock_with_z(ngf * 2, ngf * 2, ngf * 4, nz, submodule=unet_block, |
| | norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) |
| | unet_block = UnetBlock_with_z(ngf, ngf, ngf * 2, nz, submodule=unet_block, |
| | norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) |
| | unet_block = UnetBlock_with_z(input_nc, output_nc, ngf, nz, submodule=unet_block, |
| | outermost=True, norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample) |
| | self.model = unet_block |
| |
|
| | def forward(self, x, z): |
| |
|
| | dlatents1, num_layers = self.mapping(z) |
| | dlatents1 = dlatents1.unsqueeze(1) |
| | dlatents1 = dlatents1.expand(-1, int(num_layers), -1) |
| |
|
| | |
| | if self.truncation_psi and self.truncation_cutoff: |
| | coefs = np.ones([1, num_layers, 1], dtype=np.float32) |
| | for i in range(num_layers): |
| | if i < self.truncation_cutoff: |
| | coefs[:, i, :] *= self.truncation_psi |
| | """Linear interpolation. |
| | a + (b - a) * t (a = 0) |
| | reduce to |
| | b * t |
| | """ |
| | dlatents1 = dlatents1 * torch.Tensor(coefs).to(dlatents1.device) |
| |
|
| | return torch.tanh(self.model(x, dlatents1, self.noise_inputs)) |
| |
|
| |
|
| | class ApplyNoise(nn.Module): |
| | def __init__(self, channels): |
| | super().__init__() |
| | self.channels = channels |
| | self.weight = nn.Parameter(torch.randn(channels), requires_grad=True) |
| | self.bias = nn.Parameter(torch.zeros(channels), requires_grad=True) |
| |
|
| | def forward(self, x, noise): |
| | W,_ = torch.split(self.weight.view(1, -1, 1, 1), self.channels // 2, dim=1) |
| | B,_ = torch.split(self.bias.view(1, -1, 1, 1), self.channels // 2, dim=1) |
| | Z = torch.zeros_like(W) |
| | w = torch.cat([W,Z], dim=1).to(x.device) |
| | b = torch.cat([B,Z], dim=1).to(x.device) |
| | adds = w * torch.randn_like(x) + b |
| | return x + adds.type_as(x) |
| |
|
| |
|
| | class FC(nn.Module): |
| | def __init__(self, |
| | in_channels, |
| | out_channels, |
| | gain=2**(0.5), |
| | use_wscale=False, |
| | lrmul=1.0, |
| | bias=True): |
| | """ |
| | The complete conversion of Dense/FC/Linear Layer of original Tensorflow version. |
| | """ |
| | super(FC, self).__init__() |
| | he_std = gain * in_channels ** (-0.5) |
| | if use_wscale: |
| | init_std = 1.0 / lrmul |
| | self.w_lrmul = he_std * lrmul |
| | else: |
| | init_std = he_std / lrmul |
| | self.w_lrmul = lrmul |
| |
|
| | self.weight = torch.nn.Parameter(torch.randn(out_channels, in_channels) * init_std) |
| | if bias: |
| | self.bias = torch.nn.Parameter(torch.zeros(out_channels)) |
| | self.b_lrmul = lrmul |
| | else: |
| | self.bias = None |
| |
|
| | def forward(self, x): |
| | if self.bias is not None: |
| | out = F.linear(x, self.weight * self.w_lrmul, self.bias * self.b_lrmul) |
| | else: |
| | out = F.linear(x, self.weight * self.w_lrmul) |
| | out = F.leaky_relu(out, 0.2, inplace=True) |
| | return out |
| |
|
| |
|
| | class ApplyStyle(nn.Module): |
| | """ |
| | @ref: https://github.com/lernapparat/lernapparat/blob/master/style_gan/pytorch_style_gan.ipynb |
| | """ |
| | def __init__(self, latent_size, channels, use_wscale, nl_layer): |
| | super(ApplyStyle, self).__init__() |
| | modules = [nn.Linear(latent_size, channels*2)] |
| | if nl_layer: |
| | modules += [nl_layer()] |
| | self.linear = nn.Sequential(*modules) |
| |
|
| | def forward(self, x, latent): |
| | style = self.linear(latent) |
| | shape = [-1, 2, x.size(1), 1, 1] |
| | style = style.view(shape) |
| | x = x * (style[:, 0] + 1.) + style[:, 1] |
| | return x |
| |
|
| | class PixelNorm(nn.Module): |
| | def __init__(self, epsilon=1e-8): |
| | """ |
| | @notice: avoid in-place ops. |
| | https://discuss.pytorch.org/t/encounter-the-runtimeerror-one-of-the-variables-needed-for-gradient-computation-has-been-modified-by-an-inplace-operation/836/3 |
| | """ |
| | super(PixelNorm, self).__init__() |
| | self.epsilon = epsilon |
| |
|
| | def forward(self, x): |
| | tmp = torch.mul(x, x) |
| | tmp1 = torch.rsqrt(torch.mean(tmp, dim=1, keepdim=True) + self.epsilon) |
| |
|
| | return x * tmp1 |
| |
|
| |
|
| | class InstanceNorm(nn.Module): |
| | def __init__(self, epsilon=1e-8): |
| | """ |
| | @notice: avoid in-place ops. |
| | https://discuss.pytorch.org/t/encounter-the-runtimeerror-one-of-the-variables-needed-for-gradient-computation-has-been-modified-by-an-inplace-operation/836/3 |
| | """ |
| | super(InstanceNorm, self).__init__() |
| | self.epsilon = epsilon |
| |
|
| | def forward(self, x): |
| | x = x - torch.mean(x, (2, 3), True) |
| | tmp = torch.mul(x, x) |
| | tmp = torch.rsqrt(torch.mean(tmp, (2, 3), True) + self.epsilon) |
| | return x * tmp |
| |
|
| |
|
| | class LayerEpilogue(nn.Module): |
| | def __init__(self, channels, dlatent_size, use_wscale, use_noise, |
| | use_pixel_norm, use_instance_norm, use_styles, nl_layer=None): |
| | super(LayerEpilogue, self).__init__() |
| | self.use_noise = use_noise |
| | if use_noise: |
| | self.noise = ApplyNoise(channels) |
| | self.act = nn.LeakyReLU(negative_slope=0.2) |
| |
|
| | if use_pixel_norm: |
| | self.pixel_norm = PixelNorm() |
| | else: |
| | self.pixel_norm = None |
| |
|
| | if use_instance_norm: |
| | self.instance_norm = InstanceNorm() |
| | else: |
| | self.instance_norm = None |
| |
|
| | if use_styles: |
| | self.style_mod = ApplyStyle(dlatent_size, channels, use_wscale=use_wscale, nl_layer=nl_layer) |
| | else: |
| | self.style_mod = None |
| |
|
| | def forward(self, x, noise, dlatents_in_slice=None): |
| | |
| | if self.use_noise: |
| | x = self.noise(x, noise) |
| | x = self.act(x) |
| | if self.pixel_norm is not None: |
| | x = self.pixel_norm(x) |
| | if self.instance_norm is not None: |
| | x = self.instance_norm(x) |
| | if self.style_mod is not None: |
| | x = self.style_mod(x, dlatents_in_slice) |
| |
|
| | return x |
| |
|
| | class G_mapping(nn.Module): |
| | def __init__(self, |
| | mapping_fmaps=512, |
| | dlatent_size=512, |
| | resolution=512, |
| | normalize_latents=True, |
| | use_wscale=True, |
| | lrmul=0.01, |
| | gain=2**(0.5), |
| | nl_layer=None |
| | ): |
| | super(G_mapping, self).__init__() |
| | self.mapping_fmaps = mapping_fmaps |
| | func = [ |
| | nn.Linear(self.mapping_fmaps, dlatent_size) |
| | ] |
| | if nl_layer: |
| | func += [nl_layer()] |
| |
|
| | for j in range(0,4): |
| | func += [ |
| | nn.Linear(dlatent_size, dlatent_size) |
| | ] |
| | if nl_layer: |
| | func += [nl_layer()] |
| |
|
| | self.func = nn.Sequential(*func) |
| | |
| | |
| |
|
| | self.normalize_latents = normalize_latents |
| | self.resolution_log2 = int(np.log2(resolution)) |
| | self.num_layers = self.resolution_log2 * 2 - 2 |
| | self.pixel_norm = PixelNorm() |
| | |
| | |
| |
|
| | def forward(self, x): |
| | if self.normalize_latents: |
| | x = self.pixel_norm(x) |
| | out = self.func(x) |
| | return out, self.num_layers |
| |
|
| | class UnetBlock_with_z(nn.Module): |
| | def __init__(self, input_nc, outer_nc, inner_nc, nz=0, |
| | submodule=None, outermost=False, innermost=False, |
| | norm_layer=None, nl_layer=None, use_dropout=False, |
| | upsample='basic', padding_type='replicate'): |
| | super(UnetBlock_with_z, self).__init__() |
| | p = 0 |
| | downconv = [] |
| | if padding_type == 'reflect': |
| | downconv += [nn.ReflectionPad2d(1)] |
| | elif padding_type == 'replicate': |
| | downconv += [nn.ReplicationPad2d(1)] |
| | elif padding_type == 'zero': |
| | p = 1 |
| | else: |
| | raise NotImplementedError( |
| | 'padding [%s] is not implemented' % padding_type) |
| |
|
| | self.outermost = outermost |
| | self.innermost = innermost |
| | self.nz = nz |
| |
|
| | |
| | downconv += [spectral_norm(nn.Conv2d(input_nc, inner_nc, |
| | kernel_size=3, stride=2, padding=p))] |
| | |
| | downrelu = nn.LeakyReLU(0.2, True) |
| | downnorm = norm_layer(inner_nc) if norm_layer is not None else None |
| | uprelu = nl_layer() |
| | uprelu2 = nl_layer() |
| | uppad = nn.ReplicationPad2d(1) |
| | upnorm = norm_layer(outer_nc) if norm_layer is not None else None |
| | upnorm2 = norm_layer(outer_nc) if norm_layer is not None else None |
| |
|
| | use_styles=False |
| | uprelu = nl_layer() |
| | if self.nz >0: |
| | use_styles=True |
| |
|
| | if outermost: |
| | self.adaIn = LayerEpilogue(inner_nc, self.nz, use_wscale=True, use_noise=False, |
| | use_pixel_norm=True, use_instance_norm=True, use_styles=use_styles, nl_layer=nl_layer) |
| | upconv = upsampleLayer( |
| | inner_nc , outer_nc, upsample=upsample, padding_type=padding_type) |
| | upconv2 = spectral_norm(nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p)) |
| | down = downconv |
| | up = [uprelu] + upconv |
| | if upnorm is not None: |
| | up += [upnorm] |
| | up +=[uprelu2, uppad, upconv2] |
| | elif innermost: |
| | self.adaIn = LayerEpilogue(inner_nc, self.nz, use_wscale=True, use_noise=True, |
| | use_pixel_norm=True, use_instance_norm=True, use_styles=use_styles, nl_layer=nl_layer) |
| | upconv = upsampleLayer( |
| | inner_nc, outer_nc, upsample=upsample, padding_type=padding_type) |
| | upconv2 = spectral_norm(nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p)) |
| | down = [downrelu] + downconv |
| | up = [uprelu] + upconv |
| | if norm_layer is not None: |
| | up += [norm_layer(outer_nc)] |
| | up += [uprelu2, uppad, upconv2] |
| | if upnorm2 is not None: |
| | up += [upnorm2] |
| | else: |
| | self.adaIn = LayerEpilogue(inner_nc, self.nz, use_wscale=True, use_noise=False, |
| | use_pixel_norm=True, use_instance_norm=True, use_styles=use_styles, nl_layer=nl_layer) |
| | upconv = upsampleLayer( |
| | inner_nc , outer_nc, upsample=upsample, padding_type=padding_type) |
| | upconv2 = spectral_norm(nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p)) |
| | down = [downrelu] + downconv |
| | if norm_layer is not None: |
| | down += [norm_layer(inner_nc)] |
| | up = [uprelu] + upconv |
| |
|
| | if norm_layer is not None: |
| | up += [norm_layer(outer_nc)] |
| | up += [uprelu2, uppad, upconv2] |
| | if upnorm2 is not None: |
| | up += [upnorm2] |
| |
|
| | if use_dropout: |
| | up += [nn.Dropout(0.5)] |
| | self.down = nn.Sequential(*down) |
| | self.submodule = submodule |
| | self.up = nn.Sequential(*up) |
| |
|
| |
|
| | def forward(self, x, z, noise): |
| | if self.outermost: |
| | x1 = self.down(x) |
| | x2 = self.submodule(x1, z[:,2:], noise[2:]) |
| | return self.up(x2) |
| |
|
| | elif self.innermost: |
| | x1 = self.down(x) |
| | x_and_z = self.adaIn(x1, noise[0], z[:,0]) |
| | x2 = self.up(x_and_z) |
| | x2 = F.interpolate(x2, x.shape[2:]) |
| | return x2 + x |
| |
|
| | else: |
| | x1 = self.down(x) |
| | x2 = self.submodule(x1, z[:,2:], noise[2:]) |
| | x_and_z = self.adaIn(x2, noise[0], z[:,0]) |
| | return self.up(x_and_z) + x |
| |
|
| |
|
| | class E_NLayers(nn.Module): |
| | def __init__(self, input_nc, output_nc=1, ndf=64, n_layers=4, |
| | norm_layer=None, nl_layer=None, vaeLike=False): |
| | super(E_NLayers, self).__init__() |
| | self.vaeLike = vaeLike |
| |
|
| | kw, padw = 3, 1 |
| | sequence = [spectral_norm(nn.Conv2d(input_nc, ndf, kernel_size=kw, |
| | stride=2, padding=padw, padding_mode='replicate')), nl_layer()] |
| |
|
| | nf_mult = 1 |
| | nf_mult_prev = 1 |
| | for n in range(1, n_layers): |
| | nf_mult_prev = nf_mult |
| | nf_mult = min(2**n, 8) |
| | sequence += [spectral_norm(nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult, |
| | kernel_size=kw, stride=2, padding=padw, padding_mode='replicate'))] |
| | if norm_layer is not None: |
| | sequence += [norm_layer(ndf * nf_mult)] |
| | sequence += [nl_layer()] |
| | sequence += [nn.AdaptiveAvgPool2d(4)] |
| | self.conv = nn.Sequential(*sequence) |
| | self.fc = nn.Sequential(*[spectral_norm(nn.Linear(ndf * nf_mult * 16, output_nc))]) |
| | if vaeLike: |
| | self.fcVar = nn.Sequential(*[spectral_norm(nn.Linear(ndf * nf_mult * 16, output_nc))]) |
| |
|
| | def forward(self, x): |
| | x_conv = self.conv(x) |
| | conv_flat = x_conv.view(x.size(0), -1) |
| | output = self.fc(conv_flat) |
| | if self.vaeLike: |
| | outputVar = self.fcVar(conv_flat) |
| | return output, outputVar |
| | return output |
| |
|
| | class BasicBlock(nn.Module): |
| | def __init__(self, inplanes, outplanes): |
| | super(BasicBlock, self).__init__() |
| | layers = [] |
| | norm_layer=get_norm_layer(norm_type='layer') |
| | |
| | nl_layer=nn.ReLU() |
| | if norm_layer is not None: |
| | layers += [norm_layer(inplanes)] |
| | layers += [nl_layer] |
| | layers += [nn.ReplicationPad2d(1), |
| | nn.Conv2d(inplanes, outplanes, kernel_size=3, stride=1, |
| | padding=0, bias=True)] |
| | self.conv = nn.Sequential(*layers) |
| |
|
| | def forward(self, x): |
| | return self.conv(x) |
| |
|
| |
|
| | def define_SVAE(inc=96, outc=3, outplanes=64, blocks=1, netVAE='SVAE', model_name='', load_ext=True, save_dir='', |
| | init_type="normal", init_gain=0.02, gpu_ids=[]): |
| | if netVAE == 'SVAE': |
| | net = ScreenVAE(inc=inc, outc=outc, outplanes=outplanes, blocks=blocks, save_dir=save_dir, |
| | init_type=init_type, init_gain=init_gain, gpu_ids=gpu_ids) |
| | else: |
| | raise NotImplementedError('Encoder model name [%s] is not recognized' % net) |
| | init_net(net, init_type=init_type, init_gain=init_gain, gpu_ids=gpu_ids) |
| | net.load_networks('latest') |
| | return net |
| |
|
| |
|
| | class ScreenVAE(nn.Module): |
| | def __init__(self,inc=1,outc=4, outplanes=64, downs=5, blocks=2,load_ext=True, save_dir='',init_type="normal", init_gain=0.02, gpu_ids=[]): |
| | super(ScreenVAE, self).__init__() |
| | self.inc = inc |
| | self.outc = outc |
| | self.save_dir = save_dir |
| | norm_layer=functools.partial(LayerNormWarpper) |
| | nl_layer=nn.LeakyReLU |
| |
|
| | self.model_names=['enc','dec'] |
| | self.enc=define_C(inc+1, outc*2, 0, 24, netC='resnet_6blocks', |
| | norm='layer', nl='lrelu', use_dropout=True, init_type='kaiming', |
| | gpu_ids=gpu_ids, upsample='bilinear') |
| | self.dec=define_G(outc, inc, 0, 48, netG='unet_128_G', |
| | norm='layer', nl='lrelu', use_dropout=True, init_type='kaiming', |
| | gpu_ids=gpu_ids, where_add='input', upsample='bilinear', use_noise=True) |
| |
|
| | for param in self.parameters(): |
| | param.requires_grad = False |
| |
|
| | def load_networks(self, epoch): |
| | """Load all the networks from the disk. |
| | |
| | Parameters: |
| | epoch (int) -- current epoch; used in the file name '%s_net_%s.pth' % (epoch, name) |
| | """ |
| | for name in self.model_names: |
| | if isinstance(name, str): |
| | load_filename = '%s_net_%s.pth' % (epoch, name) |
| | load_path = os.path.join(self.save_dir, load_filename) |
| | net = getattr(self, name) |
| | if isinstance(net, torch.nn.DataParallel): |
| | net = net.module |
| | print('loading the model from %s' % load_path) |
| | state_dict = torch.load( |
| | load_path, map_location=lambda storage, loc: storage.cuda()) |
| | if hasattr(state_dict, '_metadata'): |
| | del state_dict._metadata |
| |
|
| | net.load_state_dict(state_dict) |
| | del state_dict |
| |
|
| | def npad(self, im, pad=128): |
| | h,w = im.shape[-2:] |
| | hp = h //pad*pad+pad |
| | wp = w //pad*pad+pad |
| | return F.pad(im, (0, wp-w, 0, hp-h), mode='replicate') |
| |
|
| | def forward(self, x, line=None, img_input=True, output_screen_only=True): |
| | if img_input: |
| | if line is None: |
| | line = torch.ones_like(x) |
| | else: |
| | line = torch.sign(line) |
| | x = torch.clamp(x + (1-line),-1,1) |
| | h,w = x.shape[-2:] |
| | input = torch.cat([x, line], 1) |
| | input = self.npad(input) |
| | inter = self.enc(input)[:,:,:h,:w] |
| | scr, logvar = torch.split(inter, (self.outc, self.outc), dim=1) |
| | if output_screen_only: |
| | return scr |
| | recons = self.dec(scr) |
| | return recons, scr, logvar |
| | else: |
| | h,w = x.shape[-2:] |
| | x = self.npad(x) |
| | recons = self.dec(x)[:,:,:h,:w] |
| | recons = (recons+1)*(line+1)/2-1 |
| | return torch.clamp(recons,-1,1) |
| |
|