import math
from collections.abc import Iterable, Iterator
from enum import Enum
from functools import cached_property
from pathlib import Path

import datasets
import pyarrow.compute as pc
from gluonts.dataset import DataEntry
from gluonts.dataset.common import ProcessDataEntry
from gluonts.dataset.split import TestData, TrainingDataset, split
from gluonts.itertools import Map
from gluonts.time_feature import norm_freq_str
from gluonts.transform import Transformation
from pandas.tseries.frequencies import to_offset
from toolz import compose

TEST_SPLIT = 0.1
MAX_WINDOW = 20

# Prediction lengths for the M4 datasets, keyed by normalized frequency string.
M4_PRED_LENGTH_MAP = {
    "A": 6,
    "Q": 8,
    "M": 18,
    "W": 13,
    "D": 14,
    "H": 48,
    "h": 48,
    "Y": 6,
}

# Default prediction lengths for all other datasets.
PRED_LENGTH_MAP = {
    "M": 12,
    "W": 8,
    "D": 30,
    "H": 48,
    "h": 48,
    "T": 48,
    "S": 60,
    "s": 60,
    "min": 48,
}

TFB_PRED_LENGTH_MAP = {
    "A": 6,
    "Y": 6,
    "H": 48,
    "h": 48,
    "Q": 8,
    "D": 14,
    "M": 18,
    "W": 13,
    "U": 8,
    "T": 8,
    "min": 8,
    "us": 8,
}


class Term(Enum):
    SHORT = "short"
    MEDIUM = "medium"
    LONG = "long"

    @property
    def multiplier(self) -> int:
        if self == Term.SHORT:
            return 1
        elif self == Term.MEDIUM:
            return 10
        elif self == Term.LONG:
            return 15


def itemize_start(data_entry: DataEntry) -> DataEntry:
    # Unwrap the numpy scalar "start" field into a plain Python value.
    data_entry["start"] = data_entry["start"].item()
    return data_entry


class MultivariateToUnivariate(Transformation):
    """Split each multivariate entry into one univariate entry per target dimension."""

    def __init__(self, field):
        self.field = field

    def __call__(self, data_it: Iterable[DataEntry], is_train: bool = False) -> Iterator:
        for data_entry in data_it:
            item_id = data_entry["item_id"]
            for dim, val in enumerate(data_entry[self.field]):
                univariate_entry = data_entry.copy()
                univariate_entry[self.field] = val
                univariate_entry["item_id"] = f"{item_id}_dim{dim}"
                yield univariate_entry


class Dataset:
    def __init__(
        self,
        name: str,
        term: Term | str = Term.SHORT,
        to_univariate: bool = False,
        storage_path: str | Path | None = None,
        max_windows: int | None = None,
    ):
        if storage_path is None:
            raise ValueError("storage_path must point to the directory containing the saved datasets")
        storage_path = Path(storage_path)
        self.hf_dataset = datasets.load_from_disk(str(storage_path / name)).with_format("numpy")
        process = ProcessDataEntry(
            self.freq,
            one_dim_target=self.target_dim == 1,
        )

        self.gluonts_dataset = Map(compose(process, itemize_start), self.hf_dataset)
        if to_univariate:
            self.gluonts_dataset = MultivariateToUnivariate("target").apply(self.gluonts_dataset)

        self.term = Term(term)
        self.name = name
        self.max_windows = max_windows if max_windows is not None else MAX_WINDOW

    @cached_property
    def prediction_length(self) -> int:
        freq = norm_freq_str(to_offset(self.freq).name)
        if freq.endswith("E"):
            # Strip the period-end suffix of newer pandas aliases ("ME", "QE", "YE").
            freq = freq[:-1]
        pred_len = M4_PRED_LENGTH_MAP[freq] if "m4" in self.name else PRED_LENGTH_MAP[freq]
        return self.term.multiplier * pred_len

    @cached_property
    def freq(self) -> str:
        return self.hf_dataset[0]["freq"]

    @cached_property
    def target_dim(self) -> int:
        target = self.hf_dataset[0]["target"]
        return target.shape[0] if target.ndim > 1 else 1

    @cached_property
    def past_feat_dynamic_real_dim(self) -> int:
        if "past_feat_dynamic_real" not in self.hf_dataset[0]:
            return 0
        past_feat_dynamic_real = self.hf_dataset[0]["past_feat_dynamic_real"]
        return past_feat_dynamic_real.shape[0] if past_feat_dynamic_real.ndim > 1 else 1

    @cached_property
    def windows(self) -> int:
        if "m4" in self.name:
            return 1
        w = math.ceil(TEST_SPLIT * self._min_series_length / self.prediction_length)
        return min(max(1, w), self.max_windows)

    @cached_property
    def _min_series_length(self) -> int:
        if self.hf_dataset[0]["target"].ndim > 1:
            lengths = pc.list_value_length(pc.list_flatten(pc.list_slice(self.hf_dataset.data.column("target"), 0, 1)))
        else:
            lengths = pc.list_value_length(self.hf_dataset.data.column("target"))
        return min(lengths.to_numpy())

    @cached_property
    def sum_series_length(self) -> int:
        if self.hf_dataset[0]["target"].ndim > 1:
            lengths = pc.list_value_length(pc.list_flatten(self.hf_dataset.data.column("target")))
        else:
            lengths = pc.list_value_length(self.hf_dataset.data.column("target"))
        return sum(lengths.to_numpy())

    @property
    def training_dataset(self) -> TrainingDataset:
        training_dataset, _ = split(self.gluonts_dataset, offset=-self.prediction_length * (self.windows + 1))
        return training_dataset

    @property
    def validation_dataset(self) -> TrainingDataset:
        validation_dataset, _ = split(self.gluonts_dataset, offset=-self.prediction_length * self.windows)
        return validation_dataset

    @property
    def test_data(self) -> TestData:
        _, test_template = split(self.gluonts_dataset, offset=-self.prediction_length * self.windows)
        return test_template.generate_instances(
            prediction_length=self.prediction_length,
            windows=self.windows,
            distance=self.prediction_length,
        )
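

# Example usage (a minimal sketch; the dataset name "m4_hourly" and the
# storage path "./data" below are hypothetical placeholders for a dataset
# previously written with `datasets.Dataset.save_to_disk`):
#
#     dataset = Dataset(name="m4_hourly", term="short", storage_path="./data")
#     print(dataset.freq, dataset.prediction_length, dataset.windows)
#     test_data = dataset.test_data
#     for input_entry, label_entry in zip(test_data.input, test_data.label):
#         ...  # run a forecaster on input_entry and score it against label_entry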