2025-03-17 18:31:20 +08:00

209 lines
7.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
任务模型模块
包含任务基本信息的数据模型
"""
import datetime
import enum
import uuid
from sqlalchemy import Column, Integer, String, Text, DateTime, Boolean, Enum, ForeignKey, JSON
from sqlalchemy.orm import relationship
from data.models.base import BaseModel
from config.task_config import (
TaskTypeConfig,
TaskStatusConfig,
get_task_type_enum,
get_task_status_enum
)
class TaskType(enum.Enum):
"""
任务类型枚举
与配置文件中的TaskTypeConfig保持一致
"""
NORMAL = get_task_type_enum(TaskTypeConfig.NORMAL) # 普通任务
SCHEDULED = get_task_type_enum(TaskTypeConfig.SCHEDULED) # 定时任务
class TaskStatus(enum.Enum):
"""
任务状态枚举
与配置文件中的TaskStatusConfig保持一致
"""
PENDING = get_task_status_enum(TaskStatusConfig.PENDING) # 待执行
RUNNING = get_task_status_enum(TaskStatusConfig.RUNNING) # 执行中
COMPLETED = get_task_status_enum(TaskStatusConfig.COMPLETED) # 已完成
CANCELLED = get_task_status_enum(TaskStatusConfig.CANCELLED) # 已取消
FAILED = get_task_status_enum(TaskStatusConfig.FAILED) # 执行失败
PAUSED = get_task_status_enum(TaskStatusConfig.PAUSED) # 暂停中
WAITING = get_task_status_enum(TaskStatusConfig.WAITING) # 等待中
class Task(BaseModel):
"""
任务模型
表示一个任务的基本信息
"""
__tablename__ = 'tasks'
# 添加UUID字段作为外部可见的任务ID
task_id = Column(String(36), nullable=False, unique=True, index=True, comment='任务UUID用于外部引用')
name = Column(String(100), nullable=False, comment='任务名称')
task_type = Column(String(100), nullable=False, comment='任务类型')
description = Column(String(500), nullable=True, comment='任务描述')
is_template = Column(Boolean, default=False, comment='是否为模板')
template_description = Column(String(500), nullable=True, comment='模板描述')
current_version_id = Column(Integer, nullable=True, comment='当前版本ID')
is_enabled = Column(Boolean, default=True, comment='是否启用')
created_by = Column(String(100), nullable=True, comment='创建用户ID')
updated_by = Column(String(100), nullable=True, comment='最后更新用户ID')
# 定时任务相关字段
is_scheduled = Column(Boolean, default=False, comment='是否为定时任务')
schedule_expression = Column(String(100), nullable=True, comment='定时表达式Cron格式')
next_run_time = Column(DateTime, nullable=True, comment='下次执行时间')
# 关联关系
versions = relationship('TaskVersion', back_populates='task', cascade='all, delete-orphan')
records = relationship('TaskRecord', back_populates='task')
flow_blocks = relationship('TaskFlowBlock', back_populates='task')
instances = relationship("TaskInstance", back_populates="task", cascade="all, delete-orphan")
def __repr__(self):
return f"<Task(id={self.id}, uuid='{self.task_id}', name='{self.name}', type='{self.task_type}')>"
def to_json(self, fields=None, exclude=None, timestamp_format='ms', include_type_name=True):
"""
将任务模型转换为JSON友好的字典
Args:
fields (list, optional): 需要包含的字段列表为None则包含所有字段
exclude (list, optional): 需要排除的字段列表
timestamp_format (str, optional): 时间戳格式,可选值:'ms'(毫秒时间戳), 'iso'ISO格式, 'str'(字符串格式)
include_type_name (bool, optional): 是否包含任务类型名称
Returns:
dict: 包含指定字段的字典
"""
# 调用父类的to_json方法获取基础字段
result = super().to_json(fields, exclude, timestamp_format)
# 使用UUID作为任务ID
result['task_id'] = self.task_id
# 添加任务类型名称
if include_type_name and 'task_type' in result:
from config.task_config import get_task_type_name
result['task_type'] = get_task_type_name(result['task_type'])
return result
@classmethod
def get_by_name(cls, name):
"""
根据名称获取任务
"""
return cls.query.filter(cls.name == name, cls.is_deleted == False).first()
@classmethod
def get_by_uuid(cls, task_id):
"""
根据UUID获取任务
"""
return cls.query.filter(cls.task_id == task_id, cls.is_deleted == False).first()
@classmethod
def get_templates(cls):
"""
获取所有任务模板
"""
return cls.query.filter(cls.is_template == True, cls.is_deleted == False).all()
@classmethod
def get_scheduled_tasks(cls):
"""
获取所有定时任务
"""
return cls.query.filter(cls.is_scheduled == True, cls.is_deleted == False).all()
@classmethod
def create_task(cls, name, task_type, description=None, is_template=False, template_description=None,
is_scheduled=False, schedule_expression=None, created_by=None, is_enabled=True):
"""
创建新任务
Args:
name (str): 任务名称
task_type (str or TaskType): 任务类型可以是字符串或TaskType枚举
description (str, optional): 任务描述
is_template (bool, optional): 是否为模板
template_description (str, optional): 模板描述
is_scheduled (bool, optional): 是否为定时任务
schedule_expression (str, optional): 定时表达式Cron格式
created_by (str, optional): 创建用户ID
is_enabled (bool, optional): 是否启用
Returns:
Task: 创建的任务对象
"""
from config.database import db_session
# 处理task_type参数确保它是字符串类型
if isinstance(task_type, TaskType):
task_type = task_type.name
# 生成UUID
task_id = str(uuid.uuid4())
# 创建任务对象
task = cls(
task_id=task_id,
name=name,
task_type=task_type,
description=description,
is_template=is_template,
template_description=template_description,
is_scheduled=is_scheduled,
schedule_expression=schedule_expression,
created_by=created_by,
updated_by=created_by,
is_enabled=is_enabled
)
# 如果是定时任务,计算下次执行时间
if is_scheduled and schedule_expression:
import croniter
import datetime
try:
cron = croniter.croniter(schedule_expression, datetime.datetime.now())
task.next_run_time = cron.get_next(datetime.datetime)
except Exception as e:
# 如果Cron表达式无效不设置下次执行时间
pass
# 保存到数据库
db_session.add(task)
db_session.commit()
# 记录用户操作
from data.models.user_operation import UserOperation, OperationType, TargetType
if created_by:
UserOperation.log_operation(
user_id=created_by,
user_name=None, # 用户名称需要从外部系统获取
operation_type=OperationType.CREATE,
target_type=TargetType.TASK,
target_id=task.id,
target_name=task.name,
operation_details={
'task_type': task_type,
'is_template': is_template,
'is_scheduled': is_scheduled
}
)
return task