VWED_server/routes/lsp_websocket_api.py

299 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
LSP WebSocket API路由
为脚本编辑器提供基于WebSocket的Language Server Protocol支持
"""
import json
import asyncio
from typing import Dict, Any, Optional
from fastapi import WebSocket, WebSocketDisconnect, APIRouter
from utils.logger import get_logger
from services.online_script.lsp_completion_service import get_lsp_completion_service
logger = get_logger("routes.lsp_websocket")
router = APIRouter(prefix="/lsp", tags=["LSP WebSocket"])
class LSPConnectionManager:
    """Track LSP WebSocket clients and mirror the documents each one has open.

    Every connected client gets an entry in ``active_connections`` holding its
    WebSocket, a ``documents`` mapping (URI -> ``{'text', 'version'}``) and an
    ``initialized`` flag. The mirrored text is attached to completion,
    signature-help and hover requests before they are handed to the LSP
    completion service.
    """

    # Requests that get the mirrored document text attached before dispatch.
    _DOCUMENT_TEXT_METHODS = (
        'textDocument/completion',
        'textDocument/signatureHelp',
        'textDocument/hover',
    )

    def __init__(self):
        # client_id -> {'websocket': ..., 'documents': {uri: {...}}, 'initialized': bool}
        self.active_connections: Dict[str, Dict[str, Any]] = {}
        self.lsp_service = get_lsp_completion_service()

    async def connect(self, websocket: "WebSocket", client_id: str):
        """Accept the WebSocket and register a fresh state entry for ``client_id``.

        An existing entry for the same ``client_id`` is overwritten.
        """
        await websocket.accept()
        self.active_connections[client_id] = {
            'websocket': websocket,
            'documents': {},  # per-URI document state
            'initialized': False
        }
        logger.info(f"LSP客户端连接: {client_id}")

    def disconnect(self, client_id: str):
        """Remove ``client_id`` from the registry; a no-op for unknown clients."""
        if client_id in self.active_connections:
            del self.active_connections[client_id]
            logger.info(f"LSP客户端断开: {client_id}")

    async def send_message(self, client_id: str, message: Dict[str, Any]):
        """Serialize ``message`` as JSON and send it to the given client.

        Unknown clients are ignored. If sending fails, the error is logged and
        the client is unregistered.
        """
        if client_id in self.active_connections:
            websocket = self.active_connections[client_id]['websocket']
            try:
                await websocket.send_text(json.dumps(message, ensure_ascii=False))
            except Exception as e:
                logger.error(f"发送LSP消息失败 {client_id}: {e}")
                self.disconnect(client_id)

    async def handle_message(self, client_id: str, message: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Process one decoded LSP message and return the service's response.

        Document notifications (didOpen / didChange / didClose) first update the
        mirrored document state. Completion, signature-help and hover requests
        get the mirrored text attached as ``params['context']['documentText']``.
        The message is then passed to the LSP completion service.

        Args:
            client_id: Identifier of the sending client.
            message: Decoded JSON-RPC message.

        Returns:
            Whatever the LSP service returns, or a JSON-RPC ``-32603`` error
            object if anything raised while handling the message.
        """
        try:
            method = message.get('method')
            # An explicit JSON null for "params" is treated like a missing member.
            params = message.get('params') or {}

            # Keep the document mirror in sync.
            if method == 'textDocument/didOpen':
                await self._handle_did_open(client_id, params)
                logger.info(f"客户端 {client_id} 打开文档")
            elif method == 'textDocument/didChange':
                await self._handle_did_change(client_id, params)
            elif method == 'textDocument/didClose':
                await self._handle_did_close(client_id, params)

            # Attach document text to the requests that need it.
            if method in self._DOCUMENT_TEXT_METHODS:
                self._attach_document_text(client_id, method, message, params)

            # Delegate to the LSP service.
            return self.lsp_service.handle_lsp_message(message)
        except Exception as e:
            logger.error(f"处理LSP消息失败 {client_id}: {e}", exc_info=True)
            return {
                "jsonrpc": "2.0",
                "id": message.get('id'),
                "error": {
                    "code": -32603,
                    "message": f"Internal error: {str(e)}"
                }
            }

    def _attach_document_text(self, client_id: str, method: Any,
                              message: Dict[str, Any], params: Dict[str, Any]) -> None:
        """Store the mirrored document text in ``params['context']['documentText']``.

        An empty string is used when the URI is not among the client's open
        documents, so the request can still be served. Nothing is attached when
        the URI is missing or the client is not registered.
        """
        text_document = params.get('textDocument', {})
        uri = text_document.get('uri')
        logger.debug(f"处理 {method} 请求, URI: {uri}, 客户端: {client_id}")

        if not (uri and client_id in self.active_connections):
            logger.warning(f"客户端 {client_id} 不存在或URI为空")
            return

        documents = self.active_connections[client_id]['documents']
        logger.debug(f"当前文档列表: {list(documents.keys())}")

        if uri in documents:
            document_text = documents[uri]['text']
            logger.debug(f"已添加文档内容,长度: {len(documents[uri]['text'])}")
        else:
            logger.warning(f"未找到文档 {uri},当前已打开文档: {list(documents.keys())}")
            # Fall back to an empty document so completion does not fail outright.
            document_text = ''

        params.setdefault('context', {})['documentText'] = document_text
        message['params'] = params

    async def _handle_did_open(self, client_id: str, params: Dict[str, Any]):
        """Record the text and version of a newly opened document."""
        text_document = params.get('textDocument', {})
        uri = text_document.get('uri')
        text = text_document.get('text', '')
        version = text_document.get('version', 1)
        if uri and client_id in self.active_connections:
            self.active_connections[client_id]['documents'][uri] = {
                'text': text,
                'version': version
            }
            logger.debug(f"文档已打开: {uri}")

    async def _handle_did_change(self, client_id: str, params: Dict[str, Any]):
        """Apply ``contentChanges`` in order to a mirrored document.

        A change without ``range`` replaces the whole text; a change with
        ``range`` is applied incrementally. Changes for documents that are not
        open are ignored.
        """
        text_document = params.get('textDocument', {})
        uri = text_document.get('uri')
        version = text_document.get('version')
        content_changes = params.get('contentChanges', [])
        if uri and client_id in self.active_connections:
            documents = self.active_connections[client_id]['documents']
            if uri in documents:
                for change in content_changes:
                    if 'range' not in change:
                        # Full-text replacement.
                        documents[uri]['text'] = change.get('text', '')
                        logger.debug(f"全文更新: {uri}")
                    else:
                        # Incremental edit.
                        documents[uri]['text'] = self._apply_text_edit(
                            documents[uri]['text'],
                            change.get('range'),
                            change.get('text', '')
                        )
                        logger.debug(f"增量更新: {uri}, range: {change.get('range')}, text: {change.get('text', '')}")
                documents[uri]['version'] = version
                logger.debug(f"文档已更新: {uri}, 版本: {version}, 内容预览: {documents[uri]['text'][:100]}")

    async def _handle_did_close(self, client_id: str, params: Dict[str, Any]):
        """Drop the mirrored state of a closed document."""
        text_document = params.get('textDocument', {})
        uri = text_document.get('uri')
        if uri and client_id in self.active_connections:
            documents = self.active_connections[client_id]['documents']
            if uri in documents:
                del documents[uri]
                logger.debug(f"文档已关闭: {uri}")

    @staticmethod
    def _utf16_to_index(line: str, utf16_offset: int) -> int:
        """Convert a UTF-16 code-unit offset within ``line`` to a ``str`` index.

        Characters above U+FFFF occupy two UTF-16 code units but a single
        position in a Python string. Offsets at or below zero map to 0, offsets
        past the end map to ``len(line)``, and an offset that falls inside a
        surrogate pair maps to the index just after that character.
        """
        if utf16_offset <= 0:
            return 0
        units = 0
        for index, ch in enumerate(line):
            if units >= utf16_offset:
                return index
            units += 2 if ord(ch) > 0xFFFF else 1
        return len(line)

    def _apply_text_edit(self, text: str, range_info: Dict[str, Any], new_text: str) -> str:
        """Apply one incremental text edit to a document.

        ``character`` offsets are interpreted as UTF-16 code units, which is the
        LSP default position encoding, and are clamped to the line they refer to.

        Args:
            text: Current document text.
            range_info: LSP Range with ``start`` and ``end`` positions. A falsy
                value means "replace everything".
            new_text: Replacement text for the range.

        Returns:
            The updated document text.
        """
        if not range_info:
            return new_text

        start = range_info.get('start', {})
        end = range_info.get('end', {})
        start_line = start.get('line', 0)
        start_char = start.get('character', 0)
        end_line = end.get('line', 0)
        end_char = end.get('character', 0)

        lines = text.split('\n')
        # Pad with empty lines so both positions refer to an existing line.
        # NOTE(review): the padding is unbounded; a client-supplied huge line
        # number allocates that many entries - consider capping it.
        while len(lines) <= max(start_line, end_line):
            lines.append('')

        start_line_text = lines[start_line]
        end_line_text = lines[end_line]

        # Keep the text before the range start and the text after the range end.
        before = start_line_text[:self._utf16_to_index(start_line_text, start_char)]
        after = end_line_text[self._utf16_to_index(end_line_text, end_char):]
        new_content = before + new_text + after

        if start_line == end_line:
            # Single-line edit; embedded newlines are resolved by the join below.
            lines[start_line] = new_content
        else:
            # Multi-line edit: lines start_line..end_line collapse into new_content.
            lines = lines[:start_line] + new_content.split('\n') + lines[end_line + 1:]
        return '\n'.join(lines)
