# codeSentry / codesentry-backend / tests / test_performance_agent.py
"""
Tests for PerformanceAgent β€” static scan only (no LLM / GPU required).
"""
from __future__ import annotations
import pathlib
import pytest
from agents.performance_agent import PerformanceAgent
from api.models import OptimizationType
from tools.code_parser import FileEntry
# Directory of on-disk sample files used by the file-backed fixtures below.
FIXTURES_DIR = pathlib.Path(__file__).parent / "fixtures"
# ──────────────────────────────────────────
# Fixtures
# ──────────────────────────────────────────
@pytest.fixture(scope="module")
def vulnerable_code() -> str:
return (FIXTURES_DIR / "vulnerable_ml_code.py").read_text(encoding="utf-8")
@pytest.fixture(scope="module")
def clean_code() -> str:
return (FIXTURES_DIR / "clean_ml_code.py").read_text(encoding="utf-8")
@pytest.fixture(scope="module")
def agent() -> PerformanceAgent:
return PerformanceAgent()
@pytest.fixture(scope="module")
def vulnerable_files(vulnerable_code: str) -> list[FileEntry]:
return [("vulnerable_ml_code.py", vulnerable_code)]
@pytest.fixture(scope="module")
def perf_findings(agent: PerformanceAgent, vulnerable_files: list[FileEntry]):
return agent.static_scan(vulnerable_files)
# ──────────────────────────────────────────
# Inline test code snippets
# ──────────────────────────────────────────
# Snippet: CUDA tensors created during inference and never moved to CPU or
# freed — should trigger a gpu_memory finding.  Indentation inside the snippet
# is restored so it is valid Python (the flattened form was an
# IndentationError, which would defeat any AST-based scan).
GPU_LEAK_CODE = '''
import torch
model = load_model().cuda()
def infer(text):
    inputs = tokenizer(text, return_tensors="pt").to("cuda")
    outputs = model.generate(**inputs)
    # Tensor never moved to CPU or deleted — memory leak
    return outputs
'''
# Snippet: encode() called once per document inside a loop — the classic
# N+1 embedding pattern the throughput check should flag.  Loop-body
# indentation is restored so the snippet parses as Python.
N_PLUS_ONE_CODE = '''
from sentence_transformers import SentenceTransformer
model = SentenceTransformer("all-MiniLM-L6-v2")
documents = ["doc1", "doc2", "doc3"]
embeddings = []
for doc in documents:
    emb = model.encode(doc)
    embeddings.append(emb)
'''
# Snippet: model weights loaded with explicit torch.float32 — should trigger
# an FP32-inefficiency / half-precision finding (see TestFP32Inefficiency).
FP32_CODE = '''
import torch
from transformers import AutoModelForCausalLM
model = AutoModelForCausalLM.from_pretrained(
"gpt2",
torch_dtype=torch.float32,
)
'''
# Snippet: inference function that runs with autograd enabled (no
# @torch.no_grad decorator or context) — should trigger a no_grad finding.
# Function-body indentation is restored so the snippet parses as Python.
NO_GRAD_CODE = '''
import torch
model = load_model()
def predict(text):
    inputs = tokenizer(text, return_tensors="pt")
    outputs = model(inputs)
    return outputs.logits.argmax()
'''
# Counter-example snippet: one batched encode() call for all documents —
# must NOT be flagged as N+1 (see TestNPlusOneEmbeddings negative test).
BATCHED_CODE = '''
from sentence_transformers import SentenceTransformer
model = SentenceTransformer("all-MiniLM-L6-v2")
documents = ["doc1", "doc2", "doc3"]
# Correct: batch all at once
embeddings = model.encode(documents, batch_size=32)
'''
# ──────────────────────────────────────────
# Tests
# ──────────────────────────────────────────
class TestGPUMemoryLeakDetection:
    """Static detection of CUDA tensors that are never released."""

    def test_detects_gpu_memory_leak(self, agent: PerformanceAgent):
        """Should detect GPU tensor with no corresponding .cpu() or del."""
        findings = agent.static_scan([("test_leak.py", GPU_LEAK_CODE)])
        gpu_findings = [f for f in findings if f.type == OptimizationType.gpu_memory]
        assert gpu_findings, "Expected GPU memory finding for tensor not moved to CPU"

    def test_no_leak_with_empty_cache(self, agent: PerformanceAgent):
        """Code that calls empty_cache should produce fewer GPU memory warnings."""
        clean_gpu_code = GPU_LEAK_CODE + "\ntorch.cuda.empty_cache()\n"
        with_cache = agent.static_scan([("clean_gpu.py", clean_gpu_code)])
        without_cache = agent.static_scan([("test.py", GPU_LEAK_CODE)])
        # empty_cache is present, so the warning count must not grow
        assert len(with_cache) <= len(without_cache)
