diff --git a/dao/inspection_dao.py b/dao/inspection_dao.py index edc8212..d25d18c 100644 --- a/dao/inspection_dao.py +++ b/dao/inspection_dao.py @@ -232,118 +232,122 @@ class InspectionDAO: if not data: return False - # 先检查是否存在记录 - check_sql = "SELECT ddmo FROM wsbz_order_info WHERE ddmo = ?" + # 使用单一连接实例处理整个操作 with SQLUtils('sqlite', database='db/jtDB.db') as db: - db.cursor.execute(check_sql, (data.get("mo", ""),)) - existing_record = db.cursor.fetchone() + db.begin_transaction() # 开始事务 + + # 先检查是否存在记录 + check_cursor = db.get_new_cursor() + check_sql = "SELECT ddmo FROM wsbz_order_info WHERE ddmo = ?" + check_cursor.execute(check_sql, (data.get("mo", ""),)) + existing_record = check_cursor.fetchone() + check_cursor.close() # 使用后关闭新游标 - if existing_record: - # 如果记录存在,执行更新 - sql = """ - UPDATE wsbz_order_info SET - data_corp = ?, user_id = ?, user_name = ?, gzl_zl = ?, - xpack = ?, qd = ?, spack_type = ?, mxzs = ?, jt = ?, - ddnote = ?, code = ?, type = ?, lable = ?, lib = ?, - gzl = ?, maxsl = ?, cz = ?, size = ?, cd = ?, luno = ?, - qfqd = ?, pono = ?, xj = ?, ysl = ?, dycz = ?, - zx_code = ?, edit_id = ?, remarks = ?, zx_name = ? - bccd = ? ,tccd = ? - WHERE ddmo = ? - """ - params = ( - data.get("data_corp", "JT"), - data.get("user_id", ""), - data.get("user_name", ""), - data.get("zx_zl", ""), - data.get("xpack", ""), - data.get("qd", ""), - data.get("spack_type", ""), - data.get("mxzs", ""), - data.get("jt", ""), - data.get("note", ""), - data.get("code", ""), - data.get("type", ""), - data.get("template_name", ""), - data.get("lib", ""), - data.get("zx_code", ""), - data.get("maxsl", ""), - data.get("cz", ""), - data.get("size", ""), - data.get("cd", ""), - data.get("luno", ""), - data.get("qfqd", ""), - data.get("khno", ""), - data.get("size", ""), - data.get("ysl", ""), - data.get("dycz", ""), - data.get("zx_code", ""), - data.get("edit_id", ""), - data.get("remarks", ""), - data.get("zx_name", ""), - data.get("bccd", ""), - data.get("tccd", ""), - data.get("mo", "") # WHERE 条件参数 - ) - logging.info(f"更新订单信息: ddmo={data.get('mo', '')}") - else: - # 如果记录不存在,执行插入 - sql = """ - INSERT INTO wsbz_order_info ( - data_corp, user_id, user_name, gzl_zl, ddmo, xpack, - qd, spack_type, mxzs, jt, ddnote, code, type, - lable, lib, gzl, maxsl, cz, size, cd, luno, qfqd, - pono, xj, ysl, dycz, zx_code, edit_id, remarks,zx_name,bccd,tccd - ) VALUES ( - ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, - ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? + if existing_record: + # 如果记录存在,执行更新 + sql = """ + UPDATE wsbz_order_info SET + data_corp = ?, user_id = ?, user_name = ?, gzl_zl = ?, + xpack = ?, qd = ?, spack_type = ?, mxzs = ?, jt = ?, + ddnote = ?, code = ?, type = ?, lable = ?, lib = ?, + gzl = ?, maxsl = ?, cz = ?, size = ?, cd = ?, luno = ?, + qfqd = ?, pono = ?, xj = ?, ysl = ?, dycz = ?, + zx_code = ?, edit_id = ?, remarks = ?, zx_name = ? + bccd = ? ,tccd = ? + WHERE ddmo = ? + """ + params = ( + data.get("data_corp", "JT"), + data.get("user_id", ""), + data.get("user_name", ""), + data.get("zx_zl", ""), + data.get("xpack", ""), + data.get("qd", ""), + data.get("spack_type", ""), + data.get("mxzs", ""), + data.get("jt", ""), + data.get("note", ""), + data.get("code", ""), + data.get("type", ""), + data.get("template_name", ""), + data.get("lib", ""), + data.get("zx_code", ""), + data.get("maxsl", ""), + data.get("cz", ""), + data.get("size", ""), + data.get("cd", ""), + data.get("luno", ""), + data.get("qfqd", ""), + data.get("khno", ""), + data.get("size", ""), + data.get("ysl", ""), + data.get("dycz", ""), + data.get("zx_code", ""), + data.get("edit_id", ""), + data.get("remarks", ""), + data.get("zx_name", ""), + data.get("bccd", ""), + data.get("tccd", ""), + data.get("mo", "") # WHERE 条件参数 ) - """ - params = ( - data.get("data_corp", "JT"), - data.get("user_id", ""), - data.get("user_name", ""), - data.get("zx_zl", ""), - data.get("mo", ""), - data.get("xpack", ""), - data.get("qd", ""), - data.get("spack_type", ""), - data.get("mxzs", ""), - data.get("jt", ""), - data.get("note", ""), - data.get("code", ""), - data.get("type", ""), - data.get("template_name", ""), - data.get("lib", ""), - data.get("zx_code", ""), - data.get("maxsl", ""), - data.get("cz", ""), - data.get("size", ""), - data.get("cd", ""), - data.get("luno", ""), - data.get("qfqd", ""), - data.get("khno", ""), - data.get("size", ""), - data.get("ysl", ""), - data.get("dycz", ""), - data.get("zx_code", ""), - data.get("edit_id", ""), - data.get("remarks", ""), - data.get("zx_name", ""), - data.get("bccd", ""), - data.get("tccd", "") - ) - logging.info(f"插入新订单信息: ddmo={data.get('mo', '')}") + logging.info(f"更新订单信息: ddmo={data.get('mo', '')}") + else: + # 如果记录不存在,执行插入 + sql = """ + INSERT INTO wsbz_order_info ( + data_corp, user_id, user_name, gzl_zl, ddmo, xpack, + qd, spack_type, mxzs, jt, ddnote, code, type, + lable, lib, gzl, maxsl, cz, size, cd, luno, qfqd, + pono, xj, ysl, dycz, zx_code, edit_id, remarks,zx_name,bccd,tccd + ) VALUES ( + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? + ) + """ + params = ( + data.get("data_corp", "JT"), + data.get("user_id", ""), + data.get("user_name", ""), + data.get("zx_zl", ""), + data.get("mo", ""), + data.get("xpack", ""), + data.get("qd", ""), + data.get("spack_type", ""), + data.get("mxzs", ""), + data.get("jt", ""), + data.get("note", ""), + data.get("code", ""), + data.get("type", ""), + data.get("template_name", ""), + data.get("lib", ""), + data.get("zx_code", ""), + data.get("maxsl", ""), + data.get("cz", ""), + data.get("size", ""), + data.get("cd", ""), + data.get("luno", ""), + data.get("qfqd", ""), + data.get("khno", ""), + data.get("size", ""), + data.get("ysl", ""), + data.get("dycz", ""), + data.get("zx_code", ""), + data.get("edit_id", ""), + data.get("remarks", ""), + data.get("zx_name", ""), + data.get("bccd", ""), + data.get("tccd", "") + ) + logging.info(f"插入新订单信息: ddmo={data.get('mo', '')}") - with SQLUtils('sqlite', database='db/jtDB.db') as db: - db.cursor.execute(sql, params) - db.conn.commit() + # 执行SQL + db.execute_update(sql, params) + db.commit_transaction() + return True except Exception as e: logging.error(f"保存订单信息失败: {str(e)}") - with SQLUtils('sqlite', database='db/jtDB.db') as db: - db.conn.rollback() return False def save_inspection_data(self, order_id, gc_note, data, username='system'): """保存检验数据 @@ -362,6 +366,8 @@ class InspectionDAO: # 使用上下文管理器自动处理连接和游标 with SQLUtils('sqlite', database='db/jtDB.db') as db: + db.begin_transaction() # 开始事务 + for item in data: position = item.get('position') config_id = item.get('config_id') @@ -370,15 +376,17 @@ class InspectionDAO: remark = item.get('remark', '') tray_id = item.get('tray_id', '') - # 检查是否已存在记录 + # 获取新游标执行查询,避免递归使用 + check_cursor = db.get_new_cursor() check_sql = """ SELECT id FROM wsbz_inspection_data WHERE order_id = ? AND gc_note = ? AND position = ? AND tray_id = ? """ check_params = (order_id, gc_note, position, tray_id) - db.execute_query(check_sql, check_params) - existing_record = db.fetchone() + check_cursor.execute(check_sql, check_params) + existing_record = check_cursor.fetchone() + check_cursor.close() # 使用后关闭新游标 if existing_record: # 更新现有记录 @@ -407,6 +415,8 @@ class InspectionDAO: current_time, username, current_time, username, tray_id ) db.execute_update(insert_sql, insert_params) + + db.commit_transaction() # 提交事务 return True except Exception as e: @@ -571,14 +581,16 @@ class InspectionDAO: VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?) """ params = (order_id, tray_id, label_value, weight_value, net_weight_value, finish_time, datetime.now(), 'system', datetime.now(), 'system', False,gc_note) + with SQLUtils('sqlite', database='db/jtDB.db') as db: - db.cursor.execute(sql, params) - db.conn.commit() + db.begin_transaction() + db.execute_update(sql, params) + db.commit_transaction() + + return True except Exception as e: logging.error(f"保存包装记录失败: {str(e)}") - with SQLUtils('sqlite', database='db/jtDB.db') as db: - db.conn.rollback() - + return False def delete_inspection_data(self, order_id, gc_note, tray_id): """删除检验数据 @@ -594,13 +606,16 @@ class InspectionDAO: WHERE order_id = ? AND gc_note = ? AND tray_id = ? """ params = (order_id, gc_note, tray_id) + with SQLUtils('sqlite', database='db/jtDB.db') as db: - db.cursor.execute(sql, params) - db.conn.commit() + db.begin_transaction() + db.execute_update(sql, params) + db.commit_transaction() + + return True except Exception as e: logging.error(f"删除检验数据失败: {str(e)}") - with SQLUtils('sqlite', database='db/jtDB.db') as db: - db.conn.rollback() + return False def get_axios_num_by_order_id(self, order_id): """获取托盘号对应的轴号""" try: diff --git a/db/jtDB.db b/db/jtDB.db index a5122f1..16e38a2 100644 Binary files a/db/jtDB.db and b/db/jtDB.db differ diff --git a/main.py b/main.py index 8aaaae3..706eb55 100644 --- a/main.py +++ b/main.py @@ -202,8 +202,12 @@ def main(): exit_code = app.exec() logging.info(f"应用程序退出,退出码: {exit_code}") - # 关闭所有数据库连接 + # 记录数据库连接池状态 from utils.sql_utils import SQLUtils + logging.info("记录数据库连接池状态:") + SQLUtils.log_connection_pool_status() + + # 关闭所有数据库连接 SQLUtils.close_all_connections() # 停止电力监控器 diff --git a/ui/login_ui.py b/ui/login_ui.py index 93ab639..4623d2a 100644 --- a/ui/login_ui.py +++ b/ui/login_ui.py @@ -18,7 +18,7 @@ class LoginUI(QWidget): font_version = QFont("微软雅黑", 8) # 标题 - self.label_title = QLabel("腾龙集团MES管理系统") + self.label_title = QLabel("MES管理系统") self.label_title.setFont(font_title) self.label_title.setStyleSheet("color: #1a237e;") self.label_title.setAlignment(Qt.AlignCenter) diff --git a/utils/modbus_monitor.py b/utils/modbus_monitor.py index 96d2489..ea6b549 100644 --- a/utils/modbus_monitor.py +++ b/utils/modbus_monitor.py @@ -48,14 +48,14 @@ class ModbusMonitor(QObject): register_error = Signal(int, str) monitor_status_changed = Signal(bool, str) - def __init__(self, polling_interval=1.0, max_errors=3, retry_interval=5.0): + def __init__(self, polling_interval=2.0, max_errors=3, retry_interval=5.0): """ 初始化Modbus监控器 Args: - polling_interval: 轮询间隔,单位秒 + polling_interval: 轮询间隔(秒),默认2.0秒,比原来的1.0秒增加了一倍 max_errors: 最大错误次数,超过此次数将暂停特定寄存器的监控 - retry_interval: 重试间隔,单位秒 + retry_interval: 重试间隔(秒) """ super().__init__() self.polling_interval = polling_interval @@ -175,50 +175,132 @@ class ModbusMonitor(QObject): def _read_registers(self): """读取所有注册的寄存器""" - for address, reg_value in self.registers.items(): + # 批量处理寄存器,减少连接次数 + grouped_registers = self._group_registers() + + for group, addresses in grouped_registers.items(): if self.stop_event.is_set(): break - # 如果错误次数超过阈值且未到重试时间,则跳过此寄存器 - if reg_value.error_count >= self.max_errors: - current_time = time.time() - last_read = reg_value.last_read_time or 0 - if current_time - last_read < self.retry_interval: - continue - + # 检查组内是否有需要读取的寄存器 + has_valid_registers = False + for address in addresses: + reg_value = self.registers[address] + # 如果错误次数超过阈值且未到重试时间,则跳过 + if reg_value.error_count >= self.max_errors: + current_time = time.time() + last_read = reg_value.last_read_time or 0 + if current_time - last_read < self.retry_interval: + continue + has_valid_registers = True + break + + if not has_valid_registers: + continue + try: - # 读取寄存器值 - result = self.modbus.read_holding_register(self.client, address) - if result is None or len(result) == 0: - error_count = reg_value.record_error() - error_msg = f"读取寄存器D{address}失败,这是第{error_count}次失败" - logging.warning(error_msg) - self.register_error.emit(address, error_msg) - - # 如果连续失败次数达到阈值,尝试重连 - if error_count >= self.max_errors: - logging.error(f"寄存器D{address}连续{error_count}次读取失败,将在{self.retry_interval}秒后重试") - # 下次将在retry_interval后尝试读取此寄存器 - continue + # 为每组创建一个重试计数 + retry_count = 0 + max_retries = 2 + delay = 0.5 - # 成功读取,重置错误计数 - reg_value.reset_error() - - # 更新值并检查是否发生变化 - if reg_value.update(result[0]): - logging.info(f"寄存器D{address}值变化: {reg_value.last_value} -> {reg_value.value}") - # 发出信号 - self.register_changed.emit(address, reg_value.value) - # 调用注册的处理器 - self._notify_handlers(address, reg_value.value) + while retry_count <= max_retries: + try: + # 批量读取该组寄存器 + for address in addresses: + reg_value = self.registers[address] + + # 如果错误次数超过阈值且未到重试时间,则跳过 + if reg_value.error_count >= self.max_errors: + current_time = time.time() + last_read = reg_value.last_read_time or 0 + if current_time - last_read < self.retry_interval: + continue + + try: + # 读取寄存器值 + result = self.modbus.read_holding_register(self.client, address) + if result is None or len(result) == 0: + error_count = reg_value.record_error() + error_msg = f"读取寄存器D{address}失败,这是第{error_count}次失败" + logging.warning(error_msg) + self.register_error.emit(address, error_msg) + + # 如果连续失败次数达到阈值,尝试重连 + if error_count >= self.max_errors: + logging.error(f"寄存器D{address}连续{error_count}次读取失败,将在{self.retry_interval}秒后重试") + continue + + # 成功读取,重置错误计数 + reg_value.reset_error() + + # 更新值并检查是否发生变化 + if reg_value.update(result[0]): + logging.info(f"寄存器D{address}值变化: {reg_value.last_value} -> {reg_value.value}") + # 发出信号 + self.register_changed.emit(address, reg_value.value) + # 调用注册的处理器 + self._notify_handlers(address, reg_value.value) + except Exception as e: + error_count = reg_value.record_error() + error_msg = f"读取寄存器D{address}时发生异常: {str(e)}" + logging.error(error_msg) + self.register_error.emit(address, error_msg) + + if error_count >= self.max_errors: + logging.error(f"寄存器D{address}连续{error_count}次读取异常,将在{self.retry_interval}秒后重试") + + # 如果是连接错误,尝试重新连接并退出循环 + if "Connection" in str(e): + retry_count += 1 + if retry_count <= max_retries: + # 释放连接,重新获取 + if self.client: + self.modbus.close_client(self.client) + self.client = None + + delay_time = delay * (2 ** (retry_count - 1)) + logging.warning(f"连接错误,等待 {delay_time:.1f} 秒后重试 ({retry_count}/{max_retries})...") + time.sleep(delay_time) + # 重新连接 + self._reconnect() + break + + # 如果已成功读取所有寄存器,跳出重试循环 + break + except Exception as e: + retry_count += 1 + if retry_count <= max_retries: + delay_time = delay * (2 ** (retry_count - 1)) + logging.warning(f"批量读取寄存器时发生错误,等待 {delay_time:.1f} 秒后重试 ({retry_count}/{max_retries}): {str(e)}") + time.sleep(delay_time) + # 尝试重新连接 + if self.client: + self.modbus.close_client(self.client) + self.client = None + self._reconnect() + else: + logging.error(f"批量读取寄存器失败,已达到最大重试次数: {str(e)}") + break except Exception as e: - error_count = reg_value.record_error() - error_msg = f"读取寄存器D{address}时发生异常: {str(e)}" - logging.error(error_msg) - self.register_error.emit(address, error_msg) - - if error_count >= self.max_errors: - logging.error(f"寄存器D{address}连续{error_count}次读取异常,将在{self.retry_interval}秒后重试") + logging.error(f"处理寄存器组时发生异常: {str(e)}", exc_info=True) + + def _group_registers(self): + """将寄存器分组,以便批量处理 + + Returns: + dict: 分组后的寄存器,格式为 {group_id: [address1, address2, ...]} + """ + # 简单分组:每5个寄存器为一组 + group_size = 5 + grouped = {} + + addresses = list(self.registers.keys()) + for i in range(0, len(addresses), group_size): + group_id = i // group_size + grouped[group_id] = addresses[i:i+group_size] + + return grouped def _notify_handlers(self, address, value): """通知所有注册的处理器""" diff --git a/utils/modbus_utils.py b/utils/modbus_utils.py index 1c35adc..a9e3b6e 100644 --- a/utils/modbus_utils.py +++ b/utils/modbus_utils.py @@ -3,13 +3,109 @@ from pymodbus.client import ModbusTcpClient import time import logging from .config_loader import ConfigLoader +import threading # 配置 Flask 日志级别 log = logging.getLogger('werkzeug') log.setLevel(logging.WARNING) -# Modbus TCP 配置 +# 添加连接池管理类 +class ModbusConnectionPool: + _instance = None + _lock = threading.Lock() + + @classmethod + def get_instance(cls): + """获取单例实例""" + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = ModbusConnectionPool() + return cls._instance + + def __init__(self): + """初始化连接池""" + self._connections = {} # 保存每个目标地址的连接 + self._locks = {} # 每个连接的锁,避免并发问题 + self._in_use = {} # 标记连接是否在使用中 + self._last_used = {} # 记录连接最后使用时间 + + # 启动清理线程 + self._cleanup_thread = threading.Thread(target=self._cleanup_idle_connections, daemon=True) + self._cleanup_thread.start() + + def get_connection(self, host, port): + """获取指定地址的连接,如果不存在则创建""" + key = f"{host}:{port}" + + # 如果不存在该地址的锁,创建一个 + if key not in self._locks: + with self._lock: + if key not in self._locks: + self._locks[key] = threading.Lock() + + # 获取该地址的锁 + with self._locks[key]: + # 如果连接不存在或已断开,创建新连接 + if key not in self._connections or not self._connections[key].is_socket_open(): + try: + logging.info(f"Creating new Modbus connection to {host}:{port}") + client = ModbusTcpClient(host, port=port, timeout=10) + connected = client.connect() + if not connected: + logging.error(f"Failed to connect to {host}:{port}") + return None + self._connections[key] = client + self._in_use[key] = True + self._last_used[key] = time.time() + return client + except Exception as e: + logging.error(f"Error creating Modbus connection to {host}:{port}: {e}") + return None + else: + # 连接存在,标记为使用中并更新时间 + self._in_use[key] = True + self._last_used[key] = time.time() + return self._connections[key] + + def release_connection(self, host, port): + """释放连接,将其标记为不再使用""" + key = f"{host}:{port}" + with self._locks.get(key, self._lock): + if key in self._in_use: + self._in_use[key] = False + self._last_used[key] = time.time() + + def _cleanup_idle_connections(self): + """清理空闲连接的后台线程""" + idle_timeout = 300 # 5分钟空闲超时 + while True: + time.sleep(60) # 每分钟检查一次 + current_time = time.time() + keys_to_check = list(self._connections.keys()) + + for key in keys_to_check: + try: + with self._locks.get(key, self._lock): + # 如果连接空闲超过5分钟,关闭并移除 + if (key in self._in_use and not self._in_use[key] and + current_time - self._last_used.get(key, 0) > idle_timeout): + logging.info(f"Closing idle connection: {key}") + try: + if self._connections[key].is_socket_open(): + self._connections[key].close() + except: + pass # 忽略关闭连接时的错误 + + # 从所有字典中移除 + self._connections.pop(key, None) + self._in_use.pop(key, None) + self._last_used.pop(key, None) + except Exception as e: + logging.error(f"Error during connection cleanup for {key}: {e}") + +# Modbus TCP 配置 class ModbusUtils: def __init__(self, host=None, port=None) -> None: @@ -17,28 +113,34 @@ class ModbusUtils: config = ConfigLoader.get_instance() self.MODBUS_HOST = host if host is not None else config.get_value("modbus.host") self.MODBUS_PORT = port if port is not None else int(config.get_value("modbus.port")) + self.connection_pool = ModbusConnectionPool.get_instance() + self.retry_max = 3 # 最大重试次数 + self.retry_delay = 0.5 # 初始重试延迟(秒) def get_client(self): - # 创建Modbus TCP客户端实例,指定服务器的IP地址和端口号 - # client = ModbusTcpClient('localhost', port=5020) - client = ModbusTcpClient(self.MODBUS_HOST, port=self.MODBUS_PORT, timeout=10) # 增加超时时间 - logging.info(f"Attempting to connect to Modbus server {self.MODBUS_HOST}:{self.MODBUS_PORT}") - try: - is_connected = client.connect() #确保客户端已连接 - if is_connected: - logging.info(f"Successfully connected to Modbus server {self.MODBUS_HOST}:{self.MODBUS_PORT}") - else: - logging.error(f"Failed to connect to Modbus server {self.MODBUS_HOST}:{self.MODBUS_PORT}. client.connect() returned False.") - except Exception as e: - logging.error(f"Exception during connection to Modbus server {self.MODBUS_HOST}:{self.MODBUS_PORT}: {e}") - # Optionally, re-raise or handle as appropriate - return None # Or raise an exception - return client + """获取Modbus客户端连接(使用连接池)""" + # 使用指数退避重试获取连接 + retry_count = 0 + delay = self.retry_delay + + while retry_count < self.retry_max: + client = self.connection_pool.get_connection(self.MODBUS_HOST, self.MODBUS_PORT) + if client and client.is_socket_open(): + return client + + # 连接失败,等待后重试 + logging.warning(f"Connection attempt {retry_count+1} failed, retrying in {delay:.1f}s...") + time.sleep(delay) + delay *= 2 # 指数退避 + retry_count += 1 + + logging.error(f"Failed to connect to Modbus server {self.MODBUS_HOST}:{self.MODBUS_PORT} after {self.retry_max} attempts") + return None def close_client(self, client): - # 关闭客户端连接 + """释放客户端连接回连接池,不再实际关闭连接""" if client: - client.close() + self.connection_pool.release_connection(self.MODBUS_HOST, self.MODBUS_PORT) # 新增十进制转成二进制 @staticmethod diff --git a/utils/register_handlers.py b/utils/register_handlers.py index d101241..a8443ce 100644 --- a/utils/register_handlers.py +++ b/utils/register_handlers.py @@ -290,4 +290,15 @@ class EmergencyStopHandler: if self.callback: self.callback(value, desc) - return True, desc \ No newline at end of file + return True, desc + + def handle_change(self, value): + """实现 handle_change 方法以兼容 ModbusMonitor._notify_handlers + + Args: + value: 寄存器值 + """ + changed, desc = self.handle(value) + if changed: + logging.info(f"急停状态变化: {desc} (值={value})") + return changed, desc \ No newline at end of file diff --git a/utils/sql_utils.py b/utils/sql_utils.py index 6115f5a..4243db6 100644 --- a/utils/sql_utils.py +++ b/utils/sql_utils.py @@ -1,6 +1,7 @@ import sys import logging import threading +import time from utils.config_loader import ConfigLoader try: @@ -20,10 +21,73 @@ except ImportError: class SQLUtils: - # 存储连接池,避免重复创建连接 + # 存储连接池,使用线程ID作为键的一部分 _connection_pool = {} - # 添加线程锁,用于防止多线程同时使用同一个cursor - _lock = threading.RLock() + # 连接引用计数 + _connection_refs = {} + # 最后使用时间记录 + _last_used = {} + # 轻量级锁,仅用于连接池访问 + _pool_lock = threading.RLock() + # 空闲连接超时时间(秒) + _idle_timeout = 300 # 5分钟 + # 初始化清理线程标志 + _cleanup_thread_started = False + + @classmethod + def _start_cleanup_thread(cls): + """启动清理空闲连接的后台线程""" + if cls._cleanup_thread_started: + return + + def cleanup_idle_connections(): + """定期清理空闲连接的线程函数""" + logging.info("数据库连接清理线程已启动") + while True: + time.sleep(60) # 每分钟检查一次 + try: + current_time = time.time() + + with cls._pool_lock: + # 复制键列表,避免在迭代过程中修改字典 + conn_keys = list(cls._connection_pool.keys()) + + for conn_key in conn_keys: + # 检查引用计数和最后使用时间 + if (conn_key in cls._connection_refs and + cls._connection_refs[conn_key] <= 0 and + conn_key in cls._last_used and + current_time - cls._last_used[conn_key] > cls._idle_timeout): + + try: + # 获取连接和游标 + conn, cursor = cls._connection_pool[conn_key] + + # 关闭资源 + if cursor: + cursor.close() + if conn: + conn.close() + + # 从所有集合中移除 + cls._connection_pool.pop(conn_key, None) + cls._connection_refs.pop(conn_key, None) + cls._last_used.pop(conn_key, None) + + logging.debug(f"已清理空闲连接: {conn_key}") + except Exception as e: + logging.error(f"清理空闲连接时出错: {e}") + except Exception as e: + logging.error(f"连接清理线程执行异常: {e}") + + # 创建并启动后台线程 + cleanup_thread = threading.Thread( + target=cleanup_idle_connections, + daemon=True, + name="DB-Connection-Cleanup" + ) + cleanup_thread.start() + cls._cleanup_thread_started = True def __init__(self, db_type=None, source_name=None, **kwargs): """初始化SQLUtils对象 @@ -33,6 +97,10 @@ class SQLUtils: source_name: 数据源名称,用于从配置中获取特定的数据源,如'sqlite', 'postgresql', 'mysql' **kwargs: 连接参数,如果没有提供,则使用配置文件中的参数 """ + # 确保清理线程已启动 + if not SQLUtils._cleanup_thread_started: + SQLUtils._start_cleanup_thread() + self.conn = None self.cursor = None @@ -97,34 +165,65 @@ class SQLUtils: return False # 不抑制异常 def _get_connection(self): - """从连接池获取连接,如果没有则创建新连接""" - # 创建连接键,包含数据库类型和连接参数 - conn_key = f"{self.db_type}:{str(self.kwargs)}" + """从连接池获取连接,基于线程ID""" + # 使用线程ID作为连接键的一部分 + thread_id = threading.get_ident() + conn_key = f"{self.db_type}:{str(self.kwargs)}:{thread_id}" - # 检查连接池中是否已有此连接 - with SQLUtils._lock: - if conn_key in SQLUtils._connection_pool: + # 检查连接池中是否已有此线程的连接 + # 只在访问共享资源时使用锁 + with self._pool_lock: + if conn_key in self._connection_pool: try: # 尝试执行简单查询,确认连接有效 - conn, cursor = SQLUtils._connection_pool[conn_key] + conn, cursor = self._connection_pool[conn_key] cursor.execute("SELECT 1") # 连接有效,直接使用 self.conn = conn self.cursor = cursor + # 更新引用计数和最后使用时间 + self._connection_refs[conn_key] = self._connection_refs.get(conn_key, 0) + 1 + self._last_used[conn_key] = time.time() return except Exception: # 连接已失效,从连接池移除 - del SQLUtils._connection_pool[conn_key] + self._cleanup_connection(conn_key) - # 创建新连接 - self.connect() - - # 将新连接添加到连接池 - if self.conn and self.cursor: - SQLUtils._connection_pool[conn_key] = (self.conn, self.cursor) + # 创建新连接 - 这部分不需要锁 + self.connect() + + # 将新连接添加到连接池 - 需要锁 + if self.conn and self.cursor: + with self._pool_lock: + self._connection_pool[conn_key] = (self.conn, self.cursor) + self._connection_refs[conn_key] = 1 + self._last_used[conn_key] = time.time() + + def _cleanup_connection(self, conn_key): + """清理指定的连接""" + try: + if conn_key in self._connection_pool: + conn, cursor = self._connection_pool[conn_key] + if cursor: + try: + cursor.close() + except: + pass + if conn: + try: + conn.close() + except: + pass + # 移除相关引用 + self._connection_pool.pop(conn_key, None) + self._connection_refs.pop(conn_key, None) + self._last_used.pop(conn_key, None) + logging.debug(f"已清理连接: {conn_key}") + except Exception as e: + logging.error(f"清理连接失败: {e}") def connect(self): - """连接到数据库""" + """连接到数据库 - 不需要全局锁""" try: if self.db_type in ['pgsql', 'postgresql']: if not psycopg2: @@ -148,85 +247,114 @@ class SQLUtils: raise def execute_query(self, sql, params=None): + """执行查询 - 不使用全局锁,仅使用单个连接""" if params is None: params = () - with SQLUtils._lock: - try: - self.cursor.execute(sql, params) - return self.cursor - except Exception as e: - logging.error(f"执行查询失败: {e}, SQL: {sql}, 参数: {params}") - raise + + try: + # 直接执行查询,因为每个线程有自己的连接 + self.cursor.execute(sql, params) + # 更新最后使用时间 + thread_id = threading.get_ident() + conn_key = f"{self.db_type}:{str(self.kwargs)}:{thread_id}" + with self._pool_lock: + if conn_key in self._last_used: + self._last_used[conn_key] = time.time() + return self.cursor + except Exception as e: + logging.error(f"执行查询失败: {e}, SQL: {sql}, 参数: {params}") + raise def execute_update(self, sql, params=None): + """执行更新 - 不使用全局锁,仅使用单个连接""" try: - with SQLUtils._lock: - self.cursor.execute(sql, params) - self.conn.commit() - return self.cursor.rowcount + if params is None: + params = () + + # 直接执行更新 + self.cursor.execute(sql, params) + self.conn.commit() + + # 更新最后使用时间 + thread_id = threading.get_ident() + conn_key = f"{self.db_type}:{str(self.kwargs)}:{thread_id}" + with self._pool_lock: + if conn_key in self._last_used: + self._last_used[conn_key] = time.time() + + return self.cursor.rowcount except Exception as e: self.conn.rollback() logging.error(f"执行更新失败: {e}, SQL: {sql}, 参数: {params}") raise e def begin_transaction(self) -> None: - """开始事务""" - with SQLUtils._lock: - if self.db_type in ['sqlite', 'sqlite3']: - self.execute_query('BEGIN TRANSACTION') - else: - self.conn.autocommit = False - + """开始事务 - 不使用全局锁""" + if self.db_type in ['sqlite', 'sqlite3']: + self.execute_query('BEGIN TRANSACTION') + else: + self.conn.autocommit = False + def commit_transaction(self) -> None: - """提交事务""" - with SQLUtils._lock: - self.conn.commit() - if self.db_type not in ['sqlite', 'sqlite3']: - self.conn.autocommit = True - + """提交事务 - 不使用全局锁""" + self.conn.commit() + if self.db_type not in ['sqlite', 'sqlite3']: + self.conn.autocommit = True + def rollback_transaction(self) -> None: - """回滚事务""" - with SQLUtils._lock: - self.conn.rollback() - if self.db_type not in ['sqlite', 'sqlite3']: - self.conn.autocommit = True + """回滚事务 - 不使用全局锁""" + self.conn.rollback() + if self.db_type not in ['sqlite', 'sqlite3']: + self.conn.autocommit = True def fetchone(self): - with SQLUtils._lock: - return self.cursor.fetchone() + """获取一行数据 - 不使用全局锁""" + return self.cursor.fetchone() def fetchall(self): - with SQLUtils._lock: - return self.cursor.fetchall() + """获取所有数据 - 不使用全局锁""" + return self.cursor.fetchall() + + def get_new_cursor(self): + """获取一个新的游标,用于避免游标递归使用问题 + + Returns: + cursor: 数据库游标对象 + """ + if self.conn: + return self.conn.cursor() + return None def close(self): - """关闭当前游标,但保留连接在连接池中""" - # 不再关闭连接,只关闭游标,减少重复创建连接的开销 - # 连接仍然保留在连接池中供后续使用 - pass + """关闭当前游标,减少引用计数,必要时释放连接""" + thread_id = threading.get_ident() + conn_key = f"{self.db_type}:{str(self.kwargs)}:{thread_id}" + + with self._pool_lock: + if conn_key in self._connection_refs: + # 减少引用计数 + self._connection_refs[conn_key] -= 1 + + # 如果引用计数为0,关闭连接并从池中移除 + if self._connection_refs[conn_key] <= 0: + self._cleanup_connection(conn_key) def real_close(self): - """真正关闭连接,从连接池中移除""" - conn_key = f"{self.db_type}:{str(self.kwargs)}" - with SQLUtils._lock: - if conn_key in SQLUtils._connection_pool: - try: - conn, cursor = SQLUtils._connection_pool[conn_key] - if cursor: - cursor.close() - if conn: - conn.close() - del SQLUtils._connection_pool[conn_key] - logging.debug(f"已关闭并移除连接: {conn_key}") - except Exception as e: - logging.error(f"关闭连接失败: {e}") + """强制关闭连接,无论引用计数""" + thread_id = threading.get_ident() + conn_key = f"{self.db_type}:{str(self.kwargs)}:{thread_id}" + + with self._pool_lock: + self._cleanup_connection(conn_key) @staticmethod def close_all_connections(): """关闭所有连接池中的连接""" - with SQLUtils._lock: - for conn, cursor in SQLUtils._connection_pool.values(): + with SQLUtils._pool_lock: + conn_keys = list(SQLUtils._connection_pool.keys()) + for conn_key in conn_keys: try: + conn, cursor = SQLUtils._connection_pool[conn_key] if cursor: cursor.close() if conn: @@ -234,7 +362,10 @@ class SQLUtils: except Exception as e: logging.error(f"关闭数据库连接失败: {e}") + # 清空所有字典 SQLUtils._connection_pool.clear() + SQLUtils._connection_refs.clear() + SQLUtils._last_used.clear() logging.info("已关闭所有数据库连接") @staticmethod @@ -250,4 +381,64 @@ class SQLUtils: @staticmethod def get_mysql_connection(): """获取MySQL连接""" - return SQLUtils(source_name='mysql') \ No newline at end of file + return SQLUtils(source_name='mysql') + + @classmethod + def get_connection_pool_stats(cls): + """获取连接池统计信息 + + Returns: + dict: 包含连接池统计信息的字典 + """ + with cls._pool_lock: + stats = { + 'active_connections': len(cls._connection_pool), + 'connection_details': [], + 'connection_count_by_type': {}, + 'active_threads': {}, + } + + # 统计不同类型连接数量 + for conn_key in cls._connection_pool: + parts = conn_key.split(':') + if len(parts) > 0: + db_type = parts[0] + stats['connection_count_by_type'][db_type] = stats['connection_count_by_type'].get(db_type, 0) + 1 + + # 获取线程ID + if len(parts) > 2: + thread_id = parts[2] + stats['active_threads'][thread_id] = stats['active_threads'].get(thread_id, 0) + 1 + + # 连接详情 + refs = cls._connection_refs.get(conn_key, 0) + last_used = cls._last_used.get(conn_key, 0) + idle_time = time.time() - last_used if last_used else 0 + + stats['connection_details'].append({ + 'key': conn_key, + 'references': refs, + 'idle_time_seconds': int(idle_time), + 'is_idle': refs <= 0 + }) + + return stats + + @classmethod + def log_connection_pool_status(cls): + """记录当前连接池状态到日志""" + stats = cls.get_connection_pool_stats() + logging.info(f"数据库连接池状态: 活动连接数={stats['active_connections']}") + + # 记录每种数据库类型的连接数 + for db_type, count in stats['connection_count_by_type'].items(): + logging.info(f" - {db_type}: {count}个连接") + + # 记录空闲连接 + idle_connections = [d for d in stats['connection_details'] if d['is_idle']] + if idle_connections: + logging.info(f" - 空闲连接: {len(idle_connections)}个") + for conn in idle_connections: + logging.debug(f" * {conn['key']} (空闲{conn['idle_time_seconds']}秒)") + + return stats \ No newline at end of file diff --git a/widgets/main_window.py b/widgets/main_window.py index 7759c93..681fe57 100644 --- a/widgets/main_window.py +++ b/widgets/main_window.py @@ -483,9 +483,12 @@ class MainWindow(MainWindowUI): try: # 上料 D0 给到层数,等待点击开始后,进行上料 success0 = modbus.write_register_until_success(client, 0, self._current_stow_num) + # 读取D0寄存器值 + current_stow_num = modbus.read_holding_register(client, 0) + logging.info(f"上料初始化成功:层数 {current_stow_num} 已写入寄存器0") if success0: # 创建状态标签并显示在右上角 - self.show_operation_status("拆垛层数", "input", str(self._current_stow_num)) + self.show_operation_status("拆垛层数", "input", str(current_stow_num if current_stow_num else self._current_stow_num)) else: QMessageBox.information(self, "操作提示", "上料失败") except Exception as e: @@ -516,11 +519,11 @@ class MainWindow(MainWindowUI): # 始终使用用户最新输入的信息 self._total_unload_num = int(unloading_info.get('tier', '3')) - self._current_unload_num = 1 # 从第一层开始 + self._current_unload_num = int(unloading_info.get('tier', '3')) # 直接使用用户输入的层数,而不是从1开始 self._current_unload_info = unloading_info logging.info(f"下料任务设置:总层数={self._total_unload_num}, 当前层数={self._current_unload_num}") - # 将初始层数(1)写入寄存器 + # 将用户输入的层数写入寄存器 modbus = ModbusUtils() client = modbus.get_client() try: @@ -528,10 +531,14 @@ class MainWindow(MainWindowUI): logging.info(f"下料初始化成功:层数 {self._current_unload_num} 已写入寄存器4") finally: modbus.close_client(client) + + # 读取D4寄存器值 + current_unload_num = modbus.read_holding_register(client, 4) + logging.info(f"下料初始化成功:层数 {current_unload_num} 已写入寄存器4") # 统一更新UI显示 tray_code = self._current_unload_info.get('tray_code', '') - self.show_operation_status("下料层数", "output", f"{self._current_unload_num}/{self._total_unload_num} ") + self.show_operation_status("码垛层数", "output", f"{current_unload_num if current_unload_num else self._current_unload_num}/{current_unload_num if current_unload_num else self._total_unload_num} ") else: logging.info("下料对话框已取消") except Exception as e: @@ -1790,21 +1797,8 @@ class MainWindow(MainWindowUI): logging.info(f"跳过处理:重量 {original_weight_kg}kg 与上次处理的重量 {self._last_processed_weight}kg 接近且已处理") return - # 称重稳定后,给寄存器 D10 为 1 表示已经称重完成 - modbus = ModbusUtils() - client = modbus.get_client() - modbus.write_register_until_success(client, 10, 1) - modbus.close_client(client) - - # 处理稳定重量 - self._process_stable_weight(original_weight_kg) - # 调用打印方法 - self._print_weight_label(original_weight_kg) - - # 设置已处理标记和上次处理的重量 - self._weight_processed = True - self._last_processed_weight = original_weight_kg - logging.info(f"已标记重量 {original_weight_kg}kg 为已处理") + # 尝试写入D10寄存器并处理稳定重量,带有重试机制 + self._handle_stable_weight_with_retry(original_weight_kg, max_retries=3) else: logging.info(f"重量在{self._weight_stable_threshold}秒内发生变化,从 {original_weight_kg}kg 变为 {self._current_weight}kg") except Exception as e: @@ -1814,6 +1808,63 @@ class MainWindow(MainWindowUI): if self._stability_check_timer is not None: self._stability_check_timer.deleteLater() self._stability_check_timer = None + + def _handle_stable_weight_with_retry(self, weight_kg, max_retries=3): + """ + 使用重试机制处理稳定重量 + Args: + weight_kg: 稳定的重量值(千克) + max_retries: 最大重试次数 + """ + retry_count = 0 + success = False + last_error = None + modbus = ModbusUtils() + client = None + + try: + # 获取Modbus客户端(现在使用连接池,不会每次都创建新连接) + client = modbus.get_client() + if not client: + logging.error("无法获取Modbus客户端连接") + return + + # 重试机制写入寄存器 + while retry_count < max_retries and not success: + try: + # 称重稳定后,给寄存器 D10 为 1 表示已经称重完成 + success = modbus.write_register_until_success(client, 10, 1) + if success: + logging.info(f"成功写入D10寄存器值为1,表示称重完成") + break + except Exception as e: + last_error = e + retry_count += 1 + if retry_count < max_retries: + delay = 0.5 * (2 ** retry_count) # 指数退避 + logging.warning(f"写入D10寄存器失败,尝试第{retry_count}次重试,等待{delay:.1f}秒: {str(e)}") + time.sleep(delay) + + if not success: + logging.error(f"写入D10寄存器失败,已尝试{max_retries}次: {str(last_error)}") + return + + # 处理稳定重量 + self._process_stable_weight(weight_kg) + # 调用打印方法 + self._print_weight_label(weight_kg) + + # 设置已处理标记和上次处理的重量 + self._weight_processed = True + self._last_processed_weight = weight_kg + logging.info(f"已标记重量 {weight_kg}kg 为已处理") + + except Exception as e: + logging.error(f"处理稳定重量时发生错误: {str(e)}") + finally: + # 释放客户端连接回连接池 + if client: + modbus.close_client(client) def _process_stable_weight(self, weight_kg): """ @@ -2161,9 +2212,12 @@ class MainWindow(MainWindowUI): # 如果当前下料层数小于总层数,则将层数加1并写入寄存器4 if self._current_unload_num < self._total_unload_num: - # 当前层已完成,准备下一层 + # 保存当前完成的层数用于消息显示 + completed_tier = self._current_unload_num + + # 当前层已完成,层数加1表示开始下一层 self._current_unload_num += 1 - logging.info(f"当前层下料完成,更新层数:当前={self._current_unload_num}, 总数={self._total_unload_num}") + logging.info(f"当前层{completed_tier}下料完成,更新到下一层:当前={self._current_unload_num}, 总数={self._total_unload_num}") # 将新的层数写入寄存器4 modbus.write_register_until_success(client, 4, self._current_unload_num) @@ -2171,7 +2225,7 @@ class MainWindow(MainWindowUI): # 不直接更新UI,而是通过信号将数据传递给主线程处理 # 通过信号触发UI更新 - 显示前一层完成的消息 - message = f"第{self._current_unload_num-1}层下料完成,请启动第{self._current_unload_num}层下料" + message = f"第{completed_tier}层下料完成,请启动第{self._current_unload_num}层下料" self.unloading_feedback_signal.emit("output", message) # 恢复开始按钮原始样式 @@ -2223,6 +2277,10 @@ class MainWindow(MainWindowUI): self.unloading_position_label.setText("下料位置:--") elif "请启动" in desc: QMessageBox.information(self, "下料层完成", desc) + + # 如果当前下料信息存在且层数有效,更新UI显示 + if self._current_unload_info and self._current_unload_num > 0: + self.show_operation_status("下料层数", "output", f"{self._current_unload_num}/{self._total_unload_num}") except Exception as e: logging.error(f"处理下料UI更新失败: {str(e)}") @@ -2336,18 +2394,22 @@ class MainWindow(MainWindowUI): @Slot(int) def handle_unloading_level(self, level): """处理下料层数信息(来自Modbus)""" - # 只更新内存中的当前层数,UI更新通过信号槽完成 + # 只在层数发生变化时记录日志 if self._current_unload_num != level: logging.info(f"下料层数变化:{self._current_unload_num} -> {level}") - - self._current_unload_num = level - - # 更新保存的下料信息中的当前层数值 - if self._current_unload_info: - self._current_unload_info['tier'] = str(level) - # 通过信号在主线程中更新UI - self.unloading_level_ui_signal.emit(level) + # 更新当前层数 + self._current_unload_num = level + + # 更新保存的下料信息中的当前层数值 + if self._current_unload_info: + self._current_unload_info['tier'] = str(level) + + # 通过信号在主线程中更新UI + self.unloading_level_ui_signal.emit(level) + else: + # 即使层数相同,也更新UI以确保显示正确 + self.unloading_level_ui_signal.emit(level) @Slot(int) def handle_unloading_level_ui(self, level): @@ -2358,11 +2420,10 @@ class MainWindow(MainWindowUI): # 如果有下料信息且层数大于0,更新右上角显示 if level > 0 and self._current_unload_info: - tray_code = self._current_unload_info.get('tray_code', '') - # 确保使用固定的总层数 total_tier = self._total_unload_num + # 更新右上角显示,使用实际层数值 self.show_operation_status("下料层数", "output", f"{level}/{total_tier}") logging.info(f"更新右上角下料层数显示:{level}/{total_tier}") except Exception as e: