122 lines
4.4 KiB
Python
122 lines
4.4 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
任务导入导出模型模块
|
||
包含任务导入导出记录相关的数据模型
|
||
"""
|
||
|
||
import enum
|
||
from sqlalchemy import Column, Integer, String, Text, DateTime, Boolean, Enum, ForeignKey, JSON
|
||
from sqlalchemy.orm import relationship
|
||
from data.models.base import BaseModel
|
||
|
||
class ImportExportType(enum.Enum):
    """
    Kind of operation an import/export history record describes.

    The member values are the lowercase strings 'import' and 'export'.
    """

    # Tasks were loaded from a file.
    IMPORT = 'import'

    # Tasks were written out to a file.
    EXPORT = 'export'
|
||
|
||
class TaskImportExport(BaseModel):
    """
    History record of a single task import or export operation.

    Each row stores the file involved (name, size, MD5), the tasks the
    operation covered, the user and IP that performed it, when it happened,
    and whether it succeeded.

    NOTE(review): ``id``, ``is_deleted`` and ``query`` are used below but not
    defined in this class; they are presumably provided by ``BaseModel`` --
    confirm against data.models.base.
    """
    __tablename__ = 'task_import_exports'

    # Whether this row describes an import or an export.
    operation_type = Column(Enum(ImportExportType), nullable=False, comment='操作类型')
    # Name of the imported/exported file.
    file_name = Column(String(255), nullable=False, comment='文件名称')
    # File size in bytes.
    file_size = Column(Integer, nullable=True, comment='文件大小(字节)')
    # MD5 checksum of the file (32 characters).
    file_md5 = Column(String(32), nullable=True, comment='文件MD5校验值')
    # IDs of the tasks involved, stored as a JSON array.
    task_ids = Column(JSON, nullable=True, comment='相关任务ID列表(JSON数组)')
    # Number of tasks involved.
    task_count = Column(Integer, nullable=True, comment='任务数量')
    # ID of the user who performed the operation.
    user_id = Column(String(100), nullable=False, comment='操作用户ID')
    # Display name of the user who performed the operation.
    user_name = Column(String(100), nullable=True, comment='操作用户名称')
    # When the operation was performed.
    operation_time = Column(DateTime, nullable=False, comment='操作时间')
    # IP address the operation was performed from.
    operation_ip = Column(String(50), nullable=True, comment='操作IP')
    # Whether the operation succeeded.
    is_success = Column(Boolean, default=True, comment='操作是否成功')
    # Error details when the operation failed.
    error_message = Column(Text, nullable=True, comment='错误信息')
    # Preview of the file content.
    file_content_preview = Column(Text, nullable=True, comment='文件内容预览')

    def __repr__(self):
        return f"<TaskImportExport(id={self.id}, operation_type='{self.operation_type}', file_name='{self.file_name}')>"

    @classmethod
    def get_by_user(cls, user_id, page=1, per_page=20):
        """
        Return one page of a user's import/export records, newest first.

        Records flagged as deleted are excluded.

        :param user_id: ID of the user whose records are requested.
        :param page: page number passed to ``paginate``.
        :param per_page: page size passed to ``paginate``.
        :return: whatever ``cls.query...paginate(...)`` returns
            (presumably a pagination object -- confirm against BaseModel).
        """
        return cls.query.filter(
            cls.user_id == user_id,
            # SQLAlchemy needs `==` here to build a SQL expression.
            cls.is_deleted == False,  # noqa: E712
        ).order_by(cls.operation_time.desc()).paginate(page=page, per_page=per_page)

    @staticmethod
    def _json_candidate(task_id):
        """
        Encode *task_id* as a JSON document usable as a JSON_CONTAINS candidate.

        If ``str(task_id)`` is already valid JSON (e.g. ``5`` or ``"5"`` both
        give the JSON number ``5``), that text is returned unchanged. Otherwise
        (e.g. a UUID-like ID) the text is quoted as a JSON string, because
        JSON_CONTAINS rejects a candidate that is not a valid JSON document.
        """
        import json

        def _reject_constant(name):
            # Python's json accepts NaN/Infinity, which are not valid JSON.
            raise ValueError(name)

        candidate = str(task_id)
        try:
            json.loads(candidate, parse_constant=_reject_constant)
        except ValueError:
            candidate = json.dumps(candidate)
        return candidate

    @classmethod
    def get_by_task(cls, task_id, page=1, per_page=20):
        """
        Return one page of records whose ``task_ids`` contains *task_id*, newest first.

        Records flagged as deleted are excluded. The lookup uses the SQL
        function ``json_contains`` on the ``task_ids`` JSON column.

        :param task_id: task ID to search for; see ``_json_candidate`` for how
            it is encoded.
        :param page: page number passed to ``paginate``.
        :param per_page: page size passed to ``paginate``.
        :return: whatever ``cls.query...paginate(...)`` returns.
        """
        from sqlalchemy import func

        return cls.query.filter(
            func.json_contains(cls.task_ids, cls._json_candidate(task_id)),
            # SQLAlchemy needs `==` here to build a SQL expression.
            cls.is_deleted == False,  # noqa: E712
        ).order_by(cls.operation_time.desc()).paginate(page=page, per_page=per_page)

    @classmethod
    def _log_operation(cls, operation_type, file_name, file_size, file_md5, task_ids, task_count,
                       user_id, user_name, operation_ip, is_success, error_message,
                       file_content_preview):
        """
        Create, persist and return a record of the given *operation_type*.

        Shared implementation behind ``log_import`` and ``log_export``.
        ``operation_time`` is set to the current local time (naive datetime).
        If the commit fails the session is rolled back and the exception is
        re-raised.
        """
        import datetime
        from config.database import db_session

        record = cls(
            operation_type=operation_type,
            file_name=file_name,
            file_size=file_size,
            file_md5=file_md5,
            task_ids=task_ids,
            task_count=task_count,
            user_id=user_id,
            user_name=user_name,
            operation_time=datetime.datetime.now(),
            operation_ip=operation_ip,
            is_success=is_success,
            error_message=error_message,
            file_content_preview=file_content_preview,
        )

        db_session.add(record)
        try:
            db_session.commit()
        except Exception:
            # Leave the session usable for the caller, then propagate the error.
            db_session.rollback()
            raise
        return record

    @classmethod
    def log_import(cls, file_name, file_size, file_md5, task_ids, task_count, user_id, user_name,
                   operation_ip=None, is_success=True, error_message=None, file_content_preview=None):
        """
        Record a task import operation and commit it.

        :param file_name: name of the imported file.
        :param file_size: file size in bytes.
        :param file_md5: MD5 checksum of the file.
        :param task_ids: IDs of the imported tasks (stored in a JSON column).
        :param task_count: number of tasks imported.
        :param user_id: ID of the user who performed the import.
        :param user_name: display name of that user.
        :param operation_ip: IP address the import came from, if known.
        :param is_success: whether the import succeeded.
        :param error_message: error details when the import failed.
        :param file_content_preview: preview of the file content.
        :return: the persisted ``TaskImportExport`` record.
        :raises Exception: whatever the commit raises, after rolling back.
        """
        return cls._log_operation(
            ImportExportType.IMPORT, file_name, file_size, file_md5, task_ids, task_count,
            user_id, user_name, operation_ip, is_success, error_message, file_content_preview,
        )

    @classmethod
    def log_export(cls, file_name, file_size, file_md5, task_ids, task_count, user_id, user_name,
                   operation_ip=None, is_success=True, error_message=None, file_content_preview=None):
        """
        Record a task export operation and commit it.

        :param file_name: name of the exported file.
        :param file_size: file size in bytes.
        :param file_md5: MD5 checksum of the file.
        :param task_ids: IDs of the exported tasks (stored in a JSON column).
        :param task_count: number of tasks exported.
        :param user_id: ID of the user who performed the export.
        :param user_name: display name of that user.
        :param operation_ip: IP address the export came from, if known.
        :param is_success: whether the export succeeded.
        :param error_message: error details when the export failed.
        :param file_content_preview: preview of the file content.
        :return: the persisted ``TaskImportExport`` record.
        :raises Exception: whatever the commit raises, after rolling back.
        """
        return cls._log_operation(
            ImportExportType.EXPORT, file_name, file_size, file_md5, task_ids, task_count,
            user_id, user_name, operation_ip, is_success, error_message, file_content_preview,
        )