212 lines
7.3 KiB
Python
212 lines
7.3 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
加密工具模块
|
||
提供任务导出数据的加密和解密功能
|
||
"""
|
||
|
||
import base64
|
||
import hashlib
|
||
import hmac
|
||
from Crypto.Cipher import AES
|
||
from Crypto.Util.Padding import pad, unpad
|
||
import json
|
||
from typing import Dict, Any, Union, List
|
||
import struct
|
||
|
||
from config.settings import settings
|
||
|
||
class CryptoUtils:
|
||
"""加密工具类"""
|
||
|
||
# 文件头魔术数字 - "VWED" 的ASCII码 + 版本号1.0
|
||
MAGIC_HEADER = b'VWED\x01\x00'
|
||
|
||
@staticmethod
|
||
def encrypt_data(data: Union[Dict[str, Any], List[Dict[str, Any]]]) -> bytes:
|
||
"""
|
||
加密数据并添加文件头和校验和
|
||
|
||
Args:
|
||
data: 需要加密的数据,可以是字典或字典列表
|
||
|
||
Returns:
|
||
bytes: 加密后的字节数据
|
||
"""
|
||
# 将数据转换为JSON字符串
|
||
json_data = json.dumps(data, ensure_ascii=False).encode('utf-8')
|
||
|
||
# 计算原始数据的校验和
|
||
checksum = hashlib.sha256(json_data).digest()
|
||
|
||
# 使用配置的加密算法和密钥创建加密器
|
||
key = settings.TASK_EXPORT_ENCRYPTION_KEY.encode('utf-8')[:16] # AES密钥必须是16/24/32字节
|
||
iv = settings.TASK_EXPORT_IV.encode('utf-8')[:16] # IV必须是16字节
|
||
cipher = AES.new(key, AES.MODE_CBC, iv)
|
||
|
||
# 对数据进行填充和加密
|
||
padded_data = pad(json_data, AES.block_size)
|
||
encrypted_data = cipher.encrypt(padded_data)
|
||
|
||
# 创建HMAC签名确保数据完整性和来源
|
||
signature = hmac.new(key, encrypted_data, hashlib.sha256).digest()
|
||
|
||
# 构建文件结构: 魔术头(6字节) + 签名长度(2字节) + 签名 + 校验和(32字节) + 加密数据
|
||
file_data = (
|
||
CryptoUtils.MAGIC_HEADER +
|
||
struct.pack("<H", len(signature)) + # 2字节表示签名长度
|
||
signature + # 签名
|
||
checksum + # 原始数据校验和
|
||
encrypted_data # 加密后的数据
|
||
)
|
||
|
||
return file_data
|
||
|
||
@staticmethod
|
||
def decrypt_data(encrypted_file: bytes) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
|
||
"""
|
||
解密数据,并验证文件头和校验和
|
||
|
||
Args:
|
||
encrypted_file: 加密的文件数据
|
||
|
||
Returns:
|
||
Dict[str, Any]: 解密后的数据
|
||
|
||
Raises:
|
||
ValueError: 文件格式错误、校验失败等
|
||
"""
|
||
# 验证魔术头
|
||
if not encrypted_file.startswith(CryptoUtils.MAGIC_HEADER):
|
||
# 尝试旧格式解密
|
||
try:
|
||
return CryptoUtils._decrypt_legacy_data(encrypted_file)
|
||
except:
|
||
raise ValueError("文件格式无效或已损坏")
|
||
|
||
# 解析文件结构
|
||
header_size = len(CryptoUtils.MAGIC_HEADER) # 6字节
|
||
sig_size_bytes = encrypted_file[header_size:header_size+2]
|
||
sig_size = struct.unpack("<H", sig_size_bytes)[0] # 获取签名长度
|
||
|
||
signature_start = header_size + 2
|
||
signature = encrypted_file[signature_start:signature_start+sig_size]
|
||
|
||
checksum_start = signature_start + sig_size
|
||
checksum = encrypted_file[checksum_start:checksum_start+32] # SHA256 = 32字节
|
||
|
||
data_start = checksum_start + 32
|
||
encrypted_data = encrypted_file[data_start:]
|
||
|
||
# 验证签名
|
||
key = settings.TASK_EXPORT_ENCRYPTION_KEY.encode('utf-8')[:16]
|
||
expected_signature = hmac.new(key, encrypted_data, hashlib.sha256).digest()
|
||
if not hmac.compare_digest(signature, expected_signature):
|
||
raise ValueError("文件签名无效,可能被篡改")
|
||
|
||
# 解密数据
|
||
iv = settings.TASK_EXPORT_IV.encode('utf-8')[:16]
|
||
cipher = AES.new(key, AES.MODE_CBC, iv)
|
||
decrypted_padded = cipher.decrypt(encrypted_data)
|
||
decrypted_data = unpad(decrypted_padded, AES.block_size)
|
||
|
||
# 验证校验和
|
||
calculated_checksum = hashlib.sha256(decrypted_data).digest()
|
||
if not hmac.compare_digest(checksum, calculated_checksum):
|
||
raise ValueError("文件内容校验失败,数据已损坏")
|
||
|
||
# 将JSON字符串转回Python对象
|
||
return json.loads(decrypted_data.decode('utf-8'))
|
||
|
||
@staticmethod
|
||
def _decrypt_legacy_data(encrypted_data: bytes) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
|
||
"""处理旧版格式的加密文件(向后兼容)"""
|
||
# Base64解码
|
||
encrypted_bytes = base64.b64decode(encrypted_data)
|
||
|
||
# 创建解密器
|
||
cipher = AES.new(
|
||
settings.TASK_EXPORT_ENCRYPTION_KEY.encode('utf-8')[:16],
|
||
AES.MODE_CBC,
|
||
settings.TASK_EXPORT_IV.encode('utf-8')[:16]
|
||
)
|
||
|
||
# 解密并去除填充
|
||
decrypted_data = unpad(cipher.decrypt(encrypted_bytes), AES.block_size)
|
||
|
||
# 将JSON字符串转回Python对象
|
||
return json.loads(decrypted_data.decode('utf-8'))
|
||
|
||
|
||
class JWTParser:
|
||
"""JWT Token 解析工具 - 仅用于解析,不生成"""
|
||
|
||
@staticmethod
|
||
def parse_jwt(token: str) -> Dict[str, Any]:
|
||
"""
|
||
解析JWT Token获取载荷信息
|
||
|
||
Args:
|
||
token: JWT token字符串
|
||
|
||
Returns:
|
||
Dict: 解析后的载荷数据,如果解析失败返回空字典
|
||
"""
|
||
try:
|
||
# 移除Bearer前缀(如果存在)
|
||
token = token.replace('Bearer ', '').strip()
|
||
|
||
# JWT由三部分组成,用点号分隔
|
||
parts = token.split('.')
|
||
if len(parts) != 3:
|
||
return {}
|
||
|
||
# 获取载荷部分(第二部分)
|
||
payload_part = parts[1]
|
||
|
||
# Base64 URL解码
|
||
# 添加必要的填充
|
||
missing_padding = len(payload_part) % 4
|
||
if missing_padding:
|
||
payload_part += '=' * (4 - missing_padding)
|
||
|
||
# 替换URL安全字符
|
||
payload_part = payload_part.replace('-', '+').replace('_', '/')
|
||
|
||
# 解码并解析JSON
|
||
decoded_bytes = base64.b64decode(payload_part)
|
||
payload_data = json.loads(decoded_bytes.decode('utf-8'))
|
||
|
||
return payload_data
|
||
|
||
except Exception:
|
||
return {}
|
||
|
||
@staticmethod
|
||
def get_username_from_jwt(token: str) -> str:
|
||
"""
|
||
从JWT Token中提取用户名
|
||
|
||
Args:
|
||
token: JWT token字符串
|
||
|
||
Returns:
|
||
str: 用户名,如果提取失败返回空字符串
|
||
"""
|
||
payload = JWTParser.parse_jwt(token)
|
||
|
||
# 尝试不同的用户名字段名
|
||
username_fields = ['username', 'user', 'name', 'user_name', 'sub']
|
||
|
||
for field in username_fields:
|
||
if field in payload:
|
||
return str(payload[field])
|
||
|
||
return ""
|
||
|
||
|
||
# jwp_obj = JWTParser()
|
||
# uname = jwp_obj.get_username_from_jwt("eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3NTgwMjc0MTgsInVzZXJuYW1lIjoiYWRtaW4ifQ.dkjDaUjGIiMRCDYWFXceyEUPS8R2atBpnae_I3EBkQY")
|
||
# print(uname)
|