"""
bert_model.py — Fine-Tuning BERT for Sentiment Analysis (HuggingFace Transformers)
"""

import os

import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    get_linear_schedule_with_warmup,
)
from torch.optim import AdamW
from sklearn.metrics import f1_score
from sklearn.utils.class_weight import compute_class_weight
from tqdm import tqdm

# ──────────────────────────────────────────────
# Configuration
# ──────────────────────────────────────────────

BERT_CONFIG = {
    "model_name": "bert-base-uncased",
    "max_len": 512,
    "batch_size": 16,  # Reduce to 8 if OOM
    "epochs": 3,
    "lr": 2e-5,
    "warmup_ratio": 0.06,
    "weight_decay": 0.01,
    "num_labels": 2,
    "unfreeze_layers": 4,  # Fine-tune last N encoder layers (0 = none)
}


# ──────────────────────────────────────────────
# Dataset
# ──────────────────────────────────────────────

class BERTSentimentDataset(Dataset):
    """Map-style dataset that tokenizes one (text, label) pair per item.

    Each item is a dict with ``input_ids`` and ``attention_mask`` (both padded
    or truncated to ``max_len`` tokens) and an integer ``label`` tensor.
    """

    def __init__(self, texts, labels, tokenizer, max_len: int = 512):
        # Materialise as plain lists so ``__getitem__`` always indexes by
        # position. A pandas Series that kept its original index after a
        # shuffle/split would otherwise be looked up by *label*, raising
        # KeyError or silently returning the wrong row.
        self.texts = list(texts)
        self.labels = list(labels)
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        encoding = self.tokenizer(
            self.texts[idx],
            max_length=self.max_len,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )
        # return_tensors="pt" adds a leading batch axis of size 1; drop it so
        # the DataLoader can stack items into a batch itself.
        return {
            "input_ids": encoding["input_ids"].squeeze(0),
            "attention_mask": encoding["attention_mask"].squeeze(0),
            "label": torch.tensor(self.labels[idx], dtype=torch.long),
        }


# ──────────────────────────────────────────────
# Model Setup
# ──────────────────────────────────────────────

def build_bert_model(config: dict):
    """Load pre-trained BERT and freeze early layers.

    Args:
        config: Dict providing ``model_name``, ``num_labels`` and
            ``unfreeze_layers`` (number of trailing encoder layers to train).

    Returns:
        ``(tokenizer, model)``. Inside ``model.bert`` only the last
        ``unfreeze_layers`` encoder layers and the pooler remain trainable;
        parameters outside ``model.bert`` are left untouched.
    """
    print(f"📥 Loading {config['model_name']}...")
    tokenizer = AutoTokenizer.from_pretrained(config["model_name"])
    model = AutoModelForSequenceClassification.from_pretrained(
        config["model_name"],
        num_labels=config["num_labels"],
    )

    # Freeze all layers first
    for param in model.bert.parameters():
        param.requires_grad = False

    # Unfreeze last N encoder layers + pooler.
    # ``layers[-0:]`` is the *whole* list, so a count of 0 (or less) has to be
    # special-cased — otherwise "unfreeze nothing" would unfreeze everything.
    n_unfreeze = config["unfreeze_layers"]
    encoder_layers = model.bert.encoder.layer
    layers_to_train = encoder_layers[-n_unfreeze:] if n_unfreeze > 0 else []
    for layer in layers_to_train:
        for param in layer.parameters():
            param.requires_grad = True
    for param in model.bert.pooler.parameters():
        param.requires_grad = True

    n_trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
    n_total = sum(p.numel() for p in model.parameters())
    print(f"⚙️ Trainable params: {n_trainable:,} / {n_total:,}")

    return tokenizer, model


# ──────────────────────────────────────────────
# Training Loop
# ──────────────────────────────────────────────

