# Page header captured together with this file (not part of the module):
#   Pujan-Dev's picture
#   push: used only the logistic
#   7bda3a9
import logging
import pickle
import re
import shutil
from functools import lru_cache
from pathlib import Path
import numpy as np
import pandas as pd
from huggingface_hub import snapshot_download
from sklearn.linear_model import LogisticRegression, LogisticRegressionCV
from config import Config
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Display name -> pickle file name. load_artifacts() looks each file up inside
# the resolved model directory; a missing file is reported as unavailable
# instead of raising.
MODEL_FILES = {
    "Logistic Regression": "Logistic_Regression.pkl",
    "Random Forest": "Random_Forest.pkl",
    # "Gradient Boosting": "Gradient_Boosting.pkl",
    "Linear SVC": "Linear_SVC.pkl",
    "Ridge Classifier": "Ridge_Classifier.pkl",
    "Multinomial NB": "Multinomial_NB.pkl",
    "Bernoulli NB": "Bernoulli_NB.pkl",
}

# Model names listed here are never loaded; load_artifacts() records them as
# "Skipped due to large artifact size".
SKIP_MODELS = set()

# Hub repository id and access token handed to snapshot_download().
REPO_ID = Config.REPO_ID_LANG
HF_TOKEN = Config.HF_TOKEN

# Sub-directory name probed both inside the downloaded snapshot and inside
# each local candidate directory.
NEPALI_SUBDIR = "Nepali_model"

# Vectorizer pickles that must all be present for a directory to count as a
# usable model directory.
REQUIRED_BASE_FILES = ("word_vectorizer.pkl", "char_vectorizer.pkl")

# Ranked by validation accuracy from final_model/final_results.csv
# NOTE: "Gradient Boosting" is ranked here although its MODEL_FILES entry is
# commented out; get_default_top_models() filters this list by the models that
# were actually loaded, so the extra entry is simply ignored.
DEFAULT_MODEL_RANKING = [
    "Gradient Boosting",
    "Logistic Regression",
    "Linear SVC",
    "Ridge Classifier",
    "Bernoulli NB",
    "Random Forest",
    "Multinomial NB",
]
def _patch_legacy_logistic_model(model):
    """Give unpickled logistic models a ``multi_class`` attribute if it is missing.

    Only ``LogisticRegression`` / ``LogisticRegressionCV`` instances that lack
    the attribute are touched; they get ``multi_class = "auto"``. Every other
    object is returned untouched. The same object that was passed in is
    returned in all cases (it is patched in place).
    """
    if not isinstance(model, (LogisticRegression, LogisticRegressionCV)):
        return model
    if hasattr(model, "multi_class"):
        return model
    model.multi_class = "auto"
    return model
