dispatchAI-API / api /security_audit.py
3morixd's picture
Upload api/security_audit.py with huggingface_hub
d2a5602 verified
Raw
History Blame Contribute Delete
4.86 kB
"""
dispatchAI API Security Audit
Checks: auth, injection, rate limiting, input validation, info leaks
"""
import httpx
import asyncio
import json
BASE = "http://127.0.0.1:8081"
async def audit():
results = []
async with httpx.AsyncClient(timeout=30.0) as c:
# 1. No auth
try:
r = await c.post(f'{BASE}/v1/chat/completions', json={"model":"test","messages":[]})
results.append(("No auth rejected", r.status_code == 401, f"Got {r.status_code}"))
except Exception as e:
results.append(("No auth rejected", False, str(e)))
# 2. Invalid API key
try:
r = await c.post(f'{BASE}/v1/chat/completions',
headers={"Authorization":"Bearer invalid-key"},
json={"model":"test","messages":[]})
results.append(("Invalid key rejected", r.status_code == 401, f"Got {r.status_code}"))
except Exception as e:
results.append(("Invalid key rejected", False, str(e)))
# 3. SQL/Shell injection in prompt
try:
r = await c.post(f'{BASE}/v1/chat/completions',
headers={"Authorization":"Bearer da-demo-key-0001"},
json={"model":"dispatchAI/SmolLM2-135M-Instruct-mobile",
"messages":[{"role":"user","content":"'; rm -rf /; --"}],
"max_tokens":5})
results.append(("Injection handled", r.status_code in (200, 500), f"Got {r.status_code}"))
except Exception as e:
results.append(("Injection handled", False, str(e)))
# 4. Max tokens limit
try:
r = await c.post(f'{BASE}/v1/chat/completions',
headers={"Authorization":"Bearer da-demo-key-0001"},
json={"model":"dispatchAI/SmolLM2-135M-Instruct-mobile",
"messages":[{"role":"user","content":"Hello"}],
"max_tokens":100000})
results.append(("Large max_tokens handled", r.status_code in (200, 400), f"Got {r.status_code}"))
except Exception as e:
results.append(("Large max_tokens handled", False, str(e)))
# 5. Empty messages
try:
r = await c.post(f'{BASE}/v1/chat/completions',
headers={"Authorization":"Bearer da-demo-key-0001"},
json={"model":"dispatchAI/SmolLM2-135M-Instruct-mobile","messages":[]})
results.append(("Empty messages handled", r.status_code in (200, 400, 422), f"Got {r.status_code}"))
except Exception as e:
results.append(("Empty messages handled", False, str(e)))
# 6. Invalid model name
try:
r = await c.post(f'{BASE}/v1/chat/completions',
headers={"Authorization":"Bearer da-demo-key-0001"},
json={"model":"../../etc/passwd","messages":[{"role":"user","content":"hi"}]})
results.append(("Path traversal rejected", r.status_code == 400, f"Got {r.status_code}"))
except Exception as e:
results.append(("Path traversal rejected", False, str(e)))
# 7. Admin endpoint exposure
try:
r = await c.get(f'{BASE}/admin/phones')
data = json.loads(r.text)
serials_exposed = "serial" in str(data)
results.append(("Admin exposes phone serials", serials_exposed, "Should be protected"))
except Exception as e:
results.append(("Admin exposes phone serials", False, str(e)))
# 8. CORS headers
try:
r = await c.options(f'{BASE}/v1/chat/completions',
headers={"Origin":"https://evil.com"})
cors = r.headers.get("access-control-allow-origin", "")
results.append(("CORS allows all", cors == "*", f"CORS: {cors}"))
except Exception as e:
results.append(("CORS check", False, str(e)))
# Print results
print("=" * 60)
print("SECURITY AUDIT RESULTS")
print("=" * 60)
passed = 0
failed = 0
for name, ok, detail in results:
emoji = "✅" if ok else "❌"
print(f" {emoji} {name}: {detail}")
if ok: passed += 1
else: failed += 1
print(f"\nPassed: {passed}/{len(results)}")
# Fix recommendations
print("\n" + "=" * 60)
print("FIXES NEEDED:")
print("=" * 60)
fixes = []
for name, ok, detail in results:
if not ok:
if "Admin" in name:
fixes.append("Protect /admin/phones with API key auth")
if "CORS" in name:
fixes.append("Restrict CORS to dispatchai.ai only")
if not fixes:
print(" ✅ No critical fixes needed")
else:
for f in fixes:
print(f" ⚠️ {f}")
asyncio.run(audit())