def train_bert(X_train, y_train, X_val, y_val, config: dict = None,
               save_path: str = "models/bert_finetuned"):
    """
    Fine-tune BERT on sentiment data with:
    - Weighted cross-entropy loss (class imbalance)
    - Linear warmup schedule
    - Gradient clipping
    - Best model checkpointing

    Args:
        X_train, y_train: Training texts and integer labels.
        X_val, y_val: Validation texts and integer labels.
        config: Hyper-parameter dict; defaults to ``BERT_CONFIG``.
        save_path: Directory that receives the best checkpoint (by val F1).

    Returns:
        ``(model, tokenizer, history)``. ``model`` holds the weights from the
        *last* epoch; the best-F1 weights are the ones written to
        ``save_path``. ``history`` maps ``train_loss``, ``val_loss``,
        ``val_acc`` and ``val_f1`` to per-epoch lists.
    """
    if config is None:
        config = BERT_CONFIG

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"\n{'='*60}\n🤗 Fine-Tuning BERT | Device: {device}\n{'='*60}")

    tokenizer, model = build_bert_model(config)
    model = model.to(device)

    # Datasets
    train_ds = BERTSentimentDataset(X_train, y_train, tokenizer, config["max_len"])
    val_ds = BERTSentimentDataset(X_val, y_val, tokenizer, config["max_len"])

    # Pinned host memory only helps host→GPU copies; requesting it on a
    # CPU-only machine just triggers a warning.
    train_loader = DataLoader(train_ds, batch_size=config["batch_size"],
                              shuffle=True, num_workers=0,
                              pin_memory=device.type == "cuda")
    val_loader = DataLoader(val_ds, batch_size=config["batch_size"] * 2,
                            shuffle=False, num_workers=0)

    # Class weights for imbalanced data
    class_weights = compute_class_weight("balanced", classes=np.unique(y_train),
                                         y=y_train)
    class_weights = torch.tensor(class_weights, dtype=torch.float32).to(device)
    criterion = nn.CrossEntropyLoss(weight=class_weights)

    # Optimizer & Scheduler (only parameters left trainable are optimised)
    optimizer = AdamW(
        [p for p in model.parameters() if p.requires_grad],
        lr=config["lr"],
        weight_decay=config["weight_decay"],
        eps=1e-8,
    )
    total_steps = len(train_loader) * config["epochs"]
    warmup_steps = int(total_steps * config["warmup_ratio"])
    scheduler = get_linear_schedule_with_warmup(
        optimizer, num_warmup_steps=warmup_steps, num_training_steps=total_steps
    )

    # Start below any reachable F1 so the first epoch always writes a
    # checkpoint, even when its F1 is exactly 0.0.
    best_val_f1 = float("-inf")
    history = {"train_loss": [], "val_loss": [], "val_acc": [], "val_f1": []}

    for epoch in range(1, config["epochs"] + 1):
        # ── Train ──
        model.train()
        total_loss, n = 0.0, 0
        pbar = tqdm(train_loader,
                    desc=f"Epoch {epoch}/{config['epochs']} [Train]",
                    leave=False)

        for batch in pbar:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["label"].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            loss = criterion(outputs.logits, labels)
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            scheduler.step()

            # Weight by batch size so the epoch average is per-sample.
            total_loss += loss.item() * len(labels)
            n += len(labels)
            pbar.set_postfix({"loss": f"{loss.item():.4f}"})

        avg_train_loss = total_loss / n

        # ── Validate ──
        model.eval()
        val_loss, val_correct, val_n = 0.0, 0, 0
        all_preds, all_labels = [], []

        with torch.no_grad():
            for batch in val_loader:
                input_ids = batch["input_ids"].to(device)
                attention_mask = batch["attention_mask"].to(device)
                labels = batch["label"].to(device)

                outputs = model(input_ids=input_ids, attention_mask=attention_mask)
                loss = criterion(outputs.logits, labels)
                preds = outputs.logits.argmax(dim=-1)

                val_loss += loss.item() * len(labels)
                val_correct += (preds == labels).sum().item()
                val_n += len(labels)
                all_preds.extend(preds.cpu().tolist())
                all_labels.extend(labels.cpu().tolist())

        avg_val_loss = val_loss / val_n
        val_acc = val_correct / val_n
        val_f1 = f1_score(all_labels, all_preds)

        history["train_loss"].append(avg_train_loss)
        history["val_loss"].append(avg_val_loss)
        history["val_acc"].append(val_acc)
        history["val_f1"].append(val_f1)

        print(f"\nEpoch {epoch:02d}/{config['epochs']} | "
              f"Train Loss: {avg_train_loss:.4f} | "
              f"Val Loss: {avg_val_loss:.4f} | "
              f"Val Acc: {val_acc:.4f} | Val F1: {val_f1:.4f}")

        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            os.makedirs(save_path, exist_ok=True)
            model.save_pretrained(save_path)
            tokenizer.save_pretrained(save_path)
            print(f" ✅ Best model saved (F1={best_val_f1:.4f}) → {save_path}")

    return model, tokenizer, history


