VWED_server/routes/script_api.py
2025-04-30 16:57:46 +08:00

429 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
在线脚本管理API路由
实现脚本的创建、查询、更新、删除和执行功能
"""
import json
import os
import logging
from typing import Optional, Dict, Any, List
from fastapi import APIRouter, Depends, Path, Query, HTTPException, File, UploadFile, Form, BackgroundTasks
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from pydantic import ValidationError
from data.session import get_db
from services.script_service import ScriptService
from routes.model.base import ApiResponse
from routes.model.script_model import (
ScriptListParams, CreateScriptRequest, UpdateScriptRequest, RunScriptRequest,
StopScriptRequest, ScriptDetail, ScriptLogDetail, ScriptCreateResponse,
ScriptUpdateResponse, ScriptRunResponse, ScriptListResponse
)
from routes.common_api import format_response, error_response
from utils.logger import get_logger
# 创建路由
router = APIRouter(prefix="/api/vwed-script", tags=["在线脚本管理"])
# 设置日志
logger = get_logger("app.script_api")
# 标准API响应格式
def api_response(code: int = 200, message: str = "操作成功", data: Any = None) -> Dict[str, Any]:
"""
标准API响应格式
Args:
code: 状态码
message: 响应消息
data: 响应数据
Returns:
Dict[str, Any]: 格式化的响应数据
"""
return {
"code": code,
"message": message,
"data": data
}
@router.get("/list", response_model=ApiResponse[ScriptListResponse])
async def get_script_list(
params: ScriptListParams = Depends(),
db: Session = Depends(get_db)
):
"""
获取脚本列表
Args:
params: 查询参数(分页、排序、筛选)
db: 数据库会话
Returns:
ApiResponse[ScriptListResponse]: 包含脚本列表和分页信息的响应
"""
try:
# 调用服务层方法获取脚本列表
result = ScriptService.get_script_list(
db=db,
page=params.page,
page_size=params.pageSize,
name=params.name,
status=params.status,
folder_path=params.folderPath,
tags=params.tags,
is_public=params.isPublic,
created_by=params.createdBy,
start_time=params.startTime,
end_time=params.endTime,
sort_field=params.sortField,
sort_order=params.sortOrder
)
return api_response(data=result)
except Exception as e:
logger.error(f"获取脚本列表失败: {str(e)}")
return error_response(f"获取脚本列表失败: {str(e)}", 500)
@router.get("/{script_id}", response_model=ApiResponse[ScriptDetail])
async def get_script_detail(
script_id: str = Path(..., description="脚本ID"),
db: Session = Depends(get_db)
):
"""
获取脚本详情
Args:
script_id: 脚本ID
db: 数据库会话
Returns:
ApiResponse[ScriptDetail]: 包含脚本详情的响应
"""
try:
# 调用服务层方法获取脚本详情
script = ScriptService.get_script_detail(db=db, script_id=script_id)
return api_response(data=script)
except ValueError as e:
return error_response(str(e), 404)
except Exception as e:
logger.error(f"获取脚本详情失败: {str(e)}")
return error_response(f"获取脚本详情失败: {str(e)}", 500)
@router.post("", response_model=ApiResponse[ScriptCreateResponse])
async def create_script(
script_req: CreateScriptRequest,
db: Session = Depends(get_db)
):
"""
创建脚本
Args:
script_req: 创建脚本请求数据
db: 数据库会话
Returns:
ApiResponse[ScriptCreateResponse]: 包含新建脚本ID的响应
"""
try:
# 调用服务层方法创建脚本
result = ScriptService.create_script(
db=db,
folder_path=script_req.folderPath,
file_name=script_req.fileName,
code=script_req.code,
name=script_req.name,
description=script_req.description,
status=script_req.status,
is_public=script_req.isPublic,
tags=script_req.tags,
test_params=script_req.testParams,
created_by=None # 可从请求上下文获取当前用户
)
return api_response(message="创建成功", data=result)
except ValueError as e:
error_msg = str(e)
# 优化脚本已存在错误的提示
if "脚本已存在" in error_msg or "脚本文件已存在" in error_msg:
return error_response(error_msg, 409) # 409 Conflict表示资源冲突
return error_response(error_msg, 400)
except Exception as e:
logger.error(f"创建脚本失败: {str(e)}")
return error_response(f"创建脚本失败: {str(e)}", 500)
@router.put("/{script_id}", response_model=ApiResponse[ScriptUpdateResponse])
async def update_script(
script_req: UpdateScriptRequest,
script_id: str = Path(..., description="脚本ID"),
db: Session = Depends(get_db)
):
"""
更新脚本
Args:
script_req: 更新脚本请求数据
script_id: 脚本ID
db: 数据库会话
Returns:
ApiResponse[ScriptUpdateResponse]: 包含更新后脚本版本的响应
"""
try:
# 调用服务层方法更新脚本
result = ScriptService.update_script(
db=db,
script_id=script_id,
folder_path=script_req.folderPath,
file_name=script_req.fileName,
code=script_req.code,
name=script_req.name,
description=script_req.description,
status=script_req.status,
is_public=script_req.isPublic,
tags=script_req.tags,
test_params=script_req.testParams,
change_log=script_req.changeLog,
updated_by=None # 可从请求上下文获取当前用户
)
return api_response(message="更新成功", data=result)
except ValueError as e:
return error_response(str(e), 404 if "不存在" in str(e) else 400)
except Exception as e:
logger.error(f"更新脚本失败: {str(e)}")
return error_response(f"更新脚本失败: {str(e)}", 500)
@router.delete("/{script_id}", response_model=ApiResponse)
async def delete_script(
script_id: str = Path(..., description="脚本ID"),
db: Session = Depends(get_db)
):
"""
删除脚本
Args:
script_id: 脚本ID
db: 数据库会话
Returns:
ApiResponse: 操作结果
"""
try:
# 调用服务层方法删除脚本
ScriptService.delete_script(db=db, script_id=script_id)
return api_response(message="删除成功")
except ValueError as e:
return error_response(str(e), 404)
except Exception as e:
logger.error(f"删除脚本失败: {str(e)}")
return error_response(f"删除脚本失败: {str(e)}", 500)
@router.post("/{script_id}/run", response_model=ApiResponse[ScriptRunResponse])
async def run_script(
run_req: RunScriptRequest,
script_id: str = Path(..., description="脚本ID"),
db: Session = Depends(get_db)
):
"""
运行脚本
Args:
run_req: 运行脚本请求数据
script_id: 脚本ID
db: 数据库会话
Returns:
ApiResponse[ScriptRunResponse]: 包含脚本执行结果的响应
"""
try:
# 调用服务层方法运行脚本
result = await ScriptService.run_script(
db=db,
script_id=script_id,
params=run_req.params,
task_record_id=run_req.taskRecordId,
block_record_id=run_req.blockRecordId
)
return api_response(message="执行成功", data=result)
except ValueError as e:
return error_response(str(e), 404 if "不存在" in str(e) else 400)
except Exception as e:
logger.error(f"运行脚本失败: {str(e)}")
return error_response(f"运行脚本失败: {str(e)}", 500)
@router.post("/stop", response_model=ApiResponse)
async def stop_script(
stop_req: StopScriptRequest,
db: Session = Depends(get_db)
):
"""
停止脚本执行
Args:
stop_req: 停止脚本执行请求数据
db: 数据库会话
Returns:
ApiResponse: 操作结果
"""
try:
# 调用服务层方法停止脚本执行
await ScriptService.stop_script(db=db, log_id=stop_req.logId)
return api_response(message="已停止执行")
except ValueError as e:
return error_response(str(e), 404 if "不存在" in str(e) else 400)
except Exception as e:
logger.error(f"停止脚本执行失败: {str(e)}")
return error_response(f"停止脚本执行失败: {str(e)}", 500)
@router.get("/log/{log_id}", response_model=ApiResponse[ScriptLogDetail])
async def get_script_log(
log_id: str = Path(..., description="脚本执行日志ID"),
db: Session = Depends(get_db)
):
"""
获取脚本执行日志详情
Args:
log_id: 脚本执行日志ID
db: 数据库会话
Returns:
ApiResponse[ScriptLogDetail]: 包含脚本执行日志详情的响应
"""
try:
# 调用服务层方法获取脚本执行日志详情
log = ScriptService.get_script_log(db=db, log_id=log_id)
return api_response(data=log)
except ValueError as e:
return error_response(str(e), 404)
except Exception as e:
logger.error(f"获取脚本执行日志详情失败: {str(e)}")
return error_response(f"获取脚本执行日志详情失败: {str(e)}", 500)
@router.post("/import", response_model=ApiResponse[ScriptCreateResponse])
async def import_script(
file: UploadFile = File(...),
name: Optional[str] = Form(None),
folderPath: str = Form("/"),
description: Optional[str] = Form(None),
status: int = Form(1),
isPublic: int = Form(1),
tags: Optional[str] = Form(None),
db: Session = Depends(get_db)
):
"""
导入脚本
Args:
file: 要导入的Python脚本文件
name: 脚本名称,不传则使用文件名(不含扩展名)
folderPath: 脚本所在目录路径,默认为"/"
description: 脚本功能描述
status: 状态(1:启用, 0:禁用)默认为1
isPublic: 是否公开(1:是, 0:否)默认为1
tags: 标签,用于分类查询
db: 数据库会话
Returns:
ApiResponse[ScriptCreateResponse]: 包含新建脚本ID的响应
"""
try:
# 检查文件名是否以.py结尾
file_name = file.filename
if not file_name or not file_name.endswith(".py"):
return error_response("只能导入Python脚本文件(.py)", 400)
# 读取文件内容
content = await file.read()
code = content.decode("utf-8")
# 如果name为空使用文件名(不含扩展名)作为name
if not name:
name = os.path.splitext(file_name)[0]
# 调用服务层方法创建脚本
result = ScriptService.create_script(
db=db,
folder_path=folderPath,
file_name=file_name,
code=code,
name=name,
description=description,
status=status,
is_public=isPublic,
tags=tags,
created_by=None # 可从请求上下文获取当前用户
)
return api_response(message="导入成功", data=result)
except ValueError as e:
return error_response(str(e), 400)
except Exception as e:
logger.error(f"导入脚本失败: {str(e)}")
return error_response(f"导入脚本失败: {str(e)}", 500)
@router.get("/{script_id}/export")
async def export_script(
script_id: str = Path(..., description="脚本ID"),
version: Optional[int] = Query(None, description="版本号,不传则导出最新版本"),
db: Session = Depends(get_db)
):
"""
导出脚本
Args:
script_id: 脚本ID
version: 版本号,不传则导出最新版本
db: 数据库会话
Returns:
FileResponse: 脚本文件
"""
try:
# 获取脚本详情
script = ScriptService.get_script_detail(db=db, script_id=script_id)
# 准备文件名和内容
file_name = script["fileName"]
code = script["code"]
# 返回文件内容
return StreamingResponse(
iter([code.encode("utf-8")]),
media_type="text/plain",
headers={"Content-Disposition": f'attachment; filename="{file_name}"'}
)
except ValueError as e:
return error_response(str(e), 404)
except Exception as e:
logger.error(f"导出脚本失败: {str(e)}")
return error_response(f"导出脚本失败: {str(e)}", 500)