Spaces:
Sleeping
Sleeping
| """ | |
| Log Parser for Extracting Candidates and Feedback | |
| Parses optimization logs to extract candidate prompts, feedback, and scores. | |
| """ | |
| import re | |
| from typing import List, Dict, Optional, Tuple | |
| from pathlib import Path | |
class OptimizationLogParser:
    """Parse optimization logs to extract candidates, feedback, and scores.

    The log file is read into memory once by ``__init__``; every
    ``extract_*`` method then works on plain strings with regular
    expressions, and ``parse_all`` ties the pieces together per iteration.

    NOTE(review): several marker strings below contain non-ASCII characters
    (for example ``π``) that look like mis-encoded emoji. They are kept
    exactly as found -- verify them against real log output before relying
    on the prefixed pattern variants.
    """

    # Feedback that starts this many characters (or more) after a candidate
    # is not attached to that candidate in parse_all().
    MAX_FEEDBACK_DISTANCE = 5000

    def __init__(self, log_file: str):
        """
        Initialize parser with log file path.

        A path that does not exist is not an error: ``content`` is left as
        an empty string and every extractor returns an empty result.

        Args:
            log_file: Path to log file
        """
        self.log_file = Path(log_file)
        self.content = ""
        if self.log_file.exists():
            self.content = self.log_file.read_text(encoding='utf-8')

    @staticmethod
    def _unique_matches(patterns, text, flags=0):
        """Return regex matches de-duplicated by where capture group 1 starts.

        Several patterns in this class are a marker-prefixed and a plain
        variant of the same expression, so both match the same log text.
        Keeping one match per capture start avoids reporting the same item
        twice; of the duplicates, the match that starts earliest wins so the
        reported position covers the full marker.
        """
        earliest = {}
        for pattern in patterns:
            for match in re.finditer(pattern, text, flags):
                key = match.start(1)
                kept = earliest.get(key)
                if kept is None or match.start() < kept.start():
                    earliest[key] = match
        return list(earliest.values())

    def extract_iterations(self) -> List[Dict]:
        """
        Split the log into per-iteration sections.

        A section starts at an iteration marker and runs up to the next
        marker (or the end of the log).

        Returns:
            List of dictionaries with keys ``iteration`` (number taken from
            an ``Iteration N`` marker, 1 for the other markers), ``content``
            (text of the section) and ``start_pos`` (offset of the marker in
            the log).
        """
        # Pattern to find iteration markers
        iteration_pattern = r'Iteration\s+(\d+)|Starting GEPA optimization|π Starting GEPA optimization'

        # Collect the markers once; each section ends where the next begins.
        markers = list(re.finditer(iteration_pattern, self.content))
        iterations = []
        for index, match in enumerate(markers):
            start_pos = match.start()
            if index + 1 < len(markers):
                end_pos = markers[index + 1].start()
            else:
                end_pos = len(self.content)
            number = match.group(1)
            iterations.append({
                'iteration': int(number) if number else 1,
                'content': self.content[start_pos:end_pos],
                'start_pos': start_pos
            })
        return iterations

    def extract_candidates(self, iteration_content: str) -> List[Dict]:
        """
        Extract candidate prompts from iteration content.

        Args:
            iteration_content: Content for a single iteration

        Returns:
            List of candidate dictionaries (``source``, ``prompt``,
            ``position``) sorted by position. A prompt matched by both the
            marker-prefixed and the plain variant of a pattern is reported
            once.
        """
        # Each source is recognised by a header line followed by a dashed
        # rule; the prompt is the text up to the next rule or marker.
        sourced_patterns = [
            # Pattern 1: GEPA Reflection candidates
            ('GEPA_Reflection', [
                r'π PROPOSED PROMPT.*?----------------------------------------\s*(.*?)(?=----------------------------------------|π|π|$)',
                r'PROPOSED PROMPT.*?----------------------------------------\s*(.*?)(?=----------------------------------------|π|π|$)',
                r'GEPA REFLECTION.*?----------------------------------------\s*(.*?)(?=----------------------------------------|π|π|$)',
            ]),
            # Pattern 2: LLEGO Crossover candidates
            ('LLEGO_Crossover', [
                r'𧬠Crossover.*?----------------------------------------\s*(.*?)(?=----------------------------------------|π|π|$)',
                r'Crossover.*?----------------------------------------\s*(.*?)(?=----------------------------------------|π|π|$)',
            ]),
            # Pattern 3: LLEGO Mutation candidates
            ('LLEGO_Mutation', [
                r'π² Mutation.*?----------------------------------------\s*(.*?)(?=----------------------------------------|π|π|$)',
                r'Mutation.*?----------------------------------------\s*(.*?)(?=----------------------------------------|π|π|$)',
            ]),
        ]

        candidates = []
        for source, patterns in sourced_patterns:
            for match in self._unique_matches(patterns, iteration_content, re.DOTALL):
                prompt = match.group(1).strip()
                if len(prompt) > 20:  # Valid prompt
                    candidates.append({
                        'source': source,
                        'prompt': prompt,
                        'position': match.start()
                    })

        # Pattern 4: Generic candidate markers
        # Look for prompts in quotes or code blocks
        generic_patterns = [
            r'"([^"]{50,})"',  # Quoted prompts
            r'```\s*(.*?)\s*```',  # Code blocks
        ]
        # Words that make a quoted/code block look like task instructions.
        keywords = ('you are', 'task', 'instruction', 'element', 'identify', 'select')
        # Skip text that was already captured above (or by an earlier
        # generic match).
        seen_prompts = {candidate['prompt'] for candidate in candidates}
        for pattern in generic_patterns:
            for match in re.finditer(pattern, iteration_content, re.DOTALL):
                prompt = match.group(1).strip()
                if len(prompt) <= 50 or prompt in seen_prompts:
                    continue
                lowered = prompt.lower()
                if not any(keyword in lowered for keyword in keywords):
                    continue
                seen_prompts.add(prompt)
                candidates.append({
                    'source': 'Unknown',
                    'prompt': prompt,
                    'position': match.start()
                })

        # Sort by position
        candidates.sort(key=lambda x: x['position'])
        return candidates

    def extract_feedback(self, iteration_content: str) -> List[Dict]:
        """
        Extract feedback from iteration content.

        Args:
            iteration_content: Content for a single iteration

        Returns:
            List of feedback dictionaries (``feedback``, ``position``, and
            ``source`` for LLM-as-Judge blocks) sorted by position. Feedback
            matched by both the marker-prefixed and the plain variant of a
            pattern is reported once.
        """
        feedback_list = []

        # Pattern 1: Explicit feedback markers; the text runs up to a blank
        # line or the next marker line.
        feedback_patterns = [
            r'π¬ FEEDBACK:\s*(.*?)(?=\n\n|\nπ|\nπ|\nπ‘|$)',
            r'FEEDBACK:\s*(.*?)(?=\n\n|\nπ|\nπ|\nπ‘|$)',
            r'Feedback:\s*(.*?)(?=\n\n|\nπ|\nπ|\nπ‘|$)',
        ]
        for match in self._unique_matches(feedback_patterns, iteration_content, re.DOTALL):
            feedback_text = match.group(1).strip()
            if len(feedback_text) > 10:
                feedback_list.append({
                    'feedback': feedback_text,
                    'position': match.start()
                })

        # Pattern 2: LLM-as-Judge feedback (header, dashed rule, then text)
        judge_patterns = [
            r'LLM-as-Judge.*?----------------------------------------\s*(.*?)(?=----------------------------------------|π|π|$)',
            r'Judge Feedback.*?----------------------------------------\s*(.*?)(?=----------------------------------------|π|π|$)',
        ]
        for match in self._unique_matches(judge_patterns, iteration_content, re.DOTALL):
            feedback_text = match.group(1).strip()
            if len(feedback_text) > 10:
                feedback_list.append({
                    'feedback': feedback_text,
                    'position': match.start(),
                    'source': 'LLM-as-Judge'
                })

        # Sort by position
        feedback_list.sort(key=lambda x: x['position'])
        return feedback_list

    def extract_scores(self, iteration_content: str) -> List[Dict]:
        """
        Extract scores from iteration content.

        Args:
            iteration_content: Content for a single iteration

        Returns:
            List of score dictionaries (``score``, ``position``) sorted by
            position. A score matched by both the marker-prefixed and the
            plain variant of a pattern is reported once.
        """
        # Capture exactly one decimal number. A looser character class such
        # as [\d.]+ also swallows sentence punctuation ("Score: 0.85.") and
        # makes float() raise ValueError.
        number = r'(\d+(?:\.\d+)?|\.\d+)'
        score_patterns = [
            r'Score:\s*' + number,
            r'Average score:\s*' + number,
            r'π― SCORE:\s*' + number,
            r'π Score:\s*' + number,
        ]
        scores = [
            {'score': float(match.group(1)), 'position': match.start()}
            for match in self._unique_matches(score_patterns, iteration_content)
        ]

        # Sort by position
        scores.sort(key=lambda x: x['position'])
        return scores

    def parse_all(self) -> Dict:
        """
        Parse entire log file and extract all information.

        For every iteration the candidates, feedback and scores are
        extracted. Each candidate is then given the first score that follows
        it (key ``score``) and the first feedback that follows it within
        ``MAX_FEEDBACK_DISTANCE`` characters (key ``feedback``); a key is
        simply absent when nothing suitable follows.

        Returns:
            Dictionary with keys ``iterations`` (per-iteration results),
            ``total_iterations``, ``all_candidates`` and ``all_feedback``.
        """
        iterations = self.extract_iterations()
        result = {
            'iterations': [],
            'total_iterations': len(iterations),
            'all_candidates': [],
            'all_feedback': []
        }
        for iter_info in iterations:
            iter_content = iter_info['content']
            candidates = self.extract_candidates(iter_content)
            feedback = self.extract_feedback(iter_content)
            scores = self.extract_scores(iter_content)

            for candidate in candidates:
                candidate_pos = candidate['position']

                # Both lists are sorted by position, so the first entry
                # located after the candidate is also the nearest one.
                following_score = next(
                    (info for info in scores if info['position'] > candidate_pos),
                    None,
                )
                if following_score is not None:
                    candidate['score'] = following_score['score']

                following_feedback = next(
                    (info for info in feedback if info['position'] > candidate_pos),
                    None,
                )
                if (following_feedback is not None and
                        following_feedback['position'] - candidate_pos < self.MAX_FEEDBACK_DISTANCE):
                    candidate['feedback'] = following_feedback['feedback']

            result['iterations'].append({
                'iteration': iter_info['iteration'],
                'candidates': candidates,
                'feedback': feedback,
                'scores': scores
            })
            result['all_candidates'].extend(candidates)
            result['all_feedback'].extend(feedback)
        return result