Spaces:
Sleeping
Sleeping
| """ | |
| Main GepaOptimizer class - the heart of the optimization system | |
| """ | |
| import time | |
| import logging | |
| from typing import Any, Dict, List, Optional, Union | |
| import asyncio | |
| import io | |
| import sys | |
| from contextlib import redirect_stdout, redirect_stderr | |
| import gepa | |
| from ..utils.api_keys import APIKeyManager | |
| from .result import ResultProcessor | |
| from ..data.converters import UniversalConverter | |
| from ..models.result import OptimizationResult, OptimizedResult | |
| from ..models.config import OptimizationConfig, ModelConfig | |
| from ..utils.helpers import sanitize_prompt | |
| from ..utils.exceptions import GepaDependencyError, InvalidInputError, DatasetError, GepaOptimizerError | |
| logger = logging.getLogger(__name__) | |
| class GepaOptimizer: | |
| """ | |
| Main class for prompt optimization using GEPA | |
| This is the primary interface that users interact with. | |
| Provides both simple and advanced optimization capabilities. | |
| """ | |
| def __init__(self, config: Optional[OptimizationConfig] = None, | |
| adapter_type: str = "universal", | |
| custom_adapter: Optional[Any] = None, | |
| llm_model_name: Optional[str] = None, | |
| metric_weights: Optional[Dict[str, float]] = None, | |
| **kwargs): | |
| """ | |
| Initialize the optimizer | |
| Args: | |
| config: Optimization configuration (required) | |
| adapter_type: Type of adapter to use ("universal" only - fully configurable) | |
| custom_adapter: Custom adapter instance (overrides adapter_type) | |
| llm_model_name: [Deprecated] Use config.model instead. Will be removed in future versions. | |
| metric_weights: [Deprecated] Not used - evaluator handles metrics. Will be removed in future versions. | |
| **kwargs: Additional parameters for universal adapter (llm_client, evaluator, etc.) | |
| Raises: | |
| ValueError: If required configuration is missing | |
| GepaDependencyError: If GEPA library is not available | |
| """ | |
| if config is None: | |
| raise ValueError("config parameter is required. Use OptimizationConfig to configure the optimizer.") | |
| # Initialize logger first | |
| self.logger = logging.getLogger(__name__) | |
| self.config = config | |
| self.converter = UniversalConverter(data_split_config=config.data_split) | |
| self.api_manager = APIKeyManager() | |
| self.result_processor = ResultProcessor() | |
| # Initialize adapter based on configuration | |
| if custom_adapter: | |
| # User provided custom adapter | |
| from .base_adapter import BaseGepaAdapter | |
| if not isinstance(custom_adapter, BaseGepaAdapter): | |
| raise TypeError("custom_adapter must be an instance of BaseGepaAdapter") | |
| self.adapter = custom_adapter | |
| self.logger.info("Using user-provided custom adapter") | |
| elif adapter_type == "universal": | |
| # Universal adapter requires user to provide components | |
| llm_client = kwargs.get('llm_client') | |
| evaluator = kwargs.get('evaluator') | |
| if not llm_client or not evaluator: | |
| raise ValueError( | |
| "llm_client and evaluator are required for universal adapter. " | |
| "Example: GepaOptimizer(config=config, adapter_type='universal', " | |
| "llm_client=llm_client, evaluator=evaluator)" | |
| ) | |
| from .universal_adapter import UniversalGepaAdapter | |
| self.adapter = UniversalGepaAdapter( | |
| llm_client=llm_client, | |
| evaluator=evaluator, | |
| data_converter=kwargs.get('data_converter') | |
| ) | |
| self.logger.info("Using universal adapter") | |
| else: | |
| raise ValueError( | |
| f"Unknown adapter_type: {adapter_type}. " | |
| f"Only 'universal' is supported. " | |
| f"Provide llm_client and evaluator when using universal adapter." | |
| ) | |
| # Keep backward compatibility | |
| self.custom_adapter = self.adapter | |
| # Log model configuration | |
| model_info = self.adapter.get_performance_stats() | |
| self.logger.info(f"Initialized adapter: {model_info}") | |
| # Set up logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
| ) | |
| # Validate GEPA availability | |
| if gepa is None: | |
| raise GepaDependencyError("GEPA library is not available. Please install it with: pip install gepa") | |
| async def train(self, | |
| seed_prompt: str, | |
| dataset: Union[List[Any], str, Dict, Any], | |
| **kwargs) -> OptimizedResult: | |
| """ | |
| Main training method for prompt optimization | |
| Args: | |
| seed_prompt: Initial prompt to optimize | |
| dataset: Training data in any format | |
| **kwargs: Additional parameters that can override config | |
| Returns: | |
| OptimizedResult: Optimization result with improved prompt | |
| Raises: | |
| InvalidInputError: For invalid input parameters | |
| DatasetError: For issues with dataset processing | |
| GepaOptimizerError: For optimization failures | |
| """ | |
| start_time = time.time() | |
| session_id = f"opt_{int(start_time)}_{id(self)}" | |
| try: | |
| self.logger.info(f"Starting optimization session: {session_id}") | |
| self.logger.info(f"Using model: {self.config.model.model_name} (provider: {self.config.model.provider})") | |
| # Debug logging removed - not needed in production | |
| # 🔥 FIX E: Reset Pareto logger at start of each optimization run | |
| from ..utils.pareto_logger import reset_pareto_logger | |
| reset_pareto_logger() | |
| self.logger.info("✅ Reset Pareto logger for new optimization run") | |
| # Update config with any overrides from kwargs | |
| self._update_config_from_kwargs(kwargs) | |
| # Step 1: Validate inputs | |
| self._validate_inputs(seed_prompt) | |
| # Step 2: Convert dataset to GEPA format with 3-way split | |
| # 🔥 FIX: Support pre-split datasets (user-provided train/val/test) | |
| if isinstance(dataset, dict) and all(k in dataset for k in ['train', 'val', 'test']): | |
| # User provided pre-split dataset - use it directly | |
| self.logger.info("✅ Detected pre-split dataset - using user's split (no re-splitting)") | |
| trainset_raw = dataset.get('train', []) | |
| valset_raw = dataset.get('val', []) | |
| testset_raw = dataset.get('test', []) | |
| # Still need to standardize the format (convert to GEPA format) | |
| trainset = self.converter._standardize(trainset_raw) | |
| valset = self.converter._standardize(valset_raw) | |
| testset = self.converter._standardize(testset_raw) if testset_raw else [] | |
| self.logger.info( | |
| f"Using pre-split dataset: {len(trainset)} train (Dfeedback), " | |
| f"{len(valset)} val (Dpareto), {len(testset)} test (held-out)" | |
| ) | |
| else: | |
| # Standard path: convert and split automatically | |
| self.logger.info("Converting dataset to GEPA format with 3-way split...") | |
| trainset, valset, testset = self.converter.convert( | |
| dataset, | |
| split_config=self.config.data_split | |
| ) | |
| # Log split with adaptive strategy info | |
| split_strategy = self.config.data_split.small_dataset_strategy | |
| strategy_note = "" | |
| if split_strategy == 'adaptive': | |
| total_size = len(trainset) + len(valset) + len(testset) | |
| train_ratio, val_ratio, test_ratio = self.config.data_split.get_adaptive_ratios(total_size) | |
| strategy_note = f" (adaptive: {train_ratio*100:.0f}%/{val_ratio*100:.0f}%/{test_ratio*100:.0f}% ratios)" | |
| self.logger.info( | |
| f"Dataset split{strategy_note}: {len(trainset)} train (Dfeedback), " | |
| f"{len(valset)} val (Dpareto), {len(testset)} test (held-out)" | |
| ) | |
| if not trainset: | |
| raise DatasetError("Dataset appears to be empty after conversion") | |
| # Step 3: Create seed candidate | |
| seed_candidate = self._create_seed_candidate(seed_prompt) | |
| # 🔥 CRITICAL: Set valset info in adapter BEFORE baseline evaluation | |
| # This ensures adapter correctly detects 'dpareto' dataset type | |
| # Use direct assignment (don't rely on hasattr) to ensure attributes are set | |
| try: | |
| self.adapter._valset_size = len(valset) if valset else 0 | |
| self.logger.info(f"✅ Set valset_size in adapter: {len(valset) if valset else 0} for Dpareto detection") | |
| except AttributeError: | |
| self.logger.warning("⚠️ Could not set _valset_size in adapter - attribute not supported") | |
| try: | |
| self.adapter._valset = valset | |
| self.logger.info(f"✅ Stored valset in adapter ({len(valset) if valset else 0} samples)") | |
| except AttributeError: | |
| self.logger.warning("⚠️ Could not set _valset in adapter - attribute not supported") | |
| # Step 3.5: Calculate baseline score on VALIDATION set (not test set) | |
| # This ensures fair comparison since optimization uses validation set for Pareto selection | |
| baseline_val_score = None | |
| if valset: | |
| self.logger.info("📊 Evaluating seed prompt on validation set for baseline...") | |
| # Set baseline flag so adapter knows this is baseline, not optimization | |
| # Use direct assignment to ensure the flag is set | |
| try: | |
| self.adapter._is_baseline_evaluation = True | |
| self.logger.info("✅ Set baseline evaluation flag in adapter") | |
| except AttributeError: | |
| self.logger.warning("⚠️ Could not set _is_baseline_evaluation in adapter") | |
| try: | |
| # Evaluate on validation set (same as what GEPA will use for Pareto selection) | |
| eval_result = self.adapter.evaluate( | |
| batch=valset, | |
| candidate=seed_candidate, | |
| capture_traces=False | |
| ) | |
| baseline_val_score = sum(eval_result.scores) / len(eval_result.scores) if eval_result.scores else 0.0 | |
| self.logger.info(f"📊 Baseline validation score: {baseline_val_score:.4f} (on {len(valset)} samples)") | |
| # Store baseline in adapter for later use | |
| if hasattr(self.adapter, '_baseline_score'): | |
| self.adapter._baseline_score = baseline_val_score | |
| # 🔥 CRITICAL FIX: Also set baseline in Pareto logger | |
| # This ensures candidates can be properly evaluated against baseline | |
| from ..utils.pareto_logger import get_pareto_logger | |
| pareto_log = get_pareto_logger() | |
| pareto_log.set_baseline(baseline_val_score) | |
| self.logger.info(f"✅ Baseline set in Pareto logger: {baseline_val_score:.4f}") | |
| except Exception as e: | |
| self.logger.warning(f"Baseline evaluation failed: {e}") | |
| import traceback | |
| self.logger.debug(f"Baseline evaluation error: {traceback.format_exc()}") | |
| finally: | |
| try: | |
| self.adapter._is_baseline_evaluation = False | |
| self.logger.debug("✅ Reset baseline evaluation flag - optimization can begin") | |
| except AttributeError: | |
| pass # Ignore if attribute not supported | |
| # Step 4: Run GEPA optimization | |
| self.logger.info("Starting GEPA optimization...") | |
| gepa_result, actual_iterations = await self._run_gepa_optimization( | |
| adapter=self.adapter, | |
| seed_candidate=seed_candidate, | |
| trainset=trainset, | |
| valset=valset, | |
| **kwargs | |
| ) | |
| # Step 5: Extract best candidate | |
| best_candidate = self._extract_best_candidate(gepa_result) | |
| # 🔥 CRITICAL: Extract optimized prompt from best_candidate | |
| # This is the actual optimized prompt that GEPA found | |
| self.logger.info(f"\n{'═'*80}") | |
| self.logger.info(f"📝 EXTRACTING OPTIMIZED PROMPT FROM GEPA RESULT") | |
| self.logger.info(f"{'═'*80}") | |
| self.logger.info(f"best_candidate keys: {list(best_candidate.keys()) if isinstance(best_candidate, dict) else 'N/A'}") | |
| optimized_prompt = best_candidate.get('system_prompt', seed_prompt) | |
| if not optimized_prompt or optimized_prompt.strip() == '': | |
| # Fallback: try other keys or use seed prompt | |
| optimized_prompt = best_candidate.get('prompt', best_candidate.get('text', seed_prompt)) | |
| # Get fitness score if available | |
| best_fitness = best_candidate.get('fitness') or self.adapter.get_best_score() if hasattr(self.adapter, 'get_best_score') else None | |
| candidate_source = best_candidate.get('source', 'unknown') | |
| self.logger.info(f"\n✅ EXTRACTED OPTIMIZED PROMPT:") | |
| self.logger.info(f" Source: {candidate_source}") | |
| if best_fitness is not None: | |
| self.logger.info(f" Fitness: f={best_fitness:.4f}") | |
| self.logger.info(f" Length: {len(optimized_prompt)} characters") | |
| self.logger.info(f" Words: {len(optimized_prompt.split())} words") | |
| self.logger.info(f"\n📝 FULL OPTIMIZED PROMPT TEXT:") | |
| self.logger.info(f"{'─'*80}") | |
| self.logger.info(optimized_prompt) | |
| self.logger.info(f"{'─'*80}") | |
| if optimized_prompt != seed_prompt: | |
| self.logger.info(f"\n✅ SUCCESS: Prompt WAS OPTIMIZED!") | |
| self.logger.info(f" Seed length: {len(seed_prompt)} chars") | |
| self.logger.info(f" Optimized length: {len(optimized_prompt)} chars") | |
| self.logger.info(f" Difference: {len(optimized_prompt) - len(seed_prompt):+d} chars") | |
| if best_fitness is not None: | |
| baseline_fitness = 0.5 # Default baseline, could be improved | |
| improvement = best_fitness - baseline_fitness | |
| improvement_pct = (improvement / baseline_fitness * 100) if baseline_fitness > 0 else 0 | |
| self.logger.info(f" Fitness: f={best_fitness:.4f} (improvement: {improvement:+.4f} ({improvement_pct:+.1f}%))") | |
| else: | |
| self.logger.warning(f"\n⚠️ WARNING: Optimized prompt is IDENTICAL to seed prompt") | |
| self.logger.warning(f" This means GEPA didn't modify the prompt during optimization") | |
| if best_fitness is not None: | |
| self.logger.warning(f" Best fitness found: f={best_fitness:.4f}") | |
| self.logger.warning(f" 💡 Check if LLEGO best candidate is being properly extracted") | |
| self.logger.info(f"{'═'*80}\n") | |
| # Step 5.5: Calculate improvement metrics (validation vs validation) | |
| optimized_test_score = None | |
| improvement_data = {} | |
| # 🔥 FIX: Calculate improvement based on VALIDATION scores (fair comparison) | |
| # Compare optimized VALIDATION score vs validation baseline (both on Dpareto) | |
| # This ensures fair comparison - both evaluated on the same validation set | |
| optimized_val_score = best_fitness # Best candidate's fitness is from validation set (Dpareto) | |
| if baseline_val_score is not None and optimized_val_score is not None: | |
| absolute_improvement = optimized_val_score - baseline_val_score | |
| relative_improvement = ( | |
| (absolute_improvement / baseline_val_score * 100) | |
| if baseline_val_score > 0 else 0 | |
| ) | |
| improvement_data = { | |
| 'baseline_val_score': baseline_val_score, | |
| 'optimized_val_score': optimized_val_score, | |
| 'absolute_improvement': absolute_improvement, | |
| 'relative_improvement_percent': relative_improvement | |
| } | |
| self.logger.info( | |
| f"📈 Validation improvement: {relative_improvement:+.2f}% " | |
| f"(baseline val: {baseline_val_score:.4f} → optimized val: {optimized_val_score:.4f})" | |
| ) | |
| # Step 5.6: Evaluate optimized prompt on test set (if available) for final reporting | |
| if testset and self.config.evaluate_on_test: | |
| self.logger.info("📊 Evaluating optimized prompt on test set...") | |
| # 🔥 CRITICAL FIX: Clear LLEGO candidate queue before test evaluation | |
| # This prevents the LLEGO wrapper from intercepting test evaluation calls | |
| # and returning wrong candidates instead of actually running the optimized prompt | |
| from ..llms.llego_enhanced_llm import LLEGOEnhancedLLMClient | |
| if hasattr(self.adapter, 'llm_client') and isinstance(self.adapter.llm_client, LLEGOEnhancedLLMClient): | |
| if hasattr(self.adapter.llm_client, '_adapter_generated_candidates'): | |
| self.adapter.llm_client._adapter_generated_candidates = [] | |
| self.logger.info("✅ Cleared LLEGO candidate queue for clean test evaluation") | |
| if hasattr(self.adapter.llm_client, '_candidate_queue'): | |
| self.adapter.llm_client._candidate_queue = [] | |
| self.logger.info("✅ Cleared LLEGO hybrid candidate queue for clean test evaluation") | |
| # Evaluate on test set for final reporting (but improvement is based on validation) | |
| try: | |
| optimized_test_score = self._evaluate_candidate_on_testset( | |
| best_candidate, | |
| testset | |
| ) | |
| self.logger.info(f"📊 Optimized test score: {optimized_test_score:.4f}") | |
| # Add test score to improvement_data for reference (but improvement is based on validation) | |
| improvement_data['optimized_test_score'] = optimized_test_score | |
| if baseline_val_score is not None: | |
| test_vs_baseline = ( | |
| ((optimized_test_score - baseline_val_score) / baseline_val_score * 100) | |
| if baseline_val_score > 0 else 0 | |
| ) | |
| self.logger.info( | |
| f"📊 Test set vs validation baseline: {test_vs_baseline:+.2f}% " | |
| f"(baseline val: {baseline_val_score:.4f} → optimized test: {optimized_test_score:.4f})" | |
| ) | |
| except Exception as e: | |
| self.logger.warning(f"Test evaluation failed: {e}") | |
| # Step 6: Process results | |
| optimization_time = time.time() - start_time | |
| processed_result = self.result_processor.process_full_result( | |
| result=gepa_result, | |
| original_prompt=seed_prompt, | |
| optimization_time=optimization_time, | |
| actual_iterations=actual_iterations, | |
| test_metrics=improvement_data # Add test metrics | |
| ) | |
| # Merge improvement data | |
| final_improvement_data = {**processed_result.get('improvement_data', {}), **improvement_data} | |
| # Step 7: Create result objects | |
| # 🔥 CRITICAL: Use extracted optimized_prompt instead of processed_result | |
| result = OptimizedResult( | |
| original_prompt=seed_prompt, | |
| optimized_prompt=optimized_prompt, # Use extracted prompt, not processed_result! | |
| improvement_data=final_improvement_data, | |
| optimization_time=optimization_time, | |
| dataset_size=len(trainset) + len(valset) + len(testset), | |
| total_iterations=processed_result.get('total_iterations', 0), | |
| status=processed_result.get('status', 'completed'), | |
| error_message=processed_result.get('error_message'), | |
| detailed_result=OptimizationResult( | |
| session_id=session_id, | |
| original_prompt=seed_prompt, | |
| optimized_prompt=optimized_prompt, # Use extracted prompt! | |
| improvement_data=final_improvement_data, | |
| optimization_time=optimization_time, | |
| dataset_size=len(trainset) + len(valset) + len(testset), | |
| total_iterations=processed_result.get('total_iterations', 0), | |
| status=processed_result.get('status', 'completed'), | |
| error_message=processed_result.get('error_message') | |
| ) | |
| ) | |
| self.logger.info(f"✅ Optimization completed in {optimization_time:.2f}s") | |
| return result | |
| except Exception as e: | |
| optimization_time = time.time() - start_time | |
| error_msg = f"Optimization failed: {str(e)}" | |
| self.logger.error(error_msg) | |
| # Return failed result | |
| return OptimizedResult( | |
| original_prompt=seed_prompt, | |
| optimized_prompt=seed_prompt, # Return original on failure | |
| improvement_data={'error': error_msg}, | |
| optimization_time=optimization_time, | |
| dataset_size=0, | |
| total_iterations=0, | |
| status='failed', | |
| error_message=error_msg | |
| ) | |
| def _update_config_from_kwargs(self, kwargs: Dict[str, Any]) -> None: | |
| """Update configuration with runtime overrides from kwargs.""" | |
| updated_params = [] | |
| for key, value in kwargs.items(): | |
| if hasattr(self.config, key): | |
| setattr(self.config, key, value) | |
| updated_params.append(f"{key}={value}") | |
| else: | |
| self.logger.warning(f"Unknown parameter '{key}' ignored") | |
| if updated_params: | |
| self.logger.info(f"Updated config parameters: {', '.join(updated_params)}") | |
| def _validate_inputs(self, seed_prompt: str) -> None: | |
| """ | |
| Validate input parameters for optimization | |
| Args: | |
| seed_prompt: The seed prompt to validate | |
| Raises: | |
| InvalidInputError: If validation fails | |
| """ | |
| if not seed_prompt or not isinstance(seed_prompt, str): | |
| raise InvalidInputError("Seed prompt must be a non-empty string") | |
| if len(seed_prompt.strip()) < 10: | |
| raise InvalidInputError("Seed prompt is too short (minimum 10 characters)") | |
| # Validate model configuration | |
| model_config = self.config.model | |
| if not hasattr(model_config, 'model_name') or not model_config.model_name: | |
| raise InvalidInputError("Model name is required") | |
| reflection_config = self.config.reflection_model | |
| if not hasattr(reflection_config, 'model_name') or not reflection_config.model_name: | |
| raise InvalidInputError("Reflection model name is required") | |
| def _clean_reflection_prompt(self, prompt: str, max_length: int = 50000) -> str: | |
| """ | |
| Clean reflection prompt by removing base64 images and truncating if too long. | |
| 🔥 CRITICAL: GEPA's reflective dataset includes base64 images which create | |
| massive prompts (7MB+) that exceed token limits. This function: | |
| 1. Strips all base64 image data | |
| 2. Removes excessive detailed_scores entries | |
| 3. Truncates to reasonable size | |
| 4. Preserves essential feedback information | |
| Args: | |
| prompt: Original prompt from GEPA (may contain base64) | |
| max_length: Maximum length after cleaning (default: 50K chars) | |
| Returns: | |
| Cleaned prompt without base64, within size limits | |
| """ | |
| import re | |
| # Step 1: Remove base64 image strings (typically very long alphanumeric strings) | |
| # Base64 images are usually 50K+ characters of A-Za-z0-9+/= pattern | |
| # Look for very long base64-like sequences | |
| base64_pattern = r'[A-Za-z0-9+/=]{5000,}' # Sequences of 5000+ base64 chars | |
| cleaned = re.sub(base64_pattern, '[IMAGE_DATA_REMOVED]', prompt) | |
| # Step 2: Remove detailed_scores sections that might contain base64 references | |
| # These are usually in markdown format: "### detailed_scores\n...base64..." | |
| detailed_scores_pattern = r'### detailed_scores[^\n]*\n[^#]*(?:image_base64|base64)[^\n]*(?:\n[^#]*)*' | |
| cleaned = re.sub(detailed_scores_pattern, '### detailed_scores: [REMOVED_FOR_BREVITY]', cleaned, flags=re.IGNORECASE | re.MULTILINE) | |
| # Step 3: Remove any remaining image_base64 references | |
| cleaned = re.sub(r'image_base64[^\n]*', 'image_base64: [REMOVED]', cleaned, flags=re.IGNORECASE) | |
| cleaned = re.sub(r'"[A-Za-z0-9+/=]{10000,}"', '[LARGE_DATA_STRING_REMOVED]', cleaned) # Very long strings likely base64 | |
| # Step 4: Truncate if still too long (keep the beginning which usually has the most important info) | |
| if len(cleaned) > max_length: | |
| # Keep first part (usually contains prompt and key feedback) | |
| # Add truncation notice | |
| truncated_size = len(cleaned) - max_length | |
| cleaned = cleaned[:max_length] + f"\n\n[TRUNCATED {truncated_size} characters of detailed evaluation data]" | |
| self.logger.warning(f"⚠️ Prompt truncated: {len(prompt)} → {len(cleaned)} chars") | |
| return cleaned | |
| def _validate_models(self, task_lm, reflection_lm): | |
| """ | |
| Validate if specified models are supported. | |
| Note: No hardcoded restrictions - the API provider will validate model existence. | |
| This method is kept for potential future validation logic but doesn't restrict users. | |
| """ | |
| # No hardcoded model restrictions - users can specify any model | |
| # The API provider will handle validation and return errors if model doesn't exist | |
| self.logger.debug(f"Using task model: {task_lm}, reflection model: {reflection_lm}") | |
| def _create_seed_candidate(self, seed_prompt: str) -> Dict[str, str]: | |
| """Create a seed candidate from the input prompt.""" | |
| sanitized_prompt = sanitize_prompt(seed_prompt) | |
| return {'system_prompt': sanitized_prompt} | |
| async def _run_gepa_optimization(self, adapter, seed_candidate: Any, trainset: List[Any], valset: List[Any], **kwargs) -> tuple: # Return tuple | |
| """ | |
| Run GEPA optimization with the given adapter and data | |
| Args: | |
| adapter: Custom adapter for GEPA | |
| seed_candidate: Initial prompt candidate | |
| trainset: Training dataset | |
| valset: Validation dataset | |
| **kwargs: Additional optimization parameters that can override config | |
| Returns: | |
| Dict with optimization results | |
| Raises: | |
| GepaOptimizerError: If optimization fails | |
| Note: | |
| The following parameters are required in the config: | |
| - max_metric_calls: Maximum number of metric evaluations | |
| - batch_size: Batch size for evaluation | |
| - max_iterations: Maximum number of optimization iterations | |
| """ | |
| try: | |
| # Get optimization parameters from config (these are required fields) | |
| max_metric_calls = self.config.max_metric_calls | |
| batch_size = self.config.batch_size | |
| max_iterations = self.config.max_iterations | |
| # Create reflection model client | |
| from ..llms.vision_llm import VisionLLMClient | |
| base_reflection_lm_client = VisionLLMClient( | |
| provider=self.config.reflection_model.provider, | |
| model_name=self.config.reflection_model.model_name, | |
| api_key=self.config.reflection_model.api_key, | |
| base_url=self.config.reflection_model.base_url, | |
| temperature=self.config.reflection_model.temperature, | |
| max_tokens=self.config.reflection_model.max_tokens, | |
| top_p=self.config.reflection_model.top_p, | |
| frequency_penalty=self.config.reflection_model.frequency_penalty, | |
| presence_penalty=self.config.reflection_model.presence_penalty | |
| ) | |
| # reflection_lm_client will be set below (may be wrapped with LLEGO) | |
| reflection_lm_client = base_reflection_lm_client | |
| # 🆕 LLEGO Integration: Create enhanced reflection callable | |
| if self.config.use_llego_operators: | |
| self.logger.info("🧬 LLEGO genetic operators ENABLED") | |
| self.logger.info(f" α={self.config.alpha}, τ={self.config.tau}, ν={self.config.nu}") | |
| self.logger.info(f" Crossover offspring: {self.config.n_crossover}, Mutation offspring: {self.config.n_mutation}") | |
| # Import LLEGO operators | |
| from ..operators.llego_operators import LLEGOIntegrationLayer, PromptCandidate | |
| # Initialize LLEGO integration layer | |
| llego = LLEGOIntegrationLayer( | |
| alpha=self.config.alpha, | |
| tau=self.config.tau, | |
| nu=self.config.nu, | |
| population_size=self.config.population_size, | |
| n_crossover=self.config.n_crossover, | |
| n_mutation=self.config.n_mutation | |
| ) | |
| # Initialize with seed prompt | |
| llego.initialize_population( | |
| seed_prompt=seed_candidate.get('system_prompt', ''), | |
| initial_fitness=0.5 | |
| ) | |
| # 🔥 HYBRID MODE FIX: Wrap reflection_lm_client with LLEGO for hybrid mode | |
| # This ensures reflection calls go through LLEGO wrapper for candidate generation | |
| if self.config.enable_gepa_reflection_with_llego: | |
| self.logger.info("🔥 HYBRID MODE: Wrapping reflection_lm_client with LLEGO") | |
| from ..llms.llego_enhanced_llm import LLEGOEnhancedLLMClient | |
| # Wrap reflection_lm_client with LLEGO so hybrid generation is triggered | |
| reflection_lm_client = LLEGOEnhancedLLMClient( | |
| base_llm=base_reflection_lm_client, | |
| llego_layer=llego, | |
| config=self.config, # Pass config for hybrid mode! | |
| verbose=True | |
| ) | |
| self.logger.info("✅ reflection_lm_client wrapped with LLEGO (hybrid mode enabled)") | |
| # 🔥 CRITICAL: Store reflection_lm_client reference in adapter so it can set context | |
| # This allows make_reflective_dataset to set reflection context on BOTH clients | |
| if hasattr(adapter, 'reflection_lm_client'): | |
| adapter.reflection_lm_client = reflection_lm_client | |
| self.logger.info("✅ Stored reflection_lm_client reference in adapter") | |
| else: | |
| # Add reflection_lm_client attribute to adapter | |
| adapter.reflection_lm_client = reflection_lm_client | |
| self.logger.info("✅ Added reflection_lm_client attribute to adapter") | |
| # 🔥 NEW: Also store config and reflection_lm_client for adapter-level generation | |
| if hasattr(adapter, '_config'): | |
| adapter._config = self.config | |
| self.logger.info("✅ Stored config in adapter for hybrid mode") | |
| else: | |
| adapter._config = self.config | |
| self.logger.info("✅ Added _config attribute to adapter") | |
| if hasattr(adapter, '_reflection_lm_client'): | |
| adapter._reflection_lm_client = reflection_lm_client | |
| self.logger.info("✅ Stored _reflection_lm_client in adapter for hybrid mode") | |
| else: | |
| adapter._reflection_lm_client = reflection_lm_client | |
| self.logger.info("✅ Added _reflection_lm_client attribute to adapter") | |
| # 🔥 CRITICAL FIX: Ensure LLEGO layer is stored in adapter | |
| # Without this, adapter.llego will be None and population updates are skipped! | |
| if hasattr(adapter, 'llego'): | |
| if adapter.llego is None: | |
| adapter.llego = llego | |
| self.logger.info("✅ CRITICAL: Set LLEGO layer in adapter (was None)") | |
| else: | |
| self.logger.debug("✅ LLEGO layer already set in adapter") | |
| else: | |
| # Add llego attribute if it doesn't exist | |
| adapter.llego = llego | |
| self.logger.info("✅ CRITICAL: Added LLEGO layer to adapter") | |
| # 🔥 CRITICAL: Always set _reflection_lm_client in adapter (even without hybrid mode) | |
| # This is required for propose_new_texts() to work | |
| if not hasattr(adapter, '_reflection_lm_client') or adapter._reflection_lm_client is None: | |
| adapter._reflection_lm_client = reflection_lm_client | |
| self.logger.info("✅ Set _reflection_lm_client in adapter (required for propose_new_texts)") | |
| # 🔥 HYBRID MODE FIX: Inject config into LLEGO wrapper for hybrid mode | |
| # The adapter already has LLEGO wrapper, we just need to update its config | |
| if self.config.enable_gepa_reflection_with_llego: | |
| # HYBRID MODE: Update the LLEGO wrapper's config | |
| self.logger.info("🔥 HYBRID MODE: Enabling hybrid candidate generation in LLEGO wrapper") | |
| # Get the LLM client (may already be wrapped) | |
| llm_client = self.adapter.llm_client | |
| from ..llms.llego_enhanced_llm import LLEGOEnhancedLLMClient | |
| if isinstance(llm_client, LLEGOEnhancedLLMClient): | |
| # Already wrapped, just update config | |
| llm_client.config = self.config | |
| self.logger.info("✅ Updated LLEGO wrapper with hybrid mode config") | |
| else: | |
| # Not wrapped yet, wrap it now with config | |
| llego_wrapped_llm = LLEGOEnhancedLLMClient( | |
| base_llm=llm_client, | |
| llego_layer=llego, | |
| config=self.config, # ← Pass config for hybrid mode! | |
| verbose=True | |
| ) | |
| # Update adapter's LLM client | |
| self.adapter.llm_client = llego_wrapped_llm | |
| self.logger.info("✅ Wrapped LLM client with LLEGO (hybrid mode enabled)") | |
| adapter = self.adapter | |
| else: | |
| # LLEGO-ONLY MODE: Wrap adapter with LLEGO layer (no hybrid) | |
| self.logger.info("🧬 LLEGO-ONLY MODE: Recreating adapter with LLEGO integration...") | |
| if hasattr(self, 'adapter') and self.adapter: | |
| from .universal_adapter import UniversalGepaAdapter | |
| # Get original LLM client and evaluator from current adapter | |
| original_llm = self.adapter.llm_client | |
| # If it's already wrapped, unwrap it | |
| if hasattr(original_llm, 'base_llm'): | |
| original_llm = original_llm.base_llm | |
| evaluator = self.adapter.evaluator | |
| data_converter = self.adapter.data_converter | |
| # Recreate adapter with LLEGO (no hybrid mode config) | |
| from ..llms.llego_enhanced_llm import LLEGOEnhancedLLMClient | |
| llego_wrapped_llm = LLEGOEnhancedLLMClient( | |
| base_llm=original_llm, | |
| llego_layer=llego, | |
| config=None, # No hybrid mode | |
| verbose=True | |
| ) | |
| adapter = UniversalGepaAdapter( | |
| llm_client=llego_wrapped_llm, | |
| evaluator=evaluator, | |
| data_converter=data_converter, | |
| llego_layer=llego | |
| ) | |
| self.logger.info("✅ Adapter recreated with LLEGO-enhanced LLM client") | |
| else: | |
| adapter = self.adapter | |
| # Create LLEGO-enhanced reflection callable | |
| # When hybrid mode is enabled, reflection_lm_client is wrapped with LLEGO | |
| # The wrapper will automatically generate hybrid candidates when called | |
| def reflection_lm_callable(prompt: str) -> str: | |
| """ | |
| Reflection callable that delegates to LLEGO-wrapped client. | |
| In hybrid mode, the wrapper generates candidates from both GEPA and LLEGO. | |
| 🔥 CRITICAL: Clean the prompt to remove base64 images and truncate if too long. | |
| """ | |
| # 🔥 FIX: Clean prompt to remove base64 images and truncate excessive data | |
| cleaned_prompt = self._clean_reflection_prompt(prompt) | |
| self.logger.info(f"\n{'🔥'*40}") | |
| self.logger.info(f"🔥 reflection_lm_callable CALLED (delegating to LLEGO wrapper)") | |
| self.logger.info(f"🔥 Original prompt length: {len(prompt)} chars") | |
| self.logger.info(f"🔥 Cleaned prompt length: {len(cleaned_prompt)} chars") | |
| self.logger.info(f"🔥 Truncation: {len(prompt) - len(cleaned_prompt)} chars removed") | |
| self.logger.info(f"🔥 First 200 chars (cleaned): {cleaned_prompt[:200]}...") | |
| self.logger.info(f"{'🔥'*40}\n") | |
| try: | |
| # 🔥 CRITICAL: Set reflection context BEFORE generating | |
| # This signals to the LLEGO wrapper that we're in reflection mode | |
| if isinstance(reflection_lm_client, LLEGOEnhancedLLMClient): | |
| reflection_lm_client.set_reflection_context( | |
| current_prompt=cleaned_prompt, # Use cleaned prompt | |
| feedback=None, | |
| in_reflection=True # Enable reflection mode | |
| ) | |
| self.logger.info("✅ Reflection context set on reflection_lm_client") | |
| # 🔥 HYBRID MODE: If reflection_lm_client is wrapped with LLEGO, | |
| # calling generate() will trigger hybrid candidate generation | |
| # The wrapper handles queuing and returns candidates one by one | |
| # 🔥 CRITICAL: System prompt must instruct LLM to generate improved prompt, not feedback | |
| optimization_system_prompt = """You are an expert prompt engineer specializing in iterative prompt optimization. | |
| Your task: Given the CURRENT PROMPT and its EVALUATION FEEDBACK, generate an IMPROVED version of the prompt that addresses all identified issues. | |
| Core Requirements: | |
| 1. OUTPUT ONLY the improved prompt text (no explanations, no analysis, no meta-commentary) | |
| 2. START directly with the prompt (e.g., "You are a mobile GUI agent..." or similar task-appropriate opening) | |
| 3. PRESERVE the core task domain and output format requirements | |
| 4. INTEGRATE improvements from feedback naturally into the prompt structure | |
| 5. MAINTAIN clarity, specificity, and actionability | |
| Quality Standards: | |
| - Be specific and concrete (avoid vague instructions) | |
| - Use clear, imperative language for task instructions | |
| - Include edge case handling if feedback identifies confusion | |
| - Ensure the prompt is self-contained and unambiguous | |
| DO NOT include: | |
| - Analysis of what went wrong | |
| - Explanations of your changes | |
| - Meta-text like "Here's an improved version..." or "Based on feedback..." | |
| - Recommendations or suggestions (those are already in the feedback) | |
| Output the improved prompt directly and only the prompt.""" | |
| result = reflection_lm_client.generate( | |
| system_prompt=optimization_system_prompt, | |
| user_prompt=cleaned_prompt, # Use cleaned prompt (no base64, truncated) | |
| image_base64="" | |
| ) | |
| # Extract content from result | |
| if isinstance(result, dict): | |
| candidate = result.get("content", str(result)) | |
| source = result.get("source", "unknown") | |
| self.logger.info(f"✅ Candidate from {source} (FULL TEXT):") | |
| self.logger.info(f" '{candidate}'") | |
| return candidate | |
| else: | |
| candidate = str(result) | |
| self.logger.info(f"✅ Candidate generated (FULL TEXT):") | |
| self.logger.info(f" '{candidate}'") | |
| return candidate | |
| except Exception as e: | |
| self.logger.error(f"❌ Error in reflection_lm_callable: {e}") | |
| import traceback | |
| self.logger.error(traceback.format_exc()) | |
| # Fallback: return prompt as-is | |
| return prompt | |
| # Set up reflection context for LLEGO wrapper | |
| if self.config.enable_gepa_reflection_with_llego and isinstance(reflection_lm_client, LLEGOEnhancedLLMClient): | |
| # Store current prompt in reflection context for LLEGO operators | |
| reflection_lm_client.set_reflection_context( | |
| current_prompt=seed_candidate.get('system_prompt', ''), | |
| feedback=None, | |
| in_reflection=True | |
| ) | |
| else: | |
| # Standard GEPA reflection (no LLEGO) | |
| adapter = self.adapter # Use the original adapter | |
| # 🔥 CRITICAL: Always set _reflection_lm_client in adapter (even without LLEGO) | |
| # This is required for propose_new_texts() to work | |
| if not hasattr(adapter, '_reflection_lm_client') or adapter._reflection_lm_client is None: | |
| adapter._reflection_lm_client = reflection_lm_client | |
| self.logger.info("✅ Set _reflection_lm_client in adapter (required for propose_new_texts)") | |
| # Define standard reflection callable (no LLEGO enhancement) | |
| def reflection_lm_callable(prompt: str) -> str: | |
| """Standard callable wrapper for reflection model that GEPA expects""" | |
| try: | |
| # 🔥 CRITICAL: System prompt must instruct LLM to generate improved prompt, not feedback | |
| optimization_system_prompt = """You are an expert prompt engineer specializing in iterative prompt optimization. | |
| Your task: Given the CURRENT PROMPT and its EVALUATION FEEDBACK, generate an IMPROVED version of the prompt that addresses all identified issues. | |
| Core Requirements: | |
| 1. OUTPUT ONLY the improved prompt text (no explanations, no analysis, no meta-commentary) | |
| 2. START directly with the prompt (e.g., "You are a mobile GUI agent..." or similar task-appropriate opening) | |
| 3. PRESERVE the core task domain and output format requirements | |
| 4. INTEGRATE improvements from feedback naturally into the prompt structure | |
| 5. MAINTAIN clarity, specificity, and actionability | |
| Quality Standards: | |
| - Be specific and concrete (avoid vague instructions) | |
| - Use clear, imperative language for task instructions | |
| - Include edge case handling if feedback identifies confusion | |
| - Ensure the prompt is self-contained and unambiguous | |
| DO NOT include: | |
| - Analysis of what went wrong | |
| - Explanations of your changes | |
| - Meta-text like "Here's an improved version..." or "Based on feedback..." | |
| - Recommendations or suggestions (those are already in the feedback) | |
| Output the improved prompt directly and only the prompt.""" | |
| # For reflection, we only need text generation (no images) | |
| result = reflection_lm_client.generate( | |
| system_prompt=optimization_system_prompt, | |
| user_prompt=prompt, | |
| image_base64="" # No image for reflection | |
| ) | |
| # Extract string content from the result dictionary | |
| if isinstance(result, dict): | |
| return result.get("content", str(result)) | |
| else: | |
| return str(result) | |
| except Exception as e: | |
| self.logger.error(f"Reflection model error: {e}") | |
| return prompt # Return original prompt on error | |
| self.logger.info( | |
| f"Starting GEPA optimization with {max_iterations} iterations, " | |
| f"batch size {batch_size}, max metric calls: {max_metric_calls}" | |
| ) | |
| self.logger.info( | |
| f"GEPA parameters: candidate_selection_strategy=pareto, " | |
| f"reflection_minibatch_size={batch_size}, " | |
| f"skip_perfect_score=False, " | |
| f"module_selector=round_robin" | |
| ) | |
| # Prepare optimization parameters with ONLY valid GEPA parameters | |
| # Note: 'adapter' variable is set above (either LLEGO-enhanced or standard) | |
| # 🔥 FIX: ALWAYS pass reflection_lm_callable to GEPA (required for generating new candidates!) | |
| # Previously this was only passed when use_llego_operators=True, which broke standard reflection | |
| reflection_lm_passed = reflection_lm_callable # Always pass reflection callable | |
| self.logger.info(f"✅ reflection_lm_callable passed to GEPA (LLEGO={self.config.use_llego_operators})") | |
| # Debug logging removed - not needed in production | |
| gepa_params = { | |
| 'adapter': adapter, # Use the adapter created above (with or without LLEGO) | |
| 'seed_candidate': seed_candidate, | |
| 'trainset': trainset, | |
| 'valset': valset, | |
| 'max_metric_calls': max_metric_calls, | |
| # NOTE: GEPA does NOT have num_iterations - it uses max_metric_calls to control iterations | |
| # 🔥 CRITICAL: When using an adapter, GEPA expects: | |
| # - adapter.make_reflective_dataset() to create feedback data | |
| # - GEPA's internal proposer to generate candidates from that data | |
| # - task_lm and reflection_lm must be None (GEPA will use model from adapter) | |
| 'task_lm': None, # Don't pass - adapter handles this | |
| 'reflection_lm': reflection_lm_passed, # Pass LLEGO-enhanced reflection (may be ignored!) | |
| # Valid GEPA parameters based on actual library | |
| 'candidate_selection_strategy': 'pareto', # Use Pareto selection | |
| 'skip_perfect_score': False, # Don't skip perfect scores | |
| 'reflection_minibatch_size': batch_size, # Use batch size for reflection | |
| 'perfect_score': 1.0, # Perfect score threshold | |
| 'module_selector': 'round_robin', # Cycle through components | |
| 'display_progress_bar': self.config.verbose, # Show progress if verbose | |
| 'raise_on_exception': True, # Raise exceptions for debugging | |
| } | |
| # 🔥 CRITICAL FIX: Filter kwargs to only include valid GEPA parameters | |
| # GEPA does NOT accept num_iterations, max_iterations, or other non-GEPA params | |
| VALID_GEPA_PARAMS = { | |
| 'seed_candidate', 'trainset', 'valset', 'adapter', 'task_lm', 'reflection_lm', | |
| 'candidate_selection_strategy', 'skip_perfect_score', 'batch_sampler', | |
| 'reflection_minibatch_size', 'perfect_score', 'reflection_prompt_template', | |
| 'module_selector', 'use_merge', 'max_merge_invocations', 'merge_val_overlap_floor', | |
| 'max_metric_calls', 'stop_callbacks', 'logger', 'run_dir', 'use_wandb', | |
| 'wandb_api_key', 'wandb_init_kwargs', 'use_mlflow', 'mlflow_tracking_uri', | |
| 'mlflow_experiment_name', 'track_best_outputs', 'display_progress_bar', | |
| 'use_cloudpickle', 'seed', 'raise_on_exception', 'val_evaluation_policy' | |
| } | |
| # Only add valid kwargs that aren't already in gepa_params | |
| for key, value in kwargs.items(): | |
| if key in VALID_GEPA_PARAMS and key not in gepa_params: | |
| gepa_params[key] = value | |
| elif key not in VALID_GEPA_PARAMS: | |
| self.logger.debug(f"⚠️ Filtering out invalid GEPA parameter: {key}") | |
| # Debug logging removed - not needed in production | |
| # 🎯 NEW: Capture GEPA's internal logging for pareto front information | |
| gepa_output = io.StringIO() | |
| # Log iteration start | |
| from ..utils.clean_logger import get_clean_logger | |
| clean_log = get_clean_logger() | |
| clean_log.log_iteration_start(1, seed_prompt=seed_candidate.get('system_prompt', '')) | |
| # 🔥 CRITICAL: Pass valset size to adapter for better dataset type detection | |
| if hasattr(adapter, '_valset_size'): | |
| adapter._valset_size = len(valset) | |
| self.logger.debug(f"✅ Set valset_size in adapter: {len(valset)} for Dpareto detection") | |
| # 🔥 CRITICAL FIX: Store valset in adapter so we can evaluate generated candidates on it | |
| # This ensures generated candidates are evaluated on Dpareto for Pareto selection | |
| if hasattr(adapter, '_valset'): | |
| adapter._valset = valset | |
| self.logger.debug(f"✅ Stored valset in adapter ({len(valset)} samples) for Dpareto evaluation of generated candidates") | |
| else: | |
| # Add _valset attribute if it doesn't exist | |
| adapter._valset = valset | |
| self.logger.debug(f"✅ Added _valset attribute to adapter ({len(valset)} samples)") | |
| # Run GEPA optimization (synchronous call wrapped in async) | |
| result = await asyncio.get_event_loop().run_in_executor( | |
| None, | |
| lambda: self._run_gepa_with_logging(gepa_params, gepa_output) | |
| ) | |
| # 🎯 NEW: Process and log pareto front information, extract iteration count | |
| gepa_logs = gepa_output.getvalue() | |
| actual_iterations = self._log_pareto_front_info(gepa_logs) # Get iteration count | |
| return result, actual_iterations # Return both result and iteration count | |
| except Exception as e: | |
| # Try to extract partial results before failing | |
| self.logger.warning(f"GEPA optimization failed: {e}") | |
| # Check if we have any cached results from the adapter | |
| best_candidate = adapter.get_best_candidate() | |
| best_score = adapter.get_best_score() | |
| if best_candidate and best_score > 0: | |
| self.logger.info(f"🎯 Using cached best result with score: {best_score:.4f}") | |
| # Create a mock GEPA result with the best candidate found | |
| return { | |
| 'best_candidate': best_candidate, | |
| 'best_score': best_score, | |
| 'partial_result': True, | |
| 'error': f'GEPA failed but returning best result found: {str(e)}' | |
| } | |
| else: | |
| # If no cached results, re-raise the error | |
| raise GepaOptimizerError(f"GEPA optimization failed: {str(e)}") | |
    def _run_gepa_with_logging(self, gepa_params: Dict[str, Any], output_buffer: io.StringIO) -> Any:
        """
        Run GEPA optimization while capturing its output.

        Args:
            gepa_params: Keyword arguments forwarded verbatim to ``gepa.optimize()``.
            output_buffer: Buffer that receives everything GEPA writes to
                stdout/stderr while it runs (parsed later for iteration info).

        Returns:
            The raw result object returned by ``gepa.optimize()``.
        """
        # Outline the steps GEPA is expected to perform, for debugging visibility.
        self.logger.info("🔄 Calling gepa.optimize() - GEPA should now:")
        self.logger.info(" 1. Evaluate seed on validation set")
        self.logger.info(" 2. For each iteration: evaluate on training minibatch (capture_traces=True)")
        self.logger.info(" 3. Call make_reflective_dataset() with trajectories")
        self.logger.info(" 4. Call propose_new_texts() or reflection_lm to generate new candidates")
        self.logger.info(" 5. Evaluate new candidates and update Pareto front")
        # Capture GEPA's print statements and logging so they can be parsed
        # afterwards (e.g. by _log_pareto_front_info) instead of going to console.
        with redirect_stdout(output_buffer), redirect_stderr(output_buffer):
            result = gepa.optimize(**gepa_params)
        # Log GEPA output for debugging — only the first 50 lines, to bound log volume.
        gepa_output = output_buffer.getvalue()
        if gepa_output:
            self.logger.info("📋 GEPA Output (captured):")
            for line in gepa_output.split('\n')[:50]:  # First 50 lines
                if line.strip():
                    self.logger.info(f" GEPA: {line}")
        return result
