# Source: Sentiment-Analysis / src / bert_model.py
# Uploaded by najahaja ("Upload 26 files", commit c247f12, verified)
"""
bert_model.py — Fine-Tuning BERT for Sentiment Analysis (HuggingFace Transformers)
"""
import os
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
get_linear_schedule_with_warmup,
)
from torch.optim import AdamW
from sklearn.utils.class_weight import compute_class_weight
from tqdm import tqdm
# ──────────────────────────────────────────────
# Configuration
# ──────────────────────────────────────────────
# Default hyper-parameters; train_bert() uses this dict when no config is given.
BERT_CONFIG = {
    # Checkpoint name handed to the HuggingFace `from_pretrained` loaders.
    "model_name": "bert-base-uncased",
    # Every sample is padded/truncated to this many tokens.
    "max_len": 512,
    "batch_size": 16,  # Reduce to 8 if OOM
    "epochs": 3,
    # AdamW settings.
    "lr": 2e-5,
    # Fraction of all optimizer steps spent in linear warmup.
    "warmup_ratio": 0.06,
    "weight_decay": 0.01,
    # Size of the classification head's output.
    "num_labels": 2,
    "unfreeze_layers": 4,  # Fine-tune last N encoder layers
}
# ──────────────────────────────────────────────
# Dataset
# ──────────────────────────────────────────────
class BERTSentimentDataset(Dataset):
    """Map-style dataset that tokenizes one text per item.

    Args:
        texts: Iterable of raw strings (list, NumPy array, pandas Series, ...).
        labels: Iterable of integer class ids, aligned with ``texts``.
        tokenizer: Callable invoked as
            ``tokenizer(text, max_length=..., padding="max_length",
            truncation=True, return_tensors="pt")``. Its result must expose
            ``"input_ids"`` and ``"attention_mask"`` tensors with a leading
            batch dimension of 1.
        max_len: Length each sample is padded/truncated to.

    Raises:
        ValueError: If ``texts`` and ``labels`` differ in length.
    """

    def __init__(self, texts, labels, tokenizer, max_len: int = 512):
        # Materialize as plain lists so that __getitem__ is always positional.
        # A pandas Series coming out of a shuffled train/val split keeps its
        # original index labels, and `series[idx]` would then look up by label
        # (KeyError or the wrong row) instead of by position.
        self.texts = list(texts)
        self.labels = list(labels)
        if len(self.texts) != len(self.labels):
            raise ValueError(
                f"texts and labels must have the same length "
                f"(got {len(self.texts)} and {len(self.labels)})"
            )
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``label`` for sample ``idx``."""
        encoding = self.tokenizer(
            self.texts[idx],
            max_length=self.max_len,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )
        return {
            # Drop the batch dimension added by return_tensors="pt"; the
            # DataLoader re-adds it when collating.
            "input_ids": encoding["input_ids"].squeeze(0),
            "attention_mask": encoding["attention_mask"].squeeze(0),
            "label": torch.tensor(self.labels[idx], dtype=torch.long),
        }
# ──────────────────────────────────────────────
# Model Setup
# ──────────────────────────────────────────────
def build_bert_model(config: dict):
    """Load pre-trained BERT and freeze early layers.

    Everything under ``model.bert`` is frozen, then the last
    ``config["unfreeze_layers"]`` encoder layers and the pooler are made
    trainable again. Parameters outside ``model.bert`` are not touched.

    Args:
        config: Needs the keys ``"model_name"``, ``"num_labels"`` and
            ``"unfreeze_layers"``. A value of 0 (or less) for
            ``"unfreeze_layers"`` keeps every encoder layer frozen.

    Returns:
        ``(tokenizer, model)``.
    """
    print(f"📥 Loading {config['model_name']}...")
    tokenizer = AutoTokenizer.from_pretrained(config["model_name"])
    model = AutoModelForSequenceClassification.from_pretrained(
        config["model_name"],
        num_labels=config["num_labels"],
    )

    # Freeze all layers first
    model.bert.requires_grad_(False)

    # Unfreeze last N encoder layers + pooler.
    # Guard the N == 0 case explicitly: `layers[-0:]` is `layers[0:]`, which
    # would silently unfreeze the whole encoder instead of nothing.
    n_unfreeze = int(config["unfreeze_layers"])
    if n_unfreeze > 0:
        for layer in model.bert.encoder.layer[-n_unfreeze:]:
            layer.requires_grad_(True)
    model.bert.pooler.requires_grad_(True)

    n_trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
    n_total = sum(p.numel() for p in model.parameters())
    print(f"⚙️ Trainable params: {n_trainable:,} / {n_total:,}")
    return tokenizer, model
# ──────────────────────────────────────────────
# Training Loop
# ──────────────────────────────────────────────
def train_bert(X_train, y_train, X_val, y_val,
               config: dict = None, save_path: str = "models/bert_finetuned"):
    """
    Fine-tune BERT on sentiment data with:
    - Weighted cross-entropy loss (class imbalance)
    - Linear warmup schedule
    - Gradient clipping
    - Best model checkpointing

    Args:
        X_train, X_val: Raw texts for the training / validation split.
        y_train, y_val: Integer class ids aligned with the texts.
        config: Hyper-parameter overrides. Keys that are missing (or a
            ``None`` config) fall back to ``BERT_CONFIG``.
        save_path: Directory that receives the model and tokenizer whenever
            the validation F1 improves.

    Returns:
        ``(model, tokenizer, history)``. ``model`` is the in-memory model
        after the final epoch, which is not necessarily the best checkpoint;
        the best one is on disk at ``save_path``. ``history`` maps
        ``"train_loss"``, ``"val_loss"``, ``"val_acc"`` and ``"val_f1"`` to
        one value per epoch.
    """
    # Imported once here rather than on every epoch inside the loop.
    from sklearn.metrics import f1_score

    # Merge so a caller may override only some of the hyper-parameters.
    config = {**BERT_CONFIG, **(config or {})}

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"\n{'='*60}\n🤗 Fine-Tuning BERT | Device: {device}\n{'='*60}")
    tokenizer, model = build_bert_model(config)
    model = model.to(device)

    # Datasets
    train_ds = BERTSentimentDataset(X_train, y_train, tokenizer, config["max_len"])
    val_ds = BERTSentimentDataset(X_val, y_val, tokenizer, config["max_len"])
    # Pinned host memory only helps host->GPU copies; on CPU it just warns.
    use_pinned = device.type == "cuda"
    train_loader = DataLoader(train_ds, batch_size=config["batch_size"],
                              shuffle=True, num_workers=0, pin_memory=use_pinned)
    # No gradients are kept during validation, so a larger batch fits.
    val_loader = DataLoader(val_ds, batch_size=config["batch_size"] * 2,
                            shuffle=False, num_workers=0)

    # Class weights for imbalanced data.
    # NOTE(review): the weight vector is ordered by np.unique(y_train); this
    # lines up with the logits only if the labels are 0..num_labels-1 and all
    # of them occur in y_train — confirm against the data.
    class_weights = compute_class_weight("balanced", classes=np.unique(y_train),
                                         y=y_train)
    class_weights = torch.tensor(class_weights, dtype=torch.float32).to(device)
    criterion = nn.CrossEntropyLoss(weight=class_weights)

    # Optimizer & Scheduler (only parameters left trainable are optimized)
    trainable_params = [p for p in model.parameters() if p.requires_grad]
    optimizer = AdamW(
        trainable_params,
        lr=config["lr"],
        weight_decay=config["weight_decay"],
        eps=1e-8,
    )
    total_steps = len(train_loader) * config["epochs"]
    warmup_steps = int(total_steps * config["warmup_ratio"])
    scheduler = get_linear_schedule_with_warmup(
        optimizer, num_warmup_steps=warmup_steps,
        num_training_steps=total_steps
    )

    # Binary F1 matches the default two-class setup; it raises on more than
    # two classes, so fall back to macro-averaging there.
    f1_average = "binary" if config["num_labels"] == 2 else "macro"

    # Start below any reachable F1 so the first epoch always writes a
    # checkpoint, even when its F1 is exactly 0.0.
    best_val_f1 = float("-inf")
    history = {"train_loss": [], "val_loss": [], "val_acc": [], "val_f1": []}

    for epoch in range(1, config["epochs"] + 1):
        # ── Train ──
        model.train()
        total_loss, n = 0.0, 0
        pbar = tqdm(train_loader, desc=f"Epoch {epoch}/{config['epochs']} [Train]",
                    leave=False)
        for batch in pbar:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["label"].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            loss = criterion(outputs.logits, labels)
            loss.backward()
            # Frozen parameters carry no gradient, so clipping the trainable
            # subset is equivalent to clipping model.parameters().
            nn.utils.clip_grad_norm_(trainable_params, max_norm=1.0)
            optimizer.step()
            scheduler.step()  # schedule is defined per optimizer step

            # Weight by batch size so a smaller last batch is not over-counted.
            total_loss += loss.item() * len(labels)
            n += len(labels)
            pbar.set_postfix({"loss": f"{loss.item():.4f}"})
        avg_train_loss = total_loss / n

        # ── Validate ──
        model.eval()
        val_loss, val_correct, val_n = 0.0, 0, 0
        all_preds, all_labels = [], []
        with torch.no_grad():
            for batch in val_loader:
                input_ids = batch["input_ids"].to(device)
                attention_mask = batch["attention_mask"].to(device)
                labels = batch["label"].to(device)

                logits = model(input_ids=input_ids,
                               attention_mask=attention_mask).logits
                loss = criterion(logits, labels)
                preds = logits.argmax(dim=-1)

                val_loss += loss.item() * len(labels)
                val_correct += (preds == labels).sum().item()
                val_n += len(labels)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())
        avg_val_loss = val_loss / val_n
        val_acc = val_correct / val_n
        val_f1 = f1_score(all_labels, all_preds, average=f1_average)

        history["train_loss"].append(avg_train_loss)
        history["val_loss"].append(avg_val_loss)
        history["val_acc"].append(val_acc)
        history["val_f1"].append(val_f1)
        print(f"\nEpoch {epoch:02d}/{config['epochs']} | "
              f"Train Loss: {avg_train_loss:.4f} | "
              f"Val Loss: {avg_val_loss:.4f} | "
              f"Val Acc: {val_acc:.4f} | Val F1: {val_f1:.4f}")

        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            os.makedirs(save_path, exist_ok=True)
            model.save_pretrained(save_path)
            tokenizer.save_pretrained(save_path)
            print(f" ✅ Best model saved (F1={best_val_f1:.4f}) → {save_path}")

    return model, tokenizer, history
# ──────────────────────────────────────────────
# Inference
# ──────────────────────────────────────────────
def load_bert_model(model_path: str = "models/bert_finetuned"):
    """Restore a saved tokenizer/model pair from ``model_path``.

    Returns:
        ``(tokenizer, model)``, with the model already switched to eval mode.
    """
    restored_tokenizer = AutoTokenizer.from_pretrained(model_path)
    restored_model = AutoModelForSequenceClassification.from_pretrained(model_path)
    # Module.eval() returns the module itself, so it can be returned directly.
    return restored_tokenizer, restored_model.eval()
def predict_bert(model, tokenizer, texts: list, max_len: int = 512,
                 batch_size: int = 32) -> tuple:
    """Run inference on a list of raw texts.

    Args:
        model: Sequence-classification model whose output exposes ``.logits``
            with at least two columns; column 1 is read as the positive class.
        tokenizer: Tokenizer matching ``model``.
        texts: Raw strings. Any iterable is accepted (list, tuple, NumPy
            array, pandas Series); a single string is treated as one text.
        max_len: Truncation length passed to the tokenizer.
        batch_size: Number of texts per forward pass; must be >= 1.

    Returns:
        ``(preds, proba)`` as 1-D NumPy arrays of equal length: ``proba`` is
        the softmax probability of class 1 and ``preds`` is ``proba >= 0.5``
        as integers. Both are empty (integer / float dtype) for empty input.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    # Normalize to a plain list: slices of arrays/Series are not valid
    # tokenizer input, and slicing a bare string would split it into
    # characters.
    texts = [texts] if isinstance(texts, str) else list(texts)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()

    pred_chunks, proba_chunks = [], []
    for start in range(0, len(texts), batch_size):
        batch_texts = texts[start:start + batch_size]
        # padding=True pads only to the longest text in this batch.
        encoding = tokenizer(batch_texts, max_length=max_len, padding=True,
                             truncation=True, return_tensors="pt")
        input_ids = encoding["input_ids"].to(device)
        attention_mask = encoding["attention_mask"].to(device)
        with torch.no_grad():
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            proba = torch.softmax(outputs.logits, dim=-1)[:, 1].cpu().numpy()
        proba_chunks.append(proba)
        pred_chunks.append((proba >= 0.5).astype(int))

    if not proba_chunks:
        # np.array([]) would be float64 for both outputs; keep the dtypes
        # consistent with the non-empty case.
        return np.empty(0, dtype=int), np.empty(0, dtype=np.float32)
    return np.concatenate(pred_chunks), np.concatenate(proba_chunks)
