"""Providers for launching ASGI applications via ``uv run``.""" from __future__ import annotations import os import socket import subprocess import time from typing import Dict, Optional import requests from .providers import RuntimeProvider def _check_uv_installed() -> None: try: subprocess.check_output(["uv", "--version"]) except FileNotFoundError as exc: raise RuntimeError( "`uv` executable not found. Install uv from https://docs.astral.sh and ensure it is on PATH." ) from exc def _find_free_port() -> int: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.bind(("", 0)) sock.listen(1) return sock.getsockname()[1] def _create_uv_command( *, host: str, port: int, reload: bool, workers: int, app: str, project_path: str, ) -> list[str]: command: list[str] = ["uv", "run", "--isolated", "--project", project_path] command.append("--") command.extend( [ "uvicorn", app, "--host", host, "--port", str(port), "--workers", str(workers), ] ) if reload: command.append("--reload") return command def _poll_health(health_url: str, timeout_s: float) -> None: """Poll a health endpoint until it returns HTTP 200 or times out.""" deadline = time.time() + timeout_s while time.time() < deadline: try: timeout = max(0.0001, min(deadline - time.time(), 2.0)) response = requests.get(health_url, timeout=timeout) if response.status_code == 200: return except requests.RequestException: continue time.sleep(0.5) raise TimeoutError(f"Server did not become ready within {timeout_s:.1f} seconds") class UVProvider(RuntimeProvider): """ RuntimeProvider implementation backed by ``uv run``. Args: project_path: Local path to a uv project (passed to ``uv run --project``) app: ASGI application path for uvicorn (defaults to ``server.app:app``) host: Host interface to bind to (defaults to ``0.0.0.0``) reload: Whether to enable uvicorn's reload mode env_vars: Environment variables to pass through to the spawned process context_timeout_s: How long to wait for the environment to become ready Example: >>> provider = UVProvider(project_path="/path/to/env") >>> base_url = provider.start() >>> print(base_url) # http://localhost:8000 >>> # Use the environment via base_url >>> provider.stop() """ def __init__( self, *, project_path: str, app: str = "server.app:app", host: str = "0.0.0.0", reload: bool = False, env_vars: Optional[Dict[str, str]] = None, context_timeout_s: float = 60.0, ): """Initialize the UVProvider.""" self.project_path = os.path.abspath(project_path) self.app = app self.host = host self.reload = reload self.env_vars = env_vars self.context_timeout_s = context_timeout_s _check_uv_installed() self._process = None self._base_url = None def start( self, port: Optional[int] = None, env_vars: Optional[Dict[str, str]] = None, workers: int = 1, **_: Dict[str, str], ) -> str: """ Start the environment via `uv run`. Args: port: The port to bind the environment to env_vars: Environment variables to pass to the environment workers: The number of workers to use Returns: The base URL of the environment Raises: RuntimeError: If the environment is already running """ if self._process is not None and self._process.poll() is None: raise RuntimeError("UVProvider is already running") bind_port = port or _find_free_port() command = _create_uv_command( host=self.host, port=bind_port, reload=self.reload, workers=workers, app=self.app, project_path=self.project_path, ) env = os.environ.copy() if self.env_vars: env.update(self.env_vars) if env_vars: env.update(env_vars) try: self._process = subprocess.Popen(command, env=env) except OSError as exc: raise RuntimeError(f"Failed to launch `uv run`: {exc}") from exc client_host = "127.0.0.1" if self.host in {"0.0.0.0", "::"} else self.host self._base_url = f"http://{client_host}:{bind_port}" return self._base_url def wait_for_ready(self, timeout_s: float = 60.0) -> None: """ Wait for the environment to become ready. Args: timeout_s: The timeout to wait for the environment to become ready Raises: RuntimeError: If the environment is not running TimeoutError: If the environment does not become ready within the timeout """ if self._process and self._process.poll() is not None: code = self._process.returncode raise RuntimeError(f"uv process exited prematurely with code {code}") _poll_health(f"{self._base_url}/health", timeout_s=timeout_s) def stop(self) -> None: """ Stop the environment. Raises: RuntimeError: If the environment is not running """ if self._process is None: return if self._process.poll() is None: self._process.terminate() try: self._process.wait(timeout=10.0) except subprocess.TimeoutExpired: self._process.kill() self._process.wait(timeout=5.0) self._process = None self._base_url = None @property def base_url(self) -> str: """ The base URL of the environment. Returns: The base URL of the environment Raises: RuntimeError: If the environment is not running """ if self._base_url is None: raise RuntimeError("UVProvider has not been started") return self._base_url