2025-04-30 16:57:46 +08:00

478 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
子任务组件类
提供调用任务相关逻辑组件工具
"""
import json
import logging
import uuid
import asyncio
from typing import Dict, Any, List, Optional
from datetime import datetime
from services.execution.task_context import TaskContext
from .base import BlockHandler, register_handler
from data.enum.task_record_enum import TaskStatus
from data.enum.task_def_enum import EnableStatus, PeriodicTaskStatus, TaskStatusEnum
from utils.logger import get_logger
from .model.block_name import SubTaskBlockName
logger = get_logger("services.execution.handlers.sub_task")
# 提取公共的任务提交函数
async def submit_subtask(
    subtask_id: str,
    params_list: List[Dict[str, Any]],
    parent_task_id: str,
    root_task_id: str,
    source_info: Dict[str, Any],
    specified_task_record_id: Optional[str] = None,
    is_async: bool = False
) -> Dict[str, Any]:
    """
    Submit a sub-task to the scheduler.

    The task definition is looked up first. A periodic definition that is not
    enabled is enabled in the database and the scheduler is notified. Then:

    * if ``specified_task_record_id`` is given, a task record with that ID is
      created via ``create_task_record`` and handed to ``scheduler.submit_task``;
    * otherwise ``scheduler.run_task`` creates and queues the record itself.

    Args:
        subtask_id: ID of the sub-task definition.
        params_list: Parameter descriptors passed on to the sub-task.
        parent_task_id: Record ID of the parent task.
        root_task_id: Record ID of the root task.
        source_info: Origin information; the keys read are ``source_type``,
            ``source_system``, ``source_device``, ``source_ip`` and
            ``source_client_info``.
        specified_task_record_id: Task record ID to use instead of letting the
            scheduler allocate one.
        is_async: Not used for control flow here; echoed back as ``isAsync``.

    Returns:
        Dict[str, Any]: On success ``success``, ``message``, ``taskRecordId``,
        ``submitResult`` and ``isAsync``; on any failure (including an
        unexpected exception, which is logged) ``success=False`` and ``message``.
    """
    from sqlalchemy import select, update
    from data.models.taskdef import VWEDTaskDef
    from data.session import get_async_session
    from services.enhanced_scheduler import scheduler

    try:
        # Look up the sub-task definition.
        async with get_async_session() as session:
            task_def_result = await session.execute(
                select(VWEDTaskDef).where(VWEDTaskDef.id == subtask_id)
            )
            task_def = task_def_result.scalars().first()
            if not task_def:
                return {
                    "success": False,
                    "message": f"找不到子任务定义: {subtask_id}"
                }

            # A periodic definition must be enabled before it can be run.
            if (
                task_def.periodic_task == PeriodicTaskStatus.PERIODIC
                and task_def.if_enable != EnableStatus.ENABLED
            ):
                logger.info(f"子任务 {subtask_id} 是定时任务但未启用,正在启用...")
                await session.execute(
                    update(VWEDTaskDef)
                    .where(VWEDTaskDef.id == subtask_id)
                    .values(if_enable=EnableStatus.ENABLED, status=TaskStatusEnum.RUNNING)
                )
                await session.commit()

                # Tell the scheduler; a failure here is logged but not fatal.
                update_result = await scheduler.update_periodic_task(subtask_id, True)
                if not update_result.get("success", False):
                    logger.warning(f"启用子任务定时任务失败: {update_result.get('message', '未知错误')}")

        if specified_task_record_id:
            # NOTE(review): task_def is read below after its session has been
            # closed (and possibly committed); if the session expires objects
            # on commit this attribute access may fail - confirm.
            create_result = await create_task_record(
                specified_task_record_id,
                subtask_id,
                task_def,
                params_list,
                parent_task_id,
                root_task_id,
                source_info
            )
            if not create_result.get("success", False):
                return create_result

            task_record_id = specified_task_record_id
            submit_result = await scheduler.submit_task(task_record_id)
            # Honour an explicit failure so that a record the scheduler refused
            # to queue is not reported as submitted.
            # NOTE(review): assumes submit_task reports failure through a
            # "success" flag like the other scheduler calls here - confirm.
            if isinstance(submit_result, dict) and submit_result.get("success") is False:
                return {
                    "success": False,
                    "message": f"调度器提交子任务失败: {submit_result.get('message', '未知错误')}"
                }
        else:
            # Let the scheduler create the record and queue it.
            run_result = await scheduler.run_task(
                task_def_id=subtask_id,
                params=params_list,
                parent_task_id=parent_task_id,
                root_task_id=root_task_id,
                source_type=source_info.get("source_type"),
                source_system=source_info.get("source_system"),
                source_device=source_info.get("source_device"),
                source_ip=source_info.get("source_ip"),
                source_time=datetime.now(),
                source_client_info=source_info.get("source_client_info")
            )
            if not run_result.get("success", False):
                return {
                    "success": False,
                    "message": f"调度器创建子任务失败: {run_result.get('message', '未知错误')}"
                }
            task_record_id = run_result.get("taskRecordId")
            submit_result = run_result.get("queueResult", {})

        return {
            "success": True,
            "message": f"子任务已提交到调度器: {task_record_id}",
            "taskRecordId": task_record_id,
            "submitResult": submit_result,
            "isAsync": is_async
        }
    except Exception as e:
        logger.error(f"提交子任务异常: {str(e)}")
        return {
            "success": False,
            "message": f"提交子任务异常: {str(e)}"
        }
async def create_task_record(
    task_record_id: str,
    subtask_id: str,
    task_def: Any,
    params_list: List[Dict[str, Any]],
    parent_task_id: str,
    root_task_id: str,
    source_info: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Insert a queued task record with a caller-chosen ID.

    Args:
        task_record_id: ID the new record must have; creation is refused if a
            record with this ID already exists.
        subtask_id: ID of the task definition the record belongs to.
        task_def: Task definition object; ``label``, ``version``,
            ``periodic_task`` and ``detail`` are read from it.
        params_list: Parameter descriptors, stored as JSON (``None`` if empty).
        parent_task_id: Record ID of the parent task.
        root_task_id: Record ID of the root task.
        source_info: Origin information copied onto the record.

    Returns:
        Dict[str, Any]: ``success`` and ``message``; ``taskRecordId`` is added
        on success. Exceptions are logged and reported as ``success=False``.
    """
    from data.models.taskrecord import VWEDTaskRecord
    from data.session import get_async_session

    source_keys = (
        "source_type",
        "source_system",
        "source_device",
        "source_ip",
        "source_client_info",
    )
    try:
        async with get_async_session() as session:
            from sqlalchemy import select

            # Refuse to overwrite a record that already uses this ID.
            lookup = await session.execute(
                select(VWEDTaskRecord).where(VWEDTaskRecord.id == task_record_id)
            )
            duplicate = lookup.scalars().first()
            if duplicate:
                return {
                    "success": False,
                    "message": f"指定的任务记录ID已存在: {task_record_id}"
                }

            # Build the new record directly in the queued state.
            new_record = VWEDTaskRecord(
                id=task_record_id,
                def_id=subtask_id,
                def_label=task_def.label,
                def_version=task_def.version,
                parent_task_record_id=parent_task_id,
                root_task_record_id=root_task_id,
                status=TaskStatus.QUEUED,
                created_on=datetime.now(),
                input_params=json.dumps(params_list, ensure_ascii=False) if params_list else None,
                periodic_task=task_def.periodic_task or PeriodicTaskStatus.NON_PERIODIC,
                task_def_detail=task_def.detail,
                **{key: source_info.get(key) for key in source_keys},
                source_time=datetime.now()
            )
            session.add(new_record)
            await session.commit()

        return {
            "success": True,
            "message": f"任务记录创建成功: {task_record_id}",
            "taskRecordId": task_record_id
        }
    except Exception as e:
        failure_text = str(e)
        logger.error(f"创建任务记录异常: {failure_text}")
        return {
            "success": False,
            "message": f"创建任务记录异常: {failure_text}"
        }
