Spaces:
Sleeping
Sleeping
Rename analytics/deep_dive_agentic(wip).py to analytics/deep_dive_agentic(wip)_using ipythonshell.py
205e627 verified | # deep_dive_agentic.py | |
| """ | |
| Agentic analytical code generation + execution engine using Hugging Face + IPython | |
| FLOW: | |
| User Question | |
| ↓ | |
| LLM generates pandas code | |
| ↓ | |
| IPython executes code (stateful, notebook-like) | |
| ↓ | |
| LLM interprets results | |
| ↓ | |
| Return code + interpretation | |
| """ | |
| # --------------------------------------------------- | |
| # IMPORTS | |
| # --------------------------------------------------- | |
| import pandas as pd | |
| import json | |
| import os | |
| import re | |
| from IPython.core.interactiveshell import InteractiveShell | |
| try: | |
| from huggingface_hub import InferenceClient | |
| except ImportError as exc: | |
| raise ImportError( | |
| "huggingface_hub is required. Install with `pip install huggingface-hub`." | |
| ) from exc | |
| from analytics.performance_analysis import generate_metric_view | |
| # --------------------------------------------------- | |
| # HF CONFIG | |
| # --------------------------------------------------- | |
# Model identifier sent with every chat-completion request; the HF_MODEL_ID
# environment variable overrides the built-in default.
HF_MODEL_ID = os.getenv("HF_MODEL_ID", "Qwen/Qwen2.5-7B-Instruct")
# API token, read once at import time; None when the variable is not set.
HF_TOKEN = os.getenv("HUGGINGFACE_API_TOKEN")
| # --------------------------------------------------- | |
| # IPYTHON SHELL (STATEFUL EXECUTION ENGINE) | |
| # --------------------------------------------------- | |
# Single shared IPython shell used as the execution engine for generated code.
IPY_SHELL = InteractiveShell.instance()
# Preload the tools every executed cell may rely on.
IPY_SHELL.user_ns.update(
    {
        "pd": pd,
        "generate_metric_view": generate_metric_view,
    }
)
| # --------------------------------------------------- | |
| # HELPER: HF CLIENT | |
| # --------------------------------------------------- | |
def _get_hf_client():
    """Return an ``InferenceClient`` authenticated with ``HF_TOKEN``.

    Raises:
        RuntimeError: If ``HF_TOKEN`` is empty or unset.
    """
    if HF_TOKEN:
        return InferenceClient(token=HF_TOKEN)
    raise RuntimeError("HUGGINGFACE_API_TOKEN is required.")
| # --------------------------------------------------- | |
| # HELPER: JSON EXTRACTION | |
| # --------------------------------------------------- | |
| def _extract_json(text: str): | |
| match = re.search(r"\{.*\}", text, re.S) | |
| if not match: | |
| return None | |
| payload = match.group(0) | |
| try: | |
| return json.loads(payload) | |
| except json.JSONDecodeError: | |
| try: | |
| cleaned = re.sub(r"[\n\r]+", " ", payload) | |
| cleaned = re.sub(r"(['\"])?([a-zA-Z0-9_]+)(['\"])?\s*:\s*", r'"\2": ', cleaned) | |
| return json.loads(cleaned) | |
| except Exception: | |
| return None | |
| # --------------------------------------------------- | |
| # HELPER: PANDAS FIXES | |
| # --------------------------------------------------- | |
| def _fix_pandas_compatibility(code: str): | |
| code = re.sub( | |
| r"\.reset_index\(name=(['\"])([^'\"]+)\1\)", | |
| r".reset_index(names=[\1\2\1])", | |
| code | |
| ) | |
| code = re.sub( | |
| r"\.reset_index\(name=([a-zA-Z_][a-zA-Z0-9_]*)\)", | |
| r".reset_index(names=[\1])", | |
| code | |
| ) | |
| return code | |
| # --------------------------------------------------- | |
| # STEP 1: REQUIREMENT GENERATION | |
| # (UNCHANGED) | |
| # --------------------------------------------------- | |
def generate_analysis_requirements(question: str, acq: pd.DataFrame, perf: pd.DataFrame, master_df: pd.DataFrame):
    """Ask the model to turn a question into executable analysis requirements.

    The prompt carries the column glossaries declared below and spells out
    the JSON shape and ``result_N`` convention that the execution step reads.

    Args:
        question: The user's natural-language question.
        acq: Accepted for interface compatibility; not read here.
        perf: Accepted for interface compatibility; not read here.
        master_df: Accepted for interface compatibility; not read here.

    Returns:
        A dict with keys ``success`` (bool), ``requirements`` (list of dicts)
        and ``error`` (``None`` on success, otherwise a short message or the
        first 300 characters of the unparseable response).

    Raises:
        RuntimeError: Propagated from ``_get_hf_client`` when no token is set.
    """
    client = _get_hf_client()

    acq_cols = {
        "account_id": "unique account identifier",
        "booking_date": "when account was originated",
        "booking_vintage": "year-month of origination (YYYY-MM)",
        "fico_band": "FICO score bracket",
        "sourcing_channel": "acquisition channel",
        "city_tier": "city classification",
        "occupation_type": "borrower occupation category",
        "credit_limit": "approved credit line amount",
    }
    perf_cols = {
        "account_id": "unique account identifier",
        "reporting_month": "month of performance observation",
        "mob": "months on books",
        "dpd": "days past due",
        "balance": "outstanding balance",
        "ncl_amount": "net charge-off amount",
        "payment": "payment amount",
    }

    def _describe_columns(columns):
        # One "- name: meaning" bullet per column, in declaration order.
        return "\n".join(
            f"- {name}: {meaning}" for name, meaning in columns.items()
        )

    # The schema and output contract were previously never sent, so the
    # model had no way to know the column names or the expected JSON keys.
    prompt = (
        "You are a senior credit risk analyst.\n"
        "Return ONLY JSON.\n\n"
        "The pandas DataFrames `acq`, `perf` and `master_df` are already "
        "loaded, and pandas is imported as `pd`.\n\n"
        "Columns of `acq`:\n" + _describe_columns(acq_cols) + "\n\n"
        "Columns of `perf`:\n" + _describe_columns(perf_cols) + "\n\n"
        "Respond with a JSON object of the form\n"
        '{"requirements": [{"title": "<short title>", '
        '"code": "<pandas code>"}]}\n'
        "The code of the N-th requirement (1-based) must store its output "
        "in a variable named `result_N`.\n\n"
        "User Question:\n" + question
    )
    messages = [
        {"role": "system", "content": "Return only valid JSON."},
        {"role": "user", "content": prompt},
    ]
    response = client.chat.completions.create(
        model=HF_MODEL_ID,
        messages=messages,
        max_tokens=2048,
        temperature=0.1,
    )
    # The content can be None; normalise so slicing and parsing stay safe.
    response_text = response.choices[0].message.content or ""

    spec = _extract_json(response_text)
    if not spec or not isinstance(spec, dict):
        return {
            "success": False,
            "requirements": [],
            "error": response_text[:300] or "Model returned an empty response.",
        }

    raw_requirements = spec.get("requirements", [])
    if not isinstance(raw_requirements, list):
        return {
            "success": False,
            "requirements": [],
            "error": "'requirements' must be a JSON array.",
        }

    # Downstream code calls .get() on every entry, so keep only objects.
    requirements = [item for item in raw_requirements if isinstance(item, dict)]
    return {
        "success": True,
        "requirements": requirements,
        "error": None,
    }
