Spaces:
Running
Running
| import logging | |
| import pickle | |
| import re | |
| import shutil | |
| from functools import lru_cache | |
| from pathlib import Path | |
| import numpy as np | |
| import pandas as pd | |
| from huggingface_hub import snapshot_download | |
| from sklearn.linear_model import LogisticRegression, LogisticRegressionCV | |
| from config import Config | |
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Display name of each classifier -> pickle file name expected inside the
# resolved model directory. Insertion order is the order models are loaded in.
MODEL_FILES = {
    "Logistic Regression": "Logistic_Regression.pkl",
    "Random Forest": "Random_Forest.pkl",
    # "Gradient Boosting" ("Gradient_Boosting.pkl") is currently disabled.
    "Linear SVC": "Linear_SVC.pkl",
    "Ridge Classifier": "Ridge_Classifier.pkl",
    "Multinomial NB": "Multinomial_NB.pkl",
    "Bernoulli NB": "Bernoulli_NB.pkl",
}

# Display names that must not be loaded even when their file is present.
SKIP_MODELS = set()
# Hub repository id and access token, both read once from the project Config
# at import time; they are passed to snapshot_download when artifacts are
# fetched.
REPO_ID, HF_TOKEN = Config.REPO_ID_LANG, Config.HF_TOKEN
# Sub-directory name probed inside each candidate folder and inside the
# downloaded snapshot.
NEPALI_SUBDIR = "Nepali_model"

# Vectorizer pickles that must exist for a directory to count as usable.
REQUIRED_BASE_FILES = (
    "word_vectorizer.pkl",
    "char_vectorizer.pkl",
)

# Ranked by validation accuracy from final_model/final_results.csv
# (best first). Names that are not loaded at runtime are filtered out by the
# consumer of this list.
DEFAULT_MODEL_RANKING = [
    "Gradient Boosting",
    "Logistic Regression",
    "Linear SVC",
    "Ridge Classifier",
    "Bernoulli NB",
    "Random Forest",
    "Multinomial NB",
]
def _patch_legacy_logistic_model(model):
    """Backfill attributes expected by newer sklearn versions.

    Only ``LogisticRegression`` / ``LogisticRegressionCV`` instances that have
    no ``multi_class`` attribute are modified: they receive
    ``multi_class = "auto"``. Any other object is passed through untouched.

    The estimator is patched in place and the same object is returned.
    """
    logistic_types = (LogisticRegression, LogisticRegressionCV)
    if not isinstance(model, logistic_types):
        return model
    if hasattr(model, "multi_class"):
        return model
    model.multi_class = "auto"
    return model
