| | from __future__ import annotations |
| |
|
| | from pathlib import Path |
| | import re |
| | from typing import Dict, Iterable, List, Mapping, Optional |
| |
|
| | import numpy as np |
| | import pandas as pd |
| |
|
| | |
# Output schema of the standardised long table, in the order in which
# standardize_election places these columns at the front of its result.
STANDARD_COLUMNS: List[str] = [
    # polling-station identifiers
    "code_bv", "nom_bv",
    # election metadata (filled by add_election_metadata)
    "annee", "date_scrutin", "type_scrutin", "tour",
    # turnout counters
    "inscrits", "votants", "abstentions", "blancs", "nuls", "exprimes",
    # candidate identification and score
    "code_candidature", "nom_candidature", "voix",
]

# Columns converted with pd.to_numeric by coerce_numeric when no explicit
# list is passed.
NUMERIC_COLUMNS: List[str] = [
    "inscrits", "votants", "abstentions", "blancs", "nuls", "exprimes",
    "voix",
]
| |
|
| |
|
| | _MOJIBAKE_REPLACEMENTS = { |
| | "é": "é", |
| | "è": "è", |
| | "ê": "ê", |
| | "ë": "ë", |
| | "Ã ": "à", |
| | "â": "â", |
| | "ç": "ç", |
| | "ù": "ù", |
| | "û": "û", |
| | "ï": "ï", |
| | "ô": "ô", |
| | "ö": "ö", |
| | "É": "É", |
| | "È": "È", |
| | "Ê": "Ê", |
| | "Ë": "Ë", |
| | "À": "À", |
| | "Â": "Â", |
| | "Ç": "Ç", |
| | "�": "°", |
| | "�": "°", |
| | } |
| |
|
| |
|
| | def _normalize_label(label: str) -> str: |
| | """ |
| | Attempt to repair mojibake in column labels (UTF-8 read as latin-1 or vice versa). |
| | """ |
| | fixed = label |
| | try: |
| | fixed = label.encode("latin1").decode("utf-8") |
| | except (UnicodeEncodeError, UnicodeDecodeError): |
| | fixed = label |
| | else: |
| | if "Â" in fixed: |
| | fixed = fixed.replace("Â", "") |
| | try: |
| | |
| | fixed = fixed.encode("utf-8").decode("latin1") |
| | except (UnicodeEncodeError, UnicodeDecodeError): |
| | pass |
| | for bad, good in _MOJIBAKE_REPLACEMENTS.items(): |
| | if bad in fixed: |
| | fixed = fixed.replace(bad, good) |
| | fixed = fixed.replace("\ufeff", "") |
| | fixed = " ".join(fixed.split()) |
| | return fixed |
| |
|
| |
|
def _canonical_label(label: str) -> str:
    """
    Return a lowercase, ASCII alpha-numeric-only key for fuzzy label matching.

    The label is first passed through ``_normalize_label`` and lower-cased;
    every character outside ``0-9`` / ``a-z`` (spaces, punctuation, accented
    letters) is then removed, so two labels compare equal when they differ
    only by such characters or by case.
    """
    # Uses the module-level ``re`` import; no function-scope import needed.
    norm = _normalize_label(label).lower()
    return re.sub(r"[^0-9a-z]", "", norm)
