jiateng_ws/utils/modbus_utils.py

210 lines
8.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, jsonify, request, make_response
from pymodbus.client import ModbusTcpClient
import time
import logging
from .config_loader import ConfigLoader
# 配置 Flask 日志级别
log = logging.getLogger('werkzeug')
log.setLevel(logging.WARNING)
# Modbus TCP 配置
class ModbusUtils:
def __init__(self) -> None:
# 初始化 modbus 配置
config = ConfigLoader.get_instance()
self.MODBUS_HOST = config.get_value("modbus.host")
self.MODBUS_PORT = config.get_value("modbus.port")
def get_client(self):
# 创建Modbus TCP客户端实例指定服务器的IP地址和端口号
# client = ModbusTcpClient('localhost', port=5020)
client = ModbusTcpClient(self.MODBUS_HOST, port=self.MODBUS_PORT, timeout=10) # 增加超时时间
logging.info(f"Attempting to connect to Modbus server {self.MODBUS_HOST}:{self.MODBUS_PORT}")
try:
is_connected = client.connect() #确保客户端已连接
if is_connected:
logging.info(f"Successfully connected to Modbus server {self.MODBUS_HOST}:{self.MODBUS_PORT}")
else:
logging.error(f"Failed to connect to Modbus server {self.MODBUS_HOST}:{self.MODBUS_PORT}. client.connect() returned False.")
except Exception as e:
logging.error(f"Exception during connection to Modbus server {self.MODBUS_HOST}:{self.MODBUS_PORT}: {e}")
# Optionally, re-raise or handle as appropriate
return None # Or raise an exception
return client
def close_client(self, client):
# 关闭客户端连接
if client:
client.close()
# 新增十进制转成二进制
@staticmethod
def decimal_to_binary(decimal):
"""十进制转16位二进制右对齐"""
return format(decimal, '016b')
# 新增二进制转成十进制
@staticmethod
def binary_to_decimal(binary):
"""二进制字符串转十进制"""
return int(binary, 2)
# 新增十进制转16进制
@staticmethod
def decimal_to_hex(decimal):
return hex(decimal)[2:]
@staticmethod
def split_data(data):
'''
解析位置编码获取寄存器地址
:param data: 位置编码,例如 "01-003-02"
:return: 寄存器地址
规则说明:
- column: 库区编号01表示A库D100-D10802表示B库D110-D118
- level: 层号(1-9)对应寄存器最后一位如第2层对应D101
- row: 排号(1-16),对应寄存器的位索引
'''
data_list = data.split('-')
column = data_list[0]
row = data_list[1]
level = data_list[2]
return column, row, level
def get_hex_str(self, data):
"""
解析位置编码获取寄存器地址
:param data: 位置编码,例如 "01-003-02"
:return: 寄存器地址
规则说明:
- column: 库区编号01表示A库D100-D10802表示B库D110-D118
- level: 层号(1-9)对应寄存器最后一位如第2层对应D101
- row: 排号(1-16),对应寄存器的位索引
示例:
01-003-02 => D101.2 (A库第2层第3排)
02-005-06 => D115.4 (B库第6层第5排)
"""
column, row, level = self.split_data(data)
# 计算基础地址A库从100开始B库从110开始
base_address = 100 + (int(column) - 1) * 10
# 加上层号得到最终地址
register_address = base_address + (int(level) - 1)
return register_address
@staticmethod
def check_register_value(client, address, expected_value, timeout_seconds=5):
"""
校验指定寄存器地址中的数据是否达到期望值
:param client: ModbusTcp客户端
:param address: 寄存器地址
:param expected_value: 期望值
:param timeout_seconds: 超时时间(秒)
:return: True表示达到期望值False表示超时
"""
try:
start_time = time.time()
while True:
result = client.read_holding_registers(address=address, count=1)
if not result.isError() and result.registers[0] == expected_value:
logging.info(f"寄存器D{address}值: {result.registers[0]} 达到期望值{expected_value}")
return True
if time.time() - start_time > timeout_seconds:
logging.warning(f"寄存器D{address} {timeout_seconds}秒内未达到期望值{expected_value}")
return False
time.sleep(0.2) # 添加短暂延时,避免过于频繁的读取
except Exception as e:
logging.error(f"校验寄存器D{address}值时发生错误: {str(e)}")
return False
def write_register_until_success(self, client, address, value, expected_value=None, timeout_seconds=5):
"""
循环写入寄存器直到成功或超时
:param client: ModbusTcp客户端
:param address: 寄存器地址
:param value: 要写入的值
:param expected_value: 期望值如果为None则使用value作为期望值
:param timeout_seconds: 超时时间(秒)
:return: True表示写入成功False表示超时
"""
if expected_value is None:
expected_value = value
start_time = time.time()
while True:
# 写入值
logging.info(f"写入寄存器D{address}值: {value}")
client.write_registers(address=address, values=[value])
logging.info(f"写入寄存器D{address}值: {value}成功")
# 检查是否达到期望值
if self.check_register_value(client, address, expected_value):
return True
if time.time() - start_time > timeout_seconds:
logging.warning(f"寄存器D{address} {timeout_seconds}秒内写入失败")
return False
time.sleep(0.2) # 添加短暂延时,避免过于频繁的写入
@staticmethod
def handle_error(error_msg, task_number='', is_emergency_stop=False):
"""通用错误处理函数
:param error_msg: 错误信息
:param task_number: 任务编号,默认为空
:param is_emergency_stop: 是否是急停引起的错误
"""
logging.error(error_msg)
pass
# return response, 500
# 添加便捷方法,用于读取保持寄存器
def read_holding_register(self, client, address, count=1):
"""
读取保持寄存器
:param client: ModbusTcp客户端
:param address: 寄存器地址
:param count: 读取的寄存器数量
:return: 读取结果失败返回None
"""
try:
result = client.read_holding_registers(address=address, count=count)
if result.isError():
logging.error(f"读取寄存器D{address}失败: {result}")
return None
return result.registers
except Exception as e:
logging.error(f"读取寄存器D{address}时发生错误: {str(e)}")
return None
# 添加便捷方法,用于写入保持寄存器
def write_register(self, client, address, value):
"""
写入保持寄存器
:param client: ModbusTcp客户端
:param address: 寄存器地址
:param value: 要写入的值
:return: 是否写入成功
"""
try:
result = client.write_registers(address=address, values=[value])
if result.isError():
logging.error(f"写入寄存器D{address}{value}失败: {result}")
return False
logging.info(f"写入寄存器D{address}{value}成功")
return True
except Exception as e:
logging.error(f"写入寄存器D{address}{value}时发生错误: {str(e)}")
return False