""" Gradio application for the Physics-Informed Bayesian Optimization Platform. Provides an interactive UI for: 1. Defining parameter spaces 2. Specifying physics models (Python code) 3. Uploading initial experimental data 4. Running BO campaigns 5. Visualizing results """ import io import json import traceback from typing import Optional import gradio as gr import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt import numpy as np import pandas as pd import torch from torch import Tensor # --------------------------------------------------------------------------- # Utility: safely compile user-supplied physics model code # --------------------------------------------------------------------------- BUILTIN_PHYSICS = { "Arrhenius Kinetics": { "code": """\ def physics_model(X): \"\"\"Arrhenius kinetics: rate = A * exp(-Ea / (R*T)) * C^n\"\"\" T = X[:, 0] # temperature (K) C = X[:, 1] # concentration A = 1e8 # pre-exponential factor Ea = 50.0 # activation energy (kJ/mol) R = 8.314e-3 # gas constant (kJ/mol·K) n = 0.5 # reaction order return A * torch.exp(-Ea / (R * T)) * C ** n """, "params": "temperature (K): 300-800\nconcentration: 0.1-10", }, "Flory-Huggins Mixing": { "code": """\ def physics_model(X): \"\"\"Flory-Huggins free energy of mixing for binary polymer blend.\"\"\" phi = X[:, 0] # volume fraction (0-1) chi = X[:, 1] # Flory-Huggins parameter N = 100.0 # degree of polymerisation entropy = phi * torch.log(phi + 1e-8) / N + (1 - phi) * torch.log(1 - phi + 1e-8) / N enthalpy = chi * phi * (1 - phi) return -(entropy + enthalpy) # negative ΔG_mix (higher = better mixing) """, "params": "volume_fraction: 0.05-0.95\nchi_parameter: 0.0-2.0", }, "Polymer Recyclability": { "code": """\ def physics_model(X): \"\"\"Simplified recyclability metric for polymer formulation.\"\"\" ratio = X[:, 0] # monomer ratio temp = X[:, 1] # temperature (K) catalyst = X[:, 2] # catalyst loading (wt%) mixing = -ratio * torch.log(ratio + 1e-8) - (1 - ratio) * torch.log(1 - ratio + 1e-8) chi = 0.5 - 0.3 * (ratio - 0.5) ** 2 mixing_fe = mixing - chi * ratio * (1 - ratio) rate = torch.exp(-50.0 / (8.314e-3 * temp)) cat_eff = 1 - torch.exp(-0.8 * catalyst) return 5.0 * mixing_fe * rate * cat_eff + 2.0 """, "params": "monomer_ratio: 0.1-0.9\ntemperature (K): 350-500\ncatalyst_loading (wt%): 0.5-5.0", }, "Custom (enter code below)": {"code": "", "params": ""}, } DEMO_CSV = """\ temperature,concentration,yield 350,1.0,0.12 400,3.0,0.45 450,5.0,0.78 500,2.0,0.55 480,7.0,0.91 """ # --------------------------------------------------------------------------- # Compile physics model from code string # --------------------------------------------------------------------------- def _compile_physics_fn(code: str): """Safely compile user-provided physics model code. The code must define a function called `physics_model(X)`. """ allowed_globals = {"torch": torch, "np": np, "Tensor": Tensor, "__builtins__": {}} # Add safe builtins import builtins safe_builtins = { k: getattr(builtins, k) for k in ("range", "len", "float", "int", "abs", "max", "min", "print", "list", "tuple", "dict", "True", "False", "None") } allowed_globals["__builtins__"] = safe_builtins local_ns = {} exec(code, allowed_globals, local_ns) # noqa: S102 if "physics_model" not in local_ns: raise ValueError("Code must define a function called `physics_model(X)`.") return local_ns["physics_model"] # --------------------------------------------------------------------------- # Parse parameter space from multiline text # --------------------------------------------------------------------------- def _parse_params(text: str): """Parse parameter definitions from multiline text. Format per line: name: lower-upper Example: temperature (K): 300-800 """ from physics_informed_bo.experiment.parameter_space import ParameterSpace space = ParameterSpace() names = [] for line in text.strip().splitlines(): line = line.strip() if not line: continue name_part, bounds_part = line.rsplit(":", 1) name = name_part.strip() lo, hi = bounds_part.strip().split("-") space.add_continuous(name, float(lo), float(hi)) names.append(name) return space, names # --------------------------------------------------------------------------- # Core optimisation routine # --------------------------------------------------------------------------- def run_optimization( physics_template: str, physics_code: str, param_text: str, csv_file, csv_text: str, objective_col: str, acq_fn: str, n_initial: int, n_iterations: int, batch_size: int, noise_var: float, maximize: bool, seed: int, ): """Run the full physics-informed BO campaign and return results.""" try: torch.manual_seed(seed) # ── 1. Physics model ────────────────────────────────────────────── code = physics_code.strip() if physics_template != "Custom (enter code below)" and not code: code = BUILTIN_PHYSICS[physics_template]["code"] physics_fn = _compile_physics_fn(code) if code else None # ── 2. Parameter space ──────────────────────────────────────────── if not param_text.strip(): if physics_template != "Custom (enter code below)": param_text = BUILTIN_PHYSICS[physics_template]["params"] space, param_names = _parse_params(param_text) # ── 3. Initial data ────────────────────────────────────────────── X_init, y_init = None, None df_init = None if csv_file is not None: df_init = pd.read_csv(csv_file.name) elif csv_text.strip(): df_init = pd.read_csv(io.StringIO(csv_text.strip())) if df_init is not None: obj = objective_col.strip() or df_init.columns[-1] feature_cols = [c for c in df_init.columns if c != obj] # Match feature columns to param names if set(feature_cols) != set(param_names): # Try to align by order feature_cols = [c for c in df_init.columns if c != obj][:len(param_names)] X_init = torch.tensor(df_init[feature_cols].values, dtype=torch.float64) y_init = torch.tensor(df_init[obj].values, dtype=torch.float64).unsqueeze(-1) # ── 4. Configuration ───────────────────────────────────────────── from physics_informed_bo.config import OptimizationConfig, AcquisitionType acq_map = { "Expected Improvement (EI)": AcquisitionType.EXPECTED_IMPROVEMENT, "Upper Confidence Bound (UCB)": AcquisitionType.UPPER_CONFIDENCE_BOUND, "Probability of Improvement (PI)": AcquisitionType.PROBABILITY_OF_IMPROVEMENT, "Physics-Informed EI": AcquisitionType.PHYSICS_INFORMED_EI, } config = OptimizationConfig( acquisition_type=acq_map.get(acq_fn, AcquisitionType.EXPECTED_IMPROVEMENT), n_initial_samples=n_initial, max_iterations=n_iterations, batch_size=batch_size, noise_variance=noise_var, seed=seed, ) # ── 5. Build campaign ──────────────────────────────────────────── from physics_informed_bo.experiment.campaign import OptimizationCampaign initial_data = (X_init, y_init) if X_init is not None else None campaign = OptimizationCampaign( name="hf_space_campaign", parameter_space=space, physics_fn=physics_fn, initial_data=initial_data, config=config, maximize=maximize, ) # ── 6. Synthetic objective (demo) ───────────────────────────────── # When there is a physics model we simulate experiments as # physics + discrepancy + noise so the user sees the BO loop in action. def synthetic_objective(params: dict) -> float: vals = [params[n] for n in param_names] X = torch.tensor([vals], dtype=torch.float64) if physics_fn is not None: base = physics_fn(X).item() else: base = 0.0 discrepancy = 0.15 * np.sin(3.0 * sum(vals)) noise = noise_var**0.5 * np.random.randn() return base + discrepancy + noise # ── 7. Run BO loop ──────────────────────────────────────────────── log_lines = [] best_vals = [] for it in range(n_iterations): suggestions = campaign.suggest_next(batch_size) for params in suggestions: obj_val = synthetic_objective(params) campaign.report_result(params, obj_val) best = campaign.get_best() if maximize else campaign.get_best() best_vals.append(best["objective"]) log_lines.append( f"Iter {it + 1:3d} | suggested {len(suggestions)} exp(s) | " f"best so far = {best['objective']:.4f}" ) # ── 8. Results ──────────────────────────────────────────────────── results_df = campaign.to_dataframe() best = campaign.get_best() summary = campaign.summary() # ── Convergence plot ────────────────────────────────────────────── fig_conv, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4.5)) objs = results_df["objective"].values ax1.plot(objs, "o-", markersize=3, alpha=0.7) ax1.set_xlabel("Experiment #") ax1.set_ylabel("Objective") ax1.set_title("All Observations") ax1.grid(True, alpha=0.3) if maximize: bsf = np.maximum.accumulate(objs) else: bsf = np.minimum.accumulate(objs) ax2.plot(bsf, "s-", color="green", markersize=3) ax2.set_xlabel("Experiment #") ax2.set_ylabel("Best Objective") ax2.set_title("Convergence (Best So Far)") ax2.grid(True, alpha=0.3) fig_conv.tight_layout() # ── Parameter exploration heatmap ──────────────────────────────── fig_params = None if len(param_names) >= 2: fig_params, ax = plt.subplots(figsize=(7, 5)) sc = ax.scatter( results_df[param_names[0]], results_df[param_names[1]], c=results_df["objective"], cmap="viridis", s=30, edgecolors="k", linewidths=0.5, ) plt.colorbar(sc, ax=ax, label="Objective") ax.set_xlabel(param_names[0]) ax.set_ylabel(param_names[1]) ax.set_title("Parameter Exploration") fig_params.tight_layout() # ── Surrogate 1-D slice ────────────────────────────────────────── fig_surrogate = None if physics_fn is not None and campaign._designer._surrogate is not None: try: surrogate = campaign._designer._surrogate bounds = space.bounds n_grid = 150 # Slice through first parameter, others at midpoint mid = (bounds[0] + bounds[1]) / 2 x_range = torch.linspace(float(bounds[0, 0]), float(bounds[1, 0]), n_grid, dtype=torch.float64) X_grid = mid.unsqueeze(0).repeat(n_grid, 1) X_grid[:, 0] = x_range mean, var = surrogate.predict(X_grid) std = var.sqrt() fig_surrogate, ax = plt.subplots(figsize=(8, 5)) x_np = x_range.numpy() m_np = mean.squeeze().detach().numpy() s_np = std.squeeze().detach().numpy() ax.plot(x_np, m_np, "b-", lw=2, label="Surrogate mean") ax.fill_between(x_np, m_np - 2 * s_np, m_np + 2 * s_np, alpha=0.2, color="blue", label="95% CI") # Physics model line phys_np = physics_fn(X_grid).detach().numpy() ax.plot(x_np, phys_np, "r--", lw=1.5, label="Physics model") # Observed data projected onto this slice if X_init is not None: ax.scatter(X_init[:, 0].numpy(), y_init.squeeze().numpy(), c="red", s=40, zorder=5, edgecolors="k", label="Initial data") ax.set_xlabel(param_names[0]) ax.set_ylabel("Objective") ax.set_title(f"Surrogate vs Physics (slice along {param_names[0]})") ax.legend() ax.grid(True, alpha=0.3) fig_surrogate.tight_layout() except Exception: fig_surrogate = None # ── Format outputs ──────────────────────────────────────────────── log_text = "\n".join(log_lines) best_text = ( f"**Best objective: {best['objective']:.4f}**\n\n" f"Parameters:\n" + "\n".join(f" - **{k}**: {v:.4f}" for k, v in best["parameters"].items()) ) summary_text = json.dumps(summary, indent=2, default=str) return ( log_text, best_text, fig_conv, fig_params, fig_surrogate, results_df.round(4).to_string(index=False), summary_text, ) except Exception as exc: tb = traceback.format_exc() err = f"**Error:** {exc}\n\n```\n{tb}\n```" return err, err, None, None, None, "", "" # --------------------------------------------------------------------------- # Gradio interface # --------------------------------------------------------------------------- def on_template_change(template_name): """Populate code and params when a built-in template is selected.""" info = BUILTIN_PHYSICS.get(template_name, {"code": "", "params": ""}) return info["code"], info["params"] def build_app() -> gr.Blocks: with gr.Blocks( title="Physics-Informed Bayesian Optimization", theme=gr.themes.Soft(), ) as app: gr.Markdown( """ # ⚗️ Physics-Informed Bayesian Optimization Platform Design experiments efficiently by combining **physics models** with **Gaussian Process surrogates**. The physics model acts as a structured prior (GP mean function), and the GP learns the residual — dramatically reducing the number of experiments needed. **Backends:** BoTorch · GPyTorch · AX · BoFire """ ) with gr.Tabs(): # ── Tab 1: Setup ────────────────────────────────────────────── with gr.TabItem("1 · Setup"): with gr.Row(): with gr.Column(scale=1): gr.Markdown("### Physics Model") physics_template = gr.Dropdown( choices=list(BUILTIN_PHYSICS.keys()), value="Arrhenius Kinetics", label="Built-in template", ) physics_code = gr.Code( value=BUILTIN_PHYSICS["Arrhenius Kinetics"]["code"], language="python", label="Physics model code (must define `physics_model(X)`)", lines=14, ) with gr.Column(scale=1): gr.Markdown("### Parameter Space") param_text = gr.Textbox( value=BUILTIN_PHYSICS["Arrhenius Kinetics"]["params"], label="Parameters (name: lower-upper, one per line)", lines=6, ) gr.Markdown("### Initial Data (optional)") csv_file = gr.File(label="Upload CSV", file_types=[".csv"]) csv_text = gr.Textbox( value="", label="… or paste CSV text", lines=5, placeholder=DEMO_CSV, ) objective_col = gr.Textbox( value="", label="Objective column name (leave blank → last column)", ) physics_template.change( on_template_change, inputs=[physics_template], outputs=[physics_code, param_text], ) # ── Tab 2: Configure ────────────────────────────────────────── with gr.TabItem("2 · Configure"): with gr.Row(): acq_fn = gr.Dropdown( choices=[ "Expected Improvement (EI)", "Upper Confidence Bound (UCB)", "Probability of Improvement (PI)", "Physics-Informed EI", ], value="Expected Improvement (EI)", label="Acquisition Function", ) maximize = gr.Checkbox(value=True, label="Maximize objective") with gr.Row(): n_initial = gr.Slider(3, 30, value=5, step=1, label="Initial samples (if no CSV)") n_iterations = gr.Slider(5, 100, value=20, step=1, label="BO iterations") batch_size = gr.Slider(1, 5, value=1, step=1, label="Batch size") with gr.Row(): noise_var = gr.Slider(0.001, 1.0, value=0.01, step=0.001, label="Noise variance") seed = gr.Number(value=42, label="Random seed", precision=0) # ── Tab 3: Run & Results ────────────────────────────────────── with gr.TabItem("3 · Run & Results"): run_btn = gr.Button("🚀 Run Optimization", variant="primary", size="lg") with gr.Row(): best_md = gr.Markdown(label="Best Result") with gr.Row(): convergence_plot = gr.Plot(label="Convergence") params_plot = gr.Plot(label="Parameter Exploration") with gr.Row(): surrogate_plot = gr.Plot(label="Surrogate vs Physics") with gr.Accordion("Optimization log", open=False): log_box = gr.Textbox(label="Log", lines=15, interactive=False) with gr.Accordion("Full results table", open=False): results_box = gr.Textbox(label="Results", lines=12, interactive=False) with gr.Accordion("Campaign summary (JSON)", open=False): summary_box = gr.Textbox(label="Summary", lines=10, interactive=False) run_btn.click( run_optimization, inputs=[ physics_template, physics_code, param_text, csv_file, csv_text, objective_col, acq_fn, n_initial, n_iterations, batch_size, noise_var, maximize, seed, ], outputs=[ log_box, best_md, convergence_plot, params_plot, surrogate_plot, results_box, summary_box, ], ) # ── Tab 4: About ────────────────────────────────────────────── with gr.TabItem("About"): gr.Markdown( """ ## How it works Traditional Bayesian optimisation uses a GP with a flat (constant) mean. This platform **replaces the mean with a physics model**: $$f(x) = \\phi(x) + \\varepsilon(x)$$ where $\\phi(x)$ is the physics model and $\\varepsilon(x) \\sim \\mathcal{GP}(0,\\, k(x,x'))$ captures the residual (model discrepancy + noise). ### Benefits - **Sample efficiency** — physics captures the trend; the GP only learns small deviations. - **Extrapolation** — physics provides reasonable predictions outside observed data. - **Constraint awareness** — physical constraints steer the search toward feasible regions. - **Graceful degradation** — works physics-only (no data), hybrid, or pure GP. ### Surrogate mode selection | Data | Physics model | Mode | |------|--------------|------| | None | ✓ | `physics_only` | | < 20 | ✓ | `physics_as_mean` | | 20-50 | ✓ | `weighted_ensemble` | | Any | ✗ | `gp_only` | ### Stack **PyTorch** · **GPyTorch** · **BoTorch** · AX Platform · BoFire --- *Built by Plinity — infinite recyclable polymers* """ ) return app # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- app = build_app() if __name__ == "__main__": app.launch()