jiateng_ws/apis/gc_api.py
2025-07-20 15:34:17 +08:00

334 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import json
import logging
from utils.config_loader import ConfigLoader
from utils.api_utils import ApiUtils
class GcApi:
"""工程API接口类"""
def __init__(self):
"""初始化API接口"""
self.api_utils = ApiUtils()
def ismt_option(self, params):
"""
标记是否满托
Args:
ismt: 是否满托
corp_id: 公司ID
tray_id: 托盘ID
ip: 机器IP
Returns:
dict: 返回结果
"""
try:
# API 配置中的键名
api_key = "ismt_option"
# 构建 form-data 格式的数据
data = {
"ismt": params.get("ismt", False),
"xpack":params.get("tray_id", ""),
"nw_ip":params.get("ip", ""),
"data_corp":params.get("corp_id", ""),
}
# 将工程号作为参数传递,使用 data 参数传递 form-data 格式数据
response = self.api_utils.post(api_key, data=data)
# 请求失败时返回空数据
if not response.get("status", False):
return {
"success": False,
"message": "标记是否满托失败",
"data": None
}
return response
except Exception as e:
logging.error(f"标记是否满托失败: {str(e)}")
return None
def get_gc_info(self, gc_code):
"""
获取GC信息
Args:
gc_code: GC编号
Returns:
dict: GC信息
"""
try:
# API 配置中的键名
api_key = "get_gc_info"
# 构建 form-data 格式的数据
data = {
"sc_gch": gc_code,
"data_corp":"JT"
}
# 将工程号作为参数传递,使用 data 参数传递 form-data 格式数据
response = self.api_utils.post(api_key, data=data)
# 请求失败时返回空数据
if not response.get("status", False):
return {
"success": False,
"message": "获取GC信息失败",
"data": None
}
return response
except Exception as e:
logging.error(f"获取GC信息失败: {str(e)}")
return None
def get_order_info(self, order_code,corp_id):
"""
获取订单信息
"""
try:
# API 配置中的键名
api_key = "get_order_info"
# 构建 form-data 格式的数据
order_dict = {"srch_mo":order_code,"data_corp":corp_id}
data = {
"parms": json.dumps(order_dict), # 必须将数据序列化为JSON字符串
"pageIndex": 0,
"pageSize": 10,
"sortField": "",
"sortOrder": ""
}
# 将工程号作为参数传递,使用 data 参数传递 form-data 格式数据
response = self.api_utils.post(api_key, data=data)
return response
except Exception as e:
logging.error(f"获取订单信息失败: {str(e)}")
return None
def add_order_info(self, info):
"""
添加订单信息
"""
try:
# API 配置中的键名
api_key = "add_order_info"
# 构建 form-data 格式的数据
data = {
"parms": json.dumps(info), # 必须将数据序列化为JSON字符串
"pageIndex": 0,
"pageSize": 10,
"sortField": "",
"sortOrder": ""
}
# 将工程号作为参数传递,使用 data 参数传递 form-data 格式数据
response = self.api_utils.post(api_key, data=data)
return response
except Exception as e:
logging.error(f"添加订单信息失败: {str(e)}")
return None
def remove_order_info(self, xpack, ddmo, sgc):
"""
删除订单信息
Args:
xpack: 托盘号
ddmo: 订单号
sgc: 工程号
Returns:
dict: 接口响应结果
"""
try:
# API 配置中的键名
api_key = "remove_order_info"
# 构建 form-data 格式的数据
data = {
"xpack": xpack,
"ddmo": ddmo,
"gch": sgc
}
# 调用接口
response = self.api_utils.post(api_key, data=data)
return response
except Exception as e:
logging.error(f"删除订单信息失败: {str(e)}")
return {"status": False, "message": f"删除订单信息失败: {str(e)}"}
def get_xpack(self, order_id,corp_id):
"""
获取包装号
"""
try:
# API 配置中的键名
api_key = "get_xpack"
# 构建 form-data 格式的数据
data = {
"ddmo": order_id,
"data_corp":corp_id
}
# 将工程号作为参数传递,使用 data 参数传递 form-data 格式数据
response = self.api_utils.post(api_key, data=data)
return response
except Exception as e:
logging.error(f"获取包装号失败: {str(e)}")
return None
def get_order_list(self, params):
"""
获取订单列表
Args:
params: 查询参数字典
Returns:
dict: 订单列表
"""
try:
# API 配置中的键名
api_key = "get_order_info"
# 构建查询参数
order_dict = {
"srch_mo": params.get("srch_mo", ""),
"srch_note": params.get("srch_note", ""),
"srch_rq1": params.get("srch_rq1", ""),
"srch_rq2": params.get("srch_rq2", ""),
"srch_cz": params.get("srch_cz", ""),
"srch_size": params.get("srch_size", ""),
"data_corp": params.get("corp_id", "")
}
# 构建 form-data 格式的数据
data = {
"parms": json.dumps(order_dict), # 必须将数据序列化为JSON字符串
"pageIndex": 0,
"pageSize": 50, # 获取更多数据
"sortField": "create_time",
"sortOrder": "desc"
}
# 调用API
response = self.api_utils.post(api_key, data=data)
return response
except Exception as e:
logging.error(f"获取订单列表失败: {str(e)}")
return {"status": False, "message": f"获取订单列表失败: {str(e)}"}
def get_params(self, stype, main, corp_id):
"""
获取指定参数信息
Args:
stype: 参数类型,如"库房档案"
main: 主参数,如"XC"
corp_id: 公司ID
Returns:
dict: 参数信息列表
"""
try:
# API 配置中的键名
api_key = "get_params"
# 构建GET请求参数
params = {
"stype": stype,
"main": main,
"corp_id": corp_id
}
# 发送GET请求
response = self.api_utils.get(api_key, params=params)
# 检查响应状态
if response.get("success", False):
return {
"status": True,
"data": response.get("data", []),
"message": "获取参数信息成功"
}
else:
return {
"status": False,
"data": [],
"message": response.get("message", "获取参数信息失败")
}
except Exception as e:
logging.error(f"获取参数信息失败: {str(e)}")
return {
"status": False,
"data": [],
"message": f"获取参数信息失败: {str(e)}"
}
def get_wire_type_params(self, corp_id):
"""
获取线材类型参数信息
Args:
corp_id: 公司ID
Returns:
dict: 线材类型信息列表
"""
try:
# 使用get_params方法传入线材类型参数
return self.get_params("线材类型", "", corp_id)
except Exception as e:
logging.error(f"获取线材类型参数失败: {str(e)}")
return {
"status": False,
"data": [],
"message": f"获取线材类型参数失败: {str(e)}"
}
def get_luno_list(self, params):
"""
获取炉号列表
Args:
params: 查询参数字典,包含:
- srch_rq1: 开始日期
- srch_rq2: 结束日期
- luono: 炉号
- cz: 材质
- gg: 规格
- gc: 钢厂
- corp_id: 公司ID
Returns:
dict: 炉号列表
"""
try:
# API 配置中的键名
api_key = "get_luno"
# 构建查询参数
luno_dict = {
"srch_rq1": params.get("srch_rq1", ""),
"srch_rq2": params.get("srch_rq2", ""),
"luono": params.get("luono", ""),
"cz": params.get("cz", ""),
"gg": params.get("gg", ""),
"gc": params.get("gc", ""),
"data_corp": params.get("corp_id", "")
}
# 构建 form-data 格式的数据
data = {
"parms": json.dumps(luno_dict), # 必须将数据序列化为JSON字符串
"pageIndex": 0,
"pageSize": 50, # 获取更多数据
"sortField": "create_time",
"sortOrder": "desc"
}
# 调用API
response = self.api_utils.post(api_key, data=data)
return response
except Exception as e:
logging.error(f"获取炉号列表失败: {str(e)}")
return {"status": False, "message": f"获取炉号列表失败: {str(e)}"}