# deep_dive_agentic.py
"""
Agentic analytical code generation + execution engine
using Hugging Face + IPython

FLOW:
User Question
    ↓
LLM generates pandas code
    ↓
IPython executes code (stateful, notebook-like)
    ↓
LLM interprets results
    ↓
Return code + interpretation
"""

# ---------------------------------------------------
# IMPORTS
# ---------------------------------------------------

import json
import os
import re

import pandas as pd
from IPython.core.interactiveshell import InteractiveShell

try:
    from huggingface_hub import InferenceClient
except ImportError as exc:
    raise ImportError(
        "huggingface_hub is required. Install with `pip install huggingface-hub`."
    ) from exc

from analytics.performance_analysis import generate_metric_view


# ---------------------------------------------------
# HF CONFIG
# ---------------------------------------------------

# Both values are read once, at import time, from the process environment.
HF_MODEL_ID = os.environ.get("HF_MODEL_ID", "Qwen/Qwen2.5-7B-Instruct")
HF_TOKEN = os.environ.get("HUGGINGFACE_API_TOKEN")


# ---------------------------------------------------
# IPYTHON SHELL (STATEFUL EXECUTION ENGINE)
# ---------------------------------------------------

IPY_SHELL = InteractiveShell.instance()

# preload global tools so generated code can use them without importing
IPY_SHELL.user_ns["pd"] = pd
IPY_SHELL.user_ns["generate_metric_view"] = generate_metric_view


# ---------------------------------------------------
# HELPER: HF CLIENT
# ---------------------------------------------------

def _get_hf_client():
    """Return a new ``InferenceClient`` authenticated with ``HF_TOKEN``.

    Raises:
        RuntimeError: if ``HUGGINGFACE_API_TOKEN`` was not set when the
            module was imported.
    """
    if not HF_TOKEN:
        raise RuntimeError("HUGGINGFACE_API_TOKEN is required.")

    return InferenceClient(token=HF_TOKEN)


# ---------------------------------------------------
# HELPER: JSON EXTRACTION
# ---------------------------------------------------

def _extract_json(text: str):
    """Extract the first JSON object embedded in *text*.

    Parsing is attempted in two stages:

    1. Decode the first complete JSON value starting at the first ``{``.
       Trailing prose after the object (even prose containing braces) is
       ignored, which a greedy first-``{``-to-last-``}`` match cannot do.
    2. If that fails, apply a best-effort repair to the greedy span
       (collapse newlines, force-quote anything that looks like a key)
       and parse again.

    Args:
        text: raw model output; may be empty or ``None``.

    Returns:
        The decoded object, or ``None`` when no JSON object can be recovered.
    """
    if not text:
        return None

    match = re.search(r"\{.*\}", text, re.S)
    if not match:
        return None

    # Stage 1: strict decode of the first object; raw_decode stops at the
    # end of the first complete value instead of requiring the whole string
    # to be JSON.
    try:
        parsed, _end = json.JSONDecoder().raw_decode(text[match.start():])
        return parsed
    except json.JSONDecodeError:
        pass

    # Stage 2: lenient repair of the greedy span. This can also rewrite
    # "word:" sequences inside string values, so it is only a last resort.
    payload = match.group(0)
    cleaned = re.sub(r"[\n\r]+", " ", payload)
    cleaned = re.sub(
        r"(['\"])?([a-zA-Z0-9_]+)(['\"])?\s*:\s*",
        r'"\2": ',
        cleaned
    )
    try:
        return json.loads(cleaned)
    except ValueError:
        return None


# ---------------------------------------------------
# HELPER: PANDAS FIXES
# ---------------------------------------------------

def _fix_pandas_compatibility(code: str):
    """Rewrite ``.reset_index(name=X)`` calls to ``.reset_index(names=[X])``.

    Handles both a quoted literal (``name='col'``) and a bare identifier
    (``name=col``). Only calls whose sole argument is ``name=...`` match.

    NOTE(review): the rewrite is purely textual and cannot tell a DataFrame
    receiver from a Series one. ``Series.reset_index(name=...)`` appears to
    be valid pandas on its own, so this may break correct generated code --
    confirm against the pandas version in use.
    """
    code = re.sub(
        r"\.reset_index\(name=(['\"])([^'\"]+)\1\)",
        r".reset_index(names=[\1\2\1])",
        code
    )

    code = re.sub(
        r"\.reset_index\(name=([a-zA-Z_][a-zA-Z0-9_]*)\)",
        r".reset_index(names=[\1])",
        code
    )

    return code