if __name__ == "__main__":
    import sys

    # The project modules are looked up under the relative path "src", so
    # this presumably expects to be launched from the project root.
    sys.path.insert(0, "src")
    from preprocess import load_imdb_from_huggingface, preprocess_dataframe, split_data
    from evaluate import evaluate_model

    # One checkpoint location shared by training and reloading, instead of
    # relying on two separate default arguments staying in sync.
    checkpoint_dir = "models/bert_finetuned"

    df = load_imdb_from_huggingface()
    # For BERT we use minimal cleaning (keep punctuation, casing matters less):
    # only HTML tags are replaced by a space and the ends are trimmed.
    df["clean_text"] = df["review"].str.replace(r"<[^>]+>", " ", regex=True).str.strip()
    (X_train, y_train), (X_val, y_val), (X_test, y_test) = split_data(df)

    model, tokenizer, history = train_bert(X_train, y_train, X_val, y_val,
                                           save_path=checkpoint_dir)

    # Reload best & test: train_bert returns the last-epoch model, while the
    # best-F1 checkpoint is the one stored on disk.
    tokenizer, model = load_bert_model(checkpoint_dir)
    y_pred, y_proba = predict_bert(model, tokenizer, list(X_test))
    print("\n📊 TEST SET RESULTS:")
    evaluate_model(y_test, y_pred, y_proba, model_name="BERT (bert-base-uncased)",
                   split="test", save_plots=True)