| def _log_pareto_front_info(self, gepa_logs: str) -> int: # Return int instead of None | |
| """Extract and log pareto front information from GEPA logs. Returns max iteration count.""" | |
| lines = gepa_logs.split('\n') | |
| current_iteration = 0 | |
| max_iteration = 0 # Track max iteration | |
| for line in lines: | |
| # Look for iteration information | |
| if 'iteration' in line.lower(): | |
| # Try to extract iteration number | |
| import re | |
| iteration_match = re.search(r'iteration\s+(\d+)', line.lower()) | |
| if iteration_match: | |
| current_iteration = int(iteration_match.group(1)) | |
| max_iteration = max(max_iteration, current_iteration) # Track max | |
| # Log iteration change | |
| from ..utils.clean_logger import get_clean_logger | |
| clean_log = get_clean_logger() | |
| if current_iteration > clean_log.current_iteration: | |
| clean_log.current_iteration = current_iteration | |
| # Look for pareto front information | |
| if 'pareto front' in line.lower() or 'new program' in line.lower(): | |
| self.logger.info(f"GEPA Pareto Update: {line.strip()}") | |
| elif 'iteration' in line.lower() and ('score' in line.lower() or 'program' in line.lower()): | |
| self.logger.debug(f"{line.strip()}") | |
| elif 'best' in line.lower() and 'score' in line.lower(): | |
| self.logger.info(f"{line.strip()}") | |
| # Look for evaluation information | |
| if 'evaluating' in line.lower() and 'candidate' in line.lower(): | |
| self.logger.debug(f"{line.strip()}") | |
| self.logger.info(f"GEPA Optimization Complete: {max_iteration} iterations") | |
| # Debug logging removed - not needed in production | |
| return max_iteration # Return the max iteration count | |
    def _extract_best_candidate(self, gepa_result: Any) -> Dict[str, Any]:
        """
        Extract the best candidate from GEPA Pareto front (single source of truth).

        GEPA Pareto front is the single source of truth because:
        - All candidates (GEPA reflection, LLEGO crossover, LLEGO mutation) are evaluated on Dpareto
        - All non-dominated candidates are added to GEPA Pareto front
        - Therefore, the best candidate MUST be in GEPA Pareto front

        Args:
            gepa_result: Raw result from gepa.optimize() (used only as fallback edge case)

        Returns:
            Best candidate dictionary with prompt components from GEPA Pareto front.
            Always contains 'system_prompt'; on the primary/fallback paths it also
            carries 'fitness' (float or None), 'source', 'candidate_type', 'notation'.
        """
        try:
            self.logger.info(f"\n{'═'*80}")
            self.logger.info(f"🔍 EXTRACTING BEST CANDIDATE FROM GEPA PARETO FRONT")
            self.logger.info(f"{'═'*80}")
            # ========================================================================
            # PRIMARY: Get best candidate from GEPA Pareto front (single source of truth)
            # ========================================================================
            # NOTE(review): pareto_logger looks like a module-level singleton populated
            # during optimization — confirm it is reset between runs.
            from ..utils.pareto_logger import get_pareto_logger
            pareto_log = get_pareto_logger()
            if pareto_log.pareto_front:
                try:
                    # Get best candidate from GEPA Pareto front (highest score = best)
                    gepa_pareto_best = max(pareto_log.pareto_front, key=lambda x: x['score'])
                    gepa_pareto_fitness = gepa_pareto_best['score']
                    gepa_pareto_prompt = gepa_pareto_best['prompt']
                    # 'type'/'notation' are optional entry keys; default when absent.
                    gepa_pareto_type = gepa_pareto_best.get('type', 'unknown')
                    gepa_pareto_notation = gepa_pareto_best.get('notation', 'S')
                    best_candidate = {
                        'system_prompt': gepa_pareto_prompt,
                        'fitness': gepa_pareto_fitness,
                        'source': 'gepa_pareto_front',
                        'candidate_type': gepa_pareto_type,
                        'notation': gepa_pareto_notation
                    }
                    self.logger.info(f"✅ SELECTED: Best candidate from GEPA Pareto front")
                    self.logger.info(f" Notation: {gepa_pareto_notation}")
                    self.logger.info(f" Fitness: f({gepa_pareto_notation})={gepa_pareto_fitness:.4f}")
                    self.logger.info(f" Type: {gepa_pareto_type}")
                    self.logger.info(f" Prompt length: {len(gepa_pareto_prompt)} chars")
                    self.logger.info(f" 💡 GEPA Pareto front is single source of truth (all candidates evaluated on Dpareto)")
                    return best_candidate
                except Exception as e:
                    # Extraction failure is logged, then we fall through to the
                    # gepa_result fallback below instead of aborting.
                    self.logger.error(f"❌ Failed to extract from GEPA Pareto front: {e}")
                    import traceback
                    self.logger.error(traceback.format_exc())
            # ========================================================================
            # EDGE CASE FALLBACK: Pareto front empty (shouldn't happen, but handle gracefully)
            # ========================================================================
            self.logger.warning(f"⚠️ GEPA Pareto front is empty - using gepa_result as fallback")
            self.logger.warning(f" This should not happen if all candidates are evaluated on Dpareto")
            # Try to extract from gepa_result (last resort)
            if hasattr(gepa_result, 'best_candidate'):
                gepa_candidate = gepa_result.best_candidate
                # best_candidate may be a dict of components or a plain string prompt.
                gepa_prompt = gepa_candidate.get('system_prompt') if isinstance(gepa_candidate, dict) else str(gepa_candidate)
                gepa_fitness = getattr(gepa_result, 'best_score', None)
                if gepa_prompt:
                    self.logger.info(f"✅ Using gepa_result.best_candidate as fallback")
                    return {
                        'system_prompt': gepa_prompt,
                        'fitness': float(gepa_fitness) if gepa_fitness is not None else None,
                        'source': 'gepa_result_fallback',
                        'candidate_type': 'unknown',
                        'notation': 'S'
                    }
            # Last resort: return empty prompt
            self.logger.error(f"❌ No candidates found anywhere - returning empty prompt")
            return {'system_prompt': ''}
        except Exception as e:
            # Never propagate from here — callers expect a dict even on total failure.
            self.logger.error(f"❌ Error extracting best candidate: {e}")
            import traceback
            self.logger.error(traceback.format_exc())
            return {'system_prompt': ''}
