| from __future__ import annotations |
| import time |
| import os |
| import json |
| import math |
| import hashlib |
| import traceback |
| from dataclasses import dataclass |
| from typing import Dict, Any, List, Tuple, Optional |
| from concurrent.futures import ProcessPoolExecutor, as_completed |
| import numpy as np |
| import pandas as pd |
| from contextlib import contextmanager |
| import sys |
|
|
@contextmanager
def suppress_output(enabled: bool = True):
    """Temporarily redirect ``sys.stdout`` and ``sys.stderr`` to ``os.devnull``.

    Args:
        enabled: When False the context manager is a no-op.

    The original stream objects are restored on exit, including when the body
    raises. Only the ``sys.stdout``/``sys.stderr`` attributes are rebound. Anything
    that still holds a reference to the previous stream objects, or writes
    straight to file descriptors 1 and 2 (native code, child processes), is not
    silenced.
    """
    if enabled:
        with open(os.devnull, "w") as sink:
            saved_streams = (sys.stdout, sys.stderr)
            sys.stdout = sys.stderr = sink
            try:
                yield
            finally:
                sys.stdout, sys.stderr = saved_streams
    else:
        yield
|
|
|
|
| import gymnasium as gym |
| import sinergym |
| from unihvac.find_files import ( |
| detect_paths, |
| find_manifest, |
| load_manifest_records, |
| get_paths_from_manifest_record, |
| ) |
|
|
| from unihvac.rollout import run_rollout_to_df |
| from unihvac.rewards import RewardConfig, compute_rewards_vectorized, compute_terminals, config_to_meta |
|
|
|
|
|
|
| |
| |
| |
# --- Dataset selection ---------------------------------------------------------
BUILDING = "OfficeSmall"            # passed to find_manifest(building=...); record_id default
PREFER_PATCHED = True               # passed to find_manifest(prefer_patched=...)

# --- Output locations (joined under the detected outputs root) ------------------
OUTPUTS_DIRNAME = "traj_results"
SAVE_DIRNAME = "TrajectoryData_officesmall"

# --- Episode generation ---------------------------------------------------------
EPISODES_PER_RECORD = 1
BEHAVIORS = [
    "rbc_21_24",
    "random_walk",
    "piecewise",
    "sinusoid",
    "aggressive",
]
BASE_SEED = 123
RESUME = True                       # skip episodes whose trajectory .npz already exists
MAX_STEPS = None                    # forwarded to run_rollout_to_df(max_steps=...)
TIME_STEP_HOURS = 900.0 / 3600.0    # 900-second simulation step, expressed in hours

# --- Setpoint bounds applied by _enforce_bounds ----------------------------------
HTG_MIN = 18.0
HTG_MAX = 24.0
CLG_MIN = 22.0
CLG_MAX = 30.0
DEADBAND_MIN = 1.0                  # minimum gap between cooling and heating setpoints

# --- Parallelism and logging -----------------------------------------------------
NUM_WORKERS = 16                    # <= 1 runs every episode sequentially in-process
QUIET_WORKERS = False               # True redirects sys.stdout/sys.stderr to devnull during rollouts
VERBOSE_ROLLOUT = True              # forwarded to run_rollout_to_df(verbose=...)

