Spaces:
Running
Running
| """Tests for progress event emission and the SSE/job endpoints.""" | |
| from __future__ import annotations | |
| import json | |
| from pathlib import Path | |
| import pytest | |
| from fastapi.testclient import TestClient | |
| from app.jobs import Job, JobResult, JobRegistry | |
| from app.main import app | |
# Repository root: two levels up from this test file (tests/ -> repo).
REPO_ROOT = Path(__file__).resolve().parent.parent

# Sample clip and matching subtitle file used by the end-to-end tests.
VIDEO = REPO_ROOT / "smaller.m4v"
TRANSCRIPT = REPO_ROOT / "M14_L3_S3.srt"

# Reusable marker: skip a test unless both sample media files are on disk.
requires_media = pytest.mark.skipif(
    not all(sample.exists() for sample in (VIDEO, TRANSCRIPT)),
    reason="sample media not present",
)
@requires_media
def test_run_pipeline_emits_progress_events_in_order(tmp_path):
    """The orchestrator should emit progress events covering each stage.

    Runs the real pipeline on the sample clip (OCR skipped, at most two
    frames) and checks the shape of the progress stream it reports:

    * it starts below 100% and ends with a 100% event whose stage is ``done``;
    * the percentage never goes backwards;
    * every pipeline stage is reported at least once.

    Skipped when OpenCV or the sample media is unavailable, or when the
    pipeline raises ``PipelineError`` on this clip.
    """
    pytest.importorskip("cv2")
    from app.pipeline.orchestrator import (
        PipelineError,
        PipelineInputs,
        ProgressEvent,
        run_pipeline,
    )

    events: list[ProgressEvent] = []
    inputs = PipelineInputs(
        video_path=VIDEO,
        transcript_path=TRANSCRIPT,
        frames_dir=tmp_path / "static",
        title="Progress Test",
        max_frames=2,
        skip_ocr=True,
    )
    try:
        run_pipeline(inputs, progress=events.append)
    except PipelineError:
        pytest.skip("pipeline could not produce segments on this clip")

    # Must start before 100% and end at 100% with stage 'done'.
    assert events, "no progress events"
    assert events[0].percent < 100
    assert events[-1].stage == "done"
    assert events[-1].percent == 100

    # Percent must never go backwards; repeated values are allowed.
    pcts = [e.percent for e in events]
    assert pcts == sorted(pcts), f"percents not monotonically non-decreasing: {pcts}"

    # Each expected stage shows up at least once; name any that never did.
    expected_stages = {"scene_detect", "filter", "transcript", "ocr", "render", "done"}
    missing = expected_stages - {e.stage for e in events}
    assert not missing, f"stages never reported: {sorted(missing)}"
@requires_media
def test_jobs_endpoint_accepts_blank_max_frames(tmp_path):
    """Regression: an empty max_frames input must not 422 on int coercion.

    The upload form sends ``max_frames=""`` when the user leaves the field
    blank; POST /jobs must still accept the job (202). Uses the real sample
    media, so it is skipped rather than erroring when that media is absent.
    """
    client = TestClient(app)
    with VIDEO.open("rb") as v, TRANSCRIPT.open("rb") as t:
        resp = client.post(
            "/jobs",
            data={
                "title": "Blank Max",
                "format": "single",
                "max_frames": "",  # the bug: form sent "" when user left it blank
                "skip_ocr": "true",
            },
            files={
                "video": (VIDEO.name, v, "video/mp4"),
                "transcript": (TRANSCRIPT.name, t, "application/x-subrip"),
            },
        )
    assert resp.status_code == 202, resp.text
def test_jobs_endpoint_rejects_unknown_video_type(tmp_path):
    """POST /jobs answers 400 when the "video" upload is a plain-text file."""
    fake_video = tmp_path / "fake.txt"
    fake_video.write_text("not a video")
    subtitles = tmp_path / "t.srt"
    subtitles.write_text("1\n00:00:00,000 --> 00:00:01,000\nhello\n")

    client = TestClient(app)
    with fake_video.open("rb") as video_fh, subtitles.open("rb") as srt_fh:
        uploads = {
            "video": ("fake.txt", video_fh, "text/plain"),
            "transcript": ("t.srt", srt_fh, "application/x-subrip"),
        }
        response = client.post("/jobs", data={"title": "x"}, files=uploads)

    assert response.status_code == 400
