tianfeng_task_modules/components/subtask_components.py
2025-03-17 14:58:05 +08:00

75 lines
2.6 KiB
Python

"""
子任务组件
"""
# components/subtask_components.py
from typing import Dict, Any, Optional
from core.component import Component, ComponentFactory
from core.exceptions import ComponentError
from services.workflow_service import WorkflowService
@ComponentFactory.register("subtask")
class SubtaskComponent(Component):
"""子任务组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
subtask_id = self.resolve_param("subtask_id")
async_execute = self.resolve_param("async_execute", False)
input_params = {}
# 收集所有以input_开头的参数作为子任务输入
for key, value in self.params.items():
if key.startswith("input_"):
param_name = key[6:] # 去掉"input_"前缀
input_params[param_name] = self.resolve_param(key)
# 验证必要参数
self.validate_required_params(["subtask_id"])
try:
# 获取工作流服务
workflow_service = WorkflowService()
# 获取子任务定义
subtask_definition = workflow_service.get_workflow_by_id(subtask_id)
if not subtask_definition:
raise ComponentError(f"找不到子任务: {subtask_id}")
# 执行子任务
if async_execute:
# 异步执行子任务
# 实际实现中,这里应该启动一个新线程或任务来执行子任务
# 这里简化为返回模拟数据
result = {
"subtaskId": subtask_id,
"status": "started",
"async": True,
"inputParams": input_params
}
else:
# 同步执行子任务
subtask_result = workflow_service.execute_workflow(
subtask_definition,
input_params
)
result = {
"subtaskId": subtask_id,
"status": subtask_result.get("task_status", "completed"),
"async": False,
"result": subtask_result
}
# 存储结果
self.store_result(result)
return result
except Exception as e:
error_result = {
"subtaskId": subtask_id,
"status": "failed",
"error": str(e)
}
self.store_result(error_result)
raise ComponentError(f"子任务执行失败: {str(e)}")