File size: 9,129 Bytes
3060aa0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# PyFundaments: A Secure Python Architecture
# Copyright 2008-2025 - Volkan Kücükbudak
# Apache License V. 2
# Repo: https://github.com/VolkanSah/PyFundaments
# encryption.py
# A secure and robust encryption module using the cryptography library.
# This module is designed as a core component for a CMS architecture.

import os
import sys
import base64
import binascii
from typing import Dict, Union, Optional

# IMPORTANT: The cryptography library is required for this module.
# Please install it with 'pip install cryptography'.
try:
    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
    from cryptography.exceptions import InvalidTag
except ImportError:
    print("Error: The 'cryptography' library is required. Please install it with 'pip install cryptography'.")
    sys.exit(1)

class Encryption:
    """
    A class for symmetric encryption and decryption using AES-256-GCM.
    It securely handles both string data and file streaming.
    
    This version is designed as a reusable core component for a larger application.
    """
    CIPHER_NAME = 'AES-256-GCM'
    KEY_LENGTH = 32 # 256 bits
    NONCE_LENGTH = 12 # 96 bits, standard for GCM
    TAG_LENGTH = 16 # 128 bits
    SALT_LENGTH = 16

    @staticmethod
    def generate_salt() -> str:
        """
        Generates a new, cryptographically secure random salt for key derivation.
        This should be done once during application setup and stored securely.
        
        Returns:
            A hex-encoded string of the salt.
        """
        return binascii.hexlify(os.urandom(Encryption.SALT_LENGTH)).decode('utf-8')

    def __init__(self, master_key: str, salt: str):
        """
        Initializes the encryption class by deriving a secure key from a master key.
        The provided master_key and a persistent salt are used for key derivation.
        
        Args:
            master_key: The string to be used as the master key.
            salt: The hex-encoded string of the persistent salt.
        
        Raises:
            ValueError: If the provided salt is not a valid hex string or has an incorrect length.
        """
        try:
            salt_bytes = binascii.unhexlify(salt)
        except binascii.Error:
            raise ValueError("Invalid salt format. Must be a hex-encoded string.")
        
        if len(salt_bytes) != self.SALT_LENGTH:
            raise ValueError(f"Invalid salt length. Expected {self.SALT_LENGTH} bytes, got {len(salt_bytes)}.")
            
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=self.KEY_LENGTH,
            salt=salt_bytes,
            iterations=480000, # Recommended value for 2023+
            backend=default_backend()
        )
        self.key = kdf.derive(master_key.encode('utf-8'))

    def encrypt(self, data: str) -> Dict[str, str]:
        """
        Encrypts a string using AES-256-GCM.
        
        Args:
            data: The string to be encrypted.

        Returns:
            A dictionary containing the base64-encoded ciphertext, hex-encoded IV/nonce,
            and hex-encoded authentication tag.
        """
        nonce = os.urandom(self.NONCE_LENGTH)
        
        aesgcm = Cipher(
            algorithms.AES(self.key),
            modes.GCM(nonce),
            backend=default_backend()
        ).encryptor()
        
        encrypted_data = aesgcm.update(data.encode('utf-8')) + aesgcm.finalize()
        tag = aesgcm.tag

        return {
            'data': base64.b64encode(encrypted_data).decode('utf-8'),
            'nonce': binascii.hexlify(nonce).decode('utf-8'),
            'tag': binascii.hexlify(tag).decode('utf-8')
        }

    def decrypt(self, encrypted_data: str, nonce: str, tag: str) -> str:
        """
        Decrypts an AES-256-GCM encrypted string.
        
        Args:
            encrypted_data: The base64-encoded ciphertext.
            nonce: The hex-encoded nonce/IV.
            tag: The hex-encoded authentication tag.
            
        Returns:
            The decrypted plaintext string.
        
        Raises:
            ValueError: If nonce, tag, or data format is invalid.
            InvalidTag: If the authentication tag fails validation.
        """
        try:
            nonce_bytes = binascii.unhexlify(nonce)
            tag_bytes = binascii.unhexlify(tag)
            cipher_bytes = base64.b64decode(encrypted_data)
        except (binascii.Error, ValueError) as e:
            raise ValueError(f'Invalid data format: {e}')
        
        aesgcm = Cipher(
            algorithms.AES(self.key),
            modes.GCM(nonce_bytes, tag_bytes),
            backend=default_backend()
        ).decryptor()
        
        try:
            decrypted_data = aesgcm.update(cipher_bytes) + aesgcm.finalize()
            return decrypted_data.decode('utf-8')
        except InvalidTag:
            raise InvalidTag("Authentication tag validation failed. Data may be corrupted or key is incorrect.")

    def encrypt_file(self, source_path: str, destination_path: str) -> Dict[str, str]:
        """
        Encrypts a file using AES-256-GCM with a streaming approach.
        
        Args:
            source_path: Path to the file to be encrypted.
            destination_path: Path where the encrypted file will be saved.
            
        Returns:
            A dictionary containing the hex-encoded IV/nonce and authentication tag.
        """
        nonce = os.urandom(self.NONCE_LENGTH)
        
        encryptor = Cipher(
            algorithms.AES(self.key),
            modes.GCM(nonce),
            backend=default_backend()
        ).encryptor()
        
        try:
            with open(source_path, 'rb') as fp_source, open(destination_path, 'wb') as fp_dest:
                fp_dest.write(nonce)
                
                chunk_size = 8192
                while True:
                    chunk = fp_source.read(chunk_size)
                    if not chunk:
                        break
                    encrypted_chunk = encryptor.update(chunk)
                    fp_dest.write(encrypted_chunk)
                
                encryptor.finalize()
                tag = encryptor.tag
                fp_dest.write(tag)
            
            return {
                'nonce': binascii.hexlify(nonce).decode('utf-8'),
                'tag': binascii.hexlify(tag).decode('utf-8')
            }
        except FileNotFoundError as e:
            raise ValueError(f"File not found: {e.filename}") from e
        except Exception as e:
            raise IOError(f"File encryption failed: {e}") from e

    def decrypt_file(self, source_path: str, destination_path: str) -> None:
        """
        Decrypts an AES-256-GCM encrypted file.
        
        Args:
            source_path: Path to the encrypted file.
            destination_path: Path where the decrypted file will be saved.
        """
        try:
            with open(source_path, 'rb') as fp_source, open(destination_path, 'wb') as fp_dest:
                nonce = fp_source.read(self.NONCE_LENGTH)
                if len(nonce) != self.NONCE_LENGTH:
                    raise ValueError("Incomplete or invalid file format: Nonce is missing.")
                    
                fp_source.seek(-self.TAG_LENGTH, os.SEEK_END)
                tag = fp_source.read(self.TAG_LENGTH)
                if len(tag) != self.TAG_LENGTH:
                    raise ValueError("Incomplete or invalid file format: Tag is missing.")
                    
                fp_source.seek(self.NONCE_LENGTH, os.SEEK_SET) # Rewind to the start of the ciphertext
                
                decryptor = Cipher(
                    algorithms.AES(self.key),
                    modes.GCM(nonce, tag),
                    backend=default_backend()
                ).decryptor()
                
                chunk_size = 8192
                encrypted_file_size = os.path.getsize(source_path) - self.NONCE_LENGTH - self.TAG_LENGTH
                
                bytes_read = 0
                while bytes_read < encrypted_file_size:
                    chunk_to_read = min(chunk_size, encrypted_file_size - bytes_read)
                    chunk = fp_source.read(chunk_to_read)
                    
                    decrypted_chunk = decryptor.update(chunk)
                    fp_dest.write(decrypted_chunk)
                    bytes_read += len(chunk)
                
                decryptor.finalize()
                
        except InvalidTag as e:
            raise IOError(f"File decryption failed. The authentication tag is invalid, suggesting the file was corrupted or tampered with. Error: {e}") from e
        except FileNotFoundError as e:
            raise ValueError(f"File not found: {e.filename}") from e
        except Exception as e:
            raise IOError(f"File decryption failed due to an unexpected error: {e}") from e