# again / controllers / estimation / graphical_controller.py
# Beam2513's picture
# Upload 127 files
# 798602c verified
from __future__ import annotations
from typing import Optional, Tuple
import numpy as np
import pandas as pd
from core.estimation.inference.estimators import estimate_mean, estimate_sigma
from core.estimation.inference.ci import (
ci_mean_analytic,
ci_mean_bootstrap,
ci_median_analytic,
ci_median_bootstrap,
)
from core.estimation.inference.pi import (
pi_mean,
pi_median,
pi_iqr,
pi_bootstrap,
)
from core.estimation.graphical_analysis import (
plot_histogram_with_overlays,
plot_ecdf,
)
# ---------------------------------------------------------------------
# Utilities (aligned with inference_controller)
# ---------------------------------------------------------------------
def select_distribution(mean_estimator: str, sigma_estimator: str) -> str:
    """Return the distribution tag used for analytic intervals.

    The tag is ``"t"`` only when the mean estimator is ``"Sample Mean"`` and
    the sigma estimator is ``"Deviation (1 ddof)"``; any other combination
    yields ``"norm"``. (Presumably these tags select Student's t versus the
    normal distribution downstream -- confirm against the consumers.)
    """
    uses_sample_mean = mean_estimator == "Sample Mean"
    uses_ddof1_deviation = sigma_estimator == "Deviation (1 ddof)"
    return "t" if (uses_sample_mean and uses_ddof1_deviation) else "norm"
def validate_deviation_estimator(*, sigma_estimator: str, n: int):
    """Reject the range-based sigma estimator for samples larger than 25.

    Args:
        sigma_estimator: Name of the selected deviation estimator.
        n: Sample size.

    Raises:
        ValueError: If ``sigma_estimator`` is ``"Range (bias corrected)"``
            and ``n`` is greater than 25.
    """
    # Only the range-based estimator carries a sample-size restriction.
    if sigma_estimator != "Range (bias corrected)":
        return None
    if n > 25:
        message = (
            "Range-based confidence intervals require n ≤ 25. "
            "Use another estimator or bootstrap."
        )
        raise ValueError(message)
    return None
def _prepare_series(
    df: pd.DataFrame,
    column: str,
    weights_col: Optional[str],
) -> tuple[np.ndarray, Optional[np.ndarray]]:
    """Extract the analysed column (and optional weights) as NumPy arrays.

    Rows where ``column`` is missing are dropped. When ``weights_col`` is
    given, rows whose weight is missing are dropped as well, so the two
    returned arrays have equal length and stay row-aligned.

    Rows are selected positionally with boolean masks instead of by index
    label, so dataframes with duplicated index labels are handled correctly
    (label-based ``reindex``/``loc`` alignment fails or duplicates rows there).

    Args:
        df: Source dataframe; ``None`` means no dataset has been loaded.
        column: Name of the column holding the data values.
        weights_col: Name of the column holding weights, or ``None``.

    Returns:
        ``(data, weights)`` where ``weights`` is ``None`` when no weights
        column was requested.

    Raises:
        ValueError: If ``df`` is ``None``, a requested column is absent,
            ``column`` has no non-missing values, or no row has both a
            non-missing value and a non-missing weight.
    """
    if df is None:
        raise ValueError("No data loaded. Please load a dataset first.")
    if column not in df.columns:
        raise ValueError(f"Column '{column}' not found in the dataframe.")

    values = df[column]
    keep = values.notna().to_numpy()
    if not keep.any():
        raise ValueError(f"Column '{column}' has no non-missing values.")

    if weights_col is None:
        return values[keep].to_numpy(), None

    if weights_col not in df.columns:
        raise ValueError(
            f"Weights column '{weights_col}' not found in the dataframe."
        )

    weight_values = df[weights_col]
    # Build a new mask rather than updating in place: arrays returned by
    # ``to_numpy()`` may be read-only views.
    keep = keep & weight_values.notna().to_numpy()
    if not keep.any():
        raise ValueError(
            f"No rows have both a non-missing '{column}' value and a "
            f"non-missing '{weights_col}' weight."
        )

    return values[keep].to_numpy(), weight_values[keep].to_numpy()
