| |
| |
| |
| |
| |
| |
| |
|
|
| import os |
| import sys |
| import base64 |
| import binascii |
| from typing import Dict, Union, Optional |
|
|
| |
| |
| try: |
| from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes |
| from cryptography.hazmat.primitives import hashes |
| from cryptography.hazmat.backends import default_backend |
| from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC |
| from cryptography.exceptions import InvalidTag |
| except ImportError: |
| print("Error: The 'cryptography' library is required. Please install it with 'pip install cryptography'.") |
| sys.exit(1) |
|
|
| class Encryption: |
| """ |
| A class for symmetric encryption and decryption using AES-256-GCM. |
| It securely handles both string data and file streaming. |
| |
| This version is designed as a reusable core component for a larger application. |
| """ |
| CIPHER_NAME = 'AES-256-GCM' |
| KEY_LENGTH = 32 |
| NONCE_LENGTH = 12 |
| TAG_LENGTH = 16 |
| SALT_LENGTH = 16 |
|
|
| @staticmethod |
| def generate_salt() -> str: |
| """ |
| Generates a new, cryptographically secure random salt for key derivation. |
| This should be done once during application setup and stored securely. |
| |
| Returns: |
| A hex-encoded string of the salt. |
| """ |
| return binascii.hexlify(os.urandom(Encryption.SALT_LENGTH)).decode('utf-8') |
|
|
| def __init__(self, master_key: str, salt: str): |
| """ |
| Initializes the encryption class by deriving a secure key from a master key. |
| The provided master_key and a persistent salt are used for key derivation. |
| |
| Args: |
| master_key: The string to be used as the master key. |
| salt: The hex-encoded string of the persistent salt. |
| |
| Raises: |
| ValueError: If the provided salt is not a valid hex string or has an incorrect length. |
| """ |
| try: |
| salt_bytes = binascii.unhexlify(salt) |
| except binascii.Error: |
| raise ValueError("Invalid salt format. Must be a hex-encoded string.") |
| |
| if len(salt_bytes) != self.SALT_LENGTH: |
| raise ValueError(f"Invalid salt length. Expected {self.SALT_LENGTH} bytes, got {len(salt_bytes)}.") |
| |
| kdf = PBKDF2HMAC( |
| algorithm=hashes.SHA256(), |
| length=self.KEY_LENGTH, |
| salt=salt_bytes, |
| iterations=480000, |
| backend=default_backend() |
| ) |
| self.key = kdf.derive(master_key.encode('utf-8')) |
|
|
| def encrypt(self, data: str) -> Dict[str, str]: |
| """ |
| Encrypts a string using AES-256-GCM. |
| |
| Args: |
| data: The string to be encrypted. |
| |
| Returns: |
| A dictionary containing the base64-encoded ciphertext, hex-encoded IV/nonce, |
| and hex-encoded authentication tag. |
| """ |
| nonce = os.urandom(self.NONCE_LENGTH) |
| |
| aesgcm = Cipher( |
| algorithms.AES(self.key), |
| modes.GCM(nonce), |
| backend=default_backend() |
| ).encryptor() |
| |
| encrypted_data = aesgcm.update(data.encode('utf-8')) + aesgcm.finalize() |
| tag = aesgcm.tag |
|
|
| return { |
| 'data': base64.b64encode(encrypted_data).decode('utf-8'), |
| 'nonce': binascii.hexlify(nonce).decode('utf-8'), |
| 'tag': binascii.hexlify(tag).decode('utf-8') |
| } |
|
|
| def decrypt(self, encrypted_data: str, nonce: str, tag: str) -> str: |
| """ |
| Decrypts an AES-256-GCM encrypted string. |
| |
| Args: |
| encrypted_data: The base64-encoded ciphertext. |
| nonce: The hex-encoded nonce/IV. |
| tag: The hex-encoded authentication tag. |
| |
| Returns: |
| The decrypted plaintext string. |
| |
| Raises: |
| ValueError: If nonce, tag, or data format is invalid. |
| InvalidTag: If the authentication tag fails validation. |
| """ |
| try: |
| nonce_bytes = binascii.unhexlify(nonce) |
| tag_bytes = binascii.unhexlify(tag) |
| cipher_bytes = base64.b64decode(encrypted_data) |
| except (binascii.Error, ValueError) as e: |
| raise ValueError(f'Invalid data format: {e}') |
| |
| aesgcm = Cipher( |
| algorithms.AES(self.key), |
| modes.GCM(nonce_bytes, tag_bytes), |
| backend=default_backend() |
| ).decryptor() |
| |
| try: |
| decrypted_data = aesgcm.update(cipher_bytes) + aesgcm.finalize() |
| return decrypted_data.decode('utf-8') |
| except InvalidTag: |
| raise InvalidTag("Authentication tag validation failed. Data may be corrupted or key is incorrect.") |
|
|
| def encrypt_file(self, source_path: str, destination_path: str) -> Dict[str, str]: |
| """ |
| Encrypts a file using AES-256-GCM with a streaming approach. |
| |
| Args: |
| source_path: Path to the file to be encrypted. |
| destination_path: Path where the encrypted file will be saved. |
| |
| Returns: |
| A dictionary containing the hex-encoded IV/nonce and authentication tag. |
| """ |
| nonce = os.urandom(self.NONCE_LENGTH) |
| |
| encryptor = Cipher( |
| algorithms.AES(self.key), |
| modes.GCM(nonce), |
| backend=default_backend() |
| ).encryptor() |
| |
| try: |
| with open(source_path, 'rb') as fp_source, open(destination_path, 'wb') as fp_dest: |
| fp_dest.write(nonce) |
| |
| chunk_size = 8192 |
| while True: |
| chunk = fp_source.read(chunk_size) |
| if not chunk: |
| break |
| encrypted_chunk = encryptor.update(chunk) |
| fp_dest.write(encrypted_chunk) |
| |
| encryptor.finalize() |
| tag = encryptor.tag |
| fp_dest.write(tag) |
| |
| return { |
| 'nonce': binascii.hexlify(nonce).decode('utf-8'), |
| 'tag': binascii.hexlify(tag).decode('utf-8') |
| } |
| except FileNotFoundError as e: |
| raise ValueError(f"File not found: {e.filename}") from e |
| except Exception as e: |
| raise IOError(f"File encryption failed: {e}") from e |
|
|
| def decrypt_file(self, source_path: str, destination_path: str) -> None: |
| """ |
| Decrypts an AES-256-GCM encrypted file. |
| |
| Args: |
| source_path: Path to the encrypted file. |
| destination_path: Path where the decrypted file will be saved. |
| """ |
| try: |
| with open(source_path, 'rb') as fp_source, open(destination_path, 'wb') as fp_dest: |
| nonce = fp_source.read(self.NONCE_LENGTH) |
| if len(nonce) != self.NONCE_LENGTH: |
| raise ValueError("Incomplete or invalid file format: Nonce is missing.") |
| |
| fp_source.seek(-self.TAG_LENGTH, os.SEEK_END) |
| tag = fp_source.read(self.TAG_LENGTH) |
| if len(tag) != self.TAG_LENGTH: |
| raise ValueError("Incomplete or invalid file format: Tag is missing.") |
| |
| fp_source.seek(self.NONCE_LENGTH, os.SEEK_SET) |
| |
| decryptor = Cipher( |
| algorithms.AES(self.key), |
| modes.GCM(nonce, tag), |
| backend=default_backend() |
| ).decryptor() |
| |
| chunk_size = 8192 |
| encrypted_file_size = os.path.getsize(source_path) - self.NONCE_LENGTH - self.TAG_LENGTH |
| |
| bytes_read = 0 |
| while bytes_read < encrypted_file_size: |
| chunk_to_read = min(chunk_size, encrypted_file_size - bytes_read) |
| chunk = fp_source.read(chunk_to_read) |
| |
| decrypted_chunk = decryptor.update(chunk) |
| fp_dest.write(decrypted_chunk) |
| bytes_read += len(chunk) |
| |
| decryptor.finalize() |
| |
| except InvalidTag as e: |
| raise IOError(f"File decryption failed. The authentication tag is invalid, suggesting the file was corrupted or tampered with. Error: {e}") from e |
| except FileNotFoundError as e: |
| raise ValueError(f"File not found: {e.filename}") from e |
| except Exception as e: |
| raise IOError(f"File decryption failed due to an unexpected error: {e}") from e |
|
|