jiateng_ws/utils/modbus_monitor.py
2025-07-01 16:04:18 +08:00

340 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import logging
from threading import Thread, Event
from PySide6.QtCore import QObject, Signal
from .modbus_utils import ModbusUtils
class RegisterValue:
"""寄存器值对象,用于存储寄存器的值和状态"""
def __init__(self, address, value=None):
self.address = address
self.value = value
self.last_value = None
self.error_count = 0
self.last_read_time = None
self.last_change_time = None
def update(self, new_value):
"""更新寄存器值,并返回值是否发生变化"""
self.last_read_time = time.time()
if self.value != new_value:
self.last_value = self.value
self.value = new_value
self.last_change_time = self.last_read_time
return True
return False
def record_error(self):
"""记录错误次数"""
self.error_count += 1
return self.error_count
def reset_error(self):
"""重置错误计数"""
self.error_count = 0
class RegisterHandler:
"""寄存器处理器基类"""
def handle_change(self, value):
"""处理寄存器值变化的方法,由子类实现具体逻辑"""
pass
class ModbusMonitor(QObject):
"""Modbus寄存器监控器"""
# 定义信号:寄存器地址、新值
register_changed = Signal(int, int)
register_error = Signal(int, str)
monitor_status_changed = Signal(bool, str)
def __init__(self, polling_interval=1.0, max_errors=3, retry_interval=5.0):
"""
初始化Modbus监控器
Args:
polling_interval: 轮询间隔默认1.0秒比原来的1.0秒增加了一倍
max_errors: 最大错误次数,超过此次数将暂停特定寄存器的监控
retry_interval: 重试间隔(秒)
"""
super().__init__()
self.polling_interval = polling_interval
self.max_errors = max_errors
self.retry_interval = retry_interval
# 初始化存储
self.registers = {} # 存储寄存器值的字典
self.handlers = {} # 存储寄存器处理器的字典
self.stop_event = Event() # 用于停止监控线程的事件
self.monitor_thread = None
self.modbus = ModbusUtils()
self.client = None
self.running = False
# 初始化要监控的寄存器列表
self._initialize_registers()
def _initialize_registers(self):
"""初始化要监控的寄存器列表"""
# 默认监控的寄存器地址
register_addresses = [0, 4, 5, 6, 11, 13, 20, 21, 22, 23, 24, 25, 30]
for address in register_addresses:
self.registers[address] = RegisterValue(address)
def register_handler(self, address, handler):
"""注册寄存器处理器
Args:
address: 寄存器地址
handler: RegisterHandler的实例
"""
if address not in self.registers:
self.registers[address] = RegisterValue(address)
if address not in self.handlers:
self.handlers[address] = []
self.handlers[address].append(handler)
logging.info(f"已注册寄存器D{address}的处理器: {handler.__class__.__name__}")
def start(self):
"""启动监控线程"""
if self.monitor_thread and self.monitor_thread.is_alive():
logging.warning("监控器已经在运行中")
return
self.stop_event.clear()
self.monitor_thread = Thread(target=self._monitor_loop, daemon=True)
self.monitor_thread.start()
self.running = True
self.monitor_status_changed.emit(True, "监控器已启动")
logging.info("Modbus监控器已启动")
def stop(self):
"""停止监控线程"""
if not self.monitor_thread or not self.monitor_thread.is_alive():
return
self.stop_event.set()
if self.monitor_thread:
self.monitor_thread.join(timeout=5.0) # 给线程5秒时间完成
if self.client:
self.modbus.close_client(self.client)
self.client = None
self.running = False
self.monitor_status_changed.emit(False, "监控器已停止")
logging.info("Modbus监控器已停止")
def _monitor_loop(self):
"""监控循环,在独立线程中运行"""
try:
while not self.stop_event.is_set():
# 检查连接并根据需要重新连接
if not self.client:
self._reconnect()
if self.client:
# 读取并处理所有注册的寄存器
self._read_registers()
# 等待下一次轮询,或者直到收到停止信号
self.stop_event.wait(self.polling_interval)
except Exception as e:
logging.error(f"监控线程发生异常: {str(e)}", exc_info=True)
self.monitor_status_changed.emit(False, f"监控器异常: {str(e)}")
finally:
if self.client:
self.modbus.close_client(self.client)
self.client = None
def _reconnect(self):
"""重新连接到Modbus服务器"""
try:
if self.client:
self.modbus.close_client(self.client)
self.client = self.modbus.get_client()
if self.client:
logging.info("Modbus监控器成功连接到服务器")
self.monitor_status_changed.emit(True, "已连接到Modbus服务器")
# 重置所有寄存器的错误计数
for reg in self.registers.values():
reg.reset_error()
else:
logging.warning("Modbus监控器无法连接到服务器")
self.monitor_status_changed.emit(False, "无法连接到Modbus服务器")
# 等待重试间隔
self.stop_event.wait(self.retry_interval)
except Exception as e:
logging.error(f"重新连接Modbus服务器时发生错误: {str(e)}")
self.monitor_status_changed.emit(False, f"连接错误: {str(e)}")
# 等待重试间隔
self.stop_event.wait(self.retry_interval)
def _read_registers(self):
"""读取所有注册的寄存器"""
# 批量处理寄存器,减少连接次数
grouped_registers = self._group_registers()
for group, addresses in grouped_registers.items():
if self.stop_event.is_set():
break
# 检查组内是否有需要读取的寄存器
has_valid_registers = False
for address in addresses:
reg_value = self.registers[address]
# 如果错误次数超过阈值且未到重试时间,则跳过
if reg_value.error_count >= self.max_errors:
current_time = time.time()
last_read = reg_value.last_read_time or 0
if current_time - last_read < self.retry_interval:
continue
has_valid_registers = True
break
if not has_valid_registers:
continue
try:
# 为每组创建一个重试计数
retry_count = 0
max_retries = 2
delay = 0.5
while retry_count <= max_retries:
try:
# 批量读取该组寄存器
for address in addresses:
reg_value = self.registers[address]
# 如果错误次数超过阈值且未到重试时间,则跳过
if reg_value.error_count >= self.max_errors:
current_time = time.time()
last_read = reg_value.last_read_time or 0
if current_time - last_read < self.retry_interval:
continue
try:
# 读取寄存器值
result = self.modbus.read_holding_register(self.client, address)
if result is None or len(result) == 0:
error_count = reg_value.record_error()
error_msg = f"读取寄存器D{address}失败,这是第{error_count}次失败"
logging.warning(error_msg)
self.register_error.emit(address, error_msg)
# 如果连续失败次数达到阈值,尝试重连
if error_count >= self.max_errors:
logging.error(f"寄存器D{address}连续{error_count}次读取失败,将在{self.retry_interval}秒后重试")
continue
# 成功读取,重置错误计数
reg_value.reset_error()
# 更新值并检查是否发生变化
if reg_value.update(result[0]):
logging.info(f"寄存器D{address}值变化: {reg_value.last_value} -> {reg_value.value}")
# 发出信号
self.register_changed.emit(address, reg_value.value)
# 调用注册的处理器
self._notify_handlers(address, reg_value.value)
except Exception as e:
error_count = reg_value.record_error()
error_msg = f"读取寄存器D{address}时发生异常: {str(e)}"
logging.error(error_msg)
self.register_error.emit(address, error_msg)
if error_count >= self.max_errors:
logging.error(f"寄存器D{address}连续{error_count}次读取异常,将在{self.retry_interval}秒后重试")
# 如果是连接错误,尝试重新连接并退出循环
if "Connection" in str(e):
retry_count += 1
if retry_count <= max_retries:
# 释放连接,重新获取
if self.client:
self.modbus.close_client(self.client)
self.client = None
delay_time = delay * (2 ** (retry_count - 1))
logging.warning(f"连接错误,等待 {delay_time:.1f} 秒后重试 ({retry_count}/{max_retries})...")
time.sleep(delay_time)
# 重新连接
self._reconnect()
break
# 如果已成功读取所有寄存器,跳出重试循环
break
except Exception as e:
retry_count += 1
if retry_count <= max_retries:
delay_time = delay * (2 ** (retry_count - 1))
logging.warning(f"批量读取寄存器时发生错误,等待 {delay_time:.1f} 秒后重试 ({retry_count}/{max_retries}): {str(e)}")
time.sleep(delay_time)
# 尝试重新连接
if self.client:
self.modbus.close_client(self.client)
self.client = None
self._reconnect()
else:
logging.error(f"批量读取寄存器失败,已达到最大重试次数: {str(e)}")
break
except Exception as e:
logging.error(f"处理寄存器组时发生异常: {str(e)}", exc_info=True)
def _group_registers(self):
"""将寄存器分组,以便批量处理
Returns:
dict: 分组后的寄存器,格式为 {group_id: [address1, address2, ...]}
"""
# 简单分组每5个寄存器为一组
group_size = 5
grouped = {}
addresses = list(self.registers.keys())
for i in range(0, len(addresses), group_size):
group_id = i // group_size
grouped[group_id] = addresses[i:i+group_size]
return grouped
def _notify_handlers(self, address, value):
"""通知所有注册的处理器"""
if address in self.handlers:
for handler in self.handlers[address]:
try:
handler.handle_change(value)
except Exception as e:
logging.error(f"调用寄存器D{address}的处理器时发生异常: {str(e)}", exc_info=True)
def get_register_value(self, address):
"""获取寄存器的当前值
Args:
address: 寄存器地址
Returns:
当前值如果未读取过则返回None
"""
if address in self.registers:
return self.registers[address].value
return None
def is_running(self):
"""返回监控器是否正在运行"""
return self.running and self.monitor_thread and self.monitor_thread.is_alive()
# 单例模式
_instance = None
def get_instance():
"""获取ModbusMonitor单例"""
global _instance
if _instance is None:
_instance = ModbusMonitor()
return _instance