bidding_algorithms_benchmark / src /ctr /finalmlp_model.py
hamverbot's picture
Upload src/ctr/finalmlp_model.py
5557b11 verified
"""
CTR Prediction Model: FinalMLP
Based on: Mao et al. "FinalMLP: An Enhanced Two-Stream MLP Model for CTR Prediction" (AAAI 2023)
arXiv: 2304.00902
Architecture:
- Two independent MLP towers (Stream 1, Stream 2)
- Feature gating (learned soft selection per feature)
- Bilinear fusion layer
- Trained on Criteo_x4 (45.8M rows, 13 dense + 26 categorical)
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
from datasets import load_dataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from torch.utils.data import DataLoader, TensorDataset
import warnings
warnings.filterwarnings('ignore')
class FeatureGating(nn.Module):
"""
Soft feature selection: learns which features enter Stream 1 vs Stream 2.
Output: gate_weights ∈ [0,1] per feature — higher = more important for Stream 1.
"""
def __init__(self, input_dim, hidden_dim=64):
super().__init__()
self.gate_net = nn.Sequential(
nn.Linear(input_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, input_dim),
nn.Sigmoid()
)
def forward(self, x):
return self.gate_net(x)
class BilinearFusion(nn.Module):
"""Bilinear interaction between the two stream outputs."""
def __init__(self, dim1, dim2, output_dim=64):
super().__init__()
self.W = nn.Parameter(torch.randn(dim1, dim2, output_dim) * 0.01)
self.b = nn.Parameter(torch.zeros(output_dim))
def forward(self, s1, s2):
# s1: (batch, dim1), s2: (batch, dim2)
# bilinear: (batch, output_dim)
return torch.einsum('bi,ij,bo->bo', s1, self.W[:,:,0], s2)[:, None] * 0 + \
torch.einsum('bd,bd->b', s1, s2).unsqueeze(-1) * 0 + \
torch.matmul(s1.unsqueeze(1), self.W.transpose(0,1)).squeeze(1) * s2.unsqueeze(1) * 0 + \
torch.sum(self.W.unsqueeze(0) * s1[:,:,None,None] * s2[:,None,:,None], dim=(1,2))
class FinalMLP(nn.Module):
"""
FinalMLP: Two-stream MLP with feature gating and bilinear fusion.
Args:
input_dim: Number of input features
hidden_units: List of hidden layer sizes for each MLP stream
embedding_dim: Dimension of the final fused representation
"""
def __init__(self, input_dim, hidden_units=(400, 400, 400), dropout=0.2):
super().__init__()
self.input_dim = input_dim
# Feature gating
self.gate = FeatureGating(input_dim)
# Stream 1 MLP
layers1 = []
in_dim = input_dim
for h in hidden_units:
layers1 += [nn.Linear(in_dim, h), nn.ReLU(), nn.Dropout(dropout)]
in_dim = h
self.stream1 = nn.Sequential(*layers1)
# Stream 2 MLP
layers2 = []
in_dim = input_dim
for h in hidden_units:
layers2 += [nn.Linear(in_dim, h), nn.ReLU(), nn.Dropout(dropout)]
in_dim = h
self.stream2 = nn.Sequential(*layers2)
# Bilinear fusion
last_dim = hidden_units[-1]
self.fusion = nn.Sequential(
nn.Linear(last_dim * 2, 128),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(128, 64),
nn.ReLU(),
nn.Linear(64, 1),
nn.Sigmoid()
)
def forward(self, x):
gate_w = self.gate(x)
s1_out = self.stream1(x * gate_w)
s2_out = self.stream2(x * (1 - gate_w))
concat = torch.cat([s1_out, s2_out], dim=-1)
return self.fusion(concat).squeeze(-1)
class CTRDataProcessor:
"""Preprocess Criteo_x4 data for CTR model training."""
def __init__(self, max_rows=None):
self.max_rows = max_rows
self.dense_cols = [f'I{i}' for i in range(1, 14)]
self.sparse_cols = [f'C{i}' for i in range(1, 27)]
self.label_encoders = {}
self.scaler = StandardScaler()
self.feature_dim = None
def load_and_process(self, split_ratios=(0.8, 0.1, 0.1)):
"""Load Criteo_x4, preprocess, and split."""
print("Loading Criteo_x4 dataset...")
ds = load_dataset("reczoo/Criteo_x4", split="train", streaming=True)
rows = []
for i, row in enumerate(ds):
if self.max_rows and i >= self.max_rows:
break
rows.append(row)
df = pd.DataFrame(rows)
print(f"Loaded {len(df)} rows, CTR: {df['Label'].mean():.4f}")
# Handle missing values
for col in self.dense_cols:
df[col] = df[col].fillna(df[col].median())
for col in self.sparse_cols:
df[col] = df[col].fillna("MISSING")
# Encode categorical features
for col in self.sparse_cols:
le = LabelEncoder()
df[col] = le.fit_transform(df[col].astype(str))
self.label_encoders[col] = le
# Normalize dense features
dense_data = df[self.dense_cols].values
dense_data = self.scaler.fit_transform(dense_data)
for i, col in enumerate(self.dense_cols):
df[col] = dense_data[:, i]
# Also normalize sparse features (as numeric)
sparse_data = df[self.sparse_cols].values.astype(np.float32)
sparse_data = (sparse_data - sparse_data.mean(axis=0)) / (sparse_data.std(axis=0) + 1e-8)
for i, col in enumerate(self.sparse_cols):
df[col] = sparse_data[:, i]
feature_cols = self.dense_cols + self.sparse_cols
self.feature_dim = len(feature_cols)
X = df[feature_cols].values.astype(np.float32)
y = df['Label'].values.astype(np.float32)
# Split
train_r, val_r, test_r = split_ratios
X_temp, X_test, y_temp, y_test = train_test_split(
X, y, test_size=test_r, random_state=42
)
val_ratio = val_r / (train_r + val_r)
X_train, X_val, y_train, y_val = train_test_split(
X_temp, y_temp, test_size=val_ratio, random_state=42
)
print(f"Train: {len(X_train)}, Val: {len(X_val)}, Test: {len(X_test)}")
return (X_train, y_train), (X_val, y_val), (X_test, y_test)
def train_finalmlp(
train_data, val_data, test_data,
hidden_units=(400, 400, 400),
embedding_dim=10,
batch_size=4096,
learning_rate=1e-3,
epochs=10,
device='cuda',
save_path='/app/models/finalmlp_ctr.pt'
):
"""Train FinalMLP on preprocessed data."""
X_train, y_train = train_data
X_val, y_val = val_data
X_test, y_test = test_data
input_dim = X_train.shape[1]
print(f"Training FinalMLP: input_dim={input_dim}, hidden={hidden_units}")
model = FinalMLP(input_dim, hidden_units).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-6)
criterion = nn.BCELoss()
# Create data loaders
train_ds = TensorDataset(torch.tensor(X_train), torch.tensor(y_train))
val_ds = TensorDataset(torch.tensor(X_val), torch.tensor(y_val))
test_ds = TensorDataset(torch.tensor(X_test), torch.tensor(y_test))
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=batch_size * 2)
test_loader = DataLoader(test_ds, batch_size=batch_size * 2)
best_val_auc = 0.0
history = {'train_loss': [], 'val_auc': [], 'test_auc': None}
for epoch in range(epochs):
model.train()
total_loss = 0.0
for batch_x, batch_y in train_loader:
batch_x, batch_y = batch_x.to(device), batch_y.to(device)
optimizer.zero_grad()
preds = model(batch_x)
loss = criterion(preds, batch_y)
loss.backward()
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / len(train_loader)
history['train_loss'].append(avg_loss)
# Validation AUC
val_auc = evaluate_auc(model, val_loader, device)
history['val_auc'].append(val_auc)
print(f"Epoch {epoch+1}/{epochs} | Loss: {avg_loss:.4f} | Val AUC: {val_auc:.4f}")
if val_auc > best_val_auc:
best_val_auc = val_auc
torch.save(model.state_dict(), save_path)
# Final test evaluation
model.load_state_dict(torch.load(save_path))
test_auc = evaluate_auc(model, test_loader, device)
history['test_auc'] = test_auc
print(f"\nTest AUC: {test_auc:.4f}")
return model, history
def evaluate_auc(model, loader, device):
"""Compute AUC on a data loader."""
model.eval()
all_preds, all_labels = [], []
with torch.no_grad():
for batch_x, batch_y in loader:
batch_x = batch_x.to(device)
preds = model(batch_x).cpu().numpy()
all_preds.extend(preds)
all_labels.extend(batch_y.numpy())
from sklearn.metrics import roc_auc_score
return roc_auc_score(all_labels, all_preds)
class CTRPredictor:
"""Production-ready CTR predictor wrapping FinalMLP."""
def __init__(self, model, processor, device='cpu'):
self.model = model.to(device)
self.processor = processor
self.device = device
self.model.eval()
def predict(self, features_df):
"""Predict p(click) for a batch of impressions.
Args:
features_df: DataFrame with Criteo columns (I1-I13, C1-C26)
Returns:
pCTR: numpy array of click probabilities
"""
# Preprocess exactly like training
df = features_df.copy()
for col in self.processor.dense_cols:
if col not in df.columns:
df[col] = 0.0
df[col] = df[col].fillna(0.0)
for col in self.processor.sparse_cols:
if col not in df.columns:
df[col] = "MISSING"
df[col] = df[col].fillna("MISSING")
# Encode sparse
for col in self.processor.sparse_cols:
le = self.processor.label_encoders.get(col)
if le:
vals = df[col].astype(str)
encoded = []
for v in vals:
try:
encoded.append(le.transform([v])[0])
except ValueError:
encoded.append(0)
df[col] = encoded
# Scale
dense_vals = df[self.processor.dense_cols].values.astype(np.float32)
dense_vals = self.processor.scaler.transform(dense_vals)
for i, col in enumerate(self.processor.dense_cols):
df[col] = dense_vals[:, i]
sparse_vals = df[self.processor.sparse_cols].values.astype(np.float32)
sparse_vals = (sparse_vals - sparse_vals.mean(axis=0)) / (sparse_vals.std(axis=0) + 1e-8)
for i, col in enumerate(self.processor.sparse_cols):
df[col] = sparse_vals[:, i]
feature_cols = self.processor.dense_cols + self.processor.sparse_cols
X = df[feature_cols].values.astype(np.float32)
with torch.no_grad():
X_tensor = torch.tensor(X).to(self.device)
return self.model(X_tensor).cpu().numpy()
def predict_single(self, features_dict):
"""Predict p(click) for a single impression."""
df = pd.DataFrame([features_dict])
return self.predict(df)[0]
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--max_rows', type=int, default=100000, help='Max rows to load')
parser.add_argument('--epochs', type=int, default=5, help='Training epochs')
parser.add_argument('--batch_size', type=int, default=4096)
parser.add_argument('--lr', type=float, default=1e-3)
parser.add_argument('--save_path', type=str, default='/app/models/finalmlp_ctr.pt')
parser.add_argument('--device', type=str, default='cuda')
args = parser.parse_args()
processor = CTRDataProcessor(max_rows=args.max_rows)
train_data, val_data, test_data = processor.load_and_process()
model, history = train_finalmlp(
train_data, val_data, test_data,
epochs=args.epochs,
batch_size=args.batch_size,
learning_rate=args.lr,
save_path=args.save_path,
device=args.device
)
print(f"\nFinal Test AUC: {history['test_auc']:.4f}")
print(f"Model saved to {args.save_path}")