478 lines
18 KiB
Python
Raw Permalink Normal View History

2025-04-30 16:57:46 +08:00
"""
子任务组件类
提供调用任务相关逻辑组件工具
"""
import json
import logging
import uuid
import asyncio
from typing import Dict, Any, List, Optional
from datetime import datetime
from services.execution.task_context import TaskContext
from .base import BlockHandler, register_handler
from data.enum.task_record_enum import TaskStatus
from data.enum.task_def_enum import EnableStatus, PeriodicTaskStatus, TaskStatusEnum
from utils.logger import get_logger
from .model.block_name import SubTaskBlockName
logger = get_logger("services.execution.handlers.sub_task")
# 提取公共的任务提交函数
async def submit_subtask(
subtask_id: str,
params_list: List[Dict[str, Any]],
parent_task_id: str,
root_task_id: str,
source_info: Dict[str, Any],
specified_task_record_id: str = None,
is_async: bool = False
) -> Dict[str, Any]:
"""
提交子任务到调度器
Args:
subtask_id: 子任务定义ID
params_list: 子任务参数列表
parent_task_id: 父任务ID
root_task_id: 根任务ID
source_info: 来源信息
specified_task_record_id: 指定的任务记录ID
is_async: 是否异步执行
Returns:
Dict[str, Any]: 提交结果
"""
from sqlalchemy import select, update
from data.models.taskdef import VWEDTaskDef
from data.session import get_async_session
from services.enhanced_scheduler import scheduler
try:
task_record_id = None
# 查询子任务定义
async with get_async_session() as session:
task_def_result = await session.execute(
select(VWEDTaskDef).where(VWEDTaskDef.id == subtask_id)
)
task_def = task_def_result.scalars().first()
if not task_def:
return {
"success": False,
"message": f"找不到子任务定义: {subtask_id}"
}
# 检查是否为定时任务,如果是定时任务则确保启用
if task_def.periodic_task == PeriodicTaskStatus.PERIODIC and task_def.if_enable != EnableStatus.ENABLED:
logger.info(f"子任务 {subtask_id} 是定时任务但未启用,正在启用...")
# 更新任务定义状态为启用
await session.execute(
update(VWEDTaskDef)
.where(VWEDTaskDef.id == subtask_id)
.values(if_enable=EnableStatus.ENABLED, status=TaskStatusEnum.RUNNING)
)
await session.commit()
# 通知调度器更新定时任务状态
update_result = await scheduler.update_periodic_task(subtask_id, True)
if not update_result.get("success", False):
logger.warning(f"启用子任务定时任务失败: {update_result.get('message', '未知错误')}")
# 处理指定任务ID的情况
if specified_task_record_id:
create_result = await create_task_record(
specified_task_record_id,
subtask_id,
task_def,
params_list,
parent_task_id,
root_task_id,
source_info
)
if not create_result.get("success", False):
return create_result
# 使用调度器提交任务
task_record_id = specified_task_record_id
submit_result = await scheduler.submit_task(task_record_id)
else:
# 让调度器创建并执行任务
run_result = await scheduler.run_task(
task_def_id=subtask_id,
params=params_list,
parent_task_id=parent_task_id,
root_task_id=root_task_id,
source_type=source_info.get("source_type"),
source_system=source_info.get("source_system"),
source_device=source_info.get("source_device"),
source_ip=source_info.get("source_ip"),
source_time=datetime.now(),
source_client_info=source_info.get("source_client_info")
)
if not run_result.get("success", False):
return {
"success": False,
"message": f"调度器创建子任务失败: {run_result.get('message', '未知错误')}"
}
task_record_id = run_result.get("taskRecordId")
submit_result = run_result.get("queueResult", {})
# 构造返回结果
return {
"success": True,
"message": f"子任务已提交到调度器: {task_record_id}",
"taskRecordId": task_record_id,
"submitResult": submit_result,
"isAsync": is_async
}
except Exception as e:
logger.error(f"提交子任务异常: {str(e)}")
return {
"success": False,
"message": f"提交子任务异常: {str(e)}"
}
async def create_task_record(
task_record_id: str,
subtask_id: str,
task_def: Any,
params_list: List[Dict[str, Any]],
parent_task_id: str,
root_task_id: str,
source_info: Dict[str, Any]
) -> Dict[str, Any]:
"""
创建任务记录
Args:
task_record_id: 任务记录ID
subtask_id: 子任务ID
task_def: 任务定义对象
params_list: 参数列表
parent_task_id: 父任务ID
root_task_id: 根任务ID
source_info: 来源信息
Returns:
Dict[str, Any]: 创建结果
"""
from data.models.taskrecord import VWEDTaskRecord
from data.session import get_async_session
try:
# 检查指定的任务记录是否已存在
async with get_async_session() as session:
from sqlalchemy import select
existing_check = await session.execute(
select(VWEDTaskRecord).where(VWEDTaskRecord.id == task_record_id)
)
if existing_check.scalars().first():
return {
"success": False,
"message": f"指定的任务记录ID已存在: {task_record_id}"
}
# 创建任务记录
task_record = VWEDTaskRecord(
id=task_record_id,
def_id=subtask_id,
def_label=task_def.label,
def_version=task_def.version,
parent_task_record_id=parent_task_id,
root_task_record_id=root_task_id,
status=TaskStatus.QUEUED, # 队列中
created_on=datetime.now(),
input_params=json.dumps(params_list, ensure_ascii=False) if params_list else None,
periodic_task=task_def.periodic_task or PeriodicTaskStatus.NON_PERIODIC,
task_def_detail=task_def.detail,
source_type=source_info.get("source_type"),
source_system=source_info.get("source_system"),
source_device=source_info.get("source_device"),
source_ip=source_info.get("source_ip"),
source_client_info=source_info.get("source_client_info"),
source_time=datetime.now()
)
session.add(task_record)
await session.commit()
return {
"success": True,
"message": f"任务记录创建成功: {task_record_id}",
"taskRecordId": task_record_id
}
except Exception as e:
logger.error(f"创建任务记录异常: {str(e)}")
return {
"success": False,
"message": f"创建任务记录异常: {str(e)}"
}
async def wait_for_task_completion(task_record_id: str, max_wait_time: int = 3600, wait_interval: int = 3) -> Dict[str, Any]:
"""
等待任务完成
Args:
task_record_id: 任务记录ID
max_wait_time: 最大等待时间
wait_interval: 检查间隔
Returns:
Dict[str, Any]: 任务完成结果
"""
from services.enhanced_scheduler import scheduler
from data.models.taskrecord import VWEDTaskRecord
from data.session import get_async_session
try:
total_waited = 0
# 等待任务完成
while total_waited < max_wait_time:
# 检查任务状态
status_result = await scheduler.get_task_status(task_record_id)
if not status_result.get("success", False):
return {
"success": False,
"message": f"获取子任务状态失败: {status_result.get('message', '未知错误')}",
"taskRecordId": task_record_id
}
task_status = status_result.get("data", {}).get("status")
# 如果任务已完成或失败或取消,则退出等待
if task_status in [TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELED]: # 完成、失败、取消
break
# 等待一段时间
await asyncio.sleep(wait_interval)
total_waited += wait_interval
# 获取最终任务状态
final_status = await scheduler.get_task_status(task_record_id)
task_data = final_status.get("data", {})
task_status = task_data.get("status")
if task_status in [TaskStatus.COMPLETED, TaskStatus.CANCELED]: # 完成或取消
success = True
message = "子任务执行成功"
else:
success = False
message = f"子任务执行失败: {task_data.get('endedReason', '未知原因')}"
# # 从数据库获取任务执行结果
# async with get_async_session() as session:
# from sqlalchemy import select
# result = await session.execute(
# select(VWEDTaskRecord).where(VWEDTaskRecord.id == task_record_id)
# )
# record = result.scalars().first()
# if record and record.result:
# try:
# output = json.loads(record.result)
# except Exception:
# output = {}
# else:
# output = {}
output = {"success": True, "message": message, "taskRecordId": task_record_id, "output": {}}
return {
"success": success,
"message": message,
"taskStatus": task_status,
"taskRecordId": task_record_id,
"output": output
}
except Exception as e:
logger.error(f"等待任务完成异常: {str(e)}")
return {
"success": False,
"message": f"等待任务完成异常: {str(e)}",
"taskRecordId": task_record_id
}
@register_handler(SubTaskBlockName.SUB_TASK)
class SubTaskBlockHandler(BlockHandler):
"""子任务块处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: TaskContext
) -> Dict[str, Any]:
"""执行子任务块"""
try:
# 获取子任务ID
subtask_id = block.get("refTaskDefId")
if not subtask_id:
result = {
"success": False,
"message": "缺少子任务ID"
}
await self._record_task_log(block, result, context)
return result
# 获取执行参数(确保默认值)
is_async = input_params.get("ifAsync", False) # 是否异步执行默认False
specified_task_record_id = input_params.get("taskRecordId", None) # 指定的任务记录ID默认None
# 从input_params中提取出子任务需要的参数
# 排除控制参数
control_params = ["ifAsync", "taskRecordId"]
subtask_params = {}
for key, value in input_params.items():
if key not in control_params:
subtask_params[key] = value
# 将字典转换为列表形式的参数与task_edit_service保持一致
params_list = []
for key, value in subtask_params.items():
params_list.append({
"name": key,
"type": "字符串", # 默认类型,后续可根据实际类型修改
"label": key,
"required": False,
"defaultValue": str(value) if value is not None else ""
})
logger.info(f"开始执行子任务: {subtask_id}, 异步执行: {is_async}, 指定ID: {specified_task_record_id}")
# 获取根任务记录ID - 如果当前任务有父任务,则根任务是父任务的根任务,否则是当前任务
root_task_record_id = context.get_variable("rootTaskRecordId", context.task_record_id)
# 获取父任务的来源信息
source_info = await self._get_parent_source_info(context.task_record_id)
if not source_info:
result = {
"success": False,
"message": f"获取父任务来源信息失败: {context.task_record_id}"
}
await self._record_task_log(block, result, context)
return result
# 提交子任务到调度器
submit_result = await submit_subtask(
subtask_id=subtask_id,
params_list=params_list,
parent_task_id=context.task_record_id,
root_task_id=root_task_record_id,
source_info=source_info,
specified_task_record_id=specified_task_record_id,
is_async=is_async
)
if not submit_result.get("success", False):
result = {
"success": False,
"message": submit_result.get("message", "提交子任务失败")
}
await self._record_task_log(block, result, context)
return result
task_record_id = submit_result.get("taskRecordId")
# 根据是否异步执行采取不同的处理方式
if is_async:
# 异步执行:不等待任务完成
result = {
"success": True,
"message": f"子任务已异步提交到调度器: {task_record_id}",
"output": {
"subtaskId": subtask_id,
"taskRecordId": task_record_id,
"async": True,
"queueStatus": submit_result.get("submitResult", {})
}
}
await self._record_task_log(block, result, context)
return result
else:
# 同步执行:等待任务执行完成
completion_result = await wait_for_task_completion(task_record_id)
success = completion_result.get("success", False)
message = completion_result.get("message", "子任务执行完成")
output = completion_result.get("output", {})
task_status = completion_result.get("taskStatus")
# 将子任务输出合并到当前上下文变量
if isinstance(output, dict):
for key, value in output.items():
variable_name = f"subtask_{key}"
context.set_variable(variable_name, value)
# 记录子任务结果到上下文变量
context.set_variable("subtaskResult", {
"success": success,
"message": message,
"output": output,
"taskStatus": task_status
})
# 执行完成后记录日志
result = {
"success": success,
"message": message,
"output": {
"subtaskId": subtask_id,
"taskRecordId": task_record_id,
"async": False,
"output": output,
"taskStatus": task_status
}
}
await self._record_task_log(block, result, context)
return result
except Exception as e:
result = {
"success": False,
"message": f"子任务执行异常: {str(e)}"
}
await self._record_task_log(block, result, context)
return result
async def _get_parent_source_info(self, task_record_id: str) -> Optional[Dict[str, Any]]:
"""
获取父任务的来源信息
Args:
task_record_id: 父任务ID
Returns:
Optional[Dict[str, Any]]: 来源信息
"""
from sqlalchemy import select
from data.models.taskrecord import VWEDTaskRecord
from data.session import get_async_session
try:
async with get_async_session() as session:
result = await session.execute(
select(VWEDTaskRecord).where(VWEDTaskRecord.id == task_record_id)
)
parent_record = result.scalars().first()
if not parent_record:
logger.error(f"找不到父任务记录: {task_record_id}")
return None
# 获取父任务的来源信息
return {
"source_type": parent_record.source_type,
"source_system": parent_record.source_system,
"source_device": parent_record.source_device,
"source_ip": parent_record.source_ip,
"source_client_info": parent_record.source_client_info
}
except Exception as e:
logger.error(f"获取父任务来源信息异常: {str(e)}")
return None