| |
|
| |
|
def _unpivot_wide_candidates(df: pd.DataFrame) -> pd.DataFrame:
    """
    Detect wide candidate columns (e.g., 'Voix 1', 'Nuance liste 2') and unpivot to long.

    A column is treated as "wide" when its label ends with digits (optionally
    preceded by whitespace or an underscore).  Among those, columns whose base
    name canonicalises to ``voix`` or to one of the known nuance labels are
    grouped by their numeric suffix.  When at least two suffixes have both a
    vote column and a nuance column, one sub-frame per suffix is built
    (non-wide columns + that suffix's two columns, renamed to ``voix`` and
    ``code_candidature``) and the sub-frames are stacked.

    Returns
    -------
    pd.DataFrame
        The stacked long frame, or ``df`` itself (not a copy) when fewer than
        two complete candidate slots are found.  All other digit-suffixed
        columns are dropped from the stacked result.
    """
    pattern = re.compile(r"^(?P<base>.*?)(?:\s+|_)?(?P<idx>\d+)$")
    nuance_labels = {"nuance", "nuanceliste", "codenuance", "codenuanceducandidat", "codenuanceliste"}

    candidate_map: Dict[str, Dict[str, str]] = {}
    wide_cols: set[str] = set()
    for col in df.columns:
        # Non-string labels (e.g. integer headers) cannot be regex-matched.
        if not isinstance(col, str):
            continue
        match = pattern.match(col)
        if not match:
            continue
        wide_cols.add(col)
        canon = _canonical_label(match.group("base").strip())
        if canon == "voix":
            field = "voix"
        elif canon in nuance_labels:
            field = "code_candidature"
        else:
            continue
        candidate_map.setdefault(match.group("idx"), {})[field] = col

    # Only suffixes that provide both the votes and the nuance are usable.
    indices = [
        idx for idx, fields in candidate_map.items()
        if {"voix", "code_candidature"}.issubset(fields.keys())
    ]
    if len(indices) <= 1:
        return df

    base_cols = [c for c in df.columns if c not in wide_cols]
    frames = []
    for idx in sorted(indices, key=int):
        fields = candidate_map[idx]
        sub = df[base_cols + list(fields.values())].rename(
            columns={
                fields["voix"]: "voix",
                fields["code_candidature"]: "code_candidature",
            }
        )
        frames.append(sub)
    return pd.concat(frames, ignore_index=True)
