""" 任务服务 """ # services/task_service.py from typing import Dict, Any, List, Optional import json import time import uuid import datetime from core.workflow import WorkflowDefinition from services.workflow_service import WorkflowService from config.database import db_session from config.task_config import get_task_type_name, TaskTypeConfig, TaskStatusConfig, TaskType, TaskStatus from utils.logger import get_logger # 获取日志记录器 logger = get_logger(__name__) class TaskService: """任务服务,负责任务的CRUD操作和管理""" def __init__(self): self.workflow_service = WorkflowService() # 导入Task模型 from data.models.task import Task, TaskType as DBTaskType self.Task = Task self.TaskType = DBTaskType # 实际实现中,这里应该使用数据库存储任务记录 # 这里简化为使用内存存储 self._tasks = {} def create_task(self, name: str, task_type: str, version: int = 1, description: str = "", template_desc: str = "用户自有模板", is_scheduled: bool = False) -> Dict[str, Any]: """创建任务""" try: # 验证任务类型是否在配置中存在 if task_type not in TaskTypeConfig.DETAILS: logger.warning(f"未知的任务类型: {task_type},使用默认类型{TaskTypeConfig.NORMAL}") task_type = TaskTypeConfig.NORMAL # 将字符串类型转换为枚举类型 try: task_type_enum = self.TaskType[task_type] except (KeyError, AttributeError): # 如果找不到对应的枚举值,默认使用NORMAL类型 logger.warning(f"无法将任务类型 {task_type} 转换为枚举,使用默认类型NORMAL") task_type_enum = self.TaskType.NORMAL # 使用Task模型的create_task方法创建任务并写入数据库 task_obj = self.Task.create_task( name=name, task_type=task_type_enum, description=description, template_description=template_desc, is_scheduled=is_scheduled ) # 使用to_json方法转换为字典 task = task_obj.to_json() # 添加版本信息 task["version"] = version logger.info(f"创建任务: {task['name']} (ID: {task['task_id']})") return task except Exception as e: logger.error(f"创建任务失败: {str(e)}") raise def execute_task(self, task_id: str) -> Dict[str, Any]: """执行任务""" try: # 从数据库中获取任务 task_obj = self.Task.get_by_uuid(task_id) if not task_obj: raise ValueError(f"找不到任务: {task_id}") # 执行任务逻辑... # 这里简化为直接返回成功 current_time = int(time.time() * 1000) result = { "task_status": "completed", "executing_robot": "robot-001", "success": True, "message": "任务执行成功" } # 使用to_json方法转换为字典 task = task_obj.to_json() # 添加执行结果相关字段 task.update({ "task_status": result["task_status"], "updated_at": current_time, "start_time": current_time, "end_time": current_time, "executing_robot": result["executing_robot"], "result": result }) logger.info(f"执行任务成功: {task['name']} (ID: {task_id})") return task except Exception as e: logger.error(f"执行任务失败: {str(e)}") raise def get_task_by_id(self, task_id: str) -> Optional[Dict[str, Any]]: """根据ID获取任务""" try: # 从数据库中获取任务 task_obj = self.Task.get_by_uuid(task_id) if not task_obj: return None # 使用to_json方法转换为字典 return task_obj.to_json() except Exception as e: logger.error(f"获取任务失败: {str(e)}") return None def get_all_tasks(self, status: Optional[str] = None, task_type: Optional[str] = None, name: Optional[str] = None, is_scheduled: Optional[bool] = None, created_start: Optional[int] = None, created_end: Optional[int] = None, sort_by: str = "created_at", sort_order: str = "desc", page: int = 1, page_size: int = 10) -> tuple[List[Dict[str, Any]], int]: """ 获取任务列表 Args: status (str, optional): 任务状态 task_type (str, optional): 任务类型 name (str, optional): 任务名称(模糊查询) is_scheduled (bool, optional): 是否为定时任务 created_start (int, optional): 创建时间起始(毫秒时间戳) created_end (int, optional): 创建时间结束(毫秒时间戳) sort_by (str): 排序字段,可选值:created_at, updated_at, name sort_order (str): 排序方式,可选值:asc, desc page (int): 页码 page_size (int): 每页数量 Returns: tuple: (任务列表, 总数) """ try: # 构建查询 query = self.Task.query.filter(self.Task.is_deleted == False) # 应用筛选条件 if status: # 处理枚举值 if hasattr(status, "value"): status = status.value # 实际应该关联任务记录表查询状态 # 这里简化处理,假设任务状态存储在任务表中 pass if task_type: # 处理枚举值 if hasattr(task_type, "value"): task_type = task_type.value query = query.filter(self.Task.task_type == task_type) if name: query = query.filter(self.Task.name.like(f"%{name}%")) if is_scheduled is not None: query = query.filter(self.Task.is_scheduled == is_scheduled) if created_start: # 将毫秒时间戳转换为datetime created_start_dt = datetime.datetime.fromtimestamp(created_start / 1000) query = query.filter(self.Task.created_at >= created_start_dt) if created_end: # 将毫秒时间戳转换为datetime created_end_dt = datetime.datetime.fromtimestamp(created_end / 1000) query = query.filter(self.Task.created_at <= created_end_dt) # 获取总数 total = query.count() # 应用排序 # 处理枚举值 if hasattr(sort_by, "value"): sort_by = sort_by.value if sort_by == "created_at": order_column = self.Task.created_at elif sort_by == "updated_at": order_column = self.Task.updated_at elif sort_by == "name": order_column = self.Task.name else: order_column = self.Task.created_at # 处理枚举值 if hasattr(sort_order, "value"): sort_order = sort_order.value if sort_order.lower() == "asc": query = query.order_by(order_column.asc()) else: query = query.order_by(order_column.desc()) # 应用分页 query = query.offset((page - 1) * page_size).limit(page_size) # 获取查询结果 task_objs = query.all() # 使用to_json方法转换为字典列表 tasks = [task_obj.to_json() for task_obj in task_objs] return tasks, total except Exception as e: logger.error(f"获取任务列表失败: {str(e)}") return [], 0 def cancel_task(self, task_id: str) -> Dict[str, Any]: """取消任务""" try: # 从数据库中获取任务 task_obj = self.Task.get_by_uuid(task_id) if not task_obj: logger.error(f"取消任务失败: 找不到任务 {task_id}") raise ValueError(f"找不到任务: {task_id}") # 使用to_json方法转换为字典 task = task_obj.to_json() # 更新任务状态为已取消 task.update({ "task_status": "cancelled", "updated_at": int(time.time() * 1000) }) logger.info(f"取消任务: {task['name']} (ID: {task_id})") return task except Exception as e: logger.error(f"取消任务失败: {str(e)}") raise def delete_task(self, task_id: str) -> bool: """删除任务""" try: # 从数据库中获取任务 task_obj = self.Task.get_by_uuid(task_id) if not task_obj: logger.error(f"删除任务失败: 找不到任务 {task_id}") return False # 软删除任务 task_obj.is_deleted = True task_obj.updated_at = datetime.datetime.now() db_session.commit() logger.info(f"删除任务: {task_id}") return True except Exception as e: logger.error(f"删除任务失败: {str(e)}") return False