"""
Deliberately vulnerable ML code for testing CodeSentry's detection capabilities.
Contains:
- Prompt injection (LLM01)
- Insecure output handling / eval (LLM02)
- Hardcoded HuggingFace token (LLM06 / A07)
- Insecure pickle deserialization (A04 / CWE-502)
- GPU tensor never moved to CPU (memory leak)
- N+1 embedding calls in loop
- FP32 when FP16 would suffice
- Missing @torch.no_grad on inference
- Model loaded inside request handler
- SQL injection in RAG query
- Debug mode enabled
"""
import os
import pickle
import sqlite3
from flask import Flask, request, jsonify
app = Flask(__name__)
app.config["DEBUG"] = True # A05: Security Misconfiguration
# ── A07 / LLM06: Hardcoded API key ──────────────────────────
HF_TOKEN = "hf_abcXYZabcXYZabcXYZabcXYZabcXYZ12"
OPENAI_API_KEY = "sk-proj-aaaabbbbccccddddeeeeffffgggghhhhiiiijjjj"
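# A hardened counterpart (sketch; assumes the deploy environment sets these
# variables): read secrets from the environment instead of source control and
# keep debug mode off outside local development.
#
#     HF_TOKEN = os.environ.get("HF_TOKEN", "")
#     OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
#     app.config["DEBUG"] = os.environ.get("FLASK_DEBUG") == "1"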
# ── Database (for RAG demo) ──────────────────────────────────
DB_PATH = "knowledge.db"
def get_db():
    return sqlite3.connect(DB_PATH)
# ── A04 / CWE-502: Insecure pickle deserialization ──────────
@app.route("/load_model", methods=["POST"])
def load_model():
    """Loads a model from a user-supplied file path - insecure!"""
    model_path = request.json.get("model_path")
    # VULNERABILITY: pickle.load from untrusted user-controlled path
    with open(model_path, "rb") as f:
        model = pickle.load(f)  # noqa: S301 - CWE-502
    return jsonify({"status": "loaded"})
# ── LLM01: Prompt Injection ──────────────────────────────────
@app.route("/generate", methods=["POST"])
def generate():
    """Chat endpoint that directly concatenates user input into the prompt."""
    user_input = request.json.get("message", "")
    # VULNERABILITY: user input concatenated directly - prompt injection
    prompt = f"You are a helpful assistant. User says: {user_input}"
    # Model loaded INSIDE handler on every request (performance issue)
    import torch
    from transformers import AutoModelForCausalLM, AutoTokenizer
    tokenizer = AutoTokenizer.from_pretrained("gpt2", token=HF_TOKEN)
    model = AutoModelForCausalLM.from_pretrained(
        "gpt2",
        token=HF_TOKEN,
        torch_dtype=torch.float32,  # ML04: FP32 wastes 2x VRAM
    )
    # ML02: Missing @torch.no_grad - gradients computed unnecessarily
    inputs = tokenizer(prompt, return_tensors="pt").to("cuda")
    outputs = model.generate(**inputs, max_new_tokens=200)
    # Tensor stays on GPU - memory leak (ML01)
    result = tokenizer.decode(outputs[0])
    # LLM02: LLM output piped directly to eval()
    eval(result)  # noqa: S307 - EXTREMELY DANGEROUS
    return jsonify({"result": result})
# ── A03: SQL Injection in RAG query ─────────────────────────
@app.route("/search", methods=["GET"])
def rag_search():
    """RAG knowledge base search - SQL injection vulnerability."""
    query = request.args.get("q", "")
    conn = get_db()
    cursor = conn.cursor()
    # VULNERABILITY: unsanitised user input in SQL query
    sql = f"SELECT * FROM documents WHERE content LIKE '%{query}%'"
    cursor.execute(sql)  # noqa: S608 - SQL injection
    results = cursor.fetchall()
    conn.close()
    return jsonify({"results": results})
# ── ML03: N+1 embedding calls ────────────────────────────────
@app.route("/embed_documents", methods=["POST"])
def embed_documents():
    """Embeds each document individually in a loop instead of batching."""
    from sentence_transformers import SentenceTransformer
    documents = request.json.get("documents", [])
    model = SentenceTransformer("all-MiniLM-L6-v2")
    embeddings = []
    for doc in documents:  # N+1: one GPU call per document
        emb = model.encode(doc)  # should batch all documents at once
        embeddings.append(emb.tolist())
    return jsonify({"embeddings": embeddings})
# ── No authentication on sensitive endpoint ──────────────────
@app.route("/admin/retrain", methods=["POST"])
def retrain_model():
    """Triggers model retraining - no auth check!"""
    # A01: Broken Access Control - no authentication
    training_data = request.json.get("data", [])
    # Data accepted without any validation (LLM03: training data poisoning)
    return jsonify({"status": "retraining started", "samples": len(training_data)})
# ── Path traversal in file upload ────────────────────────────
@app.route("/upload_weights", methods=["POST"])
def upload_weights():
    """Saves uploaded model weights - path traversal vulnerability."""
    filename = request.json.get("filename", "model.bin")
    data = request.json.get("data", "")
    # VULNERABILITY: filename not sanitised - path traversal possible
    save_path = os.path.join("/models", filename)
    with open(save_path, "wb") as f:
        f.write(data.encode())
    return jsonify({"saved": save_path})
if __name__ == "__main__":
    app.run(debug=True, host="0.0.0.0", port=5000)