class NepaliRichFeatures:
    """Hand-crafted burstiness and stylometry features for Nepali text.

    The original author notes that this extractor was used during model
    training, so the feature definitions and their column order must not
    change.
    """

    @staticmethod
    def extract_burstiness(text: str) -> dict:
        """Summarise how sentence length (in words) varies across *text*.

        Sentences are delimited by the danda, "!" or "?"; blank fragments are
        ignored. Returns the mean, standard deviation, max, min and range of
        the per-sentence word counts as floats, or all zeros when the text
        contains no sentence at all.
        """
        raw = str(text)
        word_counts = [
            len(fragment.split())
            for fragment in re.split(r"[।!?]", raw)
            if fragment.strip()
        ]
        if not word_counts:
            return {
                "burst_mean": 0.0,
                "burst_std": 0.0,
                "burst_max": 0.0,
                "burst_min": 0.0,
                "burst_range": 0.0,
            }

        counts = np.asarray(word_counts)
        longest = counts.max()
        shortest = counts.min()
        return {
            "burst_mean": float(counts.mean()),
            "burst_std": float(counts.std()),
            "burst_max": float(longest),
            "burst_min": float(shortest),
            "burst_range": float(longest - shortest),
        }

    @staticmethod
    def extract_stylometry(text: str) -> dict:
        """Compute size, diversity, punctuation and diacritic statistics.

        Word, character and sentence counts are floored at 1 so that the
        ratios derived from them never divide by zero; an empty text therefore
        reports 1 for each of those counts.
        """
        raw = str(text)
        tokens = raw.split()
        sentence_total = sum(
            1 for fragment in re.split(r"[।!?]", raw) if fragment.strip()
        )

        num_words = max(len(tokens), 1)
        num_chars = max(len(raw), 1)
        num_sentences = max(sentence_total, 1)

        if tokens:
            avg_word_len = float(np.mean([len(token) for token in tokens]))
        else:
            avg_word_len = 0.0

        # Danda, question mark, exclamation mark and comma.
        punct_count = sum(raw.count(mark) for mark in ("।", "?", "!", ","))

        # Share of adjacent word pairs that repeat an earlier pair.
        bigrams = [" ".join(pair) for pair in zip(tokens, tokens[1:])]
        if bigrams:
            rep_bigram_ratio = 1.0 - len(set(bigrams)) / max(len(bigrams), 1)
        else:
            rep_bigram_ratio = 0.0

        # Code points U+093E..U+094D: Devanagari vowel signs through the virama.
        diacritic_count = sum("\u093e" <= char <= "\u094d" for char in raw)

        return {
            "num_words": num_words,
            "num_chars": num_chars,
            "num_sentences": num_sentences,
            "avg_word_len": avg_word_len,
            "avg_sent_len": num_words / num_sentences,
            "lexical_diversity": len(set(tokens)) / num_words,
            "punct_ratio": punct_count / num_chars,
            "rep_bigram_ratio": rep_bigram_ratio,
            "diacritic_ratio": diacritic_count / num_chars,
        }

    def transform(self, texts):
        """Return a float32 matrix with one feature row per input text.

        A single string is treated as a batch of one. Columns are the
        burstiness features followed by the stylometry features, in the order
        the two extractors emit them.
        """
        batch = [texts] if isinstance(texts, str) else texts
        records = [
            {**self.extract_burstiness(item), **self.extract_stylometry(item)}
            for item in batch
        ]
        return pd.DataFrame(records).values.astype(np.float32)
def _repo_root() -> Path:
    """Return the directory two levels above the folder containing this file."""
    this_file = Path(__file__).resolve()
    # parents[0] is this file's folder, so parents[2] is two levels above it.
    return this_file.parents[2]
def _has_required_artifacts(path: Path) -> bool:
    """Tell whether *path* is a directory holding a usable set of artifacts.

    Usable means: every file in REQUIRED_BASE_FILES is present and at least
    one of the model pickles named in MODEL_FILES is present.
    """
    if not (path.exists() and path.is_dir()):
        return False

    def is_present(filename: str) -> bool:
        return (path / filename).exists()

    if not all(is_present(name) for name in REQUIRED_BASE_FILES):
        return False
    return any(is_present(name) for name in MODEL_FILES.values())
def _candidate_model_dirs() -> list[Path]:
    """List the directories to probe for model artifacts, in priority order.

    Order: the configured folder (and its NEPALI_SUBDIR child) when
    ``Config.Nepali_model_folder`` is set, then the default model folder under
    the repository root (and its NEPALI_SUBDIR child), then the notebook's
    ``saved_models`` folder. Existence is not checked here.
    """
    root = _repo_root()
    bundled = root / "features" / "Model" / "Nepali_model"
    notebook_models = (
        root / "notebook" / "ai_vs_human_nepali" / "final_model" / "saved_models"
    )

    search_order: list[Path] = []
    if Config.Nepali_model_folder:
        override = Path(Config.Nepali_model_folder)
        search_order += [override, override / NEPALI_SUBDIR]
    search_order += [bundled, bundled / NEPALI_SUBDIR, notebook_models]
    return search_order
