358 lines
11 KiB
Python
358 lines
11 KiB
Python
import requests
|
||
import json
|
||
import logging
|
||
from utils.config_loader import ConfigLoader
|
||
from utils.api_utils import ApiUtils
|
||
|
||
class GcApi:
|
||
"""工程API接口类"""
|
||
|
||
def __init__(self):
|
||
"""初始化API接口"""
|
||
self.api_utils = ApiUtils()
|
||
|
||
|
||
def ismt_option(self, params):
|
||
"""
|
||
标记是否满托
|
||
|
||
Args:
|
||
ismt: 是否满托
|
||
corp_id: 公司ID
|
||
tray_id: 托盘ID
|
||
ip: 机器IP
|
||
|
||
Returns:
|
||
dict: 返回结果
|
||
"""
|
||
try:
|
||
# API 配置中的键名
|
||
api_key = "ismt_option"
|
||
# 构建 form-data 格式的数据
|
||
data = {
|
||
"ismt": params.get("ismt", False),
|
||
"xpack":params.get("tray_id", ""),
|
||
"nw_ip":params.get("ip", ""),
|
||
"data_corp":params.get("corp_id", ""),
|
||
}
|
||
# 将工程号作为参数传递,使用 data 参数传递 form-data 格式数据
|
||
response = self.api_utils.post(api_key, data=data)
|
||
|
||
# 请求失败时返回空数据
|
||
if not response.get("status", False):
|
||
return {
|
||
"success": False,
|
||
"message": "标记是否满托失败",
|
||
"data": None
|
||
}
|
||
return response
|
||
except Exception as e:
|
||
logging.error(f"标记是否满托失败: {str(e)}")
|
||
return None
|
||
def get_gc_info(self, gc_code):
|
||
"""
|
||
获取GC信息
|
||
|
||
Args:
|
||
gc_code: GC编号
|
||
|
||
Returns:
|
||
dict: GC信息
|
||
"""
|
||
try:
|
||
# API 配置中的键名
|
||
api_key = "get_gc_info"
|
||
# 构建 form-data 格式的数据
|
||
data = {
|
||
"sc_gch": gc_code,
|
||
"data_corp":"JT"
|
||
}
|
||
# 将工程号作为参数传递,使用 data 参数传递 form-data 格式数据
|
||
response = self.api_utils.post(api_key, data=data)
|
||
|
||
# 请求失败时返回空数据
|
||
if not response.get("status", False):
|
||
return {
|
||
"success": False,
|
||
"message": "获取GC信息失败",
|
||
"data": None
|
||
}
|
||
return response
|
||
except Exception as e:
|
||
logging.error(f"获取GC信息失败: {str(e)}")
|
||
return None
|
||
def get_order_info(self, order_code,corp_id):
|
||
"""
|
||
获取订单信息
|
||
"""
|
||
try:
|
||
# API 配置中的键名
|
||
api_key = "get_order_info"
|
||
# 构建 form-data 格式的数据
|
||
order_dict = {"srch_mo":order_code,"data_corp":corp_id}
|
||
data = {
|
||
"parms": json.dumps(order_dict), # 必须将数据序列化为JSON字符串
|
||
"pageIndex": 0,
|
||
"pageSize": 10,
|
||
"sortField": "",
|
||
"sortOrder": ""
|
||
}
|
||
# 将工程号作为参数传递,使用 data 参数传递 form-data 格式数据
|
||
response = self.api_utils.post(api_key, data=data)
|
||
return response
|
||
except Exception as e:
|
||
logging.error(f"获取订单信息失败: {str(e)}")
|
||
return None
|
||
def add_order_info(self, info):
|
||
"""
|
||
添加订单信息
|
||
"""
|
||
try:
|
||
# API 配置中的键名
|
||
api_key = "add_order_info"
|
||
# 构建 form-data 格式的数据
|
||
data = {
|
||
"parms": json.dumps(info), # 必须将数据序列化为JSON字符串
|
||
"pageIndex": 0,
|
||
"pageSize": 10,
|
||
"sortField": "",
|
||
"sortOrder": ""
|
||
}
|
||
# 将工程号作为参数传递,使用 data 参数传递 form-data 格式数据
|
||
response = self.api_utils.post(api_key, data=data)
|
||
return response
|
||
except Exception as e:
|
||
logging.error(f"添加订单信息失败: {str(e)}")
|
||
return None
|
||
|
||
def remove_order_info(self, xpack, ddmo, sgc):
|
||
"""
|
||
删除订单信息
|
||
|
||
Args:
|
||
xpack: 托盘号
|
||
ddmo: 订单号
|
||
sgc: 工程号
|
||
|
||
Returns:
|
||
dict: 接口响应结果
|
||
"""
|
||
try:
|
||
# API 配置中的键名
|
||
api_key = "remove_order_info"
|
||
# 构建 form-data 格式的数据
|
||
data = {
|
||
"xpack": xpack,
|
||
"ddmo": ddmo,
|
||
"gch": sgc
|
||
}
|
||
# 调用接口
|
||
response = self.api_utils.post(api_key, data=data)
|
||
return response
|
||
except Exception as e:
|
||
logging.error(f"删除订单信息失败: {str(e)}")
|
||
return {"status": False, "message": f"删除订单信息失败: {str(e)}"}
|
||
|
||
def get_xpack(self, order_id,corp_id):
|
||
"""
|
||
获取包装号
|
||
"""
|
||
try:
|
||
# API 配置中的键名
|
||
api_key = "get_xpack"
|
||
# 构建 form-data 格式的数据
|
||
data = {
|
||
"ddmo": order_id,
|
||
"data_corp":corp_id
|
||
}
|
||
# 将工程号作为参数传递,使用 data 参数传递 form-data 格式数据
|
||
response = self.api_utils.post(api_key, data=data)
|
||
return response
|
||
except Exception as e:
|
||
logging.error(f"获取包装号失败: {str(e)}")
|
||
return None
|
||
|
||
def get_order_list(self, params):
|
||
"""
|
||
获取订单列表
|
||
|
||
Args:
|
||
params: 查询参数字典
|
||
|
||
Returns:
|
||
dict: 订单列表
|
||
"""
|
||
try:
|
||
# API 配置中的键名
|
||
api_key = "get_order_info"
|
||
|
||
# 构建查询参数
|
||
order_dict = {
|
||
"srch_mo": params.get("srch_mo", ""),
|
||
"srch_note": params.get("srch_note", ""),
|
||
"srch_rq1": params.get("srch_rq1", ""),
|
||
"srch_rq2": params.get("srch_rq2", ""),
|
||
"srch_cz": params.get("srch_cz", ""),
|
||
"srch_size": params.get("srch_size", ""),
|
||
"data_corp": params.get("corp_id", "")
|
||
}
|
||
|
||
# 构建 form-data 格式的数据
|
||
data = {
|
||
"parms": json.dumps(order_dict), # 必须将数据序列化为JSON字符串
|
||
"pageIndex": 0,
|
||
"pageSize": 50, # 获取更多数据
|
||
"sortField": "create_time",
|
||
"sortOrder": "desc"
|
||
}
|
||
|
||
# 调用API
|
||
response = self.api_utils.post(api_key, data=data)
|
||
return response
|
||
|
||
except Exception as e:
|
||
logging.error(f"获取订单列表失败: {str(e)}")
|
||
return {"status": False, "message": f"获取订单列表失败: {str(e)}"}
|
||
|
||
def get_params(self, stype, main, corp_id):
|
||
"""
|
||
获取指定参数信息
|
||
|
||
Args:
|
||
stype: 参数类型,如"库房档案"
|
||
main: 主参数,如"XC"
|
||
corp_id: 公司ID
|
||
|
||
Returns:
|
||
dict: 参数信息列表
|
||
"""
|
||
try:
|
||
# API 配置中的键名
|
||
api_key = "get_params"
|
||
|
||
# 构建GET请求参数
|
||
params = {
|
||
"stype": stype,
|
||
"main": main,
|
||
"corp_id": corp_id
|
||
}
|
||
|
||
# 发送GET请求
|
||
response = self.api_utils.get(api_key, params=params)
|
||
|
||
# 检查响应状态
|
||
if response.get("success", False):
|
||
return {
|
||
"status": True,
|
||
"data": response.get("data", []),
|
||
"message": "获取参数信息成功"
|
||
}
|
||
else:
|
||
return {
|
||
"status": False,
|
||
"data": [],
|
||
"message": response.get("message", "获取参数信息失败")
|
||
}
|
||
|
||
except Exception as e:
|
||
logging.error(f"获取参数信息失败: {str(e)}")
|
||
return {
|
||
"status": False,
|
||
"data": [],
|
||
"message": f"获取参数信息失败: {str(e)}"
|
||
}
|
||
|
||
def get_wire_type_params(self, corp_id):
|
||
"""
|
||
获取线材类型参数信息
|
||
|
||
Args:
|
||
corp_id: 公司ID
|
||
|
||
Returns:
|
||
dict: 线材类型信息列表
|
||
"""
|
||
try:
|
||
# 使用get_params方法,传入线材类型参数
|
||
return self.get_params("线材类型", "", corp_id)
|
||
|
||
except Exception as e:
|
||
logging.error(f"获取线材类型参数失败: {str(e)}")
|
||
return {
|
||
"status": False,
|
||
"data": [],
|
||
"message": f"获取线材类型参数失败: {str(e)}"
|
||
}
|
||
|
||
def get_luno_list(self, params):
|
||
"""
|
||
获取炉号列表
|
||
|
||
Args:
|
||
params: 查询参数字典,包含:
|
||
- srch_rq1: 开始日期
|
||
- srch_rq2: 结束日期
|
||
- luono: 炉号
|
||
- cz: 材质
|
||
- gg: 规格
|
||
- gc: 钢厂
|
||
- corp_id: 公司ID
|
||
|
||
Returns:
|
||
dict: 炉号列表
|
||
"""
|
||
try:
|
||
# API 配置中的键名
|
||
api_key = "get_luno"
|
||
|
||
# 构建查询参数
|
||
luno_dict = {
|
||
"srch_rq1": params.get("srch_rq1", ""),
|
||
"srch_rq2": params.get("srch_rq2", ""),
|
||
"luono": params.get("luono", ""),
|
||
"cz": params.get("cz", ""),
|
||
"gg": params.get("gg", ""),
|
||
"gc": params.get("gc", ""),
|
||
"data_corp": params.get("corp_id", "")
|
||
}
|
||
|
||
# 构建 form-data 格式的数据
|
||
data = {
|
||
"parms": json.dumps(luno_dict), # 必须将数据序列化为JSON字符串
|
||
"pageIndex": 0,
|
||
"pageSize": 50, # 获取更多数据
|
||
"sortField": "create_time",
|
||
"sortOrder": "desc"
|
||
}
|
||
|
||
# 调用API
|
||
response = self.api_utils.post(api_key, data=data)
|
||
return response
|
||
|
||
except Exception as e:
|
||
logging.error(f"获取炉号列表失败: {str(e)}")
|
||
return {"status": False, "message": f"获取炉号列表失败: {str(e)}"}
|
||
|
||
def get_order_info_by_xpack(self, xpack, corp_id):
|
||
"""
|
||
通过托盘号获取订单信息
|
||
"""
|
||
try:
|
||
# API 配置中的键名
|
||
api_key = "get_order_info_by_xpack"
|
||
order_dict = {
|
||
"srch_spack": xpack,
|
||
"data_corp": corp_id
|
||
}
|
||
data = {
|
||
"parms": json.dumps(order_dict),
|
||
"pageIndex": 0,
|
||
"pageSize": 10,
|
||
"sortField": "",
|
||
"sortOrder": ""
|
||
}
|
||
response = self.api_utils.post(api_key, data=data)
|
||
return response
|
||
except Exception as e:
|
||
logging.error(f"通过托盘号获取订单信息失败: {str(e)}")
|
||
return None |