"""inference.py — Sampling / inference for NSGF and NSGF++.

Implements:
- NSGF Euler-step inference (standard model)
- NSGF++ two-phase inference (NSGF → phase transition → NSF)

Reference: arXiv:2401.14069, Section 4.4, Appendix D
"""
|
|
from typing import Iterator, List, Optional, Tuple

import torch
import torch.nn as nn

from dataset_loader import DatasetLoader
|
|
|
|
class NSGFSampler:
    """Draw samples by Euler-integrating a trained NSGF velocity field.

    Starting from source samples supplied by the data loader, the sampler
    applies ``num_steps`` explicit Euler updates ``X <- X + dt * v(X, t)``
    with ``dt = 1 / num_steps`` and ``t`` taking the values
    ``0, dt, ..., 1 - dt``.
    """

    def __init__(self, model: nn.Module, data_loader: "DatasetLoader",
                 num_steps: int = 10, device: str = "cpu"):
        """Store the model (moved to ``device``, in eval mode) and settings.

        Args:
            model: Velocity field called as ``model(X, t)``.
            data_loader: Object providing ``sample_source(n, device)``.
            num_steps: Number of Euler steps; must be at least 1.
            device: Device on which the model runs and samples are drawn.

        Raises:
            ValueError: If ``num_steps`` is smaller than 1.
        """
        # Fail fast: a zero step count would otherwise only surface later as
        # a ZeroDivisionError when computing the step size.
        if num_steps < 1:
            raise ValueError(f"num_steps must be >= 1, got {num_steps}")
        self.model = model.to(device)
        self.model.eval()
        self.data_loader = data_loader
        self.num_steps = num_steps
        self.device = device

    def _euler_states(self, X: torch.Tensor, n: int) -> Iterator[torch.Tensor]:
        """Yield the state after each Euler step, starting from ``X``.

        Plain generator on purpose: it executes while the decorated public
        method iterates it, so it runs under that method's ``no_grad``.
        """
        dt = 1.0 / self.num_steps
        for step in range(self.num_steps):
            # One time value per sample, shape (n,).
            t = torch.full((n,), step * dt, device=self.device)
            X = X + dt * self.model(X, t)
            yield X

    @torch.no_grad()
    def sample(self, n: int) -> torch.Tensor:
        """Return ``n`` samples: the state after the final Euler step."""
        state = self.data_loader.sample_source(n, self.device)
        for state in self._euler_states(state, n):
            pass  # only the last state is needed
        return state

    @torch.no_grad()
    def sample_trajectory(self, n: int) -> List[torch.Tensor]:
        """Return the source samples followed by the state after every step.

        The list has ``num_steps + 1`` entries; each entry is an independent
        copy, so later in-place edits by the caller cannot alias each other.
        """
        source = self.data_loader.sample_source(n, self.device)
        trajectory = [source.clone()]
        trajectory.extend(
            state.clone() for state in self._euler_states(source, n)
        )
        return trajectory
