VWED_server/.cursor/rules/task-scheduler.mdc
2025-04-30 16:57:46 +08:00

111 lines
4.3 KiB
Plaintext
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

---
description:
globs:
alwaysApply: false
---
# VWED任务模块 - 任务调度系统
## 增强版任务调度系统概述
增强版任务调度系统是VWED任务模块的核心组件提供高性能、可靠性和可扩展性的任务处理能力。系统采用模块化设计包含多个协同工作的组件。
## 核心组件
### EnhancedScheduler
主调度器,负责协调各个组件工作,是整个调度系统的核心。它管理任务的生命周期,从提交到完成的全过程。
### PriorityQueueManager
优先级队列管理器,实现多级优先级队列,通过动态阈值算法优化任务分布,确保高优先级任务优先处理,同时避免低优先级任务长时间饥饿。
### WorkerManager
工作线程管理器,负责创建和管理工作线程池,支持动态调整线程池大小,进行心跳监控,及时发现异常并进行恢复。
### PeriodicTaskManager
定时任务管理器支持Cron表达式提供高级定时任务管理功能适用于需要周期性执行的任务。
### TaskPersistenceManager
任务持久化管理器,负责将任务队列持久化到存储系统,提供故障恢复机制,确保系统重启后任务不丢失。
## 工作流程
1. 任务提交通过API接口或内部服务提交任务到调度系统
2. 任务接收EnhancedScheduler接收任务并交由PriorityQueueManager进行优先级排队
3. 任务调度WorkerManager根据系统负载情况从队列中取出任务分配给工作线程
4. 任务执行:工作线程执行任务,并将执行状态反馈给调度器
5. 状态管理:调度器更新任务状态,并在必要时触发相应的回调函数
6. 结果处理:任务执行完成后,处理执行结果,更新数据库状态
## 系统特性
### 动态工作线程池
系统可根据任务负载情况自动调整工作线程数量,在最小工作线程数和最大工作线程数之间动态调整,以优化资源利用率。
```python
# 动态调整工作线程数
async def adjust_worker_count(self, current_queue_size: int):
"""根据队列大小动态调整工作线程数量"""
if current_queue_size > self.high_watermark and self.worker_count < self.max_worker_count:
# 增加工作线程
new_workers = min(self.worker_count + 2, self.max_worker_count) - self.worker_count
if new_workers > 0:
await self.add_workers(new_workers)
elif current_queue_size < self.low_watermark and self.worker_count > self.min_worker_count:
# 减少工作线程(只减空闲的)
idle_workers = len([w for w in self.workers.values() if w.status == WorkerStatus.IDLE])
if idle_workers > 2: # 保留一些空闲工作线程以应对突发负载
to_remove = min(idle_workers - 2, self.worker_count - self.min_worker_count)
if to_remove > 0:
await self.remove_workers(to_remove)
```
### 任务优先级管理
系统支持任务优先级设置,优先级越高的任务越先被调度执行。系统还实现了平衡调度策略,避免低优先级任务长时间得不到执行。
### 健壮性保障
系统实现了全面的异常处理和故障恢复机制:
1. 工作线程异常自动恢复
2. 系统重启后任务队列恢复
3. 僵尸任务检测和清理
4. 超时任务处理
### 监控和统计
系统提供丰富的监控和统计信息,包括:
1. 任务执行统计(成功率、平均执行时间等)
2. 队列状态监控(各优先级队列长度、等待时间等)
3. 资源使用监控CPU、内存使用率等
4. 工作线程健康状态
## 调度系统初始化
系统在应用启动时初始化,在应用关闭时优雅停止:
```python
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动前的初始化操作
# 启动增强版任务调度器
from services.enhanced_scheduler import scheduler
await scheduler.start(worker_count=settings.TASK_SCHEDULER_MIN_WORKER_COUNT)
logger.info(f"增强版任务调度器已启动,最小工作线程数: {settings.TASK_SCHEDULER_MIN_WORKER_COUNT},最大工作线程数: {settings.TASK_SCHEDULER_MAX_WORKER_COUNT}")
yield
# 应用程序关闭前的清理操作
# 停止增强版任务调度器
from services.enhanced_scheduler import scheduler
await scheduler.stop()
logger.info("增强版任务调度器已停止")
```