"""
Helion-OSC Evaluation Script

Comprehensive evaluation suite for code generation and mathematical reasoning.
"""
| |
|
import gc
import json
import logging
import multiprocessing as mp
import os
import re
import signal
import subprocess
import sys
import tempfile
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Tuple

import numpy as np
import torch
from datasets import load_dataset
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM
| |
|
# Configure the root logger at INFO level and create the module-level logger
# that every class and function in this file reports through.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
| |
|
| |
|
@dataclass
class EvaluationConfig:
    """Configuration for evaluation"""
    # Identifier passed to AutoTokenizer / AutoModelForCausalLM.from_pretrained.
    model_name: str = "DeepXR/Helion-OSC"
    # Default is computed once, when this class is defined (i.e. at import time).
    device: str = "cuda" if torch.cuda.is_available() else "cpu"
    # Accepted by the CLI but not read by any evaluator in this file.
    batch_size: int = 4
    # Passed as ``max_length`` to ``model.generate``.
    max_length: int = 2048
    # Sampling parameters used by the HumanEval and MBPP evaluators; the GSM8K
    # evaluator hard-codes its own generation settings.
    temperature: float = 0.7
    top_p: float = 0.95
    # Not read anywhere in this file and not exposed by the CLI.
    num_samples: int = 1
    # Seconds allowed for each subprocess run of generated code.
    timeout: int = 5
    # Directory that evaluation results are written to.
    output_dir: str = "./evaluation_results"
| |
|
| |
|
class TimeoutException(Exception):
    """Signals that executing a piece of code exceeded its time budget."""
| |
|
| |
|
@contextmanager
def time_limit(seconds):
    """Context manager for timing out code execution.

    Arms ``SIGALRM`` for the duration of the ``with`` body and raises
    ``TimeoutException`` inside it when the alarm fires.  On exit the alarm
    is cancelled and the ``SIGALRM`` handler that was installed before entry
    is put back, so using this helper does not leak its handler into the rest
    of the process.

    ``signal.signal`` / ``signal.alarm`` are used, so this must be entered
    from the main thread on a platform that provides ``SIGALRM``.

    Args:
        seconds: Whole number of seconds handed to ``signal.alarm``.

    Raises:
        TimeoutException: Inside the ``with`` body when the alarm fires.
    """
    def signal_handler(signum, frame):
        raise TimeoutException("Code execution timed out")

    # signal.signal returns the handler it replaces; keep it for restoration.
    previous_handler = signal.signal(signal.SIGALRM, signal_handler)
    try:
        signal.alarm(seconds)
        yield
    finally:
        signal.alarm(0)
        # ``None`` means the old handler was not installed from Python and
        # cannot be re-installed through signal.signal.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
| |
|
| |
|
class CodeExecutor:
    """Run and syntax-check generated code.

    NOTE(security): ``execute_python`` runs whatever source it is given in a
    plain subprocess with the privileges of the current user.  The timeout is
    the only safeguard; there is no sandboxing.  Only use it in an environment
    where executing model-generated code is acceptable.
    """

    @staticmethod
    def execute_python(code: str, timeout: int = 5) -> Tuple[bool, str]:
        """
        Execute Python code in a subprocess.

        The code is written to a temporary ``.py`` file and run with the same
        interpreter that is running this script.  The temporary file is
        removed on every path.

        Args:
            code: Python code to execute
            timeout: Timeout in seconds

        Returns:
            Tuple of (success, output/error): ``(True, stdout)`` when the
            process exits with status 0, ``(False, stderr)`` for a non-zero
            exit, ``(False, "Execution timed out")`` on timeout and
            ``(False, str(exception))`` for any other failure.
        """
        temp_file = None
        try:
            # UTF-8 is Python's default source encoding; writing with the
            # locale encoding could corrupt non-ASCII source.
            with tempfile.NamedTemporaryFile(
                mode='w', suffix='.py', delete=False, encoding='utf-8'
            ) as f:
                temp_file = f.name
                f.write(code)

            result = subprocess.run(
                # A bare "python" may be missing from PATH or be a different
                # interpreter than the one running this script.
                [sys.executable or 'python', temp_file],
                capture_output=True,
                text=True,
                timeout=timeout
            )
        except subprocess.TimeoutExpired:
            return False, "Execution timed out"
        except Exception as e:
            # Boundary of the executor: any failure is reported to the caller
            # as an unsuccessful run rather than raised.
            return False, str(e)
        finally:
            if temp_file is not None:
                try:
                    os.unlink(temp_file)
                except OSError:
                    # Best-effort cleanup; a leftover temp file must not
                    # change the outcome of the run.
                    pass

        if result.returncode == 0:
            return True, result.stdout
        return False, result.stderr

    @staticmethod
    def check_syntax(code: str, language: str = "python") -> Tuple[bool, str]:
        """
        Check code syntax without execution

        Args:
            code: Code to check
            language: Programming language

        Returns:
            Tuple of (is_valid, error_message).  Languages other than Python
            are not checked and are reported as valid with an explanatory
            message.
        """
        if language.lower() == "python":
            try:
                compile(code, '<string>', 'exec')
            except (SyntaxError, ValueError) as e:
                # compile() raises ValueError (not SyntaxError) when the
                # source contains null bytes on some Python versions.
                return False, str(e)
            return True, ""

        return True, "Syntax checking not implemented for this language"
