247 lines
6.8 KiB
Plaintext
247 lines
6.8 KiB
Plaintext
---
|
||
description:
|
||
globs:
|
||
alwaysApply: false
|
||
---
|
||
# VWED任务模块 - 任务执行机制
|
||
|
||
## 任务执行概述
|
||
|
||
VWED任务模块系统的任务执行机制负责解析任务定义并按照指定的流程执行任务。任务执行是一个复杂的过程,涉及多个组件的协同工作。
|
||
|
||
## 核心组件
|
||
|
||
### 任务执行器 (TaskExecutor)
|
||
|
||
任务执行器是负责执行整个任务的核心组件,管理任务的生命周期和状态。
|
||
|
||
**相关文件**: [services/execution/task_executor.py](mdc:services/execution/task_executor.py)
|
||
|
||
**主要功能**:
|
||
- 任务初始化和准备
|
||
- 任务流程解析
|
||
- 执行状态管理
|
||
- 异常处理和恢复
|
||
- 结果收集和处理
|
||
|
||
### 任务块执行器 (BlockExecutor)
|
||
|
||
任务块执行器负责执行任务中的单个块,是任务执行的基本单元。
|
||
|
||
**相关文件**: [services/execution/block_executor.py](mdc:services/execution/block_executor.py)
|
||
|
||
**主要功能**:
|
||
- 块参数解析和验证
|
||
- 块逻辑执行
|
||
- 块状态管理
|
||
- 块间数据传递
|
||
- 执行结果处理
|
||
|
||
### 任务上下文 (TaskContext)
|
||
|
||
任务上下文维护任务执行过程中的所有数据和状态,提供数据共享和传递机制。
|
||
|
||
**相关文件**: [services/execution/task_context.py](mdc:services/execution/task_context.py)
|
||
|
||
**主要功能**:
|
||
- 变量存储和访问
|
||
- 状态追踪
|
||
- 上下文隔离
|
||
- 数据持久化
|
||
- 上下文恢复
|
||
|
||
### 块处理器 (BlockHandlers)
|
||
|
||
块处理器实现了各种类型任务块的具体执行逻辑,包括条件判断、循环、数据处理等。
|
||
|
||
**相关文件**: [services/execution/block_handlers.py](mdc:services/execution/block_handlers.py)
|
||
|
||
**主要功能**:
|
||
- 实现各类块的执行逻辑
|
||
- 参数处理和验证
|
||
- 块间流程控制
|
||
- 特定功能实现
|
||
|
||
## 执行流程
|
||
|
||
### 1. 任务提交
|
||
|
||
任务执行从任务提交开始,用户通过API接口或系统内部逻辑提交任务至调度系统。
|
||
|
||
```python
|
||
# 通过API提交任务
|
||
@router.post("/{task_id}/execute", response_model=ResponseModel)
|
||
async def execute_task(
|
||
task_id: int,
|
||
execution_params: Optional[TaskExecutionParams] = Body(None),
|
||
background_tasks: BackgroundTasks = BackgroundTasks(),
|
||
):
|
||
# 提交任务到调度系统
|
||
result = await task_service.execute_task(task_id, execution_params, background_tasks)
|
||
return format_response(message="任务提交成功", data=result)
|
||
```
|
||
|
||
### 2. 任务调度
|
||
|
||
调度系统接收任务请求,根据优先级和资源情况,将任务分配给工作线程。
|
||
|
||
```python
|
||
# 任务调度
|
||
async def schedule_task(self, task_func, priority=0, *args, **kwargs):
|
||
# 创建任务
|
||
task = Task(task_func, args, kwargs, priority)
|
||
# 加入优先级队列
|
||
await self.queue_manager.put_task(task)
|
||
# 返回任务ID
|
||
return task.id
|
||
```
|
||
|
||
### 3. 任务初始化
|
||
|
||
任务执行器接收到任务后,进行初始化操作,包括加载任务定义、创建执行上下文等。
|
||
|
||
```python
|
||
# 任务初始化
|
||
async def initialize(self):
|
||
# 加载任务定义
|
||
self.task_def = await self.load_task_definition()
|
||
# 创建任务记录
|
||
self.task_record = await self.create_task_record()
|
||
# 初始化执行上下文
|
||
self.context = TaskContext(self.task_record.id, self.execution_params)
|
||
# 加载初始数据
|
||
await self.load_initial_data()
|
||
```
|
||
|
||
### 4. 任务执行
|
||
|
||
任务执行器按照任务定义的流程,依次执行各个任务块。
|
||
|
||
```python
|
||
# 任务执行
|
||
async def execute(self):
|
||
try:
|
||
# 更新任务状态为执行中
|
||
await self.update_task_status(TaskStatus.RUNNING)
|
||
|
||
# 获取起始块
|
||
current_block = self.get_start_block()
|
||
|
||
# 循环执行任务块,直到结束或出错
|
||
while current_block and not self.should_stop:
|
||
# 执行当前块
|
||
block_result = await self.execute_block(current_block)
|
||
|
||
# 记录块执行结果
|
||
await self.record_block_result(current_block, block_result)
|
||
|
||
# 获取下一个要执行的块
|
||
current_block = self.get_next_block(current_block, block_result)
|
||
|
||
# 更新任务状态为完成
|
||
await self.update_task_status(TaskStatus.COMPLETED)
|
||
|
||
except Exception as e:
|
||
# 异常处理
|
||
await self.handle_exception(e)
|
||
```
|
||
|
||
### 5. 块执行
|
||
|
||
块执行器负责执行单个任务块,根据块类型调用相应的处理逻辑。
|
||
|
||
```python
|
||
# 块执行
|
||
async def execute_block(self, block_definition):
|
||
# 创建块执行记录
|
||
block_record = await self.create_block_record(block_definition)
|
||
|
||
try:
|
||
# 获取块类型
|
||
block_type = block_definition.get("type")
|
||
|
||
# 获取对应的块处理器
|
||
handler = self.get_block_handler(block_type)
|
||
|
||
# 前置处理
|
||
await self.pre_execute_block(block_definition, block_record)
|
||
|
||
# 执行块逻辑
|
||
result = await handler.execute(block_definition, self.context)
|
||
|
||
# 后置处理
|
||
await self.post_execute_block(block_definition, block_record, result)
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
# 处理块执行异常
|
||
await self.handle_block_exception(block_definition, block_record, e)
|
||
raise
|
||
```
|
||
|
||
### 6. 结果处理
|
||
|
||
任务执行完成后,执行器会收集执行结果,更新任务状态,并进行必要的清理工作。
|
||
|
||
```python
|
||
# 结果处理
|
||
async def process_result(self):
|
||
# 收集执行结果
|
||
result = self.collect_execution_result()
|
||
|
||
# 更新任务记录
|
||
await self.update_task_record(result)
|
||
|
||
# 触发后续动作
|
||
await self.trigger_actions(result)
|
||
|
||
# 资源清理
|
||
await self.cleanup()
|
||
|
||
return result
|
||
```
|
||
|
||
## 执行特性
|
||
|
||
### 上下文管理
|
||
|
||
系统使用TaskContext管理任务执行过程中的数据和状态,提供了变量存储、检索和作用域管理功能。
|
||
|
||
```python
|
||
# 上下文示例
|
||
context = TaskContext(task_record_id)
|
||
|
||
# 设置变量
|
||
await context.set_variable("customer_name", "张三")
|
||
|
||
# 获取变量
|
||
customer_name = await context.get_variable("customer_name")
|
||
|
||
# 检查变量是否存在
|
||
if await context.has_variable("order_id"):
|
||
# 处理逻辑
|
||
pass
|
||
```
|
||
|
||
### 错误处理
|
||
|
||
系统实现了多层次的错误处理机制,确保任务执行的稳定性和可靠性。
|
||
|
||
1. **块级错误处理**:每个块可以定义自己的错误处理策略
|
||
2. **任务级错误处理**:任务可以定义全局错误处理策略
|
||
3. **系统级错误处理**:系统提供默认的错误处理机制
|
||
|
||
### 状态追踪
|
||
|
||
系统对任务执行过程中的状态进行全面追踪,包括任务状态、块状态和变量状态。
|
||
|
||
### 超时管理
|
||
|
||
系统支持任务和块级别的超时设置,防止任务长时间未完成占用系统资源。
|
||
|
||
### 事务支持
|
||
|
||
系统支持事务处理,可以在任务执行过程中进行数据库事务操作,确保数据一致性。
|
||
|