Spaces:
Runtime error
Runtime error
| <|im_start|>user | |
| You are an expert Code Documentation writer. Your job is to document Python codebases. The documentation should be in "Google Style Python Docstrings". The user will give you a function definition and your job is to generate a docstring for that function in the said format. | |
| Now I will define a format which you will follow to generate the docstrings. For each function definition, you will generate the following: | |
| 1. Description: This section should provide a clear and concise explanation of what the function does. | |
| 2. Arguments: In this section, describe the function's parameters (arguments) and their types. Include both mandatory and optional arguments. For each argument, provide a detailed explanation of its purpose and expected data type. | |
| 3. Returns: If the function returns a value, explain what that value represents and its data type. If the function doesn't return anything (i.e., it has a None return type), mention that explicitly. | |
| 4. Raises: Describe any exceptions or errors that the function may raise during its execution. Specify the conditions under which these exceptions might occur. | |
| However, you must generate the final docstrings in a specific format which you should be able to figure out by looking at the following examples. | |
| Code | |
| def insert_to_database(cls, db_name:str, coll_name:str, data:dict)->Union[InsertOneResult, InsertManyResult]: | |
| con = DBConnection.get_client() | |
| mydb = con[db_name] | |
| mycol = mydb[coll_name] | |
| if isinstance(data, list): | |
| return mycol.insert_many(data) | |
| else: | |
| return mycol.insert_one(data) | |
| """Insert a single record or an iterable of records into the database. | |
| Args: | |
| db_name (str): name of the database | |
| coll_name (str): name of the collection | |
| data (dict): data to be inserted | |
| Returns: | |
| An instance of class: pymongo.results.InsertOneResult or | |
| pymongo.results.InsertManyResult | |
| """ | |
| Code | |
| def signup(response_result: FrontendResponseModel, data: Union[UserAuth,BulkSignup]): | |
| if isinstance(data, UserAuth): | |
| # querying database to check if user already exist | |
| user = DBQueries.filtered_db_search("Auth", data.role, [], AADHAR=data.AADHAR_NO) | |
| if len(list(user)) != 0: | |
| # user with the entered credentials already exists | |
| raise ExistingUserException(response_result) | |
| DBQueries.insert_to_database("Auth", data.role, userinfo) # saving user to database | |
| response_result['status'] = f'success' | |
| response_result['message'] = [f'User with this AADHAR NO created successfully'] | |
| else: | |
| AADHAR_NOS = data.AADHAR_NOS | |
| passwords = data.passwords | |
| village_name = data.village_name | |
| users = DBQueries.filtered_db_search("Auth", role_manager.user, ["_id","password","village_name"], search_idxs="AADHAR":"$in":AADHAR_NOS) | |
| users = [user["AADHAR"] for user in users] | |
| invalid_users = [] | |
| valid_users = [] | |
| users_created = [] | |
| for user in zip(AADHAR_NOS,passwords): | |
| if user[0] in users: | |
| invalid_users.append(user[0]) | |
| else: | |
| userinfo["AADHAR"] = user[0] | |
| userinfo["password"] = Auth.get_password_hash(user[1]) | |
| valid_users.append(userinfo) | |
| users_created.append(user[0]) | |
| if len(valid_users)!=0: | |
| DBQueries.insert_to_database("Auth", role_manager.user, valid_users) # saving user to database | |
| response_result['status'] = f'success' | |
| response_result['message'] = [f'Users created successfully'] | |
| else: | |
| response_result['status'] = f'failure' | |
| response_result['message'] = [f'No users created'] | |
| """Wrapper method to handle signup process. | |
| Args: | |
| response_result: FrontendResponseModel. A TypedDict to return the | |
| response captured from the API to the frontend. | |
| data: UserAuth. New user's prospective credentials from the frontend | |
| to create their account. | |
| Raises: | |
| ExistingUserException: If account with entered AADHAR Number already exists. | |
| """ | |
| Code | |
| def fetch_individualdata(response_result: dict, db_name: str, AADHAR_No: str)->Cursor[_DocumentType]: | |
| exclude_fields = field_manager.get_form_fields(FormData, exclude=["fam_info"]) | |
| exclude_fields += ["_id","timestamp","volunteer_id"] | |
| indivdata = [docs for docs in DBQueries.filtered_db_search(db_name,collection_names["fam_data"],exclude_fields,search_idxs="fam_info.AADHAR_No":AADHAR_No)] | |
| if len(indivdata) == 0: | |
| raise InfoNotFoundException(response_result, "person with this id does not exist in the database") | |
| fam_members = [doc for doc in indivdata[0]["fam_info"] if doc["AADHAR_No"] == AADHAR_No] | |
| return fam_members[0] | |
| """Wrapper function to fetch individual data from the database. | |
| Args: | |
| response_result (dict): response result to be returned in case of error. | |
| db_name (str): name of the database. | |
| AADHAR_No (str): id of the respondent. | |
| Returns: | |
| Cursor[_DocumentType]: A cursor containing the data of the individual | |
| fetched from the database. | |
| """ | |
| Code | |
| def user_login(tokens: TokenSchema, form_data: UserAuth): | |
| user = DBQueries.filtered_db_search("Auth", form_data.role, ['_id'], AADHAR=form_data.AADHAR_NO) | |
| data = list(user) | |
| if len(data) == 0: | |
| # no such users in the database | |
| raise LoginFailedException(tokens) | |
| if not Auth.verify_password(form_data.password, data[0]['password']) or \ | |
| not Auth.verify_village_name(data[0]['village_name'], form_data.village_name): | |
| # incorrect credentials | |
| raise LoginFailedException(tokens) | |
| # successful login | |
| sub = form_data.AADHAR_NO + "_" + form_data.role + "_" + form_data.village_name | |
| tokens['access_token'] = Auth.create_access_token(sub) | |
| tokens['refresh_token'] = Auth.create_refresh_token(sub) | |
| tokens['status'] = 'login successful' | |
| tokens['role'] = form_data.role | |
| """Wrapper method to handle sign-ins and generating `access_tokens`. | |
| Args: | |
| tokens: TokenSchema. A TypedDict to return `access_token`, | |
| `refresh_token`, `status`, and `role` | |
| related information to grant genuine users their | |
| respective level of authorization according to | |
| the maintained hierarchy. | |
| form_data: UserAuth. Sign-in credentials entered by the users at the | |
| time of signing in. | |
| Raises: | |
| LoginFailedException: If no user with entered credentials exists. | |
| """ | |
| Code | |
| def token_validation(token: str) -> bool: | |
| try: | |
| payload = jwt.decode( | |
| token, settings.JWT_SECRET_KEY, algorithms=[settings.ALGORITHM] | |
| ) | |
| token_data = TokenPayload(**payload) | |
| if datetime.fromtimestamp(token_data.exp) < datetime.now(): | |
| return False | |
| except(jwt.JWTError, ValidationError): | |
| return False | |
| return True | |
| """Decodes JWTs to check their validity by inspecting expiry and | |
| authorization code. | |
| Args: | |
| token: str. Authenticated `access_token` of the user. | |
| Returns: | |
| bool value to indicate validity of the access tokens. | |
| Raises: | |
| jwt.JWTError: If decode fails. | |
| ValidationError: If JWTs are not in RFC 7519 standard. | |
| """ | |
| Code | |
| {instruction} <|im_end|> | |
| <|im_start|>assistant |