| """ |
| normative_calculator.py - v2 |
| |
| Utility functions for computing z-scores and percentiles for any biomarker |
| contained in *Table_1_summary_measure.xlsx*. |
| |
| |
| |
| Author: Lars Masanneck 06-05-2025 |
| """ |
|
|
| from __future__ import annotations |
|
|
| import math |
| import pathlib |
| import warnings |
| from typing import Dict, Iterable, List, Sequence, Union |
|
|
| import pandas as pd |
| from scipy import stats |
| from datetime import datetime |
|
|
|
|
| |
| |
| |
|
|
# Public API: the names a star-import of this module exposes (kept sorted).
__all__ = [
    "add_normative_columns",
    "categorize_bmi",
    "compute_normative_position",
    "compute_skew_corrected_position",
    "load_normative_table",
]
|
|
| |
| |
| |
|
|
| |
| _BMI_BOUNDS: List[tuple[float, float, str]] = [ |
| (0, 18.5, "Underweight"), |
| (18.5, 25, "Healthy"), |
| (25, 30, "Overweight"), |
| (30, math.inf, "Obesity"), |
| ] |
|
|
| |
| |
| |
|
|
|
|
| def _categorize(value: float, bounds: Sequence[tuple]) -> str: |
| """Return category *label* for *value* given (lower, upper, label) tuples.""" |
| for lower, upper, label in bounds: |
| if lower <= value < upper: |
| return label |
| raise ValueError(f"{value} outside defined bounds.") |
|
|
|
|
def categorize_bmi(bmi: Union[str, float]) -> str:
    """Return the BMI category label for *bmi*.

    Parameters
    ----------
    bmi : Union[str, float]
        Either a category name or a numeric BMI value.

    Returns
    -------
    str
        For a string: the input with surrounding whitespace removed and
        ``str.capitalize`` applied (first character upper-case, the rest
        lower-case). For anything else: the label of the ``_BMI_BOUNDS``
        bracket that contains ``float(bmi)``.

    Raises
    ------
    ValueError
        If a numeric value lies outside every bracket in ``_BMI_BOUNDS``.
    """
    if not isinstance(bmi, str):
        return _categorize(float(bmi), _BMI_BOUNDS)
    # Strings are taken to be category names already; only normalise them.
    return bmi.strip().capitalize()
