2025-03-17 14:58:05 +08:00

275 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
任务服务
"""
# services/task_service.py
from typing import Dict, Any, List, Optional
import json
import time
import uuid
import datetime
from core.workflow import WorkflowDefinition
from services.workflow_service import WorkflowService
from config.database import db_session
from config.task_config import get_task_type_name, TaskTypeConfig, TaskStatusConfig, TaskType, TaskStatus
from utils.logger import get_logger
# 获取日志记录器
logger = get_logger(__name__)
class TaskService:
"""任务服务负责任务的CRUD操作和管理"""
def __init__(self):
self.workflow_service = WorkflowService()
# 导入Task模型
from data.models.task import Task, TaskType as DBTaskType
self.Task = Task
self.TaskType = DBTaskType
# 实际实现中,这里应该使用数据库存储任务记录
# 这里简化为使用内存存储
self._tasks = {}
def create_task(self, name: str, task_type: str, version: int = 1,
description: str = "", template_desc: str = "用户自有模板",
is_scheduled: bool = False) -> Dict[str, Any]:
"""创建任务"""
try:
# 验证任务类型是否在配置中存在
if task_type not in TaskTypeConfig.DETAILS:
logger.warning(f"未知的任务类型: {task_type},使用默认类型{TaskTypeConfig.NORMAL}")
task_type = TaskTypeConfig.NORMAL
# 将字符串类型转换为枚举类型
try:
task_type_enum = self.TaskType[task_type]
except (KeyError, AttributeError):
# 如果找不到对应的枚举值默认使用NORMAL类型
logger.warning(f"无法将任务类型 {task_type} 转换为枚举使用默认类型NORMAL")
task_type_enum = self.TaskType.NORMAL
# 使用Task模型的create_task方法创建任务并写入数据库
task_obj = self.Task.create_task(
name=name,
task_type=task_type_enum,
description=description,
template_description=template_desc,
is_scheduled=is_scheduled
)
# 使用to_json方法转换为字典
task = task_obj.to_json()
# 添加版本信息
task["version"] = version
logger.info(f"创建任务: {task['name']} (ID: {task['task_id']})")
return task
except Exception as e:
logger.error(f"创建任务失败: {str(e)}")
raise
def execute_task(self, task_id: str) -> Dict[str, Any]:
"""执行任务"""
try:
# 从数据库中获取任务
task_obj = self.Task.get_by_uuid(task_id)
if not task_obj:
raise ValueError(f"找不到任务: {task_id}")
# 执行任务逻辑...
# 这里简化为直接返回成功
current_time = int(time.time() * 1000)
result = {
"task_status": "completed",
"executing_robot": "robot-001",
"success": True,
"message": "任务执行成功"
}
# 使用to_json方法转换为字典
task = task_obj.to_json()
# 添加执行结果相关字段
task.update({
"task_status": result["task_status"],
"updated_at": current_time,
"start_time": current_time,
"end_time": current_time,
"executing_robot": result["executing_robot"],
"result": result
})
logger.info(f"执行任务成功: {task['name']} (ID: {task_id})")
return task
except Exception as e:
logger.error(f"执行任务失败: {str(e)}")
raise
def get_task_by_id(self, task_id: str) -> Optional[Dict[str, Any]]:
"""根据ID获取任务"""
try:
# 从数据库中获取任务
task_obj = self.Task.get_by_uuid(task_id)
if not task_obj:
return None
# 使用to_json方法转换为字典
return task_obj.to_json()
except Exception as e:
logger.error(f"获取任务失败: {str(e)}")
return None
def get_all_tasks(self, status: Optional[str] = None,
task_type: Optional[str] = None,
name: Optional[str] = None,
is_scheduled: Optional[bool] = None,
created_start: Optional[int] = None,
created_end: Optional[int] = None,
sort_by: str = "created_at",
sort_order: str = "desc",
page: int = 1,
page_size: int = 10) -> tuple[List[Dict[str, Any]], int]:
"""
获取任务列表
Args:
status (str, optional): 任务状态
task_type (str, optional): 任务类型
name (str, optional): 任务名称(模糊查询)
is_scheduled (bool, optional): 是否为定时任务
created_start (int, optional): 创建时间起始(毫秒时间戳)
created_end (int, optional): 创建时间结束(毫秒时间戳)
sort_by (str): 排序字段可选值created_at, updated_at, name
sort_order (str): 排序方式可选值asc, desc
page (int): 页码
page_size (int): 每页数量
Returns:
tuple: (任务列表, 总数)
"""
try:
# 构建查询
query = self.Task.query.filter(self.Task.is_deleted == False)
# 应用筛选条件
if status:
# 处理枚举值
if hasattr(status, "value"):
status = status.value
# 实际应该关联任务记录表查询状态
# 这里简化处理,假设任务状态存储在任务表中
pass
if task_type:
# 处理枚举值
if hasattr(task_type, "value"):
task_type = task_type.value
query = query.filter(self.Task.task_type == task_type)
if name:
query = query.filter(self.Task.name.like(f"%{name}%"))
if is_scheduled is not None:
query = query.filter(self.Task.is_scheduled == is_scheduled)
if created_start:
# 将毫秒时间戳转换为datetime
created_start_dt = datetime.datetime.fromtimestamp(created_start / 1000)
query = query.filter(self.Task.created_at >= created_start_dt)
if created_end:
# 将毫秒时间戳转换为datetime
created_end_dt = datetime.datetime.fromtimestamp(created_end / 1000)
query = query.filter(self.Task.created_at <= created_end_dt)
# 获取总数
total = query.count()
# 应用排序
# 处理枚举值
if hasattr(sort_by, "value"):
sort_by = sort_by.value
if sort_by == "created_at":
order_column = self.Task.created_at
elif sort_by == "updated_at":
order_column = self.Task.updated_at
elif sort_by == "name":
order_column = self.Task.name
else:
order_column = self.Task.created_at
# 处理枚举值
if hasattr(sort_order, "value"):
sort_order = sort_order.value
if sort_order.lower() == "asc":
query = query.order_by(order_column.asc())
else:
query = query.order_by(order_column.desc())
# 应用分页
query = query.offset((page - 1) * page_size).limit(page_size)
# 获取查询结果
task_objs = query.all()
# 使用to_json方法转换为字典列表
tasks = [task_obj.to_json() for task_obj in task_objs]
return tasks, total
except Exception as e:
logger.error(f"获取任务列表失败: {str(e)}")
return [], 0
def cancel_task(self, task_id: str) -> Dict[str, Any]:
"""取消任务"""
try:
# 从数据库中获取任务
task_obj = self.Task.get_by_uuid(task_id)
if not task_obj:
logger.error(f"取消任务失败: 找不到任务 {task_id}")
raise ValueError(f"找不到任务: {task_id}")
# 使用to_json方法转换为字典
task = task_obj.to_json()
# 更新任务状态为已取消
task.update({
"task_status": "cancelled",
"updated_at": int(time.time() * 1000)
})
logger.info(f"取消任务: {task['name']} (ID: {task_id})")
return task
except Exception as e:
logger.error(f"取消任务失败: {str(e)}")
raise
def delete_task(self, task_id: str) -> bool:
"""删除任务"""
try:
# 从数据库中获取任务
task_obj = self.Task.get_by_uuid(task_id)
if not task_obj:
logger.error(f"删除任务失败: 找不到任务 {task_id}")
return False
# 软删除任务
task_obj.is_deleted = True
task_obj.updated_at = datetime.datetime.now()
db_session.commit()
logger.info(f"删除任务: {task_id}")
return True
except Exception as e:
logger.error(f"删除任务失败: {str(e)}")
return False