"""
SPARKNET API Integration Tests - Phase 1B

Comprehensive test suite for REST API endpoints:
- Document API (/api/documents)
- RAG API (/api/rag)
- Auth API (/api/auth)
- Health/Status endpoints

Uses FastAPI TestClient for synchronous testing without running the server.
"""

import pytest
import json
import io
import os
import sys
from pathlib import Path
from typing import Dict, Any, Optional
from unittest.mock import patch, MagicMock, AsyncMock

# Make the directory three levels up importable so the `api` package can be found.
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from fastapi.testclient import TestClient


@pytest.fixture(scope="module")
def mock_components():
    """Mock SPARKNET components for testing."""
    mock_embeddings = MagicMock()
    mock_embeddings.embed_documents = MagicMock(return_value=[[0.1] * 1024])
    mock_embeddings.embed_query = MagicMock(return_value=[0.1] * 1024)

    mock_store = MagicMock()
    mock_store._collection = MagicMock()
    mock_store._collection.count = MagicMock(return_value=100)
    mock_store.search = MagicMock(return_value=[])
    mock_store.add_documents = MagicMock(return_value=["doc_1"])

    mock_llm_client = MagicMock()
    mock_llm_client.generate = MagicMock(return_value="Mock response")
    mock_llm_client.get_llm = MagicMock(return_value=MagicMock())

    mock_workflow = MagicMock()
    mock_workflow.run = AsyncMock(return_value={
        "response": "Test response",
        "sources": [],
        "confidence": 0.9
    })

    return {
        "embeddings": mock_embeddings,
        "store": mock_store,
        "llm_client": mock_llm_client,
        "workflow": mock_workflow,
    }
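
# Note (assumption): the 1024-dim dummy vectors above are placeholders; if the
# real embedding model uses a different dimensionality, adjust this fixture to match.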


@pytest.fixture(scope="module")
def client(mock_components):
    """Create TestClient with mocked dependencies."""
    # Patch the app's shared state with mocked components for the lifetime of the client.
    with patch.dict("api.main.app_state", {
        "start_time": 1000000,
        "embeddings": mock_components["embeddings"],
        "store": mock_components["store"],
        "llm_client": mock_components["llm_client"],
        "workflow": mock_components["workflow"],
        "rag_ready": True,
        "workflows": {},
        "patents": {},
        "planner": MagicMock(),
        "critic": MagicMock(),
        "memory": MagicMock(),
        "vision_ocr": None,
    }):
        from api.main import app
        with TestClient(app) as test_client:
            yield test_client
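
# Note (assumption): mock_components is module-scoped, so a test that sets side
# effects on the shared mocks (e.g. mock_components["store"].search.side_effect)
# should reset them afterwards to avoid leaking state into later tests.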


@pytest.fixture
def auth_headers(client) -> Dict[str, str]:
    """Get authentication headers with valid token."""
    response = client.post(
        "/api/auth/token",
        data={"username": "admin", "password": "admin123"}
    )

    if response.status_code == 200:
        token = response.json()["access_token"]
        return {"Authorization": f"Bearer {token}"}

    # Fall back to empty headers so dependent tests can detect this and skip.
    return {}


@pytest.fixture
def sample_pdf_file():
    """Create a sample PDF file for upload tests."""
    # Minimal hand-built PDF: enough structure for upload and content-type
    # handling, not guaranteed to render correctly in a viewer.
    pdf_content = b"""%PDF-1.4
1 0 obj << /Type /Catalog /Pages 2 0 R >> endobj
2 0 obj << /Type /Pages /Kids [3 0 R] /Count 1 >> endobj
3 0 obj << /Type /Page /Parent 2 0 R /MediaBox [0 0 612 792] /Contents 4 0 R >> endobj
4 0 obj << /Length 44 >> stream
BT /F1 12 Tf 100 700 Td (Test Document) Tj ET
endstream endobj
xref
0 5
0000000000 65535 f
0000000009 00000 n
0000000058 00000 n
0000000115 00000 n
0000000214 00000 n
trailer << /Size 5 /Root 1 0 R >>
startxref
306
%%EOF"""
    return io.BytesIO(pdf_content)


@pytest.fixture
def sample_text_file():
    """Create a sample text file for upload tests."""
    content = b"""SPARKNET Test Document

This is a sample document for testing the document processing pipeline.

## Section 1: Introduction
The SPARKNET framework provides AI-powered document intelligence.

## Section 2: Features
- Multi-agent RAG pipeline
- Table extraction
- Evidence grounding

## Section 3: Conclusion
This document tests the upload and processing functionality.
"""
    return io.BytesIO(content)


class TestHealthEndpoints:
    """Test health and status endpoints."""

    def test_root_endpoint(self, client):
        """Test root endpoint returns service info."""
        response = client.get("/")
        assert response.status_code == 200

        data = response.json()
        assert data["status"] == "operational"
        assert data["service"] == "SPARKNET API"
        assert "version" in data

    def test_health_endpoint(self, client):
        """Test health endpoint returns component status."""
        response = client.get("/api/health")
        assert response.status_code == 200

        data = response.json()
        assert "status" in data
        assert "components" in data
        assert "statistics" in data
        assert "uptime_seconds" in data

        # Every core component should be reported in the health payload.
        components = data["components"]
        expected_keys = ["rag", "embeddings", "vector_store", "llm_client"]
        for key in expected_keys:
            assert key in components

    def test_status_endpoint(self, client):
        """Test status endpoint returns comprehensive info."""
        response = client.get("/api/status")
        assert response.status_code == 200

        data = response.json()
        assert data["status"] == "operational"
        assert "statistics" in data
        assert "models" in data


class TestAuthEndpoints:
    """Test authentication endpoints."""

    def test_get_token_valid_credentials(self, client):
        """Test token generation with valid credentials."""
        response = client.post(
            "/api/auth/token",
            data={"username": "admin", "password": "admin123"}
        )

        # The default demo credentials may not be configured in every environment.
        if response.status_code == 200:
            data = response.json()
            assert "access_token" in data
            assert data["token_type"] == "bearer"

    def test_get_token_invalid_credentials(self, client):
        """Test token generation fails with invalid credentials."""
        response = client.post(
            "/api/auth/token",
            data={"username": "invalid", "password": "wrong"}
        )
        assert response.status_code in [401, 500]

    def test_get_current_user(self, client, auth_headers):
        """Test getting current user info."""
        if not auth_headers:
            pytest.skip("Auth not available")

        response = client.get("/api/auth/me", headers=auth_headers)
        assert response.status_code == 200

        data = response.json()
        assert "username" in data

    def test_protected_endpoint_without_token(self, client):
        """Test that protected endpoints require authentication."""
        response = client.get("/api/auth/me")
        assert response.status_code == 401


class TestDocumentEndpoints:
    """Test document management endpoints."""

    def test_list_documents_empty(self, client):
        """Test listing documents when none exist."""
        response = client.get("/api/documents")
        assert response.status_code == 200

        data = response.json()
        assert isinstance(data, list)

    def test_upload_text_document(self, client, sample_text_file):
        """Test uploading a text document."""
        response = client.post(
            "/api/documents/upload",
            files={"file": ("test.txt", sample_text_file, "text/plain")}
        )

        assert response.status_code == 200
        data = response.json()

        assert "document_id" in data
        assert data["filename"] == "test.txt"
        assert data["status"] in ["uploaded", "processing", "processed"]

    def test_upload_pdf_document(self, client, sample_pdf_file):
        """Test uploading a PDF document."""
        response = client.post(
            "/api/documents/upload",
            files={"file": ("test.pdf", sample_pdf_file, "application/pdf")}
        )

        assert response.status_code == 200
        data = response.json()

        assert "document_id" in data
        assert data["filename"] == "test.pdf"

    def test_upload_unsupported_format(self, client):
        """Test uploading unsupported file format is rejected."""
        fake_file = io.BytesIO(b"fake executable content")

        response = client.post(
            "/api/documents/upload",
            files={"file": ("test.exe", fake_file, "application/octet-stream")}
        )

        # Expect rejection as a bad request or unsupported media type.
        assert response.status_code in [400, 415]

    def test_get_document_not_found(self, client):
        """Test getting non-existent document returns 404."""
        response = client.get("/api/documents/nonexistent_id")
        assert response.status_code == 404

    def test_document_workflow(self, client, sample_text_file):
        """Test complete document workflow: upload -> process -> index."""
        # Upload
        upload_response = client.post(
            "/api/documents/upload",
            files={"file": ("workflow_test.txt", sample_text_file, "text/plain")}
        )
        assert upload_response.status_code == 200
        doc_id = upload_response.json()["document_id"]

        # Fetch document details
        detail_response = client.get(f"/api/documents/{doc_id}/detail")
        assert detail_response.status_code == 200

        # Fetch chunks
        chunks_response = client.get(f"/api/documents/{doc_id}/chunks")
        assert chunks_response.status_code == 200

        # Index (indexing may be rejected depending on processing state)
        index_response = client.post(f"/api/documents/{doc_id}/index")
        assert index_response.status_code in [200, 400, 422]

        # Clean up
        delete_response = client.delete(f"/api/documents/{doc_id}")
        assert delete_response.status_code == 200


class TestRAGEndpoints:
    """Test RAG query and search endpoints."""

    def test_rag_query_basic(self, client):
        """Test basic RAG query endpoint."""
        response = client.post(
            "/api/rag/query",
            json={
                "query": "What is SPARKNET?",
                "max_sources": 5
            }
        )

        # 500/503 are tolerated when the RAG stack is not fully initialized.
        assert response.status_code in [200, 500, 503]

        if response.status_code == 200:
            data = response.json()
            assert "response" in data or "error" in data

    def test_rag_query_with_filters(self, client):
        """Test RAG query with document filters."""
        response = client.post(
            "/api/rag/query",
            json={
                "query": "Test query",
                "document_ids": ["doc_1", "doc_2"],
                "max_sources": 3,
                "min_confidence": 0.5
            }
        )

        assert response.status_code in [200, 500, 503]

    def test_rag_search_semantic(self, client):
        """Test semantic search without synthesis."""
        response = client.post(
            "/api/rag/search",
            json={
                "query": "document processing",
                "top_k": 10
            }
        )

        assert response.status_code in [200, 500, 503]

        if response.status_code == 200:
            data = response.json()
            assert "results" in data or "error" in data

    def test_rag_store_status(self, client):
        """Test getting vector store status."""
        response = client.get("/api/rag/store/status")

        assert response.status_code in [200, 500]

        if response.status_code == 200:
            data = response.json()
            assert "status" in data

    def test_rag_cache_stats(self, client):
        """Test getting cache statistics."""
        response = client.get("/api/rag/cache/stats")

        assert response.status_code in [200, 404, 500]

    def test_rag_query_empty_query(self, client):
        """Test that an empty query is rejected."""
        response = client.post(
            "/api/rag/query",
            json={"query": ""}
        )

        # Request validation should reject an empty query.
        assert response.status_code == 422


class TestDocumentProcessing:
    """Test document processing functionality."""

    def test_process_document_endpoint(self, client, sample_text_file):
        """Test triggering document processing."""
        upload_response = client.post(
            "/api/documents/upload",
            files={"file": ("process_test.txt", sample_text_file, "text/plain")}
        )

        if upload_response.status_code != 200:
            pytest.skip("Upload failed")

        doc_id = upload_response.json()["document_id"]

        process_response = client.post(f"/api/documents/{doc_id}/process")
        assert process_response.status_code in [200, 202, 400]

    def test_batch_index_documents(self, client):
        """Test batch indexing multiple documents."""
        response = client.post(
            "/api/documents/batch-index",
            json={"document_ids": ["doc_1", "doc_2", "doc_3"]}
        )

        # The referenced documents were never uploaded, so 400/404 are acceptable.
        assert response.status_code in [200, 400, 404]


class TestErrorHandling:
    """Test API error handling."""

    def test_invalid_json_body(self, client):
        """Test handling of invalid JSON in request body."""
        response = client.post(
            "/api/rag/query",
            content="not valid json",
            headers={"Content-Type": "application/json"}
        )

        assert response.status_code == 422

    def test_missing_required_fields(self, client):
        """Test handling of missing required fields."""
        response = client.post(
            "/api/rag/query",
            json={}
        )

        assert response.status_code == 422

    def test_invalid_document_id_format(self, client):
        """Test handling of various document ID formats."""
        # Path traversal attempt
        response = client.get("/api/documents/../../etc/passwd")
        assert response.status_code in [400, 404]

        # Excessively long ID
        long_id = "a" * 1000
        response = client.get(f"/api/documents/{long_id}")
        assert response.status_code in [400, 404]


class TestConcurrency:
    """Test concurrent request handling."""

    def test_multiple_health_checks(self, client):
        """Test multiple concurrent health checks."""
        import concurrent.futures

        def make_request():
            return client.get("/api/health")

        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(make_request) for _ in range(10)]
            results = [f.result() for f in futures]

        assert all(r.status_code == 200 for r in results)

    def test_multiple_document_uploads(self, client):
        """Test handling multiple simultaneous uploads."""
        import concurrent.futures

        def upload_file(i):
            content = f"Test content {i}".encode()
            file = io.BytesIO(content)
            return client.post(
                "/api/documents/upload",
                files={"file": (f"test_{i}.txt", file, "text/plain")}
            )

        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
            futures = [executor.submit(upload_file, i) for i in range(5)]
            results = [f.result() for f in futures]

        # Concurrent uploads may contend for shared state; 500s are tolerated here.
        assert all(r.status_code in [200, 500] for r in results)


class TestIntegrationWorkflows:
    """Test end-to-end integration workflows."""

    def test_document_to_rag_query_workflow(self, client, sample_text_file):
        """Test complete workflow from document upload to RAG query."""
        # 1. Upload
        upload_response = client.post(
            "/api/documents/upload",
            files={"file": ("integration_test.txt", sample_text_file, "text/plain")}
        )

        if upload_response.status_code != 200:
            pytest.skip("Upload failed, skipping workflow test")

        doc_id = upload_response.json()["document_id"]

        # 2. Verify the document is retrievable
        get_response = client.get(f"/api/documents/{doc_id}")
        assert get_response.status_code == 200

        # 3. Index
        index_response = client.post(f"/api/documents/{doc_id}/index")

        if index_response.status_code != 200:
            pytest.skip("Indexing not available")

        # 4. Query against the indexed document
        query_response = client.post(
            "/api/rag/query",
            json={
                "query": "What does this document contain?",
                "document_ids": [doc_id]
            }
        )

        assert query_response.status_code in [200, 500, 503]

        # 5. Clean up
        client.delete(f"/api/documents/{doc_id}")
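

# Note (assumption): the "slow" marker used below should be registered in the
# project's pytest configuration (e.g. the `markers` option in pytest.ini or
# [tool.pytest.ini_options] in pyproject.toml); otherwise pytest emits
# PytestUnknownMarkWarning. Registration also lets callers deselect these tests
# with `pytest -m "not slow"`.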
@pytest.mark.slow
class TestPerformance:
    """Performance tests (marked as slow)."""

    def test_large_document_upload(self, client):
        """Test uploading a larger document."""
        # Roughly 1 MB of plain text (60,000 short lines).
        large_content = b"Test content line\n" * 60000
        large_file = io.BytesIO(large_content)

        response = client.post(
            "/api/documents/upload",
            files={"file": ("large_test.txt", large_file, "text/plain")}
        )

        # 413 is acceptable if the API enforces a payload size limit.
        assert response.status_code in [200, 413]

    def test_rapid_query_requests(self, client):
        """Test handling rapid consecutive queries."""
        import time

        start = time.time()
        responses = []

        for i in range(20):
            response = client.post(
                "/api/rag/query",
                json={"query": f"Test query {i}"}
            )
            responses.append(response)

        elapsed = time.time() - start

        # With mocked components, 20 queries should finish well under 30 seconds.
        assert elapsed < 30

        # At least 80% of requests should return a handled status code.
        success_count = sum(1 for r in responses if r.status_code in [200, 500, 503])
        assert success_count >= len(responses) * 0.8


if __name__ == "__main__":
    pytest.main([__file__, "-v", "--tb=short"])