jiateng_ws/utils/modbus_utils.py
2025-07-01 15:32:40 +08:00

311 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, jsonify, request, make_response
from pymodbus.client import ModbusTcpClient
import time
import logging
from .config_loader import ConfigLoader
import threading
# 配置 Flask 日志级别
log = logging.getLogger('werkzeug')
log.setLevel(logging.WARNING)
# 添加连接池管理类
class ModbusConnectionPool:
_instance = None
_lock = threading.Lock()
@classmethod
def get_instance(cls):
"""获取单例实例"""
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = ModbusConnectionPool()
return cls._instance
def __init__(self):
"""初始化连接池"""
self._connections = {} # 保存每个目标地址的连接
self._locks = {} # 每个连接的锁,避免并发问题
self._in_use = {} # 标记连接是否在使用中
self._last_used = {} # 记录连接最后使用时间
# 启动清理线程
self._cleanup_thread = threading.Thread(target=self._cleanup_idle_connections, daemon=True)
self._cleanup_thread.start()
def get_connection(self, host, port):
"""获取指定地址的连接,如果不存在则创建"""
key = f"{host}:{port}"
# 如果不存在该地址的锁,创建一个
if key not in self._locks:
with self._lock:
if key not in self._locks:
self._locks[key] = threading.Lock()
# 获取该地址的锁
with self._locks[key]:
# 如果连接不存在或已断开,创建新连接
if key not in self._connections or not self._connections[key].is_socket_open():
try:
logging.info(f"Creating new Modbus connection to {host}:{port}")
client = ModbusTcpClient(host, port=port, timeout=10)
connected = client.connect()
if not connected:
logging.error(f"Failed to connect to {host}:{port}")
return None
self._connections[key] = client
self._in_use[key] = True
self._last_used[key] = time.time()
return client
except Exception as e:
logging.error(f"Error creating Modbus connection to {host}:{port}: {e}")
return None
else:
# 连接存在,标记为使用中并更新时间
self._in_use[key] = True
self._last_used[key] = time.time()
return self._connections[key]
def release_connection(self, host, port):
"""释放连接,将其标记为不再使用"""
key = f"{host}:{port}"
with self._locks.get(key, self._lock):
if key in self._in_use:
self._in_use[key] = False
self._last_used[key] = time.time()
def _cleanup_idle_connections(self):
"""清理空闲连接的后台线程"""
idle_timeout = 300 # 5分钟空闲超时
while True:
time.sleep(60) # 每分钟检查一次
current_time = time.time()
keys_to_check = list(self._connections.keys())
for key in keys_to_check:
try:
with self._locks.get(key, self._lock):
# 如果连接空闲超过5分钟关闭并移除
if (key in self._in_use and not self._in_use[key] and
current_time - self._last_used.get(key, 0) > idle_timeout):
logging.info(f"Closing idle connection: {key}")
try:
if self._connections[key].is_socket_open():
self._connections[key].close()
except:
pass # 忽略关闭连接时的错误
# 从所有字典中移除
self._connections.pop(key, None)
self._in_use.pop(key, None)
self._last_used.pop(key, None)
except Exception as e:
logging.error(f"Error during connection cleanup for {key}: {e}")
# Modbus TCP 配置
class ModbusUtils:
def __init__(self, host=None, port=None) -> None:
# 初始化 modbus 配置
config = ConfigLoader.get_instance()
self.MODBUS_HOST = host if host is not None else config.get_value("modbus.host")
self.MODBUS_PORT = port if port is not None else int(config.get_value("modbus.port"))
self.connection_pool = ModbusConnectionPool.get_instance()
self.retry_max = 3 # 最大重试次数
self.retry_delay = 0.5 # 初始重试延迟(秒)
def get_client(self):
"""获取Modbus客户端连接使用连接池"""
# 使用指数退避重试获取连接
retry_count = 0
delay = self.retry_delay
while retry_count < self.retry_max:
client = self.connection_pool.get_connection(self.MODBUS_HOST, self.MODBUS_PORT)
if client and client.is_socket_open():
return client
# 连接失败,等待后重试
logging.warning(f"Connection attempt {retry_count+1} failed, retrying in {delay:.1f}s...")
time.sleep(delay)
delay *= 2 # 指数退避
retry_count += 1
logging.error(f"Failed to connect to Modbus server {self.MODBUS_HOST}:{self.MODBUS_PORT} after {self.retry_max} attempts")
return None
def close_client(self, client):
"""释放客户端连接回连接池,不再实际关闭连接"""
if client:
self.connection_pool.release_connection(self.MODBUS_HOST, self.MODBUS_PORT)
# 新增十进制转成二进制
@staticmethod
def decimal_to_binary(decimal):
"""十进制转16位二进制右对齐"""
return format(decimal, '016b')
# 新增二进制转成十进制
@staticmethod
def binary_to_decimal(binary):
"""二进制字符串转十进制"""
return int(binary, 2)
# 新增十进制转16进制
@staticmethod
def decimal_to_hex(decimal):
return hex(decimal)[2:]
@staticmethod
def split_data(data):
'''
解析位置编码获取寄存器地址
:param data: 位置编码,例如 "01-003-02"
:return: 寄存器地址
规则说明:
- column: 库区编号01表示A库D100-D10802表示B库D110-D118
- level: 层号(1-9)对应寄存器最后一位如第2层对应D101
- row: 排号(1-16),对应寄存器的位索引
'''
data_list = data.split('-')
column = data_list[0]
row = data_list[1]
level = data_list[2]
return column, row, level
def get_hex_str(self, data):
"""
解析位置编码获取寄存器地址
:param data: 位置编码,例如 "01-003-02"
:return: 寄存器地址
规则说明:
- column: 库区编号01表示A库D100-D10802表示B库D110-D118
- level: 层号(1-9)对应寄存器最后一位如第2层对应D101
- row: 排号(1-16),对应寄存器的位索引
示例:
01-003-02 => D101.2 (A库第2层第3排)
02-005-06 => D115.4 (B库第6层第5排)
"""
column, row, level = self.split_data(data)
# 计算基础地址A库从100开始B库从110开始
base_address = 100 + (int(column) - 1) * 10
# 加上层号得到最终地址
register_address = base_address + (int(level) - 1)
return register_address
@staticmethod
def check_register_value(client, address, expected_value, timeout_seconds=5):
"""
校验指定寄存器地址中的数据是否达到期望值
:param client: ModbusTcp客户端
:param address: 寄存器地址
:param expected_value: 期望值
:param timeout_seconds: 超时时间(秒)
:return: True表示达到期望值False表示超时
"""
try:
start_time = time.time()
while True:
result = client.read_holding_registers(address=address, count=1)
if not result.isError() and result.registers[0] == expected_value:
logging.info(f"寄存器D{address}值: {result.registers[0]} 达到期望值{expected_value}")
return True
if time.time() - start_time > timeout_seconds:
logging.warning(f"寄存器D{address} {timeout_seconds}秒内未达到期望值{expected_value}")
return False
time.sleep(0.2) # 添加短暂延时,避免过于频繁的读取
except Exception as e:
logging.error(f"校验寄存器D{address}值时发生错误: {str(e)}")
return False
def write_register_until_success(self, client, address, value, expected_value=None, timeout_seconds=5):
"""
循环写入寄存器直到成功或超时
:param client: ModbusTcp客户端
:param address: 寄存器地址
:param value: 要写入的值
:param expected_value: 期望值如果为None则使用value作为期望值
:param timeout_seconds: 超时时间(秒)
:return: True表示写入成功False表示超时
"""
if expected_value is None:
expected_value = value
start_time = time.time()
while True:
# 写入值
logging.info(f"写入寄存器D{address}值: {value}")
client.write_registers(address=address, values=[value])
logging.info(f"写入寄存器D{address}值: {value}成功")
# 检查是否达到期望值
if self.check_register_value(client, address, expected_value):
return True
if time.time() - start_time > timeout_seconds:
logging.warning(f"寄存器D{address} {timeout_seconds}秒内写入失败")
return False
time.sleep(0.2) # 添加短暂延时,避免过于频繁的写入
@staticmethod
def handle_error(error_msg, task_number='', is_emergency_stop=False):
"""通用错误处理函数
:param error_msg: 错误信息
:param task_number: 任务编号,默认为空
:param is_emergency_stop: 是否是急停引起的错误
"""
logging.error(error_msg)
pass
# return response, 500
# 添加便捷方法,用于读取保持寄存器
def read_holding_register(self, client, address, count=1):
"""
读取保持寄存器
:param client: ModbusTcp客户端
:param address: 寄存器地址
:param count: 读取的寄存器数量
:return: 读取结果失败返回None
"""
try:
result = client.read_holding_registers(address=address, count=count)
if result.isError():
logging.error(f"读取寄存器D{address}失败: {result}")
return None
return result.registers
except Exception as e:
logging.error(f"读取寄存器D{address}时发生错误: {str(e)}")
return None
# 添加便捷方法,用于写入保持寄存器
def write_register(self, client, address, value):
"""
写入保持寄存器
:param client: ModbusTcp客户端
:param address: 寄存器地址
:param value: 要写入的值
:return: 是否写入成功
"""
try:
result = client.write_registers(address=address, values=[value])
if result.isError():
logging.error(f"写入寄存器D{address}{value}失败: {result}")
return False
logging.info(f"写入寄存器D{address}{value}成功")
return True
except Exception as e:
logging.error(f"写入寄存器D{address}{value}时发生错误: {str(e)}")
return False