139 lines
5.1 KiB
Python
139 lines
5.1 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
加密工具模块
|
|
提供任务导出数据的加密和解密功能
|
|
"""
|
|
|
|
import base64
|
|
import hashlib
|
|
import hmac
|
|
from Crypto.Cipher import AES
|
|
from Crypto.Util.Padding import pad, unpad
|
|
import json
|
|
from typing import Dict, Any, Union, List
|
|
import struct
|
|
|
|
from config.settings import settings
|
|
|
|
class CryptoUtils:
|
|
"""加密工具类"""
|
|
|
|
# 文件头魔术数字 - "VWED" 的ASCII码 + 版本号1.0
|
|
MAGIC_HEADER = b'VWED\x01\x00'
|
|
|
|
@staticmethod
|
|
def encrypt_data(data: Union[Dict[str, Any], List[Dict[str, Any]]]) -> bytes:
|
|
"""
|
|
加密数据并添加文件头和校验和
|
|
|
|
Args:
|
|
data: 需要加密的数据,可以是字典或字典列表
|
|
|
|
Returns:
|
|
bytes: 加密后的字节数据
|
|
"""
|
|
# 将数据转换为JSON字符串
|
|
json_data = json.dumps(data, ensure_ascii=False).encode('utf-8')
|
|
|
|
# 计算原始数据的校验和
|
|
checksum = hashlib.sha256(json_data).digest()
|
|
|
|
# 使用配置的加密算法和密钥创建加密器
|
|
key = settings.TASK_EXPORT_ENCRYPTION_KEY.encode('utf-8')[:16] # AES密钥必须是16/24/32字节
|
|
iv = settings.TASK_EXPORT_IV.encode('utf-8')[:16] # IV必须是16字节
|
|
cipher = AES.new(key, AES.MODE_CBC, iv)
|
|
|
|
# 对数据进行填充和加密
|
|
padded_data = pad(json_data, AES.block_size)
|
|
encrypted_data = cipher.encrypt(padded_data)
|
|
|
|
# 创建HMAC签名确保数据完整性和来源
|
|
signature = hmac.new(key, encrypted_data, hashlib.sha256).digest()
|
|
|
|
# 构建文件结构: 魔术头(6字节) + 签名长度(2字节) + 签名 + 校验和(32字节) + 加密数据
|
|
file_data = (
|
|
CryptoUtils.MAGIC_HEADER +
|
|
struct.pack("<H", len(signature)) + # 2字节表示签名长度
|
|
signature + # 签名
|
|
checksum + # 原始数据校验和
|
|
encrypted_data # 加密后的数据
|
|
)
|
|
|
|
return file_data
|
|
|
|
@staticmethod
|
|
def decrypt_data(encrypted_file: bytes) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
|
|
"""
|
|
解密数据,并验证文件头和校验和
|
|
|
|
Args:
|
|
encrypted_file: 加密的文件数据
|
|
|
|
Returns:
|
|
Dict[str, Any]: 解密后的数据
|
|
|
|
Raises:
|
|
ValueError: 文件格式错误、校验失败等
|
|
"""
|
|
# 验证魔术头
|
|
if not encrypted_file.startswith(CryptoUtils.MAGIC_HEADER):
|
|
# 尝试旧格式解密
|
|
try:
|
|
return CryptoUtils._decrypt_legacy_data(encrypted_file)
|
|
except:
|
|
raise ValueError("文件格式无效或已损坏")
|
|
|
|
# 解析文件结构
|
|
header_size = len(CryptoUtils.MAGIC_HEADER) # 6字节
|
|
sig_size_bytes = encrypted_file[header_size:header_size+2]
|
|
sig_size = struct.unpack("<H", sig_size_bytes)[0] # 获取签名长度
|
|
|
|
signature_start = header_size + 2
|
|
signature = encrypted_file[signature_start:signature_start+sig_size]
|
|
|
|
checksum_start = signature_start + sig_size
|
|
checksum = encrypted_file[checksum_start:checksum_start+32] # SHA256 = 32字节
|
|
|
|
data_start = checksum_start + 32
|
|
encrypted_data = encrypted_file[data_start:]
|
|
|
|
# 验证签名
|
|
key = settings.TASK_EXPORT_ENCRYPTION_KEY.encode('utf-8')[:16]
|
|
expected_signature = hmac.new(key, encrypted_data, hashlib.sha256).digest()
|
|
if not hmac.compare_digest(signature, expected_signature):
|
|
raise ValueError("文件签名无效,可能被篡改")
|
|
|
|
# 解密数据
|
|
iv = settings.TASK_EXPORT_IV.encode('utf-8')[:16]
|
|
cipher = AES.new(key, AES.MODE_CBC, iv)
|
|
decrypted_padded = cipher.decrypt(encrypted_data)
|
|
decrypted_data = unpad(decrypted_padded, AES.block_size)
|
|
|
|
# 验证校验和
|
|
calculated_checksum = hashlib.sha256(decrypted_data).digest()
|
|
if not hmac.compare_digest(checksum, calculated_checksum):
|
|
raise ValueError("文件内容校验失败,数据已损坏")
|
|
|
|
# 将JSON字符串转回Python对象
|
|
return json.loads(decrypted_data.decode('utf-8'))
|
|
|
|
@staticmethod
|
|
def _decrypt_legacy_data(encrypted_data: bytes) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
|
|
"""处理旧版格式的加密文件(向后兼容)"""
|
|
# Base64解码
|
|
encrypted_bytes = base64.b64decode(encrypted_data)
|
|
|
|
# 创建解密器
|
|
cipher = AES.new(
|
|
settings.TASK_EXPORT_ENCRYPTION_KEY.encode('utf-8')[:16],
|
|
AES.MODE_CBC,
|
|
settings.TASK_EXPORT_IV.encode('utf-8')[:16]
|
|
)
|
|
|
|
# 解密并去除填充
|
|
decrypted_data = unpad(cipher.decrypt(encrypted_bytes), AES.block_size)
|
|
|
|
# 将JSON字符串转回Python对象
|
|
return json.loads(decrypted_data.decode('utf-8')) |