Spaces:
Sleeping
Sleeping
| """ | |
| preprocess.py — Text Cleaning & Preprocessing for Sentiment Analysis | |
| """ | |
| import re | |
| import html | |
| import string | |
| import numpy as np | |
| import pandas as pd | |
| import nltk | |
| from nltk.corpus import stopwords | |
| from nltk.stem import WordNetLemmatizer | |
| from sklearn.model_selection import train_test_split | |
# NLTK resources are fetched at import time; quiet=True suppresses the
# downloader's progress output.
nltk.download("stopwords", quiet=True)
nltk.download("wordnet", quiet=True)
nltk.download("punkt", quiet=True)

# Negation words carry sentiment ("not good" vs "good"), so they must never
# be treated as stop words.
NEGATION_WORDS = {
    "no", "not", "nor", "never", "neither", "none",
    "nobody", "nothing", "nowhere", "hardly", "scarcely", "barely",
}

# English stop-word list with the negation words carved out.
STOP_WORDS = set(stopwords.words("english")).difference(NEGATION_WORDS)

# Shared lemmatizer instance used by clean_text(lemmatize=True).
lemmatizer = WordNetLemmatizer()
# Patterns are compiled once at import time so that per-review calls only
# perform matching.
_HTML_TAG_RE = re.compile(r"<[^>]+>")
_URL_RE = re.compile(r"https?://\S+|www\.\S+")
_NON_ALNUM_RE = re.compile(r"[^a-z0-9\s]")
_WHITESPACE_RE = re.compile(r"\s+")
# Negated contractions whose stem changes when expanded (won't -> will not).
_IRREGULAR_NEGATION_RE = re.compile(r"\b(won|can|shan)'t\b")
_IRREGULAR_NEGATION_STEMS = {"won": "will", "can": "can", "shan": "shall"}
# Every other "...n't" contraction: didn't -> did not, isn't -> is not.
_REGULAR_NEGATION_RE = re.compile(r"n't\b")


def clean_text(text: str, lemmatize: bool = False, *,
               expand_negations: bool = False) -> str:
    """Clean raw review text.

    Steps, in order:
        1. Decode HTML entities.
        2. Replace HTML tags with a space.
        3. Lowercase.
        4. Replace URLs with a space.
        5. Optionally expand negated contractions (see ``expand_negations``).
        6. Replace every character that is not ``a-z``, ``0-9`` or whitespace
           with a space. All punctuation is removed, apostrophes included.
        7. Collapse whitespace runs and strip the ends.
        8. Optionally lemmatize each whitespace-separated token.

    Args:
        text: Raw review text. Any non-``str`` value (``None``, ``NaN``, ...)
            yields an empty string.
        lemmatize: When true, run every token through the module-level
            ``lemmatizer``.
        expand_negations: When true, rewrite ``n't`` contractions as
            ``<stem> not`` before punctuation is stripped. Without this,
            step 6 turns ``didn't`` into ``didn t`` and the negation is no
            longer an explicit ``not`` token. Off by default so existing
            output is unchanged.

    Returns:
        The cleaned, lowercase, single-space-separated text.
    """
    if not isinstance(text, str):
        return ""

    text = html.unescape(text)
    text = _HTML_TAG_RE.sub(" ", text)
    text = text.lower()
    text = _URL_RE.sub(" ", text)

    if expand_negations:
        # Treat the typographic apostrophe like the ASCII one.
        text = text.replace("\u2019", "'")
        text = _IRREGULAR_NEGATION_RE.sub(
            lambda m: _IRREGULAR_NEGATION_STEMS[m.group(1)] + " not", text
        )
        text = _REGULAR_NEGATION_RE.sub(" not", text)

    text = _NON_ALNUM_RE.sub(" ", text)
    text = _WHITESPACE_RE.sub(" ", text).strip()

    if lemmatize:
        text = " ".join(lemmatizer.lemmatize(token) for token in text.split())
    return text
def remove_stopwords(text: str) -> str:
    """Drop every whitespace-separated token that appears in ``STOP_WORDS``.

    Negation words are excluded from ``STOP_WORDS`` at module level, so they
    are kept. The surviving tokens are re-joined with single spaces.
    """
    return " ".join(word for word in text.split() if word not in STOP_WORDS)
