"""Student API endpoint for receiving build/update requests."""

import asyncio
import hmac
from datetime import datetime, timezone
from pathlib import Path

from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse

from shared.config import settings
from shared.logger import setup_logger
from shared.models import RepoSubmission, TaskRequest
from student.code_generator import CodeGenerator
from student.github_manager import GitHubManager
from student.notification_client import NotificationClient

logger = setup_logger(__name__)

app = FastAPI(
    title="LLM Code Deployment - Student API",
    description="API endpoint for receiving build and update requests",
    version="1.0.0",
)

# Track ongoing tasks, keyed by "<task>-<round>". Entries are never removed,
# so the dict doubles as a status history for the lifetime of the process.
active_tasks: dict[str, dict] = {}

# Terminal statuses after which the same task/round may be submitted again.
# Without this, a task that failed once could never be retried because its
# entry stays in ``active_tasks`` forever.
_RETRYABLE_STATUSES = frozenset({"failed", "notification_failed"})

# Lazy initialization of services
_code_generator = None
_github_manager = None
_notification_client = None


def get_code_generator() -> CodeGenerator:
    """Lazy initialization of code generator."""
    global _code_generator
    if _code_generator is None:
        _code_generator = CodeGenerator()
    return _code_generator


def get_github_manager() -> GitHubManager:
    """Lazy initialization of GitHub manager."""
    global _github_manager
    if _github_manager is None:
        _github_manager = GitHubManager()
    return _github_manager


def get_notification_client() -> NotificationClient:
    """Lazy initialization of notification client."""
    global _notification_client
    if _notification_client is None:
        _notification_client = NotificationClient()
    return _notification_client


