#!/usr/bin/env python # -*- coding: utf-8 -*- """ 加密工具模块 提供任务导出数据的加密和解密功能 """ import base64 import hashlib import hmac from Crypto.Cipher import AES from Crypto.Util.Padding import pad, unpad import json from typing import Dict, Any, Union, List import struct from config.settings import settings class CryptoUtils: """加密工具类""" # 文件头魔术数字 - "VWED" 的ASCII码 + 版本号1.0 MAGIC_HEADER = b'VWED\x01\x00' @staticmethod def encrypt_data(data: Union[Dict[str, Any], List[Dict[str, Any]]]) -> bytes: """ 加密数据并添加文件头和校验和 Args: data: 需要加密的数据,可以是字典或字典列表 Returns: bytes: 加密后的字节数据 """ # 将数据转换为JSON字符串 json_data = json.dumps(data, ensure_ascii=False).encode('utf-8') # 计算原始数据的校验和 checksum = hashlib.sha256(json_data).digest() # 使用配置的加密算法和密钥创建加密器 key = settings.TASK_EXPORT_ENCRYPTION_KEY.encode('utf-8')[:16] # AES密钥必须是16/24/32字节 iv = settings.TASK_EXPORT_IV.encode('utf-8')[:16] # IV必须是16字节 cipher = AES.new(key, AES.MODE_CBC, iv) # 对数据进行填充和加密 padded_data = pad(json_data, AES.block_size) encrypted_data = cipher.encrypt(padded_data) # 创建HMAC签名确保数据完整性和来源 signature = hmac.new(key, encrypted_data, hashlib.sha256).digest() # 构建文件结构: 魔术头(6字节) + 签名长度(2字节) + 签名 + 校验和(32字节) + 加密数据 file_data = ( CryptoUtils.MAGIC_HEADER + struct.pack(" Union[Dict[str, Any], List[Dict[str, Any]]]: """ 解密数据,并验证文件头和校验和 Args: encrypted_file: 加密的文件数据 Returns: Dict[str, Any]: 解密后的数据 Raises: ValueError: 文件格式错误、校验失败等 """ # 验证魔术头 if not encrypted_file.startswith(CryptoUtils.MAGIC_HEADER): # 尝试旧格式解密 try: return CryptoUtils._decrypt_legacy_data(encrypted_file) except: raise ValueError("文件格式无效或已损坏") # 解析文件结构 header_size = len(CryptoUtils.MAGIC_HEADER) # 6字节 sig_size_bytes = encrypted_file[header_size:header_size+2] sig_size = struct.unpack(" Union[Dict[str, Any], List[Dict[str, Any]]]: """处理旧版格式的加密文件(向后兼容)""" # Base64解码 encrypted_bytes = base64.b64decode(encrypted_data) # 创建解密器 cipher = AES.new( settings.TASK_EXPORT_ENCRYPTION_KEY.encode('utf-8')[:16], AES.MODE_CBC, settings.TASK_EXPORT_IV.encode('utf-8')[:16] ) # 解密并去除填充 decrypted_data = unpad(cipher.decrypt(encrypted_bytes), AES.block_size) # 将JSON字符串转回Python对象 return json.loads(decrypted_data.decode('utf-8'))