""" MemoryManager — persists the vocabulary the assistant has learned. Storage: - Local file : data/vocabulary.jsonl (fast read/write during session) - HF Hub : ous-sow/sahel-agri-feedback → vocabulary.jsonl (survives restarts) Each line in vocabulary.jsonl is a JSON object: { "timestamp": "2026-04-07T12:00:00Z", "word": "I ni ce", "language": "bam", "translation": "Hello / Good day", "translation_language":"en", "source": "user_taught" } """ from __future__ import annotations import json import logging import threading from datetime import datetime, timezone from pathlib import Path from typing import Optional logger = logging.getLogger(__name__) LOCAL_PATH = Path(__file__).parent.parent.parent / "data" / "vocabulary.jsonl" HUB_FILENAME = "vocabulary.jsonl" class MemoryManager: """Thread-safe vocabulary store backed by HF Hub.""" def __init__(self, repo_id: str, hf_token: Optional[str] = None) -> None: self.repo_id = repo_id self.hf_token = hf_token self._lock = threading.Lock() self._entries: list[dict] = [] LOCAL_PATH.parent.mkdir(parents=True, exist_ok=True) # ── Load ────────────────────────────────────────────────────────────────── def load(self) -> None: """Pull vocabulary.jsonl from HF Hub then cache locally. Non-fatal on failure.""" if self.hf_token and self.repo_id: try: from huggingface_hub import hf_hub_download local = hf_hub_download( repo_id=self.repo_id, filename=HUB_FILENAME, repo_type="dataset", token=self.hf_token, force_download=True, ) import shutil shutil.copy2(local, LOCAL_PATH) logger.info("MemoryManager: loaded vocabulary from Hub (%s)", self.repo_id) except Exception as exc: logger.warning("MemoryManager: could not load from Hub (%s) — using local", exc) # Read local file (may have been just downloaded, or pre-existing from last session) entries: list[dict] = [] if LOCAL_PATH.exists(): with open(LOCAL_PATH, encoding="utf-8") as f: for line in f: line = line.strip() if line: try: entries.append(json.loads(line)) except json.JSONDecodeError: pass with self._lock: self._entries = entries logger.info("MemoryManager: %d vocabulary entries loaded", len(entries)) # ── Read ────────────────────────────────────────────────────────────────── def get_recent(self, n: int = 5) -> list[dict]: with self._lock: return list(self._entries[-n:]) def get_all(self) -> list[dict]: with self._lock: return list(self._entries) def count(self) -> int: with self._lock: return len(self._entries) def get_vocabulary_context(self, max_entries: int = 150) -> str: """Format vocabulary as a compact string for the LLM system prompt.""" with self._lock: recent = self._entries[-max_entries:] if not recent: return "(no vocabulary learned yet)" lines = [] for e in recent: lang = e.get("language", "?") word = e.get("word", "") tr = e.get("translation", "") tr_l = e.get("translation_language", "en") lines.append(f" [{lang}] {word} = {tr} ({tr_l})") return "\n".join(lines) # ── Write ───────────────────────────────────────────────────────────────── def add_word_pair( self, word: str, language: str, translation: str, translation_language: str = "en", source: str = "user_taught", ) -> dict: """ Append a word pair to local JSONL and push to HF Hub. Returns the new entry dict. """ entry = { "timestamp": datetime.now(timezone.utc).isoformat(), "word": word.strip(), "language": language, "translation": translation.strip(), "translation_language": translation_language, "source": source, } with self._lock: self._entries.append(entry) with open(LOCAL_PATH, "a", encoding="utf-8") as f: f.write(json.dumps(entry, ensure_ascii=False) + "\n") # Push to Hub in background so UI is not blocked threading.Thread(target=self._push_to_hub, daemon=True).start() logger.info("MemoryManager: added [%s] %s = %s", language, word, translation) return entry def _push_to_hub(self) -> None: """Upload the full vocabulary.jsonl to HF Hub.""" if not (self.hf_token and self.repo_id): return try: from huggingface_hub import HfApi api = HfApi(token=self.hf_token) api.upload_file( path_or_fileobj=str(LOCAL_PATH), path_in_repo=HUB_FILENAME, repo_id=self.repo_id, repo_type="dataset", ) logger.info("MemoryManager: pushed vocabulary to Hub") except Exception as exc: logger.warning("MemoryManager: Hub push failed: %s", exc)