"""Tests for progress event emission and the SSE/job endpoints."""
from __future__ import annotations
import json
from pathlib import Path
import pytest
from fastapi.testclient import TestClient
from app.jobs import Job, JobResult, JobRegistry
from app.main import app
REPO_ROOT = Path(__file__).resolve().parents[1]
VIDEO = REPO_ROOT / "smaller.m4v"
TRANSCRIPT = REPO_ROOT / "M14_L3_S3.srt"
requires_media = pytest.mark.skipif(
not (VIDEO.exists() and TRANSCRIPT.exists()),
reason="sample media not present",
)
def test_run_pipeline_emits_progress_events_in_order(tmp_path):
    """The orchestrator should emit progress events covering each stage."""
    pytest.importorskip("cv2")
    from app.pipeline.orchestrator import (
        PipelineError,
        PipelineInputs,
        ProgressEvent,
        run_pipeline,
    )

    if not (VIDEO.exists() and TRANSCRIPT.exists()):
        pytest.skip("media not present")

    collected: list[ProgressEvent] = []
    try:
        run_pipeline(
            PipelineInputs(
                video_path=VIDEO,
                transcript_path=TRANSCRIPT,
                frames_dir=tmp_path / "static",
                title="Progress Test",
                max_frames=2,
                skip_ocr=True,
            ),
            progress=collected.append,
        )
    except PipelineError:
        pytest.skip("pipeline could not produce segments on this clip")

    assert collected, "no progress events"
    # Must start before 100% and end at 100% with stage 'done'.
    assert collected[0].percent < 100
    assert collected[-1].stage == "done"
    assert collected[-1].percent == 100
    # Percent should be monotonically non-decreasing.
    pcts = [event.percent for event in collected]
    assert pcts == sorted(pcts), f"percents not monotonically increasing: {pcts}"
    # Each expected stage shows up at least once.
    assert {"scene_detect", "filter", "transcript", "ocr", "render", "done"} <= {
        event.stage for event in collected
    }
@requires_media
def test_jobs_endpoint_accepts_blank_max_frames(tmp_path):
    """Regression: an empty max_frames input must not 422 on int coercion."""
    client = TestClient(app)
    form = {
        "title": "Blank Max",
        "format": "single",
        "max_frames": "",  # the bug: form sent "" when user left it blank
        "skip_ocr": "true",
    }
    with VIDEO.open("rb") as v, TRANSCRIPT.open("rb") as t:
        uploads = {
            "video": (VIDEO.name, v, "video/mp4"),
            "transcript": (TRANSCRIPT.name, t, "application/x-subrip"),
        }
        resp = client.post("/jobs", data=form, files=uploads)
    assert resp.status_code == 202, resp.text
def test_jobs_endpoint_rejects_unknown_video_type(tmp_path):
    """An upload with a non-video content type must be refused with 400."""
    client = TestClient(app)
    not_a_video = tmp_path / "fake.txt"
    not_a_video.write_text("not a video")
    subtitle = tmp_path / "t.srt"
    subtitle.write_text("1\n00:00:00,000 --> 00:00:01,000\nhello\n")
    with not_a_video.open("rb") as v, subtitle.open("rb") as t:
        resp = client.post(
            "/jobs",
            data={"title": "x"},
            files={
                "video": ("fake.txt", v, "text/plain"),
                "transcript": ("t.srt", t, "application/x-subrip"),
            },
        )
    assert resp.status_code == 400
def test_jobs_endpoint_rejects_unknown_format(tmp_path):
    """An unrecognized output format must 400 and mention 'format'."""
    client = TestClient(app)
    video_file = tmp_path / "x.mp4"
    video_file.write_bytes(b"fake")
    subtitle = tmp_path / "t.srt"
    subtitle.write_text("1\n00:00:00,000 --> 00:00:01,000\nhello\n")
    with video_file.open("rb") as v, subtitle.open("rb") as t:
        resp = client.post(
            "/jobs",
            data={"title": "x", "format": "evil-format"},
            files={
                "video": ("x.mp4", v, "video/mp4"),
                "transcript": ("t.srt", t, "application/x-subrip"),
            },
        )
    assert resp.status_code == 400
    # The error body should tell the user which field was bad.
    assert "format" in resp.text.lower()