|
|
|
|
class NSGFPlusPlusSampler:
    """Sampler for the NSGF++ two-phase model.

    Phase 1 (NSGF): a few (typically <= 5) Euler steps with the Sinkhorn
        velocity field.
    Phase 2 (NSF): Euler steps with the straight-flow velocity field,
        integrated from the phase-transition time up to ``t = 1``.
    Total NFE = nsgf_steps + nsf_steps
    """

    def __init__(self, nsgf_model: nn.Module, nsf_model: nn.Module,
                 phase_predictor: Optional[nn.Module],
                 data_loader: "DatasetLoader",
                 nsgf_steps: int = 5, nsf_steps: int = 55,
                 device: str = "cpu"):
        """Move all networks to ``device``, switch them to eval mode.

        Args:
            nsgf_model: Phase-1 velocity field, called as ``model(X, t)``.
            nsf_model: Phase-2 velocity field, called as ``model(X, t)``.
            phase_predictor: Optional network called as ``predictor(X)`` that
                predicts the per-sample NSF start time. ``None`` means the
                NSF phase always starts at ``t = 0``.
            data_loader: Object providing ``sample_source(n, device)``.
            nsgf_steps: Number of phase-1 Euler steps; must be at least 1.
            nsf_steps: Number of phase-2 Euler steps; must be at least 1.
            device: Device on which models run and samples are drawn.

        Raises:
            ValueError: If ``nsgf_steps`` or ``nsf_steps`` is smaller than 1.
        """
        # Fail fast: a zero step count would otherwise only surface later as
        # a ZeroDivisionError when computing the step size.
        if nsgf_steps < 1:
            raise ValueError(f"nsgf_steps must be >= 1, got {nsgf_steps}")
        if nsf_steps < 1:
            raise ValueError(f"nsf_steps must be >= 1, got {nsf_steps}")
        self.nsgf_model = nsgf_model.to(device)
        self.nsf_model = nsf_model.to(device)
        self.nsgf_model.eval()
        self.nsf_model.eval()
        if phase_predictor is not None:
            self.phase_predictor = phase_predictor.to(device)
            self.phase_predictor.eval()
        else:
            self.phase_predictor = None
        self.data_loader = data_loader
        self.nsgf_steps = nsgf_steps
        self.nsf_steps = nsf_steps
        self.device = device

    def _transition_time(self, X: torch.Tensor, n: int) -> torch.Tensor:
        """Return the predicted NSF start time as an ``(n,)`` tensor in [0, 1].

        The predictor output is flattened (and a single value is broadcast)
        so the NSF model receives the same ``(n,)`` time layout it gets
        everywhere else. Clamping keeps the remaining span ``1 - t_start``
        inside [0, 1]; an out-of-range prediction would otherwise produce
        negative or oversized Euler steps.
        """
        t_start = self.phase_predictor(X).reshape(-1).expand(n)
        return t_start.clamp(0.0, 1.0)

    def _states(self, n: int, use_predictor: bool) -> Iterator[torch.Tensor]:
        """Yield the source samples, then the state after every Euler step.

        Order: 1 source state, ``nsgf_steps`` NSGF states, ``nsf_steps`` NSF
        states. Plain generator on purpose: it executes while the decorated
        public method iterates it, so it runs under that method's ``no_grad``.
        """
        X = self.data_loader.sample_source(n, self.device)
        yield X

        # Phase 1: NSGF Euler steps over t = 0, dt, ..., 1 - dt.
        dt_nsgf = 1.0 / self.nsgf_steps
        for step in range(self.nsgf_steps):
            t = torch.full((n,), step * dt_nsgf, device=self.device)
            X = X + dt_nsgf * self.nsgf_model(X, t)
            yield X

        # Phase transition: where on the straight flow does phase 2 start?
        if use_predictor and self.phase_predictor is not None:
            t_start = self._transition_time(X, n)
        else:
            t_start = torch.zeros(n, device=self.device)

        # Phase 2: split the remaining interval [t_start, 1] into nsf_steps
        # equal Euler steps (per sample).
        dt_nsf = 1.0 / self.nsf_steps
        remaining = 1.0 - t_start
        # Loop-invariant per-sample step size, shaped to broadcast against X
        # (assumes the batch is X's leading dimension).
        step_size = dt_nsf * remaining.reshape(-1, *([1] * (X.dim() - 1)))
        for step in range(self.nsf_steps):
            t = t_start + step * dt_nsf * remaining
            X = X + step_size * self.nsf_model(X, t)
            yield X

    def _final_state(self, n: int, use_predictor: bool) -> torch.Tensor:
        """Run the full rollout and return only its last state."""
        state = None
        for state in self._states(n, use_predictor):
            pass  # intermediate states are discarded
        return state

    @torch.no_grad()
    def sample(self, n: int) -> torch.Tensor:
        """Return ``n`` samples from the full two-phase procedure.

        Runs the NSGF phase, queries the phase predictor (if any) for the NSF
        start time, then integrates the NSF field from that time to ``t = 1``.
        """
        return self._final_state(n, use_predictor=True)

    @torch.no_grad()
    def sample_simple(self, n: int) -> torch.Tensor:
        """Simplified: NSGF then NSF from t=0 to t=1.

        The phase predictor is ignored even when one is configured.
        """
        return self._final_state(n, use_predictor=False)

    @torch.no_grad()
    def sample_trajectory(self, n: int) -> Tuple[List[torch.Tensor], int]:
        """Return every state of the two-phase rollout and the phase boundary.

        The rollout follows the same path as ``sample`` (including the phase
        predictor when configured), so the last entry is what ``sample``
        would return for the same source samples.

        Returns:
            A pair ``(trajectory, phase_boundary)``. ``trajectory`` holds
            ``1 + nsgf_steps + nsf_steps`` independent copies of the state;
            ``phase_boundary`` is the index of the last NSGF state.
        """
        trajectory = [
            state.clone() for state in self._states(n, use_predictor=True)
        ]
        # Entry 0 is the source, entries 1..nsgf_steps are NSGF states.
        phase_boundary = self.nsgf_steps
        return trajectory, phase_boundary
|
|