async def wait_for_task_completion(task_record_id: str, max_wait_time: int = 3600, wait_interval: int = 3) -> Dict[str, Any]:
    """
    Poll the scheduler until a task reaches a terminal state or the wait times out.

    The task is considered finished when its status is COMPLETED, FAILED or
    CANCELED. COMPLETED and CANCELED are both reported as success. A task that
    is still not finished when ``max_wait_time`` elapses is reported as a
    timeout rather than as a generic failure.

    Args:
        task_record_id: ID of the task record to watch.
        max_wait_time: Maximum time to wait, in seconds, measured on the event
            loop's monotonic clock.
        wait_interval: Pause between two status checks, in seconds. Values
            below zero are treated as zero.

    Returns:
        Dict[str, Any]: ``success``, ``message``, ``taskStatus``,
        ``taskRecordId`` and ``output``. When the scheduler cannot report the
        status, or an exception occurs, only ``success=False``, ``message``
        and ``taskRecordId`` are returned.
    """
    from services.enhanced_scheduler import scheduler

    terminal_states = (TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELED)

    def status_unavailable(status_result: Dict[str, Any]) -> Dict[str, Any]:
        # Shared failure payload for an unsuccessful status query.
        return {
            "success": False,
            "message": f"获取子任务状态失败: {status_result.get('message', '未知错误')}",
            "taskRecordId": task_record_id
        }

    try:
        # Use a wall-clock deadline instead of summing the sleep intervals: the
        # sum ignores time spent querying the scheduler and never advances
        # when wait_interval is zero or negative.
        now = asyncio.get_running_loop().time
        deadline = now() + max_wait_time
        pause = max(wait_interval, 0)

        while now() < deadline:
            status_result = await scheduler.get_task_status(task_record_id)
            if not status_result.get("success", False):
                return status_unavailable(status_result)

            # "data" may be present but None; treat that like an empty payload.
            task_status = (status_result.get("data") or {}).get("status")
            if task_status in terminal_states:
                break

            remaining = deadline - now()
            if remaining <= 0:
                break
            # Never sleep past the deadline.
            await asyncio.sleep(min(pause, remaining))

        # Read the final state once more after the loop.
        final_status = await scheduler.get_task_status(task_record_id)
        if not final_status.get("success", False):
            return status_unavailable(final_status)

        task_data = final_status.get("data") or {}
        task_status = task_data.get("status")

        if task_status in (TaskStatus.COMPLETED, TaskStatus.CANCELED):
            success = True
            message = "子任务执行成功"
        elif task_status == TaskStatus.FAILED:
            success = False
            message = f"子任务执行失败: {task_data.get('endedReason', '未知原因')}"
        else:
            # Still not terminal: the wait ran out before the task finished.
            success = False
            message = f"等待子任务完成超时: 已等待 {max_wait_time} 秒, 当前状态: {task_status}"

        # NOTE(review): the nested payload always carries success=True, even
        # when the outer result is a failure; kept as-is because callers copy
        # these keys into context variables - confirm whether intentional.
        output = {"success": True, "message": message, "taskRecordId": task_record_id, "output": {}}
        return {
            "success": success,
            "message": message,
            "taskStatus": task_status,
            "taskRecordId": task_record_id,
            "output": output
        }
    except Exception as e:
        logger.error(f"等待任务完成异常: {str(e)}")
        return {
            "success": False,
            "message": f"等待任务完成异常: {str(e)}",
            "taskRecordId": task_record_id
        }
