File size: 9,807 Bytes
3060aa0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# PyFundaments: A Secure Python Architecture
# Copyright 2008-2025 - Volkan Kücükbudak
# Apache License V. 2
# Repo: https://github.com/VolkanSah/PyFundaments
# access_control.py
# This is the standalone module for access control.
# It is intended to be imported by main.py or app.py.

import sys
from typing import Optional, List, Dict, Any

# Your foundational PostgreSQL module is imported here.
# IMPORTANT: Ensure 'postgresql.py' is in the same directory or accessible
# via the Python path.
from fundaments import postgresql as db
# local db use: db.execute_secured_query(), db.init_db_pool(), etc. but this fundaments are optimized for clouds


# If asyncpg is not installed, exit the script gracefully.
try:
    import asyncpg
except ImportError:
    print("Error: The 'asyncpg' library is required. Please install it with 'pip install asyncpg'.")
    sys.exit(1)


class AccessControl:
    """
    Asynchronous class for managing access control.
    It builds directly on the functions of your secure database module.
    This is the layer that uses the foundation without modifying it.

    How to extend this class:
    To add new functionality, simply create a new asynchronous method
    within this class. This method can call the `db.execute_secured_query`
    function to interact with the database. For example:

    async def get_user_last_login(self):
        sql = "SELECT last_login FROM users WHERE id = $1"
        result = await db.execute_secured_query(sql, self.user_id, fetch_method='fetchrow')
        return result['last_login'] if result else None
    """
    def __init__(self, user_id: Optional[int] = None):
        self.user_id = user_id

    async def has_permission(self, permission_name: str) -> bool:
        """
        Checks if the user has a specific permission.
        Uses the secure execute_secured_query function from the foundation.
        """
        if self.user_id is None:
            return False

        # SQL Query Explanation:
        # This query counts the number of rows where the user (user_id)
        # has a role (user_role_assignments) that is linked to a permission
        # (role_permissions) which has the specified name (user_permissions.name).
        # If the count is > 0, the user has the permission.
        sql = """
            SELECT COUNT(*) AS count
            FROM user_role_assignments ura
            JOIN role_permissions rp ON ura.role_id = rp.role_id
            JOIN user_permissions up ON rp.permission_id = up.id
            WHERE ura.user_id = $1 AND up.name = $2
        """
        try:
            result = await db.execute_secured_query(
                sql,
                self.user_id,
                permission_name,
                fetch_method='fetchrow'
            )
            return result['count'] > 0
        except Exception as e:
            # Error handling is managed by your postgresql module.
            # We just re-raise the exception here to propagate it.
            raise Exception(f'Failed to check permission: {e}')

    async def get_user_permissions(self) -> List[Dict[str, Any]]:
        """Returns all permissions for a user."""
        if self.user_id is None:
            return []

        # SQL Query Explanation:
        # This query retrieves all distinct permission names and descriptions
        # associated with the user's roles. `DISTINCT` ensures each permission
        # is listed only once.
        sql = """
            SELECT DISTINCT up.name, up.description
            FROM user_role_assignments ura
            JOIN role_permissions rp ON ura.role_id = rp.role_id
            JOIN user_permissions up ON rp.permission_id = up.id
            WHERE ura.user_id = $1
            ORDER BY up.name
        """
        try:
            return await db.execute_secured_query(sql, self.user_id)
        except Exception as e:
            raise Exception(f'Failed to get user permissions: {e}')

    async def get_user_roles(self) -> List[Dict[str, Any]]:
        """Returns all roles for a user."""
        if self.user_id is None:
            return []

        # SQL Query Explanation:
        # This query selects the details (id, name, description) of the roles
        # that are assigned to the specified user (user_id) in the
        # `user_role_assignments` table.
        sql = """
            SELECT r.id, r.name, r.description
            FROM user_role_assignments ura
            JOIN user_roles r ON ura.role_id = r.id
            WHERE ura.user_id = $1
            ORDER BY r.name
        """
        try:
            return await db.execute_secured_query(sql, self.user_id)
        except Exception as e:
            raise Exception(f'Failed to get user roles: {e}')

    async def assign_role(self, role_id: int) -> None:
        """Assigns a role to a user."""
        if self.user_id is None:
            raise Exception('No user specified')

        # SQL Query Explanation:
        # Inserts a new row into the `user_role_assignments` table to create
        # the relationship between a user and a role.
        sql = "INSERT INTO user_role_assignments (user_id, role_id) VALUES ($1, $2)"
        try:
            await db.execute_secured_query(
                sql,
                self.user_id,
                role_id,
                fetch_method='execute'
            )
        except Exception as e:
            raise Exception(f'Failed to assign role: {e}')

    async def remove_role(self, role_id: int) -> None:
        """Removes a role from a user."""
        if self.user_id is None:
            raise Exception('No user specified')

        # SQL Query Explanation:
        # Deletes the row from the `user_role_assignments` table that matches
        # the specified user ($1) and role ($2).
        sql = "DELETE FROM user_role_assignments WHERE user_id = $1 AND role_id = $2"
        try:
            await db.execute_secured_query(
                sql,
                self.user_id,
                role_id,
                fetch_method='execute'
            )
        except Exception as e:
            raise Exception(f'Failed to remove role: {e}')

    async def get_all_roles(self) -> List[Dict[str, Any]]:
        """Returns all available roles."""
        # SQL Query Explanation:
        # Selects all roles from the `user_roles` table.
        sql = "SELECT id, name, description FROM user_roles ORDER BY name"
        try:
            return await db.execute_secured_query(sql)
        except Exception as e:
            raise Exception(f'Failed to get roles: {e}')

    async def get_all_permissions(self) -> List[Dict[str, Any]]:
        """Returns all available permissions."""
        # SQL Query Explanation:
        # Selects all permissions from the `user_permissions` table.
        sql = "SELECT id, name, description FROM user_permissions ORDER BY name"
        try:
            return await db.execute_secured_query(sql)
        except Exception as e:
            raise Exception(f'Failed to get permissions: {e}')

    async def create_role(self, name: str, description: str) -> int:
        """Creates a new role."""
        # SQL Query Explanation:
        # Inserts a new role into the `user_roles` table and returns the
        # automatically generated ID of the new role (`RETURNING id`).
        sql = "INSERT INTO user_roles (name, description) VALUES ($1, $2) RETURNING id"
        try:
            result = await db.execute_secured_query(
                sql,
                name,
                description,
                fetch_method='fetchrow'
            )
            return result['id']
        except Exception as e:
            raise Exception(f'Failed to create role: {e}')

    async def update_role_permissions(self, role_id: int, permission_ids: List[int]) -> None:
        """Updates the permissions for a role."""
        # IMPORTANT: Since your module does not handle transactions across multiple
        # queries, we perform these actions sequentially. Query-level security
        # is guaranteed by your module.
        try:
            # SQL Query Explanation:
            # Deletes all existing permissions for the given role.
            sql_delete = "DELETE FROM role_permissions WHERE role_id = $1"
            await db.execute_secured_query(sql_delete, role_id, fetch_method='execute')

            # SQL Query Explanation:
            # Inserts a new row for each permission_id passed into the
            # `role_permissions` table.
            if permission_ids:
                sql_insert = "INSERT INTO role_permissions (role_id, permission_id) VALUES ($1, $2)"
                for permission_id in permission_ids:
                    await db.execute_secured_query(
                        sql_insert,
                        role_id,
                        permission_id,
                        fetch_method='execute'
                    )
        except Exception as e:
            # If an error occurs, the underlying foundation will log the issue.
            # We re-raise the error here.
            raise Exception(f'Failed to update role permissions: {e}')

    async def get_role_permissions(self, role_id: int) -> List[Dict[str, Any]]:
        """Returns all permissions for a role."""
        # SQL Query Explanation:
        # Selects the details (id, name, description) of all permissions
        # linked to the specified role.
        sql = """
            SELECT p.id, p.name, p.description
            FROM role_permissions rp
            JOIN user_permissions p ON rp.permission_id = p.id
            WHERE rp.role_id = $1
            ORDER BY p.name
        """
        try:
            return await db.execute_secured_query(sql, role_id)
        except Exception as e:
            raise Exception(f'Failed to get role permissions: {e}')