| """Main evaluation script for running all checks.""" | |
| import asyncio | |
| import json | |
| import shutil | |
| from datetime import datetime | |
| from pathlib import Path | |
| from instructor.checks.dynamic_checks import DynamicChecker | |
| from instructor.checks.llm_checks import LLMChecker | |
| from instructor.checks.static_checks import StaticChecker | |
| from instructor.database import Database | |
| from shared.config import settings | |
| from shared.logger import setup_logger | |
| logger = setup_logger(__name__) | |
| class Evaluator: | |
| """Main evaluator for running all checks on submissions.""" | |
| def __init__(self) -> None: | |
| """Initialize evaluator.""" | |
| self.db = Database() | |
| self.llm_checker = LLMChecker() | |
| self.temp_dir = Path("./temp_evaluations") | |
| self.temp_dir.mkdir(parents=True, exist_ok=True) | |
    def get_pending_repos(self) -> list[dict]:
        """Get repos that need evaluation.

        Returns:
            List of repo dictionaries
        """
        all_repos = self.db.get_repos()
        pending = []
        for repo in all_repos:
            repo_dict = repo.to_dict()
            # Check if already evaluated
            existing_results = self.db.get_results(
                email=repo_dict["email"], task=repo_dict["task"]
            )
            # Filter to this round
            round_results = [
                r for r in existing_results if r.to_dict()["round"] == repo_dict["round"]
            ]
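            # A repo counts as pending only when no result row exists for its
            # current round, so already-evaluated submissions are skipped on
            # subsequent runs.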
            if not round_results:
                pending.append(repo_dict)
        logger.info(f"Found {len(pending)} pending repos for evaluation")
        return pending

    async def evaluate_repo(self, repo: dict) -> None:
        """Evaluate a single repository.

        Args:
            repo: Repository submission data
        """
        logger.info(f"Evaluating {repo['email']}/{repo['task']}, round {repo['round']}")

        # Get task details
        session = self.db.get_session()
        try:
            task = (
                session.query(self.db.Task)
                .filter_by(
                    email=repo["email"],
                    task=repo["task"],
                    round=repo["round"],
                )
                .first()
            )
            if not task:
                logger.error(f"No task found for {repo['task']}")
                return
            task_dict = task.to_dict()
            checks_list = task_dict["checks"]
        finally:
            session.close()

        # Clone repo
        clone_dir = self.temp_dir / f"{repo['task']}_{repo['round']}"
        static_checker = StaticChecker(repo["repo_url"], repo["commit_sha"], clone_dir)
        try:
            static_checker.clone_repo()

            # Run static checks
            logger.info("Running static checks...")
            static_results = await self._run_static_checks(
                static_checker, task_dict["timestamp"]
            )

            # Run LLM checks
            logger.info("Running LLM checks...")
            llm_results = self._run_llm_checks(clone_dir)

            # Run dynamic checks
            logger.info("Running dynamic checks...")
            dynamic_results = await self._run_dynamic_checks(repo["pages_url"], checks_list)

            # Save all results
            all_results = static_results + llm_results + dynamic_results
            for result in all_results:
                self.db.add_result(
                    {
                        "timestamp": datetime.utcnow(),
                        "email": repo["email"],
                        "task": repo["task"],
                        "round": repo["round"],
                        "repo_url": repo["repo_url"],
                        "commit_sha": repo["commit_sha"],
                        "pages_url": repo["pages_url"],
                        "check": result["check"],
                        "score": result["score"],
                        "reason": result["reason"],
                        "logs": result.get("logs", ""),
                    }
                )
            logger.info(
                f"Completed evaluation for {repo['email']}/{repo['task']}: "
                f"{len(all_results)} checks"
            )
        except Exception as e:
            logger.error(f"Error evaluating {repo['task']}: {e}", exc_info=True)
            # Save error result
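            # Storing a zero-score "Evaluation" row for this round means
            # get_pending_repos() will not re-queue the submission; the failure
            # reason and exception text are kept for later review.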
            self.db.add_result(
                {
                    "timestamp": datetime.utcnow(),
                    "email": repo["email"],
                    "task": repo["task"],
                    "round": repo["round"],
                    "repo_url": repo["repo_url"],
                    "commit_sha": repo["commit_sha"],
                    "pages_url": repo["pages_url"],
                    "check": "Evaluation",
                    "score": 0.0,
                    "reason": f"Evaluation failed: {e}",
                    "logs": str(e),
                }
            )
        finally:
            # Cleanup clone
            if clone_dir.exists():
                shutil.rmtree(clone_dir)

    async def _run_static_checks(
        self, checker: StaticChecker, task_timestamp: str | datetime
    ) -> list[dict]:
        """Run static checks.

        Args:
            checker: Static checker instance
            task_timestamp: When the task was sent (ISO string or datetime)

        Returns:
            List of check results
        """
        results = []

        # Parse timestamp
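        # fromisoformat() on Python versions before 3.11 rejects a trailing
        # "Z", so it is normalized to an explicit "+00:00" offset first.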
        if isinstance(task_timestamp, str):
            task_time = datetime.fromisoformat(task_timestamp.replace("Z", "+00:00"))
        else:
            task_time = task_timestamp

        # Repo creation time
        result = checker.check_created_after_task(task_time)
        results.append({"check": "Repo created after task", **result})

        # MIT License
        result = checker.check_mit_license()
        results.append({"check": "MIT LICENSE exists", **result})

        # README exists and basic quality
        result = checker.check_readme()
        results.append({"check": "README.md basic quality", **result})

        # No secrets
        result = checker.check_no_secrets()
        results.append({"check": "No secrets in code", **result})

        return results

    def _run_llm_checks(self, code_dir: Path) -> list[dict]:
        """Run LLM-based checks.

        Args:
            code_dir: Directory containing code

        Returns:
            List of check results
        """
        results = []

        # README quality
        readme_path = code_dir / "README.md"
        if readme_path.exists():
            result = self.llm_checker.check_readme_quality(readme_path)
            results.append({"check": "LLM: README.md quality", **result})

        # Code quality
        result = self.llm_checker.check_code_quality(code_dir)
        results.append({"check": "LLM: Code quality", **result})

        return results

    async def _run_dynamic_checks(
        self, pages_url: str, checks: list[str]
    ) -> list[dict]:
        """Run dynamic Playwright checks.

        Args:
            pages_url: GitHub Pages URL
            checks: List of checks to run

        Returns:
            List of check results
        """
        dynamic_checker = DynamicChecker(pages_url, checks)
        results = await dynamic_checker.run_checks()
        return results

    async def run(self) -> None:
        """Run evaluation on all pending repos."""
        logger.info("Starting evaluation process")
        pending_repos = self.get_pending_repos()
        if not pending_repos:
            logger.info("No pending repos to evaluate")
            return

        # Evaluate each repo
        for repo in pending_repos:
            await self.evaluate_repo(repo)

        logger.info("Evaluation process complete")

async def main() -> None:
    """Main entry point."""
    evaluator = Evaluator()
    await evaluator.run()


if __name__ == "__main__":
    asyncio.run(main())