import os import json import logging import serial.tools.list_ports import time from PySide6.QtWidgets import QMessageBox from ui.serial_settings_ui import SerialSettingsUI from utils.config_loader import ConfigLoader from utils.serial_manager import SerialManager class SerialSettingsWidget(SerialSettingsUI): """串口设置组件""" def __init__(self, parent=None): super().__init__(parent) self.config = ConfigLoader.get_instance() self.serial_manager = SerialManager() # 连接信号 self.init_connections() # 加载设置 self.load_settings() def init_connections(self): """初始化信号连接""" # 米电阻串口 self.mdz_refresh_btn.clicked.connect(self.refresh_ports) self.test_mdz_btn.clicked.connect(self.test_mdz_port) # 线径串口 self.xj_refresh_btn.clicked.connect(self.refresh_ports) self.test_xj_btn.clicked.connect(self.test_xj_port) # 扫码器串口 self.scanner_refresh_btn.clicked.connect(self.refresh_ports) self.test_scanner_btn.clicked.connect(self.test_scanner_port) # 保存按钮 self.save_btn.clicked.connect(self.save_settings) # 初始化时刷新串口列表 self.refresh_ports() # 加载设置 self.load_settings() def refresh_ports(self): """刷新串口列表""" try: # 保存当前选择 current_mdz_port = self.mdz_port_combo.currentData() current_xj_port = self.xj_port_combo.currentData() current_scanner_port = self.scanner_port_combo.currentData() # 清空列表 self.mdz_port_combo.clear() self.xj_port_combo.clear() self.scanner_port_combo.clear() # 添加"不使用"选项 self.mdz_port_combo.addItem("不使用", "") self.xj_port_combo.addItem("不使用", "") self.scanner_port_combo.addItem("不使用", "") # 获取可用串口 ports = list(serial.tools.list_ports.comports()) for port in ports: port_name = port.device port_desc = f"{port_name} ({port.description})" # 添加到米电阻下拉框 self.mdz_port_combo.addItem(port_desc, port_name) # 添加到线径下拉框 self.xj_port_combo.addItem(port_desc, port_name) # 添加到扫码器下拉框 self.scanner_port_combo.addItem(port_desc, port_name) # 恢复之前的选择 if current_mdz_port: index = self.mdz_port_combo.findData(current_mdz_port) if index >= 0: self.mdz_port_combo.setCurrentIndex(index) else: # 如果之前没有选择,则设为"不使用" self.mdz_port_combo.setCurrentIndex(0) if current_xj_port: index = self.xj_port_combo.findData(current_xj_port) if index >= 0: self.xj_port_combo.setCurrentIndex(index) else: # 如果之前没有选择,则设为"不使用" self.xj_port_combo.setCurrentIndex(0) if current_scanner_port: index = self.scanner_port_combo.findData(current_scanner_port) if index >= 0: self.scanner_port_combo.setCurrentIndex(index) else: # 如果之前没有选择,则设为"不使用" self.scanner_port_combo.setCurrentIndex(0) logging.info(f"已刷新串口列表,找到 {len(ports)} 个串口") except Exception as e: logging.error(f"刷新串口列表失败: {e}") QMessageBox.warning(self, "刷新失败", f"刷新串口列表失败: {e}") def load_settings(self): """加载设置""" try: # 加载全局设置 enable_serial = self.config.get_value('app.features.enable_serial_ports', False) enable_keyboard = self.config.get_value('app.features.enable_keyboard_listener', False) self.enable_serial_checkbox.setChecked(enable_serial) self.enable_keyboard_checkbox.setChecked(enable_keyboard) # 加载米电阻设置 mdz_config = self.config.get_config('mdz') if mdz_config: # 设置串口 mdz_port = mdz_config.get('ser', '') index = self.mdz_port_combo.findData(mdz_port) if index >= 0: self.mdz_port_combo.setCurrentIndex(index) # 设置波特率 mdz_baud = str(mdz_config.get('port', '9600')) index = self.mdz_baud_combo.findText(mdz_baud) if index >= 0: self.mdz_baud_combo.setCurrentIndex(index) # 设置数据位 mdz_data_bits = str(mdz_config.get('data_bits', '8')) index = self.mdz_data_bits_combo.findText(mdz_data_bits) if index >= 0: self.mdz_data_bits_combo.setCurrentIndex(index) # 设置停止位 mdz_stop_bits = str(mdz_config.get('stop_bits', '1')) index = self.mdz_stop_bits_combo.findText(mdz_stop_bits) if index >= 0: self.mdz_stop_bits_combo.setCurrentIndex(index) # 设置校验位 mdz_parity = mdz_config.get('parity', 'N') index = self.mdz_parity_combo.findData(mdz_parity) if index >= 0: self.mdz_parity_combo.setCurrentIndex(index) # 设置查询指令 mdz_query_cmd = mdz_config.get('query_cmd', '01 03 00 01 00 07 55 C8') self.mdz_query_cmd.setText(mdz_query_cmd) # 设置查询间隔 mdz_query_interval = mdz_config.get('query_interval', 5) self.mdz_query_interval.setValue(mdz_query_interval) # 加载线径设置 xj_config = self.config.get_config('xj') if xj_config: # 设置串口 xj_port = xj_config.get('ser', '') index = self.xj_port_combo.findData(xj_port) if index >= 0: self.xj_port_combo.setCurrentIndex(index) # 设置波特率 xj_baud = str(xj_config.get('port', '9600')) index = self.xj_baud_combo.findText(xj_baud) if index >= 0: self.xj_baud_combo.setCurrentIndex(index) # 设置数据位 xj_data_bits = str(xj_config.get('data_bits', '8')) index = self.xj_data_bits_combo.findText(xj_data_bits) if index >= 0: self.xj_data_bits_combo.setCurrentIndex(index) # 设置停止位 xj_stop_bits = str(xj_config.get('stop_bits', '1')) index = self.xj_stop_bits_combo.findText(xj_stop_bits) if index >= 0: self.xj_stop_bits_combo.setCurrentIndex(index) # 设置校验位 xj_parity = xj_config.get('parity', 'N') index = self.xj_parity_combo.findData(xj_parity) if index >= 0: self.xj_parity_combo.setCurrentIndex(index) # 加载扫码器设置 scanner_config = self.config.get_config('scanner') if scanner_config: # 设置串口 scanner_port = scanner_config.get('ser', '') index = self.scanner_port_combo.findData(scanner_port) if index >= 0: self.scanner_port_combo.setCurrentIndex(index) # 设置波特率 scanner_baud = str(scanner_config.get('port', '9600')) index = self.scanner_baud_combo.findText(scanner_baud) if index >= 0: self.scanner_baud_combo.setCurrentIndex(index) # 设置数据位 scanner_data_bits = str(scanner_config.get('data_bits', '8')) index = self.scanner_data_bits_combo.findText(scanner_data_bits) if index >= 0: self.scanner_data_bits_combo.setCurrentIndex(index) # 设置停止位 scanner_stop_bits = str(scanner_config.get('stop_bits', '1')) index = self.scanner_stop_bits_combo.findText(scanner_stop_bits) if index >= 0: self.scanner_stop_bits_combo.setCurrentIndex(index) # 设置校验位 scanner_parity = scanner_config.get('parity', 'N') index = self.scanner_parity_combo.findData(scanner_parity) if index >= 0: self.scanner_parity_combo.setCurrentIndex(index) logging.info("已加载串口设置") except Exception as e: logging.error(f"加载串口设置失败: {e}") QMessageBox.warning(self, "加载失败", f"加载串口设置失败: {e}") def save_settings(self): """保存设置""" try: # 保存全局设置 enable_serial = self.enable_serial_checkbox.isChecked() enable_keyboard = self.enable_keyboard_checkbox.isChecked() self.config.set_value('app.features.enable_serial_ports', enable_serial) self.config.set_value('app.features.enable_keyboard_listener', enable_keyboard) # 保存米电阻设置 mdz_port = self.mdz_port_combo.currentData() mdz_baud = int(self.mdz_baud_combo.currentText()) mdz_data_bits = int(self.mdz_data_bits_combo.currentText()) mdz_stop_bits = float(self.mdz_stop_bits_combo.currentText()) mdz_parity = self.mdz_parity_combo.currentData() mdz_query_cmd = self.mdz_query_cmd.text().strip() mdz_query_interval = self.mdz_query_interval.value() mdz_config = { 'port': mdz_baud, 'data_bits': mdz_data_bits, 'stop_bits': mdz_stop_bits, 'parity': mdz_parity, 'query_cmd': mdz_query_cmd, 'query_interval': mdz_query_interval } # 只有当用户选择了串口时才保存串口配置 if mdz_port: mdz_config['ser'] = mdz_port self.config.set_config('mdz', mdz_config) # 保存线径设置 xj_port = self.xj_port_combo.currentData() xj_baud = int(self.xj_baud_combo.currentText()) xj_data_bits = int(self.xj_data_bits_combo.currentText()) xj_stop_bits = float(self.xj_stop_bits_combo.currentText()) xj_parity = self.xj_parity_combo.currentData() xj_config = { 'port': xj_baud, 'data_bits': xj_data_bits, 'stop_bits': xj_stop_bits, 'parity': xj_parity } # 只有当用户选择了串口时才保存串口配置 if xj_port: xj_config['ser'] = xj_port self.config.set_config('xj', xj_config) # 保存扫码器设置 scanner_port = self.scanner_port_combo.currentData() scanner_baud = int(self.scanner_baud_combo.currentText()) scanner_data_bits = int(self.scanner_data_bits_combo.currentText()) scanner_stop_bits = float(self.scanner_stop_bits_combo.currentText()) scanner_parity = self.scanner_parity_combo.currentData() scanner_config = { 'port': scanner_baud, 'data_bits': scanner_data_bits, 'stop_bits': scanner_stop_bits, 'parity': scanner_parity } # 只有当用户选择了串口时才保存串口配置 if scanner_port: scanner_config['ser'] = scanner_port self.config.set_config('scanner', scanner_config) # 发送设置变更信号 self.settings_changed.emit() QMessageBox.information(self, "保存成功", "串口设置已保存") except Exception as e: logging.error(f"保存串口设置失败: {e}") QMessageBox.critical(self, "保存失败", f"保存串口设置失败: {e}") def test_mdz_port(self): """测试米电阻串口""" try: port = self.mdz_port_combo.currentData() if not port: QMessageBox.warning(self, "测试失败", "请先选择串口,当前设置为\"不使用\"") return baud = int(self.mdz_baud_combo.currentText()) data_bits = int(self.mdz_data_bits_combo.currentText()) stop_bits = float(self.mdz_stop_bits_combo.currentText()) parity = self.mdz_parity_combo.currentData() # 关闭可能已经打开的串口 if self.serial_manager.is_port_open(port): self.serial_manager.close_port(port) # 尝试打开串口 success = self.serial_manager.open_port( port, 'mdz', baud, data_bits, stop_bits, parity, 1.0 ) if success: # 尝试发送查询指令 query_cmd = self.mdz_query_cmd.text() if query_cmd: try: # 转换查询指令为字节 cmd_bytes = bytes.fromhex(query_cmd.replace(' ', '')) self.serial_manager.write_data(port, cmd_bytes) time.sleep(0.1) # 等待响应 # 读取响应 response = self.serial_manager.read_data(port) if response: # 将字节转换为十六进制字符串 hex_str = ' '.join(f'{b:02X}' for b in response) QMessageBox.information(self, "测试成功", f"串口打开成功,收到响应:\n{hex_str}") else: QMessageBox.information(self, "测试成功", "串口打开成功,但未收到响应") except Exception as e: QMessageBox.warning(self, "测试结果", f"串口打开成功,但发送指令失败: {e}") else: QMessageBox.information(self, "测试成功", "串口打开成功") # 关闭串口 self.serial_manager.close_port(port) else: QMessageBox.critical(self, "测试失败", f"无法打开串口 {port}") except Exception as e: logging.error(f"测试米电阻串口失败: {e}") QMessageBox.critical(self, "测试失败", f"测试米电阻串口失败: {e}") def test_xj_port(self): """测试线径串口""" try: port = self.xj_port_combo.currentData() if not port: QMessageBox.warning(self, "测试失败", "请先选择串口,当前设置为\"不使用\"") return baud = int(self.xj_baud_combo.currentText()) data_bits = int(self.xj_data_bits_combo.currentText()) stop_bits = float(self.xj_stop_bits_combo.currentText()) parity = self.xj_parity_combo.currentData() # 关闭可能已经打开的串口 if self.serial_manager.is_port_open(port): self.serial_manager.close_port(port) # 尝试打开串口 success = self.serial_manager.open_port( port, 'xj', baud, data_bits, stop_bits, parity, 1.0 ) if success: QMessageBox.information(self, "测试成功", f"串口 {port} 打开成功") # 关闭串口 self.serial_manager.close_port(port) else: QMessageBox.critical(self, "测试失败", f"无法打开串口 {port}") except Exception as e: logging.error(f"测试线径串口失败: {e}") QMessageBox.critical(self, "测试失败", f"测试线径串口失败: {e}") def test_scanner_port(self): """测试扫码器串口""" try: port = self.scanner_port_combo.currentData() if not port: QMessageBox.warning(self, "测试失败", "请先选择串口,当前设置为\"不使用\"") return baud = int(self.scanner_baud_combo.currentText()) data_bits = int(self.scanner_data_bits_combo.currentText()) stop_bits = float(self.scanner_stop_bits_combo.currentText()) parity = self.scanner_parity_combo.currentData() # 关闭可能已经打开的串口 if self.serial_manager.is_port_open(port): self.serial_manager.close_port(port) # 尝试打开串口 success = self.serial_manager.open_port( port, 'scanner', baud, data_bits, stop_bits, parity, 1.0 ) if success: QMessageBox.information(self, "测试成功", f"串口 {port} 打开成功\n请触发扫码器进行扫描测试") # 尝试读取数据(短暂等待扫码器输入) start_time = time.time() timeout = 5.0 # 5秒超时 while time.time() - start_time < timeout: response = self.serial_manager.read_data(port) if response: # 尝试将字节解码为字符串 try: text = response.decode('utf-8').strip() QMessageBox.information(self, "测试成功", f"收到扫码数据:\n{text}") break except: # 如果解码失败,显示十六进制 hex_str = ' '.join(f'{b:02X}' for b in response) QMessageBox.information(self, "测试成功", f"收到扫码数据 (十六进制):\n{hex_str}") break time.sleep(0.1) # 短暂休眠,减少CPU占用 # 关闭串口 self.serial_manager.close_port(port) else: QMessageBox.critical(self, "测试失败", f"无法打开串口 {port}") except Exception as e: logging.error(f"测试扫码器串口失败: {e}") QMessageBox.critical(self, "测试失败", f"测试扫码器串口失败: {e}")