2025-04-30 16:57:46 +08:00
|
|
|
|
#!/usr/bin/env python
|
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
定时任务管理模块
|
|
|
|
|
负责管理周期性任务的调度
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import asyncio
|
|
|
|
|
import logging
|
|
|
|
|
import json
|
|
|
|
|
from typing import Dict, List, Any, Optional, Set, Tuple, Callable, Coroutine
|
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
|
from sqlalchemy import select
|
|
|
|
|
|
|
|
|
|
from config.settings import settings
|
|
|
|
|
from data.models.taskdef import VWEDTaskDef
|
|
|
|
|
from data.session import get_async_session
|
|
|
|
|
from utils.logger import get_logger
|
|
|
|
|
from data.enum.task_def_enum import EnableStatus, PeriodicTaskStatus
|
|
|
|
|
|
|
|
|
|
# 获取日志记录器
|
|
|
|
|
logger = get_logger("services.enhanced_scheduler.periodic_task_manager")
|
|
|
|
|
|
|
|
|
|
class PeriodicTaskManager:
|
|
|
|
|
"""
|
|
|
|
|
定时任务管理器
|
|
|
|
|
负责管理和调度周期性任务
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def __init__(self, check_interval: int = 5):
|
|
|
|
|
"""
|
|
|
|
|
初始化定时任务管理器
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
check_interval: 任务检查间隔(秒)
|
|
|
|
|
"""
|
|
|
|
|
self.check_interval = check_interval
|
|
|
|
|
self.periodic_tasks = {} # 定时任务字典 {task_def_id: task_info}
|
|
|
|
|
self.is_running = False
|
|
|
|
|
self.task_runner = None
|
|
|
|
|
|
|
|
|
|
# 任务执行回调函数
|
|
|
|
|
self.run_task_callback = None
|
|
|
|
|
|
|
|
|
|
logger.info(f"初始化定时任务管理器: 检查间隔={check_interval}秒")
|
|
|
|
|
|
|
|
|
|
def set_run_task_callback(self, callback: Callable[[str], Coroutine]):
|
|
|
|
|
"""
|
|
|
|
|
设置任务执行回调函数
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
callback: 任务执行回调函数,接受task_def_id参数,返回协程对象
|
|
|
|
|
"""
|
|
|
|
|
self.run_task_callback = callback
|
|
|
|
|
|
|
|
|
|
async def start(self) -> None:
|
|
|
|
|
"""
|
|
|
|
|
启动定时任务管理器
|
|
|
|
|
"""
|
|
|
|
|
if self.is_running:
|
|
|
|
|
logger.warning("定时任务管理器已经在运行中")
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
if not self.run_task_callback:
|
|
|
|
|
raise ValueError("未设置任务执行回调函数")
|
|
|
|
|
|
|
|
|
|
self.is_running = True
|
|
|
|
|
|
|
|
|
|
# 加载已配置的定时任务
|
|
|
|
|
await self._load_periodic_tasks()
|
|
|
|
|
|
|
|
|
|
# 启动任务执行器
|
|
|
|
|
self.task_runner = asyncio.create_task(self._task_runner())
|
|
|
|
|
|
|
|
|
|
logger.info("定时任务管理器启动成功")
|
|
|
|
|
|
|
|
|
|
async def stop(self) -> None:
|
|
|
|
|
"""
|
|
|
|
|
停止定时任务管理器
|
|
|
|
|
"""
|
|
|
|
|
if not self.is_running:
|
|
|
|
|
logger.warning("定时任务管理器未在运行")
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
self.is_running = False
|
|
|
|
|
|
|
|
|
|
# 取消任务执行器
|
|
|
|
|
if self.task_runner:
|
|
|
|
|
self.task_runner.cancel()
|
|
|
|
|
try:
|
|
|
|
|
await self.task_runner
|
|
|
|
|
except asyncio.CancelledError:
|
|
|
|
|
pass
|
|
|
|
|
self.task_runner = None
|
|
|
|
|
|
|
|
|
|
logger.info("定时任务管理器已停止")
|
|
|
|
|
|
|
|
|
|
async def update_task(self, task_def_id: str, enable: bool = None) -> Dict[str, Any]:
|
|
|
|
|
"""
|
|
|
|
|
更新定时任务
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
task_def_id: 任务定义ID
|
|
|
|
|
enable: 是否启用
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Dict[str, Any]: 更新结果
|
|
|
|
|
"""
|
|
|
|
|
async with get_async_session() as session:
|
|
|
|
|
# 查询任务定义
|
|
|
|
|
result = await session.execute(
|
|
|
|
|
select(VWEDTaskDef).where(VWEDTaskDef.id == task_def_id)
|
|
|
|
|
)
|
|
|
|
|
task_def = result.scalars().first()
|
|
|
|
|
if not task_def:
|
|
|
|
|
return {
|
|
|
|
|
"success": False,
|
|
|
|
|
"message": "任务定义不存在",
|
|
|
|
|
"taskDefId": task_def_id
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
# 检查是否是定时任务
|
|
|
|
|
if task_def.periodic_task != PeriodicTaskStatus.PERIODIC:
|
|
|
|
|
return {
|
|
|
|
|
"success": False,
|
|
|
|
|
"message": "该任务不是定时任务",
|
|
|
|
|
"taskDefId": task_def_id
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
# 更新任务定义
|
|
|
|
|
updated = False
|
|
|
|
|
|
|
|
|
|
if enable is not None:
|
|
|
|
|
task_def.if_enable = EnableStatus.ENABLED if enable else EnableStatus.DISABLED
|
|
|
|
|
updated = True
|
|
|
|
|
|
|
|
|
|
# 如果有更新,提交到数据库
|
|
|
|
|
if updated:
|
|
|
|
|
await session.commit()
|
|
|
|
|
|
|
|
|
|
# 更新定时任务列表
|
|
|
|
|
if enable == True or task_def.if_enable == EnableStatus.ENABLED:
|
|
|
|
|
# 重新构建任务信息
|
|
|
|
|
period = task_def.period or 3600000 # 默认1小时
|
|
|
|
|
delay = task_def.delay or 3000 # 默认延迟3秒
|
|
|
|
|
|
|
|
|
|
# 计算下次运行时间
|
|
|
|
|
next_run = self._calculate_next_run_time(period, delay)
|
|
|
|
|
|
|
|
|
|
self.periodic_tasks[task_def_id] = {
|
|
|
|
|
"id": task_def.id,
|
|
|
|
|
"label": task_def.label,
|
|
|
|
|
"period": period, # 周期(毫秒)
|
|
|
|
|
"delay": delay, # 延迟(毫秒)
|
|
|
|
|
"last_run": None, # 上次运行时间
|
|
|
|
|
"next_run": next_run # 下次运行时间
|
|
|
|
|
}
|
|
|
|
|
elif enable == False and task_def_id in self.periodic_tasks:
|
|
|
|
|
# 从定时任务列表中移除
|
|
|
|
|
self.periodic_tasks.pop(task_def_id, None)
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
"success": True,
|
|
|
|
|
"message": f"定时任务已{'更新' if updated else '保持不变'}",
|
|
|
|
|
"taskDefId": task_def_id,
|
|
|
|
|
"enabled": task_def.if_enable == EnableStatus.ENABLED
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async def _load_periodic_tasks(self) -> None:
|
|
|
|
|
"""
|
|
|
|
|
加载所有定时任务
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
async with get_async_session() as session:
|
|
|
|
|
# 查询所有启用的定时任务
|
|
|
|
|
result = await session.execute(
|
|
|
|
|
select(VWEDTaskDef).where(
|
|
|
|
|
VWEDTaskDef.periodic_task == PeriodicTaskStatus.PERIODIC, # 周期任务
|
|
|
|
|
VWEDTaskDef.if_enable == EnableStatus.ENABLED # 启用状态
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
tasks = result.scalars().all()
|
|
|
|
|
|
|
|
|
|
# 记录定时任务
|
|
|
|
|
for task in tasks:
|
|
|
|
|
# 计算下次运行时间
|
|
|
|
|
period = task.period or 3600000 # 默认1小时
|
|
|
|
|
delay = task.delay or 3000 # 默认延迟3秒
|
|
|
|
|
next_run = self._calculate_next_run_time(period, delay)
|
|
|
|
|
|
|
|
|
|
self.periodic_tasks[task.id] = {
|
|
|
|
|
"id": task.id,
|
|
|
|
|
"label": task.label,
|
|
|
|
|
"period": period, # 周期(毫秒)
|
|
|
|
|
"delay": delay, # 延迟(毫秒)
|
|
|
|
|
"last_run": None, # 上次运行时间
|
|
|
|
|
"next_run": next_run # 下次运行时间
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.info(f"已加载 {len(self.periodic_tasks)} 个定时任务")
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"加载定时任务失败: {str(e)}")
|
|
|
|
|
|
|
|
|
|
async def _task_runner(self) -> None:
|
|
|
|
|
"""
|
|
|
|
|
任务执行器
|
|
|
|
|
定期检查并运行周期任务
|
|
|
|
|
"""
|
|
|
|
|
logger.info("定时任务执行器启动")
|
|
|
|
|
|
|
|
|
|
while self.is_running:
|
|
|
|
|
try:
|
|
|
|
|
# 检查是否有需要执行的定时任务
|
|
|
|
|
now = datetime.now()
|
|
|
|
|
tasks_to_run = []
|
|
|
|
|
|
|
|
|
|
for task_id, task_info in list(self.periodic_tasks.items()):
|
|
|
|
|
if task_info["next_run"] <= now:
|
|
|
|
|
tasks_to_run.append(task_id)
|
|
|
|
|
|
|
|
|
|
# 更新下次运行时间
|
|
|
|
|
task_info["last_run"] = now
|
|
|
|
|
task_info["next_run"] = self._calculate_next_run_time(
|
|
|
|
|
task_info["period"],
|
|
|
|
|
task_info["delay"],
|
|
|
|
|
now
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# 执行需要运行的任务
|
|
|
|
|
for task_id in tasks_to_run:
|
|
|
|
|
try:
|
|
|
|
|
await self.run_task_callback(task_id)
|
|
|
|
|
logger.info(f"定时任务 {task_id} 已提交执行")
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"提交定时任务 {task_id} 失败: {str(e)}")
|
|
|
|
|
|
|
|
|
|
# 休眠一段时间
|
|
|
|
|
await asyncio.sleep(self.check_interval)
|
|
|
|
|
|
|
|
|
|
except asyncio.CancelledError:
|
|
|
|
|
# 取消异常,退出循环
|
|
|
|
|
logger.info("定时任务执行器被取消")
|
|
|
|
|
break
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"定时任务执行器异常: {str(e)}")
|
|
|
|
|
# 出现异常时短暂休眠,避免频繁错误
|
|
|
|
|
await asyncio.sleep(5.0)
|
|
|
|
|
|
|
|
|
|
logger.info("定时任务执行器结束")
|
|
|
|
|
|
|
|
|
|
def _calculate_next_run_time(self, period_ms: int, delay_ms: int = 0, base_time: datetime = None) -> datetime:
|
|
|
|
|
"""
|
|
|
|
|
计算下次运行时间
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
period_ms: 周期(毫秒)
|
|
|
|
|
delay_ms: 延迟时间(毫秒),初次运行时使用
|
|
|
|
|
base_time: 基准时间,默认为当前时间
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
datetime: 下次运行时间
|
|
|
|
|
"""
|
|
|
|
|
now = base_time or datetime.now()
|
|
|
|
|
|
|
|
|
|
# 使用简单的周期计算
|
2025-05-12 15:43:21 +08:00
|
|
|
|
total_delay_seconds = (period_ms + delay_ms) / 1000 # 转换为秒
|
2025-04-30 16:57:46 +08:00
|
|
|
|
return now + timedelta(seconds=total_delay_seconds)
|
|
|
|
|
|
|
|
|
|
def get_task_status(self) -> Dict[str, Any]:
|
|
|
|
|
"""
|
|
|
|
|
获取任务状态信息
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Dict[str, Any]: 任务状态
|
|
|
|
|
"""
|
|
|
|
|
return {
|
|
|
|
|
"task_count": len(self.periodic_tasks),
|
|
|
|
|
"tasks": {
|
|
|
|
|
task_id: {
|
|
|
|
|
"id": task_info["id"],
|
|
|
|
|
"label": task_info["label"],
|
|
|
|
|
"last_run": task_info["last_run"].strftime("%Y-%m-%d %H:%M:%S") if task_info["last_run"] else None,
|
|
|
|
|
"next_run": task_info["next_run"].strftime("%Y-%m-%d %H:%M:%S") if task_info["next_run"] else None,
|
|
|
|
|
"period": task_info["period"],
|
|
|
|
|
"delay": task_info.get("delay", 0)
|
|
|
|
|
}
|
|
|
|
|
for task_id, task_info in self.periodic_tasks.items()
|
|
|
|
|
}
|
|
|
|
|
}
|