s = { "inputParams": [], "outputParams": [], "rootBlock": { "id": -1, "name": "-1", "blockType": "RootBp", "inputParams": {}, "children": { "default": [ { "id": 9, "name": "b5", "blockType": "GetIdleCrowdedSiteBp", "children": {}, "inputParams": { "groupName": { "value": "[\"AS2_2\"]", "type": "Simple", "required": True }, "filled": { "value": "true", "type": "Simple", "required": True }, "content": { "value": "", "type": "Simple", "required": True }, "lock": { "value": "", "type": "Simple", "required": True }, "retry": { "value": "false", "type": "Simple", "required": True }, "retryPeriod": { "value": "", "type": "Simple", "required": True }, "retryNum": { "value": "", "type": "Simple", "required": True } }, "refTaskDefId": "", "selected": True, "expanded": True, "label": "获取密集库位" }, { "id": 10, "name": "b6", "blockType": "GetIdleCrowdedSiteBp", "children": {}, "inputParams": { "groupName": { "value": "AS2_2", "type": "Simple", "required": True }, "filled": { "value": "true", "type": "Simple", "required": True }, "content": { "value": "", "type": "Simple", "required": True }, "lock": { "value": "", "type": "Simple", "required": True }, "retry": { "value": "true", "type": "Simple", "required": True }, "retryPeriod": { "value": "", "type": "Simple", "required": True }, "retryNum": { "value": "", "type": "Simple", "required": True } }, "refTaskDefId": "", "selected": True, "expanded": True, "label": "获取密集库位" }, { "id": 11, "name": "b7", "blockType": "CSelectAgvBp", "children": { "default": [ { "id": 9, "name": "b8", "blockType": "CAgvOperationBp", "children": {}, "inputParams": { "targetSiteLabel": { "value": "AP1", "type": "Simple", "required": True }, "spin": { "value": True, "type": "Simple", "required": True }, "task": { "value": "", "type": "Simple", "required": True } }, "refTaskDefId": "", "selected": True, "expanded": True, "label": "机器人通用动作" } ] }, "inputParams": { "priority": { "value": "", "type": "Simple", "required": True }, "vehicle": { "value": "", "type": "Simple", "required": True }, "group": { "value": "", "type": "Simple", "required": True }, "keyRoute": { "value": "blocks.b5.siteId", "type": "Expression", "required": True } }, "refTaskDefId": "", "selected": True, "expanded": True, "label": "选择执行机器人" } ] }, "selected": True, "refTaskDefId": "", "expanded": True } } import json def check_task_def_select_agv(detail_json: str) -> int: """ 检查任务定义详情中是否包含SELECT_AGV块类型 Args: detail_json: 任务定义详情JSON字符串 Returns: int: 存在返回1,不存在返回0 """ # from services.execution.handlers.model.block_name import RobotBlockName try: # 解析JSON字符串 print("---") if not detail_json: return 0 print("========") detail = json.loads(detail_json) if not detail: return 0 # 选择AGV块类型常量 SELECT_AGV_TYPE = "CSelectAgvBp" # print(SELECT_AGV_TYPE, "---") # 检查根块 root_block = detail.get("rootBlock") if not root_block: return 0 # 递归检查块类型 def check_block(block): # 检查当前块类型 if block.get("blockType") == SELECT_AGV_TYPE: return True # 递归检查子块 children = block.get("children", {}) for child_key, child_list in children.items(): if isinstance(child_list, list): for child in child_list: if check_block(child): return True return False # 从根块开始检查 if check_block(root_block): return 1 return 0 except json.JSONDecodeError: # logger.error(f"解析任务定义详情JSON失败: {detail_json[:100]}...") return 0 except Exception as e: # logger.error(f"检查任务定义中SELECT_AGV类型异常: {str(e)}") return 0 # import VWED任务模块接口文/ ds = check_task_def_select_agv(json.dumps(s)) print(ds)