import logging from typing import Dict, Any from services.execution.task_context import TaskContext from .base import BlockHandler, register_handler from utils.logger import get_logger from .model.block_name import FoundationBlockName # 获取日志记录器 logger = get_logger("services.execution.handlers.foundation") # 检查任务实例ID是否存在处理器 @register_handler(FoundationBlockName.CHECK_TASK_RECORD_ID_IS_EXIST) class CheckTaskRecordIdIsExistBlockHandler(BlockHandler): """检查任务实例ID是否存在处理器""" async def execute( self, block: Dict[str, Any], input_params: Dict[str, Any], context: TaskContext ) -> Dict[str, Any]: """检查任务记录ID是否存在""" from sqlalchemy import text, select from data.models import VWEDTaskRecord from data.session import get_async_session import json try: # 获取任务记录ID task_record_id = input_params.get("taskRecordId") if not task_record_id: result = { "success": False, "message": "缺少任务记录ID" } # 记录执行结果 await self._record_task_log(block, result, context) return result logger.info(f"检查任务记录ID: {task_record_id}") record = None exists = False record_info = None # 查询任务记录 async with get_async_session() as session: try: # 使用ORM查询 query = select(VWEDTaskRecord).where(VWEDTaskRecord.id == task_record_id) result = await session.execute(query) record = result.scalar_one_or_none() if record: logger.info(f"找到任务记录: id={record.id}, def_id={record.def_id}, status={record.status}") exists = True # 记录详细信息 record_info = { "id": record.id, "def_id": record.def_id, "status": record.status, "def_label": record.def_label, "created_on": record.created_on.isoformat() if record.created_on else None, "ended_on": record.ended_on.isoformat() if record.ended_on else None, } else: logger.info(f"未找到任务记录ID: {task_record_id}") except Exception as db_error: logger.error(f"数据库查询错误: {str(db_error)}") # 使用原生SQL作为备选 # 设置上下文变量 context.set_block_output(block.get("name"), {"taskRecordIdIsExist": exists}) # 记录任务信息(如果找到) if record_info: context.set_variable("taskRecordInfo", record_info) context.set_block_output(block.get("name"), {"taskRecordInfo": record_info}) exec_result = { "success": True, "message": f"任务记录ID {'存在' if exists else '不存在'}", "output": { "exists": exists, "taskRecordId": task_record_id, "recordInfo": record_info } } # 记录执行结果 await self._record_task_log(block, exec_result, context) return exec_result except Exception as e: logger.error(f"检查任务记录ID是否存在失败: {str(e)}") result = { "success": False, "message": f"检查任务记录ID是否存在失败: {str(e)}" } # 记录异常 await self._record_task_log(block, result, context) return result # 释放资源处理器 @register_handler(FoundationBlockName.RELEASE_RESOURCE) class ReleaseResourceBlockHandler(BlockHandler): """释放资源处理器""" async def execute( self, block: Dict[str, Any], input_params: Dict[str, Any], context: TaskContext ) -> Dict[str, Any]: """释放资源""" from sqlalchemy import select from data.models import VWEDTaskRecord from data.session import get_async_session task_record_id = context.task_record_id query = select(VWEDTaskRecord).where(VWEDTaskRecord.id == task_record_id) async with get_async_session() as session: try: result = await session.execute(query) record = result.scalar_one() if record: record.allow_restart_same_location = True await session.commit() logger.info(f"任务记录ID: {task_record_id} 释放资源成功") else: logger.error(f"未找到任务记录ID: {task_record_id}") except Exception as e: logger.error(f"释放资源失败: {str(e)}") result = { "success": False, "message": f"释放资源失败: {str(e)}" } await self._record_task_log(block, result, context) return result result = { "success": True, "message": "释放资源成功", "output": {} } await self._record_task_log(block, result, context) return result # 创建唯一ID处理器 @register_handler(FoundationBlockName.CREATE_UUID) class CreateUuidBlockHandler(BlockHandler): """创建唯一ID处理器""" async def execute( self, block: Dict[str, Any], input_params: Dict[str, Any], context: TaskContext ) -> Dict[str, Any]: """创建唯一ID""" import uuid try: # 生成UUID generated_uuid = str(uuid.uuid4()) # 设置上下文变量 result = { "success": True, "message": "创建唯一ID成功", "output": { "createUuid": generated_uuid, } } context.set_block_output(block.get("name"), {"createUuid": generated_uuid}) # 记录执行结果 await self._record_task_log(block, result, context) return result except Exception as e: result = { "success": False, "message": f"创建唯一ID失败: {str(e)}" } # 记录异常 await self._record_task_log(block, result, context) return result # 打印块处理器 @register_handler(FoundationBlockName.PRINT) class PrintBlockHandler(BlockHandler): """打印块处理器""" async def execute( self, block: Dict[str, Any], input_params: Dict[str, Any], context: TaskContext ) -> Dict[str, Any]: """执行打印块""" try: # 获取消息参数 message = input_params.get("message", "") # 记录日志 logger.info(f"任务 {context.task_record_id} 打印: {message}") # 返回结果 result = { "success": True, "message": "打印成功", "output": { "message": message } } # 记录执行结果 await self._record_task_log(block, result, context) return result except Exception as e: result = { "success": False, "message": f"打印失败: {str(e)}" } # 记录异常 await self._record_task_log(block, result, context) return result # 当前时间戳处理器 @register_handler(FoundationBlockName.CURRENT_TIME_STAMP) class CurrentTimeStampBlockHandler(BlockHandler): """当前时间戳处理器""" async def execute( self, block: Dict[str, Any], input_params: Dict[str, Any], context: TaskContext ) -> Dict[str, Any]: """获取当前时间戳""" import time try: # 获取当前时间戳(毫秒) current_timestamp = int(time.time() * 1000) # 设置上下文变量 context.set_block_output(block.get("name"), {"currentTimeStamp": current_timestamp}) result = { "success": True, "message": "获取当前时间戳成功", "output": { "timestamp": current_timestamp } } # 记录执行结果 await self._record_task_log(block, result, context) return result except Exception as e: result = { "success": False, "message": f"获取当前时间戳失败: {str(e)}" } # 记录异常 await self._record_task_log(block, result, context) return result @register_handler(FoundationBlockName.JDBC_EXECUTE) class JdbcExecuteBlockHandler(BlockHandler): """执行SQL处理器""" async def execute( self, block: Dict[str, Any], input_params: Dict[str, Any], context: TaskContext ) -> Dict[str, Any]: """执行SQL语句""" from sqlalchemy import text from data.session import get_async_session try: # 获取SQL语句 sql = input_params.get("sql") if not sql: result = { "success": False, "message": "缺少SQL语句" } # 记录执行结果 await self._record_task_log(block, result, context) return result logger.info(f"执行SQL语句: {sql}") # 执行SQL async with get_async_session() as session: try: result = await session.execute(text(sql)) await session.commit() # 获取受影响的行数 row_count = result.rowcount if hasattr(result, 'rowcount') else 0 logger.info(f"SQL执行成功,影响了 {row_count} 行") exec_result = { "success": True, "message": f"SQL执行成功,影响了 {row_count} 行", "output": { "rowCount": row_count } } except Exception as db_error: logger.error(f"SQL执行错误: {str(db_error)}") exec_result = { "success": False, "message": f"SQL执行失败: {str(db_error)}" } # 记录执行结果 await self._record_task_log(block, exec_result, context) return exec_result except Exception as e: logger.error(f"SQL执行失败: {str(e)}") result = { "success": False, "message": f"SQL执行失败: {str(e)}" } # 记录异常 await self._record_task_log(block, result, context) return result # 查询SQL处理器 @register_handler(FoundationBlockName.JDBC_QUERY) class JdbcQueryBlockHandler(BlockHandler): """查询SQL处理器""" async def execute( self, block: Dict[str, Any], input_params: Dict[str, Any], context: TaskContext ) -> Dict[str, Any]: """执行SQL查询""" from sqlalchemy import text from data.session import get_async_session import json try: # 获取SQL语句 sql = input_params.get("sql") if not sql: result = { "success": False, "message": "缺少SQL语句" } # 记录执行结果 await self._record_task_log(block, result, context) return result logger.info(f"执行SQL查询: {sql}") # 执行SQL查询 async with get_async_session() as session: try: result = await session.execute(text(sql)) # 处理结果 if result.returns_rows: # 获取列名 columns = result.keys() # 转换为字典列表 rows = [dict(zip(columns, row)) for row in result.fetchall()] # 转换为JSON字符串 result_json = json.dumps(rows, ensure_ascii=False, default=str) # 设置上下文变量 context.set_variable("resultSet", result_json) context.set_block_output(block.get("name"), {"resultSet": rows}) logger.info(f"SQL查询成功,返回 {len(rows)} 条记录") exec_result = { "success": True, "message": f"SQL查询成功,返回 {len(rows)} 条记录", "output": { "rowCount": len(rows), "resultSet": rows, "resultJson": result_json } } else: # 非查询语句,没有返回行 context.set_variable("resultSet", "[]") context.set_block_output(block.get("name"), {"resultSet": []}) logger.info("SQL执行成功,没有返回数据") exec_result = { "success": True, "message": "SQL执行成功,没有返回数据", "output": { "rowCount": 0, "resultSet": [], "resultJson": "[]" } } except Exception as db_error: logger.error(f"SQL查询错误: {str(db_error)}") exec_result = { "success": False, "message": f"SQL查询失败: {str(db_error)}" } # 记录执行结果 await self._record_task_log(block, exec_result, context) return exec_result except Exception as e: logger.error(f"SQL查询失败: {str(e)}") result = { "success": False, "message": f"SQL查询失败: {str(e)}" } # 记录异常 await self._record_task_log(block, result, context) return result # 当前时间处理器 @register_handler(FoundationBlockName.TIMESTAMP) class TimestampBlockHandler(BlockHandler): """当前时间处理器""" async def execute( self, block: Dict[str, Any], input_params: Dict[str, Any], context: TaskContext ) -> Dict[str, Any]: """获取当前时间""" from datetime import datetime try: # 获取当前时间 current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 设置上下文变量 context.set_block_output(block.get("name"), {"timestamp": current_time}) result = { "success": True, "message": "获取当前时间成功", "output": { "timestamp": current_time } } # 记录执行结果 await self._record_task_log(block, result, context) return result except Exception as e: result = { "success": False, "message": f"获取当前时间失败: {str(e)}" } # 记录异常 await self._record_task_log(block, result, context) return result # 字符串转JSON对象处理器 @register_handler(FoundationBlockName.STRING_TO_JSON_OBJECT) class StringToJsonObjectBlockHandler(BlockHandler): """字符串转JSON对象处理器""" async def execute( self, block: Dict[str, Any], input_params: Dict[str, Any], context: TaskContext ) -> Dict[str, Any]: """将字符串转换为JSON对象""" import json try: # 获取输入参数 convert_string = input_params.get("convertString", "") if not convert_string: result = { "success": False, "message": "转换字符串为空" } # 记录执行结果 await self._record_task_log(block, result, context) return result # 转换为JSON对象 json_object = json.loads(convert_string) # 设置输出参数 output = { "convertObject": json_object } context.set_block_output(block.get("name"), {"convertObject": json_object}) result = { "success": True, "message": "字符串转换为JSON对象成功", "output": output } # 记录执行结果 await self._record_task_log(block, result, context) return result except json.JSONDecodeError as e: result = { "success": False, "message": f"字符串解析为JSON对象失败: {str(e)}" } # 记录异常 await self._record_task_log(block, result, context) return result except Exception as e: result = { "success": False, "message": f"字符串转换为JSON对象失败: {str(e)}" } # 记录异常 await self._record_task_log(block, result, context) return result # 字符串转JSON数组处理器 @register_handler(FoundationBlockName.STRING_TO_JSON_ARRAY) class StringToJsonArrayBlockHandler(BlockHandler): """字符串转JSON数组处理器""" async def execute( self, block: Dict[str, Any], input_params: Dict[str, Any], context: TaskContext ) -> Dict[str, Any]: """将字符串转换为JSON数组""" import json try: # 获取输入参数 convert_string = input_params.get("convertString", "") if not convert_string: result = { "success": False, "message": "转换字符串为空" } # 记录执行结果 await self._record_task_log(block, result, context) return result # 处理可能被额外引号包裹的情况 try: # 首先尝试直接解析 json_array = json.loads(convert_string) except Exception as e: # 如果失败,尝试去除可能的外层引号 json_array = convert_string # 设置上下文输出 context.set_block_output(block.get("name"), {"convertArray": json_array}) # 验证是否为数组 if not isinstance(json_array, list): result = { "success": False, "message": "转换结果不是JSON数组" } # 记录执行结果 await self._record_task_log(block, result, context) return result # 设置输出参数 output = { "convertArray": json_array } result = { "success": True, "message": "字符串转换为JSON数组成功", "output": output } # 记录执行结果 await self._record_task_log(block, result, context) return result except json.JSONDecodeError as e: result = { "success": False, "message": f"字符串解析为JSON数组失败: {str(e)}" } # 记录异常 await self._record_task_log(block, result, context) return result except Exception as e: result = { "success": False, "message": f"字符串转换为JSON数组失败: {str(e)}" } # 记录异常 await self._record_task_log(block, result, context) return result