"""inference.py — Sampling / inference for NSGF and NSGF++. Implements: - NSGF Euler-step inference (standard model) - NSGF++ two-phase inference (NSGF → phase transition → NSF) Reference: arXiv:2401.14069, Section 4.4, Appendix D """ import torch import torch.nn as nn from typing import Optional, Tuple, List from dataset_loader import DatasetLoader class NSGFSampler: """Sampler using a trained NSGF velocity field model.""" def __init__(self, model: nn.Module, data_loader: DatasetLoader, num_steps: int = 10, device: str = "cpu"): self.model = model.to(device) self.model.eval() self.data_loader = data_loader self.num_steps = num_steps self.device = device @torch.no_grad() def sample(self, n: int) -> torch.Tensor: X = self.data_loader.sample_source(n, self.device) dt = 1.0 / self.num_steps for step in range(self.num_steps): t = torch.full((n,), step * dt, device=self.device) v = self.model(X, t) X = X + dt * v return X @torch.no_grad() def sample_trajectory(self, n: int) -> List[torch.Tensor]: X = self.data_loader.sample_source(n, self.device) trajectory = [X.clone()] dt = 1.0 / self.num_steps for step in range(self.num_steps): t = torch.full((n,), step * dt, device=self.device) v = self.model(X, t) X = X + dt * v trajectory.append(X.clone()) return trajectory class NSGFPlusPlusSampler: """Sampler for the NSGF++ two-phase model. Phase 1 (NSGF): ≤5 Euler steps with Sinkhorn velocity field Phase 2 (NSF): Straight flow velocity field Total NFE = nsgf_steps + nsf_steps """ def __init__(self, nsgf_model: nn.Module, nsf_model: nn.Module, phase_predictor: Optional[nn.Module], data_loader: DatasetLoader, nsgf_steps: int = 5, nsf_steps: int = 55, device: str = "cpu"): self.nsgf_model = nsgf_model.to(device) self.nsf_model = nsf_model.to(device) self.nsgf_model.eval() self.nsf_model.eval() if phase_predictor is not None: self.phase_predictor = phase_predictor.to(device) self.phase_predictor.eval() else: self.phase_predictor = None self.data_loader = data_loader self.nsgf_steps = nsgf_steps self.nsf_steps = nsf_steps self.device = device @torch.no_grad() def sample(self, n: int) -> torch.Tensor: X = self.data_loader.sample_source(n, self.device) dt_nsgf = 1.0 / self.nsgf_steps for step in range(self.nsgf_steps): t = torch.full((n,), step * dt_nsgf, device=self.device) v = self.nsgf_model(X, t) X = X + dt_nsgf * v if self.phase_predictor is not None: t_start = self.phase_predictor(X) else: t_start = torch.zeros(n, device=self.device) dt_nsf = 1.0 / self.nsf_steps for step in range(self.nsf_steps): t_current = t_start + step * dt_nsf * (1.0 - t_start) t_current = t_current.clamp(0, 1) v = self.nsf_model(X, t_current) X = X + dt_nsf * (1.0 - t_start.view(-1, *([1] * (X.dim() - 1)))) * v return X @torch.no_grad() def sample_simple(self, n: int) -> torch.Tensor: """Simplified: NSGF then NSF from t=0 to t=1.""" X = self.data_loader.sample_source(n, self.device) dt_nsgf = 1.0 / self.nsgf_steps for step in range(self.nsgf_steps): t = torch.full((n,), step * dt_nsgf, device=self.device) v = self.nsgf_model(X, t) X = X + dt_nsgf * v dt_nsf = 1.0 / self.nsf_steps for step in range(self.nsf_steps): t = torch.full((n,), step * dt_nsf, device=self.device) v = self.nsf_model(X, t) X = X + dt_nsf * v return X @torch.no_grad() def sample_trajectory(self, n: int) -> Tuple[List[torch.Tensor], int]: trajectory = [] X = self.data_loader.sample_source(n, self.device) trajectory.append(X.clone()) dt_nsgf = 1.0 / self.nsgf_steps for step in range(self.nsgf_steps): t = torch.full((n,), step * dt_nsgf, device=self.device) v = self.nsgf_model(X, t) X = X + dt_nsgf * v trajectory.append(X.clone()) phase_boundary = len(trajectory) - 1 dt_nsf = 1.0 / self.nsf_steps for step in range(self.nsf_steps): t = torch.full((n,), step * dt_nsf, device=self.device) v = self.nsf_model(X, t) X = X + dt_nsf * v trajectory.append(X.clone()) return trajectory, phase_boundary