File size: 4,857 Bytes
d2a5602
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
"""
dispatchAI API Security Audit
Checks: auth, injection, rate limiting, input validation, info leaks
"""
import httpx
import asyncio
import json

BASE = "http://127.0.0.1:8081"

async def audit():
    results = []
    async with httpx.AsyncClient(timeout=30.0) as c:
        
        # 1. No auth
        try:
            r = await c.post(f'{BASE}/v1/chat/completions', json={"model":"test","messages":[]})
            results.append(("No auth rejected", r.status_code == 401, f"Got {r.status_code}"))
        except Exception as e:
            results.append(("No auth rejected", False, str(e)))
        
        # 2. Invalid API key
        try:
            r = await c.post(f'{BASE}/v1/chat/completions', 
                headers={"Authorization":"Bearer invalid-key"},
                json={"model":"test","messages":[]})
            results.append(("Invalid key rejected", r.status_code == 401, f"Got {r.status_code}"))
        except Exception as e:
            results.append(("Invalid key rejected", False, str(e)))
        
        # 3. SQL/Shell injection in prompt
        try:
            r = await c.post(f'{BASE}/v1/chat/completions',
                headers={"Authorization":"Bearer da-demo-key-0001"},
                json={"model":"dispatchAI/SmolLM2-135M-Instruct-mobile",
                      "messages":[{"role":"user","content":"'; rm -rf /; --"}],
                      "max_tokens":5})
            results.append(("Injection handled", r.status_code in (200, 500), f"Got {r.status_code}"))
        except Exception as e:
            results.append(("Injection handled", False, str(e)))
        
        # 4. Max tokens limit
        try:
            r = await c.post(f'{BASE}/v1/chat/completions',
                headers={"Authorization":"Bearer da-demo-key-0001"},
                json={"model":"dispatchAI/SmolLM2-135M-Instruct-mobile",
                      "messages":[{"role":"user","content":"Hello"}],
                      "max_tokens":100000})
            results.append(("Large max_tokens handled", r.status_code in (200, 400), f"Got {r.status_code}"))
        except Exception as e:
            results.append(("Large max_tokens handled", False, str(e)))
        
        # 5. Empty messages
        try:
            r = await c.post(f'{BASE}/v1/chat/completions',
                headers={"Authorization":"Bearer da-demo-key-0001"},
                json={"model":"dispatchAI/SmolLM2-135M-Instruct-mobile","messages":[]})
            results.append(("Empty messages handled", r.status_code in (200, 400, 422), f"Got {r.status_code}"))
        except Exception as e:
            results.append(("Empty messages handled", False, str(e)))
        
        # 6. Invalid model name
        try:
            r = await c.post(f'{BASE}/v1/chat/completions',
                headers={"Authorization":"Bearer da-demo-key-0001"},
                json={"model":"../../etc/passwd","messages":[{"role":"user","content":"hi"}]})
            results.append(("Path traversal rejected", r.status_code == 400, f"Got {r.status_code}"))
        except Exception as e:
            results.append(("Path traversal rejected", False, str(e)))
        
        # 7. Admin endpoint exposure
        try:
            r = await c.get(f'{BASE}/admin/phones')
            data = json.loads(r.text)
            serials_exposed = "serial" in str(data)
            results.append(("Admin exposes phone serials", serials_exposed, "Should be protected"))
        except Exception as e:
            results.append(("Admin exposes phone serials", False, str(e)))
        
        # 8. CORS headers
        try:
            r = await c.options(f'{BASE}/v1/chat/completions',
                headers={"Origin":"https://evil.com"})
            cors = r.headers.get("access-control-allow-origin", "")
            results.append(("CORS allows all", cors == "*", f"CORS: {cors}"))
        except Exception as e:
            results.append(("CORS check", False, str(e)))
    
    # Print results
    print("=" * 60)
    print("SECURITY AUDIT RESULTS")
    print("=" * 60)
    passed = 0
    failed = 0
    for name, ok, detail in results:
        emoji = "✅" if ok else "❌"
        print(f"  {emoji} {name}: {detail}")
        if ok: passed += 1
        else: failed += 1
    print(f"\nPassed: {passed}/{len(results)}")
    
    # Fix recommendations
    print("\n" + "=" * 60)
    print("FIXES NEEDED:")
    print("=" * 60)
    fixes = []
    for name, ok, detail in results:
        if not ok:
            if "Admin" in name:
                fixes.append("Protect /admin/phones with API key auth")
            if "CORS" in name:
                fixes.append("Restrict CORS to dispatchai.ai only")
    
    if not fixes:
        print("  ✅ No critical fixes needed")
    else:
        for f in fixes:
            print(f"  ⚠️ {f}")

asyncio.run(audit())