File size: 4,035 Bytes
652df4e
 
 
 
bbbfba8
652df4e
 
 
 
bbbfba8
 
 
652df4e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
"""Tests for the SQLite database layer."""

from __future__ import annotations

from typing import TYPE_CHECKING

from src.app.jobs.models import Job, JobStatus
from src.app.persistence.db import Database

if TYPE_CHECKING:
    from pathlib import Path


class TestDatabase:
    def test_connect_creates_tables(self, tmp_storage: Path) -> None:
        db = Database(tmp_storage / "test.db")
        db.connect()
        # Should not raise
        db.conn.execute("SELECT * FROM jobs LIMIT 1")
        db.conn.execute("SELECT * FROM providers LIMIT 1")
        db.close()

    def test_save_and_get_job(self, tmp_storage: Path) -> None:
        db = Database(tmp_storage / "test.db")
        db.connect()

        job = Job(
            job_id="job_001",
            provider_id="paddle",
            provider_family="word_box_json",
            source_filename="test.png",
        )
        db.save_job(job)

        loaded = db.get_job("job_001")
        assert loaded is not None
        assert loaded.job_id == "job_001"
        assert loaded.status == JobStatus.QUEUED
        assert loaded.provider_id == "paddle"
        assert loaded.source_filename == "test.png"
        db.close()

    def test_update_job(self, tmp_storage: Path) -> None:
        db = Database(tmp_storage / "test.db")
        db.connect()

        job = Job(job_id="job_002", provider_id="p", provider_family="f")
        db.save_job(job)

        updated = job.model_copy(update={
            "status": JobStatus.SUCCEEDED,
            "has_alto": True,
        })
        db.save_job(updated)

        loaded = db.get_job("job_002")
        assert loaded is not None
        assert loaded.status == JobStatus.SUCCEEDED
        assert loaded.has_alto is True
        db.close()

    def test_list_jobs(self, tmp_storage: Path) -> None:
        db = Database(tmp_storage / "test.db")
        db.connect()

        for i in range(5):
            db.save_job(Job(job_id=f"job_{i:03d}", provider_id="p", provider_family="f"))

        jobs = db.list_jobs(limit=3)
        assert len(jobs) == 3

        all_jobs = db.list_jobs()
        assert len(all_jobs) == 5
        db.close()

    def test_get_nonexistent_job(self, tmp_storage: Path) -> None:
        db = Database(tmp_storage / "test.db")
        db.connect()
        assert db.get_job("nonexistent") is None
        db.close()

    def test_job_with_warnings(self, tmp_storage: Path) -> None:
        db = Database(tmp_storage / "test.db")
        db.connect()

        job = Job(
            job_id="job_warn",
            provider_id="p",
            provider_family="f",
            warnings=["bbox overflow", "missing confidence"],
        )
        db.save_job(job)

        loaded = db.get_job("job_warn")
        assert loaded is not None
        assert len(loaded.warnings) == 2
        assert "bbox overflow" in loaded.warnings
        db.close()

    def test_job_with_error(self, tmp_storage: Path) -> None:
        db = Database(tmp_storage / "test.db")
        db.connect()

        job = Job(
            job_id="job_err",
            provider_id="p",
            provider_family="f",
            status=JobStatus.FAILED,
            error="Provider timeout",
        )
        db.save_job(job)

        loaded = db.get_job("job_err")
        assert loaded is not None
        assert loaded.status == JobStatus.FAILED
        assert loaded.error == "Provider timeout"
        db.close()

    def test_provider_crud(self, tmp_storage: Path) -> None:
        db = Database(tmp_storage / "test.db")
        db.connect()

        data = {"provider_id": "paddle", "family": "word_box_json"}
        db.save_provider_record("paddle", data)

        loaded = db.get_provider_record("paddle")
        assert loaded is not None
        assert loaded["provider_id"] == "paddle"

        all_providers = db.list_provider_records()
        assert len(all_providers) == 1

        assert db.delete_provider_record("paddle")
        assert db.get_provider_record("paddle") is None
        db.close()