File size: 7,904 Bytes
0d7db8e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a2ca0e0
0d7db8e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a2ca0e0
a71301e
 
 
0d7db8e
a71301e
 
 
 
 
a2ca0e0
a71301e
0d7db8e
 
 
 
 
 
 
 
 
 
 
a71301e
 
 
0d7db8e
 
 
 
 
 
 
 
 
a2ca0e0
a71301e
 
0d7db8e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a71301e
 
 
a2ca0e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a71301e
 
a2ca0e0
 
0d7db8e
a2ca0e0
0d7db8e
a2ca0e0
0d7db8e
 
 
 
 
 
a2ca0e0
0d7db8e
 
a2ca0e0
0d7db8e
 
 
 
 
a2ca0e0
 
 
 
 
 
 
 
 
 
 
 
 
0d7db8e
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
"""SQLite-backed append-only ledger — the persistent backend for long-running scenarios.

Replaces the in-memory Ledger for production use.  The in-memory Ledger remains
valid for tests and short demo runs where persistence is not required.

Design decisions:
  - UNIQUE constraint on id enforces idempotency at the DB layer.
  - The AUTOINCREMENT `offset` column gives a deterministic append order even if
    clock skew produces non-monotonic created_at timestamps.
  - snapshot_to() uses SQLite's online .backup() API — a consistent point-in-time copy.
  - Opening a file-backed ledger (the constructor or from_file()) rehydrates the
    in-memory cache from disk, so `events` is served from memory; tail(),
    events_for_run(), runs() and latest_offset() query the database directly.
  - reset() is deliberately destructive: it deletes every stored event (all runs,
    not only the current one).  Multi-run persistence (keeping history across
    resets) is a Phase 3 milestone.
"""

from __future__ import annotations

import json
import sqlite3
import threading
from datetime import datetime, timezone
from pathlib import Path

from src.core.events import Event
from src.core.ledger import Ledger