# --- Reward labelling -----------------------------------------------------------
REWARD_CFG = RewardConfig(version="v1_energy_only", w_energy=1.0, w_comfort=0.0)
|
|
|
|
|
|
| |
| |
| |
# Actuators driven by the policy: key -> (component type, control type, zone).
# Rows are ordered heating-then-cooling for Core, P1, P2, P3, P4. That is the
# same alternating (heating, cooling) layout that action_from_setpoints emits.
# NOTE(review): presumably the rollout maps action[i] to the i-th entry here,
# so keep the order in sync with the action vector; confirm in unihvac.rollout.
hot_actuators = {
    key: ("Zone Temperature Control", control_type, zone)
    for key, control_type, zone in (
        ("Htg_Core", "Heating Setpoint", "CORE_ZN"),
        ("Clg_Core", "Cooling Setpoint", "CORE_ZN"),
        ("Htg_P1", "Heating Setpoint", "PERIMETER_ZN_1"),
        ("Clg_P1", "Cooling Setpoint", "PERIMETER_ZN_1"),
        ("Htg_P2", "Heating Setpoint", "PERIMETER_ZN_2"),
        ("Clg_P2", "Cooling Setpoint", "PERIMETER_ZN_2"),
        ("Htg_P3", "Heating Setpoint", "PERIMETER_ZN_3"),
        ("Clg_P3", "Cooling Setpoint", "PERIMETER_ZN_3"),
        ("Htg_P4", "Heating Setpoint", "PERIMETER_ZN_4"),
        ("Clg_P4", "Cooling Setpoint", "PERIMETER_ZN_4"),
    )
}
|
|
# Output variables requested from each rollout (passed to run_rollout_to_df as
# ``variables=``), as  column name -> (output variable name, key value / zone).
# select_state_columns() looks these names up among the rollout DataFrame's
# columns. The insertion order below therefore fixes the order of the stored
# observation columns, so reordering entries changes the observation layout.
hot_variables = {
    # Outdoor dry-bulb temperature and zone air temperatures.
    # NOTE(review): these zone keys are mixed case ("Core_ZN", "Perimeter_ZN_1")
    # while the entries below use upper case ("CORE_ZN"); presumably EnergyPlus
    # matches key values case-insensitively -- confirm.
    "outdoor_temp": ("Site Outdoor Air DryBulb Temperature", "Environment"),
    "core_temp": ("Zone Air Temperature", "Core_ZN"),
    "perim1_temp": ("Zone Air Temperature", "Perimeter_ZN_1"),
    "perim2_temp": ("Zone Air Temperature", "Perimeter_ZN_2"),
    "perim3_temp": ("Zone Air Temperature", "Perimeter_ZN_3"),
    "perim4_temp": ("Zone Air Temperature", "Perimeter_ZN_4"),
    # Whole-building HVAC electricity demand rate.
    "elec_power": ("Facility Total HVAC Electricity Demand Rate", "Whole Building"),


    # Occupant count per zone.
    "core_occ_count": ("Zone People Occupant Count", "CORE_ZN"),
    "perim1_occ_count": ("Zone People Occupant Count", "PERIMETER_ZN_1"),
    "perim2_occ_count": ("Zone People Occupant Count", "PERIMETER_ZN_2"),
    "perim3_occ_count": ("Zone People Occupant Count", "PERIMETER_ZN_3"),
    "perim4_occ_count": ("Zone People Occupant Count", "PERIMETER_ZN_4"),


    # Outdoor dewpoint and wet-bulb temperatures.
    "outdoor_dewpoint": ("Site Outdoor Air Dewpoint Temperature", "Environment"),
    "outdoor_wetbulb": ("Site Outdoor Air Wetbulb Temperature", "Environment"),


    # Zone air relative humidity.
    "core_rh": ("Zone Air Relative Humidity", "CORE_ZN"),
    "perim1_rh": ("Zone Air Relative Humidity", "PERIMETER_ZN_1"),
    "perim2_rh": ("Zone Air Relative Humidity", "PERIMETER_ZN_2"),
    "perim3_rh": ("Zone Air Relative Humidity", "PERIMETER_ZN_3"),
    "perim4_rh": ("Zone Air Relative Humidity", "PERIMETER_ZN_4"),


    # ASHRAE 55 simple-model "not comfortable" time. The core zone records the
    # summer-clothes, winter-clothes and combined variants; each perimeter zone
    # records only the combined (summer or winter) variant.
    "core_ash55_notcomfortable_summer": (
        "Zone Thermal Comfort ASHRAE 55 Simple Model Summer Clothes Not Comfortable Time",
        "CORE_ZN",
    ),
    "core_ash55_notcomfortable_winter": (
        "Zone Thermal Comfort ASHRAE 55 Simple Model Winter Clothes Not Comfortable Time",
        "CORE_ZN",
    ),
    "core_ash55_notcomfortable_any": (
        "Zone Thermal Comfort ASHRAE 55 Simple Model Summer or Winter Clothes Not Comfortable Time",
        "CORE_ZN",
    ),
    "p1_ash55_notcomfortable_any": (
        "Zone Thermal Comfort ASHRAE 55 Simple Model Summer or Winter Clothes Not Comfortable Time",
        "PERIMETER_ZN_1",
    ),
    "p2_ash55_notcomfortable_any": (
        "Zone Thermal Comfort ASHRAE 55 Simple Model Summer or Winter Clothes Not Comfortable Time",
        "PERIMETER_ZN_2",
    ),
    "p3_ash55_notcomfortable_any": (
        "Zone Thermal Comfort ASHRAE 55 Simple Model Summer or Winter Clothes Not Comfortable Time",
        "PERIMETER_ZN_3",
    ),
    "p4_ash55_notcomfortable_any": (
        "Zone Thermal Comfort ASHRAE 55 Simple Model Summer or Winter Clothes Not Comfortable Time",
        "PERIMETER_ZN_4",
    ),
}
|
|
def stable_hash_int(s: str, mod: int = 1000) -> int:
    """Map ``s`` to a deterministic integer in ``[0, mod)``.

    Uses the first 32 bits (big-endian) of the MD5 digest of ``s`` encoded as
    UTF-8. Unlike the built-in ``hash()``, the result does not change between
    interpreter runs (no PYTHONHASHSEED randomisation), so seeds derived from
    it are reproducible.
    """
    leading_bytes = hashlib.md5(s.encode("utf-8")).digest()[:4]
    return int.from_bytes(leading_bytes, byteorder="big") % mod
