Risk_Manager / analytics /deep_dive_agentic(wip)_using ipythonshell.py
GenAICoder's picture
Rename analytics/deep_dive_agentic(wip).py to analytics/deep_dive_agentic(wip)_using ipythonshell.py
205e627 verified
Raw
History Blame Contribute Delete
8.7 kB
# deep_dive_agentic.py
"""
Agentic analytical code generation + execution engine using Hugging Face + IPython.

FLOW:
1. The user asks a question.
2. The LLM generates pandas code.
3. IPython executes the code (stateful, notebook-like).
4. The LLM interprets the results.
5. The code and the interpretation are returned.
"""
# ---------------------------------------------------
# IMPORTS
# ---------------------------------------------------
import pandas as pd
import json
import os
import re
from IPython.core.interactiveshell import InteractiveShell
try:
from huggingface_hub import InferenceClient
except ImportError as exc:
raise ImportError(
"huggingface_hub is required. Install with `pip install huggingface-hub`."
) from exc
from analytics.performance_analysis import generate_metric_view
# ---------------------------------------------------
# HF CONFIG
# ---------------------------------------------------
# Both settings are read from the environment once, when the module is imported.
# HF_MODEL_ID falls back to "Qwen/Qwen2.5-7B-Instruct" when the variable is unset;
# HF_TOKEN has no fallback and is None when HUGGINGFACE_API_TOKEN is unset.
HF_MODEL_ID = os.getenv("HF_MODEL_ID", "Qwen/Qwen2.5-7B-Instruct")
HF_TOKEN = os.getenv("HUGGINGFACE_API_TOKEN")
# ---------------------------------------------------
# IPYTHON SHELL (STATEFUL EXECUTION ENGINE)
# ---------------------------------------------------
# Module-wide shell used to run generated code. Its user namespace persists
# between cells, so variables created by one snippet are visible to the next.
IPY_SHELL = InteractiveShell.instance()
# Pre-load the names generated snippets may use without importing them.
IPY_SHELL.user_ns.update(
    {
        "pd": pd,
        "generate_metric_view": generate_metric_view,
    }
)
# ---------------------------------------------------
# HELPER: HF CLIENT
# ---------------------------------------------------
def _get_hf_client():
    """Return an ``InferenceClient`` authenticated with the module-level ``HF_TOKEN``.

    Raises:
        RuntimeError: if ``HF_TOKEN`` is unset or empty.
    """
    if HF_TOKEN:
        return InferenceClient(token=HF_TOKEN)
    raise RuntimeError("HUGGINGFACE_API_TOKEN is required.")
