2025-03-17 14:58:05 +08:00

188 lines
5.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# components/flow_components.py
"""
流程控制组件
"""
from typing import Dict, Any, List, Optional
from core.component import Component, ComponentFactory
from core.exceptions import ComponentError
@ComponentFactory.register("if")
class IfComponent(Component):
"""If条件组件"""
def execute(self) -> Dict[str, Any]:
# 注意:实际的条件逻辑在工作流执行器中实现
# 这里只是记录条件表达式
condition = self.resolve_param("condition")
# 验证必要参数
self.validate_required_params(["condition"])
result = {"condition": condition}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("if_else")
class IfElseComponent(Component):
"""If-Else条件组件"""
def execute(self) -> Dict[str, Any]:
# 注意:实际的条件逻辑在工作流执行器中实现
# 这里只是记录条件表达式
condition = self.resolve_param("condition")
# 验证必要参数
self.validate_required_params(["condition"])
result = {"condition": condition}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("if_else_if")
class IfElseIfComponent(Component):
"""If-Else-If条件组件"""
def execute(self) -> Dict[str, Any]:
# 注意:实际的条件逻辑在工作流执行器中实现
# 这里只是记录条件表达式
conditions = self.resolve_param("conditions", [])
# 验证必要参数
self.validate_required_params(["conditions"])
result = {"conditions": conditions}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("for_each")
class ForEachComponent(Component):
"""遍历数组组件"""
def execute(self) -> Dict[str, Any]:
# 注意:实际的循环逻辑在工作流执行器中实现
# 这里只是记录循环参数
items = self.resolve_param("items")
# 验证必要参数
self.validate_required_params(["items"])
result = {"items": items}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("while")
class WhileComponent(Component):
"""While循环组件"""
def execute(self) -> Dict[str, Any]:
# 注意:实际的循环逻辑在工作流执行器中实现
# 这里只是记录循环条件
condition = self.resolve_param("condition")
max_iterations = self.resolve_param("max_iterations", 100)
# 验证必要参数
self.validate_required_params(["condition"])
result = {
"condition": condition,
"max_iterations": max_iterations
}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("break")
class BreakComponent(Component):
"""Break组件中断当前循环"""
def execute(self) -> Dict[str, Any]:
# 注意:实际的中断逻辑在工作流执行器中实现
# 这里只是记录中断意图
result = {"break_requested": True}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("return")
class ReturnComponent(Component):
"""Return组件结束当前流程并返回结果"""
def execute(self) -> Dict[str, Any]:
# 获取参数
return_value = self.resolve_param("return_value")
# 注意:实际的返回逻辑在工作流执行器中实现
# 这里只是记录返回值
result = {"return_value": return_value, "return_requested": True}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("delay")
class DelayComponent(Component):
"""延迟组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
milliseconds = self.resolve_param("milliseconds", 1000)
# 验证必要参数
self.validate_required_params(["milliseconds"])
# 执行延迟
import time
time.sleep(milliseconds / 1000)
result = {"delayed_ms": milliseconds}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("parallel_execute")
class ParallelExecuteComponent(Component):
"""并行执行组件"""
def execute(self) -> Dict[str, Any]:
# 注意:实际的并行执行逻辑在工作流执行器中实现
# 这里只是记录并行执行意图
result = {"parallel_requested": True}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("serial_execute")
class SerialExecuteComponent(Component):
"""串行执行组件"""
def execute(self) -> Dict[str, Any]:
# 注意:实际的串行执行逻辑在工作流执行器中实现
# 这里只是记录串行执行意图
result = {"serial_requested": True}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("throw_exception")
class ThrowExceptionComponent(Component):
"""抛出异常组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
error_message = self.resolve_param("error_message", "自定义异常")
# 抛出异常
raise ComponentError(error_message)