"""
Deliberately vulnerable ML code for testing CodeSentry's detection capabilities.

Contains:
  - Prompt injection (LLM01)
  - Insecure output handling / eval (LLM02)
  - Hardcoded HuggingFace token (LLM06 / A07)
  - Insecure pickle deserialization (A04 / CWE-502)
  - GPU tensor never moved to CPU (memory leak)
  - N+1 embedding calls in loop
  - FP32 when FP16 would suffice
  - Missing @torch.no_grad on inference
  - Model loaded inside request handler
  - SQL injection in RAG query
  - Debug mode enabled
"""

import os
import pickle
import sqlite3

from flask import Flask, request, jsonify

app = Flask(__name__)
app.config["DEBUG"] = True  # A05: Security Misconfiguration

# ── A07 / LLM06: Hardcoded API key ──────────────────────────
HF_TOKEN = "hf_abcXYZabcXYZabcXYZabcXYZabcXYZ12"
OPENAI_API_KEY = "sk-proj-aaaabbbbccccddddeeeeffffgggghhhhiiiijjjj"
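
# A minimal sketch of the safe counterpart, kept as a comment so the fixture
# stays vulnerable: read secrets from the environment at startup (assumes the
# deployment exports HF_TOKEN / OPENAI_API_KEY as environment variables).
#
#     HF_TOKEN = os.environ["HF_TOKEN"]
#     OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]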

# ── Database (for RAG demo) ──────────────────────────────────
DB_PATH = "knowledge.db"


def get_db():
    return sqlite3.connect(DB_PATH)


# ── A04 / CWE-502: Insecure pickle deserialization ──────────
@app.route("/load_model", methods=["POST"])
def load_model():
    """Loads a model from a user-supplied file path β€” insecure!"""
    model_path = request.json.get("model_path")
    # VULNERABILITY: pickle.load from untrusted user-controlled path
    with open(model_path, "rb") as f:
        model = pickle.load(f)  # noqa: S301 β€” CWE-502
    return jsonify({"status": "loaded"})


# ── LLM01: Prompt Injection ──────────────────────────────────
@app.route("/generate", methods=["POST"])
def generate():
    """Chat endpoint that directly concatenates user input into the prompt."""
    user_input = request.json.get("message", "")
    # VULNERABILITY: user input concatenated directly — prompt injection
    prompt = f"You are a helpful assistant. User says: {user_input}"

    # Model loaded INSIDE handler on every request (performance issue)
    import torch
    from transformers import AutoModelForCausalLM, AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained("gpt2", token=HF_TOKEN)
    model = AutoModelForCausalLM.from_pretrained(
        "gpt2",
        token=HF_TOKEN,
        torch_dtype=torch.float32,  # ML04: FP32 uses 2x the VRAM of FP16
    ).to("cuda")  # model must sit on the same device as its inputs below

    # ML02: Missing @torch.no_grad — gradients computed unnecessarily
    inputs = tokenizer(prompt, return_tensors="pt").to("cuda")
    outputs = model.generate(**inputs, max_new_tokens=200)
    # Tensor stays on GPU — memory leak (ML01)
    result = tokenizer.decode(outputs[0])

    # LLM02: LLM output piped directly to eval()
    eval(result)  # noqa: S307 — EXTREMELY DANGEROUS

    return jsonify({"result": result})


# ── A03: SQL Injection in RAG query ─────────────────────────
@app.route("/search", methods=["GET"])
def rag_search():
    """RAG knowledge base search β€” SQL injection vulnerability."""
    query = request.args.get("q", "")
    conn = get_db()
    cursor = conn.cursor()
    # VULNERABILITY: unsanitised user input in SQL query
    sql = f"SELECT * FROM documents WHERE content LIKE '%{query}%'"
    cursor.execute(sql)  # noqa: S608 — SQL injection
    results = cursor.fetchall()
    conn.close()
    return jsonify({"results": results})


# ── ML03: N+1 embedding calls ────────────────────────────────
@app.route("/embed_documents", methods=["POST"])
def embed_documents():
    """Embeds each document individually in a loop instead of batching."""
    import torch
    from sentence_transformers import SentenceTransformer

    documents = request.json.get("documents", [])
    model = SentenceTransformer("all-MiniLM-L6-v2")

    embeddings = []
    for doc in documents:  # N+1: one GPU call per document
        emb = model.encode(doc)  # Should batch all at once
        embeddings.append(emb.tolist())

    return jsonify({"embeddings": embeddings})


# ── No authentication on sensitive endpoint ──────────────────
@app.route("/admin/retrain", methods=["POST"])
def retrain_model():
    """Triggers model retraining β€” no auth check!"""
    # A01: Broken Access Control β€” no authentication
    training_data = request.json.get("data", [])
    # Just store without any validation (LLM03: training data poisoning)
    return jsonify({"status": "retraining started", "samples": len(training_data)})


# ── Path traversal in file upload ────────────────────────────
@app.route("/upload_weights", methods=["POST"])
def upload_weights():
    """Saves uploaded model weights β€” path traversal vulnerability."""
    filename = request.json.get("filename", "model.bin")
    data = request.json.get("data", "")
    # VULNERABILITY: filename not sanitised β€” path traversal possible
    save_path = os.path.join("/models", filename)
    with open(save_path, "wb") as f:
        f.write(data.encode())
    return jsonify({"saved": save_path})


if __name__ == "__main__":
    app.run(debug=True, host="0.0.0.0", port=5000)
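
# A hedged sketch of a safer launch, for contrast with the deliberate A05
# misconfiguration above: debug off and a loopback bind, or a production WSGI
# server (gunicorn shown as an assumed dependency, not part of this fixture).
#
#     app.run(debug=False, host="127.0.0.1", port=5000)
#     # or: gunicorn -w 4 -b 127.0.0.1:5000 app:app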