111 lines
4.3 KiB
Plaintext
111 lines
4.3 KiB
Plaintext
|
---
|
|||
|
description:
|
|||
|
globs:
|
|||
|
alwaysApply: false
|
|||
|
---
|
|||
|
# VWED任务模块 - 任务调度系统
|
|||
|
|
|||
|
## 增强版任务调度系统概述
|
|||
|
|
|||
|
增强版任务调度系统是VWED任务模块的核心组件,提供高性能、可靠性和可扩展性的任务处理能力。系统采用模块化设计,包含多个协同工作的组件。
|
|||
|
|
|||
|
## 核心组件
|
|||
|
|
|||
|
### EnhancedScheduler
|
|||
|
|
|||
|
主调度器,负责协调各个组件工作,是整个调度系统的核心。它管理任务的生命周期,从提交到完成的全过程。
|
|||
|
|
|||
|
### PriorityQueueManager
|
|||
|
|
|||
|
优先级队列管理器,实现多级优先级队列,通过动态阈值算法优化任务分布,确保高优先级任务优先处理,同时避免低优先级任务长时间饥饿。
|
|||
|
|
|||
|
### WorkerManager
|
|||
|
|
|||
|
工作线程管理器,负责创建和管理工作线程池,支持动态调整线程池大小,进行心跳监控,及时发现异常并进行恢复。
|
|||
|
|
|||
|
### PeriodicTaskManager
|
|||
|
|
|||
|
定时任务管理器,支持Cron表达式,提供高级定时任务管理功能,适用于需要周期性执行的任务。
|
|||
|
|
|||
|
### TaskPersistenceManager
|
|||
|
|
|||
|
任务持久化管理器,负责将任务队列持久化到存储系统,提供故障恢复机制,确保系统重启后任务不丢失。
|
|||
|
|
|||
|
## 工作流程
|
|||
|
|
|||
|
1. 任务提交:通过API接口或内部服务提交任务到调度系统
|
|||
|
2. 任务接收:EnhancedScheduler接收任务并交由PriorityQueueManager进行优先级排队
|
|||
|
3. 任务调度:WorkerManager根据系统负载情况,从队列中取出任务分配给工作线程
|
|||
|
4. 任务执行:工作线程执行任务,并将执行状态反馈给调度器
|
|||
|
5. 状态管理:调度器更新任务状态,并在必要时触发相应的回调函数
|
|||
|
6. 结果处理:任务执行完成后,处理执行结果,更新数据库状态
|
|||
|
|
|||
|
## 系统特性
|
|||
|
|
|||
|
### 动态工作线程池
|
|||
|
|
|||
|
系统可根据任务负载情况自动调整工作线程数量,在最小工作线程数和最大工作线程数之间动态调整,以优化资源利用率。
|
|||
|
|
|||
|
```python
|
|||
|
# 动态调整工作线程数
|
|||
|
async def adjust_worker_count(self, current_queue_size: int):
|
|||
|
"""根据队列大小动态调整工作线程数量"""
|
|||
|
if current_queue_size > self.high_watermark and self.worker_count < self.max_worker_count:
|
|||
|
# 增加工作线程
|
|||
|
new_workers = min(self.worker_count + 2, self.max_worker_count) - self.worker_count
|
|||
|
if new_workers > 0:
|
|||
|
await self.add_workers(new_workers)
|
|||
|
elif current_queue_size < self.low_watermark and self.worker_count > self.min_worker_count:
|
|||
|
# 减少工作线程(只减空闲的)
|
|||
|
idle_workers = len([w for w in self.workers.values() if w.status == WorkerStatus.IDLE])
|
|||
|
if idle_workers > 2: # 保留一些空闲工作线程以应对突发负载
|
|||
|
to_remove = min(idle_workers - 2, self.worker_count - self.min_worker_count)
|
|||
|
if to_remove > 0:
|
|||
|
await self.remove_workers(to_remove)
|
|||
|
```
|
|||
|
|
|||
|
### 任务优先级管理
|
|||
|
|
|||
|
系统支持任务优先级设置,优先级越高的任务越先被调度执行。系统还实现了平衡调度策略,避免低优先级任务长时间得不到执行。
|
|||
|
|
|||
|
### 健壮性保障
|
|||
|
|
|||
|
系统实现了全面的异常处理和故障恢复机制:
|
|||
|
|
|||
|
1. 工作线程异常自动恢复
|
|||
|
2. 系统重启后任务队列恢复
|
|||
|
3. 僵尸任务检测和清理
|
|||
|
4. 超时任务处理
|
|||
|
|
|||
|
### 监控和统计
|
|||
|
|
|||
|
系统提供丰富的监控和统计信息,包括:
|
|||
|
|
|||
|
1. 任务执行统计(成功率、平均执行时间等)
|
|||
|
2. 队列状态监控(各优先级队列长度、等待时间等)
|
|||
|
3. 资源使用监控(CPU、内存使用率等)
|
|||
|
4. 工作线程健康状态
|
|||
|
|
|||
|
## 调度系统初始化
|
|||
|
|
|||
|
系统在应用启动时初始化,在应用关闭时优雅停止:
|
|||
|
|
|||
|
```python
|
|||
|
@asynccontextmanager
|
|||
|
async def lifespan(app: FastAPI):
|
|||
|
# 启动前的初始化操作
|
|||
|
# 启动增强版任务调度器
|
|||
|
from services.enhanced_scheduler import scheduler
|
|||
|
await scheduler.start(worker_count=settings.TASK_SCHEDULER_MIN_WORKER_COUNT)
|
|||
|
logger.info(f"增强版任务调度器已启动,最小工作线程数: {settings.TASK_SCHEDULER_MIN_WORKER_COUNT},最大工作线程数: {settings.TASK_SCHEDULER_MAX_WORKER_COUNT}")
|
|||
|
|
|||
|
yield
|
|||
|
|
|||
|
# 应用程序关闭前的清理操作
|
|||
|
# 停止增强版任务调度器
|
|||
|
from services.enhanced_scheduler import scheduler
|
|||
|
await scheduler.stop()
|
|||
|
logger.info("增强版任务调度器已停止")
|
|||
|
```
|
|||
|
|