jiateng_ws/utils/serial_manager.py

1140 lines
55 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import serial
import threading
import time
import logging
import os
import json
from typing import Dict, Optional, Callable
from utils.config_loader import ConfigLoader
from utils.keyboard_listener import KeyboardListener
from pynput.keyboard import Key
import re
import platform
class SerialManager:
"""串口管理器,单例模式"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
with cls._lock:
if cls._instance is None:
cls._instance = super(SerialManager, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._initialized = True
self.serial_ports: Dict[str, serial.Serial] = {} # 存储打开的串口对象
self.read_threads: Dict[str, threading.Thread] = {} # 存储读取线程
self.running_flags: Dict[str, bool] = {} # 存储线程运行标志
self.callbacks: Dict[str, Callable] = {} # 存储数据回调函数
# 添加文件操作暂停控制
self._file_operations_suspended = False
self._file_operations_lock = threading.Lock()
# 添加称重稳定性检测变量
self.last_weight = 0
self.stable_count = 0
self.weight_written = False # 初始化为False确保首次称重能够正确处理
# 添加抗抖动变量
self.last_weights = [0] * 3 # 存储最近3个重量值
self.weight_changed_time = time.time() # 上次重量变化的时间
self.last_write_time = 0 # 最后写入时间
# 稳定性时间跟踪
self.stability_start_time = 0 # 开始检测稳定性的时间
# 数据存储
self.data = {
'mdz': 0,
'cz': 0
}
# 是否自动查询米电阻数据默认为False只通过PageUp键触发
self.auto_query_mdz = False
logging.info("初始化 SerialManager")
# 加载配置
self._load_config()
# 检查是否启用键盘监听功能
enable_keyboard_listener = self.config.get_value('app.features.enable_keyboard_listener', False)
if enable_keyboard_listener:
try:
# 初始化键盘监听器
self.keyboard_listener = KeyboardListener()
# 从配置中获取触发键
trigger_key = self.config.get_value('serial.keyboard.trigger_key', 'Key.page_up')
# 注册触发键回调
self.keyboard_listener.register_callback(trigger_key, self.trigger_resistance_query)
logging.info(f"已注册{trigger_key}按键回调用于触发米电阻查询")
except Exception as e:
logging.error(f"初始化键盘监听器失败: {e}")
# 创建一个空的键盘监听器对象以避免后续代码出现NoneType错误
self.keyboard_listener = None
else:
logging.info("键盘监听功能已在配置中禁用,跳过初始化键盘监听器")
self.keyboard_listener = None
def _load_config(self):
"""加载配置"""
try:
config_loader = ConfigLoader.get_instance() # Renamed for clarity
self.config = config_loader # Assign the whole config object
# 获取数据文件路径
self.data_file = self.config.get_value('serial.data_file', 'data.txt')
# 确保是绝对路径
if not os.path.isabs(self.data_file):
self.data_file = os.path.abspath(self.data_file)
logging.info(f"最终确定的 data_file 绝对路径: {self.data_file}")
# 获取串口配置
self.mdz_config = self.config.get_config('mdz')
self.cz_config = self.config.get_config('cz')
# 检查操作系统类型在macOS上处理COM端口名称问题
os_type = platform.system()
if os_type == "Darwin":
# 检查是否需要自动检测macOS上的串口
macos_autodetect = self.config.get_value('serial.os_config.macos_autodetect', False)
if macos_autodetect:
logging.info("在macOS上启用了串口自动检测")
self._detect_macos_ports()
# 获取默认的稳定阈值
self.stable_threshold = self.cz_config.get('stable_threshold', 10) if self.cz_config else 10
# 检查是否自动查询米电阻数据
self.auto_query_mdz = self.config.get_value('serial.keyboard.auto_query', False)
logging.info(f"已加载串口配置mdz={self.mdz_config}, cz={self.cz_config}, data_file={self.data_file}")
logging.info(f"米电阻自动查询: {'开启' if self.auto_query_mdz else '关闭'}")
except Exception as e:
logging.error(f"加载配置出错: {e}")
# 设置默认值
self.data_file = os.path.abspath('data.txt')
self.mdz_config = {'port': 9600, 'ser': 'COM5'}
self.cz_config = {'port': 9600, 'ser': 'COM2', 'stable_threshold': 10}
self.stable_threshold = 10
self.auto_query_mdz = False
logging.info(f"使用默认配置,数据文件: {self.data_file}")
def _detect_macos_ports(self):
"""在macOS上检测可用的串口"""
try:
if platform.system() != "Darwin":
logging.info("不是macOS系统跳过串口检测")
return
logging.info("正在检测macOS可用串口...")
# 检查/dev目录下的tty.*和cu.*设备
import glob
tty_ports = glob.glob('/dev/tty.*')
cu_ports = glob.glob('/dev/cu.*')
all_ports = tty_ports + cu_ports
if not all_ports:
logging.warning("未检测到macOS上的串口设备")
return
logging.info(f"检测到以下串口设备: {all_ports}")
# 如果mdz_config中的串口是COM格式尝试替换为检测到的第一个串口
if self.mdz_config and 'ser' in self.mdz_config and self.mdz_config['ser'].startswith('COM'):
# 优先选择包含"usb"的设备
usb_ports = [port for port in all_ports if 'usb' in port.lower()]
if usb_ports:
self.mdz_config['ser'] = usb_ports[0]
logging.info(f"自动将米电阻串口从COM格式替换为: {usb_ports[0]}")
elif all_ports:
self.mdz_config['ser'] = all_ports[0]
logging.info(f"自动将米电阻串口从COM格式替换为: {all_ports[0]}")
# 如果cz_config中的串口是COM格式尝试替换为检测到的第二个串口
if self.cz_config and 'ser' in self.cz_config and self.cz_config['ser'].startswith('COM'):
# 优先选择包含"usb"的设备并且不是已分配给mdz的设备
usb_ports = [port for port in all_ports if 'usb' in port.lower()
and (not self.mdz_config or port != self.mdz_config.get('ser'))]
if usb_ports:
self.cz_config['ser'] = usb_ports[0]
logging.info(f"自动将线径串口从COM格式替换为: {usb_ports[0]}")
elif len(all_ports) > 1:
# 选择不是mdz_config已使用的第一个端口
for port in all_ports:
if not self.mdz_config or port != self.mdz_config.get('ser'):
self.cz_config['ser'] = port
logging.info(f"自动将线径串口从COM格式替换为: {port}")
break
elif all_ports and (not self.mdz_config or all_ports[0] != self.mdz_config.get('ser')):
self.cz_config['ser'] = all_ports[0]
logging.info(f"自动将线径串口从COM格式替换为: {all_ports[0]}")
except Exception as e:
logging.error(f"检测macOS串口时发生错误: {e}")
logging.info("将继续使用配置文件中的串口设置")
def open_port(self, port_name: str, port_type: str, baud_rate: Optional[int] = None,
data_bits: int = 8, stop_bits: int = 1,
parity: str = 'N', timeout: float = 1.0,
callback: Optional[Callable] = None) -> bool:
"""
打开串口
Args:
port_name: 串口名称如COM1
port_type: 串口类型,'cz'表示称重,'mdz'表示米电阻
baud_rate: 波特率如果为None则从配置文件读取
data_bits: 数据位
stop_bits: 停止位
parity: 校验位N-无校验E-偶校验O-奇校验
timeout: 超时时间,单位秒
callback: 数据回调函数,接收参数为(port_name, data)
Returns:
是否成功打开
"""
try:
# 如果波特率为None从配置文件读取
if baud_rate is None:
if port_type == 'cz' and self.cz_config:
baud_rate = self.cz_config.get('port', 9600)
elif port_type == 'mdz' and self.mdz_config:
baud_rate = self.mdz_config.get('port', 9600)
else:
baud_rate = 9600 # 默认波特率
# 如果串口已经打开,先关闭
if port_name in self.serial_ports:
self.close_port(port_name)
# 打开串口
ser = serial.Serial(
port=port_name,
baudrate=baud_rate,
bytesize=data_bits,
stopbits=stop_bits,
parity=parity,
timeout=timeout
)
if not ser.is_open:
ser.open()
# 存储串口对象
self.serial_ports[port_name] = ser
logging.info(f"串行对象 for {port_name} 存储在 self.serial_ports 中. 当前活跃端口: {list(self.serial_ports.keys())}")
# 设置回调
if callback:
self.callbacks[port_name] = callback
# 启动读取线程
self.running_flags[port_name] = True
# 根据串口类型选择不同的读取线程
if port_type == 'cz':
thread = threading.Thread(target=self._read_weight_thread, args=(port_name, self.stable_threshold))
thread.daemon = True
thread.start()
self.read_threads[port_name] = thread
elif port_type == 'mdz':
thread = threading.Thread(target=self._read_resistance_thread, args=(port_name,))
thread.daemon = True
thread.start()
self.read_threads[port_name] = thread
else:
# 默认读取线程
thread = threading.Thread(target=self._read_thread, args=(port_name,))
thread.daemon = True
thread.start()
self.read_threads[port_name] = thread
logging.info(f"串口 {port_name} ({port_type}) 已打开,波特率={baud_rate}")
return True
except Exception as e:
logging.error(f"打开串口 {port_name} 失败: {str(e)}")
if port_name in self.serial_ports: # 清理,以防部分成功
del self.serial_ports[port_name]
logging.info(f"打开 {port_name} 失败后, 当前活跃端口: {list(self.serial_ports.keys())}")
return False
def close_port(self, port_name: str) -> bool:
"""
关闭串口
Args:
port_name: 串口名称
Returns:
是否成功关闭
"""
try:
# 停止读取线程
if port_name in self.running_flags:
self.running_flags[port_name] = False
# 等待线程结束
if port_name in self.read_threads:
if self.read_threads[port_name].is_alive():
self.read_threads[port_name].join(1.0) # 最多等待1秒
del self.read_threads[port_name]
# 关闭串口
if port_name in self.serial_ports:
if self.serial_ports[port_name].is_open:
self.serial_ports[port_name].close()
del self.serial_ports[port_name]
logging.info(f"串行对象 for {port_name} 从 self.serial_ports 中删除. 当前活跃端口: {list(self.serial_ports.keys())}")
# 删除回调
if port_name in self.callbacks:
del self.callbacks[port_name]
logging.info(f"串口 {port_name} 已关闭")
return True
except Exception as e:
logging.error(f"关闭串口 {port_name} 失败: {str(e)}")
return False
def is_port_open(self, port_name: str) -> bool:
"""
检查串口是否打开
Args:
port_name: 串口名称
Returns:
是否打开
"""
return port_name in self.serial_ports and self.serial_ports[port_name].is_open
def write_data(self, port_name: str, data: bytes) -> bool:
"""
向串口写入数据
Args:
port_name: 串口名称
data: 要写入的数据
Returns:
是否成功写入
"""
try:
if not self.is_port_open(port_name):
return False
self.serial_ports[port_name].write(data)
return True
except Exception as e:
logging.error(f"向串口 {port_name} 写入数据失败: {str(e)}")
return False
def _read_thread(self, port_name: str):
"""
串口读取线程
Args:
port_name: 串口名称
"""
try:
while self.running_flags.get(port_name, False):
if not self.is_port_open(port_name):
time.sleep(0.1)
continue
# 读取数据
if self.serial_ports[port_name].in_waiting:
data = self.serial_ports[port_name].readline()
# 调用回调函数
if port_name in self.callbacks and data:
try:
self.callbacks[port_name](port_name, data)
except Exception as e:
logging.error(f"调用串口 {port_name} 回调函数失败: {str(e)}")
time.sleep(0.01) # 短暂休眠避免CPU占用过高
except Exception as e:
logging.error(f"串口 {port_name} 读取线程异常: {str(e)}")
def _read_weight_thread(self, port_name: str, stable_threshold: int = 10):
logging.info(f"[{port_name}] 称重线程启动")
# 重置状态变量,确保线程重启时能正确处理称重数据
self.weight_written = False
self.stable_count = 0
self.last_weight = 0
self.last_weights = [0] * 3
self.weight_changed_time = time.time()
self.last_write_time = 0 # 添加一个变量来跟踪最后一次写入时间
self.stability_start_time = 0 # 重置稳定性检测开始时间
# 定义处理重量更新的函数,避免代码重复
def process_weight_update(current_weight, source_text):
# 更新最近重量记录用于抗抖动
self.last_weights.pop(0) # 移除最旧的记录
self.last_weights.append(current_weight) # 添加新记录
# --- 稳定性及后续处理 ---
# 检查重量是否稳定容忍0.01以内的微小波动
is_stable = abs(current_weight - self.last_weight) < 0.01
# 添加重量检查确保重量大于1
if is_stable and current_weight < 1:
is_stable = False # 重量小于1时不认为是稳定的
# 检查重量是否有重大变化超过0.05)或者长时间未更新(>30秒
now = time.time()
significant_change = abs(current_weight - self.last_weight) >= 0.05
time_since_last_change = now - self.weight_changed_time
force_update = time_since_last_change > 30 and not self.weight_written
if is_stable:
# 如果这是第一次检测到稳定,记录开始时间
if self.stable_count == 0:
self.stability_start_time = now
self.stable_count += 1
# 检查时间要求 - 确保10次稳定读数在2秒内发生
time_in_stability = now - self.stability_start_time
meets_time_requirement = time_in_stability <= 2.0 or self.stable_count < stable_threshold
# 如果稳定时间超过2秒但还没达到稳定阈值重置计数
if not meets_time_requirement:
logging.warning(f"[{port_name}] 稳定检测超时 - {time_in_stability:.2f}秒内只有{self.stable_count}次稳定读数,重置计数")
self.stable_count = 1 # 重置为1而不是0因为当前读数是稳定的
self.stability_start_time = now # 重置开始时间
# 仅在达到特定阈值时记录日志
if self.stable_count % 5 == 0 or self.stable_count >= stable_threshold:
logging.info(f"[{port_name}] 称重稳定计数: {self.stable_count}/{stable_threshold}, 经过时间: {time_in_stability:.2f}")
# 添加"ST"标记到数据中模拟PB代码中的稳定标志
st_marker = ""
if self.stable_count >= stable_threshold:
st_marker = " ST" # 添加ST标记表示完全稳定
if port_name in self.callbacks:
try:
# 将稳定标记添加到回调数据中
callback_data = f"称重数据: {current_weight}, 稳定计数: {self.stable_count}/{stable_threshold}{st_marker}".encode()
self.callbacks[port_name](port_name, callback_data)
except Exception as e:
logging.error(f"[{port_name}] 调用串口回调失败: {str(e)}")
if self.stable_count >= stable_threshold and not self.weight_written:
logging.info(f"[{port_name}] 称重数据稳定,写入: {current_weight}")
self.data['cz'] = current_weight
# 添加写入文件的限流确保最少间隔0.5秒才写入一次
current_time = time.time()
if current_time - self.last_write_time > 0.5:
try:
self._write_data_to_file()
self.last_write_time = current_time
except Exception as e:
logging.error(f"写入数据文件失败: {str(e)}")
self.weight_written = True
elif force_update:
logging.info(f"[{port_name}] 长时间未更新,强制更新重量: {current_weight}")
self.data['cz'] = current_weight
# 同样对强制更新添加限流
current_time = time.time()
if current_time - self.last_write_time > 0.5:
try:
self._write_data_to_file()
self.last_write_time = current_time
except Exception as e:
logging.error(f"写入数据文件失败: {str(e)}")
self.weight_written = True
self.weight_changed_time = now
elif significant_change:
# 记录旧值,防止日志中的循环引用
old_weight = self.last_weight
self.stable_count = 0
self.weight_written = False # 确保在重量变化时重置写入标志
self.last_weight = current_weight
self.weight_changed_time = now
logging.info(f"[{port_name}] 重量变化: {old_weight} -> {current_weight}")
# 重量变化时也调用回调但不添加ST标记
if port_name in self.callbacks:
try:
callback_data = f"称重数据: {current_weight}, 稳定计数: 0/{stable_threshold}".encode()
self.callbacks[port_name](port_name, callback_data)
except Exception as e:
logging.error(f"[{port_name}] 调用串口回调失败: {str(e)}")
else:
# 微小波动,保持当前状态,不重置计数器
# 即使有微小波动也要更新UI显示但不影响稳定计数
if port_name in self.callbacks:
try:
# 如果接近稳定阈值但还未完全稳定,添加不同的标记
st_partial = ""
if self.stable_count >= stable_threshold * 0.7: # 达到阈值的70%
st_partial = " ST_PARTIAL"
callback_data = f"称重数据: {current_weight}, 稳定计数: {self.stable_count}/{stable_threshold}{st_partial}".encode()
self.callbacks[port_name](port_name, callback_data)
except Exception as e:
logging.error(f"[{port_name}] 调用串口回调失败: {str(e)}")
# --- 结束稳定性处理 ---
return
try:
read_count = 0 # 添加读取计数器,用于调整睡眠时间
last_read_time = time.time() # 记录上次读取时间
while self.running_flags.get(port_name, False):
if not self.is_port_open(port_name):
time.sleep(0.1)
continue
read_count += 1
current_time = time.time()
try:
if self.serial_ports[port_name].in_waiting > 0:
# 等待一小段时间让数据完整接收,避免数据帧被拆分
time.sleep(0.05) # 减少等待时间,提高响应速度
# 直接读取所有可用数据
available_bytes = self.serial_ports[port_name].in_waiting
raw_data = self.serial_ports[port_name].read(available_bytes)
# 有数据读取时重置计数器
read_count = 0
last_read_time = current_time
# 尝试解码为字符串
try:
data_str = raw_data.decode('ascii', errors='replace')
# 基于截图中的数据格式,尝试直接用正则表达式搜索"ST,NT,+ X.XXkg"格式
import re
# 完整格式的正则表达式,匹配称重显示格式
full_pattern = r"ST,NT,\+\s+(\d+\.\d+)kg"
matches = re.findall(full_pattern, data_str)
if matches:
for weight_str in matches:
# 计算重量 - 直接使用原始值,无需除以分度值
raw_value = float(weight_str)
current_weight = raw_value # 直接使用原始值,不再除以 bit_value
# 处理重量更新
process_weight_update(current_weight, f"完整格式: '{weight_str}kg'")
else:
# 如果没有找到完整格式,尝试按行分割处理
# 按行分割,处理每一行数据
lines = data_str.split('\r\n')
for line in lines:
if not line.strip():
continue
# 提取重量值 - 查找任何包含数字的部分
weight_pattern = r"([+-]?)\s*(\d+\.?\d*|\.\d+)"
match = re.search(weight_pattern, line)
if match:
sign = match.group(1) or '+' # 如果没有符号,默认为正
number = match.group(2)
cleaned_weight_str = sign + number
# 计算重量 - 直接使用原始值,无需除以分度值
raw_value = float(cleaned_weight_str)
current_weight = raw_value # 直接使用原始值,不再除以 bit_value
# 处理重量更新
process_weight_update(current_weight, f"行处理: '{line}'")
else:
logging.warning(f"[{port_name}] 无法从行数据中提取有效的数字")
except UnicodeDecodeError:
logging.warning(f"[{port_name}] 无法解码数据为ASCII")
except Exception as e:
logging.error(f"[{port_name}] 处理数据时出错: {str(e)}")
except Exception as e:
logging.error(f"[{port_name}] 读取处理异常: {str(e)}")
# 动态调整睡眠时间减少CPU占用
# 如果长时间没有读取到数据,增加睡眠时间
sleep_time = 0.01 # 基础睡眠时间
if read_count > 100: # 连续多次无数据读取
time_since_last_read = current_time - last_read_time
if time_since_last_read > 5: # 如果超过5秒没有数据
sleep_time = 0.2 # 增加到较长的睡眠时间
elif time_since_last_read > 2: # 如果超过2秒没有数据
sleep_time = 0.1 # 增加到中等睡眠时间
elif read_count > 1000: # 非常长时间无数据
sleep_time = 0.3 # 更长的睡眠时间
time.sleep(sleep_time)
except Exception as e:
logging.error(f"[{port_name}] 主循环异常: {str(e)}")
logging.info(f"[{port_name}] 称重线程结束")
def _process_weight_data(self, data_bytes):
"""
处理原始称重数据
Args:
data_bytes: 原始字节数据
Returns:
解析后的重量值或None
"""
try:
# 尝试多种编码方式解析
weight = None
# 记录原始数据以便调试
logging.debug(f"称重原始数据: {data_bytes.hex()}")
# 方法1: 尝试ASCII解码
try:
ascii_str = data_bytes.decode('ascii', errors='replace')
# 仅提取数字部分
import re
numbers = re.findall(r'\d+', ascii_str)
if numbers:
weight = float(numbers[0]) / 10.0
logging.debug(f"ASCII解码成功: {weight}")
except Exception as e:
logging.debug(f"ASCII解码失败: {e}")
# 方法2: 尝试直接从二进制解析 (根据具体协议)
if weight is None and len(data_bytes) >= 8:
try:
# 假设重量在特定位置,具体需根据实际协议调整
weight_bytes = data_bytes[2:6]
weight = int.from_bytes(weight_bytes, byteorder='big') / 10.0
logging.debug(f"二进制解码成功: {weight}")
except Exception as e:
logging.debug(f"二进制解码失败: {e}")
return weight
except Exception as e:
logging.error(f"处理称重数据失败: {e}")
return None
def _read_resistance_thread(self, port_name: str):
"""
米电阻串口读取线程
Args:
port_name: 串口名称
"""
try:
while self.running_flags.get(port_name, False):
if not self.is_port_open(port_name):
time.sleep(0.1)
continue
# 如果不是自动查询模式,则只监听串口数据而不主动发送查询
if not self.auto_query_mdz:
# 检查是否有数据可读
if self.serial_ports[port_name].in_waiting > 0:
response = self.serial_ports[port_name].read(self.serial_ports[port_name].in_waiting)
self._process_mdz_response(port_name, response)
time.sleep(0.1)
continue
# 以下代码只在自动查询模式下执行
try:
# 发送查询指令
hex_data = '01 03 00 01 00 07 55 C8'
byte_data = bytes.fromhex(hex_data.replace(' ', ''))
self.serial_ports[port_name].write(byte_data)
# 等待响应
time.sleep(1)
if self.serial_ports[port_name].in_waiting > 0:
response = self.serial_ports[port_name].read(self.serial_ports[port_name].in_waiting)
self._process_mdz_response(port_name, response)
except Exception as e:
logging.error(f"米电阻数据处理异常: {e}")
# 每5秒查询一次
for i in range(50):
if not self.running_flags.get(port_name, False):
break
time.sleep(0.1)
except Exception as e:
logging.error(f"米电阻串口 {port_name} 读取线程异常: {e}")
def _process_mdz_response(self, port_name, response_bytes: bytes):
"""处理米电阻响应数据"""
try:
if response_bytes: # 确保有响应数据
try:
# 转换为字符串用于日志记录
response_str = str(response_bytes)
logging.warning(f"[{port_name}] 米电阻数据: {response_str}")
# 使用正则表达式直接提取数字
# 查找格式为11.58201这样的浮点数
match = re.search(r'(\d+\.\d+)', response_str)
if match:
number_str = match.group(1)
try:
# 转换为浮点数
mdz_value = float(number_str)
logging.info(f"米电阻数据: {mdz_value}")
# 更新数据
self.data['mdz'] = mdz_value
self._write_data_to_file()
self._notify_callbacks('mdz_data', {"type": "mdz", "value": self.data['mdz'], "source": f"serial ({port_name})"})
return True
except ValueError:
logging.warning(f"米电阻数据字符串 '{number_str}' 无法转换为浮点数")
else:
logging.warning(f"米电阻数据中未找到有效的浮点数")
except Exception as e:
logging.error(f"处理米电阻数据异常: {e}")
else:
logging.warning("米电阻响应数据为空")
# 如果无法解析,则不再使用模拟数据,直接返回失败
return False
except Exception as e:
logging.error(f"米电阻数据处理关键异常: {e}")
return False
def _write_data_to_file(self):
"""将数据写入文件"""
try:
# 检查文件操作是否已暂停
with self._file_operations_lock:
if self._file_operations_suspended:
logging.info("文件操作已暂停,跳过写入")
return
# 构建数据字符串
data_str = f"mdz:{self.data['mdz']}|cz:{self.data['cz']}|"
# 确保目录存在
data_dir = os.path.dirname(self.data_file)
if data_dir and not os.path.exists(data_dir):
logging.info(f"创建目录: {data_dir}")
os.makedirs(data_dir, exist_ok=True)
# 写入文件 - 使用临时文件写入然后重命名,避免文件锁定问题
# 创建临时文件
temp_file = f"{self.data_file}.tmp"
try:
with open(temp_file, 'w', encoding='utf-8') as f:
f.write(data_str)
f.flush()
os.fsync(f.fileno()) # 确保数据写入磁盘
# 再次检查文件操作是否已暂停
with self._file_operations_lock:
if self._file_operations_suspended:
logging.info("文件操作已暂停,已写入临时文件但取消重命名操作")
return
# 原子性地重命名文件,替换原有文件(在大多数操作系统上是原子操作)
if os.path.exists(self.data_file):
try:
os.remove(self.data_file)
except Exception as e:
logging.warning(f"无法删除旧数据文件: {e}, 尝试直接覆盖")
os.rename(temp_file, self.data_file)
logging.info(f"数据已写入文件: {self.data_file}")
except Exception as e:
logging.error(f"写入临时文件失败: {e}")
# 清理临时文件
if os.path.exists(temp_file):
try:
os.remove(temp_file)
except:
pass
raise # 重新抛出异常
except Exception as e:
logging.error(f"写入数据文件失败: {e}, 文件路径: {self.data_file}")
def get_current_data(self):
"""获取当前数据"""
return self.data.copy()
def close_all_ports(self):
"""关闭所有串口"""
port_names = list(self.serial_ports.keys())
for port_name in port_names:
self.close_port(port_name)
def reload_config(self):
"""重新加载配置"""
self._load_config()
logging.info("已重新加载串口配置")
def start_keyboard_listener(self):
"""启动键盘监听"""
try:
# 检查是否启用键盘监听功能
enable_keyboard_listener = self.config.get_value('app.features.enable_keyboard_listener', False)
if not enable_keyboard_listener:
logging.info("键盘监听功能已在配置中禁用,跳过启动键盘监听")
return False
# 检查键盘监听器是否已初始化
if self.keyboard_listener is None:
logging.warning("键盘监听器未初始化,无法启动")
return False
# 从配置中获取触发键
config = ConfigLoader.get_instance()
trigger_key = config.get_value('serial.keyboard.trigger_key', 'Key.page_up')
# 确保已注册触发键回调
if trigger_key not in self.keyboard_listener.callbacks:
self.keyboard_listener.register_callback(trigger_key, self.trigger_resistance_query)
logging.info(f"已注册{trigger_key}按键回调用于触发米电阻数据查询")
# 启动键盘监听
result = self.keyboard_listener.start()
if result:
logging.info(f"已启动键盘监听,按 {trigger_key} 键可触发米电阻数据查询")
# 检查监听器状态
if self.keyboard_listener.is_active():
logging.info("键盘监听器处于活动状态")
else:
logging.warning("键盘监听器启动完成,但状态检查显示不活动")
else:
logging.error("启动键盘监听失败")
return result
except Exception as e:
logging.error(f"启动键盘监听失败: {e}")
return False
def stop_keyboard_listener(self, join_thread=False):
"""停止键盘监听
Args:
join_thread: 是否等待键盘监听线程结束
"""
try:
# 检查键盘监听器是否已初始化
if self.keyboard_listener is None:
logging.info("键盘监听器未初始化,无需停止")
return
self.keyboard_listener.stop()
logging.info("已停止键盘监听")
# 如果需要等待线程结束
if join_thread:
try:
self.keyboard_listener.join(timeout=1.0) # 最多等待1秒
except Exception as e:
logging.error(f"等待键盘监听线程结束时出错: {e}")
except Exception as e:
logging.error(f"停止键盘监听失败: {e}")
def register_callback(self, key, callback):
"""
注册数据回调函数
Args:
key: 回调标识,如 'mdz_data'
callback: 回调函数,参数为 (port_name, data)
"""
try:
if key in self.callbacks:
logging.warning(f"覆盖已存在的回调函数: {key}")
self.callbacks[key] = callback
logging.info(f"已注册回调函数: {key}")
except Exception as e:
logging.error(f"注册回调失败: {e}")
def trigger_resistance_query(self):
"""触发米电阻数据查询,如果串口未打开,则尝试临时打开并查询"""
# 直接打印到控制台,确保可见
print("\n[米电阻查询] PageUp键被按下正在触发米电阻数据查询...\n")
# 检查是否启用串口功能
enable_serial_ports = self.config.get_value('app.features.enable_serial_ports', False)
if not enable_serial_ports:
logging.info("串口功能已在配置中禁用,跳过米电阻数据查询")
print("\n[米电阻查询] 串口功能已禁用,无法查询\n")
return
# 检查是否启用键盘监听功能 - 如果这个按键是通过键盘触发的,应该尊重键盘监听器配置
enable_keyboard_listener = self.config.get_value('app.features.enable_keyboard_listener', False)
if not enable_keyboard_listener:
logging.info("键盘监听功能已在配置中禁用但收到了Page Up触发检查是否为其他来源的调用")
print("\n[米电阻查询] 键盘监听功能已禁用,但收到了触发\n")
# 这里我们仍然继续执行,因为该方法可能由其他非键盘源调用
# 从配置中获取触发键
config = ConfigLoader.get_instance()
trigger_key = config.get_value('serial.keyboard.trigger_key', 'Key.page_up')
logging.info(f"[SerialManager] {trigger_key}键按下,正在触发米电阻数据查询...")
print(f"\n[米电阻查询] {trigger_key}键按下,正在处理...\n")
if not self.mdz_config:
logging.error("[SerialManager] 米电阻配置 (mdz_config) 未加载,无法查询。")
print("\n[米电阻查询] 配置未加载,无法查询\n")
return
mdz_port_name = self.mdz_config.get('ser')
query_cmd_hex = self.mdz_config.get('query_cmd', '01030001000755C8')
baud_rate = self.mdz_config.get('port', 9600)
timeout = self.mdz_config.get('timeout', 1.0)
data_bits = self.mdz_config.get('data_bits', 8)
stop_bits = self.mdz_config.get('stop_bits', 1)
parity_char = self.mdz_config.get('parity', 'N')
parity = parity_char # Use the character directly ('N', 'E', or 'O')
logging.info(f"[SerialManager] 使用校验位设置: {parity}")
print(f"\n[米电阻查询] 串口配置: {mdz_port_name}, {baud_rate}, {data_bits}, {stop_bits}, {parity}\n")
temp_ser = None
try:
byte_data = bytes.fromhex(query_cmd_hex.replace(' ', ''))
logging.info(f"[SerialManager] 准备发送米电阻查询指令: {byte_data.hex(' ').upper()} 到端口 {mdz_port_name}")
print(f"\n[米电阻查询] 准备发送查询指令: {byte_data.hex(' ').upper()}\n")
# 检查 SerialManager 是否已管理此端口且已打开
if self.is_port_open(mdz_port_name):
logging.info(f"[SerialManager] 米电阻串口 {mdz_port_name} 已由 SerialManager 管理并打开,直接发送指令。")
print(f"\n[米电阻查询] 串口 {mdz_port_name} 已打开,直接发送指令\n")
if self.write_data(mdz_port_name, byte_data):
logging.info(f"[SerialManager] 指令已发送到 {mdz_port_name} (通过已打开的串口)。响应将由读取线程处理。")
print("\n[米电阻查询] 指令发送成功,等待响应\n")
# 当串口已打开时,指令发送后,响应会由 _read_resistance_thread 捕获并处理。
# _read_resistance_thread 内部的 _process_mdz_response 会负责更新 self.data,
# 调用 _write_data_to_file 和 _notify_callbacks。
# 因此,这里不需要再显式地 sleep 后调用 _write_data_to_file 和 _notify_callbacks
# 以避免数据竞争或重复通知。
return # 指令已发送,等待线程处理
else:
logging.warning(f"[SerialManager] 向已打开的串口 {mdz_port_name} 发送指令失败。将尝试临时打开。")
print("\n[米电阻查询] 指令发送失败,尝试临时打开串口\n")
# 如果串口未被 SerialManager 管理或发送失败,则尝试临时打开
logging.info(f"[SerialManager] 米电阻串口 {mdz_port_name} 未打开或发送失败。尝试临时打开并查询...")
print(f"\n[米电阻查询] 串口 {mdz_port_name} 未打开,尝试临时打开\n")
temp_ser = serial.Serial(
port=mdz_port_name,
baudrate=baud_rate,
bytesize=data_bits,
stopbits=stop_bits,
parity=parity,
timeout=timeout
)
if not temp_ser.is_open:
temp_ser.open()
temp_ser.write(byte_data)
logging.info(f"[SerialManager] 指令已通过临时串口发送到 {mdz_port_name}。等待响应...")
print("\n[米电阻查询] 指令已通过临时串口发送,等待响应\n")
time.sleep(0.1) # 等待设备响应
response_bytes = b''
if temp_ser.in_waiting > 0:
response_bytes = temp_ser.read(temp_ser.in_waiting)
if response_bytes:
logging.info(f"[SerialManager] 收到来自 {mdz_port_name} (临时串口) 的响应: {response_bytes.hex(' ').upper()}")
print(f"\n[米电阻查询] 收到响应: {response_bytes.hex(' ').upper()}\n")
# 将响应交给标准的处理函数
parse_success = self._process_mdz_response(mdz_port_name, response_bytes)
if not parse_success:
logging.warning(f"[SerialManager] _process_mdz_response未能成功处理来自临时串口{mdz_port_name}的响应。将依赖其内部的mock/old data逻辑。")
print("\n[米电阻查询] 响应解析失败\n")
# _process_mdz_response 内部在失败时会处理 mock/old data 及文件写入和通知,这里无需额外操作。
else:
logging.warning(f"[SerialManager] 未收到来自 {mdz_port_name} (临时串口) 的响应。")
print("\n[米电阻查询] 未收到响应\n")
except serial.SerialException as se:
logging.error(f"[SerialManager] 临时打开或操作串口 {mdz_port_name} 失败: {se}")
print(f"\n[米电阻查询] 串口操作失败: {se}\n")
except ValueError as ve:
logging.error(f"[SerialManager] 指令转换错误或响应解析错误 (临时查询): {ve}")
print(f"\n[米电阻查询] 指令转换或响应解析错误: {ve}\n")
except Exception as e:
logging.error(f"[SerialManager] 触发米电阻查询时发生未知错误 (临时查询): {e}", exc_info=True)
print(f"\n[米电阻查询] 未知错误: {e}\n")
finally:
if temp_ser and temp_ser.is_open:
temp_ser.close()
logging.info("[SerialManager] 米电阻数据查询流程结束。")
print("\n[米电阻查询] 查询流程结束\n")
def _notify_callbacks(self, port_name, value):
"""通知所有相关回调函数"""
try:
# 端口特定回调 (通常用于原始串口数据)
if port_name in self.callbacks and port_name != 'mdz_data': # 避免重复处理 mdz_data
try:
# 假设这种回调期望原始的 value (可能是字节串,也可能是其他类型)
self.callbacks[port_name](port_name, value)
logging.debug(f"Notified port-specific callback for {port_name}")
except Exception as e:
logging.error(f"调用端口回调 {port_name} 失败: {e}")
# 全局回调, 特别处理 'mdz_data'
if 'mdz_data' in self.callbacks:
actual_mdz_numeric_value = None
source_info = "unknown"
if isinstance(value, dict):
actual_mdz_numeric_value = value.get('value')
source_info = value.get('source', source_info)
elif isinstance(value, (str, float, int)):
# 如果直接传递了数值 (例如来自旧的 _use_mock_data)
actual_mdz_numeric_value = str(value)
else:
# 尝试从可能是字节串的value中解码 (不太可能走到这里了,因为上游会处理好)
try:
decoded_value = value.decode('utf-8')
if "米电阻数据:" in decoded_value:
actual_mdz_numeric_value = decoded_value.split("米电阻数据:")[1].strip()
else:
actual_mdz_numeric_value = decoded_value # best guess
except: # noqa
pass # 无法解码或解析,保持 None
if actual_mdz_numeric_value is not None:
# 构建 PackageInboundDialog.on_mdz_data_received 期望的格式
callback_data_str = f"米电阻数据: {actual_mdz_numeric_value}"
try:
# port_name 对于 mdz_data 回调,可以传递触发源的串口名,或者一个通用标识
# trigger_resistance_query 知道 mdz_port_name它应该作为 port_name 传给 _notify_callbacks
# 如果是从模拟数据来port_name 可能是 'mdz' 或 'mock'
triggering_port = port_name if port_name not in ['mdz_data', 'mdz'] else self.mdz_config.get('ser', 'N/A') if self.mdz_config else 'N/A'
if source_info.startswith("mock"): # 如果源是模拟数据
triggering_port = f"mock_{port_name}" # e.g. mock_mdz
self.callbacks['mdz_data'](triggering_port, callback_data_str.encode('utf-8'))
logging.info(f"通知 'mdz_data' 回调. 值: {actual_mdz_numeric_value}, 源: {source_info}, 触发源端口: {triggering_port}")
except Exception as e:
logging.error(f"调用全局回调 'mdz_data' 失败: {e}", exc_info=True)
else:
logging.warning(f"回调失败: mdz_data 中 实际值为None. 初始 value: {value}")
except Exception as e:
logging.error(f"通知回调失败: {e}", exc_info=True)
def auto_open_configured_ports(self):
"""自动打开已配置的串口"""
logging.info("尝试自动打开已配置的串口...")
# 首先检查是否启用串口功能
enable_serial_ports = self.config.get_value('app.features.enable_serial_ports', False)
if not enable_serial_ports:
logging.info("串口功能已在配置中禁用,跳过自动打开串口")
return False
success = True
# 检查操作系统类型并提供合适的警告
os_type = platform.system()
if os_type == "Darwin" and (
(self.mdz_config and 'ser' in self.mdz_config and self.mdz_config['ser'] and self.mdz_config['ser'].startswith('COM')) or
(self.cz_config and 'ser' in self.cz_config and self.cz_config['ser'] and self.cz_config['ser'].startswith('COM'))
):
logging.warning("检测到在macOS系统上配置了Windows格式的COM端口这些端口将无法正常打开")
logging.warning("macOS上的串口通常是/dev/tty.*或/dev/cu.*格式")
# 继续尝试打开,但不影响程序流程
# 尝试打开线径串口
if self.cz_config and 'ser' in self.cz_config and self.cz_config['ser']:
port_name = self.cz_config['ser']
baud_rate = self.cz_config.get('port', 2400)
if not self.is_port_open(port_name):
try:
if self.open_port(port_name, 'cz', baud_rate):
logging.info(f"自动打开线径串口 {port_name} 成功")
else:
logging.error(f"自动打开线径串口 {port_name} 失败")
success = False
except Exception as e:
logging.error(f"自动打开线径串口 {port_name} 时发生异常: {e}")
success = False
else:
logging.info(f"线径串口 {port_name} 已经打开,无需重新打开")
else:
logging.warning("线径串口未配置,跳过自动打开")
# 尝试打开米电阻串口
if self.mdz_config and 'ser' in self.mdz_config and self.mdz_config['ser']:
port_name = self.mdz_config['ser']
baud_rate = self.mdz_config.get('port', 9600)
if not self.is_port_open(port_name):
try:
if self.open_port(port_name, 'mdz', baud_rate):
logging.info(f"自动打开米电阻串口 {port_name} 成功")
else:
logging.error(f"自动打开米电阻串口 {port_name} 失败")
success = False
except Exception as e:
logging.error(f"自动打开米电阻串口 {port_name} 时发生异常: {e}")
success = False
else:
logging.info(f"米电阻串口 {port_name} 已经打开,无需重新打开")
else:
logging.warning("米电阻串口未配置,跳过自动打开")
# 检查是否启用键盘监听功能
enable_keyboard_listener = self.config.get_value('app.features.enable_keyboard_listener', False)
if enable_keyboard_listener:
# 启动键盘监听
try:
self.start_keyboard_listener()
except Exception as e:
logging.error(f"启动键盘监听失败: {e}")
# 键盘监听启动失败不影响串口打开的整体状态
else:
logging.info("键盘监听功能已在配置中禁用,跳过启动")
if not success:
logging.warning("部分串口自动打开失败,请检查设备连接或在参数配置中手动打开")
return True # 总是返回True防止应用程序因串口问题而终止
def suspend_file_operations(self, suspend: bool):
"""暂停或恢复文件操作
Args:
suspend: True表示暂停False表示恢复
"""
with self._file_operations_lock:
old_state = self._file_operations_suspended
self._file_operations_suspended = suspend
if old_state != suspend:
if suspend:
logging.info("已暂停SerialManager文件操作")
else:
logging.info("已恢复SerialManager文件操作")