"""SQLite-backed append-only ledger — the persistent backend for long-running scenarios. Replaces the in-memory Ledger for production use. The in-memory Ledger remains valid for tests and short demo runs where persistence is not required. Design decisions: - UNIQUE constraint on id enforces idempotency at the DB layer. - Serial OFFSET column gives a deterministic ordering guarantee even if clock skew produces non-monotonic created_at timestamps. - snapshot_to() uses SQLite's native .backup() API — atomic, zero-copy. - from_file() rehydrates the in-memory cache from disk, so hot reads hit the cache and the DB is only consulted for replay after a crash. - reset() is deliberately destructive: it clears the current run, not all runs. Multi-run persistence (keeping history across resets) is a Phase 3 milestone. """ from __future__ import annotations import json import sqlite3 from datetime import datetime, timezone from pathlib import Path from src.core.events import Event from src.core.ledger import Ledger class SQLiteLedger(Ledger): """Persistent append-only ledger backed by SQLite. Drop-in replacement for the in-memory Ledger. The same API, the same idempotency guarantee, plus durable storage and snapshot/restore. """ def __init__(self, path: str | Path = ":memory:") -> None: self._path = str(path) self._conn = sqlite3.connect(self._path, check_same_thread=False) self._conn.execute("PRAGMA journal_mode=WAL") self._conn.execute("PRAGMA synchronous=NORMAL") self._create_schema() self._cache: list[Event] = [] self._seen_ids: set[str] = set() if self._path != ":memory:": self._load_cache() # ── schema ──────────────────────────────────────────────────────────────── def _create_schema(self) -> None: self._conn.executescript(""" CREATE TABLE IF NOT EXISTS events ( offset INTEGER PRIMARY KEY AUTOINCREMENT, id TEXT UNIQUE NOT NULL, run_id TEXT NOT NULL, turn INTEGER NOT NULL, kind TEXT NOT NULL, actor TEXT NOT NULL, payload TEXT NOT NULL, created_at TEXT NOT NULL, schema_version INTEGER NOT NULL DEFAULT 1, session_id TEXT, model_profile TEXT, model_id TEXT ); -- Composite (run_id, offset) serves the hottest read — events_for_run -- ordered by offset — from one index, instead of a run_id seek + sort. CREATE INDEX IF NOT EXISTS idx_run_offset ON events(run_id, offset); CREATE INDEX IF NOT EXISTS idx_kind ON events(kind); CREATE INDEX IF NOT EXISTS idx_actor ON events(actor); CREATE INDEX IF NOT EXISTS idx_session_id ON events(session_id); CREATE INDEX IF NOT EXISTS idx_model_id ON events(model_id); """) self._conn.commit() # ── Ledger API ──────────────────────────────────────────────────────────── def append(self, event: Event) -> Event: if event.id in self._seen_ids: return event try: self._conn.execute( "INSERT INTO events " "(id, run_id, turn, kind, actor, payload, created_at, schema_version, " "session_id, model_profile, model_id) " "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ( event.id, event.run_id, event.turn, event.kind, event.actor, json.dumps(event.payload), event.created_at.isoformat(), event.schema_version, event.session_id, event.model_profile, event.model_id, ), ) self._conn.commit() self._cache.append(event) self._seen_ids.add(event.id) except sqlite3.IntegrityError: # Duplicate id inserted concurrently — safe to ignore. pass return event @property def events(self) -> tuple[Event, ...]: return tuple(self._cache) def reset(self) -> None: self._conn.execute("DELETE FROM events") self._conn.commit() self._cache.clear() self._seen_ids.clear() # ── persistence helpers ─────────────────────────────────────────────────── def snapshot_to(self, dest: str | Path) -> None: """Copy the database to *dest* atomically using SQLite's backup API.""" backup = sqlite3.connect(str(dest)) try: self._conn.backup(backup) finally: backup.close() @classmethod def from_file(cls, path: str | Path) -> "SQLiteLedger": """Open an existing database and rehydrate the in-memory cache.""" ledger = cls(path) return ledger _SELECT_COLS = ( "id, run_id, turn, kind, actor, payload, created_at, schema_version, session_id, model_profile, model_id" ) @staticmethod def _row_to_event(row: tuple) -> Event: try: created_at = datetime.fromisoformat(row[6]) if created_at.tzinfo is None: created_at = created_at.replace(tzinfo=timezone.utc) except (ValueError, TypeError): created_at = datetime.now(timezone.utc) return Event( id=row[0], run_id=row[1], turn=row[2], kind=row[3], # type: ignore[arg-type] actor=row[4], payload=json.loads(row[5]), created_at=created_at, schema_version=row[7], session_id=row[8], model_profile=row[9], model_id=row[10], ) def _load_cache(self) -> None: rows = self._conn.execute(f"SELECT {self._SELECT_COLS} FROM events ORDER BY offset").fetchall() for row in rows: e = self._row_to_event(row) self._cache.append(e) self._seen_ids.add(e.id) def tail(self, from_offset: int = 0) -> tuple[Event, ...]: """Return events with offset > from_offset (for crash-recovery replay).""" rows = self._conn.execute( f"SELECT {self._SELECT_COLS} FROM events WHERE offset > ? ORDER BY offset", (from_offset,), ).fetchall() return tuple(self._row_to_event(row) for row in rows) def latest_offset(self) -> int: row = self._conn.execute("SELECT MAX(offset) FROM events").fetchone() return row[0] or 0 def events_for_run(self, run_id: str) -> tuple[Event, ...]: """Return the events of *run_id* in append/offset order (indexed query).""" rows = self._conn.execute( f"SELECT {self._SELECT_COLS} FROM events WHERE run_id = ? ORDER BY offset", (run_id,), ).fetchall() return tuple(self._row_to_event(row) for row in rows) def runs(self) -> tuple[str, ...]: """Return the distinct run_ids in first-seen order (indexed query).""" rows = self._conn.execute("SELECT run_id FROM events GROUP BY run_id ORDER BY MIN(offset)").fetchall() return tuple(row[0] for row in rows) def close(self) -> None: self._conn.close()