| """ |
| dispatchAI API Security Audit |
| Checks: auth, injection, rate limiting, input validation, info leaks |
| """ |
| import httpx |
| import asyncio |
| import json |
|
|
| BASE = "http://127.0.0.1:8081" |
|
|
| async def audit(): |
| results = [] |
| async with httpx.AsyncClient(timeout=30.0) as c: |
| |
| |
| try: |
| r = await c.post(f'{BASE}/v1/chat/completions', json={"model":"test","messages":[]}) |
| results.append(("No auth rejected", r.status_code == 401, f"Got {r.status_code}")) |
| except Exception as e: |
| results.append(("No auth rejected", False, str(e))) |
| |
| |
| try: |
| r = await c.post(f'{BASE}/v1/chat/completions', |
| headers={"Authorization":"Bearer invalid-key"}, |
| json={"model":"test","messages":[]}) |
| results.append(("Invalid key rejected", r.status_code == 401, f"Got {r.status_code}")) |
| except Exception as e: |
| results.append(("Invalid key rejected", False, str(e))) |
| |
| |
| try: |
| r = await c.post(f'{BASE}/v1/chat/completions', |
| headers={"Authorization":"Bearer da-demo-key-0001"}, |
| json={"model":"dispatchAI/SmolLM2-135M-Instruct-mobile", |
| "messages":[{"role":"user","content":"'; rm -rf /; --"}], |
| "max_tokens":5}) |
| results.append(("Injection handled", r.status_code in (200, 500), f"Got {r.status_code}")) |
| except Exception as e: |
| results.append(("Injection handled", False, str(e))) |
| |
| |
| try: |
| r = await c.post(f'{BASE}/v1/chat/completions', |
| headers={"Authorization":"Bearer da-demo-key-0001"}, |
| json={"model":"dispatchAI/SmolLM2-135M-Instruct-mobile", |
| "messages":[{"role":"user","content":"Hello"}], |
| "max_tokens":100000}) |
| results.append(("Large max_tokens handled", r.status_code in (200, 400), f"Got {r.status_code}")) |
| except Exception as e: |
| results.append(("Large max_tokens handled", False, str(e))) |
| |
| |
| try: |
| r = await c.post(f'{BASE}/v1/chat/completions', |
| headers={"Authorization":"Bearer da-demo-key-0001"}, |
| json={"model":"dispatchAI/SmolLM2-135M-Instruct-mobile","messages":[]}) |
| results.append(("Empty messages handled", r.status_code in (200, 400, 422), f"Got {r.status_code}")) |
| except Exception as e: |
| results.append(("Empty messages handled", False, str(e))) |
| |
| |
| try: |
| r = await c.post(f'{BASE}/v1/chat/completions', |
| headers={"Authorization":"Bearer da-demo-key-0001"}, |
| json={"model":"../../etc/passwd","messages":[{"role":"user","content":"hi"}]}) |
| results.append(("Path traversal rejected", r.status_code == 400, f"Got {r.status_code}")) |
| except Exception as e: |
| results.append(("Path traversal rejected", False, str(e))) |
| |
| |
| try: |
| r = await c.get(f'{BASE}/admin/phones') |
| data = json.loads(r.text) |
| serials_exposed = "serial" in str(data) |
| results.append(("Admin exposes phone serials", serials_exposed, "Should be protected")) |
| except Exception as e: |
| results.append(("Admin exposes phone serials", False, str(e))) |
| |
| |
| try: |
| r = await c.options(f'{BASE}/v1/chat/completions', |
| headers={"Origin":"https://evil.com"}) |
| cors = r.headers.get("access-control-allow-origin", "") |
| results.append(("CORS allows all", cors == "*", f"CORS: {cors}")) |
| except Exception as e: |
| results.append(("CORS check", False, str(e))) |
| |
| |
| print("=" * 60) |
| print("SECURITY AUDIT RESULTS") |
| print("=" * 60) |
| passed = 0 |
| failed = 0 |
| for name, ok, detail in results: |
| emoji = "✅" if ok else "❌" |
| print(f" {emoji} {name}: {detail}") |
| if ok: passed += 1 |
| else: failed += 1 |
| print(f"\nPassed: {passed}/{len(results)}") |
| |
| |
| print("\n" + "=" * 60) |
| print("FIXES NEEDED:") |
| print("=" * 60) |
| fixes = [] |
| for name, ok, detail in results: |
| if not ok: |
| if "Admin" in name: |
| fixes.append("Protect /admin/phones with API key auth") |
| if "CORS" in name: |
| fixes.append("Restrict CORS to dispatchai.ai only") |
| |
| if not fixes: |
| print(" ✅ No critical fixes needed") |
| else: |
| for f in fixes: |
| print(f" ⚠️ {f}") |
|
|
| asyncio.run(audit()) |
|
|