296 lines
9.4 KiB
Python
296 lines
9.4 KiB
Python
# components/site_components.py
|
||
"""
|
||
库位组件
|
||
"""
|
||
from typing import Dict, Any, List, Optional
|
||
from core.component import Component, ComponentFactory
|
||
from core.exceptions import ComponentError
|
||
|
||
@ComponentFactory.register("batch_set_site")
|
||
class BatchSetSiteComponent(Component):
|
||
"""批量设置库位组件"""
|
||
|
||
def execute(self) -> Dict[str, Any]:
|
||
# 获取参数
|
||
site_ids = self.resolve_param("site_ids", [])
|
||
area_set = self.resolve_param("area_set")
|
||
occupied = self.resolve_param("occupied")
|
||
has_material = self.resolve_param("has_material")
|
||
is_material_locked = self.resolve_param("is_material_locked")
|
||
|
||
# 验证必要参数
|
||
self.validate_required_params(["site_ids"])
|
||
|
||
# 实际实现中,这里应该调用库位管理系统API批量设置库位
|
||
# 这里简化为返回模拟结果
|
||
result = {
|
||
"affected_sites": site_ids,
|
||
"success": True
|
||
}
|
||
|
||
# 存储结果
|
||
self.store_result(result)
|
||
return result
|
||
|
||
@ComponentFactory.register("get_dense_site")
|
||
class GetDenseSiteComponent(Component):
|
||
"""获取密集库位组件"""
|
||
|
||
def execute(self) -> Dict[str, Any]:
|
||
# 获取参数
|
||
area_set = self.resolve_param("area_set")
|
||
operation = self.resolve_param("operation") # 取/放
|
||
goods = self.resolve_param("goods")
|
||
auto_lock = self.resolve_param("auto_lock", False)
|
||
retry = self.resolve_param("retry", False)
|
||
|
||
# 验证必要参数
|
||
self.validate_required_params(["area_set", "operation"])
|
||
|
||
# 实际实现中,这里应该调用库位管理系统API获取密集库位
|
||
# 这里简化为返回模拟结果
|
||
site_id = "site_001"
|
||
|
||
result = {"siteId": site_id}
|
||
|
||
# 存储结果
|
||
self.store_result(result)
|
||
return result
|
||
|
||
@ComponentFactory.register("query_site")
|
||
class QuerySiteComponent(Component):
|
||
"""查询库位组件"""
|
||
|
||
def execute(self) -> Dict[str, Any]:
|
||
# 获取参数
|
||
site_id = self.resolve_param("site_id")
|
||
goods = self.resolve_param("goods")
|
||
has_goods = self.resolve_param("has_goods")
|
||
is_locked = self.resolve_param("is_locked")
|
||
is_inventory = self.resolve_param("is_inventory")
|
||
retry_interval = self.resolve_param("retry_interval", 1000)
|
||
|
||
# 实际实现中,这里应该调用库位管理系统API查询库位
|
||
# 这里简化为返回模拟结果
|
||
site_info = {
|
||
"id": site_id or "site_001",
|
||
"hasGoods": has_goods if has_goods is not None else False,
|
||
"isLocked": is_locked if is_locked is not None else False,
|
||
"isInventory": is_inventory if is_inventory is not None else True,
|
||
"goods": goods
|
||
}
|
||
|
||
result = {"site": site_info}
|
||
|
||
# 存储结果
|
||
self.store_result(result)
|
||
return result
|
||
|
||
@ComponentFactory.register("lock_site")
|
||
class LockSiteComponent(Component):
|
||
"""锁定库位组件"""
|
||
|
||
def execute(self) -> Dict[str, Any]:
|
||
# 获取参数
|
||
site_id = self.resolve_param("site_id")
|
||
reason = self.resolve_param("reason", "任务锁定")
|
||
|
||
# 验证必要参数
|
||
self.validate_required_params(["site_id"])
|
||
|
||
# 实际实现中,这里应该调用库位管理系统API锁定库位
|
||
# 这里简化为返回模拟结果
|
||
result = {
|
||
"siteId": site_id,
|
||
"locked": True,
|
||
"reason": reason
|
||
}
|
||
|
||
# 存储结果
|
||
self.store_result(result)
|
||
return result
|
||
|
||
@ComponentFactory.register("unlock_site")
|
||
class UnlockSiteComponent(Component):
|
||
"""解锁库位组件"""
|
||
|
||
def execute(self) -> Dict[str, Any]:
|
||
# 获取参数
|
||
site_id = self.resolve_param("site_id")
|
||
reason = self.resolve_param("reason", "任务解锁")
|
||
|
||
# 验证必要参数
|
||
self.validate_required_params(["site_id"])
|
||
|
||
# 实际实现中,这里应该调用库位管理系统API解锁库位
|
||
# 这里简化为返回模拟结果
|
||
result = {
|
||
"siteId": site_id,
|
||
"locked": False,
|
||
"reason": reason
|
||
}
|
||
|
||
# 存储结果
|
||
self.store_result(result)
|
||
return result
|
||
|
||
@ComponentFactory.register("get_locked_sites_by_task")
|
||
class GetLockedSitesByTaskComponent(Component):
|
||
"""根据任务实例ID获取所有加锁库位组件"""
|
||
|
||
def execute(self) -> Dict[str, Any]:
|
||
# 获取参数
|
||
task_instance_id = self.resolve_param("task_instance_id")
|
||
|
||
# 验证必要参数
|
||
self.validate_required_params(["task_instance_id"])
|
||
|
||
# 实际实现中,这里应该调用库位管理系统API获取任务锁定的库位
|
||
# 这里简化为返回模拟结果
|
||
site_ids = ["site_001", "site_002"]
|
||
|
||
result = {"siteIds": site_ids}
|
||
|
||
# 存储结果
|
||
self.store_result(result)
|
||
return result
|
||
|
||
@ComponentFactory.register("get_site_extension_property")
|
||
class GetSiteExtensionPropertyComponent(Component):
|
||
"""获取库位扩展属性值组件"""
|
||
|
||
def execute(self) -> Dict[str, Any]:
|
||
# 获取参数
|
||
site_id = self.resolve_param("site_id")
|
||
property_name = self.resolve_param("property_name")
|
||
|
||
# 验证必要参数
|
||
self.validate_required_params(["site_id", "property_name"])
|
||
|
||
# 实际实现中,这里应该调用库位管理系统API获取库位扩展属性
|
||
# 这里简化为返回模拟结果
|
||
value = f"属性值_{property_name}"
|
||
|
||
result = {"value": value}
|
||
|
||
# 存储结果
|
||
self.store_result(result)
|
||
return result
|
||
|
||
@ComponentFactory.register("set_site_extension_property")
|
||
class SetSiteExtensionPropertyComponent(Component):
|
||
"""设置库位扩展属性组件"""
|
||
|
||
def execute(self) -> Dict[str, Any]:
|
||
# 获取参数
|
||
site_id = self.resolve_param("site_id")
|
||
property_name = self.resolve_param("property_name")
|
||
property_value = self.resolve_param("property_value")
|
||
|
||
# 验证必要参数
|
||
self.validate_required_params(["site_id", "property_name", "property_value"])
|
||
|
||
# 实际实现中,这里应该调用库位管理系统API设置库位扩展属性
|
||
# 这里简化为返回模拟结果
|
||
result = {
|
||
"siteId": site_id,
|
||
"propertyName": property_name,
|
||
"propertyValue": property_value,
|
||
"success": True
|
||
}
|
||
|
||
# 存储结果
|
||
self.store_result(result)
|
||
return result
|
||
|
||
@ComponentFactory.register("set_site_goods")
|
||
class SetSiteGoodsComponent(Component):
|
||
"""设置库位货物组件"""
|
||
|
||
def execute(self) -> Dict[str, Any]:
|
||
# 获取参数
|
||
site_id = self.resolve_param("site_id")
|
||
goods = self.resolve_param("goods")
|
||
|
||
# 验证必要参数
|
||
self.validate_required_params(["site_id", "goods"])
|
||
|
||
# 实际实现中,这里应该调用库位管理系统API设置库位货物
|
||
# 这里简化为返回模拟结果
|
||
result = {
|
||
"siteId": site_id,
|
||
"goods": goods,
|
||
"success": True
|
||
}
|
||
|
||
# 存储结果
|
||
self.store_result(result)
|
||
return result
|
||
|
||
@ComponentFactory.register("set_site_empty")
|
||
class SetSiteEmptyComponent(Component):
|
||
"""设置库位为空组件"""
|
||
|
||
def execute(self) -> Dict[str, Any]:
|
||
# 获取参数
|
||
site_id = self.resolve_param("site_id")
|
||
|
||
# 验证必要参数
|
||
self.validate_required_params(["site_id"])
|
||
|
||
# 实际实现中,这里应该调用库位管理系统API设置库位为空
|
||
# 这里简化为返回模拟结果
|
||
result = {
|
||
"siteId": site_id,
|
||
"success": True
|
||
}
|
||
|
||
# 存储结果
|
||
self.store_result(result)
|
||
return result
|
||
|
||
@ComponentFactory.register("set_site_occupied")
|
||
class SetSiteOccupiedComponent(Component):
|
||
"""设置库位为占用组件"""
|
||
|
||
def execute(self) -> Dict[str, Any]:
|
||
# 获取参数
|
||
site_id = self.resolve_param("site_id")
|
||
|
||
# 验证必要参数
|
||
self.validate_required_params(["site_id"])
|
||
|
||
# 实际实现中,这里应该调用库位管理系统API设置库位为占用
|
||
# 这里简化为返回模拟结果
|
||
result = {
|
||
"siteId": site_id,
|
||
"success": True
|
||
}
|
||
|
||
# 存储结果
|
||
self.store_result(result)
|
||
return result
|
||
|
||
@ComponentFactory.register("set_site_tag")
|
||
class SetSiteTagComponent(Component):
|
||
"""设置库位标签组件"""
|
||
|
||
def execute(self) -> Dict[str, Any]:
|
||
# 获取参数
|
||
site_id = self.resolve_param("site_id")
|
||
tag = self.resolve_param("tag")
|
||
|
||
# 验证必要参数
|
||
self.validate_required_params(["site_id", "tag"])
|
||
|
||
# 实际实现中,这里应该调用库位管理系统API设置库位标签
|
||
# 这里简化为返回模拟结果
|
||
result = {
|
||
"siteId": site_id,
|
||
"tag": tag,
|
||
"success": True
|
||
}
|
||
|
||
# 存储结果
|
||
self.store_result(result)
|
||
return result |