| """
|
| Background Data Collection Agent
|
| Continuously collects data from 305+ free resources
|
| Runs automatically when HuggingFace Space starts
|
| """
|
|
|
| import asyncio
|
| import time
|
| from datetime import datetime, timedelta
|
| from typing import Dict, List, Any
|
| import logging
|
|
|
|
|
import sys

# Make the project root importable when this file is launched directly from a
# HuggingFace Space.  NOTE(review): hard-coded path — confirm it matches the
# actual deployment layout.
sys.path.insert(0, '/workspace')

# Project-local managers: resource fallback rotation, proxy rotation, DB cache.
from core.smart_fallback_manager import get_fallback_manager

from core.smart_proxy_manager import get_proxy_manager

from database.db_manager import db_manager



# Module-level logger, per stdlib logging convention.
logger = logging.getLogger(__name__)
|
|
|
|
|
class DataCollectionAgent:
    """
    Background agent that continuously collects data from free crypto APIs.

    - Pulls market, news, sentiment, whale and blockchain data through the
      fallback manager (305+ free resources)
    - Stores results in the database cache
    - Runs 24/7 in the background until stop() is called
    - Auto-handles failures by rotating through fallback resources
    """

    def __init__(self) -> None:
        self.fallback_manager = get_fallback_manager()
        self.proxy_manager = get_proxy_manager()
        # Checked by every loop on each iteration; flipping it stops the agent.
        self.is_running = False
        # Aggregate counters shared by all collection loops.
        self.collection_stats: Dict[str, Any] = {
            'total_collections': 0,
            'successful_collections': 0,
            'failed_collections': 0,
            'last_collection_time': None,
            'collections_by_category': {}
        }

        # Polling interval per resource category, in seconds.
        self.intervals = {
            'market_data_apis': 30,
            'news_apis': 300,
            'sentiment_apis': 180,
            'whale_tracking_apis': 60,
            'block_explorers': 120,
            'onchain_analytics_apis': 300,
        }

        # category -> datetime of the most recent completed collection attempt.
        self.last_collection: Dict[str, datetime] = {}

        logger.info("DataCollectionAgent initialized")

    async def start(self) -> None:
        """Start all collection loops and run until stop() is called."""
        if self.is_running:
            logger.warning("Agent already running")
            return

        self.is_running = True
        logger.info("Starting DataCollectionAgent...")

        # return_exceptions=True keeps one crashed loop from cancelling the rest.
        tasks = [
            self.collect_market_data(),
            self.collect_news_data(),
            self.collect_sentiment_data(),
            self.collect_whale_tracking(),
            self.collect_blockchain_data(),
            self.health_check_loop(),
        ]
        await asyncio.gather(*tasks, return_exceptions=True)

    async def stop(self) -> None:
        """Signal every collection loop to exit after its current sleep."""
        self.is_running = False
        logger.info("Stopping DataCollectionAgent...")

    def _record_result(self, category: str, success: bool) -> None:
        """Update the shared stats after one completed collection attempt.

        BUGFIX: 'last_collection_time' and 'collections_by_category' were
        declared in __init__ but never updated anywhere; they are
        maintained here now.
        """
        stats = self.collection_stats
        stats['total_collections'] += 1
        if success:
            stats['successful_collections'] += 1
        else:
            stats['failed_collections'] += 1
        now = datetime.now()
        stats['last_collection_time'] = now.isoformat()
        by_category = stats['collections_by_category']
        by_category[category] = by_category.get(category, 0) + 1
        self.last_collection[category] = now

    async def _collect_loop(self, category: str, endpoint_path: str,
                            store_fn, label: str,
                            params=None, max_attempts: int = 5) -> None:
        """Shared fetch/store loop used by the single-endpoint collectors.

        Fetches `endpoint_path` through the fallback manager every
        `self.intervals[category]` seconds and hands the payload to
        `store_fn` whenever one of the fallback resources succeeds.
        An exception in one iteration is logged and the loop continues.
        """
        interval = self.intervals[category]
        while self.is_running:
            try:
                logger.info("Collecting %s...", label)

                # Only forward `params` when given, so the fallback manager's
                # own default applies otherwise (matches the original calls).
                kwargs: Dict[str, Any] = {
                    'category': category,
                    'endpoint_path': endpoint_path,
                    'max_attempts': max_attempts,
                }
                if params is not None:
                    kwargs['params'] = params
                data = await self.fallback_manager.fetch_with_fallback(**kwargs)

                if data:
                    await store_fn(data)
                    logger.info("%s collected successfully", label)
                else:
                    logger.warning("Failed to collect %s after all attempts", label)

                self._record_result(category, success=bool(data))
            except Exception as e:
                logger.error("Error collecting %s: %s", label, e)
                self.collection_stats['failed_collections'] += 1

            await asyncio.sleep(interval)

    async def collect_market_data(self) -> None:
        """Continuously collect top-250 market data (price, volume, cap)."""
        await self._collect_loop(
            category='market_data_apis',
            endpoint_path="/coins/markets",
            store_fn=self._store_market_data,
            label="market data",
            params={
                "vs_currency": "usd",
                "order": "market_cap_desc",
                "per_page": 250,
                "page": 1
            },
            max_attempts=10,
        )

    async def collect_news_data(self) -> None:
        """Continuously collect crypto news headlines."""
        await self._collect_loop(
            category='news_apis',
            endpoint_path="/news",
            store_fn=self._store_news_data,
            label="news data",
            params={"limit": 50},
        )

    async def collect_sentiment_data(self) -> None:
        """Continuously collect market sentiment data."""
        await self._collect_loop(
            category='sentiment_apis',
            endpoint_path="/sentiment",
            store_fn=self._store_sentiment_data,
            label="sentiment data",
        )

    async def collect_whale_tracking(self) -> None:
        """Continuously collect whale (large-holder) transaction data."""
        await self._collect_loop(
            category='whale_tracking_apis',
            endpoint_path="/whales",
            store_fn=self._store_whale_data,
            label="whale data",
        )

    async def collect_blockchain_data(self) -> None:
        """Continuously collect the latest block data for several chains."""
        category = 'block_explorers'
        interval = self.intervals[category]
        chains = ['ethereum', 'bsc', 'polygon']

        while self.is_running:
            try:
                logger.info("Collecting blockchain data...")

                any_success = False
                for chain in chains:
                    data = await self.fallback_manager.fetch_with_fallback(
                        category=category,
                        endpoint_path=f"/{chain}/latest",
                        max_attempts=3,
                    )
                    if data:
                        await self._store_blockchain_data(chain, data)
                        any_success = True

                # BUGFIX: this cycle used to be counted as successful even when
                # every chain failed; success now requires at least one chain.
                self._record_result(category, success=any_success)
            except Exception as e:
                logger.error("Error collecting blockchain data: %s", e)
                self.collection_stats['failed_collections'] += 1

            await asyncio.sleep(interval)

    async def health_check_loop(self) -> None:
        """Every 10 minutes: log a health report, drop long-dead resources,
        and re-test the proxy pool."""
        while self.is_running:
            try:
                # Sleep first so the check runs on fresh data after startup;
                # also throttles the loop if the body below keeps raising.
                await asyncio.sleep(600)

                logger.info("Running health check...")

                report = self.fallback_manager.get_health_report()
                logger.info("Health Report:")
                logger.info("  Total Resources: %s", report['total_resources'])
                logger.info("  Active: %s", report['by_status']['active'])
                logger.info("  Degraded: %s", report['by_status']['degraded'])
                logger.info("  Failed: %s", report['by_status']['failed'])
                logger.info("  Proxy Needed: %s", report['by_status']['proxy_needed'])

                # Drop resources that have been failing for over a day.
                removed = self.fallback_manager.cleanup_failed_resources(max_age_hours=24)
                if removed:
                    logger.info("Cleaned up %s failed resources", len(removed))

                await self.proxy_manager.test_all_proxies()
            except Exception as e:
                logger.error("Health check error: %s", e)

    async def _store_market_data(self, data: Any) -> None:
        """Cache per-coin market rows in the database.

        Expects a CoinGecko-style list of dicts (keys: symbol, current_price,
        total_volume, market_cap, price_change_percentage_24h); any other
        payload shape is silently ignored.
        """
        try:
            if isinstance(data, list):
                for item in data:
                    symbol = item.get('symbol', '').upper()
                    if symbol:
                        db_manager.cache_market_data(
                            symbol=symbol,
                            price=item.get('current_price', 0),
                            volume=item.get('total_volume', 0),
                            market_cap=item.get('market_cap', 0),
                            change_24h=item.get('price_change_percentage_24h', 0),
                            data=item
                        )
            logger.debug("Stored market data in database")
        except Exception as e:
            logger.error("Error storing market data: %s", e)

    async def _store_news_data(self, data: Any) -> None:
        """Store news data in the database.

        TODO: persistence is not implemented yet; this is a placeholder.
        """
        logger.debug("Stored news data in database")

    async def _store_sentiment_data(self, data: Any) -> None:
        """Store sentiment data in the database.

        TODO: persistence is not implemented yet; this is a placeholder.
        """
        logger.debug("Stored sentiment data in database")

    async def _store_whale_data(self, data: Any) -> None:
        """Store whale tracking data in the database.

        TODO: persistence is not implemented yet; this is a placeholder.
        """
        logger.debug("Stored whale data in database")

    async def _store_blockchain_data(self, chain: str, data: Any) -> None:
        """Store blockchain data for *chain* in the database.

        TODO: persistence is not implemented yet; this is a placeholder.
        """
        logger.debug("Stored %s blockchain data in database", chain)

    def get_stats(self) -> Dict:
        """Return a snapshot of collection stats plus manager health reports."""
        return {
            **self.collection_stats,
            'is_running': self.is_running,
            'last_collection': {
                category: last_time.isoformat() if last_time else None
                for category, last_time in self.last_collection.items()
            },
            'health_report': self.fallback_manager.get_health_report(),
            'proxy_status': self.proxy_manager.get_status_report()
        }
|
|
|
|
|
|
|
# Lazily-created process-wide agent instance.
_agent = None


def get_data_collection_agent() -> DataCollectionAgent:
    """Return the shared DataCollectionAgent, creating it on first use."""
    global _agent
    agent = _agent
    if agent is None:
        agent = DataCollectionAgent()
        _agent = agent
    return agent
|
|
|
|
|
async def start_data_collection_agent():
    """Fetch the global agent and run its collection loops until stopped."""
    await get_data_collection_agent().start()
|
|
|