Spaces:
Running on Zero
Running on Zero
File size: 7,904 Bytes
0d7db8e a2ca0e0 0d7db8e a2ca0e0 a71301e 0d7db8e a71301e a2ca0e0 a71301e 0d7db8e a71301e 0d7db8e a2ca0e0 a71301e 0d7db8e a71301e a2ca0e0 a71301e a2ca0e0 0d7db8e a2ca0e0 0d7db8e a2ca0e0 0d7db8e a2ca0e0 0d7db8e a2ca0e0 0d7db8e a2ca0e0 0d7db8e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 | """SQLite-backed append-only ledger — the persistent backend for long-running scenarios.
Replaces the in-memory Ledger for production use. The in-memory Ledger remains
valid for tests and short demo runs where persistence is not required.
Design decisions:
- UNIQUE constraint on id enforces idempotency at the DB layer.
- Serial OFFSET column gives a deterministic ordering guarantee even if clock
skew produces non-monotonic created_at timestamps.
- snapshot_to() uses SQLite's native .backup() API — atomic, zero-copy.
- from_file() rehydrates the in-memory cache from disk, so hot reads hit the
cache and the DB is only consulted for replay after a crash.
- reset() is deliberately destructive: it clears the current run, not all runs.
Multi-run persistence (keeping history across resets) is a Phase 3 milestone.
"""
from __future__ import annotations
import json
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from src.core.events import Event
from src.core.ledger import Ledger
class SQLiteLedger(Ledger):
    """Persistent append-only ledger backed by SQLite.

    Drop-in replacement for the in-memory Ledger: the same API and the same
    idempotency guarantee, plus durable storage and snapshot/restore.

    Every event written through this object is mirrored in an in-process
    list, so ``events`` is served from memory. SQL is used for writes and
    for the offset/run queries (``tail``, ``events_for_run``, ``runs``).

    NOTE(review): ``Ledger.__init__`` is not invoked by this class, which
    keeps its own state — confirm the base class needs no initialisation.
    """

    # Column list shared by every SELECT; the order must match the tuple
    # indices used in ``_row_to_event``.
    _SELECT_COLS = (
        "id, run_id, turn, kind, actor, payload, created_at, schema_version, "
        "session_id, model_profile, model_id"
    )

    def __init__(self, path: str | Path = ":memory:") -> None:
        """Open the database at *path*, ensure the schema, and warm the cache.

        Args:
            path: Database file path, or ``":memory:"`` (the default) for a
                private in-memory database.

        Raises:
            sqlite3.Error: If the database cannot be opened or initialised.
                The connection is closed before the error propagates.
        """
        self._path = str(path)
        self._cache: list[Event] = []
        self._seen_ids: set[str] = set()
        # check_same_thread=False allows use from other threads. No locking
        # is done in this class, so concurrent writers must be serialised by
        # the caller.
        self._conn = sqlite3.connect(self._path, check_same_thread=False)
        try:
            self._conn.execute("PRAGMA journal_mode=WAL")
            self._conn.execute("PRAGMA synchronous=NORMAL")
            self._create_schema()
            # A fresh in-memory database has no rows to rehydrate.
            if self._path != ":memory:":
                self._load_cache()
        except Exception:
            # Do not leak the connection when setup fails part-way.
            self._conn.close()
            raise

    # ── schema ───────────────────────────────────────────────────────────────

    def _create_schema(self) -> None:
        """Create the ``events`` table and its indexes if they are missing."""
        self._conn.executescript("""
            CREATE TABLE IF NOT EXISTS events (
                offset INTEGER PRIMARY KEY AUTOINCREMENT,
                id TEXT UNIQUE NOT NULL,
                run_id TEXT NOT NULL,
                turn INTEGER NOT NULL,
                kind TEXT NOT NULL,
                actor TEXT NOT NULL,
                payload TEXT NOT NULL,
                created_at TEXT NOT NULL,
                schema_version INTEGER NOT NULL DEFAULT 1,
                session_id TEXT,
                model_profile TEXT,
                model_id TEXT
            );
            -- Composite (run_id, offset) serves the hottest read — events_for_run
            -- ordered by offset — from one index, instead of a run_id seek + sort.
            CREATE INDEX IF NOT EXISTS idx_run_offset ON events(run_id, offset);
            CREATE INDEX IF NOT EXISTS idx_kind ON events(kind);
            CREATE INDEX IF NOT EXISTS idx_actor ON events(actor);
            CREATE INDEX IF NOT EXISTS idx_session_id ON events(session_id);
            CREATE INDEX IF NOT EXISTS idx_model_id ON events(model_id);
        """)
        self._conn.commit()

    # ── Ledger API ───────────────────────────────────────────────────────────

    def append(self, event: Event) -> Event:
        """Persist *event* once and return it.

        Appending an id that is already known is a no-op, both when this
        object has seen the id and when the row already exists in the
        database (for example, written through another connection).

        Args:
            event: The event to store. Its payload must be JSON-serialisable.

        Returns:
            The same *event* object that was passed in.

        Raises:
            sqlite3.IntegrityError: If the insert violates a constraint other
                than the unique ``id`` (e.g. a NOT NULL column received None).
        """
        if event.id in self._seen_ids:
            return event

        params = (
            event.id,
            event.run_id,
            event.turn,
            event.kind,
            event.actor,
            json.dumps(event.payload),
            event.created_at.isoformat(),
            event.schema_version,
            event.session_id,
            event.model_profile,
            event.model_id,
        )
        try:
            # The connection context manager commits on success and rolls
            # back on error, so a failed INSERT never leaves a transaction
            # open behind it.
            with self._conn:
                self._conn.execute(
                    "INSERT INTO events "
                    "(id, run_id, turn, kind, actor, payload, created_at, schema_version, "
                    "session_id, model_profile, model_id) "
                    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                    params,
                )
        except sqlite3.IntegrityError:
            # Only a genuine duplicate id is safe to ignore; any other
            # constraint failure would otherwise silently drop the event.
            if not self._id_exists(event.id):
                raise
            # Remember the id so later retries short-circuit without SQL.
            # The cache is left alone: the stored row was not written here.
            self._seen_ids.add(event.id)
            return event

        self._cache.append(event)
        self._seen_ids.add(event.id)
        return event

    def _id_exists(self, event_id: str) -> bool:
        """Return True when a row with *event_id* is stored in the database."""
        row = self._conn.execute(
            "SELECT 1 FROM events WHERE id = ? LIMIT 1",
            (event_id,),
        ).fetchone()
        return row is not None

    @property
    def events(self) -> tuple[Event, ...]:
        """Immutable snapshot of the cached events, in append order."""
        return tuple(self._cache)

    def reset(self) -> None:
        """Delete every stored event and empty the in-memory cache.

        The ``DELETE`` is unconditional, so rows of all run_ids are removed.
        Because ``offset`` is declared AUTOINCREMENT, offsets handed out
        after a reset keep increasing rather than restarting at 1.
        """
        with self._conn:
            self._conn.execute("DELETE FROM events")
        self._cache.clear()
        self._seen_ids.clear()

    # ── persistence helpers ──────────────────────────────────────────────────

    def snapshot_to(self, dest: str | Path) -> None:
        """Copy the database to *dest* using SQLite's backup API.

        Args:
            dest: Path of the database file that receives the copy.
        """
        backup = sqlite3.connect(str(dest))
        try:
            self._conn.backup(backup)
        finally:
            # Always release the destination handle, even if the copy fails.
            backup.close()

    @classmethod
    def from_file(cls, path: str | Path) -> "SQLiteLedger":
        """Open the database at *path* and rehydrate the in-memory cache.

        No existence check is made here; the path is handed straight to the
        constructor, which loads whatever rows the file already contains.
        """
        return cls(path)

    @staticmethod
    def _row_to_event(row: tuple) -> Event:
        """Build an Event from a row selected with ``_SELECT_COLS``.

        A naive ``created_at`` is interpreted as UTC. A value that cannot be
        parsed is replaced by the current UTC time rather than failing the
        whole read.
        """
        try:
            created_at = datetime.fromisoformat(row[6])
            if created_at.tzinfo is None:
                created_at = created_at.replace(tzinfo=timezone.utc)
        except (ValueError, TypeError):
            created_at = datetime.now(timezone.utc)
        return Event(
            id=row[0],
            run_id=row[1],
            turn=row[2],
            kind=row[3],  # type: ignore[arg-type]
            actor=row[4],
            payload=json.loads(row[5]),
            created_at=created_at,
            schema_version=row[7],
            session_id=row[8],
            model_profile=row[9],
            model_id=row[10],
        )

    def _load_cache(self) -> None:
        """Append every stored event, in offset order, to the cache."""
        rows = self._conn.execute(
            f"SELECT {self._SELECT_COLS} FROM events ORDER BY offset"
        ).fetchall()
        for row in rows:
            loaded = self._row_to_event(row)
            self._cache.append(loaded)
            self._seen_ids.add(loaded.id)

    def tail(self, from_offset: int = 0) -> tuple[Event, ...]:
        """Return events with offset > from_offset (for crash-recovery replay)."""
        rows = self._conn.execute(
            f"SELECT {self._SELECT_COLS} FROM events WHERE offset > ? ORDER BY offset",
            (from_offset,),
        ).fetchall()
        return tuple(self._row_to_event(row) for row in rows)

    def latest_offset(self) -> int:
        """Return the highest stored offset, or 0 when the table is empty."""
        row = self._conn.execute("SELECT MAX(offset) FROM events").fetchone()
        # MAX() over an empty table yields NULL, which maps to 0 here.
        return row[0] or 0

    def events_for_run(self, run_id: str) -> tuple[Event, ...]:
        """Return the events of *run_id* in append/offset order (indexed query)."""
        rows = self._conn.execute(
            f"SELECT {self._SELECT_COLS} FROM events WHERE run_id = ? ORDER BY offset",
            (run_id,),
        ).fetchall()
        return tuple(self._row_to_event(row) for row in rows)

    def runs(self) -> tuple[str, ...]:
        """Return the distinct run_ids in first-seen order (indexed query)."""
        rows = self._conn.execute(
            "SELECT run_id FROM events GROUP BY run_id ORDER BY MIN(offset)"
        ).fetchall()
        return tuple(row[0] for row in rows)

    def close(self) -> None:
        """Close the underlying SQLite connection."""
        self._conn.close()
|