File size: 7,736 Bytes
d520909 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 |
"""
Document Cache
Caches rendered page images and document metadata for performance.
"""
import hashlib
import os
from pathlib import Path
from typing import Dict, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta
from loguru import logger
import numpy as np
from PIL import Image
from cachetools import TTLCache, LRUCache
@dataclass
class CacheEntry:
"""A cached page image entry."""
document_id: str
page_number: int
dpi: int
image: np.ndarray
created_at: datetime
size_bytes: int
class DocumentCache:
"""
In-memory cache for rendered document pages.
Uses LRU eviction with optional disk persistence.
"""
def __init__(
self,
max_pages: int = 100,
max_memory_mb: int = 1024,
ttl_seconds: int = 3600,
disk_cache_dir: Optional[str] = None,
):
"""
Initialize document cache.
Args:
max_pages: Maximum number of pages to cache in memory
max_memory_mb: Maximum memory usage in MB
ttl_seconds: Time-to-live for cache entries
disk_cache_dir: Optional directory for disk caching
"""
self.max_pages = max_pages
self.max_memory_mb = max_memory_mb
self.ttl_seconds = ttl_seconds
self.disk_cache_dir = disk_cache_dir
# In-memory cache
self._cache: TTLCache = TTLCache(maxsize=max_pages, ttl=ttl_seconds)
# Memory tracking
self._memory_used_bytes = 0
# Statistics
self._hits = 0
self._misses = 0
# Initialize disk cache if enabled
if disk_cache_dir:
self._disk_cache_path = Path(disk_cache_dir)
self._disk_cache_path.mkdir(parents=True, exist_ok=True)
else:
self._disk_cache_path = None
logger.debug(f"Initialized DocumentCache (max_pages={max_pages}, max_memory={max_memory_mb}MB)")
def _make_key(self, document_id: str, page_number: int, dpi: int) -> str:
"""Generate cache key."""
return f"{document_id}:p{page_number}:d{dpi}"
def get(
self,
document_id: str,
page_number: int,
dpi: int = 300,
) -> Optional[np.ndarray]:
"""
Get a cached page image.
Args:
document_id: Document identifier
page_number: Page number
dpi: Rendering DPI
Returns:
Cached image array or None
"""
key = self._make_key(document_id, page_number, dpi)
# Check in-memory cache
entry = self._cache.get(key)
if entry is not None:
self._hits += 1
return entry.image
# Check disk cache
if self._disk_cache_path:
disk_path = self._disk_cache_path / f"{key}.npy"
if disk_path.exists():
try:
image = np.load(disk_path)
# Promote to memory cache
self._put_memory(key, document_id, page_number, dpi, image)
self._hits += 1
return image
except Exception as e:
logger.warning(f"Failed to load from disk cache: {e}")
self._misses += 1
return None
def put(
self,
document_id: str,
page_number: int,
dpi: int,
image: np.ndarray,
persist_to_disk: bool = False,
):
"""
Cache a page image.
Args:
document_id: Document identifier
page_number: Page number
dpi: Rendering DPI
image: Page image as numpy array
persist_to_disk: Whether to persist to disk
"""
key = self._make_key(document_id, page_number, dpi)
# Put in memory cache
self._put_memory(key, document_id, page_number, dpi, image)
# Optionally persist to disk
if persist_to_disk and self._disk_cache_path:
self._put_disk(key, image)
def _put_memory(
self,
key: str,
document_id: str,
page_number: int,
dpi: int,
image: np.ndarray,
):
"""Put entry in memory cache."""
size_bytes = image.nbytes
# Check memory limit
max_bytes = self.max_memory_mb * 1024 * 1024
if self._memory_used_bytes + size_bytes > max_bytes:
# Evict oldest entries until we have space
self._evict_to_fit(size_bytes)
entry = CacheEntry(
document_id=document_id,
page_number=page_number,
dpi=dpi,
image=image,
created_at=datetime.utcnow(),
size_bytes=size_bytes,
)
self._cache[key] = entry
self._memory_used_bytes += size_bytes
def _put_disk(self, key: str, image: np.ndarray):
"""Persist entry to disk cache."""
if not self._disk_cache_path:
return
try:
disk_path = self._disk_cache_path / f"{key}.npy"
np.save(disk_path, image)
except Exception as e:
logger.warning(f"Failed to write to disk cache: {e}")
def _evict_to_fit(self, needed_bytes: int):
"""Evict entries to fit new entry."""
max_bytes = self.max_memory_mb * 1024 * 1024
target = max_bytes - needed_bytes
# Get entries sorted by creation time (oldest first)
entries = list(self._cache.items())
for key, entry in entries:
if self._memory_used_bytes <= target:
break
self._memory_used_bytes -= entry.size_bytes
del self._cache[key]
def invalidate(self, document_id: str, page_number: Optional[int] = None):
"""
Invalidate cache entries for a document.
Args:
document_id: Document to invalidate
page_number: Optional specific page (None = all pages)
"""
keys_to_remove = []
for key in self._cache.keys():
if key.startswith(f"{document_id}:"):
if page_number is None or f":p{page_number}:" in key:
keys_to_remove.append(key)
for key in keys_to_remove:
entry = self._cache.pop(key, None)
if entry:
self._memory_used_bytes -= entry.size_bytes
# Also remove from disk cache
if self._disk_cache_path:
for key in keys_to_remove:
disk_path = self._disk_cache_path / f"{key}.npy"
if disk_path.exists():
disk_path.unlink()
def clear(self):
"""Clear all cache entries."""
self._cache.clear()
self._memory_used_bytes = 0
# Clear disk cache
if self._disk_cache_path:
for f in self._disk_cache_path.glob("*.npy"):
f.unlink()
logger.info("Document cache cleared")
@property
def stats(self) -> Dict:
"""Get cache statistics."""
total = self._hits + self._misses
hit_rate = (self._hits / total * 100) if total > 0 else 0
return {
"hits": self._hits,
"misses": self._misses,
"hit_rate": f"{hit_rate:.1f}%",
"entries": len(self._cache),
"memory_used_mb": self._memory_used_bytes / (1024 * 1024),
"max_memory_mb": self.max_memory_mb,
}
# Global cache instance
_document_cache: Optional[DocumentCache] = None
def get_document_cache() -> DocumentCache:
"""Get or create the global document cache."""
global _document_cache
if _document_cache is None:
_document_cache = DocumentCache()
return _document_cache
|