multi-agent-lab / src /core /sqlite_ledger.py
agharsallah
feat: implement per-event model attribution for enhanced event tracking
a71301e
Raw
History Blame Contribute Delete
7.9 kB
"""SQLite-backed append-only ledger — the persistent backend for long-running scenarios.
Replaces the in-memory Ledger for production use. The in-memory Ledger remains
valid for tests and short demo runs where persistence is not required.
Design decisions:
- UNIQUE constraint on id enforces idempotency at the DB layer.
- Serial OFFSET column gives a deterministic ordering guarantee even if clock
skew produces non-monotonic created_at timestamps.
- snapshot_to() uses SQLite's native .backup() API — atomic, zero-copy.
- from_file() rehydrates the in-memory cache from disk, so hot reads hit the
cache and the DB is only consulted for replay after a crash.
- reset() is deliberately destructive: it deletes every stored event, for all
runs, not just the current one. Multi-run persistence (keeping history across
resets) is a Phase 3 milestone.
"""
from __future__ import annotations
import json
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from src.core.events import Event
from src.core.ledger import Ledger
class SQLiteLedger(Ledger):
    """Persistent append-only ledger backed by SQLite.

    Drop-in replacement for the in-memory Ledger: the same API and the same
    idempotency guarantee, plus durable storage and snapshot/restore.

    Every appended event is written to the ``events`` table and mirrored in an
    in-memory list, so the ``events`` property is served without a query.
    """

    # Column list shared by every SELECT. Its order must match the positional
    # indexing done in _row_to_event().
    _SELECT_COLS = (
        "id, run_id, turn, kind, actor, payload, created_at, schema_version, session_id, model_profile, model_id"
    )

    # Parameterised INSERT used by append(); one placeholder per stored column.
    _INSERT_SQL = (
        "INSERT INTO events "
        "(id, run_id, turn, kind, actor, payload, created_at, schema_version, "
        "session_id, model_profile, model_id) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
    )

    def __init__(self, path: str | Path = ":memory:") -> None:
        """Open (or create) the database at *path* and prepare the schema.

        Args:
            path: Database file location, or ``":memory:"`` (the default) for
                a private in-memory database.
        """
        self._path = str(path)
        # check_same_thread=False lets other threads use this connection; this
        # class adds no locking of its own.
        self._conn = sqlite3.connect(self._path, check_same_thread=False)
        self._conn.execute("PRAGMA journal_mode=WAL")
        self._conn.execute("PRAGMA synchronous=NORMAL")
        self._create_schema()
        self._cache: list[Event] = []
        self._seen_ids: set[str] = set()
        # A fresh in-memory database has nothing to rehydrate.
        if self._path != ":memory:":
            self._load_cache()

    # ── schema ────────────────────────────────────────────────────────────────

    def _create_schema(self) -> None:
        """Create the ``events`` table and its indexes if they do not exist."""
        self._conn.executescript("""
            CREATE TABLE IF NOT EXISTS events (
                offset INTEGER PRIMARY KEY AUTOINCREMENT,
                id TEXT UNIQUE NOT NULL,
                run_id TEXT NOT NULL,
                turn INTEGER NOT NULL,
                kind TEXT NOT NULL,
                actor TEXT NOT NULL,
                payload TEXT NOT NULL,
                created_at TEXT NOT NULL,
                schema_version INTEGER NOT NULL DEFAULT 1,
                session_id TEXT,
                model_profile TEXT,
                model_id TEXT
            );
            -- Composite (run_id, offset) serves the hottest read — events_for_run
            -- ordered by offset — from one index, instead of a run_id seek + sort.
            CREATE INDEX IF NOT EXISTS idx_run_offset ON events(run_id, offset);
            CREATE INDEX IF NOT EXISTS idx_kind ON events(kind);
            CREATE INDEX IF NOT EXISTS idx_actor ON events(actor);
            CREATE INDEX IF NOT EXISTS idx_session_id ON events(session_id);
            CREATE INDEX IF NOT EXISTS idx_model_id ON events(model_id);
        """)
        self._conn.commit()

    # ── Ledger API ────────────────────────────────────────────────────────────

    def append(self, event: Event) -> Event:
        """Persist *event* and return it; appending a known id is a no-op.

        Args:
            event: The event to store. Its payload is serialised with
                ``json.dumps`` and its timestamp with ``isoformat()``.

        Returns:
            The same *event* object that was passed in.

        Raises:
            sqlite3.IntegrityError: If the INSERT violates a constraint other
                than the uniqueness of ``id`` (for example a NULL in a
                NOT NULL column). Such an event is not stored, so the failure
                is reported instead of being mistaken for a duplicate.
        """
        if event.id in self._seen_ids:
            return event

        params = (
            event.id,
            event.run_id,
            event.turn,
            event.kind,
            event.actor,
            json.dumps(event.payload),
            event.created_at.isoformat(),
            event.schema_version,
            event.session_id,
            event.model_profile,
            event.model_id,
        )
        try:
            # The connection context manager commits on success and rolls back
            # on any exception, so a failed INSERT never leaves the implicit
            # transaction open on this connection.
            with self._conn:
                self._conn.execute(self._INSERT_SQL, params)
        except sqlite3.IntegrityError:
            # Only a genuine duplicate id (written through another connection)
            # is safe to ignore; anything else means the event was lost.
            if not self._id_exists(event.id):
                raise
            # Remember the id so later appends short-circuit without an INSERT.
            self._seen_ids.add(event.id)
            return event

        self._cache.append(event)
        self._seen_ids.add(event.id)
        return event

    def _id_exists(self, event_id: str) -> bool:
        """Return True when a row with *event_id* is already stored."""
        found = self._conn.execute(
            "SELECT 1 FROM events WHERE id = ? LIMIT 1",
            (event_id,),
        ).fetchone()
        return found is not None

    @property
    def events(self) -> tuple[Event, ...]:
        """All cached events, in append order, as an immutable snapshot."""
        return tuple(self._cache)

    def reset(self) -> None:
        """Delete every stored event and empty the in-memory cache.

        The DELETE has no WHERE clause, so events of all runs are removed.
        """
        # Commit on success, roll back if the DELETE fails.
        with self._conn:
            self._conn.execute("DELETE FROM events")
        self._cache.clear()
        self._seen_ids.clear()

    # ── persistence helpers ───────────────────────────────────────────────────

    def snapshot_to(self, dest: str | Path) -> None:
        """Copy the database to *dest* using SQLite's backup API.

        Args:
            dest: Path of the database file that receives the copy.
        """
        target = sqlite3.connect(str(dest))
        try:
            self._conn.backup(target)
        finally:
            # Close the destination connection even if the backup fails.
            target.close()

    @classmethod
    def from_file(cls, path: str | Path) -> "SQLiteLedger":
        """Open the database at *path* and rehydrate the in-memory cache.

        The constructor already loads the cache for any non-``":memory:"``
        path, so this is a readable alias for ``cls(path)``.
        """
        return cls(path)

    @staticmethod
    def _row_to_event(row: tuple) -> Event:
        """Build an Event from a row selected with ``_SELECT_COLS``.

        A naive ``created_at`` is interpreted as UTC. A value that cannot be
        parsed is replaced with the current UTC time rather than raising.
        """
        try:
            created_at = datetime.fromisoformat(row[6])
            if created_at.tzinfo is None:
                created_at = created_at.replace(tzinfo=timezone.utc)
        except (ValueError, TypeError):
            created_at = datetime.now(timezone.utc)
        return Event(
            id=row[0],
            run_id=row[1],
            turn=row[2],
            kind=row[3],  # type: ignore[arg-type]
            actor=row[4],
            payload=json.loads(row[5]),
            created_at=created_at,
            schema_version=row[7],
            session_id=row[8],
            model_profile=row[9],
            model_id=row[10],
        )

    def _select_events(self, suffix: str, params: tuple = ()) -> tuple[Event, ...]:
        """Run ``SELECT <cols> FROM events <suffix>`` and convert every row."""
        rows = self._conn.execute(
            f"SELECT {self._SELECT_COLS} FROM events {suffix}",
            params,
        ).fetchall()
        return tuple(self._row_to_event(row) for row in rows)

    def _load_cache(self) -> None:
        """Fill the cache and the seen-id set from the table, in offset order."""
        for stored in self._select_events("ORDER BY offset"):
            self._cache.append(stored)
            self._seen_ids.add(stored.id)

    def tail(self, from_offset: int = 0) -> tuple[Event, ...]:
        """Return events with offset > from_offset (for crash-recovery replay)."""
        return self._select_events("WHERE offset > ? ORDER BY offset", (from_offset,))

    def latest_offset(self) -> int:
        """Return the highest stored offset, or 0 when the table is empty."""
        row = self._conn.execute("SELECT MAX(offset) FROM events").fetchone()
        # MAX() over an empty table yields NULL, which arrives here as None.
        return row[0] or 0

    def events_for_run(self, run_id: str) -> tuple[Event, ...]:
        """Return the events of *run_id* in append/offset order."""
        return self._select_events("WHERE run_id = ? ORDER BY offset", (run_id,))

    def runs(self) -> tuple[str, ...]:
        """Return the distinct run_ids, ordered by each run's first offset."""
        rows = self._conn.execute("SELECT run_id FROM events GROUP BY run_id ORDER BY MIN(offset)").fetchall()
        return tuple(row[0] for row in rows)

    def close(self) -> None:
        """Close the underlying SQLite connection."""
        self._conn.close()