# Video2Guide — tests/test_progress.py
# From commit e845d2d: "Keep review HTML re-fetchable and serialize
# audio_filename in readState".
"""Tests for progress event emission and the SSE/job endpoints."""
from __future__ import annotations

import json
import queue
from pathlib import Path

import pytest
from fastapi.testclient import TestClient

from app.jobs import Job, JobRegistry, JobResult
from app.main import app
# Repository root: this file lives one level below it (tests/).
REPO_ROOT = Path(__file__).resolve().parents[1]
# Sample media checked into the repo root; may be absent in CI checkouts.
VIDEO = REPO_ROOT / "smaller.m4v"
TRANSCRIPT = REPO_ROOT / "M14_L3_S3.srt"
# Shared skip marker for tests that need both sample files on disk.
requires_media = pytest.mark.skipif(
not (VIDEO.exists() and TRANSCRIPT.exists()),
reason="sample media not present",
)
@requires_media
def test_run_pipeline_emits_progress_events_in_order(tmp_path):
    """The orchestrator should emit progress events covering each stage.

    Runs the real pipeline on the sample clip (OCR skipped, 2 frames max)
    and checks the ProgressEvent stream: starts below 100%, ends at 100%
    with stage 'done', percents never decrease, and every stage appears.
    Uses the module's shared ``requires_media`` marker instead of an
    ad-hoc existence check so skip behavior matches the other tests.
    """
    pytest.importorskip("cv2")
    from app.pipeline.orchestrator import (
        PipelineInputs,
        ProgressEvent,
        run_pipeline,
        PipelineError,
    )
    events: list[ProgressEvent] = []
    inputs = PipelineInputs(
        video_path=VIDEO,
        transcript_path=TRANSCRIPT,
        frames_dir=tmp_path / "static",
        title="Progress Test",
        max_frames=2,  # keep the run fast
        skip_ocr=True,
    )
    try:
        run_pipeline(inputs, progress=events.append)
    except PipelineError:
        pytest.skip("pipeline could not produce segments on this clip")
    # Must start before 100% and end at 100% with stage 'done'.
    assert events, "no progress events"
    assert events[0].percent < 100
    assert events[-1].stage == "done"
    assert events[-1].percent == 100
    # Percent should be monotonically non-decreasing.
    pcts = [e.percent for e in events]
    assert pcts == sorted(pcts), f"percents not monotonically increasing: {pcts}"
    # Each expected stage shows up at least once.
    stages = {e.stage for e in events}
    assert {"scene_detect", "filter", "transcript", "ocr", "render", "done"} <= stages
@requires_media
def test_jobs_endpoint_accepts_blank_max_frames(tmp_path):
    """Regression: an empty max_frames input must not 422 on int coercion."""
    client = TestClient(app)
    form_fields = {
        "title": "Blank Max",
        "format": "single",
        "max_frames": "",  # the bug: form sent "" when user left it blank
        "skip_ocr": "true",
    }
    with VIDEO.open("rb") as video_fh, TRANSCRIPT.open("rb") as srt_fh:
        uploads = {
            "video": (VIDEO.name, video_fh, "video/mp4"),
            "transcript": (TRANSCRIPT.name, srt_fh, "application/x-subrip"),
        }
        resp = client.post("/jobs", data=form_fields, files=uploads)
    assert resp.status_code == 202, resp.text
def test_jobs_endpoint_rejects_unknown_video_type(tmp_path):
    """An upload whose content type is not a video must be rejected with 400."""
    client = TestClient(app)
    fake_video = tmp_path / "fake.txt"
    fake_video.write_text("not a video")
    subtitle = tmp_path / "t.srt"
    subtitle.write_text("1\n00:00:00,000 --> 00:00:01,000\nhello\n")
    with fake_video.open("rb") as video_fh, subtitle.open("rb") as srt_fh:
        response = client.post(
            "/jobs",
            data={"title": "x"},
            files={
                "video": ("fake.txt", video_fh, "text/plain"),
                "transcript": ("t.srt", srt_fh, "application/x-subrip"),
            },
        )
    assert response.status_code == 400
def test_jobs_endpoint_rejects_unknown_format(tmp_path):
    """An unrecognized output format must 400, and the error names the field."""
    client = TestClient(app)
    video_file = tmp_path / "x.mp4"
    video_file.write_bytes(b"fake")
    subtitle = tmp_path / "t.srt"
    subtitle.write_text("1\n00:00:00,000 --> 00:00:01,000\nhello\n")
    with video_file.open("rb") as video_fh, subtitle.open("rb") as srt_fh:
        response = client.post(
            "/jobs",
            data={"title": "x", "format": "evil-format"},
            files={
                "video": ("x.mp4", video_fh, "video/mp4"),
                "transcript": ("t.srt", srt_fh, "application/x-subrip"),
            },
        )
    assert response.status_code == 400
    assert "format" in response.text.lower()
