Spaces:
Running
Running
| import os | |
| import numpy as np | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| from chronos import Chronos2Pipeline | |
| MIN_LENGTH_CONTEXT = 64 | |
| PREDICTION_LENGTH = 32 | |
| BATCH_SIZE = 256 | |
| MAX_FILE_SIZE_MB = 256 # Maximum file size for upload (in megabytes) | |
| MAX_NUMBER_OF_COLUMNS = 200 # To avoid performance issues and ensure readability of results, we will limit the number of columns that can be processed to 200. If the uploaded data contains more than this number of columns, an error will be raised. | |
| MAX_NUMBER_OF_PLOTTABLE_SERIES = 5 # To avoid plotting too many series in the same plot, which can be unreadable. If there are more than this number of target columns, we will only plot the first MAX_NUMBER_OF_PLOTTABLE_SERIES columns. | |
| MAX_NUMBER_OF_POINTS_PLOTTABLE = 32_000 # To avoid plotting too many points in the same plot, which can be unreadable. If there are more than this number of points, we will only plot the first MAX_NUMBER_OF_POINTS_PLOTTABLE points. | |
| ## %% Data validation functions | |
| def validateData(file, timestamp_column:str=None): | |
| """ Validate the uploaded data file for time series anomaly detection. This function checks for the following conditions: | |
| 0. file size is less than 256MB. | |
| 1. The file must be uploaded and must be in CSV format. | |
| 2. If a timestamp column is provided, it must exist in the data and must not contain any missing values (NaNs). | |
| 3. The time series data must contain at least a minimum number of data points (defined by MIN_LENGTH_CONTEXT) for accurate anomaly detection. | |
| 4. The data must contain at least one column of values for anomaly detection. | |
| 5. The data must contain less than 200 columns to avoid performance issues and ensure readability of results. | |
| 6. The data must not contain any missing values (NaNs) for accurate anomaly detection. | |
| 7. The data must not contain any non-finite values (e.g., inf, -inf) for accurate anomaly detection. | |
| 8. All value columns must contain numeric data for accurate anomaly detection. | |
| Args: | |
| file (str): The path to the uploaded CSV file containing the time series data. | |
| timestamp_column (str, optional): The name of the timestamp column in the CSV file. If None, a default timestamp will be generated. | |
| Raises: | |
| AssertionError: If any of the validation conditions are not met, an AssertionError will be raised with a descriptive error message. | |
| """ | |
| assert file is not None and isinstance(file, str), "No file uploaded. Please upload a CSV file containing your time series data." | |
| assert os.path.getsize(file) < MAX_FILE_SIZE_MB * 1024 * 1024, f"File size exceeds the maximum limit of {MAX_FILE_SIZE_MB}MB. Please upload a smaller file." | |
| assert file is not None, "No file uploaded. Please upload a CSV file containing your time series data." | |
| assert file.endswith('.csv') and os.path.basename(file).count(".") == 1, "Invalid file format. Please upload a CSV file." | |
| df = pd.read_csv(file, index_col=None, header=0) | |
| if timestamp_column is not None: | |
| assert timestamp_column in df.columns, f"Timestamp column '{timestamp_column}' not found in the uploaded file. Please provide a valid timestamp column name." | |
| assert df[timestamp_column].isna().sum() == 0, f"Missing values detected in the timestamp column '{timestamp_column}'. Please ensure this column does not contain any missing values (NaNs) for accurate anomaly detection." | |
| df.drop(columns=[timestamp_column], inplace=True) | |
| assert len(df) >= MIN_LENGTH_CONTEXT, f"Insufficient data length. The uploaded time series must contain at least {MIN_LENGTH_CONTEXT} data points for accurate anomaly detection." | |
| assert len(df.columns) >= 1, "No value columns found. Please ensure your CSV file contains at least one column of values for anomaly detection." | |
| assert len(df.columns) <= MAX_NUMBER_OF_COLUMNS, f"Too many columns. The uploaded time series must contain less than {MAX_NUMBER_OF_COLUMNS} columns to avoid performance issues and ensure readability of results." | |
| assert df.isna().sum().sum() == 0, "Missing values detected in the uploaded data. Please ensure your CSV file does not contain any missing values (NaNs) for accurate anomaly detection." | |
| assert np.isfinite(df.select_dtypes(include=[np.number])).all().all(), "Non-finite values detected in the uploaded data. Please ensure your CSV file does not contain any non-finite values (e.g., inf, -inf) for accurate anomaly detection." | |
| assert df.select_dtypes(exclude=[np.number]).columns.empty, "Non-numeric value columns detected. Please ensure all value columns in your CSV file contain numeric data for accurate anomaly detection." | |
| ## %% Data pre-processing functions | |
| def preProcessData(file, timestamp_column:str=None)->list[pd.DataFrame, pd.Series|None, list[str]]: | |
| """ | |
| Pre-process the uploaded time series data for anomaly detection. This function performs the following steps: | |
| 1. Reads the uploaded CSV file into a pandas DataFrame. | |
| 2. If a timestamp column is provided, it extracts the timestamp values and drops the timestamp column from the DataFrame. If no timestamp column is provided, it generates a default timestamp using a RangeIndex. | |
| 3. Creates an 'item_id' column to segment the time series data into prediction segments based on the defined MIN_LENGTH_CONTEXT and PREDICTION_LENGTH. | |
| 4. Combines the timestamp, item_id, and value columns into a new DataFrame formatted for input into the Chronos2 pipeline. | |
| Args: | |
| file (str): The path to the uploaded CSV file containing the time series data. | |
| timestamp_column (str, optional): The name of the timestamp column in the CSV file. If None, a default timestamp will be generated. | |
| Returns: | |
| list[pd.DataFrame, pd.Series|None, list[str]]: A list containing: | |
| - the pre-processed DataFrame formatted for Chronos2 input, | |
| - the original timestamp values (if a timestamp column was provided). | |
| - the list of target column names. | |
| """ | |
| df = pd.read_csv(file, index_col=None, header=0) | |
| if timestamp_column is not None: | |
| timestamp_old = df[timestamp_column] | |
| df.drop(columns=[timestamp_column], inplace=True) | |
| else: | |
| timestamp_old = None | |
| targetCols = df.columns.tolist() | |
| item_id = np.zeros(len(df), dtype=np.int32) | |
| if MIN_LENGTH_CONTEXT < len(df): | |
| for seg in range((len(df) - MIN_LENGTH_CONTEXT + PREDICTION_LENGTH - 1) // PREDICTION_LENGTH): | |
| item_id[MIN_LENGTH_CONTEXT + seg * PREDICTION_LENGTH:min(MIN_LENGTH_CONTEXT + (seg + 1) * PREDICTION_LENGTH, len(df))] = seg + 1 # item_id starts from 1 for prediction segments | |
| return pd.concat([ | |
| pd.DataFrame({ | |
| 'timestamp': pd.date_range(start="2026-01-01 00:00:00", periods=len(df), freq="min"), | |
| 'item_id': item_id, | |
| }), | |
| df.reset_index(drop=True), | |
| ], axis=1), timestamp_old, targetCols | |
| ## Main prediction function | |
| def predictData(chronos2:Chronos2Pipeline, preProcessedData:pd.DataFrame, target_cols:list[str]) -> tuple[dict[str, pd.DataFrame], np.ndarray]: | |
| """ Predict future values for the time series data using the Chronos2 pipeline. This function performs the following steps: | |
| 1. Identifies the segments of the time series data that require predictions based on the 'item_id' column. | |
| 2. For each segment, prepares the input data for the Chronos2 pipeline by selecting the appropriate context length of historical data. | |
| 3. Runs predictions in batches through the Chronos2 pipeline, ensuring that different segments do not influence each other (cross_learning=False) while allowing for multivariate predictions within each segment. | |
| 4. Converts the predictions from the Chronos2 pipeline into DataFrames, one for each target column, containing the predicted values and corresponding timestamps. | |
| Args: | |
| chronos2 (Chronos2Pipeline): An instance of the Chronos2Pipeline for making predictions. | |
| preProcessedData (pd.DataFrame): The pre-processed DataFrame containing the time series data formatted for Chronos2 input. | |
| target_cols (list[str]): A list of target column names for which predictions are to be made. | |
| Returns: | |
| tuple[dict[str, pd.DataFrame], np.ndarray]: A tuple where the first element is a dictionary where keys are target column names and values are DataFrames containing the predictions for each column, and the second element is an array of indices in the original time series that correspond to the predictions. | |
| """ | |
| prediction_item_ids = [iid for iid in sorted(preProcessedData['item_id'].unique()) if iid > 0] | |
| tasks, segment_start_indices = [], [] # Track where each segment starts in original data | |
| for item_id in prediction_item_ids: | |
| segment_start = preProcessedData.index[preProcessedData['item_id'] == item_id].tolist()[0] | |
| tasks.append({ | |
| "target": preProcessedData.loc[segment_start - MIN_LENGTH_CONTEXT:segment_start-1, target_cols].values.T.astype(np.float32) | |
| }) | |
| segment_start_indices.append(segment_start) | |
| # Run predictions in batches | |
| # cross_learning=False: different segments don't influence each other | |
| # But within each task, all D columns ARE predicted jointly (multivariate) | |
| all_predictions = [] | |
| for batch_start in range(0, len(tasks), BATCH_SIZE): | |
| batch_tasks = tasks[batch_start:batch_start + BATCH_SIZE] | |
| try: | |
| # predict() returns list of tensors, each of shape (D, n_quantiles, prediction_length) | |
| all_predictions.extend(chronos2.predict( | |
| inputs=batch_tasks, | |
| prediction_length=PREDICTION_LENGTH, | |
| batch_size=len(batch_tasks), | |
| context_length=MIN_LENGTH_CONTEXT, | |
| cross_learning=False, # Segments don't influence each other | |
| ) | |
| ) | |
| except Exception as e: | |
| print(f"Error in batch starting at {batch_start}: {e}") | |
| raise | |
| # Convert predictions to DataFrames, one per target column | |
| # all_predictions[i] has shape (D, n_quantiles, prediction_length) | |
| predictions_dict = {col: [] for col in target_cols} | |
| all_indices = [] | |
| for seg_idx, (pred_tensor, seg_start) in enumerate(zip(all_predictions, segment_start_indices)): | |
| # pred_tensor shape: (D, n_quantiles, prediction_length) | |
| pred_np = pred_tensor.cpu().numpy() if hasattr(pred_tensor, 'cpu') else np.array(pred_tensor) | |
| # For each target column (variate) | |
| for d_idx, col in enumerate(target_cols): | |
| predictions_dict[col].append(pd.DataFrame({ | |
| 'item_id': prediction_item_ids[seg_idx], # item_id for this segment | |
| 'timestep': np.arange(seg_start, seg_start + PREDICTION_LENGTH), | |
| 'predictions': pred_np[d_idx, 10, :] | |
| })) | |
| # Track original indices | |
| all_indices.extend(range(seg_start, seg_start + PREDICTION_LENGTH)) | |
| return {col:pd.concat(predictions_dict[col], ignore_index=True) for col in target_cols}, np.array(all_indices, dtype=np.int32) | |
| def computeMultiHorizonAnomalyScore(predictions_df: pd.DataFrame, actual_values: np.ndarray, prediction_indices: np.ndarray, | |
| horizons: list[int] = [1, 8, 32, 64], quantile_col: str = 'predictions') -> np.ndarray: | |
| """ | |
| Compute multi-horizon anomaly scores using prediction errors across multiple forecast horizons. | |
| Args: | |
| predictions_df: DataFrame with predictions and quantiles | |
| actual_values: Ground truth values | |
| prediction_indices: Indices in actual_values corresponding to predictions | |
| target_col: Target column name | |
| horizons: List of forecast horizons to consider (default [32, 64]) | |
| quantile_col: Quantile column to use (default '0.5' for median). | |
| Can be '0.5', '0.95', '0.05' etc. | |
| Returns: | |
| Array of anomaly scores (max error across horizons for each prediction) | |
| """ | |
| all_horizon_scores = [] | |
| cols = predictions_df.columns.tolist() | |
| for h in horizons: | |
| idx_to_check = prediction_indices + (h - 1) | |
| mask = idx_to_check < len(actual_values) | |
| if not np.any(mask): | |
| continue | |
| elif quantile_col in cols: | |
| error_h = np.zeros(len(prediction_indices)) | |
| error_h[mask] = (actual_values[idx_to_check[mask]] - predictions_df[quantile_col].to_numpy()[mask])**2 | |
| all_horizon_scores.append(error_h) | |
| if not all_horizon_scores: | |
| # Fallback to standard squared difference if no horizons match [cite: 344] | |
| return np.zeros(len(prediction_indices)) | |
| return np.max(all_horizon_scores, axis=0) # [cite: 829] | |
| def aggregateAnomalyScores(continuousScores: dict[str, np.ndarray], percentile: float = 95.0)->pd.Series: | |
| """ | |
| Aggregate continuous anomaly scores across multiple columns into a single discrete anomaly label. This function performs the following steps: | |
| 1. Normalizes the continuous anomaly scores for each column using robust scaling (median and IQR) to ensure comparability across columns. | |
| 2. Stacks the normalized scores from all columns into a single array. | |
| 3. Computes an aggregated anomaly score for each time point by taking the mean of the normalized scores across all columns. | |
| 4. Determines a threshold for anomaly detection based on the specified percentile of the aggregated scores. | |
| 5. Assigns a discrete anomaly label (1 for anomaly, 0 for normal) to each time point based on whether the aggregated score exceeds the threshold. | |
| Args: | |
| continuousScores (dict[str, np.ndarray]): A dictionary where keys are column names and values are arrays of continuous anomaly scores for each time point. | |
| percentile (float, optional): The percentile threshold to determine anomalies based on the aggregated scores. Default is 95.0. | |
| Returns: | |
| pd.Series: A pandas Series containing the discrete anomaly labels (1 for anomaly, 0 for normal) for each time point. | |
| """ | |
| # Normalize scores per column before aggregation | |
| normalized_scores = {col:(scores - np.median(scores)) / (np.percentile(scores, 75) - np.percentile(scores, 25) + 1e-8) for col, scores in continuousScores.items()} | |
| # Stack normalized scores and aggregate | |
| aggregated_scores = np.mean(np.column_stack(list(normalized_scores.values())), axis=1) | |
| return (aggregated_scores > np.percentile(aggregated_scores, percentile)).astype(np.int8) | |
| ## Computing the discrete anomaly scores and labels | |
| def computeDiscreteScores(predictions_dict: dict[str, pd.DataFrame], time_series_df: pd.DataFrame, target_cols:list[str], indexes: np.ndarray, | |
| horizons: list[int] = [1, 8, 32, 64])-> pd.Series: | |
| """ Compute discrete anomaly scores and labels based on the predictions from the Chronos2 pipeline. This function performs the following steps: | |
| 1. Identifies the indices in the original time series corresponding to the predictions made by the Chronos2 pipeline. | |
| 2. For each target column, computes continuous anomaly scores using the multi-horizon prediction errors across the specified forecast horizons. | |
| 3. Aggregates the continuous anomaly scores across all target columns into a single discrete anomaly label using the aggregateAnomalyScores function. | |
| Args: | |
| predictions_dict (dict[str, pd.DataFrame]): A dictionary where keys are target column names and values are DataFrames containing the predictions for each column. | |
| time_series_df (pd.DataFrame): The original time series DataFrame containing the actual values for each target column. | |
| target_cols (list[str]): A list of target column names for which predictions were made and anomaly scores are to be computed. | |
| indexes (np.ndarray): The array of indices in the original time series that correspond to the predictions. | |
| horizons (list[int], optional): A list of forecast horizons to consider for computing anomaly scores. Default is [1, 8, 32, 64]. | |
| Returns: | |
| pd.Series: A pandas Series containing the discrete anomaly labels (1 for anomaly, 0 for normal) for each time point in the original time series. | |
| """ | |
| continuousScores = {col: computeMultiHorizonAnomalyScore( | |
| predictions_df=predictions_dict[col], | |
| actual_values=time_series_df[col].values, | |
| prediction_indices=indexes, | |
| horizons=horizons | |
| ) for col in target_cols} | |
| return aggregateAnomalyScores(continuousScores) | |
| def assembleResults(preProcessedData: pd.DataFrame, timestamp_old: pd.Series|None, target_cols:list[str], scores: np.ndarray)->pd.DataFrame: | |
| """ | |
| Assemble the final results DataFrame containing the original time series data along with the computed anomaly labels. This function performs the following steps: | |
| 1. Creates a copy of the pre-processed DataFrame to serve as the base for the results. | |
| 2. Adds a new column 'anomaly_label' to the DataFrame, where the anomaly labels are assigned based on the computed scores. For time points that do not have corresponding scores (e.g., initial context points), a default label of -1 is assigned. | |
| 3. If the original timestamp values are available (i.e., if a timestamp column was provided), they are added back to the results DataFrame. | |
| 4. The final results DataFrame is returned, containing the original time series data along | |
| with the anomaly labels and timestamps (if available). | |
| Args: | |
| preProcessedData (pd.DataFrame): The pre-processed DataFrame formatted for Chronos2 input, which serves as the base for the results. | |
| timestamp_old (pd.Series|None): The original timestamp values extracted from the uploaded data, or None if no timestamp column was provided. | |
| target_cols (list[str]): A list of target column names for which predictions were made and anomaly scores were computed. | |
| scores (np.ndarray): A numpy array containing the discrete anomaly labels (1 for anomaly, 0 for normal) for each time point corresponding to the predictions. | |
| Returns: | |
| pd.DataFrame: A DataFrame containing the original time series data along with an additional column 'anomaly_label' indicating the anomaly labels, and the original timestamps if available. | |
| """ | |
| result_df = preProcessedData.copy() | |
| result_df['anomaly_label'] = -1 | |
| result_df.loc[result_df['item_id'] > 0, 'anomaly_label'] = scores[:len(result_df)-MIN_LENGTH_CONTEXT] | |
| if timestamp_old is not None: | |
| result_df['timestamp'] = timestamp_old | |
| return result_df.drop(columns=['item_id']) | |
| else: | |
| return result_df.drop(columns=['item_id', 'timestamp']) | |
| def plotResults(df, target_cols:list[str]=None)->plt.Figure|None: | |
| """ Plot the original time series data along with the detected anomalies. This function performs the following steps: | |
| 1. Checks if the number of target columns and the number of data points in the DataFrame are within the defined limits for plotting (MAX_NUMBER_OF_PLOTTABLE_SERIES and MAX_NUMBER_OF_POINTS_PLOTTABLE). | |
| 2. If the data is suitable for plotting, it creates a line plot of the original time series data for each target column. | |
| 3. Anomalies are highlighted on the plot using a scatter plot with red 'x' markers. | |
| 4. The plot is formatted with titles, labels, legends, and rotated x-axis ticks for better readability. | |
| 5. The function returns the figure object containing the plot. If the data is not suitable for plotting, it returns None. | |
| Args: | |
| df (pd.DataFrame): The DataFrame containing the original time series data along with the | |
| anomaly labels and timestamps (if available). | |
| target_cols (list[str], optional): A list of target column names to be plotted. If None, all columns except 'timestamp' and 'anomaly_label' will be plotted. | |
| Returns: | |
| plt.Figure|None: The figure object containing the plot of the time series data with detected anomalies, or None if the data is not suitable for plotting. | |
| """ | |
| if df.shape[1] - 2 <= MAX_NUMBER_OF_PLOTTABLE_SERIES and df.shape[0] < MAX_NUMBER_OF_POINTS_PLOTTABLE: # -2 to exclude timestamp and anomaly_label columns | |
| fig, ax = plt.subplots(1,1,figsize=(15, 5)) | |
| df['temp_timestamp_for_print'] = np.arange(len(df)) # posizione numerica dei punti | |
| for col in target_cols: | |
| ax.plot(df['temp_timestamp_for_print'], df[col], label=col) | |
| # Evidenziazione anomalie con background rosso | |
| for _, row in df[df['anomaly_label'] == 1].iterrows(): | |
| ax.axvspan( | |
| row['temp_timestamp_for_print'] - 0.5, | |
| row['temp_timestamp_for_print'] + 0.5, | |
| color='red', | |
| alpha=0.15 | |
| ) | |
| ax.legend() | |
| ax.set_title('Time Series with Detected Anomalies') | |
| ax.set_xlabel('Timestamp') | |
| ax.set_ylabel('Values') | |
| ax.grid(True, which='both', linestyle='--', linewidth=0.5) | |
| ax.set_xticks(df['temp_timestamp_for_print'][::max(1, len(df)//10)], labels=df['timestamp' if 'timestamp' in df.columns else 'temp_timestamp_for_print'][::max(1, len(df)//10)], rotation=45) | |
| plt.tight_layout() | |
| df.drop(columns=['temp_timestamp_for_print'], inplace=True) | |
| return fig # Return the figure object | |
| else: | |
| return None |