| |
|
| |
|
class HumanEvalEvaluator:
    """Evaluator for HumanEval benchmark.

    Each problem provides ``prompt`` (a function signature plus docstring that
    the model must complete), ``test`` (source defining ``check(candidate)``)
    and ``entry_point`` (the name of the function under test).  A completion
    is scored by executing ``prompt + completion + test + check(entry_point)``
    in a subprocess.
    """

    # Text CodeExecutor.execute_python reports when the subprocess times out;
    # used to tell timeouts apart from ordinary runtime failures.
    _TIMEOUT_MESSAGE = "Execution timed out"

    def __init__(self, config: "EvaluationConfig"):
        """Load the tokenizer and model named in ``config`` in eval mode."""
        self.config = config
        self.tokenizer = AutoTokenizer.from_pretrained(config.model_name)

        on_cuda = config.device.startswith("cuda")
        # Only the bare "cuda" device uses automatic placement.  Every other
        # device ("cpu", "cuda:1", "mps", ...) needs an explicit move,
        # otherwise the weights stay on the CPU while inputs go to
        # ``config.device``.
        auto_placement = config.device == "cuda"
        self.model = AutoModelForCausalLM.from_pretrained(
            config.model_name,
            torch_dtype=torch.bfloat16 if on_cuda else torch.float32,
            device_map="auto" if auto_placement else None
        )
        if not auto_placement:
            self.model = self.model.to(config.device)
        self.model.eval()
        self.executor = CodeExecutor()

    def load_humaneval(self) -> List[Dict]:
        """Load HumanEval dataset"""
        logger.info("Loading HumanEval dataset...")
        dataset = load_dataset("openai_humaneval", split="test")
        return list(dataset)

    def generate_solution(self, prompt: str) -> str:
        """Generate the completion for a prompt.

        Args:
            prompt: Text the model should continue.

        Returns:
            Only the newly generated text, with its whitespace untouched.
            The leading indentation matters because the completion is the
            body of the function that ``prompt`` opens.
        """
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.config.device)

        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_length=self.config.max_length,
                temperature=self.config.temperature,
                top_p=self.config.top_p,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id
            )

        generated = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        if generated.startswith(prompt):
            return generated[len(prompt):]

        # The decoded text does not reproduce the prompt exactly, so cutting
        # by character count would split at the wrong place.  Decode only the
        # tokens that follow the prompt instead.
        prompt_tokens = inputs["input_ids"].shape[1]
        return self.tokenizer.decode(outputs[0][prompt_tokens:], skip_special_tokens=True)

    @staticmethod
    def _build_program(prompt: str, completion: str, test: str, entry_point: str) -> str:
        """Assemble the runnable program for one problem."""
        # The test source only defines check(); it has to be invoked
        # explicitly with the function under test.
        return f"{prompt}{completion}\n\n{test}\n\ncheck({entry_point})\n"

    @classmethod
    def _classify_failure(cls, output: str) -> str:
        """Map an executor error message to the results counter it belongs to."""
        return "timeouts" if output == cls._TIMEOUT_MESSAGE else "runtime_errors"

    @staticmethod
    def _pass_rate(passed: int, total: int) -> float:
        """Return passed / total, or 0.0 when there were no problems."""
        return passed / total if total else 0.0

    def test_solution(self, solution: str, test_code: str) -> bool:
        """Run ``solution`` followed by ``test_code``; True when the process exits cleanly."""
        full_code = solution + "\n" + test_code
        success, _ = self.executor.execute_python(full_code, self.config.timeout)
        return success

    def evaluate(self) -> Dict[str, float]:
        """Run HumanEval evaluation.

        Returns:
            Counters ``total``, ``passed``, ``failed``, ``syntax_errors``,
            ``runtime_errors``, ``timeouts`` plus the ``pass@1`` ratio.
        """
        logger.info("Starting HumanEval evaluation...")

        problems = self.load_humaneval()
        results = {
            "total": len(problems),
            "passed": 0,
            "failed": 0,
            "syntax_errors": 0,
            "runtime_errors": 0,
            "timeouts": 0
        }

        for problem in tqdm(problems, desc="Evaluating HumanEval"):
            prompt = problem["prompt"]
            completion = self.generate_solution(prompt)
            program = self._build_program(
                prompt, completion, problem["test"], problem["entry_point"]
            )

            is_valid, _ = self.executor.check_syntax(program)
            if not is_valid:
                results["syntax_errors"] += 1
                results["failed"] += 1
                continue

            success, output = self.executor.execute_python(program, self.config.timeout)
            if success:
                results["passed"] += 1
                continue

            results["failed"] += 1
            # The executor reports a timeout through its message rather than
            # by raising, so classify on the message.
            results[self._classify_failure(output)] += 1

        results["pass@1"] = self._pass_rate(results["passed"], results["total"])

        logger.info(f"HumanEval Results: {results}")
        return results
