# PyFundaments: A Secure Python Architecture
# Copyright 2008-2025 - Volkan Kücükbudak
# Apache License V. 2
# Repo: https://github.com/VolkanSah/PyFundaments
# encryption.py
# A secure and robust encryption module using the cryptography library.
# This module is designed as a core component for a CMS architecture.

import os
import sys
import base64
import binascii
from typing import Dict, Union, Optional

# IMPORTANT: The cryptography library is required for this module.
# Please install it with 'pip install cryptography'.
try:
    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
    from cryptography.exceptions import InvalidTag
except ImportError:
    print("Error: The 'cryptography' library is required. Please install it with 'pip install cryptography'.")
    sys.exit(1)


class Encryption:
    """
    A class for symmetric encryption and decryption using AES-256-GCM.
    It securely handles both string data and file streaming.
    This version is designed as a reusable core component for a larger application.

    Encrypted files are laid out as: nonce (NONCE_LENGTH bytes), ciphertext,
    authentication tag (TAG_LENGTH bytes).
    """
    CIPHER_NAME = 'AES-256-GCM'
    KEY_LENGTH = 32  # 256 bits
    NONCE_LENGTH = 12  # 96 bits, standard for GCM
    TAG_LENGTH = 16  # 128 bits
    SALT_LENGTH = 16
    KDF_ITERATIONS = 480000  # PBKDF2 rounds; recommended value for 2023+
    CHUNK_SIZE = 8192  # Bytes read per step when streaming files

    @staticmethod
    def generate_salt() -> str:
        """
        Generates a new, cryptographically secure random salt for key derivation.
        This should be done once during application setup and stored securely.

        Returns:
            A hex-encoded string of the salt.
        """
        return binascii.hexlify(os.urandom(Encryption.SALT_LENGTH)).decode('utf-8')

    @staticmethod
    def _discard_partial_output(path: str) -> None:
        """Best-effort removal of an incomplete or unauthenticated output file."""
        try:
            os.remove(path)
        except OSError:
            # Cleanup must never mask the error that triggered it.
            pass

    def __init__(self, master_key: str, salt: str):
        """
        Initializes the encryption class by deriving a secure key from a master key.
        The provided master_key and a persistent salt are used for key derivation
        (PBKDF2-HMAC-SHA256 with KDF_ITERATIONS rounds).

        Args:
            master_key: The string to be used as the master key.
            salt: The hex-encoded string of the persistent salt.

        Raises:
            ValueError: If the provided salt is not a valid hex string or has an incorrect length.
        """
        try:
            salt_bytes = binascii.unhexlify(salt)
        except ValueError as e:
            # binascii.Error is a ValueError subclass; a non-ASCII string raises
            # a plain ValueError, so catch the base class to report both alike.
            raise ValueError("Invalid salt format. Must be a hex-encoded string.") from e

        if len(salt_bytes) != self.SALT_LENGTH:
            raise ValueError(f"Invalid salt length. Expected {self.SALT_LENGTH} bytes, got {len(salt_bytes)}.")

        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=self.KEY_LENGTH,
            salt=salt_bytes,
            iterations=self.KDF_ITERATIONS,
            backend=default_backend()
        )
        self.key = kdf.derive(master_key.encode('utf-8'))

    def encrypt(self, data: str) -> Dict[str, str]:
        """
        Encrypts a string using AES-256-GCM.

        Args:
            data: The string to be encrypted.

        Returns:
            A dictionary containing the base64-encoded ciphertext ('data'),
            hex-encoded IV/nonce ('nonce'), and hex-encoded authentication tag ('tag').
        """
        # A fresh random nonce per message; a nonce must never be reused with the same key.
        nonce = os.urandom(self.NONCE_LENGTH)
        aesgcm = Cipher(
            algorithms.AES(self.key),
            modes.GCM(nonce),
            backend=default_backend()
        ).encryptor()

        encrypted_data = aesgcm.update(data.encode('utf-8')) + aesgcm.finalize()
        tag = aesgcm.tag  # Only available after finalize()

        return {
            'data': base64.b64encode(encrypted_data).decode('utf-8'),
            'nonce': binascii.hexlify(nonce).decode('utf-8'),
            'tag': binascii.hexlify(tag).decode('utf-8')
        }

    def decrypt(self, encrypted_data: str, nonce: str, tag: str) -> str:
        """
        Decrypts an AES-256-GCM encrypted string.

        Args:
            encrypted_data: The base64-encoded ciphertext.
            nonce: The hex-encoded nonce/IV.
            tag: The hex-encoded authentication tag.

        Returns:
            The decrypted plaintext string.

        Raises:
            ValueError: If nonce, tag, or data format is invalid.
            InvalidTag: If the authentication tag fails validation.
        """
        try:
            nonce_bytes = binascii.unhexlify(nonce)
            tag_bytes = binascii.unhexlify(tag)
            cipher_bytes = base64.b64decode(encrypted_data)
        except (binascii.Error, ValueError) as e:
            raise ValueError(f'Invalid data format: {e}') from e

        aesgcm = Cipher(
            algorithms.AES(self.key),
            modes.GCM(nonce_bytes, tag_bytes),
            backend=default_backend()
        ).decryptor()

        try:
            # finalize() verifies the tag; nothing is returned unless it passes.
            decrypted_data = aesgcm.update(cipher_bytes) + aesgcm.finalize()
            return decrypted_data.decode('utf-8')
        except InvalidTag as e:
            raise InvalidTag("Authentication tag validation failed. Data may be corrupted or key is incorrect.") from e

    def encrypt_file(self, source_path: str, destination_path: str) -> Dict[str, str]:
        """
        Encrypts a file using AES-256-GCM with a streaming approach.

        The destination receives the nonce, then the ciphertext, then the tag.
        If encryption fails after the destination has been opened, the partial
        output file is removed.

        Args:
            source_path: Path to the file to be encrypted.
            destination_path: Path where the encrypted file will be saved.

        Returns:
            A dictionary containing the hex-encoded IV/nonce and authentication tag.

        Raises:
            ValueError: If a file path cannot be found.
            IOError: If encryption fails for any other reason.
        """
        nonce = os.urandom(self.NONCE_LENGTH)
        encryptor = Cipher(
            algorithms.AES(self.key),
            modes.GCM(nonce),
            backend=default_backend()
        ).encryptor()

        try:
            with open(source_path, 'rb') as fp_source:
                # Opened outside the inner try so a failed open never deletes
                # a pre-existing file we did not manage to touch.
                fp_dest = open(destination_path, 'wb')
                try:
                    with fp_dest:
                        fp_dest.write(nonce)
                        for chunk in iter(lambda: fp_source.read(self.CHUNK_SIZE), b''):
                            fp_dest.write(encryptor.update(chunk))
                        fp_dest.write(encryptor.finalize())
                        tag = encryptor.tag
                        fp_dest.write(tag)
                except BaseException:
                    # A ciphertext without its tag is useless; don't leave it behind.
                    self._discard_partial_output(destination_path)
                    raise

            return {
                'nonce': binascii.hexlify(nonce).decode('utf-8'),
                'tag': binascii.hexlify(tag).decode('utf-8')
            }
        except FileNotFoundError as e:
            raise ValueError(f"File not found: {e.filename}") from e
        except Exception as e:
            raise IOError(f"File encryption failed: {e}") from e

    def decrypt_file(self, source_path: str, destination_path: str) -> None:
        """
        Decrypts an AES-256-GCM encrypted file.

        The source layout is validated before the destination is opened, so a
        malformed source does not truncate an existing destination. Plaintext is
        streamed to the destination while decrypting; because the tag can only be
        verified at the end, the destination is removed if verification (or
        anything else) fails, so unauthenticated plaintext is never left on disk.

        Args:
            source_path: Path to the encrypted file.
            destination_path: Path where the decrypted file will be saved.

        Raises:
            ValueError: If a file path cannot be found.
            IOError: If the file is malformed, the authentication tag is invalid,
                or any other error occurs during decryption.
        """
        try:
            with open(source_path, 'rb') as fp_source:
                nonce = fp_source.read(self.NONCE_LENGTH)
                if len(nonce) != self.NONCE_LENGTH:
                    raise ValueError("Incomplete or invalid file format: Nonce is missing.")

                # seek() returns the new absolute position, i.e. the file size.
                total_size = fp_source.seek(0, os.SEEK_END)
                ciphertext_size = total_size - self.NONCE_LENGTH - self.TAG_LENGTH
                if ciphertext_size < 0:
                    raise ValueError("Incomplete or invalid file format: Tag is missing.")

                fp_source.seek(-self.TAG_LENGTH, os.SEEK_END)
                tag = fp_source.read(self.TAG_LENGTH)
                if len(tag) != self.TAG_LENGTH:
                    raise ValueError("Incomplete or invalid file format: Tag is missing.")

                fp_source.seek(self.NONCE_LENGTH, os.SEEK_SET)  # Rewind to the start of the ciphertext

                decryptor = Cipher(
                    algorithms.AES(self.key),
                    modes.GCM(nonce, tag),
                    backend=default_backend()
                ).decryptor()

                # Opened outside the inner try so a failed open never deletes
                # a pre-existing file we did not manage to touch.
                fp_dest = open(destination_path, 'wb')
                try:
                    with fp_dest:
                        remaining = ciphertext_size
                        while remaining > 0:
                            chunk = fp_source.read(min(self.CHUNK_SIZE, remaining))
                            if not chunk:
                                # The file shrank underneath us; without this
                                # guard the loop would never terminate.
                                raise ValueError("Incomplete or invalid file format: Ciphertext is truncated.")
                            fp_dest.write(decryptor.update(chunk))
                            remaining -= len(chunk)
                        # Raises InvalidTag if the data was tampered with.
                        fp_dest.write(decryptor.finalize())
                except BaseException:
                    # Everything written so far is unauthenticated; remove it.
                    self._discard_partial_output(destination_path)
                    raise
        except InvalidTag as e:
            raise IOError(f"File decryption failed. The authentication tag is invalid, suggesting the file was corrupted or tampered with. Error: {e}") from e
        except FileNotFoundError as e:
            raise ValueError(f"File not found: {e.filename}") from e
        except Exception as e:
            raise IOError(f"File decryption failed due to an unexpected error: {e}") from e