Spaces:
Sleeping
Sleeping
| """ | |
| bert_model.py — Fine-Tuning BERT for Sentiment Analysis (HuggingFace Transformers) | |
| """ | |
| import os | |
| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| from torch.utils.data import Dataset, DataLoader | |
| from transformers import ( | |
| AutoTokenizer, | |
| AutoModelForSequenceClassification, | |
| get_linear_schedule_with_warmup, | |
| ) | |
| from torch.optim import AdamW | |
| from sklearn.utils.class_weight import compute_class_weight | |
| from tqdm import tqdm | |
| # ────────────────────────────────────────────── | |
| # Configuration | |
| # ────────────────────────────────────────────── | |
# Default hyper-parameters for BERT fine-tuning (used by train_bert when no
# config is supplied).
BERT_CONFIG = dict(
    model_name="bert-base-uncased",  # checkpoint name handed to from_pretrained
    max_len=512,                     # token length used for padding/truncation
    batch_size=16,                   # Reduce to 8 if OOM
    epochs=3,
    lr=2e-5,
    warmup_ratio=0.06,               # fraction of total steps spent on LR warmup
    weight_decay=0.01,
    num_labels=2,
    unfreeze_layers=4,               # Fine-tune last N encoder layers
)
| # ────────────────────────────────────────────── | |
| # Dataset | |
| # ────────────────────────────────────────────── | |
class BERTSentimentDataset(Dataset):
    """Map-style dataset that tokenizes one text per item for classification.

    Args:
        texts: Iterable of raw strings (list, tuple, NumPy array, pandas
            Series, ...).
        labels: Iterable of integer class ids aligned one-to-one with
            ``texts``.
        tokenizer: Callable invoked as ``tokenizer(text, max_length=...,
            padding="max_length", truncation=True, return_tensors="pt")``.
            It must return a mapping with ``"input_ids"`` and
            ``"attention_mask"`` tensors.
        max_len: Length every example is padded / truncated to.

    Raises:
        ValueError: If ``texts`` and ``labels`` differ in length.
    """

    def __init__(self, texts, labels, tokenizer, max_len: int = 512):
        # Materialise both inputs as plain lists so that ``[idx]`` is always a
        # *positional* lookup.  A pandas Series whose index was shuffled by a
        # train/test split would otherwise be indexed by label, raising
        # KeyError or returning the wrong row.
        self.texts = list(texts)
        self.labels = list(labels)
        if len(self.texts) != len(self.labels):
            raise ValueError(
                f"texts and labels must have the same length "
                f"(got {len(self.texts)} and {len(self.labels)})"
            )
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        encoding = self.tokenizer(
            self.texts[idx],
            max_length=self.max_len,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )
        return {
            # squeeze(0) drops the leading size-1 batch dimension (assumed to
            # be added by the tokenizer for a single text) so the DataLoader
            # can stack items itself.
            "input_ids": encoding["input_ids"].squeeze(0),
            "attention_mask": encoding["attention_mask"].squeeze(0),
            # int() normalises NumPy / pandas scalar labels.
            "label": torch.tensor(int(self.labels[idx]), dtype=torch.long),
        }
| # ────────────────────────────────────────────── | |
| # Model Setup | |
| # ────────────────────────────────────────────── | |
def build_bert_model(config: dict):
    """Load pre-trained BERT and freeze all but the last encoder layers.

    Args:
        config: Mapping providing ``"model_name"``, ``"num_labels"`` and
            ``"unfreeze_layers"``.  ``unfreeze_layers`` is clamped to
            ``[0, number of encoder layers]``; ``0`` keeps every encoder
            layer frozen.

    Returns:
        ``(tokenizer, model)``.  Inside ``model.bert`` only the last
        ``unfreeze_layers`` encoder layers and the pooler are trainable;
        parameters outside ``model.bert`` are left as loaded.

    Note:
        The function accesses ``model.bert`` directly, so the checkpoint must
        produce a model exposing that attribute (BERT-style architecture).
    """
    print(f"π₯ Loading {config['model_name']}...")
    tokenizer = AutoTokenizer.from_pretrained(config["model_name"])
    model = AutoModelForSequenceClassification.from_pretrained(
        config["model_name"],
        num_labels=config["num_labels"],
    )

    # Freeze the whole backbone first.
    for param in model.bert.parameters():
        param.requires_grad = False

    # Unfreeze last N encoder layers + pooler.
    encoder_layers = model.bert.encoder.layer
    n_unfreeze = max(0, min(int(config["unfreeze_layers"]), len(encoder_layers)))
    # Guard the slice: ``layers[-0:]`` is the *entire* list, so an unguarded
    # ``unfreeze_layers=0`` would unfreeze every layer instead of none.
    if n_unfreeze > 0:
        for layer in encoder_layers[-n_unfreeze:]:
            for param in layer.parameters():
                param.requires_grad = True
    for param in model.bert.pooler.parameters():
        param.requires_grad = True

    n_trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
    n_total = sum(p.numel() for p in model.parameters())
    print(f"βοΈ Trainable params: {n_trainable:,} / {n_total:,}")
    return tokenizer, model