class NepaliRichFeatures:
    """Burstiness + stylometry feature extractor used during model training.

    ``transform`` turns each text into a row of 14 numeric features: the five
    burstiness values followed by the nine stylometry values, in the key order
    of the dictionaries returned by ``extract_burstiness`` and
    ``extract_stylometry``. Both extractors are static methods, so they can be
    called on the class or on an instance.
    """

    @staticmethod
    def extract_burstiness(text: str) -> dict:
        """Return statistics of sentence lengths (in whitespace-split words).

        Sentences are obtained by splitting on the danda (``।``), ``!`` and
        ``?``; empty fragments are ignored. When no sentence remains, every
        statistic is ``0.0``.
        """
        raw = str(text)
        sentences = [part.strip() for part in re.split(r"[।!?]", raw) if part.strip()]
        if not sentences:
            return {
                "burst_mean": 0.0,
                "burst_std": 0.0,
                "burst_max": 0.0,
                "burst_min": 0.0,
                "burst_range": 0.0,
            }
        lengths = [len(sentence.split()) for sentence in sentences]
        longest = np.max(lengths)
        shortest = np.min(lengths)
        return {
            "burst_mean": float(np.mean(lengths)),
            "burst_std": float(np.std(lengths)),
            "burst_max": float(longest),
            "burst_min": float(shortest),
            "burst_range": float(longest - shortest),
        }

    @staticmethod
    def extract_stylometry(text: str) -> dict:
        """Return simple stylometric ratios and counts for ``text``.

        The three counts (words, characters, sentences) are floored at 1 so
        the ratios derived from them never divide by zero, including for an
        empty string.
        """
        raw = str(text)
        words = raw.split()
        num_words = max(len(words), 1)
        num_chars = max(len(raw), 1)
        num_sentences = max(
            len([part for part in re.split(r"[।!?]", raw) if part.strip()]), 1
        )
        avg_word_len = float(np.mean([len(word) for word in words])) if words else 0.0
        avg_sent_len = num_words / num_sentences
        lexical_diversity = len(set(words)) / num_words
        # Punctuation marks counted: danda, question mark, exclamation, comma.
        punct_count = sum(raw.count(mark) for mark in ("।", "?", "!", ","))
        punct_ratio = punct_count / num_chars
        # Share of adjacent word pairs that repeat an earlier pair.
        bigrams = [f"{left} {right}" for left, right in zip(words, words[1:])]
        rep_bigram_ratio = (1.0 - len(set(bigrams)) / len(bigrams)) if bigrams else 0.0
        # Code points U+093E..U+094D (Devanagari dependent vowel signs
        # through the virama).
        diacritic_count = sum(1 for char in raw if "\u093e" <= char <= "\u094d")
        diacritic_ratio = diacritic_count / num_chars
        return {
            "num_words": num_words,
            "num_chars": num_chars,
            "num_sentences": num_sentences,
            "avg_word_len": avg_word_len,
            "avg_sent_len": avg_sent_len,
            "lexical_diversity": lexical_diversity,
            "punct_ratio": punct_ratio,
            "rep_bigram_ratio": rep_bigram_ratio,
            "diacritic_ratio": diacritic_ratio,
        }

    def transform(self, texts):
        """Return a ``float32`` array with one feature row per input text.

        A single string is treated as a one-element batch. Columns follow the
        key order of the burstiness dictionary, then the stylometry
        dictionary.
        """
        if isinstance(texts, str):
            texts = [texts]
        rows = [
            {**self.extract_burstiness(text), **self.extract_stylometry(text)}
            for text in texts
        ]
        return pd.DataFrame(rows).values.astype(np.float32)
| def _repo_root() -> Path: | |
| return Path(__file__).resolve().parents[2] | |
| def _has_required_artifacts(path: Path) -> bool: | |
| if not path.exists() or not path.is_dir(): | |
| return False | |
| has_base = all((path / filename).exists() for filename in REQUIRED_BASE_FILES) | |
| has_any_model = any((path / filename).exists() for filename in MODEL_FILES.values()) | |
| return has_base and has_any_model | |
def _candidate_model_dirs() -> list[Path]:
    """List the directories to probe for Nepali artifacts, in priority order.

    Order: the configured ``Config.Nepali_model_folder`` (when set) and its
    ``NEPALI_SUBDIR`` child, then the default ``features/Model/Nepali_model``
    folder under the repository root and its ``NEPALI_SUBDIR`` child, and
    finally the notebook's ``saved_models`` folder.
    """
    root = _repo_root()

    bases = []
    if Config.Nepali_model_folder:
        bases.append(Path(Config.Nepali_model_folder))
    bases.append(root / "features" / "Model" / "Nepali_model")

    found = []
    for base in bases:
        # Each base is tried both directly and with the nested sub-directory.
        found += [base, base / NEPALI_SUBDIR]
    found.append(
        root / "notebook" / "ai_vs_human_nepali" / "final_model" / "saved_models"
    )
    return found
def _download_nepali_artifacts() -> None:
    """Download the artifact snapshot and copy it into the local model folder.

    The snapshot of ``REPO_ID`` is fetched with ``snapshot_download`` using
    ``HF_TOKEN``. If the snapshot contains a ``NEPALI_SUBDIR`` directory, only
    that directory is copied; otherwise the whole snapshot is. The destination
    is ``Config.Nepali_model_folder`` when set, else
    ``features/Model/Nepali_model`` under the repository root. Existing files
    in the destination are overwritten by the copy.

    Raises:
        ValueError: if ``REPO_ID`` is empty or unset.
    """
    if not REPO_ID:
        # The message used to name the English model, which was misleading in
        # this Nepali loader.
        raise ValueError("Nepali model repo id (Config.REPO_ID_LANG) is not configured")
    repo = _repo_root()
    target_dir = (
        Path(Config.Nepali_model_folder)
        if Config.Nepali_model_folder
        else repo / "features" / "Model" / "Nepali_model"
    )
    snapshot_path = Path(snapshot_download(repo_id=REPO_ID, token=HF_TOKEN))
    nested_dir = snapshot_path / NEPALI_SUBDIR
    source_dir = nested_dir if nested_dir.is_dir() else snapshot_path
    target_dir.mkdir(parents=True, exist_ok=True)
    shutil.copytree(source_dir, target_dir, dirs_exist_ok=True)
