Spaces:
Running
Running
| # PyFundaments: A Secure Python Architecture | |
| # Copyright 2008-2025 - Volkan Kücükbudak | |
| # Apache License V. 2 | |
| # Repo: https://github.com/VolkanSah/PyFundaments | |
| # root/fundaments/user_handler.py | |
| # A Python module for handling user authentication and session management. | |
| import sqlite3 | |
| import uuid | |
| from datetime import datetime, timedelta | |
| from passlib.hash import pbkdf2_sha256 | |
| import os | |
class Database:
    """
    Thin wrapper around a single SQLite connection and cursor.

    The database file is taken from the ``SQLITE_PATH`` environment variable
    when it is set; otherwise it falls back to ``../app/<db_name>`` relative
    to the directory containing this module. Missing parent directories are
    created before connecting.
    """

    def __init__(self, db_name="cms_database.db"):
        """
        Open the SQLite database, creating the file and its folder if needed.

        :param db_name: File name used for the fallback location only; it is
            ignored when ``SQLITE_PATH`` is set.
        """
        # 1. Prefer an explicit path from the environment (production/Docker).
        env_path = os.getenv("SQLITE_PATH")
        if env_path:
            full_db_path = os.path.abspath(env_path)
        else:
            # Fallback: root/fundaments/user_handler.py -> root/app/<db_name>
            base_path = os.path.dirname(os.path.abspath(__file__))
            full_db_path = os.path.join(base_path, "..", "app", db_name)

        # 2. SQLite creates the file itself but not its parent folders.
        #    exist_ok avoids a crash if another process creates the folder
        #    between an existence check and the mkdir call.
        db_dir = os.path.dirname(full_db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)

        self.conn = sqlite3.connect(full_db_path)
        # SQLite ignores FOREIGN KEY clauses unless enforcement is switched on
        # for each connection; without this the ON DELETE CASCADE declared on
        # the sessions table would never fire.
        self.conn.execute("PRAGMA foreign_keys = ON")
        self.cursor = self.conn.cursor()

        # Log the active database path for debugging purposes
        print(f"Database connected to: {full_db_path}")

    def execute(self, query, params=None):
        """
        Run a statement and commit immediately.

        :param query: SQL text, using ``?`` placeholders for values.
        :param params: Sequence of values bound to the placeholders.
        """
        if params is None:
            params = []
        self.cursor.execute(query, params)
        self.conn.commit()

    def fetchone(self, query, params=None):
        """
        Run a query and return its first row.

        :param query: SQL text, using ``?`` placeholders for values.
        :param params: Sequence of values bound to the placeholders.
        :return: The first result row as a tuple, or None if there is none.
        """
        if params is None:
            params = []
        self.cursor.execute(query, params)
        return self.cursor.fetchone()

    def fetchall(self, query, params=None):
        """
        Run a query and return every row.

        :param query: SQL text, using ``?`` placeholders for values.
        :param params: Sequence of values bound to the placeholders.
        :return: A list of result rows (empty if nothing matched).
        """
        if params is None:
            params = []
        self.cursor.execute(query, params)
        return self.cursor.fetchall()

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()

    def setup_tables(self):
        """
        Creates the necessary tables for users and sessions.

        Both statements use ``IF NOT EXISTS`` and are safe to run repeatedly.
        """
        self.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                password TEXT NOT NULL,
                is_admin INTEGER NOT NULL DEFAULT 0,
                account_locked INTEGER NOT NULL DEFAULT 0,
                failed_login_attempts INTEGER NOT NULL DEFAULT 0
            )
        """)
        self.execute("""
            CREATE TABLE IF NOT EXISTS sessions (
                id TEXT PRIMARY KEY,
                user_id INTEGER NOT NULL,
                ip_address TEXT,
                user_agent TEXT,
                last_activity TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
            )
        """)
class Security:
    """
    Handles secure password hashing and session regeneration.
    Using passlib for robust and secure password management.

    All helpers are static: they can be called on the class
    (``Security.hash_password(...)``) or on an instance.
    """

    @staticmethod
    def hash_password(password: str) -> str:
        """
        Hashes a password using PBKDF2 with SHA256.

        :param password: The plain-text password.
        :return: The passlib hash string to store in the database.
        """
        return pbkdf2_sha256.hash(password)

    @staticmethod
    def verify_password(password: str, hashed_password: str) -> bool:
        """
        Verifies a password against a stored hash.

        :param password: The plain-text password supplied by the user.
        :param hashed_password: The hash produced by :meth:`hash_password`.
        :return: True if the password matches the hash, False otherwise.
        """
        return pbkdf2_sha256.verify(password, hashed_password)

    @staticmethod
    def regenerate_session(session_id: str) -> str:
        """
        Simulates regenerating a session ID to prevent session fixation.
        In a real web framework, this would be a framework-specific function.

        :param session_id: The session ID being replaced (only logged here).
        :return: A freshly generated random UUID4 string.
        """
        print(f"Session regenerated. Old ID: {session_id}")
        new_session_id = str(uuid.uuid4())
        print(f"New ID: {new_session_id}")
        return new_session_id
class UserHandler:
    """
    Handles user login, logout, and session validation.
    This class mirrors the logic from the user's PHP User class.

    Session rows live in the ``sessions`` table; the handler additionally
    keeps the current session in ``self._session`` (a plain dict).
    """

    # Number of failed logins after which the account is locked.
    MAX_FAILED_ATTEMPTS = 5

    def __init__(self, db: "Database"):
        """
        :param db: Database wrapper providing ``execute`` and ``fetchone``.
        """
        self.db = db
        # A simple in-memory session store for this example
        self._session = {}

    def login(self, username: str, password: str, request_data: dict) -> bool:
        """
        Logs in the user by verifying credentials and storing a new session.

        :param username: The user's username.
        :param password: The user's plain-text password.
        :param request_data: A dictionary containing 'ip_address' and
            'user_agent'; missing keys (or None) fall back to 'unknown'.
        :return: True if login is successful, False otherwise.
        """
        request_data = request_data or {}
        try:
            # Step 1: Find the user in the database
            row = self.db.fetchone(
                "SELECT id, username, password, is_admin, account_locked, failed_login_attempts FROM users WHERE username = ?",
                (username,)
            )
            if row is None:
                print(f"Login failed: Username '{username}' not found.")
                return False
            user_id, stored_username, password_hash, is_admin, account_locked, _ = row

            # Check if account is locked
            if account_locked == 1:
                print(f"Login failed: Account for '{username}' is locked.")
                return False

            # Step 2: Verify the password
            if not Security.verify_password(password, password_hash):
                print(f"Login failed: Incorrect password for user '{username}'.")
                self.increment_failed_attempts(username)
                return False

            print(f"Login successful for user: '{username}'")
            # Reset failed login attempts on success
            self.reset_failed_attempts(username)

            # Step 3: Create a new session record in the database
            session_id = str(uuid.uuid4())
            self.db.execute(
                "INSERT INTO sessions (id, user_id, ip_address, user_agent) VALUES (?, ?, ?, ?)",
                (
                    session_id,
                    user_id,
                    request_data.get('ip_address', 'unknown'),
                    request_data.get('user_agent', 'unknown'),
                )
            )

            # Security: Regenerate the session ID. The stored row must follow
            # the new ID, otherwise is_logged_in()/validate_session() would
            # look up an ID that was never persisted and always report False.
            new_session_id = Security.regenerate_session(session_id)
            self.db.execute(
                "UPDATE sessions SET id = ? WHERE id = ?",
                (new_session_id, session_id)
            )

            # Step 4: Store session data in the in-memory session
            self._session = {
                'session_id': new_session_id,
                'user_id': user_id,
                'username': stored_username,
                'is_admin': is_admin
            }
            return True
        except sqlite3.Error as e:
            print(f"Database error during login: {e}")
            return False

    def logout(self) -> bool:
        """
        Logs out the current user by deleting the session from the database.

        Note: every session row belonging to the user is removed, not only
        the one tracked in memory.

        :return: True if logout is successful, False otherwise.
        """
        if 'user_id' not in self._session:
            print("No active session to log out.")
            return False
        try:
            # Step 1: Delete the session from the database
            self.db.execute(
                "DELETE FROM sessions WHERE user_id = ?",
                (self._session['user_id'],)
            )
            # Step 2: Clear the in-memory session data
            self._session.clear()
            print("User logged out successfully.")
            return True
        except sqlite3.Error as e:
            print(f"Database error during logout: {e}")
            return False

    def is_logged_in(self) -> bool:
        """
        Checks if the current user is logged in.

        :return: True if the in-memory session has a matching row in the
            sessions table, False otherwise (including on database errors).
        """
        if 'user_id' not in self._session:
            return False
        try:
            session_row = self.db.fetchone(
                "SELECT * FROM sessions WHERE id = ? AND user_id = ?",
                (self._session['session_id'], self._session['user_id'])
            )
            return session_row is not None
        except sqlite3.Error as e:
            print(f"Database error during is_logged_in check: {e}")
            return False

    def is_admin(self) -> bool:
        """
        Checks if the logged-in user is an admin.

        The flag is read from the in-memory session captured at login time.

        :return: True if the user is an admin, False otherwise.
        """
        return self._session.get('is_admin', 0) == 1

    def validate_session(self, request_data: dict) -> bool:
        """
        Validates the current session against IP address and user agent.

        :param request_data: A dictionary containing 'ip_address' and
            'user_agent'; missing keys (or None) fall back to 'unknown'.
        :return: True if the session is valid, False otherwise.
        """
        if not self.is_logged_in():
            return False
        request_data = request_data or {}
        try:
            session_row = self.db.fetchone(
                "SELECT * FROM sessions WHERE id = ? AND user_id = ? AND ip_address = ? AND user_agent = ?",
                (
                    self._session['session_id'],
                    self._session['user_id'],
                    request_data.get('ip_address', 'unknown'),
                    request_data.get('user_agent', 'unknown'),
                )
            )
            return session_row is not None
        except sqlite3.Error as e:
            print(f"Database error during session validation: {e}")
            return False

    def lock_account(self, username: str):
        """
        Locks a user account.

        :param username: The username of the account to lock.
        """
        try:
            self.db.execute(
                "UPDATE users SET account_locked = 1 WHERE username = ?",
                (username,)
            )
            print(f"Account for '{username}' has been locked.")
        except sqlite3.Error as e:
            print(f"Database error while locking account: {e}")

    def reset_failed_attempts(self, username: str):
        """
        Resets failed login attempts for a user.

        This only zeroes the counter; it does not clear ``account_locked``.

        :param username: The username of the account.
        """
        try:
            self.db.execute(
                "UPDATE users SET failed_login_attempts = 0 WHERE username = ?",
                (username,)
            )
        except sqlite3.Error as e:
            print(f"Database error while resetting failed attempts: {e}")

    def increment_failed_attempts(self, username: str):
        """
        Increments failed login attempts and locks the account if a threshold is met.

        The threshold is ``MAX_FAILED_ATTEMPTS``. Unknown usernames are
        silently ignored.

        :param username: The username of the account.
        """
        try:
            # Increment inside a single statement so that two concurrent
            # failures cannot both read the same old value and lose a count.
            self.db.execute(
                "UPDATE users SET failed_login_attempts = failed_login_attempts + 1 WHERE username = ?",
                (username,)
            )
            row = self.db.fetchone(
                "SELECT failed_login_attempts FROM users WHERE username = ?",
                (username,)
            )
            if row is None:
                return
            attempts = row[0]
            print(f"Failed login attempts for '{username}': {attempts}")
            if attempts >= self.MAX_FAILED_ATTEMPTS:
                self.lock_account(username)
        except sqlite3.Error as e:
            print(f"Database error while incrementing failed attempts: {e}")
# --- Example Usage ---
def main():
    """
    Run a small end-to-end demonstration: register two users, log in,
    validate the session, log out, trigger an account lock and recover.

    The connection is always closed, even if one of the steps raises.
    """
    db = Database()
    try:
        db.setup_tables()
        user_handler = UserHandler(db)

        # Clean up old test data if it exists
        db.execute("DELETE FROM users WHERE username IN (?, ?)", ("testuser", "adminuser"))
        db.execute("DELETE FROM sessions")

        # 1. Register a new user and an admin user
        hashed_password = Security.hash_password("secure_password_123")
        db.execute("INSERT INTO users (username, password) VALUES (?, ?)", ("testuser", hashed_password))
        db.execute("INSERT INTO users (username, password, is_admin) VALUES (?, ?, 1)", ("adminuser", hashed_password))

        print("--- Test 1: Successful Login ---")
        # Simulate a web request
        request_data = {
            'ip_address': '192.168.1.100',
            'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        login_success = user_handler.login("testuser", "secure_password_123", request_data)
        print(f"Login attempt status: {login_success}")
        print(f"Is user logged in? {user_handler.is_logged_in()}")
        print(f"Is user an admin? {user_handler.is_admin()}")
        print(f"Is session valid? {user_handler.validate_session(request_data)}")
        print("-" * 20)

        # 2. Simulate a logout
        print("--- Test 2: Logout ---")
        user_handler.logout()
        print(f"Is user logged in after logout? {user_handler.is_logged_in()}")
        print("-" * 20)

        # 3. Simulate a failed login and account lock
        print("--- Test 3: Failed Login and Account Lock ---")
        # Log in with the wrong password multiple times
        for _ in range(6):
            user_handler.login("testuser", "wrong_password", request_data)
        # Now, try to log in with the correct password. It should fail because the account is locked.
        print("\nAttempting to log in with correct password after lock:")
        login_attempt_after_lock = user_handler.login("testuser", "secure_password_123", request_data)
        print(f"Login attempt status: {login_attempt_after_lock}")
        print("-" * 20)

        # 4. Reset failed attempts for a new login
        print("--- Test 4: Resetting failed attempts ---")
        # reset_failed_attempts() only zeroes the counter; the lock flag set
        # in Test 3 has to be cleared as well or the login below can never
        # succeed.
        db.execute("UPDATE users SET account_locked = 0 WHERE username = ?", ("testuser",))
        user_handler.reset_failed_attempts("testuser")
        login_attempt_after_reset = user_handler.login("testuser", "secure_password_123", request_data)
        print(f"Login attempt status after reset: {login_attempt_after_reset}")
    finally:
        db.close()

    # Optional: clean up the database file after the run. The path below is
    # the fallback location only; adjust it when SQLITE_PATH is set.
    # os.remove(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "app", "cms_database.db"))


if __name__ == "__main__":
    main()