# backend/app/services/semantic_cache.py
# In-memory semantic cache. Replaces Redis-backed CacheService entirely.
# No external service required — works in any environment including HF Spaces.
#
# Design choices:
# - numpy dot product on L2-normalised vectors = cosine similarity (same as cos_sim)
#   without the overhead of importing sentence_transformers.util in the hot path.
# - asyncio.Lock guards all writes. Reads happen outside the lock: get() contains
#   no await, so it runs to completion before any other coroutine on the same
#   event loop can mutate the entries. asyncio.Lock is not a thread lock, so a
#   cache instance must not be shared across threads.
# - Oldest-first eviction (insertion order, collections.deque) instead of LRU to
#   keep O(1) insertion/eviction and avoid per-access bookkeeping in the hot path.

import asyncio
import time
from collections import deque
from typing import Optional

import numpy as np

from app.core.logging import get_logger

logger = get_logger(__name__)


class SemanticCache:
    """Bounded, TTL-limited response cache looked up by embedding similarity.

    Entries are kept in insertion order. Because every entry is stamped with
    ``time.monotonic()`` while the write lock is held, insertion order is also
    timestamp order, so expired entries always form a prefix of the container.
    """

    def __init__(
        self,
        max_size: int = 512,
        ttl_seconds: int = 3600,
        similarity_threshold: float = 0.92,
    ) -> None:
        """Create an empty cache.

        Args:
            max_size: Maximum number of stored entries. A value <= 0 disables
                storage (``set`` becomes a no-op).
            ttl_seconds: Age, in seconds, at which an entry stops being served.
            similarity_threshold: Minimum dot-product score for a cache hit.
        """
        self._max_size = max_size
        self._ttl = ttl_seconds
        self._threshold = similarity_threshold
        self._lock = asyncio.Lock()
        # Each entry: {"embedding": np.ndarray (384,), "response": str, "inserted_at": float}
        # Ordered by insertion time for oldest-first eviction.
        self._entries: deque[dict] = deque()
        self._hits: int = 0

    def _is_live(self, entry: dict, now: float) -> bool:
        """Return True if *entry* is younger than the TTL at time *now*."""
        return now - entry["inserted_at"] < self._ttl

    def _drop_expired(self, now: float) -> None:
        """Remove expired entries from the front. Caller must hold the lock."""
        entries = self._entries
        # Expired entries are a prefix (see class docstring), so stop at the
        # first live one.
        while entries and not self._is_live(entries[0], now):
            entries.popleft()

    async def get(self, query_embedding: np.ndarray) -> Optional[str]:
        """
        Cosine similarity lookup. Returns cached response if best score >= threshold.
        query_embedding must already be L2-normalised (bge-small normalises by default).

        Returns None when the cache is empty, every entry has expired, or the
        best score is below the threshold. This method never mutates the entry
        container; expired entries are only skipped here and are removed by set().
        """
        if not self._entries:
            return None

        now = time.monotonic()
        valid = [e for e in self._entries if self._is_live(e, now)]
        if not valid:
            return None

        # Build matrix of all live embeddings for batch dot product (one numpy op).
        matrix = np.stack([e["embedding"] for e in valid])  # (N, 384)
        scores: np.ndarray = matrix @ query_embedding  # cosine sim, shape (N,)
        best_idx = int(np.argmax(scores))
        best_score = float(scores[best_idx])

        if best_score >= self._threshold:
            self._hits += 1
            logger.debug("Semantic cache hit | score=%.4f", best_score)
            return valid[best_idx]["response"]
        return None

    async def set(self, query_embedding: np.ndarray, response: str) -> None:
        """Store a new entry. Evicts oldest if at capacity.

        Expired entries are dropped first so they neither hold memory nor
        count against capacity. With ``max_size <= 0`` nothing is stored.
        """
        if self._max_size <= 0:
            # Nothing can be stored; previously this path popped from an
            # empty list and raised IndexError.
            return

        async with self._lock:
            now = time.monotonic()
            self._drop_expired(now)
            if len(self._entries) >= self._max_size:
                # Evict oldest (leftmost is the oldest insertion); O(1) on a deque.
                self._entries.popleft()
            self._entries.append({
                "embedding": query_embedding,
                "response": response,
                "inserted_at": now,
            })

    async def stats(self) -> dict:
        """Return cache counters and configuration.

        ``entries`` counts only entries that have not yet expired, i.e. those
        that ``get`` could still return.
        """
        now = time.monotonic()
        live_entries = sum(1 for e in self._entries if self._is_live(e, now))
        return {
            "entries": live_entries,
            "hits": self._hits,
            "max_size": self._max_size,
            "ttl_seconds": self._ttl,
            "threshold": self._threshold,
        }