# NOTE(review): removed non-Python export residue ("Spaces: Sleeping" status
# banner) that would have been a syntax error at the top of this module.
# retrieve data from table cv_raw, extract profiles, insert profiles into table cv_profile_extracted
import asyncio
import json
from typing import ByteString, Dict, List, Optional

from externals.databases._pgdb import fetch_data, execute_query
from models.data_model import OutProfile, Union
from services.llms.LLM import model_4o_2
from services.prompts.profile_extraction import extract_one_profile
from utils.utils import pdf_reader, measure_runtime
# SQL templates for the cv_raw ingestion flow.
# NOTE(review): these are filled with str.format, so values are interpolated
# directly into the SQL text -- safe only while the inputs are integers
# generated by this pipeline; prefer parameterized queries if _pgdb allows.
query_get_cv_raw = (
    "select profile_id, file_content "
    "from public.cv_raw "
    "where is_extracted = false "
    "limit {batch_size};"
)
query_get_cv_raw_excluded_id = (
    "select profile_id, file_content "
    "from public.cv_raw "
    "where is_extracted = false "
    # Bug fix: a space is required before "limit" -- the original
    # concatenation produced "...{failed_id}limit N", which is invalid SQL.
    "and profile_id not in {failed_id} "
    "limit {batch_size};"
)
query_get_cv_raw_by_id = (
    "select profile_id, file_content "
    "from public.cv_raw "
    "where profile_id = {profile_id};"
)
query_update_cv_raw = """
UPDATE cv_raw
SET is_extracted = true
WHERE profile_id = {profile_id};
"""
async def retrieve_raw_profiles(batch_size: int = 10, failed_id: Optional[List] = None):
    """Fetch up to *batch_size* unextracted rows (profile_id, file_content) from cv_raw.

    batch_size: maximum number of rows to return.
    failed_id:  profile ids to exclude -- plain ints, or {profile_id: error}
                dicts as appended by wrap_extract_profile. None/empty means
                no exclusion.
    """
    # None sentinel instead of the original shared mutable default `[]`.
    if not failed_id:
        query = query_get_cv_raw.format(batch_size=batch_size)
    else:
        ids = []
        for entry in failed_id:
            # wrap_extract_profile records failures as {profile_id: message};
            # pull the ids out instead of interpolating whole dicts into SQL.
            if isinstance(entry, dict):
                ids.extend(entry.keys())
            else:
                ids.append(entry)
        # Render "(1, 2, 3)" by hand: str(tuple([x])) yields "(1,)", whose
        # trailing comma is invalid SQL inside NOT IN.
        id_list = "({})".format(", ".join(str(i) for i in ids))
        query = query_get_cv_raw_excluded_id.format(failed_id=id_list, batch_size=batch_size)
    data = await fetch_data(query)
    return data
async def retrieve_raw_profiles_by_id(id: int):
    """Return the cv_raw row(s) whose profile_id equals *id*.

    NOTE(review): the parameter name `id` shadows the builtin; kept as-is to
    preserve the keyword-argument API for existing callers.
    """
    return await fetch_data(query_get_cv_raw_by_id.format(profile_id=id))
async def update_raw_profile(profile_id: int):
    """Mark the cv_raw row for *profile_id* as extracted (is_extracted = true)."""
    await execute_query(query_update_cv_raw.format(profile_id=profile_id))
async def extract_profile(file: ByteString):
    """Read a PDF CV (raw bytes) and extract a structured OutProfile via the LLM.

    file: the PDF file content as bytes, as stored in cv_raw.file_content.
    Returns the OutProfile produced by the extraction chain.
    """
    cv = await pdf_reader(file)
    # Bug fix: this module imports `model_4o_2` (see the imports at the top);
    # the original body referenced an undefined `model_4o`, which raised
    # NameError on every call.
    llm = model_4o_2.with_structured_output(OutProfile)
    chain = extract_one_profile | llm
    profile = await chain.ainvoke({"cv": cv}, config=None)
    return profile
