jiateng_ws/dao/inspection_dao.py

1288 lines
51 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
from datetime import datetime
from utils.sql_utils import SQLUtils
class InspectionDAO:
"""检验项目配置和数据访问对象"""
def __init__(self):
"""初始化数据访问对象"""
# 不再在初始化时创建数据库连接,而是在需要时创建
pass
def __del__(self):
"""析构函数,确保数据库连接关闭"""
# 不再需要在这里关闭连接,由上下文管理器处理
pass
def get_all_inspection_configs(self, include_disabled=False):
    """Return the inspection item configurations.

    Args:
        include_disabled: when true, disabled items are returned as well;
            otherwise only rows with ``enabled = TRUE`` are selected.

    Returns:
        list: one dict per non-deleted configuration row, ordered by
            ``sort_order`` then ``position``. An empty list is returned
            (and the error logged) when anything fails.
    """
    query_all = """
        SELECT id, position, name, display_name, enabled, required,
        data_type, min_value, max_value, enum_values, unit, sort_order
        FROM wsbz_inspection_config
        WHERE is_deleted = FALSE
        ORDER BY sort_order, position
    """
    query_enabled = """
        SELECT id, position, name, display_name, enabled, required,
        data_type, min_value, max_value, enum_values, unit, sort_order
        FROM wsbz_inspection_config
        WHERE is_deleted = FALSE AND enabled = TRUE
        ORDER BY sort_order, position
    """

    def to_config(r):
        # enum_values is stored as a JSON string; empty/NULL maps to None.
        return {
            'id': r[0],
            'position': r[1],
            'name': r[2],
            'display_name': r[3],
            'enabled': bool(r[4]),
            'required': bool(r[5]),
            'data_type': r[6],
            'min_value': r[7],
            'max_value': r[8],
            'enum_values': json.loads(r[9]) if r[9] else None,
            'unit': r[10],
            'sort_order': r[11]
        }

    try:
        sql = query_all if include_disabled else query_enabled
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            db.cursor.execute(sql, ())
            rows = db.cursor.fetchall()
        return [to_config(r) for r in rows]
    except Exception as e:
        logging.error(f"获取检验项目配置失败: {str(e)}")
        return []
def get_enabled_inspection_configs(self):
"""获取已启用的检验项目配置
Returns:
list: 已启用的检验项目配置列表
"""
return self.get_all_inspection_configs(include_disabled=False)
def get_inspection_config_by_position(self, position):
    """Look up a single inspection item configuration by its position.

    Args:
        position: position number of the item (the original docstring
            gives the range as 1-6).

    Returns:
        dict: the configuration, or None when no non-deleted row matches
            or when the lookup fails (the error is logged).
    """
    query = """
        SELECT id, position, name, display_name, enabled, required,
        data_type, min_value, max_value, enum_values, unit, sort_order
        FROM wsbz_inspection_config
        WHERE position = ? AND is_deleted = FALSE
    """
    try:
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            db.cursor.execute(query, (position,))
            record = db.cursor.fetchone()
        if not record:
            return None
        # enum_values is stored as a JSON string; empty/NULL maps to None.
        enum_values = json.loads(record[9]) if record[9] else None
        return {
            'id': record[0],
            'position': record[1],
            'name': record[2],
            'display_name': record[3],
            'enabled': bool(record[4]),
            'required': bool(record[5]),
            'data_type': record[6],
            'min_value': record[7],
            'max_value': record[8],
            'enum_values': enum_values,
            'unit': record[10],
            'sort_order': record[11]
        }
    except Exception as e:
        logging.error(f"获取检验项目配置失败: {str(e)}")
        return None
def update_inspection_config(self, config_id, data, username='system'):
    """Update one inspection item configuration row.

    Only the whitelisted keys present in ``data`` are written;
    ``update_time`` and ``update_by`` are always refreshed.

    Args:
        config_id: id of the configuration row to update.
        data: dict of column name -> new value. ``enum_values`` may be a
            list (serialized to JSON here) or a string that already holds
            valid JSON; anything else is skipped with a warning.
        username: name recorded in ``update_by``.

    Returns:
        bool: True when the statement was executed, False when anything
            failed (the error is logged).
    """
    # Columns callers are allowed to change.
    allowed_fields = (
        'name', 'display_name', 'enabled', 'required',
        'data_type', 'min_value', 'max_value', 'unit',
        'sort_order', 'enum_values'
    )
    try:
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        update_fields = []
        params = []
        for field in allowed_fields:
            if field not in data:
                continue
            value = data[field]
            if field == 'enum_values' and value is not None:
                # enum_values must end up in the database as a JSON string.
                if isinstance(value, list):
                    value = json.dumps(value)
                elif isinstance(value, str):
                    try:
                        json.loads(value)
                    except ValueError:
                        logging.warning(f"无效的JSON: {value}")
                        continue
                else:
                    # Previously dropped silently; still skipped, but now visible.
                    logging.warning(f"enum_values类型无效, 已忽略: {type(value).__name__}")
                    continue
            update_fields.append(f"{field} = ?")
            params.append(value)

        # Audit columns, then the id for the WHERE clause.
        update_fields.append("update_time = ?")
        params.append(current_time)
        update_fields.append("update_by = ?")
        params.append(username)
        params.append(config_id)

        # Every other statement in this class reads and writes
        # wsbz_inspection_config, so the update targets the same table.
        sql = f"""
            UPDATE wsbz_inspection_config
            SET {', '.join(update_fields)}
            WHERE id = ?
        """
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            db.execute_update(sql, params)
        return True
    except Exception as e:
        logging.error(f"更新检验项目配置失败: {str(e)}")
        return False