|
|
|
|
| def _categorize_age(age: Union[str, int], normative_df: pd.DataFrame) -> str: |
| """Return an age‐group string for a numeric age, or pass through if already a string.""" |
| if isinstance(age, str): |
| return age.strip() |
| for grp in normative_df["Age"].unique(): |
| grp = grp.strip() |
| if "-" in grp: |
| lo, hi = grp.split("-", 1) |
| try: |
| lo_i, hi_i = int(lo), int(hi) |
| except ValueError: |
| continue |
| if lo_i <= age <= hi_i: |
| return grp |
| elif grp.endswith("+"): |
| try: |
| lo_i = int(grp[:-1]) |
| except ValueError: |
| continue |
| if age >= lo_i: |
| return grp |
| raise ValueError(f"No normative age group found for age {age!r}.") |
|
|
|
|
def load_normative_table(path: Union[str, pathlib.Path]) -> pd.DataFrame:
    """Read the normative summary table from a CSV or Excel file.

    Parameters
    ----------
    path : str or pathlib.Path
        Location of the table. A ``.csv`` suffix (case-insensitive) is read
        with ``pd.read_csv``; anything else with ``pd.read_excel``.

    Returns
    -------
    pd.DataFrame
        The table with whitespace-stripped column names. The stratum columns
        (``Age``, ``area``, ``gender``, ``Bmi``, ``Biomarkers``,
        ``nb_category``) are strings and the statistic columns are passed
        through ``parse_num``. Columns missing from the file are skipped
        rather than raising.

    Raises
    ------
    FileNotFoundError
        If *path* does not exist.
    """
    path = pathlib.Path(path)
    if not path.exists():
        raise FileNotFoundError(path)

    str_cols = ["Age", "area", "gender", "Bmi", "Biomarkers", "nb_category"]
    float_cols = [
        "min",
        "max",
        "median",
        "q1",
        "q3",
        "iqr",
        "mad",
        "mean",
        "sd",
        "se",
        "ci",
    ]

    def parse_num(x):
        """Coerce one cell to ``float``; unparseable cells become ``pd.NA``."""
        if isinstance(x, datetime):
            # NOTE(review): presumably this undoes a spreadsheet that turned
            # decimal numbers into dates - confirm against the source file.
            # A date whose year lies in the future is rebuilt as
            # "<year>.<month>", any other date as "<day>.<month>".
            if x.year > datetime.now().year:
                return x.year + x.month / 100
            return x.day + x.month / 100
        try:
            return float(x)
        except Exception:
            # Deliberate best-effort: anything non-numeric is marked missing.
            return pd.NA

    converters = {col: str for col in str_cols}
    converters.update({col: parse_num for col in float_cols})

    if path.suffix.lower() == ".csv":
        df = pd.read_csv(path, converters=converters)
    else:
        df = pd.read_excel(path, converters=converters)

    # Normalise the header BEFORE touching columns by name. Converters are
    # matched on the header as written in the file, so remember which names
    # were present at read time.
    names_as_read = set(df.columns)
    df.columns = df.columns.str.strip()

    # A padded header (e.g. " mean") did not match its converter during the
    # read; apply the same parsing now so such columns are not left raw.
    for col in float_cols:
        if col in df.columns and col not in names_as_read:
            df[col] = df[col].map(parse_num)

    # Only cast the stratum columns the file actually contains; tables that
    # carry "n" instead of "nb_category" must still load.
    for col in str_cols:
        if col in df.columns:
            df[col] = df[col].astype(str)

    return df
|
|
|
|
| |
| |
| |
|
|
|
|
| def _extract_stats( |
| normative_df: pd.DataFrame, |
| biomarker: str, |
| age_group: str, |
| region: str, |
| gender: str, |
| bmi_category: str, |
| ) -> Dict[str, Union[float, str]]: |
| """Return all summary statistics for the requested stratum.""" |
| mask = ( |
| (normative_df["Biomarkers"].str.lower() == biomarker.lower()) |
| & (normative_df["Age"].str.lower() == age_group.lower()) |
| & (normative_df["area"].str.lower() == region.lower()) |
| & (normative_df["gender"].str.lower() == gender.lower()) |
| & (normative_df["Bmi"].str.lower() == bmi_category.lower()) |
| ) |
| subset = normative_df.loc[mask] |
| if subset.empty: |
| raise KeyError("No normative stats found for the specified stratum.") |
| if len(subset) > 1: |
| warnings.warn( |
| "Multiple normative rows found; using the first one (check your table)." |
| ) |
| row = subset.iloc[0] |
| |
| n_col = "nb_category" if "nb_category" in row else "n" |
| n_raw = row[n_col] |
| n = str(row[n_col]) |
|
|
| return { |
| "median": float(row["median"]), |
| "q1": float(row["q1"]), |
| "q3": float(row["q3"]), |
| "iqr": float(row["iqr"]), |
| "mad": float(row["mad"]), |
| "mean": float(row["mean"]), |
| "sd": float(row["sd"]), |
| "se": float(row["se"]), |
| "ci": float(row["ci"]), |
| "n": n, |
| } |
|
|
|
|
def z_score(value: float, mean: float, sd: float) -> float:
    """Standardise *value* against a distribution with *mean* and *sd*.

    Returns
    -------
    float
        ``(value - mean) / sd``, or NaN when *sd* equals zero (which avoids
        a division by zero).
    """
    return float("nan") if sd == 0 else (value - mean) / sd
