# -*- coding: utf-8 -*-
"""
Arbitrator for a dual-mode laser recognition system.

Core features:
1. Handles the detection results of mode A and mode B side by side
2. Arbitrates between the two and selects the better-suited mode
3. Handles boundary cases and resolves conflicts
4. Optional performance optimization strategy
"""
import itertools
import time

import numpy as np


class ModeArbitrator:
    """Pick between mode A (single point) and mode B (lamp disk) detections.

    The arbitrator scores the detections of each mode, applies hysteresis so
    the choice does not flip on near-equal scores, and tracks how many
    consecutive arbitrated frames each mode produced no detection.
    """

    def __init__(self):
        """Initialise the arbitrator with both algorithms enabled."""
        # Work mode: 'auto', 'forced_single', 'forced_lamp' or 'dual_display'.
        self.work_mode = 'auto'

        # Per-algorithm enable flags.
        self.enable_mode_a = True
        self.enable_mode_b = True

        # History of previous decisions.
        self.last_mode = None  # mode returned for the previous arbitrated frame
        self.last_a_score = 0
        self.last_b_score = 0
        self.consecutive_mode_count = 0  # frames in a row with the same mode

        # Mode failure detection.
        self.mode_a_failure_count = 0
        self.mode_b_failure_count = 0
        self.max_failure_count = 30  # consecutive empty frames => "failed"

        # Performance optimisation.
        self.use_alternating_frames = False  # alternate modes frame by frame
        self.current_frame_parity = 0

        # Debug information of the most recent arbitrate() call.
        self.debug_info = self._new_debug_info()

    @staticmethod
    def _new_debug_info():
        """Return a fresh debug-info dict with all fields zeroed."""
        return {
            'mode_a_score': 0,
            'mode_b_score': 0,
            'decision_reason': '',
            'processing_time': 0
        }

    def set_work_mode(self, mode):
        """Set the work mode (stored as given, without validation)."""
        self.work_mode = mode

    def set_algorithm_enable(self, enable_a, enable_b):
        """Enable or disable mode A and mode B."""
        self.enable_mode_a = enable_a
        self.enable_mode_b = enable_b

    def set_performance_optimization(self, use_alternating):
        """Turn the alternating-frame strategy on or off."""
        self.use_alternating_frames = use_alternating

    def calculate_mode_a_score(self, results, frame=None):
        """Compute the scene-fit score of the mode A detections.

        Args:
            results: Sequence of detections indexed as
                (cx, cy, w, h, area, circularity).
            frame: Unused; kept for interface compatibility.

        Returns:
            0 when ``results`` is empty, otherwise the weighted score
            multiplied by 100.
        """
        if not results:
            return 0

        count = len(results)

        # Detection count: 1-3 is ideal, every extra detection costs 0.2.
        # (count >= 1 here because empty results returned above.)
        if count <= 3:
            count_score = 1.0
        else:
            count_score = max(0, 1.0 - (count - 3) * 0.2)

        circularity_scores = []
        area_scores = []
        for result in results:
            # Circularity: nothing at or below 0.5, full marks from 0.8 up.
            circularity = result[5]
            circularity_scores.append(
                min(circularity / 0.8, 1.0) if circularity > 0.5 else 0
            )

            # Area: a medium size (100-2000) is best.
            area = result[4]
            if area < 100:
                area_scores.append(max(0, area / 100))
            elif area <= 2000:
                area_scores.append(1.0)
            else:
                area_scores.append(max(0, 1.0 - (area - 2000) / 3000))

        avg_circularity_score = np.mean(circularity_scores)
        avg_area_score = np.mean(area_scores)

        # Dispersion coefficient: the further apart the detections are on
        # average, the lower the score (never below 0.5).
        dispersion_coefficient = 1.0
        distances = [
            np.sqrt((p[0] - q[0]) * (p[0] - q[0]) + (p[1] - q[1]) * (p[1] - q[1]))
            for p, q in itertools.combinations(results, 2)
        ]
        if distances:
            avg_distance = np.mean(distances)
            dispersion_coefficient = max(0.5, 1.0 - min(avg_distance / 300, 0.5))

        # NOTE(review): the raw detection count is not normalised, so the
        # result can exceed 100 when there are several detections. Left
        # unchanged because the arbitration thresholds are presumably tuned
        # against this scale - confirm before clamping.
        total_score = (
            count * 0.3 +
            avg_circularity_score * 0.4 +
            avg_area_score * 0.2 +
            count_score * 0.1
        ) * dispersion_coefficient

        return float(total_score * 100)

    def calculate_mode_b_score(self, results, frame=None):
        """Compute the scene-fit score of the mode B detections.

        Only the detection with the highest ``'confidence'`` is scored.

        Args:
            results: Sequence of dicts with the keys ``'confidence'``,
                ``'led_count'`` and ``'circularity'``.
            frame: Unused; kept for interface compatibility.

        Returns:
            0 when ``results`` is empty, otherwise a score on a 0-100 scale.
        """
        if not results:
            return 0

        best_result = max(results, key=lambda item: item['confidence'])

        # Fitted-circle confidence; 70 counts as full marks.
        confidence_score = min(best_result['confidence'] / 70, 1.0)

        # Detected LED count on a log scale; counts below 10 are treated as
        # 10 and 50 or more gives full marks.
        led_count = best_result['led_count']
        led_score = min(np.log(max(led_count, 10)) / np.log(50), 1.0)

        # Circularity plausibility: 0.4-0.8 is considered ideal.
        circularity = best_result['circularity']
        if circularity < 0.4:
            circularity_score = max(0, circularity / 0.4)
        elif circularity <= 0.8:
            circularity_score = 1.0
        else:
            circularity_score = max(0, 1.0 - (circularity - 0.8) / 0.2)

        total_score = (
            confidence_score * 0.5 +
            led_score * 0.3 +
            circularity_score * 0.2
        ) * 100

        return float(total_score)

    def _finish(self, start_time, mode, results, reason=None):
        """Stamp reason and elapsed time into debug_info and build the result."""
        if reason is not None:
            self.debug_info['decision_reason'] = reason
        self.debug_info['processing_time'] = time.perf_counter() - start_time
        return mode, results, self.debug_info

    def _arbitrate_dual(self, a_score, b_score):
        """Return (mode, reason) when both algorithms are enabled."""
        if b_score > a_score * 1.2:
            return 'mode_b', (
                f'Lamp mode score ({b_score:.1f}) > single mode ({a_score:.1f}) * 1.2'
            )
        if a_score > b_score * 1.5:
            return 'mode_a', (
                f'Single mode score ({a_score:.1f}) > lamp mode ({b_score:.1f}) * 1.5'
            )
        if self.last_mode:
            # Hysteresis: neither score wins clearly, keep the last decision.
            label = "单点" if self.last_mode == "mode_a" else "灯盘"
            return self.last_mode, f'Hysteresis:保持{label}模式'
        # First decision: no history to fall back on.
        if a_score > b_score:
            return 'mode_a', 'First frame: single mode'
        return 'mode_b', 'First frame: lamp mode'

    def arbitrate(self, mode_a_results, mode_b_results, frame=None):
        """Decide which mode's detections to use for this frame.

        Forced modes, dual display and the alternating-frame strategy return
        early and do not touch the history or the failure counters.

        Args:
            mode_a_results: Detection results of mode A.
            mode_b_results: Detection results of mode B.
            frame: Original frame (optional); only passed on to the scorers.

        Returns:
            selected_mode: 'mode_a', 'mode_b', 'dual' or None.
            selected_results: Results of the selected mode; a
                (mode_a_results, mode_b_results) tuple for 'dual'; None when
                both algorithms are disabled.
            debug_info: Dict with scores, decision reason and processing time.
        """
        # perf_counter is monotonic, so the elapsed time cannot go negative
        # when the system clock is adjusted.
        start_time = time.perf_counter()
        self.debug_info = self._new_debug_info()

        if not self.enable_mode_a and not self.enable_mode_b:
            return self._finish(start_time, None, None, 'Both algorithms disabled')

        # Forced modes (only honoured when the forced algorithm is enabled).
        if self.work_mode == 'forced_single' and self.enable_mode_a:
            return self._finish(
                start_time, 'mode_a', mode_a_results, 'Forced single point mode'
            )
        if self.work_mode == 'forced_lamp' and self.enable_mode_b:
            return self._finish(
                start_time, 'mode_b', mode_b_results, 'Forced lamp disk mode'
            )

        # Dual display: hand back both result sets.
        if self.work_mode == 'dual_display':
            return self._finish(
                start_time, 'dual', (mode_a_results, mode_b_results),
                'Dual display mode'
            )

        # Performance optimisation: alternate between the modes per frame.
        if self.use_alternating_frames:
            self.current_frame_parity = 1 - self.current_frame_parity
            if self.current_frame_parity == 0 and self.enable_mode_a:
                return self._finish(
                    start_time, 'mode_a', mode_a_results, 'Alternating frame: mode A'
                )
            if self.current_frame_parity == 1 and self.enable_mode_b:
                return self._finish(
                    start_time, 'mode_b', mode_b_results, 'Alternating frame: mode B'
                )

        # Score the enabled modes and update their failure counters.
        a_score = 0
        b_score = 0
        if self.enable_mode_a:
            a_score = self.calculate_mode_a_score(mode_a_results, frame)
            self.debug_info['mode_a_score'] = a_score
            if mode_a_results:
                self.mode_a_failure_count = 0
            else:
                self.mode_a_failure_count += 1
        if self.enable_mode_b:
            b_score = self.calculate_mode_b_score(mode_b_results, frame)
            self.debug_info['mode_b_score'] = b_score
            if mode_b_results:
                self.mode_b_failure_count = 0
            else:
                self.mode_b_failure_count += 1

        # Automatic arbitration (at least one algorithm is enabled here).
        if not self.enable_mode_b:
            selected_mode, reason = 'mode_a', 'Only mode A enabled'
        elif not self.enable_mode_a:
            selected_mode, reason = 'mode_b', 'Only mode B enabled'
        else:
            selected_mode, reason = self._arbitrate_dual(a_score, b_score)
        selected_results = (
            mode_a_results if selected_mode == 'mode_a' else mode_b_results
        )

        # Mode failure handling: leave a mode that has produced nothing for
        # max_failure_count frames, but only when the other mode is enabled
        # and has not failed as well - otherwise the switch gains nothing and
        # the decision would flip back and forth between two dead modes.
        a_failed = self.mode_a_failure_count >= self.max_failure_count
        b_failed = self.mode_b_failure_count >= self.max_failure_count
        if (selected_mode == 'mode_a' and a_failed
                and self.enable_mode_b and not b_failed):
            selected_mode = 'mode_b'
            selected_results = mode_b_results
            reason = 'Mode A failed, switching to mode B'
        elif (selected_mode == 'mode_b' and b_failed
                and self.enable_mode_a and not a_failed):
            selected_mode = 'mode_a'
            selected_results = mode_a_results
            reason = 'Mode B failed, switching to mode A'

        # Record history only after the failure override, so last_mode and
        # consecutive_mode_count describe the mode that is actually returned.
        if selected_mode == self.last_mode:
            self.consecutive_mode_count += 1
        else:
            self.consecutive_mode_count = 1
        self.last_mode = selected_mode
        self.last_a_score = a_score
        self.last_b_score = b_score

        return self._finish(start_time, selected_mode, selected_results, reason)

    def reset(self):
        """Reset history, failure counters, frame parity and debug info."""
        self.last_mode = None
        self.last_a_score = 0
        self.last_b_score = 0
        self.consecutive_mode_count = 0
        self.mode_a_failure_count = 0
        self.mode_b_failure_count = 0
        self.current_frame_parity = 0
        self.debug_info = self._new_debug_info()

    def get_debug_info(self):
        """Return the debug info of the most recent arbitrate() call."""
        return self.debug_info

    def get_mode_failure_status(self):
        """Return the failure flags and raw failure counters of both modes."""
        return {
            'mode_a_failed': self.mode_a_failure_count >= self.max_failure_count,
            'mode_b_failed': self.mode_b_failure_count >= self.max_failure_count,
            'mode_a_failure_count': self.mode_a_failure_count,
            'mode_b_failure_count': self.mode_b_failure_count
        }