Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import os | |
| import cv2 | |
| import open3d as o3d | |
| from natsort import natsorted | |
| beam_altitudes = np.deg2rad(np.array([ | |
| 20.97, 18.51, 15.97, 13.37, 12.04, 10.70, 9.35, 8.70, | |
| 7.99, 7.34, 6.62, 5.96, 5.23, 4.55, 3.84, 3.51, | |
| 3.17, 2.82, 2.46, 2.11, 1.76, 1.43, 1.05, 0.70, | |
| 0.36, 0.03, -0.34, -0.70, -1.04, -1.37, -1.76, -2.10, | |
| -2.43, -2.77, -3.15, -3.48, -3.84, -4.18, -4.54, -4.88, | |
| -5.24, -5.55, -5.93, -6.29, -6.63, -6.94, -7.32, -7.67, | |
| -8.00, -8.33, -9.04, -9.71, -10.42, -11.06, -11.77, -12.41, | |
| -13.12, -13.74, -15.06, -16.36, -17.64, -18.91, -20.16, -21.38 | |
| ])) | |
| def read_calib_file(path): | |
| """ | |
| Read KITTI-style calibration file lines like: | |
| parameter: <value> | |
| parameter: <value> | |
| ... | |
| Returns a dict mapping keys to numpy arrays. | |
| """ | |
| d = {} | |
| with open(path, 'r') as f: | |
| for line in f: | |
| line = line.strip() | |
| if not line or ':' not in line or line.startswith('#'): | |
| continue | |
| key, vals = line.split(':', 1) | |
| nums = [float(x) for x in vals.strip().split()] | |
| d[key.strip()] = np.array(nums) | |
| return d | |
| def get_all_corr_files(image_timestamps, dirs, tol=200000, adjust=None): | |
| single = not isinstance(image_timestamps, (list, np.ndarray)) | |
| if single: | |
| image_timestamps = [image_timestamps] | |
| image_timestamps = np.array(image_timestamps, dtype=np.int64) | |
| results_per_dir = [] | |
| indices_per_dir = [] | |
| for dir in dirs: | |
| if adjust is None: | |
| filenames = np.array([f for f in natsorted(os.listdir(dir)) if os.path.isfile(dir + f)]) | |
| else: | |
| filenames = np.array([f for f in natsorted(os.listdir(dir)) if os.path.isfile(dir + f)])[:adjust] | |
| timestamps = np.array([int(f.split('.')[0]) for f in filenames], dtype=np.int64) | |
| diffs = np.abs(image_timestamps[:, None] - timestamps[None, :]) | |
| closest_indices = np.argmin(diffs, axis=1) | |
| closest_diffs = diffs[np.arange(len(image_timestamps)), closest_indices] | |
| violations = closest_diffs > tol | |
| if violations.any(): | |
| bad_ts = image_timestamps[violations] | |
| raise Exception(f"No timestamp in {dir} within tolerance for timestamps: {bad_ts}") | |
| results_per_dir.append(filenames[closest_indices]) | |
| indices_per_dir.append(closest_indices) | |
| if len(dirs) == 1: | |
| results = [f"{dirs[0]}/{f}" for f in results_per_dir[0]] | |
| indices = indices_per_dir[0].tolist() | |
| else: | |
| results = [ | |
| tuple(f"{dirs[i]}/{results_per_dir[i][j]}" for i in range(len(dirs))) | |
| for j in range(len(image_timestamps)) | |
| ] | |
| indices = [ | |
| tuple(int(indices_per_dir[i][j]) for i in range(len(dirs))) | |
| for j in range(len(image_timestamps)) | |
| ] | |
| if single: | |
| return results[0], indices[0] | |
| return results, indices | |
| def get_corr_files(image_timestamp, dirs, tol=200000): | |
| output_filenames = [] | |
| for dir in dirs: | |
| ############ Find closest point cloud and UTM position data ######################## | |
| filenames = np.array([filename for filename in os.listdir(dir) if os.path.isfile(dir+filename)]) | |
| timestamps = np.array([int(filename.split('.')[0]) for filename in os.listdir(dir) if os.path.isfile(dir+filename)]) | |
| closest_lidar = np.argmin(abs(timestamps-int(image_timestamp))) | |
| timestamp_diff = abs(int(image_timestamp)-timestamps[closest_lidar]) | |
| timestamp_tolerance = tol # in microseconds (0.2 seconds) | |
| if timestamp_diff > timestamp_tolerance: | |
| raise Exception(f"No timestamp in {dir} in close enough proximity to {image_timestamp}") | |
| else: | |
| output_filenames.append(f"{dir}/{filenames[closest_lidar]}") | |
| if len(dirs) == 1: | |
| return output_filenames[0] | |
| else: | |
| return tuple(output_filenames) | |
| NUM_COLS = 1024 | |
| def compute_column_index(points_xyz): | |
| az = np.arctan2(points_xyz[:, 1], points_xyz[:, 0]) | |
| az = np.mod(az, 2 * np.pi) | |
| col = np.round(az / (2 * np.pi) * NUM_COLS).astype(int) | |
| return col % NUM_COLS | |
| # beam_altitudes = np.deg2rad(np.array(beam_altitude_angles)) | |
| def compute_ring_ids(points_xyz): | |
| xy_norm = np.linalg.norm(points_xyz[:, :2], axis=1) | |
| elev = np.arctan2(points_xyz[:, 2], xy_norm) | |
| return np.argmin( | |
| np.abs(elev[:, None] - beam_altitudes[None, :]), | |
| axis=1 | |
| ) | |
| def compute_ring_col_from_index(num_points, num_cols=1024): | |
| idx = np.arange(num_points) | |
| ring_ids = idx // num_cols | |
| col_ids = idx % num_cols | |
| return ring_ids, col_ids | |
| def is_valid_point(points_xyz): | |
| return np.linalg.norm(points_xyz, axis=1) > 1e-6 | |
| def fill_ring_known_cols_with_intensity(points_xyzi, ring_ids, col_ids): | |
| """ | |
| points_xyzi: (N,4) -> x,y,z,intensity | |
| """ | |
| filled_points = [] | |
| interp_flags = [] | |
| unique_ring_ids = np.array([i for i in range(0,64)]) | |
| for ring in unique_ring_ids: | |
| ring_mask = ring_ids == ring | |
| ring_pts = points_xyzi[ring_mask] | |
| ring_cols = col_ids[ring_mask] | |
| # storage per column | |
| ring_grid = [None] * NUM_COLS | |
| # place original points | |
| for p, c in zip(ring_pts, ring_cols): | |
| ring_grid[c] = p # keep intensity | |
| elev = beam_altitudes[ring] | |
| cos_e, sin_e = np.cos(elev), np.sin(elev) | |
| for c in range(NUM_COLS): | |
| if ring_grid[c] is not None: | |
| filled_points.append(ring_grid[c]) | |
| interp_flags.append(False) | |
| else: | |
| # find neighbors cyclically | |
| left = (c - 1) % NUM_COLS | |
| right = (c + 1) % NUM_COLS | |
| while ring_grid[left] is None: | |
| left = (left - 1) % NUM_COLS | |
| while ring_grid[right] is None: | |
| right = (right + 1) % NUM_COLS | |
| p0, p1 = ring_grid[left], ring_grid[right] | |
| r0 = np.linalg.norm(p0[:3]) | |
| r1 = np.linalg.norm(p1[:3]) | |
| d = (c - left) % NUM_COLS | |
| span = (right - left) % NUM_COLS | |
| t = d / span if span > 0 else 0.0 | |
| r = (1 - t) * r0 + t * r1 | |
| az = 2 * np.pi * c / NUM_COLS | |
| x = r * cos_e * np.cos(az) | |
| y = r * cos_e * np.sin(az) | |
| z = r * sin_e | |
| filled_points.append([x, y, z, 255]) | |
| interp_flags.append(True) | |
| return np.array(filled_points), np.array(interp_flags) | |
| def fill_ring_known_cols_with_intensity_and_plane( | |
| points_xyzi, | |
| ring_ids, | |
| col_ids, | |
| plane, | |
| max_range=200.0 | |
| ): | |
| """ | |
| points_xyzi: (N,4) -> x,y,z,intensity | |
| """ | |
| a, b, c, d = plane | |
| filled_points = [] | |
| interp_flags = [] | |
| unique_ring_ids = np.array([i for i in range(0,64)]) | |
| for ring in unique_ring_ids: | |
| # print(f"ring {ring}") | |
| ring_mask = ring_ids == ring | |
| ring_pts = points_xyzi[ring_mask] | |
| ring_cols = col_ids[ring_mask] | |
| ring_grid = [None] * NUM_COLS | |
| # print(f"ring grid shape: {np.array(ring_grid).shape}") | |
| # place original points | |
| for p, col in zip(ring_pts, ring_cols): | |
| ring_grid[col] = p | |
| elev = beam_altitudes[ring] | |
| cos_e, sin_e = np.cos(elev), np.sin(elev) | |
| for c_idx in range(NUM_COLS): | |
| # print(f"c_idx: {c_idx}") | |
| if ring_grid[c_idx] is not None: | |
| filled_points.append(ring_grid[c_idx]) | |
| interp_flags.append(False) | |
| continue | |
| # find neighbors cyclically | |
| # print(f"Find neighbours") | |
| # left = (c_idx - 1) % NUM_COLS | |
| # right = (c_idx + 1) % NUM_COLS | |
| # while ring_grid[left] is None: | |
| # left = (left - 1) % NUM_COLS | |
| # while ring_grid[right] is None: | |
| # right = (right + 1) % NUM_COLS | |
| # print(f"p0, p1") | |
| # p0, p1 = ring_grid[left], ring_grid[right] | |
| # r0 = np.linalg.norm(p0[:3]) | |
| # r1 = np.linalg.norm(p1[:3]) | |
| # print(f"dcol, span") | |
| # dcol = (c_idx - left) % NUM_COLS | |
| # span = (right - left) % NUM_COLS | |
| # t_interp = dcol / span if span > 0 else 0.0 | |
| # r_guess = (1 - t_interp) * r0 + t_interp * r1 | |
| # print(f"az") | |
| az = 2 * np.pi * c_idx / NUM_COLS | |
| # Ray direction | |
| dx = cos_e * np.cos(az) | |
| dy = cos_e * np.sin(az) | |
| dz = sin_e | |
| denom = a * dx + b * dy + c * dz | |
| if abs(denom) < 1e-6: | |
| # print(f"point in rang {ring} is parallel") | |
| continue # ray parallel to plane | |
| t_plane = -d / denom | |
| # if t_plane <= 0 or t_plane > max_range: | |
| # # print(f"point in rang {ring} is {t_plane}m away") | |
| # continue | |
| # print(f"Fill point") | |
| x = t_plane * dx | |
| y = t_plane * dy | |
| z = t_plane * dz | |
| filled_points.append([x, y, z, 255]) | |
| interp_flags.append(True) | |
| return np.asarray(filled_points), np.asarray(interp_flags) | |
| def complete_cloud(points_xyzi, plane): | |
| # ring_ids = compute_ring_ids(points_xyzi[:, :3]) | |
| # col_ids = compute_column_index(points_xyzi[:, :3]) | |
| ring_ids, col_ids = compute_ring_col_from_index( | |
| len(points_xyzi), NUM_COLS | |
| ) | |
| valid_mask = is_valid_point(points_xyzi[:, :3]) | |
| # return fill_ring_known_cols_with_intensity_and_plane(points_xyzi, ring_ids, col_ids, plane) | |
| return fill_ring_known_cols_with_intensity_and_heightfield(points_xyzi, ring_ids, col_ids, valid_mask, plane) | |
| def assign_semantic_labels( | |
| points_xyz, | |
| uv, | |
| valid, | |
| semantic_image, | |
| interp_flags=None, | |
| unknown_label=-1 | |
| ): | |
| """ | |
| semantic_image: (H, W) or (H, W, 1) integer labels | |
| """ | |
| H, W = semantic_image.shape[:2] | |
| N = points_xyz.shape[0] | |
| labels = np.full(N, unknown_label, dtype=np.uint8) | |
| # round to nearest pixel | |
| u = np.round(uv[:, 0]).astype(int) | |
| v = np.round(uv[:, 1]).astype(int) | |
| inside = ( | |
| valid & | |
| (u >= 0) & (u < W) & | |
| (v >= 0) & (v < H) | |
| ) | |
| labels[inside] = semantic_image[v[inside], u[inside]] | |
| # Optional: mark interpolated points as unknown | |
| if interp_flags is not None: | |
| labels[interp_flags] = unknown_label | |
| return labels | |
| def fit_height_field_linear(ground_points): | |
| """ | |
| Fit z = ax + by + c to ground points | |
| """ | |
| X = ground_points[:, 0] | |
| Y = ground_points[:, 1] | |
| Z = ground_points[:, 2] | |
| A = np.column_stack((X, Y, np.ones_like(X))) | |
| coef, _, _, _ = np.linalg.lstsq(A, Z, rcond=None) | |
| a, b, c = coef | |
| return a, b, c | |
| def fill_ring_known_cols_with_intensity_and_heightfield( | |
| points_xyzi, | |
| ring_ids, | |
| col_ids, | |
| valid_mask, | |
| height_field, # (a, b, c) | |
| max_range=200.0 | |
| ): | |
| """ | |
| points_xyzi: (N,4) -> x,y,z,intensity | |
| """ | |
| a, b, c = height_field | |
| filled_points = [] | |
| interp_flags = [] | |
| for ring in range(64): | |
| ring_mask = ring_ids == ring | |
| ring_pts = points_xyzi[ring_mask] | |
| ring_cols = col_ids[ring_mask] | |
| ring_valid = valid_mask[ring_mask] | |
| ring_grid = [None] * NUM_COLS | |
| # Place ONLY valid original points | |
| for p, col, v in zip(ring_pts, ring_cols, ring_valid): | |
| if v: | |
| ring_grid[col] = p | |
| elev = beam_altitudes[ring] | |
| cos_e, sin_e = np.cos(elev), np.sin(elev) | |
| for c_idx in range(NUM_COLS): | |
| if ring_grid[c_idx] is not None: | |
| filled_points.append(ring_grid[c_idx]) | |
| interp_flags.append(False) | |
| continue | |
| # Missing return → ray cast | |
| az = 2 * np.pi * c_idx / NUM_COLS | |
| dx = cos_e * np.cos(az) | |
| dy = cos_e * np.sin(az) | |
| dz = sin_e | |
| denom = dz - a * dx - b * dy | |
| if abs(denom) < 1e-6: | |
| continue | |
| t = c / denom | |
| if t <= 0 or t > max_range: | |
| continue | |
| x = t * dx | |
| y = t * dy | |
| z = t * dz | |
| filled_points.append([x, y, z, 255]) | |
| interp_flags.append(True) | |
| return np.asarray(filled_points), np.asarray(interp_flags) | |
| # a, b, c = height_field | |
| # filled_points = [] | |
| # interp_flags = [] | |
| # unique_ring_ids = np.arange(64) | |
| # for ring in unique_ring_ids: | |
| # ring_mask = ring_ids == ring | |
| # ring_pts = points_xyzi[ring_mask] | |
| # ring_cols = col_ids[ring_mask] | |
| # ring_grid = [None] * NUM_COLS | |
| # # place original points | |
| # for p, col in zip(ring_pts, ring_cols): | |
| # ring_grid[col] = p | |
| # elev = beam_altitudes[ring] | |
| # cos_e, sin_e = np.cos(elev), np.sin(elev) | |
| # for c_idx in range(NUM_COLS): | |
| # if ring_grid[c_idx] is not None: | |
| # filled_points.append(ring_grid[c_idx]) | |
| # interp_flags.append(False) | |
| # continue | |
| # az = 2 * np.pi * c_idx / NUM_COLS | |
| # # Unit ray direction | |
| # dx = cos_e * np.cos(az) | |
| # dy = cos_e * np.sin(az) | |
| # dz = sin_e | |
| # denom = dz - a * dx - b * dy | |
| # if abs(denom) < 1e-6: | |
| # continue # ray parallel to height field | |
| # t = c / denom | |
| # if t <= 0 or t > max_range: | |
| # continue | |
| # x = t * dx | |
| # y = t * dy | |
| # z = t * dz | |
| # filled_points.append([x, y, z, 255]) | |
| # interp_flags.append(True) | |
| # return np.asarray(filled_points), np.asarray(interp_flags) | |
| def create_height_field_mesh(plane, xlim, ylim, resolution=1.0): | |
| a, b, c = plane | |
| xs = np.arange(xlim[0], xlim[1], resolution) | |
| ys = np.arange(ylim[0], ylim[1], resolution) | |
| xx, yy = np.meshgrid(xs, ys) | |
| zz = a * xx + b * yy + c | |
| points = np.column_stack((xx.ravel(), yy.ravel(), zz.ravel())) | |
| mesh = o3d.geometry.TriangleMesh() | |
| mesh.vertices = o3d.utility.Vector3dVector(points) | |
| triangles = [] | |
| w = len(xs) | |
| h = len(ys) | |
| for y in range(h - 1): | |
| for x in range(w - 1): | |
| i = y * w + x | |
| triangles.append([i, i + 1, i + w]) | |
| triangles.append([i + 1, i + w + 1, i + w]) | |
| mesh.triangles = o3d.utility.Vector3iVector(triangles) | |
| mesh.compute_vertex_normals() | |
| mesh.paint_uniform_color([0.8, 0.8, 0.8]) | |
| return mesh | |