import serial import threading import time import logging import os import json from typing import Dict, Optional, Callable from utils.config_loader import ConfigLoader from utils.keyboard_listener import KeyboardListener from pynput.keyboard import Key import re import platform class SerialManager: """串口管理器,单例模式""" _instance = None _lock = threading.Lock() def __new__(cls): with cls._lock: if cls._instance is None: cls._instance = super(SerialManager, cls).__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return self._initialized = True self.serial_ports: Dict[str, serial.Serial] = {} # 存储打开的串口对象 self.read_threads: Dict[str, threading.Thread] = {} # 存储读取线程 self.running_flags: Dict[str, bool] = {} # 存储线程运行标志 self.callbacks: Dict[str, Callable] = {} # 存储数据回调函数 self.port_types: Dict[str, str] = {} # 存储端口类型,用于线程重启 # 添加文件操作暂停控制 self._file_operations_suspended = False self._file_operations_lock = threading.Lock() # 添加称重稳定性检测变量 self.last_weight = 0 self.stable_count = 0 self.weight_written = False # 初始化为False,确保首次称重能够正确处理 # 添加抗抖动变量 self.last_weights = [0] * 3 # 存储最近3个重量值 self.weight_changed_time = time.time() # 上次重量变化的时间 self.last_write_time = 0 # 最后写入时间 # 稳定性时间跟踪 self.stability_start_time = 0 # 开始检测稳定性的时间 # 数据存储 self.data = { 'mdz': 0, 'xj': 0, # 添加线径数据 'cz': 0, 'scanner': '' # 添加扫码器数据 } # 是否自动查询米电阻数据,默认为False,只通过PageUp键触发 self.auto_query_mdz = False # 是否自动查询线径数据,默认为True,开启自动查询 self.auto_query_xj = True logging.info("初始化 SerialManager") # 加载配置 self._load_config() # 检查是否启用键盘监听功能 enable_keyboard_listener = self.config.get_value('app.features.enable_keyboard_listener', False) if enable_keyboard_listener: try: # 初始化键盘监听器 self.keyboard_listener = KeyboardListener() # 从配置中获取米电阻触发键 trigger_key = self.config.get_value('serial.keyboard.trigger_key', 'Key.page_up') # 注册米电阻触发键回调 self.keyboard_listener.register_callback(trigger_key, self.trigger_resistance_query) logging.info(f"已注册{trigger_key}按键回调用于触发米电阻查询") # 注意:不在这里启动键盘监听器,而是在点击"开始"按钮时启动 except Exception as e: logging.error(f"初始化键盘监听器失败: {e}") # 创建一个空的键盘监听器对象,以避免后续代码出现NoneType错误 self.keyboard_listener = None else: logging.info("键盘监听功能已在配置中禁用,跳过初始化键盘监听器") self.keyboard_listener = None # 启动线程监控 self._start_thread_monitor() def _load_config(self): """加载配置""" try: config_loader = ConfigLoader.get_instance() # Renamed for clarity self.config = config_loader # Assign the whole config object # 获取数据文件路径 self.data_file = self.config.get_value('serial.data_file', 'data.txt') # 确保是绝对路径 if not os.path.isabs(self.data_file): self.data_file = os.path.abspath(self.data_file) logging.info(f"最终确定的 data_file 绝对路径: {self.data_file}") # 获取串口配置 self.mdz_config = self.config.get_config('mdz') self.cz_config = self.config.get_config('cz') self.xj_config = self.config.get_config('xj') # 添加线径配置 self.scanner_config = self.config.get_config('scanner') # 添加扫码器配置 # 检查操作系统类型,在macOS上处理COM端口名称问题 os_type = platform.system() if os_type == "Darwin": # 检查是否需要自动检测macOS上的串口 macos_autodetect = self.config.get_value('serial.os_config.macos_autodetect', False) if macos_autodetect: logging.info("在macOS上启用了串口自动检测") self._detect_macos_ports() # 获取默认的稳定阈值 self.stable_threshold = self.cz_config.get('stable_threshold', 10) if self.cz_config else 10 # 检查是否自动查询米电阻数据 self.auto_query_mdz = self.config.get_value('serial.mdz.auto_query', False) # 检查是否自动查询线径数据 self.auto_query_xj = self.config.get_value('serial.xj.auto_query', True) # 默认为True,确保线径自动查询开启 logging.info(f"已加载串口配置:mdz={self.mdz_config}, xj={self.xj_config}, cz={self.cz_config}, scanner={self.scanner_config}, data_file={self.data_file}") logging.info(f"米电阻自动查询: {'开启' if self.auto_query_mdz else '关闭'}") logging.info(f"线径自动查询: {'开启' if self.auto_query_xj else '关闭'}") except Exception as e: logging.error(f"加载配置出错: {e}") # 设置默认值 self.data_file = os.path.abspath('data.txt') self.mdz_config = {'port': 9600, 'ser': 'COM5'} self.cz_config = {'port': 9600, 'ser': 'COM2', 'stable_threshold': 10} self.xj_config = {'port': 9600, 'ser': 'COM3'} self.scanner_config = {'port': 9600, 'ser': 'COM4'} self.stable_threshold = 10 self.auto_query_mdz = False self.auto_query_xj = True # 默认为True,确保线径自动查询开启 logging.info(f"使用默认配置,数据文件: {self.data_file}") def _detect_macos_ports(self): """在macOS上检测可用的串口""" try: if platform.system() != "Darwin": logging.info("不是macOS系统,跳过串口检测") return logging.info("正在检测macOS可用串口...") # 检查/dev目录下的tty.*和cu.*设备 import glob tty_ports = glob.glob('/dev/tty.*') cu_ports = glob.glob('/dev/cu.*') all_ports = tty_ports + cu_ports if not all_ports: logging.warning("未检测到macOS上的串口设备") return logging.info(f"检测到以下串口设备: {all_ports}") # 如果mdz_config中的串口是COM格式,尝试替换为检测到的第一个串口 if self.mdz_config and 'ser' in self.mdz_config and self.mdz_config['ser'].startswith('COM'): # 优先选择包含"usb"的设备 usb_ports = [port for port in all_ports if 'usb' in port.lower()] if usb_ports: self.mdz_config['ser'] = usb_ports[0] logging.info(f"自动将米电阻串口从COM格式替换为: {usb_ports[0]}") elif all_ports: self.mdz_config['ser'] = all_ports[0] logging.info(f"自动将米电阻串口从COM格式替换为: {all_ports[0]}") # 如果cz_config中的串口是COM格式,尝试替换为检测到的第二个串口 if self.cz_config and 'ser' in self.cz_config and self.cz_config['ser'].startswith('COM'): # 优先选择包含"usb"的设备,并且不是已分配给mdz的设备 usb_ports = [port for port in all_ports if 'usb' in port.lower() and (not self.mdz_config or port != self.mdz_config.get('ser'))] if usb_ports: self.cz_config['ser'] = usb_ports[0] logging.info(f"自动将线径串口从COM格式替换为: {usb_ports[0]}") elif len(all_ports) > 1: # 选择不是mdz_config已使用的第一个端口 for port in all_ports: if not self.mdz_config or port != self.mdz_config.get('ser'): self.cz_config['ser'] = port logging.info(f"自动将线径串口从COM格式替换为: {port}") break elif all_ports and (not self.mdz_config or all_ports[0] != self.mdz_config.get('ser')): self.cz_config['ser'] = all_ports[0] logging.info(f"自动将线径串口从COM格式替换为: {all_ports[0]}") except Exception as e: logging.error(f"检测macOS串口时发生错误: {e}") logging.info("将继续使用配置文件中的串口设置") def open_port(self, port_name: str, port_type: str, baud_rate: Optional[int] = None, data_bits: int = 8, stop_bits: int = 1, parity: str = 'N', timeout: float = 1.0, callback: Optional[Callable] = None) -> bool: """ 打开串口 Args: port_name: 串口名称,如COM1 port_type: 串口类型,'cz'表示称重,'mdz'表示米电阻, 'xj'表示线径, 'scanner'表示扫码器 baud_rate: 波特率,如果为None则从配置文件读取 data_bits: 数据位 stop_bits: 停止位 parity: 校验位,'N'表示无校验,'O'表示奇校验,'E'表示偶校验 timeout: 超时时间,单位秒 callback: 回调函数,接收(port_name, data)作为参数 Returns: bool: 成功返回True,失败返回False """ # 如果串口已经打开,先关闭 if port_name in self.serial_ports and self.serial_ports[port_name]: try: self.close_port(port_name) except Exception as e: logging.error(f"关闭已打开的串口失败: {e}") # 配置串口参数 try: # 从配置读取波特率(如果未提供) if baud_rate is None: if port_type == 'cz' and self.cz_config: baud_rate = self.cz_config.get('port', 9600) elif port_type == 'mdz' and self.mdz_config: baud_rate = self.mdz_config.get('port', 9600) elif port_type == 'xj' and self.xj_config: baud_rate = self.xj_config.get('port', 9600) elif port_type == 'scanner' and self.scanner_config: baud_rate = self.scanner_config.get('port', 9600) else: baud_rate = 9600 # 默认波特率 # 转换校验位为PySerial常量 if parity.upper() == 'N': parity_constant = serial.PARITY_NONE elif parity.upper() == 'O': parity_constant = serial.PARITY_ODD elif parity.upper() == 'E': parity_constant = serial.PARITY_EVEN else: parity_constant = serial.PARITY_NONE # 打开串口 self.serial_ports[port_name] = serial.Serial( port=port_name, baudrate=baud_rate, bytesize=data_bits, stopbits=stop_bits, parity=parity_constant, timeout=timeout ) logging.info(f"打开串口成功: {port_name}, 类型: {port_type}, 波特率: {baud_rate}") # 设置回调 if callback: self.callbacks[port_name] = callback # 记录端口类型,用于线程重启 self.port_types[port_name] = port_type # 创建并启动读取线程 self.running_flags[port_name] = True # 统一线程创建和管理方式 if port_type == 'cz': # 称重数据需要特殊处理 thread = threading.Thread( target=self._read_weight_thread, args=(port_name, self.stable_threshold), daemon=True, name=f"Thread-{port_type}-{port_name}" ) elif port_type == 'mdz': # 米电阻数据需要特殊处理 thread = threading.Thread( target=self._read_resistance_thread, args=(port_name,), daemon=True, name=f"Thread-{port_type}-{port_name}" ) elif port_type == 'xj': # 线径数据需要特殊处理 thread = threading.Thread( target=self._read_diameter_thread, args=(port_name,), daemon=True, name=f"Thread-{port_type}-{port_name}" ) elif port_type == 'scanner': # 扫码器数据需要特殊处理 thread = threading.Thread( target=self._read_scanner_thread, args=(port_name,), daemon=True, name=f"Thread-{port_type}-{port_name}" ) else: # 其他类型使用通用处理 thread = threading.Thread( target=self._read_thread, args=(port_name,), daemon=True, name=f"Thread-{port_type}-{port_name}" ) # 统一启动线程 thread.start() self.read_threads[port_name] = thread logging.info(f"已启动串口读取线程: {thread.name}") return True except Exception as e: logging.error(f"打开串口失败: {port_name}, 错误: {e}") # 确保清理好资源 if port_name in self.serial_ports: try: self.serial_ports[port_name].close() except: pass self.serial_ports.pop(port_name, None) # 停止相关线程 self.running_flags[port_name] = False if port_name in self.read_threads: self.read_threads.pop(port_name, None) return False def close_port(self, port_name: str) -> bool: """ 关闭串口 Args: port_name: 串口名称 Returns: 是否成功关闭 """ try: # 停止读取线程 if port_name in self.running_flags: self.running_flags[port_name] = False # 等待线程结束 if port_name in self.read_threads: if self.read_threads[port_name].is_alive(): self.read_threads[port_name].join(1.0) # 最多等待1秒 del self.read_threads[port_name] # 关闭串口 if port_name in self.serial_ports: if self.serial_ports[port_name].is_open: self.serial_ports[port_name].close() del self.serial_ports[port_name] logging.info(f"串行对象 for {port_name} 从 self.serial_ports 中删除. 当前活跃端口: {list(self.serial_ports.keys())}") # 删除回调和端口类型记录 if port_name in self.callbacks: del self.callbacks[port_name] if port_name in self.port_types: del self.port_types[port_name] logging.info(f"串口 {port_name} 已关闭") return True except Exception as e: logging.error(f"关闭串口 {port_name} 失败: {str(e)}") return False def is_port_open(self, port_name: str) -> bool: """ 检查串口是否打开 Args: port_name: 串口名称 Returns: 是否打开 """ return port_name in self.serial_ports and self.serial_ports[port_name].is_open def write_data(self, port_name: str, data: bytes) -> bool: """ 向串口写入数据 Args: port_name: 串口名称 data: 要写入的数据 Returns: 是否成功写入 """ try: if not self.is_port_open(port_name): return False self.serial_ports[port_name].write(data) return True except Exception as e: logging.error(f"向串口 {port_name} 写入数据失败: {str(e)}") return False def read_data(self, port_name: str, size: int = None) -> bytes: """ 从串口读取数据 Args: port_name: 串口名称 size: 要读取的字节数,如果为None则读取所有可用数据 Returns: 读取的数据,如果失败则返回空字节 """ try: if not self.is_port_open(port_name): logging.error(f"尝试从未打开的串口 {port_name} 读取数据") return b'' if size is None: # 读取所有可用数据 if self.serial_ports[port_name].in_waiting > 0: return self.serial_ports[port_name].read(self.serial_ports[port_name].in_waiting) return b'' else: # 读取指定数量的字节 return self.serial_ports[port_name].read(size) except Exception as e: logging.error(f"从串口 {port_name} 读取数据失败: {str(e)}") return b'' def _read_thread(self, port_name: str): """ 串口读取线程 Args: port_name: 串口名称 """ try: while self.running_flags.get(port_name, False): if not self.is_port_open(port_name): time.sleep(0.1) continue # 读取数据 if self.serial_ports[port_name].in_waiting: data = self.serial_ports[port_name].readline() # 调用回调函数 if port_name in self.callbacks and data: try: self.callbacks[port_name](port_name, data) except Exception as e: logging.error(f"调用串口 {port_name} 回调函数失败: {str(e)}") time.sleep(0.01) # 短暂休眠,避免CPU占用过高 except Exception as e: logging.error(f"串口 {port_name} 读取线程异常: {str(e)}") def _read_weight_thread(self, port_name: str, stable_threshold: int = 10): """ 称重串口读取线程 Args: port_name: 串口名称 stable_threshold: 稳定阈值 """ try: logging.info(f"[{port_name}] 称重线程启动") # 重置状态变量,确保线程重启时能正确处理称重数据 self.weight_written = False self.stable_count = 0 self.last_weight = 0 self.last_weights = [0] * 3 self.weight_changed_time = time.time() self.last_write_time = 0 # 添加一个变量来跟踪最后一次写入时间 self.stability_start_time = 0 # 重置稳定性检测开始时间 # 添加实际的读取逻辑 while self.running_flags.get(port_name, False): if not self.is_port_open(port_name): time.sleep(0.1) continue # 检查是否有数据可读 if self.serial_ports[port_name].in_waiting > 0: data = self.serial_ports[port_name].read(self.serial_ports[port_name].in_waiting) weight = self._process_weight_data(data) if weight is not None: # 更新数据 self.data['cz'] = weight self._write_data_to_file() time.sleep(0.1) except Exception as e: logging.error(f"称重串口 {port_name} 读取线程异常: {e}") # 线程异常时,尝试重置状态 self.running_flags[port_name] = False def _process_weight_data(self, data_bytes): """ TODO: 需要将线径数据写入文件,这个方法需要修改成线径的串口数据获取 Args: data_bytes: 原始字节数据 Returns: 解析后的重量值或None """ try: # 尝试多种编码方式解析 weight = None # 记录原始数据以便调试 logging.debug(f"称重原始数据: {data_bytes.hex()}") # 方法1: 尝试ASCII解码 try: ascii_str = data_bytes.decode('ascii', errors='replace') # 仅提取数字部分 import re numbers = re.findall(r'\d+', ascii_str) if numbers: weight = float(numbers[0]) / 10.0 logging.debug(f"ASCII解码成功: {weight}") except Exception as e: logging.debug(f"ASCII解码失败: {e}") # 方法2: 尝试直接从二进制解析 (根据具体协议) if weight is None and len(data_bytes) >= 8: try: # 假设重量在特定位置,具体需根据实际协议调整 weight_bytes = data_bytes[2:6] weight = int.from_bytes(weight_bytes, byteorder='big') / 10.0 logging.debug(f"二进制解码成功: {weight}") except Exception as e: logging.debug(f"二进制解码失败: {e}") return weight except Exception as e: logging.error(f"处理称重数据失败: {e}") return None def _read_resistance_thread(self, port_name: str): """ 米电阻串口读取线程 Args: port_name: 串口名称 """ try: logging.info(f"[{port_name}] 米电阻线程启动") while self.running_flags.get(port_name, False): if not self.is_port_open(port_name): time.sleep(0.1) continue # 如果不是自动查询模式,则只监听串口数据而不主动发送查询 if not self.auto_query_mdz: # 检查是否有数据可读 if self.serial_ports[port_name].in_waiting > 0: response = self.serial_ports[port_name].read(self.serial_ports[port_name].in_waiting) self._process_mdz_response(port_name, response) time.sleep(0.1) continue # 以下代码只在自动查询模式下执行 try: # 发送查询指令 hex_data = '01 03 00 01 00 07 55 C8' byte_data = bytes.fromhex(hex_data.replace(' ', '')) self.serial_ports[port_name].write(byte_data) # 等待响应 time.sleep(1) if self.serial_ports[port_name].in_waiting > 0: response = self.serial_ports[port_name].read(self.serial_ports[port_name].in_waiting) self._process_mdz_response(port_name, response) except Exception as e: logging.error(f"米电阻数据处理异常: {e}") # 每5秒查询一次 for i in range(50): if not self.running_flags.get(port_name, False): break time.sleep(0.1) except Exception as e: logging.error(f"米电阻串口 {port_name} 读取线程异常: {e}") # 线程异常时,尝试重置状态 self.running_flags[port_name] = False def _process_mdz_response(self, port_name, response_bytes: bytes): """处理米电阻响应数据""" try: if response_bytes: # 确保有响应数据 try: # 转换为字符串用于日志记录 response_str = str(response_bytes) logging.warning(f"[{port_name}] 米电阻数据: {response_str}") # 使用正则表达式直接提取数字 # 查找格式为11.58201这样的浮点数 match = re.search(r'(\d+\.\d+)', response_str) if match: number_str = match.group(1) try: # 转换为浮点数 mdz_value = float(number_str) logging.info(f"米电阻数据: {mdz_value}") # 更新数据 self.data['mdz'] = mdz_value self._write_data_to_file() self._notify_callbacks('mdz_data', {"type": "mdz", "value": self.data['mdz'], "source": f"serial ({port_name})"}) return True except ValueError: logging.warning(f"米电阻数据字符串 '{number_str}' 无法转换为浮点数") else: logging.warning(f"米电阻数据中未找到有效的浮点数") except Exception as e: logging.error(f"处理米电阻数据异常: {e}") else: logging.warning("米电阻响应数据为空") # 如果无法解析,则不再使用模拟数据,直接返回失败 return False except Exception as e: logging.error(f"米电阻数据处理关键异常: {e}") return False def _write_data_to_file(self): """将数据写入文件""" try: # 检查文件操作是否已暂停 with self._file_operations_lock: if self._file_operations_suspended: logging.info("文件操作已暂停,跳过写入") return # 构建数据字符串 data_str = f"mdz:{self.data['mdz']}|cz:{self.data['cz']}|scanner:{self.data['scanner']}|" # 确保目录存在 data_dir = os.path.dirname(self.data_file) if data_dir and not os.path.exists(data_dir): logging.info(f"创建目录: {data_dir}") os.makedirs(data_dir, exist_ok=True) # 写入文件 - 使用临时文件写入然后重命名,避免文件锁定问题 # 创建临时文件 temp_file = f"{self.data_file}.tmp" try: with open(temp_file, 'w', encoding='utf-8') as f: f.write(data_str) f.flush() os.fsync(f.fileno()) # 确保数据写入磁盘 # 再次检查文件操作是否已暂停 with self._file_operations_lock: if self._file_operations_suspended: logging.info("文件操作已暂停,已写入临时文件但取消重命名操作") return # 原子性地重命名文件,替换原有文件(在大多数操作系统上是原子操作) if os.path.exists(self.data_file): try: os.remove(self.data_file) except Exception as e: logging.warning(f"无法删除旧数据文件: {e}, 尝试直接覆盖") os.rename(temp_file, self.data_file) logging.info(f"数据已写入文件: {self.data_file}") except Exception as e: logging.error(f"写入临时文件失败: {e}") # 清理临时文件 if os.path.exists(temp_file): try: os.remove(temp_file) except: pass raise # 重新抛出异常 except Exception as e: logging.error(f"写入数据文件失败: {e}, 文件路径: {self.data_file}") def get_current_data(self): """获取当前数据""" return self.data.copy() def close_all_ports(self): """关闭所有串口""" port_names = list(self.serial_ports.keys()) for port_name in port_names: self.close_port(port_name) def reload_config(self): """重新加载配置""" self._load_config() logging.info("已重新加载串口配置") def start_keyboard_listener(self): """启动键盘监听""" try: # 检查键盘监听器是否已初始化 if self.keyboard_listener is None: logging.warning("键盘监听器未初始化,无法启动") return False # 从配置中获取米电阻触发键 trigger_key = self.config.get_value('serial.keyboard.trigger_key', 'Key.page_up') # 不再获取线径触发键 # 确保已注册米电阻触发键回调 if trigger_key not in self.keyboard_listener.callbacks: self.keyboard_listener.register_callback(trigger_key, self.trigger_resistance_query) logging.info(f"已注册{trigger_key}按键回调用于触发米电阻数据查询") # 移除线径触发键回调注册代码 # 启动键盘监听 result = self.keyboard_listener.start() if result: logging.info(f"已启动键盘监听,按 {trigger_key} 键可触发米电阻数据查询") # 检查监听器状态 if self.keyboard_listener.is_active(): logging.info("键盘监听器处于活动状态") else: logging.warning("键盘监听器启动完成,但状态检查显示不活动") else: logging.error("启动键盘监听失败") return result except Exception as e: logging.error(f"启动键盘监听失败: {e}") return False def stop_keyboard_listener(self, join_thread=False): """停止键盘监听 Args: join_thread: 是否等待键盘监听线程结束 """ try: # 检查键盘监听器是否已初始化 if self.keyboard_listener is None: logging.info("键盘监听器未初始化,无需停止") return self.keyboard_listener.stop() logging.info("已停止键盘监听") # 如果需要等待线程结束 if join_thread: try: self.keyboard_listener.join(timeout=1.0) # 最多等待1秒 except Exception as e: logging.error(f"等待键盘监听线程结束时出错: {e}") except Exception as e: logging.error(f"停止键盘监听失败: {e}") def register_callback(self, key, callback): """ 注册数据回调函数 Args: key: 回调标识,如 'mdz_data' callback: 回调函数,参数为 (port_name, data) """ try: if key in self.callbacks: logging.warning(f"覆盖已存在的回调函数: {key}") self.callbacks[key] = callback logging.info(f"已注册回调函数: {key}, 回调对象类型: {callback.__self__.__class__.__name__}") except Exception as e: logging.error(f"注册回调失败: {e}") def trigger_resistance_query(self): """触发米电阻查询""" try: # 检查米电阻配置是否存在 if not self.mdz_config: logging.warning("未找到米电阻配置,无法触发查询") return False # 检查米电阻串口是否已打开 ser_name = self.mdz_config.get('ser') if not ser_name or not self.is_port_open(ser_name): logging.warning(f"米电阻串口 {ser_name} 未打开,无法触发查询") return False # 从配置获取查询命令,如果没有则使用默认命令 query_cmd = self.mdz_config.get('query_cmd', '01 03 00 01 00 07 55 C8') # 转换为字节数据 try: byte_data = bytes.fromhex(query_cmd.replace(' ', '')) except ValueError as e: logging.error(f"米电阻查询命令格式错误: {query_cmd}, 错误: {e}") return False # 发送查询命令 result = self.write_data(ser_name, byte_data) if result: logging.info(f"已向米电阻串口 {ser_name} 发送查询命令: {query_cmd}") # 特殊情况:如果触发了查询,但串口没有响应,尝试模拟一个合理的响应 # 这里我们先等待一段时间,让设备有机会响应 time.sleep(0.5) # 如果没有数据可读,或者读取的数据不包含有效的米电阻值 if self.serial_ports[ser_name].in_waiting == 0: logging.warning("米电阻串口未返回数据,检查设备连接") # 在这里我们不再自动提供模拟数据,而是由用户决定是否重试 return True else: logging.error(f"向米电阻串口 {ser_name} 发送查询命令失败") return False except Exception as e: logging.error(f"触发米电阻查询时出错: {e}") return False def _notify_callbacks(self, port_name, value): """通知所有相关回调函数""" try: # 端口特定回调 (通常用于原始串口数据) if port_name in self.callbacks and port_name not in ['mdz_data', 'xj_data', 'scanner_data']: # 避免重复处理 try: # 假设这种回调期望原始的 value (可能是字节串,也可能是其他类型) self.callbacks[port_name](port_name, value) logging.debug(f"Notified port-specific callback for {port_name}") except Exception as e: logging.error(f"调用端口回调 {port_name} 失败: {e}") # 全局回调, 特别处理 'mdz_data' if 'mdz_data' in self.callbacks and port_name == 'mdz_data': actual_mdz_numeric_value = None source_info = "unknown" if isinstance(value, dict): actual_mdz_numeric_value = value.get('value') source_info = value.get('source', source_info) elif isinstance(value, (str, float, int)): # 如果直接传递了数值 (例如来自旧的 _use_mock_data) actual_mdz_numeric_value = str(value) else: # 尝试从可能是字节串的value中解码 (不太可能走到这里了,因为上游会处理好) try: decoded_value = value.decode('utf-8') if "米电阻数据:" in decoded_value: actual_mdz_numeric_value = decoded_value.split("米电阻数据:")[1].strip() else: actual_mdz_numeric_value = decoded_value # best guess except: # noqa pass # 无法解码或解析,保持 None if actual_mdz_numeric_value is not None: # 构建 PackageInboundDialog.on_mdz_data_received 期望的格式 callback_data_str = f"米电阻数据: {actual_mdz_numeric_value}" try: # port_name 对于 mdz_data 回调,可以传递触发源的串口名,或者一个通用标识 # trigger_resistance_query 知道 mdz_port_name,它应该作为 port_name 传给 _notify_callbacks # 如果是从模拟数据来,port_name 可能是 'mdz' 或 'mock' triggering_port = port_name if port_name not in ['mdz_data', 'mdz'] else self.mdz_config.get('ser', 'N/A') if self.mdz_config else 'N/A' if source_info.startswith("mock"): # 如果源是模拟数据 triggering_port = f"mock_{port_name}" # e.g. mock_mdz self.callbacks['mdz_data'](triggering_port, callback_data_str.encode('utf-8')) logging.info(f"通知 'mdz_data' 回调. 值: {actual_mdz_numeric_value}, 源: {source_info}, 触发源端口: {triggering_port}") except Exception as e: logging.error(f"调用全局回调 'mdz_data' 失败: {e}", exc_info=True) else: logging.warning(f"回调失败: mdz_data 中 实际值为None. 初始 value: {value}") # 全局回调, 特别处理 'xj_data' if 'xj_data' in self.callbacks and port_name == 'xj_data': actual_xj_numeric_value = None source_info = "unknown" if isinstance(value, dict): actual_xj_numeric_value = value.get('value') source_info = value.get('source', source_info) elif isinstance(value, (str, float, int)): actual_xj_numeric_value = str(value) else: try: decoded_value = value.decode('utf-8') if "线径数据:" in decoded_value: actual_xj_numeric_value = decoded_value.split("线径数据:")[1].strip() else: actual_xj_numeric_value = decoded_value # best guess except: # noqa pass # 无法解码或解析,保持 None if actual_xj_numeric_value is not None: callback_data_str = f"线径数据: {actual_xj_numeric_value}" try: triggering_port = port_name if port_name not in ['xj_data', 'xj'] else self.xj_config.get('ser', 'N/A') if self.xj_config else 'N/A' if source_info.startswith("mock"): triggering_port = f"mock_{port_name}" self.callbacks['xj_data'](triggering_port, callback_data_str.encode('utf-8')) logging.info(f"通知 'xj_data' 回调. 值: {actual_xj_numeric_value}, 源: {source_info}, 触发源端口: {triggering_port}") except Exception as e: logging.error(f"调用全局回调 'xj_data' 失败: {e}", exc_info=True) else: logging.warning(f"回调失败: xj_data 中实际值为None. 初始 value: {value}") # 全局回调, 特别处理 'scanner_data' if 'scanner_data' in self.callbacks and port_name == 'scanner_data': actual_scanner_value = None source_info = "unknown" if isinstance(value, dict): actual_scanner_value = value.get('value') source_info = value.get('source', source_info) elif isinstance(value, (str, bytes)): if isinstance(value, bytes): try: actual_scanner_value = value.decode('utf-8').strip() except: actual_scanner_value = str(value) else: actual_scanner_value = value if actual_scanner_value is not None: callback_data_str = f"扫码数据: {actual_scanner_value}" try: triggering_port = port_name if port_name not in ['scanner_data', 'scanner'] else self.scanner_config.get('ser', 'N/A') if self.scanner_config else 'N/A' if source_info.startswith("mock"): triggering_port = f"mock_{port_name}" self.callbacks['scanner_data'](triggering_port, callback_data_str.encode('utf-8')) logging.info(f"通知 'scanner_data' 回调. 值: {actual_scanner_value}, 源: {source_info}, 触发源端口: {triggering_port}") except Exception as e: logging.error(f"调用全局回调 'scanner_data' 失败: {e}", exc_info=True) else: logging.warning(f"回调失败: scanner_data 中实际值为None. 初始 value: {value}") except Exception as e: logging.error(f"通知回调失败: {e}", exc_info=True) def auto_open_configured_ports(self): """自动打开已配置的串口""" logging.info("尝试自动打开已配置的串口...") # 首先检查是否启用串口功能 enable_serial_ports = self.config.get_value('app.features.enable_serial_ports', False) if not enable_serial_ports: logging.info("串口功能已在配置中禁用,跳过自动打开串口") return False success = True # 检查操作系统类型并提供合适的警告 os_type = platform.system() if os_type == "Darwin" and ( (self.mdz_config and 'ser' in self.mdz_config and self.mdz_config['ser'] and self.mdz_config['ser'].startswith('COM')) or (self.cz_config and 'ser' in self.cz_config and self.cz_config['ser'] and self.cz_config['ser'].startswith('COM')) or (self.xj_config and 'ser' in self.xj_config and self.xj_config['ser'] and self.xj_config['ser'].startswith('COM')) or (self.scanner_config and 'ser' in self.scanner_config and self.scanner_config['ser'] and self.scanner_config['ser'].startswith('COM')) ): logging.warning("检测到在macOS系统上配置了Windows格式的COM端口,这些端口将无法正常打开") logging.warning("macOS上的串口通常是/dev/tty.*或/dev/cu.*格式") # 继续尝试打开,但不影响程序流程 # 尝试打开称重串口 if self.cz_config and 'ser' in self.cz_config and self.cz_config['ser'] and self.cz_config['ser'].strip(): port_name = self.cz_config['ser'] baud_rate = self.cz_config.get('port', 9600) if not self.is_port_open(port_name): try: if self.open_port(port_name, 'cz', baud_rate): logging.info(f"自动打开称重串口 {port_name} 成功") else: logging.error(f"自动打开称重串口 {port_name} 失败") success = False except Exception as e: logging.error(f"自动打开称重串口 {port_name} 时发生异常: {e}") success = False else: logging.info(f"称重串口 {port_name} 已经打开,无需重新打开") else: logging.warning("称重串口未配置或设置为不使用,跳过自动打开") # 尝试打开线径串口 if self.xj_config and 'ser' in self.xj_config and self.xj_config['ser'] and self.xj_config['ser'].strip(): port_name = self.xj_config['ser'] baud_rate = self.xj_config.get('port', 19200) if not self.is_port_open(port_name): try: if self.open_port(port_name, 'xj', baud_rate): logging.info(f"自动打开线径串口 {port_name} 成功") else: logging.error(f"自动打开线径串口 {port_name} 失败") success = False except Exception as e: logging.error(f"自动打开线径串口 {port_name} 时发生异常: {e}") success = False else: logging.info(f"线径串口 {port_name} 已经打开,无需重新打开") else: logging.warning("线径串口未配置或设置为不使用,跳过自动打开") # 尝试打开米电阻串口 if self.mdz_config and 'ser' in self.mdz_config and self.mdz_config['ser'] and self.mdz_config['ser'].strip(): port_name = self.mdz_config['ser'] baud_rate = self.mdz_config.get('port', 9600) if not self.is_port_open(port_name): try: if self.open_port(port_name, 'mdz', baud_rate): logging.info(f"自动打开米电阻串口 {port_name} 成功") else: logging.error(f"自动打开米电阻串口 {port_name} 失败") success = False except Exception as e: logging.error(f"自动打开米电阻串口 {port_name} 时发生异常: {e}") success = False else: logging.info(f"米电阻串口 {port_name} 已经打开,无需重新打开") else: logging.warning("米电阻串口未配置或设置为不使用,跳过自动打开") # 尝试打开扫码器串口 if self.scanner_config and 'ser' in self.scanner_config and self.scanner_config['ser'] and self.scanner_config['ser'].strip(): port_name = self.scanner_config['ser'] baud_rate = self.scanner_config.get('port', 9600) if not self.is_port_open(port_name): try: if self.open_port(port_name, 'scanner', baud_rate): logging.info(f"自动打开扫码器串口 {port_name} 成功") else: logging.error(f"自动打开扫码器串口 {port_name} 失败") success = False except Exception as e: logging.error(f"自动打开扫码器串口 {port_name} 时发生异常: {e}") success = False else: logging.info(f"扫码器串口 {port_name} 已经打开,无需重新打开") else: logging.warning("扫码器串口未配置或设置为不使用,跳过自动打开") # 注意:不在这里启动键盘监听器,而是在MainWindow的handle_start方法中显式调用start_keyboard_listener if not success: logging.warning("部分串口自动打开失败,请检查设备连接或在参数配置中手动打开") return True # 总是返回True,防止应用程序因串口问题而终止 def suspend_file_operations(self, suspend: bool): """暂停或恢复文件操作 Args: suspend: True表示暂停,False表示恢复 """ with self._file_operations_lock: old_state = self._file_operations_suspended self._file_operations_suspended = suspend if old_state != suspend: if suspend: logging.info("已暂停SerialManager文件操作") else: logging.info("已恢复SerialManager文件操作") def _read_diameter_thread(self, port_name: str): """ 线径串口读取线程 Args: port_name: 串口名称 """ try: logging.info(f"[{port_name}] 线径线程启动") logging.info(f"线径自动查询已开启,将持续发送查询指令获取数据") while self.running_flags.get(port_name, False): if not self.is_port_open(port_name): time.sleep(0.1) continue try: # 从配置获取查询命令,如果没有则使用默认命令 query_cmd = self.xj_config.get('query_cmd', '01 41 0d') # 发送查询指令 byte_data = bytes.fromhex(query_cmd.replace(' ', '')) self.serial_ports[port_name].write(byte_data) # 等待响应 - 减少等待时间以加快数据获取 time.sleep(0.2) if self.serial_ports[port_name].in_waiting > 0: response = self.serial_ports[port_name].read(self.serial_ports[port_name].in_waiting) self._process_diameter_response(port_name, response) except Exception as e: logging.error(f"线径数据处理异常: {e}") # 查询间隔,从配置中获取或使用默认值 query_interval = self.xj_config.get('query_interval', 1) if self.xj_config else 1 # 确保查询间隔不小于0.2秒 if query_interval < 0.2: query_interval = 0.2 # 每隔query_interval秒查询一次,使用更简单的等待方式 time.sleep(query_interval) except Exception as e: logging.error(f"线径串口 {port_name} 读取线程异常: {e}") # 线程异常时,尝试重置状态 self.running_flags[port_name] = False def _process_diameter_response(self, port_name, response_bytes: bytes): """处理线径响应数据""" try: if response_bytes: # 确保有响应数据 try: # 转换为字符串用于日志记录 response_str = str(response_bytes) logging.warning(f"[{port_name}] 线径数据: {response_str}") # 使用正则表达式直接提取数字 # 查找格式为20.15这样的浮点数 match = re.search(r'(\d+\.?\d*)', response_str) if match: number_str = match.group(1) try: # 转换为浮点数 xj_value = float(number_str) logging.info(f"线径数据: {xj_value}") # 过滤无效值:>10 或 =0 的值直接跳过 if xj_value/10000 > 10 or xj_value/10000 == 0: return False # 更新数据 self.data['xj'] = xj_value self._write_data_to_file() # 构建MainWindow.on_diameter_data_received期望的格式 callback_data_str = f"线径数据: {xj_value}" if 'xj_data' in self.callbacks: try: # 与米电阻类似,传递实际的串口名称 logging.info(f"线径回调开始调用,回调对象: {self.callbacks['xj_data'].__self__.__class__.__name__}, 数据: {xj_value}") self.callbacks['xj_data'](port_name, callback_data_str.encode('utf-8')) logging.info(f"通知 'xj_data' 回调成功. 值: {xj_value}, 串口: {port_name}") except Exception as e: logging.error(f"调用 'xj_data' 回调失败: {e}") else: # 如果未注册回调,仍然使用通用方法通知 logging.warning(f"未找到xj_data回调,使用通用_notify_callbacks方法") self._notify_callbacks('xj_data', {"type": "xj", "value": self.data['xj'], "source": f"serial ({port_name})"}) return True except ValueError: logging.warning(f"线径数据字符串 '{number_str}' 无法转换为浮点数") else: logging.warning(f"线径数据中未找到有效的浮点数") except Exception as e: logging.error(f"处理线径数据异常: {e}") else: logging.warning("线径响应数据为空") # 如果无法解析,则直接返回失败 return False except Exception as e: logging.error(f"处理线径数据总体异常: {e}") return False def _read_scanner_thread(self, port_name: str): """ 扫码器串口读取线程 Args: port_name: 串口名称 """ try: logging.info(f"[{port_name}] 扫码器线程启动") while self.running_flags.get(port_name, False): if not self.is_port_open(port_name): time.sleep(0.1) continue # 检查是否有数据可读 if self.serial_ports[port_name].in_waiting > 0: response = self.serial_ports[port_name].read(self.serial_ports[port_name].in_waiting) self._process_scanner_response(port_name, response) time.sleep(0.1) except Exception as e: logging.error(f"扫码器串口 {port_name} 读取线程异常: {e}") # 线程异常时,尝试重置状态 self.running_flags[port_name] = False def _process_scanner_response(self, port_name, response_bytes: bytes): """处理扫码器响应数据""" try: if response_bytes: # 确保有响应数据 try: # 尝试解码为字符串 scanner_value = response_bytes.decode('utf-8').strip() # 记录日志 logging.info(f"[{port_name}] 扫码数据: {scanner_value}") # 更新数据 self.data['scanner'] = scanner_value # 写入文件并通知回调 self._write_data_to_file() # 使用"扫码数据: xxx"格式通知回调 callback_data = f"扫码数据: {scanner_value}".encode('utf-8') if 'scanner_data' in self.callbacks: self.callbacks['scanner_data'](port_name, callback_data) return True except Exception as e: logging.error(f"处理扫码数据异常: {e}") # 解码失败,尝试直接使用字节数据 # 记录日志(十六进制字符串) hex_str = ' '.join(f'{b:02X}' for b in response_bytes) logging.warning(f"[{port_name}] 扫码数据(十六进制): {hex_str}") # 更新数据(使用十六进制字符串) self.data['scanner'] = hex_str # 写入文件并通知回调 self._write_data_to_file() # 使用"扫码数据: xxx"格式通知回调 callback_data = f"扫码数据: {hex_str}".encode('utf-8') if 'scanner_data' in self.callbacks: self.callbacks['scanner_data'](port_name, callback_data) return True else: logging.warning("扫码响应数据为空") # 如果无法解析,则直接返回失败 return False except Exception as e: logging.error(f"处理扫码数据总体异常: {e}") return False def _start_thread_monitor(self): """启动线程监控""" threading.Thread(target=self._monitor_threads, daemon=True).start() def _monitor_threads(self): """监控线程状态""" try: logging.info("线程监控已启动") while True: try: # 创建当前线程的副本,避免在迭代过程中修改字典 thread_items = list(self.read_threads.items()) for port_name, thread in thread_items: # 检查线程是否存活 if not thread.is_alive(): # 检查串口是否仍然打开 if port_name in self.serial_ports and self.is_port_open(port_name): port_type = self.port_types.get(port_name) callback = self.callbacks.get(port_name) logging.warning(f"线程 {thread.name} 已终止但串口仍然打开,尝试重新启动线程") # 重置线程状态 self.running_flags[port_name] = False if port_name in self.read_threads: self.read_threads.pop(port_name, None) # 如果有端口类型记录,尝试重新启动线程 if port_type: # 短暂等待,确保资源释放 time.sleep(0.5) try: # 重新打开串口 self.open_port(port_name, port_type, callback=callback) logging.info(f"已重新启动线程: {port_name} ({port_type})") except Exception as restart_error: logging.error(f"重新启动线程失败: {port_name}, 错误: {restart_error}") else: logging.warning(f"无法重启线程: {port_name}, 未找到端口类型记录") else: # 串口已关闭,清理线程记录 if port_name in self.read_threads: self.read_threads.pop(port_name, None) logging.info(f"线程 {thread.name} 已终止,串口已关闭,清理线程记录") # 每隔5秒检查一次 time.sleep(5) except Exception as loop_error: logging.error(f"线程监控循环异常: {loop_error}") time.sleep(5) # 出错后等待一段时间再继续 except Exception as e: logging.error(f"线程监控主循环异常: {e}") # 尝试重新启动监控 time.sleep(10) self._start_thread_monitor()