| import os |
| import sys |
| import time |
| from datetime import datetime, timedelta |
| import re |
| import numpy as np |
| import torch |
| import pandas as pd |
| import matplotlib.pyplot as plt |
|
|
| |
| sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) |
|
|
| |
| from cluster import InterClusterCoordinator, InterClusterLedger |
| from Environment.cluster_env_wrapper import make_vec_env |
| from mappo.trainer.mappo import MAPPO |
| from meanfield.trainer.meanfield import MFAC |
|
|
def recursive_sum(item):
    """Return the sum of all real numbers inside an arbitrarily nested structure.

    Any non-string object exposing ``__iter__`` is walked recursively and the
    sums of its elements are added together. Leaves are added only when
    ``np.isreal`` accepts them; strings and other non-real leaves (for
    example complex numbers with a non-zero imaginary part) contribute 0.

    Args:
        item: A number, or any nesting of lists/tuples/arrays of numbers.

    Returns:
        The accumulated sum; ``0`` for empty containers and ignored leaves.
    """
    # Strings are leaves that never contribute to the total.
    if isinstance(item, str):
        return 0

    if hasattr(item, '__iter__'):
        try:
            children = iter(item)
        except TypeError:
            # 0-d arrays advertise __iter__ but refuse iteration
            # ("iteration over a 0-d array"); unwrap them to a plain scalar
            # instead of crashing.
            if hasattr(item, 'item'):
                item = item.item()
        else:
            return sum((recursive_sum(child) for child in children), 0)

    return item if np.isreal(item) else 0
|
|
|
|
def _aggregate_system_metrics(cluster_metrics, episode):
    """Collapse per-cluster episode metrics into one system-level record.

    The four additive metrics are summed over clusters, the fairness metric
    is averaged, and the record is tagged with the episode it belongs to.

    Args:
        cluster_metrics: Re-iterable sequence of per-cluster metric dicts as
            returned by ``cluster_env.call('get_episode_metrics')``.
        episode: Value stored under the ``"Episode"`` key.

    Returns:
        dict with the summed metrics, ``fairness_on_cost_savings`` and
        ``Episode`` (in that key order).
    """
    summed_keys = (
        "grid_reduction_entire_day",
        "grid_reduction_peak_hours",
        "total_cost_savings",
        "battery_degradation_cost_total",
    )
    record = {
        key: sum(m[key] for m in cluster_metrics) for key in summed_keys
    }
    record["fairness_on_cost_savings"] = np.mean(
        [m["fairness_on_cost_savings"] for m in cluster_metrics]
    )
    record["Episode"] = episode
    return record


