jiateng_ws/status_management_patch.py

379 lines
16 KiB
Python
Raw Normal View History

2025-07-19 02:00:22 +08:00
#!/usr/bin/env python3
"""
状态管理功能补丁文件
用于修复产品状态管理相关的代码
"""
import os
import re
# 定义要修改的文件
DAO_FILE = "dao/inspection_dao.py"
MAIN_WINDOW_FILE = "widgets/main_window.py"
# 1. 在InspectionDAO类中添加状态管理方法
def add_dao_methods():
with open(DAO_FILE, "r") as f:
content = f.read()
# 检查方法是否已存在
if "def get_product_status" in content:
print("状态管理方法已存在,跳过添加")
return
# 找到合适的位置插入新方法
pattern = r'(def save_package_record.*?\n\s*return False\n)'
new_methods = r"""\1
def get_product_status(self, order_id, gc_note, tray_id):
"""获取产品的当前状态
Args:
order_id: 订单号
gc_note: 工程号
tray_id: 托盘号
Returns:
str: 产品状态如果没有找到则返回'init'
"""
try:
with SQLUtils('sqlite', database='db/jtDB.db') as db:
sql = """
SELECT status FROM wsbz_inspection_data
WHERE order_id = ? AND gc_note = ? AND tray_id = ?
ORDER BY id ASC LIMIT 1
"""
params = (order_id, gc_note, tray_id)
result = db.query_one(sql, params)
return result[0] if result and result[0] else 'init' # 默认为init状态
except Exception as e:
logging.error(f"获取产品状态失败: {str(e)}")
return 'init' # 出错时返回默认状态
def update_product_status(self, order_id, gc_note, tray_id, new_status):
"""更新产品的状态
Args:
order_id: 订单号
gc_note: 工程号
tray_id: 托盘号
new_status: 新状态
Returns:
bool: 更新是否成功
"""
try:
with SQLUtils('sqlite', database='db/jtDB.db') as db:
# 更新该产品所有记录的状态字段
update_sql = """
UPDATE wsbz_inspection_data SET status = ?, update_time = ?
WHERE order_id = ? AND gc_note = ? AND tray_id = ?
"""
update_params = (new_status, datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
order_id, gc_note, tray_id)
db.execute(update_sql, update_params)
logging.info(f"已更新产品状态: 订单号={order_id}, 工程号={gc_note}, 托盘号={tray_id}, 新状态={new_status}")
return True
except Exception as e:
logging.error(f"更新产品状态失败: {str(e)}")
return False"""
modified_content = re.sub(pattern, new_methods, content, flags=re.DOTALL)
with open(DAO_FILE, "w") as f:
f.write(modified_content)
print(f"已成功添加状态管理方法到 {DAO_FILE}")
# 2. 修改add_new_inspection_row方法设置初始状态
def update_add_new_inspection_row():
with open(MAIN_WINDOW_FILE, "r") as f:
content = f.read()
# 修改检验项的status为init
pattern1 = r"'status': '',\s*# 默认设置为通过状态"
replacement1 = "'status': 'init', # 设置初始状态"
content = re.sub(pattern1, replacement1, content)
# 修改贴标和称重项的status为init
pattern2 = r"'status': 'pass',\s*# 默认设置为通过状态"
replacement2 = "'status': 'init', # 设置初始状态"
content = re.sub(pattern2, replacement2, content)
# 添加状态初始化代码
pattern3 = r"(logging\.info\(f\"已添加工程号 \{gc_note\} 的新记录,显示在第\{new_seq\}条\"\))"
replacement3 = r"""# 初始化产品状态为init
inspection_dao.update_product_status(self._current_order_code, gc_note, tray_id, 'init')
logging.info(f"已添加工程号 {gc_note} 的新记录,显示在第{new_seq}初始状态为init")"""
content = re.sub(pattern3, replacement3, content)
with open(MAIN_WINDOW_FILE, "w") as f:
f.write(content)
print(f"已成功更新add_new_inspection_row方法")
# 3. 添加check_inspection_completed方法
def add_check_inspection_completed():
with open(MAIN_WINDOW_FILE, "r") as f:
content = f.read()
# 检查方法是否已存在
if "def check_inspection_completed" in content:
print("check_inspection_completed方法已存在跳过添加")
else:
# 找到合适的位置插入新方法
pattern = r'(def validate_inspection_value.*?)(\n\s*def )'
new_method = r"""\1
def check_inspection_completed(self, row):
"""检查行是否有至少一个检验项已完成如果是则更新状态为inspected
Args:
row: 行索引
Returns:
bool: 是否有至少一个检验项已完成
"""
try:
# 获取工程号
gc_note_item = self.process_table.item(row, 1)
if not gc_note_item:
return False
gc_note = gc_note_item.text().strip()
tray_id = self.tray_edit.text()
2025-07-19 02:00:22 +08:00
# 获取启用的检验配置
enabled_configs = self.inspection_manager.get_enabled_configs()
# 检查是否有至少一个检验项有值
has_any_value = False
for i, config in enumerate(enabled_configs):
col_index = 2 + i
item = self.process_table.item(row, col_index)
if item and item.text().strip():
has_any_value = True
break
# 如果有至少一个检验项有值更新状态为inspected
if has_any_value:
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
inspection_dao.update_product_status(self._current_order_code, gc_note, tray_id, 'inspected')
logging.info(f"工程号 {gc_note} 的检验已完成状态更新为inspected")
return has_any_value
except Exception as e:
logging.error(f"检查检验完成状态失败: {str(e)}")
return False\2"""
content = re.sub(pattern, new_method, content, flags=re.DOTALL)
with open(MAIN_WINDOW_FILE, "w") as f:
f.write(content)
print(f"已成功添加check_inspection_completed方法")
# 在handle_inspection_cell_changed方法末尾添加调用
pattern = r'(logging\.info\(f"处理单元格变更: 行=\{row\}, 列=\{column\}, 类型=\{data_type\}, 工程号=\{gc_note\}, 值=\{value\}, 状态=\{status\}"\))\n(\s*except)'
replacement = r"""\1
# 检查是否完成检验并更新状态
self.check_inspection_completed(row)
\2"""
content = re.sub(pattern, replacement, content)
with open(MAIN_WINDOW_FILE, "w") as f:
f.write(content)
print(f"已成功在handle_inspection_cell_changed方法末尾添加调用check_inspection_completed")
# 4. 修改_process_stable_weight方法
def update_process_stable_weight():
with open(MAIN_WINDOW_FILE, "r") as f:
content = f.read()
# 修改查找行的逻辑
pattern1 = r'# 查找第一个没有称重数据的行\s*data_row = None\s*for row in range.*?if data_row is None:.*?else:\s*logging\.info\(f"找到没有称重数据的行: \{data_row\}"\)'
replacement1 = r"""# 基于状态查找行优先查找状态为inspected的行
data_row = None
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
# 首先查找状态为inspected的行
for row in range(2, self.process_table.rowCount()):
gc_note_item = self.process_table.item(row, 1)
if gc_note_item:
row_gc_note = gc_note_item.text().strip()
tray_id = self.tray_edit.text()
2025-07-19 02:00:22 +08:00
status = inspection_dao.get_product_status(self._current_order_code, row_gc_note, tray_id)
if status == 'inspected':
data_row = row
logging.info(f"找到状态为inspected的行: {data_row}, 工程号: {row_gc_note}")
break
# 如果没有找到inspected状态的行回退到原有逻辑
if data_row is None:
# 查找第一个没有称重数据的行
for row in range(2, self.process_table.rowCount()):
weight_item = self.process_table.item(row, weight_col)
if not weight_item or not weight_item.text().strip():
data_row = row
break
# 如果仍然没有找到,使用当前选中行或第一个数据行
if data_row is None:
current_row = self.process_table.currentRow()
data_row = current_row if current_row >= 2 else 2 # 使用第一个数据行索引为2
logging.info(f"未找到状态为inspected的行或没有称重数据的行使用当前选中行或第一个数据行: {data_row}")
else:
logging.info(f"找到没有称重数据的行: {data_row}")
else:
logging.info(f"将使用状态为inspected的行: {data_row}")"""
content = re.sub(pattern1, replacement1, content, flags=re.DOTALL)
# 添加状态更新代码
pattern2 = r'(logging\.info\(f"已将稳定的称重数据 \{weight_kg\}kg 写入行 \{data_row\}, 列 \{weight_col\}"\))\n\s*except'
replacement2 = r"""\1
# 更新产品状态为weighed
inspection_dao.update_product_status(self._current_order_code, gc_note, tray_id, 'weighed')
logging.info(f"工程号 {gc_note} 的称重已完成状态更新为weighed")
except"""
content = re.sub(pattern2, replacement2, content)
with open(MAIN_WINDOW_FILE, "w") as f:
f.write(content)
print(f"已成功更新_process_stable_weight方法")
# 5. 修改handle_label_signal方法
def update_handle_label_signal():
with open(MAIN_WINDOW_FILE, "r") as f:
content = f.read()
# 修改查找行的逻辑
pattern1 = r'# 获取当前选中的行或第一个数据行\s*current_row = self\.process_table\.currentRow\(\)\s*data_row = current_row if current_row >= 2 else 2'
replacement1 = r"""# 基于状态查找行优先查找状态为weighed的行
data_row = None
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
# 首先查找状态为weighed的行
for row in range(2, self.process_table.rowCount()):
gc_note_item = self.process_table.item(row, 1)
if gc_note_item:
row_gc_note = gc_note_item.text().strip()
tray_id = self.tray_edit.text()
2025-07-19 02:00:22 +08:00
status = inspection_dao.get_product_status(self._current_order_code, row_gc_note, tray_id)
if status == 'weighed':
data_row = row
logging.info(f"找到状态为weighed的行: {data_row}, 工程号: {row_gc_note}")
break
# 如果没有找到weighed状态的行回退到原有逻辑
if data_row is None:
# 使用当前选中的行或第一个数据行
current_row = self.process_table.currentRow()
data_row = current_row if current_row >= 2 else 2 # 使用第一个数据行索引为2
logging.info(f"未找到状态为weighed的行使用当前选中行或第一个数据行: {data_row}")
else:
logging.info(f"将使用状态为weighed的行: {data_row}")"""
content = re.sub(pattern1, replacement1, content)
# 添加状态更新代码
pattern2 = r'(logging\.info\(f"已将贴标数据 \{axios_num\} 保存到数据库"\))\n\s*# 调用加载到包装记录的方法'
replacement2 = r"""\1
# 更新产品状态为labeled
inspection_dao.update_product_status(self._current_order_code, gc_note, tray_id, 'labeled')
logging.info(f"工程号 {gc_note} 的贴标已完成状态更新为labeled")
# 调用加载到包装记录的方法"""
content = re.sub(pattern2, replacement2, content)
with open(MAIN_WINDOW_FILE, "w") as f:
f.write(content)
print(f"已成功更新handle_label_signal方法")
# 6. 修改save_inspection_data方法
def update_save_inspection_data():
with open(MAIN_WINDOW_FILE, "r") as f:
content = f.read()
# 查找save_inspection_data方法
pattern = r'def save_inspection_data.*?try:.*?inspection_dao = InspectionDAO\(\).*?# 记录保存前的详细日志'
# 修改方法,添加状态获取逻辑
replacement = r"""def save_inspection_data(self, order_id, gc_note, tray_id, position, config_id, value, status):
"""保存检验数据到数据库
Args:
order_id: 订单号
gc_note: 工程号
position: 位置序号
config_id: 配置ID
value: 检验值
status: 状态
"""
try:
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
modbus = ModbusUtils()
client = modbus.get_client()
# 获取当前产品状态,优先使用产品状态管理中的状态
current_status = inspection_dao.get_product_status(order_id, gc_note, tray_id)
# 如果当前状态不是初始状态则使用当前状态而不是传入的status
if current_status not in ['', 'init']:
status = current_status
# 记录保存前的详细日志"""
content = re.sub(pattern, replacement, content, flags=re.DOTALL)
with open(MAIN_WINDOW_FILE, "w") as f:
f.write(content)
print(f"已成功更新save_inspection_data方法")
# 7. 移除数据校验逻辑
def remove_validation_logic():
with open(MAIN_WINDOW_FILE, "r") as f:
content = f.read()
# 第一处移除handle_inspection_cell_changed中的数据校验逻辑
pattern1 = r'# 验证数据有效性\s*if self\.validate_inspection_value\(config, value\):.*?status = \'warning\''
replacement1 = r"""# 设置单元格颜色为浅绿色,表示已填写
cell_item.setBackground(QBrush(QColor("#c8e6c9")))"""
content = re.sub(pattern1, replacement1, content, flags=re.DOTALL)
with open(MAIN_WINDOW_FILE, "w") as f:
f.write(content)
print(f"已成功移除数据校验逻辑")
# 执行所有修改
def apply_all_changes():
print("开始应用状态管理功能补丁...")
add_dao_methods()
update_add_new_inspection_row()
add_check_inspection_completed()
update_process_stable_weight()
update_handle_label_signal()
update_save_inspection_data()
remove_validation_logic()
print("状态管理功能补丁应用完成!")
if __name__ == "__main__":
apply_all_changes()