import logging from datetime import datetime from PySide6.QtWidgets import QTableWidgetItem, QMessageBox from PySide6.QtCore import Qt, Signal from ui.order_query_dialog_ui import OrderQueryDialogUI from utils.sql_utils import SQLUtils from utils.app_mode import AppMode from apis.gc_api import GcApi class OrderQueryDialog(OrderQueryDialogUI): """订单查询对话框,用于查询订单信息""" # 定义信号,用于向上传递选中的订单信息 order_selected = Signal(dict) def __init__(self, parent=None): super().__init__() # 存储查询结果 self.query_results = [] # 获取父窗口的用户信息 self.corp_id = None self.user_id = None self.user_name = None if parent: if hasattr(parent, 'corp_id'): self.corp_id = parent.corp_id if hasattr(parent, 'user_id'): self.user_id = parent.user_id[0] if hasattr(parent, 'user_name'): self.user_name = parent.user_name # 连接信号和槽 self.connect_signals() def connect_signals(self): """连接信号和槽""" # 查询按钮点击事件 self.query_button.clicked.connect(self.on_order_query) # 确认按钮点击事件 self.confirm_button.clicked.connect(self.on_confirm) # 取消按钮点击事件 self.cancel_button.clicked.connect(self.reject) # 表格双击事件 self.result_table.cellDoubleClicked.connect(self.on_table_double_clicked) def on_order_query(self): """执行订单查询""" try: # 获取查询参数 start_date = self.start_date.date().toString("yyyy-MM-dd") end_date = self.end_date.date().toString("yyyy-MM-dd") order_mo = self.order_mo_input.text().strip() material = self.material_combo.currentText() if material == "全部": material = "" spec = self.spec_input.text().strip() # 构建查询参数 query_params = { "srch_rq1": start_date, "srch_rq2": end_date, "srch_mo": order_mo, "srch_note": order_mo, "srch_cz": material, "srch_size": spec, "corp_id": self.corp_id } # 执行查询 results = self.query_orders(query_params) # 更新表格 self.update_result_table(results) except Exception as e: logging.error(f"订单查询失败: {e}") QMessageBox.critical(self, "查询错误", f"查询订单时发生错误: {str(e)}") def query_orders(self, params): """查询订单数据 Args: params: 查询参数字典 Returns: list: 订单数据列表 """ try: # 如果是API模式,优先从接口获取数据 if AppMode.is_api(): return self._query_orders_from_api(params) else: return self._query_orders_from_db(params) except Exception as e: logging.error(f"查询订单数据失败: {e}") raise def _query_orders_from_api(self, params): """从API获取订单数据 Args: params: 查询参数字典 Returns: list: 订单数据列表 """ try: # 调用接口 gc_api = GcApi() # 调用接口查询订单列表 response = gc_api.get_order_list(params) # 处理接口返回结果 if response and response.get("status", False): # 将接口返回的数据转换为标准格式 orders_data = response.get("data", []) results = [] for order_data in orders_data: # 转换为统一格式的字典 order_dict = { "note": order_data.get("note", ""), "mo": order_data.get("mo", ""), "customer": order_data.get("customer", ""), "customerexp": order_data.get("customerexp", ""), "khno": order_data.get("khno", ""), "ddzl": order_data.get("ddzl", ""), "khjq": order_data.get("khjq", ""), "code": order_data.get("code", ""), "type": order_data.get("type", ""), "cz": order_data.get("cz", ""), "size": order_data.get("size", ""), "cd": order_data.get("cd", ""), "maxsl": order_data.get("maxsl", ""), "spack": order_data.get("spack", ""), "zx_name": order_data.get("zx_name", ""), "zx_code": order_data.get("zx_code", ""), "zx_zl": order_data.get("zx_zl", ""), "template_name": order_data.get("template_name", ""), "bqlb": order_data.get("bqlb", ""), # 删除重复的khno字段 "dycz": order_data.get("dycz", ""), "luno": order_data.get("luno", ""), "corp": order_data.get("corp", ""), "sl": order_data.get("sl", ""), "tccd": order_data.get("tccd", ""), "bccd": order_data.get("bccd", ""), "ysl": order_data.get("ysl", ""), "qfqd": order_data.get("qfqd", ""), "yzgg": order_data.get("yzgg", ""), "tqd": order_data.get("tqd", ""), "bqd": order_data.get("bqd", ""), "bzfs": order_data.get("bzfs", ""), "ddyq": order_data.get("ddyq", ""), "remarks_hb": order_data.get("remarks_hb", ""), "bz_tqd": order_data.get("bz_tqd", ""), "bz_bqd": order_data.get("bz_bqd", ""), "zzyq": order_data.get("zzyq", ""), "rq": order_data.get("rq", "") } results.append(order_dict) # 保存查询结果 self.query_results = results # 如果接口查询到数据,则保存到数据库中 if results: self._save_orders_to_db(results) return results else: # 如果接口查询失败,则尝试从数据库查询 logging.warning("从API获取订单数据失败,尝试从数据库查询") return self._query_orders_from_db(params) except Exception as e: logging.error(f"从API获取订单数据失败: {e}") # 如果接口查询失败,则尝试从数据库查询 logging.warning("从API获取订单数据失败,尝试从数据库查询") return self._query_orders_from_db(params) def _save_orders_to_db(self, orders): """将订单数据保存到数据库 Args: orders: 订单数据列表 """ try: # 导入DAO from dao.inspection_dao import InspectionDAO inspection_dao = InspectionDAO() # 遍历订单数据,保存到数据库 for order_data in orders: order_code = order_data.get("note", "") if order_code: # 设置用户信息 if self.user_id: order_data['user_id'] = self.user_id if self.user_name: order_data['user_name'] = self.user_name if self.corp_id: order_data['data_corp'] = self.corp_id # 保存订单信息 inspection_dao.save_order_info(order_code, order_data) logging.info(f"成功保存{len(orders)}条订单数据到数据库") except Exception as e: logging.error(f"保存订单数据到数据库失败: {e}") def _query_orders_from_db(self, params): """从数据库获取订单数据 Args: params: 查询参数字典 Returns: list: 订单数据列表 """ # 构建SQL查询 sql = """ SELECT o.note, o.mo, o.customer, o.customerexp, o.khno, o.ddzl, o.khjq, o.code, o.type, o.cz, o.size, o.cd, o.maxsl, o.spack, o.zx_name, o.zx_code, o.zx_zl, o.template_name, o.bqlb, o.dycz, o.luno, o.corp, o.sl, o.tccd, o.bccd, o.ysl, o.qfqd, o.yzgg, o.tqd, o.bqd, o.bzfs, o.ddyq, o.remarks_hb, o.bz_tqd, o.bz_bqd, o.zzyq, o.create_time as rq FROM wsbz_order_info o WHERE 1=1 """ query_params = [] # 添加查询条件 if params.get("srch_rq1") and params.get("srch_rq2"): sql += " AND o.create_time BETWEEN ? AND ?" query_params.append(params["srch_rq1"] + " 00:00:00") query_params.append(params["srch_rq2"] + " 23:59:59") if params.get("srch_mo"): sql += " AND (o.mo LIKE ? OR o.note LIKE ?)" query_params.append(f"%{params['srch_mo']}%") query_params.append(f"%{params['srch_mo']}%") if params.get("srch_cz"): sql += " AND o.cz = ?" query_params.append(params["srch_cz"]) if params.get("srch_size"): sql += " AND o.size LIKE ?" query_params.append(f"%{params['srch_size']}%") # 添加排序 sql += " ORDER BY o.create_time DESC" # 执行查询 with SQLUtils('sqlite') as db: db.execute_query(sql, query_params) rows = db.fetchall() # 处理查询结果 results = [] for row in rows: # 将元组转换为字典,使用安全的索引访问 try: order_data = { "note": row[0] if len(row) > 0 else "", "mo": row[1] if len(row) > 1 else "", "customer": row[2] if len(row) > 2 else "", "customerexp": row[3] if len(row) > 3 else "", "khno": row[4] if len(row) > 4 else "", "ddzl": row[5] if len(row) > 5 else "", "khjq": row[6] if len(row) > 6 else "", "code": row[7] if len(row) > 7 else "", "type": row[8] if len(row) > 8 else "", "cz": row[9] if len(row) > 9 else "", "size": row[10] if len(row) > 10 else "", "cd": row[11] if len(row) > 11 else "", "maxsl": row[12] if len(row) > 12 else "", "spack": row[13] if len(row) > 13 else "", "zx_name": row[14] if len(row) > 14 else "", "zx_code": row[15] if len(row) > 15 else "", "zx_zl": row[16] if len(row) > 16 else "", "template_name": row[17] if len(row) > 17 else "", "bqlb": row[18] if len(row) > 18 else "", "dycz": row[19] if len(row) > 19 else "", "luno": row[20] if len(row) > 20 else "", "corp": row[21] if len(row) > 21 else "", "sl": row[22] if len(row) > 22 else "", "tccd": row[23] if len(row) > 23 else "", "bccd": row[24] if len(row) > 24 else "", "ysl": row[25] if len(row) > 25 else "", "qfqd": row[26] if len(row) > 26 else "", "yzgg": row[27] if len(row) > 27 else "", "tqd": row[28] if len(row) > 28 else "", "bqd": row[29] if len(row) > 29 else "", "bzfs": row[30] if len(row) > 30 else "", "ddyq": row[31] if len(row) > 31 else "", "remarks_hb": row[32] if len(row) > 32 else "", "bz_tqd": row[33] if len(row) > 33 else "", "bz_bqd": row[34] if len(row) > 34 else "", "zzyq": row[35] if len(row) > 35 else "", "rq": row[36] if len(row) > 36 else "" } results.append(order_data) except Exception as e: logging.error(f"处理查询结果行时出错: {str(e)}, 行数据: {row}") continue # 保存查询结果 self.query_results = results return results def query_orders_xpack(self, params): """通过托盘号查订单数据,返回列表""" try: gc_api = GcApi() response = gc_api.get_order_info_by_xpack(params.get("srch_spack"), params.get("corp_id")) if response and response.get("status", False): orders_data = response.get("data", []) results = [] for order_data in orders_data: results.append(order_data) self.query_results = results return results else: return [] except Exception as e: logging.error(f"通过托盘号查订单数据失败: {e}") return [] def update_result_table(self, results): """更新结果表格 Args: results: 订单数据列表 """ # 清空表格 self.result_table.setRowCount(0) if not results: return # 定义表头和对应的字段名(完整列表) columns = [ {"title": "序号", "field": None}, {"title": "日期", "field": "rq"}, {"title": "订单号", "field": "note"}, {"title": "订单明细", "field": "mo"}, {"title": "客户", "field": "customer"}, {"title": "客户名称", "field": "customerexp"}, {"title": "客户订单号", "field": "khno"}, # 修改标题以区分 {"title": "订单类别", "field": "ddzl"}, {"title": "客户交期", "field": "khjq"}, {"title": "编码", "field": "code"}, {"title": "产品类别", "field": "type"}, {"title": "材质", "field": "cz"}, {"title": "规格", "field": "size"}, {"title": "产地", "field": "cd"}, {"title": "最大入库量", "field": "maxsl"}, {"title": "托盘号", "field": "spack"}, {"title": "轴型", "field": "zx_name"}, {"title": "轴型code", "field": "zx_code"}, {"title": "轴型重量", "field": "zx_zl"}, {"title": "标签类别", "field": "template_name"}, {"title": "标签类别code", "field": "bqlb"}, # 删除重复的"客户实际订单号"列 {"title": "打印材质", "field": "dycz"}, {"title": "炉号", "field": "luno"}, {"title": "公司", "field": "corp"}, {"title": "数量", "field": "sl"}, {"title": "上公差", "field": "tccd"}, {"title": "下公差", "field": "bccd"}, {"title": "延伸率", "field": "ysl"}, {"title": "屈服强度", "field": "qfqd"}, {"title": "英制规格", "field": "yzgg"}, {"title": "强度上限", "field": "tqd"}, {"title": "强度下限", "field": "bqd"}, {"title": "包装方式", "field": "bzfs"}, {"title": "订单要求", "field": "ddyq"}, {"title": "备注", "field": "remarks_hb"}, {"title": "包装强度上限", "field": "bz_tqd"}, {"title": "包装强度下限", "field": "bz_bqd"}, {"title": "轴重要求", "field": "zzyq"} ] # 设置表头 self.result_table.setColumnCount(len(columns)) header_labels = [col["title"] for col in columns] self.result_table.setHorizontalHeaderLabels(header_labels) # 设置行数 self.result_table.setRowCount(len(results)) # 填充数据 for row, order_data in enumerate(results): # 序号 item_index = QTableWidgetItem(str(row + 1)) item_index.setTextAlignment(Qt.AlignCenter) self.result_table.setItem(row, 0, item_index) # 遍历列,填充数据 for col_idx, column in enumerate(columns[1:], 1): # 从1开始,跳过序号列 field = column["field"] if field: value = "" # 特殊处理某些字段 if field == "rq": create_time = order_data.get(field, "") if create_time: try: # 尝试解析日期时间字符串 dt = datetime.strptime(create_time, "%Y-%m-%d %H:%M:%S") value = dt.strftime("%Y-%m-%d") except: value = create_time else: # 其他普通字段 value = str(order_data.get(field, "")) # 创建表格项并设置文本 item = QTableWidgetItem(value) # 居中对齐特定的列 if field in ["rq", "sl", "maxsl", "zx_zl"]: item.setTextAlignment(Qt.AlignCenter) # 设置表格项 self.result_table.setItem(row, col_idx, item) # 存储原始数据到第一列的item中 item_index.setData(Qt.UserRole, order_data) def on_table_double_clicked(self, row, column): """表格双击事件处理""" self.select_current_row(row) def on_confirm(self): """确认按钮点击事件处理""" # 获取当前选中行 selected_rows = self.result_table.selectionModel().selectedRows() if not selected_rows: QMessageBox.warning(self, "提示", "请选择一个订单") return # 获取选中行的索引 row_index = selected_rows[0].row() self.select_current_row(row_index) def select_current_row(self, row): """选择当前行并返回数据 将订单明细(mo)作为订单号传递给上层组件,并自动获取托盘号 """ if 0 <= row < self.result_table.rowCount(): # 获取存储在item中的原始数据 item = self.result_table.item(row, 0) if item: order_data = item.data(Qt.UserRole) if order_data: # 修改订单数据,将mo字段作为note字段的值 mo_value = order_data.get("mo", "") if mo_value: order_data["note"] = mo_value # 自动获取托盘号 order_code = mo_value if order_code and self.corp_id: gc_api = GcApi() xpack_response = gc_api.get_xpack(order_code, self.corp_id) if xpack_response and xpack_response.get("status", False): order_data["xpack"] = xpack_response.get("xpack", "") # 发出信号 self.order_selected.emit(order_data) self.accept()