564 lines
21 KiB
Python
564 lines
21 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
天风任务服务层
|
||
处理天风任务相关的业务逻辑
|
||
"""
|
||
|
||
import json
|
||
import uuid
|
||
import datetime
|
||
from sqlalchemy.orm import Session
|
||
from typing import List, Dict, Any, Optional
|
||
from sqlalchemy import and_, or_, desc, asc
|
||
|
||
from data.models.taskdef import VWEDTaskDef
|
||
from data.models.taskrecord import VWEDTaskRecord
|
||
from data.models.tasktemplate import VWEDTaskTemplate
|
||
from data.enum.task_def_enum import TaskTypeEnum, TaskStatusEnum, EnableStatus, PeriodicTaskStatus
|
||
from utils.logger import get_logger
|
||
from data.session import get_async_session
|
||
from sqlalchemy import select, update
|
||
from data.enum.task_record_enum import TaskStatus
|
||
# 设置日志
|
||
logger = get_logger("services.task_service")
|
||
|
||
# 自定义异常类
|
||
class TaskNameExistsError(Exception):
|
||
"""任务名称已存在异常"""
|
||
pass
|
||
|
||
class TaskService:
|
||
"""
|
||
天风任务服务类
|
||
提供天风任务的相关业务操作
|
||
"""
|
||
|
||
@staticmethod
|
||
def get_task_list(
|
||
db: Session,
|
||
page_num: int = 1,
|
||
page_size: int = 10,
|
||
keyword: Optional[str] = None,
|
||
status: Optional[int] = None,
|
||
sort_field: Optional[str] = None,
|
||
sort_order: Optional[str] = "desc"
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
获取任务列表
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
page_num: 页码,从1开始
|
||
page_size: 每页记录数
|
||
keyword: 关键字(任务名称),模糊匹配
|
||
status: 状态过滤
|
||
sort_field: 排序字段
|
||
sort_order: 排序方式:asc(升序)、desc(降序)
|
||
|
||
Returns:
|
||
Dict[str, Any]: 包含任务列表和分页信息
|
||
"""
|
||
try:
|
||
# 构建查询条件
|
||
query = db.query(VWEDTaskDef)
|
||
|
||
# 添加筛选条件
|
||
if keyword:
|
||
query = query.filter(VWEDTaskDef.label.like(f'%{keyword}%'))
|
||
|
||
if status is not None:
|
||
query = query.filter(VWEDTaskDef.status == status)
|
||
|
||
# 计算总数
|
||
total = query.count()
|
||
|
||
# 添加排序
|
||
if sort_field and sort_field in VWEDTaskDef.__table__.columns:
|
||
direction = desc if sort_order == "desc" else asc
|
||
query = query.order_by(direction(getattr(VWEDTaskDef, sort_field)))
|
||
else:
|
||
# 默认按创建时间倒序
|
||
query = query.order_by(desc(VWEDTaskDef.created_at))
|
||
|
||
# 分页处理
|
||
offset = (page_num - 1) * page_size
|
||
tasks = query.offset(offset).limit(page_size).all()
|
||
|
||
# 格式化返回结果
|
||
task_list = []
|
||
for task in tasks:
|
||
task_list.append({
|
||
"id": task.id,
|
||
"label": task.label,
|
||
"version": task.version,
|
||
"detail": task.detail,
|
||
"templateName": task.template_name,
|
||
"periodicTask": bool(task.periodic_task),
|
||
"ifEnable": task.if_enable,
|
||
"status": task.status,
|
||
"createDate": task.created_at,
|
||
"remark": task.remark,
|
||
"tenant_id": task.tenant_id,
|
||
"updated_at": task.updated_at
|
||
})
|
||
|
||
return {
|
||
"total": total,
|
||
"list": task_list,
|
||
"pageNum": page_num,
|
||
"pageSize": page_size,
|
||
"totalPages": (total + page_size - 1) // page_size
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取任务列表失败: {str(e)}")
|
||
raise Exception(f"获取任务列表失败: {str(e)}")
|
||
|
||
@staticmethod
|
||
def create_task(
|
||
db: Session,
|
||
label: str,
|
||
task_type: int,
|
||
remark: Optional[str] = None,
|
||
period: Optional[int] = None,
|
||
delay: Optional[int] = 3000,
|
||
release_sites: Optional[bool] = True,
|
||
token: Optional[str] = None,
|
||
tenant_id: str = "default",
|
||
map_id: Optional[str] = None
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
创建任务
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
label: 任务名称
|
||
task_type: 任务类型:1-普通任务,2-定时任务
|
||
remark: 任务备注
|
||
period: 周期时间(毫秒),定时任务必填
|
||
delay: 延迟时间(毫秒),默认3000
|
||
release_sites: 是否释放站点,默认true
|
||
token: 用户token值,用于认证
|
||
tenant_id: 租户ID,用于多租户隔离,默认为"default"
|
||
map_id: 相关地图ID
|
||
Returns:
|
||
Dict[str, Any]: 新建任务信息
|
||
|
||
Raises:
|
||
ValueError: 参数验证失败时
|
||
TaskNameExistsError: 任务名称已存在时
|
||
"""
|
||
try:
|
||
# 验证任务名称
|
||
if not label or label.strip() == "":
|
||
raise ValueError("任务名称不能为空")
|
||
|
||
# 验证任务名称长度
|
||
if len(label.strip()) < 2:
|
||
raise ValueError("任务名称至少需要2个字符")
|
||
|
||
# 检查任务名称是否已存在
|
||
existing_task = db.query(VWEDTaskDef).filter(VWEDTaskDef.label == label.strip()).first()
|
||
if existing_task:
|
||
raise TaskNameExistsError(f"任务名称 '{label}' 已存在,请使用其他名称")
|
||
|
||
# 验证任务类型
|
||
if task_type not in [TaskTypeEnum.NORMAL, TaskTypeEnum.PERIODIC]:
|
||
raise ValueError("任务类型无效,应为1(普通任务)或2(定时任务)")
|
||
|
||
# 验证定时任务参数
|
||
if task_type == TaskTypeEnum.PERIODIC and not period:
|
||
raise ValueError("定时任务必须指定周期时间")
|
||
|
||
# 获取当前启用的模板
|
||
template = db.query(VWEDTaskTemplate).filter(
|
||
VWEDTaskTemplate.template_if_enable == 1
|
||
).first()
|
||
|
||
if not template:
|
||
raise ValueError("没有可用的任务模板,请先启用一个模板")
|
||
|
||
# 创建任务定义
|
||
task_id = str(uuid.uuid4())
|
||
new_task = VWEDTaskDef(
|
||
id=task_id,
|
||
label=label.strip(), # 去除首尾空白字符
|
||
version=1,
|
||
detail=json.dumps({
|
||
"inputParams": [],
|
||
"outputParams": [],
|
||
"rootBlock": None
|
||
}, ensure_ascii=False),
|
||
template_name=template.template_name,
|
||
period=period if period else 3000,
|
||
periodic_task=PeriodicTaskStatus.PERIODIC if task_type == TaskTypeEnum.PERIODIC else PeriodicTaskStatus.NON_PERIODIC,
|
||
status=TaskStatusEnum.PENDING,
|
||
if_enable=EnableStatus.DISABLED,
|
||
delay=delay,
|
||
release_sites=release_sites,
|
||
remark=remark,
|
||
tenant_id=tenant_id, # 使用传入的租户ID
|
||
user_token=token, # 保存用户token
|
||
map_id=map_id # 保存地图ID
|
||
)
|
||
|
||
db.add(new_task)
|
||
db.commit()
|
||
db.refresh(new_task)
|
||
|
||
# 返回任务信息
|
||
return {
|
||
"id": new_task.id,
|
||
"label": new_task.label,
|
||
"version": new_task.version,
|
||
"detail": new_task.detail,
|
||
"templateName": new_task.template_name,
|
||
"periodicTask": bool(new_task.periodic_task),
|
||
"status": new_task.status,
|
||
"createDate": new_task.created_at,
|
||
"mapId": new_task.map_id # 返回地图ID
|
||
}
|
||
|
||
except TaskNameExistsError as e:
|
||
db.rollback()
|
||
logger.warning(f"创建任务失败: {str(e)}")
|
||
raise # 重新抛出异常,不包装
|
||
except Exception as e:
|
||
db.rollback()
|
||
logger.error(f"创建任务失败: {str(e)}")
|
||
raise Exception(f"创建任务失败: {str(e)}")
|
||
|
||
@staticmethod
|
||
def delete_tasks(db: Session, ids: List[str]) -> bool:
|
||
"""
|
||
删除任务
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
ids: 要删除的任务ID列表
|
||
|
||
Returns:
|
||
bool: 是否删除成功
|
||
"""
|
||
try:
|
||
# 检查任务是否存在
|
||
tasks = db.query(VWEDTaskDef).filter(VWEDTaskDef.id.in_(ids)).all()
|
||
if len(tasks) != len(ids):
|
||
missing_ids = set(ids) - {task.id for task in tasks}
|
||
raise ValueError(f"以下任务不存在: {', '.join(missing_ids)}")
|
||
|
||
# 删除任务
|
||
for task in tasks:
|
||
db.delete(task)
|
||
|
||
db.commit()
|
||
return True
|
||
|
||
except Exception as e:
|
||
db.rollback()
|
||
logger.error(f"删除任务失败: {str(e)}")
|
||
raise Exception(f"删除任务失败: {str(e)}")
|
||
|
||
@staticmethod
|
||
def get_task_by_id(db: Session, task_id: str) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
根据ID获取任务
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
task_id: 任务ID
|
||
|
||
Returns:
|
||
Optional[Dict[str, Any]]: 任务信息,如果不存在则返回None
|
||
"""
|
||
try:
|
||
# 显式启动事务
|
||
task = db.query(VWEDTaskDef).filter(VWEDTaskDef.id == task_id).first()
|
||
if not task:
|
||
return None
|
||
|
||
# 格式化任务信息
|
||
task_info = {
|
||
"id": task.id,
|
||
"label": task.label,
|
||
"version": task.version,
|
||
"detail": json.loads(task.detail) if task.detail else {},
|
||
"templateName": task.template_name,
|
||
"period": task.period,
|
||
"periodicTask": bool(task.periodic_task),
|
||
"status": task.status,
|
||
"ifEnable": task.if_enable,
|
||
"delay": task.delay,
|
||
"releaseSites": bool(task.release_sites) if task.release_sites is not None else None,
|
||
"createDate": task.created_at,
|
||
"remark": task.remark,
|
||
}
|
||
|
||
# 提交事务以释放数据库锁
|
||
db.commit()
|
||
return task_info
|
||
|
||
except Exception as e:
|
||
# 发生异常时回滚事务
|
||
db.rollback()
|
||
logger.error(f"获取任务失败: {str(e)}")
|
||
raise Exception(f"获取任务失败: {str(e)}")
|
||
|
||
@staticmethod
|
||
def export_tasks(db: Session, ids: List[str]) -> List[Dict[str, Any]]:
|
||
"""
|
||
导出任务
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
ids: 任务ID列表
|
||
|
||
Returns:
|
||
List[Dict[str, Any]]: 任务配置列表
|
||
"""
|
||
try:
|
||
tasks = db.query(VWEDTaskDef).filter(VWEDTaskDef.id.in_(ids)).all()
|
||
if not tasks:
|
||
raise ValueError(f"未找到指定的任务")
|
||
|
||
# 格式化任务信息
|
||
tasks_export = []
|
||
for task in tasks:
|
||
task_export = {
|
||
"id": task.id,
|
||
"label": task.label,
|
||
"version": task.version,
|
||
"detail": task.detail,
|
||
"templateName": task.template_name,
|
||
"templateDescription": db.query(VWEDTaskTemplate.template_description)
|
||
.filter(VWEDTaskTemplate.template_name == task.template_name)
|
||
.scalar() or "",
|
||
"period": task.period,
|
||
"periodicTask": task.periodic_task,
|
||
"status": task.status,
|
||
"ifEnable": task.if_enable,
|
||
"delay": task.delay,
|
||
"remark": task.remark,
|
||
"windcategoryId": 0 # 默认分类ID
|
||
}
|
||
tasks_export.append(task_export)
|
||
|
||
return tasks_export
|
||
|
||
except Exception as e:
|
||
logger.error(f"导出任务失败: {str(e)}")
|
||
raise Exception(f"导出任务失败: {str(e)}")
|
||
|
||
@staticmethod
|
||
def import_task(
|
||
db: Session,
|
||
task_data: Dict[str, Any],
|
||
task_name: Optional[str] = None,
|
||
token: Optional[str] = None,
|
||
tenant_id: str = "default"
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
导入任务
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
task_data: 任务配置数据
|
||
task_name: 导入后的任务名称,不填则使用原名称
|
||
token: 用户token,用于认证
|
||
tenant_id: 租户ID,默认为"default"
|
||
|
||
Returns:
|
||
Dict[str, Any]: 导入的任务信息
|
||
|
||
Raises:
|
||
ValueError: 参数验证失败时
|
||
TaskNameExistsError: 任务名称已存在时
|
||
"""
|
||
try:
|
||
# 创建新ID避免冲突
|
||
new_id = str(uuid.uuid4())
|
||
|
||
# 设置任务名称
|
||
label = task_name or task_data.get("label", "导入的任务")
|
||
|
||
# 获取detail字段,这是唯一必需的字段
|
||
detail = task_data.get("detail")
|
||
if not detail:
|
||
raise ValueError("任务数据缺少detail字段,无法导入")
|
||
|
||
# 检查任务名称是否已存在
|
||
existing_task = db.query(VWEDTaskDef).filter(
|
||
VWEDTaskDef.label == label.strip(),
|
||
VWEDTaskDef.tenant_id == tenant_id
|
||
).first()
|
||
if existing_task:
|
||
raise TaskNameExistsError(f"任务名称 '{label}' 已存在,请使用其他名称")
|
||
|
||
# 获取当前启用的模板
|
||
template = db.query(VWEDTaskTemplate).filter(
|
||
VWEDTaskTemplate.template_if_enable == 1
|
||
).first()
|
||
|
||
if not template:
|
||
template_name = task_data.get("templateName", "默认模板")
|
||
else:
|
||
template_name = template.template_name
|
||
|
||
# 创建新任务
|
||
new_task = VWEDTaskDef(
|
||
id=new_id,
|
||
label=label,
|
||
version=1, # 新任务从版本1开始
|
||
detail=detail,
|
||
template_name=template_name,
|
||
period=task_data.get("period", 3000),
|
||
periodic_task=task_data.get("periodicTask", PeriodicTaskStatus.NON_PERIODIC),
|
||
status=TaskStatusEnum.PENDING, # 初始状态为0
|
||
if_enable=EnableStatus.DISABLED, # 初始未启用
|
||
delay=task_data.get("delay", 3000),
|
||
release_sites=task_data.get("releaseSites", True), # 默认释放站点
|
||
remark=task_data.get("remark", ""),
|
||
tenant_id=tenant_id,
|
||
user_token=token
|
||
)
|
||
|
||
db.add(new_task)
|
||
db.commit()
|
||
db.refresh(new_task)
|
||
|
||
# 返回任务信息
|
||
return {
|
||
"id": new_task.id,
|
||
"label": new_task.label,
|
||
"version": new_task.version,
|
||
"status": new_task.status,
|
||
"createDate": new_task.created_at
|
||
}
|
||
|
||
except TaskNameExistsError as e:
|
||
db.rollback()
|
||
logger.warning(f"导入任务失败: {str(e)}")
|
||
raise # 重新抛出异常,不包装
|
||
|
||
except Exception as e:
|
||
db.rollback()
|
||
logger.error(f"导入任务失败: {str(e)}")
|
||
raise Exception(f"导入任务失败: {str(e)}")
|
||
|
||
@staticmethod
|
||
async def stop_task_def(task_def_id: str) -> Dict[str, Any]:
|
||
"""
|
||
停止指定任务定义下的所有运行任务实例,同时禁用定时任务
|
||
|
||
Args:
|
||
task_def_id: 任务定义ID
|
||
|
||
Returns:
|
||
字典包含停止结果信息:
|
||
- success: 是否操作成功
|
||
- message: 操作结果消息
|
||
- is_periodic: 是否为定时任务
|
||
- total_running: 运行中的任务总数
|
||
- stopped_count: 成功停止的任务数量
|
||
- failed_count: 停止失败的任务数量
|
||
- failed_tasks: 停止失败的任务记录ID列表
|
||
"""
|
||
# 导入增强版调度器
|
||
from services.enhanced_scheduler import scheduler
|
||
|
||
try:
|
||
async with get_async_session() as session:
|
||
# 查询任务定义
|
||
result = await session.execute(
|
||
select(VWEDTaskDef).where(VWEDTaskDef.id == task_def_id)
|
||
)
|
||
task_def = result.scalars().first()
|
||
|
||
if not task_def:
|
||
return {
|
||
"success": False,
|
||
"message": f"未找到任务定义: {task_def_id}"
|
||
}
|
||
|
||
is_periodic = task_def.periodic_task == PeriodicTaskStatus.PERIODIC
|
||
|
||
# 初始化计数器
|
||
total_running = 0
|
||
stopped_count = 0
|
||
failed_count = 0
|
||
failed_tasks = []
|
||
|
||
# 如果是定时任务则禁用
|
||
if is_periodic and task_def.if_enable == EnableStatus.ENABLED:
|
||
# 更新任务定义状态为禁用
|
||
await session.execute(
|
||
update(VWEDTaskDef)
|
||
.where(VWEDTaskDef.id == task_def_id)
|
||
.values(if_enable=EnableStatus.DISABLED)
|
||
)
|
||
await session.commit()
|
||
# 通知调度器
|
||
update_result = await scheduler.update_periodic_task(task_def_id, enable=False)
|
||
|
||
# 将定时任务的禁用也计入停止任务的数量
|
||
total_running += 1
|
||
if update_result.get("success", True): # 假设通知调度器成功,除非明确返回失败
|
||
stopped_count += 1
|
||
else:
|
||
failed_count += 1
|
||
failed_tasks.append({
|
||
"taskRecordId": "periodic_" + task_def_id,
|
||
"reason": update_result.get("message", "禁用定时任务失败")
|
||
})
|
||
|
||
# 查找所有正在运行的任务记录
|
||
running_tasks_query = await session.execute(
|
||
select(VWEDTaskRecord)
|
||
.where(
|
||
VWEDTaskRecord.def_id == task_def_id,
|
||
VWEDTaskRecord.status == TaskStatus.RUNNING # 执行中状态码
|
||
)
|
||
)
|
||
running_tasks = running_tasks_query.scalars().all()
|
||
|
||
# 更新总计数
|
||
total_running += len(running_tasks)
|
||
|
||
# 取消所有运行中的任务
|
||
for task_record in running_tasks:
|
||
cancel_result = await scheduler.cancel_task(task_record.id)
|
||
|
||
if cancel_result.get("success", False):
|
||
stopped_count += 1
|
||
else:
|
||
failed_count += 1
|
||
failed_tasks.append({
|
||
"taskRecordId": task_record.id,
|
||
"reason": cancel_result.get("message", "未知原因")
|
||
})
|
||
|
||
# 更新任务定义状态为已结束(0)
|
||
await session.execute(
|
||
update(VWEDTaskDef)
|
||
.where(VWEDTaskDef.id == task_def_id)
|
||
.values(status=TaskStatusEnum.PENDING)
|
||
)
|
||
await session.commit()
|
||
|
||
return {
|
||
"success": True,
|
||
"message": "操作完成",
|
||
"is_periodic": is_periodic,
|
||
"total_running": total_running,
|
||
"stopped_count": stopped_count,
|
||
"failed_count": failed_count,
|
||
"failed_tasks": failed_tasks
|
||
}
|
||
|
||
except Exception as e:
|
||
return {
|
||
"success": False,
|
||
"message": f"停止任务失败: {str(e)}"
|
||
} |