jiateng_ws/utils/modbus_utils.py

311 lines
12 KiB
Python
Raw Permalink Normal View History

from flask import Flask, jsonify, request, make_response
from pymodbus.client import ModbusTcpClient
import time
import logging
from .config_loader import ConfigLoader
import threading
# 配置 Flask 日志级别
log = logging.getLogger('werkzeug')
log.setLevel(logging.WARNING)
# 添加连接池管理类
class ModbusConnectionPool:
_instance = None
_lock = threading.Lock()
@classmethod
def get_instance(cls):
"""获取单例实例"""
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = ModbusConnectionPool()
return cls._instance
def __init__(self):
"""初始化连接池"""
self._connections = {} # 保存每个目标地址的连接
self._locks = {} # 每个连接的锁,避免并发问题
self._in_use = {} # 标记连接是否在使用中
self._last_used = {} # 记录连接最后使用时间
# 启动清理线程
self._cleanup_thread = threading.Thread(target=self._cleanup_idle_connections, daemon=True)
self._cleanup_thread.start()
def get_connection(self, host, port):
"""获取指定地址的连接,如果不存在则创建"""
key = f"{host}:{port}"
# 如果不存在该地址的锁,创建一个
if key not in self._locks:
with self._lock:
if key not in self._locks:
self._locks[key] = threading.Lock()
# 获取该地址的锁
with self._locks[key]:
# 如果连接不存在或已断开,创建新连接
if key not in self._connections or not self._connections[key].is_socket_open():
try:
logging.info(f"Creating new Modbus connection to {host}:{port}")
client = ModbusTcpClient(host, port=port, timeout=10)
connected = client.connect()
if not connected:
logging.error(f"Failed to connect to {host}:{port}")
return None
self._connections[key] = client
self._in_use[key] = True
self._last_used[key] = time.time()
return client
except Exception as e:
logging.error(f"Error creating Modbus connection to {host}:{port}: {e}")
return None
else:
# 连接存在,标记为使用中并更新时间
self._in_use[key] = True
self._last_used[key] = time.time()
return self._connections[key]
def release_connection(self, host, port):
"""释放连接,将其标记为不再使用"""
key = f"{host}:{port}"
with self._locks.get(key, self._lock):
if key in self._in_use:
self._in_use[key] = False
self._last_used[key] = time.time()
def _cleanup_idle_connections(self):
"""清理空闲连接的后台线程"""
idle_timeout = 300 # 5分钟空闲超时
while True:
time.sleep(60) # 每分钟检查一次
current_time = time.time()
keys_to_check = list(self._connections.keys())
for key in keys_to_check:
try:
with self._locks.get(key, self._lock):
# 如果连接空闲超过5分钟关闭并移除
if (key in self._in_use and not self._in_use[key] and
current_time - self._last_used.get(key, 0) > idle_timeout):
logging.info(f"Closing idle connection: {key}")
try:
if self._connections[key].is_socket_open():
self._connections[key].close()
except:
pass # 忽略关闭连接时的错误
# 从所有字典中移除
self._connections.pop(key, None)
self._in_use.pop(key, None)
self._last_used.pop(key, None)
except Exception as e:
logging.error(f"Error during connection cleanup for {key}: {e}")
# Modbus TCP 配置
class ModbusUtils:
def __init__(self, host=None, port=None) -> None:
# 初始化 modbus 配置
config = ConfigLoader.get_instance()
self.MODBUS_HOST = host if host is not None else config.get_value("modbus.host")
self.MODBUS_PORT = port if port is not None else int(config.get_value("modbus.port"))
self.connection_pool = ModbusConnectionPool.get_instance()
self.retry_max = 3 # 最大重试次数
self.retry_delay = 0.5 # 初始重试延迟(秒)
def get_client(self):
"""获取Modbus客户端连接使用连接池"""
# 使用指数退避重试获取连接
retry_count = 0
delay = self.retry_delay
while retry_count < self.retry_max:
client = self.connection_pool.get_connection(self.MODBUS_HOST, self.MODBUS_PORT)
if client and client.is_socket_open():
return client
# 连接失败,等待后重试
logging.warning(f"Connection attempt {retry_count+1} failed, retrying in {delay:.1f}s...")
time.sleep(delay)
delay *= 2 # 指数退避
retry_count += 1
logging.error(f"Failed to connect to Modbus server {self.MODBUS_HOST}:{self.MODBUS_PORT} after {self.retry_max} attempts")
return None
def close_client(self, client):
"""释放客户端连接回连接池,不再实际关闭连接"""
if client:
self.connection_pool.release_connection(self.MODBUS_HOST, self.MODBUS_PORT)
# 新增十进制转成二进制
@staticmethod
def decimal_to_binary(decimal):
"""十进制转16位二进制右对齐"""
return format(decimal, '016b')
# 新增二进制转成十进制
@staticmethod
def binary_to_decimal(binary):
"""二进制字符串转十进制"""
return int(binary, 2)
# 新增十进制转16进制
@staticmethod
def decimal_to_hex(decimal):
return hex(decimal)[2:]
@staticmethod
def split_data(data):
'''
解析位置编码获取寄存器地址
:param data: 位置编码例如 "01-003-02"
:return: 寄存器地址
规则说明
- column: 库区编号01表示A库D100-D10802表示B库D110-D118
- level: 层号(1-9)对应寄存器最后一位如第2层对应D101
- row: 排号(1-16)对应寄存器的位索引
'''
data_list = data.split('-')
column = data_list[0]
row = data_list[1]
level = data_list[2]
return column, row, level
def get_hex_str(self, data):
"""
解析位置编码获取寄存器地址
:param data: 位置编码例如 "01-003-02"
:return: 寄存器地址
规则说明
- column: 库区编号01表示A库D100-D10802表示B库D110-D118
- level: 层号(1-9)对应寄存器最后一位如第2层对应D101
- row: 排号(1-16)对应寄存器的位索引
示例
01-003-02 => D101.2 (A库第2层第3排)
02-005-06 => D115.4 (B库第6层第5排)
"""
column, row, level = self.split_data(data)
# 计算基础地址A库从100开始B库从110开始
base_address = 100 + (int(column) - 1) * 10
# 加上层号得到最终地址
register_address = base_address + (int(level) - 1)
return register_address
@staticmethod
def check_register_value(client, address, expected_value, timeout_seconds=5):
"""
校验指定寄存器地址中的数据是否达到期望值
:param client: ModbusTcp客户端
:param address: 寄存器地址
:param expected_value: 期望值
:param timeout_seconds: 超时时间
:return: True表示达到期望值False表示超时
"""
try:
start_time = time.time()
while True:
result = client.read_holding_registers(address=address, count=1)
if not result.isError() and result.registers[0] == expected_value:
logging.info(f"寄存器D{address}值: {result.registers[0]} 达到期望值{expected_value}")
return True
if time.time() - start_time > timeout_seconds:
logging.warning(f"寄存器D{address} {timeout_seconds}秒内未达到期望值{expected_value}")
return False
time.sleep(0.2) # 添加短暂延时,避免过于频繁的读取
except Exception as e:
logging.error(f"校验寄存器D{address}值时发生错误: {str(e)}")
return False
def write_register_until_success(self, client, address, value, expected_value=None, timeout_seconds=5):
"""
循环写入寄存器直到成功或超时
:param client: ModbusTcp客户端
:param address: 寄存器地址
:param value: 要写入的值
:param expected_value: 期望值如果为None则使用value作为期望值
:param timeout_seconds: 超时时间
:return: True表示写入成功False表示超时
"""
if expected_value is None:
expected_value = value
start_time = time.time()
while True:
# 写入值
logging.info(f"写入寄存器D{address}值: {value}")
client.write_registers(address=address, values=[value])
logging.info(f"写入寄存器D{address}值: {value}成功")
# 检查是否达到期望值
if self.check_register_value(client, address, expected_value):
return True
if time.time() - start_time > timeout_seconds:
logging.warning(f"寄存器D{address} {timeout_seconds}秒内写入失败")
return False
time.sleep(0.2) # 添加短暂延时,避免过于频繁的写入
@staticmethod
def handle_error(error_msg, task_number='', is_emergency_stop=False):
"""通用错误处理函数
:param error_msg: 错误信息
:param task_number: 任务编号默认为空
:param is_emergency_stop: 是否是急停引起的错误
"""
logging.error(error_msg)
pass
# return response, 500
# 添加便捷方法,用于读取保持寄存器
def read_holding_register(self, client, address, count=1):
"""
读取保持寄存器
:param client: ModbusTcp客户端
:param address: 寄存器地址
:param count: 读取的寄存器数量
:return: 读取结果失败返回None
"""
try:
result = client.read_holding_registers(address=address, count=count)
if result.isError():
logging.error(f"读取寄存器D{address}失败: {result}")
return None
return result.registers
except Exception as e:
logging.error(f"读取寄存器D{address}时发生错误: {str(e)}")
return None
# 添加便捷方法,用于写入保持寄存器
def write_register(self, client, address, value):
"""
写入保持寄存器
:param client: ModbusTcp客户端
:param address: 寄存器地址
:param value: 要写入的值
:return: 是否写入成功
"""
try:
result = client.write_registers(address=address, values=[value])
if result.isError():
logging.error(f"写入寄存器D{address}{value}失败: {result}")
return False
logging.info(f"写入寄存器D{address}{value}成功")
return True
except Exception as e:
logging.error(f"写入寄存器D{address}{value}时发生错误: {str(e)}")
return False