jiateng_ws/utils/register_handlers.py
2025-07-01 15:32:40 +08:00

304 lines
9.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
from PySide6.QtCore import QObject, Signal
from .modbus_monitor import RegisterHandler
class WeightDataHandler(RegisterHandler):
"""寄存器D11处理器处理称重数据值"""
def __init__(self, callback=None):
super().__init__()
self.callback = callback
self.last_weight = None
def handle_change(self, value):
weight = value # 可能需要转换单位或格式,这里简单处理
logging.info(f"称重数据变化: {self.last_weight} -> {weight}")
self.last_weight = weight
# 如果有回调函数,则调用
if self.callback:
self.callback(weight)
class LabelSignalHandler(RegisterHandler):
"""寄存器D13处理器处理贴标信号"""
def __init__(self, callback=None):
super().__init__()
self.callback = callback
self.status_map = {
0: "无贴标",
1: "贴标完成"
}
def handle_change(self, value):
status = self.status_map.get(value, f"未知状态({value})")
logging.info(f"贴标信号: {status}")
# 如果有回调函数,则调用
if self.callback:
self.callback(value, status)
class MachineStatusHandlers(QObject):
"""机器状态相关寄存器(D20-D24)处理器集合"""
# 定义信号
loading_feedback_changed = Signal(int, str) # 上料信息反馈
unloading_feedback_changed = Signal(int, str) # 下料信息反馈
error_1_changed = Signal(int, str) # 故障信息1
error_2_changed = Signal(int, str) # 故障信息2
error_3_changed = Signal(int, str) # 故障信息3
weight_changed = Signal(int) # 称重数据变化
label_signal_changed = Signal(int, str) # 贴标信号变化
ng_changed = Signal(int) # NG信号变化
def __init__(self):
super().__init__()
# 上料信息反馈状态映射
self.loading_feedback_map = {
0: "无操作",
1: "上料",
}
# 下料信息反馈状态映射
self.unloading_feedback_map = {
0: "无操作",
1: "下料",
}
# 故障信息映射 - 可以根据实际设备故障码进行扩展
self.error_map = {
0: "正常",
1: "急停触发",
2: "通信错误",
3: "伺服报警",
4: "气压不足",
5: "材料不足"
}
def handle_weight_data(self, weight):
"""处理称重数据"""
logging.info(f"[处理器] 称重数据: {weight}g")
# 发射信号由MainWindow更新UI
self.weight_changed.emit(weight)
def handle_label_signal(self, signal, status):
"""处理贴标信号"""
logging.info(f"[处理器] 贴标信号: {status} (值={signal})")
# 发射信号由MainWindow更新UI
self.label_signal_changed.emit(signal, status)
def handle_loading_feedback(self, status, description):
"""处理上料信息反馈"""
logging.info(f"[处理器] 上料信息反馈: {description} (值={status})")
self.loading_feedback_changed.emit(status, description)
def handle_unloading_feedback(self, status, description):
"""处理下料信息反馈"""
logging.info(f"[处理器] 下料信息反馈: {description} (值={status})")
self.unloading_feedback_changed.emit(status, description)
def handle_error_1(self, status, description):
"""处理故障信息1"""
logging.info(f"[处理器] 故障信息1: {description} (值={status})")
self.error_1_changed.emit(status, description)
def handle_error_2(self, status, description):
"""处理故障信息2"""
logging.info(f"[处理器] 故障信息2: {description} (值={status})")
self.error_2_changed.emit(status, description)
def handle_error_3(self, status, description):
"""处理故障信息3"""
logging.info(f"[处理器] 故障信息3: {description} (值={status})")
self.error_3_changed.emit(status, description)
def handle_ng(self, ng):
"""处理NG信号"""
logging.info(f"[处理器] NG信号: {ng}")
self.ng_changed.emit(ng)
class LoadingFeedbackHandler(RegisterHandler):
"""寄存器D20处理器处理上料信息反馈"""
def __init__(self, callback=None):
super().__init__()
self.callback = callback
def handle_change(self, value):
if value == 1: # Only emit signal when loading is complete
feedback = "上料完成"
self.callback(value, feedback)
logging.info(f"Loading feedback: {feedback} (value: {value})")
class UnloadingFeedbackHandler(RegisterHandler):
"""寄存器D21处理器处理下料信息反馈"""
def __init__(self, callback=None):
super().__init__()
self.callback = callback
def handle_change(self, value):
if value == 1: # Only emit signal when unloading is complete
feedback = "下料完成"
self.callback(value, feedback)
logging.info(f"Unloading feedback: {feedback} (value: {value})")
class Error1Handler(RegisterHandler):
"""寄存器D22处理器处理故障信息1"""
def __init__(self, callback=None):
super().__init__()
self.callback = callback
self.error_map = {
0: "无报警",
1: "码垛真空压异常/报警 机器人暂停",
2: "拆垛真空压异常/报警 机器人暂停",
3: "机器人未收到CCD数据/机器人视觉通讯异常/报警 机器人暂停"
}
def handle_change(self, value):
error_desc = self.error_map.get(value, f"未知故障类型1-{value}")
logging.info(f"故障信息1: {error_desc}")
# 如果有回调函数,则调用
if self.callback:
self.callback(value, error_desc)
class Error2Handler(RegisterHandler):
"""寄存器D23处理器处理滚动线报警"""
def __init__(self, callback=None):
super().__init__()
self.callback = callback
self.error_map = {
0: "无报警",
2: "贴标故障"
}
def handle_change(self, value):
error_desc = "正常" if value == 0 else f"故障类型2-{value}"
logging.info(f"故障信息2: {error_desc}")
# 如果有回调函数,则调用
if self.callback:
self.callback(value, error_desc)
class Error3Handler(RegisterHandler):
"""寄存器D24处理器处理拆码垛报警"""
def __init__(self, callback=None):
super().__init__()
self.callback = callback
self.error_map = {
0: "无报警",
1: "拆垛层数判断有误重新选择层数",
2: "码垛层数判断有误重新选择层数"
}
def handle_change(self, value):
error_desc = self.error_map.get(value, f"拆码垛报警-{value}")
logging.info(f"拆码垛报警: {error_desc}")
# 如果有回调函数,则调用
if self.callback:
self.callback(value, error_desc)
class UnloadingLevelHandler(RegisterHandler):
"""寄存器D4处理器处理下料层数"""
def __init__(self, callback=None):
super().__init__()
self.callback = callback
def handle_change(self, value):
logging.info(f"下料层数变化: {value}")
# 如果有回调函数,则调用
if self.callback:
self.callback(value)
class UnloadingPositionHandler(RegisterHandler):
"""寄存器D5处理器处理下料位置"""
def __init__(self, callback=None):
super().__init__()
self.callback = callback
def handle_change(self, value):
logging.info(f"下料位置变化: {value}")
# 如果有回调函数,则调用
if self.callback:
self.callback(value)
class NGHandler(RegisterHandler):
"""寄存器D6处理器处理NG信号"""
def __init__(self, callback=None):
super().__init__()
self.callback = callback
def handle_change(self, value):
logging.info(f"NG信号变化: {value}")
#如果有回调函数,则调用
if self.callback:
self.callback(value)
class EmergencyStopHandler:
"""处理急停信号"""
def __init__(self, callback=None):
"""初始化处理器
Args:
callback: 回调函数接收两个参数value(int), desc(str)
"""
self.callback = callback
self.prev_value = 0
# 急停状态描述映射
self.status_map = {
0: "正常",
1: "急停触发"
}
def handle(self, value):
"""处理急停信号
Args:
value: 寄存器值
Returns:
tuple: (是否变化, 状态描述)
"""
# 检查值是否有效
if value not in [0, 1]:
return False, f"无效的急停状态值: {value}"
# 检查值是否变化
if value == self.prev_value:
return False, self.status_map.get(value, "未知状态")
# 更新上一次的值
self.prev_value = value
# 获取状态描述
desc = self.status_map.get(value, "未知状态")
# 调用回调函数
if self.callback:
self.callback(value, desc)
return True, desc
def handle_change(self, value):
"""实现 handle_change 方法以兼容 ModbusMonitor._notify_handlers
Args:
value: 寄存器值
"""
changed, desc = self.handle(value)
if changed:
logging.info(f"急停状态变化: {desc} (值={value})")
return changed, desc