111 lines
4.3 KiB
Plaintext
111 lines
4.3 KiB
Plaintext
---
|
||
description:
|
||
globs:
|
||
alwaysApply: false
|
||
---
|
||
# VWED任务模块 - 任务调度系统
|
||
|
||
## 增强版任务调度系统概述
|
||
|
||
增强版任务调度系统是VWED任务模块的核心组件,提供高性能、可靠性和可扩展性的任务处理能力。系统采用模块化设计,包含多个协同工作的组件。
|
||
|
||
## 核心组件
|
||
|
||
### EnhancedScheduler
|
||
|
||
主调度器,负责协调各个组件工作,是整个调度系统的核心。它管理任务的生命周期,从提交到完成的全过程。
|
||
|
||
### PriorityQueueManager
|
||
|
||
优先级队列管理器,实现多级优先级队列,通过动态阈值算法优化任务分布,确保高优先级任务优先处理,同时避免低优先级任务长时间饥饿。
|
||
|
||
### WorkerManager
|
||
|
||
工作线程管理器,负责创建和管理工作线程池,支持动态调整线程池大小,进行心跳监控,及时发现异常并进行恢复。
|
||
|
||
### PeriodicTaskManager
|
||
|
||
定时任务管理器,支持Cron表达式,提供高级定时任务管理功能,适用于需要周期性执行的任务。
|
||
|
||
### TaskPersistenceManager
|
||
|
||
任务持久化管理器,负责将任务队列持久化到存储系统,提供故障恢复机制,确保系统重启后任务不丢失。
|
||
|
||
## 工作流程
|
||
|
||
1. 任务提交:通过API接口或内部服务提交任务到调度系统
|
||
2. 任务接收:EnhancedScheduler接收任务并交由PriorityQueueManager进行优先级排队
|
||
3. 任务调度:WorkerManager根据系统负载情况,从队列中取出任务分配给工作线程
|
||
4. 任务执行:工作线程执行任务,并将执行状态反馈给调度器
|
||
5. 状态管理:调度器更新任务状态,并在必要时触发相应的回调函数
|
||
6. 结果处理:任务执行完成后,处理执行结果,更新数据库状态
|
||
|
||
## 系统特性
|
||
|
||
### 动态工作线程池
|
||
|
||
系统可根据任务负载情况自动调整工作线程数量,在最小工作线程数和最大工作线程数之间动态调整,以优化资源利用率。
|
||
|
||
```python
|
||
# 动态调整工作线程数
|
||
async def adjust_worker_count(self, current_queue_size: int):
|
||
"""根据队列大小动态调整工作线程数量"""
|
||
if current_queue_size > self.high_watermark and self.worker_count < self.max_worker_count:
|
||
# 增加工作线程
|
||
new_workers = min(self.worker_count + 2, self.max_worker_count) - self.worker_count
|
||
if new_workers > 0:
|
||
await self.add_workers(new_workers)
|
||
elif current_queue_size < self.low_watermark and self.worker_count > self.min_worker_count:
|
||
# 减少工作线程(只减空闲的)
|
||
idle_workers = len([w for w in self.workers.values() if w.status == WorkerStatus.IDLE])
|
||
if idle_workers > 2: # 保留一些空闲工作线程以应对突发负载
|
||
to_remove = min(idle_workers - 2, self.worker_count - self.min_worker_count)
|
||
if to_remove > 0:
|
||
await self.remove_workers(to_remove)
|
||
```
|
||
|
||
### 任务优先级管理
|
||
|
||
系统支持任务优先级设置,优先级越高的任务越先被调度执行。系统还实现了平衡调度策略,避免低优先级任务长时间得不到执行。
|
||
|
||
### 健壮性保障
|
||
|
||
系统实现了全面的异常处理和故障恢复机制:
|
||
|
||
1. 工作线程异常自动恢复
|
||
2. 系统重启后任务队列恢复
|
||
3. 僵尸任务检测和清理
|
||
4. 超时任务处理
|
||
|
||
### 监控和统计
|
||
|
||
系统提供丰富的监控和统计信息,包括:
|
||
|
||
1. 任务执行统计(成功率、平均执行时间等)
|
||
2. 队列状态监控(各优先级队列长度、等待时间等)
|
||
3. 资源使用监控(CPU、内存使用率等)
|
||
4. 工作线程健康状态
|
||
|
||
## 调度系统初始化
|
||
|
||
系统在应用启动时初始化,在应用关闭时优雅停止:
|
||
|
||
```python
|
||
@asynccontextmanager
|
||
async def lifespan(app: FastAPI):
|
||
# 启动前的初始化操作
|
||
# 启动增强版任务调度器
|
||
from services.enhanced_scheduler import scheduler
|
||
await scheduler.start(worker_count=settings.TASK_SCHEDULER_MIN_WORKER_COUNT)
|
||
logger.info(f"增强版任务调度器已启动,最小工作线程数: {settings.TASK_SCHEDULER_MIN_WORKER_COUNT},最大工作线程数: {settings.TASK_SCHEDULER_MAX_WORKER_COUNT}")
|
||
|
||
yield
|
||
|
||
# 应用程序关闭前的清理操作
|
||
# 停止增强版任务调度器
|
||
from services.enhanced_scheduler import scheduler
|
||
await scheduler.stop()
|
||
logger.info("增强版任务调度器已停止")
|
||
```
|
||
|