# CandidateExplorer/services/pipelines/PipelineKBProfileExtraction.py
# (Hosting-site header — commit 478dec6, "clean init", by ishaq101 — converted
# to comments so the module parses as Python.)
# retrieve data dari table cv_raw, extract profile, insert profile ke table cv_profile_extracted
import json
import asyncio
from typing import ByteString, Dict, List
from services.llms.LLM import model_4o_2
from services.prompts.profile_extraction import extract_one_profile
from externals.databases._pgdb import fetch_data, execute_query
from models.data_model import OutProfile, Union
from utils.utils import pdf_reader, measure_runtime
# SQL templates for the extraction pipeline.
# NOTE(review): these are filled via str.format rather than bound parameters;
# this is only safe because the interpolated values are internal ints / id
# tuples. Prefer migrating to parameterized queries in fetch_data/execute_query.
query_get_cv_raw = (
    "select profile_id, file_content "
    "from public.cv_raw "
    "where is_extracted = false "
    "limit {batch_size};"
)
query_get_cv_raw_excluded_id = (
    "select profile_id, file_content "
    "from public.cv_raw "
    "where is_extracted = false "
    # Bug fix: the original fragment had no trailing space, producing
    # "...not in (1, 2)limit 10;" — invalid SQL.
    "and profile_id not in {failed_id} "
    "limit {batch_size};"
)
query_get_cv_raw_by_id = (
    "select profile_id, file_content "
    "from public.cv_raw "
    "where profile_id = {profile_id};"
)
query_update_cv_raw = """
UPDATE cv_raw
SET is_extracted = true
WHERE profile_id = {profile_id};
"""
async def retrieve_raw_profiles(batch_size: int = 10, failed_id: List = None):
    """Fetch up to *batch_size* un-extracted rows from cv_raw.

    Args:
        batch_size: maximum number of rows to return.
        failed_id: profiles to exclude; entries may be bare ids or
            ``{profile_id: error}`` dicts (the shape wrap_extract_profile
            appends). ``None``/empty means no exclusions.

    Returns whatever fetch_data yields (list of row mappings).
    """
    # None default instead of the shared-mutable-default pitfall ([]).
    failed_id = failed_id or []
    if not failed_id:
        query = query_get_cv_raw.format(batch_size=batch_size)
    else:
        # Normalize entries to bare profile ids.
        excluded_ids = []
        for entry in failed_id:
            if isinstance(entry, dict):
                excluded_ids.extend(entry.keys())
            else:
                excluded_ids.append(entry)
        # str(tuple([x])) renders "(x,)" — the trailing comma is invalid in a
        # SQL IN list, so the single-id case is built by hand.
        if len(excluded_ids) == 1:
            excluded_sql = "({})".format(excluded_ids[0])
        else:
            excluded_sql = str(tuple(excluded_ids))
        query = query_get_cv_raw_excluded_id.format(
            failed_id=excluded_sql, batch_size=batch_size
        )
    data = await fetch_data(query)
    return data
async def retrieve_raw_profiles_by_id(id: int):
    """Fetch the cv_raw row whose profile_id equals *id*.

    NOTE: *id* shadows the builtin, but it is part of the public signature
    (callers may pass it by keyword), so the name is kept.
    """
    return await fetch_data(query_get_cv_raw_by_id.format(profile_id=id))
async def update_raw_profile(profile_id: int):
    """Mark the cv_raw row for *profile_id* as extracted (is_extracted = true)."""
    sql = query_update_cv_raw.format(profile_id=profile_id)
    await execute_query(sql)
@measure_runtime
async def extract_profile(file: ByteString):
    """Read a PDF CV (raw bytes) and extract a structured OutProfile via the LLM.

    Args:
        file: raw PDF content as bytes.

    Returns:
        An OutProfile instance produced by the structured-output chain.

    Raises whatever pdf_reader or the LLM chain raise; callers are expected
    to wrap this (see wrap_extract_profile).
    """
    cv_text = await pdf_reader(file)
    # Bug fix: the original referenced undefined 'model_4o' — this module
    # imports 'model_4o_2' (see the imports at the top of the file), so the
    # old code raised NameError on every call.
    llm = model_4o_2.with_structured_output(OutProfile)
    chain = extract_one_profile | llm
    profile = await chain.ainvoke({"cv": cv_text}, config=None)
    return profile
