"""
Document Cache

Caches rendered page images and document metadata for performance.
"""

import hashlib
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Optional

import numpy as np
from cachetools import TTLCache
from loguru import logger


@dataclass
class CacheEntry:
    """A cached page image entry."""

    document_id: str
    page_number: int
    dpi: int
    image: np.ndarray
    created_at: datetime
    size_bytes: int


class DocumentCache:
    """
    In-memory cache for rendered document pages.

    Entries expire after a TTL and are evicted oldest-first when the page
    or memory limits are exceeded; pages can optionally be persisted to
    disk. Not thread-safe: guard access with a lock if the cache is
    shared across threads.
    """

    def __init__(
        self,
        max_pages: int = 100,
        max_memory_mb: int = 1024,
        ttl_seconds: int = 3600,
        disk_cache_dir: Optional[str] = None,
    ):
        """
        Initialize the document cache.

        Args:
            max_pages: Maximum number of pages to cache in memory
            max_memory_mb: Maximum memory usage in MB
            ttl_seconds: Time-to-live for cache entries
            disk_cache_dir: Optional directory for disk caching
        """
        self.max_pages = max_pages
        self.max_memory_mb = max_memory_mb
        self.ttl_seconds = ttl_seconds
        self.disk_cache_dir = disk_cache_dir

        # TTLCache enforces both the entry count limit and per-entry TTL.
        self._cache: TTLCache = TTLCache(maxsize=max_pages, ttl=ttl_seconds)

        # Hit/miss counters for the stats property.
        self._hits = 0
        self._misses = 0

        if disk_cache_dir:
            self._disk_cache_path: Optional[Path] = Path(disk_cache_dir)
            self._disk_cache_path.mkdir(parents=True, exist_ok=True)
        else:
            self._disk_cache_path = None

        logger.debug(
            f"Initialized DocumentCache (max_pages={max_pages}, "
            f"max_memory={max_memory_mb}MB)"
        )

    def _make_key(self, document_id: str, page_number: int, dpi: int) -> str:
        """Generate a cache key, e.g. "doc123:p4:d300".

        Assumes document_id contains no ':' so that prefix matching in
        invalidate() cannot collide across documents.
        """
        return f"{document_id}:p{page_number}:d{dpi}"
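
    def _current_memory_bytes(self) -> int:
        """Recompute memory use from the live entries.

        TTLCache can drop entries on its own (TTL expiry, maxsize
        overflow), so a manually maintained byte counter would drift out
        of sync; recomputing is O(n), but maxsize keeps n small.
        """
        return sum(entry.size_bytes for entry in self._cache.values())

    def _disk_path_for(self, key: str) -> Path:
        """Map a cache key to a safe on-disk filename.

        Keys are hashed because they may contain characters that are not
        valid in filenames (e.g. ':' on Windows). Callers must check that
        self._disk_cache_path is set before calling.
        """
        digest = hashlib.sha256(key.encode("utf-8")).hexdigest()
        return self._disk_cache_path / f"{digest}.npy"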

    def get(
        self,
        document_id: str,
        page_number: int,
        dpi: int = 300,
    ) -> Optional[np.ndarray]:
        """
        Get a cached page image.

        Args:
            document_id: Document identifier
            page_number: Page number
            dpi: Rendering DPI

        Returns:
            Cached image array, or None on a cache miss
        """
        key = self._make_key(document_id, page_number, dpi)

        # Try the in-memory cache first.
        entry = self._cache.get(key)
        if entry is not None:
            self._hits += 1
            return entry.image

        # Fall back to the disk cache, promoting hits back into memory.
        if self._disk_cache_path:
            disk_path = self._disk_path_for(key)
            if disk_path.exists():
                try:
                    image = np.load(disk_path)
                    self._put_memory(key, document_id, page_number, dpi, image)
                    self._hits += 1
                    return image
                except Exception as e:
                    logger.warning(f"Failed to load from disk cache: {e}")

        self._misses += 1
        return None

    def put(
        self,
        document_id: str,
        page_number: int,
        dpi: int,
        image: np.ndarray,
        persist_to_disk: bool = False,
    ):
        """
        Cache a page image.

        Args:
            document_id: Document identifier
            page_number: Page number
            dpi: Rendering DPI
            image: Page image as numpy array
            persist_to_disk: Whether to also persist to disk
        """
        key = self._make_key(document_id, page_number, dpi)

        self._put_memory(key, document_id, page_number, dpi, image)

        if persist_to_disk and self._disk_cache_path:
            self._put_disk(key, image)

    def _put_memory(
        self,
        key: str,
        document_id: str,
        page_number: int,
        dpi: int,
        image: np.ndarray,
    ):
        """Put an entry in the memory cache, evicting first if needed."""
        size_bytes = image.nbytes

        # Make room if the new entry would exceed the memory budget.
        max_bytes = self.max_memory_mb * 1024 * 1024
        if self._current_memory_bytes() + size_bytes > max_bytes:
            self._evict_to_fit(size_bytes)

        entry = CacheEntry(
            document_id=document_id,
            page_number=page_number,
            dpi=dpi,
            image=image,
            created_at=datetime.now(timezone.utc),
            size_bytes=size_bytes,
        )

        self._cache[key] = entry

    def _put_disk(self, key: str, image: np.ndarray):
        """Persist an entry to the disk cache."""
        if not self._disk_cache_path:
            return

        try:
            np.save(self._disk_path_for(key), image)
        except Exception as e:
            logger.warning(f"Failed to write to disk cache: {e}")

    def _evict_to_fit(self, needed_bytes: int):
        """Evict entries (oldest first) until needed_bytes fits in the budget."""
        max_bytes = self.max_memory_mb * 1024 * 1024
        target = max_bytes - needed_bytes

        used = self._current_memory_bytes()
        # Snapshot the items; TTLCache iterates in insertion order, so the
        # oldest pages are removed first.
        for key, entry in list(self._cache.items()):
            if used <= target:
                break
            used -= entry.size_bytes
            del self._cache[key]

    def invalidate(self, document_id: str, page_number: Optional[int] = None):
        """
        Invalidate cache entries for a document.

        Args:
            document_id: Document to invalidate
            page_number: Optional specific page (None = all pages)
        """
        keys_to_remove = []

        for key in list(self._cache.keys()):
            if key.startswith(f"{document_id}:"):
                if page_number is None or f":p{page_number}:" in key:
                    keys_to_remove.append(key)

        for key in keys_to_remove:
            self._cache.pop(key, None)

        # Remove any persisted copies as well.
        if self._disk_cache_path:
            for key in keys_to_remove:
                disk_path = self._disk_path_for(key)
                if disk_path.exists():
                    disk_path.unlink()

    def clear(self):
        """Clear all cache entries, in memory and on disk."""
        self._cache.clear()

        if self._disk_cache_path:
            for f in self._disk_cache_path.glob("*.npy"):
                f.unlink()

        logger.info("Document cache cleared")

    @property
    def stats(self) -> Dict:
        """Get cache statistics."""
        total = self._hits + self._misses
        hit_rate = (self._hits / total * 100) if total > 0 else 0.0

        return {
            "hits": self._hits,
            "misses": self._misses,
            "hit_rate": f"{hit_rate:.1f}%",
            "entries": len(self._cache),
            "memory_used_mb": self._current_memory_bytes() / (1024 * 1024),
            "max_memory_mb": self.max_memory_mb,
        }


_document_cache: Optional[DocumentCache] = None


def get_document_cache() -> DocumentCache:
    """Get or create the global document cache."""
    global _document_cache
    if _document_cache is None:
        _document_cache = DocumentCache()
    return _document_cache
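

if __name__ == "__main__":
    # Minimal smoke test / usage sketch. The document id is made up and
    # the random array stands in for a real rendered page image.
    cache = get_document_cache()
    page = np.random.randint(0, 256, size=(512, 512, 3), dtype=np.uint8)

    cache.put("example-doc", page_number=1, dpi=300, image=page)
    cached = cache.get("example-doc", page_number=1, dpi=300)
    assert cached is not None and cached.shape == page.shape

    cache.get("example-doc", page_number=2, dpi=300)  # expected miss
    print(cache.stats)  # e.g. {'hits': 1, 'misses': 1, 'hit_rate': '50.0%', ...}

    cache.invalidate("example-doc")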