""" Document and Page Caching File-based caching for rendered pages and processing results. Supports LRU eviction and configurable storage backends. """ import hashlib import json import logging import os import pickle import shutil import time from dataclasses import dataclass, field from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar, Union import numpy as np from PIL import Image logger = logging.getLogger(__name__) T = TypeVar("T") @dataclass class CacheConfig: """Configuration for document cache.""" cache_dir: Path = field(default_factory=lambda: Path.home() / ".cache" / "sparknet" / "documents") max_size_gb: float = 10.0 ttl_hours: float = 168.0 # 7 days enabled: bool = True compression: bool = True def __post_init__(self): self.cache_dir = Path(self.cache_dir) @dataclass class CacheEntry: """Metadata for a cached item.""" key: str path: Path size_bytes: int created_at: float last_accessed: float ttl_hours: float metadata: Dict[str, Any] = field(default_factory=dict) @property def is_expired(self) -> bool: """Check if entry has expired.""" age_hours = (time.time() - self.created_at) / 3600 return age_hours > self.ttl_hours class DocumentCache: """ File-based cache for document processing results. Features: - LRU eviction when cache exceeds max size - TTL-based expiration - Separate namespaces for different data types - Compressed storage option """ NAMESPACES = ["pages", "ocr", "layout", "chunks", "metadata"] def __init__(self, config: Optional[CacheConfig] = None): self.config = config or CacheConfig() self._index: Dict[str, CacheEntry] = {} self._index_path: Optional[Path] = None if self.config.enabled: self._init_cache_dir() self._load_index() def _init_cache_dir(self) -> None: """Initialize cache directory structure.""" self.config.cache_dir.mkdir(parents=True, exist_ok=True) for namespace in self.NAMESPACES: (self.config.cache_dir / namespace).mkdir(exist_ok=True) self._index_path = self.config.cache_dir / "index.json" def _load_index(self) -> None: """Load cache index from disk.""" if self._index_path and self._index_path.exists(): try: with open(self._index_path, "r") as f: data = json.load(f) for key, entry_data in data.items(): entry = CacheEntry( key=entry_data["key"], path=Path(entry_data["path"]), size_bytes=entry_data["size_bytes"], created_at=entry_data["created_at"], last_accessed=entry_data["last_accessed"], ttl_hours=entry_data.get("ttl_hours", self.config.ttl_hours), metadata=entry_data.get("metadata", {}) ) self._index[key] = entry except Exception as e: logger.warning(f"Failed to load cache index: {e}") self._index = {} def _save_index(self) -> None: """Save cache index to disk.""" if not self._index_path: return try: data = {} for key, entry in self._index.items(): data[key] = { "key": entry.key, "path": str(entry.path), "size_bytes": entry.size_bytes, "created_at": entry.created_at, "last_accessed": entry.last_accessed, "ttl_hours": entry.ttl_hours, "metadata": entry.metadata } with open(self._index_path, "w") as f: json.dump(data, f) except Exception as e: logger.warning(f"Failed to save cache index: {e}") def _generate_key( self, doc_path: Union[str, Path], namespace: str, *args, **kwargs ) -> str: """Generate a unique cache key.""" doc_path = Path(doc_path) # Include file modification time for cache invalidation try: mtime = doc_path.stat().st_mtime except OSError: mtime = 0 key_parts = [ str(doc_path.absolute()), str(mtime), namespace, *[str(a) for a in args], *[f"{k}={v}" for k, v in sorted(kwargs.items())] ] key_str = "|".join(key_parts) return hashlib.sha256(key_str.encode()).hexdigest() def _get_cache_path(self, key: str, namespace: str, ext: str = ".pkl") -> Path: """Get file path for a cache entry.""" return self.config.cache_dir / namespace / f"{key}{ext}" def _get_total_size(self) -> int: """Get total cache size in bytes.""" return sum(entry.size_bytes for entry in self._index.values()) def _evict_if_needed(self, required_bytes: int = 0) -> None: """Evict entries if cache exceeds max size.""" max_bytes = self.config.max_size_gb * 1024 * 1024 * 1024 current_size = self._get_total_size() if current_size + required_bytes <= max_bytes: return # Sort by last accessed (LRU) entries = sorted( self._index.values(), key=lambda e: e.last_accessed ) # Evict until we have enough space for entry in entries: if current_size + required_bytes <= max_bytes: break self._delete_entry(entry.key) current_size -= entry.size_bytes def _delete_entry(self, key: str) -> None: """Delete a cache entry.""" if key not in self._index: return entry = self._index[key] try: if entry.path.exists(): entry.path.unlink() except Exception as e: logger.warning(f"Failed to delete cache file: {e}") del self._index[key] def _cleanup_expired(self) -> int: """Remove expired entries. Returns number removed.""" expired_keys = [ key for key, entry in self._index.items() if entry.is_expired ] for key in expired_keys: self._delete_entry(key) if expired_keys: self._save_index() return len(expired_keys) # Public API def get_page_image( self, doc_path: Union[str, Path], page_number: int, dpi: int = 200 ) -> Optional[np.ndarray]: """Get cached page image.""" if not self.config.enabled: return None key = self._generate_key(doc_path, "pages", page_number, dpi=dpi) if key not in self._index: return None entry = self._index[key] if entry.is_expired: self._delete_entry(key) return None try: # Load image img = Image.open(entry.path) arr = np.array(img) # Update access time entry.last_accessed = time.time() self._save_index() return arr except Exception as e: logger.warning(f"Failed to load cached page: {e}") self._delete_entry(key) return None def set_page_image( self, doc_path: Union[str, Path], page_number: int, image: np.ndarray, dpi: int = 200 ) -> bool: """Cache a page image.""" if not self.config.enabled: return False key = self._generate_key(doc_path, "pages", page_number, dpi=dpi) cache_path = self._get_cache_path(key, "pages", ".png") try: # Convert and save img = Image.fromarray(image) # Estimate size estimated_size = image.nbytes // 10 # PNG compression self._evict_if_needed(estimated_size) # Save image img.save(cache_path, format="PNG", optimize=self.config.compression) # Create index entry entry = CacheEntry( key=key, path=cache_path, size_bytes=cache_path.stat().st_size, created_at=time.time(), last_accessed=time.time(), ttl_hours=self.config.ttl_hours, metadata={"page": page_number, "dpi": dpi} ) self._index[key] = entry self._save_index() return True except Exception as e: logger.warning(f"Failed to cache page image: {e}") return False def get( self, doc_path: Union[str, Path], namespace: str, *args, **kwargs ) -> Optional[Any]: """Get a cached object.""" if not self.config.enabled: return None key = self._generate_key(doc_path, namespace, *args, **kwargs) if key not in self._index: return None entry = self._index[key] if entry.is_expired: self._delete_entry(key) return None try: with open(entry.path, "rb") as f: data = pickle.load(f) entry.last_accessed = time.time() self._save_index() return data except Exception as e: logger.warning(f"Failed to load cached object: {e}") self._delete_entry(key) return None def set( self, doc_path: Union[str, Path], namespace: str, value: Any, *args, **kwargs ) -> bool: """Cache an object.""" if not self.config.enabled: return False key = self._generate_key(doc_path, namespace, *args, **kwargs) cache_path = self._get_cache_path(key, namespace, ".pkl") try: # Serialize and estimate size data = pickle.dumps(value) self._evict_if_needed(len(data)) # Save with open(cache_path, "wb") as f: f.write(data) entry = CacheEntry( key=key, path=cache_path, size_bytes=len(data), created_at=time.time(), last_accessed=time.time(), ttl_hours=self.config.ttl_hours ) self._index[key] = entry self._save_index() return True except Exception as e: logger.warning(f"Failed to cache object: {e}") return False def invalidate_document(self, doc_path: Union[str, Path]) -> int: """Invalidate all cache entries for a document. Returns count removed.""" doc_path = Path(doc_path).absolute() doc_str = str(doc_path) keys_to_remove = [] for key, entry in self._index.items(): # Check metadata for document path if entry.metadata.get("doc_path") == doc_str: keys_to_remove.append(key) for key in keys_to_remove: self._delete_entry(key) if keys_to_remove: self._save_index() return len(keys_to_remove) def clear(self) -> None: """Clear entire cache.""" if self.config.cache_dir.exists(): shutil.rmtree(self.config.cache_dir) self._index = {} self._init_cache_dir() def get_stats(self) -> Dict[str, Any]: """Get cache statistics.""" total_size = self._get_total_size() return { "enabled": self.config.enabled, "total_entries": len(self._index), "total_size_bytes": total_size, "total_size_mb": total_size / (1024 * 1024), "max_size_gb": self.config.max_size_gb, "utilization_percent": (total_size / (self.config.max_size_gb * 1024 * 1024 * 1024)) * 100, "cache_dir": str(self.config.cache_dir), "namespaces": { ns: sum(1 for e in self._index.values() if ns in str(e.path)) for ns in self.NAMESPACES } } # Global cache instance _global_cache: Optional[DocumentCache] = None def get_document_cache(config: Optional[CacheConfig] = None) -> DocumentCache: """Get or create global document cache.""" global _global_cache if _global_cache is None or config is not None: _global_cache = DocumentCache(config) return _global_cache def cached_page( cache: Optional[DocumentCache] = None, dpi: int = 200 ) -> Callable: """ Decorator for caching page rendering results. Usage: @cached_page(cache, dpi=200) def render_page(doc_path, page_number): # ... rendering logic return image_array """ def decorator(func: Callable) -> Callable: def wrapper(doc_path: Union[str, Path], page_number: int, *args, **kwargs): _cache = cache or get_document_cache() # Try cache first cached = _cache.get_page_image(doc_path, page_number, dpi) if cached is not None: return cached # Compute and cache result = func(doc_path, page_number, *args, **kwargs) if result is not None: _cache.set_page_image(doc_path, page_number, result, dpi) return result return wrapper return decorator