def test_jobs_endpoint_rejects_unknown_format(tmp_path):
    """An unrecognised output "format" value is refused with a 400 that
    mentions the offending field."""
    video_file = tmp_path / "x.mp4"
    video_file.write_bytes(b"fake")
    subtitles = tmp_path / "t.srt"
    subtitles.write_text("1\n00:00:00,000 --> 00:00:01,000\nhello\n")

    client = TestClient(app)
    form = {"title": "x", "format": "evil-format"}
    with video_file.open("rb") as video_fh, subtitles.open("rb") as srt_fh:
        uploads = {
            "video": ("x.mp4", video_fh, "video/mp4"),
            "transcript": ("t.srt", srt_fh, "application/x-subrip"),
        }
        response = client.post("/jobs", data=form, files=uploads)

    assert response.status_code == 400
    assert "format" in response.text.lower()
def test_jobs_endpoint_cleans_workdir_on_save_failure(tmp_path):
    """A 400 raised after the upload workdir exists must not leak it.

    A malformed ``max_frames`` makes the handler fail after it has created
    its ``vgm_*`` temp directory (per the handler's _parse_optional_int
    path); its try/except is expected to remove that directory. The test
    snapshots ``vgm_*`` entries in the system temp dir before and after the
    request and requires that no new ones remain.
    """
    import tempfile

    def _vgm_dirs():
        # String paths so the failure message matches a glob.glob listing.
        return {str(p) for p in Path(tempfile.gettempdir()).glob("vgm_*")}

    client = TestClient(app)
    before = _vgm_dirs()

    video_file = tmp_path / "x.mp4"
    video_file.write_bytes(b"fake")
    subtitles = tmp_path / "t.srt"
    subtitles.write_text("1\n00:00:00,000 --> 00:00:01,000\nhello\n")

    with video_file.open("rb") as video_fh, subtitles.open("rb") as srt_fh:
        uploads = {
            "video": ("x.mp4", video_fh, "video/mp4"),
            "transcript": ("t.srt", srt_fh, "application/x-subrip"),
        }
        response = client.post(
            "/jobs",
            data={"title": "x", "max_frames": "not-an-int"},
            files=uploads,
        )
    assert response.status_code == 400

    leaked = _vgm_dirs() - before
    assert leaked == set(), f"leaked workdir(s): {leaked}"
def test_events_endpoint_404_for_unknown_job():
    """Subscribing to events for a job id that was never created is a 404."""
    response = TestClient(app).get(
        "/events/no-such-id",
        headers={"Accept": "text/event-stream"},
    )
    assert response.status_code == 404
def test_result_endpoint_404_for_unknown_job():
    """Fetching the result of a job id that was never created is a 404."""
    response = TestClient(app).get("/result/no-such-id")
    assert response.status_code == 404
def test_result_endpoint_review_html_survives_repeat_get():
    """Review-mode HTML results can be fetched more than once.

    Review results carry no attachment filename. The upload page shows them
    in an iframe and also offers "Open in new tab", which requests the same
    URL again, so the body must stay readable for the job's TTL rather than
    being cleared after the first GET.
    """
    from app.main import registry

    review_html = b"<html>review</html>"
    job = registry.create()
    try:
        registry.finish_success(
            job,
            JobResult(body=review_html, media_type="text/html; charset=utf-8"),
        )
        client = TestClient(app)
        result_url = f"/result/{job.id}"

        initial = client.get(result_url)
        assert initial.status_code == 200
        assert initial.content == review_html

        # "Open in new tab" re-requests the same URL; it must still be served.
        repeat = client.get(result_url)
        assert repeat.status_code == 200, repeat.text
        assert repeat.content == review_html
    finally:
        registry.pop(job.id)