def sanitize_type(data: Dict) -> Dict:
    """Return a copy of *data* with null-ish values replaced by neutral defaults.

    A value counts as null-ish when it is None, or a string equal to
    'null' / '' (case-insensitive). The per-field replacement comes from the
    mapper below. Unknown keys with null-ish values are left unchanged --
    the original raised KeyError on them.
    """
    neutralized_mapper = {
        'fullname': "-",
        'high_edu_univ_1': "-",
        'high_edu_major_1': "-",
        'high_edu_gpa_1': 0,
        'high_edu_univ_2': "-",
        'high_edu_major_2': "-",
        'high_edu_gpa_2': 0,
        'high_edu_univ_3': "-",
        'high_edu_major_3': "-",
        'high_edu_gpa_3': 0,
        'domicile': "-",
        'yoe': 0,
        'hardskills': [],
        'softskills': [],
        'certifications': [],
        'business_domain_experiences': []
    }
    data_mutated = data.copy()
    for k, v in data_mutated.items():
        nullish = v is None or (isinstance(v, str) and v.lower() in ('null', ''))
        if nullish and k in neutralized_mapper:
            data_mutated[k] = neutralized_mapper[k]
    return data_mutated
def helper_handle_list_to_text(data: Dict, cols: List) -> Dict:
    """Return a copy of *data* with each named column's list flattened to text.

    data: row dict (not mutated).
    cols: keys to convert; a non-empty list becomes a comma-separated string,
          an empty list becomes the '-' placeholder used across the pipeline.
          Missing keys and non-list values are left untouched.
    """
    data_mutated = data.copy()
    for col in cols:
        value = data_mutated.get(col)
        # isinstance instead of `type(...) == list`; join the list directly --
        # the original's `[p for p in ...]` was a no-op copy.
        if isinstance(value, list):
            data_mutated[col] = ", ".join(value) if value else "-"
    return data_mutated
# Insert template for one extracted profile row.
# NOTE(review): values are spliced into the SQL text via str.format with
# hand-written single quotes -- any field containing an apostrophe (e.g. the
# name O'Brien) will break the statement, and the pattern is injectable if a
# CV contains hostile text. Prefer parameterized queries if execute_query
# supports them -- TODO confirm against externals.databases._pgdb.
query_insert_profile_extracted = """
insert into cv_profile_extracted
("fullname", "profile_id",
"univ_edu_1", "major_edu_1", "gpa_edu_1",
"univ_edu_2", "major_edu_2", "gpa_edu_2",
"univ_edu_3", "major_edu_3", "gpa_edu_3",
"domicile", "yoe",
"hardskills", "softskills", "certifications", "business_domain")
values
('{fullname}', {profile_id},
'{high_edu_univ_1}', '{high_edu_major_1}', {high_edu_gpa_1},
'{high_edu_univ_2}', '{high_edu_major_2}', {high_edu_gpa_2},
'{high_edu_univ_3}', '{high_edu_major_3}', {high_edu_gpa_3},
'{domicile}', {yoe},
'{hardskills}', '{softskills}', '{certifications}', '{business_domain_experiences}');
"""
async def wrap_extract_profile(file: ByteString, profile_id: Union[int, str], failed_id: Optional[List] = None):
    """Extract one CV, insert its profile row, and mark the raw row extracted.

    file:       PDF bytes from cv_raw.file_content.
    profile_id: id of the cv_raw row being processed.
    failed_id:  out-parameter list; on any failure a {profile_id: message}
                dict is appended instead of the exception propagating.
    """
    # None sentinel instead of the original shared mutable default list
    # (failures would otherwise accumulate across unrelated calls).
    if failed_id is None:
        failed_id = []
    try:
        extracted_profile = await extract_profile(file)
        extracted_profile = sanitize_type(extracted_profile.model_dump())
        extracted_profile = helper_handle_list_to_text(
            extracted_profile,
            cols=["hardskills", "softskills", "certifications", "business_domain_experiences"],
        )
        # NOTE(review): str.format-built SQL; fields containing a single
        # quote will break the insert -- see query_insert_profile_extracted.
        query = query_insert_profile_extracted.format(
            fullname=extracted_profile['fullname'],
            profile_id=profile_id,
            high_edu_univ_1=extracted_profile['high_edu_univ_1'],
            high_edu_major_1=extracted_profile['high_edu_major_1'],
            high_edu_gpa_1=extracted_profile['high_edu_gpa_1'],
            high_edu_univ_2=extracted_profile['high_edu_univ_2'],
            high_edu_major_2=extracted_profile['high_edu_major_2'],
            high_edu_gpa_2=extracted_profile['high_edu_gpa_2'],
            high_edu_univ_3=extracted_profile['high_edu_univ_3'],
            high_edu_major_3=extracted_profile['high_edu_major_3'],
            high_edu_gpa_3=extracted_profile['high_edu_gpa_3'],
            domicile=extracted_profile['domicile'],
            yoe=extracted_profile['yoe'],
            hardskills=extracted_profile['hardskills'],
            softskills=extracted_profile['softskills'],
            certifications=extracted_profile['certifications'],
            business_domain_experiences=extracted_profile['business_domain_experiences'],
        )
        await execute_query(query)
        # Verify the insert actually landed before flagging the raw row as
        # extracted, so a silent insert failure leaves the row retryable.
        is_inserted = await fetch_data(
            "select profile_id from cv_profile_extracted where profile_id = {profile_id}".format(profile_id=profile_id)
        )
        if is_inserted:
            await update_raw_profile(profile_id=profile_id)
            print(f"✅ Profile extracted and inserted for profile_id: {profile_id}")
        else:
            print(f"❌ Profile insertion failed for profile_id: {profile_id}")
    except Exception as E:
        failed_id.append({profile_id: str(E)})
        print(f"❌ wrap_extract_profile error for profile_id: {profile_id}")
