#!/usr/bin/env python # -*- coding: utf-8 -*- """ 组件配置管理模块 负责加载和管理任务编辑器中使用的组件配置 """ import os import json from typing import Dict, List, Any, Optional from utils.logger import get_logger logger = get_logger("utils.component_manager") class ComponentManager: """组件配置管理类""" def __init__(self) -> None: """初始化组件管理器""" # 组件配置目录 self._components_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'config', 'components') # 组件类别缓存 self._components_cache: Dict[str, Any] = {} # 组件类型映射 self._block_types: Dict[str, Any] = {} # 按顺序排列的组件类别 self._ordered_categories: List[Dict[str, Any]] = [] # 加载所有组件 self._load_all_components() def _load_all_components(self) -> None: """加载所有组件配置文件""" try: if not os.path.exists(self._components_dir): logger.error(f"组件目录不存在: {self._components_dir}") return # 加载所有组件配置文件 for filename in os.listdir(self._components_dir): if filename.endswith('.json'): component_type = os.path.splitext(filename)[0] file_path = os.path.join(self._components_dir, filename) try: with open(file_path, 'r', encoding='utf-8') as f: component_data = json.load(f) self._components_cache[component_type] = component_data # 记录每个block类型对应的信息 if 'blocks' in component_data: for block in component_data['blocks']: self._block_types[block['name']] = block except Exception as e: logger.error(f"加载组件配置文件失败: {file_path}, 错误: {str(e)}") # 按order排序分类 ordered_categories = sorted( [comp for comp in self._components_cache.values()], key=lambda x: x.get('order', 999) ) self._ordered_categories = ordered_categories logger.info(f"已加载 {len(self._components_cache)} 个组件分类,共 {len(self._block_types)} 个组件类型") except Exception as e: logger.error(f"加载组件配置失败: {str(e)}") def get_all_blocks(self) -> List[Dict[str, Any]]: """获取所有组件分类及其对应的组件""" return self._ordered_categories def get_block_type(self, block_type: str) -> Optional[Dict[str, Any]]: """获取指定类型的组件定义""" return self._block_types.get(block_type) def get_component_category(self, category: str) -> Optional[Dict[str, Any]]: """获取指定分类的组件配置""" return self._components_cache.get(category) def reload_components(self) -> None: """重新加载所有组件配置""" self._components_cache = {} self._block_types = {} self._ordered_categories = [] self._load_all_components() # 创建组件管理器单例 component_manager = ComponentManager()