| |
| |
| |
| |
| |
|
|
| import unittest |
|
|
| import numpy as np |
| import torch |
| from pytorch3d.ops import points_alignment |
| from pytorch3d.structures.pointclouds import Pointclouds |
| from pytorch3d.transforms import rotation_conversions |
|
|
| from .common_testing import get_tests_dir, TestCaseMixin |
|
|
|
|
| def _apply_pcl_transformation(X, R, T, s=None): |
| """ |
| Apply a batch of similarity/rigid transformations, parametrized with |
| rotation `R`, translation `T` and scale `s`, to an input batch of |
| point clouds `X`. |
| """ |
| if isinstance(X, Pointclouds): |
| num_points = X.num_points_per_cloud() |
| X_t = X.points_padded() |
| else: |
| X_t = X |
|
|
| if s is not None: |
| X_t = s[:, None, None] * X_t |
|
|
| X_t = torch.bmm(X_t, R) + T[:, None, :] |
|
|
| if isinstance(X, Pointclouds): |
| X_list = [x[:n_p] for x, n_p in zip(X_t, num_points)] |
| X_t = Pointclouds(X_list) |
|
|
| return X_t |
|
|
|
|
| class TestICP(TestCaseMixin, unittest.TestCase): |
| def setUp(self) -> None: |
| super().setUp() |
| torch.manual_seed(42) |
| np.random.seed(42) |
| trimesh_results_path = get_tests_dir() / "data/icp_data.pth" |
| self.trimesh_results = torch.load(trimesh_results_path) |
|
|
| @staticmethod |
| def iterative_closest_point( |
| batch_size=10, |
| n_points_X=100, |
| n_points_Y=100, |
| dim=3, |
| use_pointclouds=False, |
| estimate_scale=False, |
| ): |
|
|
| device = torch.device("cuda:0") |
|
|
| |
| X, Y = [ |
| TestCorrespondingPointsAlignment.init_point_cloud( |
| batch_size=batch_size, |
| n_points=n_points, |
| dim=dim, |
| device=device, |
| use_pointclouds=use_pointclouds, |
| random_pcl_size=True, |
| fix_seed=i, |
| ) |
| for i, n_points in enumerate((n_points_X, n_points_Y)) |
| ] |
|
|
| torch.cuda.synchronize() |
|
|
| def run_iterative_closest_point(): |
| points_alignment.iterative_closest_point( |
| X, |
| Y, |
| estimate_scale=estimate_scale, |
| allow_reflection=False, |
| verbose=False, |
| max_iterations=100, |
| relative_rmse_thr=1e-4, |
| ) |
| torch.cuda.synchronize() |
|
|
| return run_iterative_closest_point |
|
|
| def test_init_transformation(self, batch_size=10): |
| """ |
| First runs a full ICP on a random problem. Then takes a given point |
| in the history of ICP iteration transformations, initializes |
| a second run of ICP with this transformation and checks whether |
| both runs ended with the same solution. |
| """ |
|
|
| device = torch.device("cuda:0") |
|
|
| for dim in (2, 3, 11): |
| for n_points_X in (30, 100): |
| for n_points_Y in (30, 100): |
| |
| X, Y = [ |
| TestCorrespondingPointsAlignment.init_point_cloud( |
| batch_size=batch_size, |
| n_points=n_points, |
| dim=dim, |
| device=device, |
| use_pointclouds=False, |
| random_pcl_size=True, |
| ) |
| for n_points in (n_points_X, n_points_Y) |
| ] |
|
|
| |
| ( |
| converged, |
| _, |
| Xt, |
| (R, T, s), |
| t_hist, |
| ) = points_alignment.iterative_closest_point( |
| X, |
| Y, |
| estimate_scale=False, |
| allow_reflection=False, |
| verbose=False, |
| max_iterations=100, |
| ) |
|
|
| |
| |
| t_init = t_hist[min(2, len(t_hist) - 1)] |
|
|
| |
| ( |
| converged_init, |
| _, |
| Xt_init, |
| (R_init, T_init, s_init), |
| t_hist_init, |
| ) = points_alignment.iterative_closest_point( |
| X, |
| Y, |
| init_transform=t_init, |
| estimate_scale=False, |
| allow_reflection=False, |
| verbose=False, |
| max_iterations=100, |
| ) |
|
|
| |
| |
| atol = 3e-5 |
| self.assertClose(R_init, R, atol=atol) |
| self.assertClose(T_init, T, atol=atol) |
| self.assertClose(s_init, s, atol=atol) |
| self.assertClose(Xt_init, Xt, atol=atol) |
|
|
| def test_heterogeneous_inputs(self, batch_size=7): |
| """ |
| Tests whether we get the same result when running ICP on |
| a set of randomly-sized Pointclouds and on their padded versions. |
| """ |
|
|
| torch.manual_seed(4) |
| device = torch.device("cuda:0") |
|
|
| for estimate_scale in (True, False): |
| for max_n_points in (10, 30, 100): |
| |
| X_pcl, Y_pcl = [ |
| TestCorrespondingPointsAlignment.init_point_cloud( |
| batch_size=batch_size, |
| n_points=max_n_points, |
| dim=3, |
| device=device, |
| use_pointclouds=True, |
| random_pcl_size=True, |
| ) |
| for _ in range(2) |
| ] |
|
|
| |
| X_padded = X_pcl.points_padded() |
| Y_padded = Y_pcl.points_padded() |
| n_points_X = X_pcl.num_points_per_cloud() |
| n_points_Y = Y_pcl.num_points_per_cloud() |
|
|
| |
| ( |
| _, |
| _, |
| Xt_pcl, |
| (R_pcl, T_pcl, s_pcl), |
| _, |
| ) = points_alignment.iterative_closest_point( |
| X_pcl, |
| Y_pcl, |
| estimate_scale=estimate_scale, |
| allow_reflection=False, |
| verbose=False, |
| max_iterations=100, |
| ) |
| Xt_pcl = Xt_pcl.points_padded() |
|
|
| |
| |
| icp_results = [ |
| points_alignment.iterative_closest_point( |
| X_[None, :n_X, :], |
| Y_[None, :n_Y, :], |
| estimate_scale=estimate_scale, |
| allow_reflection=False, |
| verbose=False, |
| max_iterations=100, |
| ) |
| for X_, Y_, n_X, n_Y in zip( |
| X_padded, Y_padded, n_points_X, n_points_Y |
| ) |
| ] |
|
|
| |
| R, T, s = [ |
| torch.cat([x.RTs[i] for x in icp_results], dim=0) for i in range(3) |
| ] |
|
|
| |
| atol = 1e-5 |
| self.assertClose(R_pcl, R, atol=atol) |
| self.assertClose(T_pcl, T, atol=atol) |
| self.assertClose(s_pcl, s, atol=atol) |
|
|
| |
| for pcli in range(batch_size): |
| nX = n_points_X[pcli] |
| Xt_ = icp_results[pcli].Xt[0, :nX] |
| Xt_pcl_ = Xt_pcl[pcli][:nX] |
| self.assertClose(Xt_pcl_, Xt_, atol=atol) |
|
|
| def test_compare_with_trimesh(self): |
| """ |
| Compares the outputs of `iterative_closest_point` with the results |
| of `trimesh.registration.icp` from the `trimesh` python package: |
| https://github.com/mikedh/trimesh |
| |
| We have run `trimesh.registration.icp` on several random problems |
| with different point cloud sizes. The results of trimesh, together with |
| the randomly generated input clouds are loaded in the constructor of |
| this class and this test compares the loaded results to our runs. |
| """ |
| for n_points_X in (10, 20, 50, 100): |
| for n_points_Y in (10, 20, 50, 100): |
| self._compare_with_trimesh(n_points_X=n_points_X, n_points_Y=n_points_Y) |
|
|
| def _compare_with_trimesh( |
| self, n_points_X=100, n_points_Y=100, estimate_scale=False |
| ): |
| """ |
| Executes a single test for `iterative_closest_point` for a |
| specific setting of the inputs / outputs. Compares the result with |
| the result of the trimesh package on the same input data. |
| """ |
|
|
| device = torch.device("cuda:0") |
|
|
| |
| key = (int(n_points_X), int(n_points_Y), int(estimate_scale)) |
| X, Y, R_trimesh, T_trimesh, s_trimesh = [ |
| x.to(device) for x in self.trimesh_results[key] |
| ] |
|
|
| |
| ( |
| converged, |
| _, |
| _, |
| (R_ours, T_ours, s_ours), |
| _, |
| ) = points_alignment.iterative_closest_point( |
| X, |
| Y, |
| estimate_scale=estimate_scale, |
| allow_reflection=False, |
| verbose=False, |
| max_iterations=100, |
| ) |
|
|
| |
| |
| atol = 1e-5 |
| self.assertClose(R_ours, R_trimesh, atol=atol) |
| self.assertClose(T_ours, T_trimesh, atol=atol) |
| self.assertClose(s_ours, s_trimesh, atol=atol) |
| self.assertTrue(converged) |
|
|
|
|
| class TestCorrespondingPointsAlignment(TestCaseMixin, unittest.TestCase): |
| def setUp(self) -> None: |
| super().setUp() |
| torch.manual_seed(42) |
| np.random.seed(42) |
|
|
| @staticmethod |
| def random_rotation(batch_size, dim, device=None): |
| """ |
| Generates a batch of random `dim`-dimensional rotation matrices. |
| """ |
| if dim == 3: |
| R = rotation_conversions.random_rotations(batch_size, device=device) |
| else: |
| |
| |
| |
| H = torch.randn(batch_size, dim, dim, dtype=torch.float32, device=device) |
| U, _, V = torch.svd(H) |
| E = torch.eye(dim, dtype=torch.float32, device=device)[None].repeat( |
| batch_size, 1, 1 |
| ) |
| E[:, -1, -1] = torch.det(torch.bmm(U, V.transpose(2, 1))) |
| R = torch.bmm(torch.bmm(U, E), V.transpose(2, 1)) |
| assert torch.allclose(torch.det(R), R.new_ones(batch_size), atol=1e-4) |
|
|
| return R |
|
|
| @staticmethod |
| def init_point_cloud( |
| batch_size=10, |
| n_points=1000, |
| dim=3, |
| device=None, |
| use_pointclouds=False, |
| random_pcl_size=True, |
| fix_seed=None, |
| ): |
| """ |
| Generate a batch of normally distributed point clouds. |
| """ |
|
|
| if fix_seed is not None: |
| |
| seed = torch.random.get_rng_state() |
| torch.manual_seed(fix_seed) |
|
|
| if use_pointclouds: |
| assert dim == 3, "Pointclouds support only 3-dim points." |
| |
| |
| if random_pcl_size: |
| n_points_per_batch = torch.randint( |
| low=4, |
| high=n_points, |
| size=(batch_size,), |
| device=device, |
| dtype=torch.int64, |
| ) |
| X_list = [ |
| torch.randn(int(n_pt), dim, device=device, dtype=torch.float32) |
| for n_pt in n_points_per_batch |
| ] |
| X = Pointclouds(X_list) |
| else: |
| X = torch.randn( |
| batch_size, n_points, dim, device=device, dtype=torch.float32 |
| ) |
| X = Pointclouds(list(X)) |
| else: |
| X = torch.randn( |
| batch_size, n_points, dim, device=device, dtype=torch.float32 |
| ) |
|
|
| if fix_seed: |
| torch.random.set_rng_state(seed) |
|
|
| return X |
|
|
| @staticmethod |
| def generate_pcl_transformation( |
| batch_size=10, scale=False, reflect=False, dim=3, device=None |
| ): |
| """ |
| Generate a batch of random rigid/similarity transformations. |
| """ |
| R = TestCorrespondingPointsAlignment.random_rotation( |
| batch_size, dim, device=device |
| ) |
| T = torch.randn(batch_size, dim, dtype=torch.float32, device=device) |
| if scale: |
| s = torch.rand(batch_size, dtype=torch.float32, device=device) + 0.1 |
| else: |
| s = torch.ones(batch_size, dtype=torch.float32, device=device) |
|
|
| return R, T, s |
|
|
| @staticmethod |
| def generate_random_reflection(batch_size=10, dim=3, device=None): |
| """ |
| Generate a batch of reflection matrices of shape (batch_size, dim, dim), |
| where M_i is an identity matrix with one random entry on the |
| diagonal equal to -1. |
| """ |
| |
| |
| dim_to_reflect = torch.randint( |
| low=0, high=dim, size=(batch_size,), device=device, dtype=torch.int64 |
| ) |
|
|
| |
| M = torch.diag_embed( |
| ( |
| dim_to_reflect[:, None] |
| != torch.arange(dim, device=device, dtype=torch.float32) |
| ).float() |
| * 2 |
| - 1, |
| dim1=1, |
| dim2=2, |
| ) |
|
|
| return M |
|
|
| @staticmethod |
| def corresponding_points_alignment( |
| batch_size=10, |
| n_points=100, |
| dim=3, |
| use_pointclouds=False, |
| estimate_scale=False, |
| allow_reflection=False, |
| reflect=False, |
| random_weights=False, |
| ): |
|
|
| device = torch.device("cuda:0") |
|
|
| |
| X = TestCorrespondingPointsAlignment.init_point_cloud( |
| batch_size=batch_size, |
| n_points=n_points, |
| dim=dim, |
| device=device, |
| use_pointclouds=use_pointclouds, |
| random_pcl_size=True, |
| ) |
|
|
| |
| R, T, s = TestCorrespondingPointsAlignment.generate_pcl_transformation( |
| batch_size=batch_size, |
| scale=estimate_scale, |
| reflect=reflect, |
| dim=dim, |
| device=device, |
| ) |
|
|
| |
| |
| X_t = _apply_pcl_transformation(X, R, T, s=s) |
|
|
| weights = None |
| if random_weights: |
| template = X.points_padded() if use_pointclouds else X |
| weights = torch.rand_like(template[:, :, 0]) |
| weights = weights / weights.sum(dim=1, keepdim=True) |
| |
| |
| weights *= (weights * template.size()[1] > 0.3).to(weights) |
| if use_pointclouds: |
| weights = [ |
| w[:npts] for w, npts in zip(weights, X.num_points_per_cloud()) |
| ] |
|
|
| torch.cuda.synchronize() |
|
|
| def run_corresponding_points_alignment(): |
| points_alignment.corresponding_points_alignment( |
| X, |
| X_t, |
| weights, |
| allow_reflection=allow_reflection, |
| estimate_scale=estimate_scale, |
| ) |
| torch.cuda.synchronize() |
|
|
| return run_corresponding_points_alignment |
|
|
| def test_corresponding_points_alignment(self, batch_size=10): |
| """ |
| Tests whether we can estimate a rigid/similarity motion between |
| a randomly initialized point cloud and its randomly transformed version. |
| |
| The tests are done for all possible combinations |
| of the following boolean flags: |
| - estimate_scale ... Estimate also a scaling component of |
| the transformation. |
| - reflect ... The ground truth orthonormal part of the generated |
| transformation is a reflection (det==-1). |
| - allow_reflection ... If True, the orthonormal matrix of the |
| estimated transformation is allowed to be |
| a reflection (det==-1). |
| - use_pointclouds ... If True, passes the Pointclouds objects |
| to corresponding_points_alignment. |
| """ |
| |
| for n_points in (100, 3, 2, 1): |
| |
| for dim in range(2, 10): |
| |
| use_point_clouds_cases = ( |
| (True, False) if dim == 3 and n_points > 3 else (False,) |
| ) |
| for random_weights in (False, True): |
| for use_pointclouds in use_point_clouds_cases: |
| for estimate_scale in (False, True): |
| for reflect in (False, True): |
| for allow_reflection in (False, True): |
| self._test_single_corresponding_points_alignment( |
| batch_size=10, |
| n_points=n_points, |
| dim=dim, |
| use_pointclouds=use_pointclouds, |
| estimate_scale=estimate_scale, |
| reflect=reflect, |
| allow_reflection=allow_reflection, |
| random_weights=random_weights, |
| ) |
|
|
| def _test_single_corresponding_points_alignment( |
| self, |
| batch_size=10, |
| n_points=100, |
| dim=3, |
| use_pointclouds=False, |
| estimate_scale=False, |
| reflect=False, |
| allow_reflection=False, |
| random_weights=False, |
| ): |
| """ |
| Executes a single test for `corresponding_points_alignment` for a |
| specific setting of the inputs / outputs. |
| """ |
|
|
| device = torch.device("cuda:0") |
|
|
| |
| X = TestCorrespondingPointsAlignment.init_point_cloud( |
| batch_size=batch_size, |
| n_points=n_points, |
| dim=dim, |
| device=device, |
| use_pointclouds=use_pointclouds, |
| random_pcl_size=True, |
| ) |
|
|
| |
| R, T, s = TestCorrespondingPointsAlignment.generate_pcl_transformation( |
| batch_size=batch_size, |
| scale=estimate_scale, |
| reflect=reflect, |
| dim=dim, |
| device=device, |
| ) |
|
|
| if reflect: |
| |
| M = TestCorrespondingPointsAlignment.generate_random_reflection( |
| batch_size=batch_size, dim=dim, device=device |
| ) |
| R = torch.bmm(M, R) |
|
|
| weights = None |
| if random_weights: |
| template = X.points_padded() if use_pointclouds else X |
| weights = torch.rand_like(template[:, :, 0]) |
| weights = weights / weights.sum(dim=1, keepdim=True) |
| |
| |
| weights *= (weights * template.size()[1] > 0.3).to(weights) |
| if use_pointclouds: |
| weights = [ |
| w[:npts] for w, npts in zip(weights, X.num_points_per_cloud()) |
| ] |
|
|
| |
| |
| X_t = _apply_pcl_transformation(X, R, T, s=s) |
|
|
| |
| R_est, T_est, s_est = points_alignment.corresponding_points_alignment( |
| X, |
| X_t, |
| weights, |
| allow_reflection=allow_reflection, |
| estimate_scale=estimate_scale, |
| ) |
|
|
| assert_error_message = ( |
| f"Corresponding_points_alignment assertion failure for " |
| f"n_points={n_points}, " |
| f"dim={dim}, " |
| f"use_pointclouds={use_pointclouds}, " |
| f"estimate_scale={estimate_scale}, " |
| f"reflect={reflect}, " |
| f"allow_reflection={allow_reflection}," |
| f"random_weights={random_weights}." |
| ) |
|
|
| |
| if random_weights and not use_pointclouds and n_points >= (dim + 10): |
| |
| X_noisy = X_t.clone() |
| _, mink_idx = torch.topk(-weights, int(n_points * 0.2), dim=1) |
| mink_idx = mink_idx[:, :, None].expand(-1, -1, X_t.shape[-1]) |
| X_noisy.scatter_add_( |
| 1, mink_idx, 0.3 * torch.randn_like(mink_idx, dtype=X_t.dtype) |
| ) |
|
|
| def align_and_get_mse(weights_): |
| R_n, T_n, s_n = points_alignment.corresponding_points_alignment( |
| X_noisy, |
| X_t, |
| weights_, |
| allow_reflection=allow_reflection, |
| estimate_scale=estimate_scale, |
| ) |
|
|
| X_t_est = _apply_pcl_transformation(X_noisy, R_n, T_n, s=s_n) |
|
|
| return (((X_t_est - X_t) * weights[..., None]) ** 2).sum( |
| dim=(1, 2) |
| ) / weights.sum(dim=-1) |
|
|
| |
| self.assertTrue( |
| torch.all(align_and_get_mse(weights) <= align_and_get_mse(None)) |
| ) |
|
|
| if reflect and not allow_reflection: |
| |
| self._assert_all_close( |
| torch.det(R_est), |
| R_est.new_ones(batch_size), |
| assert_error_message, |
| atol=2e-5, |
| ) |
|
|
| else: |
| |
| w = ( |
| torch.ones_like(R_est[:, 0, 0]) |
| if weights is None or n_points >= dim + 10 |
| else (weights > 0.0).all(dim=1).to(R_est) |
| ) |
| |
| |
| if n_points >= (dim + 1): |
| |
| |
| msg = assert_error_message |
| self._assert_all_close(R_est, R, msg, w[:, None, None], atol=1e-5) |
| self._assert_all_close(T_est, T, msg, w[:, None]) |
| self._assert_all_close(s_est, s, msg, w) |
|
|
| |
| |
| desired_det = R_est.new_ones(batch_size) |
| if reflect: |
| desired_det *= -1.0 |
| self._assert_all_close(torch.det(R_est), desired_det, msg, w, atol=2e-5) |
|
|
| |
| |
| X_t_est = _apply_pcl_transformation(X, R_est, T_est, s=s_est) |
| self._assert_all_close( |
| X_t, X_t_est, assert_error_message, w[:, None, None], atol=2e-5 |
| ) |
|
|
| def _assert_all_close(self, a_, b_, err_message, weights=None, atol=1e-6): |
| if isinstance(a_, Pointclouds): |
| a_ = a_.points_packed() |
| if isinstance(b_, Pointclouds): |
| b_ = b_.points_packed() |
| if weights is None: |
| self.assertClose(a_, b_, atol=atol, msg=err_message) |
| else: |
| self.assertClose(a_ * weights, b_ * weights, atol=atol, msg=err_message) |
|
|