# Global connection manager: one module-level instance used by the
# WebSocket endpoint and the HTTP routes in this file.
lsp_manager = LSPConnectionManager()
@router.websocket("/code-completion/{client_id}")
async def lsp_websocket_endpoint(websocket: WebSocket, client_id: str):
    """Serve LSP-over-WebSocket traffic for one client connection.

    Registers the socket with the connection manager, then loops: receive a
    text frame, decode it as JSON, let the manager handle it, and send back
    any response. Undecodable frames are answered with a JSON-RPC ``-32700``
    parse error. Other per-message failures are logged and the loop continues.

    Args:
        websocket: The incoming WebSocket connection.
        client_id: Client identifier taken from the URL path.
    """
    await lsp_manager.connect(websocket, client_id)
    try:
        while True:
            # Receiving sits outside the per-message try so that a disconnect
            # ends the loop instead of being swallowed.
            data = await websocket.receive_text()
            try:
                message = json.loads(data)
                logger.debug(f"接收到LSP消息 {client_id}: {message.get('method', 'unknown')}")
                response = await lsp_manager.handle_message(client_id, message)
                if response:
                    await lsp_manager.send_message(client_id, response)
            except json.JSONDecodeError as e:
                logger.error(f"JSON解析失败 {client_id}: {e}")
                error_response = {
                    "jsonrpc": "2.0",
                    "id": None,
                    "error": {
                        "code": -32700,
                        "message": "Parse error"
                    }
                }
                await lsp_manager.send_message(client_id, error_response)
            except Exception as e:
                logger.error(f"处理消息失败 {client_id}: {e}", exc_info=True)
    except WebSocketDisconnect:
        logger.info(f"LSP客户端断开连接: {client_id}")
    except Exception as e:
        logger.error(f"WebSocket连接异常 {client_id}: {e}")
    finally:
        # Unregister only if the registry entry still belongs to this socket.
        # If the same client_id reconnected in the meantime, the entry now
        # holds the newer socket; removing it would leave that connection
        # unregistered and its responses would never be sent.
        registered = lsp_manager.active_connections.get(client_id)
        if registered is not None and registered.get('websocket') is websocket:
            lsp_manager.disconnect(client_id)
@router.get("/health")
async def lsp_health_check():
    """Return a static health payload plus the current number of connections."""
    supported_methods = [
        "textDocument/completion",
        "textDocument/signatureHelp",
        "textDocument/hover",
        "textDocument/didOpen",
        "textDocument/didChange",
        "textDocument/didClose",
    ]
    connection_count = len(lsp_manager.active_connections)
    return {
        "status": "healthy",
        "service": "VWED LSP Code Completion",
        "active_connections": connection_count,
        "capabilities": supported_methods,
    }
@router.get("/clients")
async def get_active_clients():
    """List every registered LSP client with its open-document summary."""

    def _summarize(cid, state):
        # One entry per client: id, initialized flag, and open document URIs.
        open_docs = state.get('documents', {})
        return {
            "client_id": cid,
            "initialized": state.get('initialized', False),
            "document_count": len(open_docs),
            "documents": list(open_docs.keys()),
        }

    summaries = [
        _summarize(cid, state)
        for cid, state in lsp_manager.active_connections.items()
    ]
    return {"active_clients": summaries, "total_count": len(summaries)}