IIIF-Studio / backend /tests /test_security.py
Claude
fix: Sprint 1b — null deref, supports_vision, empty response logging, test
9df1925 unverified
"""
Tests de sécurité — Sprint F1 + Sprint Fix 1.
Vérifie que toutes les vulnérabilités identifiées sont corrigées :
- Path traversal sur profiles, slug, folio_label, frontend serving
- Path traversal sur image_master_path dans le job_runner
- SSRF sur manifest_url
- Validation des entrées (taille, format)
"""
# 1. stdlib, followed by third-party (pytest, SQLAlchemy)
import uuid
from datetime import datetime, timezone
from pathlib import Path
import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
# 2. local — API fixtures
from tests.conftest_api import async_client, db_session # noqa: F401
# 3. local — job_runner path traversal tests
import app.models # noqa: F401
import app.services.job_runner as job_runner_module
from app.models.corpus import CorpusModel, ManuscriptModel, PageModel
from app.models.database import Base
from app.models.job import JobModel
from app.models.model_config_db import ModelConfigDB
from app.services.job_runner import _run_job_impl
# ---------------------------------------------------------------------------
# Path traversal — profiles
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_profile_path_traversal_dotdot(async_client):
    """A profile_id containing '..' must be rejected with HTTP 400."""
    url = "/api/v1/profiles/..passwd"
    response = await async_client.get(url)
    assert response.status_code == 400
@pytest.mark.asyncio
async def test_profile_path_traversal_slash(async_client):
    """A profile_id made of disallowed characters must be rejected with HTTP 400.

    Despite the test name, no slash is sent. Per the original author's note,
    FastAPI normalises paths, so a slash inside the ID would not reach the
    handler; an ID with otherwise forbidden characters ("UPPER_CASE") is
    used instead.
    """
    url = "/api/v1/profiles/UPPER_CASE"
    response = await async_client.get(url)
    assert response.status_code == 400
@pytest.mark.asyncio
async def test_profile_path_traversal_special_chars(async_client):
    """A profile_id containing a special character ('@') must yield HTTP 400."""
    url = "/api/v1/profiles/test@profile"
    response = await async_client.get(url)
    assert response.status_code == 400
@pytest.mark.asyncio
async def test_profile_valid_id_not_found(async_client):
    """A well-formed but unknown profile_id must yield 404, not 400."""
    url = "/api/v1/profiles/does-not-exist"
    response = await async_client.get(url)
    assert response.status_code == 404
# ---------------------------------------------------------------------------
# Path traversal — corpus slug
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_corpus_slug_path_traversal(async_client):
    """Creating a corpus whose slug contains '../' must be refused (HTTP 422)."""
    payload = {
        "slug": "../../malicious",
        "title": "Test",
        "profile_id": "medieval-illuminated",
    }
    response = await async_client.post("/api/v1/corpora", json=payload)
    assert response.status_code == 422
@pytest.mark.asyncio
async def test_corpus_slug_with_spaces(async_client):
    """Creating a corpus whose slug contains a space must be refused (HTTP 422)."""
    payload = {
        "slug": "my corpus",
        "title": "Test",
        "profile_id": "medieval-illuminated",
    }
    response = await async_client.post("/api/v1/corpora", json=payload)
    assert response.status_code == 422
@pytest.mark.asyncio
async def test_corpus_slug_uppercase(async_client):
    """A slug containing uppercase letters must be refused (HTTP 422)."""
    payload = {
        "slug": "MyCorpus",
        "title": "Test",
        "profile_id": "medieval-illuminated",
    }
    response = await async_client.post("/api/v1/corpora", json=payload)
    assert response.status_code == 422
@pytest.mark.asyncio
async def test_corpus_slug_valid(async_client):
    """A lowercase, hyphenated slug must be accepted (HTTP 201)."""
    payload = {
        "slug": "my-corpus-01",
        "title": "Test",
        "profile_id": "medieval-illuminated",
    }
    response = await async_client.post("/api/v1/corpora", json=payload)
    assert response.status_code == 201
@pytest.mark.asyncio
async def test_corpus_slug_empty(async_client):
    """An empty slug must be refused (HTTP 422)."""
    payload = {
        "slug": "",
        "title": "Test",
        "profile_id": "medieval-illuminated",
    }
    response = await async_client.post("/api/v1/corpora", json=payload)
    assert response.status_code == 422
@pytest.mark.asyncio
async def test_corpus_title_too_long(async_client):
    """A 300-character title must be refused (HTTP 422).

    The original author states the limit as 256 characters.
    """
    oversized_title = "x" * 300
    payload = {
        "slug": "test-long",
        "title": oversized_title,
        "profile_id": "medieval-illuminated",
    }
    response = await async_client.post("/api/v1/corpora", json=payload)
    assert response.status_code == 422