def test_jobs_endpoint_cleans_workdir_on_save_failure(tmp_path):
    """The handler must remove its freshly-created vgm_ workdir on a 400.

    Triggering a 400 after the workdir is mkdtemp'd: a malformed
    max_frames raises during _parse_optional_int. The handler's
    try/except must remove the vgm_ directory it just created.
    """
    import glob
    import os
    import tempfile as _t

    def vgm_workdirs() -> set[str]:
        # Snapshot of vgm_* temp directories currently on disk.
        return set(glob.glob(os.path.join(_t.gettempdir(), "vgm_*")))

    client = TestClient(app)
    existing = vgm_workdirs()
    video_file = tmp_path / "x.mp4"
    video_file.write_bytes(b"fake")
    subtitle = tmp_path / "t.srt"
    subtitle.write_text("1\n00:00:00,000 --> 00:00:01,000\nhello\n")
    with video_file.open("rb") as video_fh, subtitle.open("rb") as srt_fh:
        response = client.post(
            "/jobs",
            data={"title": "x", "max_frames": "not-an-int"},
            files={
                "video": ("x.mp4", video_fh, "video/mp4"),
                "transcript": ("t.srt", srt_fh, "application/x-subrip"),
            },
        )
    assert response.status_code == 400
    leaked = vgm_workdirs() - existing
    assert leaked == set(), f"leaked workdir(s): {leaked}"
def test_events_endpoint_404_for_unknown_job():
    """Streaming events for a job id that was never created returns 404."""
    response = TestClient(app).get(
        "/events/no-such-id", headers={"Accept": "text/event-stream"}
    )
    assert response.status_code == 404
def test_result_endpoint_404_for_unknown_job():
    """Fetching the result of a job id that was never created returns 404."""
    response = TestClient(app).get("/result/no-such-id")
    assert response.status_code == 404
def test_result_endpoint_review_html_survives_repeat_get():
    """Review-mode bodies must stay fetchable for the job's TTL.

    Review results carry no attachment filename; the upload page loads
    them into an iframe AND offers "Open in new tab", which issues a
    second GET for the same URL — so delivering the body once must not
    clear it.
    """
    from app.main import registry
    job = registry.create()
    try:
        html_body = b"<html>review</html>"
        registry.finish_success(
            job,
            JobResult(body=html_body, media_type="text/html; charset=utf-8"),
        )
        client = TestClient(app)
        # Both the initial fetch and the re-fetch must succeed identically.
        for _attempt in range(2):
            response = client.get(f"/result/{job.id}")
            assert response.status_code == 200, response.text
            assert response.content == html_body
    finally:
        registry.pop(job.id)
def test_result_endpoint_attachment_download_is_one_shot():
    """Attachment (single/zip) results are freed after the first GET.

    These are MB-scale downloads the browser fetches exactly once; the
    existing free-after-deliver behavior keeps those bytes from being
    pinned in memory for the full TTL, so a second GET yields 410.
    """
    from app.main import registry
    job = registry.create()
    try:
        zip_bytes = b"PK\x03\x04zipbytes"
        registry.finish_success(
            job,
            JobResult(
                body=zip_bytes,
                media_type="application/zip",
                filename="guide.zip",
            ),
        )
        client = TestClient(app)
        first = client.get(f"/result/{job.id}")
        assert first.status_code == 200
        assert first.content == zip_bytes
        # The body was released on delivery; the job is now "gone".
        second = client.get(f"/result/{job.id}")
        assert second.status_code == 410
    finally:
        registry.pop(job.id)
def test_job_registry_emit_and_finish():
    """JobRegistry.emit queues progress events and finish_success appends a
    terminal 'done' event while recording the result on the job.
    """
    reg = JobRegistry()
    job = reg.create()
    reg.emit(job, type="progress", percent=10, stage="scene_detect", message="Detecting scenes...")
    reg.emit(job, type="progress", percent=50, stage="ocr", message="OCR 1/2")
    reg.finish_success(job, JobResult(body=b"<html></html>", media_type="text/html"))
    # Drain the event queue. Catch only queue.Empty — the prior blanket
    # `except Exception` would also swallow real bugs in get_nowait
    # (e.g. an AttributeError if `events` changed type) and silently
    # yield an empty `seen`, failing later with a confusing IndexError.
    seen: list[dict] = []
    while True:
        try:
            seen.append(job.events.get_nowait())
        except queue.Empty:
            break
    assert seen[0]["type"] == "progress"
    assert seen[0]["stage"] == "scene_detect"
    assert seen[-1]["type"] == "done"
    assert job.status == "done"
    assert job.result.media_type == "text/html"
