feat: 新增状态吗判断

This commit is contained in:
zhu-mengmeng 2025-07-19 02:00:22 +08:00
parent df842cd83c
commit 8b8df295f1
15 changed files with 6133 additions and 117 deletions

View File

@ -8,7 +8,7 @@
"enable_camera": false "enable_camera": false
}, },
"base_url": "http://localhost:8084", "base_url": "http://localhost:8084",
"mode": "api" "mode": "standalone"
}, },
"apis": { "apis": {
"get_tray_info": "/apjt/xcsc/tpda/getByTp_note/", "get_tray_info": "/apjt/xcsc/tpda/getByTp_note/",

View File

@ -453,7 +453,7 @@ class InspectionDAO:
SELECT DISTINCT d.gc_note SELECT DISTINCT d.gc_note
FROM wsbz_inspection_data d FROM wsbz_inspection_data d
WHERE d.is_deleted = FALSE AND d.tray_id = ? WHERE d.is_deleted = FALSE AND d.tray_id = ?
AND d.position = 11 AND COALESCE(d.value,'') = '' AND status != 'labeled'
""" """
params = (tray_id,) params = (tray_id,)
with SQLUtils('sqlite', database='db/jtDB.db') as db: with SQLUtils('sqlite', database='db/jtDB.db') as db:
@ -583,15 +583,20 @@ class InspectionDAO:
except Exception as e: except Exception as e:
logging.error(f"获取包装记录失败: {str(e)}") logging.error(f"获取包装记录失败: {str(e)}")
return [] return []
def save_package_record(self, order_id, tray_id, label_value, weight_value,net_weight_value, finish_time,gc_note): def save_package_record(self, order_id, tray_id, label_value, weight_value, net_weight_value, finish_time, gc_note=None):
"""保存包装记录 """保存包装记录
Args: Args:
order_id: 工程 order_id: 订单
tray_id: 托盘号 tray_id: 托盘号
label_value: 标签值 label_value: 贴标值
weight_value: 重量值 weight_value: 称重值
net_weight_value: 净重值
finish_time: 完成时间 finish_time: 完成时间
gc_note: 工程号
Returns:
bool: 保存是否成功
""" """
# TODO调用接口获取到工程号对应的其他信息比如材质规格后续完成 # TODO调用接口获取到工程号对应的其他信息比如材质规格后续完成
try: try:
@ -610,6 +615,60 @@ class InspectionDAO:
except Exception as e: except Exception as e:
logging.error(f"保存包装记录失败: {str(e)}") logging.error(f"保存包装记录失败: {str(e)}")
return False return False
def get_product_status(self, order_id, gc_note, tray_id):
"""获取产品的当前状态
Args:
order_id: 订单号
gc_note: 工程号
tray_id: 托盘号
Returns:
str: 产品状态如果没有找到则返回'init'
"""
try:
with SQLUtils('sqlite', database='db/jtDB.db') as db:
sql = """
SELECT status FROM wsbz_inspection_data
WHERE order_id = ? AND gc_note = ? AND tray_id = ?
ORDER BY id ASC LIMIT 1
"""
params = (order_id, gc_note, tray_id)
db.cursor.execute(sql, params)
result = db.cursor.fetchone()
return result[0] if result and result[0] else 'init' # 默认为init状态
except Exception as e:
logging.error(f"获取产品状态失败: {str(e)}")
return 'init' # 出错时返回默认状态
def update_product_status(self, order_id, gc_note, tray_id, new_status):
"""更新产品的状态
Args:
order_id: 订单号
gc_note: 工程号
tray_id: 托盘号
new_status: 新状态
Returns:
bool: 更新是否成功
"""
try:
with SQLUtils('sqlite', database='db/jtDB.db') as db:
# 更新该产品所有记录的状态字段
update_sql = """
UPDATE wsbz_inspection_data SET status = ?, update_time = ?
WHERE order_id = ? AND gc_note = ? AND tray_id = ?
"""
update_params = (new_status, datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
order_id, gc_note, tray_id)
db.execute_update(update_sql, update_params)
logging.info(f"已更新产品状态: 订单号={order_id}, 工程号={gc_note}, 托盘号={tray_id}, 新状态={new_status}")
return True
except Exception as e:
logging.error(f"更新产品状态失败: {str(e)}")
return False
def delete_inspection_data(self, order_id, gc_note, tray_id): def delete_inspection_data(self, order_id, gc_note, tray_id):
"""删除检验数据 """删除检验数据
@ -689,7 +748,7 @@ class InspectionDAO:
with SQLUtils('sqlite', database='db/jtDB.db') as db: with SQLUtils('sqlite', database='db/jtDB.db') as db:
db.cursor.execute(sql, params) db.cursor.execute(sql, params)
result = db.cursor.fetchone() result = db.cursor.fetchone()
return result[0],result[1] if result else None,None return (result[0],result[1]) if result else (None,None)
except Exception as e: except Exception as e:
logging.error(f"获取线径范围失败: {str(e)}") logging.error(f"获取线径范围失败: {str(e)}")
return None,None return None,None

972
dao/inspection_dao.py.bak Normal file
View File

@ -0,0 +1,972 @@
import json
import logging
from datetime import datetime
from utils.sql_utils import SQLUtils
class InspectionDAO:
"""检验项目配置和数据访问对象"""
def __init__(self):
"""初始化数据访问对象"""
# 不再在初始化时创建数据库连接,而是在需要时创建
pass
def __del__(self):
"""析构函数,确保数据库连接关闭"""
# 不再需要在这里关闭连接,由上下文管理器处理
pass
def get_all_inspection_configs(self, include_disabled=False):
"""获取所有检验项目配置
Args:
include_disabled: 是否包含禁用的项目
Returns:
list: 检验项目配置列表
"""
try:
if include_disabled:
sql = """
SELECT id, position, name, display_name, enabled, required,
data_type, min_value, max_value, enum_values, unit, sort_order
FROM wsbz_inspection_config
WHERE is_deleted = FALSE
ORDER BY sort_order, position
"""
params = ()
else:
sql = """
SELECT id, position, name, display_name, enabled, required,
data_type, min_value, max_value, enum_values, unit, sort_order
FROM wsbz_inspection_config
WHERE is_deleted = FALSE AND enabled = TRUE
ORDER BY sort_order, position
"""
params = ()
with SQLUtils('sqlite', database='db/jtDB.db') as db:
db.cursor.execute(sql, params)
results = db.cursor.fetchall()
configs = []
for row in results:
config = {
'id': row[0],
'position': row[1],
'name': row[2],
'display_name': row[3],
'enabled': bool(row[4]),
'required': bool(row[5]),
'data_type': row[6],
'min_value': row[7],
'max_value': row[8],
'enum_values': json.loads(row[9]) if row[9] else None,
'unit': row[10],
'sort_order': row[11]
}
configs.append(config)
return configs
except Exception as e:
logging.error(f"获取检验项目配置失败: {str(e)}")
return []
def get_enabled_inspection_configs(self):
"""获取已启用的检验项目配置
Returns:
list: 已启用的检验项目配置列表
"""
return self.get_all_inspection_configs(include_disabled=False)
def get_inspection_config_by_position(self, position):
"""根据位置获取检验项目配置
Args:
position: 位置序号 (1-6)
Returns:
dict: 检验项目配置, 未找到则返回None
"""
try:
sql = """
SELECT id, position, name, display_name, enabled, required,
data_type, min_value, max_value, enum_values, unit, sort_order
FROM wsbz_inspection_config
WHERE position = ? AND is_deleted = FALSE
"""
params = (position,)
with SQLUtils('sqlite', database='db/jtDB.db') as db:
db.cursor.execute(sql, params)
row = db.cursor.fetchone()
if row:
config = {
'id': row[0],
'position': row[1],
'name': row[2],
'display_name': row[3],
'enabled': bool(row[4]),
'required': bool(row[5]),
'data_type': row[6],
'min_value': row[7],
'max_value': row[8],
'enum_values': json.loads(row[9]) if row[9] else None,
'unit': row[10],
'sort_order': row[11]
}
return config
else:
return None
except Exception as e:
logging.error(f"获取检验项目配置失败: {str(e)}")
return None
def update_inspection_config(self, config_id, data, username='system'):
"""更新检验项目配置
Args:
config_id: 配置ID
data: 更新数据
username: 操作用户
Returns:
bool: 更新是否成功
"""
try:
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 构建更新SQL
update_fields = []
params = []
# 可更新的字段
allowed_fields = [
'name', 'display_name', 'enabled', 'required',
'data_type', 'min_value', 'max_value', 'unit',
'sort_order', 'enum_values'
]
for field in allowed_fields:
if field in data:
# 特殊处理enum_values字段确保存储为JSON字符串
if field == 'enum_values' and data[field] is not None:
if isinstance(data[field], list):
update_fields.append(f"{field} = ?")
params.append(json.dumps(data[field]))
elif isinstance(data[field], str):
# 如果已经是字符串检查是否有效的JSON
try:
json.loads(data[field])
update_fields.append(f"{field} = ?")
params.append(data[field])
except:
logging.warning(f"无效的JSON: {data[field]}")
continue
else:
update_fields.append(f"{field} = ?")
params.append(data[field])
# 添加更新时间和更新人
update_fields.append("update_time = ?")
params.append(current_time)
update_fields.append("update_by = ?")
params.append(username)
# 添加配置ID到参数列表
params.append(config_id)
# 构建SQL
sql = f"""
UPDATE inspection_config
SET {', '.join(update_fields)}
WHERE id = ?
"""
with SQLUtils('sqlite', database='db/jtDB.db') as db:
db.execute_update(sql, params)
return True
except Exception as e:
logging.error(f"更新检验项目配置失败: {str(e)}")
return False
def toggle_inspection_config(self, position, enabled, username='system'):
"""启用或禁用检验项目配置
Args:
position: 位置序号 (1-6)
enabled: 是否启用
username: 操作用户
Returns:
bool: 操作是否成功
"""
try:
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
sql = """
UPDATE wsbz_inspection_config
SET enabled = ?, update_time = ?, update_by = ?
WHERE position = ? AND is_deleted = FALSE
"""
params = (enabled, current_time, username, position)
with SQLUtils('sqlite', database='db/jtDB.db') as db:
db.execute_update(sql, params)
return True
except Exception as e:
logging.error(f"更新检验项目启用状态失败: {str(e)}")
return False
def save_order_info(self, order_id,data):
"""保存订单信息到 wsbz_order_info 表
Args:
data: 订单信息字典
Returns:
bool: 操作是否成功
"""
try:
if not data:
return False
# 使用单一连接实例处理整个操作
with SQLUtils('sqlite', database='db/jtDB.db') as db:
db.begin_transaction() # 开始事务
# 先检查是否存在记录
check_cursor = db.get_new_cursor()
check_sql = "SELECT ddmo FROM wsbz_order_info WHERE ddmo = ?"
check_cursor.execute(check_sql, (data.get("mo", ""),))
existing_record = check_cursor.fetchone()
check_cursor.close() # 使用后关闭新游标
if existing_record:
# 如果记录存在,执行更新
sql = """
UPDATE wsbz_order_info SET
data_corp = ?, user_id = ?, user_name = ?, gzl_zl = ?,
xpack = ?, qd = ?, spack_type = ?, mxzs = ?, jt = ?,
ddnote = ?, code = ?, type = ?, lable = ?, lib = ?,
gzl = ?, maxsl = ?, cz = ?, size = ?, cd = ?, luno = ?,
qfqd = ?, pono = ?, xj = ?, ysl = ?, dycz = ?,
zx_code = ?, edit_id = ?, remarks = ?, zx_name = ?,
bccd = ? ,tccd = ?, zzyq = ?, customer = ?,customerexp = ?,
bz_bqd = ?,bz_tqd = ?,type_name = ?,remarks_hb=?,khno=?
WHERE ddmo = ?
"""
params = (
data.get("data_corp", "JT"),
data.get("user_id", ""),
data.get("user_name", ""),
data.get("zx_zl", ""),
data.get("xpack", ""),
data.get("qd", ""),
data.get("spack_type", ""),
data.get("mxzs", ""),
data.get("jt", ""),
data.get("note", ""),
data.get("code", ""),
data.get("type", ""),
data.get("template_name", ""),
data.get("lib", ""),
data.get("zx_code", ""),
data.get("maxsl", ""),
data.get("cz", ""),
data.get("size", ""),
data.get("cd", ""),
data.get("luno", ""),
data.get("qfqd", ""),
data.get("khno", ""),
data.get("size", ""),
data.get("ysl", ""),
data.get("dycz", ""),
data.get("zx_code", ""),
data.get("edit_id", ""),
data.get("remarks", ""),
data.get("zx_name", ""),
data.get("bccd", ""),
data.get("tccd", ""),
data.get("zzyq", ""),
data.get("customer", ""),
data.get("customerexp", ""),
data.get("bz_bqd", ""),
data.get("bz_tqd", ""),
data.get("type_name", ""),
data.get("remarks_hb", ""),
data.get("khno", ""),
data.get("mo", "") # WHERE 条件参数
)
logging.info(f"更新订单信息: ddmo={data.get('mo', '')}")
else:
# 如果记录不存在,执行插入
sql = """
INSERT INTO wsbz_order_info (
data_corp, user_id, user_name, gzl_zl, ddmo, xpack,
qd, spack_type, mxzs, jt, ddnote, code, type,
lable, lib, gzl, maxsl, cz, size, cd, luno, qfqd,
pono, xj, ysl, dycz, zx_code, edit_id, remarks,zx_name,
bccd,tccd,zzyq,customer,customerexp,bz_bqd,bz_tqd,type_name,
remarks_hb,khno
) VALUES (
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
)
"""
params = (
data.get("data_corp", "JT"),
data.get("user_id", ""),
data.get("user_name", ""),
data.get("zx_zl", ""),
data.get("mo", ""),
data.get("xpack", ""),
data.get("qd", ""),
data.get("spack_type", ""),
data.get("mxzs", ""),
data.get("jt", ""),
data.get("note", ""),
data.get("code", ""),
data.get("type", ""),
data.get("template_name", ""),
data.get("lib", ""),
data.get("zx_code", ""),
data.get("maxsl", ""),
data.get("cz", ""),
data.get("size", ""),
data.get("cd", ""),
data.get("luno", ""),
data.get("qfqd", ""),
data.get("khno", ""),
data.get("size", ""),
data.get("ysl", ""),
data.get("dycz", ""),
data.get("zx_code", ""),
data.get("edit_id", ""),
data.get("remarks", ""),
data.get("zx_name", ""),
data.get("bccd", ""),
data.get("tccd", ""),
data.get("zzyq", ""),
data.get("customer", ""),
data.get("customerexp", ""),
data.get("bz_bqd", ""),
data.get("bz_tqd", ""),
data.get("type_name", ""),
data.get("remarks_hb", ""),
data.get("khno", "")
)
logging.info(f"插入新订单信息: ddmo={data.get('mo', '')}")
# 执行SQL
db.execute_update(sql, params)
db.commit_transaction()
return True
except Exception as e:
logging.error(f"保存订单信息失败: {str(e)}")
return False
def save_inspection_data(self, order_id, gc_note, data, username='system'):
"""保存检验数据
Args:
order_id: 订单号
gc_note: 工程号
data: 检验数据列表每项包含position, config_id, value, status, remark
username: 操作用户
Returns:
bool: 保存是否成功
"""
try:
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 使用上下文管理器自动处理连接和游标
with SQLUtils('sqlite', database='db/jtDB.db') as db:
db.begin_transaction() # 开始事务
for item in data:
position = item.get('position')
config_id = item.get('config_id')
value = item.get('value', '')
status = item.get('status', '')
remark = item.get('remark', '')
tray_id = item.get('tray_id', '')
# 获取新游标执行查询,避免递归使用
check_cursor = db.get_new_cursor()
check_sql = """
SELECT id FROM wsbz_inspection_data
WHERE order_id = ? AND gc_note = ? AND position = ? AND tray_id = ?
"""
check_params = (order_id, gc_note, position, tray_id)
check_cursor.execute(check_sql, check_params)
existing_record = check_cursor.fetchone()
check_cursor.close() # 使用后关闭新游标
if existing_record:
# 更新现有记录
update_sql = """
UPDATE wsbz_inspection_data
SET config_id = ?, value = ?, status = ?, remark = ?,
update_time = ?, update_by = ?
WHERE order_id = ? AND gc_note = ? AND position = ? AND tray_id = ?
"""
update_params = (
config_id, value, status, remark,
current_time, username,
order_id, gc_note, position, tray_id
)
db.execute_update(update_sql, update_params)
else:
# 插入新记录
insert_sql = """
INSERT INTO wsbz_inspection_data (
order_id, gc_note, position, config_id, value, status, remark,
create_time, create_by, update_time, update_by, tray_id
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
insert_params = (
order_id, gc_note, position, config_id, value, status, remark,
current_time, username, current_time, username, tray_id
)
db.execute_update(insert_sql, insert_params)
db.commit_transaction() # 提交事务
return True
except Exception as e:
logging.error(f"保存检验数据失败: {str(e)}")
return False
def get_inspection_data_unfinished(self, tray_id):
"""获取未完成的检验数据,通过是否贴标来判断
Returns:
list: 未完成的检验数据列表
"""
try:
# 先获取所有没有贴标的工程号
sql_orders = """
SELECT DISTINCT d.gc_note
FROM wsbz_inspection_data d
WHERE d.is_deleted = FALSE AND d.tray_id = ?
AND d.position = 11 AND COALESCE(d.value,'') = ''
"""
params = (tray_id,)
with SQLUtils('sqlite', database='db/jtDB.db') as db:
db.cursor.execute(sql_orders, params)
gc_notes = db.cursor.fetchall()
if not gc_notes:
return []
# 构建IN子句的参数
gc_notes = [gc_note[0] for gc_note in gc_notes]
placeholders = ','.join(['?' for _ in gc_notes])
# 获取这些工程号的所有检验数据
sql = f"""
SELECT d.id, d.gc_note, d.position, d.config_id, d.value, d.status, d.remark,
c.name, c.display_name, c.data_type, c.unit
FROM wsbz_inspection_data d
LEFT JOIN wsbz_inspection_config c ON d.config_id = c.id
WHERE d.is_deleted = FALSE AND d.tray_id = ?
AND d.gc_note IN ({placeholders})
ORDER BY d.create_time
"""
params = [tray_id] + gc_notes
with SQLUtils('sqlite', database='db/jtDB.db') as db:
db.cursor.execute(sql, params)
results = db.cursor.fetchall()
data_list = []
for row in results:
data = {
'id': row[0],
'gc_note': row[1],
'position': row[2],
'config_id': row[3],
'value': row[4],
'status': row[5],
'remark': row[6],
'name': row[7],
'display_name': row[8],
'data_type': row[9],
'unit': row[10]
}
data_list.append(data)
return data_list
except Exception as e:
logging.error(f"获取未完成的检验数据失败: {str(e)}")
return []
def get_inspection_data_by_order(self, order_id,gc_note, tray_id):
"""根据工程号获取检验数据
Args:
order_id: 订单号
gc_note: 工程号
tray_id: 托盘号
Returns:
list: 检验数据列表
"""
try:
sql = """
SELECT d.id, d.position, d.config_id, d.value, d.status, d.remark,
c.name, c.display_name, c.data_type, c.unit
FROM wsbz_inspection_data d
LEFT JOIN wsbz_inspection_config c ON d.config_id = c.id
WHERE d.order_id = ? AND d.gc_note = ? AND d.is_deleted = FALSE AND d.tray_id = ?
ORDER BY d.create_time, d.order_id, d.position
"""
params = (order_id, gc_note, tray_id)
with SQLUtils('sqlite', database='db/jtDB.db') as db:
db.cursor.execute(sql, params)
results = db.cursor.fetchall()
data_list = []
for row in results:
data = {
'id': row[0],
'position': row[1],
'config_id': row[2],
'value': row[3],
'status': row[4],
'remark': row[5],
'name': row[6],
'display_name': row[7],
'data_type': row[8],
'unit': row[9]
}
data_list.append(data)
return data_list
except Exception as e:
logging.error(f"获取检验数据失败: {str(e)}")
return []
def get_package_record(self, tray_id):
"""根据托盘号获取包装记录
Args:
tray_id: 托盘号
Returns:
list: 包装记录列表
"""
try:
sql = """
SELECT DISTINCT order_id,
gc_note,
COALESCE(orders.size, '') as material,
COALESCE(orders.cz, '') as spec,
tray_id,
COALESCE(axis_package_id, '') as axis_package_id,
COALESCE(weight, 0) as weight,
COALESCE(net_weight, 0) as net_weight,
STRFTIME('%Y-%m-%d %H:%M:%S', pack_time) as pack_time
FROM wsbz_inspection_pack_data t1
LEFT JOIN wsbz_order_info orders on t1.order_id = orders.ddmo
WHERE tray_id = ?
AND is_deleted = FALSE
ORDER BY pack_time DESC
"""
params = (tray_id,)
with SQLUtils('sqlite', database='db/jtDB.db') as db:
db.cursor.execute(sql, params)
results = db.cursor.fetchall()
return results
except Exception as e:
logging.error(f"获取包装记录失败: {str(e)}")
return []
def save_package_record(self, order_id, tray_id, label_value, weight_value, net_weight_value, finish_time, gc_note=None):
"""保存包装记录
Args:
order_id: 订单号
tray_id: 托盘号
label_value: 贴标值
weight_value: 称重值
net_weight_value: 净重值
finish_time: 完成时间
gc_note: 工程号
Returns:
bool: 保存是否成功
"""
# TODO调用接口获取到工程号对应的其他信息比如材质规格后续完成
try:
sql = """
INSERT INTO wsbz_inspection_pack_data (order_id, tray_id, axis_package_id, weight, net_weight, pack_time, create_time, create_by, update_time, update_by, is_deleted,gc_note)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?)
"""
params = (order_id, tray_id, label_value, weight_value, net_weight_value, finish_time, datetime.now(), 'system', datetime.now(), 'system', False,gc_note)
with SQLUtils('sqlite', database='db/jtDB.db') as db:
db.begin_transaction()
db.execute_update(sql, params)
db.commit_transaction()
return True
except Exception as e:
logging.error(f"保存包装记录失败: {str(e)}")
return False
def get_product_status(self, order_id, gc_note, tray_id):
"""获取产品的当前状态
Args:
order_id: 订单号
gc_note: 工程号
tray_id: 托盘号
Returns:
str: 产品状态如果没有找到则返回'init'
"""
try:
with SQLUtils('sqlite', database='db/jtDB.db') as db:
sql = """
SELECT status FROM wsbz_inspection_data
WHERE order_id = ? AND gc_note = ? AND tray_id = ?
ORDER BY id ASC LIMIT 1
"""
params = (order_id, gc_note, tray_id)
result = db.query_one(sql, params)
return result[0] if result and result[0] else 'init' # 默认为init状态
except Exception as e:
logging.error(f"获取产品状态失败: {str(e)}")
return 'init' # 出错时返回默认状态
def update_product_status(self, order_id, gc_note, tray_id, new_status):
"""更新产品的状态
Args:
order_id: 订单号
gc_note: 工程号
tray_id: 托盘号
new_status: 新状态
Returns:
bool: 更新是否成功
"""
try:
with SQLUtils('sqlite', database='db/jtDB.db') as db:
# 更新该产品所有记录的状态字段
update_sql = """
UPDATE wsbz_inspection_data SET status = ?, update_time = ?
WHERE order_id = ? AND gc_note = ? AND tray_id = ?
"""
update_params = (new_status, datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
order_id, gc_note, tray_id)
db.execute(update_sql, update_params)
logging.info(f"已更新产品状态: 订单号={order_id}, 工程号={gc_note}, 托盘号={tray_id}, 新状态={new_status}")
return True
except Exception as e:
logging.error(f"更新产品状态失败: {str(e)}")
return False
def delete_inspection_data(self, order_id, gc_note, tray_id):
"""删除检验数据
Args:
order_id: 订单号
gc_note: 工程号
tray_id: 托盘号
"""
try:
sql = """
UPDATE wsbz_inspection_data
SET is_deleted = TRUE
WHERE order_id = ? AND gc_note = ? AND tray_id = ?
"""
params = (order_id, gc_note, tray_id)
with SQLUtils('sqlite', database='db/jtDB.db') as db:
db.begin_transaction()
db.execute_update(sql, params)
db.commit_transaction()
return True
except Exception as e:
logging.error(f"删除检验数据失败: {str(e)}")
return False
def get_axios_num_by_order_id(self, order_id):
"""获取托盘号对应的轴号"""
try:
sql = """
SELECT max(axis_package_id) as axios_num FROM wsbz_inspection_pack_data WHERE order_id = ?
AND is_deleted = FALSE
"""
params = (order_id,)
with SQLUtils('sqlite', database='db/jtDB.db') as db:
db.cursor.execute(sql, params)
result = db.cursor.fetchone()
return int(result[0]) if result[0] else 0
except Exception as e:
logging.error(f"获取轴号失败: {str(e)}")
return 0
def get_axios_num(self, tray_id):
"""获取托盘号对应的轴号"""
try:
sql = """
SELECT max(cast(axis_package_id as int)) as axios_num FROM wsbz_inspection_pack_data WHERE tray_id = ?
AND is_deleted = FALSE
"""
params = (tray_id,)
with SQLUtils('sqlite', database='db/jtDB.db') as db:
db.cursor.execute(sql, params)
result = db.cursor.fetchone()
return int(result[0]) if result else 0
except Exception as e:
logging.error(f"获取轴号失败: {str(e)}")
return 0
def get_gzl_zl(self,order_id):
"""获取工字轮重量"""
try:
sql = """
SELECT gzl_zl FROM wsbz_order_info WHERE ddmo = ?
"""
params = (order_id,)
with SQLUtils('sqlite', database='db/jtDB.db') as db:
db.cursor.execute(sql, params)
result = db.cursor.fetchone()
return result[0] if result else 0
except Exception as e:
logging.error(f"获取工字轮重量失败: {str(e)}")
return 0
def get_xj_range(self,order_id):
"""获取线径范围"""
try:
sql = """
SELECT bccd, tccd FROM wsbz_order_info WHERE ddmo = ?
"""
params = (order_id,)
with SQLUtils('sqlite', database='db/jtDB.db') as db:
db.cursor.execute(sql, params)
result = db.cursor.fetchone()
return (result[0],result[1]) if result else (None,None)
except Exception as e:
logging.error(f"获取线径范围失败: {str(e)}")
return None,None
def get_order_create_time(self, order_id):
"""获取工程号的最早创建时间
Args:
order_id: 工程号
Returns:
str: 创建时间格式为'YYYY-MM-DD HH:MM:SS'如果未找到则返回None
"""
try:
sql = """
SELECT MIN(create_time) FROM wsbz_inspection_data
WHERE order_id = ? AND is_deleted = FALSE
"""
with SQLUtils('sqlite', database='db/jtDB.db') as db:
db.cursor.execute(sql, (order_id,))
result = db.cursor.fetchone()
return result[0] if result and result[0] else None
except Exception as e:
logging.error(f"获取工程号创建时间失败: {str(e)}")
return None
def get_orders_by_create_time(self, order_ids):
"""按创建时间排序工程号
Args:
order_ids: 工程号列表
Returns:
list: 按创建时间排序的工程号列表
"""
try:
if not order_ids:
return []
# 构建IN子句
placeholders = ','.join(['?' for _ in order_ids])
# 查询每个工程号的最早创建时间并排序
sql = f"""
SELECT gc_note, MIN(create_time) as first_create_time
FROM wsbz_inspection_data
WHERE gc_note IN ({placeholders}) AND is_deleted = FALSE
GROUP BY gc_note
ORDER BY first_create_time
"""
with SQLUtils('sqlite', database='db/jtDB.db') as db:
db.cursor.execute(sql, order_ids)
results = db.cursor.fetchall()
# 提取排序后的工程号
sorted_order_ids = [row[0] for row in results]
# 确保所有传入的工程号都在结果中
for order_id in order_ids:
if order_id not in sorted_order_ids:
sorted_order_ids.append(order_id)
return sorted_order_ids
except Exception as e:
logging.error(f"按创建时间排序工程号失败: {str(e)}")
return order_ids # 出错时返回原始顺序
def get_order_info(self, order_id):
"""获取订单信息
Args:
order_id: 工程号
Returns:
dict: 订单信息字典
"""
try:
sql = """
SELECT DISTINCT data_corp, user_id, user_name, gzl_zl, mzl, ddmo, xpack, qd, spack_type, mxzs, jt, ddnote, code, type, lable, lib, gzl, maxsl, cz, size, cd, luno,
qfqd, pono, xj, ysl, dycz, zx_code, edit_id, remarks, zx_name, bccd, tccd, zzyq, customer, customerexp, bz_bqd as bqd, bz_tqd as tqd, type_name, remarks_hb
FROM wsbz_order_info WHERE ddmo = ?
"""
params = (order_id,)
with SQLUtils('sqlite', database='db/jtDB.db') as db:
db.cursor.execute(sql, params)
result = db.cursor.fetchone()
if not result:
return {}
# 获取列名
column_names = [desc[0] for desc in db.cursor.description]
# 转换为字典
result_dict = {}
for i, value in enumerate(result):
if i < len(column_names):
result_dict[column_names[i]] = value
return result_dict
except Exception as e:
logging.error(f"获取订单信息失败: {str(e)}")
return {}
def get_order_others_info(self, gc_note, order_id, tray_id):
"""获取订单其他信息
Args:
order_id: 工程号
tray_id: 托盘号
Returns:
dict: 订单其他信息字典以name为keyvalue为值
"""
try:
sql = """
SELECT t1.order_id, CASE WHEN t1.position = 12 THEN 'mzl' ELSE name END AS name, value
FROM wsbz_inspection_data t1
LEFT JOIN main.wsbz_inspection_config wic ON t1.config_id = wic.id
WHERE gc_note = ? AND t1.order_id = ?
AND tray_id = ?
AND CASE WHEN t1.position = 12 THEN 'mzl' ELSE name END IS NOT NULL
AND COALESCE(value, '') != ''
"""
params = (gc_note, order_id, tray_id)
with SQLUtils('sqlite', database='db/jtDB.db') as db:
db.cursor.execute(sql, params)
results = db.cursor.fetchall()
if not results:
return {}
# 将结果转换为字典以name为keyvalue为值
result_dict = {}
for row in results:
if len(row) >= 3: # 确保行至少有3个元素
name = row[1] # name在第二列
value = row[2] # value在第三列
result_dict[name] = value
return result_dict
except Exception as e:
logging.error(f"获取订单其他信息失败: {str(e)}")
return {}
def get_order_statistics(self):
"""获取订单数量和产量统计数据(日/月/年/累计)
Returns:
dict: 包含日累计订单数量和产量的字典
"""
try:
# 使用提供的SQL查询
sql = """
SELECT CASE
WHEN create_time >= DATE('now') AND create_time < DATE('now', '+1 day')
THEN COUNT(DISTINCT order_id)
ELSE 0 END AS order_cnt_day,
CASE
WHEN create_time >= DATE('now', 'start of month') AND create_time < DATE('now', 'start of month', '+1 month')
THEN COUNT(DISTINCT order_id)
ELSE 0 END AS order_cnt_month,
CASE
WHEN create_time >= DATE('now', 'start of year') AND create_time < DATE('now', 'start of year', '+1 year')
THEN COUNT(DISTINCT order_id)
ELSE 0 END AS order_cnt_year,
COUNT(DISTINCT order_id) AS order_cnt_all,
CASE
WHEN create_time >= DATE('now') AND create_time < DATE('now', '+1 day')
THEN SUM(value)
ELSE 0 END AS order_num_day,
CASE
WHEN create_time >= DATE('now', 'start of month') AND
create_time < DATE('now', 'start of month', '+1 month')
THEN SUM(value)
ELSE 0 END AS order_num_month,
CASE
WHEN create_time >= DATE('now', 'start of year') AND
create_time < DATE('now', 'start of year', '+1 year')
THEN SUM(value)
ELSE 0 END AS order_num_year,
CASE WHEN position = 12 THEN SUM(value) ELSE 0 END AS order_num_all
FROM wsbz_inspection_data WHERE position = 12
"""
with SQLUtils('sqlite', database='db/jtDB.db') as db:
db.cursor.execute(sql)
row = db.cursor.fetchone()
if row:
data = {
'order_cnt_day': row[0] if row[0] is not None else 0,
'order_cnt_month': row[1] if row[1] is not None else 0,
'order_cnt_year': row[2] if row[2] is not None else 0,
'order_cnt_all': row[3] if row[3] is not None else 0,
'order_num_day': float(row[4]) if row[4] is not None else 0,
'order_num_month': float(row[5]) if row[5] is not None else 0,
'order_num_year': float(row[6]) if row[6] is not None else 0,
'order_num_all': float(row[7]) if row[7] is not None else 0
}
return data
else:
return {
'order_cnt_day': 0,
'order_cnt_month': 0,
'order_cnt_year': 0,
'order_cnt_all': 0,
'order_num_day': 0,
'order_num_month': 0,
'order_num_year': 0,
'order_num_all': 0
}
except Exception as e:
logging.error(f"获取订单数量和产量统计数据失败: {str(e)}")
return {
'order_cnt_day': 0,
'order_cnt_month': 0,
'order_cnt_year': 0,
'order_cnt_all': 0,
'order_num_day': 0,
'order_num_month': 0,
'order_num_year': 0,
'order_num_all': 0
}

View File

@ -367,8 +367,8 @@ class PalletTypeDAO:
update_time, update_by, enabled, is_deleted update_time, update_by, enabled, is_deleted
) VALUES (?, ?, ?, ?, ?, ?, TRUE, FALSE) ) VALUES (?, ?, ?, ?, ?, ?, TRUE, FALSE)
""" """
params = (pallet_code, tier, current_time, user_id, params = (pallet_code, tier, current_time, user_id[0],
current_time, user_id) current_time, user_id[0])
self.db.execute_update(insert_sql, params) self.db.execute_update(insert_sql, params)
logging.info(f"创建新托盘档案记录:包装号={pallet_code}, 层数={tier}") logging.info(f"创建新托盘档案记录:包装号={pallet_code}, 层数={tier}")

Binary file not shown.

367
fix_status_management.py Normal file
View File

@ -0,0 +1,367 @@
#!/usr/bin/env python3
"""
修复状态管理功能的脚本
"""
import os
import re
# 1. 在InspectionDAO类中添加状态管理方法
DAO_METHODS = """
def get_product_status(self, order_id, gc_note, tray_id):
\"\"\"获取产品的当前状态
Args:
order_id: 订单号
gc_note: 工程号
tray_id: 托盘号
Returns:
str: 产品状态如果没有找到则返回'init'
\"\"\"
try:
with SQLUtils('sqlite', database='db/jtDB.db') as db:
sql = \"\"\"
SELECT status FROM wsbz_inspection_data
WHERE order_id = ? AND gc_note = ? AND tray_id = ?
ORDER BY id ASC LIMIT 1
\"\"\"
params = (order_id, gc_note, tray_id)
result = db.query_one(sql, params)
return result[0] if result and result[0] else 'init' # 默认为init状态
except Exception as e:
logging.error(f"获取产品状态失败: {str(e)}")
return 'init' # 出错时返回默认状态
def update_product_status(self, order_id, gc_note, tray_id, new_status):
\"\"\"更新产品的状态
Args:
order_id: 订单号
gc_note: 工程号
tray_id: 托盘号
new_status: 新状态
Returns:
bool: 更新是否成功
\"\"\"
try:
with SQLUtils('sqlite', database='db/jtDB.db') as db:
# 更新该产品所有记录的状态字段
update_sql = \"\"\"
UPDATE wsbz_inspection_data SET status = ?, update_time = ?
WHERE order_id = ? AND gc_note = ? AND tray_id = ?
\"\"\"
update_params = (new_status, datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
order_id, gc_note, tray_id)
db.execute(update_sql, update_params)
logging.info(f"已更新产品状态: 订单号={order_id}, 工程号={gc_note}, 托盘号={tray_id}, 新状态={new_status}")
return True
except Exception as e:
logging.error(f"更新产品状态失败: {str(e)}")
return False
"""
# 2. 检查检验完成的方法
CHECK_INSPECTION_METHOD = """
def check_inspection_completed(self, row):
\"\"\"检查行是否有至少一个检验项已完成如果是则更新状态为inspected
Args:
row: 行索引
Returns:
bool: 是否有至少一个检验项已完成
\"\"\"
try:
# 获取工程号
gc_note_item = self.process_table.item(row, 1)
if not gc_note_item:
return False
gc_note = gc_note_item.text().strip()
tray_id = self.tray_edit.currentText()
# 获取启用的检验配置
enabled_configs = self.inspection_manager.get_enabled_configs()
# 检查是否有至少一个检验项有值
has_any_value = False
for i, config in enumerate(enabled_configs):
col_index = 2 + i
item = self.process_table.item(row, col_index)
if item and item.text().strip():
has_any_value = True
break
# 如果有至少一个检验项有值更新状态为inspected
if has_any_value:
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
inspection_dao.update_product_status(self._current_order_code, gc_note, tray_id, 'inspected')
logging.info(f"工程号 {gc_note} 的检验已完成状态更新为inspected")
return has_any_value
except Exception as e:
logging.error(f"检查检验完成状态失败: {str(e)}")
return False
"""
# 3. 修改的save_inspection_data方法
SAVE_INSPECTION_DATA_METHOD = """
def save_inspection_data(self, order_id, gc_note, tray_id, position, config_id, value, status):
\"\"\"保存检验数据到数据库
Args:
order_id: 订单号
gc_note: 工程号
position: 位置序号
config_id: 配置ID
value: 检验值
status: 状态
\"\"\"
try:
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
modbus = ModbusUtils()
client = modbus.get_client()
# 获取当前产品状态,优先使用产品状态管理中的状态
current_status = inspection_dao.get_product_status(order_id, gc_note, tray_id)
# 如果当前状态不是初始状态则使用当前状态而不是传入的status
if current_status not in ['', 'init']:
status = current_status
# 记录保存前的详细日志
logging.info(f"正在保存检验数据: 工程号={gc_note}, 托盘号={tray_id}, 位置={position}, 配置ID={config_id}, 值={value}, 状态={status}")
# 构建数据
data = [{
'position': position,
'config_id': config_id,
'value': value,
'status': status,
'remark': '',
'tray_id': tray_id
}]
# 保存到数据库
inspection_dao.save_inspection_data(order_id, gc_note, data)
"""
# 4. 修改_process_stable_weight方法中的查找行逻辑
PROCESS_STABLE_WEIGHT_FIND_ROW = """
# 基于状态查找行优先查找状态为inspected的行
data_row = None
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
# 首先查找状态为inspected的行
for row in range(2, self.process_table.rowCount()):
gc_note_item = self.process_table.item(row, 1)
if gc_note_item:
row_gc_note = gc_note_item.text().strip()
tray_id = self.tray_edit.currentText()
status = inspection_dao.get_product_status(self._current_order_code, row_gc_note, tray_id)
if status == 'inspected':
data_row = row
logging.info(f"找到状态为inspected的行: {data_row}, 工程号: {row_gc_note}")
break
# 如果没有找到inspected状态的行回退到原有逻辑
if data_row is None:
# 查找第一个没有称重数据的行
for row in range(2, self.process_table.rowCount()):
weight_item = self.process_table.item(row, weight_col)
if not weight_item or not weight_item.text().strip():
data_row = row
break
# 如果仍然没有找到,使用当前选中行或第一个数据行
if data_row is None:
current_row = self.process_table.currentRow()
data_row = current_row if current_row >= 2 else 2 # 使用第一个数据行索引为2
logging.info(f"未找到状态为inspected的行或没有称重数据的行使用当前选中行或第一个数据行: {data_row}")
else:
logging.info(f"找到没有称重数据的行: {data_row}")
else:
logging.info(f"将使用状态为inspected的行: {data_row}")
"""
# 5. 添加称重完成后的状态更新代码
PROCESS_STABLE_WEIGHT_UPDATE_STATUS = """
# 更新产品状态为weighed
inspection_dao.update_product_status(self._current_order_code, gc_note, tray_id, 'weighed')
logging.info(f"工程号 {gc_note} 的称重已完成状态更新为weighed")
"""
# 6. 修改handle_label_signal方法中的查找行逻辑
HANDLE_LABEL_SIGNAL_FIND_ROW = """
# 基于状态查找行优先查找状态为weighed的行
data_row = None
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
# 首先查找状态为weighed的行
for row in range(2, self.process_table.rowCount()):
gc_note_item = self.process_table.item(row, 1)
if gc_note_item:
row_gc_note = gc_note_item.text().strip()
tray_id = self.tray_edit.currentText()
status = inspection_dao.get_product_status(self._current_order_code, row_gc_note, tray_id)
if status == 'weighed':
data_row = row
logging.info(f"找到状态为weighed的行: {data_row}, 工程号: {row_gc_note}")
break
# 如果没有找到weighed状态的行回退到原有逻辑
if data_row is None:
# 使用当前选中的行或第一个数据行
current_row = self.process_table.currentRow()
data_row = current_row if current_row >= 2 else 2 # 使用第一个数据行索引为2
logging.info(f"未找到状态为weighed的行使用当前选中行或第一个数据行: {data_row}")
else:
logging.info(f"将使用状态为weighed的行: {data_row}")
"""
# 7. 添加贴标完成后的状态更新代码
HANDLE_LABEL_SIGNAL_UPDATE_STATUS = """
# 更新产品状态为labeled
inspection_dao.update_product_status(self._current_order_code, gc_note, tray_id, 'labeled')
logging.info(f"工程号 {gc_note} 的贴标已完成状态更新为labeled")
"""
# 8. 在handle_inspection_cell_changed方法末尾添加调用check_inspection_completed
HANDLE_INSPECTION_CELL_CHANGED_CALL = """
# 检查是否完成检验并更新状态
self.check_inspection_completed(row)
"""
# 9. 在add_new_inspection_row方法末尾添加初始化状态代码
ADD_NEW_INSPECTION_ROW_INIT_STATUS = """
# 初始化产品状态为init
inspection_dao.update_product_status(self._current_order_code, gc_note, tray_id, 'init')
logging.info(f"已添加工程号 {gc_note} 的新记录,显示在第{new_seq}初始状态为init")
"""
# 10. 移除数据校验逻辑,替换为简单的单元格颜色设置
REMOVE_VALIDATION_LOGIC = """
# 设置单元格颜色为浅绿色,表示已填写
cell_item.setBackground(QBrush(QColor("#c8e6c9")))
"""
# 主函数:应用所有修改
def apply_fixes():
print("开始应用状态管理功能修复...")
# 备份文件
os.system("cp dao/inspection_dao.py dao/inspection_dao.py.bak")
os.system("cp widgets/main_window.py widgets/main_window.py.bak")
# 1. 添加DAO方法
with open("dao/inspection_dao.py", "r") as f:
dao_content = f.read()
# 检查方法是否已存在
if "def get_product_status" not in dao_content:
# 找到合适的位置插入新方法
pattern = r'def save_package_record.*?return False'
match = re.search(pattern, dao_content, re.DOTALL)
if match:
insert_pos = match.end()
new_content = dao_content[:insert_pos] + DAO_METHODS + dao_content[insert_pos:]
with open("dao/inspection_dao.py", "w") as f:
f.write(new_content)
print("1. 已成功添加状态管理方法到 dao/inspection_dao.py")
else:
print("无法找到合适的位置插入DAO方法")
else:
print("1. 状态管理方法已存在,跳过添加")
# 读取main_window.py
with open("widgets/main_window.py", "r") as f:
main_window_content = f.read()
# 2. 添加check_inspection_completed方法
if "def check_inspection_completed" not in main_window_content:
# 找到合适的位置插入新方法
pattern = r'def validate_inspection_value.*?return False'
match = re.search(pattern, main_window_content, re.DOTALL)
if match:
insert_pos = match.end()
new_content = main_window_content[:insert_pos] + "\n" + CHECK_INSPECTION_METHOD + main_window_content[insert_pos:]
main_window_content = new_content
print("2. 已成功添加check_inspection_completed方法")
else:
print("无法找到合适的位置插入check_inspection_completed方法")
else:
print("2. check_inspection_completed方法已存在跳过添加")
# 3. 修改save_inspection_data方法
pattern = r'def save_inspection_data.*?inspection_dao\.save_inspection_data\(order_id, gc_note, data\)'
replacement = SAVE_INSPECTION_DATA_METHOD
main_window_content = re.sub(pattern, replacement, main_window_content, flags=re.DOTALL)
print("3. 已成功修改save_inspection_data方法")
# 4. 修改_process_stable_weight方法中的查找行逻辑
pattern = r'# 查找第一个没有称重数据的行\s*data_row = None\s*for row in range.*?if data_row is None:.*?else:\s*logging\.info\(f"找到没有称重数据的行: \{data_row\}"\)'
replacement = PROCESS_STABLE_WEIGHT_FIND_ROW
main_window_content = re.sub(pattern, replacement, main_window_content, flags=re.DOTALL)
print("4. 已成功修改_process_stable_weight方法中的查找行逻辑")
# 5. 添加称重完成后的状态更新代码
pattern = r'(logging\.info\(f"已将稳定的称重数据 \{weight_kg\}kg 写入行 \{data_row\}, 列 \{weight_col\}"\))\s*\n\s*except'
replacement = r'\1\n\n' + PROCESS_STABLE_WEIGHT_UPDATE_STATUS + r'\n except'
main_window_content = re.sub(pattern, replacement, main_window_content)
print("5. 已成功添加称重完成后的状态更新代码")
# 6. 修改handle_label_signal方法中的查找行逻辑
pattern = r'# 获取当前选中的行或第一个数据行\s*current_row = self\.process_table\.currentRow\(\)\s*data_row = current_row if current_row >= 2 else 2'
replacement = HANDLE_LABEL_SIGNAL_FIND_ROW
main_window_content = re.sub(pattern, replacement, main_window_content)
print("6. 已成功修改handle_label_signal方法中的查找行逻辑")
# 7. 添加贴标完成后的状态更新代码
pattern = r'(logging\.info\(f"已将贴标数据 \{axios_num\} 保存到数据库"\))\s*\n\s*# 调用加载到包装记录的方法'
replacement = r'\1\n\n' + HANDLE_LABEL_SIGNAL_UPDATE_STATUS + r'\n \n # 调用加载到包装记录的方法'
main_window_content = re.sub(pattern, replacement, main_window_content)
print("7. 已成功添加贴标完成后的状态更新代码")
# 8. 在handle_inspection_cell_changed方法末尾添加调用check_inspection_completed
pattern = r'(logging\.info\(f"处理单元格变更: 行=\{row\}, 列=\{column\}, 类型=\{data_type\}, 工程号=\{gc_note\}, 值=\{value\}, 状态=\{status\}"\))\s*\n\s*except'
replacement = r'\1\n\n' + HANDLE_INSPECTION_CELL_CHANGED_CALL + r'\n except'
main_window_content = re.sub(pattern, replacement, main_window_content)
print("8. 已成功在handle_inspection_cell_changed方法末尾添加调用check_inspection_completed")
# 9. 修改add_new_inspection_row方法设置初始状态
# 9.1 修改检验项的status为init
pattern = r"'status': '',\s*# 默认设置为通过状态"
replacement = "'status': 'init', # 设置初始状态"
main_window_content = re.sub(pattern, replacement, main_window_content)
# 9.2 修改贴标和称重项的status为init
pattern = r"'status': 'pass',\s*# 默认设置为通过状态"
replacement = "'status': 'init', # 设置初始状态"
main_window_content = re.sub(pattern, replacement, main_window_content)
# 9.3 添加状态初始化代码
pattern = r'(logging\.info\(f"已添加工程号 \{gc_note\} 的新记录,显示在第\{new_seq\}条"\))\s*\n\s*except'
replacement = ADD_NEW_INSPECTION_ROW_INIT_STATUS + r'\n except'
main_window_content = re.sub(pattern, replacement, main_window_content)
print("9. 已成功修改add_new_inspection_row方法设置初始状态")
# 10. 移除数据校验逻辑
pattern = r'# 验证数据有效性\s*if self\.validate_inspection_value\(config, value\):.*?status = \'warning\''
replacement = REMOVE_VALIDATION_LOGIC
main_window_content = re.sub(pattern, replacement, main_window_content, flags=re.DOTALL)
print("10. 已成功移除数据校验逻辑")
# 保存修改后的main_window.py
with open("widgets/main_window.py", "w") as f:
f.write(main_window_content)
print("状态管理功能修复完成!")
if __name__ == "__main__":
apply_fixes()

View File

@ -12,9 +12,9 @@ client.write_registers(address=11, values=[2122])
# client.write_registers(address=5, values=[16]) # client.write_registers(address=5, values=[16])
# 贴标完成 # 贴标完成
# client.write_registers(address=24, values=[1]) # client.write_registers(address=24, values=[1])
client.write_registers(address=3, values=[1]) client.write_registers(address=13, values=[1])
result = client.read_holding_registers(address=3, count=1) result = client.read_holding_registers(address=13, count=1)
print(result.registers[0],"123===") print(result.registers[0],"123===")
client.close() client.close()

378
status_management_patch.py Executable file
View File

@ -0,0 +1,378 @@
#!/usr/bin/env python3
"""
状态管理功能补丁文件
用于修复产品状态管理相关的代码
"""
import os
import re
# 定义要修改的文件
DAO_FILE = "dao/inspection_dao.py"
MAIN_WINDOW_FILE = "widgets/main_window.py"
# 1. 在InspectionDAO类中添加状态管理方法
def add_dao_methods():
with open(DAO_FILE, "r") as f:
content = f.read()
# 检查方法是否已存在
if "def get_product_status" in content:
print("状态管理方法已存在,跳过添加")
return
# 找到合适的位置插入新方法
pattern = r'(def save_package_record.*?\n\s*return False\n)'
new_methods = r"""\1
def get_product_status(self, order_id, gc_note, tray_id):
"""获取产品的当前状态
Args:
order_id: 订单号
gc_note: 工程号
tray_id: 托盘号
Returns:
str: 产品状态如果没有找到则返回'init'
"""
try:
with SQLUtils('sqlite', database='db/jtDB.db') as db:
sql = """
SELECT status FROM wsbz_inspection_data
WHERE order_id = ? AND gc_note = ? AND tray_id = ?
ORDER BY id ASC LIMIT 1
"""
params = (order_id, gc_note, tray_id)
result = db.query_one(sql, params)
return result[0] if result and result[0] else 'init' # 默认为init状态
except Exception as e:
logging.error(f"获取产品状态失败: {str(e)}")
return 'init' # 出错时返回默认状态
def update_product_status(self, order_id, gc_note, tray_id, new_status):
"""更新产品的状态
Args:
order_id: 订单号
gc_note: 工程号
tray_id: 托盘号
new_status: 新状态
Returns:
bool: 更新是否成功
"""
try:
with SQLUtils('sqlite', database='db/jtDB.db') as db:
# 更新该产品所有记录的状态字段
update_sql = """
UPDATE wsbz_inspection_data SET status = ?, update_time = ?
WHERE order_id = ? AND gc_note = ? AND tray_id = ?
"""
update_params = (new_status, datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
order_id, gc_note, tray_id)
db.execute(update_sql, update_params)
logging.info(f"已更新产品状态: 订单号={order_id}, 工程号={gc_note}, 托盘号={tray_id}, 新状态={new_status}")
return True
except Exception as e:
logging.error(f"更新产品状态失败: {str(e)}")
return False"""
modified_content = re.sub(pattern, new_methods, content, flags=re.DOTALL)
with open(DAO_FILE, "w") as f:
f.write(modified_content)
print(f"已成功添加状态管理方法到 {DAO_FILE}")
# 2. 修改add_new_inspection_row方法设置初始状态
def update_add_new_inspection_row():
with open(MAIN_WINDOW_FILE, "r") as f:
content = f.read()
# 修改检验项的status为init
pattern1 = r"'status': '',\s*# 默认设置为通过状态"
replacement1 = "'status': 'init', # 设置初始状态"
content = re.sub(pattern1, replacement1, content)
# 修改贴标和称重项的status为init
pattern2 = r"'status': 'pass',\s*# 默认设置为通过状态"
replacement2 = "'status': 'init', # 设置初始状态"
content = re.sub(pattern2, replacement2, content)
# 添加状态初始化代码
pattern3 = r"(logging\.info\(f\"已添加工程号 \{gc_note\} 的新记录,显示在第\{new_seq\}条\"\))"
replacement3 = r"""# 初始化产品状态为init
inspection_dao.update_product_status(self._current_order_code, gc_note, tray_id, 'init')
logging.info(f"已添加工程号 {gc_note} 的新记录,显示在第{new_seq}初始状态为init")"""
content = re.sub(pattern3, replacement3, content)
with open(MAIN_WINDOW_FILE, "w") as f:
f.write(content)
print(f"已成功更新add_new_inspection_row方法")
# 3. 添加check_inspection_completed方法
def add_check_inspection_completed():
with open(MAIN_WINDOW_FILE, "r") as f:
content = f.read()
# 检查方法是否已存在
if "def check_inspection_completed" in content:
print("check_inspection_completed方法已存在跳过添加")
else:
# 找到合适的位置插入新方法
pattern = r'(def validate_inspection_value.*?)(\n\s*def )'
new_method = r"""\1
def check_inspection_completed(self, row):
"""检查行是否有至少一个检验项已完成如果是则更新状态为inspected
Args:
row: 行索引
Returns:
bool: 是否有至少一个检验项已完成
"""
try:
# 获取工程号
gc_note_item = self.process_table.item(row, 1)
if not gc_note_item:
return False
gc_note = gc_note_item.text().strip()
tray_id = self.tray_edit.currentText()
# 获取启用的检验配置
enabled_configs = self.inspection_manager.get_enabled_configs()
# 检查是否有至少一个检验项有值
has_any_value = False
for i, config in enumerate(enabled_configs):
col_index = 2 + i
item = self.process_table.item(row, col_index)
if item and item.text().strip():
has_any_value = True
break
# 如果有至少一个检验项有值更新状态为inspected
if has_any_value:
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
inspection_dao.update_product_status(self._current_order_code, gc_note, tray_id, 'inspected')
logging.info(f"工程号 {gc_note} 的检验已完成状态更新为inspected")
return has_any_value
except Exception as e:
logging.error(f"检查检验完成状态失败: {str(e)}")
return False\2"""
content = re.sub(pattern, new_method, content, flags=re.DOTALL)
with open(MAIN_WINDOW_FILE, "w") as f:
f.write(content)
print(f"已成功添加check_inspection_completed方法")
# 在handle_inspection_cell_changed方法末尾添加调用
pattern = r'(logging\.info\(f"处理单元格变更: 行=\{row\}, 列=\{column\}, 类型=\{data_type\}, 工程号=\{gc_note\}, 值=\{value\}, 状态=\{status\}"\))\n(\s*except)'
replacement = r"""\1
# 检查是否完成检验并更新状态
self.check_inspection_completed(row)
\2"""
content = re.sub(pattern, replacement, content)
with open(MAIN_WINDOW_FILE, "w") as f:
f.write(content)
print(f"已成功在handle_inspection_cell_changed方法末尾添加调用check_inspection_completed")
# 4. 修改_process_stable_weight方法
def update_process_stable_weight():
with open(MAIN_WINDOW_FILE, "r") as f:
content = f.read()
# 修改查找行的逻辑
pattern1 = r'# 查找第一个没有称重数据的行\s*data_row = None\s*for row in range.*?if data_row is None:.*?else:\s*logging\.info\(f"找到没有称重数据的行: \{data_row\}"\)'
replacement1 = r"""# 基于状态查找行优先查找状态为inspected的行
data_row = None
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
# 首先查找状态为inspected的行
for row in range(2, self.process_table.rowCount()):
gc_note_item = self.process_table.item(row, 1)
if gc_note_item:
row_gc_note = gc_note_item.text().strip()
tray_id = self.tray_edit.currentText()
status = inspection_dao.get_product_status(self._current_order_code, row_gc_note, tray_id)
if status == 'inspected':
data_row = row
logging.info(f"找到状态为inspected的行: {data_row}, 工程号: {row_gc_note}")
break
# 如果没有找到inspected状态的行回退到原有逻辑
if data_row is None:
# 查找第一个没有称重数据的行
for row in range(2, self.process_table.rowCount()):
weight_item = self.process_table.item(row, weight_col)
if not weight_item or not weight_item.text().strip():
data_row = row
break
# 如果仍然没有找到,使用当前选中行或第一个数据行
if data_row is None:
current_row = self.process_table.currentRow()
data_row = current_row if current_row >= 2 else 2 # 使用第一个数据行索引为2
logging.info(f"未找到状态为inspected的行或没有称重数据的行使用当前选中行或第一个数据行: {data_row}")
else:
logging.info(f"找到没有称重数据的行: {data_row}")
else:
logging.info(f"将使用状态为inspected的行: {data_row}")"""
content = re.sub(pattern1, replacement1, content, flags=re.DOTALL)
# 添加状态更新代码
pattern2 = r'(logging\.info\(f"已将稳定的称重数据 \{weight_kg\}kg 写入行 \{data_row\}, 列 \{weight_col\}"\))\n\s*except'
replacement2 = r"""\1
# 更新产品状态为weighed
inspection_dao.update_product_status(self._current_order_code, gc_note, tray_id, 'weighed')
logging.info(f"工程号 {gc_note} 的称重已完成状态更新为weighed")
except"""
content = re.sub(pattern2, replacement2, content)
with open(MAIN_WINDOW_FILE, "w") as f:
f.write(content)
print(f"已成功更新_process_stable_weight方法")
# 5. 修改handle_label_signal方法
def update_handle_label_signal():
with open(MAIN_WINDOW_FILE, "r") as f:
content = f.read()
# 修改查找行的逻辑
pattern1 = r'# 获取当前选中的行或第一个数据行\s*current_row = self\.process_table\.currentRow\(\)\s*data_row = current_row if current_row >= 2 else 2'
replacement1 = r"""# 基于状态查找行优先查找状态为weighed的行
data_row = None
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
# 首先查找状态为weighed的行
for row in range(2, self.process_table.rowCount()):
gc_note_item = self.process_table.item(row, 1)
if gc_note_item:
row_gc_note = gc_note_item.text().strip()
tray_id = self.tray_edit.currentText()
status = inspection_dao.get_product_status(self._current_order_code, row_gc_note, tray_id)
if status == 'weighed':
data_row = row
logging.info(f"找到状态为weighed的行: {data_row}, 工程号: {row_gc_note}")
break
# 如果没有找到weighed状态的行回退到原有逻辑
if data_row is None:
# 使用当前选中的行或第一个数据行
current_row = self.process_table.currentRow()
data_row = current_row if current_row >= 2 else 2 # 使用第一个数据行索引为2
logging.info(f"未找到状态为weighed的行使用当前选中行或第一个数据行: {data_row}")
else:
logging.info(f"将使用状态为weighed的行: {data_row}")"""
content = re.sub(pattern1, replacement1, content)
# 添加状态更新代码
pattern2 = r'(logging\.info\(f"已将贴标数据 \{axios_num\} 保存到数据库"\))\n\s*# 调用加载到包装记录的方法'
replacement2 = r"""\1
# 更新产品状态为labeled
inspection_dao.update_product_status(self._current_order_code, gc_note, tray_id, 'labeled')
logging.info(f"工程号 {gc_note} 的贴标已完成状态更新为labeled")
# 调用加载到包装记录的方法"""
content = re.sub(pattern2, replacement2, content)
with open(MAIN_WINDOW_FILE, "w") as f:
f.write(content)
print(f"已成功更新handle_label_signal方法")
# 6. 修改save_inspection_data方法
def update_save_inspection_data():
with open(MAIN_WINDOW_FILE, "r") as f:
content = f.read()
# 查找save_inspection_data方法
pattern = r'def save_inspection_data.*?try:.*?inspection_dao = InspectionDAO\(\).*?# 记录保存前的详细日志'
# 修改方法,添加状态获取逻辑
replacement = r"""def save_inspection_data(self, order_id, gc_note, tray_id, position, config_id, value, status):
"""保存检验数据到数据库
Args:
order_id: 订单号
gc_note: 工程号
position: 位置序号
config_id: 配置ID
value: 检验值
status: 状态
"""
try:
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
modbus = ModbusUtils()
client = modbus.get_client()
# 获取当前产品状态,优先使用产品状态管理中的状态
current_status = inspection_dao.get_product_status(order_id, gc_note, tray_id)
# 如果当前状态不是初始状态则使用当前状态而不是传入的status
if current_status not in ['', 'init']:
status = current_status
# 记录保存前的详细日志"""
content = re.sub(pattern, replacement, content, flags=re.DOTALL)
with open(MAIN_WINDOW_FILE, "w") as f:
f.write(content)
print(f"已成功更新save_inspection_data方法")
# 7. 移除数据校验逻辑
def remove_validation_logic():
with open(MAIN_WINDOW_FILE, "r") as f:
content = f.read()
# 第一处移除handle_inspection_cell_changed中的数据校验逻辑
pattern1 = r'# 验证数据有效性\s*if self\.validate_inspection_value\(config, value\):.*?status = \'warning\''
replacement1 = r"""# 设置单元格颜色为浅绿色,表示已填写
cell_item.setBackground(QBrush(QColor("#c8e6c9")))"""
content = re.sub(pattern1, replacement1, content, flags=re.DOTALL)
with open(MAIN_WINDOW_FILE, "w") as f:
f.write(content)
print(f"已成功移除数据校验逻辑")
# 执行所有修改
def apply_all_changes():
print("开始应用状态管理功能补丁...")
add_dao_methods()
update_add_new_inspection_row()
add_check_inspection_completed()
update_process_stable_weight()
update_handle_label_signal()
update_save_inspection_data()
remove_validation_logic()
print("状态管理功能补丁应用完成!")
if __name__ == "__main__":
apply_all_changes()

7
temp_fix.py Normal file
View File

@ -0,0 +1,7 @@
# 设置单元格颜色为浅绿色,表示已填写
cell_item.setBackground(QBrush(QColor("#c8e6c9")))
# 保持当前状态不变,由状态管理逻辑处理
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
status = inspection_dao.get_product_status(self._current_order_code, gc_note, tray_id)

View File

@ -0,0 +1,39 @@
def save_inspection_data(self, order_id, gc_note, tray_id, position, config_id, value, status):
"""保存检验数据到数据库
Args:
order_id: 订单号
gc_note: 工程号
position: 位置序号
config_id: 配置ID
value: 检验值
status: 状态
"""
try:
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
modbus = ModbusUtils()
client = modbus.get_client()
# 获取当前产品状态,优先使用产品状态管理中的状态
current_status = inspection_dao.get_product_status(order_id, gc_note, tray_id)
# 如果当前状态不是初始状态则使用当前状态而不是传入的status
if current_status not in ['', 'init']:
status = current_status
# 记录保存前的详细日志
logging.info(f"正在保存检验数据: 工程号={gc_note}, 托盘号={tray_id}, 位置={position}, 配置ID={config_id}, 值={value}, 状态={status}")
# 构建数据
data = [{
'position': position,
'config_id': config_id,
'value': value,
'status': status,
'remark': '',
'tray_id': tray_id
}]
# 保存到数据库
inspection_dao.save_inspection_data(order_id, gc_note, data)

View File

@ -1,7 +1,7 @@
from PySide6.QtWidgets import ( from PySide6.QtWidgets import (
QMainWindow, QWidget, QLabel, QGridLayout, QVBoxLayout, QHBoxLayout, QMainWindow, QWidget, QLabel, QGridLayout, QVBoxLayout, QHBoxLayout,
QTableWidget, QTableWidgetItem, QHeaderView, QFrame, QSplitter, QTableWidget, QTableWidgetItem, QHeaderView, QFrame, QSplitter,
QPushButton, QLineEdit, QAbstractItemView, QComboBox, QSizePolicy QPushButton, QLineEdit, QAbstractItemView, QComboBox, QSizePolicy, QTextEdit
) )
from PySide6.QtGui import QFont, QAction, QBrush, QColor from PySide6.QtGui import QFont, QAction, QBrush, QColor
from PySide6.QtCore import Qt, QDateTime, QTimer from PySide6.QtCore import Qt, QDateTime, QTimer
@ -320,11 +320,18 @@ class MainWindowUI(QMainWindow):
label.setAlignment(Qt.AlignLeft | Qt.AlignVCenter) label.setAlignment(Qt.AlignLeft | Qt.AlignVCenter)
label.setStyleSheet("background-color: #FAFAFA; padding: 5px;") label.setStyleSheet("background-color: #FAFAFA; padding: 5px;")
# 创建值 # 创建值改为QTextEdit
value = QLabel("") value = QTextEdit("")
value.setFont(self.normal_font) value.setFont(self.normal_font)
value.setAlignment(Qt.AlignLeft | Qt.AlignVCenter)
value.setStyleSheet("background-color: white; padding: 5px;") value.setStyleSheet("background-color: white; padding: 5px;")
value.setFixedHeight(35)
value.setVerticalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
value.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
value.setFrameStyle(QFrame.NoFrame)
value.setLineWrapMode(QTextEdit.NoWrap)
value.setContentsMargins(0, 0, 0, 0)
value.setAcceptRichText(False)
value.setTabChangesFocus(True)
# 保存引用 # 保存引用
self.info_labels["备注"] = label self.info_labels["备注"] = label
@ -341,11 +348,14 @@ class MainWindowUI(QMainWindow):
label.setAlignment(Qt.AlignLeft | Qt.AlignVCenter) label.setAlignment(Qt.AlignLeft | Qt.AlignVCenter)
label.setStyleSheet("background-color: #FAFAFA; padding: 5px;") label.setStyleSheet("background-color: #FAFAFA; padding: 5px;")
# 创建值 # 创建值改为QLineEdit
value = QLabel("") value = QLineEdit("")
value.setFont(self.normal_font) value.setFont(self.normal_font)
value.setAlignment(Qt.AlignLeft | Qt.AlignVCenter) value.setAlignment(Qt.AlignLeft | Qt.AlignVCenter)
value.setStyleSheet("background-color: white; padding: 5px;") value.setStyleSheet("background-color: white; padding: 5px; border: none;")
value.setFrame(False)
value.setContentsMargins(0, 0, 0, 0)
value.setFixedHeight(35)
# 保存引用 # 保存引用
self.info_labels[field_name] = label self.info_labels[field_name] = label

View File

@ -18,12 +18,12 @@ class LoadingDialog(LoadingDialogUI):
def __init__(self, parent=None, user_id=None, user_name=None, corp_id=None): def __init__(self, parent=None, user_id=None, user_name=None, corp_id=None):
super().__init__() super().__init__()
self.parent = parent self.parent = parent
self.user_id = user_id self.user_id = user_id,
self.user_name = user_name self.user_name = user_name
self.corp_id = corp_id self.corp_id = corp_id
# 存储订单数据 # 存储订单数据
self.order_data = None # self.order_data = None
# 彻底禁用对话框的回车键关闭功能 # 彻底禁用对话框的回车键关闭功能
self.setModal(True) self.setModal(True)
@ -188,7 +188,7 @@ class LoadingDialog(LoadingDialogUI):
inspection_dao = InspectionDAO() inspection_dao = InspectionDAO()
order_info = order_response.get("data", {})[0] order_info = order_response.get("data", {})[0]
# 设置轴数 # 设置轴数
order_info['user_id'] = self.user_id order_info['user_id'] = self.user_id[0]
order_info['user_name'] = self.user_name order_info['user_name'] = self.user_name
order_info['data_corp'] = self.corp_id order_info['data_corp'] = self.corp_id
inspection_dao.save_order_info(order_code, order_info) inspection_dao.save_order_info(order_code, order_info)
@ -289,9 +289,9 @@ class LoadingDialog(LoadingDialogUI):
self.order_input.setFocus() self.order_input.setFocus()
return return
if not self.order_data: # if not self.order_data:
QMessageBox.warning(self, "提示", "请先查询订单信息") # QMessageBox.warning(self, "提示", "请先查询订单信息")
return # return
try: try:
# 保存托盘档案信息 # 保存托盘档案信息
@ -299,7 +299,7 @@ class LoadingDialog(LoadingDialogUI):
success = pallet_manager.save_pallet_archives( success = pallet_manager.save_pallet_archives(
pallet_code=pallet_code, pallet_code=pallet_code,
tier=int(tier_value), tier=int(tier_value),
user_id=self.user_id, user_id=self.user_id[0],
user_name=self.user_name user_name=self.user_name
) )

View File

@ -826,7 +826,7 @@ class MainWindow(MainWindowUI):
# 'position': config.get('position'), # 'position': config.get('position'),
# 'config_id': config.get('id'), # 'config_id': config.get('id'),
# 'value': value, # 'value': value,
# 'status': 'pass', # 默认设置为通过状态 # 'status': 'init', # 设置初始状态
# 'remark': '', # 'remark': '',
# 'tray_id': tray_id # 'tray_id': tray_id
# }] # }]
@ -872,7 +872,7 @@ class MainWindow(MainWindowUI):
'position': config.get('position'), 'position': config.get('position'),
'config_id': config.get('id'), 'config_id': config.get('id'),
'value': '', 'value': '',
'status': '', # 默认设置为通过状态 'status': 'init', # 设置初始状态
'remark': '', 'remark': '',
'tray_id': tray_id 'tray_id': tray_id
}] }]
@ -884,13 +884,15 @@ class MainWindow(MainWindowUI):
'position': position, 'position': position,
'config_id': position, 'config_id': position,
'value': '', 'value': '',
'status': 'pass', # 默认设置为通过状态 'status': 'init', # 设置初始状态
'remark': '', 'remark': '',
'tray_id': tray_id 'tray_id': tray_id
}] }]
inspection_dao.save_inspection_data(self._current_order_code,gc_note, data) inspection_dao.save_inspection_data(self._current_order_code,gc_note, data)
logging.info(f"已添加工程号 {gc_note} 的新记录,显示在第{new_seq}") # 初始化产品状态为init
inspection_dao.update_product_status(self._current_order_code, gc_note, tray_id, 'init')
logging.info(f"已添加工程号 {gc_note} 的新记录,显示在第{new_seq}初始状态为init")
except Exception as e: except Exception as e:
logging.error(f"添加新记录失败: {str(e)}") logging.error(f"添加新记录失败: {str(e)}")
@ -980,15 +982,13 @@ class MainWindow(MainWindowUI):
# 显示临时状态消息 # 显示临时状态消息
self.statusBar().showMessage(f"正在保存检验数据: {data_type}={value}", 1000) self.statusBar().showMessage(f"正在保存检验数据: {data_type}={value}", 1000)
# 验证数据有效性 # 设置单元格颜色为浅绿色,表示已填写
if self.validate_inspection_value(config, value): cell_item.setBackground(QBrush(QColor("#c8e6c9")))
# 设置单元格颜色为通过
cell_item.setBackground(QBrush(QColor("#c8e6c9"))) # 浅绿色 # 保持当前状态不变,由状态管理逻辑处理
status = 'pass' from dao.inspection_dao import InspectionDAO
else: inspection_dao = InspectionDAO()
# 设置单元格颜色为警告 status = inspection_dao.get_product_status(self._current_order_code, gc_note, tray_id)
cell_item.setBackground(QBrush(QColor("#fff9c4"))) # 浅黄色
status = 'warning'
# 保存到数据库 # 保存到数据库
self.save_inspection_data(self._current_order_code, gc_note, tray_id, config['position'], config['id'], value, status) self.save_inspection_data(self._current_order_code, gc_note, tray_id, config['position'], config['id'], value, status)
@ -1022,6 +1022,9 @@ class MainWindow(MainWindowUI):
# 记录详细日志 # 记录详细日志
logging.info(f"处理单元格变更: 行={row}, 列={column}, 类型={data_type}, 工程号={gc_note}, 值={value}, 状态={status}") logging.info(f"处理单元格变更: 行={row}, 列={column}, 类型={data_type}, 工程号={gc_note}, 值={value}, 状态={status}")
# 检查是否完成检验并更新状态
self.check_inspection_completed(row)
except Exception as e: except Exception as e:
logging.error(f"处理检验单元格变更失败: {str(e)}") logging.error(f"处理检验单元格变更失败: {str(e)}")
@ -1032,65 +1035,7 @@ class MainWindow(MainWindowUI):
if not self._loading_data_in_progress: if not self._loading_data_in_progress:
QTimer.singleShot(1000, self._safe_load_data) QTimer.singleShot(1000, self._safe_load_data)
def validate_inspection_value(self, config, value):
"""验证检验值是否有效
Args:
config: 检验配置
value: 检验值
Returns:
bool: 是否有效
"""
try:
# 特殊处理贴标和称重数据 - 这些数据默认都是有效的
if config.get('position') in [11, 12]: # 11是贴标12是称重
return True
# 检查值是否为空
if not value and config.get('required', False):
return False
# 根据数据类型验证
data_type = config.get('data_type')
if data_type == 'number':
# 数值类型验证
try:
# 如果值为空且不是必填,则视为有效
if not value and not config.get('required', False):
return True
num_value = float(value)
min_value = config.get('min_value')
max_value = config.get('max_value')
if min_value is not None and num_value < min_value:
return False
if max_value is not None and num_value > max_value:
return False
return True
except ValueError:
return False
elif data_type == 'enum':
# 枚举类型验证
enum_values = config.get('enum_values')
if enum_values and isinstance(enum_values, list):
# 如果值为空且不是必填,则视为有效
if not value and not config.get('required', False):
return True
return value in enum_values
return False
# 文本类型不做特殊验证
return True
except Exception as e:
logging.error(f"验证检验值失败: {str(e)}")
return False
def save_inspection_data(self, order_id, gc_note, tray_id, position, config_id, value, status): def save_inspection_data(self, order_id, gc_note, tray_id, position, config_id, value, status):
"""保存检验数据到数据库 """保存检验数据到数据库
@ -1108,6 +1053,14 @@ class MainWindow(MainWindowUI):
inspection_dao = InspectionDAO() inspection_dao = InspectionDAO()
modbus = ModbusUtils() modbus = ModbusUtils()
client = modbus.get_client() client = modbus.get_client()
# 获取当前产品状态,优先使用产品状态管理中的状态
current_status = inspection_dao.get_product_status(order_id, gc_note, tray_id)
# 如果当前状态不是初始状态则使用当前状态而不是传入的status
if current_status not in ['', 'init']:
status = current_status
# 记录保存前的详细日志 # 记录保存前的详细日志
logging.info(f"正在保存检验数据: 工程号={gc_note}, 托盘号={tray_id}, 位置={position}, 配置ID={config_id}, 值={value}, 状态={status}") logging.info(f"正在保存检验数据: 工程号={gc_note}, 托盘号={tray_id}, 位置={position}, 配置ID={config_id}, 值={value}, 状态={status}")
@ -1123,6 +1076,7 @@ class MainWindow(MainWindowUI):
# 保存到数据库 # 保存到数据库
inspection_dao.save_inspection_data(order_id, gc_note, data) inspection_dao.save_inspection_data(order_id, gc_note, data)
except Exception as e: except Exception as e:
logging.error(f"保存检验数据失败: {str(e)}") logging.error(f"保存检验数据失败: {str(e)}")
# 显示错误消息 # 显示错误消息
@ -1943,21 +1897,41 @@ class MainWindow(MainWindowUI):
# 计算净重列索引 - 净重位置在检验列之后的第三列(称重后面) # 计算净重列索引 - 净重位置在检验列之后的第三列(称重后面)
net_weight_col = 2 + len(enabled_configs) + 2 net_weight_col = 2 + len(enabled_configs) + 2
# 查找第一个没有称重数据的行 # 基于状态查找行优先查找状态为inspected的行
data_row = None data_row = None
for row in range(2, self.process_table.rowCount()): from dao.inspection_dao import InspectionDAO
weight_item = self.process_table.item(row, weight_col) inspection_dao = InspectionDAO()
if not weight_item or not weight_item.text().strip():
data_row = row
break
# 如果没有找到没有称重数据的行,使用当前选中行或第一个数据行 # 首先查找状态为inspected的行
for row in range(2, self.process_table.rowCount()):
gc_note_item = self.process_table.item(row, 1)
if gc_note_item:
row_gc_note = gc_note_item.text().strip()
tray_id = self.tray_edit.currentText()
status = inspection_dao.get_product_status(self._current_order_code, row_gc_note, tray_id)
if status == 'inspected':
data_row = row
logging.info(f"找到状态为inspected的行: {data_row}, 工程号: {row_gc_note}")
break
# 如果没有找到inspected状态的行回退到原有逻辑
if data_row is None: if data_row is None:
current_row = self.process_table.currentRow() # 查找第一个没有称重数据的行
data_row = current_row if current_row >= 2 else 2 # 使用第一个数据行索引为2 for row in range(2, self.process_table.rowCount()):
logging.info(f"未找到没有称重数据的行,使用当前选中行或第一个数据行: {data_row}") weight_item = self.process_table.item(row, weight_col)
if not weight_item or not weight_item.text().strip():
data_row = row
break
# 如果仍然没有找到,使用当前选中行或第一个数据行
if data_row is None:
current_row = self.process_table.currentRow()
data_row = current_row if current_row >= 2 else 2 # 使用第一个数据行索引为2
logging.info(f"未找到状态为inspected的行或没有称重数据的行使用当前选中行或第一个数据行: {data_row}")
else:
logging.info(f"找到没有称重数据的行: {data_row}")
else: else:
logging.info(f"找到没有称重数据的行: {data_row}") logging.info(f"将使用状态为inspected的行: {data_row}")
# 获取工程号 # 获取工程号
gc_note = self.process_table.item(data_row, 1) gc_note = self.process_table.item(data_row, 1)
@ -2003,7 +1977,7 @@ class MainWindow(MainWindowUI):
net_weight_item = QTableWidgetItem(str(net_weight_kg)) net_weight_item = QTableWidgetItem(str(net_weight_kg))
net_weight_item.setTextAlignment(Qt.AlignCenter) net_weight_item.setTextAlignment(Qt.AlignCenter)
self.process_table.setItem(data_row, net_weight_col, net_weight_item) self.process_table.setItem(data_row, net_weight_col, net_weight_item)
axios_num = self.get_axios_num_by_order_id(self._current_order_code) + 1
# 如果开启 api 模式,则调用接口添加到包装记录 # 如果开启 api 模式,则调用接口添加到包装记录
if AppMode.is_api(): if AppMode.is_api():
from dao.inspection_dao import InspectionDAO from dao.inspection_dao import InspectionDAO
@ -2011,7 +1985,6 @@ class MainWindow(MainWindowUI):
inspection_dao = InspectionDAO() inspection_dao = InspectionDAO()
# 调用接口 # 调用接口
gc_api = GcApi() gc_api = GcApi()
axios_num = self.get_axios_num_by_order_id(self._current_order_code) + 1
# 获取订单信息和其他信息,两者都已经是字典格式 # 获取订单信息和其他信息,两者都已经是字典格式
info = {} info = {}
order_info = inspection_dao.get_order_info(self._current_order_code) order_info = inspection_dao.get_order_info(self._current_order_code)
@ -2058,6 +2031,10 @@ class MainWindow(MainWindowUI):
logging.info(f"已将稳定的称重数据 {weight_kg}kg 写入行 {data_row}, 列 {weight_col}") logging.info(f"已将稳定的称重数据 {weight_kg}kg 写入行 {data_row}, 列 {weight_col}")
# 更新产品状态为weighed
inspection_dao.update_product_status(self._current_order_code, gc_note, tray_id, 'weighed')
logging.info(f"工程号 {gc_note} 的称重已完成状态更新为weighed")
except Exception as e: except Exception as e:
logging.error(f"处理称重数据时发生错误: {str(e)}") logging.error(f"处理称重数据时发生错误: {str(e)}")
# 确保重新连接信号 # 确保重新连接信号
@ -2090,14 +2067,50 @@ class MainWindow(MainWindowUI):
# 只有当信号为贴标完成(1)时才进行处理 # 只有当信号为贴标完成(1)时才进行处理
if signal == 1: if signal == 1:
try: try:
modbus = ModbusUtils()
client = None
try:
# 获取Modbus客户端现在使用连接池不会每次都创建新连接
client = modbus.get_client()
if not client:
logging.error("无法获取Modbus客户端连接")
return
# 先将寄存器回复为0否则复原周期内、会把新来的数据也pass
modbus.write_register_until_success(client, 13, 0)
except Exception as e:
logging.error(f"复原寄存器失败{e}")
finally:
client.close()
# 获取数据行数 # 获取数据行数
if self.process_table.rowCount() <= 2: # 没有数据行 if self.process_table.rowCount() <= 2: # 没有数据行
logging.warning("没有可用的数据行来写入贴标数据") logging.warning("没有可用的数据行来写入贴标数据")
return return
# 获取当前选中的行或第一个数据行 # 基于状态查找行优先查找状态为weighed的行
current_row = self.process_table.currentRow() data_row = None
data_row = current_row if current_row >= 2 else 2 # 使用第一个数据行索引为2 from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
# 首先查找状态为weighed的行
for row in range(2, self.process_table.rowCount()):
gc_note_item = self.process_table.item(row, 1)
if gc_note_item:
row_gc_note = gc_note_item.text().strip()
tray_id = self.tray_edit.currentText()
status = inspection_dao.get_product_status(self._current_order_code, row_gc_note, tray_id)
if status == 'weighed':
data_row = row
logging.info(f"找到状态为weighed的行: {data_row}, 工程号: {row_gc_note}")
break
# 如果没有找到weighed状态的行回退到原有逻辑
if data_row is None:
# 使用当前选中的行或第一个数据行
current_row = self.process_table.currentRow()
data_row = current_row if current_row >= 2 else 2 # 使用第一个数据行索引为2
logging.info(f"未找到状态为weighed的行使用当前选中行或第一个数据行: {data_row}")
else:
logging.info(f"将使用状态为weighed的行: {data_row}")
# 确保行存在 # 确保行存在
if data_row >= self.process_table.rowCount(): if data_row >= self.process_table.rowCount():
@ -2144,6 +2157,10 @@ class MainWindow(MainWindowUI):
# 在这里添加保存贴标数据到数据库的代码 # 在这里添加保存贴标数据到数据库的代码
self.save_inspection_data(self._current_order_code, gc_note, tray_id, 11, 11, str(axios_num), "pass") self.save_inspection_data(self._current_order_code, gc_note, tray_id, 11, 11, str(axios_num), "pass")
logging.info(f"已将贴标数据 {axios_num} 保存到数据库") logging.info(f"已将贴标数据 {axios_num} 保存到数据库")
# 更新产品状态为labeled
inspection_dao.update_product_status(self._current_order_code, gc_note, tray_id, 'labeled')
logging.info(f"工程号 {gc_note} 的贴标已完成状态更新为labeled")
# 调用加载到包装记录的方法 # 调用加载到包装记录的方法
self.load_finished_record_to_package_record(self._current_order_code,gc_note, tray_id) self.load_finished_record_to_package_record(self._current_order_code,gc_note, tray_id)
@ -2868,7 +2885,7 @@ class MainWindow(MainWindowUI):
bccd, tccd = inspection_dao.get_xj_range(self._current_order_code) bccd, tccd = inspection_dao.get_xj_range(self._current_order_code)
if bccd is not None and tccd is not None: if bccd is not None and tccd is not None:
if bccd - 0.5 <= final_value <= tccd + 0.5: if float(bccd) - 0.5 <= final_value <= float(tccd) + 0.5:
# 使用set_inspection_value保存数据 # 使用set_inspection_value保存数据
self.set_inspection_value('xj', xj_config, final_value) self.set_inspection_value('xj', xj_config, final_value)
logging.info(f"已将稳定的线径值 {final_value:.3f} 保存到工程号 {gc_note} (行 {data_row})") logging.info(f"已将稳定的线径值 {final_value:.3f} 保存到工程号 {gc_note} (行 {data_row})")
@ -3727,4 +3744,152 @@ class MainWindow(MainWindowUI):
logging.info(f"当前焦点控件: 类型={widget_type}, 名称={widget_name}, 文本={widget_text}") logging.info(f"当前焦点控件: 类型={widget_type}, 名称={widget_name}, 文本={widget_text}")
except Exception as e: except Exception as e:
logging.error(f"记录焦点控件信息失败: {e}") logging.error(f"记录焦点控件信息失败: {e}")
def handle_inspection_cell_changed(self, row, column):
"""处理检验表格单元格内容变更事件"""
try:
# 只处理数据行的检验列变更
if row < 2: # 忽略表头行
return
# 忽略首尾两列(序号和工程号)
if column < 2:
return
# 获取工程号
order_item = self.process_table.item(row, 1)
if not order_item:
return
gc_note = order_item.text().strip()
if not gc_note:
return
# 获取托盘号
tray_id = self.tray_edit.currentText()
# 获取启用的检验配置
enabled_configs = self.inspection_manager.get_enabled_configs()
# 判断是否是检验列(非包装列)
packaging_start_col = 2 + len(enabled_configs)
# 获取单元格内容
cell_item = self.process_table.item(row, column)
if not cell_item:
return
value = cell_item.text().strip()
# 默认设置为通过状态
status = 'pass'
# 记录当前正在处理的数据类型,用于日志输出
data_type = "检验"
if column >= 2 and column < packaging_start_col:
# 是检验列
config_index = column - 2
if config_index < len(enabled_configs):
config = enabled_configs[config_index]
data_type = config['display_name']
# 显示临时状态消息
self.statusBar().showMessage(f"正在保存检验数据: {data_type}={value}", 1000)
# 验证数据有效性
# 设置单元格颜色为浅绿色,表示已填写
cell_item.setBackground(QBrush(QColor("#c8e6c9")))
# 保持当前状态不变,由状态管理逻辑处理
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
status = inspection_dao.get_product_status(self._current_order_code, gc_note, tray_id)
self.save_inspection_data(self._current_order_code, gc_note, tray_id, config['position'], config['id'], value, status)
# 判断是否是包装列
elif column == packaging_start_col:
# 贴标列
data_type = "贴标"
self.statusBar().showMessage(f"正在保存贴标数据: {value}", 1000)
# 设置单元格颜色为通过
cell_item.setBackground(QBrush(QColor("#c8e6c9"))) # 浅绿色
# 保存贴标数据position和config_id都是11
self.save_inspection_data(self._current_order_code, gc_note, tray_id, 11, 11, value, status)
elif column == packaging_start_col + 1:
# 毛重列
data_type = "毛重"
self.statusBar().showMessage(f"正在保存称重数据: {value}", 1000)
# 设置单元格颜色为通过
cell_item.setBackground(QBrush(QColor("#c8e6c9"))) # 浅绿色
# 保存毛重数据position和config_id都是12
self.save_inspection_data(self._current_order_code, gc_note, tray_id, 12, 12, value, status)
elif column == packaging_start_col + 2:
# 净重列
data_type = "净重"
self.statusBar().showMessage(f"正在保存净重数据: {value}", 1000)
# 设置单元格颜色为通过
cell_item.setBackground(QBrush(QColor("#c8e6c9"))) # 浅绿色
# 保存净重数据position和config_id都是13
self.save_inspection_data(self._current_order_code, gc_note, tray_id, 13, 13, value, status)
# 记录详细日志
logging.info(f"处理单元格变更: 行={row}, 列={column}, 类型={data_type}, 工程号={gc_note}, 值={value}, 状态={status}")
# 检查是否完成检验并更新状态
self.check_inspection_completed(row)
except Exception as e:
logging.error(f"处理检验单元格变更失败: {str(e)}")
self.statusBar().showMessage(f"处理检验数据失败: {str(e)[:50]}...", 3000)
finally:
# 延迟一段时间后再触发查询避免频繁刷新UI
# 但要避免在加载过程中触发新的加载
if not self._loading_data_in_progress:
QTimer.singleShot(1000, self._safe_load_data)
def check_inspection_completed(self, row):
"""检查行是否有至少一个检验项已完成如果是则更新状态为inspected
Args:
row: 行索引
Returns:
bool: 是否有至少一个检验项已完成
"""
try:
# 获取工程号
gc_note_item = self.process_table.item(row, 1)
if not gc_note_item:
return False
gc_note = gc_note_item.text().strip()
tray_id = self.tray_edit.currentText()
# 获取启用的检验配置
enabled_configs = self.inspection_manager.get_enabled_configs()
# 检查是否有至少一个检验项有值
has_any_value = False
for i, config in enumerate(enabled_configs):
col_index = 2 + i
item = self.process_table.item(row, col_index)
if item and item.text().strip():
has_any_value = True
break
# 如果有至少一个检验项有值更新状态为inspected
if has_any_value:
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
inspection_dao.update_product_status(self._current_order_code, gc_note, tray_id, 'inspected')
logging.info(f"工程号 {gc_note} 的检验已完成状态更新为inspected")
return has_any_value
except Exception as e:
logging.error(f"检查检验完成状态失败: {str(e)}")
return False

4019
widgets/main_window.py.bak Normal file

File diff suppressed because it is too large Load Diff

View File

@ -27,7 +27,7 @@ class OrderQueryDialog(OrderQueryDialogUI):
if hasattr(parent, 'corp_id'): if hasattr(parent, 'corp_id'):
self.corp_id = parent.corp_id self.corp_id = parent.corp_id
if hasattr(parent, 'user_id'): if hasattr(parent, 'user_id'):
self.user_id = parent.user_id self.user_id = parent.user_id[0]
if hasattr(parent, 'user_name'): if hasattr(parent, 'user_name'):
self.user_name = parent.user_name self.user_name = parent.user_name