VWED_server/.cursor/rules/task-scheduler.mdc

111 lines
4.3 KiB
Plaintext
Raw Permalink Normal View History

2025-04-30 16:57:46 +08:00
---
description:
globs:
alwaysApply: false
---
# VWED任务模块 - 任务调度系统
## 增强版任务调度系统概述
增强版任务调度系统是VWED任务模块的核心组件提供高性能、可靠性和可扩展性的任务处理能力。系统采用模块化设计包含多个协同工作的组件。
## 核心组件
### EnhancedScheduler
主调度器,负责协调各个组件工作,是整个调度系统的核心。它管理任务的生命周期,从提交到完成的全过程。
### PriorityQueueManager
优先级队列管理器,实现多级优先级队列,通过动态阈值算法优化任务分布,确保高优先级任务优先处理,同时避免低优先级任务长时间饥饿。
### WorkerManager
工作线程管理器,负责创建和管理工作线程池,支持动态调整线程池大小,进行心跳监控,及时发现异常并进行恢复。
### PeriodicTaskManager
定时任务管理器支持Cron表达式提供高级定时任务管理功能适用于需要周期性执行的任务。
### TaskPersistenceManager
任务持久化管理器,负责将任务队列持久化到存储系统,提供故障恢复机制,确保系统重启后任务不丢失。
## 工作流程
1. 任务提交通过API接口或内部服务提交任务到调度系统
2. 任务接收EnhancedScheduler接收任务并交由PriorityQueueManager进行优先级排队
3. 任务调度WorkerManager根据系统负载情况从队列中取出任务分配给工作线程
4. 任务执行:工作线程执行任务,并将执行状态反馈给调度器
5. 状态管理:调度器更新任务状态,并在必要时触发相应的回调函数
6. 结果处理:任务执行完成后,处理执行结果,更新数据库状态
## 系统特性
### 动态工作线程池
系统可根据任务负载情况自动调整工作线程数量,在最小工作线程数和最大工作线程数之间动态调整,以优化资源利用率。
```python
# 动态调整工作线程数
async def adjust_worker_count(self, current_queue_size: int):
"""根据队列大小动态调整工作线程数量"""
if current_queue_size > self.high_watermark and self.worker_count < self.max_worker_count:
# 增加工作线程
new_workers = min(self.worker_count + 2, self.max_worker_count) - self.worker_count
if new_workers > 0:
await self.add_workers(new_workers)
elif current_queue_size < self.low_watermark and self.worker_count > self.min_worker_count:
# 减少工作线程(只减空闲的)
idle_workers = len([w for w in self.workers.values() if w.status == WorkerStatus.IDLE])
if idle_workers > 2: # 保留一些空闲工作线程以应对突发负载
to_remove = min(idle_workers - 2, self.worker_count - self.min_worker_count)
if to_remove > 0:
await self.remove_workers(to_remove)
```
### 任务优先级管理
系统支持任务优先级设置,优先级越高的任务越先被调度执行。系统还实现了平衡调度策略,避免低优先级任务长时间得不到执行。
### 健壮性保障
系统实现了全面的异常处理和故障恢复机制:
1. 工作线程异常自动恢复
2. 系统重启后任务队列恢复
3. 僵尸任务检测和清理
4. 超时任务处理
### 监控和统计
系统提供丰富的监控和统计信息,包括:
1. 任务执行统计(成功率、平均执行时间等)
2. 队列状态监控(各优先级队列长度、等待时间等)
3. 资源使用监控CPU、内存使用率等
4. 工作线程健康状态
## 调度系统初始化
系统在应用启动时初始化,在应用关闭时优雅停止:
```python
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动前的初始化操作
# 启动增强版任务调度器
from services.enhanced_scheduler import scheduler
await scheduler.start(worker_count=settings.TASK_SCHEDULER_MIN_WORKER_COUNT)
logger.info(f"增强版任务调度器已启动,最小工作线程数: {settings.TASK_SCHEDULER_MIN_WORKER_COUNT},最大工作线程数: {settings.TASK_SCHEDULER_MAX_WORKER_COUNT}")
yield
# 应用程序关闭前的清理操作
# 停止增强版任务调度器
from services.enhanced_scheduler import scheduler
await scheduler.stop()
logger.info("增强版任务调度器已停止")
```