| import gym |
| import numpy as np |
| import math |
| import sys |
| import os |
| import functools |
|
|
| import pandas as pd |
|
|
| |
| |
| sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) |
| from Environment.solar_sys_environment import SolarSys |
|
|
|
|
def form_clusters(metrics: dict, size: int) -> list:
    """
    Form balanced, heterogeneous clusters of houses.

    Each house is categorized by comparing its 'solar' and 'consumption'
    values against the medians over all houses, and the houses are then dealt
    out to the clusters round-robin so every cluster receives a mix of
    categories.

    Args:
        metrics: Mapping of house id -> {'consumption': number, 'solar': number}.
        size: Target (maximum) number of houses per cluster; must be positive.

    Returns:
        A list of ``ceil(len(metrics) / size)`` clusters, each a list of house
        ids. An empty ``metrics`` yields an empty list.

    Raises:
        ValueError: If ``metrics`` is non-empty and ``size`` is not positive.
    """
    house_ids = list(metrics)
    if not house_ids:
        return []
    # Checked after the empty-input shortcut so that an empty `metrics` keeps
    # returning [] regardless of `size`.
    if size <= 0:
        raise ValueError(f"Cluster size must be positive, got {size!r}.")

    median_consumption = np.median([m['consumption'] for m in metrics.values()])
    median_solar = np.median([m['solar'] for m in metrics.values()])

    producers, consumers, prosumers, neutrals, uncategorized = [], [], [], [], []
    for house_id in house_ids:
        solar = metrics[house_id]['solar']
        consumption = metrics[house_id]['consumption']
        # Both directions are tested explicitly: for NaN values neither
        # comparison holds, and such houses must land in `uncategorized`.
        high_solar, low_solar = solar >= median_solar, solar < median_solar
        high_cons = consumption >= median_consumption
        low_cons = consumption < median_consumption

        if high_solar and low_cons:
            producers.append(house_id)
        elif low_solar and high_cons:
            consumers.append(house_id)
        elif high_solar and high_cons:
            prosumers.append(house_id)
        elif low_solar and low_cons:
            neutrals.append(house_id)
        else:
            uncategorized.append(house_id)

    # Grouping by category before dealing spreads each category across clusters.
    ordered = producers + consumers + prosumers + neutrals + uncategorized
    num_clusters = math.ceil(len(ordered) / size)

    # Cluster k receives the houses at positions k, k + num_clusters, ...
    return [ordered[offset::num_clusters] for offset in range(num_clusters)]
|
|
class GlobalPriceVecEnvWrapper(gym.vector.VectorEnvWrapper):
    """
    Vector-env wrapper that steps one sub-environment per cluster.

    It packs the low-level actions together with per-cluster net energy
    transfers and a price column, and forwards capacity / transfer queries to
    the individual cluster environments.
    """

    def __init__(self, env, clusters: list):
        """
        Args:
            env: The vector environment to wrap; its ``envs`` attribute is
                kept as ``cluster_envs``.
            clusters: The house-id groups the sub-environments were built from.
        """
        super().__init__(env)
        self.clusters = clusters
        self.cluster_envs = self.env.envs

    def step(self, actions: np.ndarray, exports: np.ndarray = None, imports: np.ndarray = None):
        """
        Step every cluster environment once.

        Args:
            actions: Batched low-level actions, passed through unchanged.
            exports: Optional per-cluster exported energy.
            imports: Optional per-cluster imported energy.

        Returns:
            ``(obs_next, rewards, done_all, info)`` where ``done_all`` is true
            only when every cluster is terminated or truncated, and ``info``
            holds ``"cluster_dones"`` and ``"cluster_infos"``.
        """
        n_envs = len(self.cluster_envs)

        # Net transfer is only meaningful when both directions are supplied.
        if exports is None or imports is None:
            transfers = np.zeros(n_envs)
        else:
            transfers = imports - exports

        packed_actions = (
            actions,
            transfers.reshape(-1, 1).astype(np.float32),
            # Price column is filled with -1.0 (presumably a "no price"
            # sentinel -- confirm against the sub-environment's step).
            np.full((n_envs, 1), -1.0, dtype=np.float32),
        )
        obs_next, rewards, terminateds, truncateds, infos = self.env.step(packed_actions)

        dones = terminateds | truncateds
        done_all = dones.all()

        if done_all:
            # Replace the step infos with the per-env final infos, stacked key by key.
            per_env_final = infos['final_info']
            infos = {
                key: np.stack([entry[key] for entry in per_env_final])
                for key in per_env_final[0].keys()
            }

        return obs_next, rewards, done_all, {
            "cluster_dones": dones,
            "cluster_infos": infos,
        }

    def get_export_capacity(self, cluster_idx: int) -> float:
        """Return the cluster's exportable energy: battery SoC scaled by discharge efficiency, summed, plus current solar."""
        env_i = self.cluster_envs[cluster_idx]
        from_batteries = np.sum(env_i.battery_soc * env_i.battery_discharge_efficiency)
        return float(from_batteries + env_i.current_solar)

    def get_import_capacity(self, cluster_idx: int) -> float:
        """Return the cluster's total free battery space (max capacity minus SoC, summed)."""
        env_i = self.cluster_envs[cluster_idx]
        return float(np.sum(env_i.battery_max_capacity - env_i.battery_soc))

    def send_energy(self, from_cluster_idx: int, amount: float) -> float:
        """Delegate to the source cluster env's ``send_energy(amount)`` and return its result."""
        return self.cluster_envs[from_cluster_idx].send_energy(amount)

    def receive_energy(self, to_cluster_idx: int, amount: float) -> float:
        """Delegate to the target cluster env's ``receive_energy(amount)`` and return its result."""
        return self.cluster_envs[to_cluster_idx].receive_energy(amount)
