"""Main evaluation script for running all checks."""
import asyncio
import json
import shutil
from datetime import datetime
from pathlib import Path
from instructor.checks.dynamic_checks import DynamicChecker
from instructor.checks.llm_checks import LLMChecker
from instructor.checks.static_checks import StaticChecker
from instructor.database import Database
from shared.config import settings
from shared.logger import setup_logger
logger = setup_logger(__name__)
class Evaluator:
"""Main evaluator for running all checks on submissions."""
def __init__(self) -> None:
"""Initialize evaluator."""
self.db = Database()
self.llm_checker = LLMChecker()
self.temp_dir = Path("./temp_evaluations")
self.temp_dir.mkdir(parents=True, exist_ok=True)
def get_pending_repos(self) -> list[dict]:
"""Get repos that need evaluation.
Returns:
List of repo dictionaries
"""
all_repos = self.db.get_repos()
pending = []
for repo in all_repos:
repo_dict = repo.to_dict()
# Check if already evaluated
existing_results = self.db.get_results(
email=repo_dict["email"], task=repo_dict["task"]
)
# Filter to this round
round_results = [
r for r in existing_results if r.to_dict()["round"] == repo_dict["round"]
]
if not round_results:
pending.append(repo_dict)
logger.info(f"Found {len(pending)} pending repos for evaluation")
return pending
async def evaluate_repo(self, repo: dict) -> None:
"""Evaluate a single repository.
Args:
repo: Repository submission data
"""
logger.info(f"Evaluating {repo['email']}/{repo['task']}, round {repo['round']}")
# Get task details
session = self.db.get_session()
try:
task = (
session.query(self.db.Task)
.filter_by(
email=repo["email"],
task=repo["task"],
round=repo["round"],
)
.first()
)
if not task:
logger.error(f"No task found for {repo['task']}")
return
task_dict = task.to_dict()
checks_list = task_dict["checks"]
finally:
session.close()
# Clone repo
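        # The clone path is keyed by task and round only; that is safe here
        # because run() evaluates submissions one at a time.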
clone_dir = self.temp_dir / f"{repo['task']}_{repo['round']}"
static_checker = StaticChecker(repo["repo_url"], repo["commit_sha"], clone_dir)
try:
static_checker.clone_repo()
# Run static checks
logger.info("Running static checks...")
static_results = await self._run_static_checks(
static_checker, task_dict["timestamp"]
)
# Run LLM checks
logger.info("Running LLM checks...")
llm_results = self._run_llm_checks(clone_dir)
# Run dynamic checks
logger.info("Running dynamic checks...")
dynamic_results = await self._run_dynamic_checks(repo["pages_url"], checks_list)
# Save all results
all_results = static_results + llm_results + dynamic_results
for result in all_results:
self.db.add_result(
{
"timestamp": datetime.utcnow(),
"email": repo["email"],
"task": repo["task"],
"round": repo["round"],
"repo_url": repo["repo_url"],
"commit_sha": repo["commit_sha"],
"pages_url": repo["pages_url"],
"check": result["check"],
"score": result["score"],
"reason": result["reason"],
"logs": result.get("logs", ""),
}
)
logger.info(
f"Completed evaluation for {repo['email']}/{repo['task']}: "
f"{len(all_results)} checks"
)
except Exception as e:
logger.error(f"Error evaluating {repo['task']}: {e}", exc_info=True)
# Save error result
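            # A single zero-score "Evaluation" row marks this round as evaluated,
            # so get_pending_repos() will not pick the submission up again.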
self.db.add_result(
{
"timestamp": datetime.utcnow(),
"email": repo["email"],
"task": repo["task"],
"round": repo["round"],
"repo_url": repo["repo_url"],
"commit_sha": repo["commit_sha"],
"pages_url": repo["pages_url"],
"check": "Evaluation",
"score": 0.0,
"reason": f"Evaluation failed: {e}",
"logs": str(e),
}
)
finally:
# Cleanup clone
if clone_dir.exists():
shutil.rmtree(clone_dir)
async def _run_static_checks(
self, checker: StaticChecker, task_timestamp: str
) -> list[dict]:
"""Run static checks.
Args:
checker: Static checker instance
task_timestamp: When task was sent
Returns:
List of check results
"""
results = []
# Parse timestamp
if isinstance(task_timestamp, str):
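            # datetime.fromisoformat() rejects a trailing "Z" on Python < 3.11,
            # so rewrite it as an explicit UTC offset first.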
task_time = datetime.fromisoformat(task_timestamp.replace("Z", "+00:00"))
else:
task_time = task_timestamp
# Repo creation time
result = checker.check_created_after_task(task_time)
results.append({"check": "Repo created after task", **result})
# MIT License
result = checker.check_mit_license()
results.append({"check": "MIT LICENSE exists", **result})
# README exists and basic quality
result = checker.check_readme()
results.append({"check": "README.md basic quality", **result})
# No secrets
result = checker.check_no_secrets()
results.append({"check": "No secrets in code", **result})
return results
def _run_llm_checks(self, code_dir: Path) -> list[dict]:
"""Run LLM-based checks.
Args:
code_dir: Directory containing code
Returns:
List of check results
"""
results = []
# README quality
readme_path = code_dir / "README.md"
if readme_path.exists():
result = self.llm_checker.check_readme_quality(readme_path)
results.append({"check": "LLM: README.md quality", **result})
# Code quality
result = self.llm_checker.check_code_quality(code_dir)
results.append({"check": "LLM: Code quality", **result})
return results
async def _run_dynamic_checks(
self, pages_url: str, checks: list[str]
) -> list[dict]:
"""Run dynamic Playwright checks.
Args:
pages_url: GitHub Pages URL
checks: List of checks to run
Returns:
List of check results
"""
dynamic_checker = DynamicChecker(pages_url, checks)
results = await dynamic_checker.run_checks()
return results
async def run(self) -> None:
"""Run evaluation on all pending repos."""
logger.info("Starting evaluation process")
pending_repos = self.get_pending_repos()
if not pending_repos:
logger.info("No pending repos to evaluate")
return
# Evaluate each repo
for repo in pending_repos:
await self.evaluate_repo(repo)
logger.info("Evaluation process complete")
async def main() -> None:
"""Main entry point."""
evaluator = Evaluator()
await evaluator.run()
if __name__ == "__main__":
asyncio.run(main())
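# Typical invocation (assuming the repository root is on PYTHONPATH and this
# module is importable as instructor.evaluate):
#     python -m instructor.evaluate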