|
|
|
|
def percentile_from_z(z: float) -> float:
    """Return the standard-normal percentile (0-100) that corresponds to *z*."""
    cumulative = stats.norm.cdf(z)  # probability mass at or below z
    return float(cumulative * 100)
|
|
|
|
def compute_normative_position(
    *,
    value: float,
    biomarker: str,
    age_group: Union[str, int],
    region: str,
    gender: str,
    bmi: Union[str, float],
    normative_df: pd.DataFrame,
) -> Dict[str, Union[float, str]]:
    """
    Locate one measurement within the normative distribution of its stratum.

    All arguments are keyword-only.

    Parameters
    ----------
    value : float
        The raw measurement of `biomarker`.
    biomarker : str
        Biomarker name as listed in the "Biomarkers" column of `normative_df`.
    age_group : Union[str, int]
        An age-group label such as "40-49", or a numeric age that is mapped
        onto the matching label of `normative_df["Age"]`.
    region : str
        Value to match against `normative_df["area"]` (case-insensitive).
    gender : str
        Value to match against `normative_df["gender"]` (case-insensitive).
    bmi : Union[str, float]
        A BMI category name such as "Healthy", or a numeric BMI that is
        bucketed into a category first.
    normative_df : pd.DataFrame
        Normative summary table, e.g. from `load_normative_table`.

    Returns
    -------
    Dict[str, Union[float, str]]
        In this key order:
        - "z_score": `value` standardised with the stratum mean and SD
          (NaN when the SD is 0),
        - "percentile": normal-distribution percentile (0-100) of that z-score,
        - "mean", "sd": the stratum mean and standard deviation,
        - "n": the stratum's sample-size entry, as a string,
        - "median", "q1", "q3", "iqr", "mad", "se", "ci": the remaining
          statistics of the stratum, copied from the table as floats.

    Raises
    ------
    KeyError
        When `normative_df` has no row for the requested stratum.
    ValueError
        When a numeric `age_group` fits no age bracket, or a numeric `bmi`
        lies outside the BMI brackets.
    """
    # Resolve the stratum; age is categorised before BMI.
    stratum = _extract_stats(
        normative_df=normative_df,
        biomarker=biomarker,
        age_group=_categorize_age(age_group, normative_df),
        region=region,
        gender=gender,
        bmi_category=categorize_bmi(bmi),
    )

    z = z_score(value, stratum["mean"], stratum["sd"])
    position: Dict[str, Union[float, str]] = {
        "z_score": z,
        "percentile": percentile_from_z(z),
    }
    # Pass the table statistics through unchanged, in a fixed key order.
    for key in ("mean", "sd", "n", "median", "q1", "q3", "iqr", "mad", "se", "ci"):
        position[key] = stratum[key]
    return position
