# components/script_components.py
"""Script execution components."""
from typing import Any, Dict, Optional

from core.component import Component, ComponentFactory
from core.exceptions import ComponentError


@ComponentFactory.register("run_script")
class RunScriptComponent(Component):
    """Component that executes a Python script supplied as a parameter."""

    def execute(self) -> Dict[str, Any]:
        """Run the ``script_code`` parameter and return its ``result`` variable.

        The script is executed with the following names predefined:
        ``params``, ``context``, ``blocks``, ``variables``, ``task`` (a dict
        with ``id`` and ``variables``) and ``taskInputs``. Whatever the script
        binds to the top-level name ``result`` becomes the component output.

        Returns:
            ``{"result": <value>}`` where ``<value>`` is the script's
            ``result`` variable, or ``None`` if the script did not set one.

        Raises:
            ComponentError: If anything inside the ``try`` block raises an
                ``Exception``. ``{"error": <message>}`` is stored as the
                component result before the error is raised, and the original
                exception is attached as ``__cause__``.
        """
        script_code = self.resolve_param("script_code")
        params = self.resolve_param("params", {})

        self.validate_required_params(["script_code"])

        # SECURITY NOTE(review): exec() runs arbitrary code with the full set
        # of builtins. ``script_code`` must come from a trusted source; this
        # is not a sandbox.
        #
        # A single dict is used as the execution namespace on purpose. Passing
        # separate globals/locals makes exec() behave like a class body, so
        # functions, lambdas and comprehensions defined by the script could
        # not see ``params``, ``context`` or other top-level script names.
        namespace: Dict[str, Any] = {
            "__builtins__": __builtins__,
            "params": params,
            "context": self.context,
            "blocks": self.context.blocks,
            "variables": self.context.variables,
            "task": {
                "id": self.context.task_id,
                "variables": self.context.variables,
            },
            "taskInputs": self.context.task_inputs,
        }

        try:
            exec(script_code, namespace)

            result = {"result": namespace.get("result")}
            self.store_result(result)
            return result
        except Exception as e:
            message = str(e)
            self.store_result({"error": message})
            # Chain the cause so the script's traceback is not lost.
            raise ComponentError(f"脚本执行失败: {message}") from e


@ComponentFactory.register("set_task_variables")
class SetTaskVariablesComponent(Component):
    """Component that copies its parameters into task variables."""

    def execute(self) -> Dict[str, Any]:
        """Resolve every parameter and store it as a task variable.

        Each parameter name except ``"id"`` (treated as a special parameter,
        not a variable) is resolved with ``resolve_param`` and written to the
        context with ``set_variable``.

        Returns:
            ``{"updated_variables": [<name>, ...]}`` listing the names that
            were set, in parameter order.
        """
        # Work on a copy so the component's own params are left untouched.
        variables = self.params.copy()
        variables.pop("id", None)

        updated_names = []
        for name in variables:
            resolved_value = self.resolve_param(name)
            self.context.set_variable(name, resolved_value)
            updated_names.append(name)

        result = {"updated_variables": updated_names}
        self.store_result(result)
        return result