#!/usr/bin/env python # -*- coding: utf-8 -*- """ 脚本处理器模块 提供脚本执行和变量设置的处理器 """ import logging from typing import Dict, Any from services.execution.task_context import TaskContext from utils.logger import get_logger from .model.block_name import ScriptBlockName from .base import BlockHandler, register_handler # 获取日志记录器 logger = get_logger("services.execution.handlers.script") # 脚本块处理器 @register_handler(ScriptBlockName.SET_TASK_VARIABLES) class SetTaskVariablesBlockHandler(BlockHandler): """设置任务变量块处理器""" async def execute( self, block: Dict[str, Any], input_params: Dict[str, Any], context: TaskContext ) -> Dict[str, Any]: """执行设置任务变量块""" try: # 获取函数名和参数 function_name = input_params.get("functionName") function_args = input_params.get("functionArgs", {}) if not function_name: result = { "success": False, "message": "缺少函数名" } # 记录执行结果 await self._record_task_log(block, result, context) return result # 调用脚本执行方法 exec_result = await self._execute_script(function_name, function_args, context) # 如果执行成功,将变量保存到任务记录 if exec_result.get("success", False): await self._save_variables_to_database(context) # 记录执行结果 await self._record_task_log(block, exec_result, context) return exec_result except Exception as e: result = { "success": False, "message": f"设置任务变量失败: {str(e)}" } # 记录异常 await self._record_task_log(block, result, context) return result async def _execute_script( self, function_name: str, function_args: Any, context: TaskContext ) -> Dict[str, Any]: """ 执行指定的脚本函数 Args: function_name: 要执行的函数名 function_args: 函数参数 context: 任务上下文 Returns: Dict[str, Any]: 执行结果 """ # 固定加载scripts/user_save/test1.py文件 script_file = r"D:\jsw_code\project\VWED_task\scripts\user_save\test1.py" logger.info(f"正在加载脚本文件: {script_file}") try: # 动态加载脚本模块 import importlib.util import sys import os # 检查文件是否存在 if not os.path.exists(script_file): return { "success": False, "message": f"脚本文件不存在: {script_file}" } # 加载模块 spec = importlib.util.spec_from_file_location("user_script", script_file) if not spec: return { "success": False, "message": f"无法加载脚本规范: {script_file}" } module = importlib.util.module_from_spec(spec) sys.modules["user_script"] = module spec.loader.exec_module(module) # 检查函数是否存在 if not hasattr(module, function_name): return { "success": False, "message": f"函数 {function_name} 在脚本 {script_file} 中不存在" } # 获取函数对象 func = getattr(module, function_name) # 准备函数参数(处理不同类型的参数传递方式) args = [] kwargs = {} # 如果function_args是字典,将其作为关键字参数传递 if isinstance(function_args, dict): kwargs = function_args # 如果function_args是列表,将其作为位置参数传递 elif isinstance(function_args, list): args = function_args # 如果function_args是其他类型,将其作为单个位置参数传递 elif function_args is not None: args = [function_args] # 调用函数 logger.info(f"调用函数 {function_name} 参数: args={args}, kwargs={kwargs}") result_value = func(*args, **kwargs) # 检查是否是异步函数并等待结果 import inspect if inspect.iscoroutine(result_value): import asyncio result_value = await result_value logger.info(f"函数 {function_name} 执行结果: {result_value}") # 设置返回值到上下文 - 与ScriptBp不同,这里会更新所有变量 if isinstance(result_value, dict): # 如果返回值是字典,将字典中的每个键值对设置为任务变量 for key, value in result_value.items(): context.set_variable(key, value) result_message = f"脚本函数 {function_name} 执行成功,已更新 {len(result_value)} 个变量" else: # 如果返回值不是字典,将整个返回值设置为scriptResult变量 context.set_variable("scriptResult", result_value) result_message = f"脚本函数 {function_name} 执行成功,结果已保存为scriptResult变量" # 构建成功结果 return { "success": True, "message": result_message, "output": { "result": result_value, "functionName": function_name } } except Exception as e: logger.error(f"执行脚本函数时发生异常: {str(e)}", exc_info=True) return { "success": False, "message": f"执行脚本函数 {function_name} 失败: {str(e)}" } async def _save_variables_to_database(self, context: TaskContext) -> None: """ 将任务变量保存到数据库的任务记录中 Args: context: 任务上下文 """ try: from sqlalchemy import update from data.models.taskrecord import VWEDTaskRecord from data.session import get_async_session import json # 获取任务记录ID task_record_id = context.task_record_id if not task_record_id: logger.error("无法保存变量,任务记录ID为空") return # 将任务变量转换为JSON字符串 variables_json = json.dumps(context.variables, ensure_ascii=False) # 更新数据库记录 async with get_async_session() as session: stmt = update(VWEDTaskRecord).where(VWEDTaskRecord.id == task_record_id).values( variables=variables_json ) await session.execute(stmt) await session.commit() logger.info(f"已将任务变量保存到数据库记录 {task_record_id}") except Exception as e: logger.error(f"保存任务变量到数据库失败: {str(e)}") # 脚本块处理器 @register_handler(ScriptBlockName.SCRIPT) class ScriptBlockHandler(BlockHandler): """脚本块处理器""" async def execute( self, block: Dict[str, Any], input_params: Dict[str, Any], context: TaskContext ) -> Dict[str, Any]: """执行脚本块""" try: # 获取函数名和参数 function_name = input_params.get("functionName") function_args = input_params.get("functionArgs", {}) if not function_name: result = { "success": False, "message": "缺少函数名" } # 记录执行结果 await self._record_task_log(block, result, context) return result # 调用脚本执行方法 exec_result = await self._execute_script(function_name, function_args, context) # 记录执行结果 await self._record_task_log(block, exec_result, context) return exec_result except Exception as e: result = { "success": False, "message": f"脚本执行失败: {str(e)}" } # 记录异常 await self._record_task_log(block, result, context) return result async def _execute_script( self, function_name: str, function_args: Any, context: TaskContext ) -> Dict[str, Any]: """ 执行指定的脚本函数 Args: function_name: 要执行的函数名 function_args: 函数参数 context: 任务上下文 Returns: Dict[str, Any]: 执行结果 """ # 固定加载scripts/user_save/test1.py文件 script_file = r"D:\jsw_code\project\VWED_task\scripts\user_save\test1.py" logger.info(f"正在加载脚本文件: {script_file}") try: # 动态加载脚本模块 import importlib.util import sys import os # 检查文件是否存在 if not os.path.exists(script_file): return { "success": False, "message": f"脚本文件不存在: {script_file}" } # 加载模块 spec = importlib.util.spec_from_file_location("user_script", script_file) if not spec: return { "success": False, "message": f"无法加载脚本规范: {script_file}" } module = importlib.util.module_from_spec(spec) sys.modules["user_script"] = module spec.loader.exec_module(module) # 检查函数是否存在 if not hasattr(module, function_name): return { "success": False, "message": f"函数 {function_name} 在脚本 {script_file} 中不存在" } # 获取函数对象 func = getattr(module, function_name) # 准备函数参数(处理不同类型的参数传递方式) args = [] kwargs = {} # 如果function_args是字典,将其作为关键字参数传递 if isinstance(function_args, dict): kwargs = function_args # 如果function_args是列表,将其作为位置参数传递 elif isinstance(function_args, list): args = function_args # 如果function_args是其他类型,将其作为单个位置参数传递 elif isinstance(function_args, str): eval_result = eval(function_args) if isinstance(eval_result, dict): kwargs = eval_result elif isinstance(eval_result, list): args = eval_result else: args = [eval_result] elif function_args is not None: args = [function_args] # 调用函数 logger.info(f"调用函数 {function_name} 参数: args={args}, kwargs={kwargs}") result_value = func(*args, **kwargs) # 检查是否是异步函数并等待结果 import inspect if inspect.iscoroutine(result_value): import asyncio result_value = await result_value logger.info(f"函数 {function_name} 执行结果: {result_value}") # 设置返回值到上下文 context.set_variable("scriptResult", result_value) # 构建成功结果 return { "success": True, "message": f"脚本函数 {function_name} 执行成功", "output": { "result": result_value, "functionName": function_name } } except Exception as e: logger.error(f"执行脚本函数时发生异常: {str(e)}", exc_info=True) return { "success": False, "message": f"执行脚本函数 {function_name} 失败: {str(e)}" }