""" dispatchAI API Security Audit Checks: auth, injection, rate limiting, input validation, info leaks """ import httpx import asyncio import json BASE = "http://127.0.0.1:8081" async def audit(): results = [] async with httpx.AsyncClient(timeout=30.0) as c: # 1. No auth try: r = await c.post(f'{BASE}/v1/chat/completions', json={"model":"test","messages":[]}) results.append(("No auth rejected", r.status_code == 401, f"Got {r.status_code}")) except Exception as e: results.append(("No auth rejected", False, str(e))) # 2. Invalid API key try: r = await c.post(f'{BASE}/v1/chat/completions', headers={"Authorization":"Bearer invalid-key"}, json={"model":"test","messages":[]}) results.append(("Invalid key rejected", r.status_code == 401, f"Got {r.status_code}")) except Exception as e: results.append(("Invalid key rejected", False, str(e))) # 3. SQL/Shell injection in prompt try: r = await c.post(f'{BASE}/v1/chat/completions', headers={"Authorization":"Bearer da-demo-key-0001"}, json={"model":"dispatchAI/SmolLM2-135M-Instruct-mobile", "messages":[{"role":"user","content":"'; rm -rf /; --"}], "max_tokens":5}) results.append(("Injection handled", r.status_code in (200, 500), f"Got {r.status_code}")) except Exception as e: results.append(("Injection handled", False, str(e))) # 4. Max tokens limit try: r = await c.post(f'{BASE}/v1/chat/completions', headers={"Authorization":"Bearer da-demo-key-0001"}, json={"model":"dispatchAI/SmolLM2-135M-Instruct-mobile", "messages":[{"role":"user","content":"Hello"}], "max_tokens":100000}) results.append(("Large max_tokens handled", r.status_code in (200, 400), f"Got {r.status_code}")) except Exception as e: results.append(("Large max_tokens handled", False, str(e))) # 5. Empty messages try: r = await c.post(f'{BASE}/v1/chat/completions', headers={"Authorization":"Bearer da-demo-key-0001"}, json={"model":"dispatchAI/SmolLM2-135M-Instruct-mobile","messages":[]}) results.append(("Empty messages handled", r.status_code in (200, 400, 422), f"Got {r.status_code}")) except Exception as e: results.append(("Empty messages handled", False, str(e))) # 6. Invalid model name try: r = await c.post(f'{BASE}/v1/chat/completions', headers={"Authorization":"Bearer da-demo-key-0001"}, json={"model":"../../etc/passwd","messages":[{"role":"user","content":"hi"}]}) results.append(("Path traversal rejected", r.status_code == 400, f"Got {r.status_code}")) except Exception as e: results.append(("Path traversal rejected", False, str(e))) # 7. Admin endpoint exposure try: r = await c.get(f'{BASE}/admin/phones') data = json.loads(r.text) serials_exposed = "serial" in str(data) results.append(("Admin exposes phone serials", serials_exposed, "Should be protected")) except Exception as e: results.append(("Admin exposes phone serials", False, str(e))) # 8. CORS headers try: r = await c.options(f'{BASE}/v1/chat/completions', headers={"Origin":"https://evil.com"}) cors = r.headers.get("access-control-allow-origin", "") results.append(("CORS allows all", cors == "*", f"CORS: {cors}")) except Exception as e: results.append(("CORS check", False, str(e))) # Print results print("=" * 60) print("SECURITY AUDIT RESULTS") print("=" * 60) passed = 0 failed = 0 for name, ok, detail in results: emoji = "✅" if ok else "❌" print(f" {emoji} {name}: {detail}") if ok: passed += 1 else: failed += 1 print(f"\nPassed: {passed}/{len(results)}") # Fix recommendations print("\n" + "=" * 60) print("FIXES NEEDED:") print("=" * 60) fixes = [] for name, ok, detail in results: if not ok: if "Admin" in name: fixes.append("Protect /admin/phones with API key auth") if "CORS" in name: fixes.append("Restrict CORS to dispatchai.ai only") if not fixes: print(" ✅ No critical fixes needed") else: for f in fixes: print(f" ⚠️ {f}") asyncio.run(audit())