jiateng_ws/dao/inspection_dao.py

442 lines
16 KiB
Python
Raw Normal View History

import json
import logging
from datetime import datetime
from utils.sql_utils import SQLUtils
class InspectionDAO:
"""检验项目配置和数据访问对象"""
def __init__(self):
"""初始化数据访问对象"""
self.db = SQLUtils('sqlite', database='db/jtDB.db')
def __del__(self):
"""析构函数,确保数据库连接关闭"""
if hasattr(self, 'db'):
self.db.close()
def get_all_inspection_configs(self, include_disabled=False):
"""获取所有检验项目配置
Args:
include_disabled: 是否包含禁用的项目
Returns:
list: 检验项目配置列表
"""
try:
if include_disabled:
sql = """
SELECT id, position, name, display_name, enabled, required,
data_type, min_value, max_value, enum_values, unit, sort_order
FROM inspection_config
WHERE is_deleted = FALSE
ORDER BY sort_order, position
"""
params = ()
else:
sql = """
SELECT id, position, name, display_name, enabled, required,
data_type, min_value, max_value, enum_values, unit, sort_order
FROM inspection_config
WHERE is_deleted = FALSE AND enabled = TRUE
ORDER BY sort_order, position
"""
params = ()
self.db.cursor.execute(sql, params)
results = self.db.cursor.fetchall()
configs = []
for row in results:
config = {
'id': row[0],
'position': row[1],
'name': row[2],
'display_name': row[3],
'enabled': bool(row[4]),
'required': bool(row[5]),
'data_type': row[6],
'min_value': row[7],
'max_value': row[8],
'enum_values': json.loads(row[9]) if row[9] else None,
'unit': row[10],
'sort_order': row[11]
}
configs.append(config)
return configs
except Exception as e:
logging.error(f"获取检验项目配置失败: {str(e)}")
return []
def get_enabled_inspection_configs(self):
"""获取已启用的检验项目配置
Returns:
list: 已启用的检验项目配置列表
"""
return self.get_all_inspection_configs(include_disabled=False)
def get_inspection_config_by_position(self, position):
"""根据位置获取检验项目配置
Args:
position: 位置序号 (1-6)
Returns:
dict: 检验项目配置, 未找到则返回None
"""
try:
sql = """
SELECT id, position, name, display_name, enabled, required,
data_type, min_value, max_value, enum_values, unit, sort_order
FROM inspection_config
WHERE position = ? AND is_deleted = FALSE
"""
params = (position,)
self.db.cursor.execute(sql, params)
row = self.db.cursor.fetchone()
if row:
config = {
'id': row[0],
'position': row[1],
'name': row[2],
'display_name': row[3],
'enabled': bool(row[4]),
'required': bool(row[5]),
'data_type': row[6],
'min_value': row[7],
'max_value': row[8],
'enum_values': json.loads(row[9]) if row[9] else None,
'unit': row[10],
'sort_order': row[11]
}
return config
else:
return None
except Exception as e:
logging.error(f"获取检验项目配置失败: {str(e)}")
return None
def update_inspection_config(self, config_id, data, username='system'):
"""更新检验项目配置
Args:
config_id: 配置ID
data: 更新数据
username: 操作用户
Returns:
bool: 更新是否成功
"""
try:
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 构建更新SQL
update_fields = []
params = []
# 可更新的字段
allowed_fields = [
'name', 'display_name', 'enabled', 'required',
'data_type', 'min_value', 'max_value', 'unit',
'sort_order', 'enum_values'
]
for field in allowed_fields:
if field in data:
# 特殊处理enum_values字段确保存储为JSON字符串
if field == 'enum_values' and data[field] is not None:
if isinstance(data[field], list):
update_fields.append(f"{field} = ?")
params.append(json.dumps(data[field]))
elif isinstance(data[field], str):
# 如果已经是字符串检查是否有效的JSON
try:
json.loads(data[field])
update_fields.append(f"{field} = ?")
params.append(data[field])
except:
logging.warning(f"无效的JSON: {data[field]}")
continue
else:
update_fields.append(f"{field} = ?")
params.append(data[field])
# 添加更新时间和更新人
update_fields.append("update_time = ?")
params.append(current_time)
update_fields.append("update_by = ?")
params.append(username)
# 添加配置ID到参数列表
params.append(config_id)
# 构建SQL
sql = f"""
UPDATE inspection_config
SET {', '.join(update_fields)}
WHERE id = ?
"""
self.db.execute_update(sql, params)
return True
except Exception as e:
logging.error(f"更新检验项目配置失败: {str(e)}")
return False
def toggle_inspection_config(self, position, enabled, username='system'):
"""启用或禁用检验项目配置
Args:
position: 位置序号 (1-6)
enabled: 是否启用
username: 操作用户
Returns:
bool: 操作是否成功
"""
try:
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
sql = """
UPDATE inspection_config
SET enabled = ?, update_time = ?, update_by = ?
WHERE position = ? AND is_deleted = FALSE
"""
params = (enabled, current_time, username, position)
self.db.execute_update(sql, params)
return True
except Exception as e:
logging.error(f"更新检验项目启用状态失败: {str(e)}")
return False
def save_inspection_data(self, order_id, data, username='system'):
"""保存检验数据
Args:
order_id: 工程号
data: 检验数据列表格式: [{'position': 1, 'config_id': 1, 'value': '合格'}, ...]
username: 操作用户
Returns:
bool: 保存是否成功
"""
try:
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
self.db.begin_transaction()
for item in data:
position = item.get('position')
config_id = item.get('config_id')
value = item.get('value')
status = item.get('status', 'pass')
remark = item.get('remark', '')
tray_id = item.get('tray_id', '')
# 检查是否已存在该工程号和位置的记录
check_sql = """
SELECT id FROM inspection_data
WHERE order_id = ? AND position = ? AND is_deleted = FALSE
"""
check_params = (order_id, position)
self.db.cursor.execute(check_sql, check_params)
existing = self.db.cursor.fetchone()
if existing:
# 更新已有记录
update_sql = """
UPDATE inspection_data
SET config_id = ?, value = ?, status = ?, remark = ?,
update_time = ?, update_by = ?, tray_id = ?
WHERE id = ?
"""
update_params = (
config_id, value, status, remark,
current_time, username, tray_id, existing[0]
)
self.db.cursor.execute(update_sql, update_params)
else:
# 插入新记录
insert_sql = """
INSERT INTO inspection_data (
order_id, position, config_id, value, status, remark,
create_time, create_by, is_deleted, tray_id
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, FALSE, ?)
"""
insert_params = (
order_id, position, config_id, value, status, remark,
current_time, username, tray_id
)
self.db.cursor.execute(insert_sql, insert_params)
self.db.commit_transaction()
return True
except Exception as e:
self.db.rollback_transaction()
logging.error(f"保存检验数据失败: {str(e)}")
return False
def get_inspection_data_unfinished(self, tray_id):
"""获取未完成的检验数据,通过是否贴标来判断
Returns:
list: 未完成的检验数据列表
"""
try:
# 先获取所有没有贴标的工程号
sql_orders = """
SELECT DISTINCT d.order_id
FROM inspection_data d
WHERE d.is_deleted = FALSE AND d.tray_id = ?
AND d.position = 11 AND COALESCE(d.value,'') = ''
"""
params = (tray_id,)
self.db.cursor.execute(sql_orders, params)
orders = self.db.cursor.fetchall()
if not orders:
return []
# 构建IN子句的参数
order_ids = [order[0] for order in orders]
placeholders = ','.join(['?' for _ in order_ids])
# 获取这些工程号的所有检验数据
sql = f"""
SELECT d.id, d.order_id, d.position, d.config_id, d.value, d.status, d.remark,
c.name, c.display_name, c.data_type, c.unit
FROM inspection_data d
LEFT JOIN inspection_config c ON d.config_id = c.id
WHERE d.is_deleted = FALSE AND d.tray_id = ?
AND d.order_id IN ({placeholders})
ORDER BY d.create_time, d.order_id, d.position
"""
params = [tray_id] + order_ids
self.db.cursor.execute(sql, params)
results = self.db.cursor.fetchall()
data_list = []
for row in results:
data = {
'id': row[0],
'order_id': row[1],
'position': row[2],
'config_id': row[3],
'value': row[4],
'status': row[5],
'remark': row[6],
'name': row[7],
'display_name': row[8],
'data_type': row[9],
'unit': row[10]
}
data_list.append(data)
return data_list
except Exception as e:
logging.error(f"获取未完成的检验数据失败: {str(e)}")
return []
def get_inspection_data_by_order(self, order_id, tray_id):
"""根据工程号获取检验数据
Args:
order_id: 工程号
tray_id: 托盘号
Returns:
list: 检验数据列表
"""
try:
sql = """
SELECT d.id, d.position, d.config_id, d.value, d.status, d.remark,
c.name, c.display_name, c.data_type, c.unit
FROM inspection_data d
LEFT JOIN inspection_config c ON d.config_id = c.id
WHERE d.order_id = ? AND d.is_deleted = FALSE AND d.tray_id = ?
ORDER BY d.create_time, d.order_id, d.position
"""
params = (order_id, tray_id)
self.db.cursor.execute(sql, params)
results = self.db.cursor.fetchall()
data_list = []
for row in results:
data = {
'id': row[0],
'position': row[1],
'config_id': row[2],
'value': row[3],
'status': row[4],
'remark': row[5],
'name': row[6],
'display_name': row[7],
'data_type': row[8],
'unit': row[9]
}
data_list.append(data)
return data_list
except Exception as e:
logging.error(f"获取检验数据失败: {str(e)}")
return []
def get_package_record(self, tray_id):
"""根据托盘号获取包装记录
Args:
tray_id: 托盘号
Returns:
list: 包装记录列表
"""
try:
sql = """
SELECT order_id,
COALESCE(material, '') as material,
COALESCE(spec, '') as spec,
tray_id,
COALESCE(axis_package_id, '') as axis_package_id,
COALESCE(weight, 0) as weight,
STRFTIME('%Y-%m-%d %H:%M:%S', pack_time) as pack_time
FROM inspection_pack_data
WHERE tray_id = ?
AND is_deleted = FALSE
ORDER BY pack_time DESC
"""
params = (tray_id,)
self.db.cursor.execute(sql, params)
results = self.db.cursor.fetchall()
return results
except Exception as e:
logging.error(f"获取包装记录失败: {str(e)}")
return []
def save_package_record(self, order_id, tray_id, label_value, weight_value, finish_time):
"""保存包装记录
Args:
order_id: 工程号
tray_id: 托盘号
label_value: 标签值
weight_value: 重量值
finish_time: 完成时间
"""
# TODO调用接口获取到工程号对应的其他信息比如材质规格后续完成
try:
sql = """
INSERT INTO inspection_pack_data (order_id, tray_id, axis_package_id, weight, pack_time, create_time, create_by, update_time, update_by, is_deleted)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
params = (order_id, tray_id, label_value, weight_value, finish_time, datetime.now(), 'system', datetime.now(), 'system', False)
self.db.cursor.execute(sql, params)
self.db.conn.commit()
except Exception as e:
logging.error(f"保存包装记录失败: {str(e)}")
self.db.conn.rollback()