# ---------------------------------------------------
# HELPER: JSON EXTRACTION
# ---------------------------------------------------
def _extract_json(text: str):
match = re.search(r"\{.*\}", text, re.S)
if not match:
return None
payload = match.group(0)
try:
return json.loads(payload)
except json.JSONDecodeError:
try:
cleaned = re.sub(r"[\n\r]+", " ", payload)
cleaned = re.sub(r"(['\"])?([a-zA-Z0-9_]+)(['\"])?\s*:\s*", r'"\2": ', cleaned)
return json.loads(cleaned)
except Exception:
return None
# ---------------------------------------------------
# HELPER: PANDAS FIXES
# ---------------------------------------------------
def _fix_pandas_compatibility(code: str):
code = re.sub(
r"\.reset_index\(name=(['\"])([^'\"]+)\1\)",
r".reset_index(names=[\1\2\1])",
code
)
code = re.sub(
r"\.reset_index\(name=([a-zA-Z_][a-zA-Z0-9_]*)\)",
r".reset_index(names=[\1])",
code
)
return code
# ---------------------------------------------------
# STEP 1: REQUIREMENT GENERATION
# (UNCHANGED)
# ---------------------------------------------------
def generate_analysis_requirements(question: str, acq: pd.DataFrame, perf: pd.DataFrame, master_df: pd.DataFrame):
    """Ask the LLM to break *question* into executable analysis requirements.

    The prompt describes the data available to generated code (the ``acq`` and
    ``perf`` column dictionaries below, plus the live column names of
    ``master_df``) and spells out the JSON shape the rest of the pipeline
    consumes: a ``requirements`` list whose items carry ``title`` and ``code``,
    with each snippet storing its output in ``result_N``.

    Args:
        question: The user's analytical question.
        acq: Not read here; accepted for signature parity with the other steps.
        perf: Not read here; accepted for signature parity with the other steps.
        master_df: Only its column names are sent to the model.

    Returns:
        ``{"success": True, "requirements": [...]}`` on success, otherwise
        ``{"success": False, "requirements": [], "error": <str>}`` where the
        error holds up to the first 300 characters of the model response.

    Raises:
        RuntimeError: propagated from ``_get_hf_client`` when no token is set.
    """
    client = _get_hf_client()

    acq_cols = {
        "account_id": "unique account identifier",
        "booking_date": "when account was originated",
        "booking_vintage": "year-month of origination (YYYY-MM)",
        "fico_band": "FICO score bracket",
        "sourcing_channel": "acquisition channel",
        "city_tier": "city classification",
        "occupation_type": "borrower occupation category",
        "credit_limit": "approved credit line amount",
    }
    perf_cols = {
        "account_id": "unique account identifier",
        "reporting_month": "month of performance observation",
        "mob": "months on books",
        "dpd": "days past due",
        "balance": "outstanding balance",
        "ncl_amount": "net charge-off amount",
        "payment": "payment amount",
    }
    # master_df has no hard-coded data dictionary, so describe it by whatever
    # columns the caller's frame actually has (empty if it is not a DataFrame).
    master_cols = [str(col) for col in getattr(master_df, "columns", [])]

    # Without the schema and the output contract the model cannot know which
    # columns exist or that downstream code looks for "requirements",
    # "title", "code" and result_N.
    prompt = "".join([
        "You are a senior credit risk analyst.\n",
        "Return ONLY JSON.\n\n",
        "Available pandas DataFrames (pandas is imported as pd):\n",
        "- acq columns: ", json.dumps(acq_cols), "\n",
        "- perf columns: ", json.dumps(perf_cols), "\n",
        "- master_df columns: ", json.dumps(master_cols), "\n\n",
        "Respond with a JSON object of the form ",
        '{"requirements": [{"title": "<short title>", "code": "<pandas code>"}]}.\n',
        "The code of the N-th requirement (1-based) must assign its output ",
        "to a variable named result_N.\n\n",
        "User Question:\n",
        question,
    ])
    messages = [
        {"role": "system", "content": "Return only valid JSON."},
        {"role": "user", "content": prompt},
    ]
    response = client.chat.completions.create(
        model=HF_MODEL_ID,
        messages=messages,
        max_tokens=2048,
        temperature=0.1,
    )

    # The API may return no content at all; treat that like an unparsable reply.
    response_text = response.choices[0].message.content or ""
    spec = _extract_json(response_text)
    if not spec or not isinstance(spec, dict):
        return {
            "success": False,
            "requirements": [],
            "error": response_text[:300] or "Model returned an empty response.",
        }

    requirements = spec.get("requirements", [])
    # The caller slices and iterates this value, so anything but a list is a
    # malformed reply rather than a usable plan.
    if not isinstance(requirements, list):
        return {
            "success": False,
            "requirements": [],
            "error": response_text[:300],
        }
    return {
        "success": True,
        "requirements": requirements,
    }
