510 lines
17 KiB
Python
510 lines
17 KiB
Python
import requests
|
||
import json
|
||
import logging
|
||
import time
|
||
from utils.config_loader import ConfigLoader
|
||
from utils.api_utils import ApiUtils
|
||
|
||
class GcApi:
|
||
"""工程API接口类"""
|
||
|
||
def __init__(self):
|
||
"""初始化API接口"""
|
||
self.api_utils = ApiUtils()
|
||
|
||
# 添加缓存相关属性
|
||
self._tray_stats_cache = {} # 缓存数据
|
||
self._tray_stats_cache_time = {} # 缓存时间
|
||
self._cache_ttl = 5 # 缓存有效期(秒)
|
||
|
||
|
||
def ismt_option(self, params):
|
||
"""
|
||
标记是否满托
|
||
|
||
Args:
|
||
ismt: 是否满托
|
||
corp_id: 公司ID
|
||
tray_id: 托盘ID
|
||
ip: 机器IP
|
||
|
||
Returns:
|
||
dict: 返回结果
|
||
"""
|
||
try:
|
||
# API 配置中的键名
|
||
api_key = "ismt_option"
|
||
# 构建 form-data 格式的数据
|
||
data = {
|
||
"ismt": params.get("ismt", False),
|
||
"xpack":params.get("tray_id", ""),
|
||
"nw_ip":params.get("ip", ""),
|
||
"data_corp":params.get("corp_id", ""),
|
||
}
|
||
# 将工程号作为参数传递,使用 data 参数传递 form-data 格式数据
|
||
response = self.api_utils.post(api_key, data=data)
|
||
|
||
# 请求失败时返回空数据
|
||
if not response.get("status", False):
|
||
return {
|
||
"success": False,
|
||
"message": "标记是否满托失败",
|
||
"data": None
|
||
}
|
||
return response
|
||
except Exception as e:
|
||
logging.error(f"标记是否满托失败: {str(e)}")
|
||
return None
|
||
def get_gc_info(self, gc_code):
|
||
"""
|
||
获取GC信息
|
||
|
||
Args:
|
||
gc_code: GC编号
|
||
|
||
Returns:
|
||
dict: GC信息
|
||
"""
|
||
try:
|
||
# API 配置中的键名
|
||
api_key = "get_gc_info"
|
||
# 构建 form-data 格式的数据
|
||
data = {
|
||
"sc_gch": gc_code,
|
||
"data_corp":"JT"
|
||
}
|
||
# 将工程号作为参数传递,使用 data 参数传递 form-data 格式数据
|
||
response = self.api_utils.post(api_key, data=data)
|
||
|
||
# 请求失败时返回空数据
|
||
if not response.get("status", False):
|
||
return {
|
||
"success": False,
|
||
"message": "获取GC信息失败",
|
||
"data": None
|
||
}
|
||
return response
|
||
except Exception as e:
|
||
logging.error(f"获取GC信息失败: {str(e)}")
|
||
return None
|
||
def get_order_info(self, order_code,corp_id):
|
||
"""
|
||
获取订单信息
|
||
"""
|
||
try:
|
||
# API 配置中的键名
|
||
api_key = "get_order_info"
|
||
# 构建 form-data 格式的数据
|
||
order_dict = {"srch_mo":order_code,"data_corp":corp_id}
|
||
data = {
|
||
"parms": json.dumps(order_dict), # 必须将数据序列化为JSON字符串
|
||
"pageIndex": 0,
|
||
"pageSize": 10,
|
||
"sortField": "",
|
||
"sortOrder": ""
|
||
}
|
||
# 将工程号作为参数传递,使用 data 参数传递 form-data 格式数据
|
||
response = self.api_utils.post(api_key, data=data)
|
||
return response
|
||
except Exception as e:
|
||
logging.error(f"获取订单信息失败: {str(e)}")
|
||
return None
|
||
def get_spakc_info(self, order_code,corp_id,tray_id):
|
||
"""
|
||
获取订单信息
|
||
"""
|
||
try:
|
||
# API 配置中的键名
|
||
api_key = "get_spack_info"
|
||
# 构建 form-data 格式的数据
|
||
order_dict = {"orderId":order_code,"data_corp":corp_id,"trayId":tray_id}
|
||
data = {
|
||
"parms": json.dumps(order_dict), # 必须将数据序列化为JSON字符串
|
||
"pageIndex": 0,
|
||
"pageSize": 10,
|
||
"sortField": "",
|
||
"sortOrder": ""
|
||
}
|
||
# 将工程号作为参数传递,使用 data 参数传递 form-data 格式数据
|
||
response = self.api_utils.post(api_key, data=data)
|
||
return response
|
||
except Exception as e:
|
||
logging.error(f"获取订单信息失败: {str(e)}")
|
||
return None
|
||
def add_order_info(self, info):
|
||
"""
|
||
添加订单信息
|
||
"""
|
||
try:
|
||
# API 配置中的键名
|
||
api_key = "add_order_info"
|
||
# 构建 form-data 格式的数据
|
||
data = {
|
||
"parms": json.dumps(info), # 必须将数据序列化为JSON字符串
|
||
"pageIndex": 0,
|
||
"pageSize": 10,
|
||
"sortField": "",
|
||
"sortOrder": ""
|
||
}
|
||
# 将工程号作为参数传递,使用 data 参数传递 form-data 格式数据
|
||
response = self.api_utils.post(api_key, data=data)
|
||
return response
|
||
except Exception as e:
|
||
logging.error(f"添加订单信息失败: {str(e)}")
|
||
return None
|
||
|
||
def remove_order_info(self, xpack, ddmo, sgc):
|
||
"""
|
||
删除订单信息
|
||
|
||
Args:
|
||
xpack: 托盘号
|
||
ddmo: 订单号
|
||
sgc: 工程号
|
||
|
||
Returns:
|
||
dict: 接口响应结果
|
||
"""
|
||
try:
|
||
# API 配置中的键名
|
||
api_key = "remove_order_info"
|
||
# 构建 form-data 格式的数据
|
||
data = {
|
||
"xpack": xpack,
|
||
"ddmo": ddmo,
|
||
"gch": sgc
|
||
}
|
||
# 调用接口
|
||
response = self.api_utils.post(api_key, data=data)
|
||
return response
|
||
except Exception as e:
|
||
logging.error(f"删除订单信息失败: {str(e)}")
|
||
return {"status": False, "message": f"删除订单信息失败: {str(e)}"}
|
||
|
||
def get_xpack(self, order_id,corp_id):
|
||
"""
|
||
获取包装号
|
||
"""
|
||
try:
|
||
# API 配置中的键名
|
||
api_key = "get_xpack"
|
||
# 构建 form-data 格式的数据
|
||
data = {
|
||
"ddmo": order_id,
|
||
"data_corp":corp_id
|
||
}
|
||
# 将工程号作为参数传递,使用 data 参数传递 form-data 格式数据
|
||
response = self.api_utils.post(api_key, data=data)
|
||
return response
|
||
except Exception as e:
|
||
logging.error(f"获取包装号失败: {str(e)}")
|
||
return None
|
||
|
||
def get_order_list(self, params):
|
||
"""
|
||
获取订单列表
|
||
|
||
Args:
|
||
params: 查询参数字典
|
||
|
||
Returns:
|
||
dict: 订单列表
|
||
"""
|
||
try:
|
||
# API 配置中的键名
|
||
api_key = "get_order_info"
|
||
|
||
# 构建查询参数
|
||
order_dict = {
|
||
"srch_mo": params.get("srch_mo", ""),
|
||
"srch_note": params.get("srch_note", ""),
|
||
"srch_rq1": params.get("srch_rq1", ""),
|
||
"srch_rq2": params.get("srch_rq2", ""),
|
||
"srch_cz": params.get("srch_cz", ""),
|
||
"srch_size": params.get("srch_size", ""),
|
||
"data_corp": params.get("corp_id", "")
|
||
}
|
||
|
||
# 构建 form-data 格式的数据
|
||
data = {
|
||
"parms": json.dumps(order_dict), # 必须将数据序列化为JSON字符串
|
||
"pageIndex": 0,
|
||
"pageSize": 50, # 获取更多数据
|
||
"sortField": "create_time",
|
||
"sortOrder": "desc"
|
||
}
|
||
|
||
# 调用API
|
||
response = self.api_utils.post(api_key, data=data)
|
||
return response
|
||
|
||
except Exception as e:
|
||
logging.error(f"获取订单列表失败: {str(e)}")
|
||
return {"status": False, "message": f"获取订单列表失败: {str(e)}"}
|
||
|
||
def get_params(self, stype, main, corp_id):
|
||
"""
|
||
获取指定参数信息
|
||
|
||
Args:
|
||
stype: 参数类型,如"库房档案"
|
||
main: 主参数,如"XC"
|
||
corp_id: 公司ID
|
||
|
||
Returns:
|
||
dict: 参数信息列表
|
||
"""
|
||
try:
|
||
# API 配置中的键名
|
||
api_key = "get_params"
|
||
|
||
# 构建GET请求参数
|
||
params = {
|
||
"stype": stype,
|
||
"main": main,
|
||
"corp_id": corp_id
|
||
}
|
||
|
||
# 发送GET请求
|
||
response = self.api_utils.get(api_key, params=params)
|
||
|
||
# 检查响应状态
|
||
if response.get("success", False):
|
||
return {
|
||
"status": True,
|
||
"data": response.get("data", []),
|
||
"message": "获取参数信息成功"
|
||
}
|
||
else:
|
||
return {
|
||
"status": False,
|
||
"data": [],
|
||
"message": response.get("message", "获取参数信息失败")
|
||
}
|
||
|
||
except Exception as e:
|
||
logging.error(f"获取参数信息失败: {str(e)}")
|
||
return {
|
||
"status": False,
|
||
"data": [],
|
||
"message": f"获取参数信息失败: {str(e)}"
|
||
}
|
||
|
||
def get_wire_type_params(self, corp_id):
|
||
"""
|
||
获取线材类型参数信息
|
||
|
||
Args:
|
||
corp_id: 公司ID
|
||
|
||
Returns:
|
||
dict: 线材类型信息列表
|
||
"""
|
||
try:
|
||
# 使用get_params方法,传入线材类型参数
|
||
return self.get_params("线材类型", "", corp_id)
|
||
|
||
except Exception as e:
|
||
logging.error(f"获取线材类型参数失败: {str(e)}")
|
||
return {
|
||
"status": False,
|
||
"data": [],
|
||
"message": f"获取线材类型参数失败: {str(e)}"
|
||
}
|
||
|
||
def get_luno_list(self, params):
|
||
"""
|
||
获取炉号列表
|
||
|
||
Args:
|
||
params: 查询参数字典,包含:
|
||
- srch_rq1: 开始日期
|
||
- srch_rq2: 结束日期
|
||
- luono: 炉号
|
||
- cz: 材质
|
||
- gg: 规格
|
||
- gc: 钢厂
|
||
- corp_id: 公司ID
|
||
|
||
Returns:
|
||
dict: 炉号列表
|
||
"""
|
||
try:
|
||
# API 配置中的键名
|
||
api_key = "get_luno"
|
||
|
||
# 构建查询参数
|
||
luno_dict = {
|
||
"srch_rq1": params.get("srch_rq1", ""),
|
||
"srch_rq2": params.get("srch_rq2", ""),
|
||
"luono": params.get("luono", ""),
|
||
"cz": params.get("cz", ""),
|
||
"gg": params.get("gg", ""),
|
||
"gc": params.get("gc", ""),
|
||
"data_corp": params.get("corp_id", "")
|
||
}
|
||
|
||
# 构建 form-data 格式的数据
|
||
data = {
|
||
"parms": json.dumps(luno_dict), # 必须将数据序列化为JSON字符串
|
||
"pageIndex": 0,
|
||
"pageSize": 50, # 获取更多数据
|
||
"sortField": "create_time",
|
||
"sortOrder": "desc"
|
||
}
|
||
|
||
# 调用API
|
||
response = self.api_utils.post(api_key, data=data)
|
||
return response
|
||
|
||
except Exception as e:
|
||
logging.error(f"获取炉号列表失败: {str(e)}")
|
||
return {"status": False, "message": f"获取炉号列表失败: {str(e)}"}
|
||
|
||
def get_order_info_by_xpack(self, xpack, corp_id):
|
||
"""
|
||
通过托盘号获取订单信息
|
||
"""
|
||
try:
|
||
# API 配置中的键名
|
||
api_key = "get_order_info_by_xpack"
|
||
order_dict = {
|
||
"srch_spack": xpack,
|
||
"data_corp": corp_id
|
||
}
|
||
data = {
|
||
"parms": json.dumps(order_dict),
|
||
"pageIndex": 0,
|
||
"pageSize": 10,
|
||
"sortField": "",
|
||
"sortOrder": ""
|
||
}
|
||
response = self.api_utils.post(api_key, data=data)
|
||
return response
|
||
except Exception as e:
|
||
logging.error(f"通过托盘号获取订单信息失败: {str(e)}")
|
||
return None
|
||
|
||
def get_package_statistics(self, order_id, corp_id):
|
||
"""
|
||
获取订单包装统计数据
|
||
"""
|
||
try:
|
||
# API 配置中的键名
|
||
api_key = "get_package_statistics"
|
||
# 构建URL参数
|
||
params = {
|
||
"orderId": order_id,
|
||
"data_corp": corp_id
|
||
}
|
||
response = self.api_utils.get(api_key, params=params)
|
||
return response
|
||
except Exception as e:
|
||
logging.error(f"获取订单包装统计数据失败: {str(e)}")
|
||
return {"status": False, "message": str(e)}
|
||
|
||
def get_tray_package_statistics(self, order_id, tray_id, corp_id):
|
||
"""获取托盘包装统计信息
|
||
|
||
Args:
|
||
order_id: 订单号
|
||
tray_id: 托盘号
|
||
corp_id: 公司ID
|
||
|
||
Returns:
|
||
dict: 托盘包装统计信息
|
||
"""
|
||
# 生成缓存键
|
||
cache_key = f"{order_id}_{tray_id}_{corp_id}"
|
||
|
||
# 检查缓存是否有效
|
||
current_time = time.time()
|
||
if (cache_key in self._tray_stats_cache and
|
||
current_time - self._tray_stats_cache_time.get(cache_key, 0) < self._cache_ttl):
|
||
logging.info(f"使用缓存数据: getBzNumByTrayWszb.do, 参数={order_id}, {tray_id}")
|
||
return self._tray_stats_cache[cache_key]
|
||
|
||
# 原有API调用逻辑
|
||
logging.info(f"调用API: getBzNumByTrayWszb.do, 参数={order_id}, {tray_id}")
|
||
try:
|
||
# API 配置中的键名
|
||
api_key = "get_tray_package_statistics"
|
||
# 构建URL参数
|
||
params = {
|
||
"orderId": order_id,
|
||
"trayId": tray_id,
|
||
"data_corp": corp_id
|
||
}
|
||
response = self.api_utils.get(api_key, params=params)
|
||
|
||
if response and response.get('status', False):
|
||
# 更新缓存
|
||
self._tray_stats_cache[cache_key] = response
|
||
self._tray_stats_cache_time[cache_key] = current_time
|
||
|
||
return response
|
||
except Exception as e:
|
||
logging.error(f"获取托盘包装统计数据失败: {str(e)}")
|
||
return {"status": False, "message": str(e)}
|
||
|
||
def get_spack_info(self, order_id, tray_id, corp_id):
|
||
"""
|
||
获取spack信息
|
||
|
||
Args:
|
||
order_id: 订单号
|
||
tray_id: 托盘号
|
||
corp_id: 公司ID
|
||
|
||
Returns:
|
||
dict: 包含spack信息的字典
|
||
"""
|
||
try:
|
||
# API 配置中的键名
|
||
api_key = "get_spack_info"
|
||
# 构建URL参数
|
||
params = {
|
||
"orderId": order_id,
|
||
"trayId": tray_id,
|
||
"data_corp": corp_id
|
||
}
|
||
response = self.api_utils.get(api_key, params=params)
|
||
return response
|
||
except Exception as e:
|
||
logging.error(f"获取spack信息失败: {str(e)}")
|
||
return {"status": False, "message": str(e)}
|
||
|
||
def clear_tray_stats_cache(self, order_id=None, tray_id=None, corp_id=None):
|
||
"""清除托盘统计数据缓存
|
||
|
||
Args:
|
||
order_id: 订单号,如果为None则清除所有缓存
|
||
tray_id: 托盘号,如果为None则清除该订单的所有缓存
|
||
corp_id: 公司ID,如果为None则清除该订单和托盘的所有缓存
|
||
"""
|
||
if order_id is None:
|
||
# 清除所有缓存
|
||
self._tray_stats_cache = {}
|
||
self._tray_stats_cache_time = {}
|
||
logging.info("已清除所有托盘统计数据缓存")
|
||
return
|
||
|
||
# 清除特定缓存
|
||
keys_to_remove = []
|
||
prefix = f"{order_id}_"
|
||
if tray_id:
|
||
prefix += f"{tray_id}_"
|
||
if corp_id:
|
||
prefix += f"{corp_id}"
|
||
|
||
for key in list(self._tray_stats_cache.keys()):
|
||
if key.startswith(prefix):
|
||
keys_to_remove.append(key)
|
||
|
||
for key in keys_to_remove:
|
||
if key in self._tray_stats_cache:
|
||
del self._tray_stats_cache[key]
|
||
if key in self._tray_stats_cache_time:
|
||
del self._tray_stats_cache_time[key]
|
||
|
||
logging.info(f"已清除托盘统计数据缓存: 前缀={prefix}, 清除数量={len(keys_to_remove)}") |