""" 任务实例服务模块 提供任务实例的增删改查服务 """ from typing import Dict, Any, List, Optional, Tuple from sqlalchemy.orm import Session from data.session import get_db from data.models.task_instance import TaskInstance, TaskInstanceStatus from data.models.task import Task from datetime import datetime class TaskInstanceService: """任务实例服务类""" def __init__(self): """初始化服务""" self.db = next(get_db()) def create_instance(self, task_id: str, name: Optional[str] = None, variables: Optional[Dict[str, Any]] = None, priority: int = 1, input_params: Optional[Dict[str, Any]] = None, block_outputs: Optional[Dict[str, Any]] = None, context_params: Optional[Dict[str, Any]] = None, status: TaskInstanceStatus = TaskInstanceStatus.EDITING) -> Dict[str, Any]: """ 创建任务实例 Args: task_id: 任务ID name: 任务名称(如果为空,则从任务表中获取) variables: 任务变量 priority: 任务优先级 input_params: 任务输入参数 block_outputs: 块输出参数 context_params: 上下文参数 status: 任务实例状态 Returns: 任务实例字典 """ # 检查任务是否存在 task = self.db.query(Task).filter(Task.task_id == task_id, Task.is_deleted == False).first() if not task: raise ValueError(f"任务不存在: {task_id}") # 如果名称为空,则使用任务表中的名称 if not name: name = task.name # 创建任务实例 instance = TaskInstance( task_id=task_id, name=name, variables=variables or {}, priority=priority, input_params=input_params or {}, block_outputs=block_outputs or {}, context_params=context_params or {}, status=status ) # 保存到数据库 self.db.add(instance) self.db.commit() self.db.refresh(instance) return instance.to_dict() def get_instance_by_id(self, instance_id: str) -> Optional[Dict[str, Any]]: """ 根据实例ID获取任务实例 Args: instance_id: 任务实例ID Returns: 任务实例字典,如果不存在则返回None """ instance = self.db.query(TaskInstance).filter(TaskInstance.instance_id == instance_id, TaskInstance.is_deleted == False).first() if not instance: return None return instance.to_dict() def get_instances_by_task_id(self, task_id: str) -> List[Dict[str, Any]]: """ 根据任务ID获取所有任务实例 Args: task_id: 任务ID Returns: 任务实例字典列表 """ instances = self.db.query(TaskInstance).filter(TaskInstance.task_id == task_id, TaskInstance.is_deleted == False).all() return [instance.to_dict() for instance in instances] def get_latest_instance_by_task_id(self, task_id: str) -> Optional[Dict[str, Any]]: """ 根据任务ID获取最新的任务实例 Args: task_id: 任务ID Returns: 最新的任务实例字典,如果不存在则返回None """ instance = self.db.query(TaskInstance).filter( TaskInstance.task_id == task_id, TaskInstance.is_deleted == False ).order_by(TaskInstance.created_at.desc()).first() if not instance: return None return instance.to_dict() def get_editing_instance_by_task_id(self, task_id: str) -> Optional[Dict[str, Any]]: """ 根据任务ID获取编辑中的任务实例 Args: task_id: 任务ID Returns: 编辑中的任务实例字典,如果不存在则返回None """ instance = self.db.query(TaskInstance).filter( TaskInstance.task_id == task_id, TaskInstance.status == TaskInstanceStatus.EDITING, TaskInstance.is_deleted == False ).order_by(TaskInstance.created_at.desc()).first() if not instance: return None return instance.to_dict() def get_latest_published_instance_by_task_id(self, task_id: str) -> Optional[Dict[str, Any]]: """ 根据任务ID获取最新的已发布任务实例 Args: task_id: 任务ID Returns: 最新的已发布任务实例字典,如果不存在则返回None """ instance = self.db.query(TaskInstance).filter( TaskInstance.task_id == task_id, TaskInstance.status == TaskInstanceStatus.PUBLISHED, TaskInstance.is_deleted == False ).order_by(TaskInstance.created_at.desc()).first() if not instance: return None return instance.to_dict() def get_last_published_instance(self, task_id: str) -> Optional[Dict[str, Any]]: """ 根据任务ID获取最新的已发布任务实例(get_latest_published_instance_by_task_id的别名) Args: task_id: 任务ID Returns: 最新的已发布任务实例字典,如果不存在则返回None """ return self.get_latest_published_instance_by_task_id(task_id) def update_instance(self, instance_id: str, variables: Optional[Dict[str, Any]] = None, priority: Optional[int] = None, input_params: Optional[Dict[str, Any]] = None, block_outputs: Optional[Dict[str, Any]] = None, context_params: Optional[Dict[str, Any]] = None, status: Optional[TaskInstanceStatus] = None) -> Optional[Dict[str, Any]]: """ 更新任务实例 Args: instance_id: 任务实例ID variables: 任务变量 priority: 任务优先级 input_params: 任务输入参数 block_outputs: 块输出参数 context_params: 上下文参数 status: 任务实例状态 Returns: 更新后的任务实例字典,如果不存在则返回None """ # 获取任务实例 instance = self.db.query(TaskInstance).filter(TaskInstance.instance_id == instance_id, TaskInstance.is_deleted == False).first() if not instance: return None # 更新字段 if variables is not None: instance.variables = variables if priority is not None: instance.priority = priority if input_params is not None: instance.input_params = input_params if block_outputs is not None: instance.block_outputs = block_outputs if context_params is not None: instance.context_params = context_params if status is not None: instance.status = status # 保存到数据库 self.db.commit() self.db.refresh(instance) return instance.to_dict() def publish_instance(self, instance_id: str) -> Optional[Dict[str, Any]]: """ 发布任务实例 Args: instance_id: 任务实例ID Returns: 发布后的任务实例字典,如果不存在则返回None """ return self.update_instance(instance_id, status=TaskInstanceStatus.PUBLISHED) def delete_instance(self, instance_id: str) -> bool: """ 删除任务实例 Args: instance_id: 任务实例ID Returns: 是否删除成功 """ # 获取任务实例 instance = self.db.query(TaskInstance).filter(TaskInstance.instance_id == instance_id, TaskInstance.is_deleted == False).first() if not instance: return False # 标记为已删除 instance.is_deleted = True self.db.commit() return True def get_or_create_editing_instance(self, task_id: str) -> Dict[str, Any]: """ 获取或创建编辑中的任务实例 如果任务已有编辑中的实例,则返回该实例 如果任务没有编辑中的实例,但有已发布的实例,则基于最新的已发布实例创建新的编辑中实例 如果任务没有任何实例,则创建新的编辑中实例 Args: task_id: 任务ID Returns: 编辑中的任务实例字典 """ # 检查是否有编辑中的实例 editing_instance = self.get_editing_instance_by_task_id(task_id) if editing_instance: return editing_instance # 检查是否有已发布的实例 published_instance = self.get_latest_published_instance_by_task_id(task_id) if published_instance: # 基于已发布实例创建新的编辑中实例 return self.create_instance( task_id=task_id, name=published_instance.get("name"), variables=published_instance.get("variables"), priority=published_instance.get("priority", 1), input_params=published_instance.get("input_params"), block_outputs=published_instance.get("block_outputs"), context_params=published_instance.get("context_params"), status=TaskInstanceStatus.EDITING ) # 获取任务信息 task = self.db.query(Task).filter(Task.task_id == task_id, Task.is_deleted == False).first() if not task: raise ValueError(f"任务不存在: {task_id}") # 创建新的编辑中实例 return self.create_instance( task_id=task_id, name=task.name, variables=task.variables if hasattr(task, "variables") else {}, priority=1, status=TaskInstanceStatus.EDITING )