# ---------------------------------------------------------------------------
# SSRF — manifest_url
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ssrf_localhost(async_client):
    """A manifest_url pointing at localhost must be rejected with HTTP 400.

    A corpus is created first because the ingest endpoint is scoped to a
    corpus id. The rejection detail must mention either "interdit" or
    "localhost" (case-insensitive).
    """
    create = await async_client.post("/api/v1/corpora", json={
        "slug": "ssrf-test", "title": "SSRF", "profile_id": "medieval-illuminated",
    })
    # Fail loudly if the setup step breaks, rather than with a KeyError on "id".
    assert create.status_code == 201, "corpus creation precondition failed"
    cid = create.json()["id"]

    resp = await async_client.post(f"/api/v1/corpora/{cid}/ingest/iiif-manifest", json={
        "manifest_url": "http://localhost:8000/secret",
    })
    assert resp.status_code == 400
    detail = resp.json()["detail"].lower()
    assert "interdit" in detail or "localhost" in detail
@pytest.mark.asyncio
async def test_ssrf_metadata_ip(async_client):
    """A manifest_url targeting 169.254.169.254 must be rejected with HTTP 400.

    The original author identifies 169.254.x.x as the cloud metadata range.
    A corpus is created first because the ingest endpoint is scoped to a
    corpus id.
    """
    create = await async_client.post("/api/v1/corpora", json={
        "slug": "ssrf-meta", "title": "SSRF", "profile_id": "medieval-illuminated",
    })
    # Fail loudly if the setup step breaks, rather than with a KeyError on "id".
    assert create.status_code == 201, "corpus creation precondition failed"
    cid = create.json()["id"]

    resp = await async_client.post(f"/api/v1/corpora/{cid}/ingest/iiif-manifest", json={
        "manifest_url": "http://169.254.169.254/latest/meta-data/",
    })
    assert resp.status_code == 400
@pytest.mark.asyncio
async def test_ssrf_file_scheme(async_client):
    """A manifest_url using the file:// scheme must be rejected with HTTP 400.

    A corpus is created first because the ingest endpoint is scoped to a
    corpus id.
    """
    create = await async_client.post("/api/v1/corpora", json={
        "slug": "ssrf-file", "title": "SSRF", "profile_id": "medieval-illuminated",
    })
    # Fail loudly if the setup step breaks, rather than with a KeyError on "id".
    assert create.status_code == 201, "corpus creation precondition failed"
    cid = create.json()["id"]

    resp = await async_client.post(f"/api/v1/corpora/{cid}/ingest/iiif-manifest", json={
        "manifest_url": "file:///etc/passwd",
    })
    assert resp.status_code == 400
# ---------------------------------------------------------------------------
# Input validation — search
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_search_query_too_long(async_client):
    """A 501-character search query must be refused (HTTP 422)."""
    oversized_query = "x" * 501
    response = await async_client.get(
        "/api/v1/search", params={"q": oversized_query}
    )
    assert response.status_code == 422
@pytest.mark.asyncio
async def test_search_query_max_length_ok(async_client):
    """A search query of exactly 500 characters must be accepted (HTTP 200)."""
    longest_allowed_query = "x" * 500
    response = await async_client.get(
        "/api/v1/search", params={"q": longest_allowed_query}
    )
    assert response.status_code == 200
# ---------------------------------------------------------------------------
# Input validation — model selection
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_model_id_too_long(async_client):
    """A 300-character model_id must be refused (HTTP 422).

    The original author states the limit as 256 characters. A corpus is
    created first because the model endpoint is scoped to a corpus id.
    """
    create = await async_client.post("/api/v1/corpora", json={
        "slug": "model-test", "title": "T", "profile_id": "medieval-illuminated",
    })
    # Fail loudly if the setup step breaks, rather than with a KeyError on "id".
    assert create.status_code == 201, "corpus creation precondition failed"
    cid = create.json()["id"]

    resp = await async_client.put(f"/api/v1/corpora/{cid}/model", json={
        "model_id": "x" * 300,
        "provider_type": "google_ai_studio",
    })
    assert resp.status_code == 422
# ---------------------------------------------------------------------------
# Input validation — corrections
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_corrections_restore_negative_version(async_client):
    """A restore_to_version below 1 (here 0) must be refused (HTTP 422)."""
    payload = {"restore_to_version": 0}
    response = await async_client.post(
        "/api/v1/pages/fake-page/corrections", json=payload
    )
    assert response.status_code == 422
# ---------------------------------------------------------------------------
# Path traversal — job_runner (image_master_path)
# ---------------------------------------------------------------------------
# Timezone-aware UTC timestamp captured once at import time and reused for
# every created_at / updated_at value in the job_runner fixtures below.
_NOW = datetime.now(tz=timezone.utc)
@pytest_asyncio.fixture
async def _sec_db():
    """Yield a session on a fresh in-memory SQLite database.

    Every table registered on ``Base.metadata`` is created before the session
    is handed to the test and dropped again afterwards; the engine is
    disposed last.
    """
    db_engine = create_async_engine("sqlite+aiosqlite:///:memory:", echo=False)

    # Schema setup.
    async with db_engine.begin() as connection:
        await connection.run_sync(Base.metadata.create_all)

    session_factory = async_sessionmaker(db_engine, expire_on_commit=False)
    async with session_factory() as db:
        yield db

    # Teardown: drop the schema, then release the engine.
    async with db_engine.begin() as connection:
        await connection.run_sync(Base.metadata.drop_all)
    await db_engine.dispose()
