""" |
|
|
Document and Page Caching |
|
|
|
|
|
File-based caching for rendered pages and processing results. |
|
|
Supports LRU eviction and configurable storage backends. |
|
|
""" |
import hashlib
import json
import logging
import os
import pickle
import shutil
import time
from dataclasses import dataclass, field
from functools import wraps
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar, Union

import numpy as np
from PIL import Image

logger = logging.getLogger(__name__)

T = TypeVar("T")


@dataclass
class CacheConfig:
    """Configuration for document cache."""

    cache_dir: Path = field(default_factory=lambda: Path.home() / ".cache" / "sparknet" / "documents")
    max_size_gb: float = 10.0
    ttl_hours: float = 168.0
    enabled: bool = True
    compression: bool = True

    def __post_init__(self):
        # Accept plain strings for cache_dir and normalise to a Path.
        self.cache_dir = Path(self.cache_dir)


@dataclass
class CacheEntry:
    """Metadata for a cached item."""

    key: str
    path: Path
    size_bytes: int
    created_at: float
    last_accessed: float
    ttl_hours: float
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def is_expired(self) -> bool:
        """Check if entry has expired."""
        age_hours = (time.time() - self.created_at) / 3600
        return age_hours > self.ttl_hours


class DocumentCache:
    """
    File-based cache for document processing results.

    Features:
    - LRU eviction when cache exceeds max size
    - TTL-based expiration
    - Separate namespaces for different data types
    - Compressed storage option
    """

    NAMESPACES = ["pages", "ocr", "layout", "chunks", "metadata"]

    def __init__(self, config: Optional[CacheConfig] = None):
        self.config = config or CacheConfig()
        self._index: Dict[str, CacheEntry] = {}
        self._index_path: Optional[Path] = None

        if self.config.enabled:
            self._init_cache_dir()
            self._load_index()

    def _init_cache_dir(self) -> None:
        """Initialize cache directory structure."""
        self.config.cache_dir.mkdir(parents=True, exist_ok=True)

        for namespace in self.NAMESPACES:
            (self.config.cache_dir / namespace).mkdir(exist_ok=True)

        self._index_path = self.config.cache_dir / "index.json"

    def _load_index(self) -> None:
        """Load cache index from disk."""
        if self._index_path and self._index_path.exists():
            try:
                with open(self._index_path, "r") as f:
                    data = json.load(f)

                for key, entry_data in data.items():
                    entry = CacheEntry(
                        key=entry_data["key"],
                        path=Path(entry_data["path"]),
                        size_bytes=entry_data["size_bytes"],
                        created_at=entry_data["created_at"],
                        last_accessed=entry_data["last_accessed"],
                        ttl_hours=entry_data.get("ttl_hours", self.config.ttl_hours),
                        metadata=entry_data.get("metadata", {}),
                    )
                    self._index[key] = entry
            except Exception as e:
                logger.warning(f"Failed to load cache index: {e}")
                self._index = {}

    def _save_index(self) -> None:
        """Save cache index to disk."""
        if not self._index_path:
            return

        try:
            data = {}
            for key, entry in self._index.items():
                data[key] = {
                    "key": entry.key,
                    "path": str(entry.path),
                    "size_bytes": entry.size_bytes,
                    "created_at": entry.created_at,
                    "last_accessed": entry.last_accessed,
                    "ttl_hours": entry.ttl_hours,
                    "metadata": entry.metadata,
                }

            with open(self._index_path, "w") as f:
                json.dump(data, f)
        except Exception as e:
            logger.warning(f"Failed to save cache index: {e}")

    def _generate_key(
        self,
        doc_path: Union[str, Path],
        namespace: str,
        *args,
        **kwargs
    ) -> str:
        """Generate a unique cache key."""
        doc_path = Path(doc_path)

        # Include the file's mtime so the key changes whenever the document does.
        try:
            mtime = doc_path.stat().st_mtime
        except OSError:
            mtime = 0

        # Key = absolute path + mtime + namespace + any extra args/kwargs,
        # hashed so it is safe to use as a filename.
        key_parts = [
            str(doc_path.absolute()),
            str(mtime),
            namespace,
            *[str(a) for a in args],
            *[f"{k}={v}" for k, v in sorted(kwargs.items())]
        ]

        key_str = "|".join(key_parts)
        return hashlib.sha256(key_str.encode()).hexdigest()

    def _get_cache_path(self, key: str, namespace: str, ext: str = ".pkl") -> Path:
        """Get file path for a cache entry."""
        return self.config.cache_dir / namespace / f"{key}{ext}"

    def _get_total_size(self) -> int:
        """Get total cache size in bytes."""
        return sum(entry.size_bytes for entry in self._index.values())

    def _evict_if_needed(self, required_bytes: int = 0) -> None:
        """Evict entries if cache exceeds max size."""
        max_bytes = self.config.max_size_gb * 1024 * 1024 * 1024
        current_size = self._get_total_size()

        if current_size + required_bytes <= max_bytes:
            return

        # Evict least-recently-used entries first until the new item fits.
        entries = sorted(
            self._index.values(),
            key=lambda e: e.last_accessed
        )

        for entry in entries:
            if current_size + required_bytes <= max_bytes:
                break

            self._delete_entry(entry.key)
            current_size -= entry.size_bytes

    def _delete_entry(self, key: str) -> None:
        """Delete a cache entry."""
        if key not in self._index:
            return

        entry = self._index[key]
        try:
            if entry.path.exists():
                entry.path.unlink()
        except Exception as e:
            logger.warning(f"Failed to delete cache file: {e}")

        del self._index[key]

    def _cleanup_expired(self) -> int:
        """Remove expired entries. Returns number removed."""
        expired_keys = [
            key for key, entry in self._index.items()
            if entry.is_expired
        ]

        for key in expired_keys:
            self._delete_entry(key)

        if expired_keys:
            self._save_index()

        return len(expired_keys)

    def get_page_image(
        self,
        doc_path: Union[str, Path],
        page_number: int,
        dpi: int = 200
    ) -> Optional[np.ndarray]:
        """Get cached page image."""
        if not self.config.enabled:
            return None

        key = self._generate_key(doc_path, "pages", page_number, dpi=dpi)

        if key not in self._index:
            return None

        entry = self._index[key]
        if entry.is_expired:
            self._delete_entry(key)
            return None

        try:
            # Load the cached PNG back into an array.
            img = Image.open(entry.path)
            arr = np.array(img)

            # Record the access so LRU eviction keeps recently used pages.
            entry.last_accessed = time.time()
            self._save_index()

            return arr
        except Exception as e:
            logger.warning(f"Failed to load cached page: {e}")
            self._delete_entry(key)
            return None

    def set_page_image(
        self,
        doc_path: Union[str, Path],
        page_number: int,
        image: np.ndarray,
        dpi: int = 200
    ) -> bool:
        """Cache a page image."""
        if not self.config.enabled:
            return False

        key = self._generate_key(doc_path, "pages", page_number, dpi=dpi)
        cache_path = self._get_cache_path(key, "pages", ".png")

        try:
            img = Image.fromarray(image)

            # PNG output is much smaller than the raw array; a rough estimate
            # is enough to decide whether eviction is needed before writing.
            estimated_size = image.nbytes // 10
            self._evict_if_needed(estimated_size)

            img.save(cache_path, format="PNG", optimize=self.config.compression)

            # Record the source document path so invalidate_document() can
            # find and remove this entry later.
            entry = CacheEntry(
                key=key,
                path=cache_path,
                size_bytes=cache_path.stat().st_size,
                created_at=time.time(),
                last_accessed=time.time(),
                ttl_hours=self.config.ttl_hours,
                metadata={
                    "doc_path": str(Path(doc_path).absolute()),
                    "page": page_number,
                    "dpi": dpi,
                }
            )
            self._index[key] = entry
            self._save_index()

            return True
        except Exception as e:
            logger.warning(f"Failed to cache page image: {e}")
            return False

    def get(
        self,
        doc_path: Union[str, Path],
        namespace: str,
        *args,
        **kwargs
    ) -> Optional[Any]:
        """Get a cached object."""
        if not self.config.enabled:
            return None

        key = self._generate_key(doc_path, namespace, *args, **kwargs)

        if key not in self._index:
            return None

        entry = self._index[key]
        if entry.is_expired:
            self._delete_entry(key)
            return None

        try:
            with open(entry.path, "rb") as f:
                data = pickle.load(f)

            entry.last_accessed = time.time()
            self._save_index()

            return data
        except Exception as e:
            logger.warning(f"Failed to load cached object: {e}")
            self._delete_entry(key)
            return None

    def set(
        self,
        doc_path: Union[str, Path],
        namespace: str,
        value: Any,
        *args,
        **kwargs
    ) -> bool:
        """Cache an object."""
        if not self.config.enabled:
            return False

        key = self._generate_key(doc_path, namespace, *args, **kwargs)
        cache_path = self._get_cache_path(key, namespace, ".pkl")

        try:
            # Serialize first so the exact size is known before eviction.
            data = pickle.dumps(value)
            self._evict_if_needed(len(data))

            with open(cache_path, "wb") as f:
                f.write(data)

            # Store the source document path so invalidate_document() can
            # match this entry.
            entry = CacheEntry(
                key=key,
                path=cache_path,
                size_bytes=len(data),
                created_at=time.time(),
                last_accessed=time.time(),
                ttl_hours=self.config.ttl_hours,
                metadata={"doc_path": str(Path(doc_path).absolute())}
            )
            self._index[key] = entry
            self._save_index()

            return True
        except Exception as e:
            logger.warning(f"Failed to cache object: {e}")
            return False

    def invalidate_document(self, doc_path: Union[str, Path]) -> int:
        """Invalidate all cache entries for a document. Returns count removed."""
        doc_path = Path(doc_path).absolute()
        doc_str = str(doc_path)

        keys_to_remove = []
        for key, entry in self._index.items():
            # Setters record the absolute document path in entry metadata;
            # match on it here because the keys themselves are opaque hashes.
            if entry.metadata.get("doc_path") == doc_str:
                keys_to_remove.append(key)

        for key in keys_to_remove:
            self._delete_entry(key)

        if keys_to_remove:
            self._save_index()

        return len(keys_to_remove)

    def clear(self) -> None:
        """Clear entire cache."""
        if self.config.cache_dir.exists():
            shutil.rmtree(self.config.cache_dir)

        self._index = {}
        self._init_cache_dir()

    def get_stats(self) -> Dict[str, Any]:
        """Get cache statistics."""
        total_size = self._get_total_size()
        return {
            "enabled": self.config.enabled,
            "total_entries": len(self._index),
            "total_size_bytes": total_size,
            "total_size_mb": total_size / (1024 * 1024),
            "max_size_gb": self.config.max_size_gb,
            "utilization_percent": (total_size / (self.config.max_size_gb * 1024 * 1024 * 1024)) * 100,
            "cache_dir": str(self.config.cache_dir),
            "namespaces": {
                # Entries live in per-namespace subdirectories, so count by
                # the parent directory name rather than a substring match.
                ns: sum(1 for e in self._index.values() if e.path.parent.name == ns)
                for ns in self.NAMESPACES
            },
        }


_global_cache: Optional[DocumentCache] = None


def get_document_cache(config: Optional[CacheConfig] = None) -> DocumentCache:
    """Get or create global document cache."""
    global _global_cache

    if _global_cache is None or config is not None:
        _global_cache = DocumentCache(config)

    return _global_cache
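

# Typical use of the module-level cache (illustrative; the directory below is
# an example value, not a default):
#   cache = get_document_cache()
#   custom = get_document_cache(CacheConfig(cache_dir="/tmp/doc-cache"))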


def cached_page(
    cache: Optional[DocumentCache] = None,
    dpi: int = 200
) -> Callable:
    """
    Decorator for caching page rendering results.

    Usage:
        @cached_page(cache, dpi=200)
        def render_page(doc_path, page_number):
            # ... rendering logic
            return image_array
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(doc_path: Union[str, Path], page_number: int, *args, **kwargs):
            _cache = cache or get_document_cache()

            # Return the cached render if one exists for this page and DPI.
            cached = _cache.get_page_image(doc_path, page_number, dpi)
            if cached is not None:
                return cached

            # Cache miss: render the page, then store the result for next time.
            result = func(doc_path, page_number, *args, **kwargs)

            if result is not None:
                _cache.set_page_image(doc_path, page_number, result, dpi)

            return result

        return wrapper
    return decorator