# NOTE: "Spaces: Running" header below is Hugging Face Spaces page residue from
# extraction, preserved here as a comment so the module remains parseable.
# Spaces: Running / Running
| """ | |
| Deliberately vulnerable ML code for testing CodeSentry's detection capabilities. | |
| Contains: | |
| - Prompt injection (LLM01) | |
| - Insecure output handling / eval (LLM02) | |
| - Hardcoded HuggingFace token (LLM06 / A07) | |
| - Insecure pickle deserialization (A04 / CWE-502) | |
| - GPU tensor never moved to CPU (memory leak) | |
| - N+1 embedding calls in loop | |
| - FP32 when FP16 would suffice | |
| - Missing @torch.no_grad on inference | |
| - Model loaded inside request handler | |
| - SQL injection in RAG query | |
| - Debug mode enabled | |
| """ | |
import os
import pickle
import sqlite3

from flask import Flask, request, jsonify
app = Flask(__name__)
app.config["DEBUG"] = True  # A05: Security Misconfiguration — debug left on intentionally

# -- A07 / LLM06: Hardcoded API keys (fixture secrets for the scanner to flag) --
HF_TOKEN = "hf_abcXYZabcXYZabcXYZabcXYZabcXYZ12"
OPENAI_API_KEY = "sk-proj-aaaabbbbccccddddeeeeffffgggghhhhiiiijjjj"

# -- Database (for RAG demo) --
# SQLite file path; the file is created in the working directory on first connect.
DB_PATH = "knowledge.db"
def get_db():
    """Open and return a fresh SQLite connection to the knowledge base."""
    connection = sqlite3.connect(DB_PATH)
    return connection
# -- A04 / CWE-502: Insecure pickle deserialization --
def load_model():
    """Loads a model from a user-supplied file path — insecure by design.

    Reads ``model_path`` straight out of the request body and feeds it to
    ``pickle.load``: a crafted pickle executes arbitrary code on load. This
    is a deliberate fixture for scanner detection (CWE-502).
    NOTE(review): no ``@app.route`` decorator is visible on any handler in
    this file — presumably lost in extraction; confirm against the original.
    """
    model_path = request.json.get("model_path")
    # VULNERABILITY: pickle.load from untrusted user-controlled path
    with open(model_path, "rb") as f:
        model = pickle.load(f)  # noqa: S301 — CWE-502; result is intentionally unused
    return jsonify({"status": "loaded"})
# -- LLM01: Prompt Injection --
def generate():
    """Chat endpoint that directly concatenates user input into the prompt.

    Deliberately stacks several findings for scanner testing: prompt
    injection (LLM01), model loaded per request, FP32 weights (ML04),
    inference without ``torch.no_grad()`` (ML02), output tensor left on the
    GPU (ML01), and LLM output piped into ``eval`` (LLM02).
    """
    user_input = request.json.get("message", "")
    # VULNERABILITY: user input concatenated directly — prompt injection
    prompt = f"You are a helpful assistant. User says: {user_input}"
    # Model loaded INSIDE handler on every request (performance issue)
    import torch
    from transformers import AutoModelForCausalLM, AutoTokenizer
    tokenizer = AutoTokenizer.from_pretrained("gpt2", token=HF_TOKEN)
    model = AutoModelForCausalLM.from_pretrained(
        "gpt2",
        token=HF_TOKEN,
        torch_dtype=torch.float32,  # ML04: FP32 wastes 2x VRAM vs FP16
    )
    # ML02: Missing @torch.no_grad — gradients computed unnecessarily
    inputs = tokenizer(prompt, return_tensors="pt").to("cuda")
    outputs = model.generate(**inputs, max_new_tokens=200)
    # ML01: `outputs` stays on the GPU — never moved to CPU / freed
    result = tokenizer.decode(outputs[0])
    # LLM02: LLM output piped directly to eval()
    eval(result)  # noqa: S307 — EXTREMELY DANGEROUS, intentional fixture
    return jsonify({"result": result})
# -- A03: SQL Injection in RAG query --
def rag_search():
    """Search the RAG knowledge base — intentionally SQL-injectable."""
    query = request.args.get("q", "")
    db = get_db()
    cur = db.cursor()
    # VULNERABILITY: user input interpolated straight into the SQL text
    cur.execute(f"SELECT * FROM documents WHERE content LIKE '%{query}%'")  # noqa: S608 — SQL injection
    rows = cur.fetchall()
    db.close()
    return jsonify({"results": rows})
# -- ML03: N+1 embedding calls --
def embed_documents():
    """Embed each document with its own encode() call (intentional N+1)."""
    import torch  # kept for import-side-effect parity; unused below
    from sentence_transformers import SentenceTransformer

    docs = request.json.get("documents", [])
    encoder = SentenceTransformer("all-MiniLM-L6-v2")
    # Deliberately unbatched: one GPU round-trip per document.
    vectors = [encoder.encode(doc).tolist() for doc in docs]
    return jsonify({"embeddings": vectors})
# -- No authentication on sensitive endpoint --
def retrain_model():
    """Kick off model retraining with zero auth checks (intentional A01)."""
    # A01: Broken Access Control — anyone who can reach this can call it.
    samples = request.json.get("data", [])
    # LLM03: data accepted verbatim, no validation — training data poisoning demo.
    return jsonify({"status": "retraining started", "samples": len(samples)})
# -- Path traversal in file upload --
def upload_weights():
    """Persist uploaded model weights — filename is attacker-controlled."""
    body = request.json
    filename = body.get("filename", "model.bin")
    payload = body.get("data", "")
    # VULNERABILITY: filename joined unsanitised — a "../" prefix escapes /models.
    destination = os.path.join("/models", filename)
    with open(destination, "wb") as fh:
        fh.write(payload.encode())
    return jsonify({"saved": destination})
if __name__ == "__main__":
    # A05: dev server with debug=True bound to all interfaces — intentional
    # misconfiguration for the scanner to flag.
    app.run(debug=True, host="0.0.0.0", port=5000)