jiateng_ws/widgets/camera_manager.py
2025-06-27 15:14:30 +08:00

455 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import os
import logging
import json
import ctypes
from ctypes import *
# 添加相机模块路径
sys.path.append(os.path.join(os.path.dirname(os.path.dirname(__file__)), "camera"))
# 导入相机相关模块
from camera.MvCameraControl_class import MvCamera
from camera.CamOperation_class import CameraOperation
from camera.MvErrorDefine_const import *
from camera.CameraParams_header import *
from camera.CameraParams_const import *
class CameraManager:
"""相机管理器单例类,确保整个应用中只有一个相机实例"""
_instance = None
_initialized = False
@staticmethod
def get_instance():
"""获取相机管理器实例"""
if CameraManager._instance is None:
CameraManager._instance = CameraManager()
return CameraManager._instance
def __init__(self):
"""初始化相机管理器"""
if CameraManager._instance is not None:
raise Exception("相机管理器是单例类请使用get_instance()方法获取实例")
else:
CameraManager._instance = self
# 初始化变量
self.deviceList = None
self.cam = MvCamera()
self.nSelCamIndex = -1
self.obj_cam_operation = None
self.isOpen = False
self.isGrabbing = False
# 初始化SDK (只在第一次时初始化)
if not CameraManager._initialized:
MvCamera.MV_CC_Initialize()
CameraManager._initialized = True
logging.info("相机SDK已初始化")
def enum_devices(self):
"""枚举相机设备完全参考BasicDemo.py的enum_devices实现"""
try:
# 确保先关闭任何已打开的相机
if self.isOpen:
self.close_device()
# 枚举设备
self.deviceList = MV_CC_DEVICE_INFO_LIST()
n_layer_type = (MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE
| MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICE)
ret = MvCamera.MV_CC_EnumDevices(n_layer_type, self.deviceList)
if ret != 0:
error_msg = f"枚举设备失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
return None
if self.deviceList.nDeviceNum == 0:
logging.info("未找到相机设备")
return []
logging.info(f"找到 {self.deviceList.nDeviceNum} 个相机设备")
# 定义解码函数
def decoding_char(c_ubyte_value):
c_char_p_value = ctypes.cast(c_ubyte_value, ctypes.c_char_p)
try:
decode_str = c_char_p_value.value.decode('gbk') # 中文字符
except UnicodeDecodeError:
decode_str = str(c_char_p_value.value)
return decode_str
# 构造设备信息列表
devices_info = []
for i in range(0, self.deviceList.nDeviceNum):
mvcc_dev_info = cast(self.deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents
# 详细日志
logging.debug(f"设备 {i} 类型: {mvcc_dev_info.nTLayerType}")
if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE:
# GigE相机
user_defined_name = decoding_char(mvcc_dev_info.SpecialInfo.stGigEInfo.chUserDefinedName)
model_name = decoding_char(mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName)
nip1 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0xff000000) >> 24)
nip2 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x00ff0000) >> 16)
nip3 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x0000ff00) >> 8)
nip4 = (mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x000000ff)
ip = f"{nip1}.{nip2}.{nip3}.{nip4}"
display = f"[{i}]GigE: {user_defined_name} {model_name} ({ip})"
logging.debug(f"GigE相机: {display}")
device_info = {
"index": i,
"type": "GigE",
"name": user_defined_name,
"model": model_name,
"ip": ip,
"display": display
}
devices_info.append(device_info)
elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE:
# USB相机
user_defined_name = decoding_char(mvcc_dev_info.SpecialInfo.stUsb3VInfo.chUserDefinedName)
model_name = decoding_char(mvcc_dev_info.SpecialInfo.stUsb3VInfo.chModelName)
# 序列号
strSerialNumber = ""
for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chSerialNumber:
if per == 0:
break
strSerialNumber = strSerialNumber + chr(per)
display = f"[{i}]USB: {user_defined_name} {model_name} ({strSerialNumber})"
logging.debug(f"USB相机: {display}")
device_info = {
"index": i,
"type": "USB",
"name": user_defined_name,
"model": model_name,
"serial": strSerialNumber,
"display": display
}
devices_info.append(device_info)
elif mvcc_dev_info.nTLayerType == MV_GENTL_CAMERALINK_DEVICE:
# CameraLink相机
user_defined_name = decoding_char(mvcc_dev_info.SpecialInfo.stCMLInfo.chUserDefinedName)
model_name = decoding_char(mvcc_dev_info.SpecialInfo.stCMLInfo.chModelName)
strSerialNumber = ""
for per in mvcc_dev_info.SpecialInfo.stCMLInfo.chSerialNumber:
if per == 0:
break
strSerialNumber = strSerialNumber + chr(per)
display = f"[{i}]CML: {user_defined_name} {model_name} ({strSerialNumber})"
logging.debug(f"CML相机: {display}")
device_info = {
"index": i,
"type": "CML",
"name": user_defined_name,
"model": model_name,
"serial": strSerialNumber,
"display": display
}
devices_info.append(device_info)
else:
# 其他类型相机
display = f"[{i}]Other"
device_info = {
"index": i,
"type": "Other",
"display": display
}
devices_info.append(device_info)
logging.debug(f"其他类型相机: {display}")
# 添加详细日志
logging.debug(f"枚举到的设备数量: {len(devices_info)}")
return devices_info
except Exception as e:
error_msg = f"枚举设备时发生异常: {str(e)}"
logging.error(error_msg)
return None
def open_device(self, device_index):
"""打开相机设备参考BasicDemo.py的open_device实现
Args:
device_index: 设备索引
Returns:
bool: 是否成功打开设备
"""
# 检查是否已经打开
if self.isOpen:
logging.warning("相机已经打开!")
return False
# 确保有效的设备索引
if device_index < 0 or self.deviceList is None or device_index >= self.deviceList.nDeviceNum:
error_msg = f"无效的设备索引: {device_index}, 设备列表: {self.deviceList is not None}"
if self.deviceList:
error_msg += f", 设备数量: {self.deviceList.nDeviceNum}"
logging.error(error_msg)
return False
try:
logging.info(f"开始打开相机,设备索引: {device_index}")
# 设置当前选中的相机索引
self.nSelCamIndex = device_index
# 创建相机操作对象
self.obj_cam_operation = CameraOperation(self.cam, self.deviceList, self.nSelCamIndex)
ret = self.obj_cam_operation.Open_device()
if ret != 0:
error_msg = f"打开相机失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
self.isOpen = False
return False
# 设置连续模式 (非触发模式)
ret = self.obj_cam_operation.Set_trigger_mode(False)
if ret != 0:
error_msg = f"设置连续模式失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
# 出错时关闭设备
self.obj_cam_operation.Close_device()
self.isOpen = False
return False
# 获取参数
ret = self.obj_cam_operation.Get_parameter()
if ret != 0:
error_msg = f"获取相机参数失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
# 继续执行,不返回失败
self.isOpen = True
logging.info(f"相机已成功打开,设备索引: {device_index}")
return True
except Exception as e:
error_msg = f"打开相机时发生异常: {str(e)}"
logging.error(error_msg)
self.isOpen = False
return False
def close_device(self):
"""关闭相机设备
Returns:
bool: 是否成功关闭设备
"""
if not self.isOpen:
return True
try:
# 确保停止取图
if self.isGrabbing:
self.stop_grabbing()
# 关闭设备
ret = self.obj_cam_operation.Close_device()
if ret != 0:
error_msg = f"关闭相机失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
return False
self.isOpen = False
logging.info("相机已关闭")
return True
except Exception as e:
error_msg = f"关闭相机时发生异常: {str(e)}"
logging.error(error_msg)
self.isOpen = False # 强制设置为关闭状态
return False
def start_grabbing(self, window_id):
"""开始取图
Args:
window_id: 显示窗口句柄
Returns:
bool: 是否成功开始取图
"""
if not self.isOpen:
logging.error("相机未打开,无法开始取图")
return False
if self.isGrabbing:
logging.warning("相机已经在取图")
return True
try:
ret = self.obj_cam_operation.Start_grabbing(window_id)
if ret != 0:
error_msg = f"开始取图失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
return False
self.isGrabbing = True
logging.info("开始图像采集")
return True
except Exception as e:
error_msg = f"开始取图时发生异常: {str(e)}"
logging.error(error_msg)
return False
def stop_grabbing(self):
"""停止取图
Returns:
bool: 是否成功停止取图
"""
if not self.isOpen:
return True
if not self.isGrabbing:
return True
try:
ret = self.obj_cam_operation.Stop_grabbing()
if ret != 0:
error_msg = f"停止取图失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
return False
self.isGrabbing = False
logging.info("停止图像采集")
return True
except Exception as e:
error_msg = f"停止取图时发生异常: {str(e)}"
logging.error(error_msg)
self.isGrabbing = False # 强制设置为非取图状态
return False
def get_parameters(self):
"""获取相机参数
Returns:
tuple: (曝光时间, 增益, 帧率) 或 None (如果失败)
"""
if not self.isOpen:
logging.error("相机未打开,无法获取参数")
return None
try:
ret = self.obj_cam_operation.Get_parameter()
if ret != 0:
error_msg = f"获取相机参数失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
return None
exposure_time = self.obj_cam_operation.exposure_time
gain = self.obj_cam_operation.gain
frame_rate = self.obj_cam_operation.frame_rate
logging.info(f"获取相机参数: 曝光={exposure_time}, 增益={gain}, 帧率={frame_rate}")
return (exposure_time, gain, frame_rate)
except Exception as e:
error_msg = f"获取相机参数时发生异常: {str(e)}"
logging.error(error_msg)
return None
def set_parameters(self, frame_rate, exposure_time, gain):
"""设置相机参数
Args:
frame_rate: 帧率
exposure_time: 曝光时间
gain: 增益
Returns:
bool: 是否成功设置参数
"""
if not self.isOpen:
logging.error("相机未打开,无法设置参数")
return False
try:
# 设置参数
ret = self.obj_cam_operation.Set_parameter(str(frame_rate), str(exposure_time), str(gain))
if ret != 0:
error_msg = f"设置相机参数失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
return False
logging.info(f"设置相机参数: 曝光={exposure_time}, 增益={gain}, 帧率={frame_rate}")
return True
except Exception as e:
error_msg = f"设置相机参数时发生异常: {str(e)}"
logging.error(error_msg)
return False
def save_params_to_config(self, exposure, gain, frame_rate):
"""保存相机参数到配置文件
Args:
exposure: 曝光值(滑块值)
gain: 增益(滑块值)
frame_rate: 帧率(滑块值)
Returns:
bool: 是否成功保存参数
"""
try:
# 创建相机参数配置
config = {
"exposure": exposure,
"gain": gain,
"frame_rate": frame_rate
}
# 保存到配置文件
config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "config", "camera_config.json")
os.makedirs(os.path.dirname(config_path), exist_ok=True)
# 检查文件是否存在并读取
if os.path.exists(config_path):
with open(config_path, 'r', encoding='utf-8') as f:
try:
full_config = json.load(f)
except json.JSONDecodeError:
full_config = {}
else:
full_config = {}
# 更新配置
full_config["camera_params"] = config
# 写入文件
with open(config_path, 'w', encoding='utf-8') as f:
json.dump(full_config, f, indent=4, ensure_ascii=False)
logging.info(f"相机参数已保存到配置文件: {config_path}")
return True
except Exception as e:
error_msg = f"保存相机参数到配置文件时发生异常: {str(e)}"
logging.error(error_msg)
return False