live / server.py
github-actions[bot]
deploy: sync from GitHub 2026-05-04T13:40:43Z
817f4f7
"""
server.py
Runs a simulation between AI datacenter workloads and an electrical grid (IEEE 13-bus OpenDSS model).
Uses GPU power traces and workloads to model howAI inference/training affects grid voltage and stability over time.
"""
from __future__ import annotations
from fractions import Fraction
from pathlib import Path
import subprocess, tempfile, os, uvicorn, threading, math, json, hashlib
import pandas as pd
from fastapi import FastAPI, HTTPException, Response
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
from openg2g.coordinator import Coordinator
from openg2g.datacenter.config import (
DatacenterConfig, InferenceModelSpec,
PowerAugmentationConfig, InferenceRamp, TrainingRun,
)
from openg2g.datacenter.offline import OfflineDatacenter, OfflineWorkload
from openg2g.datacenter.workloads.inference import InferenceData, MLEnergySource
from openg2g.datacenter.workloads.training import TrainingTrace, TrainingTraceParams
from openg2g.grid.opendss import OpenDSSGrid
from openg2g.grid.config import TapPosition
from openg2g.controller.tap_schedule import TapScheduleController
from openg2g.metrics.voltage import compute_allbus_voltage_stats
import asyncio, uuid, time
from concurrent.futures import ProcessPoolExecutor
import sqlite3, json
conn = sqlite3.connect("jobs.db", check_same_thread=False, timeout=30)
conn.execute("PRAGMA journal_mode=WAL;")
# create table to track background simulation jobs
conn.execute("""
CREATE TABLE IF NOT EXISTS jobs (
id TEXT PRIMARY KEY,
status TEXT,
result TEXT,
error TEXT
)
""")
conn.commit()
#currently set to 2 for free tier at hf
_pool = ProcessPoolExecutor(max_workers=2)
_jobs: dict = {}
_start_time = time.time()
DSS_DIR = Path(__file__).parent / "examples/ieee13"
DSS_MASTER = "IEEE13Nodeckt.dss"
CONFIG_PATH = Path(__file__).parent / "examples/offline/config.json"
# Maps IEEE 13-bus indices to OpenDSS bus names
BUS_INDEX_TO_NAME = {
1:"650", 2:"632", 3:"633", 4:"645", 5:"646", 6:"671",
7:"684", 8:"611", 9:"634", 10:"675", 11:"652", 12:"680", 13:"692",
}
BUSES_ORDERED = [BUS_INDEX_TO_NAME[i] for i in range(1, 14)]
#read files
_config_raw = json.loads(CONFIG_PATH.read_text())
_MODELS = tuple(InferenceModelSpec(**m) for m in _config_raw["models"])
_SOURCES = {s["model_label"]: MLEnergySource(**s) for s in _config_raw["data_sources"]}
_DC_CONFIG = DatacenterConfig(gpus_per_server=8, base_kw_per_phase=500.0)
if _config_raw.get("data_dir"):
_DATA_DIR = Path(_config_raw["data_dir"])
else:
blob = json.dumps(sorted(_config_raw["data_sources"],
key=lambda s: s["model_label"]),
sort_keys=True).encode()
_DATA_DIR = Path(__file__).parent / "data/offline" / hashlib.sha256(blob).hexdigest()[:16]
# Load traces_summary.csv once at startup so we can quickly look up trace files
_TRACES_SUMMARY_PATH = _DATA_DIR / "traces_summary.csv"
#Cached dataframe of available GPU power traces
_traces_df: pd.DataFrame | None = None
"""
Load trace index CSV and cache it.
"""
def _load_traces_index() -> pd.DataFrame:
global _traces_df
if _traces_df is None:
if _TRACES_SUMMARY_PATH.exists():
_traces_df = pd.read_csv(_TRACES_SUMMARY_PATH)
else:
_traces_df = pd.DataFrame(columns=["model_label","num_gpus","max_num_seqs","trace_file"])
return _traces_df
"""
Lookup GPU power trace and scale by replica count.
Returns a list of per-timestep total power values in watts.
"""
def _get_trace_power(model_label: str, num_gpus: int, max_num_seqs: int,
num_replicas: int = 1) -> list[float]:
df = _load_traces_index()
row = df[
(df["model_label"] == model_label) &
(df["num_gpus"] == num_gpus) &
(df["max_num_seqs"]== max_num_seqs)
]
if row.empty:
raise ValueError(
f"No trace found for model={model_label} num_gpus={num_gpus} "
f"max_num_seqs={max_num_seqs}. "
f"Available: {df[['model_label','num_gpus','max_num_seqs']].to_dict('records')}"
)
trace_file = _DATA_DIR / row.iloc[0]["trace_file"]
trace_df = pd.read_csv(trace_file)
power_W = trace_df["power_total_W"].tolist()
return [p * num_replicas for p in power_W]
print(f" [startup] data dir: {_DATA_DIR} exists={_DATA_DIR.exists()}")
_load_traces_index() # load at startup
"""Datacenter workload (baseline)"""
def _build_dc(scale: float = 1.0, duration_s: int = 300) -> OfflineDatacenter:
scaled_models = tuple(
InferenceModelSpec(
model_label = m.model_label,
num_replicas = max(1, int(m.num_replicas * scale)),
gpus_per_replica = m.gpus_per_replica,
initial_batch_size = m.initial_batch_size,
itl_deadline_s = m.itl_deadline_s,
) for m in _MODELS
)
inference_data = InferenceData.ensure(_DATA_DIR, scaled_models, _SOURCES, dt_s=0.1)
training_trace = TrainingTrace.ensure(
_DATA_DIR / "training_trace.csv", TrainingTraceParams()
)
t0 = min(40.0, duration_s * 0.13)
t1 = min(140.0, duration_s * 0.47)
t2 = min(150.0, duration_s * 0.50)
t3 = min(220.0, duration_s * 0.73)
workload = OfflineWorkload(
inference_data = inference_data,
training = TrainingRun(
n_gpus = max(1, int(24 * scale)),
trace = training_trace,
target_peak_W_per_gpu= 400.0,
).at(t_start=t0, t_end=t1),
inference_ramps = InferenceRamp(
target=min(1.0, 0.25 * scale)
).at(t_start=t2, t_end=t3),
)
return OfflineDatacenter(
_DC_CONFIG, workload, dt_s=Fraction(1, 10), seed=0,
power_augmentation=PowerAugmentationConfig(
amplitude_scale_range=(0.88, 1.12),
noise_fraction=0.04,
),
)
"""
Build datacenter workload from GPU trace.
Returns (datacenter, raw_power_W_list)
"""
def _build_dc_from_real_trace(
model_label: str,
num_gpus: int,
max_num_seqs: int,
num_replicas: int,
duration_s: int,
) -> tuple[OfflineDatacenter, list[float]]:
power_W = _get_trace_power(model_label, num_gpus, max_num_seqs, num_replicas)
# Trim or repeat trace to match requested duration at dt=0.1s
target_steps = int(duration_s / 0.1)
if len(power_W) < target_steps:
# Repeat trace to fill duration
repeats = math.ceil(target_steps / len(power_W))
power_W = (power_W * repeats)[:target_steps]
else:
power_W = power_W[:target_steps]
# Build InferenceData with a single model replica matching the trace GPUs
model_spec = InferenceModelSpec(
model_label = model_label,
num_replicas = num_replicas,
gpus_per_replica = num_gpus,
initial_batch_size = max_num_seqs,
itl_deadline_s = 0.08,
)
source = _SOURCES.get(model_label)
if source is None:
# Fall back to first available source if model not in config
source = next(iter(_SOURCES.values()))
inference_data = InferenceData.ensure(
_DATA_DIR, (model_spec,), {model_label: source}, dt_s=0.1
)
workload = OfflineWorkload(inference_data=inference_data)
dc = OfflineDatacenter(
_DC_CONFIG, workload, dt_s=Fraction(1, 10), seed=0,
power_augmentation=PowerAugmentationConfig(
amplitude_scale_range=(1.0, 1.0), # no augmentation — use real trace as-is
noise_fraction=0.0,
),
)
return dc, power_W
"""Create IEEE 13-bus grid with datacenter connection."""
def _build_grid(tap_pu: float, dc_bus: str) -> OpenDSSGrid:
return OpenDSSGrid(
dss_case_dir=str(DSS_DIR), dss_master_file=DSS_MASTER,
dc_bus=dc_bus, dc_bus_kv=4.16,
power_factor=_DC_CONFIG.power_factor,
dt_s=Fraction(1), connection_type="wye",
)
def _make_tap(v: float):
return TapPosition(a=v, b=v, c=v).at(t=0)
"""Run datacenter + grid simulation."""
def _run(dc, grid, tap_pu, dc_bus, duration_s):
coord = Coordinator(
datacenter=dc, grid=grid,
controllers=[TapScheduleController(
schedule=_make_tap(tap_pu), dt_s=Fraction(1)
)],
total_duration_s=duration_s,
dc_bus=dc_bus,
)
return coord.run()
"""
Runs one full simulation job (datacenter + grid) in a worker process
and returns results for the API.
"""
def _run_full(req_dict: dict) -> dict:
dc_bus = BUS_INDEX_TO_NAME.get(req_dict["targetBus"], "671")
replicas = max(1, req_dict["numReplicas"])
dc, raw_power_W = _build_dc_from_real_trace(
model_label = req_dict["modelLabel"],
num_gpus = req_dict["numGpus"],
max_num_seqs = req_dict["maxNumSeqs"],
num_replicas = replicas,
duration_s = req_dict["durationS"],
)
grid = _build_grid(req_dict["substationVoltage"], dc_bus)
log = _run(dc, grid, req_dict["substationVoltage"], dc_bus, req_dict["durationS"])
step = max(1, req_dict["sampleInterval"])
gs_sampled = log.grid_states[::step]
t_sampled = list(log.time_s[::step])
dc_states = log.dc_states
results = []
for i, (t, gs) in enumerate(zip(t_sampled, gs_sampled)):
vs = _voltages(gs)
dc_i = min(range(len(dc_states)), key=lambda j: abs(dc_states[j].time_s - t))
ds = dc_states[dc_i]
kw = float((ds.power_w.a + ds.power_w.b + ds.power_w.c) / 1000)
if math.isnan(kw): kw = 0.0
trace_idx = min(int(t / 0.1), len(raw_power_W) - 1) if raw_power_W else 0
raw_kw = raw_power_W[trace_idx] / 1000.0 if raw_power_W else kw
results.append({
"time": float(t),
"gpu_power_W": kw * 1000,
"gpu_power_kW": kw,
"gpu_power_raw_kW": raw_kw,
"gpu_reactive_kVAR": kw * 0.329,
"active_gpus": replicas * req_dict["numGpus"],
"voltages": vs,
"min_voltage": min(vs),
"max_voltage": max(vs),
"target_bus_voltage": vs[req_dict["targetBus"] - 1],
"total_load_kW": kw,
})
return {
"numSamples": len(results),
"targetBus": req_dict["targetBus"],
"modelLabel": req_dict["modelLabel"],
"numGpus": req_dict["numGpus"],
"maxNumSeqs": req_dict["maxNumSeqs"],
"numReplicas": replicas,
"duration": float(max(r["time"] for r in results) if results else 0),
"minVoltage": float(min(r["min_voltage"] for r in results) if results else 1.0),
"maxVoltage": float(max(r["max_voltage"] for r in results) if results else 1.0),
"avgGpuPower": float(sum(r["gpu_power_W"] for r in results) / len(results) if results else 0),
"peakGpuPower": float(max(r["gpu_power_W"] for r in results) if results else 0),
"timeSeries": results,
}
"""Get per-bus voltage (worst phase per bus)."""
def _voltages(gs, debug=False) -> list[float]:
result = []
for name in BUSES_ORDERED:
try:
tp = gs.voltages[name]
vals = [float(v) for v in [tp.a, tp.b, tp.c]
if not math.isnan(float(v)) and 0.5 < float(v) < 1.5]
result.append(min(vals) if vals else None)
except Exception:
result.append(None)
known = [v for v in result if v is not None]
avg = sum(known) / len(known) if known else 1.0
result = [v if v is not None else avg for v in result]
if debug:
print(f" [V] {[round(v,4) for v in result]}")
return result
# ── FastAPI────────────────────────────────────────────────────────────────
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["https://gpu2grid.io", "http://localhost:5173", "http://localhost:5174"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
allow_origin_regex=".*",
)
class PowerflowRequest(BaseModel):
substationVoltage: float = 1.05
numBuses: int = 13
baseVoltage: float = 4.16
targetBus: int = 0
class LLMImpactRequest(BaseModel):
targetBus: int = 9
sampleInterval: int = 1
substationVoltage: float = 1.05
modelLabel: str = "Llama-3.1-8B"
numGpus: int = 1
maxNumSeqs: int = 128
numReplicas: int = 1
durationS: int = 300
class HeatmapRequest(BaseModel):
voltages: list[float]
dataCenterBus: Optional[int] = None
@app.get("/api/health")
def health():
return {"status": "ok", "data_ready": _DATA_DIR.exists(),
"message": "gpu2grid OpenDSS server"}
@app.get("/api/status")
def status():
active = conn.execute(
"SELECT COUNT(*) FROM jobs WHERE status='pending'"
).fetchone()[0]
total = conn.execute(
"SELECT COUNT(*) FROM jobs"
).fetchone()[0]
return {
"active_jobs": active,
"total_jobs": total,
"workers": _pool._max_workers,
}
@app.get("/api/job/{job_id}")
def get_job(job_id: str):
row = conn.execute(
"SELECT status, result, error FROM jobs WHERE id=?",
(job_id,)
).fetchone()
if not row:
raise HTTPException(404, "Job not found")
status, result, error = row
if status == "done":
return {"status": status, "result": json.loads(result)}
elif status == "error":
return {"status": status, "detail": error}
else:
return {"status": status}
"""Return available traces"""
@app.get("/api/traces")
def list_traces():
df = _load_traces_index()
if df.empty:
return {"traces": [], "models": [], "trainingAvailable": False}
traces = df[["model_label","num_gpus","max_num_seqs"]].to_dict("records")
models = []
for model_label, group in df.groupby("model_label"):
models.append({
"modelLabel": model_label,
"numGpus": int(group["num_gpus"].iloc[0]),
"batchSizes": sorted(group["max_num_seqs"].tolist()),
})
training_available = (_DATA_DIR / "training_trace.csv").exists()
return {
"traces": traces,
"models": models,
"trainingAvailable": training_available,
"dataDir": str(_DATA_DIR),
}
"""Baseline grid simulation, no workload"""
@app.post("/api/powerflow")
async def powerflow(req: PowerflowRequest):
print(f"\nPowerflow v={req.substationVoltage}")
try:
dc = _build_dc(scale=0.001, duration_s=5)
grid = _build_grid(req.substationVoltage, "671")
log = _run(dc, grid, req.substationVoltage, "671", 5)
vs = _voltages(log.grid_states[-1], debug=True)
print(f" min={min(vs):.4f} max={max(vs):.4f}")
return {"buses": [{"id": i+1, "voltage": v, "activePower": 0.0,
"reactivePower": 0.0} for i, v in enumerate(vs)],
"lines": []}
except Exception as e:
import traceback; traceback.print_exc()
raise HTTPException(status_code=500, detail=str(e))
"""Simulate AI workload impact on grid using GPU traces."""
@app.post("/api/llm-impact")
async def llm_impact(req: LLMImpactRequest):
job_id = uuid.uuid4().hex
conn.execute(
"INSERT INTO jobs (id, status) VALUES (?, ?)",
(job_id, "pending")
)
conn.commit()
async def run_and_store():
try:
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(_pool, _run_full, req.dict())
conn.execute(
"UPDATE jobs SET status=?, result=? WHERE id=?",
("done", json.dumps(result), job_id)
)
conn.commit()
except Exception as e:
conn.execute(
"UPDATE jobs SET status=?, error=? WHERE id=?",
("error", str(e), job_id)
)
conn.commit()
asyncio.create_task(run_and_store())
return {"job_id": job_id}
@app.post("/api/heatmap")
async def heatmap(req: HeatmapRequest):
if len(req.voltages) != 13:
raise HTTPException(400, f"Need 13 voltages, got {len(req.voltages)}")
script = str(Path(__file__).parent / "generate_heatmap.py")
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
out = f.name
subprocess.run(
["python3", script, out] + [str(v) for v in req.voltages] +
([str(req.dataCenterBus)] if req.dataCenterBus else []),
check=True,
)
png = open(out, "rb").read()
os.unlink(out)
return Response(content=png, media_type="image/png")
if __name__ == "__main__":
print("\n" + "="*70)
print("="*70)
print(f" Data: {_DATA_DIR} ready={_DATA_DIR.exists()}")
df = _load_traces_index()
if not df.empty:
models = df["model_label"].unique().tolist()
print(f" Models: {models}")
print(f" Traces: {len(df)} configurations")
print("="*70 + "\n")
uvicorn.run("server:app", host="0.0.0.0", port=8080, workers=1, log_level="info")