| # --------------------------------------------------- | |
| # STEP 2: EXECUTION (IPYTHON BASED) | |
| # --------------------------------------------------- | |
def execute_requirement_code(code: str, acq: pd.DataFrame, perf: pd.DataFrame, master_df: pd.DataFrame, requirement_num: int):
    """Execute one generated code cell in the shared IPython shell.

    The output is taken from the first of ``result_<requirement_num>``,
    ``final_result`` or ``result`` that the cell defines. If none is defined,
    the value of the cell's trailing expression is used. These three names
    are cleared before the cell runs, so a value left by an earlier cell is
    never reported as this cell's output. Other variables, including earlier
    ``result_<k>`` names, stay available to the cell.

    Args:
        code: Python source to run.
        acq: Bound to ``acq`` in the shell namespace before execution.
        perf: Bound to ``perf`` in the shell namespace before execution.
        master_df: Bound to ``master_df`` in the shell namespace.
        requirement_num: 1-based index used to build the ``result_<n>`` name.

    Returns:
        A dict with keys ``success`` (bool), ``result`` (the cell output or
        ``None``) and ``error`` (``None`` or ``"<ExceptionType>: <message>"``).
        Failures are reported through the dict, never raised.
    """
    if not isinstance(code, str) or not code.strip():
        return {
            "success": False,
            "result": None,
            "error": "No code provided for this requirement.",
        }

    try:
        print(f"\n[DEBUG] Executing requirement {requirement_num} via IPython")
        ns = IPY_SHELL.user_ns
        # refresh latest data into kernel
        ns["acq"] = acq
        ns["perf"] = perf
        ns["master_df"] = master_df

        code = _fix_pandas_compatibility(code)
        print("\n[EXECUTED CODE]\n", code)

        # Drop leftovers from earlier cells so they cannot be mistaken for
        # this cell's output.
        output_names = (f"result_{requirement_num}", "final_result", "result")
        for name in output_names:
            ns.pop(name, None)

        # SECURITY NOTE(review): this runs model-generated code with the full
        # privileges of the host process. No sandboxing is applied here.
        cell = IPY_SHELL.run_cell(code, store_history=True)

        failure = cell.error_in_exec
        if failure is None:
            failure = cell.error_before_exec
        if failure is not None:
            # Include the exception type: str(KeyError("x")) alone is "'x'".
            return {
                "success": False,
                "result": None,
                "error": f"{type(failure).__name__}: {failure}",
            }

        for name in output_names:
            if name in ns:
                output = ns[name]
                break
        else:
            # No output variable was assigned; use the value of the cell's
            # trailing expression (None if there was none).
            output = cell.result

        return {
            "success": True,
            "result": output,
            "error": None,
        }
    except Exception as e:
        # Boundary handler: one bad cell must not abort the whole run.
        return {
            "success": False,
            "result": None,
            "error": f"{type(e).__name__}: {e}",
        }
