tianfeng_task_modules/config/component_config.py
2025-03-17 18:31:20 +08:00

906 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
组件配置模块
包含组件类型、详细配置、注册和常用参数等相关配置信息
"""
from typing import Dict, List, Any, Optional
from enum import Enum
from core.component import ComponentFactory
import importlib
from config.settings import COMPONENT_PACKAGES
from utils.logger import get_logger
# 获取日志记录器
logger = get_logger(__name__)
#################################################
# 组件初始化配置
#################################################
# 组件类别配置
COMPONENT_CATEGORIES = [
{"name": "基础组件", "code": "basic", "description": "基础功能组件", "sort_order": 1},
{"name": "流程组件", "code": "flow", "description": "流程控制组件", "sort_order": 2},
{"name": "机器人组件", "code": "robot", "description": "机器人控制组件", "sort_order": 3},
{"name": "库位组件", "code": "storage", "description": "库位管理组件", "sort_order": 4},
{"name": "设备组件", "code": "device", "description": "设备控制组件", "sort_order": 5},
{"name": "HTTP组件", "code": "http", "description": "HTTP请求组件", "sort_order": 6},
{"name": "脚本组件", "code": "script", "description": "脚本执行组件", "sort_order": 7},
{"name": "子任务组件", "code": "subtask", "description": "子任务组件", "sort_order": 8},
{"name": "任务组件", "code": "task", "description": "任务管理组件", "sort_order": 9}
]
# 组件类型配置
def get_component_types(categories):
"""
获取组件类型配置
Args:
categories: 类别ID映射字典 {类别代码: 类别ID}
Returns:
list: 组件类型配置列表
"""
return [
# 基础组件
{"name": "变量赋值", "code": "variable_assign", "category_id": categories["basic"], "description": "为变量赋值", "icon": "variable", "sort_order": 1},
{"name": "条件判断", "code": "condition", "category_id": categories["basic"], "description": "条件判断", "icon": "condition", "sort_order": 2},
{"name": "延时等待", "code": "delay", "category_id": categories["basic"], "description": "延时等待", "icon": "delay", "sort_order": 3},
# 流程组件
{"name": "If条件", "code": "if", "category_id": categories["flow"], "description": "If条件判断", "icon": "if", "sort_order": 1},
{"name": "If-Else条件", "code": "if_else", "category_id": categories["flow"], "description": "If-Else条件判断", "icon": "if_else", "sort_order": 2},
{"name": "循环", "code": "loop", "category_id": categories["flow"], "description": "循环执行", "icon": "loop", "sort_order": 3},
{"name": "并行执行", "code": "parallel", "category_id": categories["flow"], "description": "并行执行多个分支", "icon": "parallel", "sort_order": 4},
# 机器人组件
{"name": "选择机器人", "code": "select_robot", "category_id": categories["robot"], "description": "选择执行机器人", "icon": "robot", "sort_order": 1},
{"name": "机器人移动", "code": "robot_move", "category_id": categories["robot"], "description": "控制机器人移动", "icon": "move", "sort_order": 2},
{"name": "获取机器人状态", "code": "robot_status", "category_id": categories["robot"], "description": "获取机器人状态", "icon": "status", "sort_order": 3},
# HTTP组件
{"name": "HTTP请求", "code": "http_request", "category_id": categories["http"], "description": "发送HTTP请求", "icon": "http", "sort_order": 1},
{"name": "API调用", "code": "api_call", "category_id": categories["http"], "description": "调用系统API", "icon": "api", "sort_order": 2},
# 脚本组件
{"name": "JavaScript脚本", "code": "javascript", "category_id": categories["script"], "description": "执行JavaScript脚本", "icon": "script", "sort_order": 1},
# 子任务组件
{"name": "执行子任务", "code": "execute_subtask", "category_id": categories["subtask"], "description": "执行子任务", "icon": "subtask", "sort_order": 1}
]
# 系统组件配置
def get_system_components(types):
"""
获取系统组件配置
Args:
types: 组件类型ID映射字典 {类型代码: 类型ID}
Returns:
list: 系统组件配置列表
"""
return [
{
"name": "变量赋值",
"code": "variable_assign",
"type_id": types["variable_assign"],
"description": "为变量赋值",
"is_system": True,
"config_schema": {
"type": "object",
"properties": {
"variable_name": {"type": "string", "title": "变量名"},
"value_type": {"type": "string", "enum": ["string", "number", "boolean", "object"], "title": "值类型"},
"value": {"type": "string", "title": ""}
},
"required": ["variable_name", "value_type", "value"]
},
"input_schema": {},
"output_schema": {
"type": "object",
"properties": {
"result": {"type": "boolean", "title": "执行结果"}
}
}
},
{
"name": "条件判断",
"code": "condition",
"type_id": types["condition"],
"description": "条件判断",
"is_system": True,
"config_schema": {
"type": "object",
"properties": {
"condition": {"type": "string", "title": "条件表达式"}
},
"required": ["condition"]
},
"input_schema": {},
"output_schema": {
"type": "object",
"properties": {
"result": {"type": "boolean", "title": "判断结果"}
}
}
},
{
"name": "If条件",
"code": "if",
"type_id": types["if"],
"description": "If条件判断",
"is_system": True,
"config_schema": {
"type": "object",
"properties": {
"condition": {"type": "string", "title": "条件表达式"}
},
"required": ["condition"]
},
"input_schema": {},
"output_schema": {
"type": "object",
"properties": {
"result": {"type": "boolean", "title": "判断结果"}
}
}
}
]
# 缓存配置
CACHE_EXPIRE_TIME = 86400 # 缓存过期时间单位24小时
CACHE_KEYS = {
"COMPONENT_CATEGORIES": "component_categories",
"COMPONENT_TYPES": "component_types",
"SYSTEM_COMPONENTS": "system_components"
}
#################################################
# 组件自动发现与注册
#################################################
class ComponentDiscovery:
"""组件自动发现配置"""
DEFAULT_PACKAGE = "components" # 默认组件包名
AUTO_REGISTER = True # 是否在启动时自动注册所有组件
def register_all_components():
"""注册所有组件"""
for package_name in COMPONENT_PACKAGES:
try:
ComponentFactory.auto_discover(package_name)
logger.info(f"自动注册组件包: {package_name}")
except ImportError:
logger.error(f"导入组件包失败: {package_name}")
except Exception as e:
logger.error(f"注册组件包失败: {package_name}, 错误: {str(e)}")
#################################################
# 组件分类配置
#################################################
class ComponentCategory:
"""组件类别配置"""
# 组件类别定义
HTTP_REQUEST = "HTTP请求"
SCRIPT = "脚本"
FLOW = "流程"
ROBOT = "机器人调度"
SITE = "库位"
DEVICE = "设备"
SUBTASK = "子任务"
TASK = "任务"
BASE = "基础"
# 组件类别映射配置
# 用于将组件类型映射到对应的类别,便于前端展示
@classmethod
def get_mapping(cls) -> Dict[str, str]:
"""获取组件类型到类别的映射"""
return {
# HTTP请求组件
"http": cls.HTTP_REQUEST,
# 脚本组件
"script": cls.SCRIPT,
# 流程控制组件
"if": cls.FLOW,
"if_else": cls.FLOW,
"if_else_if": cls.FLOW,
"for_each": cls.FLOW,
"while": cls.FLOW,
"break": cls.FLOW,
"return": cls.FLOW,
"delay": cls.FLOW,
# 机器人调度组件
"robot": cls.ROBOT,
# 库位组件
"site": cls.SITE,
# 设备组件
"modbus": cls.DEVICE,
# 子任务组件
"subtask": cls.SUBTASK,
}
# 组件分类优先级
# 控制前端组件面板中各分类的显示顺序
@classmethod
def get_priority(cls) -> List[str]:
"""获取组件类别优先级列表"""
return [
cls.SUBTASK,
cls.SCRIPT,
cls.HTTP_REQUEST,
cls.TASK,
cls.FLOW,
cls.BASE,
cls.SITE,
cls.ROBOT,
cls.DEVICE
]
class ComponentCategoryConfig:
"""组件分类配置"""
@classmethod
def get_category_name(cls, category_enum):
"""获取分类名称"""
from data.models.component import ComponentCategoryEnum
names = {
ComponentCategoryEnum.SUBTASK: "子任务",
ComponentCategoryEnum.SCRIPT: "脚本",
ComponentCategoryEnum.HTTP: "HTTP请求",
ComponentCategoryEnum.TASK: "任务",
ComponentCategoryEnum.FLOW: "流程",
ComponentCategoryEnum.BASIC: "基础",
ComponentCategoryEnum.STORAGE: "库位",
ComponentCategoryEnum.ROBOT: "机器人调度",
ComponentCategoryEnum.DEVICE: "设备"
}
return names.get(category_enum, category_enum.value)
@classmethod
def get_category_description(cls, category_enum):
"""获取分类描述"""
from data.models.component import ComponentCategoryEnum
descriptions = {
ComponentCategoryEnum.SUBTASK: "可重用的子任务组件",
ComponentCategoryEnum.SCRIPT: "执行脚本代码的组件",
ComponentCategoryEnum.HTTP: "发送HTTP请求的组件",
ComponentCategoryEnum.TASK: "任务管理相关组件",
ComponentCategoryEnum.FLOW: "流程控制相关组件",
ComponentCategoryEnum.BASIC: "基础功能组件",
ComponentCategoryEnum.STORAGE: "库位管理相关组件",
ComponentCategoryEnum.ROBOT: "机器人调度相关组件",
ComponentCategoryEnum.DEVICE: "设备控制相关组件"
}
return descriptions.get(category_enum, f"{category_enum.value}类组件")
@classmethod
def get_category_order(cls, category_enum):
"""获取分类排序"""
from data.models.component import ComponentCategoryEnum
orders = {
ComponentCategoryEnum.BASIC: 1,
ComponentCategoryEnum.FLOW: 2,
ComponentCategoryEnum.TASK: 3,
ComponentCategoryEnum.SUBTASK: 4,
ComponentCategoryEnum.SCRIPT: 5,
ComponentCategoryEnum.HTTP: 6,
ComponentCategoryEnum.ROBOT: 7,
ComponentCategoryEnum.STORAGE: 8,
ComponentCategoryEnum.DEVICE: 9
}
return orders.get(category_enum, 99)
#################################################
# 组件详细配置
#################################################
class ScriptComponentConfig:
"""脚本组件配置"""
# 脚本组件类型
RUN_SCRIPT = "run_script" # 运行脚本
SET_VARIABLES = "set_variables" # 设置task.variables
# 脚本组件详细配置
@classmethod
def get_components(cls) -> List[Dict[str, Any]]:
"""获取脚本组件列表"""
return [
{
"type": "script",
"sub_type": cls.RUN_SCRIPT,
"name": "运行脚本",
"description": "执行JavaScript代码并返回结果",
"icon": "code", # 图标名称,前端可用
"params": [
{
"name": "function_name",
"label": "函数名",
"type": "string",
"required": False,
"description": "定义脚本中的主函数名称",
"value_types": [
{
"type": "simple",
"label": "简单值",
"default": True
},
{
"type": "expression",
"label": "表达式",
"default": False
}
]
},
{
"name": "params",
"label": "函数参数",
"type": "array",
"required": False,
"description": "传递给脚本的参数",
"value_types": [
{
"type": "simple",
"label": "简单值",
"default": True
},
{
"type": "expression",
"label": "表达式",
"default": False
}
]
}
]
},
{
"type": "script",
"sub_type": cls.SET_VARIABLES,
"name": "设置task.variables",
"description": "设置和管理任务变量",
"icon": "variable",
"params": [
{
"name": "function_name",
"label": "函数名",
"type": "string",
"required": False,
"description": "定义脚本中的主函数名称",
"value_types": [
{
"type": "simple",
"label": "简单值",
"default": True
},
{
"type": "expression",
"label": "表达式",
"default": False
}
]
},
{
"name": "params",
"label": "函数参数",
"type": "array",
"required": False,
"description": "传递给脚本的参数",
"value_types": [
{
"type": "simple",
"label": "简单值",
"default": True
},
{
"type": "expression",
"label": "表达式",
"default": False
}
]
}
]
}
]
class HttpComponentConfig:
"""HTTP请求组件配置"""
# HTTP请求组件类型
HTTP_REQUEST = "http_request" # HTTP请求
# HTTP请求组件详细配置
@classmethod
def get_components(cls) -> List[Dict[str, Any]]:
"""获取HTTP请求组件列表"""
return [
{
"type": "http",
"sub_type": cls.HTTP_REQUEST,
"name": "HTTP请求",
"description": "发送HTTP请求并处理响应",
"icon": "http",
"params": [
{
"name": "method",
"label": "请求方法",
"type": "select",
"options": ["GET", "POST", "PUT", "DELETE", "PATCH"],
"required": True,
"description": "HTTP请求方法"
},
{
"name": "url",
"label": "请求URL",
"type": "string",
"required": True,
"description": "请求的目标URL"
},
{
"name": "headers",
"label": "请求头",
"type": "object",
"required": False,
"description": "HTTP请求头"
},
{
"name": "body",
"label": "请求体",
"type": "object",
"required": False,
"description": "HTTP请求体"
},
{
"name": "timeout",
"label": "超时时间",
"type": "number",
"required": False,
"description": "请求超时时间(毫秒)"
}
]
}
]
class FlowComponentConfig:
"""流程控制组件配置"""
# 流程控制组件类型
IF = "if" # 条件判断
IF_ELSE = "if_else" # 条件分支
FOR_EACH = "for_each" # 循环遍历
WHILE = "while" # 条件循环
# 流程控制组件详细配置
@classmethod
def get_components(cls) -> List[Dict[str, Any]]:
"""获取流程控制组件列表"""
return [
{
"type": "flow",
"sub_type": cls.IF,
"name": "条件判断",
"description": "根据条件执行不同的操作",
"icon": "branch",
"params": [
{
"name": "condition",
"label": "条件表达式",
"type": "expression",
"required": True,
"description": "条件判断表达式"
}
]
},
{
"type": "flow",
"sub_type": cls.IF_ELSE,
"name": "条件分支",
"description": "根据条件执行不同的分支",
"icon": "branch-multiple",
"params": [
{
"name": "condition",
"label": "条件表达式",
"type": "expression",
"required": True,
"description": "条件判断表达式"
}
]
},
{
"type": "flow",
"sub_type": cls.FOR_EACH,
"name": "循环遍历",
"description": "遍历数组或对象的每个元素",
"icon": "loop",
"params": [
{
"name": "collection",
"label": "集合表达式",
"type": "expression",
"required": True,
"description": "要遍历的数组或对象"
},
{
"name": "item_name",
"label": "元素变量名",
"type": "string",
"required": True,
"description": "当前元素的变量名"
},
{
"name": "index_name",
"label": "索引变量名",
"type": "string",
"required": False,
"description": "当前索引的变量名"
}
]
},
{
"type": "flow",
"sub_type": cls.WHILE,
"name": "条件循环",
"description": "当条件为真时重复执行",
"icon": "loop-circular",
"params": [
{
"name": "condition",
"label": "条件表达式",
"type": "expression",
"required": True,
"description": "循环条件表达式"
}
]
}
]
class SubtaskComponentConfig:
"""子任务组件配置"""
# 子任务组件类型
SUBTASK = "subtask" # 子任务
# 子任务组件详细配置
@classmethod
def get_components(cls) -> List[Dict[str, Any]]:
"""获取子任务组件列表"""
return [
{
"type": "subtask",
"sub_type": cls.SUBTASK,
"name": "子任务",
"description": "执行已定义的任务作为子任务",
"icon": "subtask",
"params": [
{
"name": "task_id",
"label": "选择子任务",
"type": "select",
"required": True,
"description": "选择要执行的子任务(从已创建的任务中选择)",
"data_source": "available_subtasks", # 指示前端从API返回的available_subtasks字段获取数据
"display_field": "name", # 显示任务名称
"value_field": "task_id" # 使用任务ID作为值
},
{
"name": "params",
"label": "任务参数",
"type": "object",
"required": False,
"description": "传递给子任务的参数"
},
{
"name": "wait_complete",
"label": "等待完成",
"type": "boolean",
"required": False,
"default": True,
"description": "是否等待子任务完成后再继续执行"
}
]
}
]
class ComponentDetailConfig:
"""组件详细配置管理"""
# 组件类型中文名称映射
@classmethod
def get_component_type_names(cls) -> Dict[str, str]:
"""获取组件类型的中文名称映射"""
return {
# 基础类型
"script": "脚本",
"http": "HTTP请求",
"flow": "流程",
"robot": "机器人调度",
"site": "库位",
"device": "设备",
"subtask": "子任务",
"task": "任务",
"basic": "基础",
# 脚本组件
"run_script": "运行脚本",
"set_task_variables": "设置任务变量",
"runscript": "运行脚本",
"settaskvariables": "设置任务变量",
# HTTP请求组件
"http_get_request": "GET请求",
"http_post_request": "POST请求",
"httpgetrequest": "GET请求",
"httppostrequest": "POST请求",
# 流程控制组件
"if": "条件判断",
"if_else": "条件分支",
"if_else_if": "多条件分支",
"for_each": "循环遍历",
"while": "条件循环",
"break": "跳出循环",
"return": "返回",
"delay": "延时",
"parallel_execute": "并行执行",
"serial_execute": "串行执行",
"throw_exception": "抛出异常",
"foreach": "循环遍历",
"ifelse": "条件分支",
"ifelseif": "多条件分支",
"parallelexecute": "并行执行",
"serialexecute": "串行执行",
"throwexception": "抛出异常",
# 机器人调度组件
"select_robot": "选择机器人",
"get_robot_position": "获取机器人位置",
"robot_action": "机器人动作",
"change_robot_destination": "更改机器人目的地",
"get_robot_battery": "获取机器人电量",
"get_robot_pgv_code": "获取机器人PGV码",
"changerobotdestination": "更改机器人目的地",
"getrobotbattery": "获取机器人电量",
"getrobotpgvcode": "获取机器人PGV码",
"getrobotposition": "获取机器人位置",
"robotaction": "机器人动作",
"selectrobot": "选择机器人",
# 库位组件
"batch_set_site": "批量设置库位",
"get_dense_site": "获取密集库位",
"query_site": "查询库位",
"lock_site": "锁定库位",
"unlock_site": "解锁库位",
"get_locked_sites_by_task": "获取任务锁定的库位",
"get_site_extension_property": "获取库位扩展属性",
"set_site_extension_property": "设置库位扩展属性",
"set_site_goods": "设置库位货物",
"set_site_empty": "设置库位为空",
"set_site_occupied": "设置库位为占用",
"set_site_tag": "设置库位标签",
"batchsetsite": "批量设置库位",
"getdensesite": "获取密集库位",
"getlockedsitesbytask": "获取任务锁定的库位",
"getsiteextensionproperty": "获取库位扩展属性",
"locksite": "锁定库位",
"querysite": "查询库位",
"setsiteempty": "设置库位为空",
"setsiteextensionproperty": "设置库位扩展属性",
"setsitegoods": "设置库位货物",
"setsiteoccupied": "设置库位为占用",
"setsitetag": "设置库位标签",
"unlocksite": "解锁库位",
# 任务组件
"cache_data": "缓存数据",
"clear_cache_data": "清除缓存数据",
"get_cache_data": "获取缓存数据",
"set_task_status": "设置任务状态",
"jump_to_block": "跳转到块",
"get_task_input_param": "获取任务输入参数",
"cachedata": "缓存数据",
"clearcachedata": "清除缓存数据",
"getcachedata": "获取缓存数据",
"gettaskinputparam": "获取任务输入参数",
"jumptoblock": "跳转到块",
"settaskstatus": "设置任务状态",
# 基础组件
"check_task_instance_id_exists": "检查任务实例ID是否存在",
"create_unique_id": "创建唯一ID",
"current_timestamp": "当前时间戳",
"current_time": "当前时间",
"execute_sql": "执行SQL",
"query_sql": "查询SQL",
"string_md5_encrypt": "字符串MD5加密",
"string_to_json_array": "字符串转JSON数组",
"string_to_json_object": "字符串转JSON对象",
"print": "打印",
"checktaskinstanceidexists": "检查任务实例ID是否存在",
"createuniqueid": "创建唯一ID",
"currenttime": "当前时间",
"currenttimestamp": "当前时间戳",
"executesql": "执行SQL",
"querysql": "查询SQL",
"stringmd5encrypt": "字符串MD5加密",
"stringtojsonarray": "字符串转JSON数组",
"stringtojsonobject": "字符串转JSON对象",
# 设备组件
"wait_modbus_value": "等待Modbus值",
"write_modbus_value": "写入Modbus值",
"waitmodbusvalue": "等待Modbus值",
"writemodbusvalue": "写入Modbus值"
}
@classmethod
def get_all_components(cls) -> List[Dict[str, Any]]:
"""获取所有组件详细配置"""
all_components = []
# 添加子任务组件(放在第一位)
all_components.extend(SubtaskComponentConfig.get_components())
# 添加脚本组件
all_components.extend(ScriptComponentConfig.get_components())
# 添加HTTP请求组件
all_components.extend(HttpComponentConfig.get_components())
# 添加流程控制组件
all_components.extend(FlowComponentConfig.get_components())
# 可以继续添加其他类型的组件...
return all_components
@classmethod
def get_components_by_type(cls, component_type: str) -> List[Dict[str, Any]]:
"""根据组件类型获取组件列表"""
all_components = cls.get_all_components()
return [comp for comp in all_components if comp["type"] == component_type]
#################################################
# 常用参数配置
#################################################
class CommonParamType(str, Enum):
"""常用参数类型枚举"""
ROBOT_ID = "robot_id" # 机器人ID
ROBOT_GROUP = "robot_group" # 机器人组
ROBOT_TAG = "robot_tag" # 机器人标签
STORAGE_AREA_ID = "storage_area_id" # 库区ID
STORAGE_AREA = "storage_area" # 库区
SITE = "site" # 站点
BIN_TASK = "bin_task" # binTask
WORKSTATION = "workstation" # 工位
POST = "post" # 岗位
USER = "user" # 用户
CACHE = "cache" # 缓存
BUILT_IN_FUNCTION = "built_in_function" # 内置函数
class CommonParamsConfig:
"""常用参数配置类"""
@classmethod
def get_param_types(cls) -> List[Dict[str, Any]]:
"""获取所有常用参数类型"""
return [
{
"type": CommonParamType.ROBOT_ID,
"name": "机器人ID",
"description": "选择机器人ID",
"api_path": "/api/robots/ids", # 获取数据的API路径
"value_field": "id", # 值字段
"display_field": "name" # 显示字段
},
{
"type": CommonParamType.ROBOT_GROUP,
"name": "机器人组",
"description": "选择机器人组",
"api_path": "/api/robots/groups",
"value_field": "id",
"display_field": "name"
},
{
"type": CommonParamType.ROBOT_TAG,
"name": "机器人标签",
"description": "选择机器人标签",
"api_path": "/api/robots/tags",
"value_field": "id",
"display_field": "name"
},
{
"type": CommonParamType.STORAGE_AREA_ID,
"name": "库区ID",
"description": "选择库区ID",
"api_path": "/api/storage/areas/ids",
"value_field": "id",
"display_field": "name"
},
{
"type": CommonParamType.STORAGE_AREA,
"name": "库区",
"description": "选择库区",
"api_path": "/api/storage/areas",
"value_field": "id",
"display_field": "name"
},
{
"type": CommonParamType.SITE,
"name": "站点",
"description": "选择站点",
"api_path": "/api/sites",
"value_field": "id",
"display_field": "name"
},
{
"type": CommonParamType.BIN_TASK,
"name": "binTask",
"description": "选择binTask",
"api_path": "/api/bin-tasks",
"value_field": "id",
"display_field": "name"
},
{
"type": CommonParamType.WORKSTATION,
"name": "工位",
"description": "选择工位",
"api_path": "/api/workstations",
"value_field": "id",
"display_field": "name"
},
{
"type": CommonParamType.POST,
"name": "岗位",
"description": "选择岗位",
"api_path": "/api/posts",
"value_field": "id",
"display_field": "name"
},
{
"type": CommonParamType.USER,
"name": "用户",
"description": "选择用户",
"api_path": "/api/users",
"value_field": "id",
"display_field": "name"
},
{
"type": CommonParamType.CACHE,
"name": "缓存",
"description": "选择缓存",
"api_path": "/api/caches",
"value_field": "key",
"display_field": "key"
},
{
"type": CommonParamType.BUILT_IN_FUNCTION,
"name": "内置函数",
"description": "选择内置函数",
"api_path": "/api/built-in-functions",
"value_field": "name",
"display_field": "name"
}
]
@classmethod
def get_param_type_by_type(cls, param_type: str) -> Optional[Dict[str, Any]]:
"""根据类型获取参数类型配置"""
param_types = cls.get_param_types()
for pt in param_types:
if pt["type"] == param_type:
return pt
return None