| # ────────────────────────────────────────────── | |
| # Training Loop | |
| # ────────────────────────────────────────────── | |
def train_bert(X_train, y_train, X_val, y_val,
               config: dict = None, save_path: str = "models/bert_finetuned"):
    """
    Fine-tune BERT on sentiment data with:
    - Weighted cross-entropy loss (class imbalance)
    - Linear warmup schedule
    - Gradient clipping
    - Best model checkpointing

    Args:
        X_train, y_train: Training texts and their integer labels.
        X_val, y_val: Validation texts and labels used for model selection.
        config: Optional overrides for ``BERT_CONFIG``; missing keys fall back
            to the defaults.  ``None`` uses the defaults unchanged.
        save_path: Directory that receives the best checkpoint (model and
            tokenizer), selected by validation F1.

    Returns:
        ``(model, tokenizer, history)``.  ``model`` is the in-memory model
        after the final epoch, which is not necessarily the best checkpoint
        (that one is on disk at ``save_path``).  ``history`` maps
        ``"train_loss"``, ``"val_loss"``, ``"val_acc"`` and ``"val_f1"`` to
        per-epoch lists.

    Raises:
        ValueError: If the training or validation set is empty.
    """
    # Imported once per call instead of once per epoch.
    from sklearn.metrics import f1_score

    # Overlay caller settings on the defaults so a partial config works.
    config = {**BERT_CONFIG, **(config or {})}

    # Fail early with a clear message instead of a ZeroDivisionError when the
    # per-epoch averages are computed.
    if len(X_train) == 0 or len(X_val) == 0:
        raise ValueError("train_bert requires non-empty training and validation sets")

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"\n{'='*60}\nπ€ Fine-Tuning BERT | Device: {device}\n{'='*60}")
    tokenizer, model = build_bert_model(config)
    model = model.to(device)

    # Datasets
    train_ds = BERTSentimentDataset(X_train, y_train, tokenizer, config["max_len"])
    val_ds = BERTSentimentDataset(X_val, y_val, tokenizer, config["max_len"])
    # Pinned host memory only helps host-to-GPU copies; on CPU it just warns.
    train_loader = DataLoader(train_ds, batch_size=config["batch_size"],
                              shuffle=True, num_workers=0,
                              pin_memory=(device.type == "cuda"))
    # No gradients are kept during validation, so a larger batch is used.
    val_loader = DataLoader(val_ds, batch_size=config["batch_size"] * 2,
                            shuffle=False, num_workers=0)

    # Class weights for imbalanced data
    class_weights = compute_class_weight("balanced", classes=np.unique(y_train),
                                         y=y_train)
    class_weights = torch.tensor(class_weights, dtype=torch.float32).to(device)
    criterion = nn.CrossEntropyLoss(weight=class_weights)

    # Optimizer & Scheduler (only parameters left trainable are optimised)
    trainable_params = [p for p in model.parameters() if p.requires_grad]
    optimizer = AdamW(
        trainable_params,
        lr=config["lr"],
        weight_decay=config["weight_decay"],
        eps=1e-8,
    )
    total_steps = len(train_loader) * config["epochs"]
    warmup_steps = int(total_steps * config["warmup_ratio"])
    scheduler = get_linear_schedule_with_warmup(
        optimizer, num_warmup_steps=warmup_steps,
        num_training_steps=total_steps
    )

    # Binary F1 matches the original behaviour for two labels; macro-averaging
    # keeps the metric defined when more labels are configured.
    f1_average = "binary" if config["num_labels"] == 2 else "macro"

    # Start below any reachable F1 so the first epoch is always checkpointed,
    # even when its F1 is exactly 0.0 (otherwise nothing would be on disk).
    best_val_f1 = float("-inf")
    history = {"train_loss": [], "val_loss": [], "val_acc": [], "val_f1": []}

    for epoch in range(1, config["epochs"] + 1):
        # -- Train --
        model.train()
        total_loss, n = 0.0, 0
        pbar = tqdm(train_loader, desc=f"Epoch {epoch}/{config['epochs']} [Train]",
                    leave=False)
        for batch in pbar:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["label"].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            loss = criterion(outputs.logits, labels)
            loss.backward()
            # Frozen parameters carry no gradient, so clipping the trainable
            # subset is equivalent to clipping all parameters.
            nn.utils.clip_grad_norm_(trainable_params, max_norm=1.0)
            optimizer.step()
            scheduler.step()

            # Weight by batch size so the epoch mean is per-sample.
            total_loss += loss.item() * len(labels)
            n += len(labels)
            pbar.set_postfix({"loss": f"{loss.item():.4f}"})
        avg_train_loss = total_loss / n

        # -- Validate --
        model.eval()
        val_loss, val_correct, val_n = 0.0, 0, 0
        all_preds, all_labels = [], []
        with torch.no_grad():
            for batch in val_loader:
                input_ids = batch["input_ids"].to(device)
                attention_mask = batch["attention_mask"].to(device)
                labels = batch["label"].to(device)

                outputs = model(input_ids=input_ids, attention_mask=attention_mask)
                loss = criterion(outputs.logits, labels)
                preds = outputs.logits.argmax(dim=-1)

                val_loss += loss.item() * len(labels)
                val_correct += (preds == labels).sum().item()
                val_n += len(labels)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

        avg_val_loss = val_loss / val_n
        val_acc = val_correct / val_n
        val_f1 = f1_score(all_labels, all_preds, average=f1_average)

        history["train_loss"].append(avg_train_loss)
        history["val_loss"].append(avg_val_loss)
        history["val_acc"].append(val_acc)
        history["val_f1"].append(val_f1)
        print(f"\nEpoch {epoch:02d}/{config['epochs']} | "
              f"Train Loss: {avg_train_loss:.4f} | "
              f"Val Loss: {avg_val_loss:.4f} | "
              f"Val Acc: {val_acc:.4f} | Val F1: {val_f1:.4f}")

        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            os.makedirs(save_path, exist_ok=True)
            model.save_pretrained(save_path)
            tokenizer.save_pretrained(save_path)
            print(f" β Best model saved (F1={best_val_f1:.4f}) β {save_path}")

    return model, tokenizer, history
