jiateng_ws/utils/modbus_utils.py

209 lines
8.1 KiB
Python
Raw Normal View History

from flask import Flask, jsonify, request, make_response
from pymodbus.client import ModbusTcpClient
import time
import logging
from .config_loader import ConfigLoader
# 配置 Flask 日志级别
log = logging.getLogger('werkzeug')
log.setLevel(logging.WARNING)
# Modbus TCP 配置
class ModbusUtils:
def __init__(self, host=None, port=None) -> None:
# 初始化 modbus 配置
config = ConfigLoader.get_instance()
self.MODBUS_HOST = host if host is not None else config.get_value("modbus.host")
self.MODBUS_PORT = port if port is not None else int(config.get_value("modbus.port"))
def get_client(self):
# 创建Modbus TCP客户端实例指定服务器的IP地址和端口号
# client = ModbusTcpClient('localhost', port=5020)
client = ModbusTcpClient(self.MODBUS_HOST, port=self.MODBUS_PORT, timeout=10) # 增加超时时间
logging.info(f"Attempting to connect to Modbus server {self.MODBUS_HOST}:{self.MODBUS_PORT}")
try:
is_connected = client.connect() #确保客户端已连接
if is_connected:
logging.info(f"Successfully connected to Modbus server {self.MODBUS_HOST}:{self.MODBUS_PORT}")
else:
logging.error(f"Failed to connect to Modbus server {self.MODBUS_HOST}:{self.MODBUS_PORT}. client.connect() returned False.")
except Exception as e:
logging.error(f"Exception during connection to Modbus server {self.MODBUS_HOST}:{self.MODBUS_PORT}: {e}")
# Optionally, re-raise or handle as appropriate
return None # Or raise an exception
return client
def close_client(self, client):
# 关闭客户端连接
if client:
client.close()
# 新增十进制转成二进制
@staticmethod
def decimal_to_binary(decimal):
"""十进制转16位二进制右对齐"""
return format(decimal, '016b')
# 新增二进制转成十进制
@staticmethod
def binary_to_decimal(binary):
"""二进制字符串转十进制"""
return int(binary, 2)
# 新增十进制转16进制
@staticmethod
def decimal_to_hex(decimal):
return hex(decimal)[2:]
@staticmethod
def split_data(data):
'''
解析位置编码获取寄存器地址
:param data: 位置编码例如 "01-003-02"
:return: 寄存器地址
规则说明
- column: 库区编号01表示A库D100-D10802表示B库D110-D118
- level: 层号(1-9)对应寄存器最后一位如第2层对应D101
- row: 排号(1-16)对应寄存器的位索引
'''
data_list = data.split('-')
column = data_list[0]
row = data_list[1]
level = data_list[2]
return column, row, level
def get_hex_str(self, data):
"""
解析位置编码获取寄存器地址
:param data: 位置编码例如 "01-003-02"
:return: 寄存器地址
规则说明
- column: 库区编号01表示A库D100-D10802表示B库D110-D118
- level: 层号(1-9)对应寄存器最后一位如第2层对应D101
- row: 排号(1-16)对应寄存器的位索引
示例
01-003-02 => D101.2 (A库第2层第3排)
02-005-06 => D115.4 (B库第6层第5排)
"""
column, row, level = self.split_data(data)
# 计算基础地址A库从100开始B库从110开始
base_address = 100 + (int(column) - 1) * 10
# 加上层号得到最终地址
register_address = base_address + (int(level) - 1)
return register_address
@staticmethod
def check_register_value(client, address, expected_value, timeout_seconds=5):
"""
校验指定寄存器地址中的数据是否达到期望值
:param client: ModbusTcp客户端
:param address: 寄存器地址
:param expected_value: 期望值
:param timeout_seconds: 超时时间
:return: True表示达到期望值False表示超时
"""
try:
start_time = time.time()
while True:
result = client.read_holding_registers(address=address, count=1)
if not result.isError() and result.registers[0] == expected_value:
logging.info(f"寄存器D{address}值: {result.registers[0]} 达到期望值{expected_value}")
return True
if time.time() - start_time > timeout_seconds:
logging.warning(f"寄存器D{address} {timeout_seconds}秒内未达到期望值{expected_value}")
return False
time.sleep(0.2) # 添加短暂延时,避免过于频繁的读取
except Exception as e:
logging.error(f"校验寄存器D{address}值时发生错误: {str(e)}")
return False
def write_register_until_success(self, client, address, value, expected_value=None, timeout_seconds=5):
"""
循环写入寄存器直到成功或超时
:param client: ModbusTcp客户端
:param address: 寄存器地址
:param value: 要写入的值
:param expected_value: 期望值如果为None则使用value作为期望值
:param timeout_seconds: 超时时间
:return: True表示写入成功False表示超时
"""
if expected_value is None:
expected_value = value
start_time = time.time()
while True:
# 写入值
logging.info(f"写入寄存器D{address}值: {value}")
client.write_registers(address=address, values=[value])
logging.info(f"写入寄存器D{address}值: {value}成功")
# 检查是否达到期望值
if self.check_register_value(client, address, expected_value):
return True
if time.time() - start_time > timeout_seconds:
logging.warning(f"寄存器D{address} {timeout_seconds}秒内写入失败")
return False
time.sleep(0.2) # 添加短暂延时,避免过于频繁的写入
@staticmethod
def handle_error(error_msg, task_number='', is_emergency_stop=False):
"""通用错误处理函数
:param error_msg: 错误信息
:param task_number: 任务编号默认为空
:param is_emergency_stop: 是否是急停引起的错误
"""
logging.error(error_msg)
pass
# return response, 500
# 添加便捷方法,用于读取保持寄存器
def read_holding_register(self, client, address, count=1):
"""
读取保持寄存器
:param client: ModbusTcp客户端
:param address: 寄存器地址
:param count: 读取的寄存器数量
:return: 读取结果失败返回None
"""
try:
result = client.read_holding_registers(address=address, count=count)
if result.isError():
logging.error(f"读取寄存器D{address}失败: {result}")
return None
return result.registers
except Exception as e:
logging.error(f"读取寄存器D{address}时发生错误: {str(e)}")
return None
# 添加便捷方法,用于写入保持寄存器
def write_register(self, client, address, value):
"""
写入保持寄存器
:param client: ModbusTcp客户端
:param address: 寄存器地址
:param value: 要写入的值
:return: 是否写入成功
"""
try:
result = client.write_registers(address=address, values=[value])
if result.isError():
logging.error(f"写入寄存器D{address}{value}失败: {result}")
return False
logging.info(f"写入寄存器D{address}{value}成功")
return True
except Exception as e:
logging.error(f"写入寄存器D{address}{value}时发生错误: {str(e)}")
return False