jiateng_ws/widgets/order_query_dialog.py
2025-07-20 18:15:31 +08:00

467 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
from datetime import datetime
from PySide6.QtWidgets import QTableWidgetItem, QMessageBox
from PySide6.QtCore import Qt, Signal
from ui.order_query_dialog_ui import OrderQueryDialogUI
from utils.sql_utils import SQLUtils
from utils.app_mode import AppMode
from apis.gc_api import GcApi
class OrderQueryDialog(OrderQueryDialogUI):
"""订单查询对话框,用于查询订单信息"""
# 定义信号,用于向上传递选中的订单信息
order_selected = Signal(dict)
def __init__(self, parent=None):
super().__init__()
# 存储查询结果
self.query_results = []
# 获取父窗口的用户信息
self.corp_id = None
self.user_id = None
self.user_name = None
if parent:
if hasattr(parent, 'corp_id'):
self.corp_id = parent.corp_id
if hasattr(parent, 'user_id'):
self.user_id = parent.user_id[0]
if hasattr(parent, 'user_name'):
self.user_name = parent.user_name
# 连接信号和槽
self.connect_signals()
def connect_signals(self):
"""连接信号和槽"""
# 查询按钮点击事件
self.query_button.clicked.connect(self.on_order_query)
# 确认按钮点击事件
self.confirm_button.clicked.connect(self.on_confirm)
# 取消按钮点击事件
self.cancel_button.clicked.connect(self.reject)
# 表格双击事件
self.result_table.cellDoubleClicked.connect(self.on_table_double_clicked)
def on_order_query(self):
"""执行订单查询"""
try:
# 获取查询参数
start_date = self.start_date.date().toString("yyyy-MM-dd")
end_date = self.end_date.date().toString("yyyy-MM-dd")
order_mo = self.order_mo_input.text().strip()
material = self.material_combo.currentText()
if material == "全部":
material = ""
spec = self.spec_input.text().strip()
# 构建查询参数
query_params = {
"srch_rq1": start_date,
"srch_rq2": end_date,
"srch_mo": order_mo,
"srch_note": order_mo,
"srch_cz": material,
"srch_size": spec,
"corp_id": self.corp_id
}
# 执行查询
results = self.query_orders(query_params)
# 更新表格
self.update_result_table(results)
except Exception as e:
logging.error(f"订单查询失败: {e}")
QMessageBox.critical(self, "查询错误", f"查询订单时发生错误: {str(e)}")
def query_orders(self, params):
"""查询订单数据
Args:
params: 查询参数字典
Returns:
list: 订单数据列表
"""
try:
# 如果是API模式优先从接口获取数据
if AppMode.is_api():
return self._query_orders_from_api(params)
else:
return self._query_orders_from_db(params)
except Exception as e:
logging.error(f"查询订单数据失败: {e}")
raise
def _query_orders_from_api(self, params):
"""从API获取订单数据
Args:
params: 查询参数字典
Returns:
list: 订单数据列表
"""
try:
# 调用接口
gc_api = GcApi()
# 调用接口查询订单列表
response = gc_api.get_order_list(params)
# 处理接口返回结果
if response and response.get("status", False):
# 将接口返回的数据转换为标准格式
orders_data = response.get("data", [])
results = []
for order_data in orders_data:
# 转换为统一格式的字典
order_dict = {
"note": order_data.get("note", ""),
"mo": order_data.get("mo", ""),
"customer": order_data.get("customer", ""),
"customerexp": order_data.get("customerexp", ""),
"khno": order_data.get("khno", ""),
"ddzl": order_data.get("ddzl", ""),
"khjq": order_data.get("khjq", ""),
"code": order_data.get("code", ""),
"type": order_data.get("type", ""),
"cz": order_data.get("cz", ""),
"size": order_data.get("size", ""),
"cd": order_data.get("cd", ""),
"maxsl": order_data.get("maxsl", ""),
"spack": order_data.get("spack", ""),
"zx_name": order_data.get("zx_name", ""),
"zx_code": order_data.get("zx_code", ""),
"zx_zl": order_data.get("zx_zl", ""),
"template_name": order_data.get("template_name", ""),
"bqlb": order_data.get("bqlb", ""),
# 删除重复的khno字段
"dycz": order_data.get("dycz", ""),
"luno": order_data.get("luno", ""),
"corp": order_data.get("corp", ""),
"sl": order_data.get("sl", ""),
"tccd": order_data.get("tccd", ""),
"bccd": order_data.get("bccd", ""),
"ysl": order_data.get("ysl", ""),
"qfqd": order_data.get("qfqd", ""),
"yzgg": order_data.get("yzgg", ""),
"tqd": order_data.get("tqd", ""),
"bqd": order_data.get("bqd", ""),
"bzfs": order_data.get("bzfs", ""),
"ddyq": order_data.get("ddyq", ""),
"remarks_hb": order_data.get("remarks_hb", ""),
"bz_tqd": order_data.get("bz_tqd", ""),
"bz_bqd": order_data.get("bz_bqd", ""),
"zzyq": order_data.get("zzyq", ""),
"rq": order_data.get("rq", "")
}
results.append(order_dict)
# 保存查询结果
self.query_results = results
# 如果接口查询到数据,则保存到数据库中
if results:
self._save_orders_to_db(results)
return results
else:
# 如果接口查询失败,则尝试从数据库查询
logging.warning("从API获取订单数据失败尝试从数据库查询")
return self._query_orders_from_db(params)
except Exception as e:
logging.error(f"从API获取订单数据失败: {e}")
# 如果接口查询失败,则尝试从数据库查询
logging.warning("从API获取订单数据失败尝试从数据库查询")
return self._query_orders_from_db(params)
def _save_orders_to_db(self, orders):
"""将订单数据保存到数据库
Args:
orders: 订单数据列表
"""
try:
# 导入DAO
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
# 遍历订单数据,保存到数据库
for order_data in orders:
order_code = order_data.get("note", "")
if order_code:
# 设置用户信息
if self.user_id:
order_data['user_id'] = self.user_id
if self.user_name:
order_data['user_name'] = self.user_name
if self.corp_id:
order_data['data_corp'] = self.corp_id
# 保存订单信息
inspection_dao.save_order_info(order_code, order_data)
logging.info(f"成功保存{len(orders)}条订单数据到数据库")
except Exception as e:
logging.error(f"保存订单数据到数据库失败: {e}")
def _query_orders_from_db(self, params):
"""从数据库获取订单数据
Args:
params: 查询参数字典
Returns:
list: 订单数据列表
"""
# 构建SQL查询
sql = """
SELECT
o.note, o.mo, o.customer, o.customerexp, o.khno, o.ddzl, o.khjq, o.code, o.type,
o.cz, o.size, o.cd, o.maxsl, o.spack, o.zx_name, o.zx_code, o.zx_zl,
o.template_name, o.bqlb, o.dycz, o.luno, o.corp, o.sl, o.tccd, o.bccd,
o.ysl, o.qfqd, o.yzgg, o.tqd, o.bqd, o.bzfs, o.ddyq, o.remarks_hb,
o.bz_tqd, o.bz_bqd, o.zzyq, o.create_time as rq
FROM wsbz_order_info o
WHERE 1=1
"""
query_params = []
# 添加查询条件
if params.get("srch_rq1") and params.get("srch_rq2"):
sql += " AND o.create_time BETWEEN ? AND ?"
query_params.append(params["srch_rq1"] + " 00:00:00")
query_params.append(params["srch_rq2"] + " 23:59:59")
if params.get("srch_mo"):
sql += " AND (o.mo LIKE ? OR o.note LIKE ?)"
query_params.append(f"%{params['srch_mo']}%")
query_params.append(f"%{params['srch_mo']}%")
if params.get("srch_cz"):
sql += " AND o.cz = ?"
query_params.append(params["srch_cz"])
if params.get("srch_size"):
sql += " AND o.size LIKE ?"
query_params.append(f"%{params['srch_size']}%")
# 添加排序
sql += " ORDER BY o.create_time DESC"
# 执行查询
with SQLUtils('sqlite') as db:
db.execute_query(sql, query_params)
rows = db.fetchall()
# 处理查询结果
results = []
for row in rows:
# 将元组转换为字典,使用安全的索引访问
try:
order_data = {
"note": row[0] if len(row) > 0 else "",
"mo": row[1] if len(row) > 1 else "",
"customer": row[2] if len(row) > 2 else "",
"customerexp": row[3] if len(row) > 3 else "",
"khno": row[4] if len(row) > 4 else "",
"ddzl": row[5] if len(row) > 5 else "",
"khjq": row[6] if len(row) > 6 else "",
"code": row[7] if len(row) > 7 else "",
"type": row[8] if len(row) > 8 else "",
"cz": row[9] if len(row) > 9 else "",
"size": row[10] if len(row) > 10 else "",
"cd": row[11] if len(row) > 11 else "",
"maxsl": row[12] if len(row) > 12 else "",
"spack": row[13] if len(row) > 13 else "",
"zx_name": row[14] if len(row) > 14 else "",
"zx_code": row[15] if len(row) > 15 else "",
"zx_zl": row[16] if len(row) > 16 else "",
"template_name": row[17] if len(row) > 17 else "",
"bqlb": row[18] if len(row) > 18 else "",
"dycz": row[19] if len(row) > 19 else "",
"luno": row[20] if len(row) > 20 else "",
"corp": row[21] if len(row) > 21 else "",
"sl": row[22] if len(row) > 22 else "",
"tccd": row[23] if len(row) > 23 else "",
"bccd": row[24] if len(row) > 24 else "",
"ysl": row[25] if len(row) > 25 else "",
"qfqd": row[26] if len(row) > 26 else "",
"yzgg": row[27] if len(row) > 27 else "",
"tqd": row[28] if len(row) > 28 else "",
"bqd": row[29] if len(row) > 29 else "",
"bzfs": row[30] if len(row) > 30 else "",
"ddyq": row[31] if len(row) > 31 else "",
"remarks_hb": row[32] if len(row) > 32 else "",
"bz_tqd": row[33] if len(row) > 33 else "",
"bz_bqd": row[34] if len(row) > 34 else "",
"zzyq": row[35] if len(row) > 35 else "",
"rq": row[36] if len(row) > 36 else ""
}
results.append(order_data)
except Exception as e:
logging.error(f"处理查询结果行时出错: {str(e)}, 行数据: {row}")
continue
# 保存查询结果
self.query_results = results
return results
def update_result_table(self, results):
"""更新结果表格
Args:
results: 订单数据列表
"""
# 清空表格
self.result_table.setRowCount(0)
if not results:
return
# 定义表头和对应的字段名(完整列表)
columns = [
{"title": "序号", "field": None},
{"title": "日期", "field": "rq"},
{"title": "订单号", "field": "note"},
{"title": "订单明细", "field": "mo"},
{"title": "客户", "field": "customer"},
{"title": "客户名称", "field": "customerexp"},
{"title": "客户订单号", "field": "khno"}, # 修改标题以区分
{"title": "订单类别", "field": "ddzl"},
{"title": "客户交期", "field": "khjq"},
{"title": "编码", "field": "code"},
{"title": "产品类别", "field": "type"},
{"title": "材质", "field": "cz"},
{"title": "规格", "field": "size"},
{"title": "产地", "field": "cd"},
{"title": "最大入库量", "field": "maxsl"},
{"title": "托盘号", "field": "spack"},
{"title": "轴型", "field": "zx_name"},
{"title": "轴型code", "field": "zx_code"},
{"title": "轴型重量", "field": "zx_zl"},
{"title": "标签类别", "field": "template_name"},
{"title": "标签类别code", "field": "bqlb"},
# 删除重复的"客户实际订单号"列
{"title": "打印材质", "field": "dycz"},
{"title": "炉号", "field": "luno"},
{"title": "公司", "field": "corp"},
{"title": "数量", "field": "sl"},
{"title": "上公差", "field": "tccd"},
{"title": "下公差", "field": "bccd"},
{"title": "延伸率", "field": "ysl"},
{"title": "屈服强度", "field": "qfqd"},
{"title": "英制规格", "field": "yzgg"},
{"title": "强度上限", "field": "tqd"},
{"title": "强度下限", "field": "bqd"},
{"title": "包装方式", "field": "bzfs"},
{"title": "订单要求", "field": "ddyq"},
{"title": "备注", "field": "remarks_hb"},
{"title": "包装强度上限", "field": "bz_tqd"},
{"title": "包装强度下限", "field": "bz_bqd"},
{"title": "轴重要求", "field": "zzyq"}
]
# 设置表头
self.result_table.setColumnCount(len(columns))
header_labels = [col["title"] for col in columns]
self.result_table.setHorizontalHeaderLabels(header_labels)
# 设置行数
self.result_table.setRowCount(len(results))
# 填充数据
for row, order_data in enumerate(results):
# 序号
item_index = QTableWidgetItem(str(row + 1))
item_index.setTextAlignment(Qt.AlignCenter)
self.result_table.setItem(row, 0, item_index)
# 遍历列,填充数据
for col_idx, column in enumerate(columns[1:], 1): # 从1开始跳过序号列
field = column["field"]
if field:
value = ""
# 特殊处理某些字段
if field == "rq":
create_time = order_data.get(field, "")
if create_time:
try:
# 尝试解析日期时间字符串
dt = datetime.strptime(create_time, "%Y-%m-%d %H:%M:%S")
value = dt.strftime("%Y-%m-%d")
except:
value = create_time
else:
# 其他普通字段
value = str(order_data.get(field, ""))
# 创建表格项并设置文本
item = QTableWidgetItem(value)
# 居中对齐特定的列
if field in ["rq", "sl", "maxsl", "zx_zl"]:
item.setTextAlignment(Qt.AlignCenter)
# 设置表格项
self.result_table.setItem(row, col_idx, item)
# 存储原始数据到第一列的item中
item_index.setData(Qt.UserRole, order_data)
def on_table_double_clicked(self, row, column):
"""表格双击事件处理"""
self.select_current_row(row)
def on_confirm(self):
"""确认按钮点击事件处理"""
# 获取当前选中行
selected_rows = self.result_table.selectionModel().selectedRows()
if not selected_rows:
QMessageBox.warning(self, "提示", "请选择一个订单")
return
# 获取选中行的索引
row_index = selected_rows[0].row()
self.select_current_row(row_index)
def select_current_row(self, row):
"""选择当前行并返回数据
将订单明细(mo)作为订单号传递给上层组件,并自动获取托盘号
"""
if 0 <= row < self.result_table.rowCount():
# 获取存储在item中的原始数据
item = self.result_table.item(row, 0)
if item:
order_data = item.data(Qt.UserRole)
if order_data:
# 修改订单数据将mo字段作为note字段的值
mo_value = order_data.get("mo", "")
if mo_value:
order_data["note"] = mo_value
# 自动获取托盘号
order_code = mo_value
if order_code and self.corp_id:
gc_api = GcApi()
xpack_response = gc_api.get_xpack(order_code, self.corp_id)
if xpack_response and xpack_response.get("status", False):
order_data["xpack"] = xpack_response.get("xpack", "")
# 发出信号
self.order_selected.emit(order_data)
self.accept()