import sys import os import logging import json import ctypes from ctypes import * import numpy as np import cv2 # 添加相机模块路径 sys.path.append(os.path.join(os.path.dirname(os.path.dirname(__file__)), "camera")) # 导入相机相关模块 from camera.MvCameraControl_class import MvCamera from camera.CamOperation_class import CameraOperation from camera.MvErrorDefine_const import * from camera.CameraParams_header import * from camera.CameraParams_const import * class CameraManager: """相机管理器单例类,确保整个应用中只有一个相机实例""" _instance = None _initialized = False @staticmethod def get_instance(): """获取相机管理器实例""" if CameraManager._instance is None: CameraManager._instance = CameraManager() return CameraManager._instance def __init__(self): """初始化相机管理器""" if CameraManager._instance is not None: raise Exception("相机管理器是单例类,请使用get_instance()方法获取实例") else: CameraManager._instance = self # 初始化属性 self.isOpen = False self.isGrabbing = False self.last_device_index = -1 self.device_list = [] self.current_window_id = 0 # 本地图像模式相关 self.local_mode = False self.last_frame = None self.has_real_camera = False # 是否有真实相机连接 # 初始化SDK self.obj_cam_operation = CameraOperation() # 将单例标记为已初始化 CameraManager._initialized = True # 初始化日志 logging.info("相机管理器初始化") def enum_devices(self): """枚举相机设备列表 Returns: list: 设备信息列表 """ # 如果当前处于本地图像模式,返回一个虚拟设备 if self.local_mode: # 返回虚拟设备列表 virtual_device = { "vendor_name": "本地图像模式", "model_name": "虚拟相机", "serial_number": "LOCAL_IMG", "device_version": "1.0", "spec_version": "1.0", "user_defined_name": "本地图像播放器", "index": 0 } self.device_list = [virtual_device] return self.device_list # 清空设备列表 self.device_list = [] try: # 枚举设备 ret = self.obj_cam_operation.Enumrate_Devices() if ret != 0: error_msg = f"枚举相机设备失败! 错误码: 0x{ret:x}" logging.error(error_msg) # 标记没有真实相机 self.has_real_camera = False return [] # 获取设备数量 device_num = self.obj_cam_operation.device_num if device_num <= 0: logging.warning("未发现相机设备") # 标记没有真实相机 self.has_real_camera = False return [] # 标记存在真实相机 self.has_real_camera = True # 解析并构建设备信息 # 确保Hikvision SDK已正确加载 from camera.MvCameraControl_class import MvCamCtrldll if MvCamCtrldll is None: logging.error("相机SDK未正确加载,无法枚举设备") return [] # 确保先关闭任何已打开的相机 if self.isOpen: self.close_device() # 枚举设备 self.deviceList = MV_CC_DEVICE_INFO_LIST() n_layer_type = (MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE | MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICE) ret = MvCamera.MV_CC_EnumDevices(n_layer_type, self.deviceList) if ret != 0: error_msg = f"枚举设备失败! 错误码: 0x{ret:x}" logging.error(error_msg) return None if self.deviceList.nDeviceNum == 0: logging.info("未找到相机设备") return [] logging.info(f"找到 {self.deviceList.nDeviceNum} 个相机设备") # 定义解码函数 def decoding_char(c_ubyte_value): c_char_p_value = ctypes.cast(c_ubyte_value, ctypes.c_char_p) try: decode_str = c_char_p_value.value.decode('gbk') # 中文字符 except UnicodeDecodeError: decode_str = str(c_char_p_value.value) return decode_str # 构造设备信息列表 devices_info = [] for i in range(0, self.deviceList.nDeviceNum): mvcc_dev_info = cast(self.deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents # 详细日志 logging.debug(f"设备 {i} 类型: {mvcc_dev_info.nTLayerType}") if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE: # GigE相机 user_defined_name = decoding_char(mvcc_dev_info.SpecialInfo.stGigEInfo.chUserDefinedName) model_name = decoding_char(mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName) nip1 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0xff000000) >> 24) nip2 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x00ff0000) >> 16) nip3 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x0000ff00) >> 8) nip4 = (mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x000000ff) ip = f"{nip1}.{nip2}.{nip3}.{nip4}" display = f"[{i}]GigE: {user_defined_name} {model_name} ({ip})" logging.debug(f"GigE相机: {display}") device_info = { "index": i, "type": "GigE", "name": user_defined_name, "model": model_name, "ip": ip, "display": display } devices_info.append(device_info) elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE: # USB相机 user_defined_name = decoding_char(mvcc_dev_info.SpecialInfo.stUsb3VInfo.chUserDefinedName) model_name = decoding_char(mvcc_dev_info.SpecialInfo.stUsb3VInfo.chModelName) # 序列号 strSerialNumber = "" for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chSerialNumber: if per == 0: break strSerialNumber = strSerialNumber + chr(per) display = f"[{i}]USB: {user_defined_name} {model_name} ({strSerialNumber})" logging.debug(f"USB相机: {display}") device_info = { "index": i, "type": "USB", "name": user_defined_name, "model": model_name, "serial": strSerialNumber, "display": display } devices_info.append(device_info) elif mvcc_dev_info.nTLayerType == MV_GENTL_CAMERALINK_DEVICE: # CameraLink相机 user_defined_name = decoding_char(mvcc_dev_info.SpecialInfo.stCMLInfo.chUserDefinedName) model_name = decoding_char(mvcc_dev_info.SpecialInfo.stCMLInfo.chModelName) strSerialNumber = "" for per in mvcc_dev_info.SpecialInfo.stCMLInfo.chSerialNumber: if per == 0: break strSerialNumber = strSerialNumber + chr(per) display = f"[{i}]CML: {user_defined_name} {model_name} ({strSerialNumber})" logging.debug(f"CML相机: {display}") device_info = { "index": i, "type": "CML", "name": user_defined_name, "model": model_name, "serial": strSerialNumber, "display": display } devices_info.append(device_info) else: # 其他类型相机 display = f"[{i}]Other" device_info = { "index": i, "type": "Other", "display": display } devices_info.append(device_info) logging.debug(f"其他类型相机: {display}") # 添加详细日志 logging.debug(f"枚举到的设备数量: {len(devices_info)}") self.device_list = devices_info return devices_info except Exception as e: error_msg = f"枚举相机设备时发生异常: {str(e)}" logging.error(error_msg) self.has_real_camera = False return [] def open_device(self, device_index): """打开相机设备 Args: device_index: 设备索引 Returns: bool: 是否成功打开设备 """ # 如果当前处于本地图像模式,且设备索引是虚拟设备 if self.local_mode and device_index == 0: # 模拟打开设备成功 self.isOpen = True self.last_device_index = device_index logging.info("打开本地图像模式虚拟设备") return True # 真实相机模式处理 # 检查设备索引是否有效 if device_index < 0 or device_index >= len(self.device_list): logging.error(f"无效的设备索引: {device_index}") return False # 如果之前已经打开设备,先关闭 if self.isOpen: self.close_device() try: logging.info(f"开始打开相机,设备索引: {device_index}") # 设置当前选中的相机索引 self.last_device_index = device_index # 创建相机操作对象 self.obj_cam_operation = CameraOperation(self.obj_cam_operation.cam, self.deviceList, self.last_device_index) ret = self.obj_cam_operation.Open_device() if ret != 0: error_msg = f"打开相机失败! 错误码: 0x{ret:x}" logging.error(error_msg) self.isOpen = False return False # 设置连续模式 (非触发模式) ret = self.obj_cam_operation.Set_trigger_mode(False) if ret != 0: error_msg = f"设置连续模式失败! 错误码: 0x{ret:x}" logging.error(error_msg) # 出错时关闭设备 self.obj_cam_operation.Close_device() self.isOpen = False return False # 获取参数 ret = self.obj_cam_operation.Get_parameter() if ret != 0: error_msg = f"获取相机参数失败! 错误码: 0x{ret:x}" logging.error(error_msg) # 继续执行,不返回失败 self.isOpen = True logging.info(f"相机已成功打开,设备索引: {device_index}") return True except Exception as e: error_msg = f"打开相机时发生异常: {str(e)}" logging.error(error_msg) self.isOpen = False return False def close_device(self): """关闭相机设备 Returns: bool: 是否成功关闭设备 """ # 如果处于本地图像模式且使用的是虚拟设备 if self.local_mode and not self.has_real_camera: # 模拟关闭设备成功 if self.isGrabbing: self.stop_grabbing() self.isOpen = False logging.info("关闭本地图像模式虚拟设备") return True # 真实相机模式处理 if not self.isOpen: return True try: # 确保停止取图 if self.isGrabbing: self.stop_grabbing() # 关闭设备 ret = self.obj_cam_operation.Close_device() if ret != 0: error_msg = f"关闭相机失败! 错误码: 0x{ret:x}" logging.error(error_msg) return False self.isOpen = False logging.info("相机已关闭") return True except Exception as e: error_msg = f"关闭相机时发生异常: {str(e)}" logging.error(error_msg) self.isOpen = False # 强制设置为关闭状态 return False def start_grabbing(self, window_id): """开始图像采集 Args: window_id: 显示窗口句柄 Returns: bool: 是否成功开始采集 """ # 保存窗口ID self.current_window_id = window_id # 如果处于本地图像模式,模拟开始采集 if self.local_mode: self.isGrabbing = True logging.info("开始本地图像模式采集, 窗口ID: " + str(window_id)) return True # 检查设备是否已打开 if not self.isOpen: logging.error("相机未打开,无法开始取图") return False try: ret = self.obj_cam_operation.Start_grabbing(window_id) if ret != 0: error_msg = f"开始取图失败! 错误码: 0x{ret:x}" logging.error(error_msg) return False self.isGrabbing = True logging.info("开始图像采集") return True except Exception as e: error_msg = f"开始取图时发生异常: {str(e)}" logging.error(error_msg) return False def stop_grabbing(self): """停止取图 Returns: bool: 是否成功停止取图 """ # 如果处于本地图像模式,模拟停止采集 if self.local_mode: self.isGrabbing = False logging.info("停止本地图像模式采集") return True if not self.isOpen: return True if not self.isGrabbing: return True try: ret = self.obj_cam_operation.Stop_grabbing() if ret != 0: error_msg = f"停止取图失败! 错误码: 0x{ret:x}" logging.error(error_msg) return False self.isGrabbing = False logging.info("停止图像采集") return True except Exception as e: error_msg = f"停止取图时发生异常: {str(e)}" logging.error(error_msg) self.isGrabbing = False # 强制设置为非取图状态 return False def get_parameters(self): """获取相机参数 Returns: tuple: (曝光时间, 增益, 帧率) 或 None (如果失败) """ if not self.isOpen: logging.error("相机未打开,无法获取参数") return None try: ret = self.obj_cam_operation.Get_parameter() if ret != 0: error_msg = f"获取相机参数失败! 错误码: 0x{ret:x}" logging.error(error_msg) return None exposure_time = self.obj_cam_operation.exposure_time gain = self.obj_cam_operation.gain frame_rate = self.obj_cam_operation.frame_rate logging.info(f"获取相机参数: 曝光={exposure_time}, 增益={gain}, 帧率={frame_rate}") return (exposure_time, gain, frame_rate) except Exception as e: error_msg = f"获取相机参数时发生异常: {str(e)}" logging.error(error_msg) return None def set_parameters(self, frame_rate, exposure_time, gain): """设置相机参数 Args: frame_rate: 帧率 exposure_time: 曝光时间 gain: 增益 Returns: bool: 是否成功设置参数 """ if not self.isOpen: logging.error("相机未打开,无法设置参数") return False try: # 设置参数 ret = self.obj_cam_operation.Set_parameter(str(frame_rate), str(exposure_time), str(gain)) if ret != 0: error_msg = f"设置相机参数失败! 错误码: 0x{ret:x}" logging.error(error_msg) return False logging.info(f"设置相机参数: 曝光={exposure_time}, 增益={gain}, 帧率={frame_rate}") return True except Exception as e: error_msg = f"设置相机参数时发生异常: {str(e)}" logging.error(error_msg) return False def set_local_mode(self, enabled): """设置是否启用本地图像模式 Args: enabled: 是否启用本地模式 """ # 如果当前正在采集,先停止 if self.isGrabbing: self.stop_grabbing() # 如果当前相机已打开且切换到本地模式,先关闭相机 if self.isOpen and enabled and not self.local_mode: self.close_device() # 设置模式 self.local_mode = enabled # 重新枚举设备(会返回真实设备或虚拟设备) self.enum_devices() logging.info(f"本地图像模式已{'启用' if enabled else '禁用'}") # 如果切换到本地模式,自动"打开"虚拟设备 if enabled and not self.isOpen: self.open_device(0) # 虚拟设备索引为0 def handle_local_frame(self, frame, window_id=0): """处理并存储本地图像帧(不使用OpenCV窗口显示) Args: frame: 本地图像帧数据(OpenCV RGB图像) window_id: 窗口句柄ID,仅用于兼容旧代码,实际不再使用 """ if not self.local_mode: return False try: # 记录窗口ID (仅用于日志和兼容) self.current_window_id = window_id # 保存最后一帧,这个帧可以供其他组件使用 self.last_frame = frame # 设置为正在抓取状态,与实际相机行为保持一致 self.isGrabbing = True # 输出日志帮助调试 logging.warning(f"====> 相机管理器存储本地图像帧,尺寸: {frame.shape if frame is not None else 'None'}") # 我们仅存储帧数据,不进行显示操作 # 显示操作由CameraDisplayWidget完成 return True except Exception as e: logging.error(f"处理本地图像帧失败: {e}") return False def save_params_to_config(self, exposure, gain, frame_rate): """保存相机参数到配置文件 Args: exposure: 曝光值(滑块值) gain: 增益(滑块值) frame_rate: 帧率(滑块值) Returns: bool: 是否成功保存参数 """ try: # 创建相机参数配置 config = { "exposure": exposure, "gain": gain, "frame_rate": frame_rate } # 保存到配置文件 config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "config", "camera_config.json") os.makedirs(os.path.dirname(config_path), exist_ok=True) # 检查文件是否存在并读取 if os.path.exists(config_path): with open(config_path, 'r', encoding='utf-8') as f: try: full_config = json.load(f) except json.JSONDecodeError: full_config = {} else: full_config = {} # 更新配置 full_config["camera_params"] = config # 写入文件 with open(config_path, 'w', encoding='utf-8') as f: json.dump(full_config, f, indent=4, ensure_ascii=False) logging.info(f"相机参数已保存到配置文件: {config_path}") return True except Exception as e: error_msg = f"保存相机参数到配置文件时发生异常: {str(e)}" logging.error(error_msg) return False