"""
GPU memory estimation and benchmark utilities.
Provides before/after estimates for ML code optimisations.
"""
from __future__ import annotations
import re
import time
from typing import Dict, List, Optional

# ──────────────────────────────────────────────
# Memory constants (approximate, in MB)
# ──────────────────────────────────────────────
DTYPE_BYTES: Dict[str, float] = {
"float32": 4.0,
"float16": 2.0,
"bfloat16": 2.0,
"int8": 1.0,
"int4": 0.5,
}
MODEL_SIZE_PARAMS: Dict[str, int] = {
"7b": 7_000_000_000,
"13b": 13_000_000_000,
"32b": 32_000_000_000,
"70b": 70_000_000_000,
"72b": 72_000_000_000,
}
def estimate_model_vram_mb(params: int, dtype: str = "float16") -> float:
"""Estimate VRAM (MB) required for a model given its parameter count and dtype."""
bytes_per_param = DTYPE_BYTES.get(dtype, 2.0)
return (params * bytes_per_param) / (1024 ** 2)
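
# Worked example (arithmetic only, no extra assumptions): a 7B-parameter model
# held in float16 needs about 7e9 * 2 bytes / 1024**2 ≈ 13,351 MB (~13 GB) of
# weight memory, before activations or KV cache are counted.
#   >>> estimate_model_vram_mb(7_000_000_000, "float16")
#   13351.44...
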
def estimate_activation_vram_mb(batch_size: int, seq_len: int, hidden_size: int, dtype: str = "float16") -> float:
"""Rough VRAM estimate for activations during inference."""
bytes_per_param = DTYPE_BYTES.get(dtype, 2.0)
# Approximate: batch * seq * hidden * ~12 layers worth of activations
activation_elements = batch_size * seq_len * hidden_size * 12
return (activation_elements * bytes_per_param) / (1024 ** 2)
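
# Worked example (arithmetic only): batch_size=8, seq_len=512, hidden_size=4096
# in float16 gives 8 * 512 * 4096 * 12 elements * 2 bytes / 1024**2 = 384.0 MB.
#   >>> estimate_activation_vram_mb(8, 512, 4096)
#   384.0
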
def calculate_fp32_to_fp16_saving(vram_mb: float) -> float:
"""Saving in MB from switching from FP32 β FP16."""
return vram_mb / 2.0

# ──────────────────────────────────────────────
# Code analysis heuristics
# ──────────────────────────────────────────────
def detect_dtype_from_code(code: str) -> str:
"""Detect the dtype being used in code via regex heuristics."""
if re.search(r"torch\.float32|\.float\(\)", code):
return "float32"
if re.search(r"torch\.float16|fp16", code, re.IGNORECASE):
return "float16"
if re.search(r"torch\.bfloat16|bf16", code, re.IGNORECASE):
return "bfloat16"
return "float16" # modern default
def detect_model_size_from_code(code: str) -> Optional[int]:
"""Try to detect model parameter count from code strings."""
for label, count in MODEL_SIZE_PARAMS.items():
if label in code.lower():
return count
return None
def detect_batch_size(code: str) -> int:
"""Extract batch size from code heuristics."""
match = re.search(r"batch_size\s*=\s*(\d+)", code)
if match:
return int(match.group(1))
return 1 # conservative default
def detect_seq_length(code: str) -> int:
"""Extract sequence length from code heuristics."""
match = re.search(r"max_length\s*=\s*(\d+)|max_tokens\s*=\s*(\d+)|seq_len\s*=\s*(\d+)", code)
if match:
return int(next(g for g in match.groups() if g is not None))
return 512 # safe default

# ──────────────────────────────────────────────
# Optimisation analysis
# ──────────────────────────────────────────────
def analyse_memory_optimisations(code: str) -> List[Dict]:
"""
Scan code and return a list of memory optimisation opportunities
with before/after estimates.
"""
findings: List[Dict] = []
dtype = detect_dtype_from_code(code)
params = detect_model_size_from_code(code)
# FP32 β FP16 opportunity
if dtype == "float32" and params:
current_mb = estimate_model_vram_mb(params, "float32")
optimised_mb = estimate_model_vram_mb(params, "float16")
saving = current_mb - optimised_mb
findings.append({
"type": "gpu_memory",
"title": "Switch from FP32 to FP16/BF16",
"current_estimate": f"{current_mb:.0f} MB",
"optimized_estimate": f"{optimised_mb:.0f} MB",
"saving_mb": saving,
"saving": f"{saving:.0f} MB ({saving / current_mb * 100:.0f}% reduction)",
"code_fix": "# Change: model.float() β model.half() OR torch_dtype=torch.bfloat16",
})
# Missing no_grad
inference_fns = re.findall(
r"def\s+(predict|infer|inference|generate|run_model)\s*\(", code
)
no_grad_present = bool(re.search(r"@torch\.no_grad|with torch\.no_grad", code))
if inference_fns and not no_grad_present:
findings.append({
"type": "gpu_memory",
"title": "Missing @torch.no_grad() on inference path",
"current_estimate": "2x gradient memory overhead",
"optimized_estimate": "Gradient tensors freed immediately",
"saving_mb": 512.0, # conservative estimate
"saving": "~512 MB (eliminates gradient buffers)",
"code_fix": "@torch.no_grad()\ndef predict(...):",
})
# Missing empty_cache
if re.search(r"\.cuda\(\)|\.to\(['\"]cuda", code) and not re.search(r"empty_cache", code):
findings.append({
"type": "gpu_memory",
"title": "Missing torch.cuda.empty_cache() after batch inference",
"current_estimate": "Fragmented VRAM accumulates between requests",
"optimized_estimate": "VRAM returned to pool after each batch",
"saving_mb": 256.0,
"saving": "~256 MB per batch cycle",
"code_fix": "torch.cuda.empty_cache() # Add after inference loop",
})
# N+1 embedding calls
if re.search(r"for .+ in .+:\s*\n.*(embed|encode)\(", code, re.DOTALL):
findings.append({
"type": "throughput",
"title": "N+1 Embedding Calls β Should Batch",
"current_estimate": "1 GPU kernel launch per item",
"optimized_estimate": "1 GPU kernel launch per batch",
"saving_mb": 0.0,
"saving": "Up to 50x latency reduction",
"code_fix": "embeddings = model.encode(all_texts, batch_size=32) # Batch all at once",
})
return findings
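
# Illustrative output shape (the input snippet is made up): for code such as
# "model = load('llama-7b').float()" this returns a single "gpu_memory" finding
# whose "saving" reads "13351 MB (50% reduction)", since FP16 halves the FP32
# weight footprint.
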

# ──────────────────────────────────────────────
# Benchmark runner
# ──────────────────────────────────────────────
class BenchmarkResult:
    """Timing and throughput metrics for a single analysis run."""

    def __init__(self) -> None:
        self.start_time: float = 0.0
        self.end_time: float = 0.0
        self.ttff_seconds: float = 0.0  # time to first finding
        self.total_seconds: float = 0.0
        self.tokens_processed: int = 0
        self.findings_count: int = 0

    @property
    def tokens_per_second(self) -> float:
        if self.total_seconds > 0 and self.tokens_processed > 0:
            return self.tokens_processed / self.total_seconds
        return 0.0

    def to_dict(self) -> Dict:
        return {
            "ttff_seconds": round(self.ttff_seconds, 3),
            "total_analysis_seconds": round(self.total_seconds, 3),
            "tokens_processed": self.tokens_processed,
            "tokens_per_second": round(self.tokens_per_second, 1),
            "findings_count": self.findings_count,
        }

def start_benchmark() -> BenchmarkResult:
    """Create a BenchmarkResult and start its timer."""
    result = BenchmarkResult()
    result.start_time = time.perf_counter()
    return result

def record_first_finding(result: BenchmarkResult) -> None:
    """Record the time to first finding (TTFF) the first time this is called."""
    if result.ttff_seconds == 0.0:
        result.ttff_seconds = time.perf_counter() - result.start_time

def finish_benchmark(result: BenchmarkResult, tokens: int = 0, findings: int = 0) -> BenchmarkResult:
    """Stop the timer and record token and finding counts on the result."""
    result.end_time = time.perf_counter()
    result.total_seconds = result.end_time - result.start_time
    result.tokens_processed = tokens
    result.findings_count = findings
    return result
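

# Minimal end-to-end sketch (illustrative only; the host app wires these helpers
# into its own request pipeline, and the sample snippet below is hypothetical).
if __name__ == "__main__":
    sample_code = (
        "model = AutoModelForCausalLM.from_pretrained('llama-2-7b').float().cuda()\n"
        "batch_size = 8\n"
        "max_length = 1024\n"
    )
    bench = start_benchmark()
    results = analyse_memory_optimisations(sample_code)
    if results:
        record_first_finding(bench)
    finish_benchmark(bench, tokens=len(sample_code.split()), findings=len(results))
    for finding in results:
        print(f"- {finding['title']}: {finding['saving']}")
    print(bench.to_dict())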