import logging from datetime import datetime from PySide6.QtWidgets import QTableWidgetItem, QMessageBox from PySide6.QtCore import Qt, Signal from ui.luno_query_dialog_ui import LunoQueryDialogUI from utils.sql_utils import SQLUtils from utils.app_mode import AppMode from apis.gc_api import GcApi class LunoQueryDialog(LunoQueryDialogUI): """炉号查询对话框,用于查询炉号信息""" # 定义信号,用于向上传递选中的炉号信息 luno_selected = Signal(dict) def __init__(self, parent=None): super().__init__() # 存储查询结果 self.query_results = [] # 获取父窗口的用户信息 self.corp_id = None self.user_id = None self.user_name = None if parent: if hasattr(parent, 'corp_id'): self.corp_id = parent.corp_id if hasattr(parent, 'user_id'): self.user_id = parent.user_id[0] if isinstance(parent.user_id, tuple) else parent.user_id if hasattr(parent, 'user_name'): self.user_name = parent.user_name # 连接信号和槽 self.connect_signals() def connect_signals(self): """连接信号和槽""" # 查询按钮点击事件 self.query_button.clicked.connect(self.on_luno_query) # 确认按钮点击事件 self.confirm_button.clicked.connect(self.on_confirm) # 取消按钮点击事件 self.cancel_button.clicked.connect(self.reject) # 表格双击事件 self.result_table.cellDoubleClicked.connect(self.on_table_double_clicked) def on_luno_query(self): """执行炉号查询""" try: # 获取查询参数 start_date = self.start_date.date().toString("yyyy-MM-dd") end_date = self.end_date.date().toString("yyyy-MM-dd") luno = self.luno_input.text().strip() material = self.material_combo.currentText() if material == "全部": material = "" spec = self.spec_input.text().strip() steel = self.steel_input.text().strip() # 构建查询参数 query_params = { "srch_rq1": start_date, "srch_rq2": end_date, "luono": luno, "cz": material, "gg": spec, "gc": steel, "corp_id": self.corp_id } # 执行查询 results = self.query_lunos(query_params) # 更新表格 self.update_result_table(results) except Exception as e: logging.error(f"炉号查询失败: {e}") QMessageBox.critical(self, "查询错误", f"查询炉号时发生错误: {str(e)}") def query_lunos(self, params): """查询炉号数据 Args: params: 查询参数字典 Returns: list: 炉号数据列表 """ try: # 如果是API模式,优先从接口获取数据 if AppMode.is_api(): return self._query_lunos_from_api(params) else: return self._query_lunos_from_db(params) except Exception as e: logging.error(f"查询炉号数据失败: {e}") raise def _query_lunos_from_api(self, params): """从API获取炉号数据 Args: params: 查询参数字典 Returns: list: 炉号数据列表 """ try: # 调用接口 gc_api = GcApi() # 调用接口查询炉号列表 response = gc_api.get_luno_list(params) # 处理接口返回结果 - 接口直接返回数据列表 if response and isinstance(response, list): # 接口直接返回炉号数据列表 lunos_data = response results = [] for luno_data in lunos_data: # 转换为统一格式的字典,处理接口返回的实际字段 luno_dict = { "luono": luno_data.get("luono", ""), "cz": luno_data.get("cz", ""), "gg": luno_data.get("gg", ""), "gc": luno_data.get("gc", ""), "bz": luno_data.get("bz", ""), "data_corp": luno_data.get("data_corp", ""), "data_corp_name": luno_data.get("data_corp_name", ""), "c": luno_data.get("c", ""), "si": luno_data.get("si", ""), "mn": luno_data.get("mn", ""), "p": luno_data.get("p", ""), "s": luno_data.get("s", ""), "ni": luno_data.get("ni", ""), "cr": luno_data.get("cr", ""), "ti": luno_data.get("ti", ""), "mo": luno_data.get("mo", ""), "cu": luno_data.get("cu", ""), "others": luno_data.get("others", ""), "klqd": luno_data.get("klqd", ""), "ysl": luno_data.get("ysl", ""), "rq": luno_data.get("rq", "") } # 处理null值,转换为空字符串 for key, value in luno_dict.items(): if value is None: luno_dict[key] = "" results.append(luno_dict) # 保存查询结果 self.query_results = results # 如果接口查询到数据,则保存到数据库中 if results: self._save_lunos_to_db(results) return results else: # 如果接口返回的不是列表或为空,则尝试从数据库查询 logging.warning("从API获取炉号数据失败或返回格式不正确,尝试从数据库查询") return self._query_lunos_from_db(params) except Exception as e: logging.error(f"从API获取炉号数据失败: {e}") # 如果接口查询失败,则尝试从数据库查询 logging.warning("从API获取炉号数据失败,尝试从数据库查询") return self._query_lunos_from_db(params) def _save_lunos_to_db(self, lunos): """将炉号数据保存到数据库 Args: lunos: 炉号数据列表 """ try: # 导入DAO from dao.inspection_dao import InspectionDAO inspection_dao = InspectionDAO() # 遍历炉号数据,保存到数据库 for luno_data in lunos: luno_code = luno_data.get("luono", "") if luno_code: # 设置用户信息 if self.user_id: luno_data['user_id'] = self.user_id if self.user_name: luno_data['user_name'] = self.user_name if self.corp_id: luno_data['data_corp'] = self.corp_id # 保存炉号信息 inspection_dao.save_luno_info(luno_code, luno_data) logging.info(f"成功保存{len(lunos)}条炉号数据到数据库") except Exception as e: logging.error(f"保存炉号数据到数据库失败: {e}") def _query_lunos_from_db(self, params): """从数据库获取炉号数据 Args: params: 查询参数字典 Returns: list: 炉号数据列表 """ # 构建SQL查询 sql = """ SELECT l.luono, l.cz, l.gg, l.gc, l.bz, l.data_corp, l.data_corp_name, l.c, l.si, l.mn, l.p, l.s, l.ni, l.cr, l.ti, l.mo, l.cu, l.others, l.klqd, l.ysl, l.create_time as rq FROM wsbz_luno_info l WHERE 1=1 """ query_params = [] # 添加查询条件 if params.get("srch_rq1") and params.get("srch_rq2"): sql += " AND l.create_time BETWEEN ? AND ?" query_params.append(params["srch_rq1"] + " 00:00:00") query_params.append(params["srch_rq2"] + " 23:59:59") if params.get("luono"): sql += " AND l.luono LIKE ?" query_params.append(f"%{params['luono']}%") if params.get("cz"): sql += " AND l.cz = ?" query_params.append(params["cz"]) if params.get("gg"): sql += " AND l.gg LIKE ?" query_params.append(f"%{params['gg']}%") if params.get("gc"): sql += " AND l.gc LIKE ?" query_params.append(f"%{params['gc']}%") # 添加排序 sql += " ORDER BY l.create_time DESC" # 执行查询 with SQLUtils('sqlite') as db: db.execute_query(sql, query_params) rows = db.fetchall() # 处理查询结果 results = [] for row in rows: # 将元组转换为字典 luno_data = { "luono": row[0], "cz": row[1], "gg": row[2], "gc": row[3], "bz": row[4], "data_corp": row[5], "data_corp_name": row[6], "c": row[7], "si": row[8], "mn": row[9], "p": row[10], "s": row[11], "ni": row[12], "cr": row[13], "ti": row[14], "mo": row[15], "cu": row[16], "others": row[17], "klqd": row[18], "ysl": row[19], "rq": row[20] } results.append(luno_data) # 保存查询结果 self.query_results = results return results def update_result_table(self, results): """更新结果表格 Args: results: 炉号数据列表 """ # 清空表格 self.result_table.setRowCount(0) if not results: return # 定义表头和对应的字段名 columns = [ {"title": "序号", "field": None}, {"title": "日期", "field": "rq"}, {"title": "炉号", "field": "luono"}, {"title": "材质", "field": "cz"}, {"title": "规格", "field": "gg"}, {"title": "钢厂", "field": "gc"}, {"title": "标准", "field": "bz"}, {"title": "公司账套", "field": "data_corp_name"}, {"title": "C", "field": "c"}, {"title": "Si", "field": "si"}, {"title": "Mn", "field": "mn"}, {"title": "P", "field": "p"}, {"title": "S", "field": "s"}, {"title": "Ni", "field": "ni"}, {"title": "Cr", "field": "cr"}, {"title": "Ti", "field": "ti"}, {"title": "Mo", "field": "mo"}, {"title": "Cu", "field": "cu"}, {"title": "其他", "field": "others"}, {"title": "抗拉强度", "field": "klqd"}, {"title": "延伸率", "field": "ysl"} ] # 设置表头 self.result_table.setColumnCount(len(columns)) header_labels = [col["title"] for col in columns] self.result_table.setHorizontalHeaderLabels(header_labels) # 设置行数 self.result_table.setRowCount(len(results)) # 填充数据 for row, luno_data in enumerate(results): # 序号 item_index = QTableWidgetItem(str(row + 1)) item_index.setTextAlignment(Qt.AlignCenter) self.result_table.setItem(row, 0, item_index) # 遍历列,填充数据 for col_idx, column in enumerate(columns[1:], 1): # 从1开始,跳过序号列 field = column["field"] if field: value = "" # 特殊处理某些字段 if field == "rq": create_time = luno_data.get(field, "") if create_time: try: # 尝试解析日期时间字符串,支持多种格式 if " " in create_time: # 包含时间的格式:2025-07-09 10:30:00 dt = datetime.strptime(create_time, "%Y-%m-%d %H:%M:%S") value = dt.strftime("%Y-%m-%d") else: # 只有日期的格式:2025-07-09 dt = datetime.strptime(create_time, "%Y-%m-%d") value = dt.strftime("%Y-%m-%d") except: # 如果解析失败,直接使用原值 value = create_time else: # 其他普通字段 value = str(luno_data.get(field, "")) # 创建表格项并设置文本 item = QTableWidgetItem(value) # 居中对齐特定的列 if field in ["rq", "c", "si", "mn", "p", "s", "ni", "cr", "ti", "mo", "cu", "klqd", "ysl"]: item.setTextAlignment(Qt.AlignCenter) # 设置表格项 self.result_table.setItem(row, col_idx, item) # 存储原始数据到第一列的item中 item_index.setData(Qt.UserRole, luno_data) def on_table_double_clicked(self, row, column): """表格双击事件处理""" self.select_current_row(row) def on_confirm(self): """确认按钮点击事件处理""" # 获取当前选中行 selected_rows = self.result_table.selectionModel().selectedRows() if not selected_rows: QMessageBox.warning(self, "提示", "请选择一个炉号") return # 获取选中行的索引 row_index = selected_rows[0].row() self.select_current_row(row_index) def select_current_row(self, row): """选择当前行并返回数据""" if 0 <= row < self.result_table.rowCount(): # 获取存储在item中的原始数据 item = self.result_table.item(row, 0) if item: luno_data = item.data(Qt.UserRole) if luno_data: # 发出信号 self.luno_selected.emit(luno_data) self.accept()