jiateng_ws/widgets/camera_manager.py

461 lines
17 KiB
Python
Raw Normal View History

2025-06-07 10:45:09 +08:00
import sys
import os
import logging
2025-06-27 15:14:30 +08:00
import json
import ctypes
2025-06-07 10:45:09 +08:00
from ctypes import *
# 添加相机模块路径
sys.path.append(os.path.join(os.path.dirname(os.path.dirname(__file__)), "camera"))
# 导入相机相关模块
from camera.MvCameraControl_class import MvCamera
from camera.CamOperation_class import CameraOperation
from camera.MvErrorDefine_const import *
from camera.CameraParams_header import *
from camera.CameraParams_const import *
class CameraManager:
"""相机管理器单例类,确保整个应用中只有一个相机实例"""
_instance = None
_initialized = False
@staticmethod
def get_instance():
"""获取相机管理器实例"""
if CameraManager._instance is None:
CameraManager._instance = CameraManager()
return CameraManager._instance
def __init__(self):
"""初始化相机管理器"""
if CameraManager._instance is not None:
raise Exception("相机管理器是单例类请使用get_instance()方法获取实例")
else:
CameraManager._instance = self
# 初始化变量
self.deviceList = None
self.cam = MvCamera()
self.nSelCamIndex = -1
self.obj_cam_operation = None
self.isOpen = False
self.isGrabbing = False
# 初始化SDK (只在第一次时初始化)
if not CameraManager._initialized:
MvCamera.MV_CC_Initialize()
CameraManager._initialized = True
logging.info("相机SDK已初始化")
def enum_devices(self):
2025-06-27 15:14:30 +08:00
"""枚举相机设备完全参考BasicDemo.py的enum_devices实现"""
2025-06-07 10:45:09 +08:00
try:
# 确保Hikvision SDK已正确加载
from camera.MvCameraControl_class import MvCamCtrldll
if MvCamCtrldll is None:
logging.error("相机SDK未正确加载无法枚举设备")
return []
2025-06-07 10:45:09 +08:00
# 确保先关闭任何已打开的相机
if self.isOpen:
self.close_device()
# 枚举设备
self.deviceList = MV_CC_DEVICE_INFO_LIST()
n_layer_type = (MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE
| MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICE)
ret = MvCamera.MV_CC_EnumDevices(n_layer_type, self.deviceList)
if ret != 0:
error_msg = f"枚举设备失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
return None
if self.deviceList.nDeviceNum == 0:
logging.info("未找到相机设备")
return []
logging.info(f"找到 {self.deviceList.nDeviceNum} 个相机设备")
2025-06-27 15:14:30 +08:00
# 定义解码函数
def decoding_char(c_ubyte_value):
c_char_p_value = ctypes.cast(c_ubyte_value, ctypes.c_char_p)
try:
decode_str = c_char_p_value.value.decode('gbk') # 中文字符
except UnicodeDecodeError:
decode_str = str(c_char_p_value.value)
return decode_str
2025-06-07 10:45:09 +08:00
# 构造设备信息列表
devices_info = []
for i in range(0, self.deviceList.nDeviceNum):
mvcc_dev_info = cast(self.deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents
2025-06-27 15:14:30 +08:00
# 详细日志
logging.debug(f"设备 {i} 类型: {mvcc_dev_info.nTLayerType}")
2025-06-07 10:45:09 +08:00
if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE:
# GigE相机
2025-06-27 15:14:30 +08:00
user_defined_name = decoding_char(mvcc_dev_info.SpecialInfo.stGigEInfo.chUserDefinedName)
model_name = decoding_char(mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName)
2025-06-07 10:45:09 +08:00
nip1 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0xff000000) >> 24)
nip2 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x00ff0000) >> 16)
nip3 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x0000ff00) >> 8)
nip4 = (mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x000000ff)
ip = f"{nip1}.{nip2}.{nip3}.{nip4}"
2025-06-27 15:14:30 +08:00
display = f"[{i}]GigE: {user_defined_name} {model_name} ({ip})"
logging.debug(f"GigE相机: {display}")
2025-06-07 10:45:09 +08:00
device_info = {
"index": i,
"type": "GigE",
"name": user_defined_name,
"model": model_name,
"ip": ip,
2025-06-27 15:14:30 +08:00
"display": display
2025-06-07 10:45:09 +08:00
}
devices_info.append(device_info)
elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE:
# USB相机
2025-06-27 15:14:30 +08:00
user_defined_name = decoding_char(mvcc_dev_info.SpecialInfo.stUsb3VInfo.chUserDefinedName)
model_name = decoding_char(mvcc_dev_info.SpecialInfo.stUsb3VInfo.chModelName)
2025-06-07 10:45:09 +08:00
2025-06-27 15:14:30 +08:00
# 序列号
strSerialNumber = ""
for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chSerialNumber:
2025-06-07 10:45:09 +08:00
if per == 0:
break
2025-06-27 15:14:30 +08:00
strSerialNumber = strSerialNumber + chr(per)
display = f"[{i}]USB: {user_defined_name} {model_name} ({strSerialNumber})"
logging.debug(f"USB相机: {display}")
2025-06-07 10:45:09 +08:00
device_info = {
"index": i,
"type": "USB",
"name": user_defined_name,
"model": model_name,
2025-06-27 15:14:30 +08:00
"serial": strSerialNumber,
"display": display
2025-06-07 10:45:09 +08:00
}
devices_info.append(device_info)
2025-06-27 15:14:30 +08:00
elif mvcc_dev_info.nTLayerType == MV_GENTL_CAMERALINK_DEVICE:
# CameraLink相机
user_defined_name = decoding_char(mvcc_dev_info.SpecialInfo.stCMLInfo.chUserDefinedName)
model_name = decoding_char(mvcc_dev_info.SpecialInfo.stCMLInfo.chModelName)
strSerialNumber = ""
for per in mvcc_dev_info.SpecialInfo.stCMLInfo.chSerialNumber:
if per == 0:
break
strSerialNumber = strSerialNumber + chr(per)
display = f"[{i}]CML: {user_defined_name} {model_name} ({strSerialNumber})"
logging.debug(f"CML相机: {display}")
device_info = {
"index": i,
"type": "CML",
"name": user_defined_name,
"model": model_name,
"serial": strSerialNumber,
"display": display
}
devices_info.append(device_info)
2025-06-07 10:45:09 +08:00
else:
# 其他类型相机
2025-06-27 15:14:30 +08:00
display = f"[{i}]Other"
2025-06-07 10:45:09 +08:00
device_info = {
"index": i,
"type": "Other",
2025-06-27 15:14:30 +08:00
"display": display
2025-06-07 10:45:09 +08:00
}
devices_info.append(device_info)
2025-06-27 15:14:30 +08:00
logging.debug(f"其他类型相机: {display}")
2025-06-07 10:45:09 +08:00
2025-06-27 15:14:30 +08:00
# 添加详细日志
logging.debug(f"枚举到的设备数量: {len(devices_info)}")
2025-06-07 10:45:09 +08:00
return devices_info
except Exception as e:
error_msg = f"枚举设备时发生异常: {str(e)}"
logging.error(error_msg)
return None
def open_device(self, device_index):
2025-06-27 15:14:30 +08:00
"""打开相机设备参考BasicDemo.py的open_device实现
2025-06-07 10:45:09 +08:00
Args:
device_index: 设备索引
Returns:
bool: 是否成功打开设备
"""
# 检查是否已经打开
if self.isOpen:
logging.warning("相机已经打开!")
return False
2025-06-27 15:14:30 +08:00
# 确保有效的设备索引
if device_index < 0 or self.deviceList is None or device_index >= self.deviceList.nDeviceNum:
error_msg = f"无效的设备索引: {device_index}, 设备列表: {self.deviceList is not None}"
if self.deviceList:
error_msg += f", 设备数量: {self.deviceList.nDeviceNum}"
logging.error(error_msg)
2025-06-07 10:45:09 +08:00
return False
try:
2025-06-27 15:14:30 +08:00
logging.info(f"开始打开相机,设备索引: {device_index}")
2025-06-07 10:45:09 +08:00
2025-06-27 15:14:30 +08:00
# 设置当前选中的相机索引
2025-06-07 10:45:09 +08:00
self.nSelCamIndex = device_index
# 创建相机操作对象
self.obj_cam_operation = CameraOperation(self.cam, self.deviceList, self.nSelCamIndex)
ret = self.obj_cam_operation.Open_device()
if ret != 0:
error_msg = f"打开相机失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
self.isOpen = False
return False
2025-06-27 15:14:30 +08:00
# 设置连续模式 (非触发模式)
ret = self.obj_cam_operation.Set_trigger_mode(False)
if ret != 0:
error_msg = f"设置连续模式失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
# 出错时关闭设备
self.obj_cam_operation.Close_device()
self.isOpen = False
return False
2025-06-07 10:45:09 +08:00
# 获取参数
2025-06-27 15:14:30 +08:00
ret = self.obj_cam_operation.Get_parameter()
if ret != 0:
error_msg = f"获取相机参数失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
# 继续执行,不返回失败
2025-06-07 10:45:09 +08:00
self.isOpen = True
2025-06-27 15:14:30 +08:00
logging.info(f"相机已成功打开,设备索引: {device_index}")
2025-06-07 10:45:09 +08:00
return True
except Exception as e:
error_msg = f"打开相机时发生异常: {str(e)}"
logging.error(error_msg)
self.isOpen = False
return False
def close_device(self):
"""关闭相机设备
Returns:
bool: 是否成功关闭设备
"""
if not self.isOpen:
return True
try:
# 确保停止取图
if self.isGrabbing:
self.stop_grabbing()
# 关闭设备
ret = self.obj_cam_operation.Close_device()
if ret != 0:
error_msg = f"关闭相机失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
return False
self.isOpen = False
logging.info("相机已关闭")
return True
except Exception as e:
error_msg = f"关闭相机时发生异常: {str(e)}"
logging.error(error_msg)
self.isOpen = False # 强制设置为关闭状态
return False
def start_grabbing(self, window_id):
"""开始取图
Args:
window_id: 显示窗口句柄
Returns:
bool: 是否成功开始取图
"""
if not self.isOpen:
logging.error("相机未打开,无法开始取图")
return False
if self.isGrabbing:
logging.warning("相机已经在取图")
return True
try:
ret = self.obj_cam_operation.Start_grabbing(window_id)
if ret != 0:
error_msg = f"开始取图失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
return False
self.isGrabbing = True
logging.info("开始图像采集")
return True
except Exception as e:
error_msg = f"开始取图时发生异常: {str(e)}"
logging.error(error_msg)
return False
def stop_grabbing(self):
"""停止取图
Returns:
bool: 是否成功停止取图
"""
if not self.isOpen:
return True
if not self.isGrabbing:
return True
try:
ret = self.obj_cam_operation.Stop_grabbing()
if ret != 0:
error_msg = f"停止取图失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
return False
self.isGrabbing = False
logging.info("停止图像采集")
return True
except Exception as e:
error_msg = f"停止取图时发生异常: {str(e)}"
logging.error(error_msg)
self.isGrabbing = False # 强制设置为非取图状态
return False
def get_parameters(self):
"""获取相机参数
Returns:
tuple: (曝光时间, 增益, 帧率) None (如果失败)
"""
if not self.isOpen:
logging.error("相机未打开,无法获取参数")
return None
try:
ret = self.obj_cam_operation.Get_parameter()
if ret != 0:
error_msg = f"获取相机参数失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
return None
exposure_time = self.obj_cam_operation.exposure_time
gain = self.obj_cam_operation.gain
frame_rate = self.obj_cam_operation.frame_rate
logging.info(f"获取相机参数: 曝光={exposure_time}, 增益={gain}, 帧率={frame_rate}")
return (exposure_time, gain, frame_rate)
except Exception as e:
error_msg = f"获取相机参数时发生异常: {str(e)}"
logging.error(error_msg)
return None
def set_parameters(self, frame_rate, exposure_time, gain):
"""设置相机参数
Args:
frame_rate: 帧率
exposure_time: 曝光时间
gain: 增益
Returns:
bool: 是否成功设置参数
"""
if not self.isOpen:
logging.error("相机未打开,无法设置参数")
return False
try:
# 设置参数
ret = self.obj_cam_operation.Set_parameter(str(frame_rate), str(exposure_time), str(gain))
if ret != 0:
error_msg = f"设置相机参数失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
return False
logging.info(f"设置相机参数: 曝光={exposure_time}, 增益={gain}, 帧率={frame_rate}")
return True
except Exception as e:
error_msg = f"设置相机参数时发生异常: {str(e)}"
logging.error(error_msg)
2025-06-27 15:14:30 +08:00
return False
def save_params_to_config(self, exposure, gain, frame_rate):
"""保存相机参数到配置文件
Args:
exposure: 曝光值(滑块值)
gain: 增益(滑块值)
frame_rate: 帧率(滑块值)
Returns:
bool: 是否成功保存参数
"""
try:
# 创建相机参数配置
config = {
"exposure": exposure,
"gain": gain,
"frame_rate": frame_rate
}
# 保存到配置文件
config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "config", "camera_config.json")
os.makedirs(os.path.dirname(config_path), exist_ok=True)
# 检查文件是否存在并读取
if os.path.exists(config_path):
with open(config_path, 'r', encoding='utf-8') as f:
try:
full_config = json.load(f)
except json.JSONDecodeError:
full_config = {}
else:
full_config = {}
# 更新配置
full_config["camera_params"] = config
# 写入文件
with open(config_path, 'w', encoding='utf-8') as f:
json.dump(full_config, f, indent=4, ensure_ascii=False)
logging.info(f"相机参数已保存到配置文件: {config_path}")
return True
except Exception as e:
error_msg = f"保存相机参数到配置文件时发生异常: {str(e)}"
logging.error(error_msg)
2025-06-07 10:45:09 +08:00
return False