287 lines
11 KiB
Python
287 lines
11 KiB
Python
# core/workflow.py
|
|
"""
|
|
工作流定义和执行
|
|
"""
|
|
from typing import Dict, Any, List, Optional
|
|
import time
|
|
import json
|
|
import uuid
|
|
from .context import TaskContext
|
|
from .component import ComponentFactory
|
|
from .exceptions import WorkflowError, ComponentError
|
|
from utils.logger import get_logger
|
|
|
|
# 获取日志记录器
|
|
logger = get_logger(__name__)
|
|
|
|
class WorkflowDefinition:
|
|
"""工作流定义"""
|
|
|
|
def __init__(self, workflow_id: Optional[str] = None, name: str = "", workflow_type: str = "normal"):
|
|
self.workflow_id = workflow_id or f"wf_{uuid.uuid4().hex}"
|
|
self.name = name
|
|
self.workflow_type = workflow_type # normal或scheduled
|
|
self.variables: Dict[str, Any] = {}
|
|
self.blocks: List[Dict[str, Any]] = []
|
|
self.schedule: Optional[Dict[str, Any]] = None # 定时任务的调度信息
|
|
self.version: str = "1.0"
|
|
self.description: str = ""
|
|
self.created_at: int = int(time.time() * 1000)
|
|
self.updated_at: int = int(time.time() * 1000)
|
|
|
|
def add_block(self, block: Dict[str, Any]) -> None:
|
|
"""添加块"""
|
|
self.blocks.append(block)
|
|
|
|
def set_variables(self, variables: Dict[str, Any]) -> None:
|
|
"""设置变量"""
|
|
self.variables = variables.copy()
|
|
|
|
def set_schedule(self, schedule: Dict[str, Any]) -> None:
|
|
"""设置调度信息"""
|
|
self.schedule = schedule
|
|
self.workflow_type = "scheduled"
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
"""转换为字典"""
|
|
return {
|
|
"workflow_id": self.workflow_id,
|
|
"name": self.name,
|
|
"workflow_type": self.workflow_type,
|
|
"variables": self.variables,
|
|
"blocks": self.blocks,
|
|
"schedule": self.schedule,
|
|
"version": self.version,
|
|
"description": self.description,
|
|
"created_at": self.created_at,
|
|
"updated_at": self.updated_at
|
|
}
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: Dict[str, Any]) -> 'WorkflowDefinition':
|
|
"""从字典创建工作流定义"""
|
|
workflow = cls(
|
|
workflow_id=data.get("workflow_id"),
|
|
name=data.get("name", ""),
|
|
workflow_type=data.get("workflow_type", "normal")
|
|
)
|
|
workflow.variables = data.get("variables", {})
|
|
workflow.blocks = data.get("blocks", [])
|
|
workflow.schedule = data.get("schedule")
|
|
workflow.version = data.get("version", "1.0")
|
|
workflow.description = data.get("description", "")
|
|
workflow.created_at = data.get("created_at", int(time.time() * 1000))
|
|
workflow.updated_at = data.get("updated_at", int(time.time() * 1000))
|
|
return workflow
|
|
|
|
def to_json(self) -> str:
|
|
"""转换为JSON字符串"""
|
|
return json.dumps(self.to_dict())
|
|
|
|
@classmethod
|
|
def from_json(cls, json_str: str) -> 'WorkflowDefinition':
|
|
"""从JSON字符串创建工作流定义"""
|
|
data = json.loads(json_str)
|
|
return cls.from_dict(data)
|
|
|
|
class WorkflowExecutor:
|
|
"""工作流执行器"""
|
|
|
|
def __init__(self):
|
|
self.component_factory = ComponentFactory
|
|
|
|
def execute(self, workflow: WorkflowDefinition, task_inputs: Dict[str, Any] = None) -> Dict[str, Any]:
|
|
"""执行工作流"""
|
|
# 获取上下文
|
|
context = TaskContext.get_instance()
|
|
|
|
# 重置上下文
|
|
context.reset()
|
|
|
|
# 生成任务ID
|
|
task_id = f"task_{uuid.uuid4().hex}"
|
|
|
|
# 初始化任务
|
|
context.initialize_task(task_id, workflow.workflow_id, task_inputs)
|
|
|
|
# 设置任务变量
|
|
context.variables.update(workflow.variables)
|
|
|
|
logger.info(f"开始执行工作流: {workflow.name} (ID: {workflow.workflow_id}), 任务ID: {task_id}")
|
|
|
|
try:
|
|
# 执行组件
|
|
self._execute_blocks(workflow.blocks)
|
|
|
|
# 完成任务
|
|
context.complete_task("completed")
|
|
|
|
logger.info(f"工作流执行完成: {workflow.name} (ID: {workflow.workflow_id}), 任务ID: {task_id}")
|
|
|
|
# 返回执行结果
|
|
return context.to_dict()
|
|
|
|
except Exception as e:
|
|
# 设置任务状态为执行失败
|
|
context.complete_task("failed")
|
|
|
|
error_msg = f"工作流执行失败: {str(e)}"
|
|
logger.error(f"{error_msg}, 工作流: {workflow.name} (ID: {workflow.workflow_id}), 任务ID: {task_id}")
|
|
|
|
# 返回错误信息
|
|
result = context.to_dict()
|
|
result["error"] = error_msg
|
|
return result
|
|
|
|
def _execute_blocks(self, blocks: List[Dict[str, Any]]) -> None:
|
|
"""执行一系列组件"""
|
|
context = TaskContext.get_instance()
|
|
|
|
for block in blocks:
|
|
block_id = block["id"]
|
|
block_type = block["type"]
|
|
params = block.get("params", {})
|
|
|
|
logger.debug(f"执行组件: {block_id} ({block_type})")
|
|
|
|
# 特殊处理流程控制组件
|
|
if block_type == "if_condition":
|
|
self._execute_if_condition(block_id, params)
|
|
elif block_type == "for_each":
|
|
self._execute_for_each(block_id, params)
|
|
elif block_type == "while_loop":
|
|
self._execute_while_loop(block_id, params)
|
|
else:
|
|
# 创建并执行普通组件
|
|
try:
|
|
component = self.component_factory.create(block_id, block_type, params)
|
|
result = component.execute()
|
|
logger.debug(f"组件 {block_id} ({block_type}) 执行成功")
|
|
except Exception as e:
|
|
error_msg = f"组件 {block_id} ({block_type}) 执行失败: {str(e)}"
|
|
logger.error(error_msg)
|
|
context.set_block_result(block_id, {"error": error_msg})
|
|
|
|
# 检查是否需要中断执行
|
|
if block.get("stop_on_error", True):
|
|
raise ComponentError(error_msg)
|
|
|
|
def _execute_if_condition(self, block_id: str, params: Dict[str, Any]) -> None:
|
|
"""执行条件组件"""
|
|
context = TaskContext.get_instance()
|
|
|
|
try:
|
|
# 计算条件表达式
|
|
condition_expr = params.get("condition", "False")
|
|
condition_value = eval(condition_expr, {"__builtins__": {}}, {
|
|
"context": context,
|
|
"blocks": context.blocks,
|
|
"variables": context.variables
|
|
})
|
|
|
|
# 存储条件结果
|
|
context.set_block_result(block_id, {"condition": condition_value})
|
|
|
|
# 根据条件执行相应分支
|
|
if condition_value and "true_blocks" in params:
|
|
self._execute_blocks(params["true_blocks"])
|
|
elif not condition_value and "false_blocks" in params:
|
|
self._execute_blocks(params["false_blocks"])
|
|
|
|
except Exception as e:
|
|
error_msg = f"条件组件 {block_id} 执行失败: {str(e)}"
|
|
logger.error(error_msg)
|
|
context.set_block_result(block_id, {"error": error_msg})
|
|
raise ComponentError(error_msg)
|
|
|
|
def _execute_for_each(self, block_id: str, params: Dict[str, Any]) -> None:
|
|
"""执行循环组件"""
|
|
context = TaskContext.get_instance()
|
|
|
|
try:
|
|
# 获取循环项
|
|
items_expr = params.get("items", "[]")
|
|
items = eval(items_expr, {"__builtins__": {}}, {
|
|
"context": context,
|
|
"blocks": context.blocks,
|
|
"variables": context.variables
|
|
})
|
|
|
|
results = []
|
|
|
|
# 遍历执行循环体
|
|
for index, item in enumerate(items):
|
|
# 设置循环变量
|
|
context.set_variable("loop_item", item)
|
|
context.set_variable("loop_index", index)
|
|
|
|
# 执行循环体
|
|
if "loop_blocks" in params:
|
|
self._execute_blocks(params["loop_blocks"])
|
|
|
|
# 收集结果
|
|
if "result_var" in params:
|
|
result_var = params["result_var"]
|
|
if result_var in context.variables:
|
|
results.append(context.get_variable(result_var))
|
|
|
|
# 存储循环结果
|
|
context.set_block_result(block_id, {
|
|
"count": len(items),
|
|
"results": results
|
|
})
|
|
|
|
except Exception as e:
|
|
error_msg = f"循环组件 {block_id} 执行失败: {str(e)}"
|
|
logger.error(error_msg)
|
|
context.set_block_result(block_id, {"error": error_msg})
|
|
raise ComponentError(error_msg)
|
|
|
|
def _execute_while_loop(self, block_id: str, params: Dict[str, Any]) -> None:
|
|
"""执行while循环组件"""
|
|
context = TaskContext.get_instance()
|
|
|
|
try:
|
|
max_iterations = params.get("max_iterations", 100)
|
|
iteration = 0
|
|
results = []
|
|
|
|
# 循环执行
|
|
while iteration < max_iterations:
|
|
# 计算条件表达式
|
|
condition_expr = params.get("condition", "False")
|
|
condition_value = eval(condition_expr, {"__builtins__": {}}, {
|
|
"context": context,
|
|
"blocks": context.blocks,
|
|
"variables": context.variables
|
|
})
|
|
|
|
if not condition_value:
|
|
break
|
|
|
|
# 设置循环变量
|
|
context.set_variable("loop_iteration", iteration)
|
|
|
|
# 执行循环体
|
|
if "loop_blocks" in params:
|
|
self._execute_blocks(params["loop_blocks"])
|
|
|
|
# 收集结果
|
|
if "result_var" in params:
|
|
result_var = params["result_var"]
|
|
if result_var in context.variables:
|
|
results.append(context.get_variable(result_var))
|
|
|
|
iteration += 1
|
|
|
|
# 存储循环结果
|
|
context.set_block_result(block_id, {
|
|
"iterations": iteration,
|
|
"results": results
|
|
})
|
|
|
|
except Exception as e:
|
|
error_msg = f"While循环组件 {block_id} 执行失败: {str(e)}"
|
|
logger.error(error_msg)
|
|
context.set_block_result(block_id, {"error": error_msg})
|
|
raise ComponentError(error_msg) |