| # --------------------------------------------------- | |
| # STEP 3: EXECUTE ALL REQUIREMENTS | |
| # --------------------------------------------------- | |
def execute_all_requirements(requirements, acq, perf, master_df):
    """Run every requirement in order and collect results plus a text digest.

    Args:
        requirements: Iterable of dicts with ``code`` and ``title`` keys.
            ``None`` is treated as empty. Entries that are not dicts are
            recorded as failures instead of raising.
        acq: Forwarded to ``execute_requirement_code``.
        perf: Forwarded to ``execute_requirement_code``.
        master_df: Forwarded to ``execute_requirement_code``.

    Returns:
        ``(all_results, context_text)``. ``all_results`` holds one dict per
        requirement with keys ``sequence``, ``title``, ``code``,
        ``execution_success``, ``result`` and ``error``. ``context_text`` is
        a concatenation of ``[title]`` / ``[title FAILED]`` sections.
    """
    all_results = []
    context_parts = []

    for i, req in enumerate(requirements or [], 1):
        if isinstance(req, dict):
            code = req.get("code", "")
            title = req.get("title", "")
            exec_result = execute_requirement_code(code, acq, perf, master_df, i)
        else:
            # Malformed entry from the model: report it, keep going.
            code, title = "", ""
            exec_result = {
                "success": False,
                "result": None,
                "error": f"Requirement {i} is not a JSON object: {req!r}",
            }

        all_results.append({
            "sequence": i,
            "title": title,
            "code": code,
            "execution_success": exec_result["success"],
            "result": exec_result["result"],
            "error": exec_result["error"],
        })

        if exec_result["success"]:
            context_parts.append(f"\n[{title}]\n{exec_result['result']}\n")
        else:
            context_parts.append(f"\n[{title} FAILED]\n{exec_result['error']}\n")

    return all_results, "".join(context_parts)
| # --------------------------------------------------- | |
| # STEP 4: INTERPRETATION (UNCHANGED LOGIC) | |
| # --------------------------------------------------- | |
def interpret_all_results(question: str, all_results: list, context_text: str):
    """Ask the model for insights on the collected execution output.

    Args:
        question: The user's original question.
        all_results: Per-requirement result records. Not read by this
            function; only ``context_text`` is sent to the model.
        context_text: Text digest of the execution results.

    Returns:
        The message content of the first completion choice.
    """
    client = _get_hf_client()

    user_prompt = "".join(
        [
            "You are a senior credit risk analyst.\n\n",
            "Question:\n",
            question,
            "\n\n",
            "Results:\n",
            context_text,
            "\n\n",
            "Provide insights.",
        ]
    )
    system_message = {"role": "system", "content": "You are a senior analyst."}
    user_message = {"role": "user", "content": user_prompt}

    completion = client.chat.completions.create(
        model=HF_MODEL_ID,
        messages=[system_message, user_message],
        max_tokens=1024,
        temperature=0.3,
    )
    first_choice = completion.choices[0]
    return first_choice.message.content
| # --------------------------------------------------- | |
| # MASTER ORCHESTRATOR | |
| # --------------------------------------------------- | |
def run_deep_dive_analysis(question: str, acq: pd.DataFrame, perf: pd.DataFrame, master_df: pd.DataFrame, *, max_requirements: int = 3):
    """Run the full pipeline: plan, execute, then interpret.

    Args:
        question: The user's natural-language question.
        acq: Forwarded to the planning and execution steps.
        perf: Forwarded to the planning and execution steps.
        master_df: Forwarded to the planning and execution steps.
        max_requirements: Upper bound on how many generated requirements are
            executed (default 3, the previous fixed limit). ``None`` removes
            the cap; negative values are treated as 0.

    Returns:
        A dict that always contains ``success``, ``requirements``,
        ``all_results``, ``interpretation`` and ``error``. When requirement
        generation fails, ``success`` is ``False``, ``all_results`` is empty
        and ``interpretation`` is ``None``.
    """
    print(f"\n[DEEP DIVE START] {question}")

    req_response = generate_analysis_requirements(question, acq, perf, master_df)
    if not req_response["success"]:
        # Keep the original keys and add the ones the success path returns,
        # so callers can read the same fields on both paths.
        return {**req_response, "all_results": [], "interpretation": None}

    limit = None if max_requirements is None else max(0, max_requirements)
    requirements = req_response["requirements"][:limit]

    all_results, context_text = execute_all_requirements(
        requirements, acq, perf, master_df
    )
    interpretation = interpret_all_results(question, all_results, context_text)

    print("\n[DEEP DIVE COMPLETE]\n")
    return {
        "success": True,
        "requirements": requirements,
        "all_results": all_results,
        "interpretation": interpretation,
        "error": None,
    }