class TestNPlusOneEmbeddings:
    """Detection of per-item encode() calls that should be batched."""

    def test_detects_n_plus_one_embeddings(self, agent: PerformanceAgent):
        """Should detect encode() called inside a for-loop."""
        findings = agent.static_scan([("n_plus_one.py", N_PLUS_ONE_CODE)])

        def _looks_like_n_plus_one(finding) -> bool:
            title = finding.title.lower()
            return (
                finding.type == OptimizationType.throughput
                or "n+1" in title
                or "loop" in title
                or "batch" in finding.suggestion.lower()
            )

        matches = [f for f in findings if _looks_like_n_plus_one(f)]
        assert matches, (
            "Expected throughput finding for N+1 embedding calls"
        )

    def test_no_n_plus_one_for_batch_code(self, agent: PerformanceAgent):
        """Correctly batched embeddings should not be flagged."""
        findings = agent.static_scan([("batched.py", BATCHED_CODE)])
        flagged = [f for f in findings if "n+1" in f.title.lower()]
        assert not flagged, "Batched code should not flag N+1"
class TestFP32Inefficiency:
    """Detection of full-precision (FP32) weights where half precision would do."""

    def test_detects_fp32_inefficiency(self, agent: PerformanceAgent):
        """Should detect torch.float32 / .float() usage."""
        findings = agent.static_scan([("fp32_code.py", FP32_CODE)])
        hits = [
            f
            for f in findings
            if any(key in f.title.lower() for key in ("fp32", "float32", "bf16"))
        ]
        assert hits, "Expected FP32 inefficiency finding"

    def test_fp32_finding_type_is_gpu_memory(self, agent: PerformanceAgent):
        findings = agent.static_scan([("fp32_code.py", FP32_CODE)])
        hits = [
            f
            for f in findings
            if any(key in f.title.lower() for key in ("fp32", "float32"))
        ]
        # Only check the type when the finding exists at all.
        if hits:
            assert hits[0].type == OptimizationType.gpu_memory
class TestMemorySavingsEstimate:
    """Sanity checks on the saving_mb estimates attached to findings."""

    def test_estimates_memory_savings(self, perf_findings):
        """At least one finding should report a positive savings_mb value."""
        positive = [
            f.saving_mb for f in perf_findings if f.saving_mb and f.saving_mb > 0
        ]
        assert positive, (
            "Expected at least one finding with savings_mb > 0"
        )

    def test_total_savings_positive(self, perf_findings):
        per_finding = (f.saving_mb or 0 for f in perf_findings)
        assert sum(per_finding) > 0, "Total estimated savings should be > 0 MB"
class TestMissingNoGrad:
    """Detection of inference code that runs with autograd enabled."""

    def test_detects_missing_no_grad(self, agent: PerformanceAgent):
        """Should detect inference function missing @torch.no_grad."""
        findings = agent.static_scan([("no_grad.py", NO_GRAD_CODE)])
        hits = [
            f
            for f in findings
            if "no_grad" in f.title.lower() or "gradient" in f.suggestion.lower()
        ]
        assert hits, "Expected finding for missing @torch.no_grad"
class TestFindingSchema:
    """Every finding must carry the fields downstream consumers rely on."""

    def test_all_performance_findings_have_required_fields(self, perf_findings):
        for idx, finding in enumerate(perf_findings):
            assert finding.type is not None, f"Finding {idx} missing type"
            assert finding.title, f"Finding {idx} missing title"
            assert finding.suggestion, f"Finding {idx} missing suggestion"

    def test_vulnerable_code_has_performance_findings(self, perf_findings):
        assert perf_findings, (
            "PerformanceAgent.static_scan() returned no findings for vulnerable code"
        )