VWED_server/services/online_script/lsp_completion_service.py

675 lines
26 KiB
Python
Raw Normal View History

2025-09-30 13:52:36 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
LSP (Language Server Protocol) 代码补全服务
基于WebSocket通信为前端脚本编辑器提供智能补全功能
"""
import ast
import re
import json
import inspect
import keyword
import builtins
from typing import Dict, List, Any, Optional, Tuple, Union
from dataclasses import dataclass
from utils.logger import get_logger
from .built_in_modules import (
VWEDApiModule, VWEDFunctionModule, VWEDEventModule, VWEDTimerModule,
VWEDLogModule, VWEDTaskModule, VWEDDataModule, VWEDUtilModule, VWEDDeviceModule
)
logger = get_logger("services.lsp_completion")
@dataclass
class LSPPosition:
"""LSP位置信息"""
line: int
character: int
@dataclass
class LSPRange:
"""LSP范围信息"""
start: LSPPosition
end: LSPPosition
@dataclass
class LSPCompletionItem:
"""LSP补全项"""
label: str
kind: int # CompletionItemKind
detail: Optional[str] = None
documentation: Optional[str] = None
insert_text: Optional[str] = None
insert_text_format: int = 1 # PlainText = 1, Snippet = 2
sort_text: Optional[str] = None
filter_text: Optional[str] = None
class LSPCompletionItemKind:
"""LSP补全项类型枚举"""
Text = 1
Method = 2
Function = 3
Constructor = 4
Field = 5
Variable = 6
Class = 7
Interface = 8
Module = 9
Property = 10
Unit = 11
Value = 12
Enum = 13
Keyword = 14
Snippet = 15
Color = 16
File = 17
Reference = 18
class VWEDLSPCompletionService:
"""VWED LSP代码补全服务"""
def __init__(self):
self.builtin_modules = self._init_builtin_modules()
self.vwed_namespace = self._build_vwed_namespace()
self.python_keywords = keyword.kwlist
self.python_builtins = dir(builtins)
# 打印初始化信息
logger.info(f"LSP代码补全服务初始化完成")
logger.info(f"已加载VWED模块: {list(self.builtin_modules.keys())}")
logger.info(f"VWED命名空间: {list(self.vwed_namespace.keys())}")
for module_name, methods in self.vwed_namespace.items():
logger.debug(f"模块 {module_name} 包含方法: {list(methods.keys())[:5]}...") # 只显示前5个方法
def _init_builtin_modules(self) -> Dict[str, Any]:
"""初始化内置模块"""
return {
'api': VWEDApiModule(None),
'function': VWEDFunctionModule(None),
'event': VWEDEventModule(None),
'timer': VWEDTimerModule(None),
'log': VWEDLogModule(None),
'task': VWEDTaskModule(None),
'data': VWEDDataModule(None),
'util': VWEDUtilModule(None),
'device': VWEDDeviceModule(None),
}
def _build_vwed_namespace(self) -> Dict[str, Dict[str, Any]]:
"""构建VWED命名空间结构"""
namespace = {}
for module_name, module_instance in self.builtin_modules.items():
module_methods = {}
for attr_name in dir(module_instance):
if not attr_name.startswith('_'):
attr = getattr(module_instance, attr_name)
if callable(attr):
try:
sig = inspect.signature(attr)
doc = inspect.getdoc(attr) or f"VWED.{module_name}.{attr_name} 方法"
module_methods[attr_name] = {
'type': 'method',
'signature': str(sig),
'doc': doc,
'parameters': self._extract_parameters(sig)
}
except Exception as e:
logger.warning(f"获取方法 {attr_name} 签名失败: {e}")
module_methods[attr_name] = {
'type': 'method',
'signature': '(...)',
'doc': f"VWED.{module_name}.{attr_name} 方法",
'parameters': []
}
namespace[module_name] = module_methods
return namespace
def _extract_parameters(self, signature: inspect.Signature) -> List[Dict[str, Any]]:
"""提取函数参数信息"""
parameters = []
for param_name, param in signature.parameters.items():
if param_name == 'self':
continue
param_info = {
'name': param_name,
'kind': param.kind.name,
'default': str(param.default) if param.default != inspect.Parameter.empty else None,
'annotation': str(param.annotation) if param.annotation != inspect.Parameter.empty else None
}
parameters.append(param_info)
return parameters
def handle_lsp_message(self, message: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""处理LSP消息"""
try:
method = message.get('method')
params = message.get('params', {})
request_id = message.get('id')
if method == 'textDocument/completion':
result = self._handle_completion(params)
return self._create_response(request_id, result)
elif method == 'textDocument/signatureHelp':
result = self._handle_signature_help(params)
return self._create_response(request_id, result)
elif method == 'textDocument/hover':
result = self._handle_hover(params)
return self._create_response(request_id, result)
elif method == 'textDocument/didOpen':
# 文档打开通知,无需响应
return None
elif method == 'textDocument/didChange':
# 文档变更通知,无需响应
return None
elif method == 'initialize':
result = self._handle_initialize(params)
return self._create_response(request_id, result)
else:
logger.warning(f"未处理的LSP方法: {method}")
return self._create_error_response(request_id, -32601, "Method not found")
except Exception as e:
logger.error(f"处理LSP消息失败: {e}", exc_info=True)
return self._create_error_response(
message.get('id'), -32603, f"Internal error: {str(e)}"
)
def _handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""处理初始化请求"""
return {
"capabilities": {
"completionProvider": {
"triggerCharacters": [".", " "],
"resolveProvider": False
},
"signatureHelpProvider": {
"triggerCharacters": ["(", ","]
},
"hoverProvider": True,
"textDocumentSync": {
"openClose": True,
"change": 2 # Incremental
}
},
"serverInfo": {
"name": "VWED Python Language Server",
"version": "1.0.0"
}
}
def _handle_completion(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""处理代码补全请求"""
try:
text_document = params.get('textDocument', {})
position = params.get('position', {})
context = params.get('context', {})
# 这里需要获取文档内容,实际实现中需要维护文档状态
# 暂时从context中获取或者通过其他方式获取完整文档内容
document_text = context.get('documentText', '')
line = position.get('line', 0)
character = position.get('character', 0)
logger.debug(f"补全请求 - 行: {line}, 列: {character}")
logger.debug(f"文档内容长度: {len(document_text)}")
# 转换为字符位置
cursor_position = self._position_to_offset(document_text, line, character)
# 获取补全建议
completions = self._get_completions(document_text, cursor_position)
logger.info(f"生成补全建议数量: {len(completions)}")
if completions:
logger.debug(f"前3个补全建议: {[c['label'] for c in completions[:3]]}")
result = {
"isIncomplete": False,
"items": [self._to_lsp_completion_item(item) for item in completions]
}
return result
except Exception as e:
logger.error(f"处理补全请求失败: {e}", exc_info=True)
return {"isIncomplete": False, "items": []}
def _handle_signature_help(self, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""处理函数签名帮助请求"""
try:
position = params.get('position', {})
context = params.get('context', {})
document_text = context.get('documentText', '')
line = position.get('line', 0)
character = position.get('character', 0)
cursor_position = self._position_to_offset(document_text, line, character)
signature_info = self._get_signature_help(document_text, cursor_position)
if signature_info:
return {
"signatures": [signature_info],
"activeSignature": 0,
"activeParameter": signature_info.get('activeParameter', 0)
}
return None
except Exception as e:
logger.error(f"处理签名帮助失败: {e}")
return None
def _handle_hover(self, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""处理悬停信息请求"""
try:
position = params.get('position', {})
context = params.get('context', {})
document_text = context.get('documentText', '')
line = position.get('line', 0)
character = position.get('character', 0)
hover_info = self._get_hover_info(document_text, line, character)
if hover_info:
return {
"contents": {
"kind": "markdown",
"value": hover_info
}
}
return None
except Exception as e:
logger.error(f"处理悬停信息失败: {e}")
return None
def _get_completions(self, code: str, cursor_position: int) -> List[Dict[str, Any]]:
"""获取代码补全建议"""
try:
line, column = self._get_cursor_position(code, cursor_position)
current_line = self._get_current_line(code, line)
prefix = self._get_completion_prefix(current_line, column)
logger.debug(f"光标位置 - 行: {line}, 列: {column}")
logger.debug(f"当前行: '{current_line}'")
logger.debug(f"补全前缀: '{prefix}'")
# 分析代码获取变量
variables = self._extract_variables_from_code(code)
logger.debug(f"提取的变量: {variables}")
completions = []
# VWED对象补全
if 'VWED.' in current_line:
logger.debug("检测到VWED调用获取VWED补全")
vwed_completions = self._get_vwed_completions(prefix, current_line)
completions.extend(vwed_completions)
logger.debug(f"VWED补全数量: {len(vwed_completions)}")
# Python关键字和内置函数补全
if '.' not in prefix:
logger.debug("获取Python关键字和内置函数补全")
keyword_completions = self._get_keyword_completions(prefix)
builtin_completions = self._get_builtin_completions(prefix)
variable_completions = self._get_variable_completions(prefix, variables)
completions.extend(keyword_completions)
completions.extend(builtin_completions)
completions.extend(variable_completions)
logger.debug(f"关键字补全数量: {len(keyword_completions)}")
logger.debug(f"内置函数补全数量: {len(builtin_completions)}")
logger.debug(f"变量补全数量: {len(variable_completions)}")
# 如果没有前缀但检测到VWED也提供基本的VWED模块补全
if not prefix and 'VWED' in current_line:
logger.debug("提供基本VWED模块补全")
for module in self.vwed_namespace.keys():
completions.append({
'label': module,
'kind': 'module',
'detail': f'VWED.{module}',
'documentation': f'VWED内置{module}模块',
'insertText': module
})
final_completions = self._deduplicate_and_sort(completions, prefix)
logger.debug(f"最终补全数量: {len(final_completions)}")
return final_completions
except Exception as e:
logger.error(f"获取补全建议失败: {e}", exc_info=True)
return []
def _get_signature_help(self, code: str, cursor_position: int) -> Optional[Dict[str, Any]]:
"""获取函数签名帮助"""
try:
line, column = self._get_cursor_position(code, cursor_position)
current_line = self._get_current_line(code, line)
function_call = self._find_function_call(current_line, column)
if function_call and function_call.startswith('VWED.'):
parts = function_call.split('.')
if len(parts) >= 3:
module_name = parts[1]
method_name = parts[2]
if (module_name in self.vwed_namespace and
method_name in self.vwed_namespace[module_name]):
method_info = self.vwed_namespace[module_name][method_name]
parameters = method_info.get('parameters', [])
# 计算当前参数位置
active_param = self._calculate_active_parameter(current_line, column)
return {
"label": f"{function_call}{method_info['signature']}",
"documentation": method_info.get('doc', ''),
"parameters": [
{"label": param['name'], "documentation": param.get('annotation', '')}
for param in parameters
],
"activeParameter": min(active_param, len(parameters) - 1) if parameters else 0
}
return None
except Exception as e:
logger.error(f"获取签名帮助失败: {e}")
return None
def _get_hover_info(self, code: str, line: int, character: int) -> Optional[str]:
"""获取悬停信息"""
try:
current_line = self._get_current_line(code, line)
word = self._get_word_at_position(current_line, character)
if word.startswith('VWED.'):
parts = word.split('.')
if len(parts) >= 3:
module_name = parts[1]
method_name = parts[2]
if (module_name in self.vwed_namespace and
method_name in self.vwed_namespace[module_name]):
method_info = self.vwed_namespace[module_name][method_name]
return f"```python\n{word}{method_info['signature']}\n```\n\n{method_info['doc']}"
return None
except Exception as e:
logger.error(f"获取悬停信息失败: {e}")
return None
def _position_to_offset(self, text: str, line: int, character: int) -> int:
"""将行列位置转换为字符偏移"""
lines = text.split('\n')
offset = 0
for i in range(min(line, len(lines) - 1)):
offset += len(lines[i]) + 1 # +1 for newline
offset += min(character, len(lines[line]) if line < len(lines) else 0)
return offset
def _get_cursor_position(self, code: str, cursor_position: int) -> Tuple[int, int]:
"""将光标位置转换为行列坐标"""
lines = code[:cursor_position].split('\n')
line = len(lines) - 1
column = len(lines[-1])
return line, column
def _get_current_line(self, code: str, line_number: int) -> str:
"""获取指定行的内容"""
lines = code.split('\n')
if 0 <= line_number < len(lines):
return lines[line_number]
return ""
def _get_completion_prefix(self, line: str, column: int) -> str:
"""获取补全前缀"""
start = column
while start > 0 and (line[start - 1].isalnum() or line[start - 1] in ['_', '.']):
start -= 1
return line[start:column]
def _get_word_at_position(self, line: str, character: int) -> str:
"""获取指定位置的单词"""
start = character
end = character
# 向左扩展
while start > 0 and (line[start - 1].isalnum() or line[start - 1] in ['_', '.']):
start -= 1
# 向右扩展
while end < len(line) and (line[end].isalnum() or line[end] in ['_', '.']):
end += 1
return line[start:end]
def _find_function_call(self, line: str, column: int) -> Optional[str]:
"""查找当前位置的函数调用"""
paren_pos = line.rfind('(', 0, column)
if paren_pos == -1:
return None
func_end = paren_pos
func_start = func_end
while func_start > 0 and (line[func_start - 1].isalnum() or line[func_start - 1] in ['_', '.']):
func_start -= 1
return line[func_start:func_end] if func_start < func_end else None
def _calculate_active_parameter(self, line: str, column: int) -> int:
"""计算当前活跃的参数位置"""
paren_pos = line.rfind('(', 0, column)
if paren_pos == -1:
return 0
# 计算逗号数量
param_text = line[paren_pos + 1:column]
return param_text.count(',')
def _extract_variables_from_code(self, code: str) -> List[str]:
"""从代码中提取变量名"""
variables = []
try:
tree = ast.parse(code)
for node in ast.walk(tree):
if isinstance(node, ast.Name) and isinstance(node.ctx, ast.Store):
variables.append(node.id)
elif isinstance(node, ast.FunctionDef):
variables.append(node.name)
elif isinstance(node, ast.ClassDef):
variables.append(node.name)
except SyntaxError:
# 语法错误时使用正则表达式
variable_pattern = r'\b([a-zA-Z_][a-zA-Z0-9_]*)\s*='
matches = re.findall(variable_pattern, code)
variables.extend(matches)
return list(set(variables))
def _get_vwed_completions(self, prefix: str, line: str) -> List[Dict[str, Any]]:
"""获取VWED对象的补全建议"""
completions = []
vwed_match = re.search(r'VWED\.([a-zA-Z_][a-zA-Z0-9_]*)?\.?([a-zA-Z_][a-zA-Z0-9_]*)?', line)
if vwed_match:
module_name = vwed_match.group(1)
method_name = vwed_match.group(2)
if module_name and module_name in self.vwed_namespace:
# 补全模块方法
for method, info in self.vwed_namespace[module_name].items():
if not method_name or method.startswith(method_name):
completions.append({
'label': method,
'kind': 'method',
'detail': info['signature'],
'documentation': info['doc'],
'insertText': method
})
elif not module_name or any(mod.startswith(module_name or '') for mod in self.vwed_namespace):
# 补全模块名
for module in self.vwed_namespace.keys():
if not module_name or module.startswith(module_name):
completions.append({
'label': module,
'kind': 'module',
'detail': f'VWED.{module}',
'documentation': f'VWED内置{module}模块',
'insertText': module
})
return completions
def _get_keyword_completions(self, prefix: str) -> List[Dict[str, Any]]:
"""获取Python关键字补全"""
return [
{
'label': kw,
'kind': 'keyword',
'detail': 'Python关键字',
'insertText': kw
}
for kw in self.python_keywords if kw.startswith(prefix)
]
def _get_builtin_completions(self, prefix: str) -> List[Dict[str, Any]]:
"""获取Python内置函数补全"""
completions = []
for builtin_name in self.python_builtins:
if builtin_name.startswith(prefix) and not builtin_name.startswith('_'):
builtin_obj = getattr(builtins, builtin_name)
completions.append({
'label': builtin_name,
'kind': 'function' if callable(builtin_obj) else 'variable',
'detail': 'Python内置函数' if callable(builtin_obj) else 'Python内置变量',
'documentation': inspect.getdoc(builtin_obj) or "",
'insertText': builtin_name
})
return completions
def _get_variable_completions(self, prefix: str, variables: List[str]) -> List[Dict[str, Any]]:
"""获取变量补全"""
return [
{
'label': var_name,
'kind': 'variable',
'detail': '局部变量',
'insertText': var_name
}
for var_name in variables if var_name.startswith(prefix)
]
def _deduplicate_and_sort(self, completions: List[Dict[str, Any]], prefix: str) -> List[Dict[str, Any]]:
"""去重并排序补全建议"""
seen = set()
unique_completions = []
for completion in completions:
label = completion['label']
if label not in seen:
seen.add(label)
unique_completions.append(completion)
def sort_key(completion):
label = completion['label']
if label == prefix:
return (0, label)
elif label.startswith(prefix):
return (1, label)
else:
return (2, label)
unique_completions.sort(key=sort_key)
return unique_completions
def _to_lsp_completion_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
"""转换为LSP补全项格式"""
kind_mapping = {
'method': LSPCompletionItemKind.Method,
'function': LSPCompletionItemKind.Function,
'variable': LSPCompletionItemKind.Variable,
'keyword': LSPCompletionItemKind.Keyword,
'module': LSPCompletionItemKind.Module,
'class': LSPCompletionItemKind.Class
}
return {
"label": item['label'],
"kind": kind_mapping.get(item['kind'], LSPCompletionItemKind.Text),
"detail": item.get('detail'),
"documentation": item.get('documentation'),
"insertText": item.get('insertText', item['label']),
"sortText": f"{item.get('priority', 5):02d}_{item['label']}"
}
def _create_response(self, request_id: Any, result: Any) -> Dict[str, Any]:
"""创建LSP响应"""
return {
"jsonrpc": "2.0",
"id": request_id,
"result": result
}
def _create_error_response(self, request_id: Any, code: int, message: str) -> Dict[str, Any]:
"""创建LSP错误响应"""
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": code,
"message": message
}
}
# 全局LSP补全服务实例
_lsp_completion_service = VWEDLSPCompletionService()
def get_lsp_completion_service() -> VWEDLSPCompletionService:
"""获取LSP代码补全服务实例"""
return _lsp_completion_service