Spaces:
Running
Running
| """ | |
| MemoryManager — persists the vocabulary the assistant has learned. | |
| Storage: | |
| - Local file : data/vocabulary.jsonl (fast read/write during session) | |
| - HF Hub : ous-sow/sahel-agri-feedback → vocabulary.jsonl (survives restarts) | |
| Each line in vocabulary.jsonl is a JSON object: | |
| { | |
| "timestamp": "2026-04-07T12:00:00Z", | |
| "word": "I ni ce", | |
| "language": "bam", | |
| "translation": "Hello / Good day", | |
| "translation_language":"en", | |
| "source": "user_taught" | |
| } | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| import threading | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Optional | |
# Module-level logger, named after this module (stdlib logging convention).
logger = logging.getLogger(__name__)

# Local JSONL cache. Path is resolved three levels up from this file
# (assumes a src/<package>/<module>.py layout — TODO confirm against repo).
LOCAL_PATH = Path(__file__).parent.parent.parent / "data" / "vocabulary.jsonl"

# Name of the mirrored file inside the HF dataset repo.
HUB_FILENAME = "vocabulary.jsonl"
class MemoryManager:
    """Thread-safe, append-only vocabulary store mirrored to the HF Hub.

    Entries are held in memory (``self._entries``), appended to a local
    JSONL cache (``LOCAL_PATH``) and pushed to a Hugging Face dataset repo
    in a background thread so the UI is never blocked. All Hub traffic is
    best-effort: failures are logged, never raised.
    """

    def __init__(self, repo_id: str, hf_token: Optional[str] = None) -> None:
        """
        Args:
            repo_id: HF dataset repo id (e.g. "ous-sow/sahel-agri-feedback").
            hf_token: HF access token; when None, the store is local-only.
        """
        self.repo_id = repo_id
        self.hf_token = hf_token
        self._lock = threading.Lock()  # guards self._entries
        self._entries: list[dict] = []
        # Make sure the local cache directory exists before any read/write.
        LOCAL_PATH.parent.mkdir(parents=True, exist_ok=True)

    # ── Load ────────────────────────────────────────────────────────────────
    def load(self) -> None:
        """Pull vocabulary.jsonl from HF Hub then cache locally. Non-fatal on failure.

        After the (optional) Hub download, the local file is re-read and
        replaces the in-memory entry list. Malformed JSONL lines are skipped
        with a warning rather than aborting the whole load.
        """
        if self.hf_token and self.repo_id:
            try:
                # Lazy import: huggingface_hub is only needed when syncing.
                from huggingface_hub import hf_hub_download

                downloaded = hf_hub_download(
                    repo_id=self.repo_id,
                    filename=HUB_FILENAME,
                    repo_type="dataset",
                    token=self.hf_token,
                    force_download=True,  # always prefer the Hub copy on load
                )
                import shutil

                shutil.copy2(downloaded, LOCAL_PATH)
                logger.info("MemoryManager: loaded vocabulary from Hub (%s)", self.repo_id)
            except Exception as exc:  # best-effort: Hub may be unreachable
                logger.warning("MemoryManager: could not load from Hub (%s) — using local", exc)
        # Read local file (may have been just downloaded, or pre-existing
        # from last session).
        entries: list[dict] = []
        if LOCAL_PATH.exists():
            with open(LOCAL_PATH, encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        entries.append(json.loads(line))
                    except json.JSONDecodeError:
                        # Fix: was a silent `pass`; skip corrupt lines but
                        # leave a trace so data loss is diagnosable.
                        logger.warning("MemoryManager: skipping malformed JSONL line")
        with self._lock:
            self._entries = entries
        logger.info("MemoryManager: %d vocabulary entries loaded", len(entries))

    # ── Read ────────────────────────────────────────────────────────────────
    def get_recent(self, n: int = 5) -> list[dict]:
        """Return a copy of the last *n* entries (empty list when n <= 0)."""
        if n <= 0:
            # Fix: self._entries[-0:] slices the WHOLE list, not zero items.
            return []
        with self._lock:
            return list(self._entries[-n:])

    def get_all(self) -> list[dict]:
        """Return a shallow copy of every entry."""
        with self._lock:
            return list(self._entries)

    def count(self) -> int:
        """Return the number of stored entries."""
        with self._lock:
            return len(self._entries)

    def get_vocabulary_context(self, max_entries: int = 150) -> str:
        """Format vocabulary as a compact string for the LLM system prompt.

        Returns at most the *max_entries* most recent entries, one per line,
        or a placeholder string when nothing has been learned yet.
        """
        if max_entries <= 0:
            # Fix: [-0:] would take every entry instead of none.
            return "(no vocabulary learned yet)"
        with self._lock:
            recent = list(self._entries[-max_entries:])
        if not recent:
            return "(no vocabulary learned yet)"
        lines = []
        for e in recent:
            lang = e.get("language", "?")
            word = e.get("word", "")
            tr = e.get("translation", "")
            tr_l = e.get("translation_language", "en")
            lines.append(f" [{lang}] {word} = {tr} ({tr_l})")
        return "\n".join(lines)

    # ── Write ───────────────────────────────────────────────────────────────
    def add_word_pair(
        self,
        word: str,
        language: str,
        translation: str,
        translation_language: str = "en",
        source: str = "user_taught",
    ) -> dict:
        """
        Append a word pair to memory + local JSONL and push to HF Hub.

        The Hub push runs on a daemon thread so the caller (UI) is not
        blocked. Returns the new entry dict.
        """
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "word": word.strip(),
            "language": language,
            "translation": translation.strip(),
            "translation_language": translation_language,
            "source": source,
        }
        with self._lock:
            self._entries.append(entry)
            with open(LOCAL_PATH, "a", encoding="utf-8") as f:
                f.write(json.dumps(entry, ensure_ascii=False) + "\n")
        # Push to Hub in background so UI is not blocked.
        threading.Thread(target=self._push_to_hub, daemon=True).start()
        logger.info("MemoryManager: added [%s] %s = %s", language, word, translation)
        return entry

    def _push_to_hub(self) -> None:
        """Upload the full local vocabulary.jsonl to HF Hub (best effort).

        NOTE(review): the file is read by upload_file without holding
        self._lock, so a concurrent append may or may not be included —
        acceptable for an append-only log, each push re-uploads everything.
        """
        if not (self.hf_token and self.repo_id):
            return  # no credentials / repo configured — local-only mode
        try:
            from huggingface_hub import HfApi

            api = HfApi(token=self.hf_token)
            api.upload_file(
                path_or_fileobj=str(LOCAL_PATH),
                path_in_repo=HUB_FILENAME,
                repo_id=self.repo_id,
                repo_type="dataset",
            )
            logger.info("MemoryManager: pushed vocabulary to Hub")
        except Exception as exc:
            logger.warning("MemoryManager: Hub push failed: %s", exc)