# -*- coding: utf-8 -*- import sys import time import cv2 import numpy as np from camera_hik import CameraHik from dot_matrix_detector import DotMatrixDetector from detector import GreenLaserDetector from arbitrator import ModeArbitrator from config_panel import ConfigPanel from auto_exposure import auto_exposure_adjust from MvCameraControl_class import * class DualModeLaserDetectionSystem: def __init__(self): """初始化双模式激光识别系统""" self.camera = CameraHik() self.detector_a = GreenLaserDetector() # 模式A:单点光斑检测 self.detector_b = DotMatrixDetector() # 模式B:点阵灯盘检测 self.arbiter = ModeArbitrator() # 仲裁器 self.config_panel = None self.fps_counter = 0 self.fps_start_time = time.time() self.fps = 0 self.current_mode = None self.is_mode_locked = False def on_config_update(self, param, value): """ 处理参数更新回调 Args: param: 参数名称 value: 参数值 """ if param == 'exposure': self.camera.set_exposure_time(value) elif param == 'gain': self.camera.set_gain(value) elif param == 'fps': self.camera.set_frame_rate(value) elif param == 'roi': x, y, width, height = value self.camera.set_roi(x, y, width, height) elif param == 'hsv': h_low, h_high, s_low, s_high, v_low, v_high = value # 同时更新两个检测器的HSV范围 self.detector_a.set_hsv_range(h_low, h_high, s_low, s_high, v_low, v_high) self.detector_b.set_hsv_range(h_low, h_high, s_low, s_high, v_low, v_high) elif param == 'morph': kernel_size, open_kernel = value self.detector_b.set_morph_parameters(kernel_size, open_kernel) elif param == 'geometry': min_area, min_leds, circularity = value self.detector_b.set_geometry_parameters(min_area, min_leds, circularity) elif param == 'detection_mode': self.detector_b.set_detection_mode(value) elif param == 'algorithm_enable': enable_a, enable_b = value self.arbiter.set_algorithm_enable(enable_a, enable_b) elif param == 'work_mode': self.arbiter.set_work_mode(value) elif param == 'performance_optimization': use_alternating = value self.arbiter.set_performance_optimization(use_alternating) elif param == 'lock_mode': self.is_mode_locked = value elif param == 'reset_arbiter': self.arbiter.reset() self.is_mode_locked = False self.current_mode = None def work_thread_func(self): """工作线程函数,集成双模式激光检测和可视化""" st_out_frame = MV_FRAME_OUT() memset(byref(st_out_frame), 0, sizeof(st_out_frame)) while not self.camera.b_exit: # 获取图像 ret = self.camera.cam.MV_CC_GetImageBuffer(st_out_frame, 1000) if ret == 0: try: # 计算数据大小 width = st_out_frame.stFrameInfo.nWidth height = st_out_frame.stFrameInfo.nHeight data_size = width * height * 3 # 从缓冲区复制数据 frame_data = np.ctypeslib.as_array(st_out_frame.pBufAddr, shape=(data_size,)) frame = frame_data.reshape((height, width, 3)) # 自动曝光调节 new_exp, adjusted = auto_exposure_adjust(frame, self.camera.current_exp) if adjusted: self.camera.set_exposure_time(new_exp) # 双模式并行检测 # 模式A:单点光斑检测 mode_a_results, mask_a = self.detector_a.detect(frame) # 模式B:点阵灯盘检测 mode_b_results, mask_b, raw_leds_b, aggregated_mask_b = self.detector_b.detect(frame) # 智能仲裁决策 if self.is_mode_locked and self.current_mode: # 锁定模式,使用当前模式结果 selected_mode = self.current_mode if selected_mode == 'mode_a': selected_results = mode_a_results else: selected_results = mode_b_results debug_info = { 'mode_a_score': 0, 'mode_b_score': 0, 'decision_reason': '模式已锁定', 'processing_time': 0 } else: # 使用仲裁器决策 selected_mode, selected_results, debug_info = self.arbiter.arbitrate( mode_a_results, mode_b_results, frame ) self.current_mode = selected_mode # 绘制检测结果 frame_with_results = self._draw_dual_mode_results( frame.copy(), selected_mode, selected_results, mode_a_results, mode_b_results, raw_leds_b ) # 用户交互容错:过度调节警告 frame_with_results = self._add_user_warnings(frame_with_results) # 计算FPS self.fps_counter += 1 current_time = time.time() if current_time - self.fps_start_time >= 1.0: self.fps = self.fps_counter / (current_time - self.fps_start_time) self.fps_counter = 0 self.fps_start_time = current_time # 绘制状态栏 frame_with_status = self._draw_dual_mode_status_bar( frame_with_results, self.camera.current_exp, self.camera.current_gain, selected_mode, len(mode_a_results) if mode_a_results else 0, len(mode_b_results) if mode_b_results else 0, debug_info, self.fps ) # 显示图像 cv2.imshow("Camera", frame_with_status) # 显示掩码(可选) if selected_mode == 'mode_a' or selected_mode == 'dual': cv2.imshow("Mode A Mask", mask_a) if selected_mode == 'mode_b' or selected_mode == 'dual': cv2.imshow("Mode B Mask", mask_b) if aggregated_mask_b is not None: cv2.imshow("Mode B Aggregated", aggregated_mask_b) # 检查退出键 key = cv2.waitKey(1) & 0xFF if key == ord('q'): self.camera.b_exit = True break except Exception as e: print(f"处理图像失败: {e}") finally: # 释放缓存 self.camera.cam.MV_CC_FreeImageBuffer(st_out_frame) else: print(f"获取图像失败! ret = {self.camera.to_hex_str(ret)}") time.sleep(0.1) cv2.destroyAllWindows() def _draw_dual_mode_results(self, frame, selected_mode, selected_results, mode_a_results, mode_b_results, raw_leds_b): """绘制双模式检测结果""" # 绘制模式A结果(单点光斑) if selected_mode == 'mode_a' or selected_mode == 'dual': if selected_mode == 'dual': # 双模显示:显示所有模式A结果 display_results = mode_a_results else: # 单模式显示:显示选中的模式A结果 display_results = selected_results for i, result in enumerate(display_results): cx, cy, w, h, area, circularity = result # 绘制绿色细线矩形框 cv2.rectangle(frame, (int(cx - w//2), int(cy - h//2)), (int(cx + w//2), int(cy + h//2)), (0, 255, 0), 1) # 绘制绿色小圆点和十字线 cv2.circle(frame, (int(cx), int(cy)), 3, (0, 255, 0), -1) cross_size = int(w * 0.25) cv2.line(frame, (int(cx) - cross_size, int(cy)), (int(cx) + cross_size, int(cy)), (0, 255, 0), 1) cv2.line(frame, (int(cx), int(cy) - cross_size), (int(cx), int(cy) + cross_size), (0, 255, 0), 1) # 绘制白色标签 info_text = f"SP: ID:{i} ({int(cx)},{int(cy)})" cv2.putText(frame, info_text, (int(cx - w//2), int(cy - h//2) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) # 绘制模式B结果(点阵灯盘) if selected_mode == 'mode_b' or selected_mode == 'dual': if selected_mode == 'dual': # 双模显示:显示所有模式B结果 display_results = mode_b_results else: # 单模式显示:显示选中的模式B结果 display_results = selected_results # 绘制原始LED点(半透明蓝色小点) for point in raw_leds_b: x, y = point cv2.circle(frame, (int(x), int(y)), 2, (255, 0, 0), -1) for result in display_results: # 绘制青色粗线圆形 cx, cy = result['center'] radius = result['radius'] cv2.circle(frame, (int(cx), int(cy)), int(radius), (255, 255, 0), 2) # 绘制青色十字准星 cross_size = int(radius) # 十字准星大小与半径成比例 cv2.line(frame, (int(cx) - cross_size, int(cy)), (int(cx) + cross_size, int(cy)), (255, 255, 0), 2) cv2.line(frame, (int(cx), int(cy) - cross_size), (int(cx), int(cy) + cross_size), (255, 255, 0), 2) # 绘制黄色标签 info_text = f"LD: ID:{result['id']} ({int(cx)},{int(cy)}) R:{int(radius)} N:{result['led_count']}" cv2.putText(frame, info_text, (int(cx) - 100, int(cy) - int(radius) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 1) return frame def _draw_dual_mode_status_bar(self, frame, exposure, gain, selected_mode, mode_a_count, mode_b_count, debug_info, fps): """绘制双模式状态栏""" # 计算状态栏高度 status_bar_height = 100 status_bar = np.zeros((status_bar_height, frame.shape[1], 3), dtype=np.uint8) # 左上角:当前模式大字体标识 if selected_mode == 'mode_a': mode_text = "MODE: AUTO-SINGLE" mode_color = (0, 255, 0) # 绿色 elif selected_mode == 'mode_b': mode_text = "MODE: AUTO-LAMP" mode_color = (255, 255, 0) # 青色 elif selected_mode == 'dual': mode_text = "MODE: DUAL DISPLAY" mode_color = (0, 255, 255) # 蓝绿色 else: mode_text = "MODE: NO TARGET" mode_color = (128, 128, 128) # 灰色 cv2.putText(status_bar, mode_text, (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, mode_color, 2) # 右上角:FPS和相机参数 camera_text = f"FPS: {fps:.1f} | Exposure: {exposure:.0f}μs | Gain: {gain:.1f}dB" cv2.putText(status_bar, camera_text, (frame.shape[1] - 450, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1) # 中间:决策理由 decision_text = debug_info.get('decision_reason', 'No decision') cv2.putText(status_bar, f"Decision: {decision_text}", (20, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1) # 底部:两种模式的实时置信度条 # 模式A置信度条 a_score = debug_info.get('mode_a_score', 0) a_percentage = min(a_score, 100) cv2.rectangle(status_bar, (20, 75), (320, 95), (50, 50, 50), -1) cv2.rectangle(status_bar, (20, 75), (20 + int(a_percentage * 3), 95), (0, 255, 0), -1) cv2.putText(status_bar, f"Mode A: {a_percentage:.1f}% ({mode_a_count} targets)", (25, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) # 模式B置信度条 b_score = debug_info.get('mode_b_score', 0) b_percentage = min(b_score, 100) cv2.rectangle(status_bar, (340, 75), (640, 95), (50, 50, 50), -1) cv2.rectangle(status_bar, (340, 75), (340 + int(b_percentage * 3), 95), (255, 255, 0), -1) cv2.putText(status_bar, f"Mode B: {b_percentage:.1f}% ({mode_b_count} targets)", (345, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) # 将状态栏添加到图像顶部 frame_with_status = np.vstack((status_bar, frame)) return frame_with_status def _handle_visual_faults(self, results, frame): """视觉算法容错处理""" # 处理部分遮挡情况 filtered_results = [] for result in results: # 检查LED数量是否足够(即使部分遮挡) if result['led_count'] >= self.detector.min_led_count * 0.7: # 允许30%遮挡 filtered_results.append(result) # 处理多灯盘重叠情况 if len(filtered_results) > 1: # 检查是否有重叠的灯盘 overlap_pairs = [] for i in range(len(filtered_results)): for j in range(i + 1, len(filtered_results)): if self._is_overlapping(filtered_results[i], filtered_results[j]): overlap_pairs.append((i, j)) # 如果有重叠,保留置信度高的结果 if overlap_pairs: # 按置信度排序 filtered_results.sort(key=lambda x: x['confidence'], reverse=True) # 只保留前N个不重叠的结果 non_overlapping = [] for result in filtered_results: overlapping = False for existing in non_overlapping: if self._is_overlapping(result, existing): overlapping = True break if not overlapping: non_overlapping.append(result) filtered_results = non_overlapping return filtered_results def _is_overlapping(self, result1, result2): """判断两个灯盘是否重叠""" cx1, cy1 = result1['center'] r1 = result1['radius'] cx2, cy2 = result2['center'] r2 = result2['radius'] distance = np.sqrt((cx1 - cx2)**2 + (cy1 - cy2)**2) return distance < r1 + r2 * 0.5 # 允许部分重叠 def _add_user_warnings(self, frame): """添加用户交互警告""" # 计算画面平均亮度 gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) mean_brightness = np.mean(gray_frame) # 曝光过度警告 if mean_brightness > 250: cv2.putText(frame, "⚠️ 曝光过度! 请降低曝光时间", (10, frame.shape[0] - 40), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2) # 曝光不足警告 elif mean_brightness < 10: cv2.putText(frame, "⚠️ 曝光不足! 请增加曝光时间", (10, frame.shape[0] - 40), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2) # 增益过高警告 if self.camera.current_gain > 12: cv2.putText(frame, "⚠️ 增益过高! 可能产生噪声", (10, frame.shape[0] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2) return frame def start(self): """启动系统""" print("双模式激光识别系统启动") print("按 Q 退出显示窗口") print("按 Ctrl+C 退出程序") print("系统特性:") print("- 模式A: 单点/稀疏光点检测(绿色激光笔)") print("- 模式B: 点阵灯盘检测(LED阵列)") print("- 智能仲裁: 自动选择最优检测模式") print("- 性能优化: 支持交替帧策略") # 初始化 SDK MvCamera.MV_CC_Initialize() try: # 枚举设备 device_count = self.camera.enum_devices() if device_count == 0: print("未找到设备,程序退出!") return # 选择设备 n_index = int(input("请输入要打开的相机索引: ")) # 打开设备 ret = self.camera.open_device(n_index) if ret != 0: print("打开设备失败,程序退出!") return # 替换工作线程函数 original_work_thread = self.camera.work_thread_func self.camera.work_thread_func = self.work_thread_func # 开始取流 ret = self.camera.start_grabbing() if ret != 0: print("开始取流失败,程序退出!") self.camera.close_device() return # 启动参数控制面板 self.config_panel = ConfigPanel(self.on_config_update) self.config_panel.start() # 等待用户操作 while True: time.sleep(1) if not self.camera.is_grabbing: break except KeyboardInterrupt: print("\n用户中断程序") except Exception as e: print(f"程序异常: {e}") finally: # 清理资源 if self.config_panel: self.config_panel.stop() if self.camera.is_open: self.camera.close_device() # 反初始化 SDK MvCamera.MV_CC_Finalize() print("SDK反初始化成功") print("程序退出") if __name__ == "__main__": system = DualModeLaserDetectionSystem() system.start()