# ---------------------------------------------------
# STEP 1: REQUIREMENT GENERATION
# (UNCHANGED)
# ---------------------------------------------------

def generate_analysis_requirements(question: str,
                                   acq: pd.DataFrame,
                                   perf: pd.DataFrame,
                                   master_df: pd.DataFrame):
    """Ask the model to break *question* into executable analysis steps.

    The prompt describes the ``acq`` and ``perf`` columns, lists the column
    names found on ``master_df``, and spells out the JSON shape expected
    back, so that the reply can be consumed as a list of
    ``{"title", "code"}`` entries.

    Args:
        question: the user's analytical question.
        acq: acquisition DataFrame (not inspected here; described by a
            fixed column glossary).
        perf: performance DataFrame (not inspected here; described by a
            fixed column glossary).
        master_df: combined DataFrame; only its column names are read.

    Returns:
        ``{"success": True, "requirements": [...], "error": None}`` on
        success, or ``{"success": False, "requirements": [], "error": str}``
        when the reply holds no usable JSON.

    Raises:
        RuntimeError: if no Hugging Face token is configured.
    """
    client = _get_hf_client()

    acq_cols = {
        "account_id": "unique account identifier",
        "booking_date": "when account was originated",
        "booking_vintage": "year-month of origination (YYYY-MM)",
        "fico_band": "FICO score bracket",
        "sourcing_channel": "acquisition channel",
        "city_tier": "city classification",
        "occupation_type": "borrower occupation category",
        "credit_limit": "approved credit line amount"
    }

    perf_cols = {
        "account_id": "unique account identifier",
        "reporting_month": "month of performance observation",
        "mob": "months on books",
        "dpd": "days past due",
        "balance": "outstanding balance",
        "ncl_amount": "net charge-off amount",
        "payment": "payment amount"
    }

    # Column names are taken from the frame itself; getattr keeps this safe
    # if a caller passes something without a `.columns` attribute.
    master_cols = [str(col) for col in getattr(master_df, "columns", [])]

    prompt = "".join([
        "You are a senior credit risk analyst.\n",
        "Return ONLY JSON.\n\n",
        "Available pandas DataFrames (already loaded, do not re-read them):\n",
        "- acq columns: " + json.dumps(acq_cols) + "\n",
        "- perf columns: " + json.dumps(perf_cols) + "\n",
        "- master_df columns: " + json.dumps(master_cols) + "\n\n",
        "Respond with an object of the form "
        '{"requirements": [{"title": "...", "code": "..."}]}.\n',
        "Each `code` value must be pandas code that stores its output in a "
        "variable named `result`.\n\n",
        "User Question:\n",
        question,
    ])

    messages = [
        {"role": "system", "content": "Return only valid JSON."},
        {"role": "user", "content": prompt}
    ]

    response = client.chat.completions.create(
        model=HF_MODEL_ID,
        messages=messages,
        max_tokens=2048,
        temperature=0.1
    )

    # The content field can be None; normalise so parsing and slicing
    # below never see a non-string.
    response_text = response.choices[0].message.content or ""

    spec = _extract_json(response_text)

    if not isinstance(spec, dict) or not spec:
        return {
            "success": False,
            "requirements": [],
            "error": response_text[:300] or "Model returned an empty response."
        }

    requirements = spec.get("requirements", [])
    if not isinstance(requirements, list):
        # Callers slice and iterate this value, so anything else is unusable.
        return {
            "success": False,
            "requirements": [],
            "error": response_text[:300]
        }

    return {
        "success": True,
        "requirements": requirements,
        "error": None
    }
# ---------------------------------------------------
# STEP 2: EXECUTION (IPYTHON BASED)
# ---------------------------------------------------