def toggle_inspection_config(self, position, enabled, username='system'):
    """Enable or disable the inspection item at a given position.

    Args:
        position: position number of the item (the original docstring
            gives the range as 1-6).
        enabled: new value for the ``enabled`` column.
        username: name recorded in ``update_by``.

    Returns:
        bool: True when the statement was executed, False when anything
            failed (the error is logged).
    """
    statement = """
        UPDATE wsbz_inspection_config
        SET enabled = ?, update_time = ?, update_by = ?
        WHERE position = ? AND is_deleted = FALSE
    """
    try:
        stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            db.execute_update(statement, (enabled, stamp, username, position))
        return True
    except Exception as e:
        logging.error(f"更新检验项目启用状态失败: {str(e)}")
        return False
def save_order_info(self, order_id, data):
    """Insert or update one row of ``wsbz_order_info``.

    The row key is ``data["mo"]`` (column ``ddmo``): when a row with that
    key exists it is updated, otherwise a new row is inserted.

    Args:
        order_id: order number. Not used by this method; kept for
            interface compatibility.
        data: dict with the order fields. Missing keys default to ``""``
            (``data_corp`` defaults to ``"JT"``).

    Returns:
        bool: True when the row was written, False when ``data`` is empty
            or anything failed (the error is logged).
    """
    if not data:
        return False

    # (table column, key read from ``data``), in the order the values are bound.
    # NOTE(review): gzl <- zx_code, pono <- khno and xj <- size are carried
    # over unchanged from the previous implementation — confirm these
    # mappings are intended.
    column_sources = (
        ("data_corp", "data_corp"), ("user_id", "user_id"),
        ("user_name", "user_name"), ("gzl_zl", "zx_zl"),
        ("xpack", "xpack"), ("qd", "qd"),
        ("spack_type", "spack_type"), ("mxzs", "mxzs"),
        ("jt", "jt"), ("ddnote", "note"),
        ("code", "code"), ("type", "type"),
        ("lable", "template_name"), ("lib", "lib"),
        ("gzl", "zx_code"), ("maxsl", "maxsl"),
        ("cz", "cz"), ("size", "size"),
        ("cd", "cd"), ("luno", "luno"),
        ("qfqd", "qfqd"), ("pono", "khno"),
        ("xj", "size"), ("ysl", "ysl"),
        ("dycz", "dycz"), ("zx_code", "zx_code"),
        ("edit_id", "edit_id"), ("remarks", "remarks"),
        ("zx_name", "zx_name"), ("bccd", "bccd"),
        ("tccd", "tccd"), ("zzyq", "zzyq"),
        ("customer", "customer"), ("customerexp", "customerexp"),
        ("bz_bqd", "bz_bqd"), ("bz_tqd", "bz_tqd"),
        ("type_name", "type_name"), ("remarks_hb", "remarks_hb"),
        ("khno", "khno"),
    )
    defaults = {"data_corp": "JT"}

    try:
        mo = data.get("mo", "")
        columns = [column for column, _ in column_sources]
        values = [
            data.get(key, defaults.get(column, ""))
            for column, key in column_sources
        ]

        # One connection handles the existence check and the write.
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            db.begin_transaction()

            # Separate cursor for the existence check; always closed, even
            # when the query raises.
            check_cursor = db.get_new_cursor()
            try:
                check_cursor.execute(
                    "SELECT ddmo FROM wsbz_order_info WHERE ddmo = ?", (mo,)
                )
                existing_record = check_cursor.fetchone()
            finally:
                check_cursor.close()

            if existing_record:
                assignments = ", ".join(f"{column} = ?" for column in columns)
                sql = f"UPDATE wsbz_order_info SET {assignments} WHERE ddmo = ?"
                params = (*values, mo)
                logging.info(f"更新订单信息: ddmo={mo}")
            else:
                insert_columns = ", ".join(["ddmo"] + columns)
                placeholders = ", ".join("?" * (len(columns) + 1))
                sql = (
                    f"INSERT INTO wsbz_order_info ({insert_columns}) "
                    f"VALUES ({placeholders})"
                )
                params = (mo, *values)
                logging.info(f"插入新订单信息: ddmo={mo}")

            db.execute_update(sql, params)
            db.commit_transaction()
        return True
    except Exception as e:
        logging.error(f"保存订单信息失败: {str(e)}")
        return False
