"""Custom LangSmith evaluators for PrimoGreedy analyst pipeline. Evaluator categories: 1. Hallucination catchers (LLM-as-a-Judge) — catalyst_grounding_score, company_identity_score 2. Format verifiers (exact-match) — format_score, verdict_validity_score 3. Math verifier — kelly_math_score Each evaluator conforms to the ``langsmith.evaluate()`` protocol: def evaluator(run, example) -> EvaluationResult | dict """ import os import re from dotenv import load_dotenv load_dotenv() VALID_VERDICTS = {"STRONG BUY", "BUY", "WATCH", "AVOID"} REQUIRED_HEADERS = [ "### THE QUANTITATIVE BASE", "### THE LYNCH PITCH", "### THE MUNGER INVERT", "### FINAL VERDICT", ] # --------------------------------------------------------------------------- # 1. Hallucination catchers (LLM-as-a-Judge) # --------------------------------------------------------------------------- def catalyst_grounding_score(run, example) -> dict: """Score whether the Lynch Pitch catalyst is grounded in provided context. Uses an LLM-as-a-Judge prompt to compare the analyst's catalyst claim against the data that was actually in the prompt. Returns 0 (fabricated) to 1 (fully grounded). """ inputs = run.inputs or {} outputs = run.outputs or {} context_parts = [] if inputs.get("financial_data"): context_parts.append(str(inputs["financial_data"])[:3000]) if inputs.get("sec_context"): context_parts.append(str(inputs["sec_context"])[:2000]) if inputs.get("deep_fundamentals"): context_parts.append(str(inputs["deep_fundamentals"])[:2000]) context = "\n".join(context_parts) verdict_text = str(outputs.get("final_verdict", "")) lynch_match = re.search( r"###\s*THE LYNCH PITCH.*?\n(.*?)(?=###|\Z)", verdict_text, re.DOTALL, ) lynch_pitch = lynch_match.group(1).strip() if lynch_match else verdict_text[:500] if not context or not lynch_pitch: return {"key": "catalyst_grounding", "score": 0.5, "comment": "Insufficient data"} try: from langchain_openai import ChatOpenAI judge_llm = ChatOpenAI( model=os.getenv("EVAL_MODEL", "nvidia/nemotron-3-nano-30b-a3b:free"), api_key=os.getenv("OPENROUTER_API_KEY"), base_url="https://openrouter.ai/api/v1", temperature=0, max_tokens=256, ) judge_prompt = ( "You are a fact-checking judge. Given the CONTEXT the analyst received " "and the CATALYST CLAIM it made, determine whether the claim has " "grounding in the context.\n\n" "Score on a scale from 0.0 (completely fabricated, no evidence in context) " "to 1.0 (fully grounded in the data provided).\n\n" "Respond with ONLY a JSON object: {\"score\": , \"reason\": \"\"}\n\n" f"CONTEXT:\n{context[:4000]}\n\n" f"CATALYST CLAIM:\n{lynch_pitch[:1000]}" ) response = judge_llm.invoke(judge_prompt) import json try: result = json.loads(response.content) score = float(result.get("score", 0.5)) reason = result.get("reason", "") except (json.JSONDecodeError, ValueError): score_match = re.search(r"(\d+\.?\d*)", response.content) score = float(score_match.group(1)) if score_match else 0.5 reason = response.content[:200] return {"key": "catalyst_grounding", "score": max(0, min(1, score)), "comment": reason} except Exception as exc: return {"key": "catalyst_grounding", "score": 0.5, "comment": f"Judge error: {exc}"} def company_identity_score(run, example) -> dict: """Check whether the LLM correctly identifies the company's business. Catches hallucinations like "High Arctic = Arctic drilling" by comparing the analyst's description against the actual sector/business from financial_data. """ inputs = run.inputs or {} outputs = run.outputs or {} financial_data = str(inputs.get("financial_data", "")) verdict_text = str(outputs.get("final_verdict", "")) if not financial_data or not verdict_text: return {"key": "company_identity", "score": 0.5, "comment": "Insufficient data"} try: from langchain_openai import ChatOpenAI import json judge_llm = ChatOpenAI( model=os.getenv("EVAL_MODEL", "nvidia/nemotron-3-nano-30b-a3b:free"), api_key=os.getenv("OPENROUTER_API_KEY"), base_url="https://openrouter.ai/api/v1", temperature=0, max_tokens=256, ) judge_prompt = ( "You are a fact-checking judge. Compare the FINANCIAL DATA (ground truth) " "with the ANALYST REPORT to check if the analyst correctly identifies " "what the company actually does.\n\n" "Score 0.0 if the analyst describes a completely different business, " "0.5 if partially correct, 1.0 if accurate.\n\n" "Respond with ONLY: {\"score\": , \"reason\": \"\"}\n\n" f"FINANCIAL DATA:\n{financial_data[:3000]}\n\n" f"ANALYST REPORT:\n{verdict_text[:3000]}" ) response = judge_llm.invoke(judge_prompt) try: result = json.loads(response.content) score = float(result.get("score", 0.5)) reason = result.get("reason", "") except (json.JSONDecodeError, ValueError): score_match = re.search(r"(\d+\.?\d*)", response.content) score = float(score_match.group(1)) if score_match else 0.5 reason = response.content[:200] return {"key": "company_identity", "score": max(0, min(1, score)), "comment": reason} except Exception as exc: return {"key": "company_identity", "score": 0.5, "comment": f"Judge error: {exc}"} # --------------------------------------------------------------------------- # 2. Format verifiers (exact-match, no LLM) # --------------------------------------------------------------------------- def format_score(run, example) -> dict: """Check structural correctness of the verdict report. Validates: - All 4 required headers are present - No duplicate headers (the double-header bug) - Kelly section present for BUY/STRONG BUY verdicts """ outputs = run.outputs or {} verdict_text = str(outputs.get("final_verdict", "")) if not verdict_text or "REJECTED" in verdict_text.upper(): return {"key": "format", "score": 1.0, "comment": "Rejected/empty, N/A"} issues = [] total_checks = 0 for header in REQUIRED_HEADERS: total_checks += 1 count = verdict_text.count(header) if count == 0: issues.append(f"Missing: {header}") elif count > 1: issues.append(f"Duplicated ({count}x): {header}") upper = verdict_text.upper() is_buy = "STRONG BUY" in upper or ("BUY" in upper and "AVOID" not in upper) if is_buy: total_checks += 1 if "POSITION SIZING" not in verdict_text and "Kelly" not in verdict_text: issues.append("Missing Kelly section for BUY verdict") passed = total_checks - len(issues) score = passed / total_checks if total_checks > 0 else 1.0 return { "key": "format", "score": score, "comment": "; ".join(issues) if issues else "All format checks passed", } def verdict_validity_score(run, example) -> dict: """Check that the final verdict is one of the 4 valid values.""" outputs = run.outputs or {} verdict_text = str(outputs.get("final_verdict", "")) if not verdict_text or "REJECTED" in verdict_text.upper(): return {"key": "verdict_validity", "score": 1.0, "comment": "Rejected, N/A"} found_verdict = None upper = verdict_text.upper() if "STRONG BUY" in upper: found_verdict = "STRONG BUY" elif "BUY" in upper: found_verdict = "BUY" elif "WATCH" in upper: found_verdict = "WATCH" elif "AVOID" in upper: found_verdict = "AVOID" if found_verdict and found_verdict in VALID_VERDICTS: return {"key": "verdict_validity", "score": 1.0, "comment": f"Valid: {found_verdict}"} return {"key": "verdict_validity", "score": 0.0, "comment": f"Invalid/missing verdict"} # --------------------------------------------------------------------------- # 3. Math verifier # --------------------------------------------------------------------------- def kelly_math_score(run, example) -> dict: """Verify Kelly position sizing math is within valid bounds. Checks that reported allocation is between 1% and 25% for BUY/STRONG BUY. """ outputs = run.outputs or {} verdict_text = str(outputs.get("final_verdict", "")) upper = verdict_text.upper() is_buy = "STRONG BUY" in upper or ("BUY" in upper and "AVOID" not in upper) if not is_buy: return {"key": "kelly_math", "score": 1.0, "comment": "Non-buy, N/A"} match = re.search(r"allocation:\s*([\d.]+)%", verdict_text) if not match: return {"key": "kelly_math", "score": 0.5, "comment": "No allocation found in BUY verdict"} pct = float(match.group(1)) if 1.0 <= pct <= 25.0: return {"key": "kelly_math", "score": 1.0, "comment": f"{pct}% within [1%, 25%]"} return {"key": "kelly_math", "score": 0.0, "comment": f"{pct}% outside valid range [1%, 25%]"} ALL_EVALUATORS = [ catalyst_grounding_score, company_identity_score, format_score, verdict_validity_score, kelly_math_score, ]