def test_jobs_endpoint_cleans_workdir_on_save_failure(tmp_path):
    """Triggering a 400 after the workdir is mkdtemp'd: a malformed
    max_frames raises during _parse_optional_int. The handler's
    try/except must remove the freshly-created vgm_ directory.
    """
    import glob
    import os
    import tempfile as _t

    def vgm_workdirs() -> set[str]:
        # Snapshot every vgm_* directory currently under the temp root.
        return set(glob.glob(os.path.join(_t.gettempdir(), "vgm_*")))

    client = TestClient(app)
    before = vgm_workdirs()
    video_file = tmp_path / "x.mp4"
    video_file.write_bytes(b"fake")
    subtitle = tmp_path / "t.srt"
    subtitle.write_text("1\n00:00:00,000 --> 00:00:01,000\nhello\n")
    with video_file.open("rb") as v, subtitle.open("rb") as t:
        resp = client.post(
            "/jobs",
            data={"title": "x", "max_frames": "not-an-int"},
            files={
                "video": ("x.mp4", v, "video/mp4"),
                "transcript": ("t.srt", t, "application/x-subrip"),
            },
        )
    assert resp.status_code == 400
    leaked = vgm_workdirs() - before
    assert leaked == set(), f"leaked workdir(s): {leaked}"
def test_events_endpoint_404_for_unknown_job():
    """Streaming events for a nonexistent job id must return 404."""
    resp = TestClient(app).get(
        "/events/no-such-id", headers={"Accept": "text/event-stream"}
    )
    assert resp.status_code == 404
def test_result_endpoint_404_for_unknown_job():
    """Fetching the result of a nonexistent job id must return 404."""
    resp = TestClient(app).get("/result/no-such-id")
    assert resp.status_code == 404
def test_result_endpoint_review_html_survives_repeat_get():
    """Review-mode results have no attachment filename; the upload page
    loads them into an iframe AND lets the user "Open in new tab",
    which re-fetches the same URL. The body must remain readable for
    the job's TTL — not be cleared after the first GET.
    """
    from app.main import registry

    job = registry.create()
    try:
        html_body = b"<html>review</html>"
        registry.finish_success(
            job,
            JobResult(body=html_body, media_type="text/html; charset=utf-8"),
        )
        client = TestClient(app)
        # Both the initial fetch and the re-fetch must deliver the body.
        for _ in range(2):
            resp = client.get(f"/result/{job.id}")
            assert resp.status_code == 200, resp.text
            assert resp.content == html_body
    finally:
        registry.pop(job.id)
def test_result_endpoint_attachment_download_is_one_shot():
    """Single/zip results are MB-scale attachment downloads that the
    browser fetches once. Keep the existing free-after-deliver behavior
    so we don't pin those bytes in memory for the full TTL.
    """
    from app.main import registry

    job = registry.create()
    try:
        zip_bytes = b"PK\x03\x04zipbytes"
        registry.finish_success(
            job,
            JobResult(
                body=zip_bytes,
                media_type="application/zip",
                filename="guide.zip",
            ),
        )
        client = TestClient(app)
        # First GET delivers the attachment bytes…
        first = client.get(f"/result/{job.id}")
        assert first.status_code == 200
        assert first.content == zip_bytes
        # …after which the body is released and the endpoint reports Gone.
        assert client.get(f"/result/{job.id}").status_code == 410
    finally:
        registry.pop(job.id)
def test_job_registry_emit_and_finish():
    """JobRegistry.emit queues events in order; finish_success appends a
    terminal 'done' event, marks the job done, and stores the result.
    """
    import asyncio
    import queue

    reg = JobRegistry()
    job = reg.create()
    reg.emit(job, type="progress", percent=10, stage="scene_detect", message="Detecting scenes...")
    reg.emit(job, type="progress", percent=50, stage="ocr", message="OCR 1/2")
    reg.finish_success(job, JobResult(body=b"<html></html>", media_type="text/html"))
    # Drain the event queue non-blockingly. Catch only the queue-empty
    # exceptions (sync and asyncio variants, whichever queue type
    # job.events is) rather than a bare `except Exception`, which would
    # also swallow real bugs — e.g. a renamed `events` attribute would
    # silently yield an empty `seen` instead of failing loudly here.
    seen: list[dict] = []
    while True:
        try:
            seen.append(job.events.get_nowait())
        except (queue.Empty, asyncio.QueueEmpty):
            break
    assert seen[0]["type"] == "progress"
    assert seen[0]["stage"] == "scene_detect"
    assert seen[-1]["type"] == "done"
    assert job.status == "done"
    assert job.result.media_type == "text/html"
