| """ |
| Caching utilities for embeddings and generation results. |
| |
| Implements: |
| - Embedding cache (text, image, audio) |
| - Generation result cache |
| - Content-based caching with similarity matching |
| """ |
|
|
| from __future__ import annotations |
|
|
| import hashlib |
| import json |
| import pickle |
| from pathlib import Path |
| from typing import Any, Dict, Optional, Tuple |
|
|
| import numpy as np |
| from joblib import Memory |
|
|
| from src.embeddings.similarity import cosine_similarity |
|
|
|
|
class EmbeddingCache:
    """Cache for embeddings to avoid recomputation."""

    def __init__(self, cache_dir: str = ".cache/embeddings", similarity_threshold: float = 0.99):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.similarity_threshold = similarity_threshold

        # In-process cache layered on top of the on-disk .npy files.
        self._memory_cache: Dict[str, np.ndarray] = {}

    def _get_key(self, content: str, modality: str) -> str:
        """Generate a cache key from content and modality."""
        content_hash = hashlib.sha256(f"{modality}:{content}".encode()).hexdigest()
        return f"{modality}_{content_hash[:16]}"

    def _get_cache_path(self, key: str) -> Path:
        """Get the cache file path for a key."""
        return self.cache_dir / f"{key}.npy"

    def get(self, content: str, modality: str) -> Optional[np.ndarray]:
        """Retrieve a cached embedding, or None on a miss."""
        key = self._get_key(content, modality)

        if key in self._memory_cache:
            return self._memory_cache[key]

        cache_path = self._get_cache_path(key)
        if cache_path.exists():
            embedding = np.load(cache_path)
            self._memory_cache[key] = embedding
            return embedding

        return None

    def set(self, content: str, modality: str, embedding: np.ndarray) -> None:
        """Store an embedding in both the in-memory and on-disk caches."""
        key = self._get_key(content, modality)
        cache_path = self._get_cache_path(key)

        self._memory_cache[key] = embedding
        np.save(cache_path, embedding)

    def get_similar(self, query_embedding: np.ndarray, modality: str) -> Optional[Tuple[str, np.ndarray, float]]:
        """Find the most similar cached embedding using cosine similarity.

        Returns a (cache key, embedding, similarity) tuple, or None when no
        cached embedding meets the similarity threshold.
        """
        pattern = f"{modality}_*.npy"
        cached_files = list(self.cache_dir.glob(pattern))

        best_match: Optional[Tuple[str, np.ndarray, float]] = None
        best_similarity = -1.0

        for cache_file in cached_files:
            try:
                cached_embedding = np.load(cache_file)
                similarity = cosine_similarity(query_embedding, cached_embedding)

                if similarity > best_similarity and similarity >= self.similarity_threshold:
                    best_similarity = similarity
                    content_id = cache_file.stem
                    best_match = (content_id, cached_embedding, similarity)
            except Exception:
                # Skip unreadable or shape-incompatible cache files.
                continue

        return best_match
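

# Usage sketch (illustrative, not part of the API): a set/get round-trip and a
# similarity lookup. The random vector is a stand-in for a real model
# embedding; the text and dimensionality here are assumptions for the example.
def _example_embedding_cache_usage() -> None:
    cache = EmbeddingCache(cache_dir=".cache/embeddings")
    vec = np.random.rand(384).astype(np.float32)  # stand-in embedding

    cache.set("a photo of a cat", "text", vec)
    hit = cache.get("a photo of a cat", "text")
    assert hit is not None and np.allclose(hit, vec)

    # An identical vector matches itself at similarity ~1.0, which clears the
    # default 0.99 threshold.
    assert cache.get_similar(vec, "text") is not None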


class GenerationCache:
    """Cache for generation results (images, audio, text)."""

    def __init__(self, cache_dir: str = ".cache/generations"):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        self._memory_cache: Dict[str, Dict[str, Any]] = {}
        self._metadata_file = self.cache_dir / "metadata.json"

        # Load the persisted metadata index, tolerating a missing or corrupt file.
        self._metadata: Dict[str, Dict[str, Any]] = {}
        if self._metadata_file.exists():
            try:
                with self._metadata_file.open("r") as f:
                    self._metadata = json.load(f)
            except Exception:
                self._metadata = {}

    def _get_key(self, prompt: str, modality: str, generator_config: Optional[Dict] = None) -> str:
        """Generate a cache key from prompt, modality, and generator config."""
        config_str = json.dumps(generator_config or {}, sort_keys=True)
        content = f"{modality}:{prompt}:{config_str}"
        return hashlib.sha256(content.encode()).hexdigest()[:32]

    def get(self, prompt: str, modality: str, generator_config: Optional[Dict] = None) -> Optional[Dict[str, Any]]:
        """Retrieve a cached generation result, or None on a miss."""
        key = self._get_key(prompt, modality, generator_config)

        if key in self._memory_cache:
            return self._memory_cache[key]

        if key in self._metadata:
            entry = self._metadata[key]
            # set() stores the full path, so use it as-is rather than
            # re-joining it with cache_dir (which would duplicate the prefix).
            result_path = Path(entry["file_path"])

            if result_path.exists():
                result = {
                    "file_path": str(result_path),
                    "modality": entry["modality"],
                    "prompt": entry["prompt"],
                    "metadata": entry.get("metadata", {}),
                }
                self._memory_cache[key] = result
                return result

        return None

    def set(
        self,
        prompt: str,
        modality: str,
        file_path: str,
        metadata: Optional[Dict[str, Any]] = None,
        generator_config: Optional[Dict] = None,
    ) -> str:
        """Store a generation result in the cache and return its key."""
        key = self._get_key(prompt, modality, generator_config)

        # Copy the artifact into the cache directory unless it is already there.
        source_path = Path(file_path)
        if source_path.resolve().parent != self.cache_dir.resolve():
            cache_file_path = self.cache_dir / f"{key}_{modality}{source_path.suffix}"
            shutil.copy2(source_path, cache_file_path)
            file_path = str(cache_file_path)
        else:
            file_path = str(source_path)

        entry = {
            "file_path": file_path,
            "modality": modality,
            "prompt": prompt,
            "metadata": metadata or {},
            "generator_config": generator_config or {},
        }

        self._metadata[key] = entry
        self._memory_cache[key] = {
            "file_path": file_path,
            "modality": modality,
            "prompt": prompt,
            "metadata": metadata or {},
        }

        # Persist the metadata index after every write.
        with self._metadata_file.open("w") as f:
            json.dump(self._metadata, f, indent=2)

        return key

    def find_similar(self, prompt: str, modality: str, similarity_threshold: float = 0.85) -> list[Dict[str, Any]]:
        """Find similar cached results by prompt word overlap (Jaccard similarity)."""
        matches = []
        prompt_lower = prompt.lower()

        for entry in self._metadata.values():
            if entry["modality"] == modality:
                cached_prompt_lower = entry["prompt"].lower()

                # Jaccard similarity over whitespace-tokenized words.
                words = set(prompt_lower.split())
                cached_words = set(cached_prompt_lower.split())
                overlap = len(words & cached_words) / max(len(words | cached_words), 1)

                if overlap >= similarity_threshold:
                    result_path = Path(entry["file_path"])
                    if result_path.exists():
                        matches.append({
                            "file_path": str(result_path),
                            "prompt": entry["prompt"],
                            "similarity": overlap,
                            "metadata": entry.get("metadata", {}),
                        })

        return sorted(matches, key=lambda x: x["similarity"], reverse=True)


class ContentBasedCache:
    """Content-based cache that uses semantic similarity for matching."""

    def __init__(
        self,
        cache_dir: str = ".cache/content",
        embedding_cache: Optional[EmbeddingCache] = None,
        similarity_threshold: float = 0.90,
    ):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.similarity_threshold = similarity_threshold
        self.embedding_cache = embedding_cache or EmbeddingCache()

        self._index_file = self.cache_dir / "content_index.json"
        self._index: Dict[str, Dict[str, Any]] = {}

        if self._index_file.exists():
            try:
                with self._index_file.open("r") as f:
                    self._index = json.load(f)
            except Exception:
                self._index = {}

    def _get_content_key(self, content: str) -> str:
        """Generate a key for content."""
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def add(self, content: str, result: Dict[str, Any], embedding: Optional[np.ndarray] = None) -> None:
        """Add content and its result to the cache."""
        key = self._get_content_key(content)

        if embedding is not None:
            self.embedding_cache.set(content, "content", embedding)

        # Reuse the embedding cache's key scheme so the embedding can be
        # recovered later from the content alone.
        entry = {
            "content": content,
            "result": result,
            "embedding_key": self.embedding_cache._get_key(content, "content") if embedding is not None else None,
        }

        self._index[key] = entry

        # Persist the index after every write.
        with self._index_file.open("w") as f:
            json.dump(self._index, f, indent=2)

    def find_similar(self, query: str, query_embedding: Optional[np.ndarray] = None) -> list[Tuple[Dict[str, Any], float]]:
        """Find similar cached content using semantic similarity."""
        if query_embedding is None:
            # Fall back to a previously cached embedding for the query text.
            cached_emb = self.embedding_cache.get(query, "content")
            if cached_emb is None:
                return []
            query_embedding = cached_emb

        matches = []
        for entry in self._index.values():
            if entry.get("embedding_key"):
                cached_emb = self.embedding_cache.get(entry["content"], "content")
                if cached_emb is not None:
                    similarity = cosine_similarity(query_embedding, cached_emb)
                    if similarity >= self.similarity_threshold:
                        matches.append((entry["result"], similarity))

        return sorted(matches, key=lambda x: x[1], reverse=True)
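

# Usage sketch (illustrative, not part of the API): semantic lookup. The
# random vector stands in for a real embedding; with a real model,
# near-duplicate content would land above the threshold.
def _example_content_based_cache_usage() -> None:
    cache = ContentBasedCache(cache_dir=".cache/content", similarity_threshold=0.90)
    emb = np.random.rand(384).astype(np.float32)  # stand-in embedding

    cache.add("a blue bird on a branch", {"file_path": "bird.png"}, embedding=emb)

    # Querying with the same embedding matches the stored entry at ~1.0.
    matches = cache.find_similar("a blue bird on a branch", query_embedding=emb)
    assert matches and matches[0][1] >= 0.90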