def main():
    """Train the hierarchical controller and write models, logs and plots.

    One MAPPO agent is created per cluster (low level) and a single MFAC
    agent coordinates transfers between clusters (high level). Training runs
    in two phases:

    * Phase 1 (episodes before ``JOINT_TRAINING_START_EPISODE``): no
      inter-cluster transfers are built; only the low-level agents are
      updated.
    * Phase 2: the high-level agent acts every step and its per-cluster
      reward is split evenly among that cluster's agents as a bonus. Updates
      alternate in a cycle: low-level agents are updated for
      ``FREEZE_HIGH_FOR_EPISODES`` episodes, then the high-level agent for
      ``FREEZE_LOW_FOR_EPISODES`` episodes.

    Checkpoints go to ``Training/<run_name>/models``, a CSV log to
    ``.../logs`` and figures to ``.../plots``.

    Raises:
        ValueError: If ``DATA_PATH`` does not contain ``"<N>houses"``.
    """
    overall_start_time = time.time()

    # ------------------------------------------------------------------
    # Run configuration
    # ------------------------------------------------------------------
    STATE_TO_RUN = "pennsylvania"
    # Must be set to a dataset path containing "<N>houses"; the agent count
    # is parsed from it, so an empty value aborts the run immediately.
    DATA_PATH = ""

    match = re.search(r'(\d+)houses', DATA_PATH)
    if not match:
        raise ValueError("Could not extract the number of houses from DATA_PATH.")
    NUMBER_OF_AGENTS = int(match.group(1))
    NUM_EPISODES = 10000
    CLUSTER_SIZE = 10
    BATCH_SIZE = 256
    CHECKPOINT_INTERVAL = 1000
    WINDOW_SIZE = 80  # NOTE(review): not referenced anywhere in this function
    MAX_TRANSFER_KWH = 100000
    LR = 2e-4
    GAMMA = 0.95
    LAMBDA = 0.95
    CLIP_EPS = 0.2
    K_EPOCHS = 4
    JOINT_TRAINING_START_EPISODE = 2000
    FREEZE_HIGH_FOR_EPISODES = 20
    FREEZE_LOW_FOR_EPISODES = 10
    # Length of one alternating-update cycle in phase 2.
    CYCLE_LENGTH = FREEZE_HIGH_FOR_EPISODES + FREEZE_LOW_FOR_EPISODES

    # ------------------------------------------------------------------
    # Output directories
    # ------------------------------------------------------------------
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    run_name = f"hierarchical_{STATE_TO_RUN}_{NUMBER_OF_AGENTS}agents_" \
               f"{CLUSTER_SIZE}size_{NUM_EPISODES}eps_{timestamp}"
    root_dir = os.path.join("Training", run_name)
    models_dir = os.path.join(root_dir, "models")
    logs_dir = os.path.join(root_dir, "logs")
    plots_dir = os.path.join(root_dir, "plots")

    for d in (models_dir, logs_dir, plots_dir):
        os.makedirs(d, exist_ok=True)
    print(f"Logging to: {root_dir}")

    # ------------------------------------------------------------------
    # Environment
    # ------------------------------------------------------------------
    cluster_env = make_vec_env(
        data_path=DATA_PATH,
        time_freq="15T",
        cluster_size=CLUSTER_SIZE,
        state=STATE_TO_RUN
    )

    n_clusters = cluster_env.num_envs
    sample_subenv = cluster_env.cluster_envs[0]
    n_agents_per_cluster = sample_subenv.num_agents
    num_low_level_agents = n_clusters * n_agents_per_cluster

    local_dim = sample_subenv.observation_space.shape[-1]
    # The "global" observation of a cluster is all of its agents' local
    # observations flattened together.
    global_dim = n_agents_per_cluster * local_dim
    act_dim = sample_subenv.action_space[0].shape[-1]
    total_buffer_size = sample_subenv.num_steps * n_clusters
    print(f"Low-level agent buffer size set to: {total_buffer_size}")
    print(f"Created {n_clusters} clusters.")
    print(f"Shared low-level agent: {n_agents_per_cluster} agents per cluster, "
          f"obs_dim={local_dim}, global_dim={global_dim}, act_dim={act_dim}")

    # ------------------------------------------------------------------
    # Low-level agents: one independent MAPPO learner per cluster
    # ------------------------------------------------------------------
    print(f"Creating {n_clusters} independent low-level MAPPO agents...")
    agent_buffer_size = sample_subenv.num_steps
    low_agents = []
    for _ in range(n_clusters):
        agent = MAPPO(
            n_agents=n_agents_per_cluster,
            local_dim=local_dim,
            global_dim=global_dim,
            act_dim=act_dim,
            lr=LR,
            gamma=GAMMA,
            lam=LAMBDA,
            clip_eps=CLIP_EPS,
            k_epochs=K_EPOCHS,
            batch_size=BATCH_SIZE,
            episode_len=agent_buffer_size
        )
        low_agents.append(agent)

    # ------------------------------------------------------------------
    # High-level agent: one MFAC learner treating each cluster as an agent
    # ------------------------------------------------------------------
    OBS_DIM_HI_LOCAL = 7
    act_dim_inter = 2
    print(f"Inter-cluster agent (MFAC): n_agents={n_clusters}, "
          f"local_dim={OBS_DIM_HI_LOCAL}, act_dim={act_dim_inter}")
    inter_agent = MFAC(
        n_agents=n_clusters,
        local_dim=OBS_DIM_HI_LOCAL,
        act_dim=act_dim_inter,
        lr=LR,
        gamma=GAMMA,
        lam=LAMBDA,
        clip_eps=CLIP_EPS,
        k_epochs=K_EPOCHS,
        batch_size=BATCH_SIZE,
        # NOTE(review): hard-coded, whereas the low-level agents use
        # sample_subenv.num_steps -- confirm both describe the same horizon.
        episode_len=96
    )
    ledger = InterClusterLedger()
    coordinator = InterClusterCoordinator(
        cluster_env,
        inter_agent,
        ledger,
        max_transfer_kwh=MAX_TRANSFER_KWH
    )

    # ------------------------------------------------------------------
    # Logging containers
    # ------------------------------------------------------------------
    episode_log_data = []
    performance_metrics_log = []
    intra_log = {}
    inter_log = {}
    total_log = {}
    cost_log = {}

    # ------------------------------------------------------------------
    # Training loop
    # ------------------------------------------------------------------
    for ep in range(1, NUM_EPISODES + 1):
        step_count = 0
        start_time = time.time()
        ep_total_inter_cluster_reward = 0.0
        day_logs = []
        obs_clusters, _ = cluster_env.reset()

        if ep > 1:
            # Metrics of the episode that just finished are recorded under
            # its own number (ep - 1).
            # NOTE(review): they are read *after* reset(); confirm that
            # get_episode_metrics still reports the previous episode here.
            all_cluster_metrics = cluster_env.call('get_episode_metrics')
            performance_metrics_log.append(
                _aggregate_system_metrics(all_cluster_metrics, ep - 1)
            )

        done_all = False
        cluster_rewards = np.zeros((n_clusters, n_agents_per_cluster), dtype=np.float32)

        is_phase_1 = ep < JOINT_TRAINING_START_EPISODE

        if ep == 1:
            print(f"\n--- Starting Phase 1: Training Low-Level Agent Only (up to ep {JOINT_TRAINING_START_EPISODE-1}) ---")
        if ep == JOINT_TRAINING_START_EPISODE:
            print(f"\n--- Starting Phase 2: Joint Hierarchical Training (from ep {JOINT_TRAINING_START_EPISODE}) ---")

        while not done_all:
            step_count += 1

            # One flattened observation vector per cluster.
            batch_global_obs = obs_clusters.reshape(n_clusters, -1)

            # --- Low-level action selection (one agent per cluster) ---
            low_level_actions_list = []
            low_level_logps_list = []
            for c_idx, agent in enumerate(low_agents):
                actions, logps = agent.select_action(
                    obs_clusters[c_idx], batch_global_obs[c_idx]
                )
                low_level_actions_list.append(actions)
                low_level_logps_list.append(logps)
            low_level_actions = np.stack(low_level_actions_list)
            low_level_logps = np.stack(low_level_logps_list)

            # --- High-level action selection (phase 2 only) ---
            if is_phase_1:
                exports, imports = None, None
            else:
                inter_cluster_obs_local = np.array([
                    coordinator.get_cluster_state(se, step_count)
                    for se in cluster_env.cluster_envs
                ])

                high_level_action, high_level_logp = inter_agent.select_action(inter_cluster_obs_local)

                current_reports = {
                    i: {
                        'export_capacity': cluster_env.get_export_capacity(i),
                        'import_capacity': cluster_env.get_import_capacity(i),
                    }
                    for i in range(n_clusters)
                }
                exports, imports = coordinator.build_transfers(high_level_action, current_reports)

            # --- Environment step ---
            next_obs_clusters, rewards, done_all, step_info = cluster_env.step(
                low_level_actions, exports=exports, imports=imports
            )
            cluster_infos = step_info.get("cluster_infos")

            day_logs.append({
                "costs": cluster_infos["costs"],
                "grid_import_no_p2p": cluster_infos["grid_import_no_p2p"],
                "charge_amount": cluster_infos.get("charge_amount"),
                "discharge_amount": cluster_infos.get("discharge_amount")
            })
            per_agent_rewards = np.stack(cluster_infos['agent_rewards'])

            rewards_for_buffer = per_agent_rewards
            if not is_phase_1:
                high_level_rewards_per_cluster = coordinator.compute_inter_cluster_reward(
                    all_cluster_infos=cluster_infos,
                    actual_transfers=(exports, imports),
                    step_count=step_count
                )
                ep_total_inter_cluster_reward += np.sum(high_level_rewards_per_cluster)
                next_inter_cluster_obs_local = np.array([
                    coordinator.get_cluster_state(se, step_count + 1)
                    for se in cluster_env.cluster_envs
                ])

                inter_agent.store(
                    inter_cluster_obs_local,
                    high_level_action,
                    high_level_logp,
                    high_level_rewards_per_cluster,
                    [done_all] * n_clusters,
                    next_inter_cluster_obs_local
                )

                # Share each cluster's high-level reward equally among its
                # agents as a bonus on top of their own reward.
                bonus_per_agent = np.zeros_like(per_agent_rewards)
                num_agents_in_cluster = per_agent_rewards.shape[1]
                if num_agents_in_cluster > 0:
                    for c_idx in range(n_clusters):
                        bonus = high_level_rewards_per_cluster[c_idx] / num_agents_in_cluster
                        bonus_per_agent[c_idx, :] = bonus

                rewards_for_buffer = per_agent_rewards + bonus_per_agent

            # --- Store the low-level transition of every cluster ---
            dones_list = step_info.get("cluster_dones")
            for idx, agent in enumerate(low_agents):
                agent.store(
                    obs_clusters[idx],
                    batch_global_obs[idx],
                    low_level_actions[idx],
                    low_level_logps[idx],
                    rewards_for_buffer[idx],
                    dones_list[idx],
                    next_obs_clusters[idx].reshape(-1)
                )

            # Episode statistics use the raw rewards, without the bonus.
            cluster_rewards += per_agent_rewards

            obs_clusters = next_obs_clusters

        # --- Policy updates, once per episode ---
        if is_phase_1:
            for agent in low_agents:
                agent.update()
        else:
            phase2_episode_num = ep - JOINT_TRAINING_START_EPISODE
            position_in_cycle = phase2_episode_num % CYCLE_LENGTH

            if position_in_cycle < FREEZE_HIGH_FOR_EPISODES:
                print("Updating ALL LOW-LEVEL agents (High-level is frozen).")
                for agent in low_agents:
                    agent.update()
            else:
                print("Updating HIGH-LEVEL agent (Low-level is frozen).")
                inter_agent.update()

        # --- Episode summary ---
        duration = time.time() - start_time
        get_price_fn = cluster_env.cluster_envs[0].get_grid_price

        # Baseline: what the no-P2P grid import would have cost at the grid
        # price of each step index.
        baseline_costs_per_step = [
            recursive_sum(entry["grid_import_no_p2p"]) * get_price_fn(i)
            for i, entry in enumerate(day_logs)
        ]
        total_baseline_cost = sum(baseline_costs_per_step)
        actual_costs_per_step = [recursive_sum(entry["costs"]) for entry in day_logs]
        total_actual_cost = sum(actual_costs_per_step)
        cost_reduction_pct = (1 - (total_actual_cost / total_baseline_cost)) * 100 if total_baseline_cost > 0 else 0.0
        total_reward_intra = cluster_rewards.sum()
        mean_reward_intra = total_reward_intra / num_low_level_agents if num_low_level_agents > 0 else 0.0
        total_reward_inter = ep_total_inter_cluster_reward
        # The inter-cluster mean is per step, unlike the per-agent means.
        mean_reward_inter = total_reward_inter / step_count if step_count > 0 else 0.0
        total_reward_system = total_reward_intra + total_reward_inter
        mean_reward_system = total_reward_system / num_low_level_agents if num_low_level_agents > 0 else 0.0

        intra_log.setdefault('total', []).append(total_reward_intra)
        intra_log.setdefault('mean', []).append(mean_reward_intra)
        inter_log.setdefault('total', []).append(total_reward_inter)
        inter_log.setdefault('mean', []).append(mean_reward_inter)
        total_log.setdefault('total', []).append(total_reward_system)
        total_log.setdefault('mean', []).append(mean_reward_system)
        cost_log.setdefault('total_cost', []).append(total_actual_cost)
        cost_log.setdefault('cost_without_p2p', []).append(total_baseline_cost)

        episode_log_data.append({
            "Episode": ep,
            "Mean_Reward_System": mean_reward_system,
            "Mean_Reward_Intra": mean_reward_intra,
            "Mean_Reward_Inter": mean_reward_inter,
            "Total_Reward_System": total_reward_system,
            "Total_Reward_Intra": total_reward_intra,
            "Total_Reward_Inter": total_reward_inter,
            "Cost_Reduction_Pct": cost_reduction_pct,
            "Episode_Duration": duration,
        })

        print(f"Ep {ep}/{NUM_EPISODES} | "
              f"Mean System R: {mean_reward_system:.3f} | "
              f"Cost Red: {cost_reduction_pct:.1f}% | "
              f"Time: {duration:.2f}s")

        # --- Checkpointing ---
        if ep % CHECKPOINT_INTERVAL == 0 or ep == NUM_EPISODES:
            for c_idx, agent in enumerate(low_agents):
                agent.save(os.path.join(models_dir, f"low_cluster{c_idx}_ep{ep}.pth"))
            inter_agent.save(os.path.join(models_dir, f"inter_ep{ep}.pth"))
            print(f"Saved checkpoint at episode {ep}")

    print("Training completed! Aggregating final logs...")

    # The loop only records metrics for episode ep - 1, so the last episode
    # is collected here.
    final_cluster_metrics = cluster_env.call('get_episode_metrics')
    performance_metrics_log.append(
        _aggregate_system_metrics(final_cluster_metrics, NUM_EPISODES)
    )

    df_rewards_log = pd.DataFrame(episode_log_data)
    df_perf_log = pd.DataFrame(performance_metrics_log)
    df_final_log = pd.merge(df_rewards_log, df_perf_log, on="Episode")

    log_csv_path = os.path.join(logs_dir, "training_performance_log.csv")
    overall_end_time = time.time()
    total_duration_seconds = overall_end_time - overall_start_time
    # Extra trailing row carrying the total wall-clock time of the run.
    total_time_row = pd.DataFrame([{"Episode": "Total_Training_Time", "Episode_Duration": total_duration_seconds}])
    df_to_save = pd.concat([df_final_log, total_time_row], ignore_index=True)

    columns_to_save = [
        "Episode",
        "Mean_Reward_System",
        "Mean_Reward_Intra",
        "Mean_Reward_Inter",
        "Total_Reward_System",
        "Total_Reward_Intra",
        "Total_Reward_Inter",
        "Cost_Reduction_Pct",
        "battery_degradation_cost_total",
        "Episode_Duration",
        "total_cost_savings",
        "grid_reduction_entire_day",
        "fairness_on_cost_savings"
    ]
    df_to_save = df_to_save[[col for col in columns_to_save if col in df_to_save.columns]]
    df_to_save.to_csv(log_csv_path, index=False)
    print(f"Saved comprehensive training performance log to: {log_csv_path}")

    generate_plots(
        plots_dir=plots_dir,
        num_episodes=NUM_EPISODES,
        intra_log=intra_log,
        inter_log=inter_log,
        total_log=total_log,
        cost_log=cost_log,
        df_final_log=df_final_log
    )

    # Re-measured so the printed total also includes plotting time.
    overall_end_time = time.time()
    total_duration_seconds = overall_end_time - overall_start_time
    total_duration_formatted = str(timedelta(seconds=int(total_duration_seconds)))

    print("\n" + "="*50)
    print(f"Total Training Time: {total_duration_formatted} (HH:MM:SS)")
    print("="*50)
