| |
| """ |
| Performance Monitor |
| |
| Monitors system performance metrics for the NZ Legislation Loophole Analysis application. |
| Tracks memory usage, CPU utilization, processing times, and other performance indicators. |
| """ |
|
|
| import time |
| import threading |
| import psutil |
| from typing import Dict, Any, Optional, List |
| from collections import deque |
| import streamlit as st |
|
|
class PerformanceMonitor:
    """Collect performance metrics for the current process.

    A daemon thread started by ``__init__`` samples memory, CPU and thread
    count about once per second. Callers additionally report processing
    durations (``start_processing_timer`` / ``end_processing_timer``) and
    processed chunks (``record_chunk_processing``). Shared state is guarded
    by ``self.lock`` (a re-entrant lock).
    """

    def __init__(self, max_history: int = 1000):
        """
        Initialize performance monitor

        Args:
            max_history: Maximum number of historical data points to keep
        """
        self.max_history = max_history
        self.lock = threading.RLock()

        # Bounded histories: once max_history entries are stored, appending
        # drops the oldest one.
        self.memory_history = deque(maxlen=max_history)   # (timestamp, MB)
        self.cpu_history = deque(maxlen=max_history)      # (timestamp, percent)
        self.processing_times = deque(maxlen=max_history)  # elapsed seconds

        # Latest snapshot of every metric; copied out by get_stats().
        self.current_metrics = {
            'memory_usage_mb': 0,
            'memory_percent': 0,
            'cpu_percent': 0,
            'active_threads': 0,
            'processing_time_avg': 0,
            'processing_time_max': 0,
            'processing_time_min': 0,
            'total_processed_chunks': 0,
            'chunks_per_second': 0,
        }

        # Timer state for start_processing_timer() / end_processing_timer()
        # and the timestamp used to derive the chunk rate.
        self.processing_start_time = None
        self.last_chunk_time = time.time()

        # Background sampling thread; daemon so it never blocks interpreter exit.
        self.monitoring = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()

    def _monitor_loop(self):
        """Background monitoring loop.

        Runs until ``self.monitoring`` becomes false. A failed sample is
        reported on stdout and followed by a longer pause before retrying.
        """
        while self.monitoring:
            try:
                self._update_metrics()
                time.sleep(1)
            except Exception as e:
                print(f"Performance monitoring error: {e}")
                time.sleep(5)

    def _update_metrics(self):
        """Sample process metrics and publish them under the lock."""
        process = psutil.Process()

        # Sample BEFORE taking the lock: cpu_percent(interval=0.1) blocks for
        # the whole interval and must not stall readers such as get_stats().
        memory_info = process.memory_info()
        memory_usage_mb = memory_info.rss / 1024 / 1024  # bytes -> MB
        memory_percent = process.memory_percent()
        cpu_percent = process.cpu_percent(interval=0.1)
        active_threads = len(process.threads())
        current_time = time.time()

        with self.lock:
            self.current_metrics.update({
                'memory_usage_mb': memory_usage_mb,
                'memory_percent': memory_percent,
                'cpu_percent': cpu_percent,
                'active_threads': active_threads,
            })
            self.memory_history.append((current_time, memory_usage_mb))
            self.cpu_history.append((current_time, cpu_percent))

    def start_processing_timer(self):
        """Start timing a processing operation"""
        self.processing_start_time = time.time()

    def end_processing_timer(self) -> float:
        """End timing and return elapsed time.

        Returns:
            Seconds elapsed since ``start_processing_timer`` was called, or
            ``0.0`` when no timer is running. A non-zero result is also
            recorded and folded into the avg/max/min metrics.
        """
        # Read the attribute once so the None check and the subtraction
        # operate on the same value.
        started = self.processing_start_time
        if started is None:
            return 0.0

        elapsed = time.time() - started
        self.processing_start_time = None

        with self.lock:
            self.processing_times.append(elapsed)
            times = self.processing_times
            self.current_metrics['processing_time_avg'] = sum(times) / len(times)
            self.current_metrics['processing_time_max'] = max(times)
            self.current_metrics['processing_time_min'] = min(times)

        return elapsed

    def record_chunk_processing(self):
        """Record that a chunk has been processed.

        Increments the chunk counter and updates ``chunks_per_second`` as an
        exponentially weighted average (90% previous value, 10% the rate
        implied by the time since the previous chunk).
        """
        current_time = time.time()

        with self.lock:
            self.current_metrics['total_processed_chunks'] += 1

            time_diff = current_time - self.last_chunk_time
            if time_diff > 0:  # skip the rate update rather than divide by zero
                current_cps = 1.0 / time_diff
                self.current_metrics['chunks_per_second'] = (
                    0.9 * self.current_metrics['chunks_per_second'] + 0.1 * current_cps
                )

            self.last_chunk_time = current_time

    def get_stats(self) -> Dict[str, Any]:
        """Return a copy of the current metrics snapshot."""
        with self.lock:
            return self.current_metrics.copy()

    def get_memory_history(self, time_window_seconds: int = 300) -> List[tuple]:
        """Return ``(timestamp, memory_mb)`` samples from the last ``time_window_seconds``."""
        cutoff_time = time.time() - time_window_seconds

        with self.lock:
            return [(t, v) for t, v in self.memory_history if t >= cutoff_time]

    def get_cpu_history(self, time_window_seconds: int = 300) -> List[tuple]:
        """Return ``(timestamp, cpu_percent)`` samples from the last ``time_window_seconds``."""
        cutoff_time = time.time() - time_window_seconds

        with self.lock:
            return [(t, v) for t, v in self.cpu_history if t >= cutoff_time]

    def get_processing_time_stats(self) -> Dict[str, Any]:
        """Return count, average, maximum, minimum and median of recorded times.

        All values are ``0`` when nothing has been recorded yet.
        """
        with self.lock:
            sorted_times = sorted(self.processing_times)

        count = len(sorted_times)
        if count == 0:
            return {
                'count': 0,
                'average': 0,
                'maximum': 0,
                'minimum': 0,
                'median': 0,
            }

        # True median: for an even count, average the two middle values
        # instead of returning the upper one.
        mid = count // 2
        if count % 2:
            median = sorted_times[mid]
        else:
            median = (sorted_times[mid - 1] + sorted_times[mid]) / 2

        return {
            'count': count,
            'average': sum(sorted_times) / count,
            'maximum': sorted_times[-1],
            'minimum': sorted_times[0],
            'median': median,
        }

    def get_system_info(self) -> Dict[str, Any]:
        """Return static host information (CPU count, memory, Python, platform)."""
        # Interpreter/platform details come from the stdlib ``platform``
        # module; psutil does not provide them. Imported locally so the
        # method works regardless of the file-level import list.
        import platform

        virtual_memory = psutil.virtual_memory()
        return {
            # NOTE(review): psutil.cpu_count() defaults to logical=True, so
            # this equals 'cpu_count_logical'. If physical cores were
            # intended, use cpu_count(logical=False) - confirm with callers.
            'cpu_count': psutil.cpu_count(),
            'cpu_count_logical': psutil.cpu_count(logical=True),
            'total_memory_gb': virtual_memory.total / (1024**3),
            'available_memory_gb': virtual_memory.available / (1024**3),
            'python_version': f"{platform.python_implementation()} {platform.python_version()}",
            'platform': platform.platform(),
        }

    def reset_stats(self):
        """Reset processing-time and chunk statistics.

        Memory/CPU metrics and their histories are left untouched.
        """
        with self.lock:
            self.processing_times.clear()
            self.current_metrics['total_processed_chunks'] = 0
            self.current_metrics['chunks_per_second'] = 0
            self.current_metrics['processing_time_avg'] = 0
            self.current_metrics['processing_time_max'] = 0
            self.current_metrics['processing_time_min'] = 0

    def cleanup(self):
        """Stop the monitoring loop and wait up to 2 seconds for its thread."""
        self.monitoring = False
        if self.monitor_thread.is_alive():
            self.monitor_thread.join(timeout=2)

    def get_performance_report(self) -> Dict[str, Any]:
        """Generate a comprehensive performance report"""
        # Gather host info before taking the lock; it does not touch shared state.
        system_info = self.get_system_info()

        # RLock is re-entrant, so the nested accessors can take it again while
        # the history sizes are read consistently with the metrics.
        with self.lock:
            return {
                'current_metrics': self.get_stats(),
                'processing_stats': self.get_processing_time_stats(),
                'system_info': system_info,
                'memory_history_count': len(self.memory_history),
                'cpu_history_count': len(self.cpu_history),
                'processing_times_count': len(self.processing_times),
            }

    def check_memory_threshold(self, threshold_mb: int) -> bool:
        """Check if memory usage is above threshold"""
        return self.current_metrics['memory_usage_mb'] > threshold_mb

    def check_cpu_threshold(self, threshold_percent: float) -> bool:
        """Check if CPU usage is above threshold"""
        return self.current_metrics['cpu_percent'] > threshold_percent

    def get_recommendations(self) -> List[str]:
        """Get performance recommendations based on current metrics.

        Returns:
            Human-readable recommendation strings; a single "optimal" message
            when no threshold is exceeded.
        """
        metrics = self.get_stats()  # consistent snapshot taken under the lock
        recommendations = []

        # Memory (MB)
        if metrics['memory_usage_mb'] > 7000:
            recommendations.append("High memory usage detected. Consider reducing batch size or chunk size.")
        elif metrics['memory_usage_mb'] > 5000:
            recommendations.append("Moderate memory usage. Monitor closely during processing.")

        # CPU (percent)
        if metrics['cpu_percent'] > 90:
            recommendations.append("High CPU usage. Consider reducing processing intensity.")
        elif metrics['cpu_percent'] > 70:
            recommendations.append("Moderate CPU usage. Processing is running optimally.")

        # Average processing time (seconds)
        avg_time = metrics.get('processing_time_avg', 0)
        if avg_time > 10:
            recommendations.append("Slow processing detected. Consider using a more powerful model or optimizing settings.")
        elif avg_time > 5:
            recommendations.append("Moderate processing speed. Consider increasing batch size if memory allows.")

        # Throughput: only meaningful once at least one chunk was recorded;
        # otherwise the initial rate of 0 would always be flagged as "low".
        if metrics.get('total_processed_chunks', 0) > 0 and metrics.get('chunks_per_second', 0) < 1:
            recommendations.append("Low processing throughput. Consider optimizing chunk size or model parameters.")

        if not recommendations:
            recommendations.append("Performance is optimal. All metrics are within normal ranges.")

        return recommendations
|
|
| |
# Process-wide singleton and the lock that guards its lazy creation.
_performance_instance = None
_performance_lock = threading.Lock()


def get_performance_monitor(max_history: int = 1000) -> PerformanceMonitor:
    """Return the shared PerformanceMonitor, creating it on first use.

    Args:
        max_history: History size passed to the monitor when it is created.
            Ignored on later calls, because the existing instance is reused.
    """
    global _performance_instance

    with _performance_lock:
        monitor = _performance_instance
        if monitor is None:
            monitor = PerformanceMonitor(max_history)
            _performance_instance = monitor

    return monitor
|
|