148 lines
5.3 KiB
Python
148 lines
5.3 KiB
Python
# services/workflow_service.py
|
||
"""
|
||
Workflow service.
|
||
"""
|
||
from typing import Dict, Any, List, Optional
|
||
import json
|
||
import time
|
||
import uuid
|
||
from core.workflow import WorkflowDefinition, WorkflowExecutor
|
||
from core.exceptions import WorkflowError
|
||
from utils.logger import get_logger
|
||
|
||
# Obtain the logger for this module
|
||
logger = get_logger(__name__)
|
||
|
||
class WorkflowService:
    """Workflow service: CRUD operations on workflow definitions, plus execution.

    Definitions are kept in an in-memory dict keyed by workflow ID. A real
    implementation is expected to persist them in a database instead.
    """

    def __init__(self):
        """Create the workflow executor and an empty in-memory workflow store."""
        self.workflow_executor = WorkflowExecutor()
        # A real implementation should store workflow definitions in a
        # database; this simplified version keeps them in memory.
        self._workflows: Dict[str, WorkflowDefinition] = {}

    def create_workflow(self, name: str, workflow_type: str = "normal",
                        description: str = "",
                        blocks: Optional[List[Dict[str, Any]]] = None,
                        variables: Optional[Dict[str, Any]] = None,
                        schedule: Optional[Dict[str, Any]] = None) -> WorkflowDefinition:
        """Create a workflow, register it in the store and return it.

        Args:
            name: Name of the workflow.
            workflow_type: Workflow type; a schedule is only applied when this
                is ``"scheduled"``.
            description: Description assigned to the workflow.
            blocks: Block definitions; assigned only when non-empty.
            variables: Workflow variables; assigned only when non-empty.
            schedule: Schedule configuration; applied only when non-empty and
                ``workflow_type == "scheduled"``.

        Returns:
            The newly created ``WorkflowDefinition``.
        """
        # IDs have the form "wf_<32 hex chars>" derived from a random UUID4.
        workflow_id = f"wf_{uuid.uuid4().hex}"

        workflow = WorkflowDefinition(workflow_id, name, workflow_type)
        workflow.description = description

        # Falsy values (None or empty containers) leave whatever the
        # WorkflowDefinition constructor set up untouched.
        if blocks:
            workflow.blocks = blocks

        if variables:
            workflow.variables = variables

        if schedule and workflow_type == "scheduled":
            workflow.set_schedule(schedule)

        # Save the workflow.
        self._workflows[workflow_id] = workflow

        logger.info(f"创建工作流: {name} (ID: {workflow_id})")

        return workflow

    def update_workflow(self, workflow_id: str, name: Optional[str] = None,
                        description: Optional[str] = None,
                        blocks: Optional[List[Dict[str, Any]]] = None,
                        variables: Optional[Dict[str, Any]] = None,
                        schedule: Optional[Dict[str, Any]] = None) -> Optional[WorkflowDefinition]:
        """Update the given fields of an existing workflow.

        Only arguments that are not ``None`` are applied, so an empty string,
        list or dict explicitly clears the corresponding field.

        Args:
            workflow_id: ID of the workflow to update.
            name: New name, or ``None`` to keep the current one.
            description: New description, or ``None`` to keep the current one.
            blocks: New block list, or ``None`` to keep the current one.
            variables: New variables, or ``None`` to keep the current ones.
            schedule: New schedule, or ``None`` to keep the current one.

        Returns:
            The updated ``WorkflowDefinition``, or ``None`` (after logging an
            error) when no workflow with ``workflow_id`` exists.
        """
        workflow = self._workflows.get(workflow_id)
        if workflow is None:
            logger.error(f"更新工作流失败: 找不到工作流 {workflow_id}")
            return None

        if name is not None:
            workflow.name = name

        if description is not None:
            workflow.description = description

        if blocks is not None:
            workflow.blocks = blocks

        if variables is not None:
            workflow.variables = variables

        # NOTE(review): unlike create_workflow, the schedule is applied here
        # without checking that the workflow type is "scheduled" -- confirm
        # whether that asymmetry is intended.
        if schedule is not None:
            workflow.set_schedule(schedule)

        # Refresh the modification timestamp (milliseconds since the epoch).
        workflow.updated_at = int(time.time() * 1000)

        # Save the updated workflow. With the in-memory store this rebinds the
        # same object; it marks where a persistent store would write back.
        self._workflows[workflow_id] = workflow

        logger.info(f"更新工作流: {workflow.name} (ID: {workflow_id})")

        return workflow

    def delete_workflow(self, workflow_id: str) -> bool:
        """Delete a workflow from the store.

        Returns:
            ``True`` when the workflow existed and was removed; ``False``
            (after logging an error) when no such workflow exists.
        """
        if workflow_id not in self._workflows:
            logger.error(f"删除工作流失败: 找不到工作流 {workflow_id}")
            return False

        # Delete the workflow.
        del self._workflows[workflow_id]

        logger.info(f"删除工作流: {workflow_id}")

        return True

    def get_workflow_by_id(self, workflow_id: str) -> Optional[WorkflowDefinition]:
        """Return the workflow with the given ID, or ``None`` if it is unknown."""
        return self._workflows.get(workflow_id)

    def get_all_workflows(self, workflow_type: Optional[str] = None) -> List[WorkflowDefinition]:
        """Return all stored workflows, optionally filtered by type.

        Args:
            workflow_type: When truthy, only workflows whose ``workflow_type``
                equals this value are returned; ``None`` or an empty string
                returns every workflow.

        Returns:
            A new list of the matching ``WorkflowDefinition`` objects.
        """
        if workflow_type:
            return [wf for wf in self._workflows.values()
                    if wf.workflow_type == workflow_type]
        return list(self._workflows.values())

    def execute_workflow(self, workflow: WorkflowDefinition,
                         task_inputs: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Execute a workflow through the workflow executor.

        Args:
            workflow: The workflow definition to run.
            task_inputs: Inputs passed through unchanged to the executor
                (``None`` when omitted).

        Returns:
            Whatever ``WorkflowExecutor.execute`` returns for this run.

        Raises:
            WorkflowError: When execution raises any exception; the original
                exception is chained as ``__cause__``.
        """
        try:
            # Execute the workflow.
            result = self.workflow_executor.execute(workflow, task_inputs)

            # Record the execution result: a real implementation should save
            # it to a database here.

            return result
        except Exception as e:
            logger.error(f"执行工作流失败: {workflow.name} (ID: {workflow.workflow_id}), 错误: {str(e)}")
            # Chain the original exception so its traceback is not lost.
            raise WorkflowError(f"执行工作流失败: {str(e)}") from e

    def import_workflow(self, workflow_json: str) -> WorkflowDefinition:
        """Import a workflow from its JSON representation and store it.

        Args:
            workflow_json: JSON text handed to ``WorkflowDefinition.from_json``.

        Returns:
            The imported ``WorkflowDefinition``.

        Raises:
            WorkflowError: When parsing or storing fails; the original
                exception is chained as ``__cause__``.
        """
        try:
            workflow = WorkflowDefinition.from_json(workflow_json)

            # Save the imported workflow.
            # NOTE(review): an existing workflow with the same ID is replaced
            # without any warning -- confirm that this is the intended policy.
            self._workflows[workflow.workflow_id] = workflow

            logger.info(f"导入工作流: {workflow.name} (ID: {workflow.workflow_id})")

            return workflow
        except Exception as e:
            logger.error(f"导入工作流失败: {str(e)}")
            # Chain the original exception so its traceback is not lost.
            raise WorkflowError(f"导入工作流失败: {str(e)}") from e

    def export_workflow(self, workflow_id: str) -> str:
        """Export a stored workflow via its ``to_json`` method.

        Args:
            workflow_id: ID of the workflow to export.

        Returns:
            The value returned by ``workflow.to_json()``.

        Raises:
            WorkflowError: When no workflow with ``workflow_id`` exists.
        """
        workflow = self.get_workflow_by_id(workflow_id)

        # Test for absence explicitly, not via truthiness, so a stored
        # definition is never mistaken for a missing one.
        if workflow is None:
            logger.error(f"导出工作流失败: 找不到工作流 {workflow_id}")
            raise WorkflowError(f"导出工作流失败: 找不到工作流 {workflow_id}")

        return workflow.to_json()