| |
|
| |
|
def deduplicate_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Collapse columns that share the same name into a single column.

    For every duplicated label, the kept column holds, row by row, the first
    non-null value found across the duplicates (in left-to-right order).  The
    merged column stays at the position of the first occurrence.

    Parameters
    ----------
    df : pd.DataFrame
        Frame that may contain repeated column labels.  It is not modified.

    Returns
    -------
    pd.DataFrame
        A new frame with unique column labels.
    """
    # Columns are addressed by position throughout: with repeated labels,
    # ``df[name]`` returns a DataFrame and ``df.drop(columns=[name])`` removes
    # every column carrying that label, including the one meant to be kept.
    positions_by_name: dict = {}
    for position, name in enumerate(df.columns):
        positions_by_name.setdefault(name, []).append(position)

    result = df.loc[:, ~df.columns.duplicated()].copy()
    for name, positions in positions_by_name.items():
        if len(positions) < 2:
            continue
        merged = df.iloc[:, positions[0]]
        for position in positions[1:]:
            merged = merged.fillna(df.iloc[:, position])
        result[name] = merged
    return result
| |
|
| |
|
def load_raw(
    path: Path,
    *,
    sep: str = ";",
    encoding: str | Iterable[str] = "cp1252",
    decimal: str = ",",
    dtype: Optional[Mapping[str, str]] = None,
    engine: str = "c",
) -> pd.DataFrame:
    """
    Wrapper around read_csv with encoding fallbacks to mitigate mojibake.

    The requested encoding(s) are tried in order, followed by ``utf-8-sig`` and
    ``latin-1`` when not already listed.  An encoding is accepted as soon as
    the resulting column names contain neither U+FFFD nor a capital A-tilde
    (the usual signs of a wrong decoding); the last encoding of the list is
    accepted even if its headers still look suspicious.  Column labels are
    normalised with ``_normalize_label`` before returning.

    If the C parser raises ``ParserError`` the file is re-read with the python
    engine and malformed lines are skipped.

    Raises
    ------
    UnicodeDecodeError
        When no candidate encoding can decode the file at all.
    """
    encoding_choices: List[str] = [encoding] if isinstance(encoding, str) else list(encoding)
    encoding_choices.extend([e for e in ["utf-8-sig", "latin-1"] if e not in encoding_choices])

    last_exc: Optional[Exception] = None
    # Most recent frame that decoded but had suspicious headers; used only if
    # every later encoding fails to decode, instead of discarding usable data.
    fallback: Optional[pd.DataFrame] = None
    for enc in encoding_choices:
        try:
            try:
                df = pd.read_csv(
                    path,
                    sep=sep,
                    encoding=enc,
                    decimal=decimal,
                    dtype=dtype,
                    engine=engine,
                    low_memory=False,
                )
            except pd.errors.ParserError:
                # Best effort: tolerant parser, dropping unparsable lines.
                df = pd.read_csv(
                    path,
                    sep=sep,
                    encoding=enc,
                    decimal=decimal,
                    dtype=dtype,
                    engine="python",
                    on_bad_lines="skip",
                )
        except UnicodeDecodeError as exc:
            last_exc = exc
            continue

        bad_cols = any(
            isinstance(col, str) and ("\ufffd" in col or "\u00c3" in col)
            for col in df.columns
        )
        if bad_cols and enc != encoding_choices[-1]:
            fallback = df
            continue

        df.columns = [_normalize_label(c) for c in df.columns]
        return df

    if fallback is not None:
        fallback.columns = [_normalize_label(c) for c in fallback.columns]
        return fallback
    if last_exc:
        raise last_exc
    raise UnicodeDecodeError("utf-8", b"", 0, 1, "unable to decode with provided encodings")
| |
|
| |
|
def ensure_columns(df: pd.DataFrame, required: Iterable[str]) -> pd.DataFrame:
    """
    Guarantee that every name in ``required`` exists as a column of ``df``.

    Columns that are absent are created in place and filled with NaN;
    existing columns are left untouched.  The same (mutated) frame is returned.
    """
    for name in required:
        if name in df.columns:
            continue
        df[name] = np.nan
    return df
| |
|
| |
|
def add_election_metadata(df: pd.DataFrame, meta: Mapping[str, object]) -> pd.DataFrame:
    """
    Stamp every row of ``df`` with the description of the scrutin.

    ``meta`` must provide ``type_scrutin``, ``tour`` (converted with ``int``)
    and ``date_scrutin`` (parsed with ``pd.to_datetime``); a missing key raises
    ``KeyError``.  ``annee`` is taken from ``meta`` when present, otherwise it
    is the year of the parsed date.

    The columns are written in place and the same frame is returned.
    """
    df["type_scrutin"] = meta["type_scrutin"]
    df["tour"] = int(meta["tour"])

    parsed_date = pd.to_datetime(meta["date_scrutin"])
    df["date_scrutin"] = parsed_date

    # The year is derived up front and only used when meta carries no "annee".
    year_from_date = df["date_scrutin"].dt.year
    df["annee"] = meta.get("annee", year_from_date)
    return df
| |
|
| |
|
def build_code_bv(df: pd.DataFrame, meta: Mapping[str, object]) -> pd.DataFrame:
    """
    Ensure a ``code_bv`` column exists.

    * If ``code_bv`` is already present, its values are converted to stripped
      strings; missing values stay missing (they are not turned into the
      literal text ``"nan"``/``"None"``), so they can still be filtered out
      with ``notna()`` later on.
    * Otherwise ``meta["code_bv_cols"]`` (a list of column names, or a single
      name) designates the columns to combine.  Names are matched through
      ``_canonical_label``; each component is converted to ``str``,
      digit-only components are left-padded to 3 characters with zeros, and
      the components are joined with ``-``.  Rows where every component is
      missing get a missing ``code_bv``.

    The frame is modified in place and returned.

    Raises
    ------
    KeyError
        If ``code_bv`` is absent and ``code_bv_cols`` is not provided, or if
        one of the requested columns cannot be found.
    """
    if "code_bv" in df.columns:
        raw_codes = df["code_bv"]
        df["code_bv"] = raw_codes.astype(str).str.strip().where(raw_codes.notna())
        return df

    columns_to_concat = meta.get("code_bv_cols")
    if not columns_to_concat:
        raise KeyError("code_bv not found in dataframe and no code_bv_cols provided in meta.")
    if isinstance(columns_to_concat, str):
        # A bare string would otherwise be iterated character by character.
        columns_to_concat = [columns_to_concat]

    canon_map = {_canonical_label(col): col for col in df.columns}
    actual_cols: List[str] = []
    for target in columns_to_concat:
        canon = _canonical_label(target)
        if canon not in canon_map:
            raise KeyError(f"{target!r} not found in columns. Available: {list(df.columns)}")
        actual_cols.append(canon_map[canon])

    def _pad(value: str) -> str:
        return value.zfill(3) if value.isdigit() else value

    parts = df[actual_cols]
    # Built column by column (not with a row-wise apply) so that an empty
    # frame still yields an empty Series.
    components = [
        parts.iloc[:, position].astype(str).map(_pad)
        for position in range(parts.shape[1])
    ]
    codes = components[0]
    for component in components[1:]:
        codes = codes + "-" + component

    df["code_bv"] = codes.where(parts.notna().any(axis=1))
    return df
| |
|
| |
|
def coerce_numeric(df: pd.DataFrame, numeric_cols: Iterable[str] = NUMERIC_COLUMNS) -> pd.DataFrame:
    """
    Convert the listed columns to numeric dtype, in place.

    Only the names of ``numeric_cols`` that exist in ``df`` are processed.
    Values that cannot be parsed become NaN (``errors="coerce"``).  The same
    frame is returned.
    """
    present = (name for name in numeric_cols if name in df.columns)
    for name in present:
        df[name] = pd.to_numeric(df[name], errors="coerce")
    return df
| |
|
| |
|
def basic_cleaning(df: pd.DataFrame) -> pd.DataFrame:
    """
    Apply harmonisations common to all scrutins.

    Works on a copy of ``df`` and:

    1. fills missing ``voix`` with 0 (the column is created, filled with 0,
       when it does not exist);
    2. when ``exprimes``, ``votants``, ``blancs`` and ``nuls`` are all
       present, fills missing ``exprimes`` with ``votants - blancs - nuls``
       on rows where those three values are known;
    3. drops rows whose ``code_bv`` is missing.

    Raises
    ------
    KeyError
        If the frame has no ``code_bv`` column.
    """
    df = df.copy()

    # ``df.get("voix", 0)`` would hand back the int 0 when the column is
    # absent, and an int has no ``fillna``; handle both cases explicitly.
    if "voix" in df.columns:
        df["voix"] = df["voix"].fillna(0)
    else:
        df["voix"] = 0

    if {"exprimes", "votants", "blancs", "nuls"}.issubset(df.columns):
        mask_expr = (
            df["exprimes"].isna()
            & df["votants"].notna()
            & df["blancs"].notna()
            & df["nuls"].notna()
        )
        df.loc[mask_expr, "exprimes"] = (
            df.loc[mask_expr, "votants"] - df.loc[mask_expr, "blancs"] - df.loc[mask_expr, "nuls"]
        )

    return df[df["code_bv"].notna()]
| |
|
| |
|
def standardize_election(
    path: Path,
    meta: Mapping[str, object],
    *,
    rename_map: Optional[Mapping[str, str]] = None,
    sep: str = ";",
    encoding: str | Iterable[str] = ("cp1252", "utf-8-sig", "latin-1"),
    decimal: str = ",",
    dtype: Optional[Mapping[str, str]] = None,
) -> pd.DataFrame:
    """
    Load and standardise a single raw table to the long format expected downstream.

    Parameters
    ----------
    path : Path
        CSV path to the raw election table.
    meta : Mapping
        Must contain type_scrutin, tour, date_scrutin. Optionally code_bv_cols and annee.
        When ``tour`` is absent and ``tour_column`` names a column of the raw
        table, the table is split on that column (restricted to
        ``meta["tours"]`` when given) and each part is processed with its own
        ``tour``.  If the named column does not exist, the table is processed
        as a single tour 1.
    rename_map : Mapping
        Columns to rename from the raw schema to the standard schema.  Keys are
        matched on their canonical form (see ``_canonical_label``); a column
        with no exact match falls back to a match that ignores a trailing
        numeric index (e.g. a ``Voix`` key also covers ``Voix 1``).
    sep, encoding, decimal, dtype
        Forwarded to ``load_raw``.

    Returns
    -------
    pd.DataFrame
        Standardised table with ``STANDARD_COLUMNS`` first, followed by any
        remaining columns.

    Raises
    ------
    RuntimeError
        If splitting by tour yields no tour at all.
    """
    df_raw = load_raw(path, sep=sep, encoding=encoding, decimal=decimal, dtype=dtype)
    rename_norm = {_normalize_label(k): v for k, v in (rename_map or {}).items()}

    def canonical_base(label: str) -> str:
        # Canonical form without the trailing candidate index.
        return re.sub(r"\d+$", "", _canonical_label(label))

    # Lookup tables are invariant across tours, so they are built once.
    rename_exact = {_canonical_label(k): v for k, v in rename_norm.items()}
    rename_by_base = {canonical_base(k): v for k, v in rename_norm.items()}

    def _process(df: pd.DataFrame, meta_for_tour: Mapping[str, object]) -> pd.DataFrame:
        df_local = df.copy()
        df_local.columns = [_normalize_label(c) for c in df_local.columns]
        df_local = _unpivot_wide_candidates(df_local)
        if rename_norm:
            rename_using = {}
            for col in df_local.columns:
                canon = _canonical_label(col)
                if canon in rename_exact:
                    # Exact canonical matches win, so existing maps behave as before.
                    rename_using[col] = rename_exact[canon]
                    continue
                base = canonical_base(col)
                if base and base in rename_by_base:
                    rename_using[col] = rename_by_base[base]
            df_local = df_local.rename(columns=rename_using)
            df_local = deduplicate_columns(df_local)
            # Safety net in case duplicated labels survive the merge above.
            df_local = df_local.loc[:, ~df_local.columns.duplicated()]

        df_local = build_code_bv(df_local, meta_for_tour)
        df_local = add_election_metadata(df_local, meta_for_tour)
        df_local = ensure_columns(df_local, STANDARD_COLUMNS)
        df_local = coerce_numeric(df_local)
        df_local = basic_cleaning(df_local)
        ordered_cols = STANDARD_COLUMNS + [col for col in df_local.columns if col not in STANDARD_COLUMNS]
        return df_local[ordered_cols]

    if meta.get("tour_column") and "tour" not in meta:
        tour_col = _normalize_label(str(meta["tour_column"]))
        meta_without_column = {k: v for k, v in meta.items() if k != "tour_column"}
        if tour_col not in df_raw.columns:
            # No such column: treat the file as a single first round.
            meta_single = dict(meta_without_column)
            meta_single["tour"] = int(meta.get("tour", 1))
            return _process(df_raw, meta_single)
        tours = meta.get("tours") or sorted(df_raw[tour_col].dropna().unique())
        frames: list[pd.DataFrame] = []
        for tour_val in tours:
            meta_tour = dict(meta_without_column)
            meta_tour["tour"] = int(tour_val)
            frames.append(_process(df_raw[df_raw[tour_col] == tour_val], meta_tour))
        if not frames:
            raise RuntimeError(f"Aucun tour détecté pour {path.name}")
        return pd.concat(frames, ignore_index=True)

    return _process(df_raw, meta)
| |
|
| |
|
def validate_consistency(df: pd.DataFrame, *, tolerance: float = 0.02) -> Dict[str, pd.DataFrame]:
    """
    Quick validation checks. Returns a dict of issues to inspect.

    Each check runs only when the columns it needs are present:

    - ``votants_gt_inscrits``: rows where ``votants`` exceeds ``inscrits``.
    - ``exprimes_balance_off``: rows where
      ``exprimes + blancs + nuls - votants``, relative to ``votants``, exceeds
      ``tolerance`` in absolute value (a ``gap`` column is added).
    - ``sum_voix_vs_exprimes``: per (``code_bv``, ``type_scrutin``, ``tour``),
      groups where the summed ``voix`` differs from the group's ``exprimes``
      by more than ``tolerance`` in relative terms.

    Zero denominators are replaced by NaN, so such rows/groups are never flagged.
    """
    issues: Dict[str, pd.DataFrame] = {}

    if {"votants", "inscrits"}.issubset(df.columns):
        issues["votants_gt_inscrits"] = df[df["votants"] > df["inscrits"]]

    if {"exprimes", "blancs", "nuls", "votants"}.issubset(df.columns):
        expr_gap = df.copy()
        expr_gap["gap"] = (
            (expr_gap["exprimes"] + expr_gap["blancs"] + expr_gap["nuls"] - expr_gap["votants"])
            / expr_gap["votants"].replace(0, np.nan)
        )
        issues["exprimes_balance_off"] = expr_gap[expr_gap["gap"].abs() > tolerance]

    if {"code_bv", "type_scrutin", "tour", "exprimes", "voix"}.issubset(df.columns):
        # In the long format ``exprimes`` is repeated on every candidate row of
        # a bureau, so it must not be summed: take the first non-null value
        # per group and compare it with the summed votes.
        sums = df.groupby(["code_bv", "type_scrutin", "tour"], as_index=False).agg(
            exprimes=("exprimes", "first"),
            voix=("voix", "sum"),
        )
        sums["gap"] = (sums["voix"] - sums["exprimes"]) / sums["exprimes"].replace(0, np.nan)
        issues["sum_voix_vs_exprimes"] = sums[sums["gap"].abs() > tolerance]

    return issues
| |
|