486 lines
17 KiB
Python
486 lines
17 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
Task run record service module - new get_block_results implementation.
Rebuilt around the hierarchy relationship fields.
|
|
"""
|
|
|
|
import json
|
|
from typing import Dict, List, Any
|
|
from sqlalchemy import select
|
|
from data.models.blockrecord import VWEDBlockRecord
|
|
from data.models.taskrecord import VWEDTaskRecord
|
|
from data.models.tasklog import VWEDTaskLog
|
|
from data.session import get_async_session
|
|
from utils.logger import get_logger
|
|
|
|
logger = get_logger("service.task_record_service_new")
|
|
|
|
|
|
async def get_block_results_v2(task_record_id: str) -> Dict[str, Any]:
    """
    Return the execution results of one task record as a nested tree (v2).

    Loads the task record, its block records (ordered by ``started_on``) and
    its task logs (ordered by ``created_at``), then walks the ``rootBlock`` of
    the stored task definition to assemble the nested result. When the stored
    definition is empty, cannot be parsed, is not a JSON object, or has no
    ``rootBlock``, the flat list from ``_build_simple_results`` is returned
    instead.

    Args:
        task_record_id: ID of the task record to look up.

    Returns:
        Dict: ``{"success": bool, "message": str, "data": ...}``. ``data`` is
        the nested result dict, the simplified list, or ``None`` when the
        record does not exist or an error occurred.
    """
    from services.execution.handlers.model.block_name import BLOCK_NAME_STR

    try:
        async with get_async_session() as session:
            # Load the task record itself.
            task_query = select(VWEDTaskRecord).where(
                VWEDTaskRecord.id == task_record_id
            )
            task_result = await session.execute(task_query)
            task_record = task_result.scalars().first()

            if not task_record:
                return {
                    "success": False,
                    "message": f"未找到任务记录 {task_record_id}",
                    "data": None
                }

            # Load every block execution record of this task.
            blocks_query = select(VWEDBlockRecord).where(
                VWEDBlockRecord.task_record_id == task_record_id
            ).order_by(VWEDBlockRecord.started_on)
            blocks_result = await session.execute(blocks_query)
            blocks = blocks_result.scalars().all()

            # Load every log entry of this task.
            logs_query = select(VWEDTaskLog).where(
                VWEDTaskLog.task_record_id == task_record_id
            ).order_by(VWEDTaskLog.created_at)
            logs_result = await session.execute(logs_query)
            logs = logs_result.scalars().all()

            # Index block records by name; when several records share a name
            # the one that started last wins (records are ordered by started_on).
            block_map = {block.block_name: block for block in blocks}

            # Index logs twice: by owning block and by parent log.
            # NOTE(review): this map is keyed by log.task_block_id, while the
            # builders look it up with the block definition's 'name' -
            # presumably both hold the same value; confirm against the writer.
            log_map_by_block = {}   # {task_block_id: [logs]}
            log_map_by_parent = {}  # {parent_log_id: [logs]}
            for log in logs:
                log_map_by_block.setdefault(log.task_block_id, []).append(log)
                if log.parent_log_id:
                    log_map_by_parent.setdefault(log.parent_log_id, []).append(log)

            # Parse the stored task definition.
            task_def_detail = None
            if task_record.task_def_detail:
                try:
                    task_def_detail = json.loads(task_record.task_def_detail)
                except (json.JSONDecodeError, TypeError):
                    logger.warning(f"任务记录 {task_record_id} 的task_def_detail解析失败")

            # Only a JSON object can carry 'rootBlock'; any other payload
            # (list, string, number, ...) falls back to the flat format.
            if not isinstance(task_def_detail, dict) or 'rootBlock' not in task_def_detail:
                return {
                    "success": True,
                    "message": "任务定义格式不完整,使用简化输出",
                    "data": await _build_simple_results(blocks, logs, BLOCK_NAME_STR)
                }

            # Build the nested structure starting at the root block.
            nested_results = await _build_nested_results(
                task_def_detail['rootBlock'],
                block_map,
                log_map_by_block,
                log_map_by_parent,
                BLOCK_NAME_STR,
                level=0
            )

            return {
                "success": True,
                "message": "成功获取任务记录嵌套执行结果",
                "data": nested_results
            }

    except Exception as e:
        import traceback

        logger.error(f"获取任务记录执行结果失败: {str(e)}")
        # Route the traceback through the logger instead of raw stderr.
        logger.error(traceback.format_exc())
        return {
            "success": False,
            "message": f"获取任务记录执行结果失败: {str(e)}",
            # Keep the response shape identical to the other return paths.
            "data": None
        }
|
|
|
|
|
|
async def _build_nested_results(
    block_def: Dict[str, Any],
    block_map: Dict[str, VWEDBlockRecord],
    log_map_by_block: Dict[str, List[VWEDTaskLog]],
    log_map_by_parent: Dict[str, List[VWEDTaskLog]],
    block_name_mapping: Dict[str, str],
    level: int = 0
) -> Dict[str, Any]:
    """
    Recursively build the nested execution result for one block definition.

    Args:
        block_def: Block definition; its ``id``, ``name``, ``blockType`` and
            ``children`` entries are read.
        block_map: Execution records, looked up by the block's ``name``.
        log_map_by_block: Log lists, looked up by the block's ``name``.
        log_map_by_parent: Child logs grouped by parent log ID.
        block_name_mapping: Block type -> display name; unknown types fall
            back to the type itself.
        level: Nesting depth of this block (0 for the root).

    Returns:
        Dict: Result node with block identity, execution status/record, logs
        and child nodes. For ITERATE_LIST / WHILE / REPEAT_NUM blocks the
        children are per-iteration items and ``logs`` stays empty.
    """
    from data.enum.task_block_record_enum import TaskBlockRecordStatus
    from services.execution.handlers.model.block_name import ProgressBlockName

    block_id = str(block_def.get('id', ''))
    block_name = block_def.get('name', '')
    block_type = block_def.get('blockType', '')

    def _decode_params(raw_value):
        """Decode a stored JSON params value; return the raw value if it is not valid JSON."""
        if not raw_value:
            return None
        try:
            return json.loads(raw_value)
        except (json.JSONDecodeError, TypeError):
            # One malformed record must not abort the whole result tree.
            logger.warning(f"块 {block_name} 的参数值解析失败,返回原始值")
            return raw_value

    # Execution record of this block, if it was ever executed.
    execution_record = block_map.get(block_name)

    # Base node; status and record are filled in below.
    result = {
        "blockId": block_id,
        "blockName": block_name,
        "blockType": block_type,
        "blockTypeName": block_name_mapping.get(block_type, block_type),
        "level": level,
        "executionStatus": None,
        "executionRecord": None,
        "logs": [],
        "children": []
    }

    if execution_record:
        result["executionRecord"] = {
            "recordId": execution_record.id,
            "status": execution_record.status,
            "startedOn": execution_record.started_on.isoformat() if execution_record.started_on else None,
            "endedOn": execution_record.ended_on.isoformat() if execution_record.ended_on else None,
            "endedReason": execution_record.ended_reason,
            "inputParamsValue": _decode_params(execution_record.block_input_params_value),
            "outputParamsValue": _decode_params(execution_record.block_out_params_value),
            "remark": execution_record.remark
        }
        result["executionStatus"] = execution_record.status
    else:
        result["executionStatus"] = TaskBlockRecordStatus.NOT_EXECUTED

    is_iteration_block = block_type in (
        ProgressBlockName.ITERATE_LIST,
        ProgressBlockName.WHILE,
        ProgressBlockName.REPEAT_NUM,
    )

    if is_iteration_block:
        # Iteration block: children are organised per iteration.
        result["children"] = await _build_iteration_children(
            block_def,
            block_name,
            block_map,
            log_map_by_block,
            log_map_by_parent,
            block_name_mapping,
            level
        )
        return result

    # Regular block: attach only its top-level logs (those without a parent log).
    for log in log_map_by_block.get(block_name, []):
        if not log.parent_log_id:
            result["logs"].append(_format_log(log))

    # Recurse into every branch of child blocks.
    children_def = block_def.get('children', {})
    if children_def:
        for children_blocks in children_def.values():
            if not isinstance(children_blocks, list):
                continue
            for child_block in children_blocks:
                child_result = await _build_nested_results(
                    child_block,
                    block_map,
                    log_map_by_block,
                    log_map_by_parent,
                    block_name_mapping,
                    level + 1
                )
                result["children"].append(child_result)

    return result
|
|
|
|
|
|
async def _build_iteration_children(
    block_def: Dict[str, Any],
    block_name: str,
    block_map: Dict[str, VWEDBlockRecord],
    log_map_by_block: Dict[str, List[VWEDTaskLog]],
    log_map_by_parent: Dict[str, List[VWEDTaskLog]],
    block_name_mapping: Dict[str, str],
    level: int
) -> List[Dict[str, Any]]:
    """
    Build the per-iteration child results of an iteration block.

    One item is produced for every log of this block whose ``log_type`` is
    ``"iteration_start"``, ordered by ``iteration_index`` (``None`` sorts
    as 0). The logs whose parent is that start log are grouped by their
    ``task_block_id`` and handed to the matching child block.

    Args:
        block_def: Definition of the iteration block; ``children`` is read.
        block_name: Key used to find this block's logs in ``log_map_by_block``.
        block_map: Execution records, passed through to the child builder.
        log_map_by_block: Log lists per block.
        log_map_by_parent: Child logs grouped by parent log ID.
        block_name_mapping: Block type -> display name mapping.
        level: Nesting depth of the iteration block; children get ``level + 1``.

    Returns:
        List[Dict]: One ``{"iterationIndex", "success", "iterationLogs",
        "children"}`` item per iteration.
    """
    # Iteration start markers of this block, in iteration order.
    start_markers = sorted(
        (
            entry
            for entry in log_map_by_block.get(block_name, [])
            if entry.log_type == "iteration_start"
        ),
        key=lambda entry: 0 if entry.iteration_index is None else entry.iteration_index,
    )

    per_iteration = []
    for marker in start_markers:
        current_index = marker.iteration_index

        item = {
            "iterationIndex": current_index,
            "success": True,  # always reported as successful; child status is not inspected
            "iterationLogs": [_format_log(marker)],  # the iteration start log
            "children": []
        }

        # Logs emitted inside this iteration, grouped by the block that wrote them.
        logs_by_child = {}
        for entry in log_map_by_parent.get(marker.id, []):
            logs_by_child.setdefault(entry.task_block_id, []).append(entry)

        branches = block_def.get('children', {})
        if branches:
            for branch_blocks in branches.values():
                if not isinstance(branch_blocks, list):
                    continue
                for child_def in branch_blocks:
                    child_key = child_def.get('name', '')
                    built_child = await _build_iteration_child_result(
                        child_def,
                        current_index,
                        logs_by_child.get(child_key, []),
                        block_map,
                        log_map_by_block,
                        log_map_by_parent,
                        block_name_mapping,
                        level + 1
                    )
                    item["children"].append(built_child)

        per_iteration.append(item)

    return per_iteration
|
|
|
|
|
|
async def _build_iteration_child_result(
    child_block_def: Dict[str, Any],
    iteration_index: int,
    child_logs: List[VWEDTaskLog],
    block_map: Dict[str, VWEDBlockRecord],
    log_map_by_block: Dict[str, List[VWEDTaskLog]],
    log_map_by_parent: Dict[str, List[VWEDTaskLog]],
    block_name_mapping: Dict[str, str],
    level: int
) -> Dict[str, Any]:
    """
    Build the result node of one child block inside one iteration.

    Args:
        child_block_def: Child block definition; ``id``, ``name``,
            ``blockType`` and ``children`` are read.
        iteration_index: Index of the iteration this node belongs to.
        child_logs: Logs of this child block for this iteration.
        block_map: Execution records, looked up by the block's ``name``; the
            same record is therefore reported for every iteration.
        log_map_by_block: Log lists per block.
        log_map_by_parent: Child logs grouped by parent log ID.
        block_name_mapping: Block type -> display name; unknown types fall
            back to the type itself.
        level: Nesting depth of this node.

    Returns:
        Dict: Result node with block identity, ``iterationIndex``, execution
        status/record, the iteration's logs and child nodes.
    """
    from data.enum.task_block_record_enum import TaskBlockRecordStatus
    from services.execution.handlers.model.block_name import ProgressBlockName

    block_id = str(child_block_def.get('id', ''))
    block_name = child_block_def.get('name', '')
    block_type = child_block_def.get('blockType', '')

    def _decode_params(raw_value):
        """Decode a stored JSON params value; return the raw value if it is not valid JSON."""
        if not raw_value:
            return None
        try:
            return json.loads(raw_value)
        except (json.JSONDecodeError, TypeError):
            # One malformed record must not abort the whole result tree.
            logger.warning(f"块 {block_name} 的参数值解析失败,返回原始值")
            return raw_value

    # Execution record of this block, if it was ever executed.
    execution_record = block_map.get(block_name)

    result = {
        "blockId": block_id,
        "blockName": block_name,
        "blockType": block_type,
        "blockTypeName": block_name_mapping.get(block_type, block_type),
        "level": level,
        "iterationIndex": iteration_index,
        "executionStatus": None,
        "executionRecord": None,
        "logs": [],
        "children": []
    }

    if execution_record:
        result["executionRecord"] = {
            "recordId": execution_record.id,
            "status": execution_record.status,
            "startedOn": execution_record.started_on.isoformat() if execution_record.started_on else None,
            "endedOn": execution_record.ended_on.isoformat() if execution_record.ended_on else None,
            "endedReason": execution_record.ended_reason,
            "inputParamsValue": _decode_params(execution_record.block_input_params_value),
            "outputParamsValue": _decode_params(execution_record.block_out_params_value),
            "remark": execution_record.remark
        }
        result["executionStatus"] = execution_record.status
    else:
        # Report blocks without a record the same way _build_nested_results does.
        result["executionStatus"] = TaskBlockRecordStatus.NOT_EXECUTED

    # Attach the logs this block wrote during this iteration.
    for log in child_logs:
        result["logs"].append(_format_log(log))

    is_iteration_block = block_type in (
        ProgressBlockName.ITERATE_LIST,
        ProgressBlockName.WHILE,
        ProgressBlockName.REPEAT_NUM,
    )

    if is_iteration_block:
        # Nested iteration block: children are again organised per iteration.
        result["children"] = await _build_iteration_children(
            child_block_def,
            block_name,
            block_map,
            log_map_by_block,
            log_map_by_parent,
            block_name_mapping,
            level
        )
        return result

    # Regular child block: recurse into its own children.
    # NOTE(review): these descendants are built by _build_nested_results,
    # which only attaches logs without a parent_log_id, so logs written
    # under an iteration log will not appear on them - confirm this is intended.
    children_def = child_block_def.get('children', {})
    if children_def:
        for children_blocks in children_def.values():
            if not isinstance(children_blocks, list):
                continue
            for nested_child in children_blocks:
                nested_result = await _build_nested_results(
                    nested_child,
                    block_map,
                    log_map_by_block,
                    log_map_by_parent,
                    block_name_mapping,
                    level + 1
                )
                result["children"].append(nested_result)

    return result
|
|
|
|
|
|
def _format_log(log: VWEDTaskLog) -> Dict[str, Any]:
    """
    Convert a task log record into a plain dictionary.

    ``log.message`` is expected to hold a JSON object with optional
    ``message`` and ``output`` entries. When it is not valid JSON, or is
    valid JSON that is not an object (a number, list, string, ``null``),
    the raw message text is returned as ``message`` and ``output`` is ``""``.

    Args:
        log: Log record; ``id``, ``level``, ``created_at``, ``message``,
            ``log_type`` and ``iteration_index`` are read.

    Returns:
        Dict: ``logId``, ``level``, ``createdAt`` (ISO string or ``None``),
        ``message``, ``output``, ``logType`` and ``iterationIndex``.
    """
    raw_message = log.message

    # Defaults cover an empty message and every fallback path.
    message = raw_message or ""
    output = ""

    if raw_message:
        try:
            payload = json.loads(raw_message)
        except json.JSONDecodeError:
            payload = None
        # Only a JSON object has 'message' / 'output' entries; calling .get
        # on any other decoded value would raise AttributeError.
        if isinstance(payload, dict):
            message = payload.get("message", "")
            output = payload.get("output", "")

    return {
        "logId": log.id,
        "level": log.level,
        "createdAt": log.created_at.isoformat() if log.created_at else None,
        "message": message,
        "output": output,
        "logType": log.log_type,
        "iterationIndex": log.iteration_index
    }
|
|
|
|
|
|
async def _build_simple_results(
    blocks: List[VWEDBlockRecord],
    logs: List[VWEDTaskLog],
    block_name_mapping: Dict[str, str]
) -> List[Dict[str, Any]]:
    """
    Build the flat fallback result: one summary dict per block record.

    Args:
        blocks: Block records to summarise, in the order they should appear.
        logs: Task logs; accepted for signature compatibility but not read.
        block_name_mapping: Block type -> display name; unknown types fall
            back to the record's ``block_execute_name``.

    Returns:
        List[Dict]: One dict per record with ``blockName``, ``blockType``,
        ``blockTypeName``, ``status``, ``startedOn``, ``endedOn`` (ISO
        strings or ``None``) and ``endedReason``.
    """
    def _iso_or_none(moment):
        # Timestamps may be unset on a record.
        return moment.isoformat() if moment else None

    return [
        {
            "blockName": record.block_name,
            "blockType": record.block_execute_name,
            "blockTypeName": block_name_mapping.get(record.block_execute_name, record.block_execute_name),
            "status": record.status,
            "startedOn": _iso_or_none(record.started_on),
            "endedOn": _iso_or_none(record.ended_on),
            "endedReason": record.ended_reason
        }
        for record in blocks
    ]
|