2025-03-18 18:34:03 +08:00

472 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# api/task_api.py
from typing import Dict, Any, List, Optional
from fastapi import APIRouter, HTTPException, Depends, Query
from pydantic import BaseModel, Field
from services.task_service import TaskService
from services.task_instance_service import TaskInstanceService
from core.exceptions import TianfengTaskError
from config.task_config import (
get_all_task_types,
get_task_type_name,
get_task_type_description,
TaskTypeConfig,
TaskStatusConfig,
DefaultConfig,
TASK_TYPE_CONFIG,
TASK_TYPE_NORMAL,
TASK_TYPE_SCHEDULED,
DEFAULT_TASK_DESCRIPTION,
DEFAULT_TEMPLATE_DESCRIPTION,
TaskType,
TaskStatus,
TaskInputParamConfig,
SystemParamKey
)
from api.models import (
ApiResponse, TaskInput, TaskBatchInput, TaskIdList,
TaskTypeInfo, SortField, SortOrder, TaskUpdateInput, TaskEditInput
)
# from data import get_session
# 创建路由器
router = APIRouter(tags=["任务管理"])
# 创建服务实例
task_service = TaskService()
task_instance_service = TaskInstanceService()
@router.get("/tasks", response_model=ApiResponse)
async def get_tasks(
status: Optional[TaskStatus] = Query(None, description="任务状态"),
task_type: Optional[TaskType] = Query(None, description="任务类型"),
name: Optional[str] = Query(None, description="任务名称(模糊查询)"),
is_scheduled: Optional[bool] = Query(None, description="是否为定时任务"),
created_start: Optional[int] = Query(None, description="创建时间起始(毫秒时间戳)"),
created_end: Optional[int] = Query(None, description="创建时间结束(毫秒时间戳)"),
sort_by: SortField = Query(default=SortField.CREATED_AT, description="排序字段"),
sort_order: SortOrder = Query(default=SortOrder.DESC, description="排序方式"),
page: int = Query(1, ge=1, description="页码"),
page_size: int = Query(10, ge=1, le=100, description="每页数量")
):
"""
获取任务列表
支持多种筛选条件、排序和分页
"""
try:
# 获取任务列表
tasks, total = task_service.get_all_tasks(
status=status,
task_type=task_type,
name=name,
is_scheduled=is_scheduled,
created_start=created_start,
created_end=created_end,
sort_by=sort_by,
sort_order=sort_order,
page=page,
page_size=page_size
)
# 构建分页信息
pagination = {
"page": page,
"page_size": page_size,
"total": total,
"total_pages": (total + page_size - 1) // page_size
}
return {
"code": 200,
"message": "获取任务列表成功",
"data": {
"tasks": tasks,
"pagination": pagination
}
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取任务列表失败: {str(e)}")
@router.get("/task/types", response_model=ApiResponse)
async def get_task_types():
"""获取任务类型列表"""
try:
# 从配置文件中获取任务类型列表
task_types = get_all_task_types()
# 添加value字段枚举值
for task_type in task_types:
# 从TaskType枚举中获取value
try:
# 尝试通过名称获取枚举值
task_type["value"] = TaskType[task_type["key"]].value
except (KeyError, AttributeError):
# 如果枚举中没有对应的值则使用key的小写作为value
task_type["value"] = task_type["key"]
return {
"code": 200,
"message": "获取任务类型列表成功",
"data": task_types
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取任务类型列表失败: {str(e)}")
@router.post("/task/create", response_model=ApiResponse)
async def create_task(task_input: TaskInput):
"""创建任务"""
try:
# 获取任务类型值(处理枚举)
task_type = task_input.task_type
if hasattr(task_type, "value"):
task_type_value = task_type.value
else:
task_type_value = task_type
# 从配置文件中获取任务类型信息
task_type_info = TaskTypeConfig.DETAILS.get(task_type_value, {})
# 根据任务类型决定是否为定时任务
is_scheduled = task_type_value == TASK_TYPE_SCHEDULED
# 创建任务,设置固定参数
task = task_service.create_task(
name=task_input.name,
task_type=task_type_value, # 使用key作为task_type
description=DEFAULT_TASK_DESCRIPTION, # 使用配置文件中的默认备注
template_desc=DEFAULT_TEMPLATE_DESCRIPTION, # 使用配置文件中的默认模板描述
is_scheduled=is_scheduled # 根据任务类型决定
)
# 在返回结果中添加任务类型的中文名称
if "task_type_name" not in task and task_type_info:
task["task_type_name"] = task_type_info.get("name", task_type_value)
return {
"code": 200,
"message": "创建任务成功",
"data": task
}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=f"创建任务失败: {str(e)}")
@router.delete("/task/{task_id}", response_model=ApiResponse)
async def delete_task(task_id: str):
"""删除任务"""
try:
# 删除任务
success = task_service.delete_task(task_id)
# 如果任务不存在,返回成功但提示任务不存在
if not success:
return {
"code": 200,
"message": f"任务不存在或已被删除: {task_id}",
"data": None
}
return {
"code": 200,
"message": "删除任务成功",
"data": None
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"删除任务失败: {str(e)}")
# @router.post("/task/{task_id}/execute", response_model=ApiResponse)
# async def execute_task(task_id: str):
# """执行任务"""
# try:
# # 执行任务
# task = task_service.execute_task(task_id)
# return {
# "code": 200,
# "message": "执行任务成功",
# "data": task
# }
# except ValueError as e:
# # 任务不存在
# return {
# "code": 404,
# "message": str(e),
# "data": None
# }
# except TianfengTaskError as e:
# # 业务逻辑错误
# return {
# "code": 400,
# "message": str(e),
# "data": None
# }
# except Exception as e:
# raise HTTPException(status_code=500, detail=f"执行任务失败: {str(e)}")
# @router.post("/task/{task_id}/cancel", response_model=ApiResponse)
# async def cancel_task(task_id: str):
# """取消任务"""
# try:
# # 取消任务
# task = task_service.cancel_task(task_id)
# return {
# "code": 200,
# "message": "取消任务成功",
# "data": task
# }
# except ValueError as e:
# # 任务不存在或状态不允许取消
# return {
# "code": 400,
# "message": str(e),
# "data": None
# }
# except Exception as e:
# raise HTTPException(status_code=500, detail=f"取消任务失败: {str(e)}")
@router.delete("/task/batch", response_model=ApiResponse)
async def batch_delete_tasks(id_list: TaskIdList):
"""批量删除任务"""
try:
# 批量删除任务
deleted_count = 0
not_found_ids = []
for task_id in id_list.task_ids:
if task_service.delete_task(task_id):
deleted_count += 1
else:
not_found_ids.append(task_id)
# 构建消息
message = f"批量删除任务成功,共删除 {deleted_count} 个任务"
if not_found_ids:
message += f",有 {len(not_found_ids)} 个任务不存在或已被删除"
return {
"code": 200,
"message": message,
"data": {
"deleted_count": deleted_count,
"total_count": len(id_list.task_ids),
"not_found_ids": not_found_ids
}
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"批量删除任务失败: {str(e)}")
@router.put("/task/{task_id}", response_model=ApiResponse)
async def update_task(task_id: str, task_update: TaskUpdateInput):
"""更新任务基本信息"""
try:
# 检查任务是否存在
task = task_service.get_task_by_id(task_id)
if not task:
return {
"code": 404,
"message": f"任务 {task_id} 不存在",
"data": None
}
# 更新任务信息
updated_task = task_service.update_task(
task_id=task_id,
name=task_update.name,
description=task_update.description,
task_type=task_update.task_type,
blocks=task_update.blocks,
variables=task_update.variables,
schedule=task_update.schedule
)
return {
"code": 200,
"message": "更新任务成功",
"data": {"task": updated_task}
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"更新任务失败: {str(e)}")
@router.get("/task/{task_id}/edit", response_model=ApiResponse)
async def get_task_edit_info(task_id: str):
"""获取任务编辑信息"""
try:
# 获取任务信息
task = task_service.get_task_by_id(task_id)
if not task:
return {
"code": 404,
"message": f"任务 {task_id} 不存在",
"data": None
}
# 获取或创建编辑中的任务实例
instance = task_instance_service.get_or_create_editing_instance(task_id)
# 获取任务输入参数
from services.task_param_service import TaskParamService
task_param_service = TaskParamService()
task_input_params, _ = task_param_service.get_task_input_params(task_id, instance["instance_id"])
# 将任务输入参数添加到实例数据中
instance["task_input_params"] = task_input_params
# 获取可用的子任务列表(排除当前任务自身)
available_subtasks = []
try:
all_tasks, _ = task_service.get_all_tasks(page=1, page_size=1000) # 获取所有任务
for t in all_tasks:
if t["task_id"] != task_id: # 排除当前任务
subtask_info = {
"task_id": t["task_id"],
"name": t["name"]
}
available_subtasks.append(subtask_info)
except Exception as e:
# 如果获取任务列表失败,记录错误但继续执行
print(f"获取可用子任务列表失败: {str(e)}")
# 获取组件详细信息
from config.component_config import ComponentDetailConfig, SubtaskComponentConfig
component_details = ComponentDetailConfig.get_all_components()
# 获取组件类型中文名称映射
component_type_names = ComponentDetailConfig.get_component_type_names()
# 按组件类型分组并添加中文名称
component_types = {}
for component in component_details:
component_type = component["type"]
# 如果该类型还未添加到结果中,先创建类型信息
if component_type not in component_types:
# 获取组件类型的中文名称,如果没有则使用英文标识
type_name = component_type_names.get(component_type, component_type)
component_types[component_type] = {
"type": component_type, # 英文标识
"name": type_name, # 中文名称
"components": [] # 该类型下的组件列表
}
# 添加组件到对应类型下
component_types[component_type]["components"].append(component)
# 特殊处理subtask类型将每个可用子任务构建为独立组件
if "subtask" in component_types and available_subtasks:
# 使用配置中的方法生成子任务组件
subtask_components = SubtaskComponentConfig.generate_subtask_components(available_subtasks)
# 替换原有的组件列表
component_types["subtask"]["components"] = subtask_components
# 获取常用参数数据
from services.common_params_service import CommonParamsService
from config.component_config import CommonParamsConfig
common_params_service = CommonParamsService()
param_types = CommonParamsConfig.get_param_types()
# 获取所有常用参数数据
common_params = {}
for param_type_config in param_types:
param_type = param_type_config["type"]
param_data = common_params_service.get_param_data(param_type)
common_params[param_type] = {
"type_info": param_type_config,
"data": param_data
}
# 获取块输出参数和上下文参数
block_output_params = instance.get("block_outputs", {})
context_params = instance.get("context_params", {})
return {
"code": 200,
"message": "获取任务编辑信息成功",
"data": {
"task": task,
"instance": instance,
"component_types": component_types,
"common_params": common_params,
"task_input_params": instance["task_input_params"], # 使用实例中的任务输入参数
"block_output_params": block_output_params,
"context_params": context_params
}
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取任务编辑信息失败: {str(e)}")
@router.get("/tasks/available-subtasks", response_model=ApiResponse)
async def get_available_subtasks(current_task_id: Optional[str] = None):
"""获取可用的子任务列表"""
try:
# 获取所有任务
all_tasks, _ = task_service.get_all_tasks(page=1, page_size=1000)
# 过滤出可用的子任务如果提供了当前任务ID则排除当前任务
available_subtasks = []
for task in all_tasks:
if not current_task_id or task["task_id"] != current_task_id:
available_subtasks.append({
"task_id": task["task_id"],
"name": task["name"]
})
return {
"code": 200,
"message": "获取可用子任务列表成功",
"data": {
"subtasks": available_subtasks
}
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取可用子任务列表失败: {str(e)}")
# @router.post("/task/{task_id}/publish", response_model=ApiResponse)
# async def publish_task(task_id: str):
# """发布任务(将编辑中的任务实例状态更改为已发布)"""
# try:
# # 检查任务是否存在
# task = task_service.get_task_by_id(task_id)
# if not task:
# return {
# "code": 404,
# "message": f"任务 {task_id} 不存在",
# "data": None
# }
# # 获取编辑中的任务实例
# editing_instance = task_instance_service.get_editing_instance_by_task_id(task_id)
# if not editing_instance:
# return {
# "code": 404,
# "message": f"任务 {task_id} 没有编辑中的实例",
# "data": None
# }
# # 发布任务实例
# published_instance = task_instance_service.publish_instance(editing_instance["instance_id"])
# # 构建响应数据
# response_data = {
# "task": task,
# "instance": published_instance
# }
# return {
# "code": 200,
# "message": "发布任务成功",
# "data": response_data
# }
# except Exception as e:
# raise HTTPException(status_code=500, detail=f"发布任务失败: {str(e)}")