from flask import Flask, jsonify, request, make_response from pymodbus.client import ModbusTcpClient import time import logging from .config_loader import ConfigLoader # 配置 Flask 日志级别 log = logging.getLogger('werkzeug') log.setLevel(logging.WARNING) # Modbus TCP 配置 class ModbusUtils: def __init__(self, host=None, port=None) -> None: # 初始化 modbus 配置 config = ConfigLoader.get_instance() self.MODBUS_HOST = host if host is not None else config.get_value("modbus.host") self.MODBUS_PORT = port if port is not None else int(config.get_value("modbus.port")) def get_client(self): # 创建Modbus TCP客户端实例,指定服务器的IP地址和端口号 # client = ModbusTcpClient('localhost', port=5020) client = ModbusTcpClient(self.MODBUS_HOST, port=self.MODBUS_PORT, timeout=10) # 增加超时时间 logging.info(f"Attempting to connect to Modbus server {self.MODBUS_HOST}:{self.MODBUS_PORT}") try: is_connected = client.connect() #确保客户端已连接 if is_connected: logging.info(f"Successfully connected to Modbus server {self.MODBUS_HOST}:{self.MODBUS_PORT}") else: logging.error(f"Failed to connect to Modbus server {self.MODBUS_HOST}:{self.MODBUS_PORT}. client.connect() returned False.") except Exception as e: logging.error(f"Exception during connection to Modbus server {self.MODBUS_HOST}:{self.MODBUS_PORT}: {e}") # Optionally, re-raise or handle as appropriate return None # Or raise an exception return client def close_client(self, client): # 关闭客户端连接 if client: client.close() # 新增十进制转成二进制 @staticmethod def decimal_to_binary(decimal): """十进制转16位二进制,右对齐""" return format(decimal, '016b') # 新增二进制转成十进制 @staticmethod def binary_to_decimal(binary): """二进制字符串转十进制""" return int(binary, 2) # 新增十进制转16进制 @staticmethod def decimal_to_hex(decimal): return hex(decimal)[2:] @staticmethod def split_data(data): ''' 解析位置编码获取寄存器地址 :param data: 位置编码,例如 "01-003-02" :return: 寄存器地址 规则说明: - column: 库区编号,01表示A库(D100-D108),02表示B库(D110-D118) - level: 层号(1-9),对应寄存器最后一位,如第2层对应D101 - row: 排号(1-16),对应寄存器的位索引 ''' data_list = data.split('-') column = data_list[0] row = data_list[1] level = data_list[2] return column, row, level def get_hex_str(self, data): """ 解析位置编码获取寄存器地址 :param data: 位置编码,例如 "01-003-02" :return: 寄存器地址 规则说明: - column: 库区编号,01表示A库(D100-D108),02表示B库(D110-D118) - level: 层号(1-9),对应寄存器最后一位,如第2层对应D101 - row: 排号(1-16),对应寄存器的位索引 示例: 01-003-02 => D101.2 (A库第2层,第3排) 02-005-06 => D115.4 (B库第6层,第5排) """ column, row, level = self.split_data(data) # 计算基础地址:A库从100开始,B库从110开始 base_address = 100 + (int(column) - 1) * 10 # 加上层号得到最终地址 register_address = base_address + (int(level) - 1) return register_address @staticmethod def check_register_value(client, address, expected_value, timeout_seconds=5): """ 校验指定寄存器地址中的数据是否达到期望值 :param client: ModbusTcp客户端 :param address: 寄存器地址 :param expected_value: 期望值 :param timeout_seconds: 超时时间(秒) :return: True表示达到期望值,False表示超时 """ try: start_time = time.time() while True: result = client.read_holding_registers(address=address, count=1) if not result.isError() and result.registers[0] == expected_value: logging.info(f"寄存器D{address}值: {result.registers[0]} 达到期望值{expected_value}") return True if time.time() - start_time > timeout_seconds: logging.warning(f"寄存器D{address} {timeout_seconds}秒内未达到期望值{expected_value}") return False time.sleep(0.2) # 添加短暂延时,避免过于频繁的读取 except Exception as e: logging.error(f"校验寄存器D{address}值时发生错误: {str(e)}") return False def write_register_until_success(self, client, address, value, expected_value=None, timeout_seconds=5): """ 循环写入寄存器直到成功或超时 :param client: ModbusTcp客户端 :param address: 寄存器地址 :param value: 要写入的值 :param expected_value: 期望值(如果为None,则使用value作为期望值) :param timeout_seconds: 超时时间(秒) :return: True表示写入成功,False表示超时 """ if expected_value is None: expected_value = value start_time = time.time() while True: # 写入值 logging.info(f"写入寄存器D{address}值: {value}") client.write_registers(address=address, values=[value]) logging.info(f"写入寄存器D{address}值: {value}成功") # 检查是否达到期望值 if self.check_register_value(client, address, expected_value): return True if time.time() - start_time > timeout_seconds: logging.warning(f"寄存器D{address} {timeout_seconds}秒内写入失败") return False time.sleep(0.2) # 添加短暂延时,避免过于频繁的写入 @staticmethod def handle_error(error_msg, task_number='', is_emergency_stop=False): """通用错误处理函数 :param error_msg: 错误信息 :param task_number: 任务编号,默认为空 :param is_emergency_stop: 是否是急停引起的错误 """ logging.error(error_msg) pass # return response, 500 # 添加便捷方法,用于读取保持寄存器 def read_holding_register(self, client, address, count=1): """ 读取保持寄存器 :param client: ModbusTcp客户端 :param address: 寄存器地址 :param count: 读取的寄存器数量 :return: 读取结果,失败返回None """ try: result = client.read_holding_registers(address=address, count=count) if result.isError(): logging.error(f"读取寄存器D{address}失败: {result}") return None return result.registers except Exception as e: logging.error(f"读取寄存器D{address}时发生错误: {str(e)}") return None # 添加便捷方法,用于写入保持寄存器 def write_register(self, client, address, value): """ 写入保持寄存器 :param client: ModbusTcp客户端 :param address: 寄存器地址 :param value: 要写入的值 :return: 是否写入成功 """ try: result = client.write_registers(address=address, values=[value]) if result.isError(): logging.error(f"写入寄存器D{address}值{value}失败: {result}") return False logging.info(f"写入寄存器D{address}值{value}成功") return True except Exception as e: logging.error(f"写入寄存器D{address}值{value}时发生错误: {str(e)}") return False