# components/robot_components.py from typing import Dict, Any, List, Optional from core.component import Component, ComponentFactory from core.exceptions import ComponentError @ComponentFactory.register("select_robot") class SelectRobotComponent(Component): """选择执行机器人组件""" def execute(self) -> Dict[str, Any]: # 获取参数 priority = self.resolve_param("priority", "distance") robot_id = self.resolve_param("robot_id") tags = self.resolve_param("tags", []) robot_group = self.resolve_param("robot_group") # 如果直接指定了机器人ID,则使用该机器人 if robot_id: selected_robot = robot_id else: # 实际实现中,这里应该调用机器人调度系统API选择合适的机器人 # 这里简化为返回模拟数据 selected_robot = "robot_001" # 设置执行机器人 self.context.set_executing_robot(selected_robot) result = { "selectedAgvId": selected_robot, "priority": priority } # 存储结果 self.store_result(result) return result @ComponentFactory.register("get_robot_position") class GetRobotPositionComponent(Component): """获取机器人位置组件""" def execute(self) -> Dict[str, Any]: # 获取参数 robot_id = self.resolve_param("robot_id") # 验证必要参数 self.validate_required_params(["robot_id"]) # 实际实现中,这里应该调用机器人系统API获取位置 # 这里简化为返回模拟数据 position = { "x": 100.5, "y": 200.3, "angle": 45.0, "station": "station_A" } result = { "position": position, "station": position["station"] } # 存储结果 self.store_result(result) return result # components/robot_components.py (继续) @ComponentFactory.register("robot_action") class RobotActionComponent(Component): """机器人通用动作组件""" def execute(self) -> Dict[str, Any]: # 获取参数 target_station = self.resolve_param("target_station") robot_id = self.resolve_param("robot_id", self.context.executing_robot) goods_id = self.resolve_param("goods_id") speed = self.resolve_param("speed") # 验证必要参数 self.validate_required_params(["target_station"]) if not robot_id: raise ComponentError("未指定机器人ID,请先选择执行机器人或直接指定robot_id参数") # 实际实现中,这里应该调用机器人控制API执行动作 # 这里简化为返回模拟数据 result = { "robotId": robot_id, "targetStation": target_station, "goodsId": goods_id, "status": "executing" } # 存储结果 self.store_result(result) return result @ComponentFactory.register("change_robot_destination") class ChangeRobotDestinationComponent(Component): """机器人更换目的地组件""" def execute(self) -> Dict[str, Any]: # 获取参数 target_station = self.resolve_param("target_station") robot_id = self.resolve_param("robot_id", self.context.executing_robot) goods_id = self.resolve_param("goods_id") # 验证必要参数 self.validate_required_params(["target_station"]) if not robot_id: raise ComponentError("未指定机器人ID,请先选择执行机器人或直接指定robot_id参数") # 实际实现中,这里应该调用机器人控制API更改目的地 # 这里简化为返回模拟数据 result = { "robotId": robot_id, "newTargetStation": target_station, "goodsId": goods_id, "status": "destination_changed" } # 存储结果 self.store_result(result) return result @ComponentFactory.register("get_robot_battery") class GetRobotBatteryComponent(Component): """获取机器人电量组件""" def execute(self) -> Dict[str, Any]: # 获取参数 robot_id = self.resolve_param("robot_id", self.context.executing_robot) # 验证必要参数 if not robot_id: raise ComponentError("未指定机器人ID,请先选择执行机器人或直接指定robot_id参数") # 实际实现中,这里应该调用机器人系统API获取电量 # 这里简化为返回模拟数据 battery_level = 85 # 电量百分比 result = {"batteryLevel": battery_level} # 存储结果 self.store_result(result) return result @ComponentFactory.register("get_robot_pgv_code") class GetRobotPGVCodeComponent(Component): """获取机器人PGV码组件""" def execute(self) -> Dict[str, Any]: # 获取参数 robot_id = self.resolve_param("robot_id", self.context.executing_robot) # 验证必要参数 if not robot_id: raise ComponentError("未指定机器人ID,请先选择执行机器人或直接指定robot_id参数") # 实际实现中,这里应该调用机器人系统API获取PGV码 # 这里简化为返回模拟数据 pgv_code = { "code": "PGV12345", "timestamp": int(time.time() * 1000), "coordinates": { "x": 123.45, "y": 67.89 }, "status": "valid" } result = {"codeInfo": pgv_code} # 存储结果 self.store_result(result) return result