#!/usr/bin/env python # -*- coding: utf-8 -*- """ 天风任务服务层 处理天风任务相关的业务逻辑 """ import json import uuid import datetime from sqlalchemy.orm import Session from typing import List, Dict, Any, Optional from sqlalchemy import and_, or_, desc, asc from data.models.taskdef import VWEDTaskDef from data.models.taskrecord import VWEDTaskRecord from data.models.tasktemplate import VWEDTaskTemplate from data.enum.task_def_enum import TaskTypeEnum, TaskStatusEnum, EnableStatus, PeriodicTaskStatus from utils.logger import get_logger from data.session import get_async_session from sqlalchemy import select, update from data.enum.task_record_enum import TaskStatus # 设置日志 logger = get_logger("services.task_service") # 自定义异常类 class TaskNameExistsError(Exception): """任务名称已存在异常""" pass class TaskService: """ 天风任务服务类 提供天风任务的相关业务操作 """ @staticmethod def get_task_list( db: Session, page_num: int = 1, page_size: int = 10, keyword: Optional[str] = None, status: Optional[int] = None, sort_field: Optional[str] = None, sort_order: Optional[str] = "desc" ) -> Dict[str, Any]: """ 获取任务列表 Args: db: 数据库会话 page_num: 页码,从1开始 page_size: 每页记录数 keyword: 关键字(任务名称),模糊匹配 status: 状态过滤 sort_field: 排序字段 sort_order: 排序方式:asc(升序)、desc(降序) Returns: Dict[str, Any]: 包含任务列表和分页信息 """ try: # 构建查询条件 query = db.query(VWEDTaskDef) # 添加筛选条件 if keyword: query = query.filter(VWEDTaskDef.label.like(f'%{keyword}%')) if status is not None: query = query.filter(VWEDTaskDef.status == status) # 计算总数 total = query.count() # 添加排序 if sort_field and sort_field in VWEDTaskDef.__table__.columns: direction = desc if sort_order == "desc" else asc query = query.order_by(direction(getattr(VWEDTaskDef, sort_field))) else: # 默认按创建时间倒序 query = query.order_by(desc(VWEDTaskDef.created_at)) # 分页处理 offset = (page_num - 1) * page_size tasks = query.offset(offset).limit(page_size).all() # 格式化返回结果 task_list = [] for task in tasks: task_list.append({ "id": task.id, "label": task.label, "version": task.version, "detail": task.detail, "templateName": task.template_name, "periodicTask": bool(task.periodic_task), "ifEnable": task.if_enable, "status": task.status, "createDate": task.created_at, "remark": task.remark, "tenant_id": task.tenant_id, "updated_at": task.updated_at }) return { "total": total, "list": task_list, "pageNum": page_num, "pageSize": page_size, "totalPages": (total + page_size - 1) // page_size } except Exception as e: logger.error(f"获取任务列表失败: {str(e)}") raise Exception(f"获取任务列表失败: {str(e)}") @staticmethod def create_task( db: Session, label: str, task_type: int, remark: Optional[str] = None, period: Optional[int] = None, delay: Optional[int] = 3000, release_sites: Optional[bool] = True, token: Optional[str] = None, tenant_id: str = "default", map_id: Optional[str] = None ) -> Dict[str, Any]: """ 创建任务 Args: db: 数据库会话 label: 任务名称 task_type: 任务类型:1-普通任务,2-定时任务 remark: 任务备注 period: 周期时间(毫秒),定时任务必填 delay: 延迟时间(毫秒),默认3000 release_sites: 是否释放站点,默认true token: 用户token值,用于认证 tenant_id: 租户ID,用于多租户隔离,默认为"default" map_id: 相关地图ID Returns: Dict[str, Any]: 新建任务信息 Raises: ValueError: 参数验证失败时 TaskNameExistsError: 任务名称已存在时 """ try: # 验证任务名称 if not label or label.strip() == "": raise ValueError("任务名称不能为空") # 验证任务名称长度 if len(label.strip()) < 2: raise ValueError("任务名称至少需要2个字符") # 检查任务名称是否已存在 existing_task = db.query(VWEDTaskDef).filter(VWEDTaskDef.label == label.strip()).first() if existing_task: raise TaskNameExistsError(f"任务名称 '{label}' 已存在,请使用其他名称") # 验证任务类型 if task_type not in [TaskTypeEnum.NORMAL, TaskTypeEnum.PERIODIC]: raise ValueError("任务类型无效,应为1(普通任务)或2(定时任务)") # 验证定时任务参数 if task_type == TaskTypeEnum.PERIODIC and not period: raise ValueError("定时任务必须指定周期时间") # 获取当前启用的模板 template = db.query(VWEDTaskTemplate).filter( VWEDTaskTemplate.template_if_enable == 1 ).first() if not template: raise ValueError("没有可用的任务模板,请先启用一个模板") # 创建任务定义 task_id = str(uuid.uuid4()) new_task = VWEDTaskDef( id=task_id, label=label.strip(), # 去除首尾空白字符 version=1, detail=json.dumps({ "inputParams": [], "outputParams": [], "rootBlock": None }, ensure_ascii=False), template_name=template.template_name, period=period if period else 3000, periodic_task=PeriodicTaskStatus.PERIODIC if task_type == TaskTypeEnum.PERIODIC else PeriodicTaskStatus.NON_PERIODIC, status=TaskStatusEnum.PENDING, if_enable=EnableStatus.DISABLED, delay=delay, release_sites=release_sites, remark=remark, tenant_id=tenant_id, # 使用传入的租户ID user_token=token, # 保存用户token map_id=map_id # 保存地图ID ) db.add(new_task) db.commit() db.refresh(new_task) # 返回任务信息 return { "id": new_task.id, "label": new_task.label, "version": new_task.version, "detail": new_task.detail, "templateName": new_task.template_name, "periodicTask": bool(new_task.periodic_task), "status": new_task.status, "createDate": new_task.created_at, "mapId": new_task.map_id # 返回地图ID } except TaskNameExistsError as e: db.rollback() logger.warning(f"创建任务失败: {str(e)}") raise # 重新抛出异常,不包装 except Exception as e: db.rollback() logger.error(f"创建任务失败: {str(e)}") raise Exception(f"创建任务失败: {str(e)}") @staticmethod def delete_tasks(db: Session, ids: List[str]) -> bool: """ 删除任务 Args: db: 数据库会话 ids: 要删除的任务ID列表 Returns: bool: 是否删除成功 """ try: # 检查任务是否存在 tasks = db.query(VWEDTaskDef).filter(VWEDTaskDef.id.in_(ids)).all() if len(tasks) != len(ids): missing_ids = set(ids) - {task.id for task in tasks} raise ValueError(f"以下任务不存在: {', '.join(missing_ids)}") # 删除任务 for task in tasks: db.delete(task) db.commit() return True except Exception as e: db.rollback() logger.error(f"删除任务失败: {str(e)}") raise Exception(f"删除任务失败: {str(e)}") @staticmethod def get_task_by_id(db: Session, task_id: str) -> Optional[Dict[str, Any]]: """ 根据ID获取任务 Args: db: 数据库会话 task_id: 任务ID Returns: Optional[Dict[str, Any]]: 任务信息,如果不存在则返回None """ try: # 显式启动事务 task = db.query(VWEDTaskDef).filter(VWEDTaskDef.id == task_id).first() if not task: return None # 格式化任务信息 task_info = { "id": task.id, "label": task.label, "version": task.version, "detail": json.loads(task.detail) if task.detail else {}, "templateName": task.template_name, "period": task.period, "periodicTask": bool(task.periodic_task), "status": task.status, "ifEnable": task.if_enable, "delay": task.delay, "releaseSites": bool(task.release_sites) if task.release_sites is not None else None, "createDate": task.created_at, "remark": task.remark, } # 提交事务以释放数据库锁 db.commit() return task_info except Exception as e: # 发生异常时回滚事务 db.rollback() logger.error(f"获取任务失败: {str(e)}") raise Exception(f"获取任务失败: {str(e)}") @staticmethod def export_tasks(db: Session, ids: List[str]) -> List[Dict[str, Any]]: """ 导出任务 Args: db: 数据库会话 ids: 任务ID列表 Returns: List[Dict[str, Any]]: 任务配置列表 """ try: tasks = db.query(VWEDTaskDef).filter(VWEDTaskDef.id.in_(ids)).all() if not tasks: raise ValueError(f"未找到指定的任务") # 格式化任务信息 tasks_export = [] for task in tasks: task_export = { "id": task.id, "label": task.label, "version": task.version, "detail": task.detail, "templateName": task.template_name, "templateDescription": db.query(VWEDTaskTemplate.template_description) .filter(VWEDTaskTemplate.template_name == task.template_name) .scalar() or "", "period": task.period, "periodicTask": task.periodic_task, "status": task.status, "ifEnable": task.if_enable, "delay": task.delay, "remark": task.remark, "windcategoryId": 0 # 默认分类ID } tasks_export.append(task_export) return tasks_export except Exception as e: logger.error(f"导出任务失败: {str(e)}") raise Exception(f"导出任务失败: {str(e)}") @staticmethod def import_task( db: Session, task_data: Dict[str, Any], task_name: Optional[str] = None, token: Optional[str] = None, tenant_id: str = "default" ) -> Dict[str, Any]: """ 导入任务 Args: db: 数据库会话 task_data: 任务配置数据 task_name: 导入后的任务名称,不填则使用原名称 token: 用户token,用于认证 tenant_id: 租户ID,默认为"default" Returns: Dict[str, Any]: 导入的任务信息 Raises: ValueError: 参数验证失败时 TaskNameExistsError: 任务名称已存在时 """ try: # 创建新ID避免冲突 new_id = str(uuid.uuid4()) # 设置任务名称 label = task_name or task_data.get("label", "导入的任务") # 获取detail字段,这是唯一必需的字段 detail = task_data.get("detail") if not detail: raise ValueError("任务数据缺少detail字段,无法导入") # 检查任务名称是否已存在 existing_task = db.query(VWEDTaskDef).filter( VWEDTaskDef.label == label.strip(), VWEDTaskDef.tenant_id == tenant_id ).first() if existing_task: raise TaskNameExistsError(f"任务名称 '{label}' 已存在,请使用其他名称") # 获取当前启用的模板 template = db.query(VWEDTaskTemplate).filter( VWEDTaskTemplate.template_if_enable == 1 ).first() if not template: template_name = task_data.get("templateName", "默认模板") else: template_name = template.template_name # 创建新任务 new_task = VWEDTaskDef( id=new_id, label=label, version=1, # 新任务从版本1开始 detail=detail, template_name=template_name, period=task_data.get("period", 3000), periodic_task=task_data.get("periodicTask", PeriodicTaskStatus.NON_PERIODIC), status=TaskStatusEnum.PENDING, # 初始状态为0 if_enable=EnableStatus.DISABLED, # 初始未启用 delay=task_data.get("delay", 3000), release_sites=task_data.get("releaseSites", True), # 默认释放站点 remark=task_data.get("remark", ""), tenant_id=tenant_id, user_token=token ) db.add(new_task) db.commit() db.refresh(new_task) # 返回任务信息 return { "id": new_task.id, "label": new_task.label, "version": new_task.version, "status": new_task.status, "createDate": new_task.created_at } except TaskNameExistsError as e: db.rollback() logger.warning(f"导入任务失败: {str(e)}") raise # 重新抛出异常,不包装 except Exception as e: db.rollback() logger.error(f"导入任务失败: {str(e)}") raise Exception(f"导入任务失败: {str(e)}") @staticmethod async def stop_task_def(task_def_id: str) -> Dict[str, Any]: """ 停止指定任务定义下的所有运行任务实例,同时禁用定时任务 Args: task_def_id: 任务定义ID Returns: 字典包含停止结果信息: - success: 是否操作成功 - message: 操作结果消息 - is_periodic: 是否为定时任务 - total_running: 运行中的任务总数 - stopped_count: 成功停止的任务数量 - failed_count: 停止失败的任务数量 - failed_tasks: 停止失败的任务记录ID列表 """ # 导入增强版调度器 from services.enhanced_scheduler import scheduler try: async with get_async_session() as session: # 查询任务定义 result = await session.execute( select(VWEDTaskDef).where(VWEDTaskDef.id == task_def_id) ) task_def = result.scalars().first() if not task_def: return { "success": False, "message": f"未找到任务定义: {task_def_id}" } is_periodic = task_def.periodic_task == PeriodicTaskStatus.PERIODIC # 初始化计数器 total_running = 0 stopped_count = 0 failed_count = 0 failed_tasks = [] # 如果是定时任务则禁用 if is_periodic and task_def.if_enable == EnableStatus.ENABLED: # 更新任务定义状态为禁用 await session.execute( update(VWEDTaskDef) .where(VWEDTaskDef.id == task_def_id) .values(if_enable=EnableStatus.DISABLED) ) await session.commit() # 通知调度器 update_result = await scheduler.update_periodic_task(task_def_id, enable=False) # 将定时任务的禁用也计入停止任务的数量 total_running += 1 if update_result.get("success", True): # 假设通知调度器成功,除非明确返回失败 stopped_count += 1 else: failed_count += 1 failed_tasks.append({ "taskRecordId": "periodic_" + task_def_id, "reason": update_result.get("message", "禁用定时任务失败") }) # 查找所有正在运行的任务记录 running_tasks_query = await session.execute( select(VWEDTaskRecord) .where( VWEDTaskRecord.def_id == task_def_id, VWEDTaskRecord.status == TaskStatus.RUNNING # 执行中状态码 ) ) running_tasks = running_tasks_query.scalars().all() # 更新总计数 total_running += len(running_tasks) # 取消所有运行中的任务 for task_record in running_tasks: cancel_result = await scheduler.cancel_task(task_record.id) if cancel_result.get("success", False): stopped_count += 1 else: failed_count += 1 failed_tasks.append({ "taskRecordId": task_record.id, "reason": cancel_result.get("message", "未知原因") }) # 更新任务定义状态为已结束(0) await session.execute( update(VWEDTaskDef) .where(VWEDTaskDef.id == task_def_id) .values(status=TaskStatusEnum.PENDING) ) await session.commit() return { "success": True, "message": "操作完成", "is_periodic": is_periodic, "total_running": total_running, "stopped_count": stopped_count, "failed_count": failed_count, "failed_tasks": failed_tasks } except Exception as e: return { "success": False, "message": f"停止任务失败: {str(e)}" }