2025-03-17 14:58:05 +08:00

287 lines
11 KiB
Python

# core/workflow.py
"""
工作流定义和执行
"""
from typing import Dict, Any, List, Optional
import time
import json
import uuid
from .context import TaskContext
from .component import ComponentFactory
from .exceptions import WorkflowError, ComponentError
from utils.logger import get_logger
# 获取日志记录器
logger = get_logger(__name__)
class WorkflowDefinition:
    """Serializable description of a workflow.

    Holds the workflow's identity and metadata together with its variables,
    its ordered list of block dictionaries and, for scheduled workflows,
    the schedule information. Instances convert to and from plain
    dictionaries and JSON strings.
    """

    def __init__(self, workflow_id: Optional[str] = None, name: str = "", workflow_type: str = "normal"):
        """Create an empty workflow definition.

        Args:
            workflow_id: Identifier to use. A falsy value (``None`` or ``""``)
                generates a fresh ``wf_<uuid4 hex>`` identifier.
            name: Human-readable workflow name.
            workflow_type: ``"normal"`` or ``"scheduled"``.
        """
        # One clock read so a fresh definition always has
        # created_at == updated_at (two reads can straddle a millisecond).
        now_ms = int(time.time() * 1000)

        self.workflow_id = workflow_id or f"wf_{uuid.uuid4().hex}"
        self.name = name
        self.workflow_type = workflow_type  # "normal" or "scheduled"
        self.variables: Dict[str, Any] = {}
        self.blocks: List[Dict[str, Any]] = []
        self.schedule: Optional[Dict[str, Any]] = None  # scheduling info for scheduled workflows
        self.version: str = "1.0"
        self.description: str = ""
        self.created_at: int = now_ms  # milliseconds since the epoch
        self.updated_at: int = now_ms  # milliseconds since the epoch

    def add_block(self, block: Dict[str, Any]) -> None:
        """Append a block dictionary to the end of the block list."""
        self.blocks.append(block)

    def set_variables(self, variables: Dict[str, Any]) -> None:
        """Replace the workflow variables with a shallow copy of *variables*."""
        self.variables = variables.copy()

    def set_schedule(self, schedule: Dict[str, Any]) -> None:
        """Attach schedule information and mark the workflow as scheduled."""
        self.schedule = schedule
        self.workflow_type = "scheduled"

    def to_dict(self) -> Dict[str, Any]:
        """Return the definition as a plain dictionary.

        The returned dictionary references the definition's own
        ``variables``, ``blocks`` and ``schedule`` objects (no copies).
        """
        return {
            "workflow_id": self.workflow_id,
            "name": self.name,
            "workflow_type": self.workflow_type,
            "variables": self.variables,
            "blocks": self.blocks,
            "schedule": self.schedule,
            "version": self.version,
            "description": self.description,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'WorkflowDefinition':
        """Build a workflow definition from a dictionary.

        Missing keys fall back to the same defaults as the constructor.
        ``variables`` and ``blocks`` are shallow-copied so that later
        changes to the definition do not leak into *data*.

        Args:
            data: Dictionary in the shape produced by :meth:`to_dict`.

        Returns:
            A new ``WorkflowDefinition`` populated from *data*.
        """
        workflow = cls(
            workflow_id=data.get("workflow_id"),
            name=data.get("name", ""),
            workflow_type=data.get("workflow_type", "normal"),
        )
        # ``or`` also covers an explicit null in the payload, which
        # ``.get(key, default)`` would pass through as None and break
        # add_block() / variable updates later on.
        workflow.variables = dict(data.get("variables") or {})
        workflow.blocks = list(data.get("blocks") or [])
        workflow.schedule = data.get("schedule")
        workflow.version = data.get("version", "1.0")
        workflow.description = data.get("description", "")
        # The constructor already stamped "now"; reuse it as the fallback.
        workflow.created_at = data.get("created_at", workflow.created_at)
        workflow.updated_at = data.get("updated_at", workflow.updated_at)
        return workflow

    def to_json(self) -> str:
        """Serialize the definition to a JSON string."""
        return json.dumps(self.to_dict())

    @classmethod
    def from_json(cls, json_str: str) -> 'WorkflowDefinition':
        """Build a workflow definition from a JSON string."""
        return cls.from_dict(json.loads(json_str))
class WorkflowExecutor:
    """Runs the blocks of a workflow definition against the task context.

    Ordinary blocks are created through the component factory and executed.
    The flow-control block types ``if_condition``, ``for_each`` and
    ``while_loop`` are interpreted directly by this class and may contain
    nested block lists.
    """

    def __init__(self):
        """Bind the factory used to create ordinary components."""
        self.component_factory = ComponentFactory

    def execute(self, workflow: "WorkflowDefinition", task_inputs: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Execute a workflow and return the resulting context as a dict.

        The context obtained from ``TaskContext.get_instance()`` is reset,
        a new task id is generated, and the workflow variables are merged
        into the context variables before the blocks run.

        Args:
            workflow: The workflow definition to run.
            task_inputs: Optional inputs handed to ``initialize_task``.

        Returns:
            ``context.to_dict()``. If an exception is raised while the
            blocks run, the task is completed as ``"failed"`` and the
            returned dict carries an additional ``"error"`` message.
        """
        context = TaskContext.get_instance()
        context.reset()

        task_id = f"task_{uuid.uuid4().hex}"
        context.initialize_task(task_id, workflow.workflow_id, task_inputs)
        context.variables.update(workflow.variables)

        logger.info(f"开始执行工作流: {workflow.name} (ID: {workflow.workflow_id}), 任务ID: {task_id}")
        try:
            self._execute_blocks(workflow.blocks)
            context.complete_task("completed")
            logger.info(f"工作流执行完成: {workflow.name} (ID: {workflow.workflow_id}), 任务ID: {task_id}")
            return context.to_dict()
        except Exception as e:
            # Top-level boundary: report the failure in the result instead
            # of propagating it to the caller.
            context.complete_task("failed")
            error_msg = f"工作流执行失败: {str(e)}"
            logger.error(f"{error_msg}, 工作流: {workflow.name} (ID: {workflow.workflow_id}), 任务ID: {task_id}")
            result = context.to_dict()
            result["error"] = error_msg
            return result

    @staticmethod
    def _evaluate_expression(expression: str, context: Any) -> Any:
        """Evaluate a workflow expression against the task context.

        The expression can refer to ``context``, ``blocks``
        (``context.blocks``) and ``variables`` (``context.variables``).
        No builtins are available to it.

        Args:
            expression: Python expression text taken from block params.
            context: Object exposing ``blocks`` and ``variables`` attributes.

        Returns:
            The value the expression evaluates to.
        """
        # SECURITY: eval() with an empty ``__builtins__`` is NOT a sandbox;
        # attribute access on the exposed objects can still reach arbitrary
        # code. Only run workflow definitions from trusted sources.
        #
        # The names live in the *globals* mapping on purpose: names passed
        # as eval locals are invisible inside lambdas and generator
        # expressions (and comprehensions before Python 3.12) nested in the
        # expression.
        namespace = {
            "__builtins__": {},
            "context": context,
            "blocks": context.blocks,
            "variables": context.variables,
        }
        return eval(expression, namespace)

    def _execute_blocks(self, blocks: List[Dict[str, Any]]) -> None:
        """Execute a list of blocks in order.

        Args:
            blocks: Block dictionaries with ``id`` and ``type`` keys and an
                optional ``params`` dict.

        Raises:
            ComponentError: If a block fails and its ``stop_on_error`` flag
                is true (the default), or if a flow-control block fails.
        """
        context = TaskContext.get_instance()
        for block in blocks:
            block_id = block["id"]
            block_type = block["type"]
            params = block.get("params", {})
            logger.debug(f"执行组件: {block_id} ({block_type})")

            # Flow-control blocks are interpreted here, not by the factory.
            if block_type == "if_condition":
                self._execute_if_condition(block_id, params)
            elif block_type == "for_each":
                self._execute_for_each(block_id, params)
            elif block_type == "while_loop":
                self._execute_while_loop(block_id, params)
            else:
                try:
                    component = self.component_factory.create(block_id, block_type, params)
                    component.execute()
                    logger.debug(f"组件 {block_id} ({block_type}) 执行成功")
                except Exception as e:
                    error_msg = f"组件 {block_id} ({block_type}) 执行失败: {str(e)}"
                    logger.error(error_msg)
                    context.set_block_result(block_id, {"error": error_msg})
                    # With stop_on_error disabled the failure is recorded
                    # and execution continues with the next block.
                    if block.get("stop_on_error", True):
                        raise ComponentError(error_msg) from e

    def _execute_if_condition(self, block_id: str, params: Dict[str, Any]) -> None:
        """Evaluate ``params["condition"]`` and run the matching branch.

        The condition value is stored as the block result. ``true_blocks``
        runs when the condition is truthy, ``false_blocks`` when it is
        falsy; a missing branch is skipped.

        Raises:
            ComponentError: If the condition or a nested block fails.
        """
        context = TaskContext.get_instance()
        try:
            condition_value = self._evaluate_expression(params.get("condition", "False"), context)
            context.set_block_result(block_id, {"condition": condition_value})

            if condition_value and "true_blocks" in params:
                self._execute_blocks(params["true_blocks"])
            elif not condition_value and "false_blocks" in params:
                self._execute_blocks(params["false_blocks"])
        except Exception as e:
            error_msg = f"条件组件 {block_id} 执行失败: {str(e)}"
            logger.error(error_msg)
            context.set_block_result(block_id, {"error": error_msg})
            raise ComponentError(error_msg) from e

    def _execute_for_each(self, block_id: str, params: Dict[str, Any]) -> None:
        """Run ``loop_blocks`` once for every item of ``params["items"]``.

        Each iteration sets the context variables ``loop_item`` and
        ``loop_index``. If ``result_var`` is given and that variable exists
        after an iteration, its value is appended to the collected results.
        The block result records the iteration count and the results.

        Raises:
            ComponentError: If the items expression or a nested block fails.
        """
        context = TaskContext.get_instance()
        try:
            items = self._evaluate_expression(params.get("items", "[]"), context)

            results = []
            # Count iterations instead of calling len(items) afterwards:
            # the expression may yield a generator or other unsized iterable.
            count = 0
            for index, item in enumerate(items):
                context.set_variable("loop_item", item)
                context.set_variable("loop_index", index)

                if "loop_blocks" in params:
                    self._execute_blocks(params["loop_blocks"])

                if "result_var" in params:
                    result_var = params["result_var"]
                    if result_var in context.variables:
                        results.append(context.get_variable(result_var))
                count += 1

            context.set_block_result(block_id, {
                "count": count,
                "results": results
            })
        except Exception as e:
            error_msg = f"循环组件 {block_id} 执行失败: {str(e)}"
            logger.error(error_msg)
            context.set_block_result(block_id, {"error": error_msg})
            raise ComponentError(error_msg) from e

    def _execute_while_loop(self, block_id: str, params: Dict[str, Any]) -> None:
        """Run ``loop_blocks`` while ``params["condition"]`` stays truthy.

        The loop ends when the condition is falsy or after
        ``max_iterations`` passes (default 100). Each pass sets the context
        variable ``loop_iteration``; ``result_var`` values are collected as
        in the for-each block. The block result records the number of
        completed iterations and the results.

        Raises:
            ComponentError: If the condition or a nested block fails.
        """
        context = TaskContext.get_instance()
        try:
            max_iterations = params.get("max_iterations", 100)
            # The expression text is fixed; only its value is re-evaluated
            # on every pass.
            condition_expr = params.get("condition", "False")
            iteration = 0
            results = []

            while iteration < max_iterations:
                if not self._evaluate_expression(condition_expr, context):
                    break

                context.set_variable("loop_iteration", iteration)

                if "loop_blocks" in params:
                    self._execute_blocks(params["loop_blocks"])

                if "result_var" in params:
                    result_var = params["result_var"]
                    if result_var in context.variables:
                        results.append(context.get_variable(result_var))
                iteration += 1

            context.set_block_result(block_id, {
                "iterations": iteration,
                "results": results
            })
        except Exception as e:
            error_msg = f"While循环组件 {block_id} 执行失败: {str(e)}"
            logger.error(error_msg)
            context.set_block_result(block_id, {"error": error_msg})
            raise ComponentError(error_msg) from e