75 lines
2.6 KiB
Python
75 lines
2.6 KiB
Python
"""
|
|
子任务组件
|
|
"""
|
|
# components/subtask_components.py
|
|
from typing import Dict, Any, Optional
|
|
from core.component import Component, ComponentFactory
|
|
from core.exceptions import ComponentError
|
|
from services.workflow_service import WorkflowService
|
|
|
|
@ComponentFactory.register("subtask")
|
|
class SubtaskComponent(Component):
|
|
"""子任务组件"""
|
|
|
|
def execute(self) -> Dict[str, Any]:
|
|
# 获取参数
|
|
subtask_id = self.resolve_param("subtask_id")
|
|
async_execute = self.resolve_param("async_execute", False)
|
|
input_params = {}
|
|
|
|
# 收集所有以input_开头的参数作为子任务输入
|
|
for key, value in self.params.items():
|
|
if key.startswith("input_"):
|
|
param_name = key[6:] # 去掉"input_"前缀
|
|
input_params[param_name] = self.resolve_param(key)
|
|
|
|
# 验证必要参数
|
|
self.validate_required_params(["subtask_id"])
|
|
|
|
try:
|
|
# 获取工作流服务
|
|
workflow_service = WorkflowService()
|
|
|
|
# 获取子任务定义
|
|
subtask_definition = workflow_service.get_workflow_by_id(subtask_id)
|
|
|
|
if not subtask_definition:
|
|
raise ComponentError(f"找不到子任务: {subtask_id}")
|
|
|
|
# 执行子任务
|
|
if async_execute:
|
|
# 异步执行子任务
|
|
# 实际实现中,这里应该启动一个新线程或任务来执行子任务
|
|
# 这里简化为返回模拟数据
|
|
result = {
|
|
"subtaskId": subtask_id,
|
|
"status": "started",
|
|
"async": True,
|
|
"inputParams": input_params
|
|
}
|
|
else:
|
|
# 同步执行子任务
|
|
subtask_result = workflow_service.execute_workflow(
|
|
subtask_definition,
|
|
input_params
|
|
)
|
|
|
|
result = {
|
|
"subtaskId": subtask_id,
|
|
"status": subtask_result.get("task_status", "completed"),
|
|
"async": False,
|
|
"result": subtask_result
|
|
}
|
|
|
|
# 存储结果
|
|
self.store_result(result)
|
|
return result
|
|
|
|
except Exception as e:
|
|
error_result = {
|
|
"subtaskId": subtask_id,
|
|
"status": "failed",
|
|
"error": str(e)
|
|
}
|
|
self.store_result(error_result)
|
|
raise ComponentError(f"子任务执行失败: {str(e)}") |