tianfeng_task_modules/components/robot_components.py
2025-03-17 14:58:05 +08:00

176 lines
5.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# components/robot_components.py
from typing import Dict, Any, List, Optional
from core.component import Component, ComponentFactory
from core.exceptions import ComponentError
@ComponentFactory.register("select_robot")
class SelectRobotComponent(Component):
"""选择执行机器人组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
priority = self.resolve_param("priority", "distance")
robot_id = self.resolve_param("robot_id")
tags = self.resolve_param("tags", [])
robot_group = self.resolve_param("robot_group")
# 如果直接指定了机器人ID则使用该机器人
if robot_id:
selected_robot = robot_id
else:
# 实际实现中这里应该调用机器人调度系统API选择合适的机器人
# 这里简化为返回模拟数据
selected_robot = "robot_001"
# 设置执行机器人
self.context.set_executing_robot(selected_robot)
result = {
"selectedAgvId": selected_robot,
"priority": priority
}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("get_robot_position")
class GetRobotPositionComponent(Component):
"""获取机器人位置组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
robot_id = self.resolve_param("robot_id")
# 验证必要参数
self.validate_required_params(["robot_id"])
# 实际实现中这里应该调用机器人系统API获取位置
# 这里简化为返回模拟数据
position = {
"x": 100.5,
"y": 200.3,
"angle": 45.0,
"station": "station_A"
}
result = {
"position": position,
"station": position["station"]
}
# 存储结果
self.store_result(result)
return result
# components/robot_components.py (继续)
@ComponentFactory.register("robot_action")
class RobotActionComponent(Component):
"""机器人通用动作组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
target_station = self.resolve_param("target_station")
robot_id = self.resolve_param("robot_id", self.context.executing_robot)
goods_id = self.resolve_param("goods_id")
speed = self.resolve_param("speed")
# 验证必要参数
self.validate_required_params(["target_station"])
if not robot_id:
raise ComponentError("未指定机器人ID请先选择执行机器人或直接指定robot_id参数")
# 实际实现中这里应该调用机器人控制API执行动作
# 这里简化为返回模拟数据
result = {
"robotId": robot_id,
"targetStation": target_station,
"goodsId": goods_id,
"status": "executing"
}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("change_robot_destination")
class ChangeRobotDestinationComponent(Component):
"""机器人更换目的地组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
target_station = self.resolve_param("target_station")
robot_id = self.resolve_param("robot_id", self.context.executing_robot)
goods_id = self.resolve_param("goods_id")
# 验证必要参数
self.validate_required_params(["target_station"])
if not robot_id:
raise ComponentError("未指定机器人ID请先选择执行机器人或直接指定robot_id参数")
# 实际实现中这里应该调用机器人控制API更改目的地
# 这里简化为返回模拟数据
result = {
"robotId": robot_id,
"newTargetStation": target_station,
"goodsId": goods_id,
"status": "destination_changed"
}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("get_robot_battery")
class GetRobotBatteryComponent(Component):
"""获取机器人电量组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
robot_id = self.resolve_param("robot_id", self.context.executing_robot)
# 验证必要参数
if not robot_id:
raise ComponentError("未指定机器人ID请先选择执行机器人或直接指定robot_id参数")
# 实际实现中这里应该调用机器人系统API获取电量
# 这里简化为返回模拟数据
battery_level = 85 # 电量百分比
result = {"batteryLevel": battery_level}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("get_robot_pgv_code")
class GetRobotPGVCodeComponent(Component):
"""获取机器人PGV码组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
robot_id = self.resolve_param("robot_id", self.context.executing_robot)
# 验证必要参数
if not robot_id:
raise ComponentError("未指定机器人ID请先选择执行机器人或直接指定robot_id参数")
# 实际实现中这里应该调用机器人系统API获取PGV码
# 这里简化为返回模拟数据
pgv_code = {
"code": "PGV12345",
"timestamp": int(time.time() * 1000),
"coordinates": {
"x": 123.45,
"y": 67.89
},
"status": "valid"
}
result = {"codeInfo": pgv_code}
# 存储结果
self.store_result(result)
return result