|
|
def record_id(rec: Dict[str, Any]) -> str:
    """Build a filesystem-safe identifier for a manifest record.

    The raw id is ``"{building_type}__{location}__{variation_name}"``. Every
    character that is not alphanumeric and not one of ``._-=`` is replaced by
    ``_``, so the id can be used as a single directory name (it names the
    per-record output and trajectory directories).

    Missing fields fall back as follows:
        building_type  -> BUILDING
        location       -> the record's ``"climate"`` value, else ``"UNKNOWN"``.
                          This is the same fallback used for the rollout location
                          and metadata, so records that only carry ``"climate"``
                          no longer all collapse onto an ``UNKNOWN`` id.
        variation_name -> ``"UNKNOWN"``
    """
    loc = rec.get("location", rec.get("climate", "UNKNOWN"))
    vname = rec.get("variation_name", "UNKNOWN")
    btype = rec.get("building_type", BUILDING)
    raw = f"{btype}__{loc}__{vname}"
    # str.isalnum() also accepts non-ASCII letters/digits; path separators,
    # spaces and other punctuation become "_".
    return "".join(c if c.isalnum() or c in "._-=" else "_" for c in raw)
|
|
def _enforce_bounds(htg: float, clg: float) -> Tuple[float, float]:
    """Clip a (heating, cooling) setpoint pair and enforce the minimum deadband.

    Heating is clipped to [HTG_MIN, HTG_MAX] and cooling to [CLG_MIN, CLG_MAX].
    If cooling then sits below ``heating + DEADBAND_MIN``, it is raised to that
    value, capped at CLG_MAX. Heating is never adjusted to make room. With the
    constants in this file (HTG_MAX + DEADBAND_MIN = 25 <= CLG_MAX = 30) the cap
    never binds.

    Returns:
        The bounded ``(heating, cooling)`` pair as Python floats.
    """
    heat = float(np.clip(htg, HTG_MIN, HTG_MAX))
    cool = float(np.clip(clg, CLG_MIN, CLG_MAX))
    floor = heat + DEADBAND_MIN
    # Written as "cool < floor" (not "cool >= floor") so a NaN cooling value
    # passes through unchanged, exactly as before.
    return heat, (min(CLG_MAX, floor) if cool < floor else cool)
