359 lines
12 KiB
Python
Raw Permalink Normal View History

2025-04-30 16:57:46 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
脚本处理器模块
提供脚本执行和变量设置的处理器
"""
import logging
from typing import Dict, Any
from services.execution.task_context import TaskContext
from utils.logger import get_logger
from .model.block_name import ScriptBlockName
from .base import BlockHandler, register_handler
# 获取日志记录器
logger = get_logger("services.execution.handlers.script")
# 脚本块处理器
@register_handler(ScriptBlockName.SET_TASK_VARIABLES)
class SetTaskVariablesBlockHandler(BlockHandler):
"""设置任务变量块处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: TaskContext
) -> Dict[str, Any]:
"""执行设置任务变量块"""
try:
# 获取函数名和参数
function_name = input_params.get("functionName")
function_args = input_params.get("functionArgs", {})
if not function_name:
result = {
"success": False,
"message": "缺少函数名"
}
# 记录执行结果
await self._record_task_log(block, result, context)
return result
# 调用脚本执行方法
exec_result = await self._execute_script(function_name, function_args, context)
# 如果执行成功,将变量保存到任务记录
if exec_result.get("success", False):
await self._save_variables_to_database(context)
# 记录执行结果
await self._record_task_log(block, exec_result, context)
return exec_result
except Exception as e:
result = {
"success": False,
"message": f"设置任务变量失败: {str(e)}"
}
# 记录异常
await self._record_task_log(block, result, context)
return result
async def _execute_script(
self,
function_name: str,
function_args: Any,
context: TaskContext
) -> Dict[str, Any]:
"""
执行指定的脚本函数
Args:
function_name: 要执行的函数名
function_args: 函数参数
context: 任务上下文
Returns:
Dict[str, Any]: 执行结果
"""
# 固定加载scripts/user_save/test1.py文件
script_file = r"D:\jsw_code\project\VWED_task\scripts\user_save\test1.py"
logger.info(f"正在加载脚本文件: {script_file}")
try:
# 动态加载脚本模块
import importlib.util
import sys
import os
# 检查文件是否存在
if not os.path.exists(script_file):
return {
"success": False,
"message": f"脚本文件不存在: {script_file}"
}
# 加载模块
spec = importlib.util.spec_from_file_location("user_script", script_file)
if not spec:
return {
"success": False,
"message": f"无法加载脚本规范: {script_file}"
}
module = importlib.util.module_from_spec(spec)
sys.modules["user_script"] = module
spec.loader.exec_module(module)
# 检查函数是否存在
if not hasattr(module, function_name):
return {
"success": False,
"message": f"函数 {function_name} 在脚本 {script_file} 中不存在"
}
# 获取函数对象
func = getattr(module, function_name)
# 准备函数参数(处理不同类型的参数传递方式)
args = []
kwargs = {}
# 如果function_args是字典将其作为关键字参数传递
if isinstance(function_args, dict):
kwargs = function_args
# 如果function_args是列表将其作为位置参数传递
elif isinstance(function_args, list):
args = function_args
# 如果function_args是其他类型将其作为单个位置参数传递
elif function_args is not None:
args = [function_args]
# 调用函数
logger.info(f"调用函数 {function_name} 参数: args={args}, kwargs={kwargs}")
result_value = func(*args, **kwargs)
# 检查是否是异步函数并等待结果
import inspect
if inspect.iscoroutine(result_value):
import asyncio
result_value = await result_value
logger.info(f"函数 {function_name} 执行结果: {result_value}")
# 设置返回值到上下文 - 与ScriptBp不同这里会更新所有变量
if isinstance(result_value, dict):
# 如果返回值是字典,将字典中的每个键值对设置为任务变量
for key, value in result_value.items():
context.set_variable(key, value)
result_message = f"脚本函数 {function_name} 执行成功,已更新 {len(result_value)} 个变量"
else:
# 如果返回值不是字典将整个返回值设置为scriptResult变量
context.set_variable("scriptResult", result_value)
result_message = f"脚本函数 {function_name} 执行成功结果已保存为scriptResult变量"
# 构建成功结果
return {
"success": True,
"message": result_message,
"output": {
"result": result_value,
"functionName": function_name
}
}
except Exception as e:
logger.error(f"执行脚本函数时发生异常: {str(e)}", exc_info=True)
return {
"success": False,
"message": f"执行脚本函数 {function_name} 失败: {str(e)}"
}
async def _save_variables_to_database(self, context: TaskContext) -> None:
"""
将任务变量保存到数据库的任务记录中
Args:
context: 任务上下文
"""
try:
from sqlalchemy import update
from data.models.taskrecord import VWEDTaskRecord
from data.session import get_async_session
import json
# 获取任务记录ID
task_record_id = context.task_record_id
if not task_record_id:
logger.error("无法保存变量任务记录ID为空")
return
# 将任务变量转换为JSON字符串
variables_json = json.dumps(context.variables, ensure_ascii=False)
# 更新数据库记录
async with get_async_session() as session:
stmt = update(VWEDTaskRecord).where(VWEDTaskRecord.id == task_record_id).values(
variables=variables_json
)
await session.execute(stmt)
await session.commit()
logger.info(f"已将任务变量保存到数据库记录 {task_record_id}")
except Exception as e:
logger.error(f"保存任务变量到数据库失败: {str(e)}")
# 脚本块处理器
@register_handler(ScriptBlockName.SCRIPT)
class ScriptBlockHandler(BlockHandler):
"""脚本块处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: TaskContext
) -> Dict[str, Any]:
"""执行脚本块"""
try:
# 获取函数名和参数
function_name = input_params.get("functionName")
function_args = input_params.get("functionArgs", {})
if not function_name:
result = {
"success": False,
"message": "缺少函数名"
}
# 记录执行结果
await self._record_task_log(block, result, context)
return result
# 调用脚本执行方法
exec_result = await self._execute_script(function_name, function_args, context)
# 记录执行结果
await self._record_task_log(block, exec_result, context)
return exec_result
except Exception as e:
result = {
"success": False,
"message": f"脚本执行失败: {str(e)}"
}
# 记录异常
await self._record_task_log(block, result, context)
return result
async def _execute_script(
self,
function_name: str,
function_args: Any,
context: TaskContext
) -> Dict[str, Any]:
"""
执行指定的脚本函数
Args:
function_name: 要执行的函数名
function_args: 函数参数
context: 任务上下文
Returns:
Dict[str, Any]: 执行结果
"""
# 固定加载scripts/user_save/test1.py文件
script_file = r"D:\jsw_code\project\VWED_task\scripts\user_save\test1.py"
logger.info(f"正在加载脚本文件: {script_file}")
try:
# 动态加载脚本模块
import importlib.util
import sys
import os
# 检查文件是否存在
if not os.path.exists(script_file):
return {
"success": False,
"message": f"脚本文件不存在: {script_file}"
}
# 加载模块
spec = importlib.util.spec_from_file_location("user_script", script_file)
if not spec:
return {
"success": False,
"message": f"无法加载脚本规范: {script_file}"
}
module = importlib.util.module_from_spec(spec)
sys.modules["user_script"] = module
spec.loader.exec_module(module)
# 检查函数是否存在
if not hasattr(module, function_name):
return {
"success": False,
"message": f"函数 {function_name} 在脚本 {script_file} 中不存在"
}
# 获取函数对象
func = getattr(module, function_name)
# 准备函数参数(处理不同类型的参数传递方式)
args = []
kwargs = {}
# 如果function_args是字典将其作为关键字参数传递
if isinstance(function_args, dict):
kwargs = function_args
# 如果function_args是列表将其作为位置参数传递
elif isinstance(function_args, list):
args = function_args
# 如果function_args是其他类型将其作为单个位置参数传递
2025-05-12 15:43:21 +08:00
elif isinstance(function_args, str):
eval_result = eval(function_args)
if isinstance(eval_result, dict):
kwargs = eval_result
elif isinstance(eval_result, list):
args = eval_result
else:
args = [eval_result]
2025-04-30 16:57:46 +08:00
elif function_args is not None:
args = [function_args]
# 调用函数
logger.info(f"调用函数 {function_name} 参数: args={args}, kwargs={kwargs}")
result_value = func(*args, **kwargs)
# 检查是否是异步函数并等待结果
import inspect
if inspect.iscoroutine(result_value):
import asyncio
result_value = await result_value
logger.info(f"函数 {function_name} 执行结果: {result_value}")
# 设置返回值到上下文
context.set_variable("scriptResult", result_value)
# 构建成功结果
return {
"success": True,
"message": f"脚本函数 {function_name} 执行成功",
"output": {
"result": result_value,
"functionName": function_name
}
}
except Exception as e:
logger.error(f"执行脚本函数时发生异常: {str(e)}", exc_info=True)
return {
"success": False,
"message": f"执行脚本函数 {function_name} 失败: {str(e)}"
}