# ---------------------------------------------------
# STEP 2: EXECUTION (IPYTHON BASED)
# ---------------------------------------------------
def execute_requirement_code(code: str, acq: pd.DataFrame, perf: pd.DataFrame, master_df: pd.DataFrame, requirement_num: int):
    """Run one generated snippet in the shared IPython shell and collect its output.

    The three frames are (re)bound in the shell namespace as ``acq``, ``perf``
    and ``master_df`` before the snippet runs. The output is looked up in this
    order:

    1. ``result_<requirement_num>`` -- cleared before the run so a value left
       by an earlier run cannot be mistaken for this one's.
    2. ``final_result`` then ``result`` -- used only if the snippet rebound
       them. They are deliberately not cleared, so later snippets can still
       read values produced by earlier ones. Rebinding is detected by object
       identity, so re-assigning the very same object (e.g. a cached small
       integer) is indistinguishable from not assigning it.
    3. The value of the snippet's trailing expression, if any.

    Args:
        code: Python source to execute.
        acq: Frame exposed to the snippet as ``acq``.
        perf: Frame exposed to the snippet as ``perf``.
        master_df: Frame exposed to the snippet as ``master_df``.
        requirement_num: 1-based step number; selects the ``result_N`` slot.

    Returns:
        ``{"success": bool, "result": <output or None>, "error": <str or None>}``.
        Never raises: any exception is reported through ``error``.
    """
    try:
        print(f"\n[DEBUG] Executing requirement {requirement_num} via IPython")

        # An empty cell "succeeds" in IPython; report it as a failure instead
        # of returning whatever an earlier snippet left in the namespace.
        if not isinstance(code, str) or not code.strip():
            return {
                "success": False,
                "result": None,
                "error": "No code to execute.",
            }

        ns = IPY_SHELL.user_ns
        # refresh latest data into kernel
        ns["acq"] = acq
        ns["perf"] = perf
        ns["master_df"] = master_df

        result_key = f"result_{requirement_num}"
        fallback_keys = ("final_result", "result")
        # This step owns its numbered slot, so it is safe to clear it.
        ns.pop(result_key, None)
        # Remember what the shared slots held so stale values can be told
        # apart from values assigned by this snippet.
        unset = object()
        before = {key: ns.get(key, unset) for key in fallback_keys}

        code = _fix_pandas_compatibility(code)
        print("\n[EXECUTED CODE]\n", code)

        # SECURITY NOTE: `code` is model-generated and runs in-process with
        # the full privileges of this interpreter; nothing here sandboxes it.
        run = IPY_SHELL.run_cell(code, store_history=True)

        failure = run.error_in_exec if run.error_in_exec is not None else run.error_before_exec
        if failure is not None:
            return {
                "success": False,
                "result": None,
                # Include the type: str(KeyError("x")) alone is just "'x'".
                "error": f"{type(failure).__name__}: {failure}",
            }

        ns = IPY_SHELL.user_ns
        if result_key in ns:
            output = ns[result_key]
        else:
            for key in fallback_keys:
                if key in ns and ns[key] is not before[key]:
                    output = ns[key]
                    break
            else:
                # No slot was assigned: fall back to the cell's expression
                # value (None when the snippet did not end in an expression).
                output = run.result

        return {
            "success": True,
            "result": output,
            "error": None,
        }
    except Exception as e:
        return {
            "success": False,
            "result": None,
            "error": f"{type(e).__name__}: {e}",
        }
# ---------------------------------------------------
# STEP 3: EXECUTE ALL REQUIREMENTS
# ---------------------------------------------------
def execute_all_requirements(requirements, acq, perf, master_df):
    """Execute every requirement in order and build the interpretation context.

    Args:
        requirements: Iterable of requirement dicts with optional ``"code"``
            and ``"title"`` keys. ``None`` is treated as empty. An entry that
            is not a dict is recorded as a failed step instead of aborting
            the remaining ones.
        acq: Forwarded to ``execute_requirement_code``.
        perf: Forwarded to ``execute_requirement_code``.
        master_df: Forwarded to ``execute_requirement_code``.

    Returns:
        ``(all_results, context_text)``. ``all_results`` has one dict per
        requirement with keys ``sequence`` (1-based), ``title``, ``code``,
        ``execution_success``, ``result`` and ``error``. ``context_text``
        concatenates one ``[title]`` / ``[title FAILED]`` section per step.
    """
    all_results = []
    context_parts = []

    for i, req in enumerate(requirements or [], 1):
        if isinstance(req, dict):
            code = req.get("code", "")
            title = req.get("title", "")
            exec_result = execute_requirement_code(code, acq, perf, master_df, i)
        else:
            # Model output is untrusted: a stray string or number in the list
            # must not take the whole run down with an AttributeError.
            code = ""
            title = ""
            exec_result = {
                "success": False,
                "result": None,
                "error": f"Requirement {i} is not a JSON object (got {type(req).__name__}).",
            }

        all_results.append({
            "sequence": i,
            "title": title,
            "code": code,
            "execution_success": exec_result["success"],
            "result": exec_result["result"],
            "error": exec_result["error"],
        })

        if exec_result["success"]:
            context_parts.append(f"\n[{title}]\n{exec_result['result']}\n")
        else:
            context_parts.append(f"\n[{title} FAILED]\n{exec_result['error']}\n")

    return all_results, "".join(context_parts)
