"""
Performance Agent: GPU memory, latency and ROCm optimisation analyser.
Identifies ML-specific inefficiencies in code running on AMD MI300X.
"""
from __future__ import annotations

import json
import logging
import re
from typing import Dict, List

from openai import AsyncOpenAI

from api.models import PerformanceFinding, OptimizationType
from tools.code_parser import FileEntry, build_context_block
from tools.benchmark_tool import analyse_memory_optimisations

logger = logging.getLogger(__name__)

PERFORMANCE_SYSTEM_PROMPT = """You are CodeSentry Performance Agent, an AMD ROCm GPU performance engineer specialising in ML systems.

Analyse the provided code for performance issues specific to AI/ML workloads on AMD MI300X (192 GB HBM3).

## Check these categories (MANDATORY):

### GPU Memory Issues:
- Tensors allocated on GPU never moved back to CPU or deleted → VRAM leak
- Missing torch.cuda.empty_cache() / hip.device_synchronize() after batch inference
- Model loaded in float32 when float16/bfloat16 suffices → 2x VRAM waste
- Gradient tracking enabled during inference (missing @torch.no_grad() or torch.inference_mode())
- KV cache not bounded → unbounded context growth

### Latency Issues:
- Model weights loaded inside per-request handler (should be singleton loaded at startup)
- Synchronous blocking calls inside async endpoints
- Tokenizer instantiated per-request instead of pre-loaded
- Missing torch.compile() for repeated inference patterns

### Throughput Issues:
- N+1 embedding calls: embed() called in a loop instead of batching all inputs
- Sequential agent calls that could be parallelised
- Missing continuous batching configuration in vLLM serving
- Single-worker serving when tensor parallelism is available

### ROCm/AMD-Specific:
- Using CUDA-only APIs not available on ROCm (use HIP equivalents)
- Missing HIP_VISIBLE_DEVICES environment configuration
- Not using Flash Attention 2 compatible with ROCm
- Memory bandwidth not maximised (FP8 quantisation available on MI300X)

## Output Format (STRICT JSON ARRAY):
[
  {
    "type": "gpu_memory|latency|throughput",
    "title": "Short descriptive title",
    "current_estimate": "Description of current resource usage",
    "optimized_estimate": "Description after fix",
    "saving_mb": <float MB saved or 0>,
    "saving": "Human-readable saving description",
    "suggestion": "Detailed explanation of the issue",
    "code_fix": "Concrete code fix or snippet",
    "line_number": <integer or null>,
    "file_path": "<filename or null>"
  }
]

Return ONLY the JSON array. If no issues found, return: []
"""


class PerformanceAgent:
    def __init__(
        self,
        vllm_base_url: str = "http://localhost:8080/v1",
        model: str = "Qwen/Qwen2.5-Coder-32B-Instruct",
        api_key: str = "not-needed-local",
        max_tokens: int = 3072,
        temperature: float = 0.05,
    ) -> None:
        self.model = model
        self.max_tokens = max_tokens
        self.temperature = temperature
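        # vLLM exposes an OpenAI-compatible HTTP API, so the standard AsyncOpenAI
        # client can talk to the local serving endpoint directly.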
        self.client = AsyncOpenAI(
            base_url=vllm_base_url,
            api_key=api_key,
            timeout=60.0,
            max_retries=1,
        )

    # ─────────────────────────────────────────
    # Static heuristic scan (no LLM)
    # ─────────────────────────────────────────

    def static_scan(self, files: List[FileEntry]) -> List[PerformanceFinding]:
        """Regex-based performance heuristics across all files."""
        findings: List[PerformanceFinding] = []

        for file_path, code in files:
            heuristic_results = analyse_memory_optimisations(code)
            for r in heuristic_results:
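                # Map the heuristic's category onto our enum; fall back to
                # gpu_memory if the tool reports a type we don't recognise.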
                try:
                    opt_type = OptimizationType(r["type"])
                except ValueError:
                    opt_type = OptimizationType.gpu_memory

                findings.append(
                    PerformanceFinding(
                        type=opt_type,
                        title=f"[Static] {r['title']}",
                        current_estimate=r.get("current_estimate"),
                        optimized_estimate=r.get("optimized_estimate"),
                        saving_mb=r.get("saving_mb", 0.0),
                        saving=r.get("saving"),
                        description=r.get("suggestion", ""),
                        suggestion=r.get("code_fix", ""),
                        file=file_path,
                    )
                )

            # Additional per-file checks
            findings.extend(self._check_model_loading_in_handler(code, file_path))
            findings.extend(self._check_n_plus_one_loop(code, file_path))
            findings.extend(self._check_fp32_usage(code, file_path))

        return findings

    def _check_model_loading_in_handler(self, code: str, file_path: str) -> List[PerformanceFinding]:
        """Detect model loading inside route/request handlers."""
        results: List[PerformanceFinding] = []
        # Find route decorators followed by a from_pretrained / AutoModel /
        # AutoTokenizer call within ~25 lines
        lines = code.splitlines()
        in_handler = False
        handler_start = 0
        for i, line in enumerate(lines):
            stripped = line.strip()
            if re.match(r"@(app|router)\.(get|post|put|delete|patch)", stripped):
                in_handler = True
                handler_start = i + 1
            if in_handler and re.search(r"from_pretrained|AutoModel|AutoTokenizer", stripped):
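                # Only flag loads close to the decorator (within ~25 lines) so that
                # unrelated module-level loads further down are not misattributed.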
                if i - handler_start < 25:
                    results.append(
                        PerformanceFinding(
                            type=OptimizationType.latency,
                            title="[Static] Model loaded inside request handler",
                            current_estimate="Model weights loaded on every request (~10-30s cold start)",
                            optimized_estimate="Model singleton pre-loaded at startup (<1ms per request)",
                            saving_mb=0.0,
                            saving="Eliminates per-request load latency",
                            description="Model loaded once at startup using a global singleton or lifespan event.",
                            suggestion=(
                                "# At module level:\n"
                                "model = AutoModel.from_pretrained(...)\n\n"
                                "# In handler: use the pre-loaded `model`"
                            ),
                            line=i + 1,
                            file=file_path,
                        )
                    )
                in_handler = False
        return results

    def _check_n_plus_one_loop(self, code: str, file_path: str) -> List[PerformanceFinding]:
        """Detect embedding/encode calls inside for loops."""
        results: List[PerformanceFinding] = []
        lines = code.splitlines()
        for i, line in enumerate(lines):
            if re.match(r"\s*for\s+\w+\s+in\s+", line):
                # Check next 5 lines for embed/encode calls
                lookahead = "\n".join(lines[i + 1 : i + 6])
                if re.search(r"\.(embed|encode|get_embedding)\(", lookahead):
                    results.append(
                        PerformanceFinding(
                            type=OptimizationType.throughput,
                            title="[Static] N+1 embedding calls in loop",
                            current_estimate="1 GPU kernel launch per item",
                            optimized_estimate="1 GPU kernel launch for all items",
                            saving_mb=0.0,
                            saving="Up to 50x throughput improvement",
                            description=(
                                "Embedding model called inside a loop. "
                                "Collect all inputs first, then batch-encode in one call."
                            ),
                            suggestion=(
                                "# Instead of:\n"
                                "for text in texts:\n"
                                "    emb = model.encode(text)\n\n"
                                "# Use:\n"
                                "embeddings = model.encode(texts, batch_size=32)"
                            ),
                            line=i + 1,
                            file=file_path,
                        )
                    )
        return results

    def _check_fp32_usage(self, code: str, file_path: str) -> List[PerformanceFinding]:
        """Flag explicit float32 usage where bfloat16 would suffice."""
        results: List[PerformanceFinding] = []
        lines = code.splitlines()
        for i, line in enumerate(lines):
            if re.search(r"torch\.float32|torch_dtype\s*=\s*torch\.float32|\.float\(\)", line):
                if not re.search(r"#.*noqa|#.*keep-fp32", line, re.IGNORECASE):
                    results.append(
                        PerformanceFinding(
                            type=OptimizationType.gpu_memory,
                            title="[Static] FP32 dtype β€” should use BF16",
                            current_estimate="4 bytes/param (float32)",
                            optimized_estimate="2 bytes/param (bfloat16) β€” 50% VRAM saving",
                            saving_mb=None,
                            saving="~50% VRAM reduction on MI300X",
                            description="AMD MI300X supports bfloat16 natively with no accuracy loss for inference.",
                            suggestion=(
                                "# Replace:\n"
                                "model = model.float()\n"
                                "# With:\n"
                                "model = model.to(torch.bfloat16)  # or torch_dtype=torch.bfloat16"
                            ),
                            line=i + 1,
                            file=file_path,
                        )
                    )
        return results

    # ─────────────────────────────────────────
    # LLM analysis
    # ─────────────────────────────────────────

    async def llm_scan(self, code_context: str) -> List[PerformanceFinding]:
        """Deep LLM-based performance analysis."""
        user_message = (
            "Analyse the following codebase for GPU memory, latency, and throughput issues "
            "on AMD MI300X hardware:\n\n"
            f"```\n{code_context}\n```\n\n"
            "Return ONLY the JSON array of performance findings."
        )
        try:
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": PERFORMANCE_SYSTEM_PROMPT},
                    {"role": "user", "content": user_message},
                ],
                max_tokens=self.max_tokens,
                temperature=self.temperature,
            )
            raw = response.choices[0].message.content or "[]"
            return self._parse_llm_response(raw)
        except Exception as exc:
            logger.error("[PerformanceAgent] LLM call failed: %s", exc)
            return []

    async def analyze(
        self,
        files: List[FileEntry],
        code_context: str,
        use_llm: bool = True,
    ) -> List[PerformanceFinding]:
        """Full pipeline: static heuristics + LLM deep analysis."""
        static = self.static_scan(files)
        logger.info("[PerformanceAgent] Static scan: %d findings", len(static))

        if not use_llm:
            return static

        llm_findings = await self.llm_scan(code_context)
        logger.info("[PerformanceAgent] LLM scan: %d findings", len(llm_findings))

        # Merge: keep all LLM findings, then add static findings whose titles
        # the LLM did not already report (compared without the "[Static] " prefix).
        llm_titles = {f.title for f in llm_findings}
        merged = list(llm_findings)
        for f in static:
            clean_title = f.title.replace("[Static] ", "")
            if clean_title not in llm_titles:
                merged.append(f)

        return merged

    # ─────────────────────────────────────────
    # Helpers
    # ─────────────────────────────────────────

    def _parse_llm_response(self, raw: str) -> List[PerformanceFinding]:
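        # The model may wrap its JSON in Markdown code fences or add prose around
        # it: strip fence markers, then slice the outermost [...] span before decoding.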
        raw = re.sub(r"```(?:json)?\s*", "", raw).strip().rstrip("`").strip()
        start, end = raw.find("["), raw.rfind("]") + 1
        if start == -1 or end == 0:
            return []
        try:
            data: List[Dict] = json.loads(raw[start:end])
        except json.JSONDecodeError:
            return []

        findings: List[PerformanceFinding] = []
        for item in data:
            try:
                opt_type_str = item.get("type", "gpu_memory")
                try:
                    opt_type = OptimizationType(opt_type_str)
                except ValueError:
                    opt_type = OptimizationType.gpu_memory

                findings.append(
                    PerformanceFinding(
                        type=opt_type,
                        title=item.get("title", "Unknown"),
                        current_estimate=item.get("current_estimate"),
                        optimized_estimate=item.get("optimized_estimate"),
                        saving_mb=item.get("saving_mb"),
                        saving=item.get("saving"),
                        description=item.get("suggestion", ""),
                        suggestion=item.get("code_fix"),
                        line=item.get("line_number"),
                        file=item.get("file_path"),
                        code=item.get("code_snippet"),
                    )
                )
            except Exception as e:
                logger.debug("[PerformanceAgent] Skipping malformed finding: %s", e)
        return findings
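

# ─────────────────────────────────────────
# Example usage (illustrative sketch)
# ─────────────────────────────────────────
# Minimal sketch of driving the agent, assuming FileEntry unpacks to a
# (path, source) pair as static_scan expects. With use_llm=False only the
# regex heuristics run, so no vLLM endpoint is needed; pass use_llm=True to
# also query the model configured in __init__ (by default a local vLLM server).
if __name__ == "__main__":
    import asyncio

    _SAMPLE = (
        "for text in texts:\n"
        "    emb = model.encode(text)\n"
    )

    async def _demo() -> None:
        agent = PerformanceAgent()
        files = [("sample.py", _SAMPLE)]
        context = "\n\n".join(f"# {path}\n{source}" for path, source in files)
        findings = await agent.analyze(files, context, use_llm=False)
        for finding in findings:
            print(f"{finding.file}:{finding.line} {finding.title}")

    asyncio.run(_demo())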