| |
|
| |
|
class MBPPEvaluator:
    """Evaluator for MBPP (Mostly Basic Python Problems) benchmark.

    A problem passes when the generated code, followed by the problem's
    optional setup code and one assert from ``test_list``, exits cleanly for
    every assert in the list.
    """

    def __init__(self, config: "EvaluationConfig"):
        """Load the tokenizer and model named in ``config`` in eval mode."""
        self.config = config
        self.tokenizer = AutoTokenizer.from_pretrained(config.model_name)

        on_cuda = config.device.startswith("cuda")
        # Only the bare "cuda" device uses automatic placement.  Every other
        # device ("cpu", "cuda:1", "mps", ...) needs an explicit move,
        # otherwise the weights stay on the CPU while inputs go to
        # ``config.device``.
        auto_placement = config.device == "cuda"
        self.model = AutoModelForCausalLM.from_pretrained(
            config.model_name,
            torch_dtype=torch.bfloat16 if on_cuda else torch.float32,
            device_map="auto" if auto_placement else None
        )
        if not auto_placement:
            self.model = self.model.to(config.device)
        self.model.eval()
        self.executor = CodeExecutor()

    def load_mbpp(self) -> List[Dict]:
        """Load MBPP dataset"""
        logger.info("Loading MBPP dataset...")
        dataset = load_dataset("mbpp", split="test")
        return list(dataset)

    @staticmethod
    def _build_prompt(problem: Dict) -> str:
        """Build the generation prompt for one problem.

        The asserts call a specific function name that the prose description
        alone does not reveal, so they are shown to the model together with
        the description, wrapped in a module docstring.
        """
        description = problem["text"]
        tests = "\n".join(problem["test_list"])
        return f'"""\n{description}\nYour code should pass these tests:\n{tests}\n"""\n'

    @staticmethod
    def _build_test_program(solution: str, setup_code: str, test: str) -> str:
        """Join solution, optional setup code and one assert into a program."""
        parts = [solution, setup_code, test]
        return "\n".join(part for part in parts if part)

    @staticmethod
    def _pass_rate(passed: int, total: int) -> float:
        """Return passed / total, or 0.0 when there were no problems."""
        return passed / total if total else 0.0

    def generate_solution(self, prompt: str) -> str:
        """Generate code solution.

        Args:
            prompt: Text the model should continue.

        Returns:
            The newly generated text with surrounding whitespace stripped.
        """
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.config.device)

        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_length=self.config.max_length,
                temperature=self.config.temperature,
                top_p=self.config.top_p,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id
            )

        generated = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        if generated.startswith(prompt):
            return generated[len(prompt):].strip()

        # The decoded text does not reproduce the prompt exactly, so cutting
        # by character count would split at the wrong place.  Decode only the
        # tokens that follow the prompt instead.
        prompt_tokens = inputs["input_ids"].shape[1]
        continuation = self.tokenizer.decode(
            outputs[0][prompt_tokens:], skip_special_tokens=True
        )
        return continuation.strip()

    def evaluate(self) -> Dict[str, float]:
        """Run MBPP evaluation.

        Returns:
            Counters ``total``, ``passed``, ``failed`` plus the ``pass@1``
            ratio.
        """
        logger.info("Starting MBPP evaluation...")

        problems = self.load_mbpp()
        results = {
            "total": len(problems),
            "passed": 0,
            "failed": 0
        }

        for problem in tqdm(problems, desc="Evaluating MBPP"):
            solution = self.generate_solution(self._build_prompt(problem))
            # Treated as optional: a problem without setup code contributes
            # nothing here.
            setup_code = problem.get("test_setup_code") or ""

            # all() stops at the first failing assert, like the explicit
            # break it replaces.
            all_passed = all(
                self.executor.execute_python(
                    self._build_test_program(solution, setup_code, test),
                    self.config.timeout
                )[0]
                for test in problem["test_list"]
            )

            if all_passed:
                results["passed"] += 1
            else:
                results["failed"] += 1

        results["pass@1"] = self._pass_rate(results["passed"], results["total"])

        logger.info(f"MBPP Results: {results}")
        return results
