VWED_server/services/task_service.py
2025-05-12 15:43:21 +08:00

564 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
天风任务服务层
处理天风任务相关的业务逻辑
"""
import json
import uuid
import datetime
from sqlalchemy.orm import Session
from typing import List, Dict, Any, Optional
from sqlalchemy import and_, or_, desc, asc
from data.models.taskdef import VWEDTaskDef
from data.models.taskrecord import VWEDTaskRecord
from data.models.tasktemplate import VWEDTaskTemplate
from data.enum.task_def_enum import TaskTypeEnum, TaskStatusEnum, EnableStatus, PeriodicTaskStatus
from utils.logger import get_logger
from data.session import get_async_session
from sqlalchemy import select, update
from data.enum.task_record_enum import TaskStatus
# 设置日志
logger = get_logger("services.task_service")
# 自定义异常类
class TaskNameExistsError(Exception):
"""任务名称已存在异常"""
pass
class TaskService:
"""
天风任务服务类
提供天风任务的相关业务操作
"""
@staticmethod
def get_task_list(
db: Session,
page_num: int = 1,
page_size: int = 10,
keyword: Optional[str] = None,
status: Optional[int] = None,
sort_field: Optional[str] = None,
sort_order: Optional[str] = "desc"
) -> Dict[str, Any]:
"""
获取任务列表
Args:
db: 数据库会话
page_num: 页码从1开始
page_size: 每页记录数
keyword: 关键字(任务名称),模糊匹配
status: 状态过滤
sort_field: 排序字段
sort_order: 排序方式asc升序、desc降序
Returns:
Dict[str, Any]: 包含任务列表和分页信息
"""
try:
# 构建查询条件
query = db.query(VWEDTaskDef)
# 添加筛选条件
if keyword:
query = query.filter(VWEDTaskDef.label.like(f'%{keyword}%'))
if status is not None:
query = query.filter(VWEDTaskDef.status == status)
# 计算总数
total = query.count()
# 添加排序
if sort_field and sort_field in VWEDTaskDef.__table__.columns:
direction = desc if sort_order == "desc" else asc
query = query.order_by(direction(getattr(VWEDTaskDef, sort_field)))
else:
# 默认按创建时间倒序
query = query.order_by(desc(VWEDTaskDef.created_at))
# 分页处理
offset = (page_num - 1) * page_size
tasks = query.offset(offset).limit(page_size).all()
# 格式化返回结果
task_list = []
for task in tasks:
task_list.append({
"id": task.id,
"label": task.label,
"version": task.version,
"detail": task.detail,
"templateName": task.template_name,
"periodicTask": bool(task.periodic_task),
"ifEnable": task.if_enable,
"status": task.status,
"createDate": task.created_at,
"remark": task.remark,
"tenant_id": task.tenant_id,
"updated_at": task.updated_at
})
return {
"total": total,
"list": task_list,
"pageNum": page_num,
"pageSize": page_size,
"totalPages": (total + page_size - 1) // page_size
}
except Exception as e:
logger.error(f"获取任务列表失败: {str(e)}")
raise Exception(f"获取任务列表失败: {str(e)}")
@staticmethod
def create_task(
db: Session,
label: str,
task_type: int,
remark: Optional[str] = None,
period: Optional[int] = None,
delay: Optional[int] = 3000,
release_sites: Optional[bool] = True,
token: Optional[str] = None,
tenant_id: str = "default",
map_id: Optional[str] = None
) -> Dict[str, Any]:
"""
创建任务
Args:
db: 数据库会话
label: 任务名称
task_type: 任务类型1-普通任务2-定时任务
remark: 任务备注
period: 周期时间(毫秒),定时任务必填
delay: 延迟时间毫秒默认3000
release_sites: 是否释放站点默认true
token: 用户token值用于认证
tenant_id: 租户ID用于多租户隔离默认为"default"
map_id: 相关地图ID
Returns:
Dict[str, Any]: 新建任务信息
Raises:
ValueError: 参数验证失败时
TaskNameExistsError: 任务名称已存在时
"""
try:
# 验证任务名称
if not label or label.strip() == "":
raise ValueError("任务名称不能为空")
# 验证任务名称长度
if len(label.strip()) < 2:
raise ValueError("任务名称至少需要2个字符")
# 检查任务名称是否已存在
existing_task = db.query(VWEDTaskDef).filter(VWEDTaskDef.label == label.strip()).first()
if existing_task:
raise TaskNameExistsError(f"任务名称 '{label}' 已存在,请使用其他名称")
# 验证任务类型
if task_type not in [TaskTypeEnum.NORMAL, TaskTypeEnum.PERIODIC]:
raise ValueError("任务类型无效应为1(普通任务)或2(定时任务)")
# 验证定时任务参数
if task_type == TaskTypeEnum.PERIODIC and not period:
raise ValueError("定时任务必须指定周期时间")
# 获取当前启用的模板
template = db.query(VWEDTaskTemplate).filter(
VWEDTaskTemplate.template_if_enable == 1
).first()
if not template:
raise ValueError("没有可用的任务模板,请先启用一个模板")
# 创建任务定义
task_id = str(uuid.uuid4())
new_task = VWEDTaskDef(
id=task_id,
label=label.strip(), # 去除首尾空白字符
version=1,
detail=json.dumps({
"inputParams": [],
"outputParams": [],
"rootBlock": None
}, ensure_ascii=False),
template_name=template.template_name,
period=period if period else 3000,
periodic_task=PeriodicTaskStatus.PERIODIC if task_type == TaskTypeEnum.PERIODIC else PeriodicTaskStatus.NON_PERIODIC,
status=TaskStatusEnum.PENDING,
if_enable=EnableStatus.DISABLED,
delay=delay,
release_sites=release_sites,
remark=remark,
tenant_id=tenant_id, # 使用传入的租户ID
user_token=token, # 保存用户token
map_id=map_id # 保存地图ID
)
db.add(new_task)
db.commit()
db.refresh(new_task)
# 返回任务信息
return {
"id": new_task.id,
"label": new_task.label,
"version": new_task.version,
"detail": new_task.detail,
"templateName": new_task.template_name,
"periodicTask": bool(new_task.periodic_task),
"status": new_task.status,
"createDate": new_task.created_at,
"mapId": new_task.map_id # 返回地图ID
}
except TaskNameExistsError as e:
db.rollback()
logger.warning(f"创建任务失败: {str(e)}")
raise # 重新抛出异常,不包装
except Exception as e:
db.rollback()
logger.error(f"创建任务失败: {str(e)}")
raise Exception(f"创建任务失败: {str(e)}")
@staticmethod
def delete_tasks(db: Session, ids: List[str]) -> bool:
"""
删除任务
Args:
db: 数据库会话
ids: 要删除的任务ID列表
Returns:
bool: 是否删除成功
"""
try:
# 检查任务是否存在
tasks = db.query(VWEDTaskDef).filter(VWEDTaskDef.id.in_(ids)).all()
if len(tasks) != len(ids):
missing_ids = set(ids) - {task.id for task in tasks}
raise ValueError(f"以下任务不存在: {', '.join(missing_ids)}")
# 删除任务
for task in tasks:
db.delete(task)
db.commit()
return True
except Exception as e:
db.rollback()
logger.error(f"删除任务失败: {str(e)}")
raise Exception(f"删除任务失败: {str(e)}")
@staticmethod
def get_task_by_id(db: Session, task_id: str) -> Optional[Dict[str, Any]]:
"""
根据ID获取任务
Args:
db: 数据库会话
task_id: 任务ID
Returns:
Optional[Dict[str, Any]]: 任务信息如果不存在则返回None
"""
try:
# 显式启动事务
task = db.query(VWEDTaskDef).filter(VWEDTaskDef.id == task_id).first()
if not task:
return None
# 格式化任务信息
task_info = {
"id": task.id,
"label": task.label,
"version": task.version,
"detail": json.loads(task.detail) if task.detail else {},
"templateName": task.template_name,
"period": task.period,
"periodicTask": bool(task.periodic_task),
"status": task.status,
"ifEnable": task.if_enable,
"delay": task.delay,
"releaseSites": bool(task.release_sites) if task.release_sites is not None else None,
"createDate": task.created_at,
"remark": task.remark,
}
# 提交事务以释放数据库锁
db.commit()
return task_info
except Exception as e:
# 发生异常时回滚事务
db.rollback()
logger.error(f"获取任务失败: {str(e)}")
raise Exception(f"获取任务失败: {str(e)}")
@staticmethod
def export_tasks(db: Session, ids: List[str]) -> List[Dict[str, Any]]:
"""
导出任务
Args:
db: 数据库会话
ids: 任务ID列表
Returns:
List[Dict[str, Any]]: 任务配置列表
"""
try:
tasks = db.query(VWEDTaskDef).filter(VWEDTaskDef.id.in_(ids)).all()
if not tasks:
raise ValueError(f"未找到指定的任务")
# 格式化任务信息
tasks_export = []
for task in tasks:
task_export = {
"id": task.id,
"label": task.label,
"version": task.version,
"detail": task.detail,
"templateName": task.template_name,
"templateDescription": db.query(VWEDTaskTemplate.template_description)
.filter(VWEDTaskTemplate.template_name == task.template_name)
.scalar() or "",
"period": task.period,
"periodicTask": task.periodic_task,
"status": task.status,
"ifEnable": task.if_enable,
"delay": task.delay,
"remark": task.remark,
"windcategoryId": 0 # 默认分类ID
}
tasks_export.append(task_export)
return tasks_export
except Exception as e:
logger.error(f"导出任务失败: {str(e)}")
raise Exception(f"导出任务失败: {str(e)}")
@staticmethod
def import_task(
db: Session,
task_data: Dict[str, Any],
task_name: Optional[str] = None,
token: Optional[str] = None,
tenant_id: str = "default"
) -> Dict[str, Any]:
"""
导入任务
Args:
db: 数据库会话
task_data: 任务配置数据
task_name: 导入后的任务名称,不填则使用原名称
token: 用户token用于认证
tenant_id: 租户ID默认为"default"
Returns:
Dict[str, Any]: 导入的任务信息
Raises:
ValueError: 参数验证失败时
TaskNameExistsError: 任务名称已存在时
"""
try:
# 创建新ID避免冲突
new_id = str(uuid.uuid4())
# 设置任务名称
label = task_name or task_data.get("label", "导入的任务")
# 获取detail字段这是唯一必需的字段
detail = task_data.get("detail")
if not detail:
raise ValueError("任务数据缺少detail字段无法导入")
# 检查任务名称是否已存在
existing_task = db.query(VWEDTaskDef).filter(
VWEDTaskDef.label == label.strip(),
VWEDTaskDef.tenant_id == tenant_id
).first()
if existing_task:
raise TaskNameExistsError(f"任务名称 '{label}' 已存在,请使用其他名称")
# 获取当前启用的模板
template = db.query(VWEDTaskTemplate).filter(
VWEDTaskTemplate.template_if_enable == 1
).first()
if not template:
template_name = task_data.get("templateName", "默认模板")
else:
template_name = template.template_name
# 创建新任务
new_task = VWEDTaskDef(
id=new_id,
label=label,
version=1, # 新任务从版本1开始
detail=detail,
template_name=template_name,
period=task_data.get("period", 3000),
periodic_task=task_data.get("periodicTask", PeriodicTaskStatus.NON_PERIODIC),
status=TaskStatusEnum.PENDING, # 初始状态为0
if_enable=EnableStatus.DISABLED, # 初始未启用
delay=task_data.get("delay", 3000),
release_sites=task_data.get("releaseSites", True), # 默认释放站点
remark=task_data.get("remark", ""),
tenant_id=tenant_id,
user_token=token
)
db.add(new_task)
db.commit()
db.refresh(new_task)
# 返回任务信息
return {
"id": new_task.id,
"label": new_task.label,
"version": new_task.version,
"status": new_task.status,
"createDate": new_task.created_at
}
except TaskNameExistsError as e:
db.rollback()
logger.warning(f"导入任务失败: {str(e)}")
raise # 重新抛出异常,不包装
except Exception as e:
db.rollback()
logger.error(f"导入任务失败: {str(e)}")
raise Exception(f"导入任务失败: {str(e)}")
@staticmethod
async def stop_task_def(task_def_id: str) -> Dict[str, Any]:
"""
停止指定任务定义下的所有运行任务实例,同时禁用定时任务
Args:
task_def_id: 任务定义ID
Returns:
字典包含停止结果信息:
- success: 是否操作成功
- message: 操作结果消息
- is_periodic: 是否为定时任务
- total_running: 运行中的任务总数
- stopped_count: 成功停止的任务数量
- failed_count: 停止失败的任务数量
- failed_tasks: 停止失败的任务记录ID列表
"""
# 导入增强版调度器
from services.enhanced_scheduler import scheduler
try:
async with get_async_session() as session:
# 查询任务定义
result = await session.execute(
select(VWEDTaskDef).where(VWEDTaskDef.id == task_def_id)
)
task_def = result.scalars().first()
if not task_def:
return {
"success": False,
"message": f"未找到任务定义: {task_def_id}"
}
is_periodic = task_def.periodic_task == PeriodicTaskStatus.PERIODIC
# 初始化计数器
total_running = 0
stopped_count = 0
failed_count = 0
failed_tasks = []
# 如果是定时任务则禁用
if is_periodic and task_def.if_enable == EnableStatus.ENABLED:
# 更新任务定义状态为禁用
await session.execute(
update(VWEDTaskDef)
.where(VWEDTaskDef.id == task_def_id)
.values(if_enable=EnableStatus.DISABLED)
)
await session.commit()
# 通知调度器
update_result = await scheduler.update_periodic_task(task_def_id, enable=False)
# 将定时任务的禁用也计入停止任务的数量
total_running += 1
if update_result.get("success", True): # 假设通知调度器成功,除非明确返回失败
stopped_count += 1
else:
failed_count += 1
failed_tasks.append({
"taskRecordId": "periodic_" + task_def_id,
"reason": update_result.get("message", "禁用定时任务失败")
})
# 查找所有正在运行的任务记录
running_tasks_query = await session.execute(
select(VWEDTaskRecord)
.where(
VWEDTaskRecord.def_id == task_def_id,
VWEDTaskRecord.status == TaskStatus.RUNNING # 执行中状态码
)
)
running_tasks = running_tasks_query.scalars().all()
# 更新总计数
total_running += len(running_tasks)
# 取消所有运行中的任务
for task_record in running_tasks:
cancel_result = await scheduler.cancel_task(task_record.id)
if cancel_result.get("success", False):
stopped_count += 1
else:
failed_count += 1
failed_tasks.append({
"taskRecordId": task_record.id,
"reason": cancel_result.get("message", "未知原因")
})
# 更新任务定义状态为已结束(0)
await session.execute(
update(VWEDTaskDef)
.where(VWEDTaskDef.id == task_def_id)
.values(status=TaskStatusEnum.PENDING)
)
await session.commit()
return {
"success": True,
"message": "操作完成",
"is_periodic": is_periodic,
"total_running": total_running,
"stopped_count": stopped_count,
"failed_count": failed_count,
"failed_tasks": failed_tasks
}
except Exception as e:
return {
"success": False,
"message": f"停止任务失败: {str(e)}"
}