Spaces:
Running
Running
| """ | |
| Evaluator Module - The EVALUATE step of the Reasoning Loop. | |
| Interprets tool results and decides: | |
| - What did we learn from this action? | |
| - Does this answer the user's question? | |
| - Should we continue investigating or stop? | |
| - What follow-up questions emerged? | |
| The Evaluator transforms raw tool output into understanding. | |
| Architecture: | |
| Tool Result → Evaluator.evaluate() → EvaluationOutput | |
| - interpretation: natural language explanation | |
| - answered: did this answer the question? | |
| - confidence: how confident are we? | |
| - should_stop: should the loop stop? | |
| - next_questions: what to investigate next | |
| """ | |
| import json | |
| import re | |
| from dataclasses import dataclass, field | |
| from typing import Dict, Any, List, Optional, Callable | |
| from .findings import Finding, FindingsAccumulator | |
@dataclass
class EvaluationOutput:
    """Output from one EVALUATE step.

    Declared as a dataclass so it can be constructed with keyword
    arguments, which is how the evaluator's parsing and fallback paths
    build it.
    """

    interpretation: str  # What we learned from the tool result
    answered: bool  # Does this answer the user's question?
    confidence: float  # 0.0-1.0 confidence
    should_stop: bool  # Should the reasoning loop stop?
    next_questions: List[str]  # Follow-up questions to investigate
    key_metric: Optional[str] = None  # Most important metric extracted
