tianfeng_task_modules/components/robot_components.py

176 lines
5.8 KiB
Python
Raw Normal View History

2025-03-17 14:58:05 +08:00
# components/robot_components.py
from typing import Dict, Any, List, Optional
from core.component import Component, ComponentFactory
from core.exceptions import ComponentError
@ComponentFactory.register("select_robot")
class SelectRobotComponent(Component):
"""选择执行机器人组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
priority = self.resolve_param("priority", "distance")
robot_id = self.resolve_param("robot_id")
tags = self.resolve_param("tags", [])
robot_group = self.resolve_param("robot_group")
# 如果直接指定了机器人ID则使用该机器人
if robot_id:
selected_robot = robot_id
else:
# 实际实现中这里应该调用机器人调度系统API选择合适的机器人
# 这里简化为返回模拟数据
selected_robot = "robot_001"
# 设置执行机器人
self.context.set_executing_robot(selected_robot)
result = {
"selectedAgvId": selected_robot,
"priority": priority
}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("get_robot_position")
class GetRobotPositionComponent(Component):
"""获取机器人位置组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
robot_id = self.resolve_param("robot_id")
# 验证必要参数
self.validate_required_params(["robot_id"])
# 实际实现中这里应该调用机器人系统API获取位置
# 这里简化为返回模拟数据
position = {
"x": 100.5,
"y": 200.3,
"angle": 45.0,
"station": "station_A"
}
result = {
"position": position,
"station": position["station"]
}
# 存储结果
self.store_result(result)
return result
# components/robot_components.py (继续)
@ComponentFactory.register("robot_action")
class RobotActionComponent(Component):
"""机器人通用动作组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
target_station = self.resolve_param("target_station")
robot_id = self.resolve_param("robot_id", self.context.executing_robot)
goods_id = self.resolve_param("goods_id")
speed = self.resolve_param("speed")
# 验证必要参数
self.validate_required_params(["target_station"])
if not robot_id:
raise ComponentError("未指定机器人ID请先选择执行机器人或直接指定robot_id参数")
# 实际实现中这里应该调用机器人控制API执行动作
# 这里简化为返回模拟数据
result = {
"robotId": robot_id,
"targetStation": target_station,
"goodsId": goods_id,
"status": "executing"
}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("change_robot_destination")
class ChangeRobotDestinationComponent(Component):
"""机器人更换目的地组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
target_station = self.resolve_param("target_station")
robot_id = self.resolve_param("robot_id", self.context.executing_robot)
goods_id = self.resolve_param("goods_id")
# 验证必要参数
self.validate_required_params(["target_station"])
if not robot_id:
raise ComponentError("未指定机器人ID请先选择执行机器人或直接指定robot_id参数")
# 实际实现中这里应该调用机器人控制API更改目的地
# 这里简化为返回模拟数据
result = {
"robotId": robot_id,
"newTargetStation": target_station,
"goodsId": goods_id,
"status": "destination_changed"
}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("get_robot_battery")
class GetRobotBatteryComponent(Component):
"""获取机器人电量组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
robot_id = self.resolve_param("robot_id", self.context.executing_robot)
# 验证必要参数
if not robot_id:
raise ComponentError("未指定机器人ID请先选择执行机器人或直接指定robot_id参数")
# 实际实现中这里应该调用机器人系统API获取电量
# 这里简化为返回模拟数据
battery_level = 85 # 电量百分比
result = {"batteryLevel": battery_level}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("get_robot_pgv_code")
class GetRobotPGVCodeComponent(Component):
"""获取机器人PGV码组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
robot_id = self.resolve_param("robot_id", self.context.executing_robot)
# 验证必要参数
if not robot_id:
raise ComponentError("未指定机器人ID请先选择执行机器人或直接指定robot_id参数")
# 实际实现中这里应该调用机器人系统API获取PGV码
# 这里简化为返回模拟数据
pgv_code = {
"code": "PGV12345",
"timestamp": int(time.time() * 1000),
"coordinates": {
"x": 123.45,
"y": 67.89
},
"status": "valid"
}
result = {"codeInfo": pgv_code}
# 存储结果
self.store_result(result)
return result