| |
|
| |
|
class GSM8KEvaluator:
    """Evaluator for GSM8K mathematical reasoning benchmark"""

    # Answer patterns in priority order.  A number is an optional minus sign,
    # digits with optional thousands groups ("1,250") and an optional
    # fractional part; an optional "$" may precede it.
    _ANSWER_PATTERNS = (
        re.compile(r'####\s*\$?(-?\d+(?:,\d{3})*(?:\.\d+)?)', re.IGNORECASE),
        re.compile(r'answer is\s*\$?(-?\d+(?:,\d{3})*(?:\.\d+)?)', re.IGNORECASE),
        re.compile(r'equals?\s*\$?(-?\d+(?:,\d{3})*(?:\.\d+)?)', re.IGNORECASE),
        re.compile(r'=\s*\$?(-?\d+(?:,\d{3})*(?:\.\d+)?)', re.IGNORECASE),
        # A number closing the text, optionally followed by a full stop.
        re.compile(r'\$?\s*(-?\d+(?:,\d{3})*(?:\.\d+)?)\.?\s*$', re.IGNORECASE),
    )

    def __init__(self, config: "EvaluationConfig"):
        """Load the tokenizer and model named in ``config`` in eval mode."""
        self.config = config
        self.tokenizer = AutoTokenizer.from_pretrained(config.model_name)

        on_cuda = config.device.startswith("cuda")
        # Only the bare "cuda" device uses automatic placement.  Every other
        # device ("cpu", "cuda:1", "mps", ...) needs an explicit move,
        # otherwise the weights stay on the CPU while inputs go to
        # ``config.device``.
        auto_placement = config.device == "cuda"
        self.model = AutoModelForCausalLM.from_pretrained(
            config.model_name,
            torch_dtype=torch.bfloat16 if on_cuda else torch.float32,
            device_map="auto" if auto_placement else None
        )
        if not auto_placement:
            self.model = self.model.to(config.device)
        self.model.eval()

    def load_gsm8k(self) -> List[Dict]:
        """Load GSM8K dataset"""
        logger.info("Loading GSM8K dataset...")
        dataset = load_dataset("gsm8k", "main", split="test")
        return list(dataset)

    def extract_answer(self, text: str) -> Optional[float]:
        """Extract numerical answer from text.

        Patterns are tried in priority order (``####`` marker, "answer is",
        "equal(s)", "=", trailing number).  For the first pattern that matches
        at all, the LAST occurrence is used, because in step-by-step working
        the final result comes after the intermediate ones.  Thousands
        separators are removed before conversion.

        Args:
            text: Reference answer or model output.

        Returns:
            The extracted number, or ``None`` when no pattern matches.
        """
        for pattern in self._ANSWER_PATTERNS:
            found = pattern.findall(text)
            if found:
                return float(found[-1].replace(",", ""))

        return None

    def generate_solution(self, problem: str) -> str:
        """Generate solution for math problem.

        Decoding is greedy, so no sampling parameters are passed.

        Args:
            problem: The question text.

        Returns:
            Only the text generated after the prompt, so that numbers in the
            question itself cannot be mistaken for the model's answer.
        """
        prompt = f"Problem: {problem}\n\nLet's solve this step by step:\n"
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.config.device)

        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_length=self.config.max_length,
                do_sample=False,
                pad_token_id=self.tokenizer.eos_token_id
            )

        generated = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        if generated.startswith(prompt):
            return generated[len(prompt):]

        # The decoded text does not reproduce the prompt exactly, so cutting
        # by character count would split at the wrong place.  Decode only the
        # tokens that follow the prompt instead.
        prompt_tokens = inputs["input_ids"].shape[1]
        return self.tokenizer.decode(outputs[0][prompt_tokens:], skip_special_tokens=True)

    def evaluate(self) -> Dict[str, float]:
        """Run GSM8K evaluation.

        Returns:
            Counters ``total``, ``correct``, ``incorrect``, ``no_answer``
            (also counted as incorrect) and ``skipped`` (reference answer
            could not be parsed), plus ``accuracy`` computed over the
            problems that were actually scored.
        """
        logger.info("Starting GSM8K evaluation...")

        problems = self.load_gsm8k()
        results = {
            "total": len(problems),
            "correct": 0,
            "incorrect": 0,
            "no_answer": 0,
            "skipped": 0
        }

        for problem in tqdm(problems, desc="Evaluating GSM8K"):
            correct_answer = self.extract_answer(problem["answer"])
            if correct_answer is None:
                # Without a parsable reference the problem cannot be scored.
                results["skipped"] += 1
                continue

            solution = self.generate_solution(problem["question"])
            predicted_answer = self.extract_answer(solution)

            if predicted_answer is None:
                results["no_answer"] += 1
                results["incorrect"] += 1
            elif abs(predicted_answer - correct_answer) < 1e-5:
                results["correct"] += 1
            else:
                results["incorrect"] += 1

        # Skipped problems are excluded from the denominator.
        scored = results["correct"] + results["incorrect"]
        results["accuracy"] = results["correct"] / scored if scored else 0.0

        logger.info(f"GSM8K Results: {results}")
        return results
