# codesentry-backend/tools/benchmark_tool.py
"""
GPU memory estimation and benchmark utilities.
Provides before/after estimates for ML code optimisations.
"""
from __future__ import annotations
import re
import time
from typing import Dict, List, Optional
# ──────────────────────────────────────────────
# Memory constants (approximate, in MB)
# ──────────────────────────────────────────────
DTYPE_BYTES: Dict[str, float] = {
    "float32": 4.0,
    "float16": 2.0,
    "bfloat16": 2.0,
    "int8": 1.0,
    "int4": 0.5,
}
MODEL_SIZE_PARAMS: Dict[str, int] = {
    "7b": 7_000_000_000,
    "13b": 13_000_000_000,
    "32b": 32_000_000_000,
    "70b": 70_000_000_000,
    "72b": 72_000_000_000,
}
def estimate_model_vram_mb(params: int, dtype: str = "float16") -> float:
    """Estimate VRAM (MB) required for a model given its parameter count and dtype."""
    bytes_per_param = DTYPE_BYTES.get(dtype, 2.0)
    return (params * bytes_per_param) / (1024 ** 2)
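# Worked example (illustrative arithmetic, not part of the original source): a
# 7B-parameter model held in float16 weights needs roughly
#   7_000_000_000 params * 2 bytes / 1024**2 ~ 13,351 MB (about 13 GB),
# i.e. estimate_model_vram_mb(7_000_000_000, "float16") -> ~13351.4, and the same
# model in float32 doubles that to ~26,703 MB.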
def estimate_activation_vram_mb(batch_size: int, seq_len: int, hidden_size: int, dtype: str = "float16") -> float:
    """Rough VRAM estimate for activations during inference."""
    bytes_per_param = DTYPE_BYTES.get(dtype, 2.0)
    # Approximate: batch * seq * hidden * ~12 layers worth of activations
    activation_elements = batch_size * seq_len * hidden_size * 12
    return (activation_elements * bytes_per_param) / (1024 ** 2)
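# Example (illustrative, assuming the x12 activation multiplier above): one request
# at seq_len=512 against a hidden_size=4096 model in float16 works out to
#   1 * 512 * 4096 * 12 * 2 bytes / 1024**2 = 48.0 MB,
# i.e. estimate_activation_vram_mb(1, 512, 4096) -> 48.0; a batch of 8 scales
# linearly to 384 MB.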
def calculate_fp32_to_fp16_saving(vram_mb: float) -> float:
    """Saving in MB from switching from FP32 → FP16."""
    return vram_mb / 2.0
# ──────────────────────────────────────────────
# Code analysis heuristics
# ──────────────────────────────────────────────
def detect_dtype_from_code(code: str) -> str:
    """Detect the dtype being used in code via regex heuristics."""
    if re.search(r"torch\.float32|\.float\(\)", code):
        return "float32"
    if re.search(r"torch\.float16|fp16", code, re.IGNORECASE):
        return "float16"
    if re.search(r"torch\.bfloat16|bf16", code, re.IGNORECASE):
        return "bfloat16"
    return "float16"  # modern default


def detect_model_size_from_code(code: str) -> Optional[int]:
    """Try to detect model parameter count from code strings."""
    for label, count in MODEL_SIZE_PARAMS.items():
        if label in code.lower():
            return count
    return None


def detect_batch_size(code: str) -> int:
    """Extract batch size from code heuristics."""
    match = re.search(r"batch_size\s*=\s*(\d+)", code)
    if match:
        return int(match.group(1))
    return 1  # conservative default


def detect_seq_length(code: str) -> int:
    """Extract sequence length from code heuristics."""
    match = re.search(r"max_length\s*=\s*(\d+)|max_tokens\s*=\s*(\d+)|seq_len\s*=\s*(\d+)", code)
    if match:
        return int(next(g for g in match.groups() if g is not None))
    return 512  # safe default
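# Quick illustration of the detectors above on a hypothetical snippet (made-up
# input, not from the original source):
#   snippet = "model = AutoModel.from_pretrained('llama-7b').float()\nbatch_size = 8\nmax_length = 2048"
#   detect_dtype_from_code(snippet)       -> "float32"        (matches .float())
#   detect_model_size_from_code(snippet)  -> 7_000_000_000    (matches "7b")
#   detect_batch_size(snippet)            -> 8
#   detect_seq_length(snippet)            -> 2048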
# ──────────────────────────────────────────────
# Optimisation analysis
# ──────────────────────────────────────────────
def analyse_memory_optimisations(code: str) -> List[Dict]:
    """
    Scan code and return a list of memory optimisation opportunities
    with before/after estimates.
    """
    findings: List[Dict] = []
    dtype = detect_dtype_from_code(code)
    params = detect_model_size_from_code(code)

    # FP32 → FP16 opportunity
    if dtype == "float32" and params:
        current_mb = estimate_model_vram_mb(params, "float32")
        optimised_mb = estimate_model_vram_mb(params, "float16")
        saving = current_mb - optimised_mb
        findings.append({
            "type": "gpu_memory",
            "title": "Switch from FP32 to FP16/BF16",
            "current_estimate": f"{current_mb:.0f} MB",
            "optimized_estimate": f"{optimised_mb:.0f} MB",
            "saving_mb": saving,
            "saving": f"{saving:.0f} MB ({saving / current_mb * 100:.0f}% reduction)",
            "code_fix": "# Change: model.float() → model.half() OR torch_dtype=torch.bfloat16",
        })

    # Missing no_grad
    inference_fns = re.findall(
        r"def\s+(predict|infer|inference|generate|run_model)\s*\(", code
    )
    no_grad_present = bool(re.search(r"@torch\.no_grad|with torch\.no_grad", code))
    if inference_fns and not no_grad_present:
        findings.append({
            "type": "gpu_memory",
            "title": "Missing @torch.no_grad() on inference path",
            "current_estimate": "2x gradient memory overhead",
            "optimized_estimate": "Gradient tensors freed immediately",
            "saving_mb": 512.0,  # conservative estimate
            "saving": "~512 MB (eliminates gradient buffers)",
            "code_fix": "@torch.no_grad()\ndef predict(...):",
        })

    # Missing empty_cache
    if re.search(r"\.cuda\(\)|\.to\(['\"]cuda", code) and not re.search(r"empty_cache", code):
        findings.append({
            "type": "gpu_memory",
            "title": "Missing torch.cuda.empty_cache() after batch inference",
            "current_estimate": "Fragmented VRAM accumulates between requests",
            "optimized_estimate": "VRAM returned to pool after each batch",
            "saving_mb": 256.0,
            "saving": "~256 MB per batch cycle",
            "code_fix": "torch.cuda.empty_cache()  # Add after inference loop",
        })

    # N+1 embedding calls
if re.search(r"for .+ in .+:\s*\n.*(embed|encode)\(", code, re.DOTALL):
        findings.append({
            "type": "throughput",
            "title": "N+1 Embedding Calls - Should Batch",
            "current_estimate": "1 GPU kernel launch per item",
            "optimized_estimate": "1 GPU kernel launch per batch",
            "saving_mb": 0.0,
            "saving": "Up to 50x latency reduction",
            "code_fix": "embeddings = model.encode(all_texts, batch_size=32)  # Batch all at once",
        })

    return findings
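# Shape of a single finding (illustrative, reusing the hypothetical FP32 / 7b
# snippet above; values follow from the estimators, not from measured runs):
#   {"type": "gpu_memory", "title": "Switch from FP32 to FP16/BF16",
#    "current_estimate": "26703 MB", "optimized_estimate": "13351 MB",
#    "saving_mb": 13351.4..., "saving": "13351 MB (50% reduction)", "code_fix": "..."}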
# ──────────────────────────────────────────────
# Benchmark runner
# ──────────────────────────────────────────────
class BenchmarkResult:
    """Timing and throughput metrics for a single analysis run."""

    def __init__(self) -> None:
        self.start_time: float = 0.0
        self.end_time: float = 0.0
        self.ttff_seconds: float = 0.0  # time to first finding
        self.total_seconds: float = 0.0
        self.tokens_processed: int = 0
        self.findings_count: int = 0

    @property
    def tokens_per_second(self) -> float:
        if self.total_seconds > 0 and self.tokens_processed > 0:
            return self.tokens_processed / self.total_seconds
        return 0.0

    def to_dict(self) -> Dict:
        return {
            "ttff_seconds": round(self.ttff_seconds, 3),
            "total_analysis_seconds": round(self.total_seconds, 3),
            "tokens_processed": self.tokens_processed,
            "tokens_per_second": round(self.tokens_per_second, 1),
            "findings_count": self.findings_count,
        }
def start_benchmark() -> BenchmarkResult:
    result = BenchmarkResult()
    result.start_time = time.perf_counter()
    return result


def record_first_finding(result: BenchmarkResult) -> None:
    if result.ttff_seconds == 0.0:
        result.ttff_seconds = time.perf_counter() - result.start_time


def finish_benchmark(result: BenchmarkResult, tokens: int = 0, findings: int = 0) -> BenchmarkResult:
    result.end_time = time.perf_counter()
    result.total_seconds = result.end_time - result.start_time
    result.tokens_processed = tokens
    result.findings_count = findings
    return result
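

if __name__ == "__main__":
    # Minimal smoke test tying the benchmark runner to the analysis heuristics.
    # The snippet below is a made-up example, not real project code.
    sample = (
        "model = AutoModel.from_pretrained('llama-7b').float().cuda()\n"
        "def predict(texts):\n"
        "    return [model.encode(t) for t in texts]\n"
    )
    bench = start_benchmark()
    results = analyse_memory_optimisations(sample)
    if results:
        record_first_finding(bench)
    finish_benchmark(bench, tokens=len(sample.split()), findings=len(results))
    for finding in results:
        print(f"- {finding['title']}: {finding['saving']}")
    print(bench.to_dict())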