This commit is contained in:
Robofish 2025-07-13 00:02:41 +08:00
parent 096e317353
commit 15e01f4892
2 changed files with 138 additions and 0 deletions

63
caijian.py Executable file
View File

@ -0,0 +1,63 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import PointCloud2, PointField
import numpy as np
import struct
from sklearn.cluster import DBSCAN
class LidarFilterNode(Node):
def __init__(self):
super().__init__('caijian_node')
self.publisher_ = self.create_publisher(PointCloud2, '/livox/lidar_filtered', 10)
self.subscription = self.create_subscription(
PointCloud2,
'/livox/lidar/pointcloud',
self.filter_callback,
10)
self.get_logger().info('caijian_node started, numpy filtering z in [1.5,3]m, distance<=12m, remove isolated points')
def filter_callback(self, msg):
num_points = msg.width * msg.height
data = np.frombuffer(msg.data, dtype=np.uint8)
points = np.zeros((num_points, 4), dtype=np.float32) # x, y, z, intensity
for i in range(num_points):
offset = i * msg.point_step
x = struct.unpack_from('f', data, offset)[0]
y = struct.unpack_from('f', data, offset + 4)[0]
z = struct.unpack_from('f', data, offset + 8)[0]
intensity = struct.unpack_from('f', data, offset + 12)[0]
points[i] = [x, y, z, intensity]
z_mask = (points[:,2] >= 1.5) & (points[:,2] <= 3.0)
dist_mask = np.linalg.norm(points[:,:3], axis=1) <= 12.0
mask = z_mask & dist_mask
filtered_points = points[mask]
# 使用DBSCAN去除孤立点
if filtered_points.shape[0] > 0:
clustering = DBSCAN(eps=0.3, min_samples=5).fit(filtered_points[:,:3])
core_mask = clustering.labels_ != -1
filtered_points = filtered_points[core_mask]
fields = [
PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1),
PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1),
PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1),
PointField(name='intensity', offset=12, datatype=PointField.FLOAT32, count=1),
]
filtered_points_list = filtered_points.tolist()
import sensor_msgs_py.point_cloud2 as pc2
filtered_msg = pc2.create_cloud(msg.header, fields, filtered_points_list)
self.publisher_.publish(filtered_msg)
def main(args=None):
rclpy.init(args=args)
node = LidarFilterNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

75
pcd2pgm.py Executable file
View File

@ -0,0 +1,75 @@
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import PointCloud2
from nav_msgs.msg import OccupancyGrid
import numpy as np
import struct
import time
class PointCloudToGrid(Node):
def __init__(self):
super().__init__('pointcloud_to_grid')
self.subscription = self.create_subscription(
PointCloud2,
'/livox/lidar_filtered',
self.pointcloud_callback,
10)
self.publisher = self.create_publisher(OccupancyGrid, '/lidar_grid', 10)
self.grid_size = 2000
self.resolution = 0.02
self.origin_x = -20.0
self.origin_y = -20.0
self.points_buffer = []
self.last_header = None
# 定时器每0.5秒触发一次
self.timer = self.create_timer(0.5, self.publish_grid)
def pointcloud_callback(self, msg):
points = self.pointcloud2_to_xyz_array(msg)
self.points_buffer.append(points)
self.last_header = msg.header # 保存最新header用于地图消息
def publish_grid(self):
if not self.points_buffer:
return
# 合并0.5秒内所有点
all_points = np.concatenate(self.points_buffer, axis=0)
grid = np.zeros((self.grid_size, self.grid_size), dtype=np.int8)
for x, y, z in all_points:
if z < 2.0:
ix = int((x - self.origin_x) / self.resolution)
iy = int((y - self.origin_y) / self.resolution)
if 0 <= ix < self.grid_size and 0 <= iy < self.grid_size:
grid[iy, ix] = 100
grid_msg = OccupancyGrid()
if self.last_header:
grid_msg.header = self.last_header
grid_msg.info.resolution = self.resolution
grid_msg.info.width = self.grid_size
grid_msg.info.height = self.grid_size
grid_msg.info.origin.position.x = self.origin_x
grid_msg.info.origin.position.y = self.origin_y
grid_msg.data = grid.flatten().tolist()
self.publisher.publish(grid_msg)
self.points_buffer.clear() # 清空缓存
def pointcloud2_to_xyz_array(self, cloud_msg):
# 解析 PointCloud2 数据为 numpy 数组
fmt = 'fff' # x, y, z
point_step = cloud_msg.point_step
data = cloud_msg.data
points = []
for i in range(0, len(data), point_step):
x, y, z = struct.unpack_from(fmt, data, i)
points.append([x, y, z])
return np.array(points)
def main(args=None):
rclpy.init(args=args)
node = PointCloudToGrid()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()