1005 lines
39 KiB
Python
1005 lines
39 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
库位处理器模块
|
||
提供与库位操作相关的各种处理器
|
||
"""
|
||
|
||
import logging
|
||
import json
|
||
import asyncio
|
||
import aiohttp
|
||
import uuid
|
||
from typing import Dict, Any, List, Optional
|
||
from services.execution.task_context import TaskContext
|
||
from .base import BlockHandler, register_handler
|
||
from config.settings import settings
|
||
from utils.logger import get_logger
|
||
from .model.block_name import StorageBlockName
|
||
|
||
# 获取日志记录器
|
||
logger = get_logger("services.execution.handlers.storage_location")
|
||
# 提取公共的API调用函数
|
||
async def call_storage_api(api_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""
|
||
调用库位服务API的通用函数
|
||
|
||
Args:
|
||
api_name: API名称,对应API_ENDPOINTS中的键
|
||
params: API参数
|
||
|
||
Returns:
|
||
API响应结果
|
||
"""
|
||
# 获取API端点和方法
|
||
endpoint = settings.STORAGE_API_ENDPOINTS.get(api_name)
|
||
method = settings.STORAGE_API_METHODS.get(api_name)
|
||
|
||
if not endpoint or not method:
|
||
logger.error(f"未找到API端点或方法:{api_name}")
|
||
return {
|
||
"success": False,
|
||
"message": f"未找到API配置: {api_name}"
|
||
}
|
||
|
||
# 检查是否启用测试模式
|
||
if settings.STORAGE_API_MOCK_MODE:
|
||
logger.info(f"[测试模式] 模拟调用API: {api_name}, 参数: {params}")
|
||
# 构造测试模式下的响应
|
||
test_response = generate_mock_response(api_name, params)
|
||
return test_response
|
||
|
||
# 构建完整的URL
|
||
url = f"{settings.STORAGE_API_BASE_URL}{endpoint}"
|
||
|
||
# 准备请求头
|
||
headers = {"Content-Type": "application/json"}
|
||
if settings.STORAGE_API_TOKEN:
|
||
headers["Authorization"] = f"Bearer {settings.STORAGE_API_TOKEN}"
|
||
|
||
logger.info(f"调用外部API {api_name} - {method} {url}, 参数: {params}")
|
||
|
||
try:
|
||
async with aiohttp.ClientSession() as session:
|
||
# 根据HTTP方法选择相应的请求方式
|
||
if method == "GET":
|
||
# 对于GET请求,将params转换为URL参数
|
||
async with session.get(
|
||
url,
|
||
params=params,
|
||
headers=headers,
|
||
timeout=settings.STORAGE_API_TIMEOUT
|
||
) as response:
|
||
result = await response.json()
|
||
elif method == "POST":
|
||
# 对于POST请求,将params作为JSON数据发送
|
||
async with session.post(
|
||
url,
|
||
json=params,
|
||
headers=headers,
|
||
timeout=settings.STORAGE_API_TIMEOUT
|
||
) as response:
|
||
result = await response.json()
|
||
elif method == "PUT":
|
||
# 对于PUT请求,将params作为JSON数据发送
|
||
async with session.put(
|
||
url,
|
||
json=params,
|
||
headers=headers,
|
||
timeout=settings.STORAGE_API_TIMEOUT
|
||
) as response:
|
||
result = await response.json()
|
||
else:
|
||
logger.error(f"不支持的HTTP方法: {method}")
|
||
return {
|
||
"success": False,
|
||
"message": f"不支持的HTTP方法: {method}"
|
||
}
|
||
|
||
# 检查响应状态码
|
||
if response.status != 200:
|
||
logger.error(f"API调用失败: {url}, 状态码: {response.status}, 响应: {result}")
|
||
return {
|
||
"success": False,
|
||
"message": f"API调用失败, 状态码: {response.status}",
|
||
"data": result
|
||
}
|
||
|
||
logger.info(f"API调用成功: {url}, 响应: {result}")
|
||
return result
|
||
|
||
except aiohttp.ClientError as e:
|
||
logger.error(f"API调用客户端错误: {url}, 错误: {str(e)}")
|
||
return {
|
||
"success": False,
|
||
"message": f"API调用客户端错误: {str(e)}"
|
||
}
|
||
except asyncio.TimeoutError:
|
||
logger.error(f"API调用超时: {url}")
|
||
return {
|
||
"success": False,
|
||
"message": "API调用超时"
|
||
}
|
||
except json.JSONDecodeError:
|
||
logger.error(f"API响应解析失败: {url}")
|
||
return {
|
||
"success": False,
|
||
"message": "API响应格式错误,无法解析JSON"
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"API调用异常: {url}, 错误: {str(e)}")
|
||
return {
|
||
"success": False,
|
||
"message": f"API调用异常: {str(e)}"
|
||
}
|
||
|
||
def generate_mock_response(api_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""
|
||
生成测试模式下的模拟响应
|
||
|
||
Args:
|
||
api_name: API名称
|
||
params: API参数
|
||
|
||
Returns:
|
||
模拟的API响应
|
||
"""
|
||
# 基本响应结构
|
||
response = {
|
||
"success": True,
|
||
"message": f"[测试模式] {api_name} 操作成功",
|
||
"data": {}
|
||
}
|
||
# 根据不同API返回不同的模拟数据
|
||
if api_name.startswith("get_idle_site"):
|
||
# 获取库位类API
|
||
response["data"] = {
|
||
"siteId": f"MOCK-SITE-{str(uuid.uuid4())[:8]}"
|
||
}
|
||
elif api_name == "get_idle_crowded_site":
|
||
# 获取密集库位
|
||
response["data"] = {
|
||
"siteId": f"MOCK-CROWDED-{str(uuid.uuid4())[:8]}"
|
||
}
|
||
elif api_name == "get_locked_sites_by_task_record_id":
|
||
# 获取任务锁定的库位
|
||
response["data"] = {
|
||
"lockedSiteIdList": [f"MOCK-LOCKED-{str(uuid.uuid4())[:8]}" for _ in range(3)]
|
||
}
|
||
elif api_name == "get_site_attr":
|
||
# 获取库位属性
|
||
attr_name = params.get("attrName", "unknown")
|
||
response["data"] = {
|
||
"attrValue": f"MOCK-VALUE-FOR-{attr_name}"
|
||
}
|
||
elif api_name == "query_idle_site":
|
||
# 查询空闲库位
|
||
response["data"] = {
|
||
"site": {
|
||
"id": f"MOCK-QUERY-{str(uuid.uuid4())[:8]}",
|
||
"groupName": "MOCK-GROUP",
|
||
"type": 0,
|
||
"filled": params.get("filled", False),
|
||
"locked": params.get("locked", False),
|
||
"content": params.get("content", "")
|
||
}
|
||
}
|
||
elif api_name == "set_site_locked":
|
||
# 锁定库位
|
||
response["data"] = {"success": True}
|
||
|
||
# 记录模拟调用结果
|
||
logger.info(f"[测试模式] 生成模拟响应: {api_name}, 响应: {response}")
|
||
return response
|
||
|
||
# 创建一个基础库位处理器类,包含通用方法
|
||
class StorageBlockHandler(BlockHandler):
|
||
"""库位处理器基类,提供公共的API调用方法"""
|
||
|
||
async def _call_external_api(self, api_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""调用外部API的通用方法"""
|
||
return await call_storage_api(api_name, params)
|
||
|
||
# 批量设置库位处理器
|
||
@register_handler(StorageBlockName.BATCH_SETTING_SITE)
|
||
class BatchSettingSiteBlockHandler(StorageBlockHandler):
|
||
"""批量设置库位处理器"""
|
||
|
||
async def execute(
|
||
self,
|
||
block: Dict[str, Any],
|
||
input_params: Dict[str, Any],
|
||
context: TaskContext
|
||
) -> Dict[str, Any]:
|
||
"""执行批量设置库位操作"""
|
||
try:
|
||
# 获取关键参数用于验证
|
||
site_ids = input_params.get("siteIds", [])
|
||
group_names = input_params.get("groupNames", [])
|
||
filled = input_params.get("filled")
|
||
|
||
# 参数检查
|
||
if not site_ids and not group_names:
|
||
result = {
|
||
"success": False,
|
||
"message": "库位ID和库区集不能同时为空"
|
||
}
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
if filled is None:
|
||
result = {
|
||
"success": False,
|
||
"message": "占用参数不能为空"
|
||
}
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 直接调用外部API执行批量设置,传递所有input_params
|
||
result = await self._call_external_api("batch_setting_site", input_params)
|
||
if result.get("success", False):
|
||
result["message"] = f"批量设置库位成功,共设置 {len(site_ids) or len(group_names)} 个库位"
|
||
else:
|
||
result["message"] = f"批量设置库位失败: {result.get('message', '未知错误')}"
|
||
|
||
# 记录执行结果
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
except Exception as e:
|
||
result = {
|
||
"success": False,
|
||
"message": f"批量设置库位执行异常: {str(e)}"
|
||
}
|
||
# 记录异常
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 获取密集库位处理器
|
||
@register_handler(StorageBlockName.GET_IDLE_CROWDED_SITE)
|
||
class GetIdleCrowdedSiteBlockHandler(StorageBlockHandler):
|
||
"""获取密集库位处理器"""
|
||
|
||
async def execute(
|
||
self,
|
||
block: Dict[str, Any],
|
||
input_params: Dict[str, Any],
|
||
context: TaskContext
|
||
) -> Dict[str, Any]:
|
||
"""执行获取密集库位操作"""
|
||
try:
|
||
# 获取关键参数用于验证
|
||
group_name = input_params.get("groupName", [])
|
||
filled = input_params.get("filled")
|
||
retry = input_params.get("retry", False)
|
||
retry_num = input_params.get("retryNum", 1)
|
||
|
||
# 参数检查
|
||
if not group_name:
|
||
result = {
|
||
"success": False,
|
||
"message": "库区集不能为空"
|
||
}
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
if filled is None:
|
||
result = {
|
||
"success": False,
|
||
"message": "取/放参数不能为空"
|
||
}
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 确保retry_num是整数类型
|
||
try:
|
||
retry_num = int(retry_num)
|
||
except (ValueError, TypeError):
|
||
logger.warning(f"retry_num参数类型错误或无法转换为整数: {retry_num}, 将使用默认值1")
|
||
retry_num = 1
|
||
input_params["retryNum"] = retry_num
|
||
|
||
# 确保retry_period是数字
|
||
retry_period = input_params.get("retryPeriod")
|
||
if retry_period is not None:
|
||
try:
|
||
retry_sleep = float(retry_period) / 1000
|
||
except (ValueError, TypeError):
|
||
logger.warning(f"retry_period参数类型错误或无法转换为数字: {retry_period}, 使用默认值1秒")
|
||
retry_sleep = 1
|
||
else:
|
||
retry_sleep = 1
|
||
|
||
# 调用外部API执行获取密集库位
|
||
attempts = 1
|
||
# 确保max_attempts是整数
|
||
max_attempts = retry_num if retry and retry_num is not None else 1
|
||
|
||
result = None
|
||
while attempts <= max_attempts:
|
||
result = await self._call_external_api("get_idle_crowded_site", input_params)
|
||
|
||
if result.get("success", False) and result.get("data", {}).get("siteId"):
|
||
# 获取成功
|
||
site_id = result.get("data", {}).get("siteId")
|
||
# 设置上下文变量
|
||
context.set_variable("siteId", site_id)
|
||
context.set_block_output(block.get("name"), {"siteId": site_id})
|
||
|
||
result["message"] = f"获取密集库位成功,库位ID: {site_id}"
|
||
break
|
||
else:
|
||
# 获取失败,判断是否需要重试
|
||
if retry and attempts < max_attempts:
|
||
logger.info(f"获取密集库位失败,第 {attempts} 次重试,等待 {retry_sleep} 秒后重试")
|
||
await asyncio.sleep(retry_sleep)
|
||
attempts += 1
|
||
else:
|
||
result["message"] = f"获取密集库位失败: {result.get('message', '未找到合适的库位')}"
|
||
break
|
||
|
||
# 记录执行结果
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
except Exception as e:
|
||
result = {
|
||
"success": False,
|
||
"message": f"获取密集库位执行异常: {str(e)}"
|
||
}
|
||
# 记录异常
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 获取库位处理器
|
||
@register_handler(StorageBlockName.GET_IDLE_SITE)
|
||
class GetIdleSiteBlockHandler(StorageBlockHandler):
|
||
"""获取库位处理器"""
|
||
|
||
async def execute(
|
||
self,
|
||
block: Dict[str, Any],
|
||
input_params: Dict[str, Any],
|
||
context: TaskContext
|
||
) -> Dict[str, Any]:
|
||
"""执行获取库位操作"""
|
||
try:
|
||
# 获取关键参数用于验证
|
||
locked = input_params.get("locked")
|
||
retry_period = input_params.get("retryPeriod")
|
||
|
||
# 必填参数检查
|
||
if locked is None:
|
||
result = {
|
||
"success": False,
|
||
"message": "是否已锁定参数不能为空"
|
||
}
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 确保retry_period是数字类型
|
||
if retry_period is not None:
|
||
try:
|
||
retry_period = float(retry_period)
|
||
input_params["retryPeriod"] = retry_period
|
||
retry_sleep = retry_period / 1000
|
||
except (ValueError, TypeError):
|
||
logger.warning(f"retry_period参数类型错误或无法转换为数字: {retry_period}, 将使用默认值1000")
|
||
retry_sleep = 1
|
||
input_params["retryPeriod"] = 1000
|
||
else:
|
||
retry_sleep = 1
|
||
input_params["retryPeriod"] = 1000
|
||
|
||
# 设置重试计数器(用于日志记录)
|
||
retry_count = 0
|
||
|
||
while True: # 无限循环,直到找到库位
|
||
# 尝试获取库位
|
||
result = await self._call_external_api("get_idle_site", input_params)
|
||
|
||
if result.get("success", False) and result.get("data", {}).get("siteId"):
|
||
# 获取成功
|
||
site_id = result.get("data", {}).get("siteId")
|
||
# 设置上下文变量
|
||
context.set_variable("siteId", site_id)
|
||
context.set_block_output(block.get("name"), {"siteId": site_id})
|
||
|
||
# 根据重试次数设置不同的成功消息
|
||
if retry_count == 0:
|
||
result["message"] = f"获取库位成功,库位ID: {site_id}"
|
||
else:
|
||
result["message"] = f"第{retry_count}次重试获取库位成功,库位ID: {site_id}"
|
||
|
||
# 记录执行结果并退出循环
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
else:
|
||
# 获取失败,记录日志并继续重试
|
||
retry_count += 1
|
||
logger.info(f"获取库位失败,第{retry_count}次重试,等待 {retry_sleep} 秒后继续")
|
||
|
||
# 更新临时结果消息(仅用于日志记录)
|
||
temp_result = {
|
||
"success": False,
|
||
"message": f"第{retry_count}次尝试获取库位失败: {result.get('message', '未找到合适的库位')},将继续重试"
|
||
}
|
||
await self._record_task_log(block, temp_result, context)
|
||
|
||
# 等待指定时间后继续重试
|
||
await asyncio.sleep(retry_sleep)
|
||
|
||
except Exception as e:
|
||
result = {
|
||
"success": False,
|
||
"message": f"获取库位执行异常: {str(e)}"
|
||
}
|
||
# 记录异常
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 获取任务实例加锁库位处理器
|
||
@register_handler(StorageBlockName.GET_LOCKED_SITES_BY_TASK_RECORD_ID)
|
||
class GetLockedSitesByTaskRecordIdBlockHandler(StorageBlockHandler):
|
||
"""根据任务实例ID获取所有加锁库位处理器"""
|
||
|
||
async def execute(
|
||
self,
|
||
block: Dict[str, Any],
|
||
input_params: Dict[str, Any],
|
||
context: TaskContext
|
||
) -> Dict[str, Any]:
|
||
"""获取任务实例关联的所有已锁库位"""
|
||
try:
|
||
# 获取参数进行验证
|
||
task_record_id = input_params.get("taskRecordId")
|
||
|
||
# 必填参数检查
|
||
if not task_record_id:
|
||
result = {
|
||
"success": False,
|
||
"message": "任务实例ID不能为空"
|
||
}
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 直接调用外部API获取库位列表
|
||
result = await self._call_external_api("get_locked_sites_by_task_record_id", input_params)
|
||
|
||
if result.get("success", False):
|
||
# 获取成功,设置上下文变量
|
||
locked_site_list = result.get("data", {}).get("lockedSiteIdList", [])
|
||
context.set_variable("lockedSiteIdList", locked_site_list)
|
||
context.set_block_output(block.get("name"), {"lockedSiteIdList": locked_site_list})
|
||
|
||
result["message"] = f"获取任务锁定库位成功,共 {len(locked_site_list)} 个库位"
|
||
else:
|
||
result["message"] = f"获取任务锁定库位失败: {result.get('message', '未知错误')}"
|
||
|
||
# 记录执行结果
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
except Exception as e:
|
||
result = {
|
||
"success": False,
|
||
"message": f"获取任务锁定库位执行异常: {str(e)}"
|
||
}
|
||
# 记录异常
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 获取库位扩展属性处理器
|
||
@register_handler(StorageBlockName.GET_SITE_ATTR)
|
||
class GetSiteAttrBlockHandler(StorageBlockHandler):
|
||
"""获取库位扩展属性处理器"""
|
||
|
||
async def execute(
|
||
self,
|
||
block: Dict[str, Any],
|
||
input_params: Dict[str, Any],
|
||
context: TaskContext
|
||
) -> Dict[str, Any]:
|
||
"""获取库位扩展属性值"""
|
||
try:
|
||
# 获取参数进行验证
|
||
site_id = input_params.get("siteId")
|
||
attr_name = input_params.get("attrName")
|
||
|
||
# 必填参数检查
|
||
if not site_id:
|
||
result = {
|
||
"success": False,
|
||
"message": "库位ID不能为空"
|
||
}
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
if not attr_name:
|
||
result = {
|
||
"success": False,
|
||
"message": "属性名称不能为空"
|
||
}
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 直接调用外部API获取属性值
|
||
result = await self._call_external_api("get_site_attr", input_params)
|
||
|
||
if result.get("success", False):
|
||
# 获取成功,设置上下文变量
|
||
attr_value = result.get("data", {}).get("attrValue")
|
||
context.set_variable("attrValue", attr_value)
|
||
context.set_block_output(block.get("name"), {"attrValue": attr_value})
|
||
|
||
result["message"] = f"获取库位属性值成功,{attr_name}: {attr_value}"
|
||
else:
|
||
result["message"] = f"获取库位属性值失败: {result.get('message', '未知错误')}"
|
||
|
||
# 记录执行结果
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
except Exception as e:
|
||
result = {
|
||
"success": False,
|
||
"message": f"获取库位属性值执行异常: {str(e)}"
|
||
}
|
||
# 记录异常
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 查询库位处理器
|
||
@register_handler(StorageBlockName.QUERY_IDLE_SITE)
|
||
class QueryIdleSiteBlockHandler(StorageBlockHandler):
|
||
"""查询库位处理器"""
|
||
|
||
async def execute(
|
||
self,
|
||
block: Dict[str, Any],
|
||
input_params: Dict[str, Any],
|
||
context: TaskContext
|
||
) -> Dict[str, Any]:
|
||
"""查询库位"""
|
||
try:
|
||
# 直接调用外部API查询库位
|
||
result = await self._call_external_api("query_idle_site", input_params)
|
||
|
||
if result.get("success", False) and result.get("data", {}).get("site"):
|
||
# 查询成功,设置上下文变量
|
||
site = result.get("data", {}).get("site")
|
||
context.set_variable("site", site)
|
||
context.set_block_output(block.get("name"), {"site": site})
|
||
|
||
site_id = site.get("id", "未知")
|
||
result["message"] = f"查询库位成功,库位ID: {site_id}"
|
||
else:
|
||
result["message"] = f"查询库位失败: {result.get('message', '未找到符合条件的库位')}"
|
||
|
||
# 记录执行结果
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
except Exception as e:
|
||
result = {
|
||
"success": False,
|
||
"message": f"查询库位执行异常: {str(e)}"
|
||
}
|
||
# 记录异常
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 设置库位扩展属性处理器
|
||
@register_handler(StorageBlockName.SET_SITE_ATTR)
|
||
class SetSiteAttrBlockHandler(StorageBlockHandler):
|
||
"""设置库位扩展属性处理器"""
|
||
|
||
async def execute(
|
||
self,
|
||
block: Dict[str, Any],
|
||
input_params: Dict[str, Any],
|
||
context: TaskContext
|
||
) -> Dict[str, Any]:
|
||
"""设置库位扩展属性值"""
|
||
try:
|
||
# 获取参数进行验证
|
||
site_id = input_params.get("siteId")
|
||
attr_name = input_params.get("attrName")
|
||
attr_value = input_params.get("attrValue")
|
||
|
||
# 必填参数检查
|
||
if not site_id:
|
||
result = {
|
||
"success": False,
|
||
"message": "库位ID不能为空"
|
||
}
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
if not attr_name:
|
||
result = {
|
||
"success": False,
|
||
"message": "属性名称不能为空"
|
||
}
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 直接调用外部API设置属性值
|
||
result = await self._call_external_api("set_site_attr", input_params)
|
||
|
||
if result.get("success", False):
|
||
value_display = attr_value if attr_value is not None else "null"
|
||
result["message"] = f"设置库位属性值成功,{site_id}.{attr_name} = {value_display}"
|
||
else:
|
||
result["message"] = f"设置库位属性值失败: {result.get('message', '未知错误')}"
|
||
|
||
# 记录执行结果
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
except Exception as e:
|
||
result = {
|
||
"success": False,
|
||
"message": f"设置库位属性值执行异常: {str(e)}"
|
||
}
|
||
# 记录异常
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 设置库位货物处理器
|
||
@register_handler(StorageBlockName.SET_SITE_CONTENT)
|
||
class SetSiteContentBlockHandler(StorageBlockHandler):
|
||
"""设置库位货物处理器"""
|
||
|
||
async def execute(
|
||
self,
|
||
block: Dict[str, Any],
|
||
input_params: Dict[str, Any],
|
||
context: TaskContext
|
||
) -> Dict[str, Any]:
|
||
"""设置库位货物"""
|
||
try:
|
||
# 获取参数进行验证
|
||
site_id = input_params.get("siteId")
|
||
content = input_params.get("content")
|
||
|
||
# 必填参数检查
|
||
if not site_id:
|
||
result = {
|
||
"success": False,
|
||
"message": "库位ID不能为空"
|
||
}
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
if content is None:
|
||
result = {
|
||
"success": False,
|
||
"message": "货物不能为空"
|
||
}
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 直接调用外部API设置库位货物
|
||
result = await self._call_external_api("set_site_content", input_params)
|
||
|
||
if result.get("success", False):
|
||
result["message"] = f"设置库位货物成功,{site_id} 货物: {content}"
|
||
else:
|
||
result["message"] = f"设置库位货物失败: {result.get('message', '未知错误')}"
|
||
|
||
# 记录执行结果
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
except Exception as e:
|
||
result = {
|
||
"success": False,
|
||
"message": f"设置库位货物执行异常: {str(e)}"
|
||
}
|
||
# 记录异常
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 设置库位为空处理器
|
||
@register_handler(StorageBlockName.SET_SITE_EMPTY)
|
||
class SetSiteEmptyBlockHandler(StorageBlockHandler):
|
||
"""设置库位为空处理器"""
|
||
|
||
async def execute(
|
||
self,
|
||
block: Dict[str, Any],
|
||
input_params: Dict[str, Any],
|
||
context: TaskContext
|
||
) -> Dict[str, Any]:
|
||
"""设置库位为空"""
|
||
try:
|
||
# 获取参数进行验证
|
||
site_id = input_params.get("siteId")
|
||
|
||
# 必填参数检查
|
||
if not site_id:
|
||
result = {
|
||
"success": False,
|
||
"message": "库位ID不能为空"
|
||
}
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 直接调用外部API设置库位为空
|
||
result = await self._call_external_api("set_site_empty", input_params)
|
||
|
||
if result.get("success", False):
|
||
result["message"] = f"设置库位为空成功,库位ID: {site_id}"
|
||
else:
|
||
result["message"] = f"设置库位为空失败: {result.get('message', '未知错误')}"
|
||
|
||
# 记录执行结果
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
except Exception as e:
|
||
result = {
|
||
"success": False,
|
||
"message": f"设置库位为空执行异常: {str(e)}"
|
||
}
|
||
# 记录异常
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 设置库位为占用处理器
|
||
@register_handler(StorageBlockName.SET_SITE_FILLED)
|
||
class SetSiteFilledBlockHandler(StorageBlockHandler):
|
||
"""设置库位为占用处理器"""
|
||
|
||
async def execute(
|
||
self,
|
||
block: Dict[str, Any],
|
||
input_params: Dict[str, Any],
|
||
context: TaskContext
|
||
) -> Dict[str, Any]:
|
||
"""设置库位为占用"""
|
||
try:
|
||
# 获取参数进行验证
|
||
site_id = input_params.get("siteId")
|
||
|
||
# 必填参数检查
|
||
if not site_id:
|
||
result = {
|
||
"success": False,
|
||
"message": "库位ID不能为空"
|
||
}
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 直接调用外部API设置库位为占用
|
||
result = await self._call_external_api("set_site_filled", input_params)
|
||
|
||
if result.get("success", False):
|
||
result["message"] = f"设置库位为占用成功,库位ID: {site_id}"
|
||
else:
|
||
result["message"] = f"设置库位为占用失败: {result.get('message', '未知错误')}"
|
||
|
||
# 记录执行结果
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
except Exception as e:
|
||
result = {
|
||
"success": False,
|
||
"message": f"设置库位为占用执行异常: {str(e)}"
|
||
}
|
||
# 记录异常
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 锁定库位处理器
|
||
@register_handler(StorageBlockName.SET_SITE_LOCKED)
|
||
class SetSiteLockedBlockHandler(StorageBlockHandler):
|
||
"""锁定库位处理器"""
|
||
|
||
async def execute(
|
||
self,
|
||
block: Dict[str, Any],
|
||
input_params: Dict[str, Any],
|
||
context: TaskContext
|
||
) -> Dict[str, Any]:
|
||
"""锁定库位"""
|
||
try:
|
||
# 获取参数进行验证
|
||
site_id = input_params.get("siteId")
|
||
locked_id = input_params.get("lockedId")
|
||
if_fair = input_params.get("ifFair", False) # 是否为公平锁
|
||
retryTimes = input_params.get("retryTimes", 0) # 重试次数,默认为0表示不重试
|
||
|
||
# 必填参数检查
|
||
if not site_id:
|
||
result = {
|
||
"success": False,
|
||
"message": "库位ID不能为空"
|
||
}
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 如果未指定锁定者ID,则使用当前任务ID
|
||
if not locked_id:
|
||
locked_id = context.get_task_record_id()
|
||
input_params["lockedId"] = locked_id
|
||
|
||
# 确保retry_num是整数类型
|
||
try:
|
||
retry_num = int(retryTimes)
|
||
except (ValueError, TypeError):
|
||
logger.warning(f"retry_num参数类型错误或无法转换为整数: {retryTimes}, 将使用默认值0")
|
||
retry_num = 0
|
||
input_params["retryNum"] = retry_num
|
||
|
||
# 处理公平锁参数
|
||
if if_fair is not None:
|
||
input_params["ifFair"] = if_fair
|
||
|
||
# 实现重试逻辑
|
||
attempts = 0
|
||
max_attempts = retry_num # 包括首次尝试
|
||
retry_sleep = 1 # 默认重试间隔为1秒
|
||
|
||
while attempts < max_attempts:
|
||
attempts += 1
|
||
|
||
# 调用外部API锁定库位
|
||
result = await self._call_external_api("set_site_locked", input_params)
|
||
|
||
if result.get("success", False):
|
||
# 获取锁定结果
|
||
success = result.get("data", {}).get("success", False)
|
||
|
||
# 设置上下文变量
|
||
context.set_variable("success", success)
|
||
context.set_block_output(block.get("name"), {"success": success})
|
||
|
||
if success:
|
||
# 锁定成功
|
||
if attempts == 1:
|
||
result["message"] = f"锁定库位成功,库位ID: {site_id}"
|
||
else:
|
||
result["message"] = f"第{attempts}次尝试锁定库位成功,库位ID: {site_id}"
|
||
break
|
||
else:
|
||
# 锁定失败但API调用成功,检查是否需要重试
|
||
if attempts < max_attempts:
|
||
logger.info(f"锁定库位失败,库位ID: {site_id},第 {attempts} 次尝试,等待 {retry_sleep} 秒后重试")
|
||
# 记录临时结果(仅用于日志记录)
|
||
temp_result = {
|
||
"success": False,
|
||
"message": f"第{attempts}次尝试锁定库位失败: 库位 {site_id} 可能已被其他任务锁定,将继续重试"
|
||
}
|
||
await self._record_task_log(block, temp_result, context)
|
||
await asyncio.sleep(retry_sleep)
|
||
else:
|
||
# 达到最大重试次数,返回最终失败结果
|
||
result["message"] = f"锁定库位失败,库位ID: {site_id},可能已被其他任务锁定,重试 {retry_num} 次后仍然失败"
|
||
else:
|
||
# API调用失败,检查是否需要重试
|
||
if attempts < max_attempts:
|
||
logger.info(f"API调用失败,第 {attempts} 次尝试,等待 {retry_sleep} 秒后重试")
|
||
# 记录临时结果(仅用于日志记录)
|
||
temp_result = {
|
||
"success": False,
|
||
"message": f"第{attempts}次API调用失败: {result.get('message', '未知错误')},将继续重试"
|
||
}
|
||
await self._record_task_log(block, temp_result, context)
|
||
await asyncio.sleep(retry_sleep)
|
||
else:
|
||
# 达到最大重试次数,返回最终失败结果
|
||
result["message"] = f"锁定库位失败: {result.get('message', '未知错误')},重试 {retry_num} 次后仍然失败"
|
||
|
||
# 记录最终执行结果
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
except Exception as e:
|
||
result = {
|
||
"success": False,
|
||
"message": f"锁定库位执行异常: {str(e)}"
|
||
}
|
||
# 记录异常
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 设置库位标签处理器
|
||
@register_handler(StorageBlockName.SET_SITE_TAGS)
|
||
class SetSiteTagsBlockHandler(StorageBlockHandler):
|
||
"""设置库位标签处理器"""
|
||
|
||
async def execute(
|
||
self,
|
||
block: Dict[str, Any],
|
||
input_params: Dict[str, Any],
|
||
context: TaskContext
|
||
) -> Dict[str, Any]:
|
||
"""设置库位标签"""
|
||
try:
|
||
# 获取参数进行验证
|
||
site_id = input_params.get("siteId")
|
||
tags = input_params.get("tags")
|
||
|
||
# 必填参数检查
|
||
if not site_id:
|
||
result = {
|
||
"success": False,
|
||
"message": "库位ID不能为空"
|
||
}
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 直接调用外部API设置库位标签
|
||
result = await self._call_external_api("set_site_tags", input_params)
|
||
|
||
if result.get("success", False):
|
||
tags_display = tags if tags else "空"
|
||
result["message"] = f"设置库位标签成功,库位ID: {site_id},标签: {tags_display}"
|
||
else:
|
||
result["message"] = f"设置库位标签失败: {result.get('message', '未知错误')}"
|
||
|
||
# 记录执行结果
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
except Exception as e:
|
||
result = {
|
||
"success": False,
|
||
"message": f"设置库位标签执行异常: {str(e)}"
|
||
}
|
||
# 记录异常
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 解锁库位处理器
|
||
@register_handler(StorageBlockName.SET_SITE_UNLOCKED)
|
||
class SetSiteUnlockedBlockHandler(StorageBlockHandler):
|
||
"""解锁库位处理器"""
|
||
|
||
async def execute(
|
||
self,
|
||
block: Dict[str, Any],
|
||
input_params: Dict[str, Any],
|
||
context: TaskContext
|
||
) -> Dict[str, Any]:
|
||
"""解锁库位"""
|
||
try:
|
||
# 获取参数进行验证
|
||
site_id = input_params.get("siteId")
|
||
un_locked_id = input_params.get("unLockedId")
|
||
|
||
# 必填参数检查
|
||
if not site_id:
|
||
result = {
|
||
"success": False,
|
||
"message": "库位ID不能为空"
|
||
}
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 如果未指定解锁者ID,则使用当前任务ID
|
||
if not un_locked_id:
|
||
un_locked_id = context.get_task_record_id()
|
||
input_params["unLockedId"] = un_locked_id
|
||
logger.info(f"未指定解锁者ID,使用当前任务ID: {un_locked_id}")
|
||
|
||
# 调用外部API解锁库位
|
||
result = await self._call_external_api("set_site_unlocked", input_params)
|
||
|
||
if result.get("success", False):
|
||
# 设置上下文变量
|
||
context.set_variable("unlockSuccess", True)
|
||
context.set_block_output(block.get("name"), {"unlockSuccess": True})
|
||
|
||
result["message"] = f"解锁库位成功,库位ID: {site_id}"
|
||
else:
|
||
# 设置上下文变量
|
||
context.set_variable("unlockSuccess", False)
|
||
context.set_block_output(block.get("name"), {"unlockSuccess": False})
|
||
|
||
result["message"] = f"解锁库位失败: {result.get('message', '未知错误')}"
|
||
|
||
# 记录执行结果
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
except Exception as e:
|
||
result = {
|
||
"success": False,
|
||
"message": f"解锁库位执行异常: {str(e)}"
|
||
}
|
||
# 记录异常
|
||
await self._record_task_log(block, result, context)
|
||
return result |