def resolve_model_dir() -> Path:
    """Return the first candidate directory that holds usable artifacts.

    Candidates are scanned once; if none qualifies, the artifacts are
    downloaded and the candidates are scanned a second time.

    Raises:
        FileNotFoundError: if no candidate qualifies even after the download.
    """

    def _first_usable():
        # First candidate passing the artifact check, or None.
        return next(
            (
                candidate
                for candidate in _candidate_model_dirs()
                if _has_required_artifacts(candidate)
            ),
            None,
        )

    located = _first_usable()
    if located is not None:
        return located

    LOGGER.info("Nepali artifacts not found locally; downloading from %s", REPO_ID)
    _download_nepali_artifacts()

    located = _first_usable()
    if located is None:
        raise FileNotFoundError(
            "Nepali model directory not found. Set Nepali_model env or add expected artifacts."
        )
    return located
@lru_cache(maxsize=1)
def load_artifacts():
    """Load the Nepali classifiers and vectorizers from disk, once per process.

    The result is memoised: the pickles are read on the first successful call
    and the same dictionary object is returned afterwards, so callers should
    treat it as read-only. Call ``load_artifacts.cache_clear()`` to force a
    reload. A call that raises is not cached.

    Returns:
        dict with keys:
            ``model_dir``: resolved artifact directory, as a string.
            ``models``: display name -> unpickled classifier.
            ``unavailable_models``: display name -> reason it was not loaded.
            ``word_vectorizer`` / ``char_vectorizer``: unpickled vectorizers.
            ``rich_transformer``: a fresh ``NepaliRichFeatures`` instance.

    Raises:
        FileNotFoundError: propagated from ``resolve_model_dir`` when no
            usable artifact directory can be found.
    """
    model_dir = resolve_model_dir()
    LOGGER.info("Loading Nepali artifacts from %s", model_dir)

    # NOTE(security): pickle.load can execute arbitrary code. These files may
    # come from the downloaded Hub snapshot, so REPO_ID must point at a
    # trusted repository.
    models = {}
    unavailable = {}
    for model_name, file_name in MODEL_FILES.items():
        if model_name in SKIP_MODELS:
            unavailable[model_name] = "Skipped due to large artifact size"
            continue
        file_path = model_dir / file_name
        if not file_path.exists():
            unavailable[model_name] = "Missing model file"
            continue
        with open(file_path, "rb") as fp:
            models[model_name] = _patch_legacy_logistic_model(pickle.load(fp))

    with open(model_dir / "word_vectorizer.pkl", "rb") as fp:
        word_vectorizer = pickle.load(fp)
    with open(model_dir / "char_vectorizer.pkl", "rb") as fp:
        char_vectorizer = pickle.load(fp)

    return {
        "model_dir": str(model_dir),
        "models": models,
        "unavailable_models": unavailable,
        "word_vectorizer": word_vectorizer,
        "char_vectorizer": char_vectorizer,
        "rich_transformer": NepaliRichFeatures(),
    }
def get_available_models():
    """Return the display names of the models that were loaded, as a list."""
    loaded = load_artifacts()["models"]
    return [*loaded]
def get_default_top_models(top_k: int = 2) -> list[str]:
    """Return up to ``top_k`` available model names, best-ranked first.

    Names are taken from ``DEFAULT_MODEL_RANKING`` in order, keeping only the
    models that are actually available. If none of the available models is in
    the ranking, the available names are returned in alphabetical order, so
    the result does not depend on set iteration order.

    Args:
        top_k: maximum number of names to return; values below 1 are treated
            as 1 in both branches.

    Returns:
        A list of model display names; empty only when no model is available.
    """
    limit = max(1, top_k)
    available = set(get_available_models())
    ranked = [name for name in DEFAULT_MODEL_RANKING if name in available]
    if ranked:
        return ranked[:limit]
    # Fallback for models missing from the ranking: sort for a stable order.
    return sorted(available)[:limit]