| import torch |
| import pandas as pd |
| import numpy as np |
| from sklearn.preprocessing import MinMaxScaler |
| import joblib |
| import os |
| from typing import Tuple, Dict |
| import warnings |
# Install an "ignore" filter for every Warning category at import time.
# NOTE(review): this is process-wide — importing this module silences warnings
# for all other code too. Consider narrowing it (category/module) or scoping it
# with `warnings.catch_warnings()` around the noisy calls.
warnings.simplefilter('ignore')
|
|
class MultiHouseDataset(torch.utils.data.Dataset):
    """Fixed-length sliding windows over per-house CSV time series.

    Every file ending in ``.csv`` directly inside ``data_dir`` is one house;
    houses are numbered by their position in the sorted file list. Each file
    must contain a ``timestamp`` column (parsed as datetimes) plus the columns
    ``grid_usage`` and ``solar_generation``. Two extra features — the sine and
    cosine of the row position modulo 48 — are appended, and all four
    features are scaled to [-1, 1] by one ``MinMaxScaler`` shared by every
    house.

    Item ``i`` is ``(window, conditions)``: ``window`` is a
    ``(window_size, 4)`` tensor of ``dtype``; ``conditions`` holds scalar
    ``torch.long`` tensors ``"house_id"``, ``"day_of_week"`` (pandas
    convention, Monday=0) and ``"day_of_year"`` (0-based). The two calendar
    conditions come from the timestamp of the window's first row.
    """

    # 365 * 48: one year of rows when a day is 48 rows (the `% 48` used for
    # the time-of-day features below).
    SAMPLES_PER_YEAR = 17520

    def __init__(self, data_dir: str, window_size: int = 96, step_size: int = 1,
                 scaler_path: str = 'global_scaler.gz', cache_in_memory: bool = True,
                 dtype: torch.dtype = torch.float32, limit_to_one_year: bool = True):
        """Load all houses, normalize them and build the flat window index.

        Args:
            data_dir: Directory scanned (non-recursively) for ``*.csv`` files.
            window_size: Number of rows in each window; must be positive.
            step_size: Stride between consecutive window starts; must be positive.
            scaler_path: If this file exists it is loaded as the scaler and
                reused as-is (even if the data changed); otherwise a scaler is
                fitted on all loaded houses and saved here.
            cache_in_memory: If True, normalized houses are stored as tensors
                of ``dtype``; otherwise they are stored as NumPy arrays and
                converted per item. Both modes keep all data in memory.
            dtype: Torch dtype of the returned windows.
            limit_to_one_year: Keep only the first ``SAMPLES_PER_YEAR`` rows
                of each house.

        Raises:
            ValueError: If ``window_size`` or ``step_size`` is not positive,
                or if no house files are found and no saved scaler exists.
        """
        if window_size <= 0:
            raise ValueError(f"window_size must be positive, got {window_size}")
        if step_size <= 0:
            # Previously surfaced as a ZeroDivisionError when counting windows.
            raise ValueError(f"step_size must be positive, got {step_size}")

        self.window_size = window_size
        self.step_size = step_size
        self.cache_in_memory = cache_in_memory
        self.dtype = dtype
        self.limit_to_one_year = limit_to_one_year

        all_files = sorted(f for f in os.listdir(data_dir) if f.endswith('.csv'))
        print(f"Found {len(all_files)} house files in '{data_dir}'.")

        self.num_houses = len(all_files)

        print("Reading house data...")
        if self.limit_to_one_year:
            print("INFO: Limiting data to the first year (17,520 samples) for each house.")

        data_per_house = []
        timestamps_per_house = []
        for filename in all_files:
            features, timestamps = self._load_house(os.path.join(data_dir, filename))
            data_per_house.append(features)
            timestamps_per_house.append(timestamps)

        scaler = self._load_or_fit_scaler(scaler_path, data_per_house)

        if self.cache_in_memory:
            print("Caching normalized data...")
        self.normalized_data_per_house = [self._normalize(scaler, series) for series in data_per_house]

        # Drop the unscaled copies before building the index to limit peak memory.
        del data_per_house

        print("Pre-computing mappings...")
        self._build_index(timestamps_per_house)

        print(f"Dataset initialized. Total windows: {self.total_windows} from {self.num_houses} houses.")
        memory_usage = self.get_memory_usage()['tensor_cache_mb']
        print(f"Memory usage for cached tensors: {memory_usage:.1f} MB")

    def _load_house(self, csv_path: str) -> Tuple[np.ndarray, np.ndarray]:
        """Read one house CSV and return ``(features, timestamps)``.

        ``features`` is a float32 array with columns
        ``[grid_usage, solar_generation, sin_time, cos_time]``. ``timestamps``
        holds the raw ``timestamp`` values. Both are truncated together when
        ``limit_to_one_year`` is set.
        """
        df = pd.read_csv(csv_path, parse_dates=['timestamp'])
        if self.limit_to_one_year:
            df = df.iloc[:self.SAMPLES_PER_YEAR]

        timestamps = df['timestamp'].values
        values = df[['grid_usage', 'solar_generation']].values.astype(np.float32)

        # Row position within a 48-row day, encoded on the unit circle so that
        # the last slot of a day is adjacent to the first.
        # NOTE(review): this uses the row index, not `timestamps`, so it assumes
        # each file starts at slot 0 of a day and has no gaps — confirm.
        slot_of_day = np.arange(len(values)) % 48
        angle = 2 * np.pi * slot_of_day / 48.0
        sin_time = np.sin(angle).astype(np.float32)
        cos_time = np.cos(angle).astype(np.float32)

        features = np.column_stack([values, sin_time, cos_time])
        return features, timestamps

    @staticmethod
    def _load_or_fit_scaler(scaler_path: str, data_per_house: list):
        """Load the shared scaler from ``scaler_path``, or fit one on every house and save it."""
        if os.path.exists(scaler_path):
            # joblib.load unpickles the file: only point scaler_path at trusted files.
            scaler = joblib.load(scaler_path)
            print(f"Scaler loaded from {scaler_path}")
            return scaler

        if not data_per_house:
            raise ValueError(
                "Cannot fit a scaler: no house CSV files were found and "
                f"no saved scaler exists at '{scaler_path}'."
            )

        print("Fitting global scaler...")
        scaler = MinMaxScaler(feature_range=(-1, 1))
        scaler.fit(np.vstack(data_per_house))
        joblib.dump(scaler, scaler_path)
        print(f"Scaler saved to {scaler_path}")
        return scaler

    def _normalize(self, scaler, series: np.ndarray):
        """Scale one house; return a ``self.dtype`` tensor when caching, else the NumPy array."""
        # sklearn's transform raises on 0-row input, so an empty house is passed
        # through unchanged (it contributes no windows anyway).
        normalized = scaler.transform(series) if len(series) else series
        if self.cache_in_memory:
            return torch.from_numpy(normalized).to(dtype=self.dtype)
        return normalized

    def _build_index(self, timestamps_per_house: list) -> None:
        """Build the lookup arrays mapping a flat window index to house, start row and calendar info."""
        # A house shorter than one window contributes zero windows; without the
        # clamp the formula goes negative and corrupts the cumulative offsets.
        self.windows_per_house = [
            max(0, (len(d) - self.window_size) // self.step_size + 1)
            for d in self.normalized_data_per_house
        ]
        # cumulative_windows[h] is the flat index of house h's first window.
        self.cumulative_windows = np.cumsum([0] + self.windows_per_house)
        self.total_windows = int(self.cumulative_windows[-1])

        self.sample_to_house = np.empty(self.total_windows, dtype=np.int32)
        self.sample_to_local_idx = np.empty(self.total_windows, dtype=np.int32)
        self.sample_to_day_of_week = np.empty(self.total_windows, dtype=np.int32)
        self.sample_to_day_of_year = np.empty(self.total_windows, dtype=np.int32)

        for house_idx, timestamps in enumerate(timestamps_per_house):
            num_windows = self.windows_per_house[house_idx]
            if num_windows == 0:
                continue
            start = self.cumulative_windows[house_idx]
            end = self.cumulative_windows[house_idx + 1]

            # Row offset of each window's first sample within this house.
            local_indices = np.arange(num_windows) * self.step_size

            self.sample_to_house[start:end] = house_idx
            self.sample_to_local_idx[start:end] = local_indices

            window_starts = pd.Series(timestamps[local_indices])
            self.sample_to_day_of_week[start:end] = window_starts.dt.dayofweek.to_numpy()
            self.sample_to_day_of_year[start:end] = window_starts.dt.dayofyear.to_numpy() - 1

    def __len__(self) -> int:
        """Return the total number of windows across all houses."""
        return self.total_windows

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, Dict[str, torch.Tensor]]:
        """Return ``(window, conditions)`` for flat window index ``idx``.

        Raises:
            IndexError: If ``idx`` is negative or ``>= len(self)``.
        """
        if idx < 0 or idx >= self.total_windows:
            raise IndexError("Index out of range")

        house_index = int(self.sample_to_house[idx])
        local_start_pos = int(self.sample_to_local_idx[idx])

        window_data = self.normalized_data_per_house[house_index][local_start_pos:local_start_pos + self.window_size]
        if not isinstance(window_data, torch.Tensor):
            # Non-cached houses are stored as NumPy arrays; convert so both modes
            # return a tensor of the requested dtype.
            window_data = torch.from_numpy(window_data).to(dtype=self.dtype)

        conditions = {
            "house_id": torch.tensor(house_index, dtype=torch.long),
            "day_of_week": torch.tensor(self.sample_to_day_of_week[idx], dtype=torch.long),
            "day_of_year": torch.tensor(self.sample_to_day_of_year[idx], dtype=torch.long),
        }

        return window_data, conditions

    def get_memory_usage(self) -> dict:
        """Return approximate memory use in MB (1e6 bytes).

        ``tensor_cache_mb`` counts the cached tensors and is 0 when
        ``cache_in_memory`` is False. ``mapping_arrays_mb`` counts all four
        per-window lookup arrays.
        """
        if self.cache_in_memory:
            tensor_memory = sum(data.numel() * data.element_size() for data in self.normalized_data_per_house) / 1e6
        else:
            tensor_memory = 0

        mapping_arrays = (
            self.sample_to_house,
            self.sample_to_local_idx,
            self.sample_to_day_of_week,
            self.sample_to_day_of_year,
        )
        mapping_memory = sum(arr.nbytes for arr in mapping_arrays) / 1e6

        return {
            'tensor_cache_mb': tensor_memory,
            'mapping_arrays_mb': mapping_memory,
            'total_mb': tensor_memory + mapping_memory
        }
