tianfeng_task_modules/api/task_instance_api.py
2025-03-18 18:34:03 +08:00

51 lines
1.6 KiB
Python

"""
任务实例API模块
提供任务实例的增删改查接口
"""
from typing import Dict, Any, List, Optional
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel, Field
from services.task_instance_service import TaskInstanceService
from api.models import (
ApiResponse, TaskInstanceCreateInput, TaskInstanceUpdateInput,
TaskInputParamItem, TaskInputParamsInput, TaskInstanceStatus
)
# 创建路由器
router = APIRouter(prefix="/task-instances", tags=["任务实例"])
# 创建服务实例
task_instance_service = TaskInstanceService()
# API接口
@router.post("/create", response_model=ApiResponse)
async def create_task_instance(instance_input: TaskInstanceCreateInput):
"""创建任务实例"""
try:
# 创建任务实例
instance = task_instance_service.create_instance(
task_id=instance_input.task_id,
name=instance_input.name,
variables=instance_input.variables,
priority=instance_input.priority,
input_params=instance_input.input_params,
block_outputs=instance_input.block_outputs,
context_params=instance_input.context_params
)
return {
"code": 200,
"message": "创建任务实例成功",
"data": instance
}
except ValueError as e:
return {
"code": 400,
"message": str(e),
"data": None
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"创建任务实例失败: {str(e)}")