"""
MemoryManager — persists the vocabulary the assistant has learned.
Storage:
- Local file : data/vocabulary.jsonl (fast read/write during session)
- HF Hub : ous-sow/sahel-agri-feedback → vocabulary.jsonl (survives restarts)
Each line in vocabulary.jsonl is a JSON object:
{
"timestamp": "2026-04-07T12:00:00Z",
"word": "I ni ce",
"language": "bam",
"translation": "Hello / Good day",
"translation_language":"en",
"source": "user_taught"
}
"""
from __future__ import annotations
import json
import logging
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
# Local JSONL cache: <repo root>/data/vocabulary.jsonl — three levels up from this file.
LOCAL_PATH = Path(__file__).parent.parent.parent / "data" / "vocabulary.jsonl"
# File name used inside the HF dataset repo (mirrors the local cache).
HUB_FILENAME = "vocabulary.jsonl"
class MemoryManager:
    """Thread-safe vocabulary store backed by HF Hub.

    Entries are held in memory (``self._entries``), appended to the local
    JSONL cache (``LOCAL_PATH``) for durability within a session, and the
    whole file is re-uploaded to the HF dataset repo after every write so
    the vocabulary survives Space restarts.
    """

    def __init__(self, repo_id: str, hf_token: Optional[str] = None) -> None:
        """
        Args:
            repo_id: HF dataset repo id (e.g. "ous-sow/sahel-agri-feedback").
            hf_token: token with write access; when falsy, Hub sync is skipped
                and only the local JSONL cache is used.
        """
        self.repo_id = repo_id
        self.hf_token = hf_token
        self._lock = threading.Lock()  # guards self._entries and local-file appends
        self._entries: list[dict] = []
        LOCAL_PATH.parent.mkdir(parents=True, exist_ok=True)

    # ── Load ──────────────────────────────────────────────────────────────────
    def load(self) -> None:
        """Pull vocabulary.jsonl from HF Hub then cache locally. Non-fatal on failure."""
        if self.hf_token and self.repo_id:
            try:
                # Lazy import: huggingface_hub is only needed when syncing.
                from huggingface_hub import hf_hub_download

                local = hf_hub_download(
                    repo_id=self.repo_id,
                    filename=HUB_FILENAME,
                    repo_type="dataset",
                    token=self.hf_token,
                    # Always prefer the Hub copy over a possibly stale HF cache.
                    force_download=True,
                )
                import shutil

                shutil.copy2(local, LOCAL_PATH)
                logger.info("MemoryManager: loaded vocabulary from Hub (%s)", self.repo_id)
            except Exception as exc:
                # Best-effort: fall back to whatever is cached locally.
                logger.warning("MemoryManager: could not load from Hub (%s) — using local", exc)
        # Read local file (may have been just downloaded, or pre-existing from last session)
        entries: list[dict] = []
        skipped = 0
        if LOCAL_PATH.exists():
            with LOCAL_PATH.open(encoding="utf-8") as f:
                for raw in f:
                    line = raw.strip()
                    if not line:
                        continue
                    try:
                        entries.append(json.loads(line))
                    except json.JSONDecodeError:
                        skipped += 1  # tolerate corrupt lines, but count them
        if skipped:
            logger.warning("MemoryManager: skipped %d corrupt JSONL line(s)", skipped)
        with self._lock:
            self._entries = entries
        logger.info("MemoryManager: %d vocabulary entries loaded", len(entries))

    # ── Read ──────────────────────────────────────────────────────────────────
    def get_recent(self, n: int = 5) -> list[dict]:
        """Return a copy of the last ``n`` entries (oldest first)."""
        with self._lock:
            return list(self._entries[-n:])

    def get_all(self) -> list[dict]:
        """Return a copy of all entries in insertion order."""
        with self._lock:
            return list(self._entries)

    def count(self) -> int:
        """Return the number of stored entries."""
        with self._lock:
            return len(self._entries)

    def get_vocabulary_context(self, max_entries: int = 150) -> str:
        """Format vocabulary as a compact string for the LLM system prompt.

        Only the most recent ``max_entries`` entries are included, one per
        line: ``  [lang] word = translation (translation_language)``.
        """
        with self._lock:
            recent = self._entries[-max_entries:]
        if not recent:
            return "(no vocabulary learned yet)"
        lines = []
        for e in recent:
            lang = e.get("language", "?")
            word = e.get("word", "")
            tr = e.get("translation", "")
            tr_l = e.get("translation_language", "en")
            lines.append(f"  [{lang}] {word} = {tr} ({tr_l})")
        return "\n".join(lines)

    # ── Write ─────────────────────────────────────────────────────────────────
    def add_word_pair(
        self,
        word: str,
        language: str,
        translation: str,
        translation_language: str = "en",
        source: str = "user_taught",
    ) -> dict:
        """
        Append a word pair to local JSONL and push to HF Hub.

        Args:
            word: the word/phrase as taught (leading/trailing whitespace stripped).
            language: language code of ``word`` (e.g. "bam").
            translation: the translation (whitespace stripped).
            translation_language: language code of the translation.
            source: provenance tag recorded on the entry.

        Returns:
            The new entry dict.
        """
        entry = {
            # NOTE: isoformat() on an aware UTC datetime yields "+00:00",
            # not the "Z" suffix shown in the module docstring example.
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "word": word.strip(),
            "language": language,
            "translation": translation.strip(),
            "translation_language": translation_language,
            "source": source,
        }
        with self._lock:
            self._entries.append(entry)
            with LOCAL_PATH.open("a", encoding="utf-8") as f:
                f.write(json.dumps(entry, ensure_ascii=False) + "\n")
        # Push to Hub in background so UI is not blocked.
        threading.Thread(target=self._push_to_hub, daemon=True).start()
        logger.info("MemoryManager: added [%s] %s = %s", language, word, translation)
        return entry

    def _push_to_hub(self) -> None:
        """Upload the full vocabulary.jsonl to HF Hub. Best-effort; never raises.

        NOTE(review): the file is read by upload_file without holding
        self._lock, so a concurrent append could race the upload — the next
        push re-uploads the full file, so at worst one entry lags behind.
        """
        if not (self.hf_token and self.repo_id):
            return
        try:
            from huggingface_hub import HfApi

            api = HfApi(token=self.hf_token)
            api.upload_file(
                path_or_fileobj=str(LOCAL_PATH),
                path_in_repo=HUB_FILENAME,
                repo_id=self.repo_id,
                repo_type="dataset",
            )
            logger.info("MemoryManager: pushed vocabulary to Hub")
        except Exception as exc:
            logger.warning("MemoryManager: Hub push failed: %s", exc)