"""Main evaluation script for running all checks.""" import asyncio import json import shutil from datetime import datetime from pathlib import Path from instructor.checks.dynamic_checks import DynamicChecker from instructor.checks.llm_checks import LLMChecker from instructor.checks.static_checks import StaticChecker from instructor.database import Database from shared.config import settings from shared.logger import setup_logger logger = setup_logger(__name__) class Evaluator: """Main evaluator for running all checks on submissions.""" def __init__(self) -> None: """Initialize evaluator.""" self.db = Database() self.llm_checker = LLMChecker() self.temp_dir = Path("./temp_evaluations") self.temp_dir.mkdir(parents=True, exist_ok=True) def get_pending_repos(self) -> list[dict]: """Get repos that need evaluation. Returns: List of repo dictionaries """ all_repos = self.db.get_repos() pending = [] for repo in all_repos: repo_dict = repo.to_dict() # Check if already evaluated existing_results = self.db.get_results( email=repo_dict["email"], task=repo_dict["task"] ) # Filter to this round round_results = [ r for r in existing_results if r.to_dict()["round"] == repo_dict["round"] ] if not round_results: pending.append(repo_dict) logger.info(f"Found {len(pending)} pending repos for evaluation") return pending async def evaluate_repo(self, repo: dict) -> None: """Evaluate a single repository. Args: repo: Repository submission data """ logger.info(f"Evaluating {repo['email']}/{repo['task']}, round {repo['round']}") # Get task details session = self.db.get_session() try: task = ( session.query(self.db.Task) .filter_by( email=repo["email"], task=repo["task"], round=repo["round"], ) .first() ) if not task: logger.error(f"No task found for {repo['task']}") return task_dict = task.to_dict() checks_list = task_dict["checks"] finally: session.close() # Clone repo clone_dir = self.temp_dir / f"{repo['task']}_{repo['round']}" static_checker = StaticChecker(repo["repo_url"], repo["commit_sha"], clone_dir) try: static_checker.clone_repo() # Run static checks logger.info("Running static checks...") static_results = await self._run_static_checks( static_checker, task_dict["timestamp"] ) # Run LLM checks logger.info("Running LLM checks...") llm_results = self._run_llm_checks(clone_dir) # Run dynamic checks logger.info("Running dynamic checks...") dynamic_results = await self._run_dynamic_checks(repo["pages_url"], checks_list) # Save all results all_results = static_results + llm_results + dynamic_results for result in all_results: self.db.add_result( { "timestamp": datetime.utcnow(), "email": repo["email"], "task": repo["task"], "round": repo["round"], "repo_url": repo["repo_url"], "commit_sha": repo["commit_sha"], "pages_url": repo["pages_url"], "check": result["check"], "score": result["score"], "reason": result["reason"], "logs": result.get("logs", ""), } ) logger.info( f"Completed evaluation for {repo['email']}/{repo['task']}: " f"{len(all_results)} checks" ) except Exception as e: logger.error(f"Error evaluating {repo['task']}: {e}", exc_info=True) # Save error result self.db.add_result( { "timestamp": datetime.utcnow(), "email": repo["email"], "task": repo["task"], "round": repo["round"], "repo_url": repo["repo_url"], "commit_sha": repo["commit_sha"], "pages_url": repo["pages_url"], "check": "Evaluation", "score": 0.0, "reason": f"Evaluation failed: {e}", "logs": str(e), } ) finally: # Cleanup clone if clone_dir.exists(): shutil.rmtree(clone_dir) async def _run_static_checks( self, checker: StaticChecker, 
        task_timestamp: str | datetime,
    ) -> list[dict]:
        """Run static checks.

        Args:
            checker: Static checker instance
            task_timestamp: When the task was sent

        Returns:
            List of check results
        """
        results = []

        # Parse timestamp
        if isinstance(task_timestamp, str):
            task_time = datetime.fromisoformat(task_timestamp.replace("Z", "+00:00"))
        else:
            task_time = task_timestamp

        # Repo creation time
        result = checker.check_created_after_task(task_time)
        results.append({"check": "Repo created after task", **result})

        # MIT License
        result = checker.check_mit_license()
        results.append({"check": "MIT LICENSE exists", **result})

        # README exists and basic quality
        result = checker.check_readme()
        results.append({"check": "README.md basic quality", **result})

        # No secrets
        result = checker.check_no_secrets()
        results.append({"check": "No secrets in code", **result})

        return results

    def _run_llm_checks(self, code_dir: Path) -> list[dict]:
        """Run LLM-based checks.

        Args:
            code_dir: Directory containing code

        Returns:
            List of check results
        """
        results = []

        # README quality
        readme_path = code_dir / "README.md"
        if readme_path.exists():
            result = self.llm_checker.check_readme_quality(readme_path)
            results.append({"check": "LLM: README.md quality", **result})

        # Code quality
        result = self.llm_checker.check_code_quality(code_dir)
        results.append({"check": "LLM: Code quality", **result})

        return results

    async def _run_dynamic_checks(
        self, pages_url: str, checks: list[str]
    ) -> list[dict]:
        """Run dynamic Playwright checks.

        Args:
            pages_url: GitHub Pages URL
            checks: List of checks to run

        Returns:
            List of check results
        """
        dynamic_checker = DynamicChecker(pages_url, checks)
        results = await dynamic_checker.run_checks()
        return results

    async def run(self) -> None:
        """Run evaluation on all pending repos."""
        logger.info("Starting evaluation process")

        pending_repos = self.get_pending_repos()
        if not pending_repos:
            logger.info("No pending repos to evaluate")
            return

        # Evaluate each repo
        for repo in pending_repos:
            await self.evaluate_repo(repo)

        logger.info("Evaluation process complete")


async def main():
    """Main entry point."""
    evaluator = Evaluator()
    await evaluator.run()


if __name__ == "__main__":
    asyncio.run(main())