# System prompt passed as `system_prompt` to the LLM caller by Evaluator.evaluate().
# It sets the interpreter role, spells out when to mark a result as answered and
# how to pick a confidence value, and requires a JSON-only reply.
EVALUATOR_SYSTEM_PROMPT = """You are a senior data scientist interpreting analysis results.
Your job:
1. Interpret what the tool result MEANS (not just what it shows)
2. Decide if this answers the user's original question
3. Identify follow-up questions worth investigating
4. Assign confidence level to your interpretation
Be concise but insightful. Focus on:
- Statistical significance (not just numbers)
- Business implications (not just patterns)
- Confounders and caveats
- What's surprising vs expected
IMPORTANT CONFIDENCE RULES:
- If the tool returned feature_scores, feature_importance, or correlation values, and the user asked about features/importance/correlations → this IS the answer. Set answered=true, confidence ≥ 0.7.
- If the tool returned actual ranked data (top features, sorted scores, correlation pairs), set confidence ≥ 0.6.
- Do NOT keep saying "not answered" when the tool literally returned the requested information.
- Only say answered=false when the result is genuinely unrelated to the question or contains NO useful data.
CRITICAL: Output ONLY valid JSON, no other text."""
# User prompt template filled with str.format() in Evaluator.evaluate().
# Placeholders: {question}, {tool_name}, {arguments}, {result_summary}, {prior_findings}.
# The braces around the JSON skeleton are doubled ({{ / }}) so that str.format()
# emits them literally instead of treating them as replacement fields.
EVALUATOR_USER_TEMPLATE = """**User's original question**: {question}
**Action taken**: {tool_name}({arguments})
**Tool result** (compressed):
{result_summary}
**What we knew before this step**:
{prior_findings}
Evaluate this result. Respond with ONLY this JSON:
{{
"interpretation": "1-3 sentences: What does this result MEAN for answering the question?",
"answered": true/false,
"confidence": 0.0-1.0,
"should_stop": true/false,
"next_questions": ["follow-up question 1", "follow-up question 2"],
"key_metric": "most important number or finding (optional)"
}}
Guidelines for should_stop:
- true: Question is fully answered OR we've gathered enough evidence OR no more useful actions
- false: Important aspects remain uninvestigated
Guidelines for answered:
- true: The result contains data that directly addresses the user's question (e.g., feature scores for "which features are important?", correlations for "what correlates with X?")
- false: Result is unrelated to the question or contains only metadata without actual answers
Guidelines for confidence:
- 0.0-0.3: Weak evidence, need more investigation
- 0.3-0.6: Moderate evidence, some aspects unclear
- 0.6-0.8: Strong evidence, minor questions remain (e.g., got feature importance scores but could add more context)
- 0.8-1.0: Very strong evidence, question well answered (e.g., got ranked feature list with scores AND correlations)"""
class Evaluator:
    """
    The EVALUATE step of the Reasoning Loop.

    Takes a tool result and interprets it in the context of
    the user's question and prior findings.

    Usage:
        evaluator = Evaluator(llm_caller=orchestrator._llm_text_call)
        evaluation = evaluator.evaluate(
            question="Why are customers churning?",
            tool_name="analyze_correlations",
            arguments={"file_path": "data.csv", "target_col": "churn"},
            result=tool_result,
            findings=findings_accumulator
        )
        if evaluation.should_stop:
            # Move to synthesis
            ...
        else:
            # Continue reasoning loop
            ...
    """

    # Character budgets applied to text before it is placed in the prompt.
    MAX_RESULT_CHARS = 3000
    MAX_ARGS_CHARS = 500

    # Confidence used when the model gives none or gives an unusable value.
    DEFAULT_CONFIDENCE = 0.3

    # Result fields copied into the default compressed summary. The last two
    # are included because the system prompt tells the model to look for
    # feature_scores / feature_importance; dropping them here would hide the
    # very data the prompt refers to.
    _SUMMARY_KEYS = (
        "num_rows", "num_columns", "missing_percentage", "task_type",
        "best_model", "best_score", "models", "correlations",
        "output_file", "output_path", "plots", "summary",
        "total_issues", "columns_affected", "features_created",
        "accuracy", "r2_score", "rmse", "f1_score",
        "feature_scores", "feature_importance",
    )

    def __init__(self, llm_caller: Callable):
        """
        Args:
            llm_caller: Function (system_prompt, user_prompt, max_tokens) -> str
        """
        self.llm_caller = llm_caller

    def evaluate(
        self,
        question: str,
        tool_name: str,
        arguments: Dict[str, Any],
        result: Dict[str, Any],
        findings: "FindingsAccumulator",
        result_compressor: Optional[Callable] = None
    ) -> "EvaluationOutput":
        """
        Evaluate a tool result.

        Args:
            question: User's original question
            tool_name: Name of the tool that was executed
            arguments: Tool arguments used
            result: Raw tool result dict
            findings: Accumulated findings so far
            result_compressor: Optional function ``(tool_name, result)`` whose
                return value is JSON-serialised and used as the result summary

        Returns:
            EvaluationOutput with interpretation and next steps
        """
        # Compress the result for LLM consumption
        if result_compressor:
            result_summary = json.dumps(result_compressor(tool_name, result), default=str)
        else:
            result_summary = self._default_compress(result)

        # Truncate if too long
        if len(result_summary) > self.MAX_RESULT_CHARS:
            result_summary = result_summary[:self.MAX_RESULT_CHARS] + "... [truncated]"

        # Build argument string
        args_str = json.dumps(arguments, default=str)
        if len(args_str) > self.MAX_ARGS_CHARS:
            args_str = args_str[:self.MAX_ARGS_CHARS] + "..."

        user_prompt = EVALUATOR_USER_TEMPLATE.format(
            question=question,
            tool_name=tool_name,
            arguments=args_str,
            result_summary=result_summary,
            prior_findings=findings.get_context_for_reasoning(max_findings=3)
        )
        response_text = self.llm_caller(
            system_prompt=EVALUATOR_SYSTEM_PROMPT,
            user_prompt=user_prompt,
            max_tokens=1024
        )
        return self._parse_response(response_text, result_summary)

    def build_finding(
        self,
        iteration: int,
        hypothesis: str,
        tool_name: str,
        arguments: Dict[str, Any],
        result_summary: str,
        evaluation: "EvaluationOutput",
        success: bool = True,
        error_message: str = ""
    ) -> "Finding":
        """
        Build a Finding from a completed iteration.

        Convenience method that combines the action and evaluation
        into a single Finding for the accumulator. When ``success`` is
        False the confidence is forced to 0.0 and the question is recorded
        as not answered, regardless of what the evaluation says.
        """
        return Finding(
            iteration=iteration,
            hypothesis=hypothesis,
            action=tool_name,
            arguments=arguments,
            result_summary=result_summary[:1000],  # Cap size
            interpretation=evaluation.interpretation,
            confidence=evaluation.confidence if success else 0.0,
            answered_question=evaluation.answered if success else False,
            next_questions=evaluation.next_questions,
            success=success,
            error_message=error_message
        )

    def _parse_response(self, response_text: str, result_summary: str) -> "EvaluationOutput":
        """Parse LLM response into EvaluationOutput.

        Never raises on malformed model output: if no JSON object can be
        recovered the fallback evaluation is returned, and individual fields
        with unusable values are replaced by their defaults.
        """
        data = self._extract_json_object(response_text)
        if data is None:
            return self._fallback_evaluation(response_text, result_summary)

        interpretation = data.get("interpretation")
        if interpretation is None or not str(interpretation).strip():
            interpretation = "Result processed."

        key_metric = data.get("key_metric")
        return EvaluationOutput(
            interpretation=str(interpretation),
            answered=self._coerce_bool(data.get("answered")),
            confidence=self._coerce_confidence(data.get("confidence"), self.DEFAULT_CONFIDENCE),
            should_stop=self._coerce_bool(data.get("should_stop")),
            next_questions=self._coerce_questions(data.get("next_questions")),
            key_metric=None if key_metric is None else str(key_metric)
        )

    def _fallback_evaluation(self, response_text: str, result_summary: str) -> "EvaluationOutput":
        """Fallback when JSON parsing fails.

        Uses the first 500 characters of the raw response as the
        interpretation, or a generic message when the response is missing
        or blank, and tells the loop to keep going with low confidence.
        """
        raw_text = response_text.strip() if isinstance(response_text, str) else ""
        interpretation = raw_text[:500] or "Analysis step completed."
        return EvaluationOutput(
            interpretation=interpretation,
            answered=False,
            confidence=self.DEFAULT_CONFIDENCE,
            should_stop=False,
            next_questions=[],
            key_metric=None
        )

    def _default_compress(self, result: Dict[str, Any]) -> str:
        """Default compression for tool results.

        Non-dict results are stringified and cut to 2000 characters. For
        dicts, the status fields plus a fixed set of known result keys are
        kept; list/dict values are stringified and cut to 500 characters.
        Returns a JSON string.
        """
        if not isinstance(result, dict):
            return str(result)[:2000]

        compressed = {}

        # Always include status
        if "success" in result:
            compressed["success"] = result["success"]
        if "error" in result:
            compressed["error"] = str(result["error"])[:300]

        # Include key result fields; tools may nest them under "result" or
        # return them at the top level.
        result_data = result.get("result", result)
        if isinstance(result_data, dict):
            for key in self._SUMMARY_KEYS:
                if key not in result_data:
                    continue
                value = result_data[key]
                # Truncate long values
                if isinstance(value, (list, dict)):
                    compressed[key] = str(value)[:500]
                else:
                    compressed[key] = value

        return json.dumps(compressed, default=str)

    @staticmethod
    def _extract_json_object(text: Optional[str]) -> Optional[Dict[str, Any]]:
        """Return the first JSON object found in ``text``, or None."""
        if not isinstance(text, str) or not text.strip():
            return None

        try:
            parsed = json.loads(text.strip())
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict):
            return parsed

        # The model may wrap the object in prose or code fences. Try decoding
        # from each "{"; raw_decode tolerates trailing text and handles
        # arbitrary nesting and braces inside string values.
        decoder = json.JSONDecoder()
        start = text.find("{")
        while start != -1:
            try:
                candidate, _ = decoder.raw_decode(text[start:])
            except json.JSONDecodeError:
                candidate = None
            if isinstance(candidate, dict):
                return candidate
            start = text.find("{", start + 1)
        return None

    @staticmethod
    def _coerce_confidence(value: Any, default: float = 0.3) -> float:
        """Convert ``value`` to a float clamped to [0.0, 1.0], else ``default``."""
        try:
            number = float(value)
        except (TypeError, ValueError):
            return default
        if number != number:  # NaN is the only float not equal to itself
            return default
        return min(1.0, max(0.0, number))

    @staticmethod
    def _coerce_bool(value: Any, default: bool = False) -> bool:
        """Interpret a JSON value as a bool, accepting "true"/"false" strings."""
        if isinstance(value, bool):
            return value
        if value is None:
            return default
        if isinstance(value, str):
            lowered = value.strip().lower()
            if lowered in ("true", "yes", "1"):
                return True
            if lowered in ("false", "no", "0", ""):
                return False
            return default
        return bool(value)

    @staticmethod
    def _coerce_questions(value: Any) -> List[str]:
        """Normalise the follow-up questions field to a list of non-blank strings."""
        if value is None:
            return []
        if isinstance(value, str):
            value = [value]
        if not isinstance(value, (list, tuple)):
            return []

        questions = []
        for item in value:
            if item is None:
                continue
            text = item if isinstance(item, str) else str(item)
            if text.strip():
                questions.append(text)
        return questions