| import os |
| import sys |
| import numpy as np |
| import torch |
|
|
| |
| |
| sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) |
|
|
| from Environment.solar_sys_environment import SolarSys |
| from Environment.cluster_env_wrapper import GlobalPriceVecEnvWrapper |
| from Environment.cluster_env_wrapper import make_vec_env |
class InterClusterLedger:
    """
    Running ledger of pairwise transfers between clusters.

    ``balances[a][b]`` holds the net amount cluster ``a`` has received from
    cluster ``b`` (amounts received from ``b`` minus amounts paid to ``b``).
    Every transfer updates both directions, so the two entries mirror each
    other with opposite signs.
    """

    def __init__(self):
        # Nested mapping: cluster id -> {counterparty id -> net amount}.
        self.balances = {}

    def record_transfer(self, from_id: str, to_id: str, amount: float):
        """Record that ``from_id`` sent ``amount`` to ``to_id``; self-transfers are ignored."""
        if from_id == to_id:
            return
        sender_row = self.balances.setdefault(from_id, {})
        receiver_row = self.balances.setdefault(to_id, {})
        # Sender's position against the receiver drops; receiver's rises.
        sender_row[to_id] = sender_row.get(to_id, 0.0) - amount
        receiver_row[from_id] = receiver_row.get(from_id, 0.0) + amount

    def get_balance(self, a_id: str, b_id: str) -> float:
        """Return the net amount ``a_id`` has received from ``b_id`` (0.0 if nothing recorded)."""
        row = self.balances.get(a_id)
        if row is None:
            return 0.0
        return row.get(b_id, 0.0)

    def net_balances(self) -> dict:
        """Return the underlying balances mapping itself (the live dict, not a copy)."""
        return self.balances