# Generic names a generated cell may use to publish its output, in lookup
# order, when it does not set the step-specific `result_N` variable.
_GENERIC_RESULT_NAMES = ("final_result", "result")

# Sentinel distinguishing "name absent from the namespace" from "bound to None".
_ABSENT = object()


def execute_requirement_code(code: str,
                             acq: pd.DataFrame,
                             perf: pd.DataFrame,
                             master_df: pd.DataFrame,
                             requirement_num: int):
    """
    Execute generated pandas code using IPython (stateful + notebook-like).

    The three DataFrames are (re)bound in the shell namespace as ``acq``,
    ``perf`` and ``master_df`` before the cell runs. After a successful run
    the output is looked up as ``result_N`` (N = *requirement_num*), then
    ``final_result``, then ``result``.

    Because the namespace persists between cells, values left behind by an
    earlier cell must not be mistaken for this cell's output:

    * any pre-existing ``result_N`` is removed before execution;
    * among the generic names, one that this cell (re)bound to a different
      object wins over one that was merely left over. If the cell rebound
      neither, the plain lookup order applies.

    Args:
        code: pandas source to execute.
        acq, perf, master_df: frames exposed to the cell.
        requirement_num: 1-based step number used for ``result_N``.

    Returns:
        ``{"success": bool, "result": object | None, "error": str | None}``.
        Never raises; unexpected failures are reported in ``error``.
    """
    try:
        print(f"\n[DEBUG] Executing requirement {requirement_num} via IPython")

        namespace = IPY_SHELL.user_ns

        # refresh latest data into kernel
        namespace["acq"] = acq
        namespace["perf"] = perf
        namespace["master_df"] = master_df

        # optional fix
        code = _fix_pandas_compatibility(code)

        print("\n[EXECUTED CODE]\n", code)

        result_key = f"result_{requirement_num}"
        # Drop a leftover from a previous run so it cannot be returned as
        # this step's output.
        namespace.pop(result_key, None)
        previous = {
            name: namespace.get(name, _ABSENT)
            for name in _GENERIC_RESULT_NAMES
        }

        # SECURITY NOTE: this runs model-generated code with full access to
        # the host process; there is no sandboxing at this layer.
        cell = IPY_SHELL.run_cell(code, store_history=True)

        # error handling
        failure = cell.error_in_exec
        if failure is None:
            failure = cell.error_before_exec
        if failure is not None:
            # Include the exception type: str(KeyError("x")) alone is "'x'".
            return {
                "success": False,
                "result": None,
                "error": f"{type(failure).__name__}: {failure}"
            }

        namespace = IPY_SHELL.user_ns

        if result_key in namespace:
            output = namespace[result_key]
        else:
            rebound = [
                name for name in _GENERIC_RESULT_NAMES
                if name in namespace and namespace[name] is not previous[name]
            ]
            # Fall back to the plain lookup order when nothing was rebound
            # (e.g. the cell mutated an existing object in place).
            candidates = rebound or _GENERIC_RESULT_NAMES
            output = next(
                (namespace[name] for name in candidates if name in namespace),
                None
            )

        return {
            "success": True,
            "result": output,
            "error": None
        }

    except Exception as e:
        # Top-level boundary: callers expect a result dict, not an exception.
        return {
            "success": False,
            "result": None,
            "error": str(e)
        }


# ---------------------------------------------------
# STEP 3: EXECUTE ALL REQUIREMENTS
# ---------------------------------------------------

