VWED_server/utils/crypto_utils.py
2025-04-30 16:57:46 +08:00

139 lines
5.1 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
加密工具模块
提供任务导出数据的加密和解密功能
"""
import base64
import hashlib
import hmac
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
import json
from typing import Dict, Any, Union, List
import struct
from config.settings import settings
class CryptoUtils:
"""加密工具类"""
# 文件头魔术数字 - "VWED" 的ASCII码 + 版本号1.0
MAGIC_HEADER = b'VWED\x01\x00'
@staticmethod
def encrypt_data(data: Union[Dict[str, Any], List[Dict[str, Any]]]) -> bytes:
"""
加密数据并添加文件头和校验和
Args:
data: 需要加密的数据,可以是字典或字典列表
Returns:
bytes: 加密后的字节数据
"""
# 将数据转换为JSON字符串
json_data = json.dumps(data, ensure_ascii=False).encode('utf-8')
# 计算原始数据的校验和
checksum = hashlib.sha256(json_data).digest()
# 使用配置的加密算法和密钥创建加密器
key = settings.TASK_EXPORT_ENCRYPTION_KEY.encode('utf-8')[:16] # AES密钥必须是16/24/32字节
iv = settings.TASK_EXPORT_IV.encode('utf-8')[:16] # IV必须是16字节
cipher = AES.new(key, AES.MODE_CBC, iv)
# 对数据进行填充和加密
padded_data = pad(json_data, AES.block_size)
encrypted_data = cipher.encrypt(padded_data)
# 创建HMAC签名确保数据完整性和来源
signature = hmac.new(key, encrypted_data, hashlib.sha256).digest()
# 构建文件结构: 魔术头(6字节) + 签名长度(2字节) + 签名 + 校验和(32字节) + 加密数据
file_data = (
CryptoUtils.MAGIC_HEADER +
struct.pack("<H", len(signature)) + # 2字节表示签名长度
signature + # 签名
checksum + # 原始数据校验和
encrypted_data # 加密后的数据
)
return file_data
@staticmethod
def decrypt_data(encrypted_file: bytes) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
"""
解密数据,并验证文件头和校验和
Args:
encrypted_file: 加密的文件数据
Returns:
Dict[str, Any]: 解密后的数据
Raises:
ValueError: 文件格式错误、校验失败等
"""
# 验证魔术头
if not encrypted_file.startswith(CryptoUtils.MAGIC_HEADER):
# 尝试旧格式解密
try:
return CryptoUtils._decrypt_legacy_data(encrypted_file)
except:
raise ValueError("文件格式无效或已损坏")
# 解析文件结构
header_size = len(CryptoUtils.MAGIC_HEADER) # 6字节
sig_size_bytes = encrypted_file[header_size:header_size+2]
sig_size = struct.unpack("<H", sig_size_bytes)[0] # 获取签名长度
signature_start = header_size + 2
signature = encrypted_file[signature_start:signature_start+sig_size]
checksum_start = signature_start + sig_size
checksum = encrypted_file[checksum_start:checksum_start+32] # SHA256 = 32字节
data_start = checksum_start + 32
encrypted_data = encrypted_file[data_start:]
# 验证签名
key = settings.TASK_EXPORT_ENCRYPTION_KEY.encode('utf-8')[:16]
expected_signature = hmac.new(key, encrypted_data, hashlib.sha256).digest()
if not hmac.compare_digest(signature, expected_signature):
raise ValueError("文件签名无效,可能被篡改")
# 解密数据
iv = settings.TASK_EXPORT_IV.encode('utf-8')[:16]
cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted_padded = cipher.decrypt(encrypted_data)
decrypted_data = unpad(decrypted_padded, AES.block_size)
# 验证校验和
calculated_checksum = hashlib.sha256(decrypted_data).digest()
if not hmac.compare_digest(checksum, calculated_checksum):
raise ValueError("文件内容校验失败,数据已损坏")
# 将JSON字符串转回Python对象
return json.loads(decrypted_data.decode('utf-8'))
@staticmethod
def _decrypt_legacy_data(encrypted_data: bytes) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
"""处理旧版格式的加密文件(向后兼容)"""
# Base64解码
encrypted_bytes = base64.b64decode(encrypted_data)
# 创建解密器
cipher = AES.new(
settings.TASK_EXPORT_ENCRYPTION_KEY.encode('utf-8')[:16],
AES.MODE_CBC,
settings.TASK_EXPORT_IV.encode('utf-8')[:16]
)
# 解密并去除填充
decrypted_data = unpad(cipher.decrypt(encrypted_bytes), AES.block_size)
# 将JSON字符串转回Python对象
return json.loads(decrypted_data.decode('utf-8'))