def sanitize_type(data: Dict) -> Dict:
    """Return a copy of *data* with null-ish values replaced by neutral defaults.

    A value counts as null-ish when it is None, or a string that is empty or
    'null' (case-insensitive). Keys without a known default are left unchanged
    (the original raised KeyError on them). The input dict is not mutated.
    """
    neutralized_mapper = {
        'fullname' : "-",
        'high_edu_univ_1' : "-",
        'high_edu_major_1' : "-",
        'high_edu_gpa_1' : 0,
        'high_edu_univ_2' : "-",
        'high_edu_major_2' : "-",
        'high_edu_gpa_2' : 0,
        'high_edu_univ_3' : "-",
        'high_edu_major_3' : "-",
        'high_edu_gpa_3' : 0,
        'domicile' : "-",
        'yoe' : 0,
        'hardskills' : [],
        'softskills' : [],
        'certifications' : [],
        'business_domain_experiences': []
    }
    sanitized = data.copy()
    for key, value in sanitized.items():
        nullish = value is None or (
            isinstance(value, str) and value.lower() in ('null', '')
        )
        if nullish:
            # .get keeps the original value for keys without a default instead
            # of crashing like the previous direct lookup did.
            sanitized[key] = neutralized_mapper.get(key, value)
    return sanitized
def helper_handle_list_to_text(data: Dict, cols: List) -> Dict:
    """Return a copy of *data* where each column in *cols* whose value is a
    list is flattened to a comma-separated string ('-' for an empty list).

    Columns absent from *data*, and non-list values, are left untouched.
    The input dict is not mutated.
    """
    flattened = data.copy()
    for col in cols:
        value = flattened.get(col)
        if isinstance(value, list):
            # ", ".join([]) would give "", so empty lists map to the '-'
            # placeholder explicitly (same convention as sanitize_type).
            flattened[col] = ", ".join(value) if value else "-"
    return flattened
# INSERT template for cv_profile_extracted.
# WARNING(review): values are spliced with str.format and wrapped in single
# quotes — a value containing a quote (e.g. "O'Brien") or other SQL
# metacharacters breaks or injects into the statement. Callers must
# escape/sanitize values; migrating to parameterized queries is the real fix.
query_insert_profile_extracted = """
insert into cv_profile_extracted
("fullname", "profile_id",
"univ_edu_1", "major_edu_1", "gpa_edu_1",
"univ_edu_2", "major_edu_2", "gpa_edu_2",
"univ_edu_3", "major_edu_3", "gpa_edu_3",
"domicile", "yoe",
"hardskills", "softskills", "certifications", "business_domain")
values
('{fullname}', {profile_id},
'{high_edu_univ_1}', '{high_edu_major_1}', {high_edu_gpa_1},
'{high_edu_univ_2}', '{high_edu_major_2}', {high_edu_gpa_2},
'{high_edu_univ_3}', '{high_edu_major_3}', {high_edu_gpa_3},
'{domicile}', {yoe},
'{hardskills}', '{softskills}', '{certifications}', '{business_domain_experiences}');
"""
def _escape_sql_text(value):
    """Double single quotes in str values so they survive the single-quoted
    '{}'-style splicing in query_insert_profile_extracted (standard SQL
    string-literal escaping). Non-str values pass through unchanged."""
    return value.replace("'", "''") if isinstance(value, str) else value


