VWED_server/services/task_record_service_new.py

486 lines
17 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
任务运行记录服务模块 - 新版get_block_results实现
基于层级关系字段重构
"""
import json
from typing import Dict, List, Any
from sqlalchemy import select
from data.models.blockrecord import VWEDBlockRecord
from data.models.taskrecord import VWEDTaskRecord
from data.models.tasklog import VWEDTaskLog
from data.session import get_async_session
from utils.logger import get_logger
logger = get_logger("service.task_record_service_new")
async def get_block_results_v2(task_record_id: str) -> Dict[str, Any]:
"""
获取指定任务记录的执行结果(新版本)
基于层级关系字段组织嵌套结构
Args:
task_record_id: 任务记录ID
Returns:
Dict: 包含嵌套结构执行结果的响应
"""
from services.execution.handlers.model.block_name import BLOCK_NAME_STR, ProgressBlockName
try:
async with get_async_session() as session:
# 获取任务记录信息
task_query = select(VWEDTaskRecord).where(
VWEDTaskRecord.id == task_record_id
)
task_result = await session.execute(task_query)
task_record = task_result.scalars().first()
if not task_record:
return {
"success": False,
"message": f"未找到任务记录 {task_record_id}",
"data": None
}
# 获取所有块执行记录
blocks_query = select(VWEDBlockRecord).where(
VWEDBlockRecord.task_record_id == task_record_id
).order_by(VWEDBlockRecord.started_on)
blocks_result = await session.execute(blocks_query)
blocks = blocks_result.scalars().all()
# 获取所有任务日志
logs_query = select(VWEDTaskLog).where(
VWEDTaskLog.task_record_id == task_record_id
).order_by(VWEDTaskLog.created_at)
logs_result = await session.execute(logs_query)
logs = logs_result.scalars().all()
# 构建块记录映射
block_map = {block.block_name: block for block in blocks}
# 构建日志映射 (按parent_log_id和iteration_index组织)
log_map_by_block = {} # {block_name: [logs]}
log_map_by_parent = {} # {parent_log_id: [logs]}
for log in logs:
# 按块名称分组
if log.task_block_id not in log_map_by_block:
log_map_by_block[log.task_block_id] = []
log_map_by_block[log.task_block_id].append(log)
# 按父日志ID分组
if log.parent_log_id:
if log.parent_log_id not in log_map_by_parent:
log_map_by_parent[log.parent_log_id] = []
log_map_by_parent[log.parent_log_id].append(log)
# 解析任务定义详情
task_def_detail = None
if task_record.task_def_detail:
try:
task_def_detail = json.loads(task_record.task_def_detail)
except json.JSONDecodeError:
logger.warning(f"任务记录 {task_record_id} 的task_def_detail解析失败")
if not task_def_detail or 'rootBlock' not in task_def_detail:
# 回退到简单格式
return {
"success": True,
"message": "任务定义格式不完整,使用简化输出",
"data": await _build_simple_results(blocks, logs, BLOCK_NAME_STR)
}
# 构建嵌套结构
nested_results = await _build_nested_results(
task_def_detail['rootBlock'],
block_map,
log_map_by_block,
log_map_by_parent,
BLOCK_NAME_STR,
level=0
)
return {
"success": True,
"message": "成功获取任务记录嵌套执行结果",
"data": nested_results
}
except Exception as e:
logger.error(f"获取任务记录执行结果失败: {str(e)}")
import traceback
traceback.print_exc()
return {
"success": False,
"message": f"获取任务记录执行结果失败: {str(e)}"
}
async def _build_nested_results(
block_def: Dict[str, Any],
block_map: Dict[str, VWEDBlockRecord],
log_map_by_block: Dict[str, List[VWEDTaskLog]],
log_map_by_parent: Dict[str, List[VWEDTaskLog]],
block_name_mapping: Dict[str, str],
level: int = 0
) -> Dict[str, Any]:
"""
递归构建嵌套的块执行结果
Args:
block_def: 任务块定义
block_map: 块名称到执行记录的映射
log_map_by_block: 块名称到日志记录的映射
log_map_by_parent: 父日志ID到子日志的映射
block_name_mapping: 块类型名称映射
level: 嵌套层级
Returns:
Dict: 嵌套的块执行结果
"""
from services.execution.handlers.model.block_name import ProgressBlockName
block_id = str(block_def.get('id', ''))
block_name = block_def.get('name', '')
block_type = block_def.get('blockType', '')
# 获取执行记录
execution_record = block_map.get(block_name)
# 构建基础结果结构
result = {
"blockId": block_id,
"blockName": block_name,
"blockType": block_type,
"blockTypeName": block_name_mapping.get(block_type, block_type),
"level": level,
"executionStatus": None,
"executionRecord": None,
"logs": [],
"children": []
}
# 如果有执行记录,添加执行信息
if execution_record:
from data.enum.task_block_record_enum import TaskBlockRecordStatus
execution_info = {
"recordId": execution_record.id,
"status": execution_record.status,
"startedOn": execution_record.started_on.isoformat() if execution_record.started_on else None,
"endedOn": execution_record.ended_on.isoformat() if execution_record.ended_on else None,
"endedReason": execution_record.ended_reason,
"inputParamsValue": json.loads(execution_record.block_input_params_value) if execution_record.block_input_params_value else None,
"outputParamsValue": json.loads(execution_record.block_out_params_value) if execution_record.block_out_params_value else None,
"remark": execution_record.remark
}
result["executionRecord"] = execution_info
result["executionStatus"] = execution_record.status
else:
from data.enum.task_block_record_enum import TaskBlockRecordStatus
result["executionStatus"] = TaskBlockRecordStatus.NOT_EXECUTED
# 检查是否是迭代类型的块
is_iteration_block = block_type in [
ProgressBlockName.ITERATE_LIST,
ProgressBlockName.WHILE,
ProgressBlockName.REPEAT_NUM
]
if is_iteration_block:
# 迭代块: 按迭代次数组织子块
result["children"] = await _build_iteration_children(
block_def,
block_name,
block_map,
log_map_by_block,
log_map_by_parent,
block_name_mapping,
level
)
else:
# 普通块: 直接递归处理子块
# 获取该块的非迭代日志
block_logs = log_map_by_block.get(block_name, [])
for log in block_logs:
# 只添加没有parent_log_id的日志(顶层日志)
if not log.parent_log_id:
result["logs"].append(_format_log(log))
# 递归处理子块
children_def = block_def.get('children', {})
if children_def:
for children_blocks in children_def.values():
if isinstance(children_blocks, list):
for child_block in children_blocks:
child_result = await _build_nested_results(
child_block,
block_map,
log_map_by_block,
log_map_by_parent,
block_name_mapping,
level + 1
)
result["children"].append(child_result)
return result
async def _build_iteration_children(
block_def: Dict[str, Any],
block_name: str,
block_map: Dict[str, VWEDBlockRecord],
log_map_by_block: Dict[str, List[VWEDTaskLog]],
log_map_by_parent: Dict[str, List[VWEDTaskLog]],
block_name_mapping: Dict[str, str],
level: int
) -> List[Dict[str, Any]]:
"""
构建迭代块的子块结果(按迭代次数组织)
Args:
block_def: 迭代块定义
block_name: 块名称
block_map: 块记录映射
log_map_by_block: 块名称到日志的映射
log_map_by_parent: 父日志ID到子日志的映射
block_name_mapping: 块类型名称映射
level: 当前层级
Returns:
List[Dict]: 按迭代次数组织的结果列表
"""
iteration_results = []
# 获取该块的所有日志
block_logs = log_map_by_block.get(block_name, [])
# 找出所有迭代开始日志 (log_type="iteration_start")
iteration_start_logs = [log for log in block_logs if log.log_type == "iteration_start"]
# 按iteration_index排序
iteration_start_logs.sort(key=lambda x: x.iteration_index if x.iteration_index is not None else 0)
# 为每次迭代构建结果
for iter_log in iteration_start_logs:
iteration_index = iter_log.iteration_index
iteration_log_id = iter_log.id
# 构建该次迭代的结果结构
iteration_item = {
"iterationIndex": iteration_index,
"success": True, # 默认成功,后续可根据子块状态更新
"iterationLogs": [_format_log(iter_log)], # 迭代开始日志
"children": []
}
# 获取该次迭代的所有子日志(parent_log_id = iteration_log_id)
child_logs = log_map_by_parent.get(iteration_log_id, [])
# 按块名称分组子日志
child_logs_by_block = {}
for child_log in child_logs:
if child_log.task_block_id not in child_logs_by_block:
child_logs_by_block[child_log.task_block_id] = []
child_logs_by_block[child_log.task_block_id].append(child_log)
# 递归构建子块
children_def = block_def.get('children', {})
if children_def:
for branch_name, children_blocks in children_def.items():
if isinstance(children_blocks, list):
for child_block in children_blocks:
child_block_name = child_block.get('name', '')
# 构建子块结果
child_result = await _build_iteration_child_result(
child_block,
iteration_index,
child_logs_by_block.get(child_block_name, []),
block_map,
log_map_by_block,
log_map_by_parent,
block_name_mapping,
level + 1
)
iteration_item["children"].append(child_result)
iteration_results.append(iteration_item)
return iteration_results
async def _build_iteration_child_result(
child_block_def: Dict[str, Any],
iteration_index: int,
child_logs: List[VWEDTaskLog],
block_map: Dict[str, VWEDBlockRecord],
log_map_by_block: Dict[str, List[VWEDTaskLog]],
log_map_by_parent: Dict[str, List[VWEDTaskLog]],
block_name_mapping: Dict[str, str],
level: int
) -> Dict[str, Any]:
"""
构建迭代子块的结果
Args:
child_block_def: 子块定义
iteration_index: 迭代索引
child_logs: 该次迭代中该子块的日志
block_map: 块记录映射
log_map_by_block: 块名称到日志的映射
log_map_by_parent: 父日志ID到子日志的映射
block_name_mapping: 块类型名称映射
level: 层级
Returns:
Dict: 子块结果
"""
from services.execution.handlers.model.block_name import ProgressBlockName
block_id = str(child_block_def.get('id', ''))
block_name = child_block_def.get('name', '')
block_type = child_block_def.get('blockType', '')
# 获取执行记录
execution_record = block_map.get(block_name)
# 构建结果结构
result = {
"blockId": block_id,
"blockName": block_name,
"blockType": block_type,
"blockTypeName": block_name_mapping.get(block_type, block_type),
"level": level,
"iterationIndex": iteration_index,
"executionStatus": None,
"executionRecord": None,
"logs": [],
"children": []
}
# 添加执行记录
if execution_record:
from data.enum.task_block_record_enum import TaskBlockRecordStatus
execution_info = {
"recordId": execution_record.id,
"status": execution_record.status,
"startedOn": execution_record.started_on.isoformat() if execution_record.started_on else None,
"endedOn": execution_record.ended_on.isoformat() if execution_record.ended_on else None,
"endedReason": execution_record.ended_reason,
"inputParamsValue": json.loads(execution_record.block_input_params_value) if execution_record.block_input_params_value else None,
"outputParamsValue": json.loads(execution_record.block_out_params_value) if execution_record.block_out_params_value else None,
"remark": execution_record.remark
}
result["executionRecord"] = execution_info
result["executionStatus"] = execution_record.status
# 添加该次迭代的日志
for log in child_logs:
result["logs"].append(_format_log(log))
# 如果子块也是迭代块,递归处理
is_iteration_block = block_type in [
ProgressBlockName.ITERATE_LIST,
ProgressBlockName.WHILE,
ProgressBlockName.REPEAT_NUM
]
if is_iteration_block:
# 嵌套迭代块
result["children"] = await _build_iteration_children(
child_block_def,
block_name,
block_map,
log_map_by_block,
log_map_by_parent,
block_name_mapping,
level
)
else:
# 普通子块,递归处理其子块
children_def = child_block_def.get('children', {})
if children_def:
for children_blocks in children_def.values():
if isinstance(children_blocks, list):
for nested_child in children_blocks:
nested_result = await _build_nested_results(
nested_child,
block_map,
log_map_by_block,
log_map_by_parent,
block_name_mapping,
level + 1
)
result["children"].append(nested_result)
return result
def _format_log(log: VWEDTaskLog) -> Dict[str, Any]:
"""
格式化日志对象
Args:
log: 日志对象
Returns:
Dict: 格式化后的日志字典
"""
try:
log_messages = json.loads(log.message) if log.message else {}
return {
"logId": log.id,
"level": log.level,
"createdAt": log.created_at.isoformat() if log.created_at else None,
"message": log_messages.get("message", ""),
"output": log_messages.get("output", ""),
"logType": log.log_type,
"iterationIndex": log.iteration_index
}
except json.JSONDecodeError:
return {
"logId": log.id,
"level": log.level,
"createdAt": log.created_at.isoformat() if log.created_at else None,
"message": log.message or "",
"output": "",
"logType": log.log_type,
"iterationIndex": log.iteration_index
}
async def _build_simple_results(
blocks: List[VWEDBlockRecord],
logs: List[VWEDTaskLog],
block_name_mapping: Dict[str, str]
) -> List[Dict[str, Any]]:
"""
简化的结果格式(回退方案)
Args:
blocks: 块记录列表
logs: 日志列表
block_name_mapping: 块类型名称映射
Returns:
List[Dict]: 简化的结果列表
"""
results = []
for block in blocks:
block_info = {
"blockName": block.block_name,
"blockType": block.block_execute_name,
"blockTypeName": block_name_mapping.get(block.block_execute_name, block.block_execute_name),
"status": block.status,
"startedOn": block.started_on.isoformat() if block.started_on else None,
"endedOn": block.ended_on.isoformat() if block.ended_on else None,
"endedReason": block.ended_reason
}
results.append(block_info)
return results