def run_graphical_analysis(
    *,
    df: pd.DataFrame,
    column: str,
    graph_type: str,
    # Histogram / PMF controls
    add_kde: bool,
    add_data: bool,
    add_normal: bool,
    add_ci: bool,
    ci_choice: str,
    add_pi: bool,
    pi_choice: str,
    # Estimators
    mean_estimator: str,
    median_estimator: str,
    sigma_estimator: str,
    trim_param,
    winsor_limits,
    weights_col: Optional[str],
    # Normal μ source
    normal_mu_source: str,
    # Bootstrap options
    bootstrap_mean: bool,
    bootstrap_median: bool,
    bootstrap_sigma: bool,
    bootstrap_prediction: bool,
    bootstrap_samples: int,
    # CI/PI confidence level
    ci_pi_conf_level: float,
    # ECDF controls
    ecdf_add_conf: bool,
    ecdf_conf_level: float,
    ecdf_add_normal: bool,
):
    """Build the figure for the requested graph type.

    The column (and optional weights) is extracted first, then the CI/PI
    confidence level is checked, and finally the call is routed to the
    histogram/PMF runner or the ECDF runner according to ``graph_type``.

    Returns:
        Whatever the selected runner returns (the figure produced by the
        plotting helper).

    Raises:
        ValueError: If ``ci_pi_conf_level`` is not strictly between 0 and 1,
            or ``graph_type`` is not one of the recognised labels. Data
            preparation and the runners may raise as well.
    """
    values, weight_values = _prepare_series(df, column, weights_col)

    # Checked for every graph type, including ECDF.
    level_is_valid = 0.0 < ci_pi_conf_level < 1.0
    if not level_is_valid:
        raise ValueError("Confidence level for CI/PI must be in (0, 1).")

    # Arguments that both runners accept under the same keyword.
    shared_kwargs = dict(
        data=values,
        var_name=column,
        mean_estimator=mean_estimator,
        sigma_estimator=sigma_estimator,
        trim_param=trim_param,
        winsor_limits=winsor_limits,
        weights=weight_values,
        normal_mu_source=normal_mu_source,
    )

    if graph_type == "Empirical Cumulative Distribution Function (ECDF)":
        return _run_ecdf(
            ecdf_add_conf=ecdf_add_conf,
            ecdf_conf_level=ecdf_conf_level,
            ecdf_add_normal=ecdf_add_normal,
            **shared_kwargs,
        )

    if graph_type in ("Histogram", "Empirical Probability Mass Function"):
        return _run_hist_or_pmf(
            graph_type=graph_type,
            add_kde=add_kde,
            add_data=add_data,
            add_normal=add_normal,
            add_ci=add_ci,
            ci_choice=ci_choice,
            add_pi=add_pi,
            pi_choice=pi_choice,
            median_estimator=median_estimator,
            bootstrap_mean=bootstrap_mean,
            bootstrap_median=bootstrap_median,
            bootstrap_sigma=bootstrap_sigma,
            bootstrap_prediction=bootstrap_prediction,
            bootstrap_samples=bootstrap_samples,
            ci_pi_conf_level=ci_pi_conf_level,
            **shared_kwargs,
        )

    raise ValueError(f"Unknown graph type: {graph_type}")