|
|
|
|
| |
| |
| |
|
|
|
|
| def _compute_for_row( |
| row: pd.Series, |
| biomarker: str, |
| normative_df: pd.DataFrame, |
| age_col: str, |
| region_col: str, |
| gender_col: str, |
| bmi_col: str, |
| value_col: str, |
| ): |
| try: |
| res = compute_normative_position( |
| value=row[value_col], |
| biomarker=biomarker, |
| age_group=row[age_col], |
| region=row[region_col], |
| gender=row[gender_col], |
| bmi=row[bmi_col], |
| normative_df=normative_df, |
| ) |
| return pd.Series( |
| [res["z_score"], res["percentile"]], |
| index=[f"{biomarker}_z", f"{biomarker}_pct"], |
| ) |
| except Exception as exc: |
| warnings.warn(str(exc)) |
| return pd.Series( |
| [float("nan"), float("nan")], index=[f"{biomarker}_z", f"{biomarker}_pct"] |
| ) |
|
|
|
|
def add_normative_columns(
    df: pd.DataFrame,
    *,
    biomarkers: Iterable[str],
    normative_df: pd.DataFrame,
    age_col: str = "Age",
    region_col: str = "area",
    gender_col: str = "gender",
    bmi_col: str = "Bmi",
    value_cols: dict[str, str] | None = None,
    output_prefixes: dict[str, str] | None = None,
) -> pd.DataFrame:
    """
    Append z-score and percentile columns for multiple biomarkers, with optional
    custom prefixes for the output column names.

    Parameters
    ----------
    df : pd.DataFrame
        Participant-level data, must include demographic columns and raw biomarker
        values. It is not modified.
    biomarkers : Iterable[str]
        Biomarker names to process. Any iterable works, including one-shot
        iterators and generators.
    normative_df : pd.DataFrame
        Normative summary table as loaded by `load_normative_table`.
    age_col : str, default "Age"
        Column in `df` containing age-group labels or integer ages.
    region_col : str, default "area"
        Column in `df` matching the "area" field in `normative_df`.
    gender_col : str, default "gender"
        Column in `df` matching the "gender" field in `normative_df`.
    bmi_col : str, default "Bmi"
        Column in `df` containing BMI values or categories.
    value_cols : dict[str, str], optional
        Mapping from biomarker name to the column in `df` that holds its raw
        numeric value. Biomarkers without an entry use a column named like
        the biomarker itself.
    output_prefixes : dict[str, str], optional
        Mapping from biomarker name to the prefix of its output columns.
        Biomarkers without an entry use the biomarker name itself.

    Returns
    -------
    pd.DataFrame
        A copy of `df` with two new columns for each biomarker:
        `<prefix>_z` and `<prefix>_pct`. Rows whose position cannot be
        computed hold NaN in both columns (a warning is emitted for each).
    """
    # Materialise once: the names are needed more than once, and a generator
    # would otherwise be exhausted before the main loop runs.
    biomarkers = list(biomarkers)
    value_cols = value_cols or {}
    output_prefixes = output_prefixes or {}
    out = df.copy()

    for bm in biomarkers:
        prefix = output_prefixes.get(bm, bm)
        z_col, pct_col = f"{prefix}_z", f"{prefix}_pct"

        if df.empty:
            # apply(axis=1) on an empty frame hands back a copy of the frame
            # instead of the two result columns, so create them directly.
            out[z_col] = pd.Series(index=out.index, dtype="float64")
            out[pct_col] = pd.Series(index=out.index, dtype="float64")
            continue

        scores = df.apply(
            _compute_for_row,
            axis=1,
            biomarker=bm,
            normative_df=normative_df,
            age_col=age_col,
            region_col=region_col,
            gender_col=gender_col,
            bmi_col=bmi_col,
            value_col=value_cols.get(bm, bm),
        )
        # Assign by position: `scores` is labelled with the biomarker name,
        # which differs from the output names when a custom prefix is used.
        out[z_col] = scores.iloc[:, 0].to_numpy()
        out[pct_col] = scores.iloc[:, 1].to_numpy()

    return out
|
|
|
|
| |
def compute_skew_corrected_position(
    value: float, mean: float, sd: float, median: float
) -> dict[str, float]:
    """Compute a skew-corrected z-score and percentile via a Pearson Type III fit.

    The skewness is estimated from the summary statistics as
    ``3 * (mean - median) / sd`` (NaN when *sd* is 0) and used, together
    with *mean* and *sd*, to parameterise ``scipy.stats.pearson3``.

    Returns
    -------
    dict[str, float]
        - "z_skew_corrected": the standard-normal quantile of the cumulative
          probability of *value* under the fitted distribution,
        - "percentile_skew_corrected": that cumulative probability scaled
          to 0-100.
    """
    skew_estimate = float("nan") if sd == 0 else 3 * (mean - median) / sd
    fitted = stats.pearson3(skew_estimate, loc=mean, scale=sd)
    prob = fitted.cdf(value)  # cumulative probability of the observation
    return {
        # Map the probability back onto the standard-normal scale.
        "z_skew_corrected": stats.norm.ppf(prob),
        "percentile_skew_corrected": float(prob * 100),
    }
|
|