| | """ |
| | Comprehensive frequency management module for time series forecasting. |
| | |
| | This module centralizes all frequency-related functionality including: |
| | - Frequency enum with helper methods |
| | - Frequency parsing and validation |
| | - Pandas frequency string conversion |
| | - Safety checks for date ranges |
| | - Frequency selection utilities |
| | - All frequency constants and mappings |
| | """ |
| |
|
| | import logging |
| | import re |
| | from enum import Enum |
| |
|
| | import numpy as np |
| | import pandas as pd |
| | from numpy.random import Generator |
| |
|
| | from src.data.constants import BASE_END_DATE, BASE_START_DATE, MAX_YEARS |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
class Frequency(Enum):
    """
    Enhanced Frequency enum with comprehensive helper methods.

    Each member knows how to express itself as a pandas frequency string and
    exposes frequency-specific metadata: safety limits, seasonality, GIFT eval
    weights, and typical series-length ranges.
    """

    A = "A"
    Q = "Q"
    M = "M"
    W = "W"
    D = "D"
    H = "h"
    S = "s"
    T1 = "1min"
    T5 = "5min"
    T10 = "10min"
    T15 = "15min"
    T30 = "30min"

    def to_pandas_freq(self, for_date_range: bool = True) -> str:
        """
        Convert to pandas frequency string.

        Args:
            for_date_range: If True, use strings suitable for pd.date_range()
                ("ME"/"YE"/"QE" aliases). If False, use strings suitable for
                pd.PeriodIndex() ("M"/"Y"/"Q").

        Returns:
            Pandas frequency string
        """
        # M/A/Q need different aliases depending on the target pandas API:
        # (date_range alias, PeriodIndex alias).
        special_aliases = {
            Frequency.M: ("ME", "M"),
            Frequency.A: ("YE", "Y"),
            Frequency.Q: ("QE", "Q"),
        }
        if self in special_aliases:
            date_range_alias, period_alias = special_aliases[self]
            return date_range_alias if for_date_range else period_alias

        # Every other frequency uses one alias, optionally multiplier-prefixed
        # (e.g. "5" + "min" -> "5min").
        base, prefix, _ = FREQUENCY_MAPPING[self]
        return f"{prefix}{base}" if prefix else base

    def to_pandas_offset(self) -> str:
        """Get pandas offset string for time delta calculations."""
        return FREQUENCY_TO_OFFSET[self]

    def get_days_per_period(self) -> float:
        """Get approximate days per period for this frequency."""
        return FREQUENCY_MAPPING[self][2]

    def get_max_safe_length(self) -> int:
        """Get maximum safe sequence length to prevent timestamp overflow."""
        # Falls back to float("inf") for members missing from the table
        # (annotation notwithstanding); all current members are present.
        return ALL_FREQUENCY_MAX_LENGTHS.get(self, float("inf"))

    def is_high_frequency(self) -> bool:
        """Check if this is a high frequency (minute/second level)."""
        return self in {
            Frequency.S,
            Frequency.T1,
            Frequency.T5,
            Frequency.T10,
            Frequency.T15,
            Frequency.T30,
        }

    def is_low_frequency(self) -> bool:
        """Check if this is a low frequency (annual/quarterly/monthly)."""
        return self in {Frequency.A, Frequency.Q, Frequency.M}

    def get_seasonality(self) -> int:
        """Get typical seasonality (periods per dominant cycle) for this frequency."""
        periods_per_cycle = {
            Frequency.S: 3600,  # seconds per hour
            Frequency.T1: 60,   # minutes per hour
            Frequency.T5: 12,
            Frequency.T10: 6,
            Frequency.T15: 4,
            Frequency.T30: 2,
            Frequency.H: 24,    # hours per day
            Frequency.D: 7,     # days per week
            Frequency.W: 52,    # weeks per year
            Frequency.M: 12,
            Frequency.Q: 4,
            Frequency.A: 1,
        }
        return periods_per_cycle.get(self, 1)

    def get_gift_eval_weight(self) -> float:
        """Get GIFT eval dataset frequency weight."""
        return GIFT_EVAL_FREQUENCY_WEIGHTS.get(self, 0.1)

    def get_length_range(self) -> tuple[int, int, int, int]:
        """Get (min_length, max_length, optimal_start, optimal_end) for this frequency."""
        return GIFT_EVAL_LENGTH_RANGES.get(self, (50, 1000, 100, 500))
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | |
# Per-frequency pandas alias components and calendar span:
#   (base alias, multiplier prefix, approximate days per period).
# Consumed by Frequency.to_pandas_freq() and Frequency.get_days_per_period().
# NOTE(review): to_pandas_freq() special-cases A/Q/M before reading this table,
# so the "YE"/"Q"/"M" bases below act only as fallbacks.
FREQUENCY_MAPPING: dict[Frequency, tuple[str, str, float]] = {
    Frequency.A: (
        "YE",
        "",
        365.25,  # average year length incl. leap years
    ),
    Frequency.Q: ("Q", "", 91.3125),  # 365.25 / 4
    Frequency.M: ("M", "", 30.4375),  # 365.25 / 12
    Frequency.W: ("W", "", 7),
    Frequency.D: ("D", "", 1),
    Frequency.H: ("h", "", 1 / 24),
    Frequency.S: ("s", "", 1 / 86400),
    Frequency.T1: ("min", "1", 1 / 1440),
    Frequency.T5: ("min", "5", 1 / 288),
    Frequency.T10: ("min", "10", 1 / 144),
    Frequency.T15: ("min", "15", 1 / 96),
    Frequency.T30: ("min", "30", 1 / 48),
}
| |
|
| | |
# Pandas offset aliases for time-delta calculations (Frequency.to_pandas_offset()).
# A/Q/M map to period-START anchors ("AS"/"QS"/"MS"), unlike the period-end
# aliases used for date_range generation.
# NOTE(review): these are legacy aliases ("AS", "H", "T", "S"); pandas 2.2+
# prefers "YS"/"h"/"min"/"s" and may warn — confirm against the pinned pandas version.
FREQUENCY_TO_OFFSET: dict[Frequency, str] = {
    Frequency.A: "AS",  # year start
    Frequency.Q: "QS",  # quarter start
    Frequency.M: "MS",  # month start
    Frequency.W: "W",
    Frequency.D: "D",
    Frequency.H: "H",
    Frequency.T1: "1T",
    Frequency.T5: "5T",
    Frequency.T10: "10T",
    Frequency.T15: "15T",
    Frequency.T30: "30T",
    Frequency.S: "S",
}
| |
|
| | |
# Maximum safe sequence lengths per frequency: each cap corresponds to
# MAX_YEARS of calendar time, keeping generated date ranges inside pandas'
# representable datetime bounds.
SHORT_FREQUENCY_MAX_LENGTHS = {
    Frequency.A: MAX_YEARS,
    Frequency.Q: MAX_YEARS * 4,  # quarters per year
    Frequency.M: MAX_YEARS * 12,  # months per year
    Frequency.W: int(MAX_YEARS * 52.1775),  # mean weeks per Gregorian year
    Frequency.D: int(MAX_YEARS * 365.2425),  # mean days per Gregorian year
}