@requires_media
def test_full_sse_job_lifecycle(tmp_path):
    """End-to-end: POST /jobs, stream events, fetch result."""
    client = TestClient(app)
    with VIDEO.open("rb") as v, TRANSCRIPT.open("rb") as t:
        resp = client.post(
            "/jobs",
            data={
                "title": "SSE Test",
                "format": "single",
                "skip_ocr": "true",
                "max_frames": "2",
            },
            files={
                "video": (VIDEO.name, v, "video/mp4"),
                "transcript": (TRANSCRIPT.name, t, "application/x-subrip"),
            },
        )
    # 202 Accepted: the job was queued, not completed inline.
    assert resp.status_code == 202, resp.text
    payload = resp.json()
    # job_id is not used below, but the subscript asserts the key exists.
    job_id = payload["job_id"]
    events_url = payload["events_url"]
    result_url = payload["result_url"]
    # Stream events synchronously; collect until 'done' or 'error'.
    events: list[dict] = []
    with client.stream("GET", events_url) as es:
        for raw in es.iter_lines():
            # SSE frames the payload as "data: <json>"; skip keep-alives
            # and any non-data lines.
            if not raw or not raw.startswith("data: "):
                continue
            data = json.loads(raw[len("data: "):])
            events.append(data)
            # Stop at the terminal event so the stream doesn't block
            # waiting for more output.
            if data.get("type") in ("done", "error"):
                break
    assert events[-1]["type"] == "done", events[-1]
    # We should have seen at least one progress event before done.
    assert any(e["type"] == "progress" for e in events)
    # Pick up the result.
    resp = client.get(result_url)
    assert resp.status_code == 200
    assert resp.headers["content-type"].startswith("text/html")
    # The rendered page carries the title submitted with the job.
    assert b"<title>SSE Test</title>" in resp.content
def test_review_readstate_serializes_audio_filename():
    """The review-mode editor's readState() builds the JSON the ZIP
    endpoint renders from. The audio element carries data-audio-filename,
    but if readState() doesn't pick it up, page.segments[*].audio_filename
    is empty and the bundled ZIP silently drops all per-segment audio
    even though the job has the bytes in memory.
    """
    review_template = REPO_ROOT / "app" / "templates" / "review.html"
    src = review_template.read_text()
    # The audio element exposes the filename to the editor.
    assert 'data-audio-filename="{{ seg.audio_filename }}"' in src
    # Guard first so a missing function fails with a clear message
    # instead of an IndexError from the split below.
    assert "function readState" in src, "review.html has no readState()"
    # Slice out just the readState() body (everything up to the next
    # `function ` declaration) — computed once, used by both checks.
    readstate_body = src.split("function readState")[1].split("function ")[0]
    # readState() must read the attribute…
    assert "data-audio-filename" in readstate_body
    # …and emit it on the per-segment object so PageMetadata.segments[*]
    # carries audio_filename through to the ZIP renderer.
    assert "audio_filename:" in readstate_body
def test_zip_endpoint_rejects_path_traversal_filenames(tmp_path):
    """A crafted state.segments[*].{image,audio}_filename must not pull
    arbitrary readable files into the bundle. The endpoint should drop
    any filename that doesn't match the pipeline's own naming patterns
    so `frames_dir / filename` can never resolve outside the temp dir.
    """
    import io
    import os
    import tempfile
    import zipfile

    from app.main import registry

    # Plant a sentinel file outside the temp `static/` workdir that a
    # traversal payload would otherwise read.
    secret = tmp_path / "outside_secret.txt"
    secret.write_text("sentinel-secret-bytes")
    job = registry.create()
    try:
        client = TestClient(app)
        # No `primary_image_data_uris` entries are written for these
        # bogus filenames (they fail the upload-time regex), so the only
        # way the bundler could include them is via the unvalidated
        # `frames_dir / s.image_filename` join.
        # Two `..` segments to climb out of both `static/` and the
        # `vgm_zip_*` workdir mkdtemp picks under TMPDIR.
        traversal_image = f"../../{secret.name}"
        traversal_audio = f"../../{secret.name}"
        body = {
            "state": {
                "title": "Traversal Test",
                "frames_dir": "static",
                "segments": [
                    {
                        "index": 0,
                        "start_seconds": 0.0,
                        "end_seconds": 1.0,
                        "image_filename": traversal_image,
                        "section_title": "S",
                        "alt_text": "",
                        "audio_filename": traversal_audio,
                    }
                ],
            },
            "primary_image_data_uris": {},
        }
        # We need `frames_dir` in the handler to live under `tmp_path`
        # so the `..` payload would actually resolve to our sentinel.
        # The handler uses `tempfile.mkdtemp(prefix="vgm_zip_")`, which
        # honors $TMPDIR — point it at tmp_path for this test. Save BOTH
        # the env var and tempfile's cached `tempdir` so the finally
        # block restores the prior state exactly instead of clobbering
        # the cache to None (the import is hoisted above the try so the
        # finally can never hit a NameError on `tempfile`).
        old_tmpdir_env = os.environ.get("TMPDIR")
        old_tempdir_cache = tempfile.tempdir
        os.environ["TMPDIR"] = str(tmp_path)
        tempfile.tempdir = str(tmp_path)
        try:
            resp = client.post(f"/jobs/{job.id}/zip", json=body)
        finally:
            tempfile.tempdir = old_tempdir_cache
            if old_tmpdir_env is None:
                os.environ.pop("TMPDIR", None)
            else:
                os.environ["TMPDIR"] = old_tmpdir_env
        assert resp.status_code == 200, resp.text
        with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
            names = zf.namelist()
        # The sentinel file's basename must not appear anywhere in the bundle.
        assert not any(secret.name in n for n in names), names
    finally:
        registry.pop(job.id)