def load_imdb_from_csv(filepath: str) -> pd.DataFrame:
    """Load the IMDB dataset from a CSV file and derive a binary label.

    Column names are lowercased and stripped of surrounding whitespace before
    being checked. A row is labelled ``1`` when its ``sentiment`` value,
    ignoring case and surrounding whitespace, equals ``"positive"``; every
    other value is labelled ``0``.

    Args:
        filepath: Path to the CSV file.

    Returns:
        A DataFrame with exactly two columns: ``review`` and ``label``.

    Raises:
        ValueError: If the CSV lacks a ``review`` or ``sentiment`` column.
    """
    df = pd.read_csv(filepath)
    # Standard column names for Kaggle IMDB dataset
    df.columns = df.columns.str.lower().str.strip()

    # Raise explicitly rather than assert: assertions vanish under `python -O`.
    if "review" not in df.columns or "sentiment" not in df.columns:
        raise ValueError("CSV must have 'review' and 'sentiment' columns.")

    # Strip values the same way headers are stripped, so " positive" is not
    # silently labelled as negative.
    sentiment = df["sentiment"].str.strip().str.lower()
    df["label"] = (sentiment == "positive").astype(int)
    return df[["review", "label"]]
def load_imdb_from_huggingface() -> pd.DataFrame:
    """Fetch IMDB via HuggingFace ``datasets`` (no Kaggle account needed).

    The ``train`` and ``test`` splits are concatenated into one frame with a
    fresh index, and the ``text`` column is renamed to ``review``.

    Returns:
        A DataFrame with the columns ``review`` and ``label``.
    """
    # Imported lazily so the dependency is only needed when this loader runs.
    from datasets import load_dataset

    print("📥 Loading IMDB dataset from HuggingFace...")
    splits = load_dataset("imdb")
    frames = [pd.DataFrame(splits[name]) for name in ("train", "test")]
    combined = pd.concat(frames, ignore_index=True)
    combined = combined.rename(columns={"text": "review"})
    return combined[["review", "label"]]
def preprocess_dataframe(df: pd.DataFrame, lemmatize: bool = False) -> pd.DataFrame:
    """Return a copy of ``df`` with an added ``clean_text`` column.

    Each value of the ``review`` column is passed through ``clean_text``;
    the input frame itself is not modified.

    Args:
        df: Frame containing a ``review`` column.
        lemmatize: Forwarded to ``clean_text``.
    """
    def _clean(review):
        return clean_text(review, lemmatize=lemmatize)

    print("🔄 Cleaning text...")
    result = df.copy()
    result["clean_text"] = result["review"].apply(_clean)
    print("✅ Text cleaned.")
    return result
def split_data(df: pd.DataFrame, test_size: float = 0.1, val_size: float = 0.1,
               random_state: int = 42):
    """Split ``df`` into stratified train, validation and test sets.

    Args:
        df: Frame with ``clean_text`` (features) and ``label`` (targets).
        test_size: Fraction of all rows held out for the test set.
        val_size: Fraction of all rows held out for the validation set.
        random_state: Seed passed to both ``train_test_split`` calls.

    Returns:
        ``(X_train, y_train), (X_val, y_val), (X_test, y_test)``.
    """
    texts, labels = df["clean_text"].values, df["label"].values

    # First carve off the test set ...
    X_rest, X_test, y_rest, y_test = train_test_split(
        texts, labels,
        test_size=test_size, stratify=labels, random_state=random_state,
    )
    # ... then take validation from the remainder. val_size is relative to
    # the full data, so it is rescaled to the remaining (1 - test_size) share.
    X_train, X_val, y_train, y_val = train_test_split(
        X_rest, y_rest,
        test_size=val_size / (1 - test_size),
        stratify=y_rest, random_state=random_state,
    )

    print(f"📊 Train: {len(X_train)} | Val: {len(X_val)} | Test: {len(X_test)}")
    return (X_train, y_train), (X_val, y_val), (X_test, y_test)
if __name__ == "__main__":
    import os

    # Prefer the local Kaggle CSV; otherwise load the dataset from HuggingFace.
    raw_csv = "data/raw/IMDB Dataset.csv"
    if not os.path.exists(raw_csv):
        print("⚠️ CSV not found, falling back to HuggingFace...")
        reviews = load_imdb_from_huggingface()
    else:
        reviews = load_imdb_from_csv(raw_csv)

    cleaned = preprocess_dataframe(reviews)

    # Make sure the output directory exists before writing the cleaned data.
    os.makedirs("data/processed", exist_ok=True)
    cleaned.to_csv("data/processed/imdb_cleaned.csv", index=False)
    print("💾 Saved to data/processed/imdb_cleaned.csv")