tianfeng_task_modules/services/workflow_service.py
2025-03-17 14:58:05 +08:00

148 lines
5.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# services/workflow_service.py
"""
Workflow service.
"""
from typing import Dict, Any, List, Optional
import json
import time
import uuid
from core.workflow import WorkflowDefinition, WorkflowExecutor
from core.exceptions import WorkflowError
from utils.logger import get_logger
# Obtain the logger for this module
logger = get_logger(__name__)
class WorkflowService:
    """Workflow service: CRUD operations on workflows plus their execution."""

    def __init__(self):
        self.workflow_executor = WorkflowExecutor()
        # A real implementation should persist workflow definitions in a
        # database; this simplified version keeps them in an in-memory dict
        # keyed by workflow ID.
        self._workflows: Dict[str, WorkflowDefinition] = {}

    def create_workflow(
        self,
        name: str,
        workflow_type: str = "normal",
        description: str = "",
        blocks: Optional[List[Dict[str, Any]]] = None,
        variables: Optional[Dict[str, Any]] = None,
        schedule: Optional[Dict[str, Any]] = None,
    ) -> WorkflowDefinition:
        """Create a workflow, store it, and return it.

        Args:
            name: Workflow name.
            workflow_type: Workflow type; defaults to ``"normal"``.
            description: Description assigned to the workflow.
            blocks: Assigned to ``workflow.blocks`` only when non-empty.
            variables: Assigned to ``workflow.variables`` only when non-empty.
            schedule: Passed to ``workflow.set_schedule`` only when it is
                non-empty AND ``workflow_type`` is ``"scheduled"``.

        Returns:
            The newly created workflow definition.
        """
        # IDs have the form "wf_" followed by 32 hex characters of a UUID4.
        workflow_id = f"wf_{uuid.uuid4().hex}"
        workflow = WorkflowDefinition(workflow_id, name, workflow_type)
        workflow.description = description
        if blocks:
            workflow.blocks = blocks
        if variables:
            workflow.variables = variables
        if schedule and workflow_type == "scheduled":
            workflow.set_schedule(schedule)

        # Store the workflow
        self._workflows[workflow_id] = workflow
        logger.info(f"创建工作流: {name} (ID: {workflow_id})")
        return workflow

    def update_workflow(
        self,
        workflow_id: str,
        name: Optional[str] = None,
        description: Optional[str] = None,
        blocks: Optional[List[Dict[str, Any]]] = None,
        variables: Optional[Dict[str, Any]] = None,
        schedule: Optional[Dict[str, Any]] = None,
    ) -> Optional[WorkflowDefinition]:
        """Update the given fields of a stored workflow.

        Only arguments that are not ``None`` are applied, so an empty string,
        list, or dict overwrites the current value.

        Args:
            workflow_id: ID of the workflow to update.
            name: New name, if given.
            description: New description, if given.
            blocks: New block list, if given.
            variables: New variables, if given.
            schedule: New schedule, if given; passed to ``set_schedule``
                without checking the workflow type.

        Returns:
            The updated workflow, or ``None`` when ``workflow_id`` is unknown
            (an error is logged in that case).
        """
        workflow = self._workflows.get(workflow_id)
        if workflow is None:
            logger.error(f"更新工作流失败: 找不到工作流 {workflow_id}")
            return None

        if name is not None:
            workflow.name = name
        if description is not None:
            workflow.description = description
        if blocks is not None:
            workflow.blocks = blocks
        if variables is not None:
            workflow.variables = variables
        if schedule is not None:
            workflow.set_schedule(schedule)

        # Refresh the modification timestamp (milliseconds since the epoch)
        workflow.updated_at = int(time.time() * 1000)

        # Store the updated workflow. With the in-memory dict this re-assigns
        # the same object; it marks where a real store would be written.
        self._workflows[workflow_id] = workflow
        logger.info(f"更新工作流: {workflow.name} (ID: {workflow_id})")
        return workflow

    def delete_workflow(self, workflow_id: str) -> bool:
        """Delete a workflow.

        Returns:
            ``True`` when the workflow existed and was removed, ``False``
            (after logging an error) when ``workflow_id`` is unknown.
        """
        try:
            del self._workflows[workflow_id]
        except KeyError:
            logger.error(f"删除工作流失败: 找不到工作流 {workflow_id}")
            return False
        logger.info(f"删除工作流: {workflow_id}")
        return True

    def get_workflow_by_id(self, workflow_id: str) -> Optional[WorkflowDefinition]:
        """Return the workflow stored under ``workflow_id``, or ``None``."""
        return self._workflows.get(workflow_id)

    def get_all_workflows(self, workflow_type: Optional[str] = None) -> List[WorkflowDefinition]:
        """Return all stored workflows, optionally filtered by type.

        Args:
            workflow_type: When truthy, only workflows whose ``workflow_type``
                equals this value are returned. ``None`` or an empty string
                disables filtering.
        """
        if workflow_type:
            return [wf for wf in self._workflows.values() if wf.workflow_type == workflow_type]
        return list(self._workflows.values())

    def execute_workflow(
        self,
        workflow: WorkflowDefinition,
        task_inputs: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Execute a workflow through the workflow executor.

        Args:
            workflow: The workflow definition to execute.
            task_inputs: Forwarded unchanged to the executor (``None`` when
                omitted).

        Returns:
            Whatever ``WorkflowExecutor.execute`` returns.

        Raises:
            WorkflowError: If the executor raises any ``Exception``; the
                original exception is attached as ``__cause__``.
        """
        try:
            # A real implementation should also persist the execution result
            # to a database here.
            return self.workflow_executor.execute(workflow, task_inputs)
        except Exception as e:
            logger.error(f"执行工作流失败: {workflow.name} (ID: {workflow.workflow_id}), 错误: {str(e)}")
            # Chain explicitly so the root cause stays attached to the error.
            raise WorkflowError(f"执行工作流失败: {str(e)}") from e

    def import_workflow(self, workflow_json: str) -> WorkflowDefinition:
        """Import a workflow from its JSON representation and store it.

        The workflow is stored under its own ``workflow_id``; an existing
        entry with the same ID is replaced.

        Raises:
            WorkflowError: If parsing or storing raises any ``Exception``;
                the original exception is attached as ``__cause__``.
        """
        try:
            workflow = WorkflowDefinition.from_json(workflow_json)
            # Store the imported workflow
            self._workflows[workflow.workflow_id] = workflow
            logger.info(f"导入工作流: {workflow.name} (ID: {workflow.workflow_id})")
            return workflow
        except Exception as e:
            logger.error(f"导入工作流失败: {str(e)}")
            raise WorkflowError(f"导入工作流失败: {str(e)}") from e

    def export_workflow(self, workflow_id: str) -> str:
        """Export a stored workflow via its ``to_json`` method.

        Raises:
            WorkflowError: If no workflow is stored under ``workflow_id``.
        """
        workflow = self.get_workflow_by_id(workflow_id)
        # Compare against None explicitly: a stored workflow must not be
        # reported as missing just because the object evaluates as falsy.
        if workflow is None:
            logger.error(f"导出工作流失败: 找不到工作流 {workflow_id}")
            raise WorkflowError(f"导出工作流失败: 找不到工作流 {workflow_id}")
        return workflow.to_json()