HIGH_FREQUENCY_MAX_LENGTHS = {
    Frequency.H: int(MAX_YEARS * 365.2425 * 24),
    Frequency.S: int(MAX_YEARS * 365.2425 * 24 * 60 * 60),
    Frequency.T1: int(MAX_YEARS * 365.2425 * 24 * 60),
    Frequency.T5: int(MAX_YEARS * 365.2425 * 24 * 12),  # 12 five-minute slots/hour
    Frequency.T10: int(MAX_YEARS * 365.2425 * 24 * 6),
    Frequency.T15: int(MAX_YEARS * 365.2425 * 24 * 4),
    Frequency.T30: int(MAX_YEARS * 365.2425 * 24 * 2),
}

# Combined lookup used by Frequency.get_max_safe_length(); covers every member.
ALL_FREQUENCY_MAX_LENGTHS = {
    **SHORT_FREQUENCY_MAX_LENGTHS,
    **HIGH_FREQUENCY_MAX_LENGTHS,
}
| |
|
| | |
# Sampling weights per frequency (approximate percentage shares, presumably
# from the observed GIFT eval dataset distribution — TODO confirm source).
# Read via Frequency.get_gift_eval_weight(); frequencies absent from this
# table (e.g. T30) fall back to 0.1 there.
GIFT_EVAL_FREQUENCY_WEIGHTS: dict[Frequency, float] = {
    Frequency.H: 25.0,
    Frequency.D: 23.4,
    Frequency.W: 12.9,
    Frequency.T15: 9.7,
    Frequency.T5: 9.7,
    Frequency.M: 7.3,
    Frequency.T10: 4.8,
    Frequency.S: 4.8,
    Frequency.T1: 1.6,
    Frequency.Q: 0.8,
    Frequency.A: 0.8,
}
| |
|
| | |
| | |
# Series-length envelopes per frequency:
#   (min_length, max_length, optimal_start, optimal_end).
# [min, max] is the admissible range; [optimal_start, optimal_end] is the
# sweet spot that gets boosted during frequency selection.
# Read via Frequency.get_length_range(); missing members default to
# (50, 1000, 100, 500) there.
GIFT_EVAL_LENGTH_RANGES: dict[Frequency, tuple[int, int, int, int]] = {
    # Low frequencies: short series.
    Frequency.A: (25, 100, 30, 70),
    Frequency.Q: (25, 150, 50, 120),
    Frequency.M: (40, 1000, 100, 600),
    Frequency.W: (50, 3500, 100, 1500),
    # Medium frequencies.
    Frequency.D: (150, 25000, 300, 7000),
    Frequency.H: (600, 35000, 700, 17000),
    # High frequencies: long series.
    Frequency.T1: (200, 2500, 1200, 1800),
    Frequency.S: (7500, 9500, 7900, 9000),
    Frequency.T15: (1000, 140000, 50000, 130000),
    Frequency.T5: (200, 105000, 20000, 95000),
    Frequency.T10: (40000, 55000, 47000, 52000),
    Frequency.T30: (100, 50000, 10000, 40000),
}
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
def parse_frequency(freq_str: str) -> Frequency:
    """
    Parse frequency string to Frequency enum, robust to variations.

    Handles various frequency string formats:
    - Standard: "A", "Q", "M", "W", "D", "H", "S"
    - Pandas-style: "A-DEC", "W-SUN", "QE-MAR"
    - Minutes: "5T", "10min", "1T"
    - Case variations: "a", "h", "D"

    Args:
        freq_str: The frequency string to parse (e.g., "5T", "W-SUN", "M")

    Returns:
        Corresponding Frequency enum member

    Raises:
        NotImplementedError: If the frequency string is not supported
    """
    # Minute-level frequencies: optional multiplier + "T" or "min" suffix
    # (e.g. "5T", "15min", bare "T"/"min" imply a multiplier of 1).
    minute_match = re.match(r"^(\d*)(?:T|min)$", freq_str, re.IGNORECASE)
    if minute_match:
        multiplier = int(minute_match.group(1)) if minute_match.group(1) else 1
        enum_key = f"T{multiplier}"
        try:
            return Frequency[enum_key]
        except KeyError:
            # Unsupported multiplier (e.g. "20T"): degrade gracefully to 1-minute
            # instead of failing.
            logger.warning(
                "Unsupported minute frequency '%s' (multiplier: %s). Falling back to '1min' (%s).",
                freq_str,
                multiplier,
                Frequency.T1.value,
            )
            return Frequency.T1

    # Normalize through pandas to collapse aliases (e.g. "W-SUN" -> offset name);
    # fall back to the raw string if pandas cannot parse it.
    try:
        offset = pd.tseries.frequencies.to_offset(freq_str)
        standardized_freq = offset.name
    except Exception:
        standardized_freq = freq_str

    # Drop any anchor suffix ("W-SUN" -> "W") and upper-case for matching.
    base_freq = standardized_freq.split("-")[0].upper()

    # Accept both legacy ("A"/"Q"/"M") and period-end ("YE"/"QE"/"ME") aliases.
    freq_map = {
        "A": Frequency.A,
        "Y": Frequency.A,
        "YE": Frequency.A,
        "Q": Frequency.Q,
        "QE": Frequency.Q,
        "M": Frequency.M,
        "ME": Frequency.M,
        "W": Frequency.W,
        "D": Frequency.D,
        "H": Frequency.H,
        "S": Frequency.S,
    }

    if base_freq in freq_map:
        return freq_map[base_freq]

    raise NotImplementedError(f"Frequency '{standardized_freq}' is not supported.")
