# Retrieve rows from table cv_raw, extract a structured profile from each CV,
# and insert the extracted profile into table cv_profile_extracted.
import json
import asyncio
from typing import ByteString, Dict, List, Optional
from services.llms.LLM import model_4o_2
from services.prompts.profile_extraction import extract_one_profile
from externals.databases._pgdb import fetch_data, execute_query
from models.data_model import OutProfile, Union
from utils.utils import pdf_reader, measure_runtime

query_get_cv_raw = "select profile_id, file_content " \
                   "from public.cv_raw " \
                   "where is_extracted = false " \
                   "limit {batch_size};"

# {failed_id} must be a rendered SQL value list such as "(1, 2)".
# The trailing space keeps it separated from the "limit" clause.
query_get_cv_raw_excluded_id = "select profile_id, file_content " \
                               "from public.cv_raw " \
                               "where is_extracted = false " \
                               "and profile_id not in {failed_id} " \
                               "limit {batch_size};"

query_get_cv_raw_by_id = "select profile_id, file_content " \
                         "from public.cv_raw " \
                         "where profile_id = {profile_id};"

query_update_cv_raw = """
UPDATE cv_raw
SET is_extracted = true
WHERE profile_id = {profile_id};
"""


# NOTE(review): the DB helpers visible here take a single SQL string, so values
# are inlined as escaped literals. Switch to bind parameters if
# fetch_data/execute_query support them.
def _escape_sql_text(value) -> str:
    """Return ``str(value)`` with single quotes doubled, for use inside '...'."""
    return str(value).replace("'", "''")


def _sql_literal(value) -> str:
    """Render *value* as an inline SQL literal.

    Numbers are emitted verbatim (identical to plain ``str.format``); any other
    value becomes a single-quoted string with embedded quotes escaped.
    """
    if isinstance(value, (int, float)):
        return str(value)
    return "'" + _escape_sql_text(value) + "'"


def _failed_profile_ids(failed_id: List) -> List:
    """Return the profile ids recorded in *failed_id*.

    Entries may be bare ids or the ``{profile_id: error_message}`` dicts that
    ``wrap_extract_profile`` appends on failure.
    """
    ids = []
    for entry in failed_id:
        if isinstance(entry, dict):
            ids.extend(entry.keys())
        else:
            ids.append(entry)
    return ids


async def retrieve_raw_profiles(batch_size: int = 10, failed_id: Optional[List] = None):
    """Fetch up to *batch_size* not-yet-extracted rows from cv_raw.

    Args:
        batch_size: maximum number of rows to return.
        failed_id: ids (or ``{id: error}`` dicts) to exclude, so profiles that
            already failed are not retried forever.

    Returns:
        Whatever ``fetch_data`` returns for the query (rows are indexed by
        column name elsewhere in this module).
    """
    excluded = _failed_profile_ids(failed_id or [])
    if not excluded:
        query = query_get_cv_raw.format(batch_size=batch_size)
    else:
        # Build "(a, b, c)" by hand: tuple formatting yields "(a,)" for one
        # element, which is invalid SQL.
        in_list = "(" + ", ".join(_sql_literal(pid) for pid in excluded) + ")"
        query = query_get_cv_raw_excluded_id.format(failed_id=in_list, batch_size=batch_size)
    data = await fetch_data(query)
    return data


async def retrieve_raw_profiles_by_id(id: int):
    """Fetch the cv_raw row(s) whose profile_id equals *id*."""
    query = query_get_cv_raw_by_id.format(profile_id=_sql_literal(id))
    data = await fetch_data(query)
    return data


async def update_raw_profile(profile_id: int):
    """Mark the cv_raw row for *profile_id* as extracted (is_extracted = true)."""
    query = query_update_cv_raw.format(profile_id=_sql_literal(profile_id))
    await execute_query(query)


@measure_runtime
async def extract_profile(file: ByteString):
    """Read the PDF bytes in *file* and run the structured-output LLM chain.

    Returns the chain's result; callers treat it as an ``OutProfile`` and call
    ``.model_dump()`` on it.
    """
    cv = await pdf_reader(file)
    # Use the model actually imported at the top of this module.
    llm = model_4o_2.with_structured_output(OutProfile)
    chain = extract_one_profile | llm
    input_chain = {
        "cv": cv
    }
    profile = await chain.ainvoke(input_chain, config=None)
    return profile