|
|
class LatentDataset(torch.utils.data.Dataset):
    """Pair precomputed latent vectors with their house IDs, indexed along the first dimension."""

    def __init__(self, latent_vectors: torch.Tensor, house_ids: torch.Tensor):
        """Store both tensors.

        Args:
            latent_vectors: Tensor whose first dimension indexes samples.
            house_ids: Tensor with one entry per sample along its first dimension.

        Raises:
            ValueError: If the two inputs differ in length along dimension 0.
        """
        # An explicit raise instead of `assert`, which is stripped under `python -O`.
        if len(latent_vectors) != len(house_ids):
            raise ValueError("Latent vectors and house IDs must have same length")
        # .contiguous() returns the tensor itself if already contiguous, else a compact copy.
        self.latent_vectors = latent_vectors.contiguous()
        self.house_ids = house_ids.contiguous()

    def __len__(self) -> int:
        """Return the number of samples."""
        return len(self.latent_vectors)

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return ``(latent_vector, house_id)`` for sample ``idx``."""
        return self.latent_vectors[idx], self.house_ids[idx]
|
|
if __name__ == "__main__":
    # Smoke test: build the dataset from a local directory and report timing,
    # memory use and one sample.
    import time

    DATA_DIRECTORY = './data/per_house/'

    # isdir (not exists): MultiHouseDataset calls os.listdir on this path,
    # which would fail if it were a regular file.
    if os.path.isdir(DATA_DIRECTORY):
        print("--- Testing Dataset Setup ---")

        # perf_counter is monotonic and high-resolution; time.time() is wall-clock
        # and can jump if the system clock is adjusted.
        start_time = time.perf_counter()
        dataset = MultiHouseDataset(data_dir=DATA_DIRECTORY, window_size=96, step_size=96)
        init_time = time.perf_counter() - start_time
        print(f"Dataset initialization: {init_time:.2f}s")
        print(f"Memory usage: {dataset.get_memory_usage()}")

        if len(dataset) > 0:
            first_sample, first_conditions = dataset[0]

            print(f"\nSample data shape: {first_sample.shape}")
            print(f"Sample conditions: {first_conditions}")
            print(f"Total houses: {dataset.num_houses}")
    else:
        print(f"ERROR: Data directory not found at '{DATA_DIRECTORY}'. Please create and populate this directory.")
|
|