2025-03-17 14:58:05 +08:00

142 lines
4.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# components/task_components.py
"""
任务组件
"""
from typing import Dict, Any, Optional
from core.component import Component, ComponentFactory
from core.exceptions import ComponentError
@ComponentFactory.register("cache_data")
class CacheDataComponent(Component):
"""缓存数据组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
key = self.resolve_param("key")
value = self.resolve_param("value")
# 验证必要参数
self.validate_required_params(["key"])
# 缓存数据
cache_key = f"cache_{key}"
self.context.set_variable(cache_key, value)
result = {"key": key}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("clear_cache_data")
class ClearCacheDataComponent(Component):
"""清除缓存数据组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
key = self.resolve_param("key")
# 验证必要参数
self.validate_required_params(["key"])
# 清除缓存数据
cache_key = f"cache_{key}"
if cache_key in self.context.variables:
del self.context.variables[cache_key]
result = {"success": True}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("get_cache_data")
class GetCacheDataComponent(Component):
"""获取缓存数据组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
key = self.resolve_param("key")
# 验证必要参数
self.validate_required_params(["key"])
# 获取缓存数据
cache_key = f"cache_{key}"
value = self.context.get_variable(cache_key)
result = {"value": value}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("set_task_status")
class SetTaskStatusComponent(Component):
"""设置任务状态组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
status = self.resolve_param("status")
# 验证必要参数
self.validate_required_params(["status"])
# 设置任务状态
self.context.set_task_status(status)
result = {"status": status}
# 存储结果
self.store_result(result)
return result
# components/task_components.py (继续)
@ComponentFactory.register("jump_to_block")
class JumpToBlockComponent(Component):
"""跳到某个块组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
target_block_id = self.resolve_param("target_block_id")
# 验证必要参数
self.validate_required_params(["target_block_id"])
# 注意:实际的跳转逻辑需要在工作流执行器中实现
# 这里只是记录跳转意图
result = {
"target_block_id": target_block_id,
"jump_requested": True
}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("get_task_input_param")
class GetTaskInputParamComponent(Component):
"""获取任务的输入参数组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
task_instance_id = self.resolve_param("task_instance_id")
param_name = self.resolve_param("param_name")
# 验证必要参数
self.validate_required_params(["param_name"])
# 获取输入参数
# 如果指定了任务实例ID则需要从数据库或其他存储中获取
# 这里简化为直接从当前任务的输入参数中获取
if task_instance_id and task_instance_id != self.context.task_id:
# 实际实现中,这里应该查询数据库获取指定任务的输入参数
param_value = None
else:
param_value = self.context.task_inputs.get(param_name)
result = {"inputParamValue": param_value}
# 存储结果
self.store_result(result)
return result