"""
Evaluator Module - The EVALUATE step of the Reasoning Loop.

Interprets tool results and decides:
- What did we learn from this action?
- Does this answer the user's question?
- Should we continue investigating or stop?
- What follow-up questions emerged?

The Evaluator transforms raw tool output into understanding.

Architecture:
    Tool Result → Evaluator.evaluate() → EvaluationOutput
        - interpretation: natural language explanation
        - answered: did this answer the question?
        - confidence: how confident are we?
        - should_stop: should the loop stop?
        - next_questions: what to investigate next
"""

import json
import re
from dataclasses import dataclass, field
from typing import Dict, Any, List, Optional, Callable

from .findings import Finding, FindingsAccumulator


# Size limits applied before text is sent to the LLM or stored in a Finding.
_MAX_RESULT_SUMMARY_CHARS = 3000
_MAX_ARGUMENTS_CHARS = 500
_MAX_FINDING_SUMMARY_CHARS = 1000
_MAX_FALLBACK_INTERPRETATION_CHARS = 500
_MAX_PRIOR_FINDINGS = 3
_MAX_RESPONSE_TOKENS = 1024

# Values used when the LLM omits a field or returns something unusable.
_DEFAULT_CONFIDENCE = 0.3
_DEFAULT_INTERPRETATION = "Result processed."
_FALLBACK_INTERPRETATION = "Analysis step completed."