# Manual smoke-test scratch for a single profile (kept commented out; run the
# full pipeline via run_rawingest_pipeline instead):
# data = asyncio.run(retrieve_raw_profiles_by_id(369))
# profile_id = data[0]["profile_id"]
# file_content = data[0]["file_content"]
# text = asyncio.run(pdf_reader(file_content))
# extracted_profile = asyncio.run(extract_profile(file_content))
# profile = asyncio.run(wrap_extract_profile(file=file_content, profile_id=profile_id))
# extracted_profile = sanitize_type(extracted_profile.model_dump())
# extracted_profile = helper_handle_list_to_text(extracted_profile, cols=["hardskills", "softskills", "certifications", "business_domain_experiences"])
async def KBProfileExtraction(batch_size: int = 10, failed_id: Optional[List] = None):
    """Extract one batch of raw CVs concurrently.

    batch_size: number of cv_raw rows to process in this batch.
    failed_id:  out-parameter list collecting {profile_id: message} entries
                for profiles that failed; also used to exclude those ids
                when fetching the batch.
    Returns True when the batch completed, False when fetching/dispatch failed.
    """
    # None sentinel instead of the original shared mutable default list.
    if failed_id is None:
        failed_id = []
    try:
        raw_profiles = await retrieve_raw_profiles(batch_size=batch_size, failed_id=failed_id)
        tasks = [
            asyncio.create_task(
                wrap_extract_profile(
                    file=raw_profile["file_content"],
                    profile_id=raw_profile["profile_id"],
                    failed_id=failed_id,
                )
            )
            for raw_profile in raw_profiles
        ]
        # wrap_extract_profile returns nothing useful; gather only for completion.
        await asyncio.gather(*tasks)
        return True
    except Exception as E:
        print(f"❌ Error extracting profile, {E}")
        return False
async def run_rawingest_pipeline():
    """Drive the whole ingestion: find profile ids present in cv_raw but not
    in cv_profile_extracted, process them in batches, then dump any failures
    to failed_id.json."""
    profile_id_raw = await fetch_data("select distinct profile_id from cv_raw;")
    profile_id_raw = [f["profile_id"] for f in profile_id_raw]
    profile_id_extracted = await fetch_data("select distinct profile_id from cv_profile_extracted;")
    # Set membership turns the original O(raw * extracted) list scan into O(raw).
    extracted_ids = {f["profile_id"] for f in profile_id_extracted}
    profile_id_tobe_extracted = [p for p in profile_id_raw if p not in extracted_ids]
    batch_size = 5
    failed_id = []
    # Ceiling division: one extra loop covers a partial final batch.
    nloops = -(-len(profile_id_tobe_extracted) // batch_size)
    for _ in range(nloops):
        await KBProfileExtraction(batch_size=batch_size, failed_id=failed_id)
    with open('failed_id.json', 'w') as fp:
        json.dump({"failed_id": failed_id}, fp)


if __name__ == "__main__":
    # Guarding the entry point lets this module be imported (e.g. for its
    # helpers) without running the whole pipeline as an import side effect.
    asyncio.run(run_rawingest_pipeline())