@requires_media
def test_full_sse_job_lifecycle(tmp_path):
    """End-to-end: POST /jobs, stream events, fetch result."""
    client = TestClient(app)
    with VIDEO.open("rb") as v, TRANSCRIPT.open("rb") as t:
        resp = client.post(
            "/jobs",
            data={
                "title": "SSE Test",
                "format": "single",
                "skip_ocr": "true",
                "max_frames": "2",  # keep the pipeline run short
            },
            files={
                "video": (VIDEO.name, v, "video/mp4"),
                "transcript": (TRANSCRIPT.name, t, "application/x-subrip"),
            },
        )
    assert resp.status_code == 202, resp.text
    payload = resp.json()
    # The 202 payload points at the SSE stream and the eventual result.
    job_id = payload["job_id"]
    events_url = payload["events_url"]
    result_url = payload["result_url"]
    # Stream events synchronously; collect until 'done' or 'error'.
    events: list[dict] = []
    with client.stream("GET", events_url) as es:
        for raw in es.iter_lines():
            # Only `data: {...}` SSE frames carry the JSON payload;
            # blank separator lines are skipped.
            if not raw or not raw.startswith("data: "):
                continue
            data = json.loads(raw[len("data: "):])
            events.append(data)
            if data.get("type") in ("done", "error"):
                break
    assert events[-1]["type"] == "done", events[-1]
    # We should have seen at least one progress event before done.
    assert any(e["type"] == "progress" for e in events)
    # Pick up the result.
    resp = client.get(result_url)
    assert resp.status_code == 200
    assert resp.headers["content-type"].startswith("text/html")
    assert b"<title>SSE Test</title>" in resp.content
def test_review_readstate_serializes_audio_filename():
    """The review-mode editor's readState() builds the JSON the ZIP
    endpoint renders from. The audio element carries data-audio-filename,
    but if readState() doesn't pick it up, page.segments[*].audio_filename
    is empty and the bundled ZIP silently drops all per-segment audio
    even though the job has the bytes in memory.
    """
    review_template = REPO_ROOT / "app" / "templates" / "review.html"
    src = review_template.read_text()
    # The audio element exposes the filename to the editor.
    assert 'data-audio-filename="{{ seg.audio_filename }}"' in src
    # Guard first so a missing readState fails with a clear message
    # instead of a bare IndexError from the split below.
    assert "function readState" in src, "review.html lost its readState()"
    # Extract readState's body once (everything up to the next function),
    # rather than recomputing the split for each assertion.
    readstate_body = src.split("function readState")[1].split("function ")[0]
    # readState() must read the attribute…
    assert "data-audio-filename" in readstate_body
    # …and emit it on the per-segment object so PageMetadata.segments[*]
    # carries audio_filename through to the ZIP renderer.
    assert "audio_filename:" in readstate_body
def test_zip_endpoint_rejects_path_traversal_filenames(tmp_path):
    """A crafted state.segments[*].{image,audio}_filename must not pull
    arbitrary readable files into the bundle. The endpoint should drop
    any filename that doesn't match the pipeline's own naming patterns
    so `frames_dir / filename` can never resolve outside the temp dir.
    """
    # Hoisted out of the try block: the original imported `tempfile`
    # inside the try whose finally referenced it, so an early failure
    # would have raised NameError in cleanup instead of the real error.
    import io
    import os
    import tempfile as _tempfile
    import zipfile
    from app.main import registry
    # Plant a sentinel file outside the temp `static/` workdir that a
    # traversal payload would otherwise read.
    secret = tmp_path / "outside_secret.txt"
    secret.write_text("sentinel-secret-bytes")
    job = registry.create()
    try:
        client = TestClient(app)
        # No `primary_image_data_uris` entries are written for these
        # bogus filenames (they fail the upload-time regex), so the only
        # way the bundler could include them is via the unvalidated
        # `frames_dir / s.image_filename` join.
        # Two `..` segments to climb out of both `static/` and the
        # `vgm_zip_*` workdir mkdtemp picks under TMPDIR.
        traversal_image = f"../../{secret.name}"
        traversal_audio = f"../../{secret.name}"
        body = {
            "state": {
                "title": "Traversal Test",
                "frames_dir": "static",
                "segments": [
                    {
                        "index": 0,
                        "start_seconds": 0.0,
                        "end_seconds": 1.0,
                        "image_filename": traversal_image,
                        "section_title": "S",
                        "alt_text": "",
                        "audio_filename": traversal_audio,
                    }
                ],
            },
            "primary_image_data_uris": {},
        }
        # We need `frames_dir` in the handler to live under `tmp_path`
        # so the `..` payload would actually resolve to our sentinel.
        # The handler uses `tempfile.mkdtemp(prefix="vgm_zip_")`, which
        # honors $TMPDIR — point it at tmp_path for this test.
        old_tmpdir = os.environ.get("TMPDIR")
        # Save the cached module attribute too; restore it exactly rather
        # than hardcoding None (another fixture/test may have set it).
        old_tempdir_attr = _tempfile.tempdir
        os.environ["TMPDIR"] = str(tmp_path)
        try:
            _tempfile.tempdir = str(tmp_path)
            resp = client.post(f"/jobs/{job.id}/zip", json=body)
        finally:
            _tempfile.tempdir = old_tempdir_attr
            if old_tmpdir is None:
                os.environ.pop("TMPDIR", None)
            else:
                os.environ["TMPDIR"] = old_tmpdir
        assert resp.status_code == 200, resp.text
        with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
            names = zf.namelist()
            # The sentinel file's basename must not appear anywhere in the bundle.
            assert not any(secret.name in n for n in names), names
    finally:
        registry.pop(job.id)