| | """ |
| | Semantic caching system for cost optimization. |
| | """ |
| | import json |
| | import logging |
| | from pathlib import Path |
| | from typing import Optional, Dict, Any |
| | import hashlib |
| | import numpy as np |
| |
|
# Process-wide logging setup: timestamped records at INFO and above.
# NOTE(review): calling basicConfig at import time configures the whole
# process's root logger — consider moving this to the application entry point.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
| |
|
| |
|
class SemanticCache:
    """Disk-backed semantic cache using embeddings and cosine similarity.

    Results are stored in a single JSON file, keyed by a SHA-256 hash of the
    query text plus category. Lookups first try an exact key match, then fall
    back to a linear scan comparing embedding cosine similarity against
    ``similarity_threshold`` within the same category.
    """

    # Class-level logger (same name a module-level logger would get), so the
    # class does not depend on a module-global ``logger`` binding.
    _log = logging.getLogger(__name__)

    def __init__(
        self,
        cache_dir: str = "data/cache",
        similarity_threshold: float = 0.95
    ):
        """
        Initialize semantic cache.

        Args:
            cache_dir: Directory to store cache files (created if missing).
            similarity_threshold: Cosine similarity threshold for cache hits.
        """
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.similarity_threshold = similarity_threshold
        self.cache_file = self.cache_dir / "semantic_cache.json"
        self.cache_data = self._load_cache()

    def _load_cache(self) -> Dict[str, Any]:
        """Load the cache mapping from disk; return {} if absent or corrupt."""
        if not self.cache_file.exists():
            return {}
        try:
            # Explicit encoding: JSON is UTF-8 regardless of platform locale.
            with open(self.cache_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (OSError, ValueError) as e:
            # json.JSONDecodeError is a ValueError subclass; a corrupt or
            # unreadable cache degrades to an empty one rather than crashing.
            self._log.error("Error loading cache: %s", e)
            return {}

    def _save_cache(self):
        """Persist the full cache mapping to disk (rewrites the whole file)."""
        try:
            with open(self.cache_file, 'w', encoding='utf-8') as f:
                json.dump(self.cache_data, f, indent=2)
        except (OSError, TypeError, ValueError) as e:
            # TypeError/ValueError: non-serializable result objects.
            self._log.error("Error saving cache: %s", e)

    def _cosine_similarity(
        self,
        embedding1: list,
        embedding2: list
    ) -> float:
        """
        Calculate cosine similarity between two embeddings.

        Args:
            embedding1: First embedding vector
            embedding2: Second embedding vector

        Returns:
            Cosine similarity score as a plain Python float
            (0.0 when either vector has zero norm).
        """
        vec1 = np.asarray(embedding1, dtype=float)
        vec2 = np.asarray(embedding2, dtype=float)

        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)

        # Guard against division by zero for zero-length vectors.
        if norm1 == 0 or norm2 == 0:
            return 0.0

        # Cast so callers (and JSON serialization) see float, not np.float64.
        return float(np.dot(vec1, vec2) / (norm1 * norm2))

    def _generate_key(self, query: str, category: Optional[str] = None) -> str:
        """Generate a deterministic cache key from query and category."""
        content = f"{query}_{category or 'none'}"
        return hashlib.sha256(content.encode()).hexdigest()

    def get(
        self,
        query: str,
        query_embedding: list,
        category: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """
        Try to retrieve a cached result.

        Args:
            query: Search query
            query_embedding: Query embedding vector
            category: Optional category filter

        Returns:
            Cached result if found (exact or semantic hit), None otherwise.
            Never raises: cache failures are logged and reported as a miss.
        """
        try:
            # Fast path: exact (query, category) hash hit.
            exact_key = self._generate_key(query, category)
            if exact_key in self.cache_data:
                self._log.info("Exact cache hit")
                return self.cache_data[exact_key]["result"]

            # Slow path: linear scan for a semantically similar cached query.
            wanted_category = category or "none"
            best_similarity = 0.0
            best_result = None

            # Keys are not needed here, so iterate values() directly.
            for cached_item in self.cache_data.values():
                if cached_item.get("category") != wanted_category:
                    continue

                cached_embedding = cached_item.get("embedding")
                if not cached_embedding:
                    continue

                similarity = self._cosine_similarity(
                    query_embedding, cached_embedding
                )
                if similarity > best_similarity:
                    best_similarity = similarity
                    best_result = cached_item["result"]

            if best_similarity >= self.similarity_threshold:
                self._log.info(
                    "Semantic cache hit with similarity %.3f", best_similarity
                )
                return best_result

            self._log.info("Cache miss")
            return None

        except Exception as e:
            # Deliberately broad: a cache failure must never break the
            # caller's request path — degrade to a miss instead.
            self._log.error("Error retrieving from cache: %s", e)
            return None

    def set(
        self,
        query: str,
        query_embedding: list,
        result: Dict[str, Any],
        category: Optional[str] = None
    ):
        """
        Store a result in the cache and persist it to disk.

        Args:
            query: Search query
            query_embedding: Query embedding vector (must be JSON-serializable)
            result: Result to cache (must be JSON-serializable)
            category: Optional category filter
        """
        try:
            key = self._generate_key(query, category)

            self.cache_data[key] = {
                "query": query,
                "category": category or "none",
                "embedding": query_embedding,
                "result": result
            }

            self._save_cache()
            self._log.info("Cached result for query: %s...", query[:50])

        except Exception as e:
            # Best-effort: failing to cache is logged, never raised.
            self._log.error("Error storing in cache: %s", e)
|
| |
|