2025-03-17 14:58:05 +08:00

262 lines
9.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Task flow model module.
Contains the data models for the nodes and connections of a task flow graph.
"""
import enum
from sqlalchemy import Column, Integer, String, Float, Text, Boolean, Enum, ForeignKey, JSON, UniqueConstraint
from sqlalchemy.orm import relationship
from data.models.base import BaseModel
class NodeType(enum.Enum):
    """Closed set of node kinds that can appear in a task flow graph.

    Each member's value is the lowercase string form of its name.
    """

    # Node backed by a component.
    COMPONENT = "component"
    # Entry point of the flow.
    START = "start"
    # Exit point of the flow.
    END = "end"
    # Conditional (branching) node.
    CONDITION = "condition"
    # Loop node.
    LOOP = "loop"
    # Parallel-execution node.
    PARALLEL = "parallel"
    # Sub-process node.
    SUBPROCESS = "subprocess"
    # User-defined node.
    CUSTOM = "custom"
class TaskFlowNode(BaseModel):
    """
    Task flow node model.

    Represents a single node in a task's flow graph. Every node belongs to
    one task and one task version, and may optionally reference a component
    type.
    """

    __tablename__ = 'task_flow_nodes'

    # Owning task and task version.
    task_id = Column(Integer, ForeignKey('tasks.id'), nullable=False, comment='任务ID')
    version_id = Column(Integer, ForeignKey('task_versions.id'), nullable=False, comment='任务版本ID')
    # Kind of node and (optionally) the component type behind it.
    node_type = Column(Enum(NodeType), nullable=False, comment='节点类型')
    component_type_id = Column(Integer, ForeignKey('component_types.id'), nullable=True, comment='组件类型ID')
    # Human-readable identification.
    name = Column(String(100), nullable=False, comment='节点名称')
    code = Column(String(100), nullable=True, comment='节点代码')
    description = Column(String(500), nullable=True, comment='节点描述')
    # Geometry of the node on the flow canvas.
    position_x = Column(Float, nullable=False, default=0, comment='节点X坐标')
    position_y = Column(Float, nullable=False, default=0, comment='节点Y坐标')
    width = Column(Float, nullable=False, default=200, comment='节点宽度')
    height = Column(Float, nullable=False, default=100, comment='节点高度')
    # Free-form JSON configuration and the disabled flag.
    config = Column(JSON, nullable=True, comment='节点配置')
    is_disabled = Column(Boolean, default=False, comment='是否禁用')

    # Relationships
    task = relationship('Task', back_populates='flow_nodes')
    version = relationship('TaskVersion', back_populates='flow_nodes')
    component_type = relationship('ComponentType')
    # Connections are deleted together with the node (delete-orphan cascade).
    outgoing_connections = relationship(
        'TaskFlowConnection',
        foreign_keys='TaskFlowConnection.source_node_id',
        back_populates='source_node',
        cascade='all, delete-orphan',
    )
    incoming_connections = relationship(
        'TaskFlowConnection',
        foreign_keys='TaskFlowConnection.target_node_id',
        back_populates='target_node',
        cascade='all, delete-orphan',
    )
    parameter_values = relationship(
        'ComponentParameterValue',
        back_populates='component_instance',
        cascade='all, delete-orphan',
    )

    def __repr__(self):
        return f"<TaskFlowNode(id={self.id}, name='{self.name}', task_id={self.task_id}, version_id={self.version_id})>"

    @staticmethod
    def _commit_or_rollback(session):
        """
        Commit ``session``; on failure roll it back and re-raise.

        Without the rollback a failed commit leaves the session in a failed
        transaction state, so later, unrelated operations on the same
        session would fail too.
        """
        try:
            session.commit()
        except Exception:
            session.rollback()
            raise

    @classmethod
    def get_by_task_version(cls, task_id, version_id):
        """
        Return all non-deleted nodes of the given task version.

        Args:
            task_id: ID of the task.
            version_id: ID of the task version.

        Returns:
            list: Matching ``TaskFlowNode`` rows.
        """
        return cls.query.filter(
            cls.task_id == task_id,
            cls.version_id == version_id,
            # ``==`` is intentional: it builds a SQL expression, not a Python bool.
            cls.is_deleted == False,  # noqa: E712
        ).all()

    @classmethod
    def create_node(cls, task_id, version_id, node_type, name, component_type_id=None,
                    code=None, description=None, position_x=0, position_y=0,
                    width=200, height=100, config=None, is_disabled=False):
        """
        Create a node, add it to the session and commit.

        The arguments map one-to-one onto the columns of the same name.

        Returns:
            TaskFlowNode: The newly created node.

        Raises:
            Exception: Whatever the commit raises; the session is rolled
                back before the exception propagates.
        """
        # Imported locally, as in the rest of this module (presumably to
        # avoid an import cycle with the database configuration).
        from config.database import db_session

        node = cls(
            task_id=task_id,
            version_id=version_id,
            node_type=node_type,
            component_type_id=component_type_id,
            name=name,
            code=code,
            description=description,
            position_x=position_x,
            position_y=position_y,
            width=width,
            height=height,
            config=config,
            is_disabled=is_disabled,
        )
        db_session.add(node)
        cls._commit_or_rollback(db_session)
        return node

    def update_position(self, position_x, position_y, width=None, height=None):
        """
        Update the node's position and, optionally, its size, then commit.

        Args:
            position_x: New X coordinate.
            position_y: New Y coordinate.
            width: New width; left unchanged when ``None``.
            height: New height; left unchanged when ``None``.

        Returns:
            TaskFlowNode: ``self``, to allow chaining.
        """
        from config.database import db_session

        self.position_x = position_x
        self.position_y = position_y
        if width is not None:
            self.width = width
        if height is not None:
            self.height = height
        self._commit_or_rollback(db_session)
        return self

    def update_config(self, config):
        """
        Replace the node's configuration and commit.

        Args:
            config: New value for the JSON ``config`` column.

        Returns:
            TaskFlowNode: ``self``, to allow chaining.
        """
        from config.database import db_session

        self.config = config
        self._commit_or_rollback(db_session)
        return self

    def get_parameters(self):
        """
        Return all parameter values of this node.

        Delegates to ``ComponentParameterValue.get_by_component_instance``
        with this node's ID.

        Returns:
            Whatever the delegate returns; per the original documentation a
            dict mapping parameter code to parameter value (not verifiable
            from this module).
        """
        from data.models.component_parameter import ComponentParameterValue

        return ComponentParameterValue.get_by_component_instance(self.id)

    def set_parameter(self, parameter_code, value, value_format='simple', is_expression=False, expression=None):
        """
        Set a parameter value on this node.

        Args:
            parameter_code (str): Parameter code.
            value (any): Parameter value.
            value_format (str, optional): One of ``'simple'``, ``'json'`` or
                ``'expression'``. Any other value is treated as ``'simple'``.
            is_expression (bool, optional): Whether the value is an expression.
            expression (str, optional): Expression content.

        Returns:
            The result of ``ComponentParameterValue.set_parameter_value``;
            per the original documentation the parameter value object.
        """
        from data.models.component_parameter import ComponentParameterValue, ParameterValueFormat

        # Map the string format onto the enum; unknown formats deliberately
        # fall back to SIMPLE rather than raising.
        if value_format == 'json':
            format_enum = ParameterValueFormat.JSON
        elif value_format == 'expression':
            format_enum = ParameterValueFormat.EXPRESSION
        else:
            format_enum = ParameterValueFormat.SIMPLE

        return ComponentParameterValue.set_parameter_value(
            self.id, parameter_code, value, format_enum, is_expression, expression
        )

    def delete_parameter(self, parameter_code):
        """
        Delete a parameter value of this node.

        Args:
            parameter_code (str): Parameter code.

        Returns:
            The result of ``ComponentParameterValue.delete_parameter_value``;
            per the original documentation a bool indicating success.
        """
        from data.models.component_parameter import ComponentParameterValue

        return ComponentParameterValue.delete_parameter_value(self.id, parameter_code)
class ConnectionType(enum.Enum):
    """Closed set of connection kinds between task flow nodes.

    Each member's value is the lowercase string form of its name.
    """

    # Ordinary connection.
    NORMAL = "normal"
    # Path taken on success.
    SUCCESS = "success"
    # Path taken on failure.
    FAILURE = "failure"
    # Path taken when a condition is true.
    TRUE = "true"
    # Path taken when a condition is false.
    FALSE = "false"
    # User-defined path.
    CUSTOM = "custom"
class TaskFlowConnection(BaseModel):
    """
    Task flow connection model.

    Represents a directed connection between two nodes of a task's flow
    graph, from a source node to a target node.
    """

    __tablename__ = 'task_flow_connections'

    # Owning task and task version.
    task_id = Column(Integer, ForeignKey('tasks.id'), nullable=False, comment='任务ID')
    version_id = Column(Integer, ForeignKey('task_versions.id'), nullable=False, comment='任务版本ID')
    # Endpoints of the connection; both reference task_flow_nodes.
    source_node_id = Column(Integer, ForeignKey('task_flow_nodes.id'), nullable=False, comment='源节点ID')
    target_node_id = Column(Integer, ForeignKey('task_flow_nodes.id'), nullable=False, comment='目标节点ID')
    # Optional label, condition text and JSON configuration.
    label = Column(String(100), nullable=True, comment='连接标签')
    condition = Column(String(500), nullable=True, comment='连接条件')
    config = Column(JSON, nullable=True, comment='连接配置')
    # Whether this is the default connection.
    is_default = Column(Boolean, default=False, comment='是否为默认连接')

    # Relationships
    task = relationship('Task')
    version = relationship('TaskVersion')
    # Two foreign keys point at the same table, so each relationship must
    # name the column it uses.
    source_node = relationship('TaskFlowNode', foreign_keys=[source_node_id], back_populates='outgoing_connections')
    target_node = relationship('TaskFlowNode', foreign_keys=[target_node_id], back_populates='incoming_connections')

    def __repr__(self):
        return f"<TaskFlowConnection(id={self.id}, source_node_id={self.source_node_id}, target_node_id={self.target_node_id})>"

    @staticmethod
    def _commit_or_rollback(session):
        """
        Commit ``session``; on failure roll it back and re-raise.

        Without the rollback a failed commit leaves the session in a failed
        transaction state, so later, unrelated operations on the same
        session would fail too.
        """
        try:
            session.commit()
        except Exception:
            session.rollback()
            raise

    @classmethod
    def get_by_task_version(cls, task_id, version_id):
        """
        Return all non-deleted connections of the given task version.

        Args:
            task_id: ID of the task.
            version_id: ID of the task version.

        Returns:
            list: Matching ``TaskFlowConnection`` rows.
        """
        return cls.query.filter(
            cls.task_id == task_id,
            cls.version_id == version_id,
            # ``==`` is intentional: it builds a SQL expression, not a Python bool.
            cls.is_deleted == False,  # noqa: E712
        ).all()

    @classmethod
    def create_connection(cls, task_id, version_id, source_node_id, target_node_id,
                          label=None, condition=None, config=None, is_default=False):
        """
        Create a connection, add it to the session and commit.

        The arguments map one-to-one onto the columns of the same name.

        Returns:
            TaskFlowConnection: The newly created connection.

        Raises:
            Exception: Whatever the commit raises; the session is rolled
                back before the exception propagates.
        """
        # Imported locally, as in the rest of this module (presumably to
        # avoid an import cycle with the database configuration).
        from config.database import db_session

        connection = cls(
            task_id=task_id,
            version_id=version_id,
            source_node_id=source_node_id,
            target_node_id=target_node_id,
            label=label,
            condition=condition,
            config=config,
            is_default=is_default,
        )
        db_session.add(connection)
        cls._commit_or_rollback(db_session)
        return connection

    def update_condition(self, condition, is_default=None):
        """
        Update the connection's condition and commit.

        Args:
            condition: New condition text.
            is_default: New default flag; left unchanged when ``None``.

        Returns:
            TaskFlowConnection: ``self``, to allow chaining.
        """
        from config.database import db_session

        self.condition = condition
        if is_default is not None:
            self.is_default = is_default
        self._commit_or_rollback(db_session)
        return self