"""
Unified News Caching System
Centralized cache manager for Twitter, Reddit, RSS, and AI/Tech news feeds
"""

import hashlib
import logging
import re
import pandas as pd
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Callable

logger = logging.getLogger(__name__)


class NewsCacheManager:
    """
    Centralized cache manager for news feeds with:
    - Per-source caching with TTL
    - Cross-service deduplication
    - Filtered results caching
    - Force refresh support
    """

    # Every news source that gets its own cache entry in ``self.cache``.
    SOURCES = (
        'twitter',
        'reddit',
        'rss',
        'ai_tech',
        'predictions',
        'sectoral_news',
        'market_events',
        'economic_calendar',
    )

    # Patterns used by _normalize_text, compiled once instead of per call.
    _PUNCTUATION_RE = re.compile(r'[^\w\s]')
    _WHITESPACE_RE = re.compile(r'\s+')

    def __init__(self, default_ttl: int = 180, filtered_ttl: int = 300):
        """
        Initialize cache manager

        Args:
            default_ttl: Default time-to-live in seconds for each source's
                raw news (default: 180 = 3 minutes)
            filtered_ttl: Time-to-live in seconds for cached filtered
                DataFrames produced by get_filtered_news (default: 300)
        """
        self.default_ttl = default_ttl
        self.filtered_ttl = filtered_ttl
        self.cache = {src: self._empty_source_entry(default_ttl) for src in self.SOURCES}
        # Global deduplication index: content hash -> {'first_seen', 'sources', 'canonical_item'}
        self.cache['dedup_index'] = {}
        # Cached filtered results: cache key -> {'results', 'expires_at', 'fingerprint'}
        self.cache['filtered_cache'] = {}
        logger.info(f"NewsCacheManager initialized with {default_ttl}s TTL")

    @staticmethod
    def _empty_source_entry(ttl: int) -> Dict:
        """Return a fresh, never-fetched cache entry with the given TTL."""
        return {'raw_news': [], 'last_fetch': None, 'ttl': ttl}

    def get_news(
        self,
        source: str,
        fetcher_func: Callable,
        force_refresh: bool = False,
        **kwargs
    ) -> List[Dict]:
        """
        Get news from cache or fetch fresh if needed

        Args:
            source: News source; must be one of ``SOURCES``
            fetcher_func: Function to fetch fresh news
            force_refresh: If True, bypass cache and fetch fresh
            **kwargs: Arguments to pass to fetcher_func

        Returns:
            List of news items. On a cache hit, or when the fetch fails or
            returns nothing, this is (a copy of) the cached raw list. After a
            successful fetch it is the fetched items minus those already
            indexed by *other* sources (cross-source dedup).
            An invalid source yields an empty list.
        """
        if source not in self.SOURCES:
            logger.error(f"Invalid source: {source}")
            return []

        if not force_refresh and self._is_cache_valid(source):
            logger.info(f"✅ Cache HIT for {source} (age: {self._get_cache_age(source):.1f}s)")
            # Copy so callers cannot mutate the cached list in place.
            return list(self.cache[source]['raw_news'])

        # Cache miss or force refresh - fetch fresh news
        logger.info(f"🔄 Cache MISS for {source} - fetching fresh news...")
        try:
            logger.info(f"📞 Calling fetcher for {source} with kwargs: {kwargs}")
            new_items = fetcher_func(**kwargs)
            logger.info(f"📦 Fetcher returned {len(new_items) if new_items else 0} items for {source}")

            if not new_items:
                logger.warning(f"⚠️ No news items fetched for {source} - returning cached data")
                # Return cached data if available, even if expired
                return list(self.cache[source]['raw_news'])

            self._update_cache(source, new_items)

            # Forget this source's previous contributions before deduplicating.
            # Otherwise items re-fetched from the same source (after TTL expiry
            # or on force refresh) would be reported as duplicates of themselves.
            # Done only after a successful fetch so a failed refresh keeps the index.
            self._clear_source_from_dedup(source)

            deduplicated = self._deduplicate(new_items, source)
            logger.info(f"✅ Fetched {len(new_items)} items for {source}, {len(deduplicated)} unique after dedup")
            return deduplicated

        except Exception as e:
            # Best-effort boundary: log with traceback and fall back to cached data.
            logger.exception(f"Error fetching news for {source}: {e}")
            return list(self.cache[source]['raw_news'])

    def _is_cache_valid(self, source: str) -> bool:
        """
        Check if cached data is still fresh

        Args:
            source: News source to check

        Returns:
            True if the source was fetched and its age is below its TTL
        """
        age = self._get_cache_age(source)
        return age >= 0 and age < self.cache[source]['ttl']

    def _get_cache_age(self, source: str) -> float:
        """
        Get age of cached data in seconds

        Args:
            source: News source

        Returns:
            Age in seconds, or -1 if never fetched
        """
        last_fetch = self.cache[source]['last_fetch']
        if last_fetch is None:
            return -1
        return (datetime.now() - last_fetch).total_seconds()

    def _normalize_text(self, text) -> str:
        """
        Normalize text for deduplication

        Lowercases, strips punctuation and collapses whitespace. Falsy values
        become ""; non-string values are converted with str() first.

        Args:
            text: Text to normalize

        Returns:
            Normalized text
        """
        if not text:
            return ""
        normalized = str(text).lower().strip()
        normalized = self._PUNCTUATION_RE.sub('', normalized)
        return self._WHITESPACE_RE.sub(' ', normalized)

    def _compute_hash(self, item: Dict) -> str:
        """
        Compute content hash for deduplication

        Args:
            item: News item dict; 'title' and 'summary' may be missing or None

        Returns:
            MD5 hex digest of the normalized title and first 200 summary chars
        """
        title = self._normalize_text(item.get('title'))
        # `or ''` guards against summary=None, which would crash the slice.
        summary = self._normalize_text(str(item.get('summary') or '')[:200])
        combined = f"{title}|{summary}"
        return hashlib.md5(combined.encode()).hexdigest()

    def _deduplicate(self, items: List[Dict], source: str) -> List[Dict]:
        """
        Remove duplicates using global dedup index

        Items whose hash is already indexed are dropped from the result, and
        `source` is recorded in that entry's 'sources' list. New items are
        indexed with `source` as their first source.

        Args:
            items: List of news items
            source: Source name

        Returns:
            Deduplicated list of items
        """
        dedup_index = self.cache['dedup_index']
        deduplicated = []
        duplicate_count = 0

        for item in items:
            content_hash = self._compute_hash(item)
            entry = dedup_index.get(content_hash)
            if entry is None:
                dedup_index[content_hash] = {
                    'first_seen': datetime.now(),
                    'sources': [source],
                    'canonical_item': item,
                }
                deduplicated.append(item)
            else:
                if source not in entry['sources']:
                    entry['sources'].append(source)
                duplicate_count += 1

        if duplicate_count > 0:
            logger.info(f"🔍 Deduplication: Found {duplicate_count} duplicates for {source}")

        return deduplicated

    def _update_cache(self, source: str, items: List[Dict]):
        """
        Update cache with new items and stamp the fetch time

        Args:
            source: News source
            items: List of news items (stored as a shallow copy)
        """
        self.cache[source]['raw_news'] = list(items)
        self.cache[source]['last_fetch'] = datetime.now()
        logger.info(f"📦 Updated cache for {source} with {len(items)} items")

    @staticmethod
    def _frame_fingerprint(df: pd.DataFrame) -> Optional[tuple]:
        """
        Return an order-sensitive content fingerprint of `df`, or None if it
        cannot be hashed (the caller then skips the filtered cache).
        """
        try:
            row_hashes = pd.util.hash_pandas_object(df, index=True)
        except (TypeError, ValueError):
            return None
        digest = hashlib.md5(row_hashes.to_numpy().tobytes()).hexdigest()
        return (tuple(str(col) for col in df.columns), digest)

    def _prune_expired_filtered(self, now: datetime):
        """Drop expired entries so the filtered cache does not grow without bound."""
        filtered_cache = self.cache['filtered_cache']
        expired = [key for key, entry in filtered_cache.items() if entry['expires_at'] <= now]
        for key in expired:
            del filtered_cache[key]

    def get_filtered_news(
        self,
        source_df: pd.DataFrame,
        filters: Dict,
        source_name: str = "unknown"
    ) -> pd.DataFrame:
        """
        Get filtered news with caching

        A cached result is reused only while it is unexpired AND the content
        of `source_df` is unchanged, so refreshed source data is never
        answered with stale filtered rows.

        Args:
            source_df: Source dataframe
            filters: Filter dict with 'category', 'sentiment', 'impact' keys;
                a missing key (or the value 'all') disables that filter
            source_name: Name of source (for logging and the cache key)

        Returns:
            Filtered dataframe (a copy; mutating it does not affect the cache).
            An empty `source_df` is returned as-is.

        Raises:
            KeyError: if an active filter's column is missing from source_df
        """
        if source_df.empty:
            return source_df

        filters = filters or {}
        category = filters.get('category', 'all')
        sentiment = filters.get('sentiment', 'all')
        impact = filters.get('impact', 'all')
        cache_key = f"{source_name}_{category}_{sentiment}_{impact}"

        fingerprint = self._frame_fingerprint(source_df)
        now = datetime.now()
        filtered_cache = self.cache['filtered_cache']

        cached_entry = filtered_cache.get(cache_key)
        if (
            fingerprint is not None
            and cached_entry is not None
            and now < cached_entry['expires_at']
            and cached_entry.get('fingerprint') == fingerprint
        ):
            logger.debug(f"✅ Filtered cache HIT for {cache_key}")
            return cached_entry['results'].copy()

        # Apply filters
        filtered_df = source_df
        for column, wanted in (('category', category), ('sentiment', sentiment), ('impact', impact)):
            if wanted != 'all':
                filtered_df = filtered_df[filtered_df[column] == wanted]
        filtered_df = filtered_df.copy()

        logger.debug(f"🔍 Filtered {source_name}: {len(source_df)} → {len(filtered_df)} items")

        self._prune_expired_filtered(now)
        if fingerprint is None:
            return filtered_df

        filtered_cache[cache_key] = {
            'results': filtered_df,
            'expires_at': now + timedelta(seconds=self.filtered_ttl),
            'fingerprint': fingerprint,
        }
        # Hand out a copy so callers cannot mutate the cached frame.
        return filtered_df.copy()

    def _clear_source_from_dedup(self, source: str):
        """
        Remove `source` from every dedup entry; drop entries left with no sources

        Args:
            source: Source to remove from dedup index
        """
        dedup_index = self.cache['dedup_index']
        to_remove = []

        for content_hash, entry in dedup_index.items():
            if source in entry['sources']:
                entry['sources'].remove(source)
                if not entry['sources']:
                    to_remove.append(content_hash)

        for content_hash in to_remove:
            del dedup_index[content_hash]

        if to_remove:
            logger.info(f"🗑️ Removed {len(to_remove)} entries from dedup index for {source}")

    def clear_cache(self, source: Optional[str] = None):
        """
        Clear cache for specific source or all sources

        Each source keeps its configured TTL. Clearing everything also resets
        the dedup index and the filtered cache.

        Args:
            source: Source to clear, or None to clear all. An unknown source
                is logged and ignored.
        """
        if source:
            if source not in self.SOURCES:
                logger.error(f"Invalid source: {source}")
                return
            self.cache[source] = self._empty_source_entry(self.cache[source]['ttl'])
            self._clear_source_from_dedup(source)
            logger.info(f"🗑️ Cleared cache for {source}")
        else:
            for src in self.SOURCES:
                self.cache[src] = self._empty_source_entry(self.cache[src]['ttl'])
            self.cache['dedup_index'] = {}
            self.cache['filtered_cache'] = {}
            logger.info("🗑️ Cleared ALL caches")

    def get_statistics(self) -> Dict:
        """
        Get cache statistics

        Returns:
            Dict keyed by source name with 'items', 'age_seconds' and
            'is_valid', plus 'dedup_index_size' and 'filtered_cache_size'
        """
        stats = {
            src: {
                'items': len(self.cache[src]['raw_news']),
                'age_seconds': self._get_cache_age(src),
                'is_valid': self._is_cache_valid(src),
            }
            for src in self.SOURCES
        }
        stats['dedup_index_size'] = len(self.cache['dedup_index'])
        stats['filtered_cache_size'] = len(self.cache['filtered_cache'])
        return stats