def execute_all_requirements(requirements, acq, perf, master_df):
    """Run every requirement in order and collect results plus a text digest.

    Each requirement is expected to be a dict with ``"title"`` and
    ``"code"`` keys. Entries that are not dicts, or whose code is missing,
    blank or not a string, are recorded as failed without being executed.

    Args:
        requirements: iterable of requirement dicts; ``None`` is treated as
            empty.
        acq, perf, master_df: frames handed to each executed cell.

    Returns:
        ``(all_results, context_text)`` where ``all_results`` is a list of
        dicts with keys ``sequence``, ``title``, ``code``,
        ``execution_success``, ``result`` and ``error``, and
        ``context_text`` concatenates one titled section per requirement.
    """
    all_results = []
    context_parts = []

    for i, req in enumerate(requirements or [], 1):
        if isinstance(req, dict):
            code = req.get("code", "")
            title = req.get("title", "")
        else:
            code, title = "", ""

        if isinstance(code, str) and code.strip():
            exec_result = execute_requirement_code(code, acq, perf, master_df, i)
        else:
            # Nothing runnable: report it instead of executing an empty cell,
            # which would "succeed" with whatever the namespace already held.
            exec_result = {
                "success": False,
                "result": None,
                "error": "No executable code provided for this requirement."
            }

        all_results.append({
            "sequence": i,
            "title": title,
            "code": code,
            "execution_success": exec_result["success"],
            "result": exec_result["result"],
            "error": exec_result["error"]
        })

        if exec_result["success"]:
            context_parts.append(f"\n[{title}]\n{exec_result['result']}\n")
        else:
            context_parts.append(f"\n[{title} FAILED]\n{exec_result['error']}\n")

    return all_results, "".join(context_parts)
# ---------------------------------------------------
# STEP 4: INTERPRETATION (UNCHANGED LOGIC)
# ---------------------------------------------------

def interpret_all_results(question: str,
                          all_results: list,
                          context_text: str):
    """Ask the model for insights on the executed results.

    Args:
        question: the user's original question.
        all_results: per-requirement result dicts. Accepted for interface
            compatibility; the prompt is built from *context_text* only.
        context_text: text digest of the execution results.

    Returns:
        The model's reply as a string (empty if the model returned no
        content).

    Raises:
        RuntimeError: if no Hugging Face token is configured.
    """
    client = _get_hf_client()

    prompt = (
        "You are a senior credit risk analyst.\n\n"
        "Question:\n" + question + "\n\n"
        "Results:\n" + context_text + "\n\n"
        "Provide insights."
    )

    messages = [
        {"role": "system", "content": "You are a senior analyst."},
        {"role": "user", "content": prompt}
    ]

    response = client.chat.completions.create(
        model=HF_MODEL_ID,
        messages=messages,
        max_tokens=1024,
        temperature=0.3
    )

    # The content field can be None; always hand callers a string.
    return response.choices[0].message.content or ""


# ---------------------------------------------------
# MASTER ORCHESTRATOR
# ---------------------------------------------------

def run_deep_dive_analysis(question: str,
                           acq: pd.DataFrame,
                           perf: pd.DataFrame,
                           master_df: pd.DataFrame,
                           max_requirements: int = 3):
    """Run the full pipeline: plan requirements, execute them, interpret.

    Args:
        question: the user's analytical question.
        acq, perf, master_df: frames passed through to planning and
            execution.
        max_requirements: upper bound on how many generated requirements
            are executed (default 3). Negative values are treated as 0.

    Returns:
        On success: ``{"success": True, "requirements", "all_results",
        "interpretation"}``.
        On failure: the failure dict from requirement generation, or
        ``{"success": False, "requirements": [], "error": str}`` when there
        is nothing to execute.
    """
    print(f"\n[DEEP DIVE START] {question}")

    req_response = generate_analysis_requirements(question, acq, perf, master_df)

    if not req_response["success"]:
        return req_response

    requirements = req_response["requirements"]
    if not isinstance(requirements, list):
        # The model may return any JSON value under this key; only a list
        # can be sliced and iterated below.
        requirements = []
    requirements = requirements[:max(max_requirements, 0)]

    if not requirements:
        # Interpreting an empty result set would only yield ungrounded text.
        return {
            "success": False,
            "requirements": [],
            "error": "No analysis requirements were generated."
        }

    all_results, context_text = execute_all_requirements(
        requirements, acq, perf, master_df
    )

    interpretation = interpret_all_results(question, all_results, context_text)

    print("\n[DEEP DIVE COMPLETE]\n")

    return {
        "success": True,
        "requirements": requirements,
        "all_results": all_results,
        "interpretation": interpretation
    }