def _download_nepali_artifacts() -> None:
    """Fetch the model snapshot from the Hub and copy it into the local model folder.

    The destination is ``Config.Nepali_model_folder`` when set, otherwise
    ``features/Model/Nepali_model`` under the repository root. If the snapshot
    contains a NEPALI_SUBDIR directory only that directory is copied; otherwise
    the whole snapshot is. Existing files in the destination are overwritten.

    Raises:
        ValueError: If no repository id is configured.
    """
    if not REPO_ID:
        # The message previously named "English_model", which pointed users of
        # the Nepali loader at the wrong setting.
        raise ValueError(
            "Nepali model repo id (Config.REPO_ID_LANG) is not configured"
        )

    if Config.Nepali_model_folder:
        target_dir = Path(Config.Nepali_model_folder)
    else:
        # Only resolve the repository root when it is actually needed.
        target_dir = _repo_root() / "features" / "Model" / "Nepali_model"

    snapshot_path = Path(snapshot_download(repo_id=REPO_ID, token=HF_TOKEN))
    nested = snapshot_path / NEPALI_SUBDIR
    source_dir = nested if nested.is_dir() else snapshot_path

    target_dir.mkdir(parents=True, exist_ok=True)
    # dirs_exist_ok lets the copy merge into the folder created above.
    shutil.copytree(source_dir, target_dir, dirs_exist_ok=True)
def resolve_model_dir() -> Path:
    """Locate a directory containing the Nepali model artifacts.

    The candidate directories are searched first; if none qualifies, the
    artifacts are downloaded and the candidates are searched once more.

    Raises:
        FileNotFoundError: If no candidate qualifies even after downloading.
    """

    def first_usable_dir():
        # First candidate that passes the artifact check, or None.
        return next(
            (
                candidate
                for candidate in _candidate_model_dirs()
                if _has_required_artifacts(candidate)
            ),
            None,
        )

    found = first_usable_dir()
    if found is not None:
        return found

    LOGGER.info("Nepali artifacts not found locally; downloading from %s", REPO_ID)
    _download_nepali_artifacts()

    found = first_usable_dir()
    if found is None:
        raise FileNotFoundError(
            "Nepali model directory not found. Set Nepali_model env or add expected artifacts."
        )
    return found
@lru_cache(maxsize=1)
def load_artifacts():
    """Load every available model plus the vectorizers, once per process.

    Returns a dict with the keys ``model_dir`` (str), ``models`` (name ->
    unpickled model), ``unavailable_models`` (name -> reason),
    ``word_vectorizer``, ``char_vectorizer`` and ``rich_transformer``.

    The result is memoised, so every caller receives the same dict object;
    mutating it affects all later callers.
    """
    model_dir = resolve_model_dir()
    LOGGER.info("Loading Nepali artifacts from %s", model_dir)

    def unpickle(path):
        # NOTE(review): pickle can execute arbitrary code while loading, and
        # these files may have been downloaded; only use a trusted repository.
        with open(path, "rb") as handle:
            return pickle.load(handle)

    loaded = {}
    not_loaded = {}
    for name, filename in MODEL_FILES.items():
        if name in SKIP_MODELS:
            not_loaded[name] = "Skipped due to large artifact size"
            continue
        pickle_path = model_dir / filename
        if pickle_path.exists():
            loaded[name] = _patch_legacy_logistic_model(unpickle(pickle_path))
        else:
            not_loaded[name] = "Missing model file"

    return {
        "model_dir": str(model_dir),
        "models": loaded,
        "unavailable_models": not_loaded,
        "word_vectorizer": unpickle(model_dir / "word_vectorizer.pkl"),
        "char_vectorizer": unpickle(model_dir / "char_vectorizer.pkl"),
        "rich_transformer": NepaliRichFeatures(),
    }
def get_available_models():
    """Return the names of the models that were loaded, as a new list."""
    # Iterating the dict yields its keys in insertion order.
    return list(load_artifacts()["models"])
def get_default_top_models(top_k: int = 2):
    """Return up to *top_k* loaded model names, best-ranked first.

    Names are taken from DEFAULT_MODEL_RANKING, keeping only models that were
    actually loaded. If none of the ranked models is loaded, the loaded models
    are returned in their load order instead.

    Args:
        top_k: Maximum number of names to return; values below 1 are treated
            as 1 in both branches.

    Returns:
        A list of model names (empty only when no model is loaded at all).
    """
    loaded_in_order = get_available_models()
    loaded = set(loaded_in_order)
    limit = max(1, top_k)

    ranked = [name for name in DEFAULT_MODEL_RANKING if name in loaded]
    if not ranked:
        # Slice the ordered list, not a set: set iteration order is arbitrary
        # and would make the fallback selection non-deterministic.
        return loaded_in_order[:limit]
    return ranked[:limit]