jiateng_ws/widgets/serial_settings_widget.py
2025-06-30 20:46:56 +08:00

483 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
import logging
import serial.tools.list_ports
import time
from PySide6.QtWidgets import QMessageBox
from ui.serial_settings_ui import SerialSettingsUI
from utils.config_loader import ConfigLoader
from utils.serial_manager import SerialManager
from PySide6.QtWidgets import QApplication
class SerialSettingsWidget(SerialSettingsUI):
"""串口设置组件"""
def __init__(self, parent=None):
super().__init__(parent)
self.config = ConfigLoader.get_instance()
self.serial_manager = SerialManager()
# 连接信号
self.init_connections()
# 加载设置
self.load_settings()
def init_connections(self):
"""初始化信号连接"""
# 米电阻串口
self.mdz_refresh_btn.clicked.connect(self.refresh_ports)
self.test_mdz_btn.clicked.connect(self.test_mdz_port)
# 线径串口
self.xj_refresh_btn.clicked.connect(self.refresh_ports)
self.test_xj_btn.clicked.connect(self.test_xj_port)
# 扫码器串口
self.scanner_refresh_btn.clicked.connect(self.refresh_ports)
self.test_scanner_btn.clicked.connect(self.test_scanner_port)
# 保存按钮
self.save_btn.clicked.connect(self.save_settings)
# 初始化时刷新串口列表
self.refresh_ports()
# 加载设置
self.load_settings()
def refresh_ports(self):
"""刷新串口列表"""
try:
# 保存当前选择
current_mdz_port = self.mdz_port_combo.currentData()
current_xj_port = self.xj_port_combo.currentData()
current_scanner_port = self.scanner_port_combo.currentData()
# 清空列表
self.mdz_port_combo.clear()
self.xj_port_combo.clear()
self.scanner_port_combo.clear()
# 添加"不使用"选项
self.mdz_port_combo.addItem("不使用", "")
self.xj_port_combo.addItem("不使用", "")
self.scanner_port_combo.addItem("不使用", "")
# 获取可用串口
ports = list(serial.tools.list_ports.comports())
for port in ports:
port_name = port.device
port_desc = f"{port_name} ({port.description})"
# 添加到米电阻下拉框
self.mdz_port_combo.addItem(port_desc, port_name)
# 添加到线径下拉框
self.xj_port_combo.addItem(port_desc, port_name)
# 添加到扫码器下拉框
self.scanner_port_combo.addItem(port_desc, port_name)
# 恢复之前的选择
if current_mdz_port:
index = self.mdz_port_combo.findData(current_mdz_port)
if index >= 0:
self.mdz_port_combo.setCurrentIndex(index)
else:
# 如果之前没有选择,则设为"不使用"
self.mdz_port_combo.setCurrentIndex(0)
if current_xj_port:
index = self.xj_port_combo.findData(current_xj_port)
if index >= 0:
self.xj_port_combo.setCurrentIndex(index)
else:
# 如果之前没有选择,则设为"不使用"
self.xj_port_combo.setCurrentIndex(0)
if current_scanner_port:
index = self.scanner_port_combo.findData(current_scanner_port)
if index >= 0:
self.scanner_port_combo.setCurrentIndex(index)
else:
# 如果之前没有选择,则设为"不使用"
self.scanner_port_combo.setCurrentIndex(0)
logging.info(f"已刷新串口列表,找到 {len(ports)} 个串口")
except Exception as e:
logging.error(f"刷新串口列表失败: {e}")
QMessageBox.warning(self, "刷新失败", f"刷新串口列表失败: {e}")
def load_settings(self):
"""加载设置"""
try:
# 加载全局设置
enable_serial = self.config.get_value('app.features.enable_serial_ports', False)
enable_keyboard = self.config.get_value('app.features.enable_keyboard_listener', False)
self.enable_serial_checkbox.setChecked(enable_serial)
self.enable_keyboard_checkbox.setChecked(enable_keyboard)
# 加载米电阻设置
mdz_config = self.config.get_config('mdz')
if mdz_config:
# 设置串口
mdz_port = mdz_config.get('ser', '')
index = self.mdz_port_combo.findData(mdz_port)
if index >= 0:
self.mdz_port_combo.setCurrentIndex(index)
# 设置波特率
mdz_baud = str(mdz_config.get('port', '9600'))
index = self.mdz_baud_combo.findText(mdz_baud)
if index >= 0:
self.mdz_baud_combo.setCurrentIndex(index)
# 设置数据位
mdz_data_bits = str(mdz_config.get('data_bits', '8'))
index = self.mdz_data_bits_combo.findText(mdz_data_bits)
if index >= 0:
self.mdz_data_bits_combo.setCurrentIndex(index)
# 设置停止位
mdz_stop_bits = str(mdz_config.get('stop_bits', '1'))
index = self.mdz_stop_bits_combo.findText(mdz_stop_bits)
if index >= 0:
self.mdz_stop_bits_combo.setCurrentIndex(index)
# 设置校验位
mdz_parity = mdz_config.get('parity', 'N')
index = self.mdz_parity_combo.findData(mdz_parity)
if index >= 0:
self.mdz_parity_combo.setCurrentIndex(index)
# 设置查询指令
mdz_query_cmd = mdz_config.get('query_cmd', '01 03 00 01 00 07 55 C8')
self.mdz_query_cmd.setText(mdz_query_cmd)
# 设置查询间隔
mdz_query_interval = mdz_config.get('query_interval', 5)
self.mdz_query_interval.setValue(mdz_query_interval)
# 加载线径设置
xj_config = self.config.get_config('xj')
if xj_config:
# 设置串口
xj_port = xj_config.get('ser', '')
index = self.xj_port_combo.findData(xj_port)
if index >= 0:
self.xj_port_combo.setCurrentIndex(index)
# 设置波特率
xj_baud = str(xj_config.get('port', '9600'))
index = self.xj_baud_combo.findText(xj_baud)
if index >= 0:
self.xj_baud_combo.setCurrentIndex(index)
# 设置数据位
xj_data_bits = str(xj_config.get('data_bits', '8'))
index = self.xj_data_bits_combo.findText(xj_data_bits)
if index >= 0:
self.xj_data_bits_combo.setCurrentIndex(index)
# 设置停止位
xj_stop_bits = str(xj_config.get('stop_bits', '1'))
index = self.xj_stop_bits_combo.findText(xj_stop_bits)
if index >= 0:
self.xj_stop_bits_combo.setCurrentIndex(index)
# 设置校验位
xj_parity = xj_config.get('parity', 'N')
index = self.xj_parity_combo.findData(xj_parity)
if index >= 0:
self.xj_parity_combo.setCurrentIndex(index)
# 加载扫码器设置
scanner_config = self.config.get_config('scanner')
if scanner_config:
# 设置串口
scanner_port = scanner_config.get('ser', '')
index = self.scanner_port_combo.findData(scanner_port)
if index >= 0:
self.scanner_port_combo.setCurrentIndex(index)
# 设置波特率
scanner_baud = str(scanner_config.get('port', '9600'))
index = self.scanner_baud_combo.findText(scanner_baud)
if index >= 0:
self.scanner_baud_combo.setCurrentIndex(index)
# 设置数据位
scanner_data_bits = str(scanner_config.get('data_bits', '8'))
index = self.scanner_data_bits_combo.findText(scanner_data_bits)
if index >= 0:
self.scanner_data_bits_combo.setCurrentIndex(index)
# 设置停止位
scanner_stop_bits = str(scanner_config.get('stop_bits', '1'))
index = self.scanner_stop_bits_combo.findText(scanner_stop_bits)
if index >= 0:
self.scanner_stop_bits_combo.setCurrentIndex(index)
# 设置校验位
scanner_parity = scanner_config.get('parity', 'N')
index = self.scanner_parity_combo.findData(scanner_parity)
if index >= 0:
self.scanner_parity_combo.setCurrentIndex(index)
logging.info("已加载串口设置")
except Exception as e:
logging.error(f"加载串口设置失败: {e}")
QMessageBox.warning(self, "加载失败", f"加载串口设置失败: {e}")
def save_settings(self):
"""保存设置"""
try:
# 保存全局设置
enable_serial = self.enable_serial_checkbox.isChecked()
enable_keyboard = self.enable_keyboard_checkbox.isChecked()
self.config.set_value('app.features.enable_serial_ports', enable_serial)
self.config.set_value('app.features.enable_keyboard_listener', enable_keyboard)
# 保存米电阻设置
mdz_port = self.mdz_port_combo.currentData()
mdz_baud = int(self.mdz_baud_combo.currentText())
mdz_data_bits = int(self.mdz_data_bits_combo.currentText())
mdz_stop_bits = float(self.mdz_stop_bits_combo.currentText())
mdz_parity = self.mdz_parity_combo.currentData()
mdz_query_cmd = self.mdz_query_cmd.text().strip()
mdz_query_interval = self.mdz_query_interval.value()
mdz_config = {
'port': mdz_baud,
'data_bits': mdz_data_bits,
'stop_bits': mdz_stop_bits,
'parity': mdz_parity,
'query_cmd': mdz_query_cmd,
'query_interval': mdz_query_interval
}
# 只有当用户选择了串口时才保存串口配置
if mdz_port:
mdz_config['ser'] = mdz_port
self.config.set_config('mdz', mdz_config)
# 保存线径设置
xj_port = self.xj_port_combo.currentData()
xj_baud = int(self.xj_baud_combo.currentText())
xj_data_bits = int(self.xj_data_bits_combo.currentText())
xj_stop_bits = float(self.xj_stop_bits_combo.currentText())
xj_parity = self.xj_parity_combo.currentData()
xj_config = {
'port': xj_baud,
'data_bits': xj_data_bits,
'stop_bits': xj_stop_bits,
'parity': xj_parity
}
# 只有当用户选择了串口时才保存串口配置
if xj_port:
xj_config['ser'] = xj_port
self.config.set_config('xj', xj_config)
# 保存扫码器设置
scanner_port = self.scanner_port_combo.currentData()
scanner_baud = int(self.scanner_baud_combo.currentText())
scanner_data_bits = int(self.scanner_data_bits_combo.currentText())
scanner_stop_bits = float(self.scanner_stop_bits_combo.currentText())
scanner_parity = self.scanner_parity_combo.currentData()
scanner_config = {
'port': scanner_baud,
'data_bits': scanner_data_bits,
'stop_bits': scanner_stop_bits,
'parity': scanner_parity
}
# 只有当用户选择了串口时才保存串口配置
if scanner_port:
scanner_config['ser'] = scanner_port
self.config.set_config('scanner', scanner_config)
# 发送设置变更信号
self.settings_changed.emit()
QMessageBox.information(self, "保存成功", "串口设置已保存")
except Exception as e:
logging.error(f"保存串口设置失败: {e}")
QMessageBox.critical(self, "保存失败", f"保存串口设置失败: {e}")
def test_mdz_port(self):
"""测试米电阻串口"""
try:
port = self.mdz_port_combo.currentData()
if not port:
QMessageBox.warning(self, "测试失败", "请先选择串口,当前设置为\"不使用\"")
return
baud = int(self.mdz_baud_combo.currentText())
data_bits = int(self.mdz_data_bits_combo.currentText())
stop_bits = float(self.mdz_stop_bits_combo.currentText())
parity = self.mdz_parity_combo.currentData()
# 关闭可能已经打开的串口
if self.serial_manager.is_port_open(port):
self.serial_manager.close_port(port)
# 尝试打开串口
success = self.serial_manager.open_port(
port, 'mdz', baud, data_bits, stop_bits, parity, 1.0
)
if success:
# 尝试发送查询指令
query_cmd = self.mdz_query_cmd.text()
if query_cmd:
try:
# 转换查询指令为字节
cmd_bytes = bytes.fromhex(query_cmd.replace(' ', ''))
self.serial_manager.write_data(port, cmd_bytes)
time.sleep(0.1) # 等待响应
# 读取响应
response = self.serial_manager.read_data(port)
if response:
# 将字节转换为十六进制字符串
hex_str = ' '.join(f'{b:02X}' for b in response)
QMessageBox.information(self, "测试成功", f"串口打开成功,收到响应:\n{hex_str}")
else:
QMessageBox.information(self, "测试成功", "串口打开成功,但未收到响应")
except Exception as e:
QMessageBox.warning(self, "测试结果", f"串口打开成功,但发送指令失败: {e}")
else:
QMessageBox.information(self, "测试成功", "串口打开成功")
# 关闭串口
self.serial_manager.close_port(port)
else:
QMessageBox.critical(self, "测试失败", f"无法打开串口 {port}")
except Exception as e:
logging.error(f"测试米电阻串口失败: {e}")
QMessageBox.critical(self, "测试失败", f"测试米电阻串口失败: {e}")
def test_xj_port(self):
"""测试线径串口"""
try:
port = self.xj_port_combo.currentData()
if not port:
QMessageBox.warning(self, "测试失败", "请先选择串口,当前设置为\"不使用\"")
return
baud = int(self.xj_baud_combo.currentText())
data_bits = int(self.xj_data_bits_combo.currentText())
stop_bits = float(self.xj_stop_bits_combo.currentText())
parity = self.xj_parity_combo.currentData()
# 关闭可能已经打开的串口
if self.serial_manager.is_port_open(port):
self.serial_manager.close_port(port)
# 尝试打开串口
success = self.serial_manager.open_port(
port, 'xj', baud, data_bits, stop_bits, parity, 1.0
)
if success:
QMessageBox.information(self, "测试成功", f"串口 {port} 打开成功")
# 关闭串口
self.serial_manager.close_port(port)
else:
QMessageBox.critical(self, "测试失败", f"无法打开串口 {port}")
except Exception as e:
logging.error(f"测试线径串口失败: {e}")
QMessageBox.critical(self, "测试失败", f"测试线径串口失败: {e}")
def test_scanner_port(self):
"""测试扫码器串口"""
try:
port = self.scanner_port_combo.currentData()
if not port:
QMessageBox.warning(self, "测试失败", "请先选择串口,当前设置为\"不使用\"")
return
baud = int(self.scanner_baud_combo.currentText())
data_bits = int(self.scanner_data_bits_combo.currentText())
stop_bits = float(self.scanner_stop_bits_combo.currentText())
parity = self.scanner_parity_combo.currentData()
# 关闭可能已经打开的串口
if self.serial_manager.is_port_open(port):
self.serial_manager.close_port(port)
# 创建临时回调函数,用于测试期间接收扫码器数据
def scanner_callback(port_name, data):
try:
# 尝试将字节解码为字符串
try:
text = data.decode('utf-8').strip()
# 如果数据以"扫码数据: "开头,提取实际数据部分
if text.startswith("扫码数据: "):
text = text[6:].strip()
QMessageBox.information(self, "测试成功", f"收到扫码数据:\n{text}")
except:
# 如果解码失败,显示十六进制
hex_str = ' '.join(f'{b:02X}' for b in data)
QMessageBox.information(self, "测试成功", f"收到扫码数据 (十六进制):\n{hex_str}")
except Exception as e:
logging.error(f"处理扫码回调数据失败: {e}")
# 保存原始回调
original_callback = self.serial_manager.callbacks.get('scanner_data', None)
# 设置临时回调
self.serial_manager.callbacks['scanner_data'] = scanner_callback
# 尝试打开串口
success = self.serial_manager.open_port(
port, 'scanner', baud, data_bits, stop_bits, parity, 1.0
)
if success:
QMessageBox.information(self, "测试成功", f"串口 {port} 打开成功\n请触发扫码器进行扫描测试")
# 等待用户操作扫码器最多等待10秒
start_time = time.time()
timeout = 10.0 # 10秒超时
while time.time() - start_time < timeout:
# 使用QApplication处理事件保持UI响应
QApplication.processEvents()
time.sleep(0.1) # 短暂休眠减少CPU占用
# 关闭串口
self.serial_manager.close_port(port)
# 恢复原始回调
if original_callback:
self.serial_manager.callbacks['scanner_data'] = original_callback
else:
# 如果之前没有回调,则删除临时回调
if 'scanner_data' in self.serial_manager.callbacks:
del self.serial_manager.callbacks['scanner_data']
QMessageBox.information(self, "测试完成", f"串口 {port} 已关闭")
else:
QMessageBox.critical(self, "测试失败", f"无法打开串口 {port}")
except Exception as e:
logging.error(f"测试扫码器串口失败: {e}")
QMessageBox.critical(self, "测试失败", f"测试扫码器串口失败: {e}")