import os
from typing import Any, Dict, Tuple, Optional
from random import random
from copy import deepcopy

import numpy as np
import torch
from lightning import LightningModule
from torchmetrics import MinMetric, MeanMetric

from src.models.score.frame import FrameDiffuser
from src.models.loss import ScoreMatchingLoss
from src.common.rigid_utils import Rigid
from src.common.all_atom import compute_backbone
from src.common.pdb_utils import atom37_to_pdb, merge_pdbfiles


class DiffusionLitModule(LightningModule):
    """A `LightningModule` for denoising diffusion training over protein backbone frames.

    A `LightningModule` implements 7 key methods:

    ```python
    def __init__(self):
        # Define initialization code here.

    def setup(self, stage):
        # Things to setup before each stage, 'fit', 'validate', 'test', 'predict'.
        # This hook is called on every process when using DDP.

    def training_step(self, batch, batch_idx):
        # The complete training step.

    def validation_step(self, batch, batch_idx):
        # The complete validation step.

    def test_step(self, batch, batch_idx):
        # The complete test step.

    def predict_step(self, batch, batch_idx):
        # The complete predict step.

    def configure_optimizers(self):
        # Define and configure optimizers and LR schedulers.
    ```
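
    A minimal instantiation sketch (hypothetical: `ScoreNetwork` stands in for the
    actual score network, and `optimizer`/`scheduler` are factories that
    `configure_optimizers` calls once the model parameters exist):

    ```python
    import functools
    import torch

    module = DiffusionLitModule(
        net=ScoreNetwork(...),  # hypothetical score network
        optimizer=functools.partial(torch.optim.Adam, lr=1e-4),
        scheduler=functools.partial(torch.optim.lr_scheduler.ReduceLROnPlateau, factor=0.5),
        diffuser=FrameDiffuser(...),
        loss=loss_cfg,  # placeholder config consumed by ScoreMatchingLoss
        compile=False,
    )
    ```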

    Docs:
        https://lightning.ai/docs/pytorch/latest/common/lightning_module.html
    """

    def __init__(
        self,
        net: torch.nn.Module,
        optimizer: torch.optim.Optimizer,
        scheduler: torch.optim.lr_scheduler,
        diffuser: FrameDiffuser,
        loss: Dict[str, Any],
        compile: bool,
        inference: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Initialize a `DiffusionLitModule`.

        :param net: The score network to train.
        :param optimizer: The optimizer factory to use for training.
        :param scheduler: The learning rate scheduler factory to use for training.
        :param diffuser: The frame diffuser used to perturb and score rigid frames.
        :param loss: The config for the score-matching loss.
        :param compile: Whether to compile the model with `torch.compile` before fitting.
        :param inference: Optional config controlling sampling in `predict_step`.
        """
        super().__init__()

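        # this line allows to access init params with 'self.hparams' attribute
        # also ensures init params will be stored in ckpt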
        self.save_hyperparameters(logger=False)

        self.net = net
        self.diffuser = diffuser

        # loss function
        self.loss = ScoreMatchingLoss(config=self.hparams.loss)

        # metric objects for averaging loss across batches
        self.train_loss = MeanMetric()
        self.val_loss = MeanMetric()

        # for tracking best so far validation loss
        self.val_loss_best = MinMetric()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Perform a forward pass through the model `self.net`.
        (Not actually used; training and sampling call `self.net` on feature dicts directly.)

        :param x: An input batch.
        :return: The network output.
        """
        return self.net(x)

    def on_train_start(self) -> None:
        """Lightning hook that is called when training begins."""
        # by default lightning executes validation step sanity checks before training starts,
        # so it's worth to make sure validation metrics don't store results from these checks
        self.val_loss.reset()
        self.val_loss_best.reset()

    def model_step(
        self, batch: Dict[str, torch.Tensor], training: bool = True
    ) -> Tuple[torch.Tensor, Dict[str, torch.Tensor]]:
        """Perform a single model step on a batch of data.

        :param batch: A batch of protein features, including the ground-truth rigid
            frames under `rigidgroups_gt_frames`.
        :param training: Currently unused; kept so training and validation share one signature.

        :return: A tuple containing (in order):
            - The total loss.
            - A dict with the per-term loss breakdown.
        """
        # sample a diffusion time t ~ U(min_t, 1) and perturb the ground-truth frames
        rigids_0 = Rigid.from_tensor_4x4(batch['rigidgroups_gt_frames'][..., 0, :, :])
        batch_size = rigids_0.shape[0]
        t = (1.0 - self.diffuser.min_t) * torch.rand(batch_size, device=rigids_0.device) + self.diffuser.min_t
        perturb_feats = self.diffuser.forward_marginal(
            rigids_0=rigids_0,
            t=t,
            diffuse_mask=None,
            as_tensor_7=True,
        )
        patch_feats = {
            't': t,
            'rigids_0': rigids_0,
        }
        batch.update({**perturb_feats, **patch_feats})

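        # Self-conditioning: with probability 0.5, run the network once without
        # gradients and feed its predicted Ca positions (the translation part of
        # the predicted tensor-7 rigids) back to the embedder as an extra input.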
        if self.net.embedder.self_conditioning and random() > 0.5:
            with torch.no_grad():
                batch['sc_ca_t'] = self.net(batch, as_tensor_7=True)['rigids'][..., 4:]

        out = self.net(batch)

        # convert the predicted frames into rotation/translation scores at time t
        pred_scores = self.diffuser.score(
            rigids_0=out['rigids'],
            rigids_t=Rigid.from_tensor_7(batch['rigids_t']),
            t=t,
            mask=batch['residue_mask'],
        )
        out.update(pred_scores)

        loss, loss_bd = self.loss(out, batch, _return_breakdown=True)
        return loss, loss_bd

    def training_step(
        self, batch: Dict[str, torch.Tensor], batch_idx: int
    ) -> torch.Tensor:
        """Perform a single training step on a batch of data from the training set.

        :param batch: A batch of protein features.
        :param batch_idx: The index of the current batch.
        :return: The total training loss.
        """
        loss, loss_bd = self.model_step(batch)

        # update and log metrics
        self.train_loss(loss)
        self.log("train/loss", self.train_loss, on_step=False, on_epoch=True, prog_bar=True, sync_dist=True)

        # log each term of the loss breakdown separately
        for k, v in loss_bd.items():
            if k == 'loss':
                continue
            self.log(f"train/{k}", v, on_step=False, on_epoch=True, prog_bar=True, sync_dist=True)

        # return loss or backpropagation will fail
        return loss

    def on_train_epoch_end(self) -> None:
        """Lightning hook that is called when a training epoch ends."""
        pass

    def validation_step(self, batch: Dict[str, torch.Tensor], batch_idx: int) -> None:
        """Perform a single validation step on a batch of data from the validation set.

        :param batch: A batch of protein features.
        :param batch_idx: The index of the current batch.
        """
        loss, loss_bd = self.model_step(batch, training=False)

        # update and log metrics
        self.val_loss(loss)
        self.log("val/loss", self.val_loss, on_step=False, on_epoch=True, prog_bar=True, sync_dist=True)

    def on_validation_epoch_end(self) -> None:
        """Lightning hook that is called when a validation epoch ends."""
        val_loss = self.val_loss.compute()  # get current val loss
        self.val_loss_best(val_loss)  # update best so far val loss
        # log `val_loss_best` as a value through `.compute()` method, instead of as a metric object
        # otherwise metric would be reset by lightning after each epoch
        self.log("val/loss_best", self.val_loss_best.compute(), sync_dist=True, prog_bar=True)

    def test_step(self, batch: Dict[str, torch.Tensor], batch_idx: int) -> None:
        """Perform a single test step on a batch of data from the test set.

        :param batch: A batch of protein features.
        :param batch_idx: The index of the current batch.
        """
        raise NotImplementedError("Test step not implemented.")

    def on_test_epoch_end(self) -> None:
        """Lightning hook that is called when a test epoch ends."""
        pass

    def predict_step(
        self, batch: Dict[str, Any], batch_idx: int,
    ) -> str:
        """Perform a prediction step on a batch of data from the dataloader.

        This prediction step samples `n_replica` copies from the forward-backward process,
        repeated for each delta-T in the range [`delta_min`, `delta_max`] with step size
        `delta_step`. If `backward_only` is True, only the backward process (starting from
        the prior) is performed, and `n_replica` is multiplied by the number of delta-Ts.

        :param batch: A batch of protein features; the batch size must be 1.
        :param batch_idx: The index of the current batch.
        :return: The directory containing the merged PDB files over all delta-Ts.
        """
        # read sampling settings from the inference config
        n_replica = self.hparams.inference.n_replica
        replica_per_batch = self.hparams.inference.replica_per_batch
        delta_range = np.arange(
            self.hparams.inference.delta_min,
            self.hparams.inference.delta_max + 1e-5,  # epsilon so the endpoint is included
            self.hparams.inference.delta_step
        )
        delta_range = np.around(delta_range, decimals=2)
        num_timesteps = self.hparams.inference.num_timesteps
        noise_scale = self.hparams.inference.noise_scale
        probability_flow = self.hparams.inference.probability_flow
        self_conditioning = self.hparams.inference.self_conditioning and self.net.embedder.self_conditioning
        min_t = self.hparams.inference.min_t
        output_dir = self.hparams.inference.output_dir
        backward_only = self.hparams.inference.backward_only

        if backward_only:
            n_replica *= len(delta_range)
            delta_range = [-1.0]  # sentinel delta-T: sample from the prior instead of perturbing

        assert batch['aatype'].shape[0] == 1, "Batch size must be 1 for correct inference."

        # per-structure metadata used when writing PDB files
        accession_code = batch['accession_code'][0]
        extra = {
            'aatype': batch['aatype'][0].detach().cpu().numpy(),
            'chain_index': batch['chain_index'][0].detach().cpu().numpy(),
            'residue_index': batch['residue_index'][0].detach().cpu().numpy(),
        }

        def forward_backward(rigids_0: Rigid, t_delta: float):
            # perturb the ground truth to t_delta (forward), or start from the prior
            # when t_delta < 0, then integrate the reverse process back to min_t
            T = t_delta if t_delta > 0 else 1.0

            batch_size, device = rigids_0.shape[0], rigids_0.device
            _num_timesteps = int(float(num_timesteps) * T)
            dt = 1.0 / _num_timesteps
            ts = np.linspace(min_t, T, _num_timesteps)[::-1]
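            # ts runs from T down to min_t; each reverse step advances time by dt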

            _feats = deepcopy({
                k: v.repeat(batch_size, *(1,) * (v.ndim - 1))
                for k, v in batch.items()
                if k in ('aatype', 'residue_mask', 'fixed_mask', 'residue_idx', 'torsion_angles_sin_cos')
            })
            if t_delta > 0:
                rigids_t = self.diffuser.forward_marginal(
                    rigids_0=rigids_0,
                    t=t_delta * torch.ones(batch_size, device=device),
                    diffuse_mask=_feats['residue_mask'],
                    as_tensor_7=True,
                )['rigids_t']
            else:
                rigids_t = self.diffuser.sample_prior(
                    shape=rigids_0.shape,
                    device=device,
                    as_tensor_7=True,
                )['rigids_t']

            _feats['rigids_t'] = rigids_t

            with torch.no_grad():
                fixed_mask = _feats['fixed_mask'] * _feats['residue_mask']
                diffuse_mask = (1 - _feats['fixed_mask']) * _feats['residue_mask']

                # initialize self-conditioning with zeros, then with a first prediction
                if self_conditioning:
                    _feats['sc_ca_t'] = torch.zeros_like(rigids_t[..., 4:])
                    _feats['t'] = ts[0] * torch.ones(batch_size, device=device)
                    _feats['sc_ca_t'] = self.net(_feats, as_tensor_7=True)['rigids'][..., 4:]

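                # Reverse diffusion: walk t from T down to min_t. At each step the
                # network denoises the current frames; the prediction is converted to
                # scores and used for one reverse update, except at the final step,
                # where the prediction is taken directly.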
                for t in ts:
                    _feats['t'] = t * torch.ones(batch_size, device=device)
                    out = self.net(_feats, as_tensor_7=False)

                    if t == min_t:
                        rigids_pred = out['rigids']
                    else:
                        if self_conditioning:
                            _feats['sc_ca_t'] = out['rigids'].to_tensor_7()[..., 4:]

                        pred_scores = self.diffuser.score(
                            rigids_0=out['rigids'],
                            rigids_t=Rigid.from_tensor_7(_feats['rigids_t']),
                            t=_feats['t'],
                            mask=_feats['residue_mask'],
                        )
                        rigids_pred = self.diffuser.reverse(
                            rigids_t=Rigid.from_tensor_7(_feats['rigids_t']),
                            rot_score=pred_scores['rot_score'],
                            trans_score=pred_scores['trans_score'],
                            t=_feats['t'],
                            dt=dt,
                            diffuse_mask=diffuse_mask,
                            center_trans=True,
                            noise_scale=noise_scale,
                            probability_flow=probability_flow,
                        )

                    _feats['rigids_t'] = rigids_pred.to_tensor_7()

            # convert the final frames (plus predicted psi angles) to atom37 coordinates
            atom37 = compute_backbone(rigids_pred, out['psi'], aatype=_feats['aatype'])[0]
            atom37 = atom37.detach().cpu().numpy()
            return atom37

        saved_paths = []

        for t_delta in delta_range:
            # sample n_replica copies in chunks of at most replica_per_batch
            gt_rigids_4x4 = batch['rigidgroups_gt_frames'][..., 0, :, :].clone()
            n_bs = n_replica // replica_per_batch
            last_bs = n_replica % replica_per_batch
            atom_positions = []
            for _ in range(n_bs):
                rigids_0 = Rigid.from_tensor_4x4(gt_rigids_4x4.repeat(replica_per_batch, *(1,) * (gt_rigids_4x4.ndim - 1)))
                atom_positions.append(forward_backward(rigids_0, t_delta))
            if last_bs > 0:
                rigids_0 = Rigid.from_tensor_4x4(gt_rigids_4x4.repeat(last_bs, *(1,) * (gt_rigids_4x4.ndim - 1)))
                atom_positions.append(forward_backward(rigids_0, t_delta))
            atom_positions = np.concatenate(atom_positions, axis=0)

            # write all replicas for this delta-T into a single PDB file
            t_delta_dir = os.path.join(output_dir, f"{t_delta}")
            os.makedirs(t_delta_dir, exist_ok=True)
            save_to = os.path.join(t_delta_dir, f"{accession_code}.pdb")
            saved_to = atom37_to_pdb(
                atom_positions=atom_positions,
                save_to=save_to,
                **extra,
            )
            saved_paths.append(saved_to)

        # merge the per-delta-T files into one PDB covering all delta-Ts
        all_delta_dir = os.path.join(output_dir, "all_delta")
        os.makedirs(all_delta_dir, exist_ok=True)
        merge_pdbfiles(saved_paths, os.path.join(all_delta_dir, f"{accession_code}.pdb"))

        return all_delta_dir

    def setup(self, stage: str) -> None:
        """Lightning hook that is called at the beginning of fit (train + validate), validate,
        test, or predict.

        This is a good hook when you need to build models dynamically or adjust something about
        them. This hook is called on every process when using DDP.

        :param stage: Either `"fit"`, `"validate"`, `"test"`, or `"predict"`.
        """
        if self.hparams.compile and stage == "fit":
            self.net = torch.compile(self.net)

    def configure_optimizers(self) -> Dict[str, Any]:
        """Choose what optimizers and learning-rate schedulers to use in your optimization.
        Normally you'd need one. But in the case of GANs or similar you might have multiple.

        Examples:
            https://lightning.ai/docs/pytorch/latest/common/lightning_module.html#configure-optimizers

        :return: A dict containing the configured optimizers and learning-rate schedulers to be used for training.
        """
        optimizer = self.hparams.optimizer(params=self.trainer.model.parameters())
        if self.hparams.scheduler is not None:
            scheduler = self.hparams.scheduler(optimizer=optimizer)
            return {
                "optimizer": optimizer,
                "lr_scheduler": {
                    "scheduler": scheduler,
                    "monitor": "val/loss",
                    "interval": "epoch",
                    "frequency": 1,
                },
            }
        return {"optimizer": optimizer}

| | if __name__ == "__main__": |
| | _ = DiffusionLitModule(None, None, None, None, None) |
| |
|