| from __future__ import annotations |
|
|
| from io import BytesIO |
| from typing import IO, Any, List, NamedTuple |
|
|
| import numpy as np |
| import numpy.typing as npt |
|
|
|
|
| class PointCloudHeader(NamedTuple): |
| """Class for Point Cloud header.""" |
|
|
| version: str |
| fields: List[str] |
| size: List[int] |
| type: List[str] |
| count: List[int] |
| width: int |
| height: int |
| viewpoint: List[int] |
| points: int |
| data: str |
|
|
|
|
| class PointCloud: |
| """ |
| Class for raw .pcd file. |
| """ |
|
|
| def __init__(self, header: PointCloudHeader, points: npt.NDArray[np.float64]) -> None: |
| """ |
| PointCloud. |
| :param header: Pointcloud header. |
| :param points: <np.ndarray, X, N>. X columns, N points. |
| """ |
| self._header = header |
| self._points = points |
|
|
| @property |
| def header(self) -> PointCloudHeader: |
| """ |
| Returns pointcloud header. |
| :return: A PointCloudHeader instance. |
| """ |
| return self._header |
|
|
| @property |
| def points(self) -> npt.NDArray[np.float64]: |
| """ |
| Returns points. |
| :return: <np.ndarray, X, N>. X columns, N points. |
| """ |
| return self._points |
|
|
| def save(self, file_path: str) -> None: |
| """ |
| Saves to .pcd file. |
| :param file_path: The path to the .pcd file. |
| """ |
| with open(file_path, 'wb') as fp: |
| fp.write('# .PCD v{} - Point Cloud Data file format\n'.format(self._header.version).encode('utf8')) |
| for field in self._header._fields: |
| value = getattr(self._header, field) |
| if isinstance(value, list): |
| text = ' '.join(map(str, value)) |
| else: |
| text = str(value) |
| fp.write('{} {}\n'.format(field.upper(), text).encode('utf8')) |
| fp.write(self._points.tobytes()) |
|
|
| @classmethod |
| def parse(cls, pcd_content: bytes) -> PointCloud: |
| """ |
| Parses the pointcloud from byte stream. |
| :param pcd_content: The byte stream that holds the pcd content. |
| :return: A PointCloud object. |
| """ |
| with BytesIO(pcd_content) as stream: |
| header = cls.parse_header(stream) |
| points = cls.parse_points(stream, header) |
| return cls(header, points) |
|
|
| @classmethod |
| def parse_from_file(cls, pcd_file: str) -> PointCloud: |
| """ |
| Parses the pointcloud from .pcd file on disk. |
| :param pcd_file: The path to the .pcd file. |
| :return: A PointCloud instance. |
| """ |
| with open(pcd_file, 'rb') as stream: |
| header = cls.parse_header(stream) |
| points = cls.parse_points(stream, header) |
| return cls(header, points) |
|
|
| @staticmethod |
| def parse_header(stream: IO[Any]) -> PointCloudHeader: |
| """ |
| Parses the header of a pointcloud from byte IO stream. |
| :param stream: Binary stream. |
| :return: A PointCloudHeader instance. |
| """ |
| headers_list = [] |
| while True: |
| line = stream.readline().decode('utf8').strip() |
| if line.startswith('#'): |
| continue |
| columns = line.split() |
| key = columns[0].lower() |
| val = columns[1:] if len(columns) > 2 else columns[1] |
| headers_list.append((key, val)) |
|
|
| if key == 'data': |
| break |
|
|
| headers = dict(headers_list) |
| headers['size'] = list(map(int, headers['size'])) |
| headers['count'] = list(map(int, headers['count'])) |
| headers['width'] = int(headers['width']) |
| headers['height'] = int(headers['height']) |
| headers['viewpoint'] = list(map(int, headers['viewpoint'])) |
| headers['points'] = int(headers['points']) |
| header = PointCloudHeader(**headers) |
|
|
| if any([c != 1 for c in header.count]): |
| raise RuntimeError('"count" has to be 1') |
|
|
| if not len(header.fields) == len(header.size) == len(header.type) == len(header.count): |
| raise RuntimeError('fields/size/type/count field number are inconsistent') |
|
|
| return header |
|
|
| @staticmethod |
| def parse_points(stream: IO[Any], header: PointCloudHeader) -> npt.NDArray[np.float64]: |
| """ |
| Parses points from byte IO stream. |
| :param stream: Byte stream that holds the points. |
| :param header: <np.ndarray, X, N>. A numpy array that has X columns(features), N points. |
| :return: Points of Point Cloud. |
| """ |
| if header.data != 'binary': |
| raise RuntimeError('Un-supported data foramt: {}. "binary" is expected.'.format(header.data)) |
|
|
| |
| row_type = PointCloud.np_type(header) |
| length = row_type.itemsize * header.points |
| buff = stream.read(length) |
| if len(buff) != length: |
| raise RuntimeError('Incomplete pointcloud stream: {} bytes expected, {} got'.format(length, len(buff))) |
|
|
| points = np.frombuffer(buff, row_type) |
|
|
| return points |
|
|
| @staticmethod |
| def np_type(header: PointCloudHeader) -> np.dtype: |
| """ |
| Helper function that translate column types in pointcloud to np types. |
| :param header: A PointCloudHeader object. |
| :return: np.dtype that holds the X features. |
| """ |
| type_mapping = {'I': 'int', 'U': 'uint', 'F': 'float'} |
| np_types = [type_mapping[t] + str(int(s) * 8) for t, s in zip(header.type, header.size)] |
|
|
| return np.dtype([(f, getattr(np, nt)) for f, nt in zip(header.fields, np_types)]) |
|
|
| def to_pcd_bin(self) -> npt.NDArray[np.float32]: |
| """ |
| Converts pointcloud to .pcd.bin format. |
| :return: <np.float32, 5, N>, the point cloud in .pcd.bin format. |
| """ |
| lidar_fields = ['x', 'y', 'z', 'intensity', 'ring'] |
| return np.array([np.array(self.points[f], dtype=np.float32) for f in lidar_fields]) |
|
|
| def to_pcd_bin2(self) -> npt.NDArray[np.float32]: |
| """ |
| Converts pointcloud to .pcd.bin2 format. |
| :return: <np.float32, 6, N>, the point cloud in .pcd.bin2 format. |
| """ |
| lidar_fields = ['x', 'y', 'z', 'intensity', 'ring', 'lidar_info'] |
| return np.array([np.array(self.points[f], dtype=np.float32) for f in lidar_fields]) |
|
|