from flask import Flask, jsonify, request, make_response from pymodbus.client import ModbusTcpClient import time import logging from .config_loader import ConfigLoader import threading # 配置 Flask 日志级别 log = logging.getLogger('werkzeug') log.setLevel(logging.WARNING) # 添加连接池管理类 class ModbusConnectionPool: _instance = None _lock = threading.Lock() @classmethod def get_instance(cls): """获取单例实例""" if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = ModbusConnectionPool() return cls._instance def __init__(self): """初始化连接池""" self._connections = {} # 保存每个目标地址的连接 self._locks = {} # 每个连接的锁,避免并发问题 self._in_use = {} # 标记连接是否在使用中 self._last_used = {} # 记录连接最后使用时间 # 启动清理线程 self._cleanup_thread = threading.Thread(target=self._cleanup_idle_connections, daemon=True) self._cleanup_thread.start() def get_connection(self, host, port): """获取指定地址的连接,如果不存在则创建""" key = f"{host}:{port}" # 如果不存在该地址的锁,创建一个 if key not in self._locks: with self._lock: if key not in self._locks: self._locks[key] = threading.Lock() # 获取该地址的锁 with self._locks[key]: # 如果连接不存在或已断开,创建新连接 if key not in self._connections or not self._connections[key].is_socket_open(): try: logging.info(f"Creating new Modbus connection to {host}:{port}") client = ModbusTcpClient(host, port=port, timeout=10) connected = client.connect() if not connected: logging.error(f"Failed to connect to {host}:{port}") return None self._connections[key] = client self._in_use[key] = True self._last_used[key] = time.time() return client except Exception as e: logging.error(f"Error creating Modbus connection to {host}:{port}: {e}") return None else: # 连接存在,标记为使用中并更新时间 self._in_use[key] = True self._last_used[key] = time.time() return self._connections[key] def release_connection(self, host, port): """释放连接,将其标记为不再使用""" key = f"{host}:{port}" with self._locks.get(key, self._lock): if key in self._in_use: self._in_use[key] = False self._last_used[key] = time.time() def _cleanup_idle_connections(self): """清理空闲连接的后台线程""" idle_timeout = 300 # 5分钟空闲超时 while True: time.sleep(60) # 每分钟检查一次 current_time = time.time() keys_to_check = list(self._connections.keys()) for key in keys_to_check: try: with self._locks.get(key, self._lock): # 如果连接空闲超过5分钟,关闭并移除 if (key in self._in_use and not self._in_use[key] and current_time - self._last_used.get(key, 0) > idle_timeout): logging.info(f"Closing idle connection: {key}") try: if self._connections[key].is_socket_open(): self._connections[key].close() except: pass # 忽略关闭连接时的错误 # 从所有字典中移除 self._connections.pop(key, None) self._in_use.pop(key, None) self._last_used.pop(key, None) except Exception as e: logging.error(f"Error during connection cleanup for {key}: {e}") # Modbus TCP 配置 class ModbusUtils: def __init__(self, host=None, port=None) -> None: # 初始化 modbus 配置 config = ConfigLoader.get_instance() self.MODBUS_HOST = host if host is not None else config.get_value("modbus.host") self.MODBUS_PORT = port if port is not None else int(config.get_value("modbus.port")) self.connection_pool = ModbusConnectionPool.get_instance() self.retry_max = 3 # 最大重试次数 self.retry_delay = 0.5 # 初始重试延迟(秒) def get_client(self): """获取Modbus客户端连接(使用连接池)""" # 使用指数退避重试获取连接 retry_count = 0 delay = self.retry_delay while retry_count < self.retry_max: client = self.connection_pool.get_connection(self.MODBUS_HOST, self.MODBUS_PORT) if client and client.is_socket_open(): return client # 连接失败,等待后重试 logging.warning(f"Connection attempt {retry_count+1} failed, retrying in {delay:.1f}s...") time.sleep(delay) delay *= 2 # 指数退避 retry_count += 1 logging.error(f"Failed to connect to Modbus server {self.MODBUS_HOST}:{self.MODBUS_PORT} after {self.retry_max} attempts") return None def close_client(self, client): """释放客户端连接回连接池,不再实际关闭连接""" if client: self.connection_pool.release_connection(self.MODBUS_HOST, self.MODBUS_PORT) # 新增十进制转成二进制 @staticmethod def decimal_to_binary(decimal): """十进制转16位二进制,右对齐""" return format(decimal, '016b') # 新增二进制转成十进制 @staticmethod def binary_to_decimal(binary): """二进制字符串转十进制""" return int(binary, 2) # 新增十进制转16进制 @staticmethod def decimal_to_hex(decimal): return hex(decimal)[2:] @staticmethod def split_data(data): ''' 解析位置编码获取寄存器地址 :param data: 位置编码,例如 "01-003-02" :return: 寄存器地址 规则说明: - column: 库区编号,01表示A库(D100-D108),02表示B库(D110-D118) - level: 层号(1-9),对应寄存器最后一位,如第2层对应D101 - row: 排号(1-16),对应寄存器的位索引 ''' data_list = data.split('-') column = data_list[0] row = data_list[1] level = data_list[2] return column, row, level def get_hex_str(self, data): """ 解析位置编码获取寄存器地址 :param data: 位置编码,例如 "01-003-02" :return: 寄存器地址 规则说明: - column: 库区编号,01表示A库(D100-D108),02表示B库(D110-D118) - level: 层号(1-9),对应寄存器最后一位,如第2层对应D101 - row: 排号(1-16),对应寄存器的位索引 示例: 01-003-02 => D101.2 (A库第2层,第3排) 02-005-06 => D115.4 (B库第6层,第5排) """ column, row, level = self.split_data(data) # 计算基础地址:A库从100开始,B库从110开始 base_address = 100 + (int(column) - 1) * 10 # 加上层号得到最终地址 register_address = base_address + (int(level) - 1) return register_address @staticmethod def check_register_value(client, address, expected_value, timeout_seconds=5): """ 校验指定寄存器地址中的数据是否达到期望值 :param client: ModbusTcp客户端 :param address: 寄存器地址 :param expected_value: 期望值 :param timeout_seconds: 超时时间(秒) :return: True表示达到期望值,False表示超时 """ try: start_time = time.time() while True: result = client.read_holding_registers(address=address, count=1) if not result.isError() and result.registers[0] == expected_value: logging.info(f"寄存器D{address}值: {result.registers[0]} 达到期望值{expected_value}") return True if time.time() - start_time > timeout_seconds: logging.warning(f"寄存器D{address} {timeout_seconds}秒内未达到期望值{expected_value}") return False time.sleep(0.2) # 添加短暂延时,避免过于频繁的读取 except Exception as e: logging.error(f"校验寄存器D{address}值时发生错误: {str(e)}") return False def write_register_until_success(self, client, address, value, expected_value=None, timeout_seconds=5): """ 循环写入寄存器直到成功或超时 :param client: ModbusTcp客户端 :param address: 寄存器地址 :param value: 要写入的值 :param expected_value: 期望值(如果为None,则使用value作为期望值) :param timeout_seconds: 超时时间(秒) :return: True表示写入成功,False表示超时 """ if expected_value is None: expected_value = value start_time = time.time() while True: # 写入值 logging.info(f"写入寄存器D{address}值: {value}") client.write_registers(address=address, values=[value]) logging.info(f"写入寄存器D{address}值: {value}成功") # 检查是否达到期望值 if self.check_register_value(client, address, expected_value): return True if time.time() - start_time > timeout_seconds: logging.warning(f"寄存器D{address} {timeout_seconds}秒内写入失败") return False time.sleep(0.2) # 添加短暂延时,避免过于频繁的写入 @staticmethod def handle_error(error_msg, task_number='', is_emergency_stop=False): """通用错误处理函数 :param error_msg: 错误信息 :param task_number: 任务编号,默认为空 :param is_emergency_stop: 是否是急停引起的错误 """ logging.error(error_msg) pass # return response, 500 # 添加便捷方法,用于读取保持寄存器 def read_holding_register(self, client, address, count=1): """ 读取保持寄存器 :param client: ModbusTcp客户端 :param address: 寄存器地址 :param count: 读取的寄存器数量 :return: 读取结果,失败返回None """ try: result = client.read_holding_registers(address=address, count=count) if result.isError(): logging.error(f"读取寄存器D{address}失败: {result}") return None return result.registers except Exception as e: logging.error(f"读取寄存器D{address}时发生错误: {str(e)}") return None # 添加便捷方法,用于写入保持寄存器 def write_register(self, client, address, value): """ 写入保持寄存器 :param client: ModbusTcp客户端 :param address: 寄存器地址 :param value: 要写入的值 :return: 是否写入成功 """ try: result = client.write_registers(address=address, values=[value]) if result.isError(): logging.error(f"写入寄存器D{address}值{value}失败: {result}") return False logging.info(f"写入寄存器D{address}值{value}成功") return True except Exception as e: logging.error(f"写入寄存器D{address}值{value}时发生错误: {str(e)}") return False