jiateng_ws/widgets/luno_query_dialog.py

413 lines
15 KiB
Python
Raw Normal View History

2025-07-19 13:17:24 +08:00
import logging
from datetime import datetime
from PySide6.QtWidgets import QTableWidgetItem, QMessageBox
from PySide6.QtCore import Qt, Signal
from ui.luno_query_dialog_ui import LunoQueryDialogUI
from utils.sql_utils import SQLUtils
from utils.app_mode import AppMode
from apis.gc_api import GcApi
class LunoQueryDialog(LunoQueryDialogUI):
"""炉号查询对话框,用于查询炉号信息"""
# 定义信号,用于向上传递选中的炉号信息
luno_selected = Signal(dict)
def __init__(self, parent=None):
super().__init__()
# 存储查询结果
self.query_results = []
# 获取父窗口的用户信息
self.corp_id = None
self.user_id = None
self.user_name = None
if parent:
if hasattr(parent, 'corp_id'):
self.corp_id = parent.corp_id
if hasattr(parent, 'user_id'):
self.user_id = parent.user_id[0] if isinstance(parent.user_id, tuple) else parent.user_id
if hasattr(parent, 'user_name'):
self.user_name = parent.user_name
# 连接信号和槽
self.connect_signals()
def connect_signals(self):
"""连接信号和槽"""
# 查询按钮点击事件
self.query_button.clicked.connect(self.on_luno_query)
# 确认按钮点击事件
self.confirm_button.clicked.connect(self.on_confirm)
# 取消按钮点击事件
self.cancel_button.clicked.connect(self.reject)
# 表格双击事件
self.result_table.cellDoubleClicked.connect(self.on_table_double_clicked)
def on_luno_query(self):
"""执行炉号查询"""
try:
# 获取查询参数
start_date = self.start_date.date().toString("yyyy-MM-dd")
end_date = self.end_date.date().toString("yyyy-MM-dd")
luno = self.luno_input.text().strip()
material = self.material_combo.currentText()
if material == "全部":
material = ""
spec = self.spec_input.text().strip()
steel = self.steel_input.text().strip()
# 构建查询参数
query_params = {
"srch_rq1": start_date,
"srch_rq2": end_date,
"luono": luno,
"cz": material,
"gg": spec,
"gc": steel,
"corp_id": self.corp_id
}
# 执行查询
results = self.query_lunos(query_params)
# 更新表格
self.update_result_table(results)
except Exception as e:
logging.error(f"炉号查询失败: {e}")
QMessageBox.critical(self, "查询错误", f"查询炉号时发生错误: {str(e)}")
def query_lunos(self, params):
"""查询炉号数据
Args:
params: 查询参数字典
Returns:
list: 炉号数据列表
"""
try:
# 如果是API模式优先从接口获取数据
if AppMode.is_api():
return self._query_lunos_from_api(params)
else:
return self._query_lunos_from_db(params)
except Exception as e:
logging.error(f"查询炉号数据失败: {e}")
raise
def _query_lunos_from_api(self, params):
"""从API获取炉号数据
Args:
params: 查询参数字典
Returns:
list: 炉号数据列表
"""
try:
# 调用接口
gc_api = GcApi()
# 调用接口查询炉号列表
response = gc_api.get_luno_list(params)
# 处理接口返回结果 - 接口直接返回数据列表
if response and isinstance(response, list):
# 接口直接返回炉号数据列表
lunos_data = response
results = []
for luno_data in lunos_data:
# 转换为统一格式的字典,处理接口返回的实际字段
luno_dict = {
"luono": luno_data.get("luono", ""),
"cz": luno_data.get("cz", ""),
"gg": luno_data.get("gg", ""),
"gc": luno_data.get("gc", ""),
"bz": luno_data.get("bz", ""),
"data_corp": luno_data.get("data_corp", ""),
"data_corp_name": luno_data.get("data_corp_name", ""),
"c": luno_data.get("c", ""),
"si": luno_data.get("si", ""),
"mn": luno_data.get("mn", ""),
"p": luno_data.get("p", ""),
"s": luno_data.get("s", ""),
"ni": luno_data.get("ni", ""),
"cr": luno_data.get("cr", ""),
"ti": luno_data.get("ti", ""),
"mo": luno_data.get("mo", ""),
"cu": luno_data.get("cu", ""),
"others": luno_data.get("others", ""),
"klqd": luno_data.get("klqd", ""),
"ysl": luno_data.get("ysl", ""),
"rq": luno_data.get("rq", "")
}
# 处理null值转换为空字符串
for key, value in luno_dict.items():
if value is None:
luno_dict[key] = ""
results.append(luno_dict)
# 保存查询结果
self.query_results = results
# 如果接口查询到数据,则保存到数据库中
if results:
self._save_lunos_to_db(results)
return results
else:
# 如果接口返回的不是列表或为空,则尝试从数据库查询
logging.warning("从API获取炉号数据失败或返回格式不正确尝试从数据库查询")
return self._query_lunos_from_db(params)
except Exception as e:
logging.error(f"从API获取炉号数据失败: {e}")
# 如果接口查询失败,则尝试从数据库查询
logging.warning("从API获取炉号数据失败尝试从数据库查询")
return self._query_lunos_from_db(params)
def _save_lunos_to_db(self, lunos):
"""将炉号数据保存到数据库
Args:
lunos: 炉号数据列表
"""
try:
# 导入DAO
from dao.inspection_dao import InspectionDAO
inspection_dao = InspectionDAO()
# 遍历炉号数据,保存到数据库
for luno_data in lunos:
luno_code = luno_data.get("luono", "")
if luno_code:
# 设置用户信息
if self.user_id:
luno_data['user_id'] = self.user_id
if self.user_name:
luno_data['user_name'] = self.user_name
if self.corp_id:
luno_data['data_corp'] = self.corp_id
# 保存炉号信息
inspection_dao.save_luno_info(luno_code, luno_data)
logging.info(f"成功保存{len(lunos)}条炉号数据到数据库")
except Exception as e:
logging.error(f"保存炉号数据到数据库失败: {e}")
def _query_lunos_from_db(self, params):
"""从数据库获取炉号数据
Args:
params: 查询参数字典
Returns:
list: 炉号数据列表
"""
# 构建SQL查询
sql = """
SELECT
l.luono, l.cz, l.gg, l.gc, l.bz, l.data_corp, l.data_corp_name,
l.c, l.si, l.mn, l.p, l.s, l.ni, l.cr, l.ti, l.mo, l.cu, l.others,
l.klqd, l.ysl, l.create_time as rq
FROM wsbz_luno_info l
WHERE 1=1
"""
query_params = []
# 添加查询条件
if params.get("srch_rq1") and params.get("srch_rq2"):
sql += " AND l.create_time BETWEEN ? AND ?"
query_params.append(params["srch_rq1"] + " 00:00:00")
query_params.append(params["srch_rq2"] + " 23:59:59")
if params.get("luono"):
sql += " AND l.luono LIKE ?"
query_params.append(f"%{params['luono']}%")
if params.get("cz"):
sql += " AND l.cz = ?"
query_params.append(params["cz"])
if params.get("gg"):
sql += " AND l.gg LIKE ?"
query_params.append(f"%{params['gg']}%")
if params.get("gc"):
sql += " AND l.gc LIKE ?"
query_params.append(f"%{params['gc']}%")
# 添加排序
sql += " ORDER BY l.create_time DESC"
# 执行查询
with SQLUtils('sqlite') as db:
db.execute_query(sql, query_params)
rows = db.fetchall()
# 处理查询结果
results = []
for row in rows:
# 将元组转换为字典
luno_data = {
"luono": row[0],
"cz": row[1],
"gg": row[2],
"gc": row[3],
"bz": row[4],
"data_corp": row[5],
"data_corp_name": row[6],
"c": row[7],
"si": row[8],
"mn": row[9],
"p": row[10],
"s": row[11],
"ni": row[12],
"cr": row[13],
"ti": row[14],
"mo": row[15],
"cu": row[16],
"others": row[17],
"klqd": row[18],
"ysl": row[19],
"rq": row[20]
}
results.append(luno_data)
# 保存查询结果
self.query_results = results
return results
def update_result_table(self, results):
"""更新结果表格
Args:
results: 炉号数据列表
"""
# 清空表格
self.result_table.setRowCount(0)
if not results:
return
# 定义表头和对应的字段名
columns = [
{"title": "序号", "field": None},
{"title": "日期", "field": "rq"},
{"title": "炉号", "field": "luono"},
{"title": "材质", "field": "cz"},
{"title": "规格", "field": "gg"},
{"title": "钢厂", "field": "gc"},
{"title": "标准", "field": "bz"},
{"title": "公司账套", "field": "data_corp_name"},
{"title": "C", "field": "c"},
{"title": "Si", "field": "si"},
{"title": "Mn", "field": "mn"},
{"title": "P", "field": "p"},
{"title": "S", "field": "s"},
{"title": "Ni", "field": "ni"},
{"title": "Cr", "field": "cr"},
{"title": "Ti", "field": "ti"},
{"title": "Mo", "field": "mo"},
{"title": "Cu", "field": "cu"},
{"title": "其他", "field": "others"},
{"title": "抗拉强度", "field": "klqd"},
{"title": "延伸率", "field": "ysl"}
]
# 设置表头
self.result_table.setColumnCount(len(columns))
header_labels = [col["title"] for col in columns]
self.result_table.setHorizontalHeaderLabels(header_labels)
# 设置行数
self.result_table.setRowCount(len(results))
# 填充数据
for row, luno_data in enumerate(results):
# 序号
item_index = QTableWidgetItem(str(row + 1))
item_index.setTextAlignment(Qt.AlignCenter)
self.result_table.setItem(row, 0, item_index)
# 遍历列,填充数据
for col_idx, column in enumerate(columns[1:], 1): # 从1开始跳过序号列
field = column["field"]
if field:
value = ""
# 特殊处理某些字段
if field == "rq":
create_time = luno_data.get(field, "")
if create_time:
try:
# 尝试解析日期时间字符串,支持多种格式
if " " in create_time:
# 包含时间的格式2025-07-09 10:30:00
dt = datetime.strptime(create_time, "%Y-%m-%d %H:%M:%S")
value = dt.strftime("%Y-%m-%d")
else:
# 只有日期的格式2025-07-09
dt = datetime.strptime(create_time, "%Y-%m-%d")
value = dt.strftime("%Y-%m-%d")
except:
# 如果解析失败,直接使用原值
value = create_time
else:
# 其他普通字段
value = str(luno_data.get(field, ""))
# 创建表格项并设置文本
item = QTableWidgetItem(value)
# 居中对齐特定的列
if field in ["rq", "c", "si", "mn", "p", "s", "ni", "cr", "ti", "mo", "cu", "klqd", "ysl"]:
item.setTextAlignment(Qt.AlignCenter)
# 设置表格项
self.result_table.setItem(row, col_idx, item)
# 存储原始数据到第一列的item中
item_index.setData(Qt.UserRole, luno_data)
def on_table_double_clicked(self, row, column):
"""表格双击事件处理"""
self.select_current_row(row)
def on_confirm(self):
"""确认按钮点击事件处理"""
# 获取当前选中行
selected_rows = self.result_table.selectionModel().selectedRows()
if not selected_rows:
QMessageBox.warning(self, "提示", "请选择一个炉号")
return
# 获取选中行的索引
row_index = selected_rows[0].row()
self.select_current_row(row_index)
def select_current_row(self, row):
"""选择当前行并返回数据"""
if 0 <= row < self.result_table.rowCount():
# 获取存储在item中的原始数据
item = self.result_table.item(row, 0)
if item:
luno_data = item.data(Qt.UserRole)
if luno_data:
# 发出信号
self.luno_selected.emit(luno_data)
self.accept()