VWED_server/services/task_record_service.py
2025-05-12 15:43:21 +08:00

395 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Task run-record service module.

Provides the service methods related to task run records.
"""
import json
from typing import Dict, List, Any, Optional
from sqlalchemy import select, and_
import datetime
from data.models.blockrecord import VWEDBlockRecord
from data.models.taskrecord import VWEDTaskRecord
from data.models.tasklog import VWEDTaskLog
from data.session import get_async_session
from utils.logger import get_logger
from data.enum.task_block_record_enum import TaskBlockRecordStatus
# Module-level logger used by the service methods below
logger = get_logger("service.task_record_service")
class TaskRecordService:
    """
    Task run-record service.

    Groups static coroutines that read block records, task records and task
    logs through ``get_async_session`` and that stop a running task record.
    Every public method returns a dict with ``success`` and ``message`` keys
    (plus ``data`` where noted); exceptions raised inside the session block
    are logged and reported through that dict.
    """

    # ------------------------------------------------------------------
    # Private, side-effect-free helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _parse_json(raw: Any) -> Any:
        """Decode a stored JSON string; falsy values (``None``, ``""``) give ``None``."""
        return json.loads(raw) if raw else None

    @staticmethod
    def _to_iso(moment: Any) -> Optional[str]:
        """Return ``moment.isoformat()``, or ``None`` when the value is falsy."""
        return moment.isoformat() if moment else None

    @staticmethod
    def _context_text(execute_name: Any, detail: Any) -> str:
        """
        Build the ``"[<execute_name>] <detail>"`` line used in block results.

        ``None`` parts are rendered as empty strings so that a block without
        an execute name or an ended reason no longer raises ``TypeError``.
        """
        name = "" if execute_name is None else str(execute_name)
        tail = "" if detail is None else str(detail)
        return "[" + name + "] " + tail

    @staticmethod
    def _log_text(raw_message: Any) -> str:
        """
        Render one task-log message as ``<message>`` or ``<message>@<output>``.

        The message is expected to be a JSON object with optional ``message``
        and ``output`` entries.  When ``output`` is a dict with a non-empty
        ``message`` entry, that entry is shown; any other truthy ``output`` is
        shown via ``str(output)``.  Text that is not valid JSON is returned
        as-is and a non-object JSON value is returned via ``str``.
        """
        try:
            payload = json.loads(raw_message)
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError; TypeError covers None.
            return "" if raw_message is None else str(raw_message)
        if not isinstance(payload, dict):
            return str(payload)

        message = payload.get("message", "")
        text = "" if message is None else str(message)
        output = payload.get("output", "")
        if not output:
            return text
        if isinstance(output, dict):
            nested = str(output.get("message", ""))
            if nested:
                return text + "@" + nested
        # Non-dict output, or a dict without a usable "message" entry.
        return text + "@" + str(output)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    @staticmethod
    async def get_task_blocks(task_record_id: str) -> Dict[str, Any]:
        """
        List the block records of one task record, ordered by ``started_on``.

        Args:
            task_record_id: ID of the task record whose blocks are listed.

        Returns:
            Dict with ``success``, ``message`` and ``data``.  ``data`` is a
            list of block dicts; it is empty when no block exists (still
            ``success=True``) or when an exception was caught
            (``success=False``).
        """
        parse_json = TaskRecordService._parse_json
        to_iso = TaskRecordService._to_iso
        try:
            async with get_async_session() as session:
                query = (
                    select(VWEDBlockRecord)
                    .where(VWEDBlockRecord.task_record_id == task_record_id)
                    .order_by(VWEDBlockRecord.started_on)
                )
                blocks = (await session.execute(query)).scalars().all()
                if not blocks:
                    return {
                        "success": True,
                        "message": f"未找到任务记录 {task_record_id} 的块运行情况",
                        "data": []
                    }

                block_list = [
                    {
                        "id": block.id,
                        "block_name": block.block_name,
                        "block_id": block.block_id,
                        "status": block.status,
                        "started_on": to_iso(block.started_on),
                        "ended_on": to_iso(block.ended_on),
                        "ended_reason": block.ended_reason,
                        "block_execute_name": block.block_execute_name,
                        "block_input_params_value": parse_json(block.block_input_params_value),
                        "block_out_params_value": parse_json(block.block_out_params_value),
                        "remark": block.remark
                    }
                    for block in blocks
                ]
                return {
                    "success": True,
                    "message": f"成功获取任务记录 {task_record_id} 的块运行情况",
                    "data": block_list
                }
        except Exception as e:
            logger.error(f"获取任务块运行情况失败: {str(e)}")
            return {
                "success": False,
                "message": f"获取任务块运行情况失败: {str(e)}",
                "data": []
            }

    @staticmethod
    async def get_block_detail(block_record_id: str) -> Dict[str, Any]:
        """
        Fetch the full detail of a single block record.

        Args:
            block_record_id: Primary key of the block record.

        Returns:
            Dict with ``success``, ``message`` and ``data``.  ``data`` is the
            block dict, or ``None`` when the record does not exist or an
            exception was caught (both with ``success=False``).
        """
        parse_json = TaskRecordService._parse_json
        to_iso = TaskRecordService._to_iso
        try:
            async with get_async_session() as session:
                query = select(VWEDBlockRecord).where(
                    VWEDBlockRecord.id == block_record_id
                )
                block = (await session.execute(query)).scalars().first()
                if not block:
                    return {
                        "success": False,
                        "message": f"未找到ID为 {block_record_id} 的块记录",
                        "data": None
                    }

                # JSON-encoded columns are decoded; datetimes become ISO strings.
                block_dict = {
                    "id": block.id,
                    "block_name": block.block_name,
                    "block_id": block.block_id,
                    "block_config_id": block.block_config_id,
                    "block_input_params": parse_json(block.block_input_params),
                    "block_input_params_value": parse_json(block.block_input_params_value),
                    "block_out_params_value": parse_json(block.block_out_params_value),
                    "block_internal_variables": parse_json(block.block_internal_variables),
                    "block_execute_name": block.block_execute_name,
                    "task_id": block.task_id,
                    "task_record_id": block.task_record_id,
                    "started_on": to_iso(block.started_on),
                    "ended_on": to_iso(block.ended_on),
                    "ended_reason": block.ended_reason,
                    "status": block.status,
                    "ctrl_status": block.ctrl_status,
                    "input_params": parse_json(block.input_params),
                    "internal_variables": parse_json(block.internal_variables),
                    "output_params": parse_json(block.output_params),
                    "version": block.version,
                    "remark": block.remark
                }
                return {
                    "success": True,
                    "message": "成功获取块记录详情",
                    "data": block_dict
                }
        except Exception as e:
            logger.error(f"获取块记录详情失败: {str(e)}")
            return {
                "success": False,
                "message": f"获取块记录详情失败: {str(e)}",
                "data": None
            }

    @staticmethod
    async def stop_task_record(task_record_id: str) -> Dict[str, Any]:
        """
        Stop a running task record.

        Looks up the record only if its status is ``TaskStatus.RUNNING``, asks
        the scheduler to cancel it, and on success marks the record as
        ``TaskStatus.CANCELED`` with an end time and end reason.

        Args:
            task_record_id: ID of the task record to stop.

        Returns:
            Dict with ``success`` and ``message``.  ``data`` carries the new
            record state on success, or the scheduler's cancel result when the
            scheduler reports failure.  When the record is not running the
            call is a no-op reported as ``success=True`` without ``data``.
        """
        # Function-scope imports are kept from the original implementation.
        from services.enhanced_scheduler import scheduler
        from data.enum.task_record_enum import TaskStatus

        try:
            async with get_async_session() as session:
                lookup = await session.execute(
                    select(VWEDTaskRecord).where(
                        VWEDTaskRecord.id == task_record_id,
                        VWEDTaskRecord.status == TaskStatus.RUNNING
                    )
                )
                running_record = lookup.scalars().first()
                if not running_record:
                    return {
                        "success": True,
                        "message": "任务记录中没有运行中的任务"
                    }

                cancel_result = await scheduler.cancel_task(task_record_id)
                if not cancel_result.get("success", False):
                    return {
                        "success": False,
                        "message": "任务终止失败",
                        "data": cancel_result
                    }

                # Keep the new state in locals: reading ORM attributes after
                # commit() can trigger a lazy refresh, which an async session
                # cannot perform implicitly.
                final_status = TaskStatus.CANCELED
                ended_on = datetime.datetime.now()
                ended_reason = "任务终止"

                running_record.status = final_status
                running_record.ended_on = ended_on
                running_record.ended_reason = ended_reason
                await session.commit()

                return {
                    "success": True,
                    "message": "任务终止成功",
                    "data": {
                        "task_record_id": task_record_id,
                        "status": final_status,
                        "ended_on": ended_on,
                        "ended_reason": ended_reason
                    }
                }
        except Exception as e:
            logger.error(f"任务记录终止失败: {str(e)}")
            return {
                "success": False,
                "message": f"任务记录终止失败: {str(e)}"
            }

    @staticmethod
    async def get_block_results(task_record_id: str) -> Dict[str, Any]:
        """
        Build a flat list of human-readable result lines for a task record.

        Each entry is ``{"created_at", "context", "status"}`` where
        ``context`` is ``"[<block_execute_name>] <detail>"``:

        * block named ``"-1"``: detail is the task record ID;
        * block whose status is not ``SUCCESS``: detail is ``ended_reason``;
        * successful block: one entry per task log of that block name (logs
          are fetched once per distinct block name), or ``ended_reason`` plus
          a warning when the block has no log.

        Args:
            task_record_id: ID of the task record.

        Returns:
            Dict with ``success``, ``message`` and, unless an exception was
            caught, ``data`` (the list of entries, empty when the record has
            no blocks).
        """
        context_text = TaskRecordService._context_text
        log_text = TaskRecordService._log_text

        def make_entry(block: Any, detail: Any) -> Dict[str, Any]:
            return {
                "created_at": block.created_at,
                "context": context_text(block.block_execute_name, detail),
                "status": block.status
            }

        try:
            async with get_async_session() as session:
                query = select(VWEDBlockRecord).where(
                    VWEDBlockRecord.task_record_id == task_record_id
                )
                blocks = (await session.execute(query)).scalars().all()
                if not blocks:
                    return {
                        "success": True,
                        "message": f"未找到任务记录 {task_record_id} 的块运行情况",
                        "data": []
                    }

                block_results = []
                # Block names whose logs were already emitted (de-duplication).
                seen_block_names = set()
                for block in blocks:
                    if block.block_name == "-1":
                        block_results.append(make_entry(block, block.task_record_id))
                        continue

                    # NOTE(review): the log lookup below is assumed to apply
                    # only to successful blocks; confirm against the intended
                    # nesting of the original branches.
                    if block.status != TaskBlockRecordStatus.SUCCESS:
                        block_results.append(make_entry(block, block.ended_reason))
                        continue

                    if block.block_name in seen_block_names:
                        continue
                    seen_block_names.add(block.block_name)

                    logs_query = select(VWEDTaskLog).where(
                        VWEDTaskLog.task_record_id == task_record_id,
                        VWEDTaskLog.task_block_id == block.block_name
                    )
                    task_logs = (await session.execute(logs_query)).scalars().all()
                    if not task_logs:
                        block_results.append(make_entry(block, block.ended_reason))
                        logger.warning(f"任务记录 {task_record_id} 的块 {block.block_name} 没有日志")
                        continue

                    for task_log in task_logs:
                        block_results.append(make_entry(block, log_text(task_log.message)))

                return {
                    "success": True,
                    "message": "成功获取任务记录执行结果",
                    "data": block_results
                }
        except Exception as e:
            logger.error(f"获取任务记录执行结果失败: {str(e)}")
            return {
                "success": False,
                "message": f"获取任务记录执行结果失败: {str(e)}"
            }

    @staticmethod
    async def get_task_record_detail(task_record_id: str) -> Dict[str, Any]:
        """
        Fetch the full detail of a single task record.

        Args:
            task_record_id: Primary key of the task record.

        Returns:
            Dict with ``success``, ``message`` and ``data``.  ``data`` is the
            task dict, or ``None`` when the record does not exist or an
            exception was caught (both with ``success=False``).
        """
        parse_json = TaskRecordService._parse_json
        to_iso = TaskRecordService._to_iso
        try:
            async with get_async_session() as session:
                query = select(VWEDTaskRecord).where(
                    VWEDTaskRecord.id == task_record_id
                )
                task_record = (await session.execute(query)).scalars().first()
                if not task_record:
                    return {
                        "success": False,
                        "message": f"未找到ID为 {task_record_id} 的任务记录",
                        "data": None
                    }

                # Prefer the measured span (in milliseconds) when both ends
                # are known; otherwise fall back to the stored executor_time
                # (presumably also milliseconds - TODO confirm).
                execution_time = None
                if task_record.ended_on and task_record.first_executor_time:
                    elapsed = task_record.ended_on - task_record.first_executor_time
                    execution_time = int(elapsed.total_seconds() * 1000)
                elif task_record.executor_time:
                    execution_time = task_record.executor_time

                has_child = task_record.if_have_child_task
                task_dict = {
                    "id": task_record.id,
                    "task_id": task_record.def_id,
                    "task_name": task_record.def_label,
                    "task_version": task_record.def_version,
                    "status": task_record.status,
                    "input_params": parse_json(task_record.input_params),
                    "started_on": to_iso(task_record.first_executor_time),
                    "ended_on": to_iso(task_record.ended_on),
                    "ended_reason": task_record.ended_reason,
                    "execution_time": execution_time,
                    "created_at": to_iso(task_record.created_at),
                    "updated_at": to_iso(task_record.updated_at),
                    "agv_id": task_record.agv_id,
                    "parent_task_record_id": task_record.parent_task_record_id,
                    "root_task_record_id": task_record.root_task_record_id,
                    "state_description": task_record.state_description,
                    "if_have_child_task": bool(has_child) if has_child is not None else None,
                    "periodic_task": task_record.periodic_task,
                    "priority": task_record.priority,
                    "work_stations": task_record.work_stations,
                    "work_types": task_record.work_types,
                    "variables": parse_json(task_record.variables),
                    "source_type": task_record.source_type,
                    "source_system": task_record.source_system,
                    "source_user": task_record.source_user,
                    "source_device": task_record.source_device,
                    "source_ip": task_record.source_ip,
                    "source_time": to_iso(task_record.source_time),
                    "source_client_info": task_record.source_client_info,
                    "source_remarks": task_record.source_remarks
                }
                return {
                    "success": True,
                    "message": "成功获取任务记录详情",
                    "data": task_dict
                }
        except Exception as e:
            logger.error(f"获取任务记录详情失败: {str(e)}")
            return {
                "success": False,
                "message": f"获取任务记录详情失败: {str(e)}",
                "data": None
            }