|
|
def action_from_setpoints(htg: float, clg: float) -> np.ndarray:
    """Build the 10-element float32 action for one (heating, cooling) pair.

    The pair is first passed through ``_enforce_bounds``. The bounded pair is then
    repeated five times as ``[h, c, h, c, ...]``, one heating/cooling pair per
    zone, matching the five Htg/Clg zone pairs listed in ``hot_actuators``.
    """
    bounded_pair = np.array(_enforce_bounds(htg, clg), dtype=np.float32)
    return np.tile(bounded_pair, 5)
|
|
@dataclass
class PolicyRecorder:
    """Open-loop setpoint policy that records every action it emits.

    ``policy`` is handed to the rollout as ``policy_fn``. Each call picks a
    (heating, cooling) setpoint pair according to ``behavior``, expands it with
    ``action_from_setpoints`` into a 10-element action, and appends that action
    to ``self.actions``.

    Behaviors:
        rbc_21_24   -- constant 21 / 24.
        random_walk -- Gaussian steps from the previous setpoints (reset to
                       21 / 24 at step 0), with a larger kick every 6 simulated
                       hours.
        piecewise   -- uniformly random setpoints held for 2-12 simulated hours.
        sinusoid    -- 24-hour sine waves around 21 (heating) and 24.5 (cooling)
                       plus small Gaussian noise.
        aggressive  -- alternates every 6 simulated hours between a narrow band
                       (htg 21-23.5, clg 23.5-25.5) and a wide band
                       (htg HTG_MIN-20.5, clg 26-CLG_MAX).
        Any other name falls back to the constant 21 / 24 schedule.

    All randomness comes from ``rng``, so the same seed and the same sequence of
    ``step`` values reproduce the same actions.
    """

    behavior: str
    rng: np.random.Generator
    # Simulated duration of one ``step``, in hours.
    timestep_hours: float
    # random_walk state: setpoints emitted on the previous step.
    last_htg: float = 21.0
    last_clg: float = 24.0
    # piecewise state: first step after the current piece, and its setpoints.
    piece_until: int = 0
    piece_htg: float = 21.0
    piece_clg: float = 24.0

    def __post_init__(self):
        # Every action returned by policy(), in call order. Assigned here rather
        # than declared as a field, so it is not an __init__ argument and stays
        # out of the generated __repr__/__eq__.
        self.actions: List[np.ndarray] = []

    def _steps_for_hours(self, hours: float) -> int:
        """Return the whole number of steps (at least 1) closest to ``hours``."""
        return max(1, int(round(hours / self.timestep_hours)))

    def _rbc(self, step: int) -> Tuple[float, float]:
        """Constant 21 / 24 schedule."""
        return 21.0, 24.0

    def _random_walk(self, step: int) -> Tuple[float, float]:
        """Take one bounded Gaussian step from the previous setpoints."""
        if step == 0:
            self.last_htg, self.last_clg = 21.0, 24.0
        dh = self.rng.normal(0.0, 0.15)
        dc = self.rng.normal(0.0, 0.20)
        # Larger kick once every 6 simulated hours. The period is rounded and
        # floored at 1 step, like the piecewise durations. Plain int() would
        # truncate a ratio landing just below an integer, and it yields 0
        # (ZeroDivisionError) for timesteps longer than 6 h.
        if step % self._steps_for_hours(6.0) == 0:
            dh += self.rng.normal(0.0, 0.6)
            dc += self.rng.normal(0.0, 0.8)
        htg, clg = _enforce_bounds(self.last_htg + dh, self.last_clg + dc)
        self.last_htg, self.last_clg = htg, clg
        return htg, clg

    def _piecewise(self, step: int) -> Tuple[float, float]:
        """Hold random setpoints, drawing a new piece whenever the current one ends."""
        if step >= self.piece_until:
            hours = float(self.rng.choice([2, 3, 4, 6, 8, 12]))
            self.piece_until = step + self._steps_for_hours(hours)
            htg = float(self.rng.uniform(HTG_MIN, HTG_MAX))
            clg = float(self.rng.uniform(max(CLG_MIN, htg + DEADBAND_MIN), CLG_MAX))
            self.piece_htg, self.piece_clg = _enforce_bounds(htg, clg)
        return self.piece_htg, self.piece_clg

    def _sinusoid(self, step: int) -> Tuple[float, float]:
        """Daily sine waves plus noise; heating lags cooling by 0.5 rad."""
        t_hours = step * self.timestep_hours
        phase = 2.0 * math.pi * (t_hours / 24.0)
        htg = 21.0 + 1.0 * math.sin(phase - 0.5) + self.rng.normal(0.0, 0.10)
        clg = 24.5 + 1.5 * math.sin(phase) + self.rng.normal(0.0, 0.12)
        return _enforce_bounds(htg, clg)

    def _aggressive(self, step: int) -> Tuple[float, float]:
        """Alternate narrow and wide random bands in 6-hour blocks."""
        block = int((step * self.timestep_hours) // 6) % 2
        if block == 0:
            htg = float(self.rng.uniform(21.0, 23.5))
            clg = float(self.rng.uniform(23.5, 25.5))
        else:
            htg = float(self.rng.uniform(HTG_MIN, 20.5))
            clg = float(self.rng.uniform(26.0, CLG_MAX))
        return _enforce_bounds(htg, clg)

    def policy(self, obs: Any, info: Dict[str, Any], step: int) -> np.ndarray:
        """Return (and record) the action for simulation step ``step``.

        ``obs`` and ``info`` are accepted to fit the rollout's ``policy_fn``
        interface but are not used: every behavior depends only on ``step`` and
        the recorder's own state.
        """
        generators = {
            "rbc_21_24": self._rbc,
            "random_walk": self._random_walk,
            "piecewise": self._piecewise,
            "sinusoid": self._sinusoid,
            "aggressive": self._aggressive,
        }
        # Unknown behavior names fall back to the constant rbc_21_24 schedule.
        htg, clg = generators.get(self.behavior, self._rbc)(step)
        a = action_from_setpoints(htg, clg)
        self.actions.append(a)
        return a
|
|
def select_state_columns(df: pd.DataFrame) -> List[str]:
    """Return the DataFrame columns to store as observations, in a fixed order.

    Preferred columns are every ``hot_variables`` key present in ``df`` (in dict
    order), followed by whichever of the calendar/time columns ``month``, ``day``,
    ``hour``, ``day_of_week``, ``is_weekend``, ``minute``, ``time`` and
    ``timestep`` exist. If none of those are present, the result falls back to
    every numeric column except ``done``, ``terminated`` and ``truncated``.
    """
    preferred = [
        *hot_variables,
        "month", "day", "hour",
        "day_of_week", "is_weekend",
        "minute", "time", "timestep",
    ]
    present = [name for name in preferred if name in df.columns]
    if present:
        return present
    excluded = {"done", "terminated", "truncated"}
    return [
        name
        for name in df.columns
        if name not in excluded and pd.api.types.is_numeric_dtype(df[name])
    ]
|
|
def build_npz_payload(
    df: pd.DataFrame,
    actions: np.ndarray,
    meta: Dict[str, Any],
) -> Dict[str, Any]:
    """Assemble the arrays saved into one trajectory ``.npz`` file.

    Args:
        df: Rollout DataFrame for one episode.
        actions: Per-row action matrix; stored cast to float32.
        meta: Episode metadata. It is copied, not modified.

    Returns:
        A dict with:
            observations -- float32 matrix of the ``select_state_columns(df)`` columns.
            actions      -- ``actions`` as float32.
            rewards      -- ``compute_rewards_vectorized`` under ``REWARD_CFG``.
            terminals    -- ``compute_terminals(df)``.
            state_keys   -- names of the observation columns.
            action_keys  -- names of the 10 action entries (heating/cooling per zone).
            meta         -- one-element array holding ``meta`` plus the reward
                            config (``"reward_cfg"``) serialized as JSON.

    The string arrays use numpy's fixed-width unicode dtype instead of ``object``,
    so the file can be read back with plain ``np.load`` (``allow_pickle=False``).
    Loaders that already pass ``allow_pickle=True`` keep working.
    """
    state_cols = select_state_columns(df)
    obs = df[state_cols].to_numpy(dtype=np.float32)
    rewards = compute_rewards_vectorized(df, timestep_hours=TIME_STEP_HOURS, cfg=REWARD_CFG)
    terminals = compute_terminals(df)
    meta = {**meta, "reward_cfg": config_to_meta(REWARD_CFG)}
    action_keys = [
        "htg_core", "clg_core",
        "htg_p1", "clg_p1",
        "htg_p2", "clg_p2",
        "htg_p3", "clg_p3",
        "htg_p4", "clg_p4",
    ]
    return {
        "observations": obs,
        "actions": actions.astype(np.float32),
        "rewards": rewards,
        "terminals": terminals,
        "state_keys": np.array(state_cols, dtype=str),
        "action_keys": np.array(action_keys, dtype=str),
        "meta": np.array([json.dumps(meta)], dtype=str),
    }
|
|
def save_npz(path: str, payload: Dict[str, Any]) -> None:
    """Write ``payload`` as a compressed ``.npz`` archive at ``path``.

    Parent directories are created as needed. A bare filename is written to the
    current directory instead of failing on ``os.makedirs("")``. As with
    ``np.savez_compressed`` given a path, ``.npz`` is appended when ``path`` does
    not already end with it.

    The archive is first written to a temporary file next to the target and then
    moved into place with ``os.replace`` (an atomic rename on POSIX). An
    interrupted run therefore never leaves a truncated archive at ``path``. This
    matters because RESUME treats an existing trajectory file as a finished
    episode.

    Raises:
        Whatever ``np.savez_compressed`` or the filesystem raises. The temporary
        file is removed before the exception propagates.
    """
    path = os.fspath(path)
    if not path.endswith(".npz"):
        path += ".npz"
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    # Same directory as the target so os.replace stays on one filesystem; the
    # PID keeps concurrent worker processes from sharing a temp name.
    tmp_path = f"{path}.{os.getpid()}.tmp"
    try:
        with open(tmp_path, "wb") as fh:
            # Passing a file object stops numpy from appending ".npz" to tmp_path.
            np.savez_compressed(fh, **payload)
        os.replace(tmp_path, path)
    except BaseException:
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
|
|
def _align_actions(recorded: List[np.ndarray], n_rows: int) -> np.ndarray:
    """Stack recorded actions and trim or pad them to exactly ``n_rows`` rows.

    Extra trailing actions are dropped; missing trailing rows repeat the last
    recorded action. With nothing recorded, a zero ``(n_rows, 10)`` float32
    matrix is returned.
    """
    if not recorded:
        return np.zeros((n_rows, 10), dtype=np.float32)
    actions = np.stack(recorded, axis=0)
    n_recorded = actions.shape[0]
    if n_recorded > n_rows:
        return actions[:n_rows]
    if n_recorded < n_rows:
        pad = np.repeat(actions[-1:], n_rows - n_recorded, axis=0)
        actions = np.concatenate([actions, pad], axis=0)
    return actions


def run_one_episode(
    rec: Dict[str, Any],
    behavior: str,
    episode_idx: int,
    outputs_root: str,
    save_root: str,
    seed: int,
) -> Optional[str]:
    """Roll out one episode with a recorded behavior policy and save it.

    Args:
        rec: Manifest record. It supplies the building/weather paths (via
            get_paths_from_manifest_record), the record id and the metadata.
        behavior: PolicyRecorder behavior name.
        episode_idx: Episode index, used in directory and file names.
        outputs_root: Root under which ``OUTPUTS_DIRNAME/<record_id>/<behavior>/epNNN``
            is created.
        save_root: Root under which ``<record_id>/<behavior>/`` receives the files.
        seed: Seed for the policy's random generator; also part of the file names.

    Returns:
        Path of the trajectory ``.npz``. This is the existing file when RESUME
        skips the episode.

    Writes ``timeseries_epNNN_seedS.csv`` and then ``traj_epNNN_seedS.npz``. The
    ``.npz`` goes last because its existence is what RESUME checks. A crash
    between the two writes now leaves the episode to be re-run, instead of being
    marked done without its CSV.
    """
    rid = record_id(rec)
    bpath, wpath = get_paths_from_manifest_record(rec)
    # NOTE(review): out_dir is created but never passed to the rollout; confirm
    # whether run_rollout_to_df is meant to write its outputs here.
    out_dir = os.path.join(outputs_root, OUTPUTS_DIRNAME, rid, behavior, f"ep{episode_idx:03d}")
    os.makedirs(out_dir, exist_ok=True)
    traj_dir = os.path.join(save_root, rid, behavior)
    stem = f"ep{episode_idx:03d}_seed{seed}"
    traj_path = os.path.join(traj_dir, f"traj_{stem}.npz")
    csv_path = os.path.join(traj_dir, f"timeseries_{stem}.csv")
    if RESUME and os.path.exists(traj_path):
        return traj_path

    recorder = PolicyRecorder(
        behavior=behavior,
        rng=np.random.default_rng(seed),
        timestep_hours=TIME_STEP_HOURS,
    )
    with suppress_output(QUIET_WORKERS):
        df = run_rollout_to_df(
            building_path=str(bpath),
            weather_path=str(wpath),
            variables=hot_variables,
            actuators=hot_actuators,
            policy_fn=recorder.policy,
            location=str(rec.get("location", rec.get("climate", "UNKNOWN"))),
            timestep_hours=TIME_STEP_HOURS,
            heating_sp=21.0,
            cooling_sp=24.0,
            reward=None,
            max_steps=MAX_STEPS,
            verbose=VERBOSE_ROLLOUT,
        )

    # The policy may be called a different number of times than there are
    # DataFrame rows; align so there is exactly one action per row.
    actions = _align_actions(recorder.actions, len(df))
    if len(df) > 0:
        # Core-zone pair; every zone receives the same setpoints.
        df["setpoint_htg"] = actions[:, 0]
        df["setpoint_clg"] = actions[:, 1]

    meta = {
        "record_id": rid,
        "behavior": behavior,
        "episode_idx": episode_idx,
        "seed": seed,
        "building_path": str(bpath),
        "weather_path": str(wpath),
        "location": rec.get("location", rec.get("climate")),
        "thermal": rec.get("thermal", rec.get("thermal_variation")),
        "occupancy": rec.get("occupancy", rec.get("occupancy_variation")),
        "timestep_hours": TIME_STEP_HOURS,
        "state_cols": select_state_columns(df),
    }
    payload = build_npz_payload(df=df, actions=actions, meta=meta)
    os.makedirs(traj_dir, exist_ok=True)
    df.to_csv(csv_path, index=False)
    save_npz(traj_path, payload)
    return traj_path
|
|
def _build_tasks(records) -> List[Tuple[int, Dict[str, Any], str, int, int]]:
    """Expand manifest records into ``(task_id, record, behavior, episode, seed)`` tuples.

    Seeds depend only on the record's position in ``records``, the behavior name
    (through the process-independent ``stable_hash_int``) and the episode index.
    The same manifest order therefore reproduces the same seeds, and with them the
    same file names that RESUME checks.
    """
    tasks: List[Tuple[int, Dict[str, Any], str, int, int]] = []
    for rec_idx, rec in enumerate(records):
        for behavior in BEHAVIORS:
            behavior_offset = stable_hash_int(behavior, 100000)
            for ep in range(EPISODES_PER_RECORD):
                seed = BASE_SEED + rec_idx * 100000 + behavior_offset + ep
                tasks.append((len(tasks), rec, behavior, ep, seed))
    return tasks


def _print_progress(
    done: int, total: int, successes: int, failures: int, t0: float, every: int = 25
) -> None:
    """Print a progress line after every ``every`` finished tasks and after the last one."""
    if done % every != 0 and done != total:
        return
    elapsed = time.time() - t0
    rate = done / elapsed if elapsed > 0 else 0.0
    print(f"[progress] done={done}/{total} success={successes} fail={failures} rate={rate:.2f} eps/s elapsed={elapsed:.1f}s")


def _report_failure(tid: int, rec: Dict[str, Any], behavior: str, ep: int, exc: BaseException) -> None:
    """Print which task failed, followed by the full traceback (including chained causes)."""
    print(f"[ERROR] tid={tid} record={record_id(rec)} behavior={behavior} ep={ep}: {exc}")
    print("".join(traceback.format_exception(type(exc), exc, exc.__traceback__)))


def main():
    """Generate trajectories for every (manifest record, behavior, episode) triple.

    Runs sequentially when NUM_WORKERS <= 1, otherwise in a ProcessPoolExecutor.
    A failed episode is reported with its task identity and traceback, and the
    run continues. Progress is printed in both modes.
    """
    paths = detect_paths(outputs_dirname=OUTPUTS_DIRNAME)
    manifest_path = find_manifest(paths, building=BUILDING, prefer_patched=PREFER_PATCHED)
    records = load_manifest_records(manifest_path)
    outputs_root = str(paths.outputs_root)
    save_root = os.path.join(outputs_root, SAVE_DIRNAME)
    os.makedirs(save_root, exist_ok=True)

    tasks = _build_tasks(records)
    total = len(tasks)
    t0 = time.time()
    successes = 0
    failures = 0
    saved_paths: List[str] = []

    if NUM_WORKERS <= 1:
        for tid, rec, behavior, ep, seed in tasks:
            try:
                p = run_one_episode(
                    rec=rec,
                    behavior=behavior,
                    episode_idx=ep,
                    outputs_root=outputs_root,
                    save_root=save_root,
                    seed=seed,
                )
            except Exception as e:
                failures += 1
                _report_failure(tid, rec, behavior, ep, e)
            else:
                if p:
                    saved_paths.append(p)
                successes += 1
            _print_progress(successes + failures, total, successes, failures, t0)
    else:
        with ProcessPoolExecutor(max_workers=NUM_WORKERS) as ex:
            # Map each future back to its task so failures can be identified.
            fut_to_task = {
                ex.submit(run_one_episode, rec, behavior, ep, outputs_root, save_root, seed): (tid, rec, behavior, ep)
                for tid, rec, behavior, ep, seed in tasks
            }
            for done, fut in enumerate(as_completed(fut_to_task), 1):
                tid, rec, behavior, ep = fut_to_task[fut]
                try:
                    p = fut.result()
                except Exception as e:
                    failures += 1
                    _report_failure(tid, rec, behavior, ep, e)
                else:
                    if p:
                        saved_paths.append(p)
                    successes += 1
                _print_progress(done, total, successes, failures, t0)

    print("\nDONE")
    if saved_paths:
        print("Example saved file:", saved_paths[0])
    print("Save root:", save_root)


if __name__ == "__main__":
    # Required with ProcessPoolExecutor: under the "spawn"/"forkserver" start
    # methods each worker re-imports this module and must not run main() again.
    main()
|
|