237 lines
8.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Sample task-definition payload; it is serialized with json.dumps and passed
# to check_task_def_select_agv at the bottom of this file.
# Block tree under "rootBlock": two GetIdleCrowdedSiteBp blocks (b5, b6) and
# one CSelectAgvBp block (b7) that nests a single CAgvOperationBp block (b8).
s = {
"inputParams": [],
"outputParams": [],
"rootBlock": {
"id": -1,
"name": "-1",
"blockType": "RootBp",
"inputParams": {},
"children": {
"default": [
{
"id": 9,
"name": "b5",
"blockType": "GetIdleCrowdedSiteBp",
"children": {},
"inputParams": {
"groupName": {
# Here the group is a JSON-encoded list kept as a string; block b6 below
# passes the bare name "AS2_2" instead.
"value": "[\"AS2_2\"]",
"type": "Simple",
"required": True
},
"filled": {
"value": "true",
"type": "Simple",
"required": True
},
"content": {
"value": "",
"type": "Simple",
"required": True
},
"lock": {
"value": "",
"type": "Simple",
"required": True
},
"retry": {
"value": "false",
"type": "Simple",
"required": True
},
"retryPeriod": {
"value": "",
"type": "Simple",
"required": True
},
"retryNum": {
"value": "",
"type": "Simple",
"required": True
}
},
"refTaskDefId": "",
"selected": True,
"expanded": True,
"label": "获取密集库位"
},
{
"id": 10,
"name": "b6",
"blockType": "GetIdleCrowdedSiteBp",
"children": {},
"inputParams": {
"groupName": {
"value": "AS2_2",
"type": "Simple",
"required": True
},
"filled": {
"value": "true",
"type": "Simple",
"required": True
},
"content": {
"value": "",
"type": "Simple",
"required": True
},
"lock": {
"value": "",
"type": "Simple",
"required": True
},
"retry": {
"value": "true",
"type": "Simple",
"required": True
},
"retryPeriod": {
"value": "",
"type": "Simple",
"required": True
},
"retryNum": {
"value": "",
"type": "Simple",
"required": True
}
},
"refTaskDefId": "",
"selected": True,
"expanded": True,
"label": "获取密集库位"
},
{
"id": 11,
"name": "b7",
# This is the block type that check_task_def_select_agv searches for.
"blockType": "CSelectAgvBp",
"children": {
"default": [
{
# NOTE(review): same id (9) as block b5 above — confirm whether block ids
# are expected to be unique within a definition.
"id": 9,
"name": "b8",
"blockType": "CAgvOperationBp",
"children": {},
"inputParams": {
"targetSiteLabel": {
"value": "AP1",
"type": "Simple",
"required": True
},
"spin": {
# A real boolean here, whereas "filled"/"retry" above use the strings
# "true"/"false".
"value": True,
"type": "Simple",
"required": True
},
"task": {
"value": "",
"type": "Simple",
"required": True
}
},
"refTaskDefId": "",
"selected": True,
"expanded": True,
"label": "机器人通用动作"
}
]
},
"inputParams": {
"priority": {
"value": "",
"type": "Simple",
"required": True
},
"vehicle": {
"value": "",
"type": "Simple",
"required": True
},
"group": {
"value": "",
"type": "Simple",
"required": True
},
"keyRoute": {
# The only "Expression"-typed parameter in this sample; its text names
# block b5 (presumably the site that block resolves — confirm).
"value": "blocks.b5.siteId",
"type": "Expression",
"required": True
}
},
"refTaskDefId": "",
"selected": True,
"expanded": True,
"label": "选择执行机器人"
}
]
},
"selected": True,
"refTaskDefId": "",
"expanded": True
}
}
import json
def check_task_def_select_agv(detail_json: str) -> int:
    """Report whether a task definition contains a "CSelectAgvBp" block.

    The definition is parsed from JSON and its block tree is searched,
    starting at ``rootBlock`` and descending through every list found in
    each block's ``children`` mapping.

    Args:
        detail_json: Task definition detail as a JSON string.

    Returns:
        1 if any block in the tree has ``blockType == "CSelectAgvBp"``,
        otherwise 0. Empty, unparsable, or unexpectedly shaped input
        yields 0; this function does not raise for bad input.
    """
    select_agv_type = "CSelectAgvBp"

    if not detail_json:
        return 0

    try:
        detail = json.loads(detail_json)
    except (ValueError, TypeError, RecursionError):
        # ValueError covers json.JSONDecodeError and undecodable bytes,
        # TypeError covers non-string input, RecursionError covers
        # pathologically nested documents.
        return 0

    if not isinstance(detail, dict):
        return 0

    root_block = detail.get("rootBlock")
    if not isinstance(root_block, dict):
        return 0

    # Walk the tree with an explicit stack rather than recursion so a deep
    # definition cannot exhaust the interpreter's recursion limit.
    pending = [root_block]
    while pending:
        block = pending.pop()
        if block.get("blockType") == select_agv_type:
            return 1

        children = block.get("children")
        if not isinstance(children, dict):
            # Missing or malformed children: nothing to descend into here,
            # but the rest of the tree must still be searched.
            continue

        for child_list in children.values():
            if isinstance(child_list, list):
                # Skip malformed (non-dict) entries instead of aborting
                # the whole search because of one bad sibling.
                pending.extend(
                    child for child in child_list if isinstance(child, dict)
                )

    return 0
# NOTE(review): the comment that stood here looked like a truncated fragment
# ("import VWED task module interface doc...") — confirm whether an import
# was intended.
# Smoke run: serialize the sample definition `s` and print the check result
# (`s` contains a CSelectAgvBp block, so 1 is expected).
ds = check_task_def_select_agv(json.dumps(s))
print(ds)