"""Artifact discovery, download and loading for the Nepali classifier models."""

import logging
import pickle
import re
import shutil
from functools import lru_cache
from pathlib import Path
from typing import Any, Optional

import numpy as np
import pandas as pd
from huggingface_hub import snapshot_download
from sklearn.linear_model import LogisticRegression, LogisticRegressionCV

from config import Config

LOGGER = logging.getLogger(__name__)

# Display name -> pickle file name expected inside the model directory.
MODEL_FILES = {
    "Logistic Regression": "Logistic_Regression.pkl",
    "Random Forest": "Random_Forest.pkl",
    # "Gradient Boosting": "Gradient_Boosting.pkl",
    "Linear SVC": "Linear_SVC.pkl",
    "Ridge Classifier": "Ridge_Classifier.pkl",
    "Multinomial NB": "Multinomial_NB.pkl",
    "Bernoulli NB": "Bernoulli_NB.pkl",
}
# Model names that load_artifacts() reports as unavailable without loading them.
SKIP_MODELS = set()
REPO_ID = Config.REPO_ID_LANG
HF_TOKEN = Config.HF_TOKEN
NEPALI_SUBDIR = "Nepali_model"
# Files that must be present next to at least one model pickle.
REQUIRED_BASE_FILES = ("word_vectorizer.pkl", "char_vectorizer.pkl")

# Ranked by validation accuracy from final_model/final_results.csv
DEFAULT_MODEL_RANKING = [
    "Gradient Boosting",
    "Logistic Regression",
    "Linear SVC",
    "Ridge Classifier",
    "Bernoulli NB",
    "Random Forest",
    "Multinomial NB",
]

# Sentence terminators: Devanagari danda plus "!" and "?". Compiled once because
# it is applied to every text by both feature extractors.
_SENTENCE_BOUNDARY = re.compile(r"[।!?]")


def _patch_legacy_logistic_model(model):
    """Backfill attributes expected by newer sklearn versions.

    Logistic-regression estimators that lack a ``multi_class`` attribute get
    it set to ``"auto"``; every other object is returned untouched.
    """
    is_logistic = isinstance(model, (LogisticRegression, LogisticRegressionCV))
    if is_logistic and not hasattr(model, "multi_class"):
        model.multi_class = "auto"
    return model


