""" 子任务组件类 提供调用任务相关逻辑组件工具 """ import json import logging import uuid import asyncio from typing import Dict, Any, List, Optional from datetime import datetime from services.execution.task_context import TaskContext from .base import BlockHandler, register_handler from data.enum.task_record_enum import TaskStatus from data.enum.task_def_enum import EnableStatus, PeriodicTaskStatus, TaskStatusEnum from utils.logger import get_logger from .model.block_name import SubTaskBlockName logger = get_logger("services.execution.handlers.sub_task") # 提取公共的任务提交函数 async def submit_subtask( subtask_id: str, params_list: List[Dict[str, Any]], parent_task_id: str, root_task_id: str, source_info: Dict[str, Any], specified_task_record_id: str = None, is_async: bool = False ) -> Dict[str, Any]: """ 提交子任务到调度器 Args: subtask_id: 子任务定义ID params_list: 子任务参数列表 parent_task_id: 父任务ID root_task_id: 根任务ID source_info: 来源信息 specified_task_record_id: 指定的任务记录ID is_async: 是否异步执行 Returns: Dict[str, Any]: 提交结果 """ from sqlalchemy import select, update from data.models.taskdef import VWEDTaskDef from data.session import get_async_session from services.enhanced_scheduler import scheduler try: task_record_id = None # 查询子任务定义 async with get_async_session() as session: task_def_result = await session.execute( select(VWEDTaskDef).where(VWEDTaskDef.id == subtask_id) ) task_def = task_def_result.scalars().first() if not task_def: return { "success": False, "message": f"找不到子任务定义: {subtask_id}" } # 检查是否为定时任务,如果是定时任务则确保启用 if task_def.periodic_task == PeriodicTaskStatus.PERIODIC and task_def.if_enable != EnableStatus.ENABLED: logger.info(f"子任务 {subtask_id} 是定时任务但未启用,正在启用...") # 更新任务定义状态为启用 await session.execute( update(VWEDTaskDef) .where(VWEDTaskDef.id == subtask_id) .values(if_enable=EnableStatus.ENABLED, status=TaskStatusEnum.RUNNING) ) await session.commit() # 通知调度器更新定时任务状态 update_result = await scheduler.update_periodic_task(subtask_id, True) if not update_result.get("success", False): logger.warning(f"启用子任务定时任务失败: {update_result.get('message', '未知错误')}") # 处理指定任务ID的情况 if specified_task_record_id: create_result = await create_task_record( specified_task_record_id, subtask_id, task_def, params_list, parent_task_id, root_task_id, source_info ) if not create_result.get("success", False): return create_result # 使用调度器提交任务 task_record_id = specified_task_record_id submit_result = await scheduler.submit_task(task_record_id) else: # 让调度器创建并执行任务 run_result = await scheduler.run_task( task_def_id=subtask_id, params=params_list, parent_task_id=parent_task_id, root_task_id=root_task_id, source_type=source_info.get("source_type"), source_system=source_info.get("source_system"), source_device=source_info.get("source_device"), source_ip=source_info.get("source_ip"), source_time=datetime.now(), source_client_info=source_info.get("source_client_info") ) if not run_result.get("success", False): return { "success": False, "message": f"调度器创建子任务失败: {run_result.get('message', '未知错误')}" } task_record_id = run_result.get("taskRecordId") submit_result = run_result.get("queueResult", {}) # 构造返回结果 return { "success": True, "message": f"子任务已提交到调度器: {task_record_id}", "taskRecordId": task_record_id, "submitResult": submit_result, "isAsync": is_async } except Exception as e: logger.error(f"提交子任务异常: {str(e)}") return { "success": False, "message": f"提交子任务异常: {str(e)}" } async def create_task_record( task_record_id: str, subtask_id: str, task_def: Any, params_list: List[Dict[str, Any]], parent_task_id: str, root_task_id: str, source_info: Dict[str, Any] ) -> Dict[str, Any]: """ 创建任务记录 Args: task_record_id: 任务记录ID subtask_id: 子任务ID task_def: 任务定义对象 params_list: 参数列表 parent_task_id: 父任务ID root_task_id: 根任务ID source_info: 来源信息 Returns: Dict[str, Any]: 创建结果 """ from data.models.taskrecord import VWEDTaskRecord from data.session import get_async_session try: # 检查指定的任务记录是否已存在 async with get_async_session() as session: from sqlalchemy import select existing_check = await session.execute( select(VWEDTaskRecord).where(VWEDTaskRecord.id == task_record_id) ) if existing_check.scalars().first(): return { "success": False, "message": f"指定的任务记录ID已存在: {task_record_id}" } # 创建任务记录 task_record = VWEDTaskRecord( id=task_record_id, def_id=subtask_id, def_label=task_def.label, def_version=task_def.version, parent_task_record_id=parent_task_id, root_task_record_id=root_task_id, status=TaskStatus.QUEUED, # 队列中 created_on=datetime.now(), input_params=json.dumps(params_list, ensure_ascii=False) if params_list else None, periodic_task=task_def.periodic_task or PeriodicTaskStatus.NON_PERIODIC, task_def_detail=task_def.detail, source_type=source_info.get("source_type"), source_system=source_info.get("source_system"), source_device=source_info.get("source_device"), source_ip=source_info.get("source_ip"), source_client_info=source_info.get("source_client_info"), source_time=datetime.now() ) session.add(task_record) await session.commit() return { "success": True, "message": f"任务记录创建成功: {task_record_id}", "taskRecordId": task_record_id } except Exception as e: logger.error(f"创建任务记录异常: {str(e)}") return { "success": False, "message": f"创建任务记录异常: {str(e)}" } async def wait_for_task_completion(task_record_id: str, max_wait_time: int = 3600, wait_interval: int = 3) -> Dict[str, Any]: """ 等待任务完成 Args: task_record_id: 任务记录ID max_wait_time: 最大等待时间(秒) wait_interval: 检查间隔(秒) Returns: Dict[str, Any]: 任务完成结果 """ from services.enhanced_scheduler import scheduler from data.models.taskrecord import VWEDTaskRecord from data.session import get_async_session try: total_waited = 0 # 等待任务完成 while total_waited < max_wait_time: # 检查任务状态 status_result = await scheduler.get_task_status(task_record_id) if not status_result.get("success", False): return { "success": False, "message": f"获取子任务状态失败: {status_result.get('message', '未知错误')}", "taskRecordId": task_record_id } task_status = status_result.get("data", {}).get("status") # 如果任务已完成或失败或取消,则退出等待 if task_status in [TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELED]: # 完成、失败、取消 break # 等待一段时间 await asyncio.sleep(wait_interval) total_waited += wait_interval # 获取最终任务状态 final_status = await scheduler.get_task_status(task_record_id) task_data = final_status.get("data", {}) task_status = task_data.get("status") if task_status in [TaskStatus.COMPLETED, TaskStatus.CANCELED]: # 完成或取消 success = True message = "子任务执行成功" else: success = False message = f"子任务执行失败: {task_data.get('endedReason', '未知原因')}" # # 从数据库获取任务执行结果 # async with get_async_session() as session: # from sqlalchemy import select # result = await session.execute( # select(VWEDTaskRecord).where(VWEDTaskRecord.id == task_record_id) # ) # record = result.scalars().first() # if record and record.result: # try: # output = json.loads(record.result) # except Exception: # output = {} # else: # output = {} output = {"success": True, "message": message, "taskRecordId": task_record_id, "output": {}} return { "success": success, "message": message, "taskStatus": task_status, "taskRecordId": task_record_id, "output": output } except Exception as e: logger.error(f"等待任务完成异常: {str(e)}") return { "success": False, "message": f"等待任务完成异常: {str(e)}", "taskRecordId": task_record_id } @register_handler(SubTaskBlockName.SUB_TASK) class SubTaskBlockHandler(BlockHandler): """子任务块处理器""" async def execute( self, block: Dict[str, Any], input_params: Dict[str, Any], context: TaskContext ) -> Dict[str, Any]: """执行子任务块""" try: # 获取子任务ID subtask_id = block.get("refTaskDefId") if not subtask_id: result = { "success": False, "message": "缺少子任务ID" } await self._record_task_log(block, result, context) return result # 获取执行参数(确保默认值) is_async = input_params.get("ifAsync", False) # 是否异步执行,默认False specified_task_record_id = input_params.get("taskRecordId", None) # 指定的任务记录ID,默认None # 从input_params中提取出子任务需要的参数 # 排除控制参数 control_params = ["ifAsync", "taskRecordId"] subtask_params = {} for key, value in input_params.items(): if key not in control_params: subtask_params[key] = value # 将字典转换为列表形式的参数,与task_edit_service保持一致 params_list = [] for key, value in subtask_params.items(): params_list.append({ "name": key, "type": "字符串", # 默认类型,后续可根据实际类型修改 "label": key, "required": False, "defaultValue": str(value) if value is not None else "" }) logger.info(f"开始执行子任务: {subtask_id}, 异步执行: {is_async}, 指定ID: {specified_task_record_id}") # 获取根任务记录ID - 如果当前任务有父任务,则根任务是父任务的根任务,否则是当前任务 root_task_record_id = context.get_variable("rootTaskRecordId", context.task_record_id) # 获取父任务的来源信息 source_info = await self._get_parent_source_info(context.task_record_id) if not source_info: result = { "success": False, "message": f"获取父任务来源信息失败: {context.task_record_id}" } await self._record_task_log(block, result, context) return result # 提交子任务到调度器 submit_result = await submit_subtask( subtask_id=subtask_id, params_list=params_list, parent_task_id=context.task_record_id, root_task_id=root_task_record_id, source_info=source_info, specified_task_record_id=specified_task_record_id, is_async=is_async ) if not submit_result.get("success", False): result = { "success": False, "message": submit_result.get("message", "提交子任务失败") } await self._record_task_log(block, result, context) return result task_record_id = submit_result.get("taskRecordId") # 根据是否异步执行采取不同的处理方式 if is_async: # 异步执行:不等待任务完成 result = { "success": True, "message": f"子任务已异步提交到调度器: {task_record_id}", "output": { "subtaskId": subtask_id, "taskRecordId": task_record_id, "async": True, "queueStatus": submit_result.get("submitResult", {}) } } await self._record_task_log(block, result, context) return result else: # 同步执行:等待任务执行完成 completion_result = await wait_for_task_completion(task_record_id) success = completion_result.get("success", False) message = completion_result.get("message", "子任务执行完成") output = completion_result.get("output", {}) task_status = completion_result.get("taskStatus") # 将子任务输出合并到当前上下文变量 if isinstance(output, dict): for key, value in output.items(): variable_name = f"subtask_{key}" context.set_variable(variable_name, value) # 记录子任务结果到上下文变量 context.set_variable("subtaskResult", { "success": success, "message": message, "output": output, "taskStatus": task_status }) # 执行完成后记录日志 result = { "success": success, "message": message, "output": { "subtaskId": subtask_id, "taskRecordId": task_record_id, "async": False, "output": output, "taskStatus": task_status } } await self._record_task_log(block, result, context) return result except Exception as e: result = { "success": False, "message": f"子任务执行异常: {str(e)}" } await self._record_task_log(block, result, context) return result async def _get_parent_source_info(self, task_record_id: str) -> Optional[Dict[str, Any]]: """ 获取父任务的来源信息 Args: task_record_id: 父任务ID Returns: Optional[Dict[str, Any]]: 来源信息 """ from sqlalchemy import select from data.models.taskrecord import VWEDTaskRecord from data.session import get_async_session try: async with get_async_session() as session: result = await session.execute( select(VWEDTaskRecord).where(VWEDTaskRecord.id == task_record_id) ) parent_record = result.scalars().first() if not parent_record: logger.error(f"找不到父任务记录: {task_record_id}") return None # 获取父任务的来源信息 return { "source_type": parent_record.source_type, "source_system": parent_record.source_system, "source_device": parent_record.source_device, "source_ip": parent_record.source_ip, "source_client_info": parent_record.source_client_info } except Exception as e: logger.error(f"获取父任务来源信息异常: {str(e)}") return None