"""Tests for progress event emission and the SSE/job endpoints.""" from __future__ import annotations import json from pathlib import Path import pytest from fastapi.testclient import TestClient from app.jobs import Job, JobResult, JobRegistry from app.main import app REPO_ROOT = Path(__file__).resolve().parents[1] VIDEO = REPO_ROOT / "smaller.m4v" TRANSCRIPT = REPO_ROOT / "M14_L3_S3.srt" requires_media = pytest.mark.skipif( not (VIDEO.exists() and TRANSCRIPT.exists()), reason="sample media not present", ) def test_run_pipeline_emits_progress_events_in_order(tmp_path): """The orchestrator should emit progress events covering each stage.""" pytest.importorskip("cv2") from app.pipeline.orchestrator import ( PipelineInputs, ProgressEvent, run_pipeline, PipelineError, ) if not VIDEO.exists() or not TRANSCRIPT.exists(): pytest.skip("media not present") events: list[ProgressEvent] = [] inputs = PipelineInputs( video_path=VIDEO, transcript_path=TRANSCRIPT, frames_dir=tmp_path / "static", title="Progress Test", max_frames=2, skip_ocr=True, ) try: run_pipeline(inputs, progress=events.append) except PipelineError: pytest.skip("pipeline could not produce segments on this clip") # Must start before 100% and end at 100% with stage 'done'. assert events, "no progress events" assert events[0].percent < 100 assert events[-1].stage == "done" assert events[-1].percent == 100 # Percent should be monotonically non-decreasing. pcts = [e.percent for e in events] assert pcts == sorted(pcts), f"percents not monotonically increasing: {pcts}" # Each expected stage shows up at least once. stages = {e.stage for e in events} assert {"scene_detect", "filter", "transcript", "ocr", "render", "done"} <= stages @requires_media def test_jobs_endpoint_accepts_blank_max_frames(tmp_path): """Regression: an empty max_frames input must not 422 on int coercion.""" client = TestClient(app) with VIDEO.open("rb") as v, TRANSCRIPT.open("rb") as t: resp = client.post( "/jobs", data={ "title": "Blank Max", "format": "single", "max_frames": "", # the bug: form sent "" when user left it blank "skip_ocr": "true", }, files={ "video": (VIDEO.name, v, "video/mp4"), "transcript": (TRANSCRIPT.name, t, "application/x-subrip"), }, ) assert resp.status_code == 202, resp.text def test_jobs_endpoint_rejects_unknown_video_type(tmp_path): client = TestClient(app) bogus = tmp_path / "fake.txt" bogus.write_text("not a video") srt = tmp_path / "t.srt" srt.write_text("1\n00:00:00,000 --> 00:00:01,000\nhello\n") with bogus.open("rb") as v, srt.open("rb") as t: resp = client.post( "/jobs", data={"title": "x"}, files={ "video": ("fake.txt", v, "text/plain"), "transcript": ("t.srt", t, "application/x-subrip"), }, ) assert resp.status_code == 400 def test_jobs_endpoint_rejects_unknown_format(tmp_path): client = TestClient(app) mp4 = tmp_path / "x.mp4" mp4.write_bytes(b"fake") srt = tmp_path / "t.srt" srt.write_text("1\n00:00:00,000 --> 00:00:01,000\nhello\n") with mp4.open("rb") as v, srt.open("rb") as t: resp = client.post( "/jobs", data={"title": "x", "format": "evil-format"}, files={ "video": ("x.mp4", v, "video/mp4"), "transcript": ("t.srt", t, "application/x-subrip"), }, ) assert resp.status_code == 400 assert "format" in resp.text.lower() def test_jobs_endpoint_cleans_workdir_on_save_failure(tmp_path): # Triggering a 400 after the workdir is mkdtemp'd: a malformed # max_frames raises during _parse_optional_int. The handler's # try/except must remove the freshly-created vgm_ directory. import glob import os import tempfile as _t client = TestClient(app) before = set(glob.glob(os.path.join(_t.gettempdir(), "vgm_*"))) mp4 = tmp_path / "x.mp4" mp4.write_bytes(b"fake") srt = tmp_path / "t.srt" srt.write_text("1\n00:00:00,000 --> 00:00:01,000\nhello\n") with mp4.open("rb") as v, srt.open("rb") as t: resp = client.post( "/jobs", data={"title": "x", "max_frames": "not-an-int"}, files={ "video": ("x.mp4", v, "video/mp4"), "transcript": ("t.srt", t, "application/x-subrip"), }, ) assert resp.status_code == 400 after = set(glob.glob(os.path.join(_t.gettempdir(), "vgm_*"))) assert after - before == set(), f"leaked workdir(s): {after - before}" def test_events_endpoint_404_for_unknown_job(): client = TestClient(app) resp = client.get("/events/no-such-id", headers={"Accept": "text/event-stream"}) assert resp.status_code == 404 def test_result_endpoint_404_for_unknown_job(): client = TestClient(app) resp = client.get("/result/no-such-id") assert resp.status_code == 404 def test_result_endpoint_review_html_survives_repeat_get(): """Review-mode results have no attachment filename; the upload page loads them into an iframe AND lets the user "Open in new tab", which re-fetches the same URL. The body must remain readable for the job's TTL — not be cleared after the first GET. """ from app.main import registry job = registry.create() try: registry.finish_success( job, JobResult(body=b"review", media_type="text/html; charset=utf-8"), ) client = TestClient(app) first = client.get(f"/result/{job.id}") assert first.status_code == 200 assert first.content == b"review" second = client.get(f"/result/{job.id}") assert second.status_code == 200, second.text assert second.content == b"review" finally: registry.pop(job.id) def test_result_endpoint_attachment_download_is_one_shot(): """Single/zip results are MB-scale attachment downloads that the browser fetches once. Keep the existing free-after-deliver behavior so we don't pin those bytes in memory for the full TTL. """ from app.main import registry job = registry.create() try: registry.finish_success( job, JobResult( body=b"PK\x03\x04zipbytes", media_type="application/zip", filename="guide.zip", ), ) client = TestClient(app) first = client.get(f"/result/{job.id}") assert first.status_code == 200 assert first.content == b"PK\x03\x04zipbytes" second = client.get(f"/result/{job.id}") assert second.status_code == 410 finally: registry.pop(job.id) def test_job_registry_emit_and_finish(): reg = JobRegistry() job = reg.create() reg.emit(job, type="progress", percent=10, stage="scene_detect", message="Detecting scenes...") reg.emit(job, type="progress", percent=50, stage="ocr", message="OCR 1/2") reg.finish_success(job, JobResult(body=b"", media_type="text/html")) seen: list[dict] = [] while True: try: seen.append(job.events.get_nowait()) except Exception: break assert seen[0]["type"] == "progress" assert seen[0]["stage"] == "scene_detect" assert seen[-1]["type"] == "done" assert job.status == "done" assert job.result.media_type == "text/html" @requires_media def test_full_sse_job_lifecycle(tmp_path): """End-to-end: POST /jobs, stream events, fetch result.""" client = TestClient(app) with VIDEO.open("rb") as v, TRANSCRIPT.open("rb") as t: resp = client.post( "/jobs", data={ "title": "SSE Test", "format": "single", "skip_ocr": "true", "max_frames": "2", }, files={ "video": (VIDEO.name, v, "video/mp4"), "transcript": (TRANSCRIPT.name, t, "application/x-subrip"), }, ) assert resp.status_code == 202, resp.text payload = resp.json() job_id = payload["job_id"] events_url = payload["events_url"] result_url = payload["result_url"] # Stream events synchronously; collect until 'done' or 'error'. events: list[dict] = [] with client.stream("GET", events_url) as es: for raw in es.iter_lines(): if not raw or not raw.startswith("data: "): continue data = json.loads(raw[len("data: "):]) events.append(data) if data.get("type") in ("done", "error"): break assert events[-1]["type"] == "done", events[-1] # We should have seen at least one progress event before done. assert any(e["type"] == "progress" for e in events) # Pick up the result. resp = client.get(result_url) assert resp.status_code == 200 assert resp.headers["content-type"].startswith("text/html") assert b"SSE Test" in resp.content def test_review_readstate_serializes_audio_filename(): """The review-mode editor's readState() builds the JSON the ZIP endpoint renders from. The audio element carries data-audio-filename, but if readState() doesn't pick it up, page.segments[*].audio_filename is empty and the bundled ZIP silently drops all per-segment audio even though the job has the bytes in memory. """ review_template = REPO_ROOT / "app" / "templates" / "review.html" src = review_template.read_text() # The audio element exposes the filename to the editor. assert 'data-audio-filename="{{ seg.audio_filename }}"' in src # readState() must read it… assert "data-audio-filename" in src.split("function readState")[1].split("function ")[0] # …and emit it on the per-segment object so PageMetadata.segments[*] # carries audio_filename through to the ZIP renderer. readstate_body = src.split("function readState")[1].split("function ")[0] assert "audio_filename:" in readstate_body def test_zip_endpoint_rejects_path_traversal_filenames(tmp_path): """A crafted state.segments[*].{image,audio}_filename must not pull arbitrary readable files into the bundle. The endpoint should drop any filename that doesn't match the pipeline's own naming patterns so `frames_dir / filename` can never resolve outside the temp dir. """ import io import zipfile from app.main import registry # Plant a sentinel file outside the temp `static/` workdir that a # traversal payload would otherwise read. secret = tmp_path / "outside_secret.txt" secret.write_text("sentinel-secret-bytes") job = registry.create() try: client = TestClient(app) # No `primary_image_data_uris` entries are written for these # bogus filenames (they fail the upload-time regex), so the only # way the bundler could include them is via the unvalidated # `frames_dir / s.image_filename` join. # Two `..` segments to climb out of both `static/` and the # `vgm_zip_*` workdir mkdtemp picks under TMPDIR. traversal_image = f"../../{secret.name}" traversal_audio = f"../../{secret.name}" body = { "state": { "title": "Traversal Test", "frames_dir": "static", "segments": [ { "index": 0, "start_seconds": 0.0, "end_seconds": 1.0, "image_filename": traversal_image, "section_title": "S", "alt_text": "", "audio_filename": traversal_audio, } ], }, "primary_image_data_uris": {}, } # We need `frames_dir` in the handler to live under `tmp_path` # so the `..` payload would actually resolve to our sentinel. # The handler uses `tempfile.mkdtemp(prefix="vgm_zip_")`, which # honors $TMPDIR — point it at tmp_path for this test. import os old_tmpdir = os.environ.get("TMPDIR") os.environ["TMPDIR"] = str(tmp_path) try: import tempfile as _tempfile _tempfile.tempdir = str(tmp_path) resp = client.post(f"/jobs/{job.id}/zip", json=body) finally: _tempfile.tempdir = None if old_tmpdir is None: os.environ.pop("TMPDIR", None) else: os.environ["TMPDIR"] = old_tmpdir assert resp.status_code == 200, resp.text with zipfile.ZipFile(io.BytesIO(resp.content)) as zf: names = zf.namelist() # The sentinel file's basename must not appear anywhere in the bundle. assert not any(secret.name in n for n in names), names finally: registry.pop(job.id)