| import torch |
| import torch.nn as nn |
| import numpy as np |
| import pandas as pd |
| import os |
| import joblib |
| import math |
| import datetime |
| from tqdm import tqdm |
| import matplotlib.pyplot as plt |
| import matplotlib.dates as mdates |
|
|
| |
| |
| |
|
|
# The model architecture classes live in a sibling module; generation is
# impossible without them, so a missing module is a fatal configuration error.
try:
    from hierarchical_diffusion_model import (
        HierarchicalDiffusionModel, ConditionalUnet, ResnetBlock1D,
        AttentionBlock1D, DownBlock1D, UpBlock1D,
        SinusoidalPositionEmbeddings, ImprovedDiffusionModel
    )
    print("Diffusion model classes imported.")
except ImportError:
    print("="*50)
    print("ERROR: Could not import model classes from 'hierarchical_diffusion_model.py'.")
    print("="*50)
    # raise SystemExit(1) instead of exit(): exit() comes from the site module
    # (not guaranteed in all interpreters) and would terminate with status 0,
    # hiding the failure from calling scripts/CI.
    raise SystemExit(1)
|
|
|
|
| |
| |
| |
|
|
def add_amplitude_jitter(series, daily_samples=48, scale=0.05):
    """Scale each full day of *series* by an independent factor ~ N(1, scale).

    Adds day-to-day amplitude variability to a generated load profile.

    Args:
        series: 1-D array-like of samples (accepts any array-like; a float
            copy is made, so the input is never modified).
        daily_samples: Number of samples that make up one day.
        scale: Standard deviation of the per-day multiplicative factor.

    Returns:
        A new numpy array. Trailing samples that do not form a complete day
        are returned unchanged; if the input is shorter than one day it is
        returned as an unmodified copy.
    """
    series = np.asarray(series, dtype=float).copy()
    num_days = len(series) // daily_samples
    if num_days == 0:
        return series
    # One multiplicative factor per complete day, applied vectorized:
    # np.repeat expands factors to per-sample resolution, replacing the
    # original per-day Python loop (identical RNG consumption and result).
    factors = np.random.normal(1.0, scale, size=num_days)
    covered = num_days * daily_samples
    series[:covered] *= np.repeat(factors, daily_samples)
    return series
|
|
def add_cloud_variability(pv, timestamps, base_sigma=0.25):
    """Apply one random lognormal cloud-attenuation factor per calendar day.

    Daylight samples (06:00-18:59) of each day are scaled by a factor drawn
    from LogNormal(mean=-0.02, sigma=base_sigma); all other samples are set
    to zero. The input array is not modified.
    """
    values = np.asarray(pv).copy()
    if len(values) == 0:
        return values
    grouped_by_day = pd.Series(values, index=timestamps).groupby(timestamps.date)
    pieces = []
    for _, day_series in grouped_by_day:
        attenuation = np.random.lognormal(mean=-0.02, sigma=base_sigma)
        hours = day_series.index.hour
        is_daylight = (hours >= 6) & (hours <= 18)
        pieces.append(np.where(is_daylight, day_series * attenuation, 0.0))
    if not pieces:
        return np.array([])
    return np.concatenate(pieces)
|
|
def enforce_physics(df: pd.DataFrame, pv_cap_kw: float | None = None) -> pd.DataFrame:
    """Clamp generated power series to physically plausible values.

    Rules enforced:
      * solar_generation is non-negative and zero at night (hours < 7 or > 18).
        NOTE(review): add_cloud_variability treats 06:00-18:59 as daylight —
        the two day windows disagree by one hour; confirm intended window.
      * Grid export (negative grid_usage) can never exceed the simultaneous
        solar generation; with no PV, export is forced to exactly 0.
      * Optionally, solar_generation is capped at pv_cap_kw.

    Args:
        df: Frame with 'grid_usage' and 'solar_generation' columns and a
            DatetimeIndex (hour-of-day is read from the index).
        pv_cap_kw: Optional hard upper bound (kW) for solar generation.

    Returns:
        A corrected copy of *df*; the input frame is not modified.
    """
    df = df.copy()
    # PV output can never be negative.
    df['solar_generation'] = np.clip(df['solar_generation'], 0.0, None)
    hour = df.index.hour
    night = (hour < 7) | (hour > 18)
    df.loc[night, 'solar_generation'] = 0.0
    # Mask computed BEFORE grid_usage is rewritten, then reused below —
    # statement order matters here.
    export_mask = df['grid_usage'] < 0
    if export_mask.any():
        # Export magnitude is limited to the PV produced at the same instant.
        limited_export = -np.minimum(-df.loc[export_mask, 'grid_usage'], df.loc[export_mask, 'solar_generation'])
        df.loc[export_mask, 'grid_usage'] = limited_export
        # Rows that were exporting but have (near-)zero PV: normalize the
        # resulting -0.0 to exactly 0.0.
        zero_pv_neg_grid = export_mask & (df['solar_generation'] <= 1e-6)
        df.loc[zero_pv_neg_grid, 'grid_usage'] = 0.0
    if pv_cap_kw is not None:
        # Apply the capacity limit last so it also bounds clipped values.
        df['solar_generation'] = np.clip(df['solar_generation'], 0.0, pv_cap_kw)
    return df
