jiateng_ws/utils/modbus_monitor.py

340 lines
14 KiB
Python
Raw Normal View History

import time
import logging
from threading import Thread, Event
from PySide6.QtCore import QObject, Signal
from .modbus_utils import ModbusUtils
class RegisterValue:
"""寄存器值对象,用于存储寄存器的值和状态"""
def __init__(self, address, value=None):
self.address = address
self.value = value
self.last_value = None
self.error_count = 0
self.last_read_time = None
self.last_change_time = None
def update(self, new_value):
"""更新寄存器值,并返回值是否发生变化"""
self.last_read_time = time.time()
if self.value != new_value:
self.last_value = self.value
self.value = new_value
self.last_change_time = self.last_read_time
return True
return False
def record_error(self):
"""记录错误次数"""
self.error_count += 1
return self.error_count
def reset_error(self):
"""重置错误计数"""
self.error_count = 0
class RegisterHandler:
"""寄存器处理器基类"""
def handle_change(self, value):
"""处理寄存器值变化的方法,由子类实现具体逻辑"""
pass
class ModbusMonitor(QObject):
"""Modbus寄存器监控器"""
# 定义信号:寄存器地址、新值
register_changed = Signal(int, int)
register_error = Signal(int, str)
monitor_status_changed = Signal(bool, str)
2025-07-01 16:04:18 +08:00
def __init__(self, polling_interval=1.0, max_errors=3, retry_interval=5.0):
"""
初始化Modbus监控器
Args:
2025-07-01 16:04:18 +08:00
polling_interval: 轮询间隔默认1.0比原来的1.0秒增加了一倍
max_errors: 最大错误次数超过此次数将暂停特定寄存器的监控
retry_interval: 重试间隔
"""
super().__init__()
self.polling_interval = polling_interval
self.max_errors = max_errors
self.retry_interval = retry_interval
# 初始化存储
self.registers = {} # 存储寄存器值的字典
self.handlers = {} # 存储寄存器处理器的字典
self.stop_event = Event() # 用于停止监控线程的事件
self.monitor_thread = None
self.modbus = ModbusUtils()
self.client = None
self.running = False
# 初始化要监控的寄存器列表
self._initialize_registers()
def _initialize_registers(self):
"""初始化要监控的寄存器列表"""
# 默认监控的寄存器地址
2025-07-01 16:04:18 +08:00
register_addresses = [0, 4, 5, 6, 11, 13, 20, 21, 22, 23, 24, 25, 30]
for address in register_addresses:
self.registers[address] = RegisterValue(address)
def register_handler(self, address, handler):
"""注册寄存器处理器
Args:
address: 寄存器地址
handler: RegisterHandler的实例
"""
if address not in self.registers:
self.registers[address] = RegisterValue(address)
if address not in self.handlers:
self.handlers[address] = []
self.handlers[address].append(handler)
logging.info(f"已注册寄存器D{address}的处理器: {handler.__class__.__name__}")
def start(self):
"""启动监控线程"""
if self.monitor_thread and self.monitor_thread.is_alive():
logging.warning("监控器已经在运行中")
return
self.stop_event.clear()
self.monitor_thread = Thread(target=self._monitor_loop, daemon=True)
self.monitor_thread.start()
self.running = True
self.monitor_status_changed.emit(True, "监控器已启动")
logging.info("Modbus监控器已启动")
def stop(self):
"""停止监控线程"""
if not self.monitor_thread or not self.monitor_thread.is_alive():
return
self.stop_event.set()
if self.monitor_thread:
self.monitor_thread.join(timeout=5.0) # 给线程5秒时间完成
if self.client:
self.modbus.close_client(self.client)
self.client = None
self.running = False
self.monitor_status_changed.emit(False, "监控器已停止")
logging.info("Modbus监控器已停止")
def _monitor_loop(self):
"""监控循环,在独立线程中运行"""
try:
while not self.stop_event.is_set():
# 检查连接并根据需要重新连接
if not self.client:
self._reconnect()
if self.client:
# 读取并处理所有注册的寄存器
self._read_registers()
# 等待下一次轮询,或者直到收到停止信号
self.stop_event.wait(self.polling_interval)
except Exception as e:
logging.error(f"监控线程发生异常: {str(e)}", exc_info=True)
self.monitor_status_changed.emit(False, f"监控器异常: {str(e)}")
finally:
if self.client:
self.modbus.close_client(self.client)
self.client = None
def _reconnect(self):
"""重新连接到Modbus服务器"""
try:
if self.client:
self.modbus.close_client(self.client)
self.client = self.modbus.get_client()
if self.client:
logging.info("Modbus监控器成功连接到服务器")
self.monitor_status_changed.emit(True, "已连接到Modbus服务器")
# 重置所有寄存器的错误计数
for reg in self.registers.values():
reg.reset_error()
else:
logging.warning("Modbus监控器无法连接到服务器")
self.monitor_status_changed.emit(False, "无法连接到Modbus服务器")
# 等待重试间隔
self.stop_event.wait(self.retry_interval)
except Exception as e:
logging.error(f"重新连接Modbus服务器时发生错误: {str(e)}")
self.monitor_status_changed.emit(False, f"连接错误: {str(e)}")
# 等待重试间隔
self.stop_event.wait(self.retry_interval)
def _read_registers(self):
"""读取所有注册的寄存器"""
# 批量处理寄存器,减少连接次数
grouped_registers = self._group_registers()
for group, addresses in grouped_registers.items():
if self.stop_event.is_set():
break
# 检查组内是否有需要读取的寄存器
has_valid_registers = False
for address in addresses:
reg_value = self.registers[address]
# 如果错误次数超过阈值且未到重试时间,则跳过
if reg_value.error_count >= self.max_errors:
current_time = time.time()
last_read = reg_value.last_read_time or 0
if current_time - last_read < self.retry_interval:
continue
has_valid_registers = True
break
if not has_valid_registers:
continue
try:
# 为每组创建一个重试计数
retry_count = 0
max_retries = 2
delay = 0.5
while retry_count <= max_retries:
try:
# 批量读取该组寄存器
for address in addresses:
reg_value = self.registers[address]
# 如果错误次数超过阈值且未到重试时间,则跳过
if reg_value.error_count >= self.max_errors:
current_time = time.time()
last_read = reg_value.last_read_time or 0
if current_time - last_read < self.retry_interval:
continue
try:
# 读取寄存器值
result = self.modbus.read_holding_register(self.client, address)
if result is None or len(result) == 0:
error_count = reg_value.record_error()
error_msg = f"读取寄存器D{address}失败,这是第{error_count}次失败"
logging.warning(error_msg)
self.register_error.emit(address, error_msg)
# 如果连续失败次数达到阈值,尝试重连
if error_count >= self.max_errors:
logging.error(f"寄存器D{address}连续{error_count}次读取失败,将在{self.retry_interval}秒后重试")
continue
# 成功读取,重置错误计数
reg_value.reset_error()
# 更新值并检查是否发生变化
if reg_value.update(result[0]):
logging.info(f"寄存器D{address}值变化: {reg_value.last_value} -> {reg_value.value}")
# 发出信号
self.register_changed.emit(address, reg_value.value)
# 调用注册的处理器
self._notify_handlers(address, reg_value.value)
except Exception as e:
error_count = reg_value.record_error()
error_msg = f"读取寄存器D{address}时发生异常: {str(e)}"
logging.error(error_msg)
self.register_error.emit(address, error_msg)
if error_count >= self.max_errors:
logging.error(f"寄存器D{address}连续{error_count}次读取异常,将在{self.retry_interval}秒后重试")
# 如果是连接错误,尝试重新连接并退出循环
if "Connection" in str(e):
retry_count += 1
if retry_count <= max_retries:
# 释放连接,重新获取
if self.client:
self.modbus.close_client(self.client)
self.client = None
delay_time = delay * (2 ** (retry_count - 1))
logging.warning(f"连接错误,等待 {delay_time:.1f} 秒后重试 ({retry_count}/{max_retries})...")
time.sleep(delay_time)
# 重新连接
self._reconnect()
break
# 如果已成功读取所有寄存器,跳出重试循环
break
except Exception as e:
retry_count += 1
if retry_count <= max_retries:
delay_time = delay * (2 ** (retry_count - 1))
logging.warning(f"批量读取寄存器时发生错误,等待 {delay_time:.1f} 秒后重试 ({retry_count}/{max_retries}): {str(e)}")
time.sleep(delay_time)
# 尝试重新连接
if self.client:
self.modbus.close_client(self.client)
self.client = None
self._reconnect()
else:
logging.error(f"批量读取寄存器失败,已达到最大重试次数: {str(e)}")
break
except Exception as e:
logging.error(f"处理寄存器组时发生异常: {str(e)}", exc_info=True)
def _group_registers(self):
"""将寄存器分组,以便批量处理
Returns:
dict: 分组后的寄存器格式为 {group_id: [address1, address2, ...]}
"""
# 简单分组每5个寄存器为一组
group_size = 5
grouped = {}
addresses = list(self.registers.keys())
for i in range(0, len(addresses), group_size):
group_id = i // group_size
grouped[group_id] = addresses[i:i+group_size]
return grouped
def _notify_handlers(self, address, value):
"""通知所有注册的处理器"""
if address in self.handlers:
for handler in self.handlers[address]:
try:
handler.handle_change(value)
except Exception as e:
logging.error(f"调用寄存器D{address}的处理器时发生异常: {str(e)}", exc_info=True)
def get_register_value(self, address):
"""获取寄存器的当前值
Args:
address: 寄存器地址
Returns:
当前值如果未读取过则返回None
"""
if address in self.registers:
return self.registers[address].value
return None
def is_running(self):
"""返回监控器是否正在运行"""
return self.running and self.monitor_thread and self.monitor_thread.is_alive()
# 单例模式
_instance = None
def get_instance():
"""获取ModbusMonitor单例"""
global _instance
if _instance is None:
_instance = ModbusMonitor()
return _instance