# ---------------------------------------------------
# STEP 4: INTERPRETATION (UNCHANGED LOGIC)
# ---------------------------------------------------
def interpret_all_results(question: str, all_results: list, context_text: str):
    """Ask the LLM to turn the executed results into written insights.

    Args:
        question: The user's original question.
        all_results: Accepted for interface compatibility; not read here.
        context_text: Text rendering of the step outputs, embedded verbatim
            in the prompt.

    Returns:
        The content of the first completion choice.
    """
    hf_client = _get_hf_client()

    user_prompt = "".join([
        "You are a senior credit risk analyst.\n\n",
        "Question:\n",
        question,
        "\n\n",
        "Results:\n",
        context_text,
        "\n\n",
        "Provide insights.",
    ])
    system_message = {"role": "system", "content": "You are a senior analyst."}
    user_message = {"role": "user", "content": user_prompt}

    request_options = {
        "model": HF_MODEL_ID,
        "messages": [system_message, user_message],
        "max_tokens": 1024,
        "temperature": 0.3,
    }
    completion = hf_client.chat.completions.create(**request_options)
    first_choice = completion.choices[0]
    return first_choice.message.content
# ---------------------------------------------------
# MASTER ORCHESTRATOR
# ---------------------------------------------------
def run_deep_dive_analysis(question: str, acq: pd.DataFrame, perf: pd.DataFrame, master_df: pd.DataFrame, max_requirements: int = 3):
    """Run the full pipeline: plan requirements, execute them, interpret results.

    Args:
        question: The user's analytical question.
        acq: Forwarded to the planning and execution steps.
        perf: Forwarded to the planning and execution steps.
        master_df: Forwarded to the planning and execution steps.
        max_requirements: Upper bound on how many generated requirements are
            executed. Defaults to 3 (the previous hard-coded cap); ``None``
            disables the cap.

    Returns:
        A dict that always contains ``success``, ``requirements``,
        ``all_results`` and ``interpretation``. When planning fails,
        ``success`` is False, ``all_results`` is empty, ``interpretation`` is
        None, and the planning step's ``error`` is included.

    Raises:
        ValueError: if ``max_requirements`` is negative.
    """
    if max_requirements is not None and max_requirements < 0:
        raise ValueError("max_requirements must be non-negative or None.")

    print(f"\n[DEEP DIVE START] {question}")

    req_response = generate_analysis_requirements(question, acq, perf, master_df)
    if not req_response["success"]:
        # Keep the failure payload shaped like the success payload so callers
        # can read the same keys on either path.
        return {"all_results": [], "interpretation": None, **req_response}

    requirements = req_response["requirements"][:max_requirements]
    all_results, context_text = execute_all_requirements(
        requirements, acq, perf, master_df
    )
    interpretation = interpret_all_results(question, all_results, context_text)

    print("\n[DEEP DIVE COMPLETE]\n")
    return {
        "success": True,
        "requirements": requirements,
        "all_results": all_results,
        "interpretation": interpretation,
    }