# ground-zero/src/memory/memory_manager.py
# jefffffff9
# Phase 1: Sahel-Voice-Lab — The Memory Loop
# 096b19d
"""
MemoryManager — persists the vocabulary the assistant has learned.

Storage:
  - Local file : data/vocabulary.jsonl  (fast read/write during session)
  - HF Hub     : ous-sow/sahel-agri-feedback → vocabulary.jsonl  (survives restarts)

Each line in vocabulary.jsonl is a JSON object (pretty-printed here for
readability; on disk every object occupies exactly one line):
  {
    "timestamp": "2026-04-07T12:00:00Z",
    "word": "I ni ce",
    "language": "bam",
    "translation": "Hello / Good day",
    "translation_language": "en",
    "source": "user_taught"
  }
"""
from __future__ import annotations
import json
import logging
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
LOCAL_PATH = Path(__file__).parent.parent.parent / "data" / "vocabulary.jsonl"
HUB_FILENAME = "vocabulary.jsonl"
class MemoryManager:
    """Thread-safe vocabulary store backed by HF Hub.

    Entries are kept in memory, appended to the local JSONL file at
    ``LOCAL_PATH`` and mirrored to the Hub dataset ``repo_id`` as
    ``HUB_FILENAME``. Hub access is best-effort: failures are logged and never
    raised to the caller.
    """

    def __init__(self, repo_id: str, hf_token: Optional[str] = None) -> None:
        """Create an empty store and make sure the local data directory exists.

        Args:
            repo_id: HF Hub dataset repository that mirrors the vocabulary.
            hf_token: Hub access token; when falsy, all Hub traffic is skipped.
        """
        self.repo_id = repo_id
        self.hf_token = hf_token
        # Guards ``_entries`` and every read/write of the local JSONL file.
        self._lock = threading.Lock()
        # Serialises background uploads so an older snapshot can never be
        # committed to the Hub after a newer one.
        self._push_lock = threading.Lock()
        self._entries: list[dict] = []
        LOCAL_PATH.parent.mkdir(parents=True, exist_ok=True)

    # ── Load ──────────────────────────────────────────────────────────────────
    def load(self) -> None:
        """Pull vocabulary.jsonl from HF Hub then cache locally. Non-fatal on failure.

        After the optional download, the local file is parsed line by line.
        Blank lines are ignored; lines that are not valid JSON objects are
        skipped and reported once in a warning instead of being dropped
        silently. The in-memory entries are replaced by the parsed result.
        """
        if self.hf_token and self.repo_id:
            try:
                import shutil

                from huggingface_hub import hf_hub_download

                cached = hf_hub_download(
                    repo_id=self.repo_id,
                    filename=HUB_FILENAME,
                    repo_type="dataset",
                    token=self.hf_token,
                    force_download=True,
                )
                shutil.copy2(cached, LOCAL_PATH)
                logger.info("MemoryManager: loaded vocabulary from Hub (%s)", self.repo_id)
            except Exception as exc:
                # Deliberate best-effort: fall back to whatever is on disk.
                logger.warning("MemoryManager: could not load from Hub (%s) — using local", exc)

        # Read local file (may have been just downloaded, or pre-existing from last session)
        entries: list[dict] = []
        skipped = 0
        if LOCAL_PATH.exists():
            with open(LOCAL_PATH, encoding="utf-8") as f:
                for raw in f:
                    line = raw.strip()
                    if not line:
                        continue
                    try:
                        record = json.loads(line)
                    except json.JSONDecodeError:
                        skipped += 1
                        continue
                    # Readers call ``.get`` on every entry, so anything that is
                    # valid JSON but not an object must be rejected here.
                    if isinstance(record, dict):
                        entries.append(record)
                    else:
                        skipped += 1
        if skipped:
            logger.warning("MemoryManager: skipped %d malformed vocabulary line(s)", skipped)

        with self._lock:
            self._entries = entries
        logger.info("MemoryManager: %d vocabulary entries loaded", len(entries))

    # ── Read ──────────────────────────────────────────────────────────────────
    def _tail(self, n: int) -> list[dict]:
        """Return a new list holding the last ``n`` entries (empty when ``n <= 0``)."""
        # ``lst[-0:]`` is the *whole* list, so non-positive counts need a guard.
        if n <= 0:
            return []
        with self._lock:
            return self._entries[-n:]

    def get_recent(self, n: int = 5) -> list[dict]:
        """Return a copy of the ``n`` most recent entries, oldest first.

        A non-positive ``n`` yields an empty list.
        """
        return self._tail(n)

    def get_all(self) -> list[dict]:
        """Return a shallow copy of every entry, oldest first."""
        with self._lock:
            return list(self._entries)

    def count(self) -> int:
        """Return the number of entries currently held in memory."""
        with self._lock:
            return len(self._entries)

    def get_vocabulary_context(self, max_entries: int = 150) -> str:
        """Format vocabulary as a compact string for the LLM system prompt.

        Args:
            max_entries: Upper bound on how many of the newest entries to include.

        Returns:
            One line per entry, or a placeholder sentence when there is nothing
            to show (empty store or non-positive ``max_entries``).
        """
        recent = self._tail(max_entries)
        if not recent:
            return "(no vocabulary learned yet)"
        lines = []
        for e in recent:
            lang = e.get("language", "?")
            word = e.get("word", "")
            tr = e.get("translation", "")
            tr_l = e.get("translation_language", "en")
            lines.append(f" [{lang}] {word} = {tr} ({tr_l})")
        return "\n".join(lines)

    # ── Write ─────────────────────────────────────────────────────────────────
    def add_word_pair(
        self,
        word: str,
        language: str,
        translation: str,
        translation_language: str = "en",
        source: str = "user_taught",
    ) -> dict:
        """
        Append a word pair to local JSONL and push to HF Hub.

        The entry is written to disk before it is added to memory, so a failed
        write (which propagates to the caller) leaves both views unchanged.
        The Hub push runs on a daemon thread and only when credentials are set.

        Returns the new entry dict.
        """
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "word": word.strip(),
            "language": language,
            "translation": translation.strip(),
            "translation_language": translation_language,
            "source": source,
        }
        # Serialise outside the lock: an unserialisable value fails before any
        # state is touched.
        serialized = json.dumps(entry, ensure_ascii=False) + "\n"
        with self._lock:
            with open(LOCAL_PATH, "a", encoding="utf-8") as f:
                f.write(serialized)
            self._entries.append(entry)

        # Push to Hub in background so UI is not blocked
        if self.hf_token and self.repo_id:
            threading.Thread(target=self._push_to_hub, daemon=True).start()
        logger.info("MemoryManager: added [%s] %s = %s", language, word, translation)
        return entry

    def _push_to_hub(self) -> None:
        """Upload the full vocabulary.jsonl to HF Hub.

        Does nothing without credentials. Pushes are serialised, and each one
        uploads a byte snapshot taken under the data lock, so the Hub never
        receives a half-written line or an out-of-date file after a newer one.
        Any failure is logged and swallowed.
        """
        if not (self.hf_token and self.repo_id):
            return
        try:
            from huggingface_hub import HfApi

            api = HfApi(token=self.hf_token)
            with self._push_lock:
                with self._lock:
                    snapshot = LOCAL_PATH.read_bytes()
                api.upload_file(
                    path_or_fileobj=snapshot,
                    path_in_repo=HUB_FILENAME,
                    repo_id=self.repo_id,
                    repo_type="dataset",
                )
            logger.info("MemoryManager: pushed vocabulary to Hub")
        except Exception as exc:
            logger.warning("MemoryManager: Hub push failed: %s", exc)