@register_handler(SubTaskBlockName.SUB_TASK)
class SubTaskBlockHandler(BlockHandler):
    """Block handler that runs another task definition as a sub-task."""

    async def execute(
        self,
        block: Dict[str, Any],
        input_params: Dict[str, Any],
        context: TaskContext
    ) -> Dict[str, Any]:
        """
        Run the sub-task referenced by ``block["refTaskDefId"]``.

        ``ifAsync`` and ``taskRecordId`` in ``input_params`` are control
        options; every other entry is forwarded to the sub-task as a parameter
        descriptor. With ``ifAsync`` truthy the method returns right after
        submission; otherwise it waits for the sub-task to finish and copies
        its output into context variables (``subtask_<key>`` and
        ``subtaskResult``). Every outcome is written to the task log.

        Args:
            block: Block definition.
            input_params: Resolved block inputs.
            context: Execution context of the current (parent) task.

        Returns:
            Dict[str, Any]: ``success`` and ``message``; ``output`` is added
            once the sub-task has been submitted.
        """

        async def finish(outcome: Dict[str, Any]) -> Dict[str, Any]:
            # Write the outcome to the task log, then hand it back unchanged.
            await self._record_task_log(block, outcome, context)
            return outcome

        try:
            ref_task_def_id = block.get("refTaskDefId")
            if not ref_task_def_id:
                return await finish({
                    "success": False,
                    "message": "缺少子任务ID"
                })

            # Control options, with their defaults.
            run_async = input_params.get("ifAsync", False)
            requested_record_id = input_params.get("taskRecordId", None)

            # Everything that is not a control option becomes a parameter
            # descriptor; values are stringified and the type is a fixed
            # default label.
            reserved_keys = ["ifAsync", "taskRecordId"]
            params_list = [
                {
                    "name": name,
                    "type": "字符串",
                    "label": name,
                    "required": False,
                    "defaultValue": str(raw) if raw is not None else ""
                }
                for name, raw in input_params.items()
                if name not in reserved_keys
            ]

            logger.info(f"开始执行子任务: {ref_task_def_id}, 异步执行: {run_async}, 指定ID: {requested_record_id}")

            # Root record: inherited from the context if set, else this task.
            root_record_id = context.get_variable("rootTaskRecordId", context.task_record_id)

            # The sub-task inherits the origin information of its parent.
            parent_source = await self._get_parent_source_info(context.task_record_id)
            if not parent_source:
                return await finish({
                    "success": False,
                    "message": f"获取父任务来源信息失败: {context.task_record_id}"
                })

            submission = await submit_subtask(
                subtask_id=ref_task_def_id,
                params_list=params_list,
                parent_task_id=context.task_record_id,
                root_task_id=root_record_id,
                source_info=parent_source,
                specified_task_record_id=requested_record_id,
                is_async=run_async
            )
            if not submission.get("success", False):
                return await finish({
                    "success": False,
                    "message": submission.get("message", "提交子任务失败")
                })

            child_record_id = submission.get("taskRecordId")

            if run_async:
                # Fire and forget: report the submission only.
                return await finish({
                    "success": True,
                    "message": f"子任务已异步提交到调度器: {child_record_id}",
                    "output": {
                        "subtaskId": ref_task_def_id,
                        "taskRecordId": child_record_id,
                        "async": True,
                        "queueStatus": submission.get("submitResult", {})
                    }
                })

            # Synchronous mode: block until the sub-task has finished.
            completion = await wait_for_task_completion(child_record_id)
            succeeded = completion.get("success", False)
            summary = completion.get("message", "子任务执行完成")
            child_output = completion.get("output", {})
            final_state = completion.get("taskStatus")

            # Expose each output entry as a "subtask_"-prefixed variable.
            if isinstance(child_output, dict):
                for out_key, out_value in child_output.items():
                    context.set_variable(f"subtask_{out_key}", out_value)

            # Also expose the outcome as one aggregate variable.
            context.set_variable("subtaskResult", {
                "success": succeeded,
                "message": summary,
                "output": child_output,
                "taskStatus": final_state
            })

            return await finish({
                "success": succeeded,
                "message": summary,
                "output": {
                    "subtaskId": ref_task_def_id,
                    "taskRecordId": child_record_id,
                    "async": False,
                    "output": child_output,
                    "taskStatus": final_state
                }
            })
        except Exception as e:
            return await finish({
                "success": False,
                "message": f"子任务执行异常: {str(e)}"
            })

    async def _get_parent_source_info(self, task_record_id: str) -> Optional[Dict[str, Any]]:
        """
        Read the origin information stored on a task record.

        Args:
            task_record_id: ID of the (parent) task record to read.

        Returns:
            Optional[Dict[str, Any]]: The record's ``source_type``,
            ``source_system``, ``source_device``, ``source_ip`` and
            ``source_client_info``; ``None`` if the record does not exist or
            the lookup raises (both cases are logged).
        """
        from sqlalchemy import select
        from data.models.taskrecord import VWEDTaskRecord
        from data.session import get_async_session

        source_fields = (
            "source_type",
            "source_system",
            "source_device",
            "source_ip",
            "source_client_info",
        )
        try:
            async with get_async_session() as session:
                rows = await session.execute(
                    select(VWEDTaskRecord).where(VWEDTaskRecord.id == task_record_id)
                )
                record = rows.scalars().first()
                if record:
                    return {field: getattr(record, field) for field in source_fields}

                logger.error(f"找不到父任务记录: {task_record_id}")
                return None
        except Exception as e:
            logger.error(f"获取父任务来源信息异常: {str(e)}")
            return None