293 lines
10 KiB
Python
293 lines
10 KiB
Python
"""
|
||
任务实例服务模块
|
||
提供任务实例的增删改查服务
|
||
"""
|
||
from typing import Dict, Any, List, Optional, Tuple
|
||
from sqlalchemy.orm import Session
|
||
from data.session import get_db
|
||
from data.models.task_instance import TaskInstance, TaskInstanceStatus
|
||
from data.models.task import Task
|
||
from datetime import datetime
|
||
|
||
class TaskInstanceService:
|
||
"""任务实例服务类"""
|
||
|
||
def __init__(self):
|
||
"""初始化服务"""
|
||
self.db = next(get_db())
|
||
|
||
def create_instance(self, task_id: str, name: Optional[str] = None, variables: Optional[Dict[str, Any]] = None,
|
||
priority: int = 1, input_params: Optional[Dict[str, Any]] = None,
|
||
block_outputs: Optional[Dict[str, Any]] = None, context_params: Optional[Dict[str, Any]] = None,
|
||
status: TaskInstanceStatus = TaskInstanceStatus.EDITING) -> Dict[str, Any]:
|
||
"""
|
||
创建任务实例
|
||
|
||
Args:
|
||
task_id: 任务ID
|
||
name: 任务名称(如果为空,则从任务表中获取)
|
||
variables: 任务变量
|
||
priority: 任务优先级
|
||
input_params: 任务输入参数
|
||
block_outputs: 块输出参数
|
||
context_params: 上下文参数
|
||
status: 任务实例状态
|
||
|
||
Returns:
|
||
任务实例字典
|
||
"""
|
||
# 检查任务是否存在
|
||
task = self.db.query(Task).filter(Task.task_id == task_id, Task.is_deleted == False).first()
|
||
if not task:
|
||
raise ValueError(f"任务不存在: {task_id}")
|
||
|
||
# 如果名称为空,则使用任务表中的名称
|
||
if not name:
|
||
name = task.name
|
||
|
||
# 创建任务实例
|
||
instance = TaskInstance(
|
||
task_id=task_id,
|
||
name=name,
|
||
variables=variables or {},
|
||
priority=priority,
|
||
input_params=input_params or {},
|
||
block_outputs=block_outputs or {},
|
||
context_params=context_params or {},
|
||
status=status
|
||
)
|
||
|
||
# 保存到数据库
|
||
self.db.add(instance)
|
||
self.db.commit()
|
||
self.db.refresh(instance)
|
||
|
||
return instance.to_dict()
|
||
|
||
def get_instance_by_id(self, instance_id: str) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
根据实例ID获取任务实例
|
||
|
||
Args:
|
||
instance_id: 任务实例ID
|
||
|
||
Returns:
|
||
任务实例字典,如果不存在则返回None
|
||
"""
|
||
instance = self.db.query(TaskInstance).filter(TaskInstance.instance_id == instance_id, TaskInstance.is_deleted == False).first()
|
||
if not instance:
|
||
return None
|
||
|
||
return instance.to_dict()
|
||
|
||
def get_instances_by_task_id(self, task_id: str) -> List[Dict[str, Any]]:
|
||
"""
|
||
根据任务ID获取所有任务实例
|
||
|
||
Args:
|
||
task_id: 任务ID
|
||
|
||
Returns:
|
||
任务实例字典列表
|
||
"""
|
||
instances = self.db.query(TaskInstance).filter(TaskInstance.task_id == task_id, TaskInstance.is_deleted == False).all()
|
||
return [instance.to_dict() for instance in instances]
|
||
|
||
def get_latest_instance_by_task_id(self, task_id: str) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
根据任务ID获取最新的任务实例
|
||
|
||
Args:
|
||
task_id: 任务ID
|
||
|
||
Returns:
|
||
最新的任务实例字典,如果不存在则返回None
|
||
"""
|
||
instance = self.db.query(TaskInstance).filter(
|
||
TaskInstance.task_id == task_id,
|
||
TaskInstance.is_deleted == False
|
||
).order_by(TaskInstance.created_at.desc()).first()
|
||
|
||
if not instance:
|
||
return None
|
||
|
||
return instance.to_dict()
|
||
|
||
def get_editing_instance_by_task_id(self, task_id: str) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
根据任务ID获取编辑中的任务实例
|
||
|
||
Args:
|
||
task_id: 任务ID
|
||
|
||
Returns:
|
||
编辑中的任务实例字典,如果不存在则返回None
|
||
"""
|
||
instance = self.db.query(TaskInstance).filter(
|
||
TaskInstance.task_id == task_id,
|
||
TaskInstance.status == TaskInstanceStatus.EDITING,
|
||
TaskInstance.is_deleted == False
|
||
).order_by(TaskInstance.created_at.desc()).first()
|
||
|
||
if not instance:
|
||
return None
|
||
|
||
return instance.to_dict()
|
||
|
||
def get_latest_published_instance_by_task_id(self, task_id: str) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
根据任务ID获取最新的已发布任务实例
|
||
|
||
Args:
|
||
task_id: 任务ID
|
||
|
||
Returns:
|
||
最新的已发布任务实例字典,如果不存在则返回None
|
||
"""
|
||
instance = self.db.query(TaskInstance).filter(
|
||
TaskInstance.task_id == task_id,
|
||
TaskInstance.status == TaskInstanceStatus.PUBLISHED,
|
||
TaskInstance.is_deleted == False
|
||
).order_by(TaskInstance.created_at.desc()).first()
|
||
|
||
if not instance:
|
||
return None
|
||
|
||
return instance.to_dict()
|
||
|
||
def get_last_published_instance(self, task_id: str) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
根据任务ID获取最新的已发布任务实例(get_latest_published_instance_by_task_id的别名)
|
||
|
||
Args:
|
||
task_id: 任务ID
|
||
|
||
Returns:
|
||
最新的已发布任务实例字典,如果不存在则返回None
|
||
"""
|
||
return self.get_latest_published_instance_by_task_id(task_id)
|
||
|
||
def update_instance(self, instance_id: str, variables: Optional[Dict[str, Any]] = None,
|
||
priority: Optional[int] = None, input_params: Optional[Dict[str, Any]] = None,
|
||
block_outputs: Optional[Dict[str, Any]] = None, context_params: Optional[Dict[str, Any]] = None,
|
||
status: Optional[TaskInstanceStatus] = None) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
更新任务实例
|
||
|
||
Args:
|
||
instance_id: 任务实例ID
|
||
variables: 任务变量
|
||
priority: 任务优先级
|
||
input_params: 任务输入参数
|
||
block_outputs: 块输出参数
|
||
context_params: 上下文参数
|
||
status: 任务实例状态
|
||
|
||
Returns:
|
||
更新后的任务实例字典,如果不存在则返回None
|
||
"""
|
||
# 获取任务实例
|
||
instance = self.db.query(TaskInstance).filter(TaskInstance.instance_id == instance_id, TaskInstance.is_deleted == False).first()
|
||
if not instance:
|
||
return None
|
||
|
||
# 更新字段
|
||
if variables is not None:
|
||
instance.variables = variables
|
||
if priority is not None:
|
||
instance.priority = priority
|
||
if input_params is not None:
|
||
instance.input_params = input_params
|
||
if block_outputs is not None:
|
||
instance.block_outputs = block_outputs
|
||
if context_params is not None:
|
||
instance.context_params = context_params
|
||
if status is not None:
|
||
instance.status = status
|
||
|
||
# 保存到数据库
|
||
self.db.commit()
|
||
self.db.refresh(instance)
|
||
|
||
return instance.to_dict()
|
||
|
||
def publish_instance(self, instance_id: str) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
发布任务实例
|
||
|
||
Args:
|
||
instance_id: 任务实例ID
|
||
|
||
Returns:
|
||
发布后的任务实例字典,如果不存在则返回None
|
||
"""
|
||
return self.update_instance(instance_id, status=TaskInstanceStatus.PUBLISHED)
|
||
|
||
def delete_instance(self, instance_id: str) -> bool:
|
||
"""
|
||
删除任务实例
|
||
|
||
Args:
|
||
instance_id: 任务实例ID
|
||
|
||
Returns:
|
||
是否删除成功
|
||
"""
|
||
# 获取任务实例
|
||
instance = self.db.query(TaskInstance).filter(TaskInstance.instance_id == instance_id, TaskInstance.is_deleted == False).first()
|
||
if not instance:
|
||
return False
|
||
|
||
# 标记为已删除
|
||
instance.is_deleted = True
|
||
self.db.commit()
|
||
|
||
return True
|
||
|
||
def get_or_create_editing_instance(self, task_id: str) -> Dict[str, Any]:
|
||
"""
|
||
获取或创建编辑中的任务实例
|
||
|
||
如果任务已有编辑中的实例,则返回该实例
|
||
如果任务没有编辑中的实例,但有已发布的实例,则基于最新的已发布实例创建新的编辑中实例
|
||
如果任务没有任何实例,则创建新的编辑中实例
|
||
|
||
Args:
|
||
task_id: 任务ID
|
||
|
||
Returns:
|
||
编辑中的任务实例字典
|
||
"""
|
||
# 检查是否有编辑中的实例
|
||
editing_instance = self.get_editing_instance_by_task_id(task_id)
|
||
if editing_instance:
|
||
return editing_instance
|
||
|
||
# 检查是否有已发布的实例
|
||
published_instance = self.get_latest_published_instance_by_task_id(task_id)
|
||
if published_instance:
|
||
# 基于已发布实例创建新的编辑中实例
|
||
return self.create_instance(
|
||
task_id=task_id,
|
||
name=published_instance.get("name"),
|
||
variables=published_instance.get("variables"),
|
||
priority=published_instance.get("priority", 1),
|
||
input_params=published_instance.get("input_params"),
|
||
block_outputs=published_instance.get("block_outputs"),
|
||
context_params=published_instance.get("context_params"),
|
||
status=TaskInstanceStatus.EDITING
|
||
)
|
||
|
||
# 获取任务信息
|
||
task = self.db.query(Task).filter(Task.task_id == task_id, Task.is_deleted == False).first()
|
||
if not task:
|
||
raise ValueError(f"任务不存在: {task_id}")
|
||
|
||
# 创建新的编辑中实例
|
||
return self.create_instance(
|
||
task_id=task_id,
|
||
name=task.name,
|
||
variables=task.variables if hasattr(task, "variables") else {},
|
||
priority=1,
|
||
status=TaskInstanceStatus.EDITING
|
||
) |