| | """ |
| | Unified News Caching System |
| | Centralized cache manager for Twitter, Reddit, RSS, and AI/Tech news feeds |
| | """ |
| |
|
| | import hashlib |
| | import logging |
| | import re |
| | import pandas as pd |
| | from datetime import datetime, timedelta |
| | from typing import List, Dict, Optional, Callable |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
class NewsCacheManager:
    """
    Centralized cache manager for news feeds.

    Features:
    - Per-source caching with TTL
    - Cross-service deduplication via a normalized content-hash index
    - Filtered-results caching (pandas DataFrames)
    - Force-refresh support
    """

    # Single source of truth for valid feed names. Previously this list was
    # duplicated in __init__, get_news, clear_cache and get_statistics.
    SOURCES = (
        'twitter', 'reddit', 'rss', 'ai_tech',
        'predictions', 'sectoral_news', 'market_events', 'economic_calendar',
    )

    # TTL (seconds) for entries in the filtered-results cache.
    FILTERED_CACHE_TTL = 300

    def __init__(self, default_ttl: int = 180):
        """
        Initialize cache manager.

        Args:
            default_ttl: Default time-to-live in seconds (default: 180 = 3 minutes)
        """
        # Remember the configured TTL so clear_cache() restores entries with
        # the same TTL instead of a hard-coded 180.
        self.default_ttl = default_ttl
        self.cache = {
            src: {'raw_news': [], 'last_fetch': None, 'ttl': default_ttl}
            for src in self.SOURCES
        }
        # content-hash -> {'first_seen', 'sources', 'canonical_item'}
        self.cache['dedup_index'] = {}
        # filter-key -> {'results': DataFrame, 'expires_at': datetime}
        self.cache['filtered_cache'] = {}
        logger.info("NewsCacheManager initialized with %ss TTL", default_ttl)

    def get_news(
        self,
        source: str,
        fetcher_func: Callable,
        force_refresh: bool = False,
        **kwargs
    ) -> List[Dict]:
        """
        Get news from cache or fetch fresh if needed.

        Args:
            source: News source (one of SOURCES)
            fetcher_func: Function to fetch fresh news
            force_refresh: If True, bypass cache and fetch fresh
            **kwargs: Arguments to pass to fetcher_func

        Returns:
            List of news items; deduplicated when freshly fetched,
            raw cached items on a cache hit or fetch failure.
        """
        if source not in self.SOURCES:
            logger.error("Invalid source: %s", source)
            return []

        # A forced refresh must not have its items suppressed by the dedup
        # index, so drop this source's dedup entries before refetching.
        if force_refresh:
            self._clear_source_from_dedup(source)

        if not force_refresh and self._is_cache_valid(source):
            logger.info("✅ Cache HIT for %s (age: %.1fs)",
                        source, self._get_cache_age(source))
            return self.cache[source]['raw_news']

        logger.info("🔄 Cache MISS for %s - fetching fresh news...", source)
        try:
            logger.info("🔄 Calling fetcher for %s with kwargs: %s", source, kwargs)
            new_items = fetcher_func(**kwargs)
            logger.info("📦 Fetcher returned %d items for %s",
                        len(new_items) if new_items else 0, source)

            if not new_items:
                # Best effort: fall back to (possibly stale) cached data
                # rather than returning nothing.
                logger.warning("⚠️ No news items fetched for %s - returning cached data",
                               source)
                return self.cache[source]['raw_news']

            self._update_cache(source, new_items)
            deduplicated = self._deduplicate(new_items, source)
            logger.info("✅ Fetched %d items for %s, %d unique after dedup",
                        len(new_items), source, len(deduplicated))
            return deduplicated

        except Exception as e:
            # Fetcher failed: serve whatever data we already have.
            logger.error("Error fetching news for %s: %s", source, e)
            return self.cache[source]['raw_news']

    def _is_cache_valid(self, source: str) -> bool:
        """
        Check if cached data for *source* is still within its TTL.

        Args:
            source: News source to check

        Returns:
            True if cache is valid, False otherwise (including never fetched)
        """
        source_cache = self.cache[source]
        if not source_cache['last_fetch']:
            return False

        age = (datetime.now() - source_cache['last_fetch']).total_seconds()
        return age < source_cache['ttl']

    def _get_cache_age(self, source: str) -> float:
        """
        Get age of cached data in seconds.

        Args:
            source: News source

        Returns:
            Age in seconds, or -1 if never fetched
        """
        source_cache = self.cache[source]
        if not source_cache['last_fetch']:
            return -1

        return (datetime.now() - source_cache['last_fetch']).total_seconds()

    def _normalize_text(self, text: str) -> str:
        """
        Normalize text for deduplication: lowercase, strip punctuation,
        collapse whitespace.

        Args:
            text: Text to normalize (falsy values yield "")

        Returns:
            Normalized text
        """
        if not text:
            return ""

        text = text.lower().strip()
        text = re.sub(r'[^\w\s]', '', text)   # drop punctuation
        text = re.sub(r'\s+', ' ', text)      # collapse runs of whitespace
        return text

    def _compute_hash(self, item: Dict) -> str:
        """
        Compute a content hash for deduplication from the normalized title
        plus the first 200 chars of the summary.

        MD5 is used as a fast non-cryptographic fingerprint here, not for
        security.

        Args:
            item: News item dict (reads 'title' and 'summary')

        Returns:
            MD5 hash string
        """
        title = self._normalize_text(item.get('title', ''))
        summary = self._normalize_text(item.get('summary', '')[:200])
        combined = f"{title}|{summary}"
        return hashlib.md5(combined.encode()).hexdigest()

    def _deduplicate(self, items: List[Dict], source: str) -> List[Dict]:
        """
        Remove duplicates using the global dedup index shared by all sources.

        Items already seen (from any source) are dropped; the index records
        every source that produced each item.

        Args:
            items: List of news items
            source: Source name

        Returns:
            Deduplicated list of items (first occurrence kept)
        """
        deduplicated = []
        duplicate_count = 0

        for item in items:
            content_hash = self._compute_hash(item)

            if content_hash in self.cache['dedup_index']:
                # Known item: just record that this source also carries it.
                dup_entry = self.cache['dedup_index'][content_hash]
                if source not in dup_entry['sources']:
                    dup_entry['sources'].append(source)
                duplicate_count += 1
            else:
                self.cache['dedup_index'][content_hash] = {
                    'first_seen': datetime.now(),
                    'sources': [source],
                    'canonical_item': item
                }
                deduplicated.append(item)

        if duplicate_count > 0:
            logger.info("🔁 Deduplication: Found %d duplicates for %s",
                        duplicate_count, source)

        return deduplicated

    def _update_cache(self, source: str, items: List[Dict]):
        """
        Replace cached items for *source* and stamp the fetch time.

        Args:
            source: News source
            items: List of news items
        """
        self.cache[source]['raw_news'] = items
        self.cache[source]['last_fetch'] = datetime.now()
        logger.info("📦 Updated cache for %s with %d items", source, len(items))

    def get_filtered_news(
        self,
        source_df: pd.DataFrame,
        filters: Dict,
        source_name: str = "unknown"
    ) -> pd.DataFrame:
        """
        Get filtered news with caching of the filtered result.

        Args:
            source_df: Source dataframe (must have 'category', 'sentiment',
                'impact' columns when the corresponding filter is active)
            filters: Filter dict with 'category', 'sentiment', 'impact' keys;
                the value 'all' (or a missing key) disables that filter
            source_name: Name of source (for logging and the cache key)

        Returns:
            Filtered dataframe
        """
        if source_df.empty:
            return source_df

        category = filters.get('category', 'all')
        sentiment = filters.get('sentiment', 'all')
        impact = filters.get('impact', 'all')
        cache_key = f"{source_name}_{category}_{sentiment}_{impact}"

        cached_entry = self.cache['filtered_cache'].get(cache_key)
        if cached_entry and datetime.now() < cached_entry['expires_at']:
            logger.debug("✅ Filtered cache HIT for %s", cache_key)
            return cached_entry['results']

        filtered_df = source_df.copy()
        # Apply each active filter; 'all' means "no constraint".
        for column, value in (('category', category),
                              ('sentiment', sentiment),
                              ('impact', impact)):
            if value != 'all':
                filtered_df = filtered_df[filtered_df[column] == value]

        logger.debug("🔍 Filtered %s: %d → %d items",
                     source_name, len(source_df), len(filtered_df))

        self.cache['filtered_cache'][cache_key] = {
            'results': filtered_df,
            'expires_at': datetime.now() + timedelta(seconds=self.FILTERED_CACHE_TTL)
        }

        return filtered_df

    def _clear_source_from_dedup(self, source: str):
        """
        Remove *source* from every dedup entry, and drop entries that then
        belong to no source at all.

        Args:
            source: Source to remove from dedup index
        """
        to_remove = []
        for content_hash, entry in self.cache['dedup_index'].items():
            if source in entry['sources']:
                entry['sources'].remove(source)
                if not entry['sources']:
                    to_remove.append(content_hash)

        # Deleting after iteration avoids mutating the dict while looping.
        for content_hash in to_remove:
            del self.cache['dedup_index'][content_hash]

        if to_remove:
            logger.info("🗑️ Removed %d entries from dedup index for %s",
                        len(to_remove), source)

    def clear_cache(self, source: Optional[str] = None):
        """
        Clear cache for a specific source, or all caches.

        Args:
            source: Source to clear, or None to clear everything (including
                the dedup index and filtered cache)
        """
        # Restore entries with the configured TTL, not a hard-coded value.
        empty = {'raw_news': [], 'last_fetch': None, 'ttl': self.default_ttl}
        if source:
            self.cache[source] = dict(empty)
            self._clear_source_from_dedup(source)
            logger.info("🗑️ Cleared cache for %s", source)
        else:
            for src in self.SOURCES:
                self.cache[src] = dict(empty)
            self.cache['dedup_index'] = {}
            self.cache['filtered_cache'] = {}
            logger.info("🗑️ Cleared ALL caches")

    def get_statistics(self) -> Dict:
        """
        Get cache statistics for every source plus index sizes.

        Returns:
            Dict mapping each source to {'items', 'age_seconds', 'is_valid'},
            plus 'dedup_index_size' and 'filtered_cache_size'.
        """
        stats: Dict = {
            src: {
                'items': len(self.cache[src]['raw_news']),
                'age_seconds': self._get_cache_age(src),
                'is_valid': self._is_cache_valid(src),
            }
            for src in self.SOURCES
        }
        stats['dedup_index_size'] = len(self.cache['dedup_index'])
        stats['filtered_cache_size'] = len(self.cache['filtered_cache'])
        return stats
|