|
|
|
|
def _load_shared_dataset(data_path: str, time_freq: str) -> pd.DataFrame:
    """Read the CSV, index it by the UTC-parsed 'local_15min' column, and mean-resample to time_freq."""
    frame = pd.read_csv(data_path)
    frame["local_15min"] = pd.to_datetime(frame["local_15min"], utc=True)
    frame.set_index("local_15min", inplace=True)
    return frame.resample(time_freq).mean()


def make_vec_env(data_path: str, time_freq: str, cluster_size: int, state: str):
    """
    Build a vectorized, cluster-wise environment over one shared dataset.

    The dataset is loaded and resampled once, a throwaway ``SolarSys`` is
    created to compute per-house totals, houses are grouped with
    ``form_clusters``, and one ``SolarSys`` per cluster is placed in a
    ``gym.vector.SyncVectorEnv`` wrapped by ``GlobalPriceVecEnvWrapper``.

    Args:
        data_path: Path of the CSV file; it must contain a 'local_15min' column.
        time_freq: Resampling frequency passed to ``DataFrame.resample``.
        cluster_size: Target number of houses per cluster.
        state: Forwarded unchanged to every ``SolarSys`` instance.

    Returns:
        The wrapped vector environment.

    Raises:
        ValueError: If loading or resampling the dataset fails; the original
            exception is attached as ``__cause__``.
    """
    print("--- Pre-loading shared dataset for all environments ---")
    try:
        shared_df = _load_shared_dataset(data_path, time_freq)
    except Exception as e:
        # Chain explicitly so the root cause stays attached to the ValueError.
        raise ValueError(f"Failed to pre-load data in make_vec_env: {e}") from e

    # This instance is used only to read per-house data for clustering.
    base_env_for_metrics = SolarSys(
        data_path=data_path,
        time_freq=time_freq,
        preloaded_data=shared_df,
        state=state,
    )

    metrics = {}
    for hid in base_env_for_metrics.house_ids:
        # Negative entries are clipped to zero before summing.
        total_consumption = float(
            np.clip(base_env_for_metrics.original_no_p2p_import[hid], 0.0, None).sum()
        )
        total_solar = float(
            base_env_for_metrics.all_data[f"total_solar_{hid}"].clip(lower=0.0).sum()
        )
        metrics[hid] = {'consumption': total_consumption, 'solar': total_solar}

    clusters = form_clusters(metrics, cluster_size)
    print(f"Formed {len(clusters)} clusters of size up to {cluster_size}.")

    # One deferred constructor per cluster; all share the pre-loaded frame.
    env_fns = [
        functools.partial(
            SolarSys,
            data_path=data_path,
            time_freq=time_freq,
            house_ids_in_cluster=cluster_house_ids,
            preloaded_data=shared_df,
            state=state,
        )
        for cluster_house_ids in clusters
    ]
    sync_vec_env = gym.vector.SyncVectorEnv(env_fns)
    return GlobalPriceVecEnvWrapper(sync_vec_env, clusters=clusters)