| |
|
| |
|
class ComprehensiveEvaluator:
    """Run comprehensive evaluation across all benchmarks"""

    def __init__(self, config: "EvaluationConfig"):
        """Store ``config`` and make sure its output directory exists."""
        self.config = config
        os.makedirs(config.output_dir, exist_ok=True)

    def run_all_evaluations(self) -> Dict[str, Any]:
        """Run all evaluation benchmarks.

        A benchmark that raises is recorded as ``{"error": message}`` and the
        remaining benchmarks still run.

        Returns:
            Mapping of benchmark key (``humaneval``, ``mbpp``, ``gsm8k``) to
            that benchmark's result dictionary.
        """
        logger.info("Starting comprehensive evaluation...")

        benchmarks = (
            ("humaneval", "HumanEval", HumanEvalEvaluator),
            ("mbpp", "MBPP", MBPPEvaluator),
            ("gsm8k", "GSM8K", GSM8KEvaluator),
        )

        all_results = {}
        for key, label, evaluator_cls in benchmarks:
            all_results[key] = self._run_benchmark(label, evaluator_cls)

        self.save_results(all_results)
        self.print_summary(all_results)

        return all_results

    def _run_benchmark(self, label: str, evaluator_cls) -> Dict[str, Any]:
        """Run one benchmark and release its model before returning."""
        logger.info("\n" + "="*80)
        logger.info(f"Running {label} Evaluation")
        logger.info("="*80)

        evaluator = None
        try:
            evaluator = evaluator_cls(self.config)
            return evaluator.evaluate()
        except Exception as e:
            # Top-level boundary: keep going with the other benchmarks, but
            # log the traceback so the failure can be diagnosed.
            logger.exception(f"{label} evaluation failed: {e}")
            return {"error": str(e)}
        finally:
            # Every evaluator loads its own copy of the model.  Drop it
            # before the next one is created so the copies do not pile up.
            del evaluator
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

    def save_results(self, results: Dict[str, Any]):
        """Save evaluation results to ``evaluation_results.json`` in the output directory."""
        output_file = os.path.join(self.config.output_dir, "evaluation_results.json")
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2)
        logger.info(f"Results saved to {output_file}")

    @staticmethod
    def _summary_lines(results: Dict[str, Any]) -> List[str]:
        """Format one line per benchmark that produced its headline metric."""
        headline_metrics = (
            ("humaneval", "pass@1", "HumanEval Pass@1"),
            ("mbpp", "pass@1", "MBPP Pass@1"),
            ("gsm8k", "accuracy", "GSM8K Accuracy"),
        )
        lines = []
        for key, metric, label in headline_metrics:
            scores = results.get(key, {})
            # Benchmarks that failed only carry an "error" entry.
            if metric in scores:
                lines.append(f"{label}: {scores[metric]:.3f}")
        return lines

    def print_summary(self, results: Dict[str, Any]):
        """Print evaluation summary"""
        logger.info("\n" + "="*80)
        logger.info("EVALUATION SUMMARY")
        logger.info("="*80)

        for line in self._summary_lines(results):
            logger.info(line)

        logger.info("="*80)
