176 lines
5.8 KiB
Python
176 lines
5.8 KiB
Python
# components/robot_components.py
|
||
from typing import Dict, Any, List, Optional
|
||
from core.component import Component, ComponentFactory
|
||
from core.exceptions import ComponentError
|
||
|
||
@ComponentFactory.register("select_robot")
|
||
class SelectRobotComponent(Component):
|
||
"""选择执行机器人组件"""
|
||
|
||
def execute(self) -> Dict[str, Any]:
|
||
# 获取参数
|
||
priority = self.resolve_param("priority", "distance")
|
||
robot_id = self.resolve_param("robot_id")
|
||
tags = self.resolve_param("tags", [])
|
||
robot_group = self.resolve_param("robot_group")
|
||
|
||
# 如果直接指定了机器人ID,则使用该机器人
|
||
if robot_id:
|
||
selected_robot = robot_id
|
||
else:
|
||
# 实际实现中,这里应该调用机器人调度系统API选择合适的机器人
|
||
# 这里简化为返回模拟数据
|
||
selected_robot = "robot_001"
|
||
|
||
# 设置执行机器人
|
||
self.context.set_executing_robot(selected_robot)
|
||
|
||
result = {
|
||
"selectedAgvId": selected_robot,
|
||
"priority": priority
|
||
}
|
||
|
||
# 存储结果
|
||
self.store_result(result)
|
||
return result
|
||
|
||
@ComponentFactory.register("get_robot_position")
|
||
class GetRobotPositionComponent(Component):
|
||
"""获取机器人位置组件"""
|
||
|
||
def execute(self) -> Dict[str, Any]:
|
||
# 获取参数
|
||
robot_id = self.resolve_param("robot_id")
|
||
|
||
# 验证必要参数
|
||
self.validate_required_params(["robot_id"])
|
||
|
||
# 实际实现中,这里应该调用机器人系统API获取位置
|
||
# 这里简化为返回模拟数据
|
||
position = {
|
||
"x": 100.5,
|
||
"y": 200.3,
|
||
"angle": 45.0,
|
||
"station": "station_A"
|
||
}
|
||
|
||
result = {
|
||
"position": position,
|
||
"station": position["station"]
|
||
}
|
||
|
||
# 存储结果
|
||
self.store_result(result)
|
||
return result
|
||
|
||
# components/robot_components.py (继续)
|
||
@ComponentFactory.register("robot_action")
|
||
class RobotActionComponent(Component):
|
||
"""机器人通用动作组件"""
|
||
|
||
def execute(self) -> Dict[str, Any]:
|
||
# 获取参数
|
||
target_station = self.resolve_param("target_station")
|
||
robot_id = self.resolve_param("robot_id", self.context.executing_robot)
|
||
goods_id = self.resolve_param("goods_id")
|
||
speed = self.resolve_param("speed")
|
||
|
||
# 验证必要参数
|
||
self.validate_required_params(["target_station"])
|
||
|
||
if not robot_id:
|
||
raise ComponentError("未指定机器人ID,请先选择执行机器人或直接指定robot_id参数")
|
||
|
||
# 实际实现中,这里应该调用机器人控制API执行动作
|
||
# 这里简化为返回模拟数据
|
||
result = {
|
||
"robotId": robot_id,
|
||
"targetStation": target_station,
|
||
"goodsId": goods_id,
|
||
"status": "executing"
|
||
}
|
||
|
||
# 存储结果
|
||
self.store_result(result)
|
||
return result
|
||
|
||
@ComponentFactory.register("change_robot_destination")
|
||
class ChangeRobotDestinationComponent(Component):
|
||
"""机器人更换目的地组件"""
|
||
|
||
def execute(self) -> Dict[str, Any]:
|
||
# 获取参数
|
||
target_station = self.resolve_param("target_station")
|
||
robot_id = self.resolve_param("robot_id", self.context.executing_robot)
|
||
goods_id = self.resolve_param("goods_id")
|
||
|
||
# 验证必要参数
|
||
self.validate_required_params(["target_station"])
|
||
|
||
if not robot_id:
|
||
raise ComponentError("未指定机器人ID,请先选择执行机器人或直接指定robot_id参数")
|
||
|
||
# 实际实现中,这里应该调用机器人控制API更改目的地
|
||
# 这里简化为返回模拟数据
|
||
result = {
|
||
"robotId": robot_id,
|
||
"newTargetStation": target_station,
|
||
"goodsId": goods_id,
|
||
"status": "destination_changed"
|
||
}
|
||
|
||
# 存储结果
|
||
self.store_result(result)
|
||
return result
|
||
|
||
@ComponentFactory.register("get_robot_battery")
|
||
class GetRobotBatteryComponent(Component):
|
||
"""获取机器人电量组件"""
|
||
|
||
def execute(self) -> Dict[str, Any]:
|
||
# 获取参数
|
||
robot_id = self.resolve_param("robot_id", self.context.executing_robot)
|
||
|
||
# 验证必要参数
|
||
if not robot_id:
|
||
raise ComponentError("未指定机器人ID,请先选择执行机器人或直接指定robot_id参数")
|
||
|
||
# 实际实现中,这里应该调用机器人系统API获取电量
|
||
# 这里简化为返回模拟数据
|
||
battery_level = 85 # 电量百分比
|
||
|
||
result = {"batteryLevel": battery_level}
|
||
|
||
# 存储结果
|
||
self.store_result(result)
|
||
return result
|
||
|
||
@ComponentFactory.register("get_robot_pgv_code")
|
||
class GetRobotPGVCodeComponent(Component):
|
||
"""获取机器人PGV码组件"""
|
||
|
||
def execute(self) -> Dict[str, Any]:
|
||
# 获取参数
|
||
robot_id = self.resolve_param("robot_id", self.context.executing_robot)
|
||
|
||
# 验证必要参数
|
||
if not robot_id:
|
||
raise ComponentError("未指定机器人ID,请先选择执行机器人或直接指定robot_id参数")
|
||
|
||
# 实际实现中,这里应该调用机器人系统API获取PGV码
|
||
# 这里简化为返回模拟数据
|
||
pgv_code = {
|
||
"code": "PGV12345",
|
||
"timestamp": int(time.time() * 1000),
|
||
"coordinates": {
|
||
"x": 123.45,
|
||
"y": 67.89
|
||
},
|
||
"status": "valid"
|
||
}
|
||
|
||
result = {"codeInfo": pgv_code}
|
||
|
||
# 存储结果
|
||
self.store_result(result)
|
||
return result |