51 lines
1.3 KiB
Python
Raw Permalink Normal View History

2025-04-30 16:57:46 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
核心块处理器模块
提供RootBp核心处理器
"""
import logging
from typing import Dict, Any
from .base import BlockHandler, register_handler
from utils.logger import get_logger
# Module-level logger, obtained from the project's get_logger helper under the
# name "services.execution.handlers.core".
logger = get_logger("services.execution.handlers.core")
# 根块处理器
@register_handler("RootBp")
class RootBlockHandler(BlockHandler):
    """Block handler registered through ``register_handler`` as ``"RootBp"``.

    Its only job is to run the children of the root block and record the
    outcome.
    """

    async def execute(
        self,
        block: Dict[str, Any],
        input_params: Dict[str, Any],
        context: Any  # TaskContext type
    ) -> Dict[str, Any]:
        """Execute the root block by executing its child blocks.

        Args:
            block: The root block definition; passed to
                ``BlockExecutor.execute_children`` and to the task log.
            input_params: Not used by this method.
            context: Execution context handed to ``BlockExecutor`` and to
                ``_record_task_log``.

        Returns:
            Whatever ``execute_children`` returns on success. If any
            ``Exception`` is raised while importing the executor, running the
            children, or recording the success log, a dict with
            ``"success": False`` and a ``"message"`` describing the error is
            recorded and returned instead.
        """
        try:
            # Imported inside the try block, so a failing import is reported
            # through the same failure result as any other error.
            from services.execution.block_executor import BlockExecutor

            outcome = await BlockExecutor(context).execute_children(block)
            await self._record_task_log(block, outcome, context)
        except Exception as exc:
            outcome = {
                "success": False,
                "message": f"根块执行异常: {exc!s}"
            }
            # An error raised by this second logging call is not caught here
            # and propagates to the caller.
            await self._record_task_log(block, outcome, context)
        return outcome