| | """ |
| | Comprehensive Task Scheduler for Crypto API Monitoring |
| | Implements scheduled tasks using APScheduler with full compliance tracking |
| | """ |

import asyncio
import time
from datetime import datetime, timedelta
from typing import Dict, Optional, Callable, Any
from threading import Lock

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR

from monitoring.health_checker import HealthChecker
from monitoring.rate_limiter import rate_limiter
from database.db_manager import db_manager
from utils.logger import setup_logger
from config import config

logger = setup_logger("scheduler", level="INFO")


class TaskScheduler:
    """
    Comprehensive task scheduler with compliance tracking
    Manages all scheduled tasks for the API monitoring system
    """

    def __init__(self, db_path: str = "data/api_monitor.db"):
        """
        Initialize task scheduler

        Args:
            db_path: Path to SQLite database
        """
        # Pin the scheduler to UTC so trigger times line up with the naive
        # datetime.utcnow() timestamps used for compliance tracking
        self.scheduler = BackgroundScheduler(timezone="UTC")
        self.db_path = db_path
        self.health_checker = HealthChecker(db_path=db_path)
        self.lock = Lock()

        # Expected next-run time per task, keyed by job id, for compliance tracking
        self.expected_run_times: Dict[str, datetime] = {}

        self._is_running = False

        # Listen for both successful and failed job executions
        self.scheduler.add_listener(
            self._job_executed_listener,
            EVENT_JOB_EXECUTED | EVENT_JOB_ERROR
        )

        logger.info("TaskScheduler initialized")

    def _job_executed_listener(self, event):
        """
        Listener for job execution events

        Args:
            event: APScheduler event object
        """
        job_id = event.job_id

        if event.exception:
            logger.error(
                f"Job {job_id} raised an exception: {event.exception}",
                exc_info=True
            )
        else:
            logger.debug(f"Job {job_id} executed successfully")

    def _record_compliance(
        self,
        task_name: str,
        expected_time: datetime,
        actual_time: datetime,
        success: bool = True,
        skip_reason: Optional[str] = None
    ):
        """
        Record schedule compliance metrics

        Args:
            task_name: Name of the scheduled task
            expected_time: Expected execution time
            actual_time: Actual execution time
            success: Whether task succeeded
            skip_reason: Reason if task was skipped
        """
        try:
            # A run counts as on-time if it starts within 5 seconds of schedule
            delay_seconds = int((actual_time - expected_time).total_seconds())
            on_time = abs(delay_seconds) <= 5

            # Placeholder: compliance is not yet persisted per provider, so
            # records are only written to the log below
            provider_id = 1

            logger.info(
                f"Schedule compliance - Task: {task_name}, "
                f"Expected: {expected_time.isoformat()}, "
                f"Actual: {actual_time.isoformat()}, "
                f"Delay: {delay_seconds}s, "
                f"On-time: {on_time}, "
                f"Success: {success}, "
                f"Skip reason: {skip_reason or 'None'}"
            )

        except Exception as e:
            logger.error(f"Failed to record compliance for {task_name}: {e}")

    def _wrap_task(
        self,
        task_name: str,
        task_func: Callable,
        *args,
        **kwargs
    ):
        """
        Wrapper for scheduled tasks to add logging and compliance tracking

        Args:
            task_name: Name of the task
            task_func: Function to execute
            *args: Positional arguments for task_func
            **kwargs: Keyword arguments for task_func
        """
        start_time = datetime.utcnow()

        # Compare against the expected run time recorded for this task
        expected_time = self.expected_run_times.get(task_name, start_time)

        # Refresh the expectation for the following run from the scheduler
        # itself (job ids match task names); next_run_time is UTC-aware, so
        # strip tzinfo to stay comparable with datetime.utcnow()
        job = self.scheduler.get_job(task_name)
        if job and job.next_run_time:
            self.expected_run_times[task_name] = job.next_run_time.replace(tzinfo=None)

        logger.info(f"Starting task: {task_name}")

        try:
            result = task_func(*args, **kwargs)

            end_time = datetime.utcnow()
            duration_ms = (end_time - start_time).total_seconds() * 1000

            logger.info(
                f"Completed task: {task_name} in {duration_ms:.2f}ms"
            )

            self._record_compliance(
                task_name=task_name,
                expected_time=expected_time,
                actual_time=start_time,
                success=True
            )

            return result

        except Exception as e:
            end_time = datetime.utcnow()
            duration_ms = (end_time - start_time).total_seconds() * 1000

            logger.error(
                f"Task {task_name} failed after {duration_ms:.2f}ms: {e}",
                exc_info=True
            )

            self._record_compliance(
                task_name=task_name,
                expected_time=expected_time,
                actual_time=start_time,
                success=False,
                skip_reason=f"Error: {str(e)[:200]}"
            )
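
            # Note: the exception is swallowed here rather than re-raised, so
            # the scheduler keeps firing later runs; as a consequence the
            # EVENT_JOB_ERROR listener above only sees errors that escape
            # this wrapper.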

    # ------------------------------------------------------------------
    # Task implementations
    # ------------------------------------------------------------------

    def _health_check_task(self):
        """
        Health check task - runs checks on all providers with staggering
        """
        logger.info("Executing health check task")

        try:
            providers = config.get_all_providers()

            # Check providers one at a time with a 10-second stagger to avoid
            # hitting upstream APIs in a burst
            async def run_staggered_checks():
                results = []
                for i, provider in enumerate(providers):
                    if i > 0:
                        await asyncio.sleep(10)

                    result = await self.health_checker.check_provider(provider.name)
                    if result:
                        results.append(result)
                        logger.info(
                            f"Health check: {provider.name} - {result.status.value} "
                            f"({result.response_time:.2f}ms)"
                        )

                return results

            # This job runs in a scheduler worker thread, so asyncio.run can
            # safely create a fresh event loop for the checks
            results = asyncio.run(run_staggered_checks())

            logger.info(f"Health check completed: {len(results)} providers checked")

        except Exception as e:
            logger.error(f"Health check task failed: {e}", exc_info=True)

    def _market_data_collection_task(self):
        """
        Market data collection task - collects data from market data providers
        """
        logger.info("Executing market data collection task")

        try:
            providers = config.get_providers_by_category('market_data')

            logger.info(f"Collecting market data from {len(providers)} providers")

            # Placeholder: actual collection is not implemented yet, so only
            # the intent is logged
            for provider in providers:
                logger.debug(f"Would collect market data from: {provider.name}")

        except Exception as e:
            logger.error(f"Market data collection failed: {e}", exc_info=True)

    def _explorer_data_collection_task(self):
        """
        Explorer data collection task - collects data from blockchain explorers
        """
        logger.info("Executing explorer data collection task")

        try:
            providers = config.get_providers_by_category('blockchain_explorers')

            logger.info(f"Collecting explorer data from {len(providers)} providers")

            # Placeholder: actual collection is not implemented yet
            for provider in providers:
                logger.debug(f"Would collect explorer data from: {provider.name}")

        except Exception as e:
            logger.error(f"Explorer data collection failed: {e}", exc_info=True)

    def _news_collection_task(self):
        """
        News collection task - collects news from news providers
        """
        logger.info("Executing news collection task")

        try:
            providers = config.get_providers_by_category('news')

            logger.info(f"Collecting news from {len(providers)} providers")

            # Placeholder: actual collection is not implemented yet
            for provider in providers:
                logger.debug(f"Would collect news from: {provider.name}")

        except Exception as e:
            logger.error(f"News collection failed: {e}", exc_info=True)

    def _sentiment_collection_task(self):
        """
        Sentiment collection task - collects sentiment data
        """
        logger.info("Executing sentiment collection task")

        try:
            providers = config.get_providers_by_category('sentiment')

            logger.info(f"Collecting sentiment data from {len(providers)} providers")

            # Placeholder: actual collection is not implemented yet
            for provider in providers:
                logger.debug(f"Would collect sentiment data from: {provider.name}")

        except Exception as e:
            logger.error(f"Sentiment collection failed: {e}", exc_info=True)

    def _rate_limit_snapshot_task(self):
        """
        Rate limit snapshot task - captures current rate limit usage
        """
        logger.info("Executing rate limit snapshot task")

        try:
            # Current usage for every tracked provider
            statuses = rate_limiter.get_all_statuses()

            # Persist a usage snapshot for each provider known to the database
            for provider_name, status_data in statuses.items():
                if status_data:
                    provider = config.get_provider(provider_name)
                    if provider:
                        db_provider = db_manager.get_provider(name=provider_name)
                        if db_provider:
                            db_manager.save_rate_limit_usage(
                                provider_id=db_provider.id,
                                limit_type=status_data['limit_type'],
                                limit_value=status_data['limit_value'],
                                current_usage=status_data['current_usage'],
                                reset_time=datetime.fromisoformat(status_data['reset_time'])
                            )

                    logger.debug(
                        f"Rate limit snapshot: {provider_name} - "
                        f"{status_data['current_usage']}/{status_data['limit_value']} "
                        f"({status_data['percentage']}%)"
                    )

            logger.info(f"Rate limit snapshot completed: {len(statuses)} providers")

        except Exception as e:
            logger.error(f"Rate limit snapshot failed: {e}", exc_info=True)

    def _metrics_aggregation_task(self):
        """
        Metrics aggregation task - aggregates system metrics
        """
        logger.info("Executing metrics aggregation task")

        try:
            all_providers = config.get_all_providers()
            total_providers = len(all_providers)

            # Connection attempts from the last hour
            connection_attempts = db_manager.get_connection_attempts(hours=1, limit=10000)

            online_count = 0
            degraded_count = 0
            offline_count = 0
            total_response_time = 0
            response_count = 0

            total_requests = len(connection_attempts)
            total_failures = sum(
                1 for attempt in connection_attempts
                if attempt.status in ['failed', 'timeout']
            )

            # Status counts are derived from the most recent attempt per
            # provider (attempts are assumed to be ordered newest-first);
            # response times are averaged over all successful attempts
            provider_latest_status = {}
            for attempt in connection_attempts:
                if attempt.status == 'success' and attempt.response_time_ms:
                    total_response_time += attempt.response_time_ms
                    response_count += 1

                if attempt.provider_id not in provider_latest_status:
                    provider_latest_status[attempt.provider_id] = attempt

                    if attempt.status == 'success':
                        online_count += 1
                    elif attempt.status == 'timeout':
                        offline_count += 1
                    else:
                        degraded_count += 1

            avg_response_time = (
                total_response_time / response_count
                if response_count > 0
                else 0
            )

            # Classify overall system health from the share of online providers
            online_percentage = (online_count / total_providers * 100) if total_providers > 0 else 0

            if online_percentage >= 80:
                system_health = "healthy"
            elif online_percentage >= 50:
                system_health = "degraded"
            else:
                system_health = "critical"

            db_manager.save_system_metrics(
                total_providers=total_providers,
                online_count=online_count,
                degraded_count=degraded_count,
                offline_count=offline_count,
                avg_response_time_ms=avg_response_time,
                total_requests_hour=total_requests,
                total_failures_hour=total_failures,
                system_health=system_health
            )

            logger.info(
                f"Metrics aggregation completed - "
                f"Health: {system_health}, "
                f"Online: {online_count}/{total_providers}, "
                f"Avg Response: {avg_response_time:.2f}ms"
            )

        except Exception as e:
            logger.error(f"Metrics aggregation failed: {e}", exc_info=True)

    def _database_cleanup_task(self):
        """
        Database cleanup task - removes old records (>30 days)
        """
        logger.info("Executing database cleanup task")

        try:
            # Remove records older than 30 days, table by table
            deleted_counts = db_manager.cleanup_old_data(days=30)

            total_deleted = sum(deleted_counts.values())

            logger.info(
                f"Database cleanup completed - Deleted {total_deleted} old records"
            )

            # Per-table breakdown
            for table, count in deleted_counts.items():
                if count > 0:
                    logger.info(f"  {table}: {count} records deleted")

        except Exception as e:
            logger.error(f"Database cleanup failed: {e}", exc_info=True)

    # ------------------------------------------------------------------
    # Lifecycle and job management
    # ------------------------------------------------------------------

    def start(self):
        """
        Start all scheduled tasks
        """
        if self._is_running:
            logger.warning("Scheduler is already running")
            return

        logger.info("Starting task scheduler...")

        try:
            now = datetime.utcnow()

            # Health checks: every 5 minutes, staggered inside the task
            self.expected_run_times['health_checks'] = now
            self.scheduler.add_job(
                func=lambda: self._wrap_task('health_checks', self._health_check_task),
                trigger=IntervalTrigger(minutes=5),
                id='health_checks',
                name='Health Checks (Staggered)',
                replace_existing=True,
                max_instances=1
            )
            logger.info("Scheduled: Health checks every 5 minutes")

            # Market data: every minute
            self.expected_run_times['market_data'] = now
            self.scheduler.add_job(
                func=lambda: self._wrap_task('market_data', self._market_data_collection_task),
                trigger=IntervalTrigger(minutes=1),
                id='market_data',
                name='Market Data Collection',
                replace_existing=True,
                max_instances=1
            )
            logger.info("Scheduled: Market data collection every 1 minute")

            # Explorer data: every 5 minutes
            self.expected_run_times['explorer_data'] = now
            self.scheduler.add_job(
                func=lambda: self._wrap_task('explorer_data', self._explorer_data_collection_task),
                trigger=IntervalTrigger(minutes=5),
                id='explorer_data',
                name='Explorer Data Collection',
                replace_existing=True,
                max_instances=1
            )
            logger.info("Scheduled: Explorer data collection every 5 minutes")

            # News: every 10 minutes
            self.expected_run_times['news_collection'] = now
            self.scheduler.add_job(
                func=lambda: self._wrap_task('news_collection', self._news_collection_task),
                trigger=IntervalTrigger(minutes=10),
                id='news_collection',
                name='News Collection',
                replace_existing=True,
                max_instances=1
            )
            logger.info("Scheduled: News collection every 10 minutes")

            # Sentiment: every 15 minutes
            self.expected_run_times['sentiment_collection'] = now
            self.scheduler.add_job(
                func=lambda: self._wrap_task('sentiment_collection', self._sentiment_collection_task),
                trigger=IntervalTrigger(minutes=15),
                id='sentiment_collection',
                name='Sentiment Collection',
                replace_existing=True,
                max_instances=1
            )
            logger.info("Scheduled: Sentiment collection every 15 minutes")

            # Rate limit snapshots: every minute
            self.expected_run_times['rate_limit_snapshot'] = now
            self.scheduler.add_job(
                func=lambda: self._wrap_task('rate_limit_snapshot', self._rate_limit_snapshot_task),
                trigger=IntervalTrigger(minutes=1),
                id='rate_limit_snapshot',
                name='Rate Limit Snapshot',
                replace_existing=True,
                max_instances=1
            )
            logger.info("Scheduled: Rate limit snapshot every 1 minute")

            # Metrics aggregation: every 5 minutes
            self.expected_run_times['metrics_aggregation'] = now
            self.scheduler.add_job(
                func=lambda: self._wrap_task('metrics_aggregation', self._metrics_aggregation_task),
                trigger=IntervalTrigger(minutes=5),
                id='metrics_aggregation',
                name='Metrics Aggregation',
                replace_existing=True,
                max_instances=1
            )
            logger.info("Scheduled: Metrics aggregation every 5 minutes")

            # Database cleanup: daily at 3 AM UTC; the expected time is the
            # next 3 AM occurrence, not today's (which may already be past)
            next_cleanup = now.replace(hour=3, minute=0, second=0, microsecond=0)
            if next_cleanup <= now:
                next_cleanup += timedelta(days=1)
            self.expected_run_times['database_cleanup'] = next_cleanup
            self.scheduler.add_job(
                func=lambda: self._wrap_task('database_cleanup', self._database_cleanup_task),
                trigger=CronTrigger(hour=3, minute=0),
                id='database_cleanup',
                name='Database Cleanup (Daily 3 AM)',
                replace_existing=True,
                max_instances=1
            )
            logger.info("Scheduled: Database cleanup daily at 3 AM")

            # Start the underlying APScheduler
            self.scheduler.start()
            self._is_running = True

            logger.info("Task scheduler started successfully")

            # Log the active schedule
            jobs = self.scheduler.get_jobs()
            logger.info(f"Active scheduled jobs: {len(jobs)}")
            for job in jobs:
                logger.info(f"  - {job.name} (ID: {job.id}) - Next run: {job.next_run_time}")

        except Exception as e:
            logger.error(f"Failed to start scheduler: {e}", exc_info=True)
            raise

    def stop(self):
        """
        Stop scheduler gracefully
        """
        if not self._is_running:
            logger.warning("Scheduler is not running")
            return

        logger.info("Stopping task scheduler...")

        try:
            # Wait for running jobs to finish before shutting down
            self.scheduler.shutdown(wait=True)
            self._is_running = False

            # Run the health checker's async cleanup
            asyncio.run(self.health_checker.close())

            logger.info("Task scheduler stopped successfully")

        except Exception as e:
            logger.error(f"Error stopping scheduler: {e}", exc_info=True)

    def add_job(
        self,
        job_id: str,
        job_name: str,
        job_func: Callable,
        trigger_type: str = 'interval',
        **trigger_kwargs
    ) -> bool:
        """
        Add a custom scheduled job

        Args:
            job_id: Unique job identifier
            job_name: Human-readable job name
            job_func: Function to execute
            trigger_type: Type of trigger ('interval' or 'cron')
            **trigger_kwargs: Trigger-specific parameters

        Returns:
            True if successful, False otherwise

        Examples:
            # Add interval job
            scheduler.add_job(
                'my_job', 'My Custom Job', my_function,
                trigger_type='interval', minutes=30
            )

            # Add cron job
            scheduler.add_job(
                'daily_job', 'Daily Job', daily_function,
                trigger_type='cron', hour=12, minute=0
            )
        """
        try:
            # Build the trigger from the requested type
            if trigger_type == 'interval':
                trigger = IntervalTrigger(**trigger_kwargs)
            elif trigger_type == 'cron':
                trigger = CronTrigger(**trigger_kwargs)
            else:
                logger.error(f"Unknown trigger type: {trigger_type}")
                return False

            # Wrap the job so it gets the same logging and compliance tracking
            # as the built-in tasks
            self.scheduler.add_job(
                func=lambda: self._wrap_task(job_id, job_func),
                trigger=trigger,
                id=job_id,
                name=job_name,
                replace_existing=True,
                max_instances=1
            )

            self.expected_run_times[job_id] = datetime.utcnow()

            logger.info(f"Added custom job: {job_name} (ID: {job_id})")
            return True

        except Exception as e:
            logger.error(f"Failed to add job {job_id}: {e}", exc_info=True)
            return False

    def remove_job(self, job_id: str) -> bool:
        """
        Remove a scheduled job

        Args:
            job_id: Job identifier to remove

        Returns:
            True if successful, False otherwise
        """
        try:
            self.scheduler.remove_job(job_id)

            # Drop the compliance bookkeeping for the removed job
            if job_id in self.expected_run_times:
                del self.expected_run_times[job_id]

            logger.info(f"Removed job: {job_id}")
            return True

        except Exception as e:
            logger.error(f"Failed to remove job {job_id}: {e}", exc_info=True)
            return False

    def trigger_immediate(self, job_id: str) -> bool:
        """
        Trigger immediate execution of a scheduled job

        Args:
            job_id: Job identifier to trigger

        Returns:
            True if successful, False otherwise
        """
        try:
            job = self.scheduler.get_job(job_id)

            if not job:
                logger.error(f"Job not found: {job_id}")
                return False

            # Pull the next run forward to now; the naive datetime is
            # interpreted in the scheduler's timezone (UTC here)
            job.modify(next_run_time=datetime.utcnow())

            logger.info(f"Triggered immediate execution of job: {job_id}")
            return True

        except Exception as e:
            logger.error(f"Failed to trigger job {job_id}: {e}", exc_info=True)
            return False

    def get_job_status(self, job_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Get status of scheduled jobs

        Args:
            job_id: Specific job ID, or None for all jobs

        Returns:
            Dictionary with job status information
        """
        try:
            if job_id:
                job = self.scheduler.get_job(job_id)
                if not job:
                    return {}

                return {
                    'id': job.id,
                    'name': job.name,
                    'next_run': job.next_run_time.isoformat() if job.next_run_time else None,
                    'trigger': str(job.trigger)
                }
            else:
                jobs = self.scheduler.get_jobs()
                return {
                    'total_jobs': len(jobs),
                    'is_running': self._is_running,
                    'jobs': [
                        {
                            'id': job.id,
                            'name': job.name,
                            'next_run': job.next_run_time.isoformat() if job.next_run_time else None,
                            'trigger': str(job.trigger)
                        }
                        for job in jobs
                    ]
                }

        except Exception as e:
            logger.error(f"Failed to get job status: {e}", exc_info=True)
            return {}
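
    # Illustrative shape of get_job_status() with job_id=None (values are
    # made up; the trigger field is whatever str(trigger) yields):
    #   {"total_jobs": 8, "is_running": True,
    #    "jobs": [{"id": "health_checks",
    #              "name": "Health Checks (Staggered)",
    #              "next_run": "2025-01-01T12:05:00+00:00",
    #              "trigger": "interval[0:05:00]"}, ...]}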

    def is_running(self) -> bool:
        """
        Check if scheduler is running

        Returns:
            True if running, False otherwise
        """
        return self._is_running


# ----------------------------------------------------------------------
# Global scheduler instance
# ----------------------------------------------------------------------

task_scheduler = TaskScheduler()


# ----------------------------------------------------------------------
# Module-level convenience functions
# ----------------------------------------------------------------------

def start_scheduler():
    """Start the global task scheduler"""
    task_scheduler.start()


def stop_scheduler():
    """Stop the global task scheduler"""
    task_scheduler.stop()
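

# Example process-lifecycle wiring (a sketch; registering stop_scheduler with
# atexit is an assumption about the host application, not a requirement of
# this module):
#
#     import atexit
#     from scheduler import start_scheduler, stop_scheduler
#
#     start_scheduler()
#     atexit.register(stop_scheduler)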


# ----------------------------------------------------------------------
# Standalone execution
# ----------------------------------------------------------------------

if __name__ == "__main__":
    print("Task Scheduler Module")
    print("=" * 80)

    scheduler = TaskScheduler()

    try:
        scheduler.start()

        print("\nScheduler is running. Press Ctrl+C to stop...")
        print(f"Scheduler status: {scheduler.get_job_status()}")

        # Keep the main thread alive and report the schedule once a minute
        while True:
            time.sleep(60)

            status = scheduler.get_job_status()
            print(f"\n[{datetime.utcnow().isoformat()}] Active jobs: {status['total_jobs']}")
            for job in status.get('jobs', []):
                print(f"  - {job['name']}: Next run at {job['next_run']}")

    except KeyboardInterrupt:
        print("\n\nStopping scheduler...")
        scheduler.stop()
        print("Scheduler stopped. Goodbye!")