MHamdan's picture
Initial commit: SPARKNET framework
d520909
"""
Document Cache
Caches rendered page images and document metadata for performance.
"""
import hashlib
import os
from pathlib import Path
from typing import Dict, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta
from loguru import logger
import numpy as np
from PIL import Image
from cachetools import TTLCache, LRUCache
@dataclass
class CacheEntry:
"""A cached page image entry."""
document_id: str
page_number: int
dpi: int
image: np.ndarray
created_at: datetime
size_bytes: int
class DocumentCache:
"""
In-memory cache for rendered document pages.
Uses LRU eviction with optional disk persistence.
"""
def __init__(
self,
max_pages: int = 100,
max_memory_mb: int = 1024,
ttl_seconds: int = 3600,
disk_cache_dir: Optional[str] = None,
):
"""
Initialize document cache.
Args:
max_pages: Maximum number of pages to cache in memory
max_memory_mb: Maximum memory usage in MB
ttl_seconds: Time-to-live for cache entries
disk_cache_dir: Optional directory for disk caching
"""
self.max_pages = max_pages
self.max_memory_mb = max_memory_mb
self.ttl_seconds = ttl_seconds
self.disk_cache_dir = disk_cache_dir
# In-memory cache
self._cache: TTLCache = TTLCache(maxsize=max_pages, ttl=ttl_seconds)
# Memory tracking
self._memory_used_bytes = 0
# Statistics
self._hits = 0
self._misses = 0
# Initialize disk cache if enabled
if disk_cache_dir:
self._disk_cache_path = Path(disk_cache_dir)
self._disk_cache_path.mkdir(parents=True, exist_ok=True)
else:
self._disk_cache_path = None
logger.debug(f"Initialized DocumentCache (max_pages={max_pages}, max_memory={max_memory_mb}MB)")
def _make_key(self, document_id: str, page_number: int, dpi: int) -> str:
"""Generate cache key."""
return f"{document_id}:p{page_number}:d{dpi}"
def get(
self,
document_id: str,
page_number: int,
dpi: int = 300,
) -> Optional[np.ndarray]:
"""
Get a cached page image.
Args:
document_id: Document identifier
page_number: Page number
dpi: Rendering DPI
Returns:
Cached image array or None
"""
key = self._make_key(document_id, page_number, dpi)
# Check in-memory cache
entry = self._cache.get(key)
if entry is not None:
self._hits += 1
return entry.image
# Check disk cache
if self._disk_cache_path:
disk_path = self._disk_cache_path / f"{key}.npy"
if disk_path.exists():
try:
image = np.load(disk_path)
# Promote to memory cache
self._put_memory(key, document_id, page_number, dpi, image)
self._hits += 1
return image
except Exception as e:
logger.warning(f"Failed to load from disk cache: {e}")
self._misses += 1
return None
def put(
self,
document_id: str,
page_number: int,
dpi: int,
image: np.ndarray,
persist_to_disk: bool = False,
):
"""
Cache a page image.
Args:
document_id: Document identifier
page_number: Page number
dpi: Rendering DPI
image: Page image as numpy array
persist_to_disk: Whether to persist to disk
"""
key = self._make_key(document_id, page_number, dpi)
# Put in memory cache
self._put_memory(key, document_id, page_number, dpi, image)
# Optionally persist to disk
if persist_to_disk and self._disk_cache_path:
self._put_disk(key, image)
def _put_memory(
self,
key: str,
document_id: str,
page_number: int,
dpi: int,
image: np.ndarray,
):
"""Put entry in memory cache."""
size_bytes = image.nbytes
# Check memory limit
max_bytes = self.max_memory_mb * 1024 * 1024
if self._memory_used_bytes + size_bytes > max_bytes:
# Evict oldest entries until we have space
self._evict_to_fit(size_bytes)
entry = CacheEntry(
document_id=document_id,
page_number=page_number,
dpi=dpi,
image=image,
created_at=datetime.utcnow(),
size_bytes=size_bytes,
)
self._cache[key] = entry
self._memory_used_bytes += size_bytes
def _put_disk(self, key: str, image: np.ndarray):
"""Persist entry to disk cache."""
if not self._disk_cache_path:
return
try:
disk_path = self._disk_cache_path / f"{key}.npy"
np.save(disk_path, image)
except Exception as e:
logger.warning(f"Failed to write to disk cache: {e}")
def _evict_to_fit(self, needed_bytes: int):
"""Evict entries to fit new entry."""
max_bytes = self.max_memory_mb * 1024 * 1024
target = max_bytes - needed_bytes
# Get entries sorted by creation time (oldest first)
entries = list(self._cache.items())
for key, entry in entries:
if self._memory_used_bytes <= target:
break
self._memory_used_bytes -= entry.size_bytes
del self._cache[key]
def invalidate(self, document_id: str, page_number: Optional[int] = None):
"""
Invalidate cache entries for a document.
Args:
document_id: Document to invalidate
page_number: Optional specific page (None = all pages)
"""
keys_to_remove = []
for key in self._cache.keys():
if key.startswith(f"{document_id}:"):
if page_number is None or f":p{page_number}:" in key:
keys_to_remove.append(key)
for key in keys_to_remove:
entry = self._cache.pop(key, None)
if entry:
self._memory_used_bytes -= entry.size_bytes
# Also remove from disk cache
if self._disk_cache_path:
for key in keys_to_remove:
disk_path = self._disk_cache_path / f"{key}.npy"
if disk_path.exists():
disk_path.unlink()
def clear(self):
"""Clear all cache entries."""
self._cache.clear()
self._memory_used_bytes = 0
# Clear disk cache
if self._disk_cache_path:
for f in self._disk_cache_path.glob("*.npy"):
f.unlink()
logger.info("Document cache cleared")
@property
def stats(self) -> Dict:
"""Get cache statistics."""
total = self._hits + self._misses
hit_rate = (self._hits / total * 100) if total > 0 else 0
return {
"hits": self._hits,
"misses": self._misses,
"hit_rate": f"{hit_rate:.1f}%",
"entries": len(self._cache),
"memory_used_mb": self._memory_used_bytes / (1024 * 1024),
"max_memory_mb": self.max_memory_mb,
}
# Global cache instance
_document_cache: Optional[DocumentCache] = None
def get_document_cache() -> DocumentCache:
"""Get or create the global document cache."""
global _document_cache
if _document_cache is None:
_document_cache = DocumentCache()
return _document_cache