# Matches a JSON object with at most one level of nested braces. Used to pull
# the object out of a response that wraps it in extra text.
_JSON_OBJECT_RE = re.compile(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', re.DOTALL)

# Result fields copied into the default compressed summary, in output order.
# "feature_scores" and "feature_importance" are included because
# EVALUATOR_SYSTEM_PROMPT tells the model to treat them as direct answers.
_SUMMARY_KEYS = (
    "num_rows", "num_columns", "missing_percentage", "task_type",
    "best_model", "best_score", "models", "correlations",
    "output_file", "output_path", "plots", "summary",
    "total_issues", "columns_affected", "features_created",
    "accuracy", "r2_score", "rmse", "f1_score",
    "feature_scores", "feature_importance",
)


@dataclass
class EvaluationOutput:
    """Output from one EVALUATE step."""
    interpretation: str               # What we learned from the tool result
    answered: bool                    # Does this answer the user's question?
    confidence: float                 # 0.0-1.0 confidence
    should_stop: bool                 # Should the reasoning loop stop?
    next_questions: List[str]         # Follow-up questions to investigate
    key_metric: Optional[str] = None  # Most important metric extracted


EVALUATOR_SYSTEM_PROMPT = """You are a senior data scientist interpreting analysis results.

Your job:
1. Interpret what the tool result MEANS (not just what it shows)
2. Decide if this answers the user's original question
3. Identify follow-up questions worth investigating
4. Assign confidence level to your interpretation

Be concise but insightful. Focus on:
- Statistical significance (not just numbers)
- Business implications (not just patterns)
- Confounders and caveats
- What's surprising vs expected

IMPORTANT CONFIDENCE RULES:
- If the tool returned feature_scores, feature_importance, or correlation values,
  and the user asked about features/importance/correlations → this IS the answer.
  Set answered=true, confidence ≥ 0.7.
- If the tool returned actual ranked data (top features, sorted scores, correlation pairs),
  set confidence ≥ 0.6.
- Do NOT keep saying "not answered" when the tool literally returned the requested information.
- Only say answered=false when the result is genuinely unrelated to the question
  or contains NO useful data.

CRITICAL: Output ONLY valid JSON, no other text."""


EVALUATOR_USER_TEMPLATE = """**User's original question**: {question}

**Action taken**: {tool_name}({arguments})

**Tool result** (compressed):
{result_summary}

**What we knew before this step**:
{prior_findings}

Evaluate this result. Respond with ONLY this JSON:
{{
  "interpretation": "1-3 sentences: What does this result MEAN for answering the question?",
  "answered": true/false,
  "confidence": 0.0-1.0,
  "should_stop": true/false,
  "next_questions": ["follow-up question 1", "follow-up question 2"],
  "key_metric": "most important number or finding (optional)"
}}

Guidelines for should_stop:
- true: Question is fully answered OR we've gathered enough evidence OR no more useful actions
- false: Important aspects remain uninvestigated

Guidelines for answered:
- true: The result contains data that directly addresses the user's question (e.g., feature scores for "which features are important?", correlations for "what correlates with X?")
- false: Result is unrelated to the question or contains only metadata without actual answers

Guidelines for confidence:
- 0.0-0.3: Weak evidence, need more investigation
- 0.3-0.6: Moderate evidence, some aspects unclear
- 0.6-0.8: Strong evidence, minor questions remain (e.g., got feature importance scores but could add more context)
- 0.8-1.0: Very strong evidence, question well answered (e.g., got ranked feature list with scores AND correlations)"""


class Evaluator:
    """
    The EVALUATE step of the Reasoning Loop.

    Takes a tool result and interprets it in the context of
    the user's question and prior findings.

    Usage:
        evaluator = Evaluator(llm_caller=orchestrator._llm_text_call)
        evaluation = evaluator.evaluate(
            question="Why are customers churning?",
            tool_name="analyze_correlations",
            arguments={"file_path": "data.csv", "target_col": "churn"},
            result=tool_result,
            findings=findings_accumulator
        )

        if evaluation.should_stop:
            # Move to synthesis
            ...
        else:
            # Continue reasoning loop
            ...
    """

    def __init__(self, llm_caller: Callable):
        """
        Args:
            llm_caller: Function (system_prompt, user_prompt, max_tokens) -> str.
                It is invoked with those three names as keyword arguments.
        """
        self.llm_caller = llm_caller

    def evaluate(
        self,
        question: str,
        tool_name: str,
        arguments: Dict[str, Any],
        result: Dict[str, Any],
        findings: FindingsAccumulator,
        result_compressor: Optional[Callable] = None
    ) -> EvaluationOutput:
        """
        Evaluate a tool result.

        Args:
            question: User's original question
            tool_name: Name of the tool that was executed
            arguments: Tool arguments used
            result: Raw tool result dict
            findings: Accumulated findings so far
            result_compressor: Optional function called as
                ``result_compressor(tool_name, result)``; its return value is
                JSON-serialized. When omitted, ``_default_compress`` is used.

        Returns:
            EvaluationOutput with interpretation and next steps
        """
        # Compress the result for LLM consumption
        if result_compressor:
            result_summary = json.dumps(result_compressor(tool_name, result), default=str)
        else:
            result_summary = self._default_compress(result)

        # Truncate if too long
        if len(result_summary) > _MAX_RESULT_SUMMARY_CHARS:
            result_summary = result_summary[:_MAX_RESULT_SUMMARY_CHARS] + "... [truncated]"

        # Build argument string
        args_str = json.dumps(arguments, default=str)
        if len(args_str) > _MAX_ARGUMENTS_CHARS:
            args_str = args_str[:_MAX_ARGUMENTS_CHARS] + "..."

        user_prompt = EVALUATOR_USER_TEMPLATE.format(
            question=question,
            tool_name=tool_name,
            arguments=args_str,
            result_summary=result_summary,
            prior_findings=findings.get_context_for_reasoning(max_findings=_MAX_PRIOR_FINDINGS)
        )

        response_text = self.llm_caller(
            system_prompt=EVALUATOR_SYSTEM_PROMPT,
            user_prompt=user_prompt,
            max_tokens=_MAX_RESPONSE_TOKENS
        )

        return self._parse_response(response_text, result_summary)

    def build_finding(
        self,
        iteration: int,
        hypothesis: str,
        tool_name: str,
        arguments: Dict[str, Any],
        result_summary: str,
        evaluation: "EvaluationOutput",
        success: bool = True,
        error_message: str = ""
    ) -> Finding:
        """
        Build a Finding from a completed iteration.

        Convenience method that combines the action and evaluation
        into a single Finding for the accumulator. When ``success`` is
        False the finding records confidence 0.0 and answered_question
        False regardless of what the evaluation says.
        """
        return Finding(
            iteration=iteration,
            hypothesis=hypothesis,
            action=tool_name,
            arguments=arguments,
            result_summary=result_summary[:_MAX_FINDING_SUMMARY_CHARS],  # Cap size
            interpretation=evaluation.interpretation,
            confidence=evaluation.confidence if success else 0.0,
            answered_question=evaluation.answered if success else False,
            next_questions=evaluation.next_questions,
            success=success,
            error_message=error_message
        )

    def _parse_response(self, response_text: str, result_summary: str) -> EvaluationOutput:
        """
        Parse LLM response into EvaluationOutput.

        Malformed output never raises: a missing/non-string response or one
        without a JSON object goes to ``_fallback_evaluation``, and individual
        fields of the wrong type are coerced or replaced by their defaults.

        Args:
            response_text: Raw text returned by the LLM caller.
            result_summary: Compressed tool result, forwarded to the fallback.

        Returns:
            EvaluationOutput with confidence clamped to [0.0, 1.0].
        """
        if not isinstance(response_text, str):
            # A None (or otherwise non-string) response cannot be parsed.
            raw_text = "" if response_text is None else str(response_text)
            return self._fallback_evaluation(raw_text, result_summary)

        data = self._extract_json_object(response_text)
        if data is None:
            return self._fallback_evaluation(response_text, result_summary)

        interpretation = data.get("interpretation")
        if interpretation is None:
            interpretation = _DEFAULT_INTERPRETATION
        elif not isinstance(interpretation, str):
            interpretation = str(interpretation)

        key_metric = data.get("key_metric")
        if key_metric is not None and not isinstance(key_metric, str):
            # The model often returns a bare number here; the field is a string.
            key_metric = str(key_metric)

        return EvaluationOutput(
            interpretation=interpretation,
            answered=self._coerce_bool(data.get("answered"), False),
            confidence=self._coerce_confidence(data.get("confidence")),
            should_stop=self._coerce_bool(data.get("should_stop"), False),
            next_questions=self._coerce_questions(data.get("next_questions")),
            key_metric=key_metric
        )

    @staticmethod
    def _extract_json_object(text: str) -> Optional[Dict[str, Any]]:
        """Return the JSON object found in ``text``, or None if there is none."""
        try:
            data = json.loads(text.strip())
        except json.JSONDecodeError:
            data = None
        if isinstance(data, dict):
            return data

        # Either the text is not pure JSON or it parsed to a non-object
        # (list, number, ...): look for an embedded object instead.
        match = _JSON_OBJECT_RE.search(text)
        if match is None:
            return None
        try:
            data = json.loads(match.group(0))
        except json.JSONDecodeError:
            return None
        return data if isinstance(data, dict) else None

    @staticmethod
    def _coerce_bool(value: Any, default: bool) -> bool:
        """Convert a JSON value to bool, so that the string "false" is False."""
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            lowered = value.strip().lower()
            if lowered in ("true", "yes", "1"):
                return True
            if lowered in ("false", "no", "0", ""):
                return False
            return default
        if isinstance(value, (int, float)):
            return bool(value)
        return default

    @staticmethod
    def _coerce_confidence(value: Any) -> float:
        """Convert a JSON value to a confidence clamped to [0.0, 1.0]."""
        try:
            confidence = float(value)
        except (TypeError, ValueError):
            # null, missing, or a non-numeric string such as "high".
            confidence = _DEFAULT_CONFIDENCE
        return min(1.0, max(0.0, confidence))

    @staticmethod
    def _coerce_questions(value: Any) -> List[str]:
        """Convert a JSON value to a list of follow-up question strings."""
        if isinstance(value, str):
            return [value] if value.strip() else []
        if isinstance(value, (list, tuple)):
            return [str(item) for item in value if item is not None]
        return []

    def _fallback_evaluation(self, response_text: str, result_summary: str) -> EvaluationOutput:
        """
        Fallback when JSON parsing fails.

        Uses the raw response (stripped, capped in length) as the
        interpretation, with a generic message when the response is empty or
        whitespace only. The result is marked unanswered so the loop continues.
        """
        stripped = response_text.strip() if isinstance(response_text, str) else ""
        interpretation = stripped[:_MAX_FALLBACK_INTERPRETATION_CHARS] or _FALLBACK_INTERPRETATION

        return EvaluationOutput(
            interpretation=interpretation,
            answered=False,
            confidence=_DEFAULT_CONFIDENCE,
            should_stop=False,
            next_questions=[],
            key_metric=None
        )

    def _default_compress(self, result: Dict[str, Any]) -> str:
        """
        Default compression for tool results.

        Keeps ``success``/``error`` plus the fields named in ``_SUMMARY_KEYS``,
        read from ``result["result"]`` when present and from ``result`` itself
        otherwise. List and dict values are stringified and cut to 500 chars.

        Returns:
            A JSON string, or ``str(result)[:2000]`` when ``result`` is not a dict.
        """
        if not isinstance(result, dict):
            return str(result)[:2000]

        compressed = {}

        # Always include status
        if "success" in result:
            compressed["success"] = result["success"]
        if "error" in result:
            compressed["error"] = str(result["error"])[:300]

        # Include key result fields
        result_data = result.get("result", result)
        if isinstance(result_data, dict):
            for key in _SUMMARY_KEYS:
                if key in result_data:
                    value = result_data[key]
                    # Truncate long values
                    if isinstance(value, (list, dict)):
                        compressed[key] = str(value)[:500]
                    else:
                        compressed[key] = value
        elif result_data is not None:
            # A list/str/number payload has no named fields to pick from;
            # keep a truncated rendering rather than dropping it entirely.
            compressed["result"] = str(result_data)[:500]

        return json.dumps(compressed, default=str)