# GIT-test/main.py
# 2026-02-21 23:42:50 +08:00
#
# 438 lines
# 19 KiB
# Python
# Raw Permalink Blame History
#
# This file contains ambiguous Unicode characters
#
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import sys
import time
import cv2
import numpy as np
from camera_hik import CameraHik
from dot_matrix_detector import DotMatrixDetector
from detector import GreenLaserDetector
from arbitrator import ModeArbitrator
from config_panel import ConfigPanel
from auto_exposure import auto_exposure_adjust
from MvCameraControl_class import *
class DualModeLaserDetectionSystem:
    """Dual-mode laser detection pipeline driven by a Hikvision camera.

    Every grabbed frame is run through two detectors:

    * mode A (``GreenLaserDetector``): single laser-spot detection;
    * mode B (``DotMatrixDetector``): dot-matrix LED lamp-panel detection.

    A ``ModeArbitrator`` picks which result set to present, unless the user
    has locked the current mode through the configuration panel. The
    annotated frame and a status bar are shown in OpenCV windows.
    """

    # Mean gray level (0-255) above/below which an exposure warning is drawn.
    OVEREXPOSED_MEAN = 250
    UNDEREXPOSED_MEAN = 10
    # Gain (dB, as printed in the status bar) above which a warning is drawn.
    HIGH_GAIN_DB = 12
    # Fraction of the detector's minimum LED count that is still accepted,
    # i.e. up to 30% of a panel's LEDs may be occluded.
    OCCLUSION_TOLERANCE = 0.7
    # Height in pixels of the status bar stacked on top of the frame.
    STATUS_BAR_HEIGHT = 100

    def __init__(self):
        """Create the camera wrapper, both detectors and the arbitrator."""
        self.camera = CameraHik()
        self.detector_a = GreenLaserDetector()  # mode A: single laser spot
        self.detector_b = DotMatrixDetector()   # mode B: dot-matrix lamp panel
        self.arbiter = ModeArbitrator()         # chooses between A and B
        self.config_panel = None
        self.fps_counter = 0
        self.fps_start_time = time.time()
        self.fps = 0
        self.current_mode = None
        self.is_mode_locked = False

    def on_config_update(self, param, value):
        """Apply one parameter change coming from the configuration panel.

        Args:
            param: Parameter name (for example ``'exposure'``, ``'roi'``,
                ``'hsv'``, ``'lock_mode'``).
            value: New value. Scalar for simple parameters, a tuple for the
                compound ones (``'roi'``, ``'hsv'``, ``'morph'``,
                ``'geometry'``, ``'algorithm_enable'``).

        Unknown parameter names are ignored.
        """
        if param == 'exposure':
            self.camera.set_exposure_time(value)
        elif param == 'gain':
            self.camera.set_gain(value)
        elif param == 'fps':
            self.camera.set_frame_rate(value)
        elif param == 'roi':
            x, y, width, height = value
            self.camera.set_roi(x, y, width, height)
        elif param == 'hsv':
            h_low, h_high, s_low, s_high, v_low, v_high = value
            # Both detectors share the same HSV window.
            self.detector_a.set_hsv_range(h_low, h_high, s_low, s_high, v_low, v_high)
            self.detector_b.set_hsv_range(h_low, h_high, s_low, s_high, v_low, v_high)
        elif param == 'morph':
            kernel_size, open_kernel = value
            self.detector_b.set_morph_parameters(kernel_size, open_kernel)
        elif param == 'geometry':
            min_area, min_leds, circularity = value
            self.detector_b.set_geometry_parameters(min_area, min_leds, circularity)
        elif param == 'detection_mode':
            self.detector_b.set_detection_mode(value)
        elif param == 'algorithm_enable':
            enable_a, enable_b = value
            self.arbiter.set_algorithm_enable(enable_a, enable_b)
        elif param == 'work_mode':
            self.arbiter.set_work_mode(value)
        elif param == 'performance_optimization':
            self.arbiter.set_performance_optimization(value)
        elif param == 'lock_mode':
            self.is_mode_locked = value
        elif param == 'reset_arbiter':
            self.arbiter.reset()
            self.is_mode_locked = False
            self.current_mode = None

    def work_thread_func(self):
        """Grab frames until ``camera.b_exit`` is set, detect and display.

        Installed on the camera object by ``start`` in place of the camera's
        own work-thread function. Each SDK buffer obtained with
        ``MV_CC_GetImageBuffer`` is always released in a ``finally`` clause,
        also when processing raises or the user quits with ``q``.
        """
        st_out_frame = MV_FRAME_OUT()
        memset(byref(st_out_frame), 0, sizeof(st_out_frame))

        while not self.camera.b_exit:
            ret = self.camera.cam.MV_CC_GetImageBuffer(st_out_frame, 1000)
            if ret != 0:
                print(f"获取图像失败! ret = {self.camera.to_hex_str(ret)}")
                time.sleep(0.1)
                continue

            try:
                if self._process_frame(st_out_frame):
                    # User pressed 'q': tell every other loop to stop too.
                    self.camera.b_exit = True
                    break
            except Exception as e:
                # Best effort: a bad frame must not kill the grab loop.
                print(f"处理图像失败: {e}")
            finally:
                self.camera.cam.MV_CC_FreeImageBuffer(st_out_frame)

        cv2.destroyAllWindows()

    def _process_frame(self, st_out_frame):
        """Run detection, arbitration and display for one grabbed frame.

        Args:
            st_out_frame: The ``MV_FRAME_OUT`` structure filled by the SDK.

        Returns:
            True when the user pressed ``q`` in an OpenCV window.

        Raises:
            ValueError: If the SDK reports fewer bytes than a 3-channel
                image of the reported size needs.
        """
        info = st_out_frame.stFrameInfo
        width = info.nWidth
        height = info.nHeight
        data_size = width * height * 3

        # The buffer is interpreted as 3 bytes per pixel. Reading a shorter
        # buffer (e.g. a mono/Bayer pixel format) would run past its end.
        # NOTE(review): relies on the SDK's nFrameLen field; when absent the
        # check is skipped and behaviour matches the previous code.
        frame_len = getattr(info, "nFrameLen", data_size)
        if frame_len < data_size:
            raise ValueError(
                f"帧数据长度不足: 需要 {data_size} 字节, 实际 {frame_len} 字节"
            )

        # Zero-copy view onto the SDK buffer; only valid until the caller
        # frees the buffer. Drawing happens on a copy further down.
        frame_data = np.ctypeslib.as_array(st_out_frame.pBufAddr, shape=(data_size,))
        frame = frame_data.reshape((height, width, 3))

        # Automatic exposure adjustment.
        new_exp, adjusted = auto_exposure_adjust(frame, self.camera.current_exp)
        if adjusted:
            self.camera.set_exposure_time(new_exp)

        # Both detectors run on every frame.
        mode_a_results, mask_a = self.detector_a.detect(frame)
        mode_b_results, mask_b, raw_leds_b, aggregated_mask_b = self.detector_b.detect(frame)

        if self.is_mode_locked and self.current_mode:
            # Locked: keep presenting the mode chosen before the lock.
            selected_mode = self.current_mode
            if selected_mode == 'mode_a':
                selected_results = mode_a_results
            else:
                selected_results = mode_b_results
            debug_info = {
                'mode_a_score': 0,
                'mode_b_score': 0,
                # ASCII on purpose: this text is rendered with cv2.putText,
                # whose Hershey fonts cannot draw non-ASCII characters.
                'decision_reason': 'Mode locked',
                'processing_time': 0
            }
        else:
            selected_mode, selected_results, debug_info = self.arbiter.arbitrate(
                mode_a_results, mode_b_results, frame
            )
            self.current_mode = selected_mode

        frame_with_results = self._draw_dual_mode_results(
            frame.copy(),
            selected_mode,
            selected_results,
            mode_a_results,
            mode_b_results,
            raw_leds_b
        )
        frame_with_results = self._add_user_warnings(frame_with_results)

        self._update_fps()

        frame_with_status = self._draw_dual_mode_status_bar(
            frame_with_results,
            self.camera.current_exp,
            self.camera.current_gain,
            selected_mode,
            len(mode_a_results) if mode_a_results else 0,
            len(mode_b_results) if mode_b_results else 0,
            debug_info,
            self.fps
        )

        cv2.imshow("Camera", frame_with_status)
        # Optional mask windows; a detector may return no mask.
        if selected_mode in ('mode_a', 'dual') and mask_a is not None:
            cv2.imshow("Mode A Mask", mask_a)
        if selected_mode in ('mode_b', 'dual'):
            if mask_b is not None:
                cv2.imshow("Mode B Mask", mask_b)
            if aggregated_mask_b is not None:
                cv2.imshow("Mode B Aggregated", aggregated_mask_b)

        return (cv2.waitKey(1) & 0xFF) == ord('q')

    def _update_fps(self):
        """Count one frame and refresh ``self.fps`` about once per second."""
        self.fps_counter += 1
        current_time = time.time()
        elapsed = current_time - self.fps_start_time
        if elapsed >= 1.0:
            self.fps = self.fps_counter / elapsed
            self.fps_counter = 0
            self.fps_start_time = current_time

    @staticmethod
    def _or_empty(items):
        """Return ``items``, or an empty list when it is None."""
        return [] if items is None else items

    def _draw_dual_mode_results(self, frame, selected_mode, selected_results, mode_a_results, mode_b_results, raw_leds_b):
        """Draw the detections of the selected mode(s) onto ``frame``.

        Args:
            frame: BGR image, modified in place.
            selected_mode: ``'mode_a'``, ``'mode_b'``, ``'dual'`` or anything
                else (nothing is drawn then).
            selected_results: Results chosen for the selected single mode.
            mode_a_results: Mode A results; each item unpacks to
                ``(cx, cy, w, h, area, circularity)``.
            mode_b_results: Mode B results; each item is a dict with the keys
                ``'center'``, ``'radius'``, ``'id'`` and ``'led_count'``.
            raw_leds_b: ``(x, y)`` points of the individual LEDs found by
                mode B.

        Any of the result arguments may be None, which is treated as empty.

        Returns:
            The same ``frame`` object.
        """
        # Mode A: single laser spots.
        if selected_mode == 'mode_a' or selected_mode == 'dual':
            # In dual display all mode A results are shown, otherwise only
            # the ones the arbitration selected.
            display_results = mode_a_results if selected_mode == 'dual' else selected_results
            for i, result in enumerate(self._or_empty(display_results)):
                cx, cy, w, h, _area, _circularity = result
                top_left = (int(cx - w // 2), int(cy - h // 2))
                # Thin green bounding box.
                cv2.rectangle(frame, top_left,
                              (int(cx + w // 2), int(cy + h // 2)), (0, 255, 0), 1)
                # Green center dot and cross hair.
                cv2.circle(frame, (int(cx), int(cy)), 3, (0, 255, 0), -1)
                cross_size = int(w * 0.25)
                cv2.line(frame, (int(cx) - cross_size, int(cy)), (int(cx) + cross_size, int(cy)), (0, 255, 0), 1)
                cv2.line(frame, (int(cx), int(cy) - cross_size), (int(cx), int(cy) + cross_size), (0, 255, 0), 1)
                # White label above the box.
                info_text = f"SP: ID:{i} ({int(cx)},{int(cy)})"
                cv2.putText(frame, info_text, (top_left[0], top_left[1] - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

        # Mode B: dot-matrix lamp panels.
        if selected_mode == 'mode_b' or selected_mode == 'dual':
            display_results = mode_b_results if selected_mode == 'dual' else selected_results
            # Raw LED points as small opaque blue dots (BGR).
            for point in self._or_empty(raw_leds_b):
                x, y = point
                cv2.circle(frame, (int(x), int(y)), 2, (255, 0, 0), -1)
            for result in self._or_empty(display_results):
                cx, cy = result['center']
                radius = result['radius']
                # Thick cyan circle around the panel.
                cv2.circle(frame, (int(cx), int(cy)), int(radius), (255, 255, 0), 2)
                # Cyan cross hair sized to the panel radius.
                cross_size = int(radius)
                cv2.line(frame, (int(cx) - cross_size, int(cy)), (int(cx) + cross_size, int(cy)), (255, 255, 0), 2)
                cv2.line(frame, (int(cx), int(cy) - cross_size), (int(cx), int(cy) + cross_size), (255, 255, 0), 2)
                # Yellow label above the circle.
                info_text = f"LD: ID:{result['id']} ({int(cx)},{int(cy)}) R:{int(radius)} N:{result['led_count']}"
                cv2.putText(frame, info_text, (int(cx) - 100, int(cy) - int(radius) - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 1)
        return frame

    def _draw_confidence_bar(self, status_bar, x0, label, score, target_count, color):
        """Draw one 300 px wide confidence bar with its caption.

        The score is clamped to 0-100 so the bar never extends outside its
        background or to the left of ``x0``.
        """
        percentage = max(0, min(score, 100))
        cv2.rectangle(status_bar, (x0, 75), (x0 + 300, 95), (50, 50, 50), -1)
        if percentage > 0:
            cv2.rectangle(status_bar, (x0, 75), (x0 + int(percentage * 3), 95), color, -1)
        cv2.putText(status_bar, f"{label}: {percentage:.1f}% ({target_count} targets)", (x0 + 5, 90),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

    def _draw_dual_mode_status_bar(self, frame, exposure, gain, selected_mode, mode_a_count, mode_b_count, debug_info, fps):
        """Stack a status bar on top of ``frame`` and return the new image.

        Args:
            frame: 3-channel ``uint8`` image to put below the bar.
            exposure: Current exposure time, printed in microseconds.
            gain: Current gain, printed in dB.
            selected_mode: ``'mode_a'``, ``'mode_b'``, ``'dual'`` or other.
            mode_a_count: Number of mode A targets.
            mode_b_count: Number of mode B targets.
            debug_info: Dict with optional keys ``'decision_reason'``,
                ``'mode_a_score'`` and ``'mode_b_score'``; None is treated
                as an empty dict.
            fps: Frames per second to print.

        Returns:
            A new image of height ``frame.shape[0] + STATUS_BAR_HEIGHT``.
        """
        debug_info = debug_info or {}
        frame_width = frame.shape[1]
        status_bar = np.zeros((self.STATUS_BAR_HEIGHT, frame_width, 3), dtype=np.uint8)

        # Top-left: current mode in a large font. Colors are BGR.
        mode_styles = {
            'mode_a': ("MODE: AUTO-SINGLE", (0, 255, 0)),     # green
            'mode_b': ("MODE: AUTO-LAMP", (255, 255, 0)),     # cyan
            'dual': ("MODE: DUAL DISPLAY", (0, 255, 255)),    # yellow
        }
        mode_text, mode_color = mode_styles.get(
            selected_mode, ("MODE: NO TARGET", (128, 128, 128))  # gray
        )
        cv2.putText(status_bar, mode_text, (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, mode_color, 2)

        # Top-right: FPS and camera parameters. "us" instead of the micro
        # sign because Hershey fonts only cover ASCII.
        camera_text = f"FPS: {fps:.1f} | Exposure: {exposure:.0f}us | Gain: {gain:.1f}dB"
        cv2.putText(status_bar, camera_text, (max(0, frame_width - 450), 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)

        # Middle: reason for the arbitration decision.
        decision_text = debug_info.get('decision_reason', 'No decision')
        cv2.putText(status_bar, f"Decision: {decision_text}", (20, 60),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)

        # Bottom: one confidence bar per mode.
        self._draw_confidence_bar(status_bar, 20, "Mode A",
                                  debug_info.get('mode_a_score', 0), mode_a_count, (0, 255, 0))
        self._draw_confidence_bar(status_bar, 340, "Mode B",
                                  debug_info.get('mode_b_score', 0), mode_b_count, (255, 255, 0))

        return np.vstack((status_bar, frame))

    def _handle_visual_faults(self, results, frame):
        """Filter mode B results for occlusion and overlapping panels.

        Not called anywhere in this file.

        Args:
            results: Mode B result dicts with ``'led_count'``, ``'center'``,
                ``'radius'`` and, when panels overlap, ``'confidence'``.
            frame: Unused; kept for interface compatibility.

        Returns:
            The results that have at least ``OCCLUSION_TOLERANCE`` times the
            detector's minimum LED count. If any two of them overlap, only
            the highest-confidence non-overlapping ones are returned, sorted
            by descending confidence; otherwise the input order is kept.
        """
        # NOTE(review): assumes detector_b exposes ``min_led_count`` - confirm.
        min_leds = self.detector_b.min_led_count * self.OCCLUSION_TOLERANCE
        candidates = [result for result in results if result['led_count'] >= min_leds]

        has_overlap = any(
            self._is_overlapping(first, second)
            for i, first in enumerate(candidates)
            for second in candidates[i + 1:]
        )
        if not has_overlap:
            return candidates

        # Greedy suppression: best confidence first, drop anything that
        # overlaps an already accepted panel.
        candidates.sort(key=lambda x: x['confidence'], reverse=True)
        non_overlapping = []
        for result in candidates:
            if not any(self._is_overlapping(result, kept) for kept in non_overlapping):
                non_overlapping.append(result)
        return non_overlapping

    def _is_overlapping(self, result1, result2):
        """Return True when two lamp panels overlap too much.

        Two panels count as overlapping when their center distance is below
        half the sum of their radii, so slight overlap is tolerated. The test
        is symmetric in its arguments.
        """
        cx1, cy1 = result1['center']
        cx2, cy2 = result2['center']
        distance = np.hypot(cx1 - cx2, cy1 - cy2)
        return bool(distance < (result1['radius'] + result2['radius']) * 0.5)

    def _add_user_warnings(self, frame):
        """Overlay exposure and gain warnings on ``frame``.

        The messages are ASCII because ``cv2.putText`` with a Hershey font
        cannot render other characters.

        Args:
            frame: BGR image, modified in place.

        Returns:
            The same ``frame`` object.
        """
        gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        mean_brightness = np.mean(gray_frame)
        frame_height = frame.shape[0]

        exposure_warning = None
        if mean_brightness > self.OVEREXPOSED_MEAN:
            exposure_warning = "WARNING: Overexposed! Reduce exposure time"
        elif mean_brightness < self.UNDEREXPOSED_MEAN:
            exposure_warning = "WARNING: Underexposed! Increase exposure time"
        if exposure_warning:
            cv2.putText(frame, exposure_warning, (10, frame_height - 40),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

        if self.camera.current_gain > self.HIGH_GAIN_DB:
            cv2.putText(frame, "WARNING: Gain too high! Noise likely", (10, frame_height - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
        return frame

    def start(self):
        """Run the system until the user quits.

        Initializes the SDK, lets the user pick a camera on stdin, installs
        ``work_thread_func`` on the camera, starts grabbing and the
        configuration panel, then waits until grabbing stops, the worker
        requests exit (``q`` key) or the user presses Ctrl+C. Resources are
        released in all cases.
        """
        print("双模式激光识别系统启动")
        print("按 Q 退出显示窗口")
        print("按 Ctrl+C 退出程序")
        print("系统特性:")
        print("- 模式A: 单点/稀疏光点检测(绿色激光笔)")
        print("- 模式B: 点阵灯盘检测LED阵列")
        print("- 智能仲裁: 自动选择最优检测模式")
        print("- 性能优化: 支持交替帧策略")

        MvCamera.MV_CC_Initialize()
        try:
            device_count = self.camera.enum_devices()
            if device_count == 0:
                print("未找到设备,程序退出!")
                return

            try:
                n_index = int(input("请输入要打开的相机索引: "))
            except ValueError:
                print("相机索引必须是整数,程序退出!")
                return
            if not 0 <= n_index < device_count:
                print(f"相机索引超出范围 (0-{device_count - 1}),程序退出!")
                return

            ret = self.camera.open_device(n_index)
            if ret != 0:
                print("打开设备失败,程序退出!")
                return

            # Make the camera's grab thread run this class's pipeline.
            self.camera.work_thread_func = self.work_thread_func

            ret = self.camera.start_grabbing()
            if ret != 0:
                print("开始取流失败,程序退出!")
                self.camera.close_device()
                return

            self.config_panel = ConfigPanel(self.on_config_update)
            self.config_panel.start()

            # Wait for the worker. b_exit is checked as well because the
            # worker sets it when the user presses 'q'.
            while True:
                time.sleep(1)
                if not self.camera.is_grabbing or self.camera.b_exit:
                    break
        except KeyboardInterrupt:
            print("\n用户中断程序")
        except Exception as e:
            print(f"程序异常: {e}")
        finally:
            # Ask the worker loop to stop before the device goes away.
            self.camera.b_exit = True
            if self.config_panel:
                self.config_panel.stop()
            if self.camera.is_open:
                self.camera.close_device()
            MvCamera.MV_CC_Finalize()
            print("SDK反初始化成功")
            print("程序退出")
if __name__ == "__main__":
    # Script entry point: build the detection system and run it until the
    # user quits.
    DualModeLaserDetectionSystem().start()