| |
|
| |
|
def validate_frequency_safety(start_date: np.datetime64, total_length: int, frequency: Frequency) -> bool:
    """
    Check if start date and frequency combination is safe for pandas datetime operations.

    This function verifies that pd.date_range(start=start_date, periods=total_length, freq=freq_str)
    will not raise an OutOfBoundsDatetime error, accounting for pandas' datetime bounds
    (1677-09-21 to 2262-04-11) and realistic frequency limitations.

    Args:
        start_date: The proposed start date for the time series
        total_length: Total length of the time series
        frequency: The frequency of the time series

    Returns:
        True if the combination is safe, False otherwise
    """
    try:
        pandas_freq = frequency.to_pandas_freq(for_date_range=True)

        # Conversion itself raises OutOfBoundsDatetime for wildly out-of-range
        # inputs; that is handled by the except clause below.
        start_ts = pd.Timestamp(start_date)

        # The start must sit inside pandas' representable range.
        if not (pd.Timestamp.min <= start_ts <= pd.Timestamp.max):
            return False

        # Respect the per-frequency hard cap on sequence length.
        if total_length > frequency.get_max_safe_length():
            return False

        # Extra realism caps for low frequencies (multi-century series).
        low_freq_caps = {Frequency.A: 500, Frequency.Q: 2000, Frequency.M: 6000}
        if frequency.is_low_frequency() and total_length > low_freq_caps[frequency]:
            return False

        # Rough end-date estimate from average period length; pad annual and
        # quarterly series by 10% for calendar irregularities.
        span_days = total_length * frequency.get_days_per_period()
        if frequency in [Frequency.A, Frequency.Q]:
            span_days *= 1.1

        end_ts = start_ts + pd.Timedelta(days=span_days)
        if not (pd.Timestamp.min <= end_ts <= pd.Timestamp.max):
            return False

        # Authoritative check: let pandas actually construct the range.
        pd.date_range(start=start_ts, periods=total_length, freq=pandas_freq)
        return True

    except (pd.errors.OutOfBoundsDatetime, OverflowError, ValueError):
        return False
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
def select_safe_random_frequency(total_length: int, rng: Generator) -> Frequency:
    """
    Select a random frequency suitable for a given total length of a time series,
    based on actual GIFT eval dataset patterns and distributions.

    The selection logic:
    1. Filters frequencies that can handle the given total_length
    2. Applies base weights derived from actual GIFT eval frequency distribution
    3. Strongly boosts frequencies that are in their optimal length ranges
    4. Handles edge cases gracefully with fallbacks

    Args:
        total_length: The total length of the time series (history + future)
        rng: A numpy random number generator instance

    Returns:
        A randomly selected frequency that matches GIFT eval patterns
    """
    valid_frequencies = []
    frequency_scores = []

    for freq in Frequency:
        # Reject frequencies whose timestamp-overflow cap would be exceeded.
        if total_length > freq.get_max_safe_length():
            continue

        min_len, max_len, optimal_start, optimal_end = freq.get_length_range()

        # Reject frequencies whose GIFT-eval length envelope doesn't cover this length.
        if total_length < min_len or total_length > max_len:
            continue

        valid_frequencies.append(freq)

        base_weight = freq.get_gift_eval_weight()

        if optimal_start <= total_length <= optimal_end:
            # Strong boost inside the optimal range.
            length_multiplier = 5.0
        else:
            # Scale down smoothly with distance from the optimal range.
            # max(..., 1) guards against a ZeroDivisionError if a range is ever
            # configured with an optimal bound equal to the hard bound.
            if total_length < optimal_start:
                distance_ratio = (optimal_start - total_length) / max(optimal_start - min_len, 1)
            else:
                distance_ratio = (total_length - optimal_end) / max(max_len - optimal_end, 1)

            # Linear taper from 1.5 (at the optimal edge) down to 0.3 (at the hard edge).
            length_multiplier = 0.3 + 1.2 * (1.0 - distance_ratio)

        frequency_scores.append(base_weight * length_multiplier)

    # Fallback when no frequency's envelope admits this length: pick a sensible
    # ordering by series length, then take the first that satisfies the overflow cap.
    if not valid_frequencies:
        if total_length <= 100:
            # Very short series: favor low frequencies.
            fallback_order = [
                Frequency.A,
                Frequency.Q,
                Frequency.M,
                Frequency.W,
                Frequency.D,
            ]
        elif total_length <= 1000:
            # Medium series: favor daily/weekly.
            fallback_order = [Frequency.D, Frequency.W, Frequency.H, Frequency.M]
        else:
            # Long series: favor high frequencies.
            fallback_order = [Frequency.H, Frequency.D, Frequency.T15, Frequency.T5]

        for fallback_freq in fallback_order:
            if total_length <= fallback_freq.get_max_safe_length():
                return fallback_freq

        # Last resort: daily always exists.
        return Frequency.D

    if len(valid_frequencies) == 1:
        return valid_frequencies[0]

    # Weighted random choice over valid candidates. Scores are strictly positive
    # (weights >= 0.1, multiplier >= 0.3), so the sum is nonzero.
    scores = np.array(frequency_scores)
    probabilities = scores / scores.sum()

    return rng.choice(valid_frequencies, p=probabilities)