class NepaliRichFeatures:
    """Burstiness + stylometry feature extractor used during model training.

    The numeric definitions and the key order of the returned dicts determine
    the feature columns produced by :meth:`transform`, so they must stay
    exactly as they are for previously trained models to remain usable.
    """

    @staticmethod
    def _sentences(text) -> list[str]:
        """Split *text* on sentence terminators, dropping blank pieces."""
        pieces = (piece.strip() for piece in _SENTENCE_BOUNDARY.split(str(text)))
        return [piece for piece in pieces if piece]

    @staticmethod
    def extract_burstiness(text: str) -> dict:
        """Return statistics of the per-sentence word counts of *text*.

        Keys: ``burst_mean``, ``burst_std``, ``burst_max``, ``burst_min`` and
        ``burst_range``. All values are ``0.0`` when no sentence is found.
        """
        sentences = NepaliRichFeatures._sentences(text)
        if not sentences:
            return {
                "burst_mean": 0.0,
                "burst_std": 0.0,
                "burst_max": 0.0,
                "burst_min": 0.0,
                "burst_range": 0.0,
            }

        lengths = [len(sentence.split()) for sentence in sentences]
        longest = np.max(lengths)
        shortest = np.min(lengths)
        return {
            "burst_mean": float(np.mean(lengths)),
            "burst_std": float(np.std(lengths)),
            "burst_max": float(longest),
            "burst_min": float(shortest),
            "burst_range": float(longest - shortest),
        }

    @staticmethod
    def extract_stylometry(text: str) -> dict:
        """Return length, diversity, punctuation and diacritic features.

        The word, character and sentence counts are clamped to a minimum of 1
        so that the ratios derived from them never divide by zero.
        """
        raw = str(text)
        words = raw.split()

        num_words = max(len(words), 1)
        num_chars = max(len(raw), 1)
        num_sentences = max(len(NepaliRichFeatures._sentences(raw)), 1)

        avg_word_len = float(np.mean([len(word) for word in words])) if words else 0.0
        avg_sent_len = num_words / num_sentences
        lexical_diversity = len(set(words)) / num_words

        punct_count = sum(raw.count(mark) for mark in ("।", "?", "!", ","))
        punct_ratio = punct_count / num_chars

        # Words contain no whitespace, so adjacent-word pairs are unique
        # exactly when their space-joined strings are unique.
        bigrams = list(zip(words, words[1:]))
        rep_bigram_ratio = 1.0 - len(set(bigrams)) / len(bigrams) if bigrams else 0.0

        # U+093E..U+094D: Devanagari dependent vowel signs through the virama.
        diacritic_count = sum(1 for char in raw if "\u093e" <= char <= "\u094d")
        diacritic_ratio = diacritic_count / num_chars

        return {
            "num_words": num_words,
            "num_chars": num_chars,
            "num_sentences": num_sentences,
            "avg_word_len": avg_word_len,
            "avg_sent_len": avg_sent_len,
            "lexical_diversity": lexical_diversity,
            "punct_ratio": punct_ratio,
            "rep_bigram_ratio": rep_bigram_ratio,
            "diacritic_ratio": diacritic_ratio,
        }

    def transform(self, texts):
        """Return a float32 array with one feature row per text.

        A single string is treated as a one-element list. Columns follow the
        key order of the burstiness dict followed by the stylometry dict.
        """
        if isinstance(texts, str):
            texts = [texts]
        rows = [
            {**self.extract_burstiness(text), **self.extract_stylometry(text)}
            for text in texts
        ]
        return pd.DataFrame(rows).values.astype(np.float32)


def _repo_root() -> Path:
    """Return ``parents[2]`` of this file's resolved path (presumably the repo root)."""
    return Path(__file__).resolve().parents[2]


def _default_model_dir() -> Path:
    """Return the in-repository default location of the Nepali artifacts."""
    return _repo_root() / "features" / "Model" / "Nepali_model"


def _has_required_artifacts(path: Path) -> bool:
    """Return True if *path* is a directory with both vectorizers and a model.

    "A model" means at least one of the files named in ``MODEL_FILES``.
    """
    # is_dir() is already False for a path that does not exist.
    if not path.is_dir():
        return False
    has_base = all((path / filename).exists() for filename in REQUIRED_BASE_FILES)
    has_any_model = any((path / filename).exists() for filename in MODEL_FILES.values())
    return has_base and has_any_model


def _candidate_model_dirs() -> list[Path]:
    """Return the directories to probe for artifacts, in priority order.

    The configured ``Config.Nepali_model_folder`` (when set) comes first, then
    the in-repository default, each followed by its ``NEPALI_SUBDIR`` child,
    and finally the notebook's ``saved_models`` directory.
    """
    candidates = []
    if Config.Nepali_model_folder:
        custom = Path(Config.Nepali_model_folder)
        candidates.extend([custom, custom / NEPALI_SUBDIR])

    default_dir = _default_model_dir()
    candidates.extend([default_dir, default_dir / NEPALI_SUBDIR])
    candidates.append(
        _repo_root() / "notebook" / "ai_vs_human_nepali" / "final_model" / "saved_models"
    )
    return candidates


def _first_valid_model_dir() -> Optional[Path]:
    """Return the first candidate directory holding the artifacts, or None."""
    for path in _candidate_model_dirs():
        if _has_required_artifacts(path):
            return path
    return None


