jiateng_ws/widgets/main_window.py

2208 lines
95 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import logging
import json
from datetime import datetime
from pathlib import Path
from utils.modbus_utils import ModbusUtils
from utils.modbus_monitor import get_instance as get_modbus_monitor
from utils.register_handlers import (
NGHandler,
WeightDataHandler,
LabelSignalHandler,
MachineStatusHandlers,
LoadingFeedbackHandler,
UnloadingFeedbackHandler,
Error1Handler,
Error2Handler,
Error3Handler,
UnloadingLevelHandler,
UnloadingPositionHandler
)
# 导入PySide6
from PySide6.QtWidgets import (
QWidget, QMessageBox, QTableWidgetItem, QStackedWidget, QLabel, QMainWindow,
QTableWidget, QMenu, QComboBox, QFormLayout, QDialog, QVBoxLayout, QHBoxLayout, QPushButton,
QStatusBar, QSplitter, QFrame, QHeaderView
)
from PySide6.QtCore import Qt, QTimer, Slot
from PySide6.QtGui import QBrush, QColor
# 导入UI
from ui.main_window_ui import MainWindowUI
# 导入相机显示组件和设置组件
from widgets.camera_display_widget import CameraDisplayWidget
from widgets.camera_settings_widget import CameraSettingsWidget
# 导入检验配置管理器
from utils.inspection_config_manager import InspectionConfigManager
# 导入托盘类型管理器
from utils.pallet_type_manager import PalletTypeManager
# 导入串口管理
from utils.serial_manager import SerialManager
class MainWindow(MainWindowUI):
"""主窗口"""
def __init__(self, user_id=None, user_name=None, corp_name=None, corp_id=None, position_id=None):
super().__init__()
self.user_id = user_id
self.user_name = user_name
self.corp_name = corp_name
self.corp_id = corp_id
self.position_id = position_id
self.init_seq = {} # 初始化轴包装的序号
self._loading_data_in_progress = False # 数据加载状态标志,防止循环调用
# 设置窗口标题
if user_name and corp_name:
self.setWindowTitle(f"腾智微丝产线包装系统 - {user_name} ({corp_name})")
# 加载配置文件
self.config = self.load_config()
self.camera_enabled = self.config.get('camera', {}).get('enabled', False)
# 初始化检验配置管理器
self.inspection_manager = InspectionConfigManager.get_instance()
# 初始化托盘类型管理器
self.pallet_type_manager = PalletTypeManager.get_instance()
# 创建表单布局,用于添加托盘类型选择控件
self.material_form_layout = QFormLayout()
self.material_content_layout.addLayout(self.material_form_layout)
self.output_form_layout = QFormLayout()
self.output_content_layout.addLayout(self.output_form_layout)
# 只有在相机启用时创建相机显示组件
if self.camera_enabled:
# 创建相机显示组件并添加到上料区
self.camera_display = CameraDisplayWidget()
self.material_content_layout.addWidget(self.camera_display)
else:
# 在上料区添加占位标签
self.material_placeholder = QLabel("相机功能已禁用")
self.material_placeholder.setAlignment(Qt.AlignCenter)
self.material_placeholder.setStyleSheet("color: #888888; background-color: #f0f0f0;")
self.material_content_layout.addWidget(self.material_placeholder)
# 为下料区添加占位标签,确保它保持为空
self.output_placeholder = QWidget()
self.output_placeholder.setStyleSheet("background-color: #f0f0f0;")
placeholder_layout = QVBoxLayout(self.output_placeholder)
placeholder_layout.setAlignment(Qt.AlignCenter)
# 添加标题标签
title_label = QLabel("下料区")
title_label.setAlignment(Qt.AlignCenter)
title_label.setStyleSheet("color: #888888;")
title_label.setFont(self.second_title_font)
placeholder_layout.addWidget(title_label)
self.output_content_layout.addWidget(self.output_placeholder)
# 添加下料信息标签
self.unloading_level_label = QLabel("下料层数:--")
self.unloading_position_label = QLabel("下料位置:--")
placeholder_layout.addWidget(self.unloading_level_label)
placeholder_layout.addWidget(self.unloading_position_label)
self.unloading_level_label.setStyleSheet("color: #888888; font-weight: bold;")
self.unloading_position_label.setStyleSheet("color: #888888; font-weight: bold;")
self.unloading_level_label.setFont(self.normal_font)
self.unloading_position_label.setFont(self.normal_font)
# 创建堆叠部件
self.stacked_widget = QStackedWidget()
self.stacked_widget.addWidget(self.central_widget) # 主页面
# 不在这里直接初始化相机设置组件
# 延迟创建保证创建的时候SettingsUI的所有控件都已经准备好
self.camera_settings = None
# 设置中央部件为堆叠部件
self.setCentralWidget(self.stacked_widget)
# 添加托盘类型选择下拉框
self.add_pallet_type_selectors()
# 连接信号和槽
self.connect_signals()
# 默认显示主页面
self.stacked_widget.setCurrentIndex(0)
# 配置检验列 - 使用检验配置管理器获取启用的列数和标题
self.update_inspection_columns()
# 设置表格上下文菜单
self.process_table.setContextMenuPolicy(Qt.CustomContextMenu)
# 加载未完成的检验数据
self._safe_load_data()
# 加载已完成检验数据
self.show_pack_item()
# 创建状态处理器实例
self.machine_handlers = MachineStatusHandlers()
# 添加状态显示到状态栏
self.modbus_status_label = QLabel("Modbus: 未连接")
self.weight_label = QLabel("重量: --")
self.label_status_label = QLabel("贴标: 无贴标")
self.error_status_label = QLabel("故障: 无")
# 设置样式
self.error_status_label.setStyleSheet("color: green; font-weight: bold;")
# 添加到状态栏
self.statusBar().addPermanentWidget(self.modbus_status_label)
self.statusBar().addPermanentWidget(self.weight_label)
self.statusBar().addPermanentWidget(self.label_status_label)
self.statusBar().addPermanentWidget(self.error_status_label)
self.statusBar().addPermanentWidget(QLabel(" "))
logging.info(f"主窗口已创建,用户: {user_name}")
# 初始化串口管理器
self.serial_manager = SerialManager()
# 注册串口数据回调函数
self.register_serial_callbacks()
def add_pallet_type_selectors(self):
"""添加托盘类型选择下拉框"""
# 创建上料托盘类型选择下拉框
self.input_pallet_type_label = QLabel("上料托盘类型:")
self.input_pallet_type_label.setFont(self.normal_font)
self.input_pallet_type_label.setVisible(False)
self.material_form_layout.addRow(self.input_pallet_type_label)
self.input_pallet_type_combo = QComboBox()
self.input_pallet_type_combo.setFont(self.normal_font)
self.input_pallet_type_combo.setVisible(False)
self.material_form_layout.addRow("", self.input_pallet_type_combo)
# 创建下料托盘类型选择下拉框
self.output_pallet_type_label = QLabel("下料托盘类型:")
self.output_pallet_type_label.setFont(self.normal_font)
self.output_pallet_type_label.setVisible(False)
self.output_form_layout.addRow(self.output_pallet_type_label)
self.output_pallet_type_combo = QComboBox()
self.output_pallet_type_combo.setFont(self.normal_font)
self.output_pallet_type_combo.setVisible(False)
self.output_form_layout.addRow("", self.output_pallet_type_combo)
# 加载托盘类型数据
self.update_pallet_types()
def update_pallet_types(self):
"""更新托盘类型下拉框"""
# 重新加载托盘类型数据
self.pallet_type_manager.reload_pallet_types()
# 更新上料托盘类型
input_types = self.pallet_type_manager.get_pallet_types_by_operation("input")
self.input_pallet_type_combo.clear()
for pallet_type in input_types:
self.input_pallet_type_combo.addItem(pallet_type['type_name'], pallet_type['id'])
# 更新下料托盘类型
output_types = self.pallet_type_manager.get_pallet_types_by_operation("output")
self.output_pallet_type_combo.clear()
for pallet_type in output_types:
self.output_pallet_type_combo.addItem(pallet_type['type_name'], pallet_type['id'])
def load_config(self):
"""加载配置文件"""
config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "config", "app_config.json")
try:
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
logging.info(f"已加载配置文件: {config_path}")
return config
except Exception as e:
logging.error(f"加载配置文件失败: {e}")
return {}
def connect_signals(self):
"""连接信号槽"""
# 连接微丝产线表格单元格变更信号槽
self.process_table.cellChanged.connect(self.handle_inspection_cell_changed)
# 连接菜单动作
self.main_action.triggered.connect(self.show_main_page)
self.settings_action.triggered.connect(self.show_settings_page)
# 工程号输入框回车事件
self.order_edit.returnPressed.connect(self.handle_order_enter)
# 托盘号输入框回车和切换事件,触发未加载数据查询
# QComboBox没有returnPressed信号只有currentTextChanged和activated信号
self.tray_edit.currentTextChanged.connect(self.handle_tray_changed)
self.tray_edit.activated.connect(self.handle_tray_changed) # 当用户选择一项时触发
# 连接按钮事件
self.input_button.clicked.connect(self.handle_input)
self.output_button.clicked.connect(self.handle_output)
self.start_button.clicked.connect(self.handle_start)
self.stop_button.clicked.connect(self.handle_stop)
# 设置表格上下文菜单
self.process_table.setContextMenuPolicy(Qt.CustomContextMenu)
self.process_table.customContextMenuRequested.connect(self.show_table_context_menu)
# 只有在相机启用时连接相机信号
if self.camera_enabled and hasattr(self, 'camera_display'):
self.camera_display.signal_camera_status.connect(self.handle_camera_status)
def update_inspection_columns(self):
"""更新检验列配置 - 使用检验配置管理器获取启用的列数和标题"""
try:
# 获取已启用的检验配置
enabled_configs = self.inspection_manager.get_enabled_configs()
# 获取启用的列数
column_count = len(enabled_configs)
if column_count == 0:
# 如果没有启用的列,至少显示一列
column_count = 1
headers = ["检验项"]
else:
# 如果有启用的列,使用配置的标题
headers = [config['display_name'] for config in enabled_configs]
# 设置检验列
self.set_inspection_columns(column_count, headers)
logging.info(f"已更新检验列配置:{column_count}列, 标题: {headers}")
except Exception as e:
logging.error(f"更新检验列配置失败: {str(e)}")
# 如果更新失败,使用默认配置
self.set_inspection_columns(1, ["检验项"])
def show_main_page(self):
self.stacked_widget.setCurrentWidget(self.central_widget)
# 更新检验列配置
self.update_inspection_columns()
# 加载未完成的检验数据
self._safe_load_data()
# 只有在相机启用时处理相机显示
if self.camera_enabled and hasattr(self, 'camera_display'):
# 如果相机已连接,直接开始显示相机画面
if self.camera_display.camera_manager.isOpen:
if not self.camera_display.camera_manager.isGrabbing:
self.camera_display.start_display()
logging.info("显示主页面")
def show_settings_page(self):
"""显示设置页面"""
# 创建设置窗口
if not hasattr(self, 'settings_window'):
from widgets.settings_window import SettingsWindow
self.settings_window = SettingsWindow(self)
# 连接设置改变信号
self.settings_window.settings_changed.connect(self.on_settings_changed)
# 显示设置窗口
self.settings_window.show()
logging.info("显示设置窗口")
def on_settings_changed(self):
"""设置改变时触发"""
# 重新加载配置
from utils.config_loader import ConfigLoader
config_loader = ConfigLoader.get_instance()
config_loader.load_config()
self.config = self.load_config() # 重新加载配置到 self.config
# 更新串口管理器配置
self.serial_manager.reload_config()
logging.info("设置已更新,重新加载配置")
def handle_input(self):
"""处理上料按钮点击事件"""
# 获取托盘号
tray_id = self.tray_edit.currentText()
if not tray_id:
QMessageBox.warning(self, "提示", "请先选择或输入托盘号")
return
# 启动监听(不论后续是否确认上料)
# 启动Modbus监控
if not hasattr(self, 'modbus_monitor') or not self.modbus_monitor.is_running():
self.setup_modbus_monitor()
logging.info("已在上料操作前启动Modbus监控")
# 启动串口监听
self.serial_manager.auto_open_configured_ports()
# 启动键盘监听器
self.serial_manager.start_keyboard_listener()
logging.info("已在上料操作前启动键盘监听器")
# 创建上料对话框
from widgets.loading_dialog_widget import LoadingDialog
dialog = LoadingDialog(parent=self)
# 显示对话框
result = dialog.exec()
# 如果用户点击确认按钮
if result == QDialog.Accepted:
# TODO: 在这里添加上料操作的具体逻辑
logging.info(f"上料对话框已确认,托盘号: {self.tray_edit.currentText()}")
pass
def handle_output(self):
"""处理下料按钮点击事件"""
# 创建对话框
dialog = QDialog(self)
dialog.setWindowTitle("下料操作")
dialog.setFixedSize(300, 200)
# 对话框布局
layout = QVBoxLayout(dialog)
# 添加提示信息
info_label = QLabel("请选择下料托盘类型:")
info_label.setFont(self.normal_font)
layout.addWidget(info_label)
# 添加托盘类型选择
pallet_combo = QComboBox()
pallet_combo.setFont(self.normal_font)
# 复制当前托盘类型选择器的内容
for i in range(self.output_pallet_type_combo.count()):
pallet_combo.addItem(self.output_pallet_type_combo.itemText(i))
layout.addWidget(pallet_combo)
# 添加按钮
button_layout = QHBoxLayout()
confirm_button = QPushButton("确认")
confirm_button.setFont(self.normal_font)
confirm_button.setStyleSheet("background-color: #fff8e1; border: 1px solid #ffc107; padding: 8px 16px; font-weight: bold; border-radius: 4px;")
cancel_button = QPushButton("取消")
cancel_button.setFont(self.normal_font)
cancel_button.setStyleSheet("padding: 8px 16px; font-weight: bold; border-radius: 4px;")
button_layout.addStretch()
button_layout.addWidget(confirm_button)
button_layout.addWidget(cancel_button)
layout.addLayout(button_layout)
# 连接按钮信号
confirm_button.clicked.connect(dialog.accept)
cancel_button.clicked.connect(dialog.reject)
# 显示对话框
result = dialog.exec()
# 如果用户确认,则执行下料操作
if result == QDialog.Accepted:
selected_type = pallet_combo.currentText()
# 获取托盘的排序,该顺序影响着下料寄存器的写入值 切记,需要和 PLC 确认沟通完成后才能修改排序值
pallets_dict = self.pallet_type_manager.get_pallet_type_by_type(selected_type)
# 执行Modbus操作
modbus = ModbusUtils()
client = modbus.get_client()
try:
#TODO: 下料 D3 寄存器写入 1 D1 寄存器写入托盘类型
if modbus.write_register_until_success(client, 3, 1) and modbus.write_register_until_success(client, 1, pallets_dict.get(selected_type)):
# 创建状态标签并显示在右上角
self.show_operation_status("下料托盘", "output", selected_type)
else:
QMessageBox.information(self, "操作提示", "下料失败")
except Exception as e:
logging.error(f"下料操作失败: {str(e)}")
QMessageBox.critical(self, "错误", f"下料操作失败: {str(e)}")
finally:
modbus.close_client(client)
def handle_start(self):
"""
处理开始按钮点击事件,并启动 modbus 监控
"""
# 创建对话框
dialog = QDialog(self)
dialog.setWindowTitle("上料操作")
dialog.setFixedSize(300, 200)
# 对话框布局
layout = QVBoxLayout(dialog)
# 添加提示信息
info_label = QLabel("请选择拆垛层数:")
info_label.setFont(self.normal_font)
layout.addWidget(info_label)
# 添加托盘类型选择
pallet_combo = QComboBox()
pallet_combo.setFont(self.normal_font)
# 复制当前托盘类型选择器的内容
for i in range(1,4):
pallet_combo.addItem(str(i))
layout.addWidget(pallet_combo)
# 添加提示信息
info_label = QLabel("请选择码垛层数:")
info_label.setFont(self.normal_font)
layout.addWidget(info_label)
# 添加托盘类型选择
pallet_combo2 = QComboBox()
pallet_combo2.setFont(self.normal_font)
# 复制当前托盘类型选择器的内容
for i in range(1,4):
pallet_combo2.addItem(str(i))
layout.addWidget(pallet_combo2)
# 添加按钮
button_layout = QHBoxLayout()
confirm_button = QPushButton("确认")
confirm_button.setFont(self.normal_font)
confirm_button.setStyleSheet("background-color: #e3f2fd; border: 1px solid #2196f3; padding: 8px 16px; font-weight: bold; border-radius: 4px;")
cancel_button = QPushButton("取消")
cancel_button.setFont(self.normal_font)
cancel_button.setStyleSheet("padding: 8px 16px; font-weight: bold; border-radius: 4px;")
button_layout.addStretch()
button_layout.addWidget(confirm_button)
button_layout.addWidget(cancel_button)
layout.addLayout(button_layout)
# 连接按钮信号
confirm_button.clicked.connect(dialog.accept)
cancel_button.clicked.connect(dialog.reject)
# 显示对话框
result = dialog.exec()
# 如果用户确认,则执行上料操作
if result == QDialog.Accepted:
stow_num = pallet_combo.currentText()
stow_num2 = pallet_combo2.currentText()
# 获取托盘号对应的托盘类型
tray_id = self.tray_edit.currentText()
pallet_type = self.pallet_type_manager.get_pallet_type_by_pallet_id(tray_id)
# 初始化托盘号对应的序号
if tray_id not in self.init_seq:
self.init_seq[tray_id] = 1
if not pallet_type:
QMessageBox.warning(self, "错误", "未查到对应下料托盘类型")
return
# 执行Modbus操作
modbus = ModbusUtils()
client = modbus.get_client()
try:
# 启动Modbus监控
self.setup_modbus_monitor()
# 启动串口监听
self.serial_manager.auto_open_configured_ports()
# 显式启动键盘监听器
self.serial_manager.start_keyboard_listener()
logging.info("已在开始操作时启动键盘监听器")
success0 = modbus.write_register_until_success(client, 0, int(stow_num))
success1 = modbus.write_register_until_success(client, 1, int(pallet_type))
success2 = modbus.write_register_until_success(client, 2, 1)
success3 = modbus.write_register_until_success(client, 3, 1)
success4 = modbus.write_register_until_success(client, 4, int(stow_num2))
# 上料 D2 寄存器写入 1 ,D0 寄存器写入托盘类型
if success0 and success2 and success1 and success3 and success4:
# 创建状态标签并显示在右上角
self.show_operation_status("拆垛层数", "input", stow_num)
else:
QMessageBox.information(self, "操作提示", "上料失败")
except Exception as e:
logging.error(f"上料操作失败: {str(e)}")
QMessageBox.critical(self, "错误", f"上料操作失败: {str(e)}")
finally:
modbus.close_client(client)
def handle_stop(self):
"""处理停止按钮点击事件,并关闭 modbus 监控"""
modbus = ModbusUtils()
client = modbus.get_client()
try:
success2 = modbus.write_register_until_success(client, 2, 0)
success3 = modbus.write_register_until_success(client, 3, 0)
# 停止 D1 寄存器2、3写入 0
if not success2 and not success3:
QMessageBox.information(self, "操作提示", "停止失败")
except Exception as e:
logging.error(f"停止操作失败: {str(e)}")
QMessageBox.critical(self, "错误", f"停止操作失败: {str(e)}")
finally:
modbus.close_client(client)
# 停止Modbus监控
if hasattr(self, 'modbus_monitor'):
logging.info("停止Modbus监控")
self.modbus_monitor.stop()
# 停止串口监听
self.serial_manager.stop_keyboard_listener()
self.serial_manager.close_all_ports()
def handle_camera_status(self, is_connected, message):
"""处理相机状态变化"""
if is_connected:
logging.info("相机已连接并显示")
else:
logging.warning(f"相机显示问题: {message}")
def handle_camera_connection(self, is_connected, message):
"""处理相机连接状态变化"""
if is_connected:
logging.info("相机已连接")
# 如果当前在主页面,直接开始显示相机画面
if self.stacked_widget.currentWidget() == self.central_widget:
self.camera_display.start_display()
else:
if message:
logging.warning(f"相机连接失败: {message}")
else:
logging.info("相机已断开")
# 如果相机断开,确保停止显示
self.camera_display.stop_display()
def handle_camera_params_changed(self, exposure_time, gain, frame_rate):
"""处理相机参数变化"""
logging.info(f"相机参数已更新: 曝光={exposure_time:.1f}μs, 增益={gain:.1f}dB, 帧率={frame_rate:.1f}fps")
# 这里可以添加对相机参数变化的处理逻辑
def handle_camera_error(self, error_msg):
"""处理相机错误"""
logging.error(f"相机错误: {error_msg}")
QMessageBox.warning(self, "相机错误", error_msg)
def closeEvent(self, event):
"""窗口关闭事件"""
# 停止Modbus监控
if hasattr(self, 'modbus_monitor'):
logging.info("停止Modbus监控")
self.modbus_monitor.stop()
# 只有在相机启用时处理相机关闭
if self.camera_enabled and hasattr(self, 'camera_display'):
# 停止相机显示
self.camera_display.stop_display()
# 停止串口监听
self.serial_manager.stop_keyboard_listener()
self.serial_manager.close_all_ports()
# 接受关闭事件
event.accept()
def handle_order_enter(self):
"""处理工程号输入框按下回车事件"""
logging.info("工程号输入框按下回车事件")
# 获取当前输入的工程号
order_text = self.order_edit.text().strip()
if order_text:
logging.info(f"输入的工程号: {order_text}")
# 在微丝产线表格中添加一条新记录
self.add_new_inspection_row(order_text)
else:
logging.warning("工程号为空")
QMessageBox.warning(self, "输入提示", "请输入有效的工程号")
# 处理完后可以清除焦点,让输入框失去焦点
self.central_widget.setFocus()
def add_new_inspection_row(self, order_id):
"""在微丝产线表格中添加一条新记录,添加到表格末尾
Args:
order_id: 工程号
"""
try:
# 获取启用的检验配置
enabled_configs = self.inspection_manager.get_enabled_configs()
# 断开单元格变更信号,避免加载过程中触发保存
try:
self.process_table.cellChanged.disconnect(self.handle_inspection_cell_changed)
except:
pass
# 计算新行的行索引(添加到末尾)
data_start_row = self.process_table.rowCount()
# 在末尾添加新行
self.process_table.insertRow(data_start_row)
# 计算新行的序号(最后一个序号+1
new_seq = 1 # 默认为1
if data_start_row > 2: # 如果有其他数据行
prev_seq_item = self.process_table.item(data_start_row - 1, 0)
if prev_seq_item:
try:
prev_seq = int(prev_seq_item.text())
new_seq = prev_seq + 1
except ValueError:
new_seq = data_start_row - 1 # 备选方案:使用行索引作为序号
# 添加工程号到表格的第二列
item = QTableWidgetItem(order_id)
item.setTextAlignment(Qt.AlignCenter)
self.process_table.setItem(data_start_row, 1, item)
# 添加序号到表格的第一列
item = QTableWidgetItem(str(new_seq))
item.setTextAlignment(Qt.AlignCenter)
self.process_table.setItem(data_start_row, 0, item)
# 检验列设置为可编辑状态
for i, config in enumerate(enabled_configs):
col_index = 2 + i # 检验列从第3列开始
# 创建空的可编辑单元格
item = QTableWidgetItem("")
item.setTextAlignment(Qt.AlignCenter)
# 设置单元格属性以标识其关联的检验项
item.setData(Qt.UserRole, config.get('id'))
self.process_table.setItem(data_start_row, col_index, item)
# 包装列设置为可编辑状态
packaging_start_col = 2 + len(enabled_configs)
for i in range(2): # 贴标和称重
col_index = packaging_start_col + i
item = QTableWidgetItem("")
item.setTextAlignment(Qt.AlignCenter)
self.process_table.setItem(data_start_row, col_index, item)
# 设置表格为可编辑状态
self.process_table.setEditTriggers(QTableWidget.DoubleClicked | QTableWidget.EditKeyPressed)
# 重新连接单元格内容变更信号
self.process_table.cellChanged.connect(self.handle_inspection_cell_changed)
# 选中新添加的行
self.process_table.selectRow(data_start_row)
# 限制最大行数
self.limit_table_rows(10) # 最多保留10行数据
# 将工程号和托盘号保存到数据库,确保能够正确关联
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
tray_id = self.tray_edit.currentText()
# 为每个检验位置创建一个空记录,确保工程号在数据库中存在
for config in enabled_configs:
data = [{
'position': config.get('position'),
'config_id': config.get('id'),
'value': '',
'status': '', # 默认设置为通过状态
'remark': '',
'tray_id': tray_id
}]
inspection_dao.save_inspection_data(order_id, data)
# 为贴标和称重也创建空记录
for position in [11, 12, 13]: # 11是贴标12是毛重13是净重
data = [{
'position': position,
'config_id': position,
'value': '',
'status': 'pass', # 默认设置为通过状态
'remark': '',
'tray_id': tray_id
}]
inspection_dao.save_inspection_data(order_id, data)
logging.info(f"已添加工程号 {order_id} 的新记录,显示在第{new_seq}")
except Exception as e:
logging.error(f"添加新记录失败: {str(e)}")
QMessageBox.warning(self, "添加失败", f"添加新记录失败: {str(e)}")
finally:
# 重新加载数据确保UI显示正确
self._safe_load_data()
def limit_table_rows(self, max_rows):
"""限制表格最大行数
Args:
max_rows: 最大行数(不包括表头行)
"""
try:
# 计算数据总行数
data_rows = self.process_table.rowCount() - 2 # 减去表头行
# 如果超过最大行数,删除多余的行
if data_rows > max_rows:
# 要删除的行数
rows_to_remove = data_rows - max_rows
# 从最后一行开始删除
for i in range(rows_to_remove):
self.process_table.removeRow(self.process_table.rowCount() - 1)
logging.info(f"已限制表格最大行数为 {max_rows} 行数据,删除了 {rows_to_remove}")
except Exception as e:
logging.error(f"限制表格行数失败: {str(e)}")
def handle_inspection_cell_changed(self, row, column):
"""处理微丝包装单元格内容变更
Args:
row: 行索引
column: 列索引
"""
try:
# 只处理数据行的检验列变更
if row < 2: # 忽略表头行
return
# 忽略首尾两列(序号和工程号)
if column < 2:
return
# 获取工程号
order_id_item = self.process_table.item(row, 1)
if not order_id_item:
return
order_id = order_id_item.text().strip()
if not order_id:
return
# 获取托盘号
tray_id = self.tray_edit.currentText()
# 获取启用的检验配置
enabled_configs = self.inspection_manager.get_enabled_configs()
# 判断是否是检验列(非包装列)
packaging_start_col = 2 + len(enabled_configs)
# 获取单元格内容
cell_item = self.process_table.item(row, column)
if not cell_item:
return
value = cell_item.text().strip()
# 默认设置为通过状态
status = 'pass'
# 记录当前正在处理的数据类型,用于日志输出
data_type = "检验"
if column >= 2 and column < packaging_start_col:
# 是检验列
config_index = column - 2
if config_index < len(enabled_configs):
config = enabled_configs[config_index]
data_type = config['display_name']
# 显示临时状态消息
self.statusBar().showMessage(f"正在保存检验数据: {data_type}={value}", 1000)
# 验证数据有效性
if self.validate_inspection_value(config, value):
# 设置单元格颜色为通过
cell_item.setBackground(QBrush(QColor("#c8e6c9"))) # 浅绿色
status = 'pass'
else:
# 设置单元格颜色为警告
cell_item.setBackground(QBrush(QColor("#fff9c4"))) # 浅黄色
status = 'warning'
# 保存到数据库
self.save_inspection_data(order_id, tray_id, config['position'], config['id'], value, status)
# 判断是否是包装列
elif column == packaging_start_col:
# 贴标列
data_type = "贴标"
self.statusBar().showMessage(f"正在保存贴标数据: {value}", 1000)
# 设置单元格颜色为通过
cell_item.setBackground(QBrush(QColor("#c8e6c9"))) # 浅绿色
# 保存贴标数据position和config_id都是11
self.save_inspection_data(order_id, tray_id, 11, 11, value, status)
elif column == packaging_start_col + 1:
# 毛重列
data_type = "毛重"
self.statusBar().showMessage(f"正在保存称重数据: {value}", 1000)
# 设置单元格颜色为通过
cell_item.setBackground(QBrush(QColor("#c8e6c9"))) # 浅绿色
# 保存毛重数据position和config_id都是12
self.save_inspection_data(order_id, tray_id, 12, 12, value, status)
elif column == packaging_start_col + 2:
# 净重列
data_type = "净重"
self.statusBar().showMessage(f"正在保存净重数据: {value}", 1000)
# 设置单元格颜色为通过
cell_item.setBackground(QBrush(QColor("#c8e6c9"))) # 浅绿色
# 保存净重数据position和config_id都是13
self.save_inspection_data(order_id, tray_id, 13, 13, value, status)
# 记录详细日志
logging.info(f"处理单元格变更: 行={row}, 列={column}, 类型={data_type}, 工程号={order_id}, 值={value}, 状态={status}")
except Exception as e:
logging.error(f"处理检验单元格变更失败: {str(e)}")
self.statusBar().showMessage(f"处理检验数据失败: {str(e)[:50]}...", 3000)
finally:
# 延迟一段时间后再触发查询避免频繁刷新UI
# 但要避免在加载过程中触发新的加载
if not self._loading_data_in_progress:
QTimer.singleShot(1000, self._safe_load_data)
def validate_inspection_value(self, config, value):
"""验证检验值是否有效
Args:
config: 检验配置
value: 检验值
Returns:
bool: 是否有效
"""
try:
# 特殊处理贴标和称重数据 - 这些数据默认都是有效的
if config.get('position') in [11, 12]: # 11是贴标12是称重
return True
# 检查值是否为空
if not value and config.get('required', False):
return False
# 根据数据类型验证
data_type = config.get('data_type')
if data_type == 'number':
# 数值类型验证
try:
# 如果值为空且不是必填,则视为有效
if not value and not config.get('required', False):
return True
num_value = float(value)
min_value = config.get('min_value')
max_value = config.get('max_value')
if min_value is not None and num_value < min_value:
return False
if max_value is not None and num_value > max_value:
return False
return True
except ValueError:
return False
elif data_type == 'enum':
# 枚举类型验证
enum_values = config.get('enum_values')
if enum_values and isinstance(enum_values, list):
# 如果值为空且不是必填,则视为有效
if not value and not config.get('required', False):
return True
return value in enum_values
return False
# 文本类型不做特殊验证
return True
except Exception as e:
logging.error(f"验证检验值失败: {str(e)}")
return False
def save_inspection_data(self, order_id, tray_id, position, config_id, value, status):
"""保存检验数据到数据库
Args:
order_id: 工程号
position: 位置序号
config_id: 配置ID
value: 检验值
status: 状态
"""
try:
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
modbus = ModbusUtils()
client = modbus.get_client()
# 记录保存前的详细日志
logging.info(f"正在保存检验数据: 工程号={order_id}, 托盘号={tray_id}, 位置={position}, 配置ID={config_id}, 值={value}, 状态={status}")
# 构建数据
data = [{
'position': position,
'config_id': config_id,
'value': value,
'status': status,
'remark': '',
'tray_id': tray_id
}]
# 保存到数据库
inspection_dao.save_inspection_data(order_id, data)
# 注意不要在这里调用数据加载方法而是依靠信号和槽机制或QTimer安全地触发加载
except Exception as e:
logging.error(f"保存检验数据失败: {str(e)}")
# 显示错误消息
self.statusBar().showMessage(f"保存检验数据错误: {str(e)[:50]}...", 3000)
def _safe_load_data(self):
"""安全地加载数据,避免循环调用"""
if self._loading_data_in_progress:
# 如果已经在加载数据,不要再次触发
logging.debug("已有数据加载正在进行,忽略此次请求")
return
try:
self._loading_data_in_progress = True
self.load_finished_inspection_data()
finally:
self._loading_data_in_progress = False
def load_finished_inspection_data(self):
"""加载未完成的检验数据并显示在表格中"""
# 注意此方法通常应通过_safe_load_data调用以防止循环
try:
# 使用InspectionDAO获取未完成的检验数据
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
# 获取托盘号
tray_id = self.tray_edit.currentText()
# 使用get_inspection_data_unfinished获取未完成的数据
unfinished_data = inspection_dao.get_inspection_data_unfinished(tray_id)
# 断开单元格变更信号,避免加载过程中触发保存
try:
self.process_table.cellChanged.disconnect(self.handle_inspection_cell_changed)
except:
pass
# 清空表格现有数据行,只保留表头
while self.process_table.rowCount() > 2:
self.process_table.removeRow(2)
if not unfinished_data:
logging.info(f"托盘号 {tray_id} 没有未完成的检验数据")
# 确保表格完全清空,只保留表头行
self.process_table.setRowCount(2) # 只保留表头的两行
# 重新连接单元格变更信号
try:
self.process_table.cellChanged.connect(self.handle_inspection_cell_changed)
except:
pass
# 加载包装记录
self.show_pack_item()
return
logging.info(f"已加载未完成的检验数据,共 {len(unfinished_data)} 条记录")
# 获取启用的检验配置
enabled_configs = self.inspection_manager.get_enabled_configs()
# 按工程号分组
orders_data = {}
for data in unfinished_data:
order_id = data['order_id']
if order_id not in orders_data:
orders_data[order_id] = []
orders_data[order_id].append(data)
# 添加数据到表格 - 从第3行开始添加数据
row_idx = 2
# 确保按工程号倒序排列,最新的工程号在最前面
sorted_order_ids = sorted(orders_data.keys(), reverse=False)
for order_id in sorted_order_ids:
items = orders_data[order_id]
# 添加新行
self.process_table.insertRow(row_idx)
# 添加序号到第一列
seq_item = QTableWidgetItem(str(row_idx - 1))
seq_item.setTextAlignment(Qt.AlignCenter)
self.process_table.setItem(row_idx, 0, seq_item)
# 添加工程号到第二列
order_item = QTableWidgetItem(order_id)
order_item.setTextAlignment(Qt.AlignCenter)
self.process_table.setItem(row_idx, 1, order_item)
# 添加检验数据
for item in items:
position = item['position']
value = item['value'] if item['value'] else ""
status = item['status']
config_id = item['config_id']
# 找到对应的列索引
col_index = None
for i, config in enumerate(enabled_configs):
if config.get('position') == position:
col_index = 2 + i # 检验列从第3列开始
break
if col_index is not None:
# 创建单元格并设置值
cell_item = QTableWidgetItem(str(value))
cell_item.setTextAlignment(Qt.AlignCenter)
# 存储配置ID用于保存时确定是哪个检验项
cell_item.setData(Qt.UserRole, config_id)
# 设置单元格
self.process_table.setItem(row_idx, col_index, cell_item)
# 添加贴标11和称重数据12
if position == 11: # 贴标
# 贴标列索引 = 2(序号和工程号) + 检验列数
label_col = 2 + len(enabled_configs)
self.process_table.setItem(row_idx, label_col, QTableWidgetItem(str(value)))
elif position == 12: # 称重
# 称重列索引 = 2(序号和工程号) + 检验列数 + 1(贴标)
weight_col = 2 + len(enabled_configs) + 1
self.process_table.setItem(row_idx, weight_col, QTableWidgetItem(str(value)))
elif position == 13: # 净重
# 净重列索引 = 2(序号和工程号) + 检验列数 + 2(贴标和称重)
net_weight_col = 2 + len(enabled_configs) + 2
self.process_table.setItem(row_idx, net_weight_col, QTableWidgetItem(str(value)))
row_idx += 1
# 设置表格为可编辑状态
self.process_table.setEditTriggers(QTableWidget.DoubleClicked | QTableWidget.EditKeyPressed)
# 重新连接单元格变更信号
self.process_table.cellChanged.connect(self.handle_inspection_cell_changed)
except Exception as e:
logging.error(f"加载未完成的检验数据失败: {str(e)}")
QMessageBox.warning(self, "加载失败", f"加载未完成的检验数据失败: {str(e)}")
finally:
# 加载包装记录,但要避免循环调用
# 设置一个标志,防止 show_pack_item 触发更多的数据加载
if not hasattr(self, '_loading_data_in_progress'):
self._loading_data_in_progress = True
try:
self.show_pack_item()
finally:
self._loading_data_in_progress = False
def load_finished_record_to_package_record(self, order_id, tray_id):
"""加载已完成检验数据到包装记录
Args:
order_id: 工程号
tray_id: 托盘号
"""
try:
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
# 获取该工程号的所有检验数据
inspection_data = inspection_dao.get_inspection_data_by_order(order_id, tray_id)
if not inspection_data:
logging.warning(f"未找到工程号 {order_id} 托盘号 {tray_id} 的检验数据")
return
# 从检验数据中获取贴标和称重数据
label_value = ""
weight_value = ""
net_weight_value = ""
for item in inspection_data:
if item['position'] == 11: # 贴标
label_value = item['value']
elif item['position'] == 12: # 称重
weight_value = item['value']
elif item['position'] == 13: # 净重
net_weight_value = item['value']
# 只要贴标字段有值,就可以写入包装记录
if not label_value:
logging.warning(f"工程号 {order_id} 托盘号 {tray_id} 的贴标字段为空,不添加到包装记录")
return
# 获取当前包装记录,检查是否已经存在相同的记录
existing_records = inspection_dao.get_package_record(tray_id)
for record in existing_records:
if record[0] == order_id and record[4] == label_value:
logging.info(f"工程号 {order_id} 托盘号 {tray_id} 贴标值 {label_value} 的包装记录已存在,不重复添加")
return
# 获取当前时间作为完成时间
finish_time = datetime.now()
# 将数据写入到数据库表 inspection_pack_data
inspection_dao.save_package_record(order_id, tray_id, label_value, weight_value,net_weight_value, finish_time)
# 回显数据,但避免循环调用
if not hasattr(self, '_loading_data_in_progress'):
self._loading_data_in_progress = True
try:
self.show_pack_item()
finally:
self._loading_data_in_progress = False
logging.info(f"已将工程号 {order_id} 托盘号 {tray_id} 的检验数据添加到包装记录并回显")
except Exception as e:
logging.error(f"加载已完成检验数据到包装记录失败: {str(e)}")
QMessageBox.warning(self, "加载失败", f"加载已完成检验数据到包装记录失败: {str(e)}")
def show_pack_item(self):
"""显示包装记录"""
try:
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
# 获取托盘号
tray_id = self.tray_edit.currentText()
# 读取已包装的记录信息,然后回显到 UI
package_record = inspection_dao.get_package_record(tray_id)
# 完全清空包装记录表格(包括所有行)
self.record_table.setRowCount(0)
# 断开包装记录表的信号连接(如果有)
try:
self.record_table.cellChanged.disconnect()
except:
pass
# 设置表头固定不动
self.record_table.horizontalHeader().setSectionResizeMode(QHeaderView.Fixed)
self.record_table.horizontalHeader().setStretchLastSection(False)
self.record_table.horizontalHeader().setSectionsMovable(False)
self.record_table.horizontalHeader().setSectionsClickable(False)
# 设置表头标签
self.record_table.setHorizontalHeaderLabels(["序号", "订单", "品名", "规格", "托号", "轴包装号", "毛重", "净重", "完成时间"])
self.record_table.horizontalHeader().setVisible(True)
# 设置表头样式
self.record_table.horizontalHeader().setStyleSheet("""
QHeaderView::section {
background-color: #f8f8f8;
padding: 4px;
border: 1px solid #dddddd;
font-weight: bold;
}
""")
# 设置列宽
column_widths = [70, 200, 130, 130, 160, 120, 120, 120, 160]
for col, width in enumerate(column_widths):
self.record_table.setColumnWidth(col, width)
self.record_table.horizontalHeader().resizeSection(col, width)
# 检查是否有包装记录数据
if not package_record:
logging.info(f"托盘号 {tray_id} 没有包装记录数据")
# 表格已清空,不需要再设置行数
# 更新包装记录统计数据
self.update_package_statistics()
return
logging.info(f"托盘号 {tray_id} 已加载包装记录,共 {len(package_record)} 条记录")
# 添加所有包装记录到表格
for index, item in enumerate(package_record):
# 在包装记录表中添加新行
row_index = self.record_table.rowCount() # 获取当前行数从0开始
self.record_table.insertRow(row_index)
# 设置包装记录数据
# 序号 - 第1列
seq_item = QTableWidgetItem(str(index + 1))
seq_item.setTextAlignment(Qt.AlignCenter)
self.record_table.setItem(row_index, 0, seq_item)
# 工程号 - 第2列
order_item = QTableWidgetItem(item[0])
order_item.setTextAlignment(Qt.AlignCenter)
self.record_table.setItem(row_index, 1, order_item)
# 材质 - 第3列
material_item = QTableWidgetItem(item[1])
material_item.setTextAlignment(Qt.AlignCenter)
self.record_table.setItem(row_index, 2, material_item)
# 规格 - 第4列
spec_item = QTableWidgetItem(item[2])
spec_item.setTextAlignment(Qt.AlignCenter)
self.record_table.setItem(row_index, 3, spec_item)
# 托盘号 - 第5列
tray_item = QTableWidgetItem(item[3])
tray_item.setTextAlignment(Qt.AlignCenter)
self.record_table.setItem(row_index, 4, tray_item)
# 轴包装号(贴标)- 第6列
label_item = QTableWidgetItem(item[4])
label_item.setTextAlignment(Qt.AlignCenter)
self.record_table.setItem(row_index, 5, label_item)
# 重量 - 第7列
weight_item = QTableWidgetItem(str(item[5]))
weight_item.setTextAlignment(Qt.AlignCenter)
self.record_table.setItem(row_index, 6, weight_item)
# 净重 - 第8列
net_weight_item = QTableWidgetItem(str(item[6]))
net_weight_item.setTextAlignment(Qt.AlignCenter)
self.record_table.setItem(row_index, 7, net_weight_item)
# 包装时间
pack_time = QTableWidgetItem(str(item[7]))
pack_time.setTextAlignment(Qt.AlignCenter)
self.record_table.setItem(row_index, 8, pack_time)
# 设置表格不可编辑
self.record_table.setEditTriggers(QTableWidget.NoEditTriggers)
# 更新包装记录统计数据
self.update_package_statistics()
except Exception as e:
logging.error(f"显示包装记录失败: {str(e)}")
QMessageBox.warning(self, "显示失败", f"显示包装记录失败: {str(e)}")
def update_package_statistics(self):
"""更新包装记录统计数据"""
try:
# 获取包装记录表的行数
package_count = self.record_table.rowCount()
# 更新任务表格中的已完成数量
completed_item = QTableWidgetItem(str(package_count))
completed_item.setTextAlignment(Qt.AlignCenter)
self.task_table.setItem(2, 2, completed_item)
# 计算已完成公斤数(如果称重列有数值)
completed_kg = 0
for row in range(self.record_table.rowCount()):
weight_item = self.record_table.item(row, 6) # 称重列
if weight_item and weight_item.text():
try:
completed_kg += float(weight_item.text())
except ValueError:
pass
# 更新任务表格中的已完成公斤
completed_kg_item = QTableWidgetItem(str(completed_kg))
completed_kg_item.setTextAlignment(Qt.AlignCenter)
self.task_table.setItem(2, 3, completed_kg_item)
logging.info(f"已更新包装记录统计数据: 完成数量={package_count}, 完成公斤={completed_kg}")
except Exception as e:
logging.error(f"更新包装记录统计数据失败: {str(e)}")
def show_table_context_menu(self, pos):
"""显示表格上下文菜单
Args:
pos: 鼠标位置
"""
try:
# 获取当前单元格
cell_index = self.process_table.indexAt(pos)
if not cell_index.isValid():
return
row = cell_index.row()
column = cell_index.column()
# 只对数据行和检验列显示上下文菜单
if row < 2: # 忽略表头行
return
# 获取工程号
order_id_item = self.process_table.item(row, 1)
if not order_id_item:
return
order_id = order_id_item.text().strip()
if not order_id:
return
# 获取托盘号
tray_id = self.tray_edit.currentText()
# 创建上下文菜单
menu = QMenu(self)
# 获取启用的检验配置
enabled_configs = self.inspection_manager.get_enabled_configs()
# 判断是否是检验列(非包装列)
packaging_start_col = 2 + len(enabled_configs)
if column >= 2 and column < packaging_start_col:
# 是检验列
config_index = column - 2
if config_index < len(enabled_configs):
config = enabled_configs[config_index]
position = config.get('position')
# 添加查询数据库菜单项
check_action = menu.addAction("检查数据库记录")
check_action.triggered.connect(lambda: self.check_database_record(order_id, position, tray_id))
# 显示菜单
menu.exec_(self.process_table.viewport().mapToGlobal(pos))
except Exception as e:
logging.error(f"显示表格上下文菜单失败: {str(e)}")
def check_database_record(self, order_id, position, tray_id):
"""检查数据库记录
Args:
order_id: 工程号
position: 位置序号
tray_id: 托盘号
"""
try:
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
# 获取检验数据
inspection_data = inspection_dao.get_inspection_data_by_order(order_id, tray_id)
# 查找对应位置的数据
matching_data = None
for data in inspection_data:
if data.get('position') == position:
matching_data = data
break
# 显示结果
if matching_data:
value = matching_data.get('value')
status = matching_data.get('status')
message = f"数据库记录:\n\n"
message += f"工程号: {order_id}\n"
message += f"位置: {position}\n"
message += f"值: {value}\n"
message += f"状态: {status}\n"
QMessageBox.information(self, "数据库记录", message)
else:
QMessageBox.warning(self, "数据库记录", f"未找到工程号 {order_id} 位置 {position} 的数据")
except Exception as e:
logging.error(f"检查数据库记录失败: {str(e)}")
QMessageBox.warning(self, "查询失败", f"检查数据库记录失败: {str(e)}")
def show_operation_status(self, status, operation_type, pallet_type):
"""在右上角显示操作状态
Args:
status: 状态文本
operation_type: 操作类型 (input/output)
pallet_type: 托盘类型
"""
# 确定要添加标签的容器
if operation_type == "input":
container = self.material_content
else:
container = self.output_content
# 如果已存在状态标签,则移除它
status_label_name = f"{operation_type}_status_label"
if hasattr(self, status_label_name):
old_label = getattr(self, status_label_name)
old_label.deleteLater()
# 创建新的状态标签
status_label = QLabel(f"{status}: {pallet_type}", container)
status_label.setFont(self.second_title_font)
status_label.setStyleSheet("color: red; background-color: transparent;")
status_label.setAlignment(Qt.AlignRight | Qt.AlignTop)
# 使用绝对定位,放置在右上角
status_label.setGeometry(container.width() - 250, 5, 240, 30)
# 确保标签始终保持在顶层显示
status_label.raise_()
status_label.show()
# 保存标签引用
setattr(self, status_label_name, status_label)
# 保存原始的resize事件处理函数
if not hasattr(container, "_original_resize_event"):
container._original_resize_event = container.resizeEvent
# 添加窗口大小变化事件处理,确保标签位置随窗口调整
container.resizeEvent = lambda event: self.adjust_status_label_position(event, container, status_label)
def adjust_status_label_position(self, event, container, label):
"""调整状态标签位置,确保始终在右上角
Args:
event: 窗口大小变化事件
container: 标签所在的容器
label: 状态标签
"""
# 更新标签位置,保持在右上角
label.setGeometry(container.width() - 250, 5, 240, 30)
# 调用原始的resizeEvent如果有的话
original_resize = getattr(container, "_original_resize_event", None)
if original_resize:
original_resize(event)
# ==================== Modbus监控系统相关方法 ====================
def setup_modbus_monitor(self):
"""设置Modbus监控系统"""
# 获取Modbus监控器实例
self.modbus_monitor = get_modbus_monitor()
# 注册寄存器处理器
self._register_modbus_handlers()
# 连接信号槽
self._connect_modbus_signals()
# 启动监控
self.modbus_monitor.start()
logging.info("Modbus监控系统已设置")
def _register_modbus_handlers(self):
"""注册寄存器处理器"""
# 注册D6处理器处理NG信号
self.modbus_monitor.register_handler(6, NGHandler(self.machine_handlers.handle_ng))
# 注册D11处理器处理称重数据
self.modbus_monitor.register_handler(11, WeightDataHandler(self.machine_handlers.handle_weight_data))
# 注册D13处理器处理贴标信号
self.modbus_monitor.register_handler(13, LabelSignalHandler(self.machine_handlers.handle_label_signal))
# 注册D20-D24处理器处理各种状态信息
self.modbus_monitor.register_handler(20, LoadingFeedbackHandler(self.handle_loading_feedback))
self.modbus_monitor.register_handler(21, UnloadingFeedbackHandler(self.handle_unloading_feedback))
self.modbus_monitor.register_handler(22, Error1Handler(self.machine_handlers.handle_error_1))
self.modbus_monitor.register_handler(23, Error2Handler(self.machine_handlers.handle_error_2))
self.modbus_monitor.register_handler(24, Error3Handler(self.machine_handlers.handle_error_3))
# 注册下料层数和位置处理器
self.modbus_monitor.register_handler(4, UnloadingLevelHandler(self.handle_unloading_level))
self.modbus_monitor.register_handler(5, UnloadingPositionHandler(self.handle_unloading_position))
def _connect_modbus_signals(self):
"""连接Modbus信号槽"""
# 连接监控器状态信号
self.modbus_monitor.monitor_status_changed.connect(self.handle_modbus_status_change)
self.modbus_monitor.register_error.connect(self.handle_register_error)
self.machine_handlers.ng_changed.connect(self.handle_ng)
# 直接连接寄存器变化信号
self.modbus_monitor.register_changed.connect(self.handle_register_change)
# 连接机器状态信号
self.machine_handlers.loading_feedback_changed.connect(self.handle_loading_feedback)
self.machine_handlers.unloading_feedback_changed.connect(self.handle_unloading_feedback)
self.machine_handlers.error_1_changed.connect(self.handle_error_1)
self.machine_handlers.error_2_changed.connect(self.handle_error_2)
self.machine_handlers.error_3_changed.connect(self.handle_error_3)
# 连接称重数据和贴标信号
self.machine_handlers.weight_changed.connect(self.handle_weight_data)
self.machine_handlers.label_signal_changed.connect(self.handle_label_signal)
@Slot(int)
def handle_weight_data(self, weight):
"""处理称重数据变化"""
logging.info(f"[处理] 称重数据: {weight}g")
# 更新UI显示
self.weight_label.setText(f"重量: {weight}g")
try:
# 获取数据行数
if self.process_table.rowCount() <= 2: # 没有数据行
logging.warning("没有可用的数据行来写入称重数据")
return
# 获取启用的检验配置
enabled_configs = self.inspection_manager.get_enabled_configs()
# 计算称重列索引 - 称重位置在检验列之后的第二列(贴标后面)
weight_col = 2 + len(enabled_configs) + 1
# 计算净重列索引 - 净重位置在检验列之后的第三列(称重后面)
net_weight_col = 2 + len(enabled_configs) + 2
# 获取当前选中的行或第一个数据行
current_row = self.process_table.currentRow()
data_row = current_row if current_row >= 2 else 2 # 使用第一个数据行索引为2
# 确保行存在
if data_row >= self.process_table.rowCount():
logging.warning(f"选中的行 {data_row} 超出了表格范围")
return
# 获取工程号
order_id_item = self.process_table.item(data_row, 1)
if not order_id_item:
logging.warning("无法获取工程号")
return
order_id = order_id_item.text().strip()
if not order_id:
logging.warning("工程号为空")
return
# 暂时断开信号连接避免触发cellChanged信号
try:
self.process_table.cellChanged.disconnect(self.handle_inspection_cell_changed)
except:
pass
# 设置称重值单元格
weight_item = QTableWidgetItem(str(weight))
weight_item.setTextAlignment(Qt.AlignCenter)
self.process_table.setItem(data_row, weight_col, weight_item)
# 保存到数据库
tray_id = self.tray_edit.currentText()
self.save_inspection_data(order_id, tray_id, 12, 12, str(weight), "pass")
# 保存净重到数据库(毛重-工字轮重量TODO 先默认工字轮重量为10g后续从接口获取
net_weight = weight - 10
self.save_inspection_data(order_id, tray_id, 13, 13, str(net_weight), "pass")
# 设置净重单元格
net_weight_item = QTableWidgetItem(str(net_weight))
net_weight_item.setTextAlignment(Qt.AlignCenter)
self.process_table.setItem(data_row, net_weight_col, net_weight_item)
# 重新连接信号
self.process_table.cellChanged.connect(self.handle_inspection_cell_changed)
logging.info(f"已将称重数据 {weight}g 写入行 {data_row}, 列 {weight_col}")
# TODO调用称重打印进行下一步打印
except Exception as e:
logging.error(f"处理称重数据时发生错误: {str(e)}")
# 确保重新连接信号
try:
self.process_table.cellChanged.connect(self.handle_inspection_cell_changed)
except:
pass
@Slot(int, str)
def handle_label_signal(self, signal, status):
"""处理贴标信号"""
logging.info(f"[处理] 贴标信号: {status} (值={signal})")
# 更新UI显示
self.label_status_label.setText(f"贴标: {status}")
# 只有当信号为贴标完成(1)时才进行处理
if signal == 1:
try:
# 获取数据行数
if self.process_table.rowCount() <= 2: # 没有数据行
logging.warning("没有可用的数据行来写入贴标数据")
return
# 获取当前选中的行或第一个数据行
current_row = self.process_table.currentRow()
data_row = current_row if current_row >= 2 else 2 # 使用第一个数据行索引为2
# 确保行存在
if data_row >= self.process_table.rowCount():
logging.warning(f"选中的行 {data_row} 超出了表格范围")
return
# 获取工程号
order_id_item = self.process_table.item(data_row, 1)
if not order_id_item:
logging.warning("无法获取工程号")
return
order_id = order_id_item.text().strip()
if not order_id:
logging.warning("工程号为空")
return
# 获取托盘号
tray_id = self.tray_edit.currentText()
# 获取启用的检验配置
enabled_configs = self.inspection_manager.get_enabled_configs()
# 计算贴标列索引 - 贴标位置在检验列之后的第一列
label_col = 2 + len(enabled_configs)
# 生成贴标号(托盘号+序号)
label_value = f"{self.init_seq[tray_id]}"
# 断开单元格变更信号,避免程序自动写入时触发
try:
self.process_table.cellChanged.disconnect(self.handle_inspection_cell_changed)
except:
pass
# 创建并设置贴标单元格
label_item = QTableWidgetItem(label_value)
label_item.setTextAlignment(Qt.AlignCenter)
# 写入单元格
self.process_table.setItem(data_row, label_col, label_item)
logging.info(f"已将贴标数据 {label_value} 写入表格单元格 [{data_row}, {label_col}]")
# 保存贴标数据到数据库
self.save_inspection_data(order_id, tray_id, 11, 11, label_value, "pass")
# 调用加载到包装记录的方法
self.load_finished_record_to_package_record(order_id, tray_id)
logging.info(f"贴标完成,已将工程号 {order_id} 的记录加载到包装记录")
# 删除当前处理的行
self.process_table.removeRow(data_row)
logging.info(f"已删除处理完成的行 {data_row}")
# 重新连接单元格变更信号
self.process_table.cellChanged.connect(self.handle_inspection_cell_changed)
# 更新托盘号对应的序号
self.init_seq[tray_id] += 1
except Exception as e:
logging.error(f"处理贴标完成信号失败: {str(e)}")
# 确保信号重新连接
try:
self.process_table.cellChanged.connect(self.handle_inspection_cell_changed)
except:
pass
@Slot(bool, str)
def handle_modbus_status_change(self, is_connected, message):
"""处理Modbus连接状态变化"""
if is_connected:
self.modbus_status_label.setText("Modbus: 已连接")
self.modbus_status_label.setStyleSheet("color: green;")
logging.info(f"Modbus已连接: {message}")
else:
self.modbus_status_label.setText("Modbus: 未连接")
self.modbus_status_label.setToolTip(message)
self.modbus_status_label.setStyleSheet("color: red;")
logging.warning(f"Modbus连接断开: {message}")
@Slot(int, str)
def handle_register_error(self, address, error_msg):
"""处理寄存器读取错误"""
logging.warning(f"[处理] 寄存器D{address}错误: {error_msg}")
# 在这里可以添加错误处理逻辑
pass
@Slot(int, int)
def handle_register_change(self, address, value):
"""处理寄存器变化"""
logging.info(f"[处理] 寄存器D{address}变化: {value}")
# 在这里可以添加通用寄存器变化处理逻辑
pass
@Slot(int, str)
def handle_loading_feedback(self, status, desc):
"""处理上料信息反馈"""
logging.info(f"[处理] 上料信息: {desc}")
# 如果上料完成(status=1),显示状态信息,把上料寄存器置为 0
if status == 1:
modbus = ModbusUtils()
client = modbus.get_client()
modbus.write_register_until_success(client, 2, 0)
modbus.close_client(client)
self.show_operation_status("上料完成", "input", desc)
QMessageBox.information(self, "上料操作", f"上料操作已完成: {desc}")
@Slot(int, str)
def handle_unloading_feedback(self, status, desc):
"""处理下料信息反馈"""
logging.info(f"[处理] 下料信息: {desc}")
# 如果下料完成(status=1),显示状态信息,把下料寄存器置为 0
if status == 1:
modbus = ModbusUtils()
client = modbus.get_client()
modbus.write_register_until_success(client, 3, 0)
modbus.close_client(client)
self.show_operation_status("下料", "output", desc)
QMessageBox.information(self, "下料操作", f"下料操作已完成: {desc}")
def _update_error_status(self):
"""更新故障状态显示"""
# 收集所有故障信息
error_codes = [
getattr(self, 'error_1', 0),
getattr(self, 'error_2', 0),
getattr(self, 'error_3', 0)
]
# 检查是否有故障
has_error = any(code > 0 for code in error_codes)
if has_error:
# 收集所有错误信息
errors = []
error_map = self.machine_handlers.error_map
if getattr(self, 'error_1', 0) > 0:
errors.append(f"故障1: {error_map.get(self.error_1, '未知')}")
if getattr(self, 'error_2', 0) > 0:
errors.append(f"故障2: {error_map.get(self.error_2, '未知')}")
if getattr(self, 'error_3', 0) > 0:
errors.append(f"故障3: {error_map.get(self.error_3, '未知')}")
self.error_status_label.setText("故障: 有")
self.error_status_label.setToolTip("\n".join(errors))
self.error_status_label.setStyleSheet("color: red; font-weight: bold;")
else:
self.error_status_label.setText("故障: 无")
self.error_status_label.setToolTip("")
self.error_status_label.setStyleSheet("color: green; font-weight: bold;")
@Slot(int, str)
def handle_error_1(self, error_code, error_desc):
"""机器人视觉报警"""
logging.info(f"[处理] 机器人视觉报警: {error_desc}")
from utils.register_handlers import Error1Handler
error_handler = Error1Handler()
detailed_desc = error_handler.error_map.get(error_code, f"机器人视觉报警-{error_code}")
# 保存故障码
self.error_1 = error_code
self._update_error_status()
# 如果有故障,显示提示
if error_code in (2, 3):
QMessageBox.warning(self, "机器人视觉报警", f"机器人视觉报警: {detailed_desc}")
# error_1 属于上料故障,需要把上料寄存器置为 0
modbus = ModbusUtils()
client = modbus.get_client()
modbus.write_register_until_success(client, 2, 0)
modbus.write_register_until_success(client, 3, 0)
modbus.close_client(client)
self.show_operation_status("异常", "", detailed_desc)
@Slot(int, str)
def handle_error_2(self, error_code, error_desc):
"""滚筒线报警"""
logging.info(f"[处理] 滚筒线报警: {error_desc}")
from utils.register_handlers import Error2Handler
error_handler = Error2Handler()
detailed_desc = error_handler.error_map.get(error_code, f"滚筒线报警-{error_code}")
# 保存故障码
self.error_2 = error_code
self._update_error_status()
# 如果有故障,显示提示
if error_code > 0:
# error_2 属于下料故障,需要把下料寄存器置为 0
modbus = ModbusUtils()
client = modbus.get_client()
modbus.write_register_until_success(client, 3, 0)
modbus.close_client(client)
self.show_operation_status("异常", "", detailed_desc)
@Slot(int, str)
def handle_error_3(self, error_code, error_desc):
"""拆码垛报警"""
logging.info(f"[处理] 拆码垛报警: {error_desc}")
from utils.register_handlers import Error3Handler
error_handler = Error3Handler()
detailed_desc = error_handler.error_map.get(error_code, f"拆码垛报警-{error_code}")
# 保存故障码
self.error_3 = error_code
self._update_error_status()
modbus = ModbusUtils()
client = modbus.get_client()
# 如果有故障,显示提示
if error_code == 1:
QMessageBox.warning(self, "异常", f"异常: {detailed_desc}")
modbus.write_register_until_success(client, 2, 0)
modbus.close_client(client)
self.show_operation_status("异常", "", detailed_desc)
elif error_code == 2:
QMessageBox.warning(self, "异常", f"异常: {detailed_desc}")
modbus.write_register_until_success(client, 3, 0)
modbus.close_client(client)
@Slot(int)
def handle_unloading_level(self, level):
"""处理下料层数信息"""
self.unloading_level_label.setText(f"下料层数:{level}")
@Slot(int)
def handle_unloading_position(self, position):
"""处理下料位置信息"""
self.unloading_position_label.setText(f"下料位置:{position}")
@Slot(int)
def handle_ng(self, ng):
"""处理NG信号, 将当前处理的数据添加到包装记录中毛重和净重设为0"""
if ng == 1:
"""
# 获取当前选中的行或第一个数据行,并删除
try:
order_id = self.process_table.item(2, 1).text().strip()
tray_id = self.tray_edit.currentText()
except Exception as e:
logging.error(f"处理NG信号时发生错误: {str(e)}")
order_id = ""
tray_id = ""
self.inspection_manager.delete_inspection_data(order_id, tray_id)
# 触发重新查询,更新数据
self._safe_load_data()
logging.info(f"已删除当前在处理的数据: order_id: {order_id}, tray_id: {tray_id}")
"""
try:
# 获取最后一条数据行
total_rows = self.process_table.rowCount()
if total_rows <= 2: # 只有表头行,没有数据行
logging.warning("没有可用的数据行来处理NG信号")
return
# 使用最后一条数据行
data_row = total_rows - 1
# 获取工程号
order_id_item = self.process_table.item(data_row, 1)
if not order_id_item:
logging.warning("无法获取工程号")
return
order_id = order_id_item.text().strip()
if not order_id:
logging.warning("工程号为空")
return
# 获取托盘号
tray_id = self.tray_edit.currentText()
# 获取启用的检验配置
enabled_configs = self.inspection_manager.get_enabled_configs()
# 计算贴标列索引
label_col = 2 + len(enabled_configs)
# 获取贴标值
label_item = self.process_table.item(data_row, label_col)
label_value = label_item.text() if label_item else ""
# 如果贴标值为空,生成一个新的贴标值
if not label_value:
# 初始化托盘号对应的序号
if tray_id not in self.init_seq:
self.init_seq[tray_id] = 1
# 生成贴标号(托盘号+序号)
label_value = f"{self.init_seq[tray_id]}-NG"
self.init_seq[tray_id] += 1
# 保存贴标数据到数据库
self.save_inspection_data(order_id, tray_id, 11, 11, label_value, "pass")
else:
# 如果贴标值已存在但不包含NG标记添加NG标记
if "NG" not in label_value:
label_value = f"{label_value}-NG"
# 更新贴标数据
self.save_inspection_data(order_id, tray_id, 11, 11, label_value, "pass")
# 设置毛重和净重为0
self.save_inspection_data(order_id, tray_id, 12, 12, "0", "pass")
self.save_inspection_data(order_id, tray_id, 13, 13, "0", "pass")
# 获取当前时间作为完成时间
finish_time = datetime.now()
# 将数据写入到数据库表 inspection_pack_data
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
inspection_dao.save_package_record(order_id, tray_id, label_value, "0", "0", finish_time)
# 删除当前处理的行
self.process_table.removeRow(data_row)
# 回显数据
self.show_pack_item()
logging.info(f"NG信号处理完成: 工程号={order_id}, 托盘号={tray_id}, 贴标值={label_value}")
except Exception as e:
logging.error(f"处理NG信号时发生错误: {str(e)}")
finally:
# 复原NG信号
modbus = ModbusUtils()
client = modbus.get_client()
modbus.write_register_until_success(client, 6, 0)
modbus.close_client(client)
def register_serial_callbacks(self):
"""注册串口数据回调函数"""
try:
# 注册米电阻数据回调
self.serial_manager.callbacks['mdz_data'] = self.on_mdz_data_received
# 注册线径数据回调
self.serial_manager.callbacks['xj_data'] = self.on_diameter_data_received
# 自动打开已配置的串口
self.serial_manager.auto_open_configured_ports()
logging.info("已注册串口数据回调函数")
except Exception as e:
logging.error(f"注册串口数据回调函数失败: {str(e)}")
def on_mdz_data_received(self, port_name, data):
"""米电阻数据接收回调函数
Args:
port_name: 串口名称
data: 接收到的数据
"""
try:
# 解析数据
data_str = data.decode('utf-8') if isinstance(data, bytes) else str(data)
logging.info(f"收到米电阻数据: {data_str} 来自 {port_name}")
# 提取米电阻值,格式为"米电阻数据: xxx"
if "米电阻数据:" in data_str:
value_str = data_str.split("米电阻数据:")[1].strip()
try:
# 转换为浮点数
mdz_value = float(value_str)
# 查找米电阻对应的检验项配置
mdz_config = None
enabled_configs = self.inspection_manager.get_enabled_configs()
for config in enabled_configs:
if config.get('name') == 'mdz' or config.get('display_name') == '米电阻':
mdz_config = config
break
if mdz_config:
# 找到对应的检验项,将数据写入对应的单元格
self.set_inspection_value('mdz', mdz_config, mdz_value)
else:
logging.warning("未找到米电阻对应的检验项配置")
except ValueError:
logging.warning(f"米电阻数据格式错误: {value_str}")
else:
logging.warning(f"收到的数据不包含米电阻数据标记: {data_str}")
except Exception as e:
logging.error(f"处理米电阻数据失败: {str(e)}")
def on_diameter_data_received(self, port_name, data):
"""线径数据接收回调函数
Args:
port_name: 串口名称
data: 接收到的数据
"""
try:
# 解析数据
data_str = data.decode('utf-8') if isinstance(data, bytes) else str(data)
logging.info(f"收到线径数据: {data_str} 来自 {port_name}")
# 提取线径值,格式为"线径数据: xxx"
if "线径数据:" in data_str:
value_str = data_str.split("线径数据:")[1].strip()
try:
# 转换为浮点数
xj_value = float(value_str)
# 查找线径对应的检验项配置
xj_config = None
enabled_configs = self.inspection_manager.get_enabled_configs()
for config in enabled_configs:
if config.get('name') == 'xj' or config.get('display_name') == '线径':
xj_config = config
break
if xj_config:
# 找到对应的检验项,将数据写入对应的单元格
self.set_inspection_value('xj', xj_config, xj_value)
else:
logging.warning("未找到线径对应的检验项配置")
except ValueError:
logging.warning(f"线径数据格式错误: {value_str}")
else:
logging.warning(f"收到的数据不包含线径数据标记: {data_str}")
except Exception as e:
logging.error(f"处理线径数据失败: {str(e)}")
def set_inspection_value(self, data_type, config, value):
"""设置检验项目值到表格中
Args:
data_type: 数据类型,'mdz'表示米电阻,'xj'表示线径
config: 检验项配置
value: 检验值
"""
try:
# 获取当前选中的行或第一个数据行
current_row = self.process_table.currentRow()
data_row = current_row if current_row >= 2 else 2 # 使用第一个数据行索引为2
# 确保行存在
if data_row >= self.process_table.rowCount():
logging.warning(f"选中的行 {data_row} 超出了表格范围")
# 可能需要添加新行
if self.process_table.rowCount() <= 2: # 只有表头行
order_id = self.order_edit.text().strip()
if order_id:
self.add_new_inspection_row(order_id)
data_row = 2 # 新添加的行
else:
logging.warning("无法添加新行,订单号为空")
return
else:
return
# 获取工程号
order_id_item = self.process_table.item(data_row, 1)
if not order_id_item:
logging.warning("无法获取工程号")
return
order_id = order_id_item.text().strip()
if not order_id:
logging.warning("工程号为空")
return
# 获取检验项的列索引
config_id = config.get('id')
config_position = config.get('position')
col_index = None
# 获取启用的检验配置
enabled_configs = self.inspection_manager.get_enabled_configs()
# 根据检验项配置查找对应的列索引
for i, cfg in enumerate(enabled_configs):
if cfg.get('id') == config_id:
col_index = 2 + i # 检验列从第3列开始
break
if col_index is None:
logging.warning(f"未找到{data_type}对应的列索引")
return
# 暂时断开信号连接避免触发cellChanged信号
try:
self.process_table.cellChanged.disconnect(self.handle_inspection_cell_changed)
except:
pass
# 格式化值并设置单元格
formatted_value = str(value)
if config.get('data_type') == 'number':
# 格式化数字保留2位小数
formatted_value = f"{value:.2f}"
# 设置单元格值
item = QTableWidgetItem(formatted_value)
item.setTextAlignment(Qt.AlignCenter)
item.setData(Qt.UserRole, config_id) # 保存配置ID用于识别检验项
self.process_table.setItem(data_row, col_index, item)
# 验证数据是否在有效范围内
status = "pass"
if config.get('data_type') == 'number':
min_value = config.get('min_value')
max_value = config.get('max_value')
if (min_value is not None and value < min_value) or (max_value is not None and value > max_value):
status = "fail"
item.setBackground(QBrush(QColor("#ffcdd2"))) # 浅红色
else:
item.setBackground(QBrush(QColor("#c8e6c9"))) # 浅绿色
# 保存到数据库,但只在非加载状态下
if not self._loading_data_in_progress:
tray_id = self.tray_edit.currentText()
self.save_inspection_data(order_id, tray_id, config_position, config_id, formatted_value, status)
# 不需要在这里主动触发数据重新加载因为handle_inspection_cell_changed会处理
# 重新连接信号
self.process_table.cellChanged.connect(self.handle_inspection_cell_changed)
logging.info(f"已将{data_type}数据 {formatted_value} 写入行 {data_row}, 列 {col_index}")
except Exception as e:
logging.error(f"设置检验项值失败: {str(e)}")
# 确保重新连接信号
try:
self.process_table.cellChanged.connect(self.handle_inspection_cell_changed)
except:
pass
def handle_tray_changed(self):
"""处理托盘号变更事件,启动监听并加载数据"""
try:
tray_id = self.tray_edit.currentText()
if tray_id:
logging.info(f"托盘号变更为 {tray_id},启动监听")
# 确保启动Modbus监控
if not hasattr(self, 'modbus_monitor') or not self.modbus_monitor.is_running():
try:
self.setup_modbus_monitor()
logging.info("已在托盘号变更时启动Modbus监控")
except Exception as e:
logging.error(f"托盘号变更时启动Modbus监控失败: {str(e)}")
# 确保启动串口监听
try:
self.serial_manager.auto_open_configured_ports()
# 启动键盘监听器
self.serial_manager.start_keyboard_listener()
logging.info("已在托盘号变更时启动串口和键盘监听器")
except Exception as e:
logging.error(f"托盘号变更时启动串口监听失败: {str(e)}")
# 初始化托盘号对应的序号(如果不存在)
if tray_id not in self.init_seq:
self.init_seq[tray_id] = 1
logging.info(f"初始化托盘号 {tray_id} 的序号为 1")
# 加载数据
self._safe_load_data()
except Exception as e:
logging.error(f"处理托盘号变更失败: {str(e)}")