|
|
|
class InterClusterCoordinator:
    """
    Central market maker between clusters.

    It has three jobs:

    - Turn the high-level agent's per-cluster preferences and each cluster's
      reported capacities into matched export/import volumes
      (``build_transfers``).
    - Build per-cluster observation vectors (``get_cluster_state``).
    - Compute an individual reward for every cluster
      (``compute_inter_cluster_reward``).
    """

    def __init__(
        self,
        cluster_env,
        high_level_agent,
        ledger,
        max_transfer_kwh: float = 1000000.0,
        w_cost_savings: float = 2.0,
        w_grid_penalty: float = 0.3,
        w_p2p_bonus: float = 0.3
    ):
        """
        Args:
            cluster_env: Vectorized cluster environment. The reward reads
                ``cluster_env.cluster_envs``.
            high_level_agent: Stored as ``self.agent``.
            ledger: Stored as ``self.ledger``.
            max_transfer_kwh: Scale applied to the softmax preferences in
                ``build_transfers``. Because the preferences sum to 1, the
                total volume offered (or requested) per call is at most this.
            w_cost_savings: Weight of the cost-savings reward term.
            w_grid_penalty: Weight of the grid-import penalty term.
            w_p2p_bonus: Weight of the P2P-volume bonus term.
        """
        self.cluster_env = cluster_env
        self.agent = high_level_agent
        self.ledger = ledger
        self.max_transfer_kwh = max_transfer_kwh
        self.w_cost_savings = w_cost_savings
        self.w_grid_penalty = w_grid_penalty
        self.w_p2p_bonus = w_p2p_bonus

    def get_cluster_state(self, env, step_count: int) -> np.ndarray:
        """
        Summarize one cluster's state as a 7-element float32 vector.

        ``step_count`` is clamped to ``env.num_steps - 1``. The features, in
        order, are:

        1. total battery SoC
        2. total battery capacity
        3. SoC fraction (0.0 when total capacity is not positive)
        4. total demand at the step
        5. total solar at the step
        6. grid price at the step
        7. step / ``env.steps_per_day``

        NOTE: the last feature is not wrapped modulo a day, so it exceeds 1.0
        when ``num_steps > steps_per_day``.
        """
        solar_env = env
        idx = min(step_count, solar_env.num_steps - 1)
        agg_soc = np.sum(solar_env.battery_soc)
        agg_max_capacity = np.sum(solar_env.battery_max_capacity)
        agg_soc_fraction = agg_soc / agg_max_capacity if agg_max_capacity > 0 else 0.0

        agg_demand = np.sum(solar_env.demands_day[idx])
        agg_solar = np.sum(solar_env.solars_day[idx])

        price = solar_env.get_grid_price(idx)
        t_norm = idx / float(solar_env.steps_per_day)

        return np.array([
            agg_soc, agg_max_capacity, agg_soc_fraction,
            agg_demand, agg_solar, price, t_norm
        ], dtype=np.float32)

    @staticmethod
    def _softmax(x: np.ndarray) -> np.ndarray:
        """Numerically stable softmax over a 1-D array; empty input returns empty."""
        x = np.asarray(x, dtype=np.float64)
        if x.size == 0:
            return x
        shifted = np.exp(x - np.max(x))
        return shifted / shifted.sum()

    def build_transfers(self, agent_action_vector: np.ndarray, reports: dict) -> tuple[np.ndarray, np.ndarray]:
        """
        Act as a centralized market maker from agent actions and live capacity reports.

        Row ``i`` of the action array holds the raw export preference (column 0)
        and the raw import preference (column 1) for cluster ``i``. Each
        column is processed in three steps:

        1. Softmax-normalize across clusters.
        2. Scale by ``max_transfer_kwh``.
        3. Cap by the cluster's reported capacity; negative capacities are
           treated as zero.

        The smaller of total supply and total demand is then allocated pro
        rata on both sides, so total exports equal total imports.

        Args:
            agent_action_vector: Array-like or torch tensor of shape
                ``(n_clusters, >=2)``.
            reports: Mapping ``cluster index -> {'export_capacity': ...,
                'import_capacity': ...}``.

        Returns:
            ``(actual_exports, actual_imports)``: float64 arrays of length
            ``n_clusters``. Both are all-zero when the matched volume is
            <= 1e-6.
        """
        # Accept tensors straight from the agent (possibly requiring grad or on GPU).
        if isinstance(agent_action_vector, torch.Tensor):
            agent_action_vector = agent_action_vector.detach().cpu().numpy()
        actions = np.asarray(agent_action_vector, dtype=np.float64)

        # The softmax normalizes over every row, so the row count is the number
        # of clusters. This also avoids depending on a `cluster_env.clusters`
        # attribute (the reward uses `cluster_env.cluster_envs`).
        n = actions.shape[0]

        export_prefs = self._softmax(actions[:, 0])
        import_prefs = self._softmax(actions[:, 1])

        # A negative capacity means "cannot trade"; clamping keeps transfers non-negative.
        export_caps = np.array(
            [max(float(reports[i]['export_capacity']), 0.0) for i in range(n)], dtype=np.float64
        )
        import_caps = np.array(
            [max(float(reports[i]['import_capacity']), 0.0) for i in range(n)], dtype=np.float64
        )

        potential_exports = np.minimum(export_prefs * self.max_transfer_kwh, export_caps)
        potential_imports = np.minimum(import_prefs * self.max_transfer_kwh, import_caps)

        total_available_for_export = float(potential_exports.sum())
        total_requested_for_import = float(potential_imports.sum())
        total_matched_energy = min(total_available_for_export, total_requested_for_import)

        if total_matched_energy <= 1e-6:
            return np.zeros(n), np.zeros(n)

        # Both totals are >= the matched volume (> 1e-6), so the divisions are safe.
        actual_exports = potential_exports / total_available_for_export * total_matched_energy
        actual_imports = potential_imports / total_requested_for_import * total_matched_energy
        return actual_exports, actual_imports

    def compute_inter_cluster_reward(self, all_cluster_infos: dict, actual_transfers: tuple, step_count: int) -> np.ndarray:
        """
        Compute an individual reward for each cluster agent (credit assignment).

        For cluster ``i``::

            r_i = w_cost_savings * (sum(grid_import_no_p2p_i) * price - sum(costs_i))
                  + w_p2p_bonus * (exports_i + imports_i)
                  - w_grid_penalty * sum(grid_import_with_p2p_i)

        Args:
            all_cluster_infos: Mapping with keys ``'costs'``,
                ``'grid_import_no_p2p'`` and ``'grid_import_with_p2p'``. Each
                value is a per-cluster sequence whose entries are summed with
                ``np.sum``.
            actual_transfers: ``(actual_exports, actual_imports)`` as returned
                by ``build_transfers``.
            step_count: Step passed to ``get_grid_price``.

        Returns:
            float32 array with one reward per entry of ``cluster_env.cluster_envs``.
        """
        actual_exports, actual_imports = actual_transfers
        num_clusters = len(self.cluster_env.cluster_envs)
        cluster_rewards = np.zeros(num_clusters, dtype=np.float32)

        costs_per_cluster = [np.sum(c) for c in all_cluster_infos['costs']]
        baseline_imports_per_cluster = [np.sum(imp) for imp in all_cluster_infos['grid_import_no_p2p']]
        actual_imports_per_cluster = [np.sum(imp) for imp in all_cluster_infos['grid_import_with_p2p']]

        # One grid price for all clusters, read from the first cluster env.
        # NOTE(review): unlike get_cluster_state, step_count is not clamped to
        # num_steps - 1 here; confirm get_grid_price tolerates the terminal step.
        grid_price = self.cluster_env.cluster_envs[0].get_grid_price(step_count)

        for i in range(num_clusters):
            baseline_cost_this_cluster = baseline_imports_per_cluster[i] * grid_price
            actual_cost_this_cluster = costs_per_cluster[i]
            cost_saved = baseline_cost_this_cluster - actual_cost_this_cluster
            r_savings = self.w_cost_savings * cost_saved
            r_grid = self.w_grid_penalty * actual_imports_per_cluster[i]
            p2p_volume_this_cluster = actual_exports[i] + actual_imports[i]
            r_p2p = self.w_p2p_bonus * p2p_volume_this_cluster
            cluster_rewards[i] = r_savings + r_p2p - r_grid

        return cluster_rewards