def test_result_endpoint_attachment_download_is_one_shot():
    """Attachment results (with a filename) are served exactly once.

    Single/zip results are MB-scale downloads the browser fetches once, so
    the registry frees them after delivery instead of pinning the bytes in
    memory for the whole TTL. A second GET therefore gets 410.
    """
    from app.main import registry

    archive = b"PK\x03\x04zipbytes"
    job = registry.create()
    try:
        attachment = JobResult(
            body=archive,
            media_type="application/zip",
            filename="guide.zip",
        )
        registry.finish_success(job, attachment)

        client = TestClient(app)
        result_url = f"/result/{job.id}"

        delivered = client.get(result_url)
        assert delivered.status_code == 200
        assert delivered.content == archive

        # Bytes were released after the first delivery: 410 Gone now.
        assert client.get(result_url).status_code == 410
    finally:
        registry.pop(job.id)
def test_job_registry_emit_and_finish():
    """Progress emits and the terminal event land on the job's queue in order.

    Drives a standalone ``JobRegistry`` directly (no HTTP). After two
    ``emit`` calls and ``finish_success``:

    * the first queued event is the first progress emit;
    * the last queued event has type ``done``;
    * the job is marked done with its result attached.
    """
    import asyncio
    import queue

    reg = JobRegistry()
    job = reg.create()
    reg.emit(job, type="progress", percent=10, stage="scene_detect", message="Detecting scenes...")
    reg.emit(job, type="progress", percent=50, stage="ocr", message="OCR 1/2")
    reg.finish_success(job, JobResult(body=b"<html></html>", media_type="text/html"))

    # Drain everything queued so far. Stop only on "queue is empty": a broad
    # ``except Exception`` would also swallow real errors (e.g. a renamed
    # attribute) and surface later as a confusing IndexError.
    # NOTE(review): job.events is assumed to be a queue.Queue or an
    # asyncio.Queue (both expose get_nowait); confirm against app.jobs.
    seen: list[dict] = []
    while True:
        try:
            seen.append(job.events.get_nowait())
        except (queue.Empty, asyncio.QueueEmpty):
            break

    assert seen, "no events were queued"
    assert seen[0]["type"] == "progress"
    assert seen[0]["stage"] == "scene_detect"
    assert seen[-1]["type"] == "done"
    assert job.status == "done"
    assert job.result.media_type == "text/html"
@requires_media
def test_full_sse_job_lifecycle(tmp_path):
    """End-to-end: POST /jobs, stream events, fetch result.

    Uses the real sample media, so it is skipped (rather than erroring on
    ``VIDEO.open``) when that media is absent.
    """
    client = TestClient(app)
    with VIDEO.open("rb") as v, TRANSCRIPT.open("rb") as t:
        resp = client.post(
            "/jobs",
            data={
                "title": "SSE Test",
                "format": "single",
                "skip_ocr": "true",
                "max_frames": "2",
            },
            files={
                "video": (VIDEO.name, v, "video/mp4"),
                "transcript": (TRANSCRIPT.name, t, "application/x-subrip"),
            },
        )
    assert resp.status_code == 202, resp.text
    payload = resp.json()
    assert payload["job_id"], payload
    events_url = payload["events_url"]
    result_url = payload["result_url"]

    # Stream events synchronously; collect until 'done' or 'error'.
    # Only "data: " lines carry JSON payloads; blank separators and any
    # other SSE lines are skipped.
    events: list[dict] = []
    with client.stream("GET", events_url) as es:
        for raw in es.iter_lines():
            if not raw or not raw.startswith("data: "):
                continue
            data = json.loads(raw[len("data: "):])
            events.append(data)
            if data.get("type") in ("done", "error"):
                break

    # Fail clearly (not with IndexError) if the stream produced no data.
    assert events, "event stream closed without any data events"
    assert events[-1]["type"] == "done", events[-1]
    # We should have seen at least one progress event before done.
    assert any(e["type"] == "progress" for e in events)

    # Pick up the result.
    resp = client.get(result_url)
    assert resp.status_code == 200
    assert resp.headers["content-type"].startswith("text/html")
    assert b"<title>SSE Test</title>" in resp.content
