临时手动测试
This commit is contained in:
parent
21d105a0fc
commit
8972af238d
4
nav.sh
4
nav.sh
@ -11,7 +11,9 @@ commands=(
|
||||
nav_rviz:=true"
|
||||
"ros2 launch rm_simpal_move simple_move.launch.py"
|
||||
"/bin/python3 /home/robofish/RC2025/src/rm_driver/rm_serial_driver/script/R2_Serial.py"
|
||||
"/bin/python3 /home/robofish/RC2025/src/rm_driver/rm_serial_driver/script/slect.py map"
|
||||
# "/usr/bin/python3 /home/robofish/RC2025/src/rc_lidar/juxing.py"
|
||||
"/usr/bin/python3 /home/robofish/RC2025/src/rc_lidar/caijian.py"
|
||||
# "/bin/python3 /home/robofish/RC2025/src/rm_driver/rm_serial_driver/script/slect.py map"
|
||||
|
||||
)
|
||||
|
||||
|
@ -12,7 +12,7 @@ class LidarFilterNode(Node):
|
||||
self.publisher_ = self.create_publisher(PointCloud2, '/livox/lidar_filtered', 10)
|
||||
self.subscription = self.create_subscription(
|
||||
PointCloud2,
|
||||
'/livox/lidar',
|
||||
'/livox/lidar/pointcloud',
|
||||
self.filter_callback,
|
||||
10)
|
||||
self.get_logger().info('caijian_node started, numpy filtering z in [1.5,3]m, distance<=12m, remove isolated points')
|
||||
|
63
src/rc_lidar/fliter.py
Normal file
63
src/rc_lidar/fliter.py
Normal file
@ -0,0 +1,63 @@
|
||||
#!/usr/bin/env python3
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from sensor_msgs.msg import PointCloud2, PointField
|
||||
import numpy as np
|
||||
import struct
|
||||
from sklearn.cluster import DBSCAN
|
||||
|
||||
class LidarFilterNode(Node):
|
||||
def __init__(self):
|
||||
super().__init__('caijian_node')
|
||||
self.publisher_ = self.create_publisher(PointCloud2, '/livox/lidar_filtered', 10)
|
||||
self.subscription = self.create_subscription(
|
||||
PointCloud2,
|
||||
'/livox/lidar/pointcloud',
|
||||
self.filter_callback,
|
||||
10)
|
||||
self.get_logger().info('caijian_node started, numpy filtering z in [1.5,3]m, distance<=12m, remove isolated points')
|
||||
|
||||
def filter_callback(self, msg):
|
||||
num_points = msg.width * msg.height
|
||||
data = np.frombuffer(msg.data, dtype=np.uint8)
|
||||
points = np.zeros((num_points, 4), dtype=np.float32) # x, y, z, intensity
|
||||
|
||||
for i in range(num_points):
|
||||
offset = i * msg.point_step
|
||||
x = struct.unpack_from('f', data, offset)[0]
|
||||
y = struct.unpack_from('f', data, offset + 4)[0]
|
||||
z = struct.unpack_from('f', data, offset + 8)[0]
|
||||
intensity = struct.unpack_from('f', data, offset + 12)[0]
|
||||
points[i] = [x, y, z, intensity]
|
||||
|
||||
z_mask = (points[:,2] >= 1.5) & (points[:,2] <= 3.0)
|
||||
dist_mask = np.linalg.norm(points[:,:3], axis=1) <= 12.0
|
||||
mask = z_mask & dist_mask
|
||||
filtered_points = points[mask]
|
||||
|
||||
# 使用DBSCAN去除孤立点
|
||||
if filtered_points.shape[0] > 0:
|
||||
clustering = DBSCAN(eps=0.3, min_samples=5).fit(filtered_points[:,:3])
|
||||
core_mask = clustering.labels_ != -1
|
||||
filtered_points = filtered_points[core_mask]
|
||||
|
||||
fields = [
|
||||
PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1),
|
||||
PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1),
|
||||
PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1),
|
||||
PointField(name='intensity', offset=12, datatype=PointField.FLOAT32, count=1),
|
||||
]
|
||||
filtered_points_list = filtered_points.tolist()
|
||||
import sensor_msgs_py.point_cloud2 as pc2
|
||||
filtered_msg = pc2.create_cloud(msg.header, fields, filtered_points_list)
|
||||
self.publisher_.publish(filtered_msg)
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = LidarFilterNode()
|
||||
rclpy.spin(node)
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -1,164 +1,337 @@
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from sensor_msgs.msg import PointCloud2
|
||||
from sensor_msgs.msg import PointCloud2, PointField
|
||||
import numpy as np
|
||||
import open3d as o3d
|
||||
import sensor_msgs_py.point_cloud2 as pc2
|
||||
import struct
|
||||
from sklearn.cluster import DBSCAN
|
||||
import cv2
|
||||
from visualization_msgs.msg import Marker
|
||||
import std_msgs.msg
|
||||
import geometry_msgs.msg
|
||||
import collections
|
||||
from sklearn.linear_model import RANSACRegressor
|
||||
|
||||
class RectangleDetector(Node):
|
||||
|
||||
def ransac_line_3d(points, threshold=0.05, min_inliers=20):
|
||||
best_inliers = []
|
||||
best_line = None
|
||||
N = len(points)
|
||||
if N < min_inliers:
|
||||
return None, []
|
||||
for _ in range(100):
|
||||
idx = np.random.choice(N, 2, replace=False)
|
||||
p1, p2 = points[idx]
|
||||
v = p2 - p1
|
||||
v = v / np.linalg.norm(v)
|
||||
dists = np.linalg.norm(np.cross(points - p1, v), axis=1)
|
||||
inliers = np.where(dists < threshold)[0]
|
||||
if len(inliers) > len(best_inliers):
|
||||
best_inliers = inliers
|
||||
best_line = (p1, p2)
|
||||
if len(best_inliers) > N * 0.5:
|
||||
break
|
||||
return best_line, best_inliers
|
||||
|
||||
def fit_rectangle_pca(cluster):
|
||||
# 用PCA找主方向和边界点
|
||||
pts = cluster[:, :3]
|
||||
mean = np.mean(pts, axis=0)
|
||||
cov = np.cov(pts.T)
|
||||
eigvals, eigvecs = np.linalg.eigh(cov)
|
||||
order = np.argsort(eigvals)[::-1]
|
||||
main_dir = eigvecs[:, order[0]]
|
||||
second_dir = eigvecs[:, order[1]]
|
||||
# 投影到主方向和次方向
|
||||
proj_main = np.dot(pts - mean, main_dir)
|
||||
proj_second = np.dot(pts - mean, second_dir)
|
||||
# 找四个角点
|
||||
corners = []
|
||||
for xm in [np.min(proj_main), np.max(proj_main)]:
|
||||
for xs in [np.min(proj_second), np.max(proj_second)]:
|
||||
idx = np.argmin((proj_main - xm)**2 + (proj_second - xs)**2)
|
||||
corners.append(pts[idx])
|
||||
return corners
|
||||
|
||||
def rectangle_score(pts):
|
||||
# 评估4个点是否接近矩形
|
||||
d = [np.linalg.norm(pts[i] - pts[(i+1)%4]) for i in range(4)]
|
||||
diag1 = np.linalg.norm(pts[0] - pts[2])
|
||||
diag2 = np.linalg.norm(pts[1] - pts[3])
|
||||
w = max(d)
|
||||
h = min(d)
|
||||
ratio = w / h if h > 0 else 0
|
||||
ideal_ratio = 1.8 / 1.05
|
||||
score = abs(ratio - ideal_ratio) + abs(diag1 - diag2) / max(diag1, diag2)
|
||||
return score
|
||||
|
||||
def classify_lines(lines):
|
||||
# lines: 每条线是 [x, y, z, intensity]
|
||||
# 假设 lines 是8个端点,两两为一条线
|
||||
vertical_lines = []
|
||||
horizontal_lines = []
|
||||
for i in range(0, len(lines), 2):
|
||||
p1 = np.array(lines[i][:3])
|
||||
p2 = np.array(lines[i+1][:3])
|
||||
vec = p2 - p1
|
||||
length = np.linalg.norm(vec)
|
||||
# 计算与地面的夹角(假设地面为z轴为0,垂直为z方向)
|
||||
dz = abs(vec[2])
|
||||
dxy = np.linalg.norm(vec[:2])
|
||||
# 垂直线:z方向分量大,长度约1.05m
|
||||
if dz > dxy and 0.95 < length < 1.15:
|
||||
vertical_lines.append((i, i+1, length))
|
||||
# 水平线:z方向分量小,长度约1.8m
|
||||
elif dz < dxy and 1.7 < length < 1.9:
|
||||
horizontal_lines.append((i, i+1, length))
|
||||
return vertical_lines, horizontal_lines
|
||||
|
||||
def find_best_rectangle_from_lines(lines):
|
||||
vertical_lines, horizontal_lines = classify_lines(lines)
|
||||
# 只选出2条垂直线和2条水平线
|
||||
if len(vertical_lines) < 2 or len(horizontal_lines) < 2:
|
||||
return None
|
||||
# 取长度最接近目标的两条
|
||||
vertical_lines = sorted(vertical_lines, key=lambda x: abs(x[2]-1.05))[:2]
|
||||
horizontal_lines = sorted(horizontal_lines, key=lambda x: abs(x[2]-1.8))[:2]
|
||||
# 组合4个端点
|
||||
indices = [vertical_lines[0][0], vertical_lines[0][1],
|
||||
vertical_lines[1][0], vertical_lines[1][1],
|
||||
horizontal_lines[0][0], horizontal_lines[0][1],
|
||||
horizontal_lines[1][0], horizontal_lines[1][1]]
|
||||
# 去重,只保留4个顶点
|
||||
unique_indices = list(set(indices))
|
||||
if len(unique_indices) < 4:
|
||||
return None
|
||||
pts = [lines[idx] for idx in unique_indices[:4]]
|
||||
# 按矩形评分排序
|
||||
from itertools import permutations
|
||||
best_score = float('inf')
|
||||
best_rect = None
|
||||
for order in permutations(range(4)):
|
||||
ordered = [pts[i] for i in order]
|
||||
score = rectangle_score(np.array([p[:3] for p in ordered]))
|
||||
if score < best_score:
|
||||
best_score = score
|
||||
best_rect = ordered
|
||||
return best_rect
|
||||
|
||||
|
||||
class BasketballFrameDetector(Node):
|
||||
def __init__(self):
|
||||
super().__init__('rectangle_detector')
|
||||
super().__init__('basketball_frame_detector')
|
||||
self.subscription = self.create_subscription(
|
||||
PointCloud2,
|
||||
'/livox/lidar_filtered',
|
||||
self.pointcloud_callback,
|
||||
10
|
||||
)
|
||||
self.marker_pub = self.create_publisher(Marker, '/rectangle_marker', 10)
|
||||
# 可选:发布原始点云
|
||||
self.cloud_pub = self.create_publisher(PointCloud2, '/rectangle_cloud', 10)
|
||||
self.history = collections.deque(maxlen=5)
|
||||
self.last_bbox = None
|
||||
self.last_time = None
|
||||
|
||||
# 目标矩形尺寸(单位:米)
|
||||
self.length = 1.8
|
||||
self.width = 1.05
|
||||
self.thickness = 0.04
|
||||
def filter_points(self, points):
|
||||
# 统计滤波:去除离群点
|
||||
pcd = o3d.geometry.PointCloud()
|
||||
pcd.points = o3d.utility.Vector3dVector(points)
|
||||
cl, ind = pcd.remove_statistical_outlier(nb_neighbors=20, std_ratio=2.0)
|
||||
filtered = np.asarray(cl.points)
|
||||
# 半径滤波
|
||||
cl2, ind2 = cl.remove_radius_outlier(nb_points=10, radius=0.05)
|
||||
return np.asarray(cl2.points)
|
||||
|
||||
def smooth_bbox(self, bbox):
|
||||
# 保存历史中心和尺寸
|
||||
center = bbox.center
|
||||
extent = bbox.extent
|
||||
self.history.append((center, extent))
|
||||
# 计算加权平均
|
||||
centers = np.array([h[0] for h in self.history])
|
||||
extents = np.array([h[1] for h in self.history])
|
||||
smooth_center = np.mean(centers, axis=0)
|
||||
smooth_extent = np.mean(extents, axis=0)
|
||||
bbox.center = smooth_center
|
||||
bbox.extent = smooth_extent
|
||||
return bbox
|
||||
|
||||
|
||||
def publish_marker(self, bbox):
|
||||
marker = Marker()
|
||||
marker.header.frame_id = "livox_frame"
|
||||
marker.header.stamp = self.get_clock().now().to_msg()
|
||||
marker.ns = "rectangle_corners"
|
||||
marker.id = 0
|
||||
marker.type = Marker.SPHERE_LIST
|
||||
marker.action = Marker.ADD
|
||||
|
||||
corners = np.asarray(bbox.get_box_points())
|
||||
center = bbox.center
|
||||
|
||||
# 展示角点
|
||||
for c in corners:
|
||||
pt = geometry_msgs.msg.Point()
|
||||
pt.x, pt.y, pt.z = c
|
||||
marker.points.append(pt)
|
||||
|
||||
# 展示中心点
|
||||
center_pt = geometry_msgs.msg.Point()
|
||||
center_pt.x, center_pt.y, center_pt.z = center
|
||||
marker.points.append(center_pt)
|
||||
|
||||
# 沿中心点到原点方向,向外移动10cm
|
||||
origin = np.array([0.0, 0.0, 0.0])
|
||||
direction = origin - center
|
||||
norm = np.linalg.norm(direction)
|
||||
if norm > 0:
|
||||
move_vec = direction / norm * 0.46
|
||||
moved_pt = center + move_vec
|
||||
else:
|
||||
moved_pt = center
|
||||
out_pt = geometry_msgs.msg.Point()
|
||||
out_pt.x, out_pt.y, out_pt.z = moved_pt
|
||||
marker.points.append(out_pt)
|
||||
|
||||
marker.scale.x = 0.08
|
||||
marker.scale.y = 0.08
|
||||
marker.scale.z = 0.08
|
||||
marker.color.r = 0.0
|
||||
marker.color.g = 1.0
|
||||
marker.color.b = 0.0
|
||||
marker.color.a = 1.0
|
||||
marker.lifetime.sec = 1
|
||||
marker.lifetime.nanosec = 0
|
||||
self.marker_pub.publish(marker)
|
||||
# ...existing code...
|
||||
self.publisher = self.create_publisher(
|
||||
PointCloud2,
|
||||
'/basketball_frame_cloud',
|
||||
10
|
||||
)
|
||||
self.marker_pub = self.create_publisher(
|
||||
Marker,
|
||||
'/basketball_frame_lines',
|
||||
10
|
||||
)
|
||||
self.pointcloud_buffer = []
|
||||
self.max_buffer_size = 10 # 减少缓冲帧数,加快响应
|
||||
self.center_buffer = []
|
||||
self.center_buffer_size = 5
|
||||
|
||||
def pointcloud_callback(self, msg):
|
||||
points_list = []
|
||||
for p in pc2.read_points(msg, skip_nans=True):
|
||||
try:
|
||||
points_list.append([p[0], p[1], p[2]])
|
||||
except Exception as e:
|
||||
self.get_logger().warn(f'点异常: {e}, 内容: {p}')
|
||||
if len(points_list) == 0:
|
||||
self.get_logger().info('点云为空')
|
||||
points = self.pointcloud2_to_xyz(msg)
|
||||
if points.shape[0] == 0:
|
||||
return # 跳过空点云
|
||||
self.pointcloud_buffer.append(points)
|
||||
# 只保留非空点云
|
||||
self.pointcloud_buffer = [arr for arr in self.pointcloud_buffer if arr.shape[0] > 0]
|
||||
if len(self.pointcloud_buffer) > self.max_buffer_size:
|
||||
self.pointcloud_buffer.pop(0)
|
||||
all_points = np.vstack(self.pointcloud_buffer)
|
||||
xy_points = all_points[:, :2]
|
||||
|
||||
if len(xy_points) < 10:
|
||||
self.get_logger().info('点数太少,跳过')
|
||||
return
|
||||
points = np.array(points_list)
|
||||
clustering = DBSCAN(eps=0.3, min_samples=10).fit(xy_points)
|
||||
labels = clustering.labels_
|
||||
unique_labels = set(labels)
|
||||
found = False
|
||||
for label in unique_labels:
|
||||
if label == -1:
|
||||
continue
|
||||
cluster = all_points[labels == label]
|
||||
if len(cluster) < 30:
|
||||
continue
|
||||
min_x, min_y = np.min(cluster[:, :2], axis=0)
|
||||
max_x, max_y = np.max(cluster[:, :2], axis=0)
|
||||
width = abs(max_x - min_x)
|
||||
height = abs(max_y - min_y)
|
||||
if 1.5 < width < 2.1 and 0.7 < height < 1.3:
|
||||
cloud_msg = self.xyz_array_to_pointcloud2(cluster, msg.header)
|
||||
self.publisher.publish(cloud_msg)
|
||||
|
||||
# 点云预处理
|
||||
# points = self.filter_points(points)
|
||||
if len(points) == 0:
|
||||
self.get_logger().info('滤波后点云为空')
|
||||
return
|
||||
# 用PCA直接找矩形四角
|
||||
corners = fit_rectangle_pca(cluster)
|
||||
from itertools import permutations
|
||||
best_score = float('inf')
|
||||
best_rect = None
|
||||
for order in permutations(range(4)):
|
||||
ordered = [corners[i] for i in order]
|
||||
score = rectangle_score(np.array(ordered))
|
||||
if score < best_score:
|
||||
best_score = score
|
||||
best_rect = ordered
|
||||
rect_lines = best_rect
|
||||
if rect_lines is None or len(rect_lines) < 4:
|
||||
self.get_logger().info('未找到合适矩形')
|
||||
continue
|
||||
|
||||
self.cloud_pub.publish(msg)
|
||||
# 发布最新的矩形
|
||||
marker = Marker()
|
||||
marker.header = msg.header
|
||||
marker.ns = "basketball_frame"
|
||||
marker.id = 0
|
||||
marker.type = Marker.LINE_LIST
|
||||
marker.action = Marker.ADD
|
||||
marker.scale.x = 0.08
|
||||
marker.color.r = 0.0
|
||||
marker.color.g = 1.0
|
||||
marker.color.b = 0.0
|
||||
marker.color.a = 1.0
|
||||
marker.points = []
|
||||
from geometry_msgs.msg import Point
|
||||
for i in range(4):
|
||||
p1 = rect_lines[i]
|
||||
p2 = rect_lines[(i+1)%4]
|
||||
pt1 = Point(x=float(p1[0]), y=float(p1[1]), z=float(p1[2]))
|
||||
pt2 = Point(x=float(p2[0]), y=float(p2[1]), z=float(p2[2]))
|
||||
marker.points.append(pt1)
|
||||
marker.points.append(pt2)
|
||||
self.marker_pub.publish(marker)
|
||||
|
||||
pcd = o3d.geometry.PointCloud()
|
||||
pcd.points = o3d.utility.Vector3dVector(points)
|
||||
voxel_size = 0.03
|
||||
pcd = pcd.voxel_down_sample(voxel_size=voxel_size)
|
||||
# 分别发布4条最优边线
|
||||
for i in range(4):
|
||||
edge_marker = Marker()
|
||||
edge_marker.header = msg.header
|
||||
edge_marker.ns = "basketball_frame_edges"
|
||||
edge_marker.id = i
|
||||
edge_marker.type = Marker.LINE_STRIP
|
||||
edge_marker.action = Marker.ADD
|
||||
edge_marker.scale.x = 0.12
|
||||
colors = [
|
||||
(1.0, 0.0, 0.0),
|
||||
(0.0, 1.0, 0.0),
|
||||
(0.0, 0.0, 1.0),
|
||||
(1.0, 1.0, 0.0)
|
||||
]
|
||||
edge_marker.color.r = colors[i][0]
|
||||
edge_marker.color.g = colors[i][1]
|
||||
edge_marker.color.b = colors[i][2]
|
||||
edge_marker.color.a = 1.0
|
||||
pt1 = Point(x=float(rect_lines[i][0]), y=float(rect_lines[i][1]), z=float(rect_lines[i][2]))
|
||||
pt2 = Point(x=float(rect_lines[(i+1)%4][0]), y=float(rect_lines[(i+1)%4][1]), z=float(rect_lines[(i+1)%4][2]))
|
||||
edge_marker.points = [pt1, pt2]
|
||||
self.marker_pub.publish(edge_marker)
|
||||
|
||||
plane_model, inliers = pcd.segment_plane(
|
||||
distance_threshold=0.02,
|
||||
ransac_n=3,
|
||||
num_iterations=2000
|
||||
)
|
||||
inlier_cloud = pcd.select_by_index(inliers)
|
||||
bbox = inlier_cloud.get_oriented_bounding_box()
|
||||
extent = bbox.extent
|
||||
center = bbox.center
|
||||
# 中心点滑动平均
|
||||
center = np.mean(np.array(rect_lines), axis=0)
|
||||
self.center_buffer.append(center)
|
||||
if len(self.center_buffer) > self.center_buffer_size:
|
||||
self.center_buffer.pop(0)
|
||||
stable_center = np.mean(self.center_buffer, axis=0)
|
||||
|
||||
dims = sorted(extent)
|
||||
# 检测阈值收紧
|
||||
if (abs(dims[0] - self.thickness) < 0.1 and
|
||||
abs(dims[1] - self.width) < 0.2 and
|
||||
abs(dims[2] - self.length) < 0.2):
|
||||
bbox = self.smooth_bbox(bbox)
|
||||
self.get_logger().info(f'检测到目标矩形框,位置: {bbox.center}, 尺寸: {bbox.extent}')
|
||||
self.publish_marker(bbox)
|
||||
self.last_bbox = bbox
|
||||
self.last_time = self.get_clock().now()
|
||||
else:
|
||||
# 若上一帧有结果且时间间隔<0.5s,则继续发布上一帧
|
||||
now = self.get_clock().now()
|
||||
if self.last_bbox and self.last_time and (now - self.last_time).nanoseconds < 5e8:
|
||||
self.get_logger().info('本帧未检测到,使用上一帧结果')
|
||||
self.publish_marker(self.last_bbox)
|
||||
else:
|
||||
self.get_logger().info('未检测到目标矩形框')
|
||||
# 发布中心点Marker
|
||||
center_marker = Marker()
|
||||
center_marker.header = msg.header
|
||||
center_marker.ns = "basketball_frame"
|
||||
center_marker.id = 1
|
||||
center_marker.type = Marker.SPHERE
|
||||
center_marker.action = Marker.ADD
|
||||
center_marker.scale.x = 0.15
|
||||
center_marker.scale.y = 0.15
|
||||
center_marker.scale.z = 0.15
|
||||
center_marker.color.r = 1.0
|
||||
center_marker.color.g = 0.0
|
||||
center_marker.color.b = 0.0
|
||||
center_marker.color.a = 1.0
|
||||
center_marker.pose.position.x = float(stable_center[0])
|
||||
center_marker.pose.position.y = float(stable_center[1])
|
||||
center_marker.pose.position.z = float(stable_center[2])
|
||||
center_marker.pose.orientation.x = 0.0
|
||||
center_marker.pose.orientation.y = 0.0
|
||||
center_marker.pose.orientation.z = 0.0
|
||||
center_marker.pose.orientation.w = 1.0
|
||||
self.marker_pub.publish(center_marker)
|
||||
|
||||
# 计算法向量(篮板主方向,PCA最小特征值方向)
|
||||
pts = np.array(rect_lines)
|
||||
mean = np.mean(pts, axis=0)
|
||||
cov = np.cov(pts.T)
|
||||
eigvals, eigvecs = np.linalg.eigh(cov)
|
||||
order = np.argsort(eigvals)[::-1]
|
||||
normal_vec = eigvecs[:, order[2]]
|
||||
normal_vec = normal_vec / np.linalg.norm(normal_vec)
|
||||
# 向内侧偏移30cm
|
||||
offset_point = stable_center + 0.3 * normal_vec
|
||||
|
||||
# 发布偏移点Marker
|
||||
offset_marker = Marker()
|
||||
offset_marker.header = msg.header
|
||||
offset_marker.ns = "basketball_frame"
|
||||
offset_marker.id = 2
|
||||
offset_marker.type = Marker.SPHERE
|
||||
offset_marker.action = Marker.ADD
|
||||
offset_marker.scale.x = 0.12
|
||||
offset_marker.scale.y = 0.12
|
||||
offset_marker.scale.z = 0.12
|
||||
offset_marker.color.r = 0.0
|
||||
offset_marker.color.g = 0.0
|
||||
offset_marker.color.b = 1.0
|
||||
offset_marker.color.a = 1.0
|
||||
offset_marker.pose.position.x = float(offset_point[0])
|
||||
offset_marker.pose.position.y = float(offset_point[1])
|
||||
offset_marker.pose.position.z = float(offset_point[2])
|
||||
offset_marker.pose.orientation.x = 0.0
|
||||
offset_marker.pose.orientation.y = 0.0
|
||||
offset_marker.pose.orientation.z = 0.0
|
||||
offset_marker.pose.orientation.w = 1.0
|
||||
self.marker_pub.publish(offset_marker)
|
||||
|
||||
found = True
|
||||
break
|
||||
if not found:
|
||||
self.get_logger().info('本帧未找到矩形')
|
||||
|
||||
def pointcloud2_to_xyz(self, cloud_msg):
|
||||
fmt = 'ffff'
|
||||
points = []
|
||||
for i in range(cloud_msg.width):
|
||||
offset = i * cloud_msg.point_step
|
||||
x, y, z, intensity = struct.unpack_from(fmt, cloud_msg.data, offset)
|
||||
points.append([x, y, z, intensity])
|
||||
return np.array(points)
|
||||
|
||||
def xyz_array_to_pointcloud2(self, points, header):
|
||||
msg = PointCloud2()
|
||||
msg.header = header
|
||||
msg.height = 1
|
||||
msg.width = len(points)
|
||||
msg.is_dense = True
|
||||
msg.is_bigendian = False
|
||||
msg.point_step = 16
|
||||
msg.row_step = msg.point_step * msg.width
|
||||
msg.fields = [
|
||||
PointField(name='x', offset=0, datatype=7, count=1),
|
||||
PointField(name='y', offset=4, datatype=7, count=1),
|
||||
PointField(name='z', offset=8, datatype=7, count=1),
|
||||
PointField(name='intensity', offset=12, datatype=7, count=1),
|
||||
]
|
||||
msg.data = b''.join([struct.pack('ffff', *p) for p in points])
|
||||
return msg
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = RectangleDetector()
|
||||
node = BasketballFrameDetector()
|
||||
rclpy.spin(node)
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
0
src/rc_lidar/line.py
Normal file
0
src/rc_lidar/line.py
Normal file
102
src/rc_lidar/simple.py
Normal file
102
src/rc_lidar/simple.py
Normal file
@ -0,0 +1,102 @@
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from sensor_msgs.msg import PointCloud2, PointField
|
||||
import numpy as np
|
||||
import struct
|
||||
from sklearn.cluster import DBSCAN
|
||||
|
||||
class BasketballFrameDetector(Node):
|
||||
def __init__(self):
|
||||
super().__init__('basketball_frame_detector')
|
||||
self.subscription = self.create_subscription(
|
||||
PointCloud2,
|
||||
'/livox/lidar_filtered',
|
||||
self.pointcloud_callback,
|
||||
10
|
||||
)
|
||||
self.publisher = self.create_publisher(
|
||||
PointCloud2,
|
||||
'/basketball_frame_cloud',
|
||||
10
|
||||
)
|
||||
self.pointcloud_buffer = []
|
||||
|
||||
def pointcloud_callback(self, msg):
|
||||
points = self.pointcloud2_to_xyz(msg)
|
||||
self.pointcloud_buffer.append(points)
|
||||
self.get_logger().info(f'已保存点云组数: {len(self.pointcloud_buffer)}')
|
||||
if len(self.pointcloud_buffer) < 10:
|
||||
return
|
||||
|
||||
# 合并10组点云
|
||||
all_points = np.vstack(self.pointcloud_buffer)
|
||||
xy_points = all_points[:, :2]
|
||||
self.get_logger().info(f'合并后点数: {xy_points.shape[0]}')
|
||||
|
||||
# 清空缓存,准备下一批
|
||||
self.pointcloud_buffer = []
|
||||
|
||||
# 聚类识别
|
||||
if len(xy_points) < 10:
|
||||
return
|
||||
clustering = DBSCAN(eps=0.3, min_samples=10).fit(xy_points)
|
||||
labels = clustering.labels_
|
||||
unique_labels = set(labels)
|
||||
for label in unique_labels:
|
||||
if label == -1:
|
||||
continue
|
||||
cluster = all_points[labels == label]
|
||||
if len(cluster) < 30:
|
||||
continue
|
||||
min_x, min_y = np.min(cluster[:, :2], axis=0)
|
||||
max_x, max_y = np.max(cluster[:, :2], axis=0)
|
||||
width = abs(max_x - min_x)
|
||||
height = abs(max_y - min_y)
|
||||
self.get_logger().info(
|
||||
f'聚类: label={label}, width={width:.2f}, height={height:.2f}, 点数={len(cluster)}'
|
||||
)
|
||||
if 1.6 < width < 2.0 and 0.8 < height < 1.2:
|
||||
self.get_logger().info(
|
||||
f'可能是篮球框: label={label}, width={width:.2f}, height={height:.2f}, 点数={len(cluster)}'
|
||||
)
|
||||
# 发布识别到的篮球框点云
|
||||
cloud_msg = self.xyz_array_to_pointcloud2(cluster, msg.header)
|
||||
self.publisher.publish(cloud_msg)
|
||||
|
||||
def pointcloud2_to_xyz(self, cloud_msg):
|
||||
fmt = 'ffff' # x, y, z, intensity
|
||||
points = []
|
||||
for i in range(cloud_msg.width):
|
||||
offset = i * cloud_msg.point_step
|
||||
x, y, z, intensity = struct.unpack_from(fmt, cloud_msg.data, offset)
|
||||
points.append([x, y, z, intensity])
|
||||
return np.array(points)
|
||||
|
||||
def xyz_array_to_pointcloud2(self, points, header):
|
||||
# 构造 PointCloud2 消息
|
||||
msg = PointCloud2()
|
||||
msg.header = header
|
||||
msg.height = 1
|
||||
msg.width = len(points)
|
||||
msg.is_dense = True
|
||||
msg.is_bigendian = False
|
||||
msg.point_step = 16
|
||||
msg.row_step = msg.point_step * msg.width
|
||||
msg.fields = [
|
||||
PointField(name='x', offset=0, datatype=7, count=1),
|
||||
PointField(name='y', offset=4, datatype=7, count=1),
|
||||
PointField(name='z', offset=8, datatype=7, count=1),
|
||||
PointField(name='intensity', offset=12, datatype=7, count=1),
|
||||
]
|
||||
msg.data = b''.join([struct.pack('ffff', *p) for p in points])
|
||||
return msg
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = BasketballFrameDetector()
|
||||
rclpy.spin(node)
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
36
src/rm_driver/rm_serial_driver/script/pub_goal.py
Normal file
36
src/rm_driver/rm_serial_driver/script/pub_goal.py
Normal file
@ -0,0 +1,36 @@
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from geometry_msgs.msg import PointStamped
|
||||
from rm_msgs.msg import MoveGoal
|
||||
|
||||
class ClickedGoalPublisher(Node):
|
||||
def __init__(self):
|
||||
super().__init__('clicked_goal_publisher')
|
||||
self.subscription = self.create_subscription(
|
||||
PointStamped,
|
||||
'/clicked_point',
|
||||
self.clicked_callback,
|
||||
10
|
||||
)
|
||||
self.publisher = self.create_publisher(MoveGoal, '/move_goal', 10)
|
||||
|
||||
def clicked_callback(self, msg):
|
||||
goal = MoveGoal()
|
||||
goal.x = msg.point.x
|
||||
goal.y = msg.point.y
|
||||
goal.angle = 0.0
|
||||
goal.max_speed = 0.0
|
||||
goal.tolerance = 0.1
|
||||
goal.rotor = False
|
||||
self.publisher.publish(goal)
|
||||
self.get_logger().info(f'发布目标: x={goal.x:.2f}, y={goal.y:.2f}')
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = ClickedGoalPublisher()
|
||||
rclpy.spin(node)
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
Loading…
Reference in New Issue
Block a user