2025-05-12 15:43:21 +08:00

618 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
from typing import Dict, Any
from services.execution.task_context import TaskContext
from .base import BlockHandler, register_handler
from utils.logger import get_logger
from .model.block_name import FoundationBlockName
# 获取日志记录器
logger = get_logger("services.execution.handlers.foundation")
# 检查任务实例ID是否存在处理器
@register_handler(FoundationBlockName.CHECK_TASK_RECORD_ID_IS_EXIST)
class CheckTaskRecordIdIsExistBlockHandler(BlockHandler):
"""检查任务实例ID是否存在处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: TaskContext
) -> Dict[str, Any]:
"""检查任务记录ID是否存在"""
from sqlalchemy import text, select
from data.models import VWEDTaskRecord
from data.session import get_async_session
import json
try:
# 获取任务记录ID
task_record_id = input_params.get("taskRecordId")
if not task_record_id:
result = {
"success": False,
"message": "缺少任务记录ID"
}
# 记录执行结果
await self._record_task_log(block, result, context)
return result
logger.info(f"检查任务记录ID: {task_record_id}")
record = None
exists = False
record_info = None
# 查询任务记录
async with get_async_session() as session:
try:
# 使用ORM查询
query = select(VWEDTaskRecord).where(VWEDTaskRecord.id == task_record_id)
result = await session.execute(query)
record = result.scalar_one_or_none()
if record:
logger.info(f"找到任务记录: id={record.id}, def_id={record.def_id}, status={record.status}")
exists = True
# 记录详细信息
record_info = {
"id": record.id,
"def_id": record.def_id,
"status": record.status,
"def_label": record.def_label,
"created_on": record.created_on.isoformat() if record.created_on else None,
"ended_on": record.ended_on.isoformat() if record.ended_on else None,
}
else:
logger.info(f"未找到任务记录ID: {task_record_id}")
except Exception as db_error:
logger.error(f"数据库查询错误: {str(db_error)}")
# 使用原生SQL作为备选
# 设置上下文变量
context.set_block_output(block.get("name"), {"taskRecordIdIsExist": exists})
# 记录任务信息(如果找到)
if record_info:
context.set_variable("taskRecordInfo", record_info)
context.set_block_output(block.get("name"), {"taskRecordInfo": record_info})
exec_result = {
"success": True,
"message": f"任务记录ID {'存在' if exists else '不存在'}",
"output": {
"exists": exists,
"taskRecordId": task_record_id,
"recordInfo": record_info
}
}
# 记录执行结果
await self._record_task_log(block, exec_result, context)
return exec_result
except Exception as e:
logger.error(f"检查任务记录ID是否存在失败: {str(e)}")
result = {
"success": False,
"message": f"检查任务记录ID是否存在失败: {str(e)}"
}
# 记录异常
await self._record_task_log(block, result, context)
return result
# 释放资源处理器
@register_handler(FoundationBlockName.RELEASE_RESOURCE)
class ReleaseResourceBlockHandler(BlockHandler):
"""释放资源处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: TaskContext
) -> Dict[str, Any]:
"""释放资源"""
from sqlalchemy import select
from data.models import VWEDTaskRecord
from data.session import get_async_session
task_record_id = context.task_record_id
query = select(VWEDTaskRecord).where(VWEDTaskRecord.id == task_record_id)
async with get_async_session() as session:
try:
result = await session.execute(query)
record = result.scalar_one()
if record:
record.allow_restart_same_location = True
await session.commit()
logger.info(f"任务记录ID: {task_record_id} 释放资源成功")
else:
logger.error(f"未找到任务记录ID: {task_record_id}")
except Exception as e:
logger.error(f"释放资源失败: {str(e)}")
result = {
"success": False,
"message": f"释放资源失败: {str(e)}"
}
await self._record_task_log(block, result, context)
return result
result = {
"success": True,
"message": "释放资源成功",
"output": {}
}
await self._record_task_log(block, result, context)
return result
# 创建唯一ID处理器
@register_handler(FoundationBlockName.CREATE_UUID)
class CreateUuidBlockHandler(BlockHandler):
"""创建唯一ID处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: TaskContext
) -> Dict[str, Any]:
"""创建唯一ID"""
import uuid
try:
# 生成UUID
generated_uuid = str(uuid.uuid4())
# 设置上下文变量
result = {
"success": True,
"message": "创建唯一ID成功",
"output": {
"createUuid": generated_uuid,
}
}
context.set_block_output(block.get("name"), {"createUuid": generated_uuid})
# 记录执行结果
await self._record_task_log(block, result, context)
return result
except Exception as e:
result = {
"success": False,
"message": f"创建唯一ID失败: {str(e)}"
}
# 记录异常
await self._record_task_log(block, result, context)
return result
# 打印块处理器
@register_handler(FoundationBlockName.PRINT)
class PrintBlockHandler(BlockHandler):
"""打印块处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: TaskContext
) -> Dict[str, Any]:
"""执行打印块"""
try:
# 获取消息参数
message = input_params.get("message", "")
# 记录日志
logger.info(f"任务 {context.task_record_id} 打印: {message}")
# 返回结果
result = {
"success": True,
"message": "打印成功",
"output": {
"message": message
}
}
# 记录执行结果
await self._record_task_log(block, result, context)
return result
except Exception as e:
result = {
"success": False,
"message": f"打印失败: {str(e)}"
}
# 记录异常
await self._record_task_log(block, result, context)
return result
# 当前时间戳处理器
@register_handler(FoundationBlockName.CURRENT_TIME_STAMP)
class CurrentTimeStampBlockHandler(BlockHandler):
"""当前时间戳处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: TaskContext
) -> Dict[str, Any]:
"""获取当前时间戳"""
import time
try:
# 获取当前时间戳(毫秒)
current_timestamp = int(time.time() * 1000)
# 设置上下文变量
context.set_block_output(block.get("name"), {"currentTimeStamp": current_timestamp})
result = {
"success": True,
"message": "获取当前时间戳成功",
"output": {
"timestamp": current_timestamp
}
}
# 记录执行结果
await self._record_task_log(block, result, context)
return result
except Exception as e:
result = {
"success": False,
"message": f"获取当前时间戳失败: {str(e)}"
}
# 记录异常
await self._record_task_log(block, result, context)
return result
@register_handler(FoundationBlockName.JDBC_EXECUTE)
class JdbcExecuteBlockHandler(BlockHandler):
"""执行SQL处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: TaskContext
) -> Dict[str, Any]:
"""执行SQL语句"""
from sqlalchemy import text
from data.session import get_async_session
try:
# 获取SQL语句
sql = input_params.get("sql")
if not sql:
result = {
"success": False,
"message": "缺少SQL语句"
}
# 记录执行结果
await self._record_task_log(block, result, context)
return result
logger.info(f"执行SQL语句: {sql}")
# 执行SQL
async with get_async_session() as session:
try:
result = await session.execute(text(sql))
await session.commit()
# 获取受影响的行数
row_count = result.rowcount if hasattr(result, 'rowcount') else 0
logger.info(f"SQL执行成功影响了 {row_count}")
exec_result = {
"success": True,
"message": f"SQL执行成功影响了 {row_count}",
"output": {
"rowCount": row_count
}
}
except Exception as db_error:
logger.error(f"SQL执行错误: {str(db_error)}")
exec_result = {
"success": False,
"message": f"SQL执行失败: {str(db_error)}"
}
# 记录执行结果
await self._record_task_log(block, exec_result, context)
return exec_result
except Exception as e:
logger.error(f"SQL执行失败: {str(e)}")
result = {
"success": False,
"message": f"SQL执行失败: {str(e)}"
}
# 记录异常
await self._record_task_log(block, result, context)
return result
# 查询SQL处理器
@register_handler(FoundationBlockName.JDBC_QUERY)
class JdbcQueryBlockHandler(BlockHandler):
"""查询SQL处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: TaskContext
) -> Dict[str, Any]:
"""执行SQL查询"""
from sqlalchemy import text
from data.session import get_async_session
import json
try:
# 获取SQL语句
sql = input_params.get("sql")
if not sql:
result = {
"success": False,
"message": "缺少SQL语句"
}
# 记录执行结果
await self._record_task_log(block, result, context)
return result
logger.info(f"执行SQL查询: {sql}")
# 执行SQL查询
async with get_async_session() as session:
try:
result = await session.execute(text(sql))
# 处理结果
if result.returns_rows:
# 获取列名
columns = result.keys()
# 转换为字典列表
rows = [dict(zip(columns, row)) for row in result.fetchall()]
# 转换为JSON字符串
result_json = json.dumps(rows, ensure_ascii=False, default=str)
# 设置上下文变量
context.set_variable("resultSet", result_json)
context.set_block_output(block.get("name"), {"resultSet": rows})
logger.info(f"SQL查询成功返回 {len(rows)} 条记录")
exec_result = {
"success": True,
"message": f"SQL查询成功返回 {len(rows)} 条记录",
"output": {
"rowCount": len(rows),
"resultSet": rows,
"resultJson": result_json
}
}
else:
# 非查询语句,没有返回行
context.set_variable("resultSet", "[]")
context.set_block_output(block.get("name"), {"resultSet": []})
logger.info("SQL执行成功没有返回数据")
exec_result = {
"success": True,
"message": "SQL执行成功没有返回数据",
"output": {
"rowCount": 0,
"resultSet": [],
"resultJson": "[]"
}
}
except Exception as db_error:
logger.error(f"SQL查询错误: {str(db_error)}")
exec_result = {
"success": False,
"message": f"SQL查询失败: {str(db_error)}"
}
# 记录执行结果
await self._record_task_log(block, exec_result, context)
return exec_result
except Exception as e:
logger.error(f"SQL查询失败: {str(e)}")
result = {
"success": False,
"message": f"SQL查询失败: {str(e)}"
}
# 记录异常
await self._record_task_log(block, result, context)
return result
# 当前时间处理器
@register_handler(FoundationBlockName.TIMESTAMP)
class TimestampBlockHandler(BlockHandler):
"""当前时间处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: TaskContext
) -> Dict[str, Any]:
"""获取当前时间"""
from datetime import datetime
try:
# 获取当前时间
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 设置上下文变量
context.set_block_output(block.get("name"), {"timestamp": current_time})
result = {
"success": True,
"message": "获取当前时间成功",
"output": {
"timestamp": current_time
}
}
# 记录执行结果
await self._record_task_log(block, result, context)
return result
except Exception as e:
result = {
"success": False,
"message": f"获取当前时间失败: {str(e)}"
}
# 记录异常
await self._record_task_log(block, result, context)
return result
# 字符串转JSON对象处理器
@register_handler(FoundationBlockName.STRING_TO_JSON_OBJECT)
class StringToJsonObjectBlockHandler(BlockHandler):
"""字符串转JSON对象处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: TaskContext
) -> Dict[str, Any]:
"""将字符串转换为JSON对象"""
import json
try:
# 获取输入参数
convert_string = input_params.get("convertString", "")
if not convert_string:
result = {
"success": False,
"message": "转换字符串为空"
}
# 记录执行结果
await self._record_task_log(block, result, context)
return result
# 转换为JSON对象
json_object = json.loads(convert_string)
# 设置输出参数
output = {
"convertObject": json_object
}
context.set_block_output(block.get("name"), {"convertObject": json_object})
result = {
"success": True,
"message": "字符串转换为JSON对象成功",
"output": output
}
# 记录执行结果
await self._record_task_log(block, result, context)
return result
except json.JSONDecodeError as e:
result = {
"success": False,
"message": f"字符串解析为JSON对象失败: {str(e)}"
}
# 记录异常
await self._record_task_log(block, result, context)
return result
except Exception as e:
result = {
"success": False,
"message": f"字符串转换为JSON对象失败: {str(e)}"
}
# 记录异常
await self._record_task_log(block, result, context)
return result
# 字符串转JSON数组处理器
@register_handler(FoundationBlockName.STRING_TO_JSON_ARRAY)
class StringToJsonArrayBlockHandler(BlockHandler):
"""字符串转JSON数组处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: TaskContext
) -> Dict[str, Any]:
"""将字符串转换为JSON数组"""
import json
try:
# 获取输入参数
convert_string = input_params.get("convertString", "")
if not convert_string:
result = {
"success": False,
"message": "转换字符串为空"
}
# 记录执行结果
await self._record_task_log(block, result, context)
return result
# 处理可能被额外引号包裹的情况
try:
# 首先尝试直接解析
json_array = json.loads(convert_string)
except Exception as e:
# 如果失败,尝试去除可能的外层引号
json_array = convert_string
# 设置上下文输出
context.set_block_output(block.get("name"), {"convertArray": json_array})
# 验证是否为数组
if not isinstance(json_array, list):
result = {
"success": False,
"message": "转换结果不是JSON数组"
}
# 记录执行结果
await self._record_task_log(block, result, context)
return result
# 设置输出参数
output = {
"convertArray": json_array
}
result = {
"success": True,
"message": "字符串转换为JSON数组成功",
"output": output
}
# 记录执行结果
await self._record_task_log(block, result, context)
return result
except json.JSONDecodeError as e:
result = {
"success": False,
"message": f"字符串解析为JSON数组失败: {str(e)}"
}
# 记录异常
await self._record_task_log(block, result, context)
return result
except Exception as e:
result = {
"success": False,
"message": f"字符串转换为JSON数组失败: {str(e)}"
}
# 记录异常
await self._record_task_log(block, result, context)
return result