| |
|
| |
|
def main(argv: Optional[List[str]] = None):
    """Main evaluation script.

    Parses the command line, builds an ``EvaluationConfig`` and runs either
    every benchmark or the single one selected with ``--benchmark``.  A
    single-benchmark run writes its results to
    ``<output_dir>/<benchmark>_results.json``.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Evaluate Helion-OSC model")
    parser.add_argument("--model_name", type=str, default="DeepXR/Helion-OSC")
    parser.add_argument("--device", type=str, default="cuda" if torch.cuda.is_available() else "cpu")
    parser.add_argument("--batch_size", type=int, default=4)
    parser.add_argument("--max_length", type=int, default=2048)
    parser.add_argument("--temperature", type=float, default=0.7)
    parser.add_argument("--top_p", type=float, default=0.95)
    parser.add_argument("--timeout", type=int, default=5)
    parser.add_argument("--output_dir", type=str, default="./evaluation_results")
    parser.add_argument("--benchmark", type=str, choices=["all", "humaneval", "mbpp", "gsm8k"], default="all")

    args = parser.parse_args(argv)

    config = EvaluationConfig(
        model_name=args.model_name,
        device=args.device,
        batch_size=args.batch_size,
        max_length=args.max_length,
        temperature=args.temperature,
        top_p=args.top_p,
        timeout=args.timeout,
        output_dir=args.output_dir
    )

    if args.benchmark == "all":
        # The comprehensive run saves and summarises its own results.
        ComprehensiveEvaluator(config).run_all_evaluations()
        return

    evaluator_classes = {
        "humaneval": HumanEvalEvaluator,
        "mbpp": MBPPEvaluator,
        "gsm8k": GSM8KEvaluator,
    }
    results = evaluator_classes[args.benchmark](config).evaluate()

    # Persist the results so --output_dir is honoured for single-benchmark
    # runs too.  A per-benchmark file name keeps a previous "all" run's
    # evaluation_results.json intact.
    os.makedirs(config.output_dir, exist_ok=True)
    output_file = os.path.join(config.output_dir, f"{args.benchmark}_results.json")
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)
    logger.info(f"Results saved to {output_file}")
| |
|
| |
|
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    main()