tianfeng_task_modules/services/task_instance_service.py
2025-03-18 18:34:03 +08:00

293 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
任务实例服务模块
提供任务实例的增删改查服务
"""
from typing import Dict, Any, List, Optional, Tuple
from sqlalchemy.orm import Session
from data.session import get_db
from data.models.task_instance import TaskInstance, TaskInstanceStatus
from data.models.task import Task
from datetime import datetime
class TaskInstanceService:
"""任务实例服务类"""
def __init__(self):
"""初始化服务"""
self.db = next(get_db())
def create_instance(self, task_id: str, name: Optional[str] = None, variables: Optional[Dict[str, Any]] = None,
priority: int = 1, input_params: Optional[Dict[str, Any]] = None,
block_outputs: Optional[Dict[str, Any]] = None, context_params: Optional[Dict[str, Any]] = None,
status: TaskInstanceStatus = TaskInstanceStatus.EDITING) -> Dict[str, Any]:
"""
创建任务实例
Args:
task_id: 任务ID
name: 任务名称(如果为空,则从任务表中获取)
variables: 任务变量
priority: 任务优先级
input_params: 任务输入参数
block_outputs: 块输出参数
context_params: 上下文参数
status: 任务实例状态
Returns:
任务实例字典
"""
# 检查任务是否存在
task = self.db.query(Task).filter(Task.task_id == task_id, Task.is_deleted == False).first()
if not task:
raise ValueError(f"任务不存在: {task_id}")
# 如果名称为空,则使用任务表中的名称
if not name:
name = task.name
# 创建任务实例
instance = TaskInstance(
task_id=task_id,
name=name,
variables=variables or {},
priority=priority,
input_params=input_params or {},
block_outputs=block_outputs or {},
context_params=context_params or {},
status=status
)
# 保存到数据库
self.db.add(instance)
self.db.commit()
self.db.refresh(instance)
return instance.to_dict()
def get_instance_by_id(self, instance_id: str) -> Optional[Dict[str, Any]]:
"""
根据实例ID获取任务实例
Args:
instance_id: 任务实例ID
Returns:
任务实例字典如果不存在则返回None
"""
instance = self.db.query(TaskInstance).filter(TaskInstance.instance_id == instance_id, TaskInstance.is_deleted == False).first()
if not instance:
return None
return instance.to_dict()
def get_instances_by_task_id(self, task_id: str) -> List[Dict[str, Any]]:
"""
根据任务ID获取所有任务实例
Args:
task_id: 任务ID
Returns:
任务实例字典列表
"""
instances = self.db.query(TaskInstance).filter(TaskInstance.task_id == task_id, TaskInstance.is_deleted == False).all()
return [instance.to_dict() for instance in instances]
def get_latest_instance_by_task_id(self, task_id: str) -> Optional[Dict[str, Any]]:
"""
根据任务ID获取最新的任务实例
Args:
task_id: 任务ID
Returns:
最新的任务实例字典如果不存在则返回None
"""
instance = self.db.query(TaskInstance).filter(
TaskInstance.task_id == task_id,
TaskInstance.is_deleted == False
).order_by(TaskInstance.created_at.desc()).first()
if not instance:
return None
return instance.to_dict()
def get_editing_instance_by_task_id(self, task_id: str) -> Optional[Dict[str, Any]]:
"""
根据任务ID获取编辑中的任务实例
Args:
task_id: 任务ID
Returns:
编辑中的任务实例字典如果不存在则返回None
"""
instance = self.db.query(TaskInstance).filter(
TaskInstance.task_id == task_id,
TaskInstance.status == TaskInstanceStatus.EDITING,
TaskInstance.is_deleted == False
).order_by(TaskInstance.created_at.desc()).first()
if not instance:
return None
return instance.to_dict()
def get_latest_published_instance_by_task_id(self, task_id: str) -> Optional[Dict[str, Any]]:
"""
根据任务ID获取最新的已发布任务实例
Args:
task_id: 任务ID
Returns:
最新的已发布任务实例字典如果不存在则返回None
"""
instance = self.db.query(TaskInstance).filter(
TaskInstance.task_id == task_id,
TaskInstance.status == TaskInstanceStatus.PUBLISHED,
TaskInstance.is_deleted == False
).order_by(TaskInstance.created_at.desc()).first()
if not instance:
return None
return instance.to_dict()
def get_last_published_instance(self, task_id: str) -> Optional[Dict[str, Any]]:
"""
根据任务ID获取最新的已发布任务实例get_latest_published_instance_by_task_id的别名
Args:
task_id: 任务ID
Returns:
最新的已发布任务实例字典如果不存在则返回None
"""
return self.get_latest_published_instance_by_task_id(task_id)
def update_instance(self, instance_id: str, variables: Optional[Dict[str, Any]] = None,
priority: Optional[int] = None, input_params: Optional[Dict[str, Any]] = None,
block_outputs: Optional[Dict[str, Any]] = None, context_params: Optional[Dict[str, Any]] = None,
status: Optional[TaskInstanceStatus] = None) -> Optional[Dict[str, Any]]:
"""
更新任务实例
Args:
instance_id: 任务实例ID
variables: 任务变量
priority: 任务优先级
input_params: 任务输入参数
block_outputs: 块输出参数
context_params: 上下文参数
status: 任务实例状态
Returns:
更新后的任务实例字典如果不存在则返回None
"""
# 获取任务实例
instance = self.db.query(TaskInstance).filter(TaskInstance.instance_id == instance_id, TaskInstance.is_deleted == False).first()
if not instance:
return None
# 更新字段
if variables is not None:
instance.variables = variables
if priority is not None:
instance.priority = priority
if input_params is not None:
instance.input_params = input_params
if block_outputs is not None:
instance.block_outputs = block_outputs
if context_params is not None:
instance.context_params = context_params
if status is not None:
instance.status = status
# 保存到数据库
self.db.commit()
self.db.refresh(instance)
return instance.to_dict()
def publish_instance(self, instance_id: str) -> Optional[Dict[str, Any]]:
"""
发布任务实例
Args:
instance_id: 任务实例ID
Returns:
发布后的任务实例字典如果不存在则返回None
"""
return self.update_instance(instance_id, status=TaskInstanceStatus.PUBLISHED)
def delete_instance(self, instance_id: str) -> bool:
"""
删除任务实例
Args:
instance_id: 任务实例ID
Returns:
是否删除成功
"""
# 获取任务实例
instance = self.db.query(TaskInstance).filter(TaskInstance.instance_id == instance_id, TaskInstance.is_deleted == False).first()
if not instance:
return False
# 标记为已删除
instance.is_deleted = True
self.db.commit()
return True
def get_or_create_editing_instance(self, task_id: str) -> Dict[str, Any]:
"""
获取或创建编辑中的任务实例
如果任务已有编辑中的实例,则返回该实例
如果任务没有编辑中的实例,但有已发布的实例,则基于最新的已发布实例创建新的编辑中实例
如果任务没有任何实例,则创建新的编辑中实例
Args:
task_id: 任务ID
Returns:
编辑中的任务实例字典
"""
# 检查是否有编辑中的实例
editing_instance = self.get_editing_instance_by_task_id(task_id)
if editing_instance:
return editing_instance
# 检查是否有已发布的实例
published_instance = self.get_latest_published_instance_by_task_id(task_id)
if published_instance:
# 基于已发布实例创建新的编辑中实例
return self.create_instance(
task_id=task_id,
name=published_instance.get("name"),
variables=published_instance.get("variables"),
priority=published_instance.get("priority", 1),
input_params=published_instance.get("input_params"),
block_outputs=published_instance.get("block_outputs"),
context_params=published_instance.get("context_params"),
status=TaskInstanceStatus.EDITING
)
# 获取任务信息
task = self.db.query(Task).filter(Task.task_id == task_id, Task.is_deleted == False).first()
if not task:
raise ValueError(f"任务不存在: {task_id}")
# 创建新的编辑中实例
return self.create_instance(
task_id=task_id,
name=task.name,
variables=task.variables if hasattr(task, "variables") else {},
priority=1,
status=TaskInstanceStatus.EDITING
)