| # ────────────────────────────────────────────── | |
| # Inference | |
| # ────────────────────────────────────────────── | |
def load_bert_model(model_path: str = "models/bert_finetuned"):
    """Restore a saved tokenizer / classifier pair from ``model_path``.

    Returns:
        ``(tokenizer, model)`` with the model already switched to
        evaluation mode.
    """
    restored_tokenizer = AutoTokenizer.from_pretrained(model_path)
    classifier = AutoModelForSequenceClassification.from_pretrained(model_path)
    # Put the network in evaluation mode before handing it to callers.
    classifier.eval()
    return restored_tokenizer, classifier
def predict_bert(model, tokenizer, texts: list, max_len: int = 512,
                 batch_size: int = 32, threshold: float = 0.5) -> tuple:
    """Run binary inference on raw texts.

    Args:
        model: Classifier whose output exposes ``.logits``; column 1 of the
            softmax is treated as the positive-class probability.
        tokenizer: Callable invoked on a ``list[str]`` batch with
            ``max_length``, ``padding=True``, ``truncation=True`` and
            ``return_tensors="pt"``.
        texts: Texts to score.  Any iterable of strings is accepted (list,
            tuple, pandas Series, ...); a single ``str`` is treated as one
            text.
        max_len: Maximum token length passed to the tokenizer.
        batch_size: Number of texts per forward pass; must be >= 1.
        threshold: Positive-class probability at or above which the
            prediction is 1.

    Returns:
        ``(preds, proba)`` as 1-D NumPy arrays: integer 0/1 predictions and
        positive-class probabilities.  Both are empty for empty input.

    Raises:
        ValueError: If ``batch_size`` is not positive.

    Note:
        ``model`` is moved to the selected device and put in eval mode.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    # Normalise the input so slicing is positional and the tokenizer always
    # receives a plain list of strings.
    texts = [texts] if isinstance(texts, str) else list(texts)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()

    pred_chunks, proba_chunks = [], []
    with torch.no_grad():
        for start in range(0, len(texts), batch_size):
            batch_texts = texts[start:start + batch_size]
            encoding = tokenizer(batch_texts, max_length=max_len, padding=True,
                                 truncation=True, return_tensors="pt")
            outputs = model(
                input_ids=encoding["input_ids"].to(device),
                attention_mask=encoding["attention_mask"].to(device),
            )
            proba = torch.softmax(outputs.logits, dim=-1)[:, 1].cpu().numpy()
            proba_chunks.append(proba)
            pred_chunks.append((proba >= threshold).astype(int))

    if not proba_chunks:
        # Keep dtypes consistent with the non-empty case.
        return np.empty(0, dtype=int), np.empty(0, dtype=np.float32)
    return np.concatenate(pred_chunks), np.concatenate(proba_chunks)
if __name__ == "__main__":
    import sys

    # Make the modules under ``src`` importable when run as a script.
    sys.path.insert(0, "src")
    from preprocess import load_imdb_from_huggingface, preprocess_dataframe, split_data
    from evaluate import evaluate_model

    reviews = load_imdb_from_huggingface()

    # For BERT we use minimal cleaning (keep punctuation, casing matters less):
    # only HTML tags are replaced by a space and surrounding whitespace trimmed.
    without_html = reviews["review"].str.replace(r"<[^>]+>", " ", regex=True)
    reviews["clean_text"] = without_html.str.strip()

    # NOTE(review): which text column split_data reads is not visible here.
    train_split, val_split, test_split = split_data(reviews)
    train_texts, train_labels = train_split
    val_texts, val_labels = val_split
    test_texts, test_labels = test_split

    model, tokenizer, history = train_bert(
        train_texts, train_labels, val_texts, val_labels
    )

    # Reload best & test
    tokenizer, model = load_bert_model()
    test_preds, test_proba = predict_bert(model, tokenizer, list(test_texts))

    print("\nπ TEST SET RESULTS:")
    evaluate_model(
        test_labels,
        test_preds,
        test_proba,
        model_name="BERT (bert-base-uncased)",
        split="test",
        save_plots=True,
    )