472 lines
17 KiB
Python
472 lines
17 KiB
Python
# api/task_api.py
|
||
from typing import Dict, Any, List, Optional
|
||
from fastapi import APIRouter, HTTPException, Depends, Query
|
||
from pydantic import BaseModel, Field
|
||
from services.task_service import TaskService
|
||
from services.task_instance_service import TaskInstanceService
|
||
from core.exceptions import TianfengTaskError
|
||
from config.task_config import (
|
||
get_all_task_types,
|
||
get_task_type_name,
|
||
get_task_type_description,
|
||
TaskTypeConfig,
|
||
TaskStatusConfig,
|
||
DefaultConfig,
|
||
TASK_TYPE_CONFIG,
|
||
TASK_TYPE_NORMAL,
|
||
TASK_TYPE_SCHEDULED,
|
||
DEFAULT_TASK_DESCRIPTION,
|
||
DEFAULT_TEMPLATE_DESCRIPTION,
|
||
TaskType,
|
||
TaskStatus,
|
||
TaskInputParamConfig,
|
||
SystemParamKey
|
||
)
|
||
from api.models import (
|
||
ApiResponse, TaskInput, TaskBatchInput, TaskIdList,
|
||
TaskTypeInfo, SortField, SortOrder, TaskUpdateInput, TaskEditInput
|
||
)
|
||
# from data import get_session
|
||
|
||
# 创建路由器
|
||
router = APIRouter(tags=["任务管理"])
|
||
|
||
# 创建服务实例
|
||
task_service = TaskService()
|
||
task_instance_service = TaskInstanceService()
|
||
|
||
@router.get("/tasks", response_model=ApiResponse)
|
||
async def get_tasks(
|
||
status: Optional[TaskStatus] = Query(None, description="任务状态"),
|
||
task_type: Optional[TaskType] = Query(None, description="任务类型"),
|
||
name: Optional[str] = Query(None, description="任务名称(模糊查询)"),
|
||
is_scheduled: Optional[bool] = Query(None, description="是否为定时任务"),
|
||
created_start: Optional[int] = Query(None, description="创建时间起始(毫秒时间戳)"),
|
||
created_end: Optional[int] = Query(None, description="创建时间结束(毫秒时间戳)"),
|
||
sort_by: SortField = Query(default=SortField.CREATED_AT, description="排序字段"),
|
||
sort_order: SortOrder = Query(default=SortOrder.DESC, description="排序方式"),
|
||
page: int = Query(1, ge=1, description="页码"),
|
||
page_size: int = Query(10, ge=1, le=100, description="每页数量")
|
||
):
|
||
"""
|
||
获取任务列表
|
||
|
||
支持多种筛选条件、排序和分页
|
||
"""
|
||
try:
|
||
# 获取任务列表
|
||
tasks, total = task_service.get_all_tasks(
|
||
status=status,
|
||
task_type=task_type,
|
||
name=name,
|
||
is_scheduled=is_scheduled,
|
||
created_start=created_start,
|
||
created_end=created_end,
|
||
sort_by=sort_by,
|
||
sort_order=sort_order,
|
||
page=page,
|
||
page_size=page_size
|
||
)
|
||
|
||
# 构建分页信息
|
||
pagination = {
|
||
"page": page,
|
||
"page_size": page_size,
|
||
"total": total,
|
||
"total_pages": (total + page_size - 1) // page_size
|
||
}
|
||
|
||
return {
|
||
"code": 200,
|
||
"message": "获取任务列表成功",
|
||
"data": {
|
||
"tasks": tasks,
|
||
"pagination": pagination
|
||
}
|
||
}
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"获取任务列表失败: {str(e)}")
|
||
|
||
|
||
@router.get("/task/types", response_model=ApiResponse)
|
||
async def get_task_types():
|
||
"""获取任务类型列表"""
|
||
try:
|
||
# 从配置文件中获取任务类型列表
|
||
task_types = get_all_task_types()
|
||
# 添加value字段(枚举值)
|
||
for task_type in task_types:
|
||
# 从TaskType枚举中获取value
|
||
try:
|
||
# 尝试通过名称获取枚举值
|
||
task_type["value"] = TaskType[task_type["key"]].value
|
||
except (KeyError, AttributeError):
|
||
# 如果枚举中没有对应的值,则使用key的小写作为value
|
||
task_type["value"] = task_type["key"]
|
||
|
||
return {
|
||
"code": 200,
|
||
"message": "获取任务类型列表成功",
|
||
"data": task_types
|
||
}
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"获取任务类型列表失败: {str(e)}")
|
||
|
||
@router.post("/task/create", response_model=ApiResponse)
|
||
async def create_task(task_input: TaskInput):
|
||
"""创建任务"""
|
||
try:
|
||
# 获取任务类型值(处理枚举)
|
||
task_type = task_input.task_type
|
||
if hasattr(task_type, "value"):
|
||
task_type_value = task_type.value
|
||
else:
|
||
task_type_value = task_type
|
||
|
||
# 从配置文件中获取任务类型信息
|
||
task_type_info = TaskTypeConfig.DETAILS.get(task_type_value, {})
|
||
|
||
# 根据任务类型决定是否为定时任务
|
||
is_scheduled = task_type_value == TASK_TYPE_SCHEDULED
|
||
|
||
# 创建任务,设置固定参数
|
||
task = task_service.create_task(
|
||
name=task_input.name,
|
||
task_type=task_type_value, # 使用key作为task_type
|
||
description=DEFAULT_TASK_DESCRIPTION, # 使用配置文件中的默认备注
|
||
template_desc=DEFAULT_TEMPLATE_DESCRIPTION, # 使用配置文件中的默认模板描述
|
||
is_scheduled=is_scheduled # 根据任务类型决定
|
||
)
|
||
|
||
# 在返回结果中添加任务类型的中文名称
|
||
if "task_type_name" not in task and task_type_info:
|
||
task["task_type_name"] = task_type_info.get("name", task_type_value)
|
||
|
||
return {
|
||
"code": 200,
|
||
"message": "创建任务成功",
|
||
"data": task
|
||
}
|
||
except ValueError as e:
|
||
raise HTTPException(status_code=400, detail=str(e))
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"创建任务失败: {str(e)}")
|
||
|
||
|
||
@router.delete("/task/{task_id}", response_model=ApiResponse)
|
||
async def delete_task(task_id: str):
|
||
"""删除任务"""
|
||
try:
|
||
# 删除任务
|
||
success = task_service.delete_task(task_id)
|
||
|
||
# 如果任务不存在,返回成功但提示任务不存在
|
||
if not success:
|
||
return {
|
||
"code": 200,
|
||
"message": f"任务不存在或已被删除: {task_id}",
|
||
"data": None
|
||
}
|
||
|
||
return {
|
||
"code": 200,
|
||
"message": "删除任务成功",
|
||
"data": None
|
||
}
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"删除任务失败: {str(e)}")
|
||
|
||
|
||
# @router.post("/task/{task_id}/execute", response_model=ApiResponse)
|
||
# async def execute_task(task_id: str):
|
||
# """执行任务"""
|
||
# try:
|
||
# # 执行任务
|
||
# task = task_service.execute_task(task_id)
|
||
|
||
# return {
|
||
# "code": 200,
|
||
# "message": "执行任务成功",
|
||
# "data": task
|
||
# }
|
||
# except ValueError as e:
|
||
# # 任务不存在
|
||
# return {
|
||
# "code": 404,
|
||
# "message": str(e),
|
||
# "data": None
|
||
# }
|
||
# except TianfengTaskError as e:
|
||
# # 业务逻辑错误
|
||
# return {
|
||
# "code": 400,
|
||
# "message": str(e),
|
||
# "data": None
|
||
# }
|
||
# except Exception as e:
|
||
# raise HTTPException(status_code=500, detail=f"执行任务失败: {str(e)}")
|
||
|
||
# @router.post("/task/{task_id}/cancel", response_model=ApiResponse)
|
||
# async def cancel_task(task_id: str):
|
||
# """取消任务"""
|
||
# try:
|
||
# # 取消任务
|
||
# task = task_service.cancel_task(task_id)
|
||
|
||
# return {
|
||
# "code": 200,
|
||
# "message": "取消任务成功",
|
||
# "data": task
|
||
# }
|
||
# except ValueError as e:
|
||
# # 任务不存在或状态不允许取消
|
||
# return {
|
||
# "code": 400,
|
||
# "message": str(e),
|
||
# "data": None
|
||
# }
|
||
# except Exception as e:
|
||
# raise HTTPException(status_code=500, detail=f"取消任务失败: {str(e)}")
|
||
|
||
|
||
|
||
@router.delete("/task/batch", response_model=ApiResponse)
|
||
async def batch_delete_tasks(id_list: TaskIdList):
|
||
"""批量删除任务"""
|
||
try:
|
||
# 批量删除任务
|
||
deleted_count = 0
|
||
not_found_ids = []
|
||
|
||
for task_id in id_list.task_ids:
|
||
if task_service.delete_task(task_id):
|
||
deleted_count += 1
|
||
else:
|
||
not_found_ids.append(task_id)
|
||
|
||
# 构建消息
|
||
message = f"批量删除任务成功,共删除 {deleted_count} 个任务"
|
||
if not_found_ids:
|
||
message += f",有 {len(not_found_ids)} 个任务不存在或已被删除"
|
||
|
||
return {
|
||
"code": 200,
|
||
"message": message,
|
||
"data": {
|
||
"deleted_count": deleted_count,
|
||
"total_count": len(id_list.task_ids),
|
||
"not_found_ids": not_found_ids
|
||
}
|
||
}
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"批量删除任务失败: {str(e)}")
|
||
|
||
|
||
@router.put("/task/{task_id}", response_model=ApiResponse)
|
||
async def update_task(task_id: str, task_update: TaskUpdateInput):
|
||
"""更新任务基本信息"""
|
||
try:
|
||
# 检查任务是否存在
|
||
task = task_service.get_task_by_id(task_id)
|
||
if not task:
|
||
return {
|
||
"code": 404,
|
||
"message": f"任务 {task_id} 不存在",
|
||
"data": None
|
||
}
|
||
|
||
# 更新任务信息
|
||
updated_task = task_service.update_task(
|
||
task_id=task_id,
|
||
name=task_update.name,
|
||
description=task_update.description,
|
||
task_type=task_update.task_type,
|
||
blocks=task_update.blocks,
|
||
variables=task_update.variables,
|
||
schedule=task_update.schedule
|
||
)
|
||
|
||
return {
|
||
"code": 200,
|
||
"message": "更新任务成功",
|
||
"data": {"task": updated_task}
|
||
}
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"更新任务失败: {str(e)}")
|
||
|
||
@router.get("/task/{task_id}/edit", response_model=ApiResponse)
|
||
async def get_task_edit_info(task_id: str):
|
||
"""获取任务编辑信息"""
|
||
try:
|
||
# 获取任务信息
|
||
task = task_service.get_task_by_id(task_id)
|
||
if not task:
|
||
return {
|
||
"code": 404,
|
||
"message": f"任务 {task_id} 不存在",
|
||
"data": None
|
||
}
|
||
|
||
# 获取或创建编辑中的任务实例
|
||
instance = task_instance_service.get_or_create_editing_instance(task_id)
|
||
|
||
# 获取任务输入参数
|
||
from services.task_param_service import TaskParamService
|
||
task_param_service = TaskParamService()
|
||
task_input_params, _ = task_param_service.get_task_input_params(task_id, instance["instance_id"])
|
||
|
||
# 将任务输入参数添加到实例数据中
|
||
instance["task_input_params"] = task_input_params
|
||
|
||
# 获取可用的子任务列表(排除当前任务自身)
|
||
available_subtasks = []
|
||
|
||
try:
|
||
all_tasks, _ = task_service.get_all_tasks(page=1, page_size=1000) # 获取所有任务
|
||
for t in all_tasks:
|
||
if t["task_id"] != task_id: # 排除当前任务
|
||
subtask_info = {
|
||
"task_id": t["task_id"],
|
||
"name": t["name"]
|
||
}
|
||
available_subtasks.append(subtask_info)
|
||
except Exception as e:
|
||
# 如果获取任务列表失败,记录错误但继续执行
|
||
print(f"获取可用子任务列表失败: {str(e)}")
|
||
|
||
# 获取组件详细信息
|
||
from config.component_config import ComponentDetailConfig, SubtaskComponentConfig
|
||
component_details = ComponentDetailConfig.get_all_components()
|
||
|
||
# 获取组件类型中文名称映射
|
||
component_type_names = ComponentDetailConfig.get_component_type_names()
|
||
|
||
# 按组件类型分组并添加中文名称
|
||
component_types = {}
|
||
for component in component_details:
|
||
component_type = component["type"]
|
||
|
||
# 如果该类型还未添加到结果中,先创建类型信息
|
||
if component_type not in component_types:
|
||
# 获取组件类型的中文名称,如果没有则使用英文标识
|
||
type_name = component_type_names.get(component_type, component_type)
|
||
component_types[component_type] = {
|
||
"type": component_type, # 英文标识
|
||
"name": type_name, # 中文名称
|
||
"components": [] # 该类型下的组件列表
|
||
}
|
||
|
||
# 添加组件到对应类型下
|
||
component_types[component_type]["components"].append(component)
|
||
|
||
# 特殊处理subtask类型,将每个可用子任务构建为独立组件
|
||
if "subtask" in component_types and available_subtasks:
|
||
# 使用配置中的方法生成子任务组件
|
||
subtask_components = SubtaskComponentConfig.generate_subtask_components(available_subtasks)
|
||
|
||
# 替换原有的组件列表
|
||
component_types["subtask"]["components"] = subtask_components
|
||
|
||
# 获取常用参数数据
|
||
from services.common_params_service import CommonParamsService
|
||
from config.component_config import CommonParamsConfig
|
||
|
||
common_params_service = CommonParamsService()
|
||
param_types = CommonParamsConfig.get_param_types()
|
||
|
||
# 获取所有常用参数数据
|
||
common_params = {}
|
||
for param_type_config in param_types:
|
||
param_type = param_type_config["type"]
|
||
param_data = common_params_service.get_param_data(param_type)
|
||
common_params[param_type] = {
|
||
"type_info": param_type_config,
|
||
"data": param_data
|
||
}
|
||
|
||
# 获取块输出参数和上下文参数
|
||
block_output_params = instance.get("block_outputs", {})
|
||
context_params = instance.get("context_params", {})
|
||
|
||
return {
|
||
"code": 200,
|
||
"message": "获取任务编辑信息成功",
|
||
"data": {
|
||
"task": task,
|
||
"instance": instance,
|
||
"component_types": component_types,
|
||
"common_params": common_params,
|
||
"task_input_params": instance["task_input_params"], # 使用实例中的任务输入参数
|
||
"block_output_params": block_output_params,
|
||
"context_params": context_params
|
||
}
|
||
}
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"获取任务编辑信息失败: {str(e)}")
|
||
|
||
@router.get("/tasks/available-subtasks", response_model=ApiResponse)
|
||
async def get_available_subtasks(current_task_id: Optional[str] = None):
|
||
"""获取可用的子任务列表"""
|
||
try:
|
||
# 获取所有任务
|
||
all_tasks, _ = task_service.get_all_tasks(page=1, page_size=1000)
|
||
|
||
# 过滤出可用的子任务(如果提供了当前任务ID,则排除当前任务)
|
||
available_subtasks = []
|
||
for task in all_tasks:
|
||
if not current_task_id or task["task_id"] != current_task_id:
|
||
available_subtasks.append({
|
||
"task_id": task["task_id"],
|
||
"name": task["name"]
|
||
})
|
||
|
||
return {
|
||
"code": 200,
|
||
"message": "获取可用子任务列表成功",
|
||
"data": {
|
||
"subtasks": available_subtasks
|
||
}
|
||
}
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"获取可用子任务列表失败: {str(e)}")
|
||
|
||
# @router.post("/task/{task_id}/publish", response_model=ApiResponse)
|
||
# async def publish_task(task_id: str):
|
||
# """发布任务(将编辑中的任务实例状态更改为已发布)"""
|
||
# try:
|
||
# # 检查任务是否存在
|
||
# task = task_service.get_task_by_id(task_id)
|
||
# if not task:
|
||
# return {
|
||
# "code": 404,
|
||
# "message": f"任务 {task_id} 不存在",
|
||
# "data": None
|
||
# }
|
||
|
||
# # 获取编辑中的任务实例
|
||
# editing_instance = task_instance_service.get_editing_instance_by_task_id(task_id)
|
||
# if not editing_instance:
|
||
# return {
|
||
# "code": 404,
|
||
# "message": f"任务 {task_id} 没有编辑中的实例",
|
||
# "data": None
|
||
# }
|
||
|
||
# # 发布任务实例
|
||
# published_instance = task_instance_service.publish_instance(editing_instance["instance_id"])
|
||
|
||
# # 构建响应数据
|
||
# response_data = {
|
||
# "task": task,
|
||
# "instance": published_instance
|
||
# }
|
||
|
||
# return {
|
||
# "code": 200,
|
||
# "message": "发布任务成功",
|
||
# "data": response_data
|
||
# }
|
||
# except Exception as e:
|
||
# raise HTTPException(status_code=500, detail=f"发布任务失败: {str(e)}")
|
||
|