def _download_nepali_artifacts() -> None:
    """Download the artifacts from the Hugging Face repo into the target directory.

    The target is ``Config.Nepali_model_folder`` when set, otherwise the
    in-repository default. If the snapshot contains a ``NEPALI_SUBDIR``
    directory only that directory is copied; otherwise the whole snapshot is.

    Raises:
        ValueError: If no repository id is configured.
    """
    if not REPO_ID:
        # The message previously named "English_model", a copy-paste slip
        # that pointed operators at the wrong configuration value.
        raise ValueError("Nepali model repo id (Config.REPO_ID_LANG) is not configured")

    target_dir = (
        Path(Config.Nepali_model_folder)
        if Config.Nepali_model_folder
        else _default_model_dir()
    )

    snapshot_path = Path(snapshot_download(repo_id=REPO_ID, token=HF_TOKEN))
    nested = snapshot_path / NEPALI_SUBDIR
    source_dir = nested if nested.is_dir() else snapshot_path

    target_dir.mkdir(parents=True, exist_ok=True)
    shutil.copytree(source_dir, target_dir, dirs_exist_ok=True)


def resolve_model_dir() -> Path:
    """Return a directory containing the artifacts, downloading them if needed.

    Raises:
        FileNotFoundError: If no candidate directory holds the artifacts even
            after the download.
    """
    found = _first_valid_model_dir()
    if found is not None:
        return found

    LOGGER.info("Nepali artifacts not found locally; downloading from %s", REPO_ID)
    _download_nepali_artifacts()

    found = _first_valid_model_dir()
    if found is not None:
        return found

    raise FileNotFoundError(
        "Nepali model directory not found. Set Nepali_model env or add expected artifacts."
    )


def _load_pickle(path: Path):
    """Unpickle and return the object stored at *path*."""
    # SECURITY: unpickling can execute arbitrary code. These files may have
    # been downloaded from REPO_ID, so only point it at a trusted repository.
    with open(path, "rb") as fp:
        return pickle.load(fp)


@lru_cache(maxsize=1)
def load_artifacts() -> dict[str, Any]:
    """Load the models, vectorizers and feature extractor (cached after first call).

    Returns:
        A dict with the keys ``model_dir``, ``models``, ``unavailable_models``
        (model name -> reason), ``word_vectorizer``, ``char_vectorizer`` and
        ``rich_transformer``. The same dict object is returned on every call.
    """
    model_dir = resolve_model_dir()
    LOGGER.info("Loading Nepali artifacts from %s", model_dir)

    models = {}
    unavailable = {}
    for model_name, file_name in MODEL_FILES.items():
        if model_name in SKIP_MODELS:
            unavailable[model_name] = "Skipped due to large artifact size"
            continue

        file_path = model_dir / file_name
        if not file_path.exists():
            unavailable[model_name] = "Missing model file"
            continue

        models[model_name] = _patch_legacy_logistic_model(_load_pickle(file_path))

    return {
        "model_dir": str(model_dir),
        "models": models,
        "unavailable_models": unavailable,
        "word_vectorizer": _load_pickle(model_dir / "word_vectorizer.pkl"),
        "char_vectorizer": _load_pickle(model_dir / "char_vectorizer.pkl"),
        "rich_transformer": NepaliRichFeatures(),
    }


def get_available_models():
    """Return the names of the models that were loaded successfully."""
    return list(load_artifacts()["models"])


def get_default_top_models(top_k: int = 2):
    """Return up to *top_k* available model names, best-ranked first.

    *top_k* is clamped to at least 1. Models follow ``DEFAULT_MODEL_RANKING``;
    if none of the available models appears in that ranking, the available
    models are returned in their load order instead.
    """
    available = get_available_models()
    limit = max(1, top_k)

    loaded = set(available)
    ranked = [name for name in DEFAULT_MODEL_RANKING if name in loaded]
    if not ranked:
        # Slice the list rather than a set so the result is deterministic,
        # and apply the same clamp as the ranked path.
        return available[:limit]
    return ranked[:limit]