def save_inspection_data(self, order_id, gc_note, data, username='system'):
    """Insert or update inspection values for one product.

    Each item is matched on (order_id, gc_note, position, tray_id): an
    existing row is updated, otherwise a new row is inserted. All items
    are written between one ``begin_transaction`` and one
    ``commit_transaction`` call.

    Args:
        order_id: order number.
        gc_note: engineering (work) number.
        data: iterable of dicts; the keys read are ``position``,
            ``config_id``, ``value``, ``status``, ``remark`` and
            ``tray_id`` (the last four default to ``''``).
        username: name recorded in ``create_by`` / ``update_by``.

    Returns:
        bool: True when every item was written, False when anything failed
            (the error is logged).
    """
    check_sql = """
        SELECT id FROM wsbz_inspection_data
        WHERE order_id = ? AND gc_note = ? AND position = ? AND tray_id = ?
    """
    update_sql = """
        UPDATE wsbz_inspection_data
        SET config_id = ?, value = ?, status = ?, remark = ?,
        update_time = ?, update_by = ?
        WHERE order_id = ? AND gc_note = ? AND position = ? AND tray_id = ?
    """
    insert_sql = """
        INSERT INTO wsbz_inspection_data (
        order_id, gc_note, position, config_id, value, status, remark,
        create_time, create_by, update_time, update_by, tray_id
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    try:
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            db.begin_transaction()
            for item in data:
                position = item.get('position')
                config_id = item.get('config_id')
                value = item.get('value', '')
                status = item.get('status', '')
                remark = item.get('remark', '')
                tray_id = item.get('tray_id', '')

                # Separate cursor for the existence check; always closed,
                # even when the query raises.
                check_cursor = db.get_new_cursor()
                try:
                    check_cursor.execute(
                        check_sql, (order_id, gc_note, position, tray_id)
                    )
                    existing_record = check_cursor.fetchone()
                finally:
                    check_cursor.close()

                if existing_record:
                    db.execute_update(update_sql, (
                        config_id, value, status, remark,
                        current_time, username,
                        order_id, gc_note, position, tray_id
                    ))
                else:
                    db.execute_update(insert_sql, (
                        order_id, gc_note, position, config_id, value,
                        status, remark,
                        current_time, username, current_time, username,
                        tray_id
                    ))
            db.commit_transaction()
        return True
    except Exception as e:
        logging.error(f"保存检验数据失败: {str(e)}")
        return False
def get_inspection_data_unfinished(self, tray_id):
    """Return inspection data of products on a tray that are not labeled yet.

    A first query collects the distinct ``gc_note`` values on the tray
    that have at least one non-deleted row whose status differs from
    ``'labeled'``; a second query loads every non-deleted inspection row
    for those ``gc_note`` values.

    Args:
        tray_id: tray number.

    Returns:
        list: one dict per inspection row ordered by ``create_time``;
            empty when nothing is pending or when anything fails (the
            error is logged).
    """
    pending_sql = """
        SELECT DISTINCT d.gc_note
        FROM wsbz_inspection_data d
        WHERE d.is_deleted = FALSE AND d.tray_id = ?
        AND status != 'labeled'
    """
    keys = (
        'id', 'gc_note', 'position', 'config_id', 'value', 'status',
        'remark', 'name', 'display_name', 'data_type', 'unit',
    )
    try:
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            db.cursor.execute(pending_sql, (tray_id,))
            pending_rows = db.cursor.fetchall()
        if not pending_rows:
            return []

        # One "?" per pending gc_note for the IN (...) clause.
        pending_notes = [r[0] for r in pending_rows]
        placeholders = ','.join('?' * len(pending_notes))
        sql = f"""
            SELECT d.id, d.gc_note, d.position, d.config_id, d.value, d.status, d.remark,
            c.name, c.display_name, c.data_type, c.unit
            FROM wsbz_inspection_data d
            LEFT JOIN wsbz_inspection_config c ON d.config_id = c.id
            WHERE d.is_deleted = FALSE AND d.tray_id = ?
            AND d.gc_note IN ({placeholders})
            ORDER BY d.create_time
        """
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            db.cursor.execute(sql, [tray_id, *pending_notes])
            detail_rows = db.cursor.fetchall()
        return [dict(zip(keys, r)) for r in detail_rows]
    except Exception as e:
        logging.error(f"获取未完成的检验数据失败: {str(e)}")
        return []
def get_inspection_data_by_order(self, order_id, gc_note, tray_id):
    """Return the inspection rows of one product on one tray.

    Args:
        order_id: order number.
        gc_note: engineering (work) number.
        tray_id: tray number.

    Returns:
        list: one dict per non-deleted inspection row, ordered by
            ``create_time``, ``order_id``, ``position``; empty when
            anything fails (the error is logged).
    """
    query = """
        SELECT d.id, d.position, d.config_id, d.value, d.status, d.remark,
        c.name, c.display_name, c.data_type, c.unit
        FROM wsbz_inspection_data d
        LEFT JOIN wsbz_inspection_config c ON d.config_id = c.id
        WHERE d.order_id = ? AND d.gc_note = ? AND d.is_deleted = FALSE AND d.tray_id = ?
        ORDER BY d.create_time, d.order_id, d.position
    """
    keys = (
        'id', 'position', 'config_id', 'value', 'status', 'remark',
        'name', 'display_name', 'data_type', 'unit',
    )
    try:
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            db.cursor.execute(query, (order_id, gc_note, tray_id))
            rows = db.cursor.fetchall()
        return [dict(zip(keys, r)) for r in rows]
    except Exception as e:
        logging.error(f"获取检验数据失败: {str(e)}")
        return []
def get_package_record(self, tray_id):
    """Return the packing records stored for a tray, newest first.

    Args:
        tray_id: tray number.

    Returns:
        list: raw result rows in the column order of the SELECT below
            (order_id, gc_note, material, spec, tray_id, axis_package_id,
            weight, net_weight, pack_time); empty when anything fails
            (the error is logged).
    """
    query = """
        SELECT DISTINCT order_id,
        gc_note,
        COALESCE(orders.size, '') as material,
        COALESCE(orders.cz, '') as spec,
        tray_id,
        COALESCE(axis_package_id, '') as axis_package_id,
        COALESCE(weight, 0) as weight,
        COALESCE(net_weight, 0) as net_weight,
        STRFTIME('%Y-%m-%d %H:%M:%S', pack_time) as pack_time
        FROM wsbz_inspection_pack_data t1
        LEFT JOIN wsbz_order_info orders on t1.order_id = orders.ddmo
        WHERE tray_id = ?
        AND is_deleted = FALSE
        ORDER BY pack_time DESC
    """
    try:
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            cur = db.cursor
            cur.execute(query, (tray_id,))
            records = cur.fetchall()
        return records
    except Exception as e:
        logging.error(f"获取包装记录失败: {str(e)}")
        return []
def save_package_record(self, order_id, tray_id, label_value, weight_value, net_weight_value, finish_time, gc_note=None):
    """Insert one packing record into ``wsbz_inspection_pack_data``.

    Args:
        order_id: order number.
        tray_id: tray number.
        label_value: label value, stored in ``axis_package_id``.
        weight_value: weight, stored in ``weight``.
        net_weight_value: net weight, stored in ``net_weight``.
        finish_time: completion time, stored in ``pack_time`` as passed.
        gc_note: engineering (work) number.

    Returns:
        bool: True when the row was inserted, False when anything failed
            (the error is logged).
    """
    # TODO: call the external interface to fetch further details for the
    # engineering number (e.g. material and spec).
    sql = """
        INSERT INTO wsbz_inspection_pack_data (order_id, tray_id, axis_package_id, weight, net_weight, pack_time, create_time, create_by, update_time, update_by, is_deleted,gc_note)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?)
    """
    try:
        # One formatted timestamp for both audit columns. Binding a raw
        # datetime relies on sqlite3's default adapter (deprecated since
        # Python 3.12) and would store a different text format than the
        # other writers in this class.
        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        params = (
            order_id, tray_id, label_value, weight_value, net_weight_value,
            finish_time, now, 'system', now, 'system', False, gc_note
        )
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            db.begin_transaction()
            db.execute_update(sql, params)
            db.commit_transaction()
        return True
    except Exception as e:
        logging.error(f"保存包装记录失败: {str(e)}")
        return False
def get_product_status(self, order_id, gc_note, tray_id):
    """Return the current status of a product.

    The status is read from the matching inspection row with the lowest id.

    Args:
        order_id: order number.
        gc_note: engineering (work) number.
        tray_id: tray number.

    Returns:
        str: the stored status, or ``'init'`` when no row matches, the
            stored status is empty, or the lookup fails (the error is
            logged).
    """
    query = """
        SELECT status FROM wsbz_inspection_data
        WHERE order_id = ? AND gc_note = ? AND tray_id = ?
        ORDER BY id ASC LIMIT 1
    """
    try:
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            db.cursor.execute(query, (order_id, gc_note, tray_id))
            row = db.cursor.fetchone()
            if row and row[0]:
                return row[0]
            # Default state when nothing usable is stored.
            return 'init'
    except Exception as e:
        logging.error(f"获取产品状态失败: {str(e)}")
        return 'init'
def check_package_record_exists(self, order_id, gc_note, tray_id):
    """Tell whether a non-deleted packing record exists for a product.

    Args:
        order_id: order number.
        gc_note: engineering (work) number.
        tray_id: tray number.

    Returns:
        bool: True when at least one matching row exists; False otherwise
            or when the check fails (the error is logged).
    """
    query = """
        SELECT COUNT(*) FROM wsbz_inspection_pack_data
        WHERE order_id = ? AND gc_note = ? AND tray_id = ? AND is_deleted = FALSE
    """
    try:
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            db.cursor.execute(query, (order_id, gc_note, tray_id))
            count_row = db.cursor.fetchone()
        if not count_row:
            return False
        return count_row[0] > 0
    except Exception as e:
        logging.error(f"检查包装记录是否存在失败: {str(e)}")
        return False
def update_product_status(self, order_id, gc_note, tray_id, new_status):
    """Set the status on every inspection row of a product.

    Args:
        order_id: order number.
        gc_note: engineering (work) number.
        tray_id: tray number.
        new_status: status value to store.

    Returns:
        bool: True when the statement was executed, False when anything
            failed (the error is logged).
    """
    statement = """
        UPDATE wsbz_inspection_data SET status = ?, update_time = ?
        WHERE order_id = ? AND gc_note = ? AND tray_id = ?
    """
    try:
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            db.execute_update(
                statement, (new_status, stamp, order_id, gc_note, tray_id)
            )
            logging.info(f"已更新产品状态: 订单号={order_id}, 工程号={gc_note}, 托盘号={tray_id}, 新状态={new_status}")
            return True
    except Exception as e:
        logging.error(f"更新产品状态失败: {str(e)}")
        return False
def delete_inspection_data(self, order_id, gc_note, tray_id):
    """Delete all inspection rows of a product (a real DELETE, not a soft delete).

    Args:
        order_id: order number.
        gc_note: engineering (work) number.
        tray_id: tray number.

    Returns:
        bool: True when the statement was executed and committed, False
            when anything failed (the error is logged).
    """
    statement = """
        DELETE FROM wsbz_inspection_data
        WHERE order_id = ? AND gc_note = ? AND tray_id = ?
    """
    key = (order_id, gc_note, tray_id)
    try:
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            db.begin_transaction()
            db.execute_update(statement, key)
            db.commit_transaction()
        return True
    except Exception as e:
        logging.error(f"删除检验数据失败: {str(e)}")
        return False
def get_axios_num_by_order_id(self, order_id):
    """Return the highest axis package number recorded for an order.

    Args:
        order_id: order number.

    Returns:
        int: the largest ``axis_package_id`` among the order's non-deleted
            packing records, or 0 when there is none or the lookup fails
            (the error is logged).
    """
    # The cast makes MAX compare numerically, matching get_axios_num;
    # without it text values compare lexicographically ('9' > '10').
    sql = """
        SELECT max(cast(axis_package_id as int)) as axios_num FROM wsbz_inspection_pack_data WHERE order_id = ?
        AND is_deleted = FALSE
    """
    try:
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            db.cursor.execute(sql, (order_id,))
            result = db.cursor.fetchone()
        # MAX over no rows yields a single NULL.
        if result and result[0]:
            return int(result[0])
        return 0
    except Exception as e:
        logging.error(f"获取轴号失败: {str(e)}")
        return 0
def get_axios_num(self, tray_id):
    """Return the highest axis package number recorded for a tray.

    Args:
        tray_id: tray number.

    Returns:
        int: the largest ``axis_package_id`` (compared as an integer) among
            the tray's non-deleted packing records, or 0 when there is
            none or the lookup fails (the error is logged).
    """
    sql = """
        SELECT max(cast(axis_package_id as int)) as axios_num FROM wsbz_inspection_pack_data WHERE tray_id = ?
        AND is_deleted = FALSE
    """
    try:
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            db.cursor.execute(sql, (tray_id,))
            result = db.cursor.fetchone()
        # MAX over no rows still yields one row holding NULL; treat it as
        # "no records" instead of letting int(None) raise and be logged
        # as an error.
        if result and result[0] is not None:
            return int(result[0])
        return 0
    except Exception as e:
        logging.error(f"获取轴号失败: {str(e)}")
        return 0
def get_gzl_zl(self, order_id):
    """Return the spool weight (``gzl_zl``) stored for an order.

    Args:
        order_id: order number, matched against ``ddmo``.

    Returns:
        The stored ``gzl_zl`` value, or 0 when no row matches or the
        lookup fails (the error is logged).
    """
    query = """
        SELECT gzl_zl FROM wsbz_order_info WHERE ddmo = ?
    """
    try:
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            db.cursor.execute(query, (order_id,))
            row = db.cursor.fetchone()
        if row:
            return row[0]
        return 0
    except Exception as e:
        logging.error(f"获取工字轮重量失败: {str(e)}")
        return 0
def get_xj_range(self, order_id):
    """Return the wire-diameter range stored for an order.

    Args:
        order_id: order number, matched against ``ddmo``.

    Returns:
        tuple: ``(bccd, tccd)`` from ``wsbz_order_info``, or
            ``(None, None)`` when no row matches or the lookup fails
            (the error is logged).
    """
    query = """
        SELECT bccd, tccd FROM wsbz_order_info WHERE ddmo = ?
    """
    try:
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            db.cursor.execute(query, (order_id,))
            row = db.cursor.fetchone()
        if not row:
            return (None, None)
        return (row[0], row[1])
    except Exception as e:
        logging.error(f"获取线径范围失败: {str(e)}")
        return None, None
def get_order_create_time(self, order_id):
    """Return the earliest ``create_time`` among an order's inspection rows.

    Args:
        order_id: value matched against the ``order_id`` column.

    Returns:
        The smallest ``create_time`` of the non-deleted rows (the writers
        in this class store it as ``'YYYY-MM-DD HH:MM:SS'``), or None when
        there is none or the lookup fails (the error is logged).
    """
    query = """
        SELECT MIN(create_time) FROM wsbz_inspection_data
        WHERE order_id = ? AND is_deleted = FALSE
    """
    try:
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            db.cursor.execute(query, (order_id,))
            row = db.cursor.fetchone()
        if row and row[0]:
            return row[0]
        return None
    except Exception as e:
        logging.error(f"获取工程号创建时间失败: {str(e)}")
        return None
def get_orders_by_create_time(self, order_ids):
    """Sort engineering numbers by the time their first inspection row was created.

    Args:
        order_ids: list of values matched against the ``gc_note`` column.

    Returns:
        list: the numbers found in the table ordered by their earliest
            ``create_time``, followed by any input numbers that were not
            found, in input order. An empty input gives ``[]``; on failure
            the input is returned unchanged (the error is logged).
    """
    try:
        if not order_ids:
            return []

        # One "?" per requested number for the IN (...) clause.
        placeholders = ','.join('?' * len(order_ids))
        sql = f"""
            SELECT gc_note, MIN(create_time) as first_create_time
            FROM wsbz_inspection_data
            WHERE gc_note IN ({placeholders}) AND is_deleted = FALSE
            GROUP BY gc_note
            ORDER BY first_create_time
        """
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            db.cursor.execute(sql, order_ids)
            rows = db.cursor.fetchall()

        ordered = [r[0] for r in rows]
        # Keep every requested number in the result, even without rows.
        known = set(ordered)
        for candidate in order_ids:
            if candidate not in known:
                known.add(candidate)
                ordered.append(candidate)
        return ordered
    except Exception as e:
        logging.error(f"按创建时间排序工程号失败: {str(e)}")
        # Fall back to the caller's original order.
        return order_ids
def get_order_info(self, order_id):
    """Return the stored order information as a dict.

    Args:
        order_id: order number, matched against ``ddmo``.

    Returns:
        dict: result column name -> value for the first matching row
            (``bz_bqd`` / ``bz_tqd`` are exposed as ``bqd`` / ``tqd``);
            empty when no row matches or the lookup fails (the error is
            logged).
    """
    query = """
        SELECT DISTINCT data_corp, user_id, user_name, gzl_zl, mzl, ddmo, xpack, qd, spack_type, mxzs, jt, ddnote, code, type, lable, lib, gzl, maxsl, cz, size, cd, luno,
        coalesce(qfqd,'') as qfqd, pono, xj, ysl, dycz, zx_code, edit_id, remarks, zx_name, bccd, tccd, zzyq, customer, customerexp, bz_bqd as bqd, bz_tqd as tqd, type_name, remarks_hb
        FROM wsbz_order_info WHERE ddmo = ?
    """
    try:
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            cur = db.cursor
            cur.execute(query, (order_id,))
            record = cur.fetchone()
            if not record:
                return {}
            # Column names come from the cursor metadata of this query.
            names = [col[0] for col in cur.description]
            return dict(zip(names, record))
    except Exception as e:
        logging.error(f"获取订单信息失败: {str(e)}")
        return {}
def get_order_others_info(self, gc_note, order_id, tray_id):
    """Return the non-empty inspection values of a product keyed by item name.

    Position 12 is reported under the key ``'mzl'`` and position 2 under
    ``'xj'``; every other row uses the ``name`` of its joined config.

    Args:
        gc_note: engineering (work) number.
        order_id: order number.
        tray_id: tray number.

    Returns:
        dict: name -> value; empty when nothing matches or the lookup
            fails (the error is logged).
    """
    # NOTE(review): the SELECT renames position 2 to 'xj', but the WHERE
    # filter only special-cases position 12, so a position-2 row without a
    # config name is filtered out — confirm that is intended.
    query = """
        SELECT t1.order_id, CASE t1.position WHEN 12 THEN 'mzl' when 2 then 'xj' ELSE name END AS name, value
        FROM wsbz_inspection_data t1
        LEFT JOIN main.wsbz_inspection_config wic ON t1.config_id = wic.id
        WHERE gc_note = ? AND t1.order_id = ?
        AND tray_id = ?
        AND CASE WHEN t1.position = 12 THEN 'mzl' ELSE name END IS NOT NULL
        AND COALESCE(value, '') != ''
    """
    try:
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            db.cursor.execute(query, (gc_note, order_id, tray_id))
            rows = db.cursor.fetchall()
        if not rows:
            return {}
        # Column 1 is the name, column 2 the value; shorter rows are skipped.
        return {r[1]: r[2] for r in rows if len(r) >= 3}
    except Exception as e:
        logging.error(f"获取订单其他信息失败: {str(e)}")
        return {}
def get_order_statistics(self):
    """Return order counts and output totals for today, this month, this year and overall.

    Only inspection rows at position 12 are considered; ``value`` of
    those rows is summed as the output quantity.

    Returns:
        dict: keys ``order_cnt_day/month/year/all`` (distinct order ids)
            and ``order_num_day/month/year/all`` (summed value, float).
            Every entry is 0 when there is no data or the query fails
            (the error is logged).
    """
    keys = (
        'order_cnt_day', 'order_cnt_month', 'order_cnt_year', 'order_cnt_all',
        'order_num_day', 'order_num_month', 'order_num_year', 'order_num_all',
    )
    # The period test sits INSIDE each aggregate (conditional aggregation).
    # Wrapping the aggregate in "CASE WHEN create_time ..." evaluates
    # create_time on one arbitrary row, so each figure came out as either
    # the grand total or 0. 'localtime' is used because create_time is
    # written with the local datetime.now(), while DATE('now') is UTC.
    sql = """
        SELECT
            COUNT(DISTINCT CASE
                WHEN create_time >= DATE('now', 'localtime')
                 AND create_time < DATE('now', 'localtime', '+1 day')
                THEN order_id END) AS order_cnt_day,
            COUNT(DISTINCT CASE
                WHEN create_time >= DATE('now', 'localtime', 'start of month')
                 AND create_time < DATE('now', 'localtime', 'start of month', '+1 month')
                THEN order_id END) AS order_cnt_month,
            COUNT(DISTINCT CASE
                WHEN create_time >= DATE('now', 'localtime', 'start of year')
                 AND create_time < DATE('now', 'localtime', 'start of year', '+1 year')
                THEN order_id END) AS order_cnt_year,
            COUNT(DISTINCT order_id) AS order_cnt_all,
            SUM(CASE
                WHEN create_time >= DATE('now', 'localtime')
                 AND create_time < DATE('now', 'localtime', '+1 day')
                THEN value ELSE 0 END) AS order_num_day,
            SUM(CASE
                WHEN create_time >= DATE('now', 'localtime', 'start of month')
                 AND create_time < DATE('now', 'localtime', 'start of month', '+1 month')
                THEN value ELSE 0 END) AS order_num_month,
            SUM(CASE
                WHEN create_time >= DATE('now', 'localtime', 'start of year')
                 AND create_time < DATE('now', 'localtime', 'start of year', '+1 year')
                THEN value ELSE 0 END) AS order_num_year,
            SUM(value) AS order_num_all
        FROM wsbz_inspection_data
        WHERE position = 12
    """
    try:
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            db.cursor.execute(sql)
            row = db.cursor.fetchone()
        if not row:
            return dict.fromkeys(keys, 0)
        stats = {}
        for index, key in enumerate(keys):
            raw = row[index]
            if raw is None:
                # SUM over no rows is NULL.
                stats[key] = 0
            elif key.startswith('order_num'):
                stats[key] = float(raw)
            else:
                stats[key] = raw
        return stats
    except Exception as e:
        logging.error(f"获取订单数量和产量统计数据失败: {str(e)}")
        return dict.fromkeys(keys, 0)
def save_luno_info(self, luno_code, luno_data):
    """Insert or update the record of one furnace (heat) number.

    The row is looked up by ``luono``; an existing row is updated,
    otherwise a new row is inserted. The change is committed through
    ``db.connection.commit()``.

    Args:
        luno_code: furnace number code.
        luno_data: dict with the furnace data; missing keys default to
            ``''`` and ``user_name`` defaults to ``'system'``.

    Returns:
        bool: True when the row was written, False when anything failed
            (the error is logged).
    """
    # Data columns in the order both statements bind them.
    data_fields = (
        'cz', 'gg', 'gc', 'bz', 'data_corp', 'data_corp_name',
        'c', 'si', 'mn', 'p', 's', 'ni', 'cr', 'ti', 'mo', 'cu',
        'others', 'klqd', 'ysl', 'rq',
    )
    lookup_sql = "SELECT id FROM wsbz_luno_info WHERE luono = ?"
    update_sql = """
        UPDATE wsbz_luno_info SET
        cz = ?, gg = ?, gc = ?, bz = ?, data_corp = ?, data_corp_name = ?,
        c = ?, si = ?, mn = ?, p = ?, s = ?, ni = ?, cr = ?, ti = ?, mo = ?, cu = ?,
        others = ?, klqd = ?, ysl = ?, rq = ?, update_time = ?, update_by = ?
        WHERE luono = ?
    """
    insert_sql = """
        INSERT INTO wsbz_luno_info (
        luono, cz, gg, gc, bz, data_corp, data_corp_name,
        c, si, mn, p, s, ni, cr, ti, mo, cu, others, klqd, ysl, rq,
        create_time, create_by, update_time, update_by
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    try:
        stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        # NOTE(review): unlike most methods here, no database= argument is
        # passed to SQLUtils — confirm its default points at the same file.
        with SQLUtils('sqlite') as db:
            db.cursor.execute(lookup_sql, (luno_code,))
            found = db.cursor.fetchone()

            field_values = tuple(luno_data.get(f, '') for f in data_fields)
            operator = luno_data.get('user_name', 'system')
            if found:
                bound = field_values + (stamp, operator, luno_code)
                db.cursor.execute(update_sql, bound)
            else:
                bound = (
                    (luno_code,) + field_values
                    + (stamp, operator, stamp, operator)
                )
                db.cursor.execute(insert_sql, bound)
            db.connection.commit()
        logging.info(f"成功保存炉号信息: {luno_code}")
        return True
    except Exception as e:
        logging.error(f"保存炉号信息失败: {str(e)}")
        return False
def get_luno_info(self, luno_code):
    """Return the stored record of one furnace (heat) number.

    Args:
        luno_code: furnace number code, matched against ``luono``.

    Returns:
        dict: column name -> value of the non-deleted row, or None when
            no row matches or the lookup fails (the error is logged).
    """
    query = """
        SELECT luono, cz, gg, gc, bz, data_corp, data_corp_name,
        c, si, mn, p, s, ni, cr, ti, mo, cu, others, klqd, ysl, rq,
        create_time, create_by, update_time, update_by
        FROM wsbz_luno_info
        WHERE luono = ? AND is_deleted = FALSE
    """
    try:
        with SQLUtils('sqlite') as db:
            cur = db.cursor
            cur.execute(query, (luno_code,))
            record = cur.fetchone()
            if not record:
                return None
            # Column names come from the cursor metadata of this query.
            names = [col[0] for col in cur.description]
            return dict(zip(names, record))
    except Exception as e:
        logging.error(f"获取炉号信息失败: {str(e)}")
        return None
def delete_package_record(self, order_id, gc_note, tray_id):
    """Delete the packing records of a product (a real DELETE, not a soft delete).

    Args:
        order_id: order number.
        gc_note: engineering (work) number.
        tray_id: tray number.

    Returns:
        bool: True when the statement was executed and committed, False
            when anything failed (the error is logged).
    """
    statement = """
        DELETE FROM wsbz_inspection_pack_data
        WHERE order_id = ? AND gc_note = ? AND tray_id = ?
    """
    key = (order_id, gc_note, tray_id)
    try:
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            db.begin_transaction()
            db.execute_update(statement, key)
            db.commit_transaction()
        logging.info(f"已删除包装记录: 订单号={order_id}, 工程号={gc_note}, 托盘号={tray_id}")
        return True
    except Exception as e:
        logging.error(f"删除包装记录失败: {str(e)}")
        return False
def get_inspection_data_by_config(self, order_id, gc_note, tray_id, position, config_id):
    """Return the most recently updated inspection row for one item of a product.

    Args:
        order_id: order number.
        gc_note: engineering (work) number.
        tray_id: tray number.
        position: position number of the item.
        config_id: id of the item's configuration.

    Returns:
        dict: the matching row with the latest ``update_time``, or None
            when no row matches or the lookup fails (the error is logged).
    """
    query = """
        SELECT id, order_id, gc_note, position, config_id, value, status, remark, tray_id, create_time, update_time
        FROM wsbz_inspection_data
        WHERE order_id = ? AND gc_note = ? AND tray_id = ? AND position = ? AND config_id = ?
        ORDER BY update_time DESC
        LIMIT 1
    """
    keys = (
        'id', 'order_id', 'gc_note', 'position', 'config_id', 'value',
        'status', 'remark', 'tray_id', 'create_time', 'update_time',
    )
    try:
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            db.cursor.execute(
                query, (order_id, gc_note, tray_id, position, config_id)
            )
            record = db.cursor.fetchone()
        if not record:
            return None
        return dict(zip(keys, record))
    except Exception as e:
        logging.error(f"查询检验数据失败: {str(e)}")
        return None
def get_package_statistics(self, order_id=None):
    """Return packing record counts and weights for one order and for all orders.

    Args:
        order_id: order number for the per-order figures; a falsy value is
            bound as ``''`` in the per-order filter.

    Returns:
        dict: ``count`` / ``weight`` for the given order and
            ``count_all`` / ``weight_all`` for the whole table. Every
            entry is 0 when there is no result row or the query fails
            (the error is logged).
    """
    # The two SELECTs of the UNION fill complementary columns (the other
    # pair is '' placeholders); the outer SUM folds them into one row.
    query = """
        SELECT SUM(weight) weight,
        SUM(count) count,
        SUM(count_all) count_all,
        SUM(weight_all) weight_all
        FROM (
        SELECT COUNT(gc_note) AS count,
        SUM(weight) AS weight,
        '' AS count_all,
        '' AS weight_all
        FROM wsbz_inspection_pack_data
        WHERE order_id = ?
        UNION ALL
        SELECT '', '',
        COUNT(gc_note) AS count_all,
        SUM(weight) AS weight_all
        FROM wsbz_inspection_pack_data
        ) a
    """
    try:
        with SQLUtils('sqlite', database='db/jtDB.db') as db:
            db.cursor.execute(query, (order_id or '',))
            totals = db.cursor.fetchone()
        if not totals:
            return {
                'weight': 0,
                'count': 0,
                'count_all': 0,
                'weight_all': 0
            }
        weight, count, count_all, weight_all = (
            totals[0], totals[1], totals[2], totals[3]
        )
        return {
            'weight': float(weight or 0),
            'count': int(count or 0),
            'count_all': int(count_all or 0),
            'weight_all': float(weight_all or 0)
        }
    except Exception as e:
        logging.error(f"获取包装记录统计数据失败: {str(e)}")
        return {
            'weight': 0,
            'count': 0,
            'count_all': 0,
            'weight_all': 0
        }