def sanitize_type(data: Dict):
    """Return a copy of *data* with null-like values replaced by neutral defaults.

    A value is null-like when it is ``None`` or a string equal (case-insensitively)
    to ``"null"`` or ``""``. Text fields become ``"-"``, numeric fields ``0`` and
    list fields ``[]``; keys without a configured default fall back to ``"-"``.
    """
    data_mutated = data.copy()
    neutralized_mapper = {
        'fullname': "-",
        'high_edu_univ_1': "-",
        'high_edu_major_1': "-",
        'high_edu_gpa_1': 0,
        'high_edu_univ_2': "-",
        'high_edu_major_2': "-",
        'high_edu_gpa_2': 0,
        'high_edu_univ_3': "-",
        'high_edu_major_3': "-",
        'high_edu_gpa_3': 0,
        'domicile': "-",
        'yoe': 0,
        'hardskills': [],
        'softskills': [],
        'certifications': [],
        'business_domain_experiences': []
    }
    # Reassigning existing keys while iterating is safe: the dict size is unchanged.
    for k, v in data_mutated.items():
        if v is None or (isinstance(v, str) and v.lower() in ['null', '']):
            data_mutated[k] = neutralized_mapper.get(k, "-")
    return data_mutated


def helper_handle_list_to_text(data: Dict, cols: List):
    """Return a copy of *data* where list values in *cols* are joined into text.

    Non-empty lists become ``"a, b, c"``; empty lists become ``"-"``. Columns
    missing from *data* or holding non-list values are left untouched.
    """
    data_mutated = data.copy()
    for col in cols:
        if col not in data_mutated:
            continue
        value = data_mutated[col]
        if isinstance(value, list):
            data_mutated[col] = ", ".join(str(p) for p in value) if value else "-"
    return data_mutated


query_insert_profile_extracted = """
insert into cv_profile_extracted
("fullname", "profile_id", "univ_edu_1", "major_edu_1", "gpa_edu_1", "univ_edu_2", "major_edu_2", "gpa_edu_2", "univ_edu_3", "major_edu_3", "gpa_edu_3", "domicile", "yoe", "hardskills", "softskills", "certifications", "business_domain")
values
('{fullname}', {profile_id}, '{high_edu_univ_1}', '{high_edu_major_1}', {high_edu_gpa_1}, '{high_edu_univ_2}', '{high_edu_major_2}', {high_edu_gpa_2}, '{high_edu_univ_3}', '{high_edu_major_3}', {high_edu_gpa_3}, '{domicile}', {yoe}, '{hardskills}', '{softskills}', '{certifications}', '{business_domain_experiences}');
"""

# Placeholders of query_insert_profile_extracted that sit inside '...' quotes
# (need quote escaping) versus bare numeric placeholders.
_INSERT_TEXT_FIELDS = (
    "fullname",
    "high_edu_univ_1", "high_edu_major_1",
    "high_edu_univ_2", "high_edu_major_2",
    "high_edu_univ_3", "high_edu_major_3",
    "domicile",
    "hardskills", "softskills", "certifications", "business_domain_experiences",
)
_INSERT_NUMERIC_FIELDS = ("high_edu_gpa_1", "high_edu_gpa_2", "high_edu_gpa_3", "yoe")
_LIST_FIELDS = ["hardskills", "softskills", "certifications", "business_domain_experiences"]


