# -*- coding: utf-8 -*- import sys import ctypes import threading import time import os # 添加SDK路径 sys.path.append('Python/MvImport') from MvCameraControl_class import * from MvErrorDefine_const import * from CameraParams_header import * import cv2 import numpy as np # 枚举值定义 MV_EXPOSURE_AUTO_MODE_OFF = 0 MV_GAIN_AUTO_MODE_OFF = 0 MV_BALANCE_WHITE_AUTO_OFF = 0 class CameraHik: def __init__(self): self.cam = None self.device_list = MV_CC_DEVICE_INFO_LIST() self.is_open = False self.is_grabbing = False self.b_exit = False self.work_thread = None self.current_exp = 5000.0 # 当前曝光时间(微秒) self.current_gain = 0.0 # 当前增益(dB) self.current_fps = 30.0 # 当前帧率 # 将返回的错误码转换为十六进制显示 def to_hex_str(self, num): cha_dic = {10: 'a', 11: 'b', 12: 'c', 13: 'd', 14: 'e', 15: 'f'} hex_str = "" if num < 0: num = num + 2 ** 32 while num >= 16: digit = num % 16 hex_str = cha_dic.get(digit, str(digit)) + hex_str num //= 16 hex_str = cha_dic.get(num, str(num)) + hex_str return hex_str # Decoding Characters def decoding_char(self, ctypes_char_array): """ 安全地从 ctypes 字符数组中解码出字符串。 适用于 Python 2.x 和 3.x,以及 32/64 位环境。 """ byte_str = memoryview(ctypes_char_array).tobytes() # 在第一个空字符处截断 null_index = byte_str.find(b'\x00') if null_index != -1: byte_str = byte_str[:null_index] # 多编码尝试解码 for encoding in ['gbk', 'utf-8', 'latin-1']: try: return byte_str.decode(encoding) except UnicodeDecodeError: continue # 如果所有编码都失败,使用替换策略 return byte_str.decode('latin-1', errors='replace') # 枚举相机 def enum_devices(self): self.device_list = MV_CC_DEVICE_INFO_LIST() n_layer_type = (MV_GIGE_DEVICE | MV_USB_DEVICE) ret = MvCamera.MV_CC_EnumDevices(n_layer_type, self.device_list) if ret != 0: print(f"枚举设备失败! ret = :{self.to_hex_str(ret)}") return ret if self.device_list.nDeviceNum == 0: print("未找到设备") return ret print(f"找到 {self.device_list.nDeviceNum} 台设备!") dev_list = [] target_model_found = False for i in range(0, self.device_list.nDeviceNum): mvcc_dev_info = cast(self.device_list.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE: print(f"\ngige device: [{i}]") user_defined_name = self.decoding_char(mvcc_dev_info.SpecialInfo.stGigEInfo.chUserDefinedName) model_name = self.decoding_char(mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName) print(f"device user define name: {user_defined_name}") print(f"device model name: {model_name}") # 检查是否为目标型号 if "MV-CS016-10UC" in model_name: print("✓ 找到目标相机型号: MV-CS016-10UC") target_model_found = True nip1 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0xff000000) >> 24) nip2 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x00ff0000) >> 16) nip3 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x0000ff00) >> 8) nip4 = (mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x000000ff) print(f"current ip: {nip1}.{nip2}.{nip3}.{nip4} ") dev_list.append(f"[{i}]GigE: {user_defined_name} {model_name}({nip1}.{nip2}.{nip3}.{nip4})") elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE: print(f"\nu3v device: [{i}]") user_defined_name = self.decoding_char(mvcc_dev_info.SpecialInfo.stUsb3VInfo.chUserDefinedName) model_name = self.decoding_char(mvcc_dev_info.SpecialInfo.stUsb3VInfo.chModelName) print(f"device user define name: {user_defined_name}") print(f"device model name: {model_name}") # 检查是否为目标型号 if "MV-CS016-10UC" in model_name: print("✓ 找到目标相机型号: MV-CS016-10UC") target_model_found = True str_serial_number = "" for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chSerialNumber: if per == 0: break str_serial_number = str_serial_number + chr(per) print(f"user serial number: {str_serial_number}") dev_list.append(f"[{i}]USB: {user_defined_name} {model_name}({str_serial_number})") # 打印设备列表 print("\n设备列表:") for i, dev in enumerate(dev_list): print(f"{i}: {dev}") # 提示用户是否找到目标型号 if not target_model_found: print("\n⚠️ 警告: 未找到目标相机型号 MV-CS016-10UC") print("⚠️ 系统仍将尝试使用找到的设备,但可能无法正常工作") return self.device_list.nDeviceNum # 打开相机 def open_device(self, n_index): if self.is_open: print("相机已经打开!") return MV_E_CALLORDER if n_index < 0 or n_index >= self.device_list.nDeviceNum: print("请选择有效的相机索引!") return MV_E_CALLORDER # 选择设备并创建句柄 st_device_list = cast(self.device_list.pDeviceInfo[n_index], POINTER(MV_CC_DEVICE_INFO)).contents self.cam = MvCamera() # 句柄占用保护 ret = self.cam.MV_CC_CreateHandle(st_device_list) if ret != 0: print(f"创建设备句柄失败! ret = {self.to_hex_str(ret)}") # 尝试清理可能的占用 try: self.cam.MV_CC_DestroyHandle() except: pass return ret # 打开设备 ret = self.cam.MV_CC_OpenDevice() if ret != 0: print(f"打开设备失败! ret = {self.to_hex_str(ret)}") try: self.cam.MV_CC_DestroyHandle() except: pass return ret print("设备打开成功!") self.is_open = True # 设置触发模式为off ret = self.cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF) if ret != 0: print(f"设置触发模式失败! ret = {self.to_hex_str(ret)}") # 设置像素格式为 BGR8 ret = self.cam.MV_CC_SetEnumValue("PixelFormat", PixelType_Gvsp_BGR8_Packed) if ret != 0: print(f"设置像素格式失败! ret = {self.to_hex_str(ret)}") # 设置连续采集模式 ret = self.cam.MV_CC_SetEnumValue("AcquisitionMode", 2) # Continuous if ret != 0: print(f"设置连续采集模式失败! ret = {self.to_hex_str(ret)}") # 防过曝设置 self.setup_anti_overexposure() return MV_OK # 防过曝参数设置 def setup_anti_overexposure(self): """设置防过曝参数(点阵灯盘专用)""" # 关闭自动曝光,设置初始曝光时间(微秒) ret = self.cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_OFF) if ret != 0: print(f"设置曝光自动模式失败! ret = {self.to_hex_str(ret)}") # 点阵灯盘初始曝光时间:3000-5000微秒 initial_exp = 4000.0 ret = self.cam.MV_CC_SetFloatValue("ExposureTime", initial_exp) if ret != 0: print(f"设置曝光时间失败! ret = {self.to_hex_str(ret)}") else: self.current_exp = initial_exp print(f"初始曝光时间设置为: {initial_exp}μs") # 关闭自动增益 ret = self.cam.MV_CC_SetEnumValue("GainAuto", MV_GAIN_AUTO_MODE_OFF) if ret != 0: print(f"设置增益自动模式失败! ret = {self.to_hex_str(ret)}") ret = self.cam.MV_CC_SetFloatValue("Gain", 0.0) if ret != 0: print(f"设置增益失败! ret = {self.to_hex_str(ret)}") else: self.current_gain = 0.0 print("初始增益设置为: 0dB") # 关闭自动白平衡(识别绿色LED时必需) ret = self.cam.MV_CC_SetEnumValue("BalanceWhiteAuto", MV_BALANCE_WHITE_AUTO_OFF) if ret != 0: print(f"设置白平衡自动模式失败! ret = {self.to_hex_str(ret)}") else: print("自动白平衡已关闭") # 实时调节接口 def set_exposure_time(self, microseconds): """设置曝光时间(微秒)""" if not self.is_open: return MV_E_CALLORDER ret = self.cam.MV_CC_SetFloatValue("ExposureTime", microseconds) if ret != 0: print(f"设置曝光时间失败! ret = {self.to_hex_str(ret)}") return ret self.current_exp = microseconds return MV_OK def set_gain(self, db): """设置增益(dB)""" if not self.is_open: return MV_E_CALLORDER ret = self.cam.MV_CC_SetFloatValue("Gain", db) if ret != 0: print(f"设置增益失败! ret = {self.to_hex_str(ret)}") return ret self.current_gain = db return MV_OK def set_frame_rate(self, fps): """设置帧率""" if not self.is_open: return MV_E_CALLORDER ret = self.cam.MV_CC_SetFloatValue("AcquisitionFrameRate", fps) if ret != 0: print(f"设置帧率失败! ret = {self.to_hex_str(ret)}") return ret self.current_fps = fps return MV_OK def set_roi(self, x, y, w, h): """设置ROI""" if not self.is_open: return MV_E_CALLORDER # 停止取流 if self.is_grabbing: self.stop_grabbing() # 设置ROI参数 ret = self.cam.MV_CC_SetIntValue("OffsetX", x) if ret != 0: print(f"设置OffsetX失败! ret = {self.to_hex_str(ret)}") return ret ret = self.cam.MV_CC_SetIntValue("OffsetY", y) if ret != 0: print(f"设置OffsetY失败! ret = {self.to_hex_str(ret)}") return ret ret = self.cam.MV_CC_SetIntValue("Width", w) if ret != 0: print(f"设置Width失败! ret = {self.to_hex_str(ret)}") return ret ret = self.cam.MV_CC_SetIntValue("Height", h) if ret != 0: print(f"设置Height失败! ret = {self.to_hex_str(ret)}") return ret # 重新开始取流 if self.is_grabbing: self.start_grabbing() return MV_OK # 开始取流 def start_grabbing(self): if not self.is_open: print("相机未打开!") return MV_E_CALLORDER if self.is_grabbing: print("已经开始取流!") return MV_E_CALLORDER self.b_exit = False ret = self.cam.MV_CC_StartGrabbing() if ret != 0: print(f"开始取流失败! ret = {self.to_hex_str(ret)}") return ret self.is_grabbing = True print("开始取流成功!") # 启动取图线程 self.work_thread = threading.Thread(target=self.work_thread_func) self.work_thread.daemon = True self.work_thread.start() return MV_OK # 停止取流 def stop_grabbing(self): if not self.is_open: print("相机未打开!") return MV_E_CALLORDER if not self.is_grabbing: print("未开始取流!") return MV_E_CALLORDER self.b_exit = True time.sleep(0.1) ret = self.cam.MV_CC_StopGrabbing() if ret != 0: print(f"停止取流失败! ret = {self.to_hex_str(ret)}") return ret self.is_grabbing = False print("停止取流成功!") return MV_OK # 关闭相机 def close_device(self): if self.is_grabbing: self.stop_grabbing() if self.is_open: ret = self.cam.MV_CC_CloseDevice() if ret != 0: print(f"关闭设备失败! ret = {self.to_hex_str(ret)}") # 销毁句柄 self.cam.MV_CC_DestroyHandle() self.is_open = False print("设备关闭成功!") # 取图线程函数 def work_thread_func(self): st_out_frame = MV_FRAME_OUT() memset(byref(st_out_frame), 0, sizeof(st_out_frame)) timeout_count = 0 # 超时计数器 max_timeout_count = 3 # 最大超时次数 while not self.b_exit: # 获取图像 ret = self.cam.MV_CC_GetImageBuffer(st_out_frame, 1000) if ret == 0: timeout_count = 0 # 重置超时计数器 # 打印图像信息 print(f"获取一帧图像: 宽度[{st_out_frame.stFrameInfo.nWidth}], 高度[{st_out_frame.stFrameInfo.nHeight}], 帧数[{st_out_frame.stFrameInfo.nFrameNum}]") # 转换为 OpenCV 格式 try: # 计算数据大小 data_size = st_out_frame.stFrameInfo.nWidth * st_out_frame.stFrameInfo.nHeight * 3 # 从缓冲区复制数据 frame_data = np.ctypeslib.as_array(st_out_frame.pBufAddr, shape=(data_size,)) frame = frame_data.reshape((st_out_frame.stFrameInfo.nHeight, st_out_frame.stFrameInfo.nWidth, 3)) # 显示图像 cv2.imshow("Camera", frame) key = cv2.waitKey(1) & 0xFF if key == ord('q'): self.b_exit = True break except Exception as e: print(f"处理图像失败: {e}") finally: # 释放缓存 try: self.cam.MV_CC_FreeImageBuffer(st_out_frame) except Exception as e: print(f"释放图像缓存失败: {e}") else: print(f"获取图像失败! ret = {self.to_hex_str(ret)}") timeout_count += 1 time.sleep(0.1) # 连续超时3次,触发重连 if timeout_count >= max_timeout_count: print("连续超时,尝试重连相机...") self._reconnect_camera() timeout_count = 0 cv2.destroyAllWindows() def _reconnect_camera(self): """重连相机""" try: # 停止取流 if self.is_grabbing: print("停止取流...") self.stop_grabbing() # 关闭设备 print("关闭设备...") self.close_device() # 重新枚举设备 print("重新枚举设备...") device_count = self.enum_devices() if device_count == 0: print("重连失败:未找到设备") return # 重新打开设备(默认使用索引0) print("重新打开设备...") ret = self.open_device(0) if ret != 0: print(f"重连失败:打开设备失败! ret = {self.to_hex_str(ret)}") return # 重新开始取流 print("重新开始取流...") ret = self.start_grabbing() if ret != 0: print(f"重连失败:开始取流失败! ret = {self.to_hex_str(ret)}") return print("相机重连成功!") except Exception as e: print(f"重连相机失败: {e}")