|
|
def calculate_generation_length(duration: str, samples_per_day: int) -> int:
    """Return the number of samples needed to cover *duration*.

    Args:
        duration: One of '1_year', '6_months', '2_months', '1_month',
            '14_days', '7_days', '2_days'.
        samples_per_day: Samples recorded per day (e.g. 48 for 30-min data).

    Returns:
        days * samples_per_day. An unknown duration falls back to one year
        (365 days) after printing a warning, matching the original behavior.
    """
    duration_to_days = {
        '1_year': 365,
        '6_months': 182,
        '2_months': 60,
        '1_month': 30,
        '14_days': 14,
        '7_days': 7,
        '2_days': 2,
    }
    days = duration_to_days.get(duration)
    if days is None:
        print(f"Warning: Unknown duration '{duration}'. Defaulting to 1 year.")
        days = 365
    return days * samples_per_day
|
|
| |
| |
| |
|
|
class Config:
    """Paths, generation settings, and model hyper-parameters for one run."""

    # --- Input/output locations ---
    MODEL_PATH = './trained_model/best_hierarchical_model.pth'
    SCALER_PATH = './data/global_scaler.gz'
    ORIGINAL_DATA_DIR = './data/per_house'   # real per-house CSVs; first file supplies the timestamp index
    OUTPUT_DIR = './generated_data'


    # --- Generation settings ---
    GENERATION_DURATION = '1_year'       # must be a duration key known to calculate_generation_length
    NUM_PROFILES_TO_GENERATE = 2000      # total synthetic house profiles to produce
    PLOTS_TO_GENERATE = 20               # only the first N profiles get a PNG preview plot
    GENERATION_BATCH_SIZE = 128          # profiles sampled per model call


    # --- Model hyper-parameters (must match the trained checkpoint) ---
    TRAINING_WINDOW_DAYS = 14            # chunk length used for stitched generation

    NUM_HOUSES_TRAINED_ON = 300          # upper bound for the sampled house_id conditioning
    SAMPLES_PER_DAY = 48                 # 30-minute resolution
    NUM_FEATURES = 4                     # grid_usage, solar_generation, sin_time, cos_time
    DOWNSCALE_FACTOR = 4
    EMBEDDING_DIM = 64
    HIDDEN_SIZE = 512
    HIDDEN_DIMS = [HIDDEN_SIZE // 4, HIDDEN_SIZE // 2, HIDDEN_SIZE]
    DROPOUT = 0.1
    USE_ATTENTION = True
    DIFFUSION_TIMESTEPS = 500
    BLOCKS_PER_LEVEL = 3
|
|
|
|
| |
| |
| |
|
|
def main(cfg, run_output_dir):
    """Generate synthetic load/PV profiles with the trained diffusion model.

    Loads the fitted scaler, a reference timestamp index (borrowed from the
    first real-data CSV), and the model checkpoint. Profiles are sampled in
    batches; horizons longer than the training window are built by stitching
    independently sampled chunks. Each profile is post-processed (physics
    constraints plus stochastic jitter/cloud noise), saved to CSV, and the
    first cfg.PLOTS_TO_GENERATE profiles are also plotted.

    Args:
        cfg: Config instance supplying paths and model hyper-parameters.
        run_output_dir: Run directory; receives 'csv/' and 'plots/' subdirs.
    """
    # Prefer CUDA, then Apple MPS, then CPU.
    DEVICE = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"
    print(f"Using device: {DEVICE}")


    csv_output_dir = os.path.join(run_output_dir, 'csv')
    plot_output_dir = os.path.join(run_output_dir, 'plots')
    os.makedirs(csv_output_dir, exist_ok=True)
    os.makedirs(plot_output_dir, exist_ok=True)

    print("Loading resources...")
    try:
        scaler = joblib.load(cfg.SCALER_PATH)
        # Sanity check: scaler feature count should match the model input width.
        if scaler.n_features_in_ != cfg.NUM_FEATURES:
            print(f"WARNING: Scaler was fit on {scaler.n_features_in_} features, but model expects {cfg.NUM_FEATURES}.")

        original_files = sorted([f for f in os.listdir(cfg.ORIGINAL_DATA_DIR) if f.endswith('.csv')])
        if not original_files:
            raise FileNotFoundError("No original data files found to extract timestamps.")

        # Borrow the timestamp index from the first real file so generated
        # profiles carry realistic timestamps.
        sample_original_df = pd.read_csv(os.path.join(cfg.ORIGINAL_DATA_DIR, original_files[0]), index_col='timestamp', parse_dates=True)

        # Keep at most one year of timestamps.
        full_timestamps = sample_original_df.index[:(365 * cfg.SAMPLES_PER_DAY)]

        total_samples_needed = calculate_generation_length(cfg.GENERATION_DURATION, cfg.SAMPLES_PER_DAY)

        # The model generates fixed-size training windows; long horizons are
        # assembled from multiple windows below.
        TRAINING_WINDOW_SAMPLES = cfg.TRAINING_WINDOW_DAYS * cfg.SAMPLES_PER_DAY

        # Cannot produce more samples than we have timestamps for.
        if total_samples_needed > len(full_timestamps):
            print(f"Warning: Requested {total_samples_needed} samples, but file has {len(full_timestamps)}. Clamping to max.")
            total_samples_needed = len(full_timestamps)

        print(f"Goal: Generate {total_samples_needed} samples ({cfg.GENERATION_DURATION}) per profile.")
        print(f"Strategy: Stitching {TRAINING_WINDOW_SAMPLES}-sample chunks.")

        # Architecture parameters must match the checkpoint being loaded.
        model = HierarchicalDiffusionModel(
            in_channels=cfg.NUM_FEATURES,
            num_houses=cfg.NUM_HOUSES_TRAINED_ON,
            downscale_factor=cfg.DOWNSCALE_FACTOR,
            embedding_dim=cfg.EMBEDDING_DIM,
            hidden_dims=cfg.HIDDEN_DIMS,
            dropout=cfg.DROPOUT,
            use_attention=cfg.USE_ATTENTION,
            num_timesteps=cfg.DIFFUSION_TIMESTEPS,
            blocks_per_level=cfg.BLOCKS_PER_LEVEL
        )

        model.load_state_dict(torch.load(cfg.MODEL_PATH, map_location=DEVICE))
        model.to(DEVICE)
        model.eval()
        print("Model, scaler, timestamps ready.")


    except FileNotFoundError as e:
        print(f"ERROR: A required file was not found. Details: {e}")
        return
    except Exception as e:
        # Broad catch is deliberate: any setup failure aborts the whole run.
        print(f"An error occurred during setup: {e}")
        return


    num_batches = math.ceil(cfg.NUM_PROFILES_TO_GENERATE / cfg.GENERATION_BATCH_SIZE)
    house_counter = 0


    pbar = tqdm(range(num_batches), desc="Generating Batches")
    for i in pbar:
        # The final batch may be smaller than GENERATION_BATCH_SIZE.
        current_batch_size = min(cfg.GENERATION_BATCH_SIZE, cfg.NUM_PROFILES_TO_GENERATE - house_counter)
        if current_batch_size <= 0: break
        pbar.set_postfix({'batch_size': current_batch_size})

        num_chunks_needed = math.ceil(total_samples_needed / TRAINING_WINDOW_SAMPLES)
        batch_chunks_list = []

        for chunk_idx in range(num_chunks_needed):
            # Trim the final chunk so the stitched total matches exactly.
            samples_remaining = total_samples_needed - (chunk_idx * TRAINING_WINDOW_SAMPLES)
            current_chunk_length = min(TRAINING_WINDOW_SAMPLES, samples_remaining)

            shape_to_generate = (current_chunk_length, cfg.NUM_FEATURES)

            # Random conditioning per profile: identity + calendar position.
            # NOTE(review): conditions are re-drawn for every chunk, so the
            # chunks of one stitched profile can mix different house ids /
            # days-of-year — confirm this is intended.
            sample_conditions = {
                "house_id": torch.randint(0, cfg.NUM_HOUSES_TRAINED_ON, (current_batch_size,), device=DEVICE),
                "day_of_week": torch.randint(0, 7, (current_batch_size,), device=DEVICE),
                "day_of_year": torch.randint(0, 365, (current_batch_size,), device=DEVICE)
            }

            with torch.no_grad():
                # Sample a batch of chunks from the diffusion model.
                generated_chunk_data = model.sample(current_batch_size, sample_conditions, shape=shape_to_generate)

            batch_chunks_list.append(generated_chunk_data.cpu().numpy())

        # Stitch chunks along the time axis -> (batch, samples, features).
        generated_data_np = np.concatenate(batch_chunks_list, axis=1)



        for j in range(current_batch_size):
            current_house_num = house_counter + 1

            profile_timestamps = full_timestamps[:total_samples_needed]
            normalized_series = generated_data_np[j]

            # Back from normalized space to physical units.
            unscaled_series = scaler.inverse_transform(normalized_series)

            df = pd.DataFrame(
                unscaled_series,
                columns=['grid_usage', 'solar_generation', 'sin_time', 'cos_time'],
                index=profile_timestamps
            )

            # Physics first, then stochastic realism, then physics again to
            # re-clamp anything the added noise pushed out of bounds.
            df = enforce_physics(df)
            df['grid_usage'] = add_amplitude_jitter(df['grid_usage'].values, scale=0.08, daily_samples=cfg.SAMPLES_PER_DAY)
            df['solar_generation'] = add_cloud_variability(df['solar_generation'].values, df.index, base_sigma=0.3)
            df = enforce_physics(df)

            # Only the two power series are persisted; the time-encoding
            # features were just model inputs.
            df_to_save = df[['grid_usage', 'solar_generation']]
            df_to_save.to_csv(os.path.join(csv_output_dir, f'generated_house_{current_house_num}.csv'))

            # Preview plot (first 14 days) for the first few profiles only.
            if house_counter < cfg.PLOTS_TO_GENERATE:
                plot_df = df_to_save.head(cfg.SAMPLES_PER_DAY * 14)
                plt.figure(figsize=(15, 6))
                plt.plot(plot_df.index, plot_df['grid_usage'], label='Grid Usage', color='dodgerblue', alpha=0.9)
                plt.plot(plot_df.index, plot_df['solar_generation'], label='Solar Generation', color='darkorange', alpha=0.9)
                plt.title(f'Generated Data for Profile {current_house_num} (First 14 Days)')
                plt.xlabel('Timestamp'); plt.ylabel('Power (kW)'); plt.legend(); plt.grid(True, which='both', linestyle='--', linewidth=0.5)
                plt.tight_layout()
                plt.savefig(os.path.join(plot_output_dir, f'generated_profile_{current_house_num}_plot.png'))
                plt.close()

            house_counter += 1


    print(f"\nSuccessfully generated and saved {house_counter} house profiles.")
    if cfg.PLOTS_TO_GENERATE > 0:
        print(f"Plots saved to '{plot_output_dir}'.")
|
|
|
|
| |
| |
| |
|
|
if __name__ == '__main__':
    cfg = Config()

    # Timestamp the run folder so consecutive runs never collide.
    stamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    run_name = f"generation_run_{cfg.GENERATION_DURATION}_{stamp}"
    run_dir = os.path.join(cfg.OUTPUT_DIR, run_name)
    os.makedirs(run_dir, exist_ok=True)

    print(f"Starting new generation run: {run_name}")
    print(f"All outputs will be saved to: {run_dir}")

    main(cfg, run_dir)

    print("\nGeneration process complete.")