"""
Deliberately vulnerable ML code for testing CodeSentry's detection capabilities.
Contains:
- Prompt injection (LLM01)
- Insecure output handling / eval (LLM02)
- Hardcoded HuggingFace token (LLM06 / A07)
- Insecure pickle deserialization (A04 / CWE-502)
- GPU tensor never moved to CPU (memory leak)
- N+1 embedding calls in loop
- FP32 when FP16 would suffice
- Missing @torch.no_grad on inference
- Model loaded inside request handler
- SQL injection in RAG query
- Debug mode enabled
"""
import os
import pickle
import sqlite3
from flask import Flask, request, jsonify
app = Flask(__name__)
app.config["DEBUG"] = True # A05: Security Misconfiguration -- debug deliberately left on
# -- A07 / LLM06: hardcoded credentials (fixture values for scanner detection) --
HF_TOKEN = "hf_abcXYZabcXYZabcXYZabcXYZabcXYZ12"
OPENAI_API_KEY = "sk-proj-aaaabbbbccccddddeeeeffffgggghhhhiiiijjjj"
# -- SQLite path used by the RAG demo endpoints --
DB_PATH = "knowledge.db"
def get_db():
    """Open and return a fresh SQLite connection to the knowledge base."""
    connection = sqlite3.connect(DB_PATH)
    return connection
# ββ A04 / CWE-502: Insecure pickle deserialization ββββββββββ
@app.route("/load_model", methods=["POST"])
def load_model():
    """Load a model object from a client-supplied file path.

    Intentionally insecure fixture: the path comes straight from the
    request body and the payload is unpickled without any validation
    (A04 / CWE-502 -- a crafted pickle executes arbitrary code).
    """
    model_path = request.json.get("model_path")
    # VULNERABILITY: pickle.load from untrusted user-controlled path
    with open(model_path, "rb") as fh:
        model = pickle.load(fh)  # noqa: S301 -- CWE-502
    return jsonify({"status": "loaded"})
# ββ LLM01: Prompt Injection ββββββββββββββββββββββββββββββββββ
@app.route("/generate", methods=["POST"])
def generate():
    """Chat endpoint; deliberately riddled with scanner-test flaws.

    Kept intact on purpose: prompt injection (LLM01), per-request model
    load, FP32 weights (ML04), missing torch.no_grad (ML02), GPU-resident
    output tensor (ML01), and eval() of raw model text (LLM02).
    """
    user_message = request.json.get("message", "")
    # VULNERABILITY: user text spliced straight into the prompt (LLM01)
    prompt = f"You are a helpful assistant. User says: {user_message}"
    # Heavy model construction happens on EVERY request -- intentional perf smell
    import torch
    from transformers import AutoModelForCausalLM, AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained("gpt2", token=HF_TOKEN)
    model = AutoModelForCausalLM.from_pretrained(
        "gpt2",
        token=HF_TOKEN,
        torch_dtype=torch.float32,  # ML04: FP32 wastes 2x VRAM
    )
    # ML02: no torch.no_grad() context, so autograd bookkeeping runs for nothing
    encoded = tokenizer(prompt, return_tensors="pt").to("cuda")
    generated = model.generate(**encoded, max_new_tokens=200)
    # ML01: decoded from a tensor that is never moved off the GPU
    decoded_text = tokenizer.decode(generated[0])
    # LLM02: model output fed directly to eval()
    eval(decoded_text)  # noqa: S307 -- EXTREMELY DANGEROUS
    return jsonify({"result": decoded_text})
# ββ A03: SQL Injection in RAG query βββββββββββββββββββββββββ
@app.route("/search", methods=["GET"])
def rag_search():
    """Search the RAG knowledge base (intentional SQL-injection fixture).

    A03: the query parameter is interpolated into the SQL string verbatim.
    """
    term = request.args.get("q", "")
    connection = get_db()
    cur = connection.cursor()
    # VULNERABILITY: unsanitised user input built into the statement text
    statement = f"SELECT * FROM documents WHERE content LIKE '%{term}%'"
    cur.execute(statement)  # noqa: S608 -- SQL injection
    rows = cur.fetchall()
    connection.close()
    return jsonify({"results": rows})
# ββ ML03: N+1 embedding calls ββββββββββββββββββββββββββββββββ
@app.route("/embed_documents", methods=["POST"])
def embed_documents():
    """Embed documents one call at a time (intentional N+1 fixture, ML03)."""
    import torch
    from sentence_transformers import SentenceTransformer

    docs = request.json.get("documents", [])
    encoder = SentenceTransformer("all-MiniLM-L6-v2")
    # ML03: still one encode() invocation per document -- the N+1 pattern the
    # scanner should flag (a single batched encode(docs) would be the fix)
    vectors = [encoder.encode(doc).tolist() for doc in docs]
    return jsonify({"embeddings": vectors})
# ββ No authentication on sensitive endpoint ββββββββββββββββββ
@app.route("/admin/retrain", methods=["POST"])
def retrain_model():
    """Kick off retraining with zero authentication.

    A01: Broken Access Control -- no auth check on an admin endpoint.
    LLM03: samples are accepted unvalidated (training-data poisoning surface).
    """
    samples = request.json.get("data", [])
    return jsonify({"status": "retraining started", "samples": len(samples)})
# ββ Path traversal in file upload ββββββββββββββββββββββββββββ
@app.route("/upload_weights", methods=["POST"])
def upload_weights():
    """Persist uploaded weights under /models (path-traversal fixture).

    The filename is trusted verbatim, so "../" sequences escape /models.
    """
    name = request.json.get("filename", "model.bin")
    payload = request.json.get("data", "")
    # VULNERABILITY: unsanitised filename joined into the destination path
    target = os.path.join("/models", name)
    with open(target, "wb") as out:
        out.write(payload.encode())
    return jsonify({"saved": target})
if __name__ == "__main__":
    # A05: debug server bound to all interfaces -- intentional misconfiguration
    app.run(host="0.0.0.0", port=5000, debug=True)