| def _evaluate_candidate_on_testset( | |
| self, | |
| candidate: Dict[str, str], | |
| testset: List[Dict] | |
| ) -> float: | |
| """ | |
| Evaluate a candidate prompt on the held-out test set. | |
| Args: | |
| candidate: Prompt candidate to evaluate | |
| testset: Test dataset (not used during optimization) | |
| Returns: | |
| Average composite score on test set | |
| Raises: | |
| TestSetEvaluationError: If evaluation fails | |
| """ | |
| from ..utils.exceptions import TestSetEvaluationError | |
| try: | |
| # Evaluate using the adapter (same as GEPA does internally) | |
| eval_result = self.adapter.evaluate( | |
| batch=testset, | |
| candidate=candidate, | |
| capture_traces=False # Don't need detailed traces for test | |
| ) | |
| if not eval_result.scores: | |
| raise TestSetEvaluationError("No scores returned from test evaluation") | |
| # Calculate average score | |
| avg_score = sum(eval_result.scores) / len(eval_result.scores) | |
| self.logger.debug( | |
| f"Test set evaluation: {len(eval_result.scores)} samples, " | |
| f"scores: {eval_result.scores}, avg: {avg_score:.4f}" | |
| ) | |
| return avg_score | |
| except Exception as e: | |
| raise TestSetEvaluationError(f"Failed to evaluate on test set: {str(e)}") | |
| def optimize_sync(self, | |
| model: str, | |
| seed_prompt: str, | |
| dataset: Any, | |
| reflection_lm: str, | |
| max_metric_calls: int = 150, | |
| **kwargs) -> OptimizedResult: | |
| """ | |
| Synchronous version of the optimization method | |
| Args: | |
| model: Target model to optimize for | |
| seed_prompt: Initial prompt to optimize | |
| dataset: Training data in any format | |
| reflection_lm: Model for reflection | |
| max_metric_calls: Budget for optimization attempts | |
| **kwargs: Additional optimization parameters | |
| Returns: | |
| OptimizedResult: Optimization result | |
| """ | |
| # Run the async method in a new event loop | |
| loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(loop) | |
| try: | |
| result = loop.run_until_complete( | |
| self.train(model, seed_prompt, dataset, reflection_lm, max_metric_calls, **kwargs) | |
| ) | |
| return result | |
| finally: | |
| loop.close() | |
| # Convenience function for quick optimization | |
def optimize_prompt(
    model: Union[str, ModelConfig],
    seed_prompt: str,
    dataset: Any,
    reflection_model: Optional[Union[str, ModelConfig]] = None,
    **kwargs
) -> OptimizedResult:
    """
    Convenience function for quick prompt optimization without creating optimizer instance

    Args:
        model: Target model configuration
        seed_prompt: Initial prompt to optimize
        dataset: Training data
        reflection_model: Model for reflection (defaults to ``model`` when omitted)
        **kwargs: Additional optimization parameters. ``max_iterations``,
            ``max_metric_calls`` and ``batch_size`` are consumed by the config
            and not forwarded to ``train()``.

    Returns:
        OptimizedResult: Optimization result
    """
    # Fall back to the target model for reflection when none is given.
    if reflection_model is None:
        reflection_model = model
    # Pop config-level knobs so they cannot be forwarded twice to train().
    max_metric_calls = kwargs.pop('max_metric_calls', 50)
    config = OptimizationConfig(
        model=model,
        reflection_model=reflection_model,
        max_iterations=kwargs.pop('max_iterations', 10),
        max_metric_calls=max_metric_calls,
        batch_size=kwargs.pop('batch_size', 4)
    )
    optimizer = GepaOptimizer(config=config)
    # 🔥 FIX: train() takes (model, seed_prompt, dataset, reflection_lm,
    # max_metric_calls, **kwargs) — see optimize_sync. Previously seed_prompt
    # was passed in the model slot and model/reflection_lm were dropped.
    return asyncio.run(
        optimizer.train(
            model, seed_prompt, dataset, reflection_model, max_metric_calls, **kwargs
        )
    )