#!/usr/bin/env python # -*- coding: utf-8 -*- """ 内置函数管理模块 为任务编辑器提供系统内置函数信息 支持异步函数调用 """ import asyncio from typing import Dict, List, Any, Callable, Awaitable, Union, Optional from utils.string_utils import ( is_blank, is_not_blank, is_any_blank, is_all_blank, is_numeric, is_alpha, starts_with, ends_with, contains, split, chop, chomp, index_of, difference, replace, remove, right, left, length, reverse ) # 函数类型定义 AsyncFunc = Callable[..., Awaitable[Any]] # 注册所有内置函数及其元数据 BUILT_IN_FUNCTIONS = [ { "id": "isBlank()", "name": "是否为空", "description": "检查字符串是否为null、空字符串或只包含空白字符", "syntax": "isBlank(str)", "example": "isBlank(' ') // 返回 true", "category": "string", "func": is_blank }, { "id": "isNotBlank()", "name": "是否不为空", "description": "检查字符串是否不为空,与isBlank()相反", "syntax": "isNotBlank(str)", "example": "isNotBlank('hello') // 返回 true", "category": "string", "func": is_not_blank }, { "id": "isAnyBlank()", "name": "是否任意一个为空", "description": "检查多个字符串中是否有任意一个为空", "syntax": "isAnyBlank(str1, str2, ...)", "example": "isAnyBlank('hello', '') // 返回 true", "category": "string", "func": is_any_blank }, { "id": "isAllBlank()", "name": "是否全部为空", "description": "检查一个或多个字符串是否全部为空", "syntax": "isAllBlank(str1, str2, ...)", "example": "isAllBlank('', ' ') // 返回 true", "category": "string", "func": is_all_blank }, { "id": "isNumeric()", "name": "是否为数字", "description": "检查字符串是否为数字", "syntax": "isNumeric(str)", "example": "isNumeric('123') // 返回 true", "category": "string", "func": is_numeric }, { "id": "isAlpha()", "name": "是否为字母", "description": "检查字符串是否只包含字母", "syntax": "isAlpha(str)", "example": "isAlpha('abc') // 返回 true", "category": "string", "func": is_alpha }, { "id": "startsWith()", "name": "以某字符开头", "description": "检查字符串是否以某个字符开头", "syntax": "startsWith(str, prefix)", "example": "startsWith('hello', 'he') // 返回 true", "category": "string", "func": starts_with }, { "id": "endsWith()", "name": "以某字符结尾", "description": "判断字符串是否以某个字符结尾", "syntax": "endsWith(str, suffix)", "example": "endsWith('hello', 'lo') // 返回 true", "category": "string", "func": ends_with }, { "id": "contains()", "name": "包含字符", "description": "判断字符串是否包含某个字符", "syntax": "contains(str, substr)", "example": "contains('hello', 'll') // 返回 true", "category": "string", "func": contains }, { "id": "split()", "name": "拆分字符串", "description": "将字符串按照某个字符进行拆分,返回字符串数组", "syntax": "split(str, separator)", "example": "split('a,b,c', ',') // 返回 ['a', 'b', 'c']", "category": "string", "func": split }, { "id": "chop()", "name": "去掉最后一个字符", "description": "将字符串的最后一个字符去掉,返回去掉之后的字符串", "syntax": "chop(str)", "example": "chop('hello') // 返回 'hell'", "category": "string", "func": chop }, { "id": "chomp()", "name": "去除尾部子字符串", "description": "切除字符串最后面的换行符等", "syntax": "chomp(str)", "example": "chomp('hello\\n') // 返回 'hello'", "category": "string", "func": chomp }, { "id": "indexOf()", "name": "查找索引位置", "description": "获取某个字符在字符串中的索引位置", "syntax": "indexOf(str, substr)", "example": "indexOf('hello', 'l') // 返回 2", "category": "string", "func": index_of }, { "id": "difference()", "name": "获取差异部分", "description": "获取第二个字符串在第一个字符串中不一样的部分", "syntax": "difference(str1, str2)", "example": "difference('abc', 'abd') // 返回 'd'", "category": "string", "func": difference }, { "id": "replace", "name": "替换字符", "description": "将字符串中的某些字符替换成另外的字符", "syntax": "replace(str, old, new)", "example": "replace('hello', 'l', 'x') // 返回 'hexxo'", "category": "string", "func": replace }, { "id": "remove()", "name": "删除字符", "description": "删除字符串的某一部分", "syntax": "remove(str, substr)", "example": "remove('hello', 'll') // 返回 'heo'", "category": "string", "func": remove }, { "id": "right()", "name": "截取右侧字符", "description": "从字符串右边截取指定长度的子字符串", "syntax": "right(str, length)", "example": "right('hello', 2) // 返回 'lo'", "category": "string", "func": right }, { "id": "left()", "name": "截取左侧字符", "description": "截取字符串左侧指定长度的字符串", "syntax": "left(str, length)", "example": "left('hello', 2) // 返回 'he'", "category": "string", "func": left }, { "id": "length()", "name": "获取字符串长度", "description": "获得字符串长度", "syntax": "length(str)", "example": "length('hello') // 返回 5", "category": "string", "func": length }, { "id": "reverse()", "name": "反转字符串", "description": "反转字符串", "syntax": "reverse(str)", "example": "reverse('hello') // 返回 'olleh'", "category": "string", "func": reverse } ] # 创建函数字典以便根据ID快速查找 FUNCTION_MAP = {func["id"]: func for func in BUILT_IN_FUNCTIONS} def get_function_list() -> List[Dict[str, Any]]: """ 获取所有内置函数的元数据列表 Returns: List[Dict[str, Any]]: 函数列表,不包含func实现 """ return [{k: v for k, v in func.items() if k != 'func'} for func in BUILT_IN_FUNCTIONS] def get_function_by_id(function_id: str) -> Optional[Dict[str, Any]]: """ 根据ID获取指定函数的元数据 Args: function_id: 函数ID Returns: Optional[Dict[str, Any]]: 函数元数据,如果不存在则返回None """ return FUNCTION_MAP.get(function_id) async def call_function(function_id: str, *args, **kwargs) -> Any: """ 异步调用指定ID的函数 Args: function_id: 函数ID *args: 位置参数 **kwargs: 关键字参数 Returns: Any: 调用函数的结果 Raises: ValueError: 如果函数不存在 """ func_data = FUNCTION_MAP.get(function_id) if not func_data: raise ValueError(f"内置函数 {function_id} 不存在") # 获取异步函数并调用 async_func: AsyncFunc = func_data["func"] return await async_func(*args, **kwargs) def call_function_sync(function_id: str, *args, **kwargs) -> Any: """ 同步调用指定ID的函数(在内部使用事件循环执行异步函数) 适用于非异步环境中调用异步函数 Args: function_id: 函数ID *args: 位置参数 **kwargs: 关键字参数 Returns: Any: 调用函数的结果 Raises: ValueError: 如果函数不存在 """ # 创建一个新的事件循环来执行异步函数 loop = asyncio.new_event_loop() try: return loop.run_until_complete(call_function(function_id, *args, **kwargs)) finally: loop.close()