# PyFundaments: A Secure Python Architecture
# Copyright 2008-2025 - Volkan Kücükbudak
# Apache License V. 2
# Repo: https://github.com/VolkanSah/PyFundaments
# encryption.py
# A secure and robust encryption module using the cryptography library.
# This module is designed as a core component for a CMS architecture.
import os
import sys
import base64
import binascii
from typing import Dict, Union, Optional
# IMPORTANT: The cryptography library is required for this module.
# Please install it with 'pip install cryptography'.
try:
    # Third-party primitives used by the Encryption class below.
    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
    from cryptography.exceptions import InvalidTag
except ImportError:
    # The module cannot work without 'cryptography': report the problem on
    # stderr (so it is not mixed into regular program output) and stop.
    print(
        "Error: The 'cryptography' library is required. Please install it with 'pip install cryptography'.",
        file=sys.stderr,
    )
    sys.exit(1)
class Encryption:
    """
    Symmetric encryption and decryption using AES-256-GCM.

    The AES key is derived once, in ``__init__``, from a master key string and
    a persistent hex-encoded salt using PBKDF2-HMAC-SHA256. An instance can
    then encrypt/decrypt strings (``encrypt`` / ``decrypt``) and stream files
    (``encrypt_file`` / ``decrypt_file``).

    Encrypted files are laid out as ``nonce || ciphertext || tag``.
    """
    CIPHER_NAME = 'AES-256-GCM'
    KEY_LENGTH = 32  # 256 bits
    NONCE_LENGTH = 12  # 96 bits, standard for GCM
    TAG_LENGTH = 16  # 128 bits
    SALT_LENGTH = 16
    KDF_ITERATIONS = 480000  # Recommended value for 2023+
    CHUNK_SIZE = 8192  # Bytes read per step when streaming files

    @staticmethod
    def generate_salt() -> str:
        """
        Generate a new random salt for key derivation.

        The salt is read from ``os.urandom``. It should be generated once
        during application setup and stored, because the same salt is needed
        to derive the same key again.

        Returns:
            A hex-encoded string of ``SALT_LENGTH`` random bytes.
        """
        return binascii.hexlify(os.urandom(Encryption.SALT_LENGTH)).decode('utf-8')

    def __init__(self, master_key: str, salt: str):
        """
        Derive the AES key from a master key and a persistent salt.

        Args:
            master_key: The string to be used as the master key.
            salt: The hex-encoded string of the persistent salt.

        Raises:
            ValueError: If the salt is not a valid hex string or does not
                decode to exactly ``SALT_LENGTH`` bytes.
        """
        try:
            salt_bytes = binascii.unhexlify(salt)
        except (ValueError, TypeError) as e:
            # binascii.Error is a ValueError subclass; TypeError covers
            # non-string input such as None.
            raise ValueError("Invalid salt format. Must be a hex-encoded string.") from e
        if len(salt_bytes) != self.SALT_LENGTH:
            raise ValueError(
                f"Invalid salt length. Expected {self.SALT_LENGTH} bytes, got {len(salt_bytes)}."
            )
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=self.KEY_LENGTH,
            salt=salt_bytes,
            iterations=self.KDF_ITERATIONS,
            backend=default_backend()
        )
        self.key = kdf.derive(master_key.encode('utf-8'))

    @staticmethod
    def _require_distinct_paths(source_path: str, destination_path: str) -> None:
        """Reject calls whose output would overwrite their own input file."""
        if os.path.realpath(source_path) == os.path.realpath(destination_path):
            raise ValueError("Source and destination paths must refer to different files.")

    @staticmethod
    def _discard_partial_output(path: str) -> None:
        """Best-effort removal of an output file left behind by a failed run."""
        try:
            os.remove(path)
        except OSError:
            # Deliberately ignored: the original failure is the error that
            # must reach the caller, not a secondary cleanup problem.
            pass

    def encrypt(self, data: str) -> Dict[str, str]:
        """
        Encrypt a string using AES-256-GCM with a fresh random nonce.

        Args:
            data: The string to be encrypted (encoded as UTF-8 first).

        Returns:
            A dictionary with the base64-encoded ciphertext under ``'data'``,
            the hex-encoded nonce under ``'nonce'`` and the hex-encoded
            authentication tag under ``'tag'``.
        """
        nonce = os.urandom(self.NONCE_LENGTH)
        encryptor = Cipher(
            algorithms.AES(self.key),
            modes.GCM(nonce),
            backend=default_backend()
        ).encryptor()
        encrypted_data = encryptor.update(data.encode('utf-8')) + encryptor.finalize()
        # The tag only becomes available after finalize().
        tag = encryptor.tag
        return {
            'data': base64.b64encode(encrypted_data).decode('utf-8'),
            'nonce': binascii.hexlify(nonce).decode('utf-8'),
            'tag': binascii.hexlify(tag).decode('utf-8')
        }

    def decrypt(self, encrypted_data: str, nonce: str, tag: str) -> str:
        """
        Decrypt an AES-256-GCM encrypted string.

        Args:
            encrypted_data: The base64-encoded ciphertext.
            nonce: The hex-encoded nonce/IV.
            tag: The hex-encoded authentication tag.

        Returns:
            The decrypted plaintext string.

        Raises:
            ValueError: If nonce, tag, or data format is invalid.
            InvalidTag: If the authentication tag fails validation.
            UnicodeDecodeError: If the decrypted bytes are not valid UTF-8.
        """
        try:
            nonce_bytes = binascii.unhexlify(nonce)
            tag_bytes = binascii.unhexlify(tag)
            cipher_bytes = base64.b64decode(encrypted_data)
        except (binascii.Error, ValueError) as e:
            raise ValueError(f'Invalid data format: {e}') from e
        decryptor = Cipher(
            algorithms.AES(self.key),
            modes.GCM(nonce_bytes, tag_bytes),
            backend=default_backend()
        ).decryptor()
        try:
            # finalize() is where the tag is checked; nothing is returned to
            # the caller unless it succeeds.
            decrypted_data = decryptor.update(cipher_bytes) + decryptor.finalize()
        except InvalidTag as e:
            raise InvalidTag(
                "Authentication tag validation failed. Data may be corrupted or key is incorrect."
            ) from e
        return decrypted_data.decode('utf-8')

    def encrypt_file(self, source_path: str, destination_path: str) -> Dict[str, str]:
        """
        Encrypt a file using AES-256-GCM with a streaming approach.

        The destination file is written as ``nonce || ciphertext || tag``.
        If encryption fails after the destination was opened, the partial
        output file is removed.

        Args:
            source_path: Path to the file to be encrypted.
            destination_path: Path where the encrypted file will be saved.

        Returns:
            A dictionary containing the hex-encoded nonce (``'nonce'``) and
            authentication tag (``'tag'``).

        Raises:
            ValueError: If a path cannot be found, or if source and
                destination refer to the same file.
            IOError: If encryption fails for any other reason.
        """
        # Opening the destination with 'wb' would truncate the source before
        # it is read, so identical paths are refused up front.
        self._require_distinct_paths(source_path, destination_path)
        nonce = os.urandom(self.NONCE_LENGTH)
        encryptor = Cipher(
            algorithms.AES(self.key),
            modes.GCM(nonce),
            backend=default_backend()
        ).encryptor()
        destination_opened = False
        try:
            with open(source_path, 'rb') as fp_source, open(destination_path, 'wb') as fp_dest:
                destination_opened = True
                fp_dest.write(nonce)
                while True:
                    chunk = fp_source.read(self.CHUNK_SIZE)
                    if not chunk:
                        break
                    fp_dest.write(encryptor.update(chunk))
                # Write whatever finalize() returns so no trailing bytes are dropped.
                fp_dest.write(encryptor.finalize())
                tag = encryptor.tag
                fp_dest.write(tag)
        except FileNotFoundError as e:
            raise ValueError(f"File not found: {e.filename}") from e
        except Exception as e:
            if destination_opened:
                self._discard_partial_output(destination_path)
            raise IOError(f"File encryption failed: {e}") from e
        return {
            'nonce': binascii.hexlify(nonce).decode('utf-8'),
            'tag': binascii.hexlify(tag).decode('utf-8')
        }

    def decrypt_file(self, source_path: str, destination_path: str) -> None:
        """
        Decrypt an AES-256-GCM encrypted file written by ``encrypt_file``.

        The source is expected to be ``nonce || ciphertext || tag``. Plaintext
        is streamed to the destination while decrypting, but the tag can only
        be checked at the very end; if that check (or anything else) fails
        after the destination was opened, the destination file is removed so
        that no unauthenticated plaintext is left on disk. The destination is
        not created at all when the source is too short to be valid.

        Args:
            source_path: Path to the encrypted file.
            destination_path: Path where the decrypted file will be saved.

        Raises:
            ValueError: If a path cannot be found, or if source and
                destination refer to the same file.
            IOError: If the file is malformed, the authentication tag is
                invalid, or decryption fails for any other reason.
        """
        # Opening the destination with 'wb' would truncate the source before
        # it is read, so identical paths are refused up front.
        self._require_distinct_paths(source_path, destination_path)
        destination_opened = False
        try:
            with open(source_path, 'rb') as fp_source:
                # Size taken from the open handle, so it always describes the
                # file that is actually being read.
                total_size = fp_source.seek(0, os.SEEK_END)
                if total_size < self.NONCE_LENGTH:
                    raise ValueError("Incomplete or invalid file format: Nonce is missing.")
                if total_size < self.NONCE_LENGTH + self.TAG_LENGTH:
                    raise ValueError("Incomplete or invalid file format: Tag is missing.")
                fp_source.seek(0, os.SEEK_SET)
                nonce = fp_source.read(self.NONCE_LENGTH)
                fp_source.seek(-self.TAG_LENGTH, os.SEEK_END)
                tag = fp_source.read(self.TAG_LENGTH)
                fp_source.seek(self.NONCE_LENGTH, os.SEEK_SET)  # Rewind to the start of the ciphertext
                decryptor = Cipher(
                    algorithms.AES(self.key),
                    modes.GCM(nonce, tag),
                    backend=default_backend()
                ).decryptor()
                remaining = total_size - self.NONCE_LENGTH - self.TAG_LENGTH
                with open(destination_path, 'wb') as fp_dest:
                    destination_opened = True
                    while remaining > 0:
                        chunk = fp_source.read(min(self.CHUNK_SIZE, remaining))
                        if not chunk:
                            # Guards against looping forever if the file
                            # shrinks while it is being read.
                            raise ValueError(
                                "Incomplete or invalid file format: unexpected end of file."
                            )
                        fp_dest.write(decryptor.update(chunk))
                        remaining -= len(chunk)
                    # Raises InvalidTag when the data does not authenticate.
                    fp_dest.write(decryptor.finalize())
        except InvalidTag as e:
            if destination_opened:
                self._discard_partial_output(destination_path)
            raise IOError(f"File decryption failed. The authentication tag is invalid, suggesting the file was corrupted or tampered with. Error: {e}") from e
        except FileNotFoundError as e:
            raise ValueError(f"File not found: {e.filename}") from e
        except Exception as e:
            if destination_opened:
                self._discard_partial_output(destination_path)
            raise IOError(f"File decryption failed due to an unexpected error: {e}") from e
# End of encryption.py