431 lines
15 KiB
Python
Raw Permalink Normal View History

2025-04-30 16:57:46 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
任务块处理器模块
实现任务相关的各种块处理功能
"""
import json
import logging
import uuid
from typing import Dict, Any, Optional
from datetime import datetime
from services.execution.task_context import TaskContext
from .base import BlockHandler, register_handler
from utils.logger import get_logger
from .model.block_name import TaskBlockName
# 获取日志记录器
logger = get_logger("services.execution.handlers.task")
@register_handler(TaskBlockName.CACHE_DATA)
class CacheDataBlockHandler(BlockHandler):
"""缓存数据块处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: Any # TaskContext类型
) -> Dict[str, Any]:
"""执行缓存数据块"""
try:
# 获取参数
key = input_params.get("key")
value = input_params.get("value")
if not key:
return {"success": False, "message": "缓存key不能为空"}
# 将数据缓存到数据库中
await self._save_cache_data(key, value)
result = {
"success": True,
"message": f"数据缓存成功: {key}"
}
# 记录执行结果
await self._record_task_log(block, result, context)
return result
except Exception as e:
# 处理异常
result = {
"success": False,
"message": f"缓存数据块执行异常: {str(e)}"
}
# 记录异常
await self._record_task_log(block, result, context)
return result
async def _save_cache_data(self, key: str, value: Any) -> None:
"""将数据保存到缓存表中"""
from sqlalchemy import select, insert, update
from data.models.datacachesplit import VWEDDataCacheSplit
from data.session import get_async_session
async with get_async_session() as session:
# 查询是否已存在相同key的记录且未被删除
query = select(VWEDDataCacheSplit).where(
VWEDDataCacheSplit.data_key == key,
VWEDDataCacheSplit.is_deleted == 0
)
result = await session.execute(query)
existing_active_record = result.scalar_one_or_none()
# 如果有未删除的记录,则更新它
if existing_active_record:
stmt = update(VWEDDataCacheSplit).where(
VWEDDataCacheSplit.id == existing_active_record.id
).values(
data_value=json.dumps(value) if not isinstance(value, str) else value,
)
await session.execute(stmt)
else:
# 如果没有未删除的记录,创建新记录
stmt = insert(VWEDDataCacheSplit).values(
id=str(uuid.uuid4()),
data_key=key,
data_value=json.dumps(value) if not isinstance(value, str) else value,
is_deleted=0
)
await session.execute(stmt)
await session.commit()
@register_handler(TaskBlockName.CLEAR_CACHE_DATA)
class ClearCacheDataBlockHandler(BlockHandler):
"""清除缓存数据块处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: Any # TaskContext类型
) -> Dict[str, Any]:
"""执行清除缓存数据块"""
try:
# 获取参数
key = input_params.get("key")
if not key:
return {"success": False, "message": "缓存key不能为空"}
# 从数据库中软删除缓存数据
found_and_deleted = await self._soft_delete_cache_data(key)
result = {
"success": True,
"message": f"缓存数据清除成功: {key}" if found_and_deleted else f"缓存数据不存在: {key}"
}
# 记录执行结果
await self._record_task_log(block, result, context)
return result
except Exception as e:
# 处理异常
result = {
"success": False,
"message": f"清除缓存数据块执行异常: {str(e)}"
}
# 记录异常
await self._record_task_log(block, result, context)
return result
async def _soft_delete_cache_data(self, key: str) -> bool:
"""软删除缓存表中的数据设置is_deleted=1
Returns:
bool: 是否找到并删除了缓存数据
"""
from sqlalchemy import update, select
from data.models.datacachesplit import VWEDDataCacheSplit
from data.session import get_async_session
async with get_async_session() as session:
# 首先检查是否存在该条记录
query = select(VWEDDataCacheSplit).where(
VWEDDataCacheSplit.data_key == key,
VWEDDataCacheSplit.is_deleted == 0
)
result = await session.execute(query)
exists = result.scalar_one_or_none() is not None
if not exists:
return False
# 软删除具有指定键的记录
stmt = update(VWEDDataCacheSplit).where(
VWEDDataCacheSplit.data_key == key,
VWEDDataCacheSplit.is_deleted == 0
).values(
is_deleted=1,
)
result = await session.execute(stmt)
await session.commit()
return result.rowcount > 0
@register_handler(TaskBlockName.GET_CACHE_DATA)
class GetCacheDataBlockHandler(BlockHandler):
"""获取缓存数据块处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: Any # TaskContext类型
) -> Dict[str, Any]:
"""执行获取缓存数据块"""
try:
# 获取参数
key = input_params.get("key")
if not key:
return {"success": False, "message": "缓存key不能为空"}
# 从数据库中获取缓存数据
value = await self._get_cache_data(key)
# 将获取的值设置到上下文变量中
context.set_block_output(block.get("name"), {"cache_data": value})
result = {
"success": True,
"message": f"获取缓存数据成功: {key}",
2025-05-12 15:43:21 +08:00
"output": {"value": value}
2025-04-30 16:57:46 +08:00
}
# 记录执行结果
await self._record_task_log(block, result, context)
return result
except Exception as e:
# 处理异常
result = {
"success": False,
"message": f"获取缓存数据块执行异常: {str(e)}"
}
# 记录异常
await self._record_task_log(block, result, context)
return result
async def _get_cache_data(self, key: str) -> Optional[Any]:
"""从缓存表中获取数据(只获取未删除的数据)"""
from sqlalchemy import select
from data.models.datacachesplit import VWEDDataCacheSplit
from data.session import get_async_session
async with get_async_session() as session:
# 查询具有指定键且未删除的记录
query = select(VWEDDataCacheSplit).where(
VWEDDataCacheSplit.data_key == key,
VWEDDataCacheSplit.is_deleted == 0
)
result = await session.execute(query)
record = result.scalar_one_or_none()
if not record or not record.data_value:
return None
# 尝试将值解析为JSON如果失败则返回原始字符串
try:
return json.loads(record.data_value)
except (json.JSONDecodeError, TypeError):
return record.data_value
@register_handler(TaskBlockName.SET_TASK_VARIABLE)
class SetTaskVariableBlockHandler(BlockHandler):
"""设置任务变量块处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: Any # TaskContext类型
) -> Dict[str, Any]:
"""执行设置任务变量块"""
try:
# 获取参数
var_name = input_params.get("varName")
var_value = input_params.get("varValue")
if not var_name:
return {"success": False, "message": "变量名不能为空"}
# 设置任务变量
context.set_variable(var_name, var_value)
# 更新数据库中的变量字段
await self._update_task_variables(context)
result = {
"success": True,
"message": f"设置任务变量成功: {var_name}"
}
# 记录执行结果
await self._record_task_log(block, result, context)
return result
except Exception as e:
# 处理异常
result = {
"success": False,
"message": f"设置任务变量块执行异常: {str(e)}"
}
# 记录异常
await self._record_task_log(block, result, context)
return result
async def _update_task_variables(self, context: Any) -> None:
"""
将任务变量同步更新到数据库
Args:
context: 任务上下文
"""
try:
# 检查是否需要同步变量
if not context.need_sync_variables():
return
from sqlalchemy import update
from data.models.taskrecord import VWEDTaskRecord
from data.session import get_async_session
import json
# 获取任务记录ID
task_record_id = context.task_record_id
if not task_record_id:
return
# 将任务变量转换为JSON字符串
variables_json = json.dumps(context.variables, ensure_ascii=False)
# 更新数据库记录
async with get_async_session() as session:
stmt = update(VWEDTaskRecord).where(
VWEDTaskRecord.id == task_record_id
).values(
variables=variables_json
)
await session.execute(stmt)
await session.commit()
# 标记变量已同步
context.mark_variables_synced()
except Exception as e:
# 记录异常但不抛出,避免影响主流程
logger.error(f"更新任务变量到数据库失败: {str(e)}")
@register_handler(TaskBlockName.SKIP_TO_COMPONENT)
class SkipToComponentBlockHandler(BlockHandler):
"""跳到某个块处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: Any # TaskContext类型
) -> Dict[str, Any]:
"""执行跳到某个块"""
try:
# 获取参数
skip_component_id = input_params.get("skipComponentId")
if not skip_component_id:
return {"success": False, "message": "跳转目标块标识不能为空"}
# 设置跳转标记
context.set_skip_to(skip_component_id)
# 结果信息
result = {
"success": True,
"message": f"设置跳转成功,将跳转到: {skip_component_id}",
"output": {
"skipTo": skip_component_id,
"executed": True,
"skipped": True
}
}
# 记录执行结果
await self._record_task_log(block, result, context)
# 将结果输出到上下文中,方便其他块获取
context.set_block_output(block.get("id", block.get("name", "unknown")), result.get("output"))
return result
except Exception as e:
# 处理异常
result = {
"success": False,
"message": f"跳转块执行异常: {str(e)}"
}
# 记录异常
await self._record_task_log(block, result, context)
return result
@register_handler(TaskBlockName.TASK_STATE)
class TaskStateBlockHandler(BlockHandler):
"""设置任务状态块处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: Any # TaskContext类型
) -> Dict[str, Any]:
"""执行设置任务状态块"""
try:
# 获取参数
state_msg = input_params.get("stateMsg")
if not state_msg:
return {"success": False, "message": "状态描述不能为空"}
# 更新任务状态
await self._update_task_state(context.task_record_id, state_msg)
result = {
"success": True,
"message": f"任务状态更新成功: {state_msg}"
}
# 记录执行结果
await self._record_task_log(block, result, context)
return result
except Exception as e:
# 处理异常
result = {
"success": False,
"message": f"设置任务状态块执行异常: {str(e)}"
}
# 记录异常
await self._record_task_log(block, result, context)
return result
async def _update_task_state(self, task_record_id: str, state_msg: str) -> None:
"""更新任务状态"""
from sqlalchemy import update
from data.models.taskrecord import VWEDTaskRecord
from data.session import get_async_session
async with get_async_session() as session:
# 更新任务记录状态
stmt = update(VWEDTaskRecord).where(
VWEDTaskRecord.id == task_record_id
).values(
state_description=state_msg,
updated_at=datetime.now()
)
await session.execute(stmt)
await session.commit()