class SQLiteLedger(Ledger):
    """Persistent append-only ledger backed by SQLite.

    Drop-in replacement for the in-memory Ledger.  The same API, the same
    idempotency guarantee, plus durable storage and snapshot/restore.

    The connection is opened with ``check_same_thread=False`` so one instance
    can be shared between threads.  Every database access goes through an
    internal lock: a SQLite connection has a single transaction, so unguarded
    INSERT/COMMIT/ROLLBACK calls from two threads could commit or roll back each
    other's work and let the in-memory cache drift from the on-disk order.
    """

    def __init__(self, path: str | Path = ":memory:") -> None:
        self._path = str(path)
        self._conn = sqlite3.connect(self._path, check_same_thread=False)
        # Serializes all use of the shared connection (see class docstring).
        self._lock = threading.Lock()
        self._conn.execute("PRAGMA journal_mode=WAL")
        self._conn.execute("PRAGMA synchronous=NORMAL")
        self._create_schema()
        self._cache: list[Event] = []
        self._seen_ids: set[str] = set()
        if self._path != ":memory:":
            self._load_cache()

    # ── schema ────────────────────────────────────────────────────────────────

    def _create_schema(self) -> None:
        """Create the events table and its indexes if they do not exist yet."""
        self._conn.executescript("""
            CREATE TABLE IF NOT EXISTS events (
                offset          INTEGER PRIMARY KEY AUTOINCREMENT,
                id              TEXT    UNIQUE NOT NULL,
                run_id          TEXT    NOT NULL,
                turn            INTEGER NOT NULL,
                kind            TEXT    NOT NULL,
                actor           TEXT    NOT NULL,
                payload         TEXT    NOT NULL,
                created_at      TEXT    NOT NULL,
                schema_version  INTEGER NOT NULL DEFAULT 1,
                session_id      TEXT,
                model_profile   TEXT,
                model_id        TEXT
            );
            -- Composite (run_id, offset) serves the hottest read (events_for_run
            -- ordered by offset) from one index, instead of a run_id seek + sort.
            CREATE INDEX IF NOT EXISTS idx_run_offset ON events(run_id, offset);
            CREATE INDEX IF NOT EXISTS idx_kind       ON events(kind);
            CREATE INDEX IF NOT EXISTS idx_actor      ON events(actor);
            CREATE INDEX IF NOT EXISTS idx_session_id ON events(session_id);
            CREATE INDEX IF NOT EXISTS idx_model_id   ON events(model_id);
        """)
        self._conn.commit()

    def _fetchall(self, sql: str, params: tuple = ()) -> list[tuple]:
        """Run a read query under the connection lock and return all rows."""
        with self._lock:
            return self._conn.execute(sql, params).fetchall()

    # ── Ledger API ────────────────────────────────────────────────────────────

    def append(self, event: Event) -> Event:
        """Persist *event* exactly once and return it.

        Appending an id that is already stored is a no-op (idempotent); the
        same *event* object is returned in every case.

        Raises:
            sqlite3.IntegrityError: if the row violates a constraint other than
                the unique ``id`` (e.g. a ``NOT NULL`` column given ``None``).
                Such events used to be dropped silently.
        """
        with self._lock:
            if event.id in self._seen_ids:
                return event
            try:
                self._conn.execute(
                    "INSERT INTO events "
                    "(id, run_id, turn, kind, actor, payload, created_at, schema_version, "
                    "session_id, model_profile, model_id) "
                    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                    (
                        event.id,
                        event.run_id,
                        event.turn,
                        event.kind,
                        event.actor,
                        json.dumps(event.payload),
                        event.created_at.isoformat(),
                        event.schema_version,
                        event.session_id,
                        event.model_profile,
                        event.model_id,
                    ),
                )
            except sqlite3.IntegrityError:
                # A failed statement leaves the implicit transaction open (and,
                # in WAL mode, the write lock held).  Every successful append
                # commits immediately, so there is no other pending work to lose.
                self._conn.rollback()
                already_stored = (
                    self._conn.execute("SELECT 1 FROM events WHERE id = ?", (event.id,)).fetchone()
                    is not None
                )
                if not already_stored:
                    raise
                # Duplicate id already on disk (e.g. written by another
                # connection): idempotent no-op.  Remember the id so later
                # duplicates short-circuit; the in-memory cache is not updated.
                self._seen_ids.add(event.id)
                return event
            self._conn.commit()
            self._cache.append(event)
            self._seen_ids.add(event.id)
        return event

    @property
    def events(self) -> tuple[Event, ...]:
        """Snapshot of the cached events in append order."""
        with self._lock:
            return tuple(self._cache)

    def reset(self) -> None:
        """Delete every stored event (all runs, not only the current one).

        AUTOINCREMENT offsets are not reused after the delete, so an offset
        handed out before the reset never refers to a different event.
        """
        with self._lock:
            self._conn.execute("DELETE FROM events")
            self._conn.commit()
            self._cache.clear()
            self._seen_ids.clear()

    # ── persistence helpers ───────────────────────────────────────────────────

    def snapshot_to(self, dest: str | Path) -> None:
        """Copy the whole database to *dest* using SQLite's online backup API.

        Runs under the connection lock so no append is half-applied in the copy.
        *dest* is created if missing; existing content there is replaced.
        """
        target = sqlite3.connect(str(dest))
        try:
            with self._lock:
                self._conn.backup(target)
        finally:
            target.close()

    @classmethod
    def from_file(cls, path: str | Path) -> "SQLiteLedger":
        """Open the database at *path* and rehydrate the in-memory cache.

        Equivalent to ``cls(path)``.  ``sqlite3.connect`` creates the file if it
        does not exist yet, in which case the ledger simply starts empty.
        """
        return cls(path)

    # Column order here must match the indexes used by _row_to_event.
    _SELECT_COLS = (
        "id, run_id, turn, kind, actor, payload, created_at, schema_version, session_id, model_profile, model_id"
    )

    @staticmethod
    def _row_to_event(row: tuple) -> Event:
        """Build an Event from a row selected with ``_SELECT_COLS``.

        Naive ``created_at`` values are treated as UTC.  An unparsable timestamp
        falls back to the current time.  NOTE(review): that fallback makes
        replay of such rows non-deterministic; consider failing loudly instead.
        """
        try:
            created_at = datetime.fromisoformat(row[6])
            if created_at.tzinfo is None:
                created_at = created_at.replace(tzinfo=timezone.utc)
        except (ValueError, TypeError):
            created_at = datetime.now(timezone.utc)
        return Event(
            id=row[0],
            run_id=row[1],
            turn=row[2],
            kind=row[3],  # type: ignore[arg-type]
            actor=row[4],
            payload=json.loads(row[5]),
            created_at=created_at,
            schema_version=row[7],
            session_id=row[8],
            model_profile=row[9],
            model_id=row[10],
        )

    def _load_cache(self) -> None:
        """Fill the in-memory cache and seen-id set from disk, in offset order."""
        rows = self._fetchall(f"SELECT {self._SELECT_COLS} FROM events ORDER BY offset")
        for row in rows:
            event = self._row_to_event(row)
            self._cache.append(event)
            self._seen_ids.add(event.id)

    def tail(self, from_offset: int = 0) -> tuple[Event, ...]:
        """Return events with offset > *from_offset* (for crash-recovery replay)."""
        rows = self._fetchall(
            f"SELECT {self._SELECT_COLS} FROM events WHERE offset > ? ORDER BY offset",
            (from_offset,),
        )
        return tuple(self._row_to_event(row) for row in rows)

    def latest_offset(self) -> int:
        """Return the highest offset currently stored, or 0 if the table is empty."""
        # MAX over an empty table yields a single row holding NULL.
        rows = self._fetchall("SELECT MAX(offset) FROM events")
        return rows[0][0] or 0

    def events_for_run(self, run_id: str) -> tuple[Event, ...]:
        """Return the events of *run_id* in append/offset order (indexed query)."""
        rows = self._fetchall(
            f"SELECT {self._SELECT_COLS} FROM events WHERE run_id = ? ORDER BY offset",
            (run_id,),
        )
        return tuple(self._row_to_event(row) for row in rows)

    def runs(self) -> tuple[str, ...]:
        """Return the distinct run_ids in first-seen order (indexed query)."""
        rows = self._fetchall("SELECT run_id FROM events GROUP BY run_id ORDER BY MIN(offset)")
        return tuple(row[0] for row in rows)

    def close(self) -> None:
        """Close the underlying connection; later database calls will fail."""
        with self._lock:
            self._conn.close()