jiateng_ws/widgets/camera_manager.py

601 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import os
import logging
import json
import ctypes
from ctypes import *
import numpy as np
import cv2
# 添加相机模块路径
sys.path.append(os.path.join(os.path.dirname(os.path.dirname(__file__)), "camera"))
# 导入相机相关模块
from camera.MvCameraControl_class import MvCamera
from camera.CamOperation_class import CameraOperation
from camera.MvErrorDefine_const import *
from camera.CameraParams_header import *
from camera.CameraParams_const import *
class CameraManager:
"""相机管理器单例类,确保整个应用中只有一个相机实例"""
_instance = None
_initialized = False
@staticmethod
def get_instance():
"""获取相机管理器实例"""
if CameraManager._instance is None:
CameraManager._instance = CameraManager()
return CameraManager._instance
def __init__(self):
"""初始化相机管理器"""
if CameraManager._instance is not None:
raise Exception("相机管理器是单例类请使用get_instance()方法获取实例")
else:
CameraManager._instance = self
# 初始化属性
self.isOpen = False
self.isGrabbing = False
self.last_device_index = -1
self.device_list = []
self.current_window_id = 0
# 本地图像模式相关
self.local_mode = False
self.last_frame = None
self.has_real_camera = False # 是否有真实相机连接
# 初始化SDK
self.obj_cam_operation = CameraOperation()
# 将单例标记为已初始化
CameraManager._initialized = True
# 初始化日志
logging.info("相机管理器初始化")
def enum_devices(self):
"""枚举相机设备列表
Returns:
list: 设备信息列表
"""
# 如果当前处于本地图像模式,返回一个虚拟设备
if self.local_mode:
# 返回虚拟设备列表
virtual_device = {
"vendor_name": "本地图像模式",
"model_name": "虚拟相机",
"serial_number": "LOCAL_IMG",
"device_version": "1.0",
"spec_version": "1.0",
"user_defined_name": "本地图像播放器",
"index": 0
}
self.device_list = [virtual_device]
return self.device_list
# 清空设备列表
self.device_list = []
try:
# 枚举设备
ret = self.obj_cam_operation.Enumrate_Devices()
if ret != 0:
error_msg = f"枚举相机设备失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
# 标记没有真实相机
self.has_real_camera = False
return []
# 获取设备数量
device_num = self.obj_cam_operation.device_num
if device_num <= 0:
logging.warning("未发现相机设备")
# 标记没有真实相机
self.has_real_camera = False
return []
# 标记存在真实相机
self.has_real_camera = True
# 解析并构建设备信息
# 确保Hikvision SDK已正确加载
from camera.MvCameraControl_class import MvCamCtrldll
if MvCamCtrldll is None:
logging.error("相机SDK未正确加载无法枚举设备")
return []
# 确保先关闭任何已打开的相机
if self.isOpen:
self.close_device()
# 枚举设备
self.deviceList = MV_CC_DEVICE_INFO_LIST()
n_layer_type = (MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE
| MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICE)
ret = MvCamera.MV_CC_EnumDevices(n_layer_type, self.deviceList)
if ret != 0:
error_msg = f"枚举设备失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
return None
if self.deviceList.nDeviceNum == 0:
logging.info("未找到相机设备")
return []
logging.info(f"找到 {self.deviceList.nDeviceNum} 个相机设备")
# 定义解码函数
def decoding_char(c_ubyte_value):
c_char_p_value = ctypes.cast(c_ubyte_value, ctypes.c_char_p)
try:
decode_str = c_char_p_value.value.decode('gbk') # 中文字符
except UnicodeDecodeError:
decode_str = str(c_char_p_value.value)
return decode_str
# 构造设备信息列表
devices_info = []
for i in range(0, self.deviceList.nDeviceNum):
mvcc_dev_info = cast(self.deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents
# 详细日志
logging.debug(f"设备 {i} 类型: {mvcc_dev_info.nTLayerType}")
if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE:
# GigE相机
user_defined_name = decoding_char(mvcc_dev_info.SpecialInfo.stGigEInfo.chUserDefinedName)
model_name = decoding_char(mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName)
nip1 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0xff000000) >> 24)
nip2 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x00ff0000) >> 16)
nip3 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x0000ff00) >> 8)
nip4 = (mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x000000ff)
ip = f"{nip1}.{nip2}.{nip3}.{nip4}"
display = f"[{i}]GigE: {user_defined_name} {model_name} ({ip})"
logging.debug(f"GigE相机: {display}")
device_info = {
"index": i,
"type": "GigE",
"name": user_defined_name,
"model": model_name,
"ip": ip,
"display": display
}
devices_info.append(device_info)
elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE:
# USB相机
user_defined_name = decoding_char(mvcc_dev_info.SpecialInfo.stUsb3VInfo.chUserDefinedName)
model_name = decoding_char(mvcc_dev_info.SpecialInfo.stUsb3VInfo.chModelName)
# 序列号
strSerialNumber = ""
for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chSerialNumber:
if per == 0:
break
strSerialNumber = strSerialNumber + chr(per)
display = f"[{i}]USB: {user_defined_name} {model_name} ({strSerialNumber})"
logging.debug(f"USB相机: {display}")
device_info = {
"index": i,
"type": "USB",
"name": user_defined_name,
"model": model_name,
"serial": strSerialNumber,
"display": display
}
devices_info.append(device_info)
elif mvcc_dev_info.nTLayerType == MV_GENTL_CAMERALINK_DEVICE:
# CameraLink相机
user_defined_name = decoding_char(mvcc_dev_info.SpecialInfo.stCMLInfo.chUserDefinedName)
model_name = decoding_char(mvcc_dev_info.SpecialInfo.stCMLInfo.chModelName)
strSerialNumber = ""
for per in mvcc_dev_info.SpecialInfo.stCMLInfo.chSerialNumber:
if per == 0:
break
strSerialNumber = strSerialNumber + chr(per)
display = f"[{i}]CML: {user_defined_name} {model_name} ({strSerialNumber})"
logging.debug(f"CML相机: {display}")
device_info = {
"index": i,
"type": "CML",
"name": user_defined_name,
"model": model_name,
"serial": strSerialNumber,
"display": display
}
devices_info.append(device_info)
else:
# 其他类型相机
display = f"[{i}]Other"
device_info = {
"index": i,
"type": "Other",
"display": display
}
devices_info.append(device_info)
logging.debug(f"其他类型相机: {display}")
# 添加详细日志
logging.debug(f"枚举到的设备数量: {len(devices_info)}")
self.device_list = devices_info
return devices_info
except Exception as e:
error_msg = f"枚举相机设备时发生异常: {str(e)}"
logging.error(error_msg)
self.has_real_camera = False
return []
def open_device(self, device_index):
"""打开相机设备
Args:
device_index: 设备索引
Returns:
bool: 是否成功打开设备
"""
# 如果当前处于本地图像模式,且设备索引是虚拟设备
if self.local_mode and device_index == 0:
# 模拟打开设备成功
self.isOpen = True
self.last_device_index = device_index
logging.info("打开本地图像模式虚拟设备")
return True
# 真实相机模式处理
# 检查设备索引是否有效
if device_index < 0 or device_index >= len(self.device_list):
logging.error(f"无效的设备索引: {device_index}")
return False
# 如果之前已经打开设备,先关闭
if self.isOpen:
self.close_device()
try:
logging.info(f"开始打开相机,设备索引: {device_index}")
# 设置当前选中的相机索引
self.last_device_index = device_index
# 创建相机操作对象
self.obj_cam_operation = CameraOperation(self.obj_cam_operation.cam, self.deviceList, self.last_device_index)
ret = self.obj_cam_operation.Open_device()
if ret != 0:
error_msg = f"打开相机失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
self.isOpen = False
return False
# 设置连续模式 (非触发模式)
ret = self.obj_cam_operation.Set_trigger_mode(False)
if ret != 0:
error_msg = f"设置连续模式失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
# 出错时关闭设备
self.obj_cam_operation.Close_device()
self.isOpen = False
return False
# 获取参数
ret = self.obj_cam_operation.Get_parameter()
if ret != 0:
error_msg = f"获取相机参数失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
# 继续执行,不返回失败
self.isOpen = True
logging.info(f"相机已成功打开,设备索引: {device_index}")
return True
except Exception as e:
error_msg = f"打开相机时发生异常: {str(e)}"
logging.error(error_msg)
self.isOpen = False
return False
def close_device(self):
"""关闭相机设备
Returns:
bool: 是否成功关闭设备
"""
# 如果处于本地图像模式且使用的是虚拟设备
if self.local_mode and not self.has_real_camera:
# 模拟关闭设备成功
if self.isGrabbing:
self.stop_grabbing()
self.isOpen = False
logging.info("关闭本地图像模式虚拟设备")
return True
# 真实相机模式处理
if not self.isOpen:
return True
try:
# 确保停止取图
if self.isGrabbing:
self.stop_grabbing()
# 关闭设备
ret = self.obj_cam_operation.Close_device()
if ret != 0:
error_msg = f"关闭相机失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
return False
self.isOpen = False
logging.info("相机已关闭")
return True
except Exception as e:
error_msg = f"关闭相机时发生异常: {str(e)}"
logging.error(error_msg)
self.isOpen = False # 强制设置为关闭状态
return False
def start_grabbing(self, window_id):
"""开始图像采集
Args:
window_id: 显示窗口句柄
Returns:
bool: 是否成功开始采集
"""
# 保存窗口ID
self.current_window_id = window_id
# 如果处于本地图像模式,模拟开始采集
if self.local_mode:
self.isGrabbing = True
logging.info("开始本地图像模式采集, 窗口ID: " + str(window_id))
return True
# 检查设备是否已打开
if not self.isOpen:
logging.error("相机未打开,无法开始取图")
return False
try:
ret = self.obj_cam_operation.Start_grabbing(window_id)
if ret != 0:
error_msg = f"开始取图失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
return False
self.isGrabbing = True
logging.info("开始图像采集")
return True
except Exception as e:
error_msg = f"开始取图时发生异常: {str(e)}"
logging.error(error_msg)
return False
def stop_grabbing(self):
"""停止取图
Returns:
bool: 是否成功停止取图
"""
# 如果处于本地图像模式,模拟停止采集
if self.local_mode:
self.isGrabbing = False
logging.info("停止本地图像模式采集")
return True
if not self.isOpen:
return True
if not self.isGrabbing:
return True
try:
ret = self.obj_cam_operation.Stop_grabbing()
if ret != 0:
error_msg = f"停止取图失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
return False
self.isGrabbing = False
logging.info("停止图像采集")
return True
except Exception as e:
error_msg = f"停止取图时发生异常: {str(e)}"
logging.error(error_msg)
self.isGrabbing = False # 强制设置为非取图状态
return False
def get_parameters(self):
"""获取相机参数
Returns:
tuple: (曝光时间, 增益, 帧率) 或 None (如果失败)
"""
if not self.isOpen:
logging.error("相机未打开,无法获取参数")
return None
try:
ret = self.obj_cam_operation.Get_parameter()
if ret != 0:
error_msg = f"获取相机参数失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
return None
exposure_time = self.obj_cam_operation.exposure_time
gain = self.obj_cam_operation.gain
frame_rate = self.obj_cam_operation.frame_rate
logging.info(f"获取相机参数: 曝光={exposure_time}, 增益={gain}, 帧率={frame_rate}")
return (exposure_time, gain, frame_rate)
except Exception as e:
error_msg = f"获取相机参数时发生异常: {str(e)}"
logging.error(error_msg)
return None
def set_parameters(self, frame_rate, exposure_time, gain):
"""设置相机参数
Args:
frame_rate: 帧率
exposure_time: 曝光时间
gain: 增益
Returns:
bool: 是否成功设置参数
"""
if not self.isOpen:
logging.error("相机未打开,无法设置参数")
return False
try:
# 设置参数
ret = self.obj_cam_operation.Set_parameter(str(frame_rate), str(exposure_time), str(gain))
if ret != 0:
error_msg = f"设置相机参数失败! 错误码: 0x{ret:x}"
logging.error(error_msg)
return False
logging.info(f"设置相机参数: 曝光={exposure_time}, 增益={gain}, 帧率={frame_rate}")
return True
except Exception as e:
error_msg = f"设置相机参数时发生异常: {str(e)}"
logging.error(error_msg)
return False
def set_local_mode(self, enabled):
"""设置是否启用本地图像模式
Args:
enabled: 是否启用本地模式
"""
# 如果当前正在采集,先停止
if self.isGrabbing:
self.stop_grabbing()
# 如果当前相机已打开且切换到本地模式,先关闭相机
if self.isOpen and enabled and not self.local_mode:
self.close_device()
# 设置模式
self.local_mode = enabled
# 重新枚举设备(会返回真实设备或虚拟设备)
self.enum_devices()
logging.info(f"本地图像模式已{'启用' if enabled else '禁用'}")
# 如果切换到本地模式,自动"打开"虚拟设备
if enabled and not self.isOpen:
self.open_device(0) # 虚拟设备索引为0
def handle_local_frame(self, frame, window_id=0):
"""处理并存储本地图像帧不使用OpenCV窗口显示
Args:
frame: 本地图像帧数据(OpenCV RGB图像)
window_id: 窗口句柄ID仅用于兼容旧代码实际不再使用
"""
if not self.local_mode:
return False
try:
# 记录窗口ID (仅用于日志和兼容)
self.current_window_id = window_id
# 保存最后一帧,这个帧可以供其他组件使用
self.last_frame = frame
# 设置为正在抓取状态,与实际相机行为保持一致
self.isGrabbing = True
# 输出日志帮助调试
logging.warning(f"====> 相机管理器存储本地图像帧,尺寸: {frame.shape if frame is not None else 'None'}")
# 我们仅存储帧数据,不进行显示操作
# 显示操作由CameraDisplayWidget完成
return True
except Exception as e:
logging.error(f"处理本地图像帧失败: {e}")
return False
def save_params_to_config(self, exposure, gain, frame_rate):
"""保存相机参数到配置文件
Args:
exposure: 曝光值(滑块值)
gain: 增益(滑块值)
frame_rate: 帧率(滑块值)
Returns:
bool: 是否成功保存参数
"""
try:
# 创建相机参数配置
config = {
"exposure": exposure,
"gain": gain,
"frame_rate": frame_rate
}
# 保存到配置文件
config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "config", "camera_config.json")
os.makedirs(os.path.dirname(config_path), exist_ok=True)
# 检查文件是否存在并读取
if os.path.exists(config_path):
with open(config_path, 'r', encoding='utf-8') as f:
try:
full_config = json.load(f)
except json.JSONDecodeError:
full_config = {}
else:
full_config = {}
# 更新配置
full_config["camera_params"] = config
# 写入文件
with open(config_path, 'w', encoding='utf-8') as f:
json.dump(full_config, f, indent=4, ensure_ascii=False)
logging.info(f"相机参数已保存到配置文件: {config_path}")
return True
except Exception as e:
error_msg = f"保存相机参数到配置文件时发生异常: {str(e)}"
logging.error(error_msg)
return False