jiateng_ws/widgets/luno_query_dialog.py
2025-07-19 13:17:24 +08:00

413 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
from datetime import datetime
from PySide6.QtWidgets import QTableWidgetItem, QMessageBox
from PySide6.QtCore import Qt, Signal
from ui.luno_query_dialog_ui import LunoQueryDialogUI
from utils.sql_utils import SQLUtils
from utils.app_mode import AppMode
from apis.gc_api import GcApi
class LunoQueryDialog(LunoQueryDialogUI):
"""炉号查询对话框,用于查询炉号信息"""
# 定义信号,用于向上传递选中的炉号信息
luno_selected = Signal(dict)
def __init__(self, parent=None):
super().__init__()
# 存储查询结果
self.query_results = []
# 获取父窗口的用户信息
self.corp_id = None
self.user_id = None
self.user_name = None
if parent:
if hasattr(parent, 'corp_id'):
self.corp_id = parent.corp_id
if hasattr(parent, 'user_id'):
self.user_id = parent.user_id[0] if isinstance(parent.user_id, tuple) else parent.user_id
if hasattr(parent, 'user_name'):
self.user_name = parent.user_name
# 连接信号和槽
self.connect_signals()
def connect_signals(self):
"""连接信号和槽"""
# 查询按钮点击事件
self.query_button.clicked.connect(self.on_luno_query)
# 确认按钮点击事件
self.confirm_button.clicked.connect(self.on_confirm)
# 取消按钮点击事件
self.cancel_button.clicked.connect(self.reject)
# 表格双击事件
self.result_table.cellDoubleClicked.connect(self.on_table_double_clicked)
def on_luno_query(self):
"""执行炉号查询"""
try:
# 获取查询参数
start_date = self.start_date.date().toString("yyyy-MM-dd")
end_date = self.end_date.date().toString("yyyy-MM-dd")
luno = self.luno_input.text().strip()
material = self.material_combo.currentText()
if material == "全部":
material = ""
spec = self.spec_input.text().strip()
steel = self.steel_input.text().strip()
# 构建查询参数
query_params = {
"srch_rq1": start_date,
"srch_rq2": end_date,
"luono": luno,
"cz": material,
"gg": spec,
"gc": steel,
"corp_id": self.corp_id
}
# 执行查询
results = self.query_lunos(query_params)
# 更新表格
self.update_result_table(results)
except Exception as e:
logging.error(f"炉号查询失败: {e}")
QMessageBox.critical(self, "查询错误", f"查询炉号时发生错误: {str(e)}")
def query_lunos(self, params):
"""查询炉号数据
Args:
params: 查询参数字典
Returns:
list: 炉号数据列表
"""
try:
# 如果是API模式优先从接口获取数据
if AppMode.is_api():
return self._query_lunos_from_api(params)
else:
return self._query_lunos_from_db(params)
except Exception as e:
logging.error(f"查询炉号数据失败: {e}")
raise
def _query_lunos_from_api(self, params):
"""从API获取炉号数据
Args:
params: 查询参数字典
Returns:
list: 炉号数据列表
"""
try:
# 调用接口
gc_api = GcApi()
# 调用接口查询炉号列表
response = gc_api.get_luno_list(params)
# 处理接口返回结果 - 接口直接返回数据列表
if response and isinstance(response, list):
# 接口直接返回炉号数据列表
lunos_data = response
results = []
for luno_data in lunos_data:
# 转换为统一格式的字典,处理接口返回的实际字段
luno_dict = {
"luono": luno_data.get("luono", ""),
"cz": luno_data.get("cz", ""),
"gg": luno_data.get("gg", ""),
"gc": luno_data.get("gc", ""),
"bz": luno_data.get("bz", ""),
"data_corp": luno_data.get("data_corp", ""),
"data_corp_name": luno_data.get("data_corp_name", ""),
"c": luno_data.get("c", ""),
"si": luno_data.get("si", ""),
"mn": luno_data.get("mn", ""),
"p": luno_data.get("p", ""),
"s": luno_data.get("s", ""),
"ni": luno_data.get("ni", ""),
"cr": luno_data.get("cr", ""),
"ti": luno_data.get("ti", ""),
"mo": luno_data.get("mo", ""),
"cu": luno_data.get("cu", ""),
"others": luno_data.get("others", ""),
"klqd": luno_data.get("klqd", ""),
"ysl": luno_data.get("ysl", ""),
"rq": luno_data.get("rq", "")
}
# 处理null值转换为空字符串
for key, value in luno_dict.items():
if value is None:
luno_dict[key] = ""
results.append(luno_dict)
# 保存查询结果
self.query_results = results
# 如果接口查询到数据,则保存到数据库中
if results:
self._save_lunos_to_db(results)
return results
else:
# 如果接口返回的不是列表或为空,则尝试从数据库查询
logging.warning("从API获取炉号数据失败或返回格式不正确尝试从数据库查询")
return self._query_lunos_from_db(params)
except Exception as e:
logging.error(f"从API获取炉号数据失败: {e}")
# 如果接口查询失败,则尝试从数据库查询
logging.warning("从API获取炉号数据失败尝试从数据库查询")
return self._query_lunos_from_db(params)
def _save_lunos_to_db(self, lunos):
"""将炉号数据保存到数据库
Args:
lunos: 炉号数据列表
"""
try:
# 导入DAO
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
# 遍历炉号数据,保存到数据库
for luno_data in lunos:
luno_code = luno_data.get("luono", "")
if luno_code:
# 设置用户信息
if self.user_id:
luno_data['user_id'] = self.user_id
if self.user_name:
luno_data['user_name'] = self.user_name
if self.corp_id:
luno_data['data_corp'] = self.corp_id
# 保存炉号信息
inspection_dao.save_luno_info(luno_code, luno_data)
logging.info(f"成功保存{len(lunos)}条炉号数据到数据库")
except Exception as e:
logging.error(f"保存炉号数据到数据库失败: {e}")
def _query_lunos_from_db(self, params):
"""从数据库获取炉号数据
Args:
params: 查询参数字典
Returns:
list: 炉号数据列表
"""
# 构建SQL查询
sql = """
SELECT
l.luono, l.cz, l.gg, l.gc, l.bz, l.data_corp, l.data_corp_name,
l.c, l.si, l.mn, l.p, l.s, l.ni, l.cr, l.ti, l.mo, l.cu, l.others,
l.klqd, l.ysl, l.create_time as rq
FROM wsbz_luno_info l
WHERE 1=1
"""
query_params = []
# 添加查询条件
if params.get("srch_rq1") and params.get("srch_rq2"):
sql += " AND l.create_time BETWEEN ? AND ?"
query_params.append(params["srch_rq1"] + " 00:00:00")
query_params.append(params["srch_rq2"] + " 23:59:59")
if params.get("luono"):
sql += " AND l.luono LIKE ?"
query_params.append(f"%{params['luono']}%")
if params.get("cz"):
sql += " AND l.cz = ?"
query_params.append(params["cz"])
if params.get("gg"):
sql += " AND l.gg LIKE ?"
query_params.append(f"%{params['gg']}%")
if params.get("gc"):
sql += " AND l.gc LIKE ?"
query_params.append(f"%{params['gc']}%")
# 添加排序
sql += " ORDER BY l.create_time DESC"
# 执行查询
with SQLUtils('sqlite') as db:
db.execute_query(sql, query_params)
rows = db.fetchall()
# 处理查询结果
results = []
for row in rows:
# 将元组转换为字典
luno_data = {
"luono": row[0],
"cz": row[1],
"gg": row[2],
"gc": row[3],
"bz": row[4],
"data_corp": row[5],
"data_corp_name": row[6],
"c": row[7],
"si": row[8],
"mn": row[9],
"p": row[10],
"s": row[11],
"ni": row[12],
"cr": row[13],
"ti": row[14],
"mo": row[15],
"cu": row[16],
"others": row[17],
"klqd": row[18],
"ysl": row[19],
"rq": row[20]
}
results.append(luno_data)
# 保存查询结果
self.query_results = results
return results
def update_result_table(self, results):
"""更新结果表格
Args:
results: 炉号数据列表
"""
# 清空表格
self.result_table.setRowCount(0)
if not results:
return
# 定义表头和对应的字段名
columns = [
{"title": "序号", "field": None},
{"title": "日期", "field": "rq"},
{"title": "炉号", "field": "luono"},
{"title": "材质", "field": "cz"},
{"title": "规格", "field": "gg"},
{"title": "钢厂", "field": "gc"},
{"title": "标准", "field": "bz"},
{"title": "公司账套", "field": "data_corp_name"},
{"title": "C", "field": "c"},
{"title": "Si", "field": "si"},
{"title": "Mn", "field": "mn"},
{"title": "P", "field": "p"},
{"title": "S", "field": "s"},
{"title": "Ni", "field": "ni"},
{"title": "Cr", "field": "cr"},
{"title": "Ti", "field": "ti"},
{"title": "Mo", "field": "mo"},
{"title": "Cu", "field": "cu"},
{"title": "其他", "field": "others"},
{"title": "抗拉强度", "field": "klqd"},
{"title": "延伸率", "field": "ysl"}
]
# 设置表头
self.result_table.setColumnCount(len(columns))
header_labels = [col["title"] for col in columns]
self.result_table.setHorizontalHeaderLabels(header_labels)
# 设置行数
self.result_table.setRowCount(len(results))
# 填充数据
for row, luno_data in enumerate(results):
# 序号
item_index = QTableWidgetItem(str(row + 1))
item_index.setTextAlignment(Qt.AlignCenter)
self.result_table.setItem(row, 0, item_index)
# 遍历列,填充数据
for col_idx, column in enumerate(columns[1:], 1): # 从1开始跳过序号列
field = column["field"]
if field:
value = ""
# 特殊处理某些字段
if field == "rq":
create_time = luno_data.get(field, "")
if create_time:
try:
# 尝试解析日期时间字符串,支持多种格式
if " " in create_time:
# 包含时间的格式2025-07-09 10:30:00
dt = datetime.strptime(create_time, "%Y-%m-%d %H:%M:%S")
value = dt.strftime("%Y-%m-%d")
else:
# 只有日期的格式2025-07-09
dt = datetime.strptime(create_time, "%Y-%m-%d")
value = dt.strftime("%Y-%m-%d")
except:
# 如果解析失败,直接使用原值
value = create_time
else:
# 其他普通字段
value = str(luno_data.get(field, ""))
# 创建表格项并设置文本
item = QTableWidgetItem(value)
# 居中对齐特定的列
if field in ["rq", "c", "si", "mn", "p", "s", "ni", "cr", "ti", "mo", "cu", "klqd", "ysl"]:
item.setTextAlignment(Qt.AlignCenter)
# 设置表格项
self.result_table.setItem(row, col_idx, item)
# 存储原始数据到第一列的item中
item_index.setData(Qt.UserRole, luno_data)
def on_table_double_clicked(self, row, column):
"""表格双击事件处理"""
self.select_current_row(row)
def on_confirm(self):
"""确认按钮点击事件处理"""
# 获取当前选中行
selected_rows = self.result_table.selectionModel().selectedRows()
if not selected_rows:
QMessageBox.warning(self, "提示", "请选择一个炉号")
return
# 获取选中行的索引
row_index = selected_rows[0].row()
self.select_current_row(row_index)
def select_current_row(self, row):
"""选择当前行并返回数据"""
if 0 <= row < self.result_table.rowCount():
# 获取存储在item中的原始数据
item = self.result_table.item(row, 0)
if item:
luno_data = item.data(Qt.UserRole)
if luno_data:
# 发出信号
self.luno_selected.emit(luno_data)
self.accept()