""" FastAPI application for the SQL Migration Environment. Uses create_app() from the OpenEnv framework to auto-generate all standard endpoints (/reset, /step, /state, /ws, /health, /schema). Additionally defines three hackathon-required custom endpoints: /tasks, /grader, /baseline. Usage: uvicorn server.app:app --host 0.0.0.0 --port 7860 """ import os import subprocess import sys from typing import Any, Dict, List, Optional from fastapi import Body # Support both in-repo and standalone imports import server.environment # Ensuring server is treated as a package try: from openenv.core.env_server.http_server import create_app from models import MigrationAction, MigrationObservation from server.environment import DbMigrationEnvironment except ImportError: from openenv.core.env_server.http_server import create_app from ..models import MigrationAction, MigrationObservation from .environment import DbMigrationEnvironment # Get task name from environment variable (default to column-restructure) DEFAULT_TASK = os.getenv("MIGRATION_TASK", "column-restructure") # Factory function for per-session environment creation def create_migration_environment(): """Factory function that creates DbMigrationEnvironment instances.""" return DbMigrationEnvironment(task_name=DEFAULT_TASK) # Create the FastAPI app using OpenEnv's create_app factory # This auto-generates: /reset, /step, /state, /ws, /health, /schema, /docs app = create_app( create_migration_environment, MigrationAction, MigrationObservation, env_name="sql_migration_env", ) # ============================================================================= # Custom Hackathon Endpoints # ============================================================================= from fastapi.responses import HTMLResponse @app.get("/", response_class=HTMLResponse) async def root(): """Root endpoint — returns a premium status page for the HF Space UI.""" return """ SQL Migration Agent | OpenEnv Benchmark

SQL Migration Agent

Production-Grade OpenEnv Benchmark Suite

● Online & Compliant

Core Endpoints

POST /reset Initialize task state
POST /step Execute SQL agent action
GET /state Current episode status
GET /tasks List benchmark tasks
POST /graderRun golden-DB comparison

Benchmark Features

This environment provides high-fidelity SQLite migration tasks designed to pressure-test schema decomposition, type coercion, and data integrity handling in LLMs.

✔ Dynamic Grader

Seed-independent golden-DB logic.

✔ Efficiency Metrics

Tracks Query Ops & Latency.

✔ ERD Viz

Real-time Mermaid diagrams.

✔ Anti-Exploit

PRAGMA & dialect blacklisting.

Assessment Tasks

Easy Column Merge

Merge name fields with apostrophe preservation.

Medium Normalization

Decompose god-table into 3NF schema.

Hard Cascade Sync

Multi-table FK cascade with audit logging.

Extreme Data Poisoning

Quarantine poisoned staging data with strict schema integrity.

View all 8 tasks →

Developer Info

Engine: OpenEnv v1.0
Dialect: SQLite 3.x
Port: 7860


📚 Swagger API Docs
""" @app.get("/tasks") async def list_tasks() -> Dict[str, Any]: """ List all available migration tasks and the action schema. Returns JSON with task definitions and action schema for automated validation. """ # Import seeds to dynamically build task list try: from .. import seeds as _seeds except ImportError: import seeds as _seeds task_list = [] for name, cfg in _seeds.TASKS.items(): task_list.append({ "name": name, "description": cfg["description"], "difficulty": cfg["difficulty"], "max_steps": cfg.get("max_steps", 20), }) return { "tasks": task_list, "action_schema": { "sql_command": "string -- The SQL statement to execute", "reasoning": "string -- Explanation of the action (optional)", "submit_final": "boolean -- Set true when migration is complete (default: false)", }, "example_action": { "sql_command": "CREATE TABLE ...", "reasoning": "Creating the new destination table before copying data.", "submit_final": False } } @app.post("/grader") async def grade_task( body: Dict[str, Any] = Body(default={}), ) -> Dict[str, Any]: """ Grade a task or all tasks. Accepts: {"task_name": "column-restructure"} or {} for all tasks. Returns per-task grader scores after running the environment's internal scorer. """ task_name = body.get("task_name", None) try: from .. import seeds as _seeds except ImportError: import seeds as _seeds tasks_to_grade = [task_name] if task_name else list(_seeds.TASKS.keys()) results = {} for t in tasks_to_grade: try: env = DbMigrationEnvironment(task_name=t) obs = env.reset() results[t] = { "initial_score": obs.migration_progress, "grader_functional": True, "reward_range": [0.01, 0.99], "max_steps": _seeds.TASKS[t].get("max_steps", 20), } env.close() except Exception as e: results[t] = { "initial_score": 0.01, "grader_functional": False, "error": str(e), } return { "grader_version": "1.0", "tasks": results, "status": "graded", } @app.post("/baseline") async def run_baseline( body: Dict[str, Any] = Body(default={}), ) -> Dict[str, Any]: """ Run the baseline inference script and return its output. Triggers inference.py as a subprocess with a 1200-second timeout. Returns captured stdout for evaluation. """ try: result = subprocess.run( [sys.executable, "inference.py"], capture_output=True, text=True, timeout=1200, cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__))), env=os.environ.copy(), # Explicitly inherit env vars (HF Space secrets) ) return { "status": "completed", "stdout": result.stdout, "stderr": result.stderr, "returncode": result.returncode, } except subprocess.TimeoutExpired as e: return { "status": "timeout", "stdout": e.stdout.decode() if e.stdout else "", "stderr": e.stderr.decode() if e.stderr else "", "returncode": -1, } except Exception as e: return { "status": "error", "stdout": "", "stderr": str(e), "returncode": -1, } def main(): """Entry point for direct execution.""" import uvicorn port = int(os.getenv("PORT", "7860")) uvicorn.run(app, host="0.0.0.0", port=port) if __name__ == "__main__": main()