|
|
| |
def _moving_average(series, window):
    """Return the centred rolling mean of ``series`` as a NumPy array.

    ``min_periods=1`` keeps the output the same length as the input, so the
    edges are averaged over whatever samples are available.
    """
    return pd.Series(series).rolling(window=window, center=True, min_periods=1).mean().to_numpy()


def _save_total_and_mean_plot(out_path, episodes, log, window, *, title,
                              total_label, mean_label,
                              total_ylabel, mean_ylabel,
                              total_color, mean_color):
    """Save a twin-axis figure of the smoothed ``log['total']`` and ``log['mean']``.

    The total curve is drawn on the left axis and the mean curve (dashed) on
    the right axis; each axis is coloured like its curve.
    """
    fig, ax = plt.subplots(figsize=(12, 7))
    ax.plot(episodes, _moving_average(log['total'], window),
            label=f'{total_label} (MA {window})', linewidth=2, color=total_color)
    ax.set_xlabel("Episode")
    ax.set_ylabel(total_ylabel, color=total_color)
    ax.tick_params(axis='y', labelcolor=total_color)
    ax.grid(True)

    ax2 = ax.twinx()
    ax2.plot(episodes, _moving_average(log['mean'], window),
             label=f'{mean_label} (MA {window})', linewidth=2, linestyle='--', color=mean_color)
    ax2.set_ylabel(mean_ylabel, color=mean_color)
    ax2.tick_params(axis='y', labelcolor=mean_color)

    fig.suptitle(title)
    fig.legend(loc="upper left", bbox_to_anchor=(0.1, 0.9))
    fig.savefig(out_path, dpi=200)
    plt.close(fig)