async def _create_traversal_setup(_sec_db, image_path: str) -> dict:
    """Persist a corpus → manuscript → page → model config → job chain.

    Each row is added and committed on its own, in dependency order. The page
    is stored with ``image_master_path`` set to *image_path* so that the job
    runner can be exercised against a hostile path.

    Args:
        _sec_db: Database session used for every insert.
        image_path: Value written to the page's ``image_master_path``.

    Returns:
        A dict with the created ``"job"`` and ``"page"`` model instances.
    """

    async def _persist(entity):
        # One add + commit per entity, matching the one-row-per-commit sequence.
        _sec_db.add(entity)
        await _sec_db.commit()
        return entity

    corpus_row = await _persist(
        CorpusModel(
            id=str(uuid.uuid4()),
            slug="sec-test",
            title="Sec",
            profile_id="medieval-illuminated",
            created_at=_NOW,
            updated_at=_NOW,
        )
    )
    manuscript_row = await _persist(
        ManuscriptModel(
            id=str(uuid.uuid4()),
            corpus_id=corpus_row.id,
            title="MS",
            total_pages=1,
        )
    )
    page_row = await _persist(
        PageModel(
            id=str(uuid.uuid4()),
            manuscript_id=manuscript_row.id,
            folio_label="f001r",
            sequence=1,
            image_master_path=image_path,
            processing_status="INGESTED",
        )
    )
    await _persist(
        ModelConfigDB(
            corpus_id=corpus_row.id,
            provider_type="google_ai_studio",
            selected_model_id="gemini-2.0-flash",
            selected_model_display_name="Gemini 2.0 Flash",
            updated_at=_NOW,
        )
    )
    job_row = await _persist(
        JobModel(
            id=str(uuid.uuid4()),
            corpus_id=corpus_row.id,
            page_id=page_row.id,
            status="pending",
            created_at=_NOW,
        )
    )
    return {"job": job_row, "page": page_row}
@pytest.mark.asyncio
async def test_path_traversal_dotdot_rejected(_sec_db):
    """A job whose page image path is '../../etc/passwd' must end up failed.

    The job's error message must mention "interdit" or "hors"
    (case-insensitive).
    """
    setup = await _create_traversal_setup(_sec_db, "../../etc/passwd")
    job = setup["job"]

    await _run_job_impl(job.id, _sec_db)
    await _sec_db.refresh(job)

    assert job.status == "failed"
    message = (job.error_message or "").lower()
    assert "interdit" in message or "hors" in message
@pytest.mark.asyncio
async def test_path_traversal_absolute_path_rejected(_sec_db):
    """A job whose page image path is the absolute '/etc/shadow' must fail.

    The job's error message must mention "interdit" or "hors"
    (case-insensitive).
    """
    setup = await _create_traversal_setup(_sec_db, "/etc/shadow")
    job = setup["job"]

    await _run_job_impl(job.id, _sec_db)
    await _sec_db.refresh(job)

    assert job.status == "failed"
    message = (job.error_message or "").lower()
    assert "interdit" in message or "hors" in message
@pytest.mark.asyncio
async def test_path_traversal_symlink_escape_rejected(_sec_db, tmp_path):
    """A path that starts with 'data/' but climbs out via '..' must fail the job.

    NOTE(review): despite the name, no symlink is created and ``tmp_path`` is
    not used; only the job status is checked, not the error message.
    """
    hostile_path = "data/../../../etc/passwd"
    setup = await _create_traversal_setup(_sec_db, hostile_path)
    job = setup["job"]

    await _run_job_impl(job.id, _sec_db)
    await _sec_db.refresh(job)

    assert job.status == "failed"
# ---------------------------------------------------------------------------
# Path traversal — frontend static serving (startswith prefix confusion)
# ---------------------------------------------------------------------------
def test_static_dir_startswith_prefix_confusion():
    """A sibling directory with a confusable name must not pass the prefix check.

    A bare ``str(candidate).startswith(str(static_dir.resolve()))`` accepts
    '/app/static-evil/foo' because '/app/static-evil' starts with
    '/app/static'. Appending '/' to the prefix closes that gap. Per the
    original author's note, this mirrors the fixed check in main.py.

    NOTE(review): the hard-coded '/' assumes POSIX path separators.
    """
    static_root = Path("/app/static")
    prefix = str(static_root.resolve()) + "/"

    # Sibling directory whose name merely begins with "static".
    sibling = str(Path("/app/static-evil/foo.txt").resolve())
    # Genuine descendant of the static root.
    descendant = str(Path("/app/static/index.html").resolve())

    assert not sibling.startswith(prefix), (
        "Path /app/static-evil/foo.txt should NOT be treated as under /app/static/"
    )
    assert descendant.startswith(prefix), (
        "Path /app/static/index.html SHOULD be treated as under /app/static/"
    )