601 lines
22 KiB
Python
601 lines
22 KiB
Python
import sys
|
||
import os
|
||
import logging
|
||
import json
|
||
import ctypes
|
||
from ctypes import *
|
||
import numpy as np
|
||
import cv2
|
||
|
||
# 添加相机模块路径
|
||
sys.path.append(os.path.join(os.path.dirname(os.path.dirname(__file__)), "camera"))
|
||
|
||
# 导入相机相关模块
|
||
from camera.MvCameraControl_class import MvCamera
|
||
from camera.CamOperation_class import CameraOperation
|
||
from camera.MvErrorDefine_const import *
|
||
from camera.CameraParams_header import *
|
||
from camera.CameraParams_const import *
|
||
|
||
|
||
class CameraManager:
|
||
"""相机管理器单例类,确保整个应用中只有一个相机实例"""
|
||
|
||
_instance = None
|
||
_initialized = False
|
||
|
||
@staticmethod
|
||
def get_instance():
|
||
"""获取相机管理器实例"""
|
||
if CameraManager._instance is None:
|
||
CameraManager._instance = CameraManager()
|
||
return CameraManager._instance
|
||
|
||
def __init__(self):
|
||
"""初始化相机管理器"""
|
||
if CameraManager._instance is not None:
|
||
raise Exception("相机管理器是单例类,请使用get_instance()方法获取实例")
|
||
else:
|
||
CameraManager._instance = self
|
||
|
||
# 初始化属性
|
||
self.isOpen = False
|
||
self.isGrabbing = False
|
||
self.last_device_index = -1
|
||
self.device_list = []
|
||
self.current_window_id = 0
|
||
|
||
# 本地图像模式相关
|
||
self.local_mode = False
|
||
self.last_frame = None
|
||
self.has_real_camera = False # 是否有真实相机连接
|
||
|
||
# 初始化SDK
|
||
self.obj_cam_operation = CameraOperation()
|
||
|
||
# 将单例标记为已初始化
|
||
CameraManager._initialized = True
|
||
|
||
# 初始化日志
|
||
logging.info("相机管理器初始化")
|
||
|
||
def enum_devices(self):
|
||
"""枚举相机设备列表
|
||
|
||
Returns:
|
||
list: 设备信息列表
|
||
"""
|
||
# 如果当前处于本地图像模式,返回一个虚拟设备
|
||
if self.local_mode:
|
||
# 返回虚拟设备列表
|
||
virtual_device = {
|
||
"vendor_name": "本地图像模式",
|
||
"model_name": "虚拟相机",
|
||
"serial_number": "LOCAL_IMG",
|
||
"device_version": "1.0",
|
||
"spec_version": "1.0",
|
||
"user_defined_name": "本地图像播放器",
|
||
"index": 0
|
||
}
|
||
self.device_list = [virtual_device]
|
||
return self.device_list
|
||
|
||
# 清空设备列表
|
||
self.device_list = []
|
||
|
||
try:
|
||
# 枚举设备
|
||
ret = self.obj_cam_operation.Enumrate_Devices()
|
||
if ret != 0:
|
||
error_msg = f"枚举相机设备失败! 错误码: 0x{ret:x}"
|
||
logging.error(error_msg)
|
||
# 标记没有真实相机
|
||
self.has_real_camera = False
|
||
return []
|
||
|
||
# 获取设备数量
|
||
device_num = self.obj_cam_operation.device_num
|
||
|
||
if device_num <= 0:
|
||
logging.warning("未发现相机设备")
|
||
# 标记没有真实相机
|
||
self.has_real_camera = False
|
||
return []
|
||
|
||
# 标记存在真实相机
|
||
self.has_real_camera = True
|
||
|
||
# 解析并构建设备信息
|
||
# 确保Hikvision SDK已正确加载
|
||
from camera.MvCameraControl_class import MvCamCtrldll
|
||
if MvCamCtrldll is None:
|
||
logging.error("相机SDK未正确加载,无法枚举设备")
|
||
return []
|
||
|
||
# 确保先关闭任何已打开的相机
|
||
if self.isOpen:
|
||
self.close_device()
|
||
|
||
# 枚举设备
|
||
self.deviceList = MV_CC_DEVICE_INFO_LIST()
|
||
n_layer_type = (MV_GIGE_DEVICE | MV_USB_DEVICE | MV_GENTL_CAMERALINK_DEVICE
|
||
| MV_GENTL_CXP_DEVICE | MV_GENTL_XOF_DEVICE)
|
||
ret = MvCamera.MV_CC_EnumDevices(n_layer_type, self.deviceList)
|
||
if ret != 0:
|
||
error_msg = f"枚举设备失败! 错误码: 0x{ret:x}"
|
||
logging.error(error_msg)
|
||
return None
|
||
|
||
if self.deviceList.nDeviceNum == 0:
|
||
logging.info("未找到相机设备")
|
||
return []
|
||
|
||
logging.info(f"找到 {self.deviceList.nDeviceNum} 个相机设备")
|
||
|
||
# 定义解码函数
|
||
def decoding_char(c_ubyte_value):
|
||
c_char_p_value = ctypes.cast(c_ubyte_value, ctypes.c_char_p)
|
||
try:
|
||
decode_str = c_char_p_value.value.decode('gbk') # 中文字符
|
||
except UnicodeDecodeError:
|
||
decode_str = str(c_char_p_value.value)
|
||
return decode_str
|
||
|
||
# 构造设备信息列表
|
||
devices_info = []
|
||
for i in range(0, self.deviceList.nDeviceNum):
|
||
mvcc_dev_info = cast(self.deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents
|
||
|
||
# 详细日志
|
||
logging.debug(f"设备 {i} 类型: {mvcc_dev_info.nTLayerType}")
|
||
|
||
if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE:
|
||
# GigE相机
|
||
user_defined_name = decoding_char(mvcc_dev_info.SpecialInfo.stGigEInfo.chUserDefinedName)
|
||
model_name = decoding_char(mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName)
|
||
|
||
nip1 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0xff000000) >> 24)
|
||
nip2 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x00ff0000) >> 16)
|
||
nip3 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x0000ff00) >> 8)
|
||
nip4 = (mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x000000ff)
|
||
ip = f"{nip1}.{nip2}.{nip3}.{nip4}"
|
||
|
||
display = f"[{i}]GigE: {user_defined_name} {model_name} ({ip})"
|
||
logging.debug(f"GigE相机: {display}")
|
||
|
||
device_info = {
|
||
"index": i,
|
||
"type": "GigE",
|
||
"name": user_defined_name,
|
||
"model": model_name,
|
||
"ip": ip,
|
||
"display": display
|
||
}
|
||
devices_info.append(device_info)
|
||
|
||
elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE:
|
||
# USB相机
|
||
user_defined_name = decoding_char(mvcc_dev_info.SpecialInfo.stUsb3VInfo.chUserDefinedName)
|
||
model_name = decoding_char(mvcc_dev_info.SpecialInfo.stUsb3VInfo.chModelName)
|
||
|
||
# 序列号
|
||
strSerialNumber = ""
|
||
for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chSerialNumber:
|
||
if per == 0:
|
||
break
|
||
strSerialNumber = strSerialNumber + chr(per)
|
||
|
||
display = f"[{i}]USB: {user_defined_name} {model_name} ({strSerialNumber})"
|
||
logging.debug(f"USB相机: {display}")
|
||
|
||
device_info = {
|
||
"index": i,
|
||
"type": "USB",
|
||
"name": user_defined_name,
|
||
"model": model_name,
|
||
"serial": strSerialNumber,
|
||
"display": display
|
||
}
|
||
devices_info.append(device_info)
|
||
|
||
elif mvcc_dev_info.nTLayerType == MV_GENTL_CAMERALINK_DEVICE:
|
||
# CameraLink相机
|
||
user_defined_name = decoding_char(mvcc_dev_info.SpecialInfo.stCMLInfo.chUserDefinedName)
|
||
model_name = decoding_char(mvcc_dev_info.SpecialInfo.stCMLInfo.chModelName)
|
||
|
||
strSerialNumber = ""
|
||
for per in mvcc_dev_info.SpecialInfo.stCMLInfo.chSerialNumber:
|
||
if per == 0:
|
||
break
|
||
strSerialNumber = strSerialNumber + chr(per)
|
||
|
||
display = f"[{i}]CML: {user_defined_name} {model_name} ({strSerialNumber})"
|
||
logging.debug(f"CML相机: {display}")
|
||
|
||
device_info = {
|
||
"index": i,
|
||
"type": "CML",
|
||
"name": user_defined_name,
|
||
"model": model_name,
|
||
"serial": strSerialNumber,
|
||
"display": display
|
||
}
|
||
devices_info.append(device_info)
|
||
|
||
else:
|
||
# 其他类型相机
|
||
display = f"[{i}]Other"
|
||
device_info = {
|
||
"index": i,
|
||
"type": "Other",
|
||
"display": display
|
||
}
|
||
devices_info.append(device_info)
|
||
logging.debug(f"其他类型相机: {display}")
|
||
|
||
# 添加详细日志
|
||
logging.debug(f"枚举到的设备数量: {len(devices_info)}")
|
||
self.device_list = devices_info
|
||
return devices_info
|
||
|
||
except Exception as e:
|
||
error_msg = f"枚举相机设备时发生异常: {str(e)}"
|
||
logging.error(error_msg)
|
||
self.has_real_camera = False
|
||
return []
|
||
|
||
def open_device(self, device_index):
|
||
"""打开相机设备
|
||
|
||
Args:
|
||
device_index: 设备索引
|
||
|
||
Returns:
|
||
bool: 是否成功打开设备
|
||
"""
|
||
# 如果当前处于本地图像模式,且设备索引是虚拟设备
|
||
if self.local_mode and device_index == 0:
|
||
# 模拟打开设备成功
|
||
self.isOpen = True
|
||
self.last_device_index = device_index
|
||
logging.info("打开本地图像模式虚拟设备")
|
||
return True
|
||
|
||
# 真实相机模式处理
|
||
# 检查设备索引是否有效
|
||
if device_index < 0 or device_index >= len(self.device_list):
|
||
logging.error(f"无效的设备索引: {device_index}")
|
||
return False
|
||
|
||
# 如果之前已经打开设备,先关闭
|
||
if self.isOpen:
|
||
self.close_device()
|
||
|
||
try:
|
||
logging.info(f"开始打开相机,设备索引: {device_index}")
|
||
|
||
# 设置当前选中的相机索引
|
||
self.last_device_index = device_index
|
||
|
||
# 创建相机操作对象
|
||
self.obj_cam_operation = CameraOperation(self.obj_cam_operation.cam, self.deviceList, self.last_device_index)
|
||
ret = self.obj_cam_operation.Open_device()
|
||
if ret != 0:
|
||
error_msg = f"打开相机失败! 错误码: 0x{ret:x}"
|
||
logging.error(error_msg)
|
||
self.isOpen = False
|
||
return False
|
||
|
||
# 设置连续模式 (非触发模式)
|
||
ret = self.obj_cam_operation.Set_trigger_mode(False)
|
||
if ret != 0:
|
||
error_msg = f"设置连续模式失败! 错误码: 0x{ret:x}"
|
||
logging.error(error_msg)
|
||
# 出错时关闭设备
|
||
self.obj_cam_operation.Close_device()
|
||
self.isOpen = False
|
||
return False
|
||
|
||
# 获取参数
|
||
ret = self.obj_cam_operation.Get_parameter()
|
||
if ret != 0:
|
||
error_msg = f"获取相机参数失败! 错误码: 0x{ret:x}"
|
||
logging.error(error_msg)
|
||
# 继续执行,不返回失败
|
||
|
||
self.isOpen = True
|
||
logging.info(f"相机已成功打开,设备索引: {device_index}")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
error_msg = f"打开相机时发生异常: {str(e)}"
|
||
logging.error(error_msg)
|
||
self.isOpen = False
|
||
return False
|
||
|
||
def close_device(self):
|
||
"""关闭相机设备
|
||
|
||
Returns:
|
||
bool: 是否成功关闭设备
|
||
"""
|
||
# 如果处于本地图像模式且使用的是虚拟设备
|
||
if self.local_mode and not self.has_real_camera:
|
||
# 模拟关闭设备成功
|
||
if self.isGrabbing:
|
||
self.stop_grabbing()
|
||
self.isOpen = False
|
||
logging.info("关闭本地图像模式虚拟设备")
|
||
return True
|
||
|
||
# 真实相机模式处理
|
||
if not self.isOpen:
|
||
return True
|
||
|
||
try:
|
||
# 确保停止取图
|
||
if self.isGrabbing:
|
||
self.stop_grabbing()
|
||
|
||
# 关闭设备
|
||
ret = self.obj_cam_operation.Close_device()
|
||
if ret != 0:
|
||
error_msg = f"关闭相机失败! 错误码: 0x{ret:x}"
|
||
logging.error(error_msg)
|
||
return False
|
||
|
||
self.isOpen = False
|
||
logging.info("相机已关闭")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
error_msg = f"关闭相机时发生异常: {str(e)}"
|
||
logging.error(error_msg)
|
||
self.isOpen = False # 强制设置为关闭状态
|
||
return False
|
||
|
||
def start_grabbing(self, window_id):
|
||
"""开始图像采集
|
||
|
||
Args:
|
||
window_id: 显示窗口句柄
|
||
|
||
Returns:
|
||
bool: 是否成功开始采集
|
||
"""
|
||
# 保存窗口ID
|
||
self.current_window_id = window_id
|
||
|
||
# 如果处于本地图像模式,模拟开始采集
|
||
if self.local_mode:
|
||
self.isGrabbing = True
|
||
logging.info("开始本地图像模式采集, 窗口ID: " + str(window_id))
|
||
return True
|
||
|
||
# 检查设备是否已打开
|
||
if not self.isOpen:
|
||
logging.error("相机未打开,无法开始取图")
|
||
return False
|
||
|
||
try:
|
||
ret = self.obj_cam_operation.Start_grabbing(window_id)
|
||
if ret != 0:
|
||
error_msg = f"开始取图失败! 错误码: 0x{ret:x}"
|
||
logging.error(error_msg)
|
||
return False
|
||
|
||
self.isGrabbing = True
|
||
logging.info("开始图像采集")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
error_msg = f"开始取图时发生异常: {str(e)}"
|
||
logging.error(error_msg)
|
||
return False
|
||
|
||
def stop_grabbing(self):
|
||
"""停止取图
|
||
|
||
Returns:
|
||
bool: 是否成功停止取图
|
||
"""
|
||
# 如果处于本地图像模式,模拟停止采集
|
||
if self.local_mode:
|
||
self.isGrabbing = False
|
||
logging.info("停止本地图像模式采集")
|
||
return True
|
||
|
||
if not self.isOpen:
|
||
return True
|
||
|
||
if not self.isGrabbing:
|
||
return True
|
||
|
||
try:
|
||
ret = self.obj_cam_operation.Stop_grabbing()
|
||
if ret != 0:
|
||
error_msg = f"停止取图失败! 错误码: 0x{ret:x}"
|
||
logging.error(error_msg)
|
||
return False
|
||
|
||
self.isGrabbing = False
|
||
logging.info("停止图像采集")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
error_msg = f"停止取图时发生异常: {str(e)}"
|
||
logging.error(error_msg)
|
||
self.isGrabbing = False # 强制设置为非取图状态
|
||
return False
|
||
|
||
def get_parameters(self):
|
||
"""获取相机参数
|
||
|
||
Returns:
|
||
tuple: (曝光时间, 增益, 帧率) 或 None (如果失败)
|
||
"""
|
||
if not self.isOpen:
|
||
logging.error("相机未打开,无法获取参数")
|
||
return None
|
||
|
||
try:
|
||
ret = self.obj_cam_operation.Get_parameter()
|
||
if ret != 0:
|
||
error_msg = f"获取相机参数失败! 错误码: 0x{ret:x}"
|
||
logging.error(error_msg)
|
||
return None
|
||
|
||
exposure_time = self.obj_cam_operation.exposure_time
|
||
gain = self.obj_cam_operation.gain
|
||
frame_rate = self.obj_cam_operation.frame_rate
|
||
|
||
logging.info(f"获取相机参数: 曝光={exposure_time}, 增益={gain}, 帧率={frame_rate}")
|
||
|
||
return (exposure_time, gain, frame_rate)
|
||
|
||
except Exception as e:
|
||
error_msg = f"获取相机参数时发生异常: {str(e)}"
|
||
logging.error(error_msg)
|
||
return None
|
||
|
||
def set_parameters(self, frame_rate, exposure_time, gain):
|
||
"""设置相机参数
|
||
|
||
Args:
|
||
frame_rate: 帧率
|
||
exposure_time: 曝光时间
|
||
gain: 增益
|
||
|
||
Returns:
|
||
bool: 是否成功设置参数
|
||
"""
|
||
if not self.isOpen:
|
||
logging.error("相机未打开,无法设置参数")
|
||
return False
|
||
|
||
try:
|
||
# 设置参数
|
||
ret = self.obj_cam_operation.Set_parameter(str(frame_rate), str(exposure_time), str(gain))
|
||
if ret != 0:
|
||
error_msg = f"设置相机参数失败! 错误码: 0x{ret:x}"
|
||
logging.error(error_msg)
|
||
return False
|
||
|
||
logging.info(f"设置相机参数: 曝光={exposure_time}, 增益={gain}, 帧率={frame_rate}")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
error_msg = f"设置相机参数时发生异常: {str(e)}"
|
||
logging.error(error_msg)
|
||
return False
|
||
|
||
def set_local_mode(self, enabled):
|
||
"""设置是否启用本地图像模式
|
||
|
||
Args:
|
||
enabled: 是否启用本地模式
|
||
"""
|
||
# 如果当前正在采集,先停止
|
||
if self.isGrabbing:
|
||
self.stop_grabbing()
|
||
|
||
# 如果当前相机已打开且切换到本地模式,先关闭相机
|
||
if self.isOpen and enabled and not self.local_mode:
|
||
self.close_device()
|
||
|
||
# 设置模式
|
||
self.local_mode = enabled
|
||
|
||
# 重新枚举设备(会返回真实设备或虚拟设备)
|
||
self.enum_devices()
|
||
|
||
logging.info(f"本地图像模式已{'启用' if enabled else '禁用'}")
|
||
|
||
# 如果切换到本地模式,自动"打开"虚拟设备
|
||
if enabled and not self.isOpen:
|
||
self.open_device(0) # 虚拟设备索引为0
|
||
|
||
def handle_local_frame(self, frame, window_id=0):
|
||
"""处理并存储本地图像帧(不使用OpenCV窗口显示)
|
||
|
||
Args:
|
||
frame: 本地图像帧数据(OpenCV RGB图像)
|
||
window_id: 窗口句柄ID,仅用于兼容旧代码,实际不再使用
|
||
"""
|
||
if not self.local_mode:
|
||
return False
|
||
|
||
try:
|
||
# 记录窗口ID (仅用于日志和兼容)
|
||
self.current_window_id = window_id
|
||
|
||
# 保存最后一帧,这个帧可以供其他组件使用
|
||
self.last_frame = frame
|
||
|
||
# 设置为正在抓取状态,与实际相机行为保持一致
|
||
self.isGrabbing = True
|
||
|
||
# 输出日志帮助调试
|
||
logging.warning(f"====> 相机管理器存储本地图像帧,尺寸: {frame.shape if frame is not None else 'None'}")
|
||
|
||
# 我们仅存储帧数据,不进行显示操作
|
||
# 显示操作由CameraDisplayWidget完成
|
||
|
||
return True
|
||
except Exception as e:
|
||
logging.error(f"处理本地图像帧失败: {e}")
|
||
|
||
return False
|
||
|
||
def save_params_to_config(self, exposure, gain, frame_rate):
|
||
"""保存相机参数到配置文件
|
||
|
||
Args:
|
||
exposure: 曝光值(滑块值)
|
||
gain: 增益(滑块值)
|
||
frame_rate: 帧率(滑块值)
|
||
|
||
Returns:
|
||
bool: 是否成功保存参数
|
||
"""
|
||
try:
|
||
# 创建相机参数配置
|
||
config = {
|
||
"exposure": exposure,
|
||
"gain": gain,
|
||
"frame_rate": frame_rate
|
||
}
|
||
|
||
# 保存到配置文件
|
||
config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "config", "camera_config.json")
|
||
os.makedirs(os.path.dirname(config_path), exist_ok=True)
|
||
|
||
# 检查文件是否存在并读取
|
||
if os.path.exists(config_path):
|
||
with open(config_path, 'r', encoding='utf-8') as f:
|
||
try:
|
||
full_config = json.load(f)
|
||
except json.JSONDecodeError:
|
||
full_config = {}
|
||
else:
|
||
full_config = {}
|
||
|
||
# 更新配置
|
||
full_config["camera_params"] = config
|
||
|
||
# 写入文件
|
||
with open(config_path, 'w', encoding='utf-8') as f:
|
||
json.dump(full_config, f, indent=4, ensure_ascii=False)
|
||
|
||
logging.info(f"相机参数已保存到配置文件: {config_path}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
error_msg = f"保存相机参数到配置文件时发生异常: {str(e)}"
|
||
logging.error(error_msg)
|
||
return False |