#!/usr/bin/env python # -*- coding: utf-8 -*- """ 任务块处理器模块 实现任务相关的各种块处理功能 """ import json import logging import uuid from typing import Dict, Any, Optional from datetime import datetime from services.execution.task_context import TaskContext from .base import BlockHandler, register_handler from utils.logger import get_logger from .model.block_name import TaskBlockName # 获取日志记录器 logger = get_logger("services.execution.handlers.task") @register_handler(TaskBlockName.CACHE_DATA) class CacheDataBlockHandler(BlockHandler): """缓存数据块处理器""" async def execute( self, block: Dict[str, Any], input_params: Dict[str, Any], context: Any # TaskContext类型 ) -> Dict[str, Any]: """执行缓存数据块""" try: # 获取参数 key = input_params.get("key") value = input_params.get("value") if not key: return {"success": False, "message": "缓存key不能为空"} # 将数据缓存到数据库中 await self._save_cache_data(key, value) result = { "success": True, "message": f"数据缓存成功: {key}" } # 记录执行结果 await self._record_task_log(block, result, context) return result except Exception as e: # 处理异常 result = { "success": False, "message": f"缓存数据块执行异常: {str(e)}" } # 记录异常 await self._record_task_log(block, result, context) return result async def _save_cache_data(self, key: str, value: Any) -> None: """将数据保存到缓存表中""" from sqlalchemy import select, insert, update from data.models.datacachesplit import VWEDDataCacheSplit from data.session import get_async_session async with get_async_session() as session: # 查询是否已存在相同key的记录且未被删除 query = select(VWEDDataCacheSplit).where( VWEDDataCacheSplit.data_key == key, VWEDDataCacheSplit.is_deleted == 0 ) result = await session.execute(query) existing_active_record = result.scalar_one_or_none() # 如果有未删除的记录,则更新它 if existing_active_record: stmt = update(VWEDDataCacheSplit).where( VWEDDataCacheSplit.id == existing_active_record.id ).values( data_value=json.dumps(value) if not isinstance(value, str) else value, ) await session.execute(stmt) else: # 如果没有未删除的记录,创建新记录 stmt = insert(VWEDDataCacheSplit).values( id=str(uuid.uuid4()), data_key=key, data_value=json.dumps(value) if not isinstance(value, str) else value, is_deleted=0 ) await session.execute(stmt) await session.commit() @register_handler(TaskBlockName.CLEAR_CACHE_DATA) class ClearCacheDataBlockHandler(BlockHandler): """清除缓存数据块处理器""" async def execute( self, block: Dict[str, Any], input_params: Dict[str, Any], context: Any # TaskContext类型 ) -> Dict[str, Any]: """执行清除缓存数据块""" try: # 获取参数 key = input_params.get("key") if not key: return {"success": False, "message": "缓存key不能为空"} # 从数据库中软删除缓存数据 found_and_deleted = await self._soft_delete_cache_data(key) result = { "success": True, "message": f"缓存数据清除成功: {key}" if found_and_deleted else f"缓存数据不存在: {key}" } # 记录执行结果 await self._record_task_log(block, result, context) return result except Exception as e: # 处理异常 result = { "success": False, "message": f"清除缓存数据块执行异常: {str(e)}" } # 记录异常 await self._record_task_log(block, result, context) return result async def _soft_delete_cache_data(self, key: str) -> bool: """软删除缓存表中的数据(设置is_deleted=1) Returns: bool: 是否找到并删除了缓存数据 """ from sqlalchemy import update, select from data.models.datacachesplit import VWEDDataCacheSplit from data.session import get_async_session async with get_async_session() as session: # 首先检查是否存在该条记录 query = select(VWEDDataCacheSplit).where( VWEDDataCacheSplit.data_key == key, VWEDDataCacheSplit.is_deleted == 0 ) result = await session.execute(query) exists = result.scalar_one_or_none() is not None if not exists: return False # 软删除具有指定键的记录 stmt = update(VWEDDataCacheSplit).where( VWEDDataCacheSplit.data_key == key, VWEDDataCacheSplit.is_deleted == 0 ).values( is_deleted=1, ) result = await session.execute(stmt) await session.commit() return result.rowcount > 0 @register_handler(TaskBlockName.GET_CACHE_DATA) class GetCacheDataBlockHandler(BlockHandler): """获取缓存数据块处理器""" async def execute( self, block: Dict[str, Any], input_params: Dict[str, Any], context: Any # TaskContext类型 ) -> Dict[str, Any]: """执行获取缓存数据块""" try: # 获取参数 key = input_params.get("key") if not key: return {"success": False, "message": "缓存key不能为空"} # 从数据库中获取缓存数据 value = await self._get_cache_data(key) # 将获取的值设置到上下文变量中 context.set_block_output(block.get("name"), {"cache_data": value}) result = { "success": True, "message": f"获取缓存数据成功: {key}", "output": {"value": value} } # 记录执行结果 await self._record_task_log(block, result, context) return result except Exception as e: # 处理异常 result = { "success": False, "message": f"获取缓存数据块执行异常: {str(e)}" } # 记录异常 await self._record_task_log(block, result, context) return result async def _get_cache_data(self, key: str) -> Optional[Any]: """从缓存表中获取数据(只获取未删除的数据)""" from sqlalchemy import select from data.models.datacachesplit import VWEDDataCacheSplit from data.session import get_async_session async with get_async_session() as session: # 查询具有指定键且未删除的记录 query = select(VWEDDataCacheSplit).where( VWEDDataCacheSplit.data_key == key, VWEDDataCacheSplit.is_deleted == 0 ) result = await session.execute(query) record = result.scalar_one_or_none() if not record or not record.data_value: return None # 尝试将值解析为JSON,如果失败则返回原始字符串 try: return json.loads(record.data_value) except (json.JSONDecodeError, TypeError): return record.data_value @register_handler(TaskBlockName.SET_TASK_VARIABLE) class SetTaskVariableBlockHandler(BlockHandler): """设置任务变量块处理器""" async def execute( self, block: Dict[str, Any], input_params: Dict[str, Any], context: Any # TaskContext类型 ) -> Dict[str, Any]: """执行设置任务变量块""" try: # 获取参数 var_name = input_params.get("varName") var_value = input_params.get("varValue") if not var_name: return {"success": False, "message": "变量名不能为空"} # 设置任务变量 context.set_variable(var_name, var_value) # 更新数据库中的变量字段 await self._update_task_variables(context) result = { "success": True, "message": f"设置任务变量成功: {var_name}" } # 记录执行结果 await self._record_task_log(block, result, context) return result except Exception as e: # 处理异常 result = { "success": False, "message": f"设置任务变量块执行异常: {str(e)}" } # 记录异常 await self._record_task_log(block, result, context) return result async def _update_task_variables(self, context: Any) -> None: """ 将任务变量同步更新到数据库 Args: context: 任务上下文 """ try: # 检查是否需要同步变量 if not context.need_sync_variables(): return from sqlalchemy import update from data.models.taskrecord import VWEDTaskRecord from data.session import get_async_session import json # 获取任务记录ID task_record_id = context.task_record_id if not task_record_id: return # 将任务变量转换为JSON字符串 variables_json = json.dumps(context.variables, ensure_ascii=False) # 更新数据库记录 async with get_async_session() as session: stmt = update(VWEDTaskRecord).where( VWEDTaskRecord.id == task_record_id ).values( variables=variables_json ) await session.execute(stmt) await session.commit() # 标记变量已同步 context.mark_variables_synced() except Exception as e: # 记录异常但不抛出,避免影响主流程 logger.error(f"更新任务变量到数据库失败: {str(e)}") @register_handler(TaskBlockName.SKIP_TO_COMPONENT) class SkipToComponentBlockHandler(BlockHandler): """跳到某个块处理器""" async def execute( self, block: Dict[str, Any], input_params: Dict[str, Any], context: Any # TaskContext类型 ) -> Dict[str, Any]: """执行跳到某个块""" try: # 获取参数 skip_component_id = input_params.get("skipComponentId") if not skip_component_id: return {"success": False, "message": "跳转目标块标识不能为空"} # 设置跳转标记 context.set_skip_to(skip_component_id) # 结果信息 result = { "success": True, "message": f"设置跳转成功,将跳转到: {skip_component_id}", "output": { "skipTo": skip_component_id, "executed": True, "skipped": True } } # 记录执行结果 await self._record_task_log(block, result, context) # 将结果输出到上下文中,方便其他块获取 context.set_block_output(block.get("id", block.get("name", "unknown")), result.get("output")) return result except Exception as e: # 处理异常 result = { "success": False, "message": f"跳转块执行异常: {str(e)}" } # 记录异常 await self._record_task_log(block, result, context) return result @register_handler(TaskBlockName.TASK_STATE) class TaskStateBlockHandler(BlockHandler): """设置任务状态块处理器""" async def execute( self, block: Dict[str, Any], input_params: Dict[str, Any], context: Any # TaskContext类型 ) -> Dict[str, Any]: """执行设置任务状态块""" try: # 获取参数 state_msg = input_params.get("stateMsg") if not state_msg: return {"success": False, "message": "状态描述不能为空"} # 更新任务状态 await self._update_task_state(context.task_record_id, state_msg) result = { "success": True, "message": f"任务状态更新成功: {state_msg}" } # 记录执行结果 await self._record_task_log(block, result, context) return result except Exception as e: # 处理异常 result = { "success": False, "message": f"设置任务状态块执行异常: {str(e)}" } # 记录异常 await self._record_task_log(block, result, context) return result async def _update_task_state(self, task_record_id: str, state_msg: str) -> None: """更新任务状态""" from sqlalchemy import update from data.models.taskrecord import VWEDTaskRecord from data.session import get_async_session async with get_async_session() as session: # 更新任务记录状态 stmt = update(VWEDTaskRecord).where( VWEDTaskRecord.id == task_record_id ).values( state_description=state_msg, updated_at=datetime.now() ) await session.execute(stmt) await session.commit()