import rclpy from rclpy.node import Node from sensor_msgs.msg import PointCloud2 import numpy as np import open3d as o3d import time class PointCloudSaver(Node): def __init__(self): super().__init__('pcd_saver') self.subscription = self.create_subscription( PointCloud2, '/livox/lidar_filtered', self.listener_callback, 10) self.point_clouds = [] self.start_time = time.time() self.timer = self.create_timer(3.0, self.save_and_exit) self.saving = False def listener_callback(self, msg): if not self.saving: pc = self.pointcloud2_to_xyz_array(msg) if pc is not None: self.point_clouds.append(pc) def pointcloud2_to_xyz_array(self, cloud_msg): # 仅支持 x, y, z 均为 float32 且无 padding 的点云 import struct points = [] point_step = cloud_msg.point_step for i in range(cloud_msg.width * cloud_msg.height): offset = i * point_step x, y, z = struct.unpack_from('fff', cloud_msg.data, offset) points.append([x, y, z]) return np.array(points) def save_and_exit(self): if not self.saving: self.saving = True if self.point_clouds: all_points = np.vstack(self.point_clouds) pcd = o3d.geometry.PointCloud() pcd.points = o3d.utility.Vector3dVector(all_points) o3d.io.write_point_cloud("output.pcd", pcd) self.get_logger().info("Saved output.pcd") rclpy.shutdown() def main(args=None): rclpy.init(args=args) saver = PointCloudSaver() rclpy.spin(saver) if __name__ == '__main__': main()