# personabot-api / app/services/semantic_cache.py
# Deployed via GitHub Actions (commits 5a96418, bbe01fe).
# backend/app/services/semantic_cache.py
# In-memory semantic cache. Replaces Redis-backed CacheService entirely.
# No external service required — works in any environment including HF Spaces.
#
# Design choices:
# - numpy dot product on L2-normalised vectors = cosine similarity (same as cos_sim)
# without the overhead of importing sentence_transformers.util in the hot path.
# - asyncio.Lock guards all writes. Lock-free reads are safe because the event
#   loop is single-threaded and get() never awaits mid-lookup, so set() can
#   never interleave with an in-progress read.
# - Oldest-first eviction (by insertion order) instead of LRU, to avoid
#   per-access bookkeeping in the hot path.
import asyncio
import time
from collections import deque
from typing import Optional

import numpy as np

from app.core.logging import get_logger
logger = get_logger(__name__)
class SemanticCache:
    """In-memory semantic cache keyed by embedding cosine similarity.

    Stores (embedding, response) pairs and serves a cached response when a
    query embedding's cosine similarity to a stored entry meets the
    threshold. Entries expire after ``ttl_seconds``; the oldest entry is
    evicted once ``max_size`` is reached.
    """

    def __init__(
        self,
        max_size: int = 512,
        ttl_seconds: int = 3600,
        similarity_threshold: float = 0.92,
    ) -> None:
        """
        Args:
            max_size: Maximum number of cached entries before oldest-first eviction.
            ttl_seconds: Entry lifetime; expired entries are never returned.
            similarity_threshold: Minimum cosine similarity for a cache hit.
        """
        self._max_size = max_size
        self._ttl = ttl_seconds
        self._threshold = similarity_threshold
        self._lock = asyncio.Lock()
        # Each entry: {"embedding": np.ndarray, "response": str, "inserted_at": float}
        # deque(maxlen=...) keeps insertion order AND makes oldest-first eviction
        # O(1); the previous list.pop(0) was O(n) per eviction, contradicting the
        # module's stated O(1)-insertion design goal.
        self._entries: deque[dict] = deque(maxlen=max_size)
        self._hits: int = 0
        # Misses are counted too so a hit rate can actually be computed from stats().
        self._misses: int = 0

    async def get(self, query_embedding: np.ndarray) -> Optional[str]:
        """
        Cosine similarity lookup. Returns cached response if best score >= threshold.
        query_embedding must already be L2-normalised (bge-small normalises by default).

        No lock is taken: this coroutine never awaits, so on a single event loop
        it cannot interleave with set()'s mutations.
        """
        if not self._entries:
            self._misses += 1
            return None
        now = time.monotonic()
        # Skip expired entries at read time; physical removal happens in set().
        valid = [e for e in self._entries if now - e["inserted_at"] < self._ttl]
        if not valid:
            self._misses += 1
            return None
        # One batched numpy op: dot of L2-normalised vectors == cosine similarity.
        matrix = np.stack([e["embedding"] for e in valid])  # (N, dim)
        scores: np.ndarray = matrix @ query_embedding  # shape (N,)
        best_idx = int(np.argmax(scores))
        best_score = float(scores[best_idx])
        if best_score >= self._threshold:
            self._hits += 1
            logger.debug("Semantic cache hit | score=%.4f", best_score)
            return valid[best_idx]["response"]
        self._misses += 1
        return None

    async def set(self, query_embedding: np.ndarray, response: str) -> None:
        """Store a new entry. Purges expired entries; evicts oldest at capacity."""
        async with self._lock:
            now = time.monotonic()
            # inserted_at is monotonically increasing, so expired entries form a
            # prefix: popping from the left until the head is live removes exactly
            # the stale ones. Without this, expired entries would linger until
            # capacity eviction — a slow memory leak on low-traffic deployments.
            while self._entries and now - self._entries[0]["inserted_at"] >= self._ttl:
                self._entries.popleft()
            # deque(maxlen=max_size) drops the oldest entry automatically when
            # appending at capacity — no explicit eviction branch needed.
            self._entries.append({
                "embedding": query_embedding,
                "response": response,
                "inserted_at": now,
            })

    async def stats(self) -> dict:
        """Return a snapshot of cache size, counters, and configuration."""
        return {
            "entries": len(self._entries),
            "hits": self._hits,
            "misses": self._misses,
            "max_size": self._max_size,
            "ttl_seconds": self._ttl,
            "threshold": self._threshold,
        }