2025-03-17 14:58:05 +08:00

222 lines
6.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# components/base_components.py
"""
基础组件
"""
from typing import Dict, Any, Optional
import time
import hashlib
import json
import uuid
from core.component import Component, ComponentFactory
from core.exceptions import ComponentError
@ComponentFactory.register("check_task_instance_id_exists")
class CheckTaskInstanceIdExistsComponent(Component):
"""校验任务实例Id是否存在组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
task_instance_id = self.resolve_param("task_instance_id")
# 验证必要参数
self.validate_required_params(["task_instance_id"])
# 实际实现中这里应该查询数据库验证任务ID是否存在
# 这里简化为检查是否与当前任务ID相同
exists = task_instance_id == self.context.task_id
result = {"exists": exists}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("create_unique_id")
class CreateUniqueIdComponent(Component):
"""创建唯一ID组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
prefix = self.resolve_param("prefix", "")
# 生成唯一ID
unique_id = f"{prefix}{uuid.uuid4().hex}"
result = {"id": unique_id}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("current_timestamp")
class CurrentTimestampComponent(Component):
"""当前时间戳组件"""
def execute(self) -> Dict[str, Any]:
# 获取当前时间戳(毫秒级)
timestamp = int(time.time() * 1000)
result = {"timestamp": timestamp}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("current_time")
class CurrentTimeComponent(Component):
"""当前时间组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
format_str = self.resolve_param("format", "%Y-%m-%d %H:%M:%S")
# 获取格式化的当前时间
current_time = time.strftime(format_str, time.localtime())
result = {"time": current_time}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("execute_sql")
class ExecuteSqlComponent(Component):
"""执行SQL组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
sql = self.resolve_param("sql")
params = self.resolve_param("params", [])
# 验证必要参数
self.validate_required_params(["sql"])
# 实际实现中这里应该执行SQL语句
# 这里简化为返回模拟结果
result = {
"affected_rows": 1,
"success": True
}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("query_sql")
class QuerySqlComponent(Component):
"""查询SQL组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
sql = self.resolve_param("sql")
params = self.resolve_param("params", [])
# 验证必要参数
self.validate_required_params(["sql"])
# 实际实现中这里应该执行SQL查询
# 这里简化为返回模拟结果
result = {
"result": [{"id": 1, "name": "示例数据"}],
"count": 1
}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("string_md5_encrypt")
class StringMd5EncryptComponent(Component):
"""字符串MD5加密组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
text = self.resolve_param("text")
uppercase = self.resolve_param("uppercase", False)
salt = self.resolve_param("salt", "")
# 验证必要参数
self.validate_required_params(["text"])
# 执行MD5加密
md5_hash = hashlib.md5((text + salt).encode()).hexdigest()
if uppercase:
md5_hash = md5_hash.upper()
result = {"md5": md5_hash}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("string_to_json_array")
class StringToJsonArrayComponent(Component):
"""将字符串转换成JSON数组组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
text = self.resolve_param("text")
default = self.resolve_param("default", [])
# 验证必要参数
self.validate_required_params(["text"])
# 执行转换
try:
json_array = json.loads(text)
if not isinstance(json_array, list):
json_array = default
except Exception:
json_array = default
result = {"jsonArray": json_array}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("string_to_json_object")
class StringToJsonObjectComponent(Component):
"""将字符串转换成JSON对象组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
text = self.resolve_param("text")
default = self.resolve_param("default", {})
# 验证必要参数
self.validate_required_params(["text"])
# 执行转换
try:
json_object = json.loads(text)
if not isinstance(json_object, dict):
json_object = default
except Exception:
json_object = default
result = {"jsonObject": json_object}
# 存储结果
self.store_result(result)
return result
@ComponentFactory.register("print")
class PrintComponent(Component):
"""打印组件"""
def execute(self) -> Dict[str, Any]:
# 获取参数
content = self.resolve_param("content")
# 验证必要参数
self.validate_required_params(["content"])
# 执行打印
print(content)
result = {"printed": True, "content": content}
# 存储结果
self.store_result(result)
return result