def generate_plots(
    plots_dir: str,
    num_episodes: int,
    intra_log: dict,
    inter_log: dict,
    total_log: dict,
    cost_log: dict,
    df_final_log: pd.DataFrame
):
    """
    Generates and saves all final plots after training is complete.

    Five PNG files are written to ``plots_dir``: intra-cluster rewards,
    inter-cluster rewards, total system rewards, cost reduction and battery
    degradation cost. Every curve is smoothed with a centred moving average.

    Args:
        plots_dir: Existing directory the figures are written to.
        num_episodes: Number of episodes; the reward and cost series are
            plotted against ``1..num_episodes`` and must have that length.
        intra_log: Dict with per-episode ``'total'`` and ``'mean'`` lists.
        inter_log: Same layout, for the inter-cluster rewards.
        total_log: Same layout, for the combined system rewards.
        cost_log: Dict with per-episode ``'total_cost'`` and
            ``'cost_without_p2p'`` lists.
        df_final_log: Per-episode frame providing ``'Episode'`` and
            ``'battery_degradation_cost_total'``.
    """
    print("Training completed! Generating plots...")

    ma_window = 120
    episodes = np.arange(1, num_episodes + 1)

    # 1) Intra-cluster (low-level) rewards.
    _save_total_and_mean_plot(
        os.path.join(plots_dir, "1_intra_cluster_rewards.png"),
        episodes, intra_log, ma_window,
        title="Intra-Cluster (Low-Level Agent) Rewards",
        total_label='Total Reward', mean_label='Mean Reward',
        total_ylabel="Total Intra-Cluster Reward",
        mean_ylabel="Mean Intra-Cluster Reward",
        total_color='tab:blue', mean_color='tab:cyan',
    )

    # 2) Inter-cluster (high-level) rewards.
    _save_total_and_mean_plot(
        os.path.join(plots_dir, "2_inter_cluster_rewards.png"),
        episodes, inter_log, ma_window,
        title="Inter-Cluster (High-Level Agent) Rewards",
        total_label='Total Reward', mean_label='Mean Reward',
        total_ylabel="Total Inter-Cluster Reward",
        mean_ylabel="Mean Inter-Cluster Reward",
        total_color='tab:green', mean_color='mediumseagreen',
    )

    # 3) Combined system rewards.
    _save_total_and_mean_plot(
        os.path.join(plots_dir, "3_total_system_rewards.png"),
        episodes, total_log, ma_window,
        title="Total System Rewards (Intra + Inter)",
        total_label='Total System Reward', mean_label='Mean System Reward',
        total_ylabel="Total System Reward",
        mean_ylabel="Mean System Reward per Agent",
        total_color='tab:red', mean_color='salmon',
    )

    # 4) Cost reduction relative to the no-P2P baseline.
    cost_df = pd.DataFrame(cost_log)
    baseline = cost_df['cost_without_p2p']
    raw_pct = 100 * (1 - (cost_df['total_cost'] / baseline))
    # Episodes without a positive baseline are reported as 0 % (instead of
    # inf/NaN poisoning the moving average), and the cap is applied to the
    # percentage itself so it really is limited to 100 %.
    cost_df['cost_reduction_pct'] = raw_pct.where(baseline > 0, 0.0).clip(upper=100)
    fig = plt.figure(figsize=(12, 7))
    plt.plot(episodes, _moving_average(cost_df['cost_reduction_pct'], ma_window),
             label=f'Cost Reduction % (MA {ma_window})', color='purple', linewidth=2)
    plt.xlabel("Episode")
    plt.ylabel("Cost Reduction (%)")
    plt.title("Total System-Wide Cost Reduction")
    plt.legend()
    plt.grid(True)
    plt.savefig(os.path.join(plots_dir, "4_cost_reduction.png"), dpi=200)
    plt.close(fig)

    # 5) Battery degradation cost; rows whose Episode is not numeric are
    # dropped before plotting.
    numeric_episode = pd.to_numeric(df_final_log['Episode'], errors='coerce')
    df_plot = df_final_log[numeric_episode.notna()].copy()
    df_plot['Episode'] = pd.to_numeric(df_plot['Episode'])

    fig = plt.figure(figsize=(12, 7))
    plt.plot(df_plot["Episode"], _moving_average(df_plot["battery_degradation_cost_total"], ma_window),
             label=f'Degradation Cost (MA {ma_window})', color='darkgreen', linewidth=2)
    plt.xlabel("Episode")
    plt.ylabel("Total Degradation Cost ($)")
    plt.title("Total Battery Degradation Cost")
    plt.legend()
    plt.grid(True)
    plt.savefig(os.path.join(plots_dir, "5_battery_degradation_cost.png"), dpi=200)
    plt.close(fig)

    print(f"All plots have been saved to: {plots_dir}")
|
|
|
|
# Start training only when this file is executed directly, not on import.
if __name__ == "__main__":
    main()