# ──────────────────────────────────────────────
# Inference
# ──────────────────────────────────────────────

def load_bert_model(model_path: str = "models/bert_finetuned"):
    """Load fine-tuned BERT from disk.

    Returns:
        ``(tokenizer, model)`` with the model already switched to eval mode.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_path)
    model = AutoModelForSequenceClassification.from_pretrained(model_path)
    model.eval()
    return tokenizer, model


def predict_bert(model, tokenizer, texts: list, max_len: int = 512,
                 batch_size: int = 32) -> tuple:
    """Run inference on a list of raw texts.

    Args:
        model: Sequence-classification model; moved to GPU when available.
        tokenizer: Tokenizer matching ``model``.
        texts: Raw input strings (any iterable; it is copied into a list).
        max_len: Truncation length in tokens.
        batch_size: Number of texts per forward pass.

    Returns:
        ``(preds, proba)`` as NumPy arrays: ``proba`` is the softmax
        probability of class index 1 and ``preds`` is ``proba >= 0.5`` as int.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()

    # Guarantee positional slicing and a plain list of strings for the
    # tokenizer, even if the caller hands over a pandas Series or ndarray.
    texts = list(texts)

    all_preds, all_proba = [], []
    for start in range(0, len(texts), batch_size):
        batch_texts = texts[start:start + batch_size]
        # padding=True pads only to the longest sequence in this batch.
        encoding = tokenizer(batch_texts, max_length=max_len, padding=True,
                             truncation=True, return_tensors="pt")
        input_ids = encoding["input_ids"].to(device)
        attention_mask = encoding["attention_mask"].to(device)

        with torch.no_grad():
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)

        proba = torch.softmax(outputs.logits, dim=-1)[:, 1].cpu().numpy()
        preds = (proba >= 0.5).astype(int)
        all_proba.extend(proba)
        all_preds.extend(preds)

    return np.array(all_preds), np.array(all_proba)


if __name__ == "__main__":
    import sys
    sys.path.insert(0, "src")
    from preprocess import load_imdb_from_huggingface, preprocess_dataframe, split_data
    from evaluate import evaluate_model

    df = load_imdb_from_huggingface()
    # For BERT we use minimal cleaning (keep punctuation, casing matters less)
    # NOTE(review): presumably split_data reads the "clean_text" column —
    # confirm against src/preprocess.py.
    df["clean_text"] = df["review"].str.replace(r"<[^>]+>", " ", regex=True).str.strip()

    (X_train, y_train), (X_val, y_val), (X_test, y_test) = split_data(df)

    model, tokenizer, history = train_bert(X_train, y_train, X_val, y_val)

    # Reload best & test
    tokenizer, model = load_bert_model()
    y_pred, y_proba = predict_bert(model, tokenizer, list(X_test))

    print("\n📊 TEST SET RESULTS:")
    evaluate_model(y_test, y_pred, y_proba, model_name="BERT (bert-base-uncased)",
                   split="test", save_plots=True)