def test_review_readstate_serializes_audio_filename():
    """The review-mode editor's readState() builds the JSON the ZIP
    endpoint renders from. The audio element carries data-audio-filename,
    but if readState() doesn't pick it up, page.segments[*].audio_filename
    is empty and the bundled ZIP silently drops all per-segment audio
    even though the job has the bytes in memory.

    This is a static check of the template source: the attribute must be
    rendered, and readState()'s body must both read it and emit an
    ``audio_filename:`` key.
    """
    review_template = REPO_ROOT / "app" / "templates" / "review.html"
    src = review_template.read_text(encoding="utf-8")

    # The audio element exposes the filename to the editor.
    assert 'data-audio-filename="{{ seg.audio_filename }}"' in src

    # Isolate readState()'s body: the text after its declaration up to the
    # next "function " keyword. Check the marker first so a renamed or
    # removed function fails with a clear message, not an IndexError.
    marker = "function readState"
    assert marker in src, f"{marker!r} not found in {review_template}"
    readstate_body = src.split(marker, 1)[1].split("function ", 1)[0]

    # readState() must read it…
    assert "data-audio-filename" in readstate_body
    # …and emit it on the per-segment object so PageMetadata.segments[*]
    # carries audio_filename through to the ZIP renderer.
    assert "audio_filename:" in readstate_body
def test_zip_endpoint_rejects_path_traversal_filenames(tmp_path):
    """A crafted state.segments[*].{image,audio}_filename must not pull
    arbitrary readable files into the bundle. The endpoint should drop
    any filename that doesn't match the pipeline's own naming patterns
    so `frames_dir / filename` can never resolve outside the temp dir.

    The check covers both member names and member contents. A bundler
    that copied the secret under a sanitized name would still leak it.
    """
    import io
    import os
    import tempfile
    import zipfile

    from app.main import registry

    # Plant a sentinel file outside the temp `static/` workdir that a
    # traversal payload would otherwise read.
    secret_text = "sentinel-secret-bytes"
    secret = tmp_path / "outside_secret.txt"
    secret.write_text(secret_text)

    job = registry.create()
    try:
        client = TestClient(app)
        # No `primary_image_data_uris` entries are written for these
        # bogus filenames (they fail the upload-time regex), so the only
        # way the bundler could include them is via the unvalidated
        # `frames_dir / filename` join.
        # Two `..` segments to climb out of both `static/` and the
        # `vgm_zip_*` workdir mkdtemp picks under the temp dir.
        traversal = f"../../{secret.name}"
        body = {
            "state": {
                "title": "Traversal Test",
                "frames_dir": "static",
                "segments": [
                    {
                        "index": 0,
                        "start_seconds": 0.0,
                        "end_seconds": 1.0,
                        "image_filename": traversal,
                        "section_title": "S",
                        "alt_text": "",
                        "audio_filename": traversal,
                    }
                ],
            },
            "primary_image_data_uris": {},
        }

        # The handler's `frames_dir` must live under `tmp_path` so the `..`
        # payload would actually resolve to our sentinel. mkdtemp() consults
        # `tempfile.tempdir` (falling back to $TMPDIR), so point both at
        # tmp_path. Afterwards restore the *previous* values rather than
        # forcing `tempfile.tempdir = None`, which would discard any temp
        # dir configured before this test ran.
        old_env_tmpdir = os.environ.get("TMPDIR")
        old_tempdir = tempfile.tempdir
        os.environ["TMPDIR"] = str(tmp_path)
        tempfile.tempdir = str(tmp_path)
        try:
            resp = client.post(f"/jobs/{job.id}/zip", json=body)
        finally:
            tempfile.tempdir = old_tempdir
            if old_env_tmpdir is None:
                os.environ.pop("TMPDIR", None)
            else:
                os.environ["TMPDIR"] = old_env_tmpdir

        assert resp.status_code == 200, resp.text
        with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
            names = zf.namelist()
            # The sentinel file's basename must not appear anywhere in the bundle…
            assert not any(secret.name in n for n in names), names
            # …and neither may its bytes, under whatever name they were stored.
            secret_bytes = secret_text.encode()
            leaked = [n for n in names if secret_bytes in zf.read(n)]
            assert not leaked, f"sentinel bytes found in: {leaked}"
    finally:
        registry.pop(job.id)