VWED_server/services/script_file_service.py

369 lines
15 KiB
Python
Raw Normal View History

2025-09-12 16:15:13 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
脚本文件管理服务
提供脚本文件的CRUD操作和文件系统管理
"""
import os
import shutil
import hashlib
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Optional, List, Tuple
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from data.session import get_session, get_async_session
from data.models.script_project import VWEDScriptProject
from data.models.script_file import VWEDScriptFile
from utils.logger import get_logger
logger = get_logger("services.script_file")
class ScriptFileService:
"""脚本文件管理服务"""
def __init__(self):
self.base_path = Path("data/VWED/python_script")
self.base_path.mkdir(parents=True, exist_ok=True)
async def create_project(self, project_name: str, description: str = "",
created_by: str = None) -> Dict[str, Any]:
"""创建新项目"""
try:
# 创建项目路径
project_path = f"projects/{project_name}"
full_project_path = self.base_path / project_path
# 检查项目是否已存在
async with get_async_session() as session:
existing_project = await session.query(VWEDScriptProject).filter(
VWEDScriptProject.project_name == project_name
).first()
if existing_project:
return {"success": False, "error": f"项目 {project_name} 已存在"}
# 创建文件系统目录
full_project_path.mkdir(parents=True, exist_ok=True)
# 创建数据库记录
new_project = VWEDScriptProject(
project_name=project_name,
project_path=project_path,
description=description,
created_by=created_by
)
session.add(new_project)
await session.commit()
await session.refresh(new_project)
logger.info(f"项目创建成功: {project_name}")
return {
"success": True,
"project": new_project.to_dict(),
"message": f"项目 {project_name} 创建成功"
}
except Exception as e:
logger.error(f"创建项目失败: {e}", exc_info=True)
return {"success": False, "error": f"创建项目失败: {str(e)}"}
async def get_projects(self, status: str = "active") -> Dict[str, Any]:
"""获取项目列表"""
try:
async with get_async_session() as session:
query = session.query(VWEDScriptProject)
if status:
query = query.filter(VWEDScriptProject.status == status)
projects = await query.all()
return {
"success": True,
"projects": [project.to_dict() for project in projects]
}
except Exception as e:
logger.error(f"获取项目列表失败: {e}", exc_info=True)
return {"success": False, "error": f"获取项目列表失败: {str(e)}"}
async def create_file(self, project_id: int, file_name: str, file_path: str,
content: str = "", file_type: str = "python",
created_by: str = None) -> Dict[str, Any]:
"""创建脚本文件"""
try:
async with get_async_session() as session:
# 检查项目是否存在
project = await session.get(VWEDScriptProject, project_id)
if not project:
return {"success": False, "error": f"项目ID {project_id} 不存在"}
# 构建完整文件路径
full_file_path = self.base_path / project.project_path / file_path
# 检查文件是否已存在
existing_file = await session.query(VWEDScriptFile).filter(
and_(VWEDScriptFile.project_id == project_id,
VWEDScriptFile.file_path == file_path)
).first()
if existing_file:
return {"success": False, "error": f"文件 {file_path} 已存在"}
# 创建目录
full_file_path.parent.mkdir(parents=True, exist_ok=True)
# 写入文件内容
with open(full_file_path, 'w', encoding='utf-8') as f:
f.write(content)
# 检测boot函数
has_boot_function = self._check_boot_function(content)
# 计算文件大小
file_size = full_file_path.stat().st_size
# 创建数据库记录
new_file = VWEDScriptFile(
project_id=project_id,
file_name=file_name,
file_path=file_path,
file_type=file_type,
content=content,
size=file_size,
has_boot_function=has_boot_function,
created_by=created_by
)
session.add(new_file)
await session.commit()
await session.refresh(new_file)
logger.info(f"文件创建成功: {project.project_name}/{file_path}")
return {
"success": True,
"file": new_file.to_dict(),
"message": f"文件 {file_name} 创建成功"
}
except Exception as e:
logger.error(f"创建文件失败: {e}", exc_info=True)
return {"success": False, "error": f"创建文件失败: {str(e)}"}
async def update_file_content(self, file_id: int, content: str,
updated_by: str = None) -> Dict[str, Any]:
"""更新文件内容"""
try:
async with get_async_session() as session:
script_file = await session.get(VWEDScriptFile, file_id)
if not script_file:
return {"success": False, "error": f"文件ID {file_id} 不存在"}
# 获取项目信息
project = await session.get(VWEDScriptProject, script_file.project_id)
full_file_path = self.base_path / project.project_path / script_file.file_path
# 备份原文件
backup_path = full_file_path.with_suffix(f".bak.{int(datetime.now().timestamp())}")
if full_file_path.exists():
shutil.copy2(full_file_path, backup_path)
# 写入新内容
with open(full_file_path, 'w', encoding='utf-8') as f:
f.write(content)
# 更新数据库记录
script_file.content = content
script_file.size = full_file_path.stat().st_size
script_file.has_boot_function = self._check_boot_function(content)
script_file.updated_at = datetime.now()
await session.commit()
logger.info(f"文件更新成功: {project.project_name}/{script_file.file_path}")
return {
"success": True,
"file": script_file.to_dict(),
"backup_path": str(backup_path),
"message": "文件内容更新成功"
}
except Exception as e:
logger.error(f"更新文件内容失败: {e}", exc_info=True)
return {"success": False, "error": f"更新文件内容失败: {str(e)}"}
async def get_file_content(self, file_id: int) -> Dict[str, Any]:
"""获取文件内容"""
try:
async with get_async_session() as session:
script_file = await session.get(VWEDScriptFile, file_id)
if not script_file:
return {"success": False, "error": f"文件ID {file_id} 不存在"}
# 从数据库读取内容(优先)
if script_file.content:
return {
"success": True,
"file": script_file.to_dict(),
"content": script_file.content
}
# 从文件系统读取
project = await session.get(VWEDScriptProject, script_file.project_id)
full_file_path = self.base_path / project.project_path / script_file.file_path
if not full_file_path.exists():
return {"success": False, "error": f"物理文件不存在: {script_file.file_path}"}
with open(full_file_path, 'r', encoding='utf-8') as f:
content = f.read()
return {
"success": True,
"file": script_file.to_dict(),
"content": content
}
except Exception as e:
logger.error(f"获取文件内容失败: {e}", exc_info=True)
return {"success": False, "error": f"获取文件内容失败: {str(e)}"}
async def get_project_files(self, project_id: int, include_content: bool = False) -> Dict[str, Any]:
"""获取项目的所有文件"""
try:
async with get_async_session() as session:
project = await session.get(VWEDScriptProject, project_id)
if not project:
return {"success": False, "error": f"项目ID {project_id} 不存在"}
files = await session.query(VWEDScriptFile).filter(
VWEDScriptFile.project_id == project_id
).all()
file_list = []
for file in files:
file_dict = file.to_dict()
if include_content and file.content:
file_dict['content'] = file.content
file_list.append(file_dict)
return {
"success": True,
"project": project.to_dict(),
"files": file_list
}
except Exception as e:
logger.error(f"获取项目文件列表失败: {e}", exc_info=True)
return {"success": False, "error": f"获取项目文件列表失败: {str(e)}"}
async def delete_file(self, file_id: int) -> Dict[str, Any]:
"""删除文件"""
try:
async with get_async_session() as session:
script_file = await session.get(VWEDScriptFile, file_id)
if not script_file:
return {"success": False, "error": f"文件ID {file_id} 不存在"}
project = await session.get(VWEDScriptProject, script_file.project_id)
full_file_path = self.base_path / project.project_path / script_file.file_path
# 删除物理文件
if full_file_path.exists():
full_file_path.unlink()
# 删除数据库记录
await session.delete(script_file)
await session.commit()
logger.info(f"文件删除成功: {project.project_name}/{script_file.file_path}")
return {
"success": True,
"message": f"文件 {script_file.file_name} 删除成功"
}
except Exception as e:
logger.error(f"删除文件失败: {e}", exc_info=True)
return {"success": False, "error": f"删除文件失败: {str(e)}"}
async def search_files(self, keyword: str, project_id: int = None,
file_type: str = None, has_boot: bool = None) -> Dict[str, Any]:
"""搜索文件"""
try:
async with get_async_session() as session:
query = session.query(VWEDScriptFile)
# 关键字搜索(文件名或内容)
if keyword:
query = query.filter(
or_(VWEDScriptFile.file_name.contains(keyword),
VWEDScriptFile.content.contains(keyword))
)
# 项目筛选
if project_id:
query = query.filter(VWEDScriptFile.project_id == project_id)
# 文件类型筛选
if file_type:
query = query.filter(VWEDScriptFile.file_type == file_type)
# boot函数筛选
if has_boot is not None:
query = query.filter(VWEDScriptFile.has_boot_function == has_boot)
files = await query.all()
# 加载项目信息
file_list = []
for file in files:
project = await session.get(VWEDScriptProject, file.project_id)
file_dict = file.to_dict()
file_dict['project_name'] = project.project_name
file_list.append(file_dict)
return {
"success": True,
"files": file_list,
"count": len(file_list)
}
except Exception as e:
logger.error(f"搜索文件失败: {e}", exc_info=True)
return {"success": False, "error": f"搜索文件失败: {str(e)}"}
def _check_boot_function(self, content: str) -> bool:
"""检查是否包含boot函数"""
try:
# 简单检查是否定义了boot函数
return "def boot(" in content or "def boot():" in content
except:
return False
async def validate_script_syntax(self, content: str) -> Dict[str, Any]:
"""验证脚本语法"""
try:
compile(content, '<script>', 'exec')
return {"success": True, "valid": True, "message": "语法检查通过"}
except SyntaxError as e:
return {
"success": True,
"valid": False,
"error": f"语法错误: {e.msg}",
"line": e.lineno,
"offset": e.offset
}
except Exception as e:
return {"success": False, "error": f"语法检查失败: {str(e)}"}
# 全局文件服务实例
_file_service = ScriptFileService()
def get_file_service() -> ScriptFileService:
"""获取文件服务实例"""
return _file_service