| | """ |
| | Database Manager Module |
| | Provides comprehensive database operations for the crypto API monitoring system |
| | """ |
| |
|
| | import os |
| | from contextlib import contextmanager |
| | from datetime import datetime, timedelta |
| | from typing import Optional, List, Dict, Any, Tuple |
| | from pathlib import Path |
| |
|
| | from sqlalchemy import create_engine, func, and_, or_, desc, text |
| | from sqlalchemy.orm import sessionmaker, Session |
| | from sqlalchemy.exc import SQLAlchemyError, IntegrityError |
| |
|
| | from database.models import ( |
| | Base, |
| | Provider, |
| | ConnectionAttempt, |
| | DataCollection, |
| | RateLimitUsage, |
| | ScheduleConfig, |
| | ScheduleCompliance, |
| | FailureLog, |
| | Alert, |
| | SystemMetrics, |
| | ConnectionStatus, |
| | ProviderCategory, |
| | |
| | MarketPrice, |
| | NewsArticle, |
| | WhaleTransaction, |
| | SentimentMetric, |
| | GasPrice, |
| | BlockchainStat |
| | ) |
| | from database.data_access import DataAccessMixin |
| | from utils.logger import setup_logger |
| |
|
| | |
| | logger = setup_logger("db_manager", level="INFO") |
| |
|
| |
|


class DatabaseManager(DataAccessMixin):
    """
    Comprehensive database manager for the API monitoring system.
    Handles all database operations with proper error handling and logging.
    """

    def __init__(self, db_path: str = "data/api_monitor.db"):
        """
        Initialize the database manager.

        Args:
            db_path: Path to the SQLite database file
        """
        self.db_path = db_path
        self._ensure_data_directory()

        # check_same_thread=False lets the SQLite connection be used from
        # multiple threads; sessions here are short-lived, so this is safe.
        db_url = f"sqlite:///{self.db_path}"
        self.engine = create_engine(
            db_url,
            echo=False,
            connect_args={"check_same_thread": False}
        )

        # expire_on_commit=False keeps returned ORM objects usable after
        # the session that loaded them has been closed.
        self.SessionLocal = sessionmaker(
            autocommit=False,
            autoflush=False,
            bind=self.engine,
            expire_on_commit=False
        )

        logger.info(f"Database manager initialized with database: {self.db_path}")

    def _ensure_data_directory(self):
        """Ensure the data directory exists"""
        data_dir = Path(self.db_path).parent
        data_dir.mkdir(parents=True, exist_ok=True)

    @contextmanager
    def get_session(self) -> Iterator[Session]:
        """
        Context manager for database sessions.
        Automatically handles commit/rollback and cleanup.

        Yields:
            SQLAlchemy session

        Example:
            with db_manager.get_session() as session:
                provider = session.query(Provider).first()
        """
        session = self.SessionLocal()
        try:
            yield session
            session.commit()
        except Exception as e:
            session.rollback()
            logger.error(f"Session error: {str(e)}", exc_info=True)
            raise
        finally:
            session.close()

    def init_database(self) -> bool:
        """
        Initialize the database by creating all tables.

        Returns:
            True if successful, False otherwise
        """
        try:
            Base.metadata.create_all(bind=self.engine)
            logger.info("Database tables created successfully")
            return True
        except SQLAlchemyError as e:
            logger.error(f"Failed to initialize database: {str(e)}", exc_info=True)
            return False

    def drop_all_tables(self) -> bool:
        """
        Drop all tables (use with caution!).

        Returns:
            True if successful, False otherwise
        """
        try:
            Base.metadata.drop_all(bind=self.engine)
            logger.warning("All database tables dropped")
            return True
        except SQLAlchemyError as e:
            logger.error(f"Failed to drop tables: {str(e)}", exc_info=True)
            return False

    # ------------------------------------------------------------------
    # Provider operations
    # ------------------------------------------------------------------

    def create_provider(
        self,
        name: str,
        category: str,
        endpoint_url: str,
        requires_key: bool = False,
        api_key_masked: Optional[str] = None,
        rate_limit_type: Optional[str] = None,
        rate_limit_value: Optional[int] = None,
        timeout_ms: int = 10000,
        priority_tier: int = 3
    ) -> Optional[Provider]:
        """
        Create a new provider.

        Args:
            name: Provider name
            category: Provider category
            endpoint_url: API endpoint URL
            requires_key: Whether an API key is required
            api_key_masked: Masked API key for display
            rate_limit_type: Rate limit type (per_minute, per_hour, per_day)
            rate_limit_value: Rate limit value
            timeout_ms: Timeout in milliseconds
            priority_tier: Priority tier (1-4, where 1 is highest)

        Returns:
            Created Provider object or None if failed
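
        Example (illustrative; the provider values are hypothetical):
            provider = db_manager.create_provider(
                name="coingecko",
                category="market_data",
                endpoint_url="https://api.coingecko.com/api/v3",
                rate_limit_type="per_minute",
                rate_limit_value=50,
                priority_tier=1
            )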
| | """ |
| | try: |
| | with self.get_session() as session: |
| | provider = Provider( |
| | name=name, |
| | category=category, |
| | endpoint_url=endpoint_url, |
| | requires_key=requires_key, |
| | api_key_masked=api_key_masked, |
| | rate_limit_type=rate_limit_type, |
| | rate_limit_value=rate_limit_value, |
| | timeout_ms=timeout_ms, |
| | priority_tier=priority_tier |
| | ) |
| | session.add(provider) |
| | session.commit() |
| | session.refresh(provider) |
| | logger.info(f"Created provider: {name}") |
| | return provider |
| | except IntegrityError: |
| | logger.error(f"Provider already exists: {name}") |
| | return None |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to create provider {name}: {str(e)}", exc_info=True) |
| | return None |
| |
|
| | def get_provider(self, provider_id: Optional[int] = None, name: Optional[str] = None) -> Optional[Provider]: |
| | """ |
| | Get a provider by ID or name |
| | |
| | Args: |
| | provider_id: Provider ID |
| | name: Provider name |
| | |
| | Returns: |
| | Provider object or None if not found |
| | """ |
| | try: |
| | with self.get_session() as session: |
| | if provider_id: |
| | provider = session.query(Provider).filter(Provider.id == provider_id).first() |
| | elif name: |
| | provider = session.query(Provider).filter(Provider.name == name).first() |
| | else: |
| | logger.warning("Either provider_id or name must be provided") |
| | return None |
| |
|
| | if provider: |
| | session.refresh(provider) |
| | return provider |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to get provider: {str(e)}", exc_info=True) |
| | return None |
| |
|
| | def get_all_providers(self, category: Optional[str] = None, enabled_only: bool = False) -> List[Provider]: |
| | """ |
| | Get all providers with optional filtering |
| | |
| | Args: |
| | category: Filter by category |
| | enabled_only: Only return enabled providers (based on schedule_config) |
| | |
| | Returns: |
| | List of Provider objects |
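
        Example (illustrative; the category value is hypothetical):
            market_providers = db_manager.get_all_providers(
                category="market_data",
                enabled_only=True
            )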
| | """ |
| | try: |
| | with self.get_session() as session: |
| | query = session.query(Provider) |
| |
|
| | if category: |
| | query = query.filter(Provider.category == category) |
| |
|
| | if enabled_only: |
| | query = query.join(ScheduleConfig).filter(ScheduleConfig.enabled == True) |
| |
|
| | providers = query.order_by(Provider.priority_tier, Provider.name).all() |
| |
|
| | |
| | for provider in providers: |
| | session.refresh(provider) |
| |
|
| | return providers |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to get providers: {str(e)}", exc_info=True) |
| | return [] |
| |
|
| | def update_provider(self, provider_id: int, **kwargs) -> bool: |
| | """ |
| | Update a provider's attributes |
| | |
| | Args: |
| | provider_id: Provider ID |
| | **kwargs: Attributes to update |
| | |
| | Returns: |
| | True if successful, False otherwise |
| | """ |
| | try: |
| | with self.get_session() as session: |
| | provider = session.query(Provider).filter(Provider.id == provider_id).first() |
| | if not provider: |
| | logger.warning(f"Provider not found: {provider_id}") |
| | return False |
| |
|
| | for key, value in kwargs.items(): |
| | if hasattr(provider, key): |
| | setattr(provider, key, value) |
| |
|
| | provider.updated_at = datetime.utcnow() |
| | session.commit() |
| | logger.info(f"Updated provider: {provider.name}") |
| | return True |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to update provider {provider_id}: {str(e)}", exc_info=True) |
| | return False |
| |
|
| | def delete_provider(self, provider_id: int) -> bool: |
| | """ |
| | Delete a provider and all related records |
| | |
| | Args: |
| | provider_id: Provider ID |
| | |
| | Returns: |
| | True if successful, False otherwise |
| | """ |
| | try: |
| | with self.get_session() as session: |
| | provider = session.query(Provider).filter(Provider.id == provider_id).first() |
| | if not provider: |
| | logger.warning(f"Provider not found: {provider_id}") |
| | return False |
| |
|
| | provider_name = provider.name |
| | session.delete(provider) |
| | session.commit() |
| | logger.info(f"Deleted provider: {provider_name}") |
| | return True |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to delete provider {provider_id}: {str(e)}", exc_info=True) |
| | return False |
| |
|
| | |
| | |
| | |
| |
|
| | def save_connection_attempt( |
| | self, |
| | provider_id: int, |
| | endpoint: str, |
| | status: str, |
| | response_time_ms: Optional[int] = None, |
| | http_status_code: Optional[int] = None, |
| | error_type: Optional[str] = None, |
| | error_message: Optional[str] = None, |
| | retry_count: int = 0, |
| | retry_result: Optional[str] = None |
| | ) -> Optional[ConnectionAttempt]: |
| | """ |
| | Save a connection attempt log |
| | |
| | Args: |
| | provider_id: Provider ID |
| | endpoint: API endpoint |
| | status: Connection status |
| | response_time_ms: Response time in milliseconds |
| | http_status_code: HTTP status code |
| | error_type: Error type if failed |
| | error_message: Error message if failed |
| | retry_count: Number of retries |
| | retry_result: Result of retry attempt |
| | |
| | Returns: |
| | Created ConnectionAttempt object or None if failed |
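
        Example (illustrative; the endpoint and timings are hypothetical):
            attempt = db_manager.save_connection_attempt(
                provider_id=1,
                endpoint="/api/v3/ping",
                status="success",
                response_time_ms=142,
                http_status_code=200
            )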
| | """ |
| | try: |
| | with self.get_session() as session: |
| | attempt = ConnectionAttempt( |
| | provider_id=provider_id, |
| | endpoint=endpoint, |
| | status=status, |
| | response_time_ms=response_time_ms, |
| | http_status_code=http_status_code, |
| | error_type=error_type, |
| | error_message=error_message, |
| | retry_count=retry_count, |
| | retry_result=retry_result |
| | ) |
| | session.add(attempt) |
| | session.commit() |
| | session.refresh(attempt) |
| | return attempt |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to save connection attempt: {str(e)}", exc_info=True) |
| | return None |
| |
|
| | def get_connection_attempts( |
| | self, |
| | provider_id: Optional[int] = None, |
| | status: Optional[str] = None, |
| | hours: int = 24, |
| | limit: int = 1000 |
| | ) -> List[ConnectionAttempt]: |
| | """ |
| | Get connection attempts with filtering |
| | |
| | Args: |
| | provider_id: Filter by provider ID |
| | status: Filter by status |
| | hours: Get attempts from last N hours |
| | limit: Maximum number of records to return |
| | |
| | Returns: |
| | List of ConnectionAttempt objects |
| | """ |
| | try: |
| | with self.get_session() as session: |
| | cutoff_time = datetime.utcnow() - timedelta(hours=hours) |
| | query = session.query(ConnectionAttempt).filter( |
| | ConnectionAttempt.timestamp >= cutoff_time |
| | ) |
| |
|
| | if provider_id: |
| | query = query.filter(ConnectionAttempt.provider_id == provider_id) |
| |
|
| | if status: |
| | query = query.filter(ConnectionAttempt.status == status) |
| |
|
| | attempts = query.order_by(desc(ConnectionAttempt.timestamp)).limit(limit).all() |
| |
|
| | for attempt in attempts: |
| | session.refresh(attempt) |
| |
|
| | return attempts |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to get connection attempts: {str(e)}", exc_info=True) |
| | return [] |
| |
|
| | |
| | |
| | |
| |
|
| | def save_data_collection( |
| | self, |
| | provider_id: int, |
| | category: str, |
| | scheduled_time: datetime, |
| | actual_fetch_time: datetime, |
| | data_timestamp: Optional[datetime] = None, |
| | staleness_minutes: Optional[float] = None, |
| | record_count: int = 0, |
| | payload_size_bytes: int = 0, |
| | data_quality_score: float = 1.0, |
| | on_schedule: bool = True, |
| | skip_reason: Optional[str] = None |
| | ) -> Optional[DataCollection]: |
| | """ |
| | Save a data collection record |
| | |
| | Args: |
| | provider_id: Provider ID |
| | category: Data category |
| | scheduled_time: Scheduled collection time |
| | actual_fetch_time: Actual fetch time |
| | data_timestamp: Timestamp from API response |
| | staleness_minutes: Data staleness in minutes |
| | record_count: Number of records collected |
| | payload_size_bytes: Payload size in bytes |
| | data_quality_score: Data quality score (0-1) |
| | on_schedule: Whether collection was on schedule |
| | skip_reason: Reason if skipped |
| | |
| | Returns: |
| | Created DataCollection object or None if failed |
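
        Example (illustrative sketch; all values are hypothetical, with the
        fetch landing 30 seconds after its scheduled slot):
            now = datetime.utcnow()
            collection = db_manager.save_data_collection(
                provider_id=1,
                category="market_data",
                scheduled_time=now - timedelta(seconds=30),
                actual_fetch_time=now,
                record_count=250,
                payload_size_bytes=48_000
            )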
| | """ |
| | try: |
| | with self.get_session() as session: |
| | collection = DataCollection( |
| | provider_id=provider_id, |
| | category=category, |
| | scheduled_time=scheduled_time, |
| | actual_fetch_time=actual_fetch_time, |
| | data_timestamp=data_timestamp, |
| | staleness_minutes=staleness_minutes, |
| | record_count=record_count, |
| | payload_size_bytes=payload_size_bytes, |
| | data_quality_score=data_quality_score, |
| | on_schedule=on_schedule, |
| | skip_reason=skip_reason |
| | ) |
| | session.add(collection) |
| | session.commit() |
| | session.refresh(collection) |
| | return collection |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to save data collection: {str(e)}", exc_info=True) |
| | return None |
| |
|
| | def get_data_collections( |
| | self, |
| | provider_id: Optional[int] = None, |
| | category: Optional[str] = None, |
| | hours: int = 24, |
| | limit: int = 1000 |
| | ) -> List[DataCollection]: |
| | """ |
| | Get data collections with filtering |
| | |
| | Args: |
| | provider_id: Filter by provider ID |
| | category: Filter by category |
| | hours: Get collections from last N hours |
| | limit: Maximum number of records to return |
| | |
| | Returns: |
| | List of DataCollection objects |
| | """ |
| | try: |
| | with self.get_session() as session: |
| | cutoff_time = datetime.utcnow() - timedelta(hours=hours) |
| | query = session.query(DataCollection).filter( |
| | DataCollection.actual_fetch_time >= cutoff_time |
| | ) |
| |
|
| | if provider_id: |
| | query = query.filter(DataCollection.provider_id == provider_id) |
| |
|
| | if category: |
| | query = query.filter(DataCollection.category == category) |
| |
|
| | collections = query.order_by(desc(DataCollection.actual_fetch_time)).limit(limit).all() |
| |
|
| | for collection in collections: |
| | session.refresh(collection) |
| |
|
| | return collections |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to get data collections: {str(e)}", exc_info=True) |
| | return [] |
| |
|
| | |
| | |
| | |
| |
|
| | def save_rate_limit_usage( |
| | self, |
| | provider_id: int, |
| | limit_type: str, |
| | limit_value: int, |
| | current_usage: int, |
| | reset_time: datetime |
| | ) -> Optional[RateLimitUsage]: |
| | """ |
| | Save rate limit usage record |
| | |
| | Args: |
| | provider_id: Provider ID |
| | limit_type: Limit type (per_minute, per_hour, per_day) |
| | limit_value: Rate limit value |
| | current_usage: Current usage count |
| | reset_time: When the limit resets |
| | |
| | Returns: |
| | Created RateLimitUsage object or None if failed |
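
        Example (illustrative; hypothetical values -- 42 of 50 calls used,
        with the window resetting in one minute):
            usage = db_manager.save_rate_limit_usage(
                provider_id=1,
                limit_type="per_minute",
                limit_value=50,
                current_usage=42,
                reset_time=datetime.utcnow() + timedelta(minutes=1)
            )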
| | """ |
| | try: |
| | with self.get_session() as session: |
| | percentage = (current_usage / limit_value * 100) if limit_value > 0 else 0 |
| |
|
| | usage = RateLimitUsage( |
| | provider_id=provider_id, |
| | limit_type=limit_type, |
| | limit_value=limit_value, |
| | current_usage=current_usage, |
| | percentage=percentage, |
| | reset_time=reset_time |
| | ) |
| | session.add(usage) |
| | session.commit() |
| | session.refresh(usage) |
| | return usage |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to save rate limit usage: {str(e)}", exc_info=True) |
| | return None |
| |
|
| | def get_rate_limit_usage( |
| | self, |
| | provider_id: Optional[int] = None, |
| | hours: int = 24, |
| | high_usage_only: bool = False, |
| | threshold: float = 80.0 |
| | ) -> List[RateLimitUsage]: |
| | """ |
| | Get rate limit usage records |
| | |
| | Args: |
| | provider_id: Filter by provider ID |
| | hours: Get usage from last N hours |
| | high_usage_only: Only return high usage records |
| | threshold: Percentage threshold for high usage |
| | |
| | Returns: |
| | List of RateLimitUsage objects |
| | """ |
| | try: |
| | with self.get_session() as session: |
| | cutoff_time = datetime.utcnow() - timedelta(hours=hours) |
| | query = session.query(RateLimitUsage).filter( |
| | RateLimitUsage.timestamp >= cutoff_time |
| | ) |
| |
|
| | if provider_id: |
| | query = query.filter(RateLimitUsage.provider_id == provider_id) |
| |
|
| | if high_usage_only: |
| | query = query.filter(RateLimitUsage.percentage >= threshold) |
| |
|
| | usage_records = query.order_by(desc(RateLimitUsage.timestamp)).all() |
| |
|
| | for record in usage_records: |
| | session.refresh(record) |
| |
|
| | return usage_records |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to get rate limit usage: {str(e)}", exc_info=True) |
| | return [] |
| |
|
| | |
| | |
| | |
| |
|
| | def create_schedule_config( |
| | self, |
| | provider_id: int, |
| | schedule_interval: str, |
| | enabled: bool = True, |
| | next_run: Optional[datetime] = None |
| | ) -> Optional[ScheduleConfig]: |
| | """ |
| | Create schedule configuration for a provider |
| | |
| | Args: |
| | provider_id: Provider ID |
| | schedule_interval: Schedule interval (e.g., "every_1_min") |
| | enabled: Whether schedule is enabled |
| | next_run: Next scheduled run time |
| | |
| | Returns: |
| | Created ScheduleConfig object or None if failed |
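
        Example (illustrative; interval string follows the "every_1_min"
        convention noted above):
            config = db_manager.create_schedule_config(
                provider_id=1,
                schedule_interval="every_1_min",
                next_run=datetime.utcnow() + timedelta(minutes=1)
            )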
| | """ |
| | try: |
| | with self.get_session() as session: |
| | config = ScheduleConfig( |
| | provider_id=provider_id, |
| | schedule_interval=schedule_interval, |
| | enabled=enabled, |
| | next_run=next_run |
| | ) |
| | session.add(config) |
| | session.commit() |
| | session.refresh(config) |
| | logger.info(f"Created schedule config for provider {provider_id}") |
| | return config |
| | except IntegrityError: |
| | logger.error(f"Schedule config already exists for provider {provider_id}") |
| | return None |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to create schedule config: {str(e)}", exc_info=True) |
| | return None |
| |
|
| | def get_schedule_config(self, provider_id: int) -> Optional[ScheduleConfig]: |
| | """ |
| | Get schedule configuration for a provider |
| | |
| | Args: |
| | provider_id: Provider ID |
| | |
| | Returns: |
| | ScheduleConfig object or None if not found |
| | """ |
| | try: |
| | with self.get_session() as session: |
| | config = session.query(ScheduleConfig).filter( |
| | ScheduleConfig.provider_id == provider_id |
| | ).first() |
| |
|
| | if config: |
| | session.refresh(config) |
| | return config |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to get schedule config: {str(e)}", exc_info=True) |
| | return None |
| |
|
| | def update_schedule_config(self, provider_id: int, **kwargs) -> bool: |
| | """ |
| | Update schedule configuration |
| | |
| | Args: |
| | provider_id: Provider ID |
| | **kwargs: Attributes to update |
| | |
| | Returns: |
| | True if successful, False otherwise |
| | """ |
| | try: |
| | with self.get_session() as session: |
| | config = session.query(ScheduleConfig).filter( |
| | ScheduleConfig.provider_id == provider_id |
| | ).first() |
| |
|
| | if not config: |
| | logger.warning(f"Schedule config not found for provider {provider_id}") |
| | return False |
| |
|
| | for key, value in kwargs.items(): |
| | if hasattr(config, key): |
| | setattr(config, key, value) |
| |
|
| | session.commit() |
| | logger.info(f"Updated schedule config for provider {provider_id}") |
| | return True |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to update schedule config: {str(e)}", exc_info=True) |
| | return False |
| |
|
| | def get_all_schedule_configs(self, enabled_only: bool = True) -> List[ScheduleConfig]: |
| | """ |
| | Get all schedule configurations |
| | |
| | Args: |
| | enabled_only: Only return enabled schedules |
| | |
| | Returns: |
| | List of ScheduleConfig objects |
| | """ |
| | try: |
| | with self.get_session() as session: |
| | query = session.query(ScheduleConfig) |
| |
|
| | if enabled_only: |
| | query = query.filter(ScheduleConfig.enabled == True) |
| |
|
| | configs = query.all() |
| |
|
| | for config in configs: |
| | session.refresh(config) |
| |
|
| | return configs |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to get schedule configs: {str(e)}", exc_info=True) |
| | return [] |
| |
|
| | |
| | |
| | |
| |
|
| | def save_schedule_compliance( |
| | self, |
| | provider_id: int, |
| | expected_time: datetime, |
| | actual_time: Optional[datetime] = None, |
| | delay_seconds: Optional[int] = None, |
| | on_time: bool = True, |
| | skip_reason: Optional[str] = None |
| | ) -> Optional[ScheduleCompliance]: |
| | """ |
| | Save schedule compliance record |
| | |
| | Args: |
| | provider_id: Provider ID |
| | expected_time: Expected execution time |
| | actual_time: Actual execution time |
| | delay_seconds: Delay in seconds |
| | on_time: Whether execution was on time |
| | skip_reason: Reason if skipped |
| | |
| | Returns: |
| | Created ScheduleCompliance object or None if failed |
| | """ |
| | try: |
| | with self.get_session() as session: |
| | compliance = ScheduleCompliance( |
| | provider_id=provider_id, |
| | expected_time=expected_time, |
| | actual_time=actual_time, |
| | delay_seconds=delay_seconds, |
| | on_time=on_time, |
| | skip_reason=skip_reason |
| | ) |
| | session.add(compliance) |
| | session.commit() |
| | session.refresh(compliance) |
| | return compliance |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to save schedule compliance: {str(e)}", exc_info=True) |
| | return None |
| |
|
| | def get_schedule_compliance( |
| | self, |
| | provider_id: Optional[int] = None, |
| | hours: int = 24, |
| | late_only: bool = False |
| | ) -> List[ScheduleCompliance]: |
| | """ |
| | Get schedule compliance records |
| | |
| | Args: |
| | provider_id: Filter by provider ID |
| | hours: Get records from last N hours |
| | late_only: Only return late executions |
| | |
| | Returns: |
| | List of ScheduleCompliance objects |
| | """ |
| | try: |
| | with self.get_session() as session: |
| | cutoff_time = datetime.utcnow() - timedelta(hours=hours) |
| | query = session.query(ScheduleCompliance).filter( |
| | ScheduleCompliance.timestamp >= cutoff_time |
| | ) |
| |
|
| | if provider_id: |
| | query = query.filter(ScheduleCompliance.provider_id == provider_id) |
| |
|
| | if late_only: |
| | query = query.filter(ScheduleCompliance.on_time == False) |
| |
|
| | compliance_records = query.order_by(desc(ScheduleCompliance.timestamp)).all() |
| |
|
| | for record in compliance_records: |
| | session.refresh(record) |
| |
|
| | return compliance_records |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to get schedule compliance: {str(e)}", exc_info=True) |
| | return [] |
| |
|
| | |
| | |
| | |
| |
|
| | def save_failure_log( |
| | self, |
| | provider_id: int, |
| | endpoint: str, |
| | error_type: str, |
| | error_message: Optional[str] = None, |
| | http_status: Optional[int] = None, |
| | retry_attempted: bool = False, |
| | retry_result: Optional[str] = None, |
| | remediation_applied: Optional[str] = None |
| | ) -> Optional[FailureLog]: |
| | """ |
| | Save failure log record |
| | |
| | Args: |
| | provider_id: Provider ID |
| | endpoint: API endpoint |
| | error_type: Type of error |
| | error_message: Error message |
| | http_status: HTTP status code |
| | retry_attempted: Whether retry was attempted |
| | retry_result: Result of retry |
| | remediation_applied: Remediation action taken |
| | |
| | Returns: |
| | Created FailureLog object or None if failed |
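
        Example (illustrative; endpoint and message are hypothetical):
            failure = db_manager.save_failure_log(
                provider_id=1,
                endpoint="/api/v3/ping",
                error_type="timeout",
                error_message="Request timed out after 10000 ms",
                retry_attempted=True,
                retry_result="success"
            )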
| | """ |
| | try: |
| | with self.get_session() as session: |
| | failure = FailureLog( |
| | provider_id=provider_id, |
| | endpoint=endpoint, |
| | error_type=error_type, |
| | error_message=error_message, |
| | http_status=http_status, |
| | retry_attempted=retry_attempted, |
| | retry_result=retry_result, |
| | remediation_applied=remediation_applied |
| | ) |
| | session.add(failure) |
| | session.commit() |
| | session.refresh(failure) |
| | return failure |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to save failure log: {str(e)}", exc_info=True) |
| | return None |
| |
|
| | def get_failure_logs( |
| | self, |
| | provider_id: Optional[int] = None, |
| | error_type: Optional[str] = None, |
| | hours: int = 24, |
| | limit: int = 1000 |
| | ) -> List[FailureLog]: |
| | """ |
| | Get failure logs with filtering |
| | |
| | Args: |
| | provider_id: Filter by provider ID |
| | error_type: Filter by error type |
| | hours: Get logs from last N hours |
| | limit: Maximum number of records to return |
| | |
| | Returns: |
| | List of FailureLog objects |
| | """ |
| | try: |
| | with self.get_session() as session: |
| | cutoff_time = datetime.utcnow() - timedelta(hours=hours) |
| | query = session.query(FailureLog).filter( |
| | FailureLog.timestamp >= cutoff_time |
| | ) |
| |
|
| | if provider_id: |
| | query = query.filter(FailureLog.provider_id == provider_id) |
| |
|
| | if error_type: |
| | query = query.filter(FailureLog.error_type == error_type) |
| |
|
| | failures = query.order_by(desc(FailureLog.timestamp)).limit(limit).all() |
| |
|
| | for failure in failures: |
| | session.refresh(failure) |
| |
|
| | return failures |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to get failure logs: {str(e)}", exc_info=True) |
| | return [] |
| |
|
| | |
| | |
| | |
| |
|
| | def create_alert( |
| | self, |
| | provider_id: int, |
| | alert_type: str, |
| | message: str, |
| | severity: str = "medium" |
| | ) -> Optional[Alert]: |
| | """ |
| | Create an alert |
| | |
| | Args: |
| | provider_id: Provider ID |
| | alert_type: Type of alert |
| | message: Alert message |
| | severity: Alert severity (low, medium, high, critical) |
| | |
| | Returns: |
| | Created Alert object or None if failed |
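
        Example (illustrative; the alert_type string is hypothetical):
            alert = db_manager.create_alert(
                provider_id=1,
                alert_type="rate_limit_warning",
                message="Rate limit usage above 80%",
                severity="high"
            )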
| | """ |
| | try: |
| | with self.get_session() as session: |
| | alert = Alert( |
| | provider_id=provider_id, |
| | alert_type=alert_type, |
| | message=message, |
| | severity=severity |
| | ) |
| | session.add(alert) |
| | session.commit() |
| | session.refresh(alert) |
| | logger.warning(f"Alert created: {alert_type} - {message}") |
| | return alert |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to create alert: {str(e)}", exc_info=True) |
| | return None |
| |
|
| | def get_alerts( |
| | self, |
| | provider_id: Optional[int] = None, |
| | alert_type: Optional[str] = None, |
| | severity: Optional[str] = None, |
| | acknowledged: Optional[bool] = None, |
| | hours: int = 24 |
| | ) -> List[Alert]: |
| | """ |
| | Get alerts with filtering |
| | |
| | Args: |
| | provider_id: Filter by provider ID |
| | alert_type: Filter by alert type |
| | severity: Filter by severity |
| | acknowledged: Filter by acknowledgment status |
| | hours: Get alerts from last N hours |
| | |
| | Returns: |
| | List of Alert objects |
| | """ |
| | try: |
| | with self.get_session() as session: |
| | cutoff_time = datetime.utcnow() - timedelta(hours=hours) |
| | query = session.query(Alert).filter( |
| | Alert.timestamp >= cutoff_time |
| | ) |
| |
|
| | if provider_id: |
| | query = query.filter(Alert.provider_id == provider_id) |
| |
|
| | if alert_type: |
| | query = query.filter(Alert.alert_type == alert_type) |
| |
|
| | if severity: |
| | query = query.filter(Alert.severity == severity) |
| |
|
| | if acknowledged is not None: |
| | query = query.filter(Alert.acknowledged == acknowledged) |
| |
|
| | alerts = query.order_by(desc(Alert.timestamp)).all() |
| |
|
| | for alert in alerts: |
| | session.refresh(alert) |
| |
|
| | return alerts |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to get alerts: {str(e)}", exc_info=True) |
| | return [] |
| |
|
| | def acknowledge_alert(self, alert_id: int) -> bool: |
| | """ |
| | Acknowledge an alert |
| | |
| | Args: |
| | alert_id: Alert ID |
| | |
| | Returns: |
| | True if successful, False otherwise |
| | """ |
| | try: |
| | with self.get_session() as session: |
| | alert = session.query(Alert).filter(Alert.id == alert_id).first() |
| | if not alert: |
| | logger.warning(f"Alert not found: {alert_id}") |
| | return False |
| |
|
| | alert.acknowledged = True |
| | alert.acknowledged_at = datetime.utcnow() |
| | session.commit() |
| | logger.info(f"Alert acknowledged: {alert_id}") |
| | return True |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to acknowledge alert: {str(e)}", exc_info=True) |
| | return False |
| |
|
| | |
| | |
| | |
| |
|
| | def save_system_metrics( |
| | self, |
| | total_providers: int, |
| | online_count: int, |
| | degraded_count: int, |
| | offline_count: int, |
| | avg_response_time_ms: float, |
| | total_requests_hour: int, |
| | total_failures_hour: int, |
| | system_health: str = "healthy" |
| | ) -> Optional[SystemMetrics]: |
| | """ |
| | Save system metrics snapshot |
| | |
| | Args: |
| | total_providers: Total number of providers |
| | online_count: Number of online providers |
| | degraded_count: Number of degraded providers |
| | offline_count: Number of offline providers |
| | avg_response_time_ms: Average response time |
| | total_requests_hour: Total requests in last hour |
| | total_failures_hour: Total failures in last hour |
| | system_health: Overall system health |
| | |
| | Returns: |
| | Created SystemMetrics object or None if failed |
| | """ |
| | try: |
| | with self.get_session() as session: |
| | metrics = SystemMetrics( |
| | total_providers=total_providers, |
| | online_count=online_count, |
| | degraded_count=degraded_count, |
| | offline_count=offline_count, |
| | avg_response_time_ms=avg_response_time_ms, |
| | total_requests_hour=total_requests_hour, |
| | total_failures_hour=total_failures_hour, |
| | system_health=system_health |
| | ) |
| | session.add(metrics) |
| | session.commit() |
| | session.refresh(metrics) |
| | return metrics |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to save system metrics: {str(e)}", exc_info=True) |
| | return None |
| |
|
| | def get_system_metrics(self, hours: int = 24, limit: int = 1000) -> List[SystemMetrics]: |
| | """ |
| | Get system metrics history |
| | |
| | Args: |
| | hours: Get metrics from last N hours |
| | limit: Maximum number of records to return |
| | |
| | Returns: |
| | List of SystemMetrics objects |
| | """ |
| | try: |
| | with self.get_session() as session: |
| | cutoff_time = datetime.utcnow() - timedelta(hours=hours) |
| | metrics = session.query(SystemMetrics).filter( |
| | SystemMetrics.timestamp >= cutoff_time |
| | ).order_by(desc(SystemMetrics.timestamp)).limit(limit).all() |
| |
|
| | for metric in metrics: |
| | session.refresh(metric) |
| |
|
| | return metrics |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to get system metrics: {str(e)}", exc_info=True) |
| | return [] |
| |
|
| | def get_latest_system_metrics(self) -> Optional[SystemMetrics]: |
| | """ |
| | Get the most recent system metrics |
| | |
| | Returns: |
| | Latest SystemMetrics object or None |
| | """ |
| | try: |
| | with self.get_session() as session: |
| | metrics = session.query(SystemMetrics).order_by( |
| | desc(SystemMetrics.timestamp) |
| | ).first() |
| |
|
| | if metrics: |
| | session.refresh(metrics) |
| | return metrics |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to get latest system metrics: {str(e)}", exc_info=True) |
| | return None |
| |
|
| | |
| | |
| | |
| |
|
| | def get_provider_stats(self, provider_id: int, hours: int = 24) -> Dict[str, Any]: |
| | """ |
| | Get comprehensive statistics for a provider |
| | |
| | Args: |
| | provider_id: Provider ID |
| | hours: Time window in hours |
| | |
| | Returns: |
| | Dictionary with provider statistics |
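
        Example (illustrative sketch of reading the nested result):
            stats = db_manager.get_provider_stats(provider_id=1, hours=24)
            if stats:
                success_rate = stats['connection_stats']['success_rate']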
| | """ |
| | try: |
| | with self.get_session() as session: |
| | cutoff_time = datetime.utcnow() - timedelta(hours=hours) |
| |
|
| | |
| | provider = session.query(Provider).filter(Provider.id == provider_id).first() |
| | if not provider: |
| | return {} |
| |
|
| | |
| | connection_stats = session.query( |
| | func.count(ConnectionAttempt.id).label('total_attempts'), |
| | func.sum(func.case((ConnectionAttempt.status == 'success', 1), else_=0)).label('successful'), |
| | func.sum(func.case((ConnectionAttempt.status == 'failed', 1), else_=0)).label('failed'), |
| | func.sum(func.case((ConnectionAttempt.status == 'timeout', 1), else_=0)).label('timeout'), |
| | func.sum(func.case((ConnectionAttempt.status == 'rate_limited', 1), else_=0)).label('rate_limited'), |
| | func.avg(ConnectionAttempt.response_time_ms).label('avg_response_time') |
| | ).filter( |
| | ConnectionAttempt.provider_id == provider_id, |
| | ConnectionAttempt.timestamp >= cutoff_time |
| | ).first() |
| |
|
| | |
| | collection_stats = session.query( |
| | func.count(DataCollection.id).label('total_collections'), |
| | func.sum(DataCollection.record_count).label('total_records'), |
| | func.sum(DataCollection.payload_size_bytes).label('total_bytes'), |
| | func.avg(DataCollection.data_quality_score).label('avg_quality'), |
| | func.avg(DataCollection.staleness_minutes).label('avg_staleness') |
| | ).filter( |
| | DataCollection.provider_id == provider_id, |
| | DataCollection.actual_fetch_time >= cutoff_time |
| | ).first() |
| |
|
| | |
| | failure_count = session.query(func.count(FailureLog.id)).filter( |
| | FailureLog.provider_id == provider_id, |
| | FailureLog.timestamp >= cutoff_time |
| | ).scalar() |
| |
|
| | |
| | total_attempts = connection_stats.total_attempts or 0 |
| | successful = connection_stats.successful or 0 |
| | success_rate = (successful / total_attempts * 100) if total_attempts > 0 else 0 |
| |
|
| | return { |
| | 'provider_name': provider.name, |
| | 'provider_id': provider_id, |
| | 'time_window_hours': hours, |
| | 'connection_stats': { |
| | 'total_attempts': total_attempts, |
| | 'successful': successful, |
| | 'failed': connection_stats.failed or 0, |
| | 'timeout': connection_stats.timeout or 0, |
| | 'rate_limited': connection_stats.rate_limited or 0, |
| | 'success_rate': round(success_rate, 2), |
| | 'avg_response_time_ms': round(connection_stats.avg_response_time or 0, 2) |
| | }, |
| | 'data_collection_stats': { |
| | 'total_collections': collection_stats.total_collections or 0, |
| | 'total_records': collection_stats.total_records or 0, |
| | 'total_bytes': collection_stats.total_bytes or 0, |
| | 'avg_quality_score': round(collection_stats.avg_quality or 0, 2), |
| | 'avg_staleness_minutes': round(collection_stats.avg_staleness or 0, 2) |
| | }, |
| | 'failure_count': failure_count or 0 |
| | } |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to get provider stats: {str(e)}", exc_info=True) |
| | return {} |
| |
|
| | def get_failure_analysis(self, hours: int = 24) -> Dict[str, Any]: |
| | """ |
| | Get comprehensive failure analysis across all providers |
| | |
| | Args: |
| | hours: Time window in hours |
| | |
| | Returns: |
| | Dictionary with failure analysis |
| | """ |
| | try: |
| | with self.get_session() as session: |
| | cutoff_time = datetime.utcnow() - timedelta(hours=hours) |
| |
|
| | |
| | error_type_stats = session.query( |
| | FailureLog.error_type, |
| | func.count(FailureLog.id).label('count') |
| | ).filter( |
| | FailureLog.timestamp >= cutoff_time |
| | ).group_by(FailureLog.error_type).all() |
| |
|
| | |
| | provider_stats = session.query( |
| | Provider.name, |
| | func.count(FailureLog.id).label('count') |
| | ).join( |
| | FailureLog, Provider.id == FailureLog.provider_id |
| | ).filter( |
| | FailureLog.timestamp >= cutoff_time |
| | ).group_by(Provider.name).order_by(desc('count')).limit(10).all() |
| |
|
| | |
| | retry_stats = session.query( |
| | func.sum(func.case((FailureLog.retry_attempted == True, 1), else_=0)).label('total_retries'), |
| | func.sum(func.case((FailureLog.retry_result == 'success', 1), else_=0)).label('successful_retries') |
| | ).filter( |
| | FailureLog.timestamp >= cutoff_time |
| | ).first() |
| |
|
| | total_retries = retry_stats.total_retries or 0 |
| | successful_retries = retry_stats.successful_retries or 0 |
| | retry_success_rate = (successful_retries / total_retries * 100) if total_retries > 0 else 0 |
| |
|
| | return { |
| | 'time_window_hours': hours, |
| | 'failures_by_error_type': [ |
| | {'error_type': stat.error_type, 'count': stat.count} |
| | for stat in error_type_stats |
| | ], |
| | 'top_failing_providers': [ |
| | {'provider': stat.name, 'failure_count': stat.count} |
| | for stat in provider_stats |
| | ], |
| | 'retry_statistics': { |
| | 'total_retries': total_retries, |
| | 'successful_retries': successful_retries, |
| | 'retry_success_rate': round(retry_success_rate, 2) |
| | } |
| | } |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to get failure analysis: {str(e)}", exc_info=True) |
| | return {} |
| |
|
| | def get_recent_logs( |
| | self, |
| | log_type: str, |
| | provider_id: Optional[int] = None, |
| | hours: int = 1, |
| | limit: int = 100 |
| | ) -> List[Dict[str, Any]]: |
| | """ |
| | Get recent logs of specified type with filtering |
| | |
| | Args: |
| | log_type: Type of logs (connection, failure, collection, rate_limit) |
| | provider_id: Filter by provider ID |
| | hours: Get logs from last N hours |
| | limit: Maximum number of records |
| | |
| | Returns: |
| | List of log dictionaries |
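
        Example (illustrative):
            recent_failures = db_manager.get_recent_logs(
                log_type="failure",
                hours=6,
                limit=50
            )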
| | """ |
| | try: |
| | cutoff_time = datetime.utcnow() - timedelta(hours=hours) |
| |
|
| | if log_type == 'connection': |
| | attempts = self.get_connection_attempts(provider_id=provider_id, hours=hours, limit=limit) |
| | return [ |
| | { |
| | 'id': a.id, |
| | 'timestamp': a.timestamp.isoformat(), |
| | 'provider_id': a.provider_id, |
| | 'endpoint': a.endpoint, |
| | 'status': a.status, |
| | 'response_time_ms': a.response_time_ms, |
| | 'http_status_code': a.http_status_code, |
| | 'error_type': a.error_type, |
| | 'error_message': a.error_message |
| | } |
| | for a in attempts |
| | ] |
| |
|
| | elif log_type == 'failure': |
| | failures = self.get_failure_logs(provider_id=provider_id, hours=hours, limit=limit) |
| | return [ |
| | { |
| | 'id': f.id, |
| | 'timestamp': f.timestamp.isoformat(), |
| | 'provider_id': f.provider_id, |
| | 'endpoint': f.endpoint, |
| | 'error_type': f.error_type, |
| | 'error_message': f.error_message, |
| | 'http_status': f.http_status, |
| | 'retry_attempted': f.retry_attempted, |
| | 'retry_result': f.retry_result |
| | } |
| | for f in failures |
| | ] |
| |
|
| | elif log_type == 'collection': |
| | collections = self.get_data_collections(provider_id=provider_id, hours=hours, limit=limit) |
| | return [ |
| | { |
| | 'id': c.id, |
| | 'provider_id': c.provider_id, |
| | 'category': c.category, |
| | 'scheduled_time': c.scheduled_time.isoformat(), |
| | 'actual_fetch_time': c.actual_fetch_time.isoformat(), |
| | 'record_count': c.record_count, |
| | 'payload_size_bytes': c.payload_size_bytes, |
| | 'data_quality_score': c.data_quality_score, |
| | 'on_schedule': c.on_schedule |
| | } |
| | for c in collections |
| | ] |
| |
|
| | elif log_type == 'rate_limit': |
| | usage = self.get_rate_limit_usage(provider_id=provider_id, hours=hours) |
| | return [ |
| | { |
| | 'id': u.id, |
| | 'timestamp': u.timestamp.isoformat(), |
| | 'provider_id': u.provider_id, |
| | 'limit_type': u.limit_type, |
| | 'limit_value': u.limit_value, |
| | 'current_usage': u.current_usage, |
| | 'percentage': u.percentage, |
| | 'reset_time': u.reset_time.isoformat() |
| | } |
| | for u in usage[:limit] |
| | ] |
| |
|
| | else: |
| | logger.warning(f"Unknown log type: {log_type}") |
| | return [] |
| |
|
| | except Exception as e: |
| | logger.error(f"Failed to get recent logs: {str(e)}", exc_info=True) |
| | return [] |
| |
|
| | def cleanup_old_data(self, days: int = 30) -> Dict[str, int]: |
| | """ |
| | Remove old records from the database to manage storage |
| | |
| | Args: |
| | days: Remove records older than N days |
| | |
| | Returns: |
| | Dictionary with count of deleted records per table |
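
        Example (illustrative):
            deleted = db_manager.cleanup_old_data(days=30)
            total = sum(deleted.values())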
| | """ |
| | try: |
| | with self.get_session() as session: |
| | cutoff_time = datetime.utcnow() - timedelta(days=days) |
| | deleted_counts = {} |
| |
|
| | |
| | deleted = session.query(ConnectionAttempt).filter( |
| | ConnectionAttempt.timestamp < cutoff_time |
| | ).delete() |
| | deleted_counts['connection_attempts'] = deleted |
| |
|
| | |
| | deleted = session.query(DataCollection).filter( |
| | DataCollection.actual_fetch_time < cutoff_time |
| | ).delete() |
| | deleted_counts['data_collections'] = deleted |
| |
|
| | |
| | deleted = session.query(RateLimitUsage).filter( |
| | RateLimitUsage.timestamp < cutoff_time |
| | ).delete() |
| | deleted_counts['rate_limit_usage'] = deleted |
| |
|
| | |
| | deleted = session.query(ScheduleCompliance).filter( |
| | ScheduleCompliance.timestamp < cutoff_time |
| | ).delete() |
| | deleted_counts['schedule_compliance'] = deleted |
| |
|
| | |
| | deleted = session.query(FailureLog).filter( |
| | FailureLog.timestamp < cutoff_time |
| | ).delete() |
| | deleted_counts['failure_logs'] = deleted |
| |
|
| | |
| | deleted = session.query(Alert).filter( |
| | and_( |
| | Alert.timestamp < cutoff_time, |
| | Alert.acknowledged == True |
| | ) |
| | ).delete() |
| | deleted_counts['alerts'] = deleted |
| |
|
| | |
| | deleted = session.query(SystemMetrics).filter( |
| | SystemMetrics.timestamp < cutoff_time |
| | ).delete() |
| | deleted_counts['system_metrics'] = deleted |
| |
|
| | session.commit() |
| |
|
| | total_deleted = sum(deleted_counts.values()) |
| | logger.info(f"Cleaned up {total_deleted} old records (older than {days} days)") |
| |
|
| | return deleted_counts |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to cleanup old data: {str(e)}", exc_info=True) |
| | return {} |
| |
|
| | def get_database_stats(self) -> Dict[str, Any]: |
| | """ |
| | Get database statistics |
| | |
| | Returns: |
| | Dictionary with database statistics |
| | """ |
| | try: |
| | with self.get_session() as session: |
| | stats = { |
| | 'providers': session.query(func.count(Provider.id)).scalar(), |
| | 'connection_attempts': session.query(func.count(ConnectionAttempt.id)).scalar(), |
| | 'data_collections': session.query(func.count(DataCollection.id)).scalar(), |
| | 'rate_limit_usage': session.query(func.count(RateLimitUsage.id)).scalar(), |
| | 'schedule_configs': session.query(func.count(ScheduleConfig.id)).scalar(), |
| | 'schedule_compliance': session.query(func.count(ScheduleCompliance.id)).scalar(), |
| | 'failure_logs': session.query(func.count(FailureLog.id)).scalar(), |
| | 'alerts': session.query(func.count(Alert.id)).scalar(), |
| | 'system_metrics': session.query(func.count(SystemMetrics.id)).scalar(), |
| | } |
| |
|
| | |
| | if os.path.exists(self.db_path): |
| | stats['database_size_mb'] = round(os.path.getsize(self.db_path) / (1024 * 1024), 2) |
| | else: |
| | stats['database_size_mb'] = 0 |
| |
|
| | return stats |
| | except SQLAlchemyError as e: |
| | logger.error(f"Failed to get database stats: {str(e)}", exc_info=True) |
| | return {} |
| |
|
| | def health_check(self) -> Dict[str, Any]: |
| | """ |
| | Perform database health check |
| | |
| | Returns: |
| | Dictionary with health check results |
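
        Example (illustrative):
            health = db_manager.health_check()
            if health['status'] != 'healthy':
                logger.error(f"Database unhealthy: {health.get('error')}")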
| | """ |
| | try: |
| | with self.get_session() as session: |
| | |
| | result = session.execute(text("SELECT 1")).scalar() |
| |
|
| | |
| | stats = self.get_database_stats() |
| |
|
| | return { |
| | 'status': 'healthy' if result == 1 else 'unhealthy', |
| | 'database_path': self.db_path, |
| | 'database_exists': os.path.exists(self.db_path), |
| | 'stats': stats, |
| | 'timestamp': datetime.utcnow().isoformat() |
| | } |
| | except Exception as e: |
| | logger.error(f"Health check failed: {str(e)}", exc_info=True) |
| | return { |
| | 'status': 'unhealthy', |
| | 'error': str(e), |
| | 'timestamp': datetime.utcnow().isoformat() |
| | } |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | |
| | db_manager = DatabaseManager() |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | def init_db(db_path: str = "data/api_monitor.db") -> DatabaseManager: |
| | """ |
| | Initialize database and return manager instance |
| | |
| | Args: |
| | db_path: Path to database file |
| | |
| | Returns: |
| | DatabaseManager instance |
| | """ |
| | manager = DatabaseManager(db_path=db_path) |
| | manager.init_database() |
| | logger.info("Database initialized successfully") |
| | return manager |
| |
|
| |
|
| | if __name__ == "__main__": |
| | |
| | print("Database Manager Module") |
| | print("=" * 80) |
| |
|
| | |
| | manager = init_db() |
| |
|
| | |
| | health = manager.health_check() |
| | print(f"\nHealth Check: {health['status']}") |
| | print(f"Database Stats: {health.get('stats', {})}") |
| |
|
| | |
| | stats = manager.get_database_stats() |
| | print(f"\nDatabase Statistics:") |
| | for table, count in stats.items(): |
| | if table != 'database_size_mb': |
| | print(f" {table}: {count}") |
| | print(f" Database Size: {stats.get('database_size_mb', 0)} MB") |
| |
|