def _run_hist_or_pmf(
    *,
    data: np.ndarray,
    var_name: str,
    graph_type: str,
    add_kde: bool,
    add_data: bool,
    add_normal: bool,
    add_ci: bool,
    ci_choice: str,
    add_pi: bool,
    pi_choice: str,
    mean_estimator: str,
    median_estimator: str,
    sigma_estimator: str,
    trim_param,
    winsor_limits,
    weights: Optional[np.ndarray],
    normal_mu_source: str,
    bootstrap_mean: bool,
    bootstrap_median: bool,
    bootstrap_sigma: bool,
    bootstrap_prediction: bool,
    bootstrap_samples: int,
    ci_pi_conf_level: float,
):
    """Compute the requested overlays and draw the histogram / PMF figure.

    Depending on the flags, this estimates the Normal-overlay parameters,
    the confidence interval(s) and the prediction interval, then hands
    everything to ``plot_histogram_with_overlays``.

    Only the confidence interval(s) selected by ``ci_choice`` are computed:
    ``"Mean"`` skips the median CI, ``"Median"`` skips the mean CI, and any
    other value computes both. This avoids running a bootstrap whose result
    would be thrown away, and prevents a failure in an interval the user did
    not ask for from aborting the plot.

    ``median_estimator`` and ``bootstrap_sigma`` are accepted for interface
    compatibility but are not used by this function.

    Returns:
        The figure returned by ``plot_histogram_with_overlays``.

    Raises:
        ValueError: If the sigma estimator is rejected for this sample size,
            if ``pi_choice`` is ``"Bootstrap"`` while ``bootstrap_prediction``
            is off, or if ``pi_choice`` is not a recognised label.
    """
    alpha = 1.0 - ci_pi_conf_level
    validate_deviation_estimator(
        sigma_estimator=sigma_estimator,
        n=len(data),
    )

    ci_mean_interval = None
    ci_median_interval = None
    pi_interval = None
    hat_mu = None
    hat_sigma = None

    # Pure function of the two estimator names; shared by the CI and PI paths.
    dist = select_distribution(mean_estimator, sigma_estimator)

    # --- Parameters for Normal overlay ---
    if add_normal:
        if normal_mu_source == "Mean-based CI":
            hat_mu = estimate_mean(
                data,
                mean_estimator,
                trim_param=trim_param,
                winsor_limits=winsor_limits,
                weights=weights,
            )
        else:
            # Any other source centres the overlay on the plain sample median.
            hat_mu = float(np.median(data))
        hat_sigma = estimate_sigma(
            data=data,
            estimator=sigma_estimator,
        )

    # --- Confidence intervals (Mean / Median / Both) ---
    if add_ci:
        want_mean_ci = ci_choice != "Median"
        want_median_ci = ci_choice != "Mean"

        if want_mean_ci:
            if bootstrap_mean:
                ci_mean_interval = ci_mean_bootstrap(
                    data=data,
                    estimator=mean_estimator,
                    alpha=alpha,
                    trim_param=trim_param,
                    winsor_limits=winsor_limits,
                    weights=weights,
                    B=bootstrap_samples,
                )
            else:
                ci_mean_interval = ci_mean_analytic(
                    data=data,
                    estimator=mean_estimator,
                    alpha=alpha,
                    dist=dist,
                    sigma_estimator=sigma_estimator,
                    trim_param=trim_param,
                    winsor_limits=winsor_limits,
                    weights=weights,
                )

        if want_median_ci:
            if bootstrap_median:
                ci_median_interval = ci_median_bootstrap(
                    data=data,
                    alpha=alpha,
                    B=bootstrap_samples,
                )
            else:
                ci_median_interval = ci_median_analytic(
                    data=data,
                    alpha=alpha,
                    sigma_estimator=sigma_estimator,
                )

    # --- Prediction intervals ---
    if add_pi:
        if pi_choice == "Mean":
            pi_interval = pi_mean(
                data=data,
                alpha=alpha,
                estimator=mean_estimator,
                dist=dist,
                sigma_estimator=sigma_estimator,
                trim_param=trim_param,
                winsor_limits=winsor_limits,
                weights=weights,
            )
        elif pi_choice == "Median":
            # pi_median only needs data, alpha and sigma_estimator.
            pi_interval = pi_median(
                data=data,
                alpha=alpha,
                sigma_estimator=sigma_estimator,
            )
        elif pi_choice == "IQR":
            pi_interval = pi_iqr(
                data=data,
                alpha=alpha,
            )
        elif pi_choice == "Bootstrap":
            if not bootstrap_prediction:
                raise ValueError(
                    "To use the Bootstrap prediction interval, enable the "
                    "'Bootstrap Prediction' option in the estimator settings."
                )
            pi_interval = pi_bootstrap(
                data=data,
                alpha=alpha,
                B=bootstrap_samples,
            )
        else:
            raise ValueError(
                f"Unknown prediction-interval choice: {pi_choice}"
            )

    return plot_histogram_with_overlays(
        data=data,
        graph_type=graph_type,
        var_name=var_name,
        add_kde=add_kde,
        add_data=add_data,
        add_normal=add_normal,
        hat_mu=hat_mu,
        hat_sigma=hat_sigma,
        ci_mean_interval=ci_mean_interval,
        ci_median_interval=ci_median_interval,
        pi_interval=pi_interval,
    )
def _run_ecdf(
    *,
    data: np.ndarray,
    var_name: str,
    ecdf_add_conf: bool,
    ecdf_conf_level: float,
    ecdf_add_normal: bool,
    mean_estimator: str,
    sigma_estimator: str,
    trim_param,
    winsor_limits,
    weights: Optional[np.ndarray],
    normal_mu_source: str,
):
    """Compute the optional Normal-overlay parameters and draw the ECDF.

    The confidence level is validated first; ``1 - ecdf_conf_level`` is passed
    to ``plot_ecdf`` as ``alpha``. When ``ecdf_add_normal`` is set, the overlay
    centre comes from ``estimate_mean`` if ``normal_mu_source`` is
    ``"Mean-based CI"`` and from the plain sample median otherwise; the
    overlay scale comes from ``estimate_sigma``.

    Returns:
        The figure returned by ``plot_ecdf``.

    Raises:
        ValueError: If ``ecdf_conf_level`` is not strictly between 0 and 1,
            or the sigma estimator is rejected for this sample size.
    """
    level_is_valid = 0.0 < ecdf_conf_level < 1.0
    if not level_is_valid:
        raise ValueError("ECDF confidence level must be in (0, 1).")
    significance = 1.0 - ecdf_conf_level

    validate_deviation_estimator(sigma_estimator=sigma_estimator, n=len(data))

    overlay_mu = None
    overlay_sigma = None
    if ecdf_add_normal:
        overlay_mu = (
            estimate_mean(
                data,
                mean_estimator,
                trim_param=trim_param,
                winsor_limits=winsor_limits,
                weights=weights,
            )
            if normal_mu_source == "Mean-based CI"
            else float(np.median(data))
        )
        overlay_sigma = estimate_sigma(data=data, estimator=sigma_estimator)

    return plot_ecdf(
        data=data,
        var_name=var_name,
        alpha=significance,
        add_conf_band=ecdf_add_conf,
        add_normal=ecdf_add_normal,
        hat_mu=overlay_mu,
        hat_sigma=overlay_sigma,
    )