async def wrap_extract_profile(file: ByteString, profile_id: Union[int, str], failed_id: List = None):
    """Extract one CV, insert the profile row, and flag cv_raw as extracted.

    Args:
        file: raw PDF bytes from cv_raw.file_content.
        profile_id: id of the cv_raw row being processed.
        failed_id: accumulator list, mutated in place; on failure a
            ``{profile_id: error_message}`` entry is appended.

    Errors are recorded rather than raised so batch runs keep going.
    """
    # None default instead of the shared-mutable-default pitfall ([]).
    if failed_id is None:
        failed_id = []
    try:
        extracted_profile = await extract_profile(file)
        extracted_profile = sanitize_type(extracted_profile.model_dump())
        extracted_profile = helper_handle_list_to_text(
            extracted_profile,
            cols=["hardskills", "softskills", "certifications", "business_domain_experiences"],
        )
        # Bug fix: unescaped quotes in values (names like "O'Brien") broke
        # the single-quoted INSERT; escape before formatting.
        safe_values = {k: _escape_sql_text(v) for k, v in extracted_profile.items()}
        query = query_insert_profile_extracted.format(profile_id=profile_id, **safe_values)
        await execute_query(query)
        # Verify the row actually landed before marking cv_raw as extracted.
        is_inserted = await fetch_data(
            "select profile_id from cv_profile_extracted where profile_id = {profile_id}".format(profile_id=profile_id)
        )
        if is_inserted:
            await update_raw_profile(profile_id=profile_id)
            print(f"✅ Profile extracted and inserted for profile_id: {profile_id}")
        else:
            print(f"❌ Profile insertion failed for profile_id: {profile_id}")
    except Exception as E:
        # Best-effort batch processing: record the failure and continue.
        failed_id.append({profile_id: str(E)})
        print(f"❌ wrap_extract_profile error for profile_id: {profile_id}")
# --- Manual debugging snippets (reference only; run interactively) ---
# data = asyncio.run(retrieve_raw_profiles_by_id(369))
# profile_id = data[0]["profile_id"]
# file_content = data[0]["file_content"]
# text = asyncio.run(pdf_reader(file_content))
# extracted_profile = asyncio.run(extract_profile(file_content))
# profile = asyncio.run(wrap_extract_profile(file=file_content, profile_id=profile_id))
# extracted_profile = sanitize_type(extracted_profile.model_dump())
# extracted_profile = helper_handle_list_to_text(extracted_profile, cols=["hardskills", "softskills", "certifications", "business_domain_experiences"])
async def KBProfileExtraction(batch_size: int = 10, failed_id: List = None):
    """Extract one batch of un-extracted CVs concurrently.

    Args:
        batch_size: how many cv_raw rows to process in this batch.
        failed_id: accumulator list, mutated in place by the per-profile
            workers with ``{profile_id: error}`` entries.

    Returns:
        True when the batch was retrieved and all workers awaited,
        False when the batch itself failed.
    """
    # None default instead of the shared-mutable-default pitfall ([]).
    if failed_id is None:
        failed_id = []
    try:
        raw_profiles = await retrieve_raw_profiles(batch_size=batch_size, failed_id=failed_id)
        tasks = [
            asyncio.create_task(
                wrap_extract_profile(
                    file=raw_profile["file_content"],
                    profile_id=raw_profile["profile_id"],
                    failed_id=failed_id,
                )
            )
            for raw_profile in raw_profiles
        ]
        # wrap_extract_profile swallows its own errors and returns nothing
        # useful, so the gather result is intentionally discarded.
        await asyncio.gather(*tasks)
        return True
    except Exception as E:
        # Top-level batch boundary: report and signal failure instead of raising.
        print(f"❌ Error extracting profile, {E}")
        return False
async def run_rawingest_pipeline():
    """Extract every cv_raw profile not yet present in cv_profile_extracted.

    Runs KBProfileExtraction in fixed-size batches and dumps accumulated
    failures ({profile_id: error} entries) to failed_id.json.
    """
    raw_rows = await fetch_data("select distinct profile_id from cv_raw;")
    extracted_rows = await fetch_data("select distinct profile_id from cv_profile_extracted;")
    # Set membership keeps the difference O(n) instead of the previous
    # O(n^2) list scan.
    extracted_ids = {row["profile_id"] for row in extracted_rows}
    pending_ids = [row["profile_id"] for row in raw_rows if row["profile_id"] not in extracted_ids]
    batch_size = 5
    failed_id = []
    # Ceiling division: a partial final batch still needs one loop iteration.
    nloops = -(-len(pending_ids) // batch_size)
    for _ in range(nloops):
        await KBProfileExtraction(batch_size=batch_size, failed_id=failed_id)
    with open('failed_id.json', 'w') as fp:
        json.dump({"failed_id": failed_id}, fp)


if __name__ == "__main__":
    # Bug fix: the unguarded asyncio.run() executed the whole pipeline as a
    # side effect of merely importing this module.
    asyncio.run(run_rawingest_pipeline())