nikraf's picture
Upload folder using huggingface_hub
714cf46 verified
import entrypoint_setup
import argparse
import copy
import random
import time
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
from tqdm.auto import tqdm
from transformers import AutoModelForMaskedLM
def set_seed(seed: int):
random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
SUPPORTED_BACKENDS = ("sdpa", "flex", "kernels_flash")
class ThroughputChecker:
def __init__(
self,
warmup_batches: int = 10,
timed_batches: int = 100,
):
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.warmup_batches = warmup_batches
self.timed_batches = timed_batches
self.canonical_amino_acids = "ACDEFGHIKLMNPQRSTVWY"
def _load_model(self, model_path: str):
model = AutoModelForMaskedLM.from_pretrained(
model_path,
dtype=torch.bfloat16,
device_map=self.device,
trust_remote_code=True,
).eval()
return model
def _generate_random_sequence(self, length: int) -> str:
return "M" + "".join(random.choices(self.canonical_amino_acids, k=length - 1))
def _generate_random_batch(self, batch_size: int, min_length: int, max_length: int) -> list[str]:
max_length_example = self._generate_random_sequence(max_length)
return [max_length_example] + [
self._generate_random_sequence(random.randint(min_length, max_length))
for _ in range(batch_size - 1)
]
@torch.inference_mode()
def _time(self, model, tokenizer, batch_size: int, min_length: int, max_length: int):
model = model.to(self.device).eval()
set_seed(42)
min_dynamic_warmup_batches = self.warmup_batches
max_dynamic_warmup_batches = self.warmup_batches * 10
stability_window = 3
relative_stability_tolerance = 0.10
def synchronize():
if self.device.type == "cuda":
torch.cuda.synchronize()
def run_one_batch() -> int:
batch = self._generate_random_batch(batch_size, min_length, max_length)
tokenized = tokenizer(
batch,
return_tensors="pt",
padding="max_length",
max_length=max_length,
truncation=True,
add_special_tokens=True,
)
input_ids = tokenized["input_ids"]
if "attention_mask" in tokenized:
nonpad_tokens_this = tokenized["attention_mask"].sum().item()
else:
pad_token_id = tokenizer.pad_token_id
if pad_token_id is not None:
nonpad_tokens_this = (input_ids != pad_token_id).sum().item()
else:
nonpad_tokens_this = input_ids.numel()
tokenized = {k: v.to(self.device) for k, v in tokenized.items()}
_ = model(**tokenized, output_hidden_states=True)
return nonpad_tokens_this
def time_batches(num_batches: int, message: str):
processed_tokens = 0
synchronize()
start_time = time.time()
for _ in tqdm(range(num_batches), desc=message, leave=False):
processed_tokens += run_one_batch()
synchronize()
end_time = time.time()
return end_time - start_time, processed_tokens
# Compile first, then keep warming up until the compiled path stabilizes.
model = torch.compile(model)
warmup_latencies = []
for warmup_idx in tqdm(range(max_dynamic_warmup_batches), desc="Warmup (dynamic)", leave=False):
synchronize()
warmup_start = time.time()
_ = run_one_batch()
synchronize()
warmup_latency = time.time() - warmup_start
warmup_latencies.append(warmup_latency)
if warmup_idx + 1 < min_dynamic_warmup_batches:
continue
if len(warmup_latencies) < 2 * stability_window:
continue
previous_window = warmup_latencies[-2 * stability_window:-stability_window]
current_window = warmup_latencies[-stability_window:]
previous_mean = sum(previous_window) / stability_window
current_mean = sum(current_window) / stability_window
assert previous_mean > 0.0, "Warmup latency mean should be positive."
relative_change = abs(current_mean - previous_mean) / previous_mean
if relative_change <= relative_stability_tolerance:
break
time_taken, timed_tokens_sum = time_batches(self.timed_batches, "Timed")
if self.device.type == "cuda":
torch.cuda.empty_cache()
return time_taken, timed_tokens_sum
def evaluate(self, model_path: str, batch_sizes: list[int], min_length: int, sequence_lengths: list[int], backends: list[str]):
results = {backend: {} for backend in backends}
original_model = self._load_model(model_path)
tokenizer = original_model.tokenizer
for backend in backends:
print(f"Benchmarking {model_path} with backend={backend}")
try:
backend_model = copy.deepcopy(original_model)
backend_model.attn_backend = backend
except AssertionError as error:
print(f"Skipping backend '{backend}' for {model_path}: {error}")
continue
for bs in batch_sizes:
for max_length in sequence_lengths:
model_copy = copy.deepcopy(backend_model)
time_taken, tokens = self._time(
model_copy,
tokenizer,
bs,
min_length,
max_length,
)
results[backend][(bs, max_length)] = {"time": time_taken, "tokens": tokens}
original_model.cpu()
del original_model
if self.device.type == "cuda":
torch.cuda.empty_cache()
return results
def plot_results(all_results: dict, output_path: str):
sns.set_theme(style="whitegrid")
plot_data = []
for model_path, results in all_results.items():
model_name = Path(model_path).name
for backend in sorted(results.keys()):
for (bs, max_length), entry in results[backend].items():
time_taken = entry["time"]
nonpad_tokens = entry["tokens"]
tokens_per_sec = nonpad_tokens / time_taken if time_taken > 0 else 0.0
plot_data.append(
{
"Model": model_name,
"Backend": backend,
"Batch": bs,
"SeqLen": max_length,
"TokensPerSec": tokens_per_sec,
"NonPadTokens": nonpad_tokens,
"Seconds": time_taken,
}
)
if not plot_data:
return
plot_df = pd.DataFrame(plot_data)
sequence_lengths = sorted(plot_df["SeqLen"].dropna().unique().tolist())
plot = sns.relplot(
data=plot_df,
x="SeqLen",
y="TokensPerSec",
hue="Backend",
style="Batch",
kind="line",
marker="o",
dashes=False,
col="Model",
col_wrap=1,
height=4.5,
aspect=1.5,
facet_kws={"sharey": False},
)
plot.set_titles("{col_name}")
plot.set(xticks=sequence_lengths)
plot.set_axis_labels("Sequence length", "Non-pad tokens/s")
plot.figure.suptitle("Throughput comparison by model")
plot.tight_layout()
plot.figure.subplots_adjust(top=0.93, right=0.95, bottom=0.06)
plot.add_legend(title="Backend / Batch")
plt.savefig(output_path, dpi=300)
print(f"Results saved to {output_path}")
if __name__ == "__main__":
# On Windows, use "%cd%" instead of "${PWD}" to get the current working directory:
# docker run --gpus all -v "%cd%":/workspace fastplms python -m testing.throughput
# On Linux/macOS, keep using ${PWD}:
# docker run --gpus all -v ${PWD}:/workspace fastplms python -m testing.throughput
parser = argparse.ArgumentParser()
parser.add_argument("--hf_token", type=str, default=None)
parser.add_argument(
"--model_paths",
nargs="+",
default=["Synthyra/ESM2-8M", "Synthyra/ESMplusplus_small"],
)
parser.add_argument("--batch_sizes", nargs="+", type=int, default=[2, 4, 8])
parser.add_argument("--sequence_lengths", nargs="+", type=int, default=[64, 128, 256, 512, 1024, 2048])
parser.add_argument("--backends", nargs="+", choices=SUPPORTED_BACKENDS, default=list(SUPPORTED_BACKENDS))
parser.add_argument("--min_length", type=int, default=32)
parser.add_argument("--warmup_batches", type=int, default=10)
parser.add_argument("--timed_batches", type=int, default=100)
parser.add_argument("--output_path", type=str, default="throughput_comparison.png")
args = parser.parse_args()
if args.hf_token:
from huggingface_hub import login
login(token=args.hf_token)
checker = ThroughputChecker(warmup_batches=args.warmup_batches, timed_batches=args.timed_batches)
all_results = {}
for model_path in args.model_paths:
all_results[model_path] = checker.evaluate(
model_path,
args.batch_sizes,
min_length=args.min_length,
sequence_lengths=args.sequence_lengths,
backends=args.backends,
)
plot_results(all_results, args.output_path)