async def wrap_extract_profile(file: ByteString, profile_id: Union[int, str], failed_id: Optional[List] = None):
    """Extract one profile and persist it, recording failures instead of raising.

    On success the profile is inserted into cv_profile_extracted and, if the
    row can be read back, cv_raw is flagged as extracted. Any exception is
    caught and ``{profile_id: str(error)}`` is appended to *failed_id*.
    """
    if failed_id is None:
        failed_id = []
    try:
        extracted_profile = await extract_profile(file)
        extracted_profile = sanitize_type(extracted_profile.model_dump())
        extracted_profile = helper_handle_list_to_text(extracted_profile, cols=_LIST_FIELDS)

        # LLM output may contain apostrophes (e.g. "O'Brien"); escape them so
        # the quoted literals in the insert statement stay well-formed.
        values = {field: _escape_sql_text(extracted_profile[field]) for field in _INSERT_TEXT_FIELDS}
        values.update({field: extracted_profile[field] for field in _INSERT_NUMERIC_FIELDS})
        query = query_insert_profile_extracted.format(profile_id=_sql_literal(profile_id), **values)
        await execute_query(query)

        # check profile inserted
        is_inserted = await fetch_data(
            "select profile_id from cv_profile_extracted where profile_id = {profile_id}".format(
                profile_id=_sql_literal(profile_id)
            )
        )
        if is_inserted:
            await update_raw_profile(profile_id=profile_id)
            print(f"✅ Profile extracted and inserted for profile_id: {profile_id}")
        else:
            print(f"❌ Profile insertion failed for profile_id: {profile_id}")
    except Exception as E:
        failed_id.append({profile_id: str(E)})
        print(f"❌ wrap_extract_profile error for profile_id: {profile_id}")


# Manual debugging snippet for a single profile:
# data = asyncio.run(retrieve_raw_profiles_by_id(369))
# profile_id = data[0]["profile_id"]
# file_content = data[0]["file_content"]
# text = asyncio.run(pdf_reader(file_content))
# extracted_profile = asyncio.run(extract_profile(file_content))
# profile = asyncio.run(wrap_extract_profile(file=file_content, profile_id=profile_id))
# extracted_profile = sanitize_type(extracted_profile.model_dump())
# extracted_profile = helper_handle_list_to_text(extracted_profile, cols=["hardskills", "softskills", "certifications", "business_domain_experiences"])


async def KBProfileExtraction(batch_size: int = 10, failed_id: Optional[List] = None):
    """Extract one batch of raw profiles concurrently.

    Args:
        batch_size: number of cv_raw rows to fetch for this batch.
        failed_id: shared failure log; ids already in it are skipped and new
            failures are appended to it by ``wrap_extract_profile``.

    Returns:
        True if the batch ran, False if fetching or scheduling raised.
    """
    if failed_id is None:
        failed_id = []
    try:
        raw_profiles = await retrieve_raw_profiles(batch_size=batch_size, failed_id=failed_id)
        tasks = [
            asyncio.create_task(
                wrap_extract_profile(
                    file=raw_profile["file_content"],
                    profile_id=raw_profile["profile_id"],
                    failed_id=failed_id,
                )
            )
            for raw_profile in raw_profiles
        ]
        # wrap_extract_profile catches its own errors, so gather does not raise for them.
        await asyncio.gather(*tasks)
        return True
    except Exception as E:
        print(f"❌ Error extracting profile, {E}")
        return False


async def run_rawingest_pipeline():
    """Extract every cv_raw profile that has no cv_profile_extracted row yet.

    Runs ``ceil(pending / batch_size)`` batches, then writes the collected
    failures to ``failed_id.json``.
    """
    profile_id_raw = await fetch_data("select distinct profile_id from cv_raw;")
    profile_id_raw = [f["profile_id"] for f in profile_id_raw]
    profile_id_extracted = await fetch_data("select distinct profile_id from cv_profile_extracted;")
    extracted_ids = {f["profile_id"] for f in profile_id_extracted}
    profile_id_tobe_extracted = [p for p in profile_id_raw if p not in extracted_ids]
    # NOTE(review): batches are selected by cv_raw.is_extracted, while the loop
    # count comes from cv_profile_extracted; these can diverge — confirm intent.

    batch_size = 5
    failed_id = []
    nloops = (len(profile_id_tobe_extracted) + batch_size - 1) // batch_size  # ceiling division
    for _ in range(nloops):
        await KBProfileExtraction(batch_size=batch_size, failed_id=failed_id)

    with open('failed_id.json', 'w', encoding='utf-8') as fp:
        json.dump({"failed_id": failed_id}, fp)


# NOTE: runs the pipeline on import as well as when executed as a script.
asyncio.run(run_rawingest_pipeline())