| |
|
| |
|
def select_safe_start_date(
    total_length: int,
    frequency: Frequency,
    rng: Generator | None = None,
    max_retries: int = 10,
) -> np.datetime64:
    """
    Select a safe start date that ensures the entire time series (history + future)
    will not exceed pandas' datetime bounds.

    Args:
        total_length: Total length of the time series (history + future)
        frequency: Time series frequency
        rng: Random number generator instance (a fresh default_rng() if None)
        max_retries: Maximum number of random sampling attempts

    Returns:
        A safe start date that prevents timestamp overflow. If no sampled
        candidate passes validation after max_retries attempts, falls back to
        BASE_START_DATE (with a warning) rather than raising.

    Raises:
        ValueError: If the required time span exceeds the available date window
    """
    if rng is None:
        rng = np.random.default_rng()

    # Estimated calendar span of the whole series, in days.
    days_per_period = frequency.get_days_per_period()
    total_days = total_length * days_per_period

    # The series must end no later than BASE_END_DATE, so the latest admissible
    # start is the end of the window minus the series span.
    latest_safe_start = BASE_END_DATE - np.timedelta64(int(total_days), "D")
    earliest_safe_start = BASE_START_DATE

    # If the span doesn't fit in the configured window at all, fail loudly.
    if latest_safe_start < earliest_safe_start:
        available_days = (BASE_END_DATE - BASE_START_DATE).astype("timedelta64[D]").astype(int)
        available_years = available_days / 365.25
        required_years = total_days / 365.25
        raise ValueError(
            f"Required time span ({required_years:.1f} years, {total_days:.0f} days) "
            f"exceeds available date window ({available_years:.1f} years, {available_days} days). "
            f"Reduce total_length ({total_length}) or extend the date window."
        )

    # Sample uniformly over the safe window at nanosecond resolution.
    earliest_ns = earliest_safe_start.astype("datetime64[ns]").astype(np.int64)
    latest_ns = latest_safe_start.astype("datetime64[ns]").astype(np.int64)

    for _ in range(max_retries):
        random_ns = rng.integers(earliest_ns, latest_ns + 1)
        start_date = np.datetime64(int(random_ns), "ns")

        # Double-check against pandas' actual bounds and per-frequency limits.
        if validate_frequency_safety(start_date, total_length, frequency):
            return start_date

    # Conservative fallback: BASE_START_DATE maximizes the remaining headroom
    # inside the window. Logged so silent degradation is visible in diagnostics.
    logger.warning(
        "No validated start date found after %d retries; falling back to BASE_START_DATE.",
        max_retries,
    )
    return BASE_START_DATE
| |
|