359 lines
12 KiB
Python
359 lines
12 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
脚本处理器模块
|
||
提供脚本执行和变量设置的处理器
|
||
"""
|
||
|
||
import logging
|
||
from typing import Dict, Any
|
||
from services.execution.task_context import TaskContext
|
||
from utils.logger import get_logger
|
||
from .model.block_name import ScriptBlockName
|
||
from .base import BlockHandler, register_handler
|
||
|
||
|
||
# 获取日志记录器
|
||
logger = get_logger("services.execution.handlers.script")
|
||
|
||
|
||
# 脚本块处理器
|
||
@register_handler(ScriptBlockName.SET_TASK_VARIABLES)
|
||
class SetTaskVariablesBlockHandler(BlockHandler):
|
||
"""设置任务变量块处理器"""
|
||
|
||
async def execute(
|
||
self,
|
||
block: Dict[str, Any],
|
||
input_params: Dict[str, Any],
|
||
context: TaskContext
|
||
) -> Dict[str, Any]:
|
||
"""执行设置任务变量块"""
|
||
try:
|
||
# 获取函数名和参数
|
||
function_name = input_params.get("functionName")
|
||
function_args = input_params.get("functionArgs", {})
|
||
|
||
if not function_name:
|
||
result = {
|
||
"success": False,
|
||
"message": "缺少函数名"
|
||
}
|
||
# 记录执行结果
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 调用脚本执行方法
|
||
exec_result = await self._execute_script(function_name, function_args, context)
|
||
|
||
# 如果执行成功,将变量保存到任务记录
|
||
if exec_result.get("success", False):
|
||
await self._save_variables_to_database(context)
|
||
|
||
# 记录执行结果
|
||
await self._record_task_log(block, exec_result, context)
|
||
return exec_result
|
||
except Exception as e:
|
||
result = {
|
||
"success": False,
|
||
"message": f"设置任务变量失败: {str(e)}"
|
||
}
|
||
# 记录异常
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
async def _execute_script(
|
||
self,
|
||
function_name: str,
|
||
function_args: Any,
|
||
context: TaskContext
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
执行指定的脚本函数
|
||
|
||
Args:
|
||
function_name: 要执行的函数名
|
||
function_args: 函数参数
|
||
context: 任务上下文
|
||
|
||
Returns:
|
||
Dict[str, Any]: 执行结果
|
||
"""
|
||
# 固定加载scripts/user_save/test1.py文件
|
||
script_file = r"D:\jsw_code\project\VWED_task\scripts\user_save\test1.py"
|
||
logger.info(f"正在加载脚本文件: {script_file}")
|
||
|
||
try:
|
||
# 动态加载脚本模块
|
||
import importlib.util
|
||
import sys
|
||
import os
|
||
|
||
# 检查文件是否存在
|
||
if not os.path.exists(script_file):
|
||
return {
|
||
"success": False,
|
||
"message": f"脚本文件不存在: {script_file}"
|
||
}
|
||
|
||
# 加载模块
|
||
spec = importlib.util.spec_from_file_location("user_script", script_file)
|
||
if not spec:
|
||
return {
|
||
"success": False,
|
||
"message": f"无法加载脚本规范: {script_file}"
|
||
}
|
||
module = importlib.util.module_from_spec(spec)
|
||
sys.modules["user_script"] = module
|
||
spec.loader.exec_module(module)
|
||
|
||
# 检查函数是否存在
|
||
if not hasattr(module, function_name):
|
||
return {
|
||
"success": False,
|
||
"message": f"函数 {function_name} 在脚本 {script_file} 中不存在"
|
||
}
|
||
# 获取函数对象
|
||
func = getattr(module, function_name)
|
||
|
||
# 准备函数参数(处理不同类型的参数传递方式)
|
||
args = []
|
||
kwargs = {}
|
||
|
||
# 如果function_args是字典,将其作为关键字参数传递
|
||
if isinstance(function_args, dict):
|
||
kwargs = function_args
|
||
# 如果function_args是列表,将其作为位置参数传递
|
||
elif isinstance(function_args, list):
|
||
args = function_args
|
||
# 如果function_args是其他类型,将其作为单个位置参数传递
|
||
elif function_args is not None:
|
||
args = [function_args]
|
||
|
||
# 调用函数
|
||
logger.info(f"调用函数 {function_name} 参数: args={args}, kwargs={kwargs}")
|
||
result_value = func(*args, **kwargs)
|
||
|
||
# 检查是否是异步函数并等待结果
|
||
import inspect
|
||
if inspect.iscoroutine(result_value):
|
||
import asyncio
|
||
result_value = await result_value
|
||
|
||
logger.info(f"函数 {function_name} 执行结果: {result_value}")
|
||
|
||
# 设置返回值到上下文 - 与ScriptBp不同,这里会更新所有变量
|
||
if isinstance(result_value, dict):
|
||
# 如果返回值是字典,将字典中的每个键值对设置为任务变量
|
||
for key, value in result_value.items():
|
||
context.set_variable(key, value)
|
||
|
||
result_message = f"脚本函数 {function_name} 执行成功,已更新 {len(result_value)} 个变量"
|
||
else:
|
||
# 如果返回值不是字典,将整个返回值设置为scriptResult变量
|
||
context.set_variable("scriptResult", result_value)
|
||
result_message = f"脚本函数 {function_name} 执行成功,结果已保存为scriptResult变量"
|
||
|
||
# 构建成功结果
|
||
return {
|
||
"success": True,
|
||
"message": result_message,
|
||
"output": {
|
||
"result": result_value,
|
||
"functionName": function_name
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"执行脚本函数时发生异常: {str(e)}", exc_info=True)
|
||
return {
|
||
"success": False,
|
||
"message": f"执行脚本函数 {function_name} 失败: {str(e)}"
|
||
}
|
||
|
||
async def _save_variables_to_database(self, context: TaskContext) -> None:
|
||
"""
|
||
将任务变量保存到数据库的任务记录中
|
||
|
||
Args:
|
||
context: 任务上下文
|
||
"""
|
||
try:
|
||
from sqlalchemy import update
|
||
from data.models.taskrecord import VWEDTaskRecord
|
||
from data.session import get_async_session
|
||
import json
|
||
|
||
# 获取任务记录ID
|
||
task_record_id = context.task_record_id
|
||
|
||
if not task_record_id:
|
||
logger.error("无法保存变量,任务记录ID为空")
|
||
return
|
||
|
||
# 将任务变量转换为JSON字符串
|
||
variables_json = json.dumps(context.variables, ensure_ascii=False)
|
||
|
||
# 更新数据库记录
|
||
async with get_async_session() as session:
|
||
stmt = update(VWEDTaskRecord).where(VWEDTaskRecord.id == task_record_id).values(
|
||
variables=variables_json
|
||
)
|
||
await session.execute(stmt)
|
||
await session.commit()
|
||
|
||
logger.info(f"已将任务变量保存到数据库记录 {task_record_id}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"保存任务变量到数据库失败: {str(e)}")
|
||
|
||
|
||
# 脚本块处理器
|
||
@register_handler(ScriptBlockName.SCRIPT)
|
||
class ScriptBlockHandler(BlockHandler):
|
||
"""脚本块处理器"""
|
||
|
||
async def execute(
|
||
self,
|
||
block: Dict[str, Any],
|
||
input_params: Dict[str, Any],
|
||
context: TaskContext
|
||
) -> Dict[str, Any]:
|
||
"""执行脚本块"""
|
||
try:
|
||
# 获取函数名和参数
|
||
function_name = input_params.get("functionName")
|
||
function_args = input_params.get("functionArgs", {})
|
||
|
||
if not function_name:
|
||
result = {
|
||
"success": False,
|
||
"message": "缺少函数名"
|
||
}
|
||
# 记录执行结果
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
# 调用脚本执行方法
|
||
exec_result = await self._execute_script(function_name, function_args, context)
|
||
|
||
# 记录执行结果
|
||
await self._record_task_log(block, exec_result, context)
|
||
return exec_result
|
||
except Exception as e:
|
||
result = {
|
||
"success": False,
|
||
"message": f"脚本执行失败: {str(e)}"
|
||
}
|
||
# 记录异常
|
||
await self._record_task_log(block, result, context)
|
||
return result
|
||
|
||
async def _execute_script(
|
||
self,
|
||
function_name: str,
|
||
function_args: Any,
|
||
context: TaskContext
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
执行指定的脚本函数
|
||
|
||
Args:
|
||
function_name: 要执行的函数名
|
||
function_args: 函数参数
|
||
context: 任务上下文
|
||
|
||
Returns:
|
||
Dict[str, Any]: 执行结果
|
||
"""
|
||
# 固定加载scripts/user_save/test1.py文件
|
||
script_file = r"D:\jsw_code\project\VWED_task\scripts\user_save\test1.py"
|
||
logger.info(f"正在加载脚本文件: {script_file}")
|
||
|
||
try:
|
||
# 动态加载脚本模块
|
||
import importlib.util
|
||
import sys
|
||
import os
|
||
|
||
# 检查文件是否存在
|
||
if not os.path.exists(script_file):
|
||
return {
|
||
"success": False,
|
||
"message": f"脚本文件不存在: {script_file}"
|
||
}
|
||
|
||
# 加载模块
|
||
spec = importlib.util.spec_from_file_location("user_script", script_file)
|
||
if not spec:
|
||
return {
|
||
"success": False,
|
||
"message": f"无法加载脚本规范: {script_file}"
|
||
}
|
||
module = importlib.util.module_from_spec(spec)
|
||
sys.modules["user_script"] = module
|
||
spec.loader.exec_module(module)
|
||
# 检查函数是否存在
|
||
if not hasattr(module, function_name):
|
||
return {
|
||
"success": False,
|
||
"message": f"函数 {function_name} 在脚本 {script_file} 中不存在"
|
||
}
|
||
# 获取函数对象
|
||
func = getattr(module, function_name)
|
||
|
||
# 准备函数参数(处理不同类型的参数传递方式)
|
||
args = []
|
||
kwargs = {}
|
||
|
||
# 如果function_args是字典,将其作为关键字参数传递
|
||
if isinstance(function_args, dict):
|
||
kwargs = function_args
|
||
# 如果function_args是列表,将其作为位置参数传递
|
||
elif isinstance(function_args, list):
|
||
args = function_args
|
||
# 如果function_args是其他类型,将其作为单个位置参数传递
|
||
elif isinstance(function_args, str):
|
||
eval_result = eval(function_args)
|
||
if isinstance(eval_result, dict):
|
||
kwargs = eval_result
|
||
elif isinstance(eval_result, list):
|
||
args = eval_result
|
||
else:
|
||
args = [eval_result]
|
||
elif function_args is not None:
|
||
args = [function_args]
|
||
|
||
# 调用函数
|
||
logger.info(f"调用函数 {function_name} 参数: args={args}, kwargs={kwargs}")
|
||
result_value = func(*args, **kwargs)
|
||
|
||
# 检查是否是异步函数并等待结果
|
||
import inspect
|
||
if inspect.iscoroutine(result_value):
|
||
import asyncio
|
||
result_value = await result_value
|
||
|
||
logger.info(f"函数 {function_name} 执行结果: {result_value}")
|
||
|
||
# 设置返回值到上下文
|
||
context.set_variable("scriptResult", result_value)
|
||
|
||
# 构建成功结果
|
||
return {
|
||
"success": True,
|
||
"message": f"脚本函数 {function_name} 执行成功",
|
||
"output": {
|
||
"result": result_value,
|
||
"functionName": function_name
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"执行脚本函数时发生异常: {str(e)}", exc_info=True)
|
||
return {
|
||
"success": False,
|
||
"message": f"执行脚本函数 {function_name} 失败: {str(e)}"
|
||
}
|