275 lines
10 KiB
Python
275 lines
10 KiB
Python
"""
|
||
任务服务
|
||
"""
|
||
# services/task_service.py
|
||
from typing import Dict, Any, List, Optional
|
||
import json
|
||
import time
|
||
import uuid
|
||
import datetime
|
||
from core.workflow import WorkflowDefinition
|
||
from services.workflow_service import WorkflowService
|
||
from config.database import db_session
|
||
from config.task_config import get_task_type_name, TaskTypeConfig, TaskStatusConfig, TaskType, TaskStatus
|
||
from utils.logger import get_logger
|
||
|
||
# 获取日志记录器
|
||
logger = get_logger(__name__)
|
||
|
||
class TaskService:
|
||
"""任务服务,负责任务的CRUD操作和管理"""
|
||
|
||
def __init__(self):
|
||
self.workflow_service = WorkflowService()
|
||
# 导入Task模型
|
||
from data.models.task import Task, TaskType as DBTaskType
|
||
self.Task = Task
|
||
self.TaskType = DBTaskType
|
||
# 实际实现中,这里应该使用数据库存储任务记录
|
||
# 这里简化为使用内存存储
|
||
self._tasks = {}
|
||
|
||
def create_task(self, name: str, task_type: str, version: int = 1,
|
||
description: str = "", template_desc: str = "用户自有模板",
|
||
is_scheduled: bool = False) -> Dict[str, Any]:
|
||
"""创建任务"""
|
||
try:
|
||
# 验证任务类型是否在配置中存在
|
||
if task_type not in TaskTypeConfig.DETAILS:
|
||
logger.warning(f"未知的任务类型: {task_type},使用默认类型{TaskTypeConfig.NORMAL}")
|
||
task_type = TaskTypeConfig.NORMAL
|
||
|
||
# 将字符串类型转换为枚举类型
|
||
try:
|
||
task_type_enum = self.TaskType[task_type]
|
||
except (KeyError, AttributeError):
|
||
# 如果找不到对应的枚举值,默认使用NORMAL类型
|
||
logger.warning(f"无法将任务类型 {task_type} 转换为枚举,使用默认类型NORMAL")
|
||
task_type_enum = self.TaskType.NORMAL
|
||
|
||
# 使用Task模型的create_task方法创建任务并写入数据库
|
||
task_obj = self.Task.create_task(
|
||
name=name,
|
||
task_type=task_type_enum,
|
||
description=description,
|
||
template_description=template_desc,
|
||
is_scheduled=is_scheduled
|
||
)
|
||
|
||
# 使用to_json方法转换为字典
|
||
task = task_obj.to_json()
|
||
|
||
# 添加版本信息
|
||
task["version"] = version
|
||
|
||
logger.info(f"创建任务: {task['name']} (ID: {task['task_id']})")
|
||
|
||
return task
|
||
except Exception as e:
|
||
logger.error(f"创建任务失败: {str(e)}")
|
||
raise
|
||
|
||
def execute_task(self, task_id: str) -> Dict[str, Any]:
|
||
"""执行任务"""
|
||
try:
|
||
# 从数据库中获取任务
|
||
task_obj = self.Task.get_by_uuid(task_id)
|
||
|
||
if not task_obj:
|
||
raise ValueError(f"找不到任务: {task_id}")
|
||
|
||
# 执行任务逻辑...
|
||
# 这里简化为直接返回成功
|
||
current_time = int(time.time() * 1000)
|
||
result = {
|
||
"task_status": "completed",
|
||
"executing_robot": "robot-001",
|
||
"success": True,
|
||
"message": "任务执行成功"
|
||
}
|
||
|
||
# 使用to_json方法转换为字典
|
||
task = task_obj.to_json()
|
||
|
||
# 添加执行结果相关字段
|
||
task.update({
|
||
"task_status": result["task_status"],
|
||
"updated_at": current_time,
|
||
"start_time": current_time,
|
||
"end_time": current_time,
|
||
"executing_robot": result["executing_robot"],
|
||
"result": result
|
||
})
|
||
|
||
logger.info(f"执行任务成功: {task['name']} (ID: {task_id})")
|
||
|
||
return task
|
||
except Exception as e:
|
||
logger.error(f"执行任务失败: {str(e)}")
|
||
raise
|
||
|
||
def get_task_by_id(self, task_id: str) -> Optional[Dict[str, Any]]:
|
||
"""根据ID获取任务"""
|
||
try:
|
||
# 从数据库中获取任务
|
||
task_obj = self.Task.get_by_uuid(task_id)
|
||
|
||
if not task_obj:
|
||
return None
|
||
|
||
# 使用to_json方法转换为字典
|
||
return task_obj.to_json()
|
||
except Exception as e:
|
||
logger.error(f"获取任务失败: {str(e)}")
|
||
return None
|
||
|
||
def get_all_tasks(self, status: Optional[str] = None,
|
||
task_type: Optional[str] = None,
|
||
name: Optional[str] = None,
|
||
is_scheduled: Optional[bool] = None,
|
||
created_start: Optional[int] = None,
|
||
created_end: Optional[int] = None,
|
||
sort_by: str = "created_at",
|
||
sort_order: str = "desc",
|
||
page: int = 1,
|
||
page_size: int = 10) -> tuple[List[Dict[str, Any]], int]:
|
||
"""
|
||
获取任务列表
|
||
|
||
Args:
|
||
status (str, optional): 任务状态
|
||
task_type (str, optional): 任务类型
|
||
name (str, optional): 任务名称(模糊查询)
|
||
is_scheduled (bool, optional): 是否为定时任务
|
||
created_start (int, optional): 创建时间起始(毫秒时间戳)
|
||
created_end (int, optional): 创建时间结束(毫秒时间戳)
|
||
sort_by (str): 排序字段,可选值:created_at, updated_at, name
|
||
sort_order (str): 排序方式,可选值:asc, desc
|
||
page (int): 页码
|
||
page_size (int): 每页数量
|
||
|
||
Returns:
|
||
tuple: (任务列表, 总数)
|
||
"""
|
||
try:
|
||
# 构建查询
|
||
query = self.Task.query.filter(self.Task.is_deleted == False)
|
||
|
||
# 应用筛选条件
|
||
if status:
|
||
# 处理枚举值
|
||
if hasattr(status, "value"):
|
||
status = status.value
|
||
# 实际应该关联任务记录表查询状态
|
||
# 这里简化处理,假设任务状态存储在任务表中
|
||
pass
|
||
|
||
if task_type:
|
||
# 处理枚举值
|
||
if hasattr(task_type, "value"):
|
||
task_type = task_type.value
|
||
query = query.filter(self.Task.task_type == task_type)
|
||
|
||
if name:
|
||
query = query.filter(self.Task.name.like(f"%{name}%"))
|
||
|
||
if is_scheduled is not None:
|
||
query = query.filter(self.Task.is_scheduled == is_scheduled)
|
||
|
||
if created_start:
|
||
# 将毫秒时间戳转换为datetime
|
||
created_start_dt = datetime.datetime.fromtimestamp(created_start / 1000)
|
||
query = query.filter(self.Task.created_at >= created_start_dt)
|
||
|
||
if created_end:
|
||
# 将毫秒时间戳转换为datetime
|
||
created_end_dt = datetime.datetime.fromtimestamp(created_end / 1000)
|
||
query = query.filter(self.Task.created_at <= created_end_dt)
|
||
|
||
# 获取总数
|
||
total = query.count()
|
||
|
||
# 应用排序
|
||
# 处理枚举值
|
||
if hasattr(sort_by, "value"):
|
||
sort_by = sort_by.value
|
||
|
||
if sort_by == "created_at":
|
||
order_column = self.Task.created_at
|
||
elif sort_by == "updated_at":
|
||
order_column = self.Task.updated_at
|
||
elif sort_by == "name":
|
||
order_column = self.Task.name
|
||
else:
|
||
order_column = self.Task.created_at
|
||
|
||
# 处理枚举值
|
||
if hasattr(sort_order, "value"):
|
||
sort_order = sort_order.value
|
||
|
||
if sort_order.lower() == "asc":
|
||
query = query.order_by(order_column.asc())
|
||
else:
|
||
query = query.order_by(order_column.desc())
|
||
|
||
# 应用分页
|
||
query = query.offset((page - 1) * page_size).limit(page_size)
|
||
|
||
# 获取查询结果
|
||
task_objs = query.all()
|
||
|
||
# 使用to_json方法转换为字典列表
|
||
tasks = [task_obj.to_json() for task_obj in task_objs]
|
||
|
||
return tasks, total
|
||
except Exception as e:
|
||
logger.error(f"获取任务列表失败: {str(e)}")
|
||
return [], 0
|
||
|
||
def cancel_task(self, task_id: str) -> Dict[str, Any]:
|
||
"""取消任务"""
|
||
try:
|
||
# 从数据库中获取任务
|
||
task_obj = self.Task.get_by_uuid(task_id)
|
||
|
||
if not task_obj:
|
||
logger.error(f"取消任务失败: 找不到任务 {task_id}")
|
||
raise ValueError(f"找不到任务: {task_id}")
|
||
|
||
# 使用to_json方法转换为字典
|
||
task = task_obj.to_json()
|
||
|
||
# 更新任务状态为已取消
|
||
task.update({
|
||
"task_status": "cancelled",
|
||
"updated_at": int(time.time() * 1000)
|
||
})
|
||
|
||
logger.info(f"取消任务: {task['name']} (ID: {task_id})")
|
||
|
||
return task
|
||
except Exception as e:
|
||
logger.error(f"取消任务失败: {str(e)}")
|
||
raise
|
||
|
||
def delete_task(self, task_id: str) -> bool:
|
||
"""删除任务"""
|
||
try:
|
||
# 从数据库中获取任务
|
||
task_obj = self.Task.get_by_uuid(task_id)
|
||
|
||
if not task_obj:
|
||
logger.error(f"删除任务失败: 找不到任务 {task_id}")
|
||
return False
|
||
|
||
# 软删除任务
|
||
task_obj.is_deleted = True
|
||
task_obj.updated_at = datetime.datetime.now()
|
||
db_session.commit()
|
||
|
||
logger.info(f"删除任务: {task_id}")
|
||
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"删除任务失败: {str(e)}")
|
||
return False |