def _utc_timestamp() -> str:
    """Return the current UTC time as a naive ISO-8601 string.

    Produces the same format as the deprecated ``datetime.utcnow().isoformat()``
    (no UTC offset suffix), so stored/returned timestamps keep their shape.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


def _secrets_match(provided: object, expected: object) -> bool:
    """Compare two secrets in constant time.

    Args:
        provided: Secret supplied by the caller (untrusted)
        expected: Secret configured for this service

    Returns:
        True only when both values are strings with identical content
    """
    if not isinstance(provided, str) or not isinstance(expected, str):
        return False
    # Encode first: ``hmac.compare_digest`` rejects non-ASCII ``str`` input.
    return hmac.compare_digest(provided.encode("utf-8"), expected.encode("utf-8"))


@app.post("/api/build")
async def build_app(request: TaskRequest, background_tasks: BackgroundTasks) -> JSONResponse:
    """Receive build request and process in background.

    Args:
        request: Task request with app brief and requirements
        background_tasks: FastAPI background tasks

    Returns:
        HTTP 200 JSON response

    Raises:
        HTTPException: 401 when the secret or the email does not match the
            configured student settings
    """
    logger.info(f"Received build request for task {request.task}, round {request.round}")

    # Verify secret (constant-time comparison; the secret is untrusted input)
    if not _secrets_match(request.secret, settings.student_secret):
        logger.error(f"Invalid secret for task {request.task}")
        raise HTTPException(status_code=401, detail="Invalid secret")

    # Verify email
    if request.email != settings.student_email:
        logger.error(f"Email mismatch: expected {settings.student_email}, got {request.email}")
        raise HTTPException(status_code=401, detail="Email mismatch")

    # Check if task is already in progress. A previous attempt that ended in a
    # retryable failure must not block a new attempt.
    task_key = f"{request.task}-{request.round}"
    previous = active_tasks.get(task_key)
    if previous is not None:
        if previous.get("status") not in _RETRYABLE_STATUSES:
            logger.warning(f"Task {task_key} already in progress")
            return JSONResponse(
                status_code=200,
                content={
                    "status": "in_progress",
                    "message": f"Task {task_key} is already being processed",
                },
            )
        logger.info(f"Retrying task {task_key} after previous status {previous.get('status')}")

    # Mark task as active (replaces any stale entry from a failed attempt)
    active_tasks[task_key] = {
        "status": "accepted",
        "started_at": _utc_timestamp(),
    }

    # Process in background: round 1 is a fresh build, later rounds are updates
    if request.round == 1:
        background_tasks.add_task(process_build_request, request)
    else:
        background_tasks.add_task(process_update_request, request)

    logger.info(f"Accepted task {task_key} for background processing")

    return JSONResponse(
        status_code=200,
        content={
            "status": "accepted",
            "message": f"Task {task_key} accepted for processing",
            "task": request.task,
            "round": request.round,
        },
    )


async def _notify_and_finalize(
    request: TaskRequest,
    task_key: str,
    repo_url: str,
    commit_sha: str,
    pages_url: str,
) -> None:
    """Send the submission to the evaluation endpoint and record the outcome.

    Args:
        request: Task request the deployment belongs to
        task_key: Key of the task entry in ``active_tasks``
        repo_url: URL of the deployed repository
        commit_sha: SHA of the deployed commit
        pages_url: URL of the published pages site
    """
    # Prepare submission
    submission = RepoSubmission(
        email=request.email,
        task=request.task,
        round=request.round,
        nonce=request.nonce,
        repo_url=repo_url,
        commit_sha=commit_sha,
        pages_url=pages_url,
    )

    # Notify evaluation endpoint
    logger.info(f"Notifying evaluation endpoint for {task_key}")
    active_tasks[task_key]["status"] = "notifying"

    success = await get_notification_client().notify_with_timeout(
        request.evaluation_url,
        submission,
        timeout_minutes=settings.task_timeout_minutes,
    )

    if success:
        logger.info(f"Successfully completed task {task_key}")
        active_tasks[task_key]["status"] = "completed"
        active_tasks[task_key]["completed_at"] = _utc_timestamp()
    else:
        logger.error(f"Failed to notify evaluation endpoint for {task_key}")
        active_tasks[task_key]["status"] = "notification_failed"


async def process_build_request(request: TaskRequest) -> None:
    """Process build request in background.

    Generates the app, creates and deploys the repository, then notifies the
    evaluation endpoint. Any exception is logged and recorded on the task
    entry instead of being propagated.

    Args:
        request: Task request
    """
    task_key = f"{request.task}-{request.round}"

    try:
        logger.info(f"Processing build request for {task_key}")

        # Create output directory
        output_dir = settings.generated_repos_dir / request.task
        output_dir.mkdir(parents=True, exist_ok=True)

        # Generate code
        # NOTE(review): generate_app / create_and_deploy are called without
        # ``await``, so they run on the event loop thread — confirm whether
        # they should be moved to a worker thread.
        logger.info(f"Generating code for {task_key}")
        active_tasks[task_key]["status"] = "generating"
        get_code_generator().generate_app(request, output_dir)

        # Create repo and deploy
        logger.info(f"Deploying to GitHub for {task_key}")
        active_tasks[task_key]["status"] = "deploying"
        repo_url, commit_sha, pages_url = get_github_manager().create_and_deploy(
            request.task, output_dir
        )

        await _notify_and_finalize(request, task_key, repo_url, commit_sha, pages_url)

    except Exception as e:
        # Top-level boundary of the background task: record and swallow.
        logger.error(f"Error processing build request {task_key}: {e}", exc_info=True)
        active_tasks[task_key]["status"] = "failed"
        active_tasks[task_key]["error"] = str(e)


async def process_update_request(request: TaskRequest) -> None:
    """Process update request in background.

    Regenerates the app into the task's output directory, pushes it to the
    existing repository (falling back to creating a new one if the update
    fails), then notifies the evaluation endpoint. Any exception is logged
    and recorded on the task entry instead of being propagated.

    Args:
        request: Task request
    """
    task_key = f"{request.task}-{request.round}"

    try:
        logger.info(f"Processing update request for {task_key}")

        # Use existing output directory
        output_dir = settings.generated_repos_dir / request.task

        if not output_dir.exists():
            # If directory doesn't exist, treat as new build
            logger.warning(f"No existing code for {request.task}, creating new")
            output_dir.mkdir(parents=True, exist_ok=True)
        else:
            # Generate updated code
            logger.info(f"Updating code for {task_key}")

        # Status is reported for both paths so the task never appears stuck
        # in "accepted" while code is being generated.
        active_tasks[task_key]["status"] = "generating"
        get_code_generator().generate_app(request, output_dir)

        # Update repo and redeploy
        logger.info(f"Redeploying to GitHub for {task_key}")
        active_tasks[task_key]["status"] = "deploying"

        from shared.utils import sanitize_repo_name

        repo_name = sanitize_repo_name(request.task)

        try:
            repo_url, commit_sha = get_github_manager().update_and_redeploy(repo_name, output_dir)
            pages_url = f"https://{settings.github_username}.github.io/{repo_name}/"
        except Exception as e:
            # Deliberate best-effort fallback: any update failure leads to a
            # fresh repo creation rather than failing the task.
            logger.warning(f"Update failed, creating new repo: {e}")
            repo_url, commit_sha, pages_url = get_github_manager().create_and_deploy(
                request.task, output_dir
            )

        await _notify_and_finalize(request, task_key, repo_url, commit_sha, pages_url)

    except Exception as e:
        # Top-level boundary of the background task: record and swallow.
        logger.error(f"Error processing update request {task_key}: {e}", exc_info=True)
        active_tasks[task_key]["status"] = "failed"
        active_tasks[task_key]["error"] = str(e)


def _key_matches_task(task_key: str, task_id: str) -> bool:
    """Tell whether an ``active_tasks`` key belongs to ``task_id``.

    Keys are built as ``"<task>-<round>"``; the task part is everything before
    the last hyphen. A full key is accepted as well.
    """
    return task_key == task_id or task_key.rpartition("-")[0] == task_id


@app.get("/api/status/{task_id}")
async def get_task_status(task_id: str) -> JSONResponse:
    """Get status of a task.

    Args:
        task_id: Task identifier (or a full "<task>-<round>" key)

    Returns:
        Task status information for every tracked round of the task

    Raises:
        HTTPException: 404 when no tracked task matches
    """
    # Match on the task part of the key rather than a raw string prefix, so
    # that "abc" does not also return the status of an unrelated "abcd-1".
    matching_tasks = {k: v for k, v in active_tasks.items() if _key_matches_task(k, task_id)}

    if not matching_tasks:
        raise HTTPException(status_code=404, detail="Task not found")

    return JSONResponse(content=matching_tasks)


@app.get("/health")
async def health_check() -> JSONResponse:
    """Health check endpoint.

    Returns:
        Health status
    """
    return JSONResponse(
        content={
            "status": "healthy",
            "active_tasks": len(active_tasks),
            "timestamp": _utc_timestamp(),
        }
    )


if __name__ == "__main__":
    import uvicorn

    # Ensure directories exist
    settings.ensure_directories()

    logger.info(f"Starting Student API on port {settings.student_api_port}")
    uvicorn.run(
        "student.api:app",
        host="0.0.0.0",
        port=settings.student_api_port,
        reload=True,
    )