Spaces:
Build error
Build error
| """ | |
| Enterprise Agentic Reliability Framework - Main Application (FIXED VERSION) | |
| Multi-Agent AI System for Production Reliability Monitoring | |
| CRITICAL FIXES APPLIED: | |
| - Removed event loop creation (uses Gradio native async) | |
| - Fixed FAISS thread safety with single-writer pattern | |
| - ProcessPoolExecutor for CPU-intensive encoding | |
| - Atomic saves with fsync | |
| - Dependency injection | |
| - Rate limiting | |
| - Comprehensive input validation | |
| - Circuit breakers for agent resilience | |
| """ | |
| import os | |
| import json | |
| import numpy as np | |
| import gradio as gr | |
| import requests | |
| import pandas as pd | |
| import datetime | |
| import threading | |
| import logging | |
| import asyncio | |
| import tempfile | |
| from typing import List, Dict, Any, Optional, Tuple | |
| from collections import deque, OrderedDict | |
| from dataclasses import dataclass, asdict | |
| from enum import Enum | |
| from concurrent.futures import ProcessPoolExecutor | |
| from queue import Queue | |
| from circuitbreaker import circuit | |
| import atomicwrites | |
| # Import our modules | |
| from models import ( | |
| ReliabilityEvent, EventSeverity, AnomalyResult, | |
| HealingAction, ForecastResult, PolicyCondition | |
| ) | |
| from claude_adapter import get_claude_adapter | |
| from healing_policies import PolicyEngine, DEFAULT_HEALING_POLICIES | |
| # === Logging Configuration === | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # Initialize Claude adapter for AI reasoning | |
| claude_adapter = get_claude_adapter() | |
| # === CONSTANTS (FIXED: Extracted all magic numbers) === | |
| class Constants: | |
| """Centralized constants to eliminate magic numbers""" | |
| # Thresholds | |
| LATENCY_WARNING = 150.0 | |
| LATENCY_CRITICAL = 300.0 | |
| LATENCY_EXTREME = 500.0 | |
| ERROR_RATE_WARNING = 0.05 | |
| ERROR_RATE_HIGH = 0.15 | |
| ERROR_RATE_CRITICAL = 0.3 | |
| CPU_WARNING = 0.8 | |
| CPU_CRITICAL = 0.9 | |
| MEMORY_WARNING = 0.8 | |
| MEMORY_CRITICAL = 0.9 | |
| # Forecasting | |
| SLOPE_THRESHOLD_INCREASING = 5.0 | |
| SLOPE_THRESHOLD_DECREASING = -2.0 | |
| FORECAST_MIN_DATA_POINTS = 5 | |
| FORECAST_LOOKAHEAD_MINUTES = 15 | |
| # Performance | |
| HISTORY_WINDOW = 50 | |
| MAX_EVENTS_STORED = 1000 | |
| AGENT_TIMEOUT_SECONDS = 5 | |
| CACHE_EXPIRY_MINUTES = 15 | |
| # FAISS | |
| FAISS_BATCH_SIZE = 10 | |
| FAISS_SAVE_INTERVAL_SECONDS = 30 | |
| VECTOR_DIM = 384 | |
| # Business metrics | |
| BASE_REVENUE_PER_MINUTE = 100.0 | |
| BASE_USERS = 1000 | |
| # Rate limiting | |
| MAX_REQUESTS_PER_MINUTE = 60 | |
| MAX_REQUESTS_PER_HOUR = 500 | |
| # === Configuration === | |
| class Config: | |
| """Centralized configuration for the reliability framework""" | |
| HF_TOKEN: str = os.getenv("HF_TOKEN", "").strip() | |
| HF_API_URL: str = "https://router.huggingface.co/hf-inference/v1/completions" | |
| INDEX_FILE: str = os.getenv("INDEX_FILE", "data/incident_vectors.index") | |
| TEXTS_FILE: str = os.getenv("TEXTS_FILE", "data/incident_texts.json") | |
| DATA_DIR: str = os.getenv("DATA_DIR", "data") | |
| # Create data directory if it doesn't exist | |
| os.makedirs(DATA_DIR, exist_ok=True) | |
| config = Config() | |
| HEADERS = {"Authorization": f"Bearer {config.HF_TOKEN}"} if config.HF_TOKEN else {} | |
| # === Input Validation (FIXED: Comprehensive validation) === | |
| def validate_component_id(component_id: str) -> Tuple[bool, str]: | |
| """Validate component ID format""" | |
| if not isinstance(component_id, str): | |
| return False, "Component ID must be a string" | |
| if not (1 <= len(component_id) <= 255): | |
| return False, "Component ID must be 1-255 characters" | |
| import re | |
| if not re.match(r"^[a-z0-9-]+$", component_id): | |
| return False, "Component ID must contain only lowercase letters, numbers, and hyphens" | |
| return True, "" | |
| def validate_inputs( | |
| latency: Any, | |
| error_rate: Any, | |
| throughput: Any, | |
| cpu_util: Any, | |
| memory_util: Any | |
| ) -> Tuple[bool, str]: | |
| """ | |
| Comprehensive input validation with type checking | |
| FIXED: Added proper type validation before conversion | |
| """ | |
| try: | |
| # Type conversion with error handling | |
| try: | |
| latency_f = float(latency) | |
| except (ValueError, TypeError): | |
| return False, "โ Invalid latency: must be a number" | |
| try: | |
| error_rate_f = float(error_rate) | |
| except (ValueError, TypeError): | |
| return False, "โ Invalid error rate: must be a number" | |
| try: | |
| throughput_f = float(throughput) if throughput else 1000.0 | |
| except (ValueError, TypeError): | |
| return False, "โ Invalid throughput: must be a number" | |
| # CPU and memory are optional | |
| cpu_util_f = None | |
| if cpu_util: | |
| try: | |
| cpu_util_f = float(cpu_util) | |
| except (ValueError, TypeError): | |
| return False, "โ Invalid CPU utilization: must be a number" | |
| memory_util_f = None | |
| if memory_util: | |
| try: | |
| memory_util_f = float(memory_util) | |
| except (ValueError, TypeError): | |
| return False, "โ Invalid memory utilization: must be a number" | |
| # Range validation | |
| if not (0 <= latency_f <= 10000): | |
| return False, "โ Invalid latency: must be between 0-10000ms" | |
| if not (0 <= error_rate_f <= 1): | |
| return False, "โ Invalid error rate: must be between 0-1" | |
| if throughput_f < 0: | |
| return False, "โ Invalid throughput: must be positive" | |
| if cpu_util_f is not None and not (0 <= cpu_util_f <= 1): | |
| return False, "โ Invalid CPU utilization: must be between 0-1" | |
| if memory_util_f is not None and not (0 <= memory_util_f <= 1): | |
| return False, "โ Invalid memory utilization: must be between 0-1" | |
| return True, "" | |
| except Exception as e: | |
| logger.error(f"Validation error: {e}", exc_info=True) | |
| return False, f"โ Validation error: {str(e)}" | |
| # === Thread-Safe Data Structures === | |
| class ThreadSafeEventStore: | |
| """Thread-safe storage for reliability events""" | |
| def __init__(self, max_size: int = Constants.MAX_EVENTS_STORED): | |
| self._events = deque(maxlen=max_size) | |
| self._lock = threading.RLock() | |
| logger.info(f"Initialized ThreadSafeEventStore with max_size={max_size}") | |
| def add(self, event: ReliabilityEvent) -> None: | |
| """Add event to store""" | |
| with self._lock: | |
| self._events.append(event) | |
| logger.debug(f"Added event for {event.component}: {event.severity.value}") | |
| def get_recent(self, n: int = 15) -> List[ReliabilityEvent]: | |
| """Get n most recent events""" | |
| with self._lock: | |
| return list(self._events)[-n:] if self._events else [] | |
| def get_all(self) -> List[ReliabilityEvent]: | |
| """Get all events""" | |
| with self._lock: | |
| return list(self._events) | |
| def count(self) -> int: | |
| """Get total event count""" | |
| with self._lock: | |
| return len(self._events) | |
| # === FAISS Integration (FIXED: Single-writer pattern for thread safety) === | |
| class ProductionFAISSIndex: | |
| """ | |
| Production-safe FAISS index with single-writer pattern | |
| CRITICAL FIX: FAISS is NOT thread-safe for concurrent writes | |
| Solution: Queue-based single writer thread + atomic saves | |
| """ | |
| def __init__(self, index, texts: List[str]): | |
| self.index = index | |
| self.texts = texts | |
| self._lock = threading.RLock() | |
| # FIXED: Initialize shutdown event BEFORE starting thread | |
| self._shutdown = threading.Event() | |
| # Single writer thread (no concurrent write conflicts) | |
| self._write_queue: Queue = Queue() | |
| self._writer_thread = threading.Thread( | |
| target=self._writer_loop, | |
| daemon=True, | |
| name="FAISSWriter" | |
| ) | |
| self._writer_thread.start() # โ Only start ONCE, AFTER _shutdown exists | |
| # ProcessPool for encoding (avoids GIL + memory leaks) | |
| self._encoder_pool = ProcessPoolExecutor(max_workers=2) | |
| logger.info( | |
| f"Initialized ProductionFAISSIndex with {len(texts)} vectors, " | |
| f"single-writer pattern" | |
| ) | |
| def add_async(self, vector: np.ndarray, text: str) -> None: | |
| """ | |
| Add vector and text asynchronously (thread-safe) | |
| FIXED: Queue-based design - no concurrent FAISS writes | |
| """ | |
| self._write_queue.put((vector, text)) | |
| logger.debug(f"Queued vector for indexing: {text[:50]}...") | |
| def _writer_loop(self) -> None: | |
| """ | |
| Single writer thread - processes queue in batches | |
| This ensures only ONE thread ever writes to FAISS index | |
| """ | |
| batch = [] | |
| last_save = datetime.datetime.now() | |
| save_interval = datetime.timedelta( | |
| seconds=Constants.FAISS_SAVE_INTERVAL_SECONDS | |
| ) | |
| while not self._shutdown.is_set(): | |
| try: | |
| # Collect batch (non-blocking with timeout) | |
| import queue | |
| try: | |
| item = self._write_queue.get(timeout=1.0) | |
| batch.append(item) | |
| except queue.Empty: | |
| pass | |
| # Process batch when ready | |
| if len(batch) >= Constants.FAISS_BATCH_SIZE or \ | |
| (batch and datetime.datetime.now() - last_save > save_interval): | |
| self._flush_batch(batch) | |
| batch = [] | |
| # Periodic save | |
| if datetime.datetime.now() - last_save > save_interval: | |
| self._save_atomic() | |
| last_save = datetime.datetime.now() | |
| except Exception as e: | |
| logger.error(f"Writer loop error: {e}", exc_info=True) | |
| def _flush_batch(self, batch: List[Tuple[np.ndarray, str]]) -> None: | |
| """ | |
| Flush batch to FAISS index | |
| SAFE: Only called from single writer thread | |
| """ | |
| if not batch: | |
| return | |
| try: | |
| vectors = np.vstack([v for v, _ in batch]) | |
| texts = [t for _, t in batch] | |
| # SAFE: Single writer - no concurrent access | |
| self.index.add(vectors) | |
| with self._lock: # Only lock for text list modification | |
| self.texts.extend(texts) | |
| logger.info(f"Flushed batch of {len(batch)} vectors to FAISS index") | |
| except Exception as e: | |
| logger.error(f"Error flushing batch: {e}", exc_info=True) | |
| def _save_atomic(self) -> None: | |
| """ | |
| Atomic save with fsync for durability | |
| FIXED: Prevents corruption on crash | |
| """ | |
| try: | |
| import faiss | |
| # Write to temporary file first | |
| with tempfile.NamedTemporaryFile( | |
| mode='wb', | |
| delete=False, | |
| dir=os.path.dirname(config.INDEX_FILE), | |
| prefix='index_', | |
| suffix='.tmp' | |
| ) as tmp: | |
| temp_path = tmp.name | |
| # Write index | |
| faiss.write_index(self.index, temp_path) | |
| # Fsync for durability | |
| with open(temp_path, 'r+b') as f: | |
| f.flush() | |
| os.fsync(f.fileno()) | |
| # Atomic rename | |
| os.replace(temp_path, config.INDEX_FILE) | |
| # Save texts with atomic write | |
| with self._lock: | |
| texts_copy = self.texts.copy() | |
| with atomicwrites.atomic_write( | |
| config.TEXTS_FILE, | |
| mode='w', | |
| overwrite=True | |
| ) as f: | |
| json.dump(texts_copy, f) | |
| logger.info( | |
| f"Atomically saved FAISS index with {len(texts_copy)} vectors" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error saving index: {e}", exc_info=True) | |
| def get_count(self) -> int: | |
| """Get total count of vectors""" | |
| with self._lock: | |
| return len(self.texts) + self._write_queue.qsize() | |
| def force_save(self) -> None: | |
| """Force immediate save of pending vectors""" | |
| logger.info("Forcing FAISS index save...") | |
| # Wait for queue to drain (with timeout) | |
| timeout = 10.0 | |
| start = datetime.datetime.now() | |
| while not self._write_queue.empty(): | |
| if (datetime.datetime.now() - start).total_seconds() > timeout: | |
| logger.warning("Force save timeout - queue not empty") | |
| break | |
| import time | |
| time.sleep(0.1) | |
| self._save_atomic() | |
| def shutdown(self) -> None: | |
| """Graceful shutdown""" | |
| logger.info("Shutting down FAISS index...") | |
| self._shutdown.set() | |
| self.force_save() | |
| self._writer_thread.join(timeout=5.0) | |
| self._encoder_pool.shutdown(wait=True) | |
| # === FAISS & Embeddings Setup === | |
| try: | |
| from sentence_transformers import SentenceTransformer | |
| import faiss | |
| logger.info("Loading SentenceTransformer model...") | |
| model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") | |
| logger.info("SentenceTransformer model loaded successfully") | |
| if os.path.exists(config.INDEX_FILE): | |
| logger.info(f"Loading existing FAISS index from {config.INDEX_FILE}") | |
| index = faiss.read_index(config.INDEX_FILE) | |
| if index.d != Constants.VECTOR_DIM: | |
| logger.warning( | |
| f"Index dimension mismatch: {index.d} != {Constants.VECTOR_DIM}. " | |
| f"Creating new index." | |
| ) | |
| index = faiss.IndexFlatL2(Constants.VECTOR_DIM) | |
| incident_texts = [] | |
| else: | |
| with open(config.TEXTS_FILE, "r") as f: | |
| incident_texts = json.load(f) | |
| logger.info(f"Loaded {len(incident_texts)} incident texts") | |
| else: | |
| logger.info("Creating new FAISS index") | |
| index = faiss.IndexFlatL2(Constants.VECTOR_DIM) | |
| incident_texts = [] | |
| thread_safe_index = ProductionFAISSIndex(index, incident_texts) | |
| except ImportError as e: | |
| logger.warning(f"FAISS or SentenceTransformers not available: {e}") | |
| index = None | |
| incident_texts = [] | |
| model = None | |
| thread_safe_index = None | |
| except Exception as e: | |
| logger.error(f"Error initializing FAISS: {e}", exc_info=True) | |
| index = None | |
| incident_texts = [] | |
| model = None | |
| thread_safe_index = None | |
| # === Predictive Models === | |
| class SimplePredictiveEngine: | |
| """ | |
| Lightweight forecasting engine with proper constant usage | |
| FIXED: All magic numbers extracted to Constants | |
| """ | |
| def __init__(self, history_window: int = Constants.HISTORY_WINDOW): | |
| self.history_window = history_window | |
| self.service_history: Dict[str, deque] = {} | |
| self.prediction_cache: Dict[str, Tuple[ForecastResult, datetime.datetime]] = {} | |
| self.max_cache_age = datetime.timedelta(minutes=Constants.CACHE_EXPIRY_MINUTES) | |
| self._lock = threading.RLock() | |
| logger.info(f"Initialized SimplePredictiveEngine with history_window={history_window}") | |
| def add_telemetry(self, service: str, event_data: Dict) -> None: | |
| """Add telemetry data to service history""" | |
| with self._lock: | |
| if service not in self.service_history: | |
| self.service_history[service] = deque(maxlen=self.history_window) | |
| telemetry_point = { | |
| 'timestamp': datetime.datetime.now(datetime.timezone.utc), | |
| 'latency': event_data.get('latency_p99', 0), | |
| 'error_rate': event_data.get('error_rate', 0), | |
| 'throughput': event_data.get('throughput', 0), | |
| 'cpu_util': event_data.get('cpu_util'), | |
| 'memory_util': event_data.get('memory_util') | |
| } | |
| self.service_history[service].append(telemetry_point) | |
| self._clean_cache() | |
| def _clean_cache(self) -> None: | |
| """Remove expired entries from prediction cache""" | |
| now = datetime.datetime.now(datetime.timezone.utc) | |
| expired = [k for k, (_, ts) in self.prediction_cache.items() | |
| if now - ts > self.max_cache_age] | |
| for k in expired: | |
| del self.prediction_cache[k] | |
| if expired: | |
| logger.debug(f"Cleaned {len(expired)} expired cache entries") | |
| def forecast_service_health( | |
| self, | |
| service: str, | |
| lookahead_minutes: int = Constants.FORECAST_LOOKAHEAD_MINUTES | |
| ) -> List[ForecastResult]: | |
| """Forecast service health metrics""" | |
| with self._lock: | |
| if service not in self.service_history or \ | |
| len(self.service_history[service]) < Constants.FORECAST_MIN_DATA_POINTS: | |
| return [] | |
| history = list(self.service_history[service]) | |
| forecasts = [] | |
| # Forecast latency | |
| latency_forecast = self._forecast_latency(history, lookahead_minutes) | |
| if latency_forecast: | |
| forecasts.append(latency_forecast) | |
| # Forecast error rate | |
| error_forecast = self._forecast_error_rate(history, lookahead_minutes) | |
| if error_forecast: | |
| forecasts.append(error_forecast) | |
| # Forecast resource utilization | |
| resource_forecasts = self._forecast_resources(history, lookahead_minutes) | |
| forecasts.extend(resource_forecasts) | |
| # Cache results | |
| with self._lock: | |
| for forecast in forecasts: | |
| cache_key = f"{service}_{forecast.metric}" | |
| self.prediction_cache[cache_key] = (forecast, datetime.datetime.now(datetime.timezone.utc)) | |
| return forecasts | |
| def _forecast_latency( | |
| self, | |
| history: List, | |
| lookahead_minutes: int | |
| ) -> Optional[ForecastResult]: | |
| """Forecast latency using linear regression""" | |
| try: | |
| latencies = [point['latency'] for point in history[-20:]] | |
| if len(latencies) < Constants.FORECAST_MIN_DATA_POINTS: | |
| return None | |
| # Linear trend | |
| x = np.arange(len(latencies)) | |
| slope, intercept = np.polyfit(x, latencies, 1) | |
| # Predict next value | |
| next_x = len(latencies) | |
| predicted_latency = slope * next_x + intercept | |
| # Calculate confidence | |
| residuals = latencies - (slope * x + intercept) | |
| confidence = max(0, 1 - (np.std(residuals) / max(1, np.mean(latencies)))) | |
| # Determine trend and risk | |
| if slope > Constants.SLOPE_THRESHOLD_INCREASING: | |
| trend = "increasing" | |
| risk = "critical" if predicted_latency > Constants.LATENCY_EXTREME else "high" | |
| elif slope < Constants.SLOPE_THRESHOLD_DECREASING: | |
| trend = "decreasing" | |
| risk = "low" | |
| else: | |
| trend = "stable" | |
| risk = "low" if predicted_latency < Constants.LATENCY_WARNING else "medium" | |
| # Calculate time to reach critical threshold | |
| time_to_critical = None | |
| if slope > 0 and predicted_latency < Constants.LATENCY_EXTREME: | |
| denominator = predicted_latency - latencies[-1] | |
| if abs(denominator) > 0.1: | |
| minutes_to_critical = lookahead_minutes * \ | |
| (Constants.LATENCY_EXTREME - predicted_latency) / denominator | |
| if minutes_to_critical > 0: | |
| time_to_critical = minutes_to_critical | |
| return ForecastResult( | |
| metric="latency", | |
| predicted_value=predicted_latency, | |
| confidence=confidence, | |
| trend=trend, | |
| time_to_threshold=time_to_critical, | |
| risk_level=risk | |
| ) | |
| except Exception as e: | |
| logger.error(f"Latency forecast error: {e}", exc_info=True) | |
| return None | |
| def _forecast_error_rate( | |
| self, | |
| history: List, | |
| lookahead_minutes: int | |
| ) -> Optional[ForecastResult]: | |
| """Forecast error rate using exponential smoothing""" | |
| try: | |
| error_rates = [point['error_rate'] for point in history[-15:]] | |
| if len(error_rates) < Constants.FORECAST_MIN_DATA_POINTS: | |
| return None | |
| # Exponential smoothing | |
| alpha = 0.3 | |
| forecast = error_rates[0] | |
| for rate in error_rates[1:]: | |
| forecast = alpha * rate + (1 - alpha) * forecast | |
| predicted_rate = forecast | |
| # Trend analysis | |
| recent_trend = np.mean(error_rates[-3:]) - np.mean(error_rates[-6:-3]) | |
| if recent_trend > 0.02: | |
| trend = "increasing" | |
| risk = "critical" if predicted_rate > Constants.ERROR_RATE_CRITICAL else "high" | |
| elif recent_trend < -0.01: | |
| trend = "decreasing" | |
| risk = "low" | |
| else: | |
| trend = "stable" | |
| risk = "low" if predicted_rate < Constants.ERROR_RATE_WARNING else "medium" | |
| # Confidence based on volatility | |
| confidence = max(0, 1 - (np.std(error_rates) / max(0.01, np.mean(error_rates)))) | |
| return ForecastResult( | |
| metric="error_rate", | |
| predicted_value=predicted_rate, | |
| confidence=confidence, | |
| trend=trend, | |
| risk_level=risk | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error rate forecast error: {e}", exc_info=True) | |
| return None | |
| def _forecast_resources( | |
| self, | |
| history: List, | |
| lookahead_minutes: int | |
| ) -> List[ForecastResult]: | |
| """Forecast CPU and memory utilization""" | |
| forecasts = [] | |
| # CPU forecast | |
| cpu_values = [point['cpu_util'] for point in history if point.get('cpu_util') is not None] | |
| if len(cpu_values) >= Constants.FORECAST_MIN_DATA_POINTS: | |
| try: | |
| predicted_cpu = np.mean(cpu_values[-5:]) | |
| trend = "increasing" if cpu_values[-1] > np.mean(cpu_values[-10:-5]) else "stable" | |
| risk = "low" | |
| if predicted_cpu > Constants.CPU_CRITICAL: | |
| risk = "critical" | |
| elif predicted_cpu > Constants.CPU_WARNING: | |
| risk = "high" | |
| elif predicted_cpu > 0.7: | |
| risk = "medium" | |
| forecasts.append(ForecastResult( | |
| metric="cpu_util", | |
| predicted_value=predicted_cpu, | |
| confidence=0.7, | |
| trend=trend, | |
| risk_level=risk | |
| )) | |
| except Exception as e: | |
| logger.error(f"CPU forecast error: {e}", exc_info=True) | |
| # Memory forecast | |
| memory_values = [point['memory_util'] for point in history if point.get('memory_util') is not None] | |
| if len(memory_values) >= Constants.FORECAST_MIN_DATA_POINTS: | |
| try: | |
| predicted_memory = np.mean(memory_values[-5:]) | |
| trend = "increasing" if memory_values[-1] > np.mean(memory_values[-10:-5]) else "stable" | |
| risk = "low" | |
| if predicted_memory > Constants.MEMORY_CRITICAL: | |
| risk = "critical" | |
| elif predicted_memory > Constants.MEMORY_WARNING: | |
| risk = "high" | |
| elif predicted_memory > 0.7: | |
| risk = "medium" | |
| forecasts.append(ForecastResult( | |
| metric="memory_util", | |
| predicted_value=predicted_memory, | |
| confidence=0.7, | |
| trend=trend, | |
| risk_level=risk | |
| )) | |
| except Exception as e: | |
| logger.error(f"Memory forecast error: {e}", exc_info=True) | |
| return forecasts | |
| def get_predictive_insights(self, service: str) -> Dict[str, Any]: | |
| """Generate actionable insights from forecasts""" | |
| forecasts = self.forecast_service_health(service) | |
| critical_risks = [f for f in forecasts if f.risk_level in ["high", "critical"]] | |
| warnings = [] | |
| recommendations = [] | |
| for forecast in critical_risks: | |
| if forecast.metric == "latency" and forecast.risk_level in ["high", "critical"]: | |
| warnings.append(f"๐ Latency expected to reach {forecast.predicted_value:.0f}ms") | |
| if forecast.time_to_threshold: | |
| minutes = int(forecast.time_to_threshold) | |
| recommendations.append(f"โฐ Critical latency (~{Constants.LATENCY_EXTREME}ms) in ~{minutes} minutes") | |
| recommendations.append("๐ง Consider scaling or optimizing dependencies") | |
| elif forecast.metric == "error_rate" and forecast.risk_level in ["high", "critical"]: | |
| warnings.append(f"๐จ Errors expected to reach {forecast.predicted_value*100:.1f}%") | |
| recommendations.append("๐ Investigate recent deployments or dependency issues") | |
| elif forecast.metric == "cpu_util" and forecast.risk_level in ["high", "critical"]: | |
| warnings.append(f"๐ฅ CPU expected at {forecast.predicted_value*100:.1f}%") | |
| recommendations.append("โก Consider scaling compute resources") | |
| elif forecast.metric == "memory_util" and forecast.risk_level in ["high", "critical"]: | |
| warnings.append(f"๐พ Memory expected at {forecast.predicted_value*100:.1f}%") | |
| recommendations.append("๐งน Check for memory leaks or optimize usage") | |
| return { | |
| 'service': service, | |
| 'forecasts': [ | |
| { | |
| 'metric': f.metric, | |
| 'predicted_value': f.predicted_value, | |
| 'confidence': f.confidence, | |
| 'trend': f.trend, | |
| 'risk_level': f.risk_level, | |
| 'time_to_threshold': f.time_to_threshold | |
| } | |
| for f in forecasts | |
| ], | |
| 'warnings': warnings[:3], | |
| 'recommendations': list(dict.fromkeys(recommendations))[:3], | |
| 'critical_risk_count': len(critical_risks), | |
| 'forecast_timestamp': datetime.datetime.now(datetime.timezone.utc).isoformat() | |
| } | |
| class BusinessImpactCalculator: | |
| """Calculate business impact of anomalies""" | |
| def __init__(self, revenue_per_request: float = 0.01): | |
| self.revenue_per_request = revenue_per_request | |
| logger.info(f"Initialized BusinessImpactCalculator") | |
| def calculate_impact( | |
| self, | |
| event: ReliabilityEvent, | |
| duration_minutes: int = 5 | |
| ) -> Dict[str, Any]: | |
| """Calculate business impact for a reliability event""" | |
| base_revenue_per_minute = Constants.BASE_REVENUE_PER_MINUTE | |
| impact_multiplier = 1.0 | |
| # Impact factors | |
| if event.latency_p99 > Constants.LATENCY_CRITICAL: | |
| impact_multiplier += 0.5 | |
| if event.error_rate > 0.1: | |
| impact_multiplier += 0.8 | |
| if event.cpu_util and event.cpu_util > Constants.CPU_CRITICAL: | |
| impact_multiplier += 0.3 | |
| revenue_loss = base_revenue_per_minute * impact_multiplier * (duration_minutes / 60) | |
| base_users_affected = Constants.BASE_USERS | |
| user_impact_multiplier = (event.error_rate * 10) + \ | |
| (max(0, event.latency_p99 - 100) / 500) | |
| affected_users = int(base_users_affected * user_impact_multiplier) | |
| # Severity classification | |
| if revenue_loss > 500 or affected_users > 5000: | |
| severity = "CRITICAL" | |
| elif revenue_loss > 100 or affected_users > 1000: | |
| severity = "HIGH" | |
| elif revenue_loss > 50 or affected_users > 500: | |
| severity = "MEDIUM" | |
| else: | |
| severity = "LOW" | |
| logger.info( | |
| f"Business impact: ${revenue_loss:.2f} revenue loss, " | |
| f"{affected_users} users, {severity} severity" | |
| ) | |
| return { | |
| 'revenue_loss_estimate': round(revenue_loss, 2), | |
| 'affected_users_estimate': affected_users, | |
| 'severity_level': severity, | |
| 'throughput_reduction_pct': round(min(100, user_impact_multiplier * 100), 1) | |
| } | |
| class AdvancedAnomalyDetector: | |
| """Enhanced anomaly detection with adaptive thresholds""" | |
| def __init__(self): | |
| self.historical_data = deque(maxlen=100) | |
| self.adaptive_thresholds = { | |
| 'latency_p99': Constants.LATENCY_WARNING, | |
| 'error_rate': Constants.ERROR_RATE_WARNING | |
| } | |
| self._lock = threading.RLock() | |
| logger.info("Initialized AdvancedAnomalyDetector") | |
| def detect_anomaly(self, event: ReliabilityEvent) -> bool: | |
| """Detect if event is anomalous using adaptive thresholds""" | |
| with self._lock: | |
| latency_anomaly = event.latency_p99 > self.adaptive_thresholds['latency_p99'] | |
| error_anomaly = event.error_rate > self.adaptive_thresholds['error_rate'] | |
| resource_anomaly = False | |
| if event.cpu_util and event.cpu_util > Constants.CPU_CRITICAL: | |
| resource_anomaly = True | |
| if event.memory_util and event.memory_util > Constants.MEMORY_CRITICAL: | |
| resource_anomaly = True | |
| self._update_thresholds(event) | |
| is_anomaly = latency_anomaly or error_anomaly or resource_anomaly | |
| if is_anomaly: | |
| logger.info( | |
| f"Anomaly detected for {event.component}: " | |
| f"latency={latency_anomaly}, error={error_anomaly}, " | |
| f"resource={resource_anomaly}" | |
| ) | |
| return is_anomaly | |
| def _update_thresholds(self, event: ReliabilityEvent) -> None: | |
| """Update adaptive thresholds based on historical data""" | |
| self.historical_data.append(event) | |
| if len(self.historical_data) > 10: | |
| recent_latencies = [e.latency_p99 for e in list(self.historical_data)[-20:]] | |
| new_threshold = np.percentile(recent_latencies, 90) | |
| self.adaptive_thresholds['latency_p99'] = new_threshold | |
| logger.debug(f"Updated adaptive latency threshold to {new_threshold:.2f}ms") | |
| # === Multi-Agent System === | |
| class AgentSpecialization(Enum): | |
| """Agent specialization types""" | |
| DETECTIVE = "anomaly_detection" | |
| DIAGNOSTICIAN = "root_cause_analysis" | |
| PREDICTIVE = "predictive_analytics" | |
| class BaseAgent: | |
| """Base class for all specialized agents""" | |
| def __init__(self, specialization: AgentSpecialization): | |
| self.specialization = specialization | |
| self.performance_metrics = { | |
| 'processed_events': 0, | |
| 'successful_analyses': 0, | |
| 'average_confidence': 0.0 | |
| } | |
| async def analyze(self, event: ReliabilityEvent) -> Dict[str, Any]: | |
| """Base analysis method to be implemented by specialized agents""" | |
| raise NotImplementedError | |
| class AnomalyDetectionAgent(BaseAgent): | |
| """Specialized agent for anomaly detection and pattern recognition""" | |
| def __init__(self): | |
| super().__init__(AgentSpecialization.DETECTIVE) | |
| logger.info("Initialized AnomalyDetectionAgent") | |
| async def analyze(self, event: ReliabilityEvent) -> Dict[str, Any]: | |
| """Perform comprehensive anomaly analysis""" | |
| try: | |
| anomaly_score = self._calculate_anomaly_score(event) | |
| return { | |
| 'specialization': self.specialization.value, | |
| 'confidence': anomaly_score, | |
| 'findings': { | |
| 'anomaly_score': anomaly_score, | |
| 'severity_tier': self._classify_severity(anomaly_score), | |
| 'primary_metrics_affected': self._identify_affected_metrics(event) | |
| }, | |
| 'recommendations': self._generate_detection_recommendations(event, anomaly_score) | |
| } | |
| except Exception as e: | |
| logger.error(f"AnomalyDetectionAgent error: {e}", exc_info=True) | |
| return { | |
| 'specialization': self.specialization.value, | |
| 'confidence': 0.0, | |
| 'findings': {}, | |
| 'recommendations': [f"Analysis error: {str(e)}"] | |
| } | |
| def _calculate_anomaly_score(self, event: ReliabilityEvent) -> float: | |
| """Calculate comprehensive anomaly score (0-1)""" | |
| scores = [] | |
| # Latency anomaly (weighted 40%) | |
| if event.latency_p99 > Constants.LATENCY_WARNING: | |
| latency_score = min(1.0, (event.latency_p99 - Constants.LATENCY_WARNING) / 500) | |
| scores.append(0.4 * latency_score) | |
| # Error rate anomaly (weighted 30%) | |
| if event.error_rate > Constants.ERROR_RATE_WARNING: | |
| error_score = min(1.0, event.error_rate / 0.3) | |
| scores.append(0.3 * error_score) | |
| # Resource anomaly (weighted 30%) | |
| resource_score = 0 | |
| if event.cpu_util and event.cpu_util > Constants.CPU_WARNING: | |
| resource_score += 0.15 * min(1.0, (event.cpu_util - Constants.CPU_WARNING) / 0.2) | |
| if event.memory_util and event.memory_util > Constants.MEMORY_WARNING: | |
| resource_score += 0.15 * min(1.0, (event.memory_util - Constants.MEMORY_WARNING) / 0.2) | |
| scores.append(resource_score) | |
| return min(1.0, sum(scores)) | |
| def _classify_severity(self, anomaly_score: float) -> str: | |
| """Classify severity tier based on anomaly score""" | |
| if anomaly_score > 0.8: | |
| return "CRITICAL" | |
| elif anomaly_score > 0.6: | |
| return "HIGH" | |
| elif anomaly_score > 0.4: | |
| return "MEDIUM" | |
| else: | |
| return "LOW" | |
| def _identify_affected_metrics(self, event: ReliabilityEvent) -> List[Dict[str, Any]]: | |
| """Identify which metrics are outside normal ranges""" | |
| affected = [] | |
| # Latency checks | |
| if event.latency_p99 > Constants.LATENCY_EXTREME: | |
| affected.append({ | |
| "metric": "latency", | |
| "value": event.latency_p99, | |
| "severity": "CRITICAL", | |
| "threshold": Constants.LATENCY_WARNING | |
| }) | |
| elif event.latency_p99 > Constants.LATENCY_CRITICAL: | |
| affected.append({ | |
| "metric": "latency", | |
| "value": event.latency_p99, | |
| "severity": "HIGH", | |
| "threshold": Constants.LATENCY_WARNING | |
| }) | |
| elif event.latency_p99 > Constants.LATENCY_WARNING: | |
| affected.append({ | |
| "metric": "latency", | |
| "value": event.latency_p99, | |
| "severity": "MEDIUM", | |
| "threshold": Constants.LATENCY_WARNING | |
| }) | |
| # Error rate checks | |
| if event.error_rate > Constants.ERROR_RATE_CRITICAL: | |
| affected.append({ | |
| "metric": "error_rate", | |
| "value": event.error_rate, | |
| "severity": "CRITICAL", | |
| "threshold": Constants.ERROR_RATE_WARNING | |
| }) | |
| elif event.error_rate > Constants.ERROR_RATE_HIGH: | |
| affected.append({ | |
| "metric": "error_rate", | |
| "value": event.error_rate, | |
| "severity": "HIGH", | |
| "threshold": Constants.ERROR_RATE_WARNING | |
| }) | |
| elif event.error_rate > Constants.ERROR_RATE_WARNING: | |
| affected.append({ | |
| "metric": "error_rate", | |
| "value": event.error_rate, | |
| "severity": "MEDIUM", | |
| "threshold": Constants.ERROR_RATE_WARNING | |
| }) | |
| # CPU checks | |
| if event.cpu_util and event.cpu_util > Constants.CPU_CRITICAL: | |
| affected.append({ | |
| "metric": "cpu", | |
| "value": event.cpu_util, | |
| "severity": "CRITICAL", | |
| "threshold": Constants.CPU_WARNING | |
| }) | |
| elif event.cpu_util and event.cpu_util > Constants.CPU_WARNING: | |
| affected.append({ | |
| "metric": "cpu", | |
| "value": event.cpu_util, | |
| "severity": "HIGH", | |
| "threshold": Constants.CPU_WARNING | |
| }) | |
| # Memory checks | |
| if event.memory_util and event.memory_util > Constants.MEMORY_CRITICAL: | |
| affected.append({ | |
| "metric": "memory", | |
| "value": event.memory_util, | |
| "severity": "CRITICAL", | |
| "threshold": Constants.MEMORY_WARNING | |
| }) | |
| elif event.memory_util and event.memory_util > Constants.MEMORY_WARNING: | |
| affected.append({ | |
| "metric": "memory", | |
| "value": event.memory_util, | |
| "severity": "HIGH", | |
| "threshold": Constants.MEMORY_WARNING | |
| }) | |
| return affected | |
| def _generate_detection_recommendations( | |
| self, | |
| event: ReliabilityEvent, | |
| anomaly_score: float | |
| ) -> List[str]: | |
| """Generate actionable recommendations""" | |
| recommendations = [] | |
| affected_metrics = self._identify_affected_metrics(event) | |
| for metric in affected_metrics: | |
| metric_name = metric["metric"] | |
| severity = metric["severity"] | |
| value = metric["value"] | |
| threshold = metric["threshold"] | |
| if metric_name == "latency": | |
| if severity == "CRITICAL": | |
| recommendations.append( | |
| f"๐จ CRITICAL: Latency {value:.0f}ms (>{threshold}ms) - " | |
| f"Check database & external dependencies" | |
| ) | |
| elif severity == "HIGH": | |
| recommendations.append( | |
| f"โ ๏ธ HIGH: Latency {value:.0f}ms (>{threshold}ms) - " | |
| f"Investigate service performance" | |
| ) | |
| else: | |
| recommendations.append( | |
| f"๐ Latency elevated: {value:.0f}ms (>{threshold}ms) - Monitor trend" | |
| ) | |
| elif metric_name == "error_rate": | |
| if severity == "CRITICAL": | |
| recommendations.append( | |
| f"๐จ CRITICAL: Error rate {value*100:.1f}% (>{threshold*100:.1f}%) - " | |
| f"Check recent deployments" | |
| ) | |
| elif severity == "HIGH": | |
| recommendations.append( | |
| f"โ ๏ธ HIGH: Error rate {value*100:.1f}% (>{threshold*100:.1f}%) - " | |
| f"Review application logs" | |
| ) | |
| else: | |
| recommendations.append( | |
| f"๐ Errors increasing: {value*100:.1f}% (>{threshold*100:.1f}%)" | |
| ) | |
| elif metric_name == "cpu": | |
| recommendations.append( | |
| f"๐ฅ CPU {severity}: {value*100:.1f}% utilization - Consider scaling" | |
| ) | |
| elif metric_name == "memory": | |
| recommendations.append( | |
| f"๐พ Memory {severity}: {value*100:.1f}% utilization - Check for memory leaks" | |
| ) | |
| # Overall severity recommendations | |
| if anomaly_score > 0.8: | |
| recommendations.append("๐ฏ IMMEDIATE ACTION REQUIRED: Multiple critical metrics affected") | |
| elif anomaly_score > 0.6: | |
| recommendations.append("๐ฏ INVESTIGATE: Significant performance degradation detected") | |
| elif anomaly_score > 0.4: | |
| recommendations.append("๐ MONITOR: Early warning signs detected") | |
| return recommendations[:4] | |
| class RootCauseAgent(BaseAgent): | |
| """Specialized agent for root cause analysis""" | |
| def __init__(self): | |
| super().__init__(AgentSpecialization.DIAGNOSTICIAN) | |
| logger.info("Initialized RootCauseAgent") | |
| async def analyze(self, event: ReliabilityEvent) -> Dict[str, Any]: | |
| """Perform root cause analysis""" | |
| try: | |
| causes = self._analyze_potential_causes(event) | |
| return { | |
| 'specialization': self.specialization.value, | |
| 'confidence': 0.7, | |
| 'findings': { | |
| 'likely_root_causes': causes, | |
| 'evidence_patterns': self._identify_evidence(event), | |
| 'investigation_priority': self._prioritize_investigation(causes) | |
| }, | |
| 'recommendations': [ | |
| f"Check {cause['cause']} for issues" for cause in causes[:2] | |
| ] | |
| } | |
| except Exception as e: | |
| logger.error(f"RootCauseAgent error: {e}", exc_info=True) | |
| return { | |
| 'specialization': self.specialization.value, | |
| 'confidence': 0.0, | |
| 'findings': {}, | |
| 'recommendations': [f"Analysis error: {str(e)}"] | |
| } | |
| def _analyze_potential_causes(self, event: ReliabilityEvent) -> List[Dict[str, Any]]: | |
| """Analyze potential root causes based on event patterns""" | |
| causes = [] | |
| # Pattern 1: Database/External Dependency Failure | |
| if event.latency_p99 > Constants.LATENCY_EXTREME and event.error_rate > 0.2: | |
| causes.append({ | |
| "cause": "Database/External Dependency Failure", | |
| "confidence": 0.85, | |
| "evidence": f"Extreme latency ({event.latency_p99:.0f}ms) with high errors ({event.error_rate*100:.1f}%)", | |
| "investigation": "Check database connection pool, external API health" | |
| }) | |
| # Pattern 2: Resource Exhaustion | |
| if (event.cpu_util and event.cpu_util > Constants.CPU_CRITICAL and | |
| event.memory_util and event.memory_util > Constants.MEMORY_CRITICAL): | |
| causes.append({ | |
| "cause": "Resource Exhaustion", | |
| "confidence": 0.90, | |
| "evidence": f"CPU ({event.cpu_util*100:.1f}%) and Memory ({event.memory_util*100:.1f}%) critically high", | |
| "investigation": "Check for memory leaks, infinite loops, insufficient resources" | |
| }) | |
| # Pattern 3: Application Bug / Configuration Issue | |
| if event.error_rate > Constants.ERROR_RATE_CRITICAL and event.latency_p99 < 200: | |
| causes.append({ | |
| "cause": "Application Bug / Configuration Issue", | |
| "confidence": 0.75, | |
| "evidence": f"High error rate ({event.error_rate*100:.1f}%) without latency impact", | |
| "investigation": "Review recent deployments, configuration changes, application logs" | |
| }) | |
| # Pattern 4: Gradual Performance Degradation | |
| if (200 <= event.latency_p99 <= 400 and | |
| Constants.ERROR_RATE_WARNING <= event.error_rate <= Constants.ERROR_RATE_HIGH): | |
| causes.append({ | |
| "cause": "Gradual Performance Degradation", | |
| "confidence": 0.65, | |
| "evidence": f"Moderate latency ({event.latency_p99:.0f}ms) and errors ({event.error_rate*100:.1f}%)", | |
| "investigation": "Check resource trends, dependency performance, capacity planning" | |
| }) | |
| # Default: Unknown pattern | |
| if not causes: | |
| causes.append({ | |
| "cause": "Unknown - Requires Investigation", | |
| "confidence": 0.3, | |
| "evidence": "Pattern does not match known failure modes", | |
| "investigation": "Complete system review needed" | |
| }) | |
| return causes | |
| def _identify_evidence(self, event: ReliabilityEvent) -> List[str]: | |
| """Identify evidence patterns in the event data""" | |
| evidence = [] | |
| if event.latency_p99 > event.error_rate * 1000: | |
| evidence.append("latency_disproportionate_to_errors") | |
| if (event.cpu_util and event.cpu_util > Constants.CPU_WARNING and | |
| event.memory_util and event.memory_util > Constants.MEMORY_WARNING): | |
| evidence.append("correlated_resource_exhaustion") | |
| if event.error_rate > Constants.ERROR_RATE_HIGH and event.latency_p99 < Constants.LATENCY_CRITICAL: | |
| evidence.append("errors_without_latency_impact") | |
| return evidence | |
| def _prioritize_investigation(self, causes: List[Dict[str, Any]]) -> str: | |
| """Determine investigation priority""" | |
| for cause in causes: | |
| if "Database" in cause["cause"] or "Resource Exhaustion" in cause["cause"]: | |
| return "HIGH" | |
| return "MEDIUM" | |
| class PredictiveAgent(BaseAgent): | |
| """Specialized agent for predictive analytics""" | |
| def __init__(self, engine: SimplePredictiveEngine): | |
| super().__init__(AgentSpecialization.PREDICTIVE) | |
| self.engine = engine | |
| logger.info("Initialized PredictiveAgent") | |
| async def analyze(self, event: ReliabilityEvent) -> Dict[str, Any]: | |
| """Perform predictive analysis for future risks""" | |
| try: | |
| event_data = { | |
| 'latency_p99': event.latency_p99, | |
| 'error_rate': event.error_rate, | |
| 'throughput': event.throughput, | |
| 'cpu_util': event.cpu_util, | |
| 'memory_util': event.memory_util | |
| } | |
| self.engine.add_telemetry(event.component, event_data) | |
| insights = self.engine.get_predictive_insights(event.component) | |
| return { | |
| 'specialization': self.specialization.value, | |
| 'confidence': 0.8 if insights['critical_risk_count'] > 0 else 0.5, | |
| 'findings': insights, | |
| 'recommendations': insights['recommendations'] | |
| } | |
| except Exception as e: | |
| logger.error(f"PredictiveAgent error: {e}", exc_info=True) | |
| return { | |
| 'specialization': self.specialization.value, | |
| 'confidence': 0.0, | |
| 'findings': {}, | |
| 'recommendations': [f"Analysis error: {str(e)}"] | |
| } | |
| # FIXED: Add circuit breaker for agent resilience | |
| async def call_agent_with_protection(agent: BaseAgent, event: ReliabilityEvent) -> Dict[str, Any]: | |
| """ | |
| Call agent with circuit breaker protection | |
| FIXED: Prevents cascading failures from misbehaving agents | |
| """ | |
| try: | |
| result = await asyncio.wait_for( | |
| agent.analyze(event), | |
| timeout=Constants.AGENT_TIMEOUT_SECONDS | |
| ) | |
| return result | |
| except asyncio.TimeoutError: | |
| logger.warning(f"Agent {agent.specialization.value} timed out") | |
| raise | |
| except Exception as e: | |
| logger.error(f"Agent {agent.specialization.value} error: {e}", exc_info=True) | |
| raise | |
| class OrchestrationManager: | |
| """Orchestrates multiple specialized agents for comprehensive analysis""" | |
| def __init__( | |
| self, | |
| detective: Optional[AnomalyDetectionAgent] = None, | |
| diagnostician: Optional[RootCauseAgent] = None, | |
| predictive: Optional[PredictiveAgent] = None | |
| ): | |
| """ | |
| Initialize orchestration manager | |
| FIXED: Dependency injection for testability | |
| """ | |
| self.agents = { | |
| AgentSpecialization.DETECTIVE: detective or AnomalyDetectionAgent(), | |
| AgentSpecialization.DIAGNOSTICIAN: diagnostician or RootCauseAgent(), | |
| AgentSpecialization.PREDICTIVE: predictive or PredictiveAgent(SimplePredictiveEngine()), | |
| } | |
| logger.info(f"Initialized OrchestrationManager with {len(self.agents)} agents") | |
| async def orchestrate_analysis(self, event: ReliabilityEvent) -> Dict[str, Any]: | |
| """ | |
| Coordinate multiple agents for comprehensive analysis | |
| FIXED: Improved timeout handling with circuit breakers | |
| """ | |
| # Create tasks for all agents | |
| agent_tasks = [] | |
| agent_specs = [] | |
| for spec, agent in self.agents.items(): | |
| agent_tasks.append(call_agent_with_protection(agent, event)) | |
| agent_specs.append(spec) | |
| # FIXED: Parallel execution with global timeout | |
| agent_results = {} | |
| try: | |
| # Run all agents in parallel with global timeout | |
| results = await asyncio.wait_for( | |
| asyncio.gather(*agent_tasks, return_exceptions=True), | |
| timeout=Constants.AGENT_TIMEOUT_SECONDS + 1 | |
| ) | |
| # Process results | |
| for spec, result in zip(agent_specs, results): | |
| if isinstance(result, Exception): | |
| logger.error(f"Agent {spec.value} failed: {result}") | |
| continue | |
| agent_results[spec.value] = result | |
| logger.debug(f"Agent {spec.value} completed successfully") | |
| except asyncio.TimeoutError: | |
| logger.warning("Agent orchestration timed out") | |
| except Exception as e: | |
| logger.error(f"Agent orchestration error: {e}", exc_info=True) | |
| return self._synthesize_agent_findings(event, agent_results) | |
| def _synthesize_agent_findings( | |
| self, | |
| event: ReliabilityEvent, | |
| agent_results: Dict | |
| ) -> Dict[str, Any]: | |
| """Combine insights from all specialized agents""" | |
| detective_result = agent_results.get(AgentSpecialization.DETECTIVE.value) | |
| diagnostician_result = agent_results.get(AgentSpecialization.DIAGNOSTICIAN.value) | |
| predictive_result = agent_results.get(AgentSpecialization.PREDICTIVE.value) | |
| if not detective_result: | |
| logger.warning("No detective agent results available") | |
| return {'error': 'No agent results available'} | |
| synthesis = { | |
| 'incident_summary': { | |
| 'severity': detective_result['findings'].get('severity_tier', 'UNKNOWN'), | |
| 'anomaly_confidence': detective_result['confidence'], | |
| 'primary_metrics_affected': [ | |
| metric["metric"] for metric in | |
| detective_result['findings'].get('primary_metrics_affected', []) | |
| ] | |
| }, | |
| 'root_cause_insights': diagnostician_result['findings'] if diagnostician_result else {}, | |
| 'predictive_insights': predictive_result['findings'] if predictive_result else {}, | |
| 'recommended_actions': self._prioritize_actions( | |
| detective_result.get('recommendations', []), | |
| diagnostician_result.get('recommendations', []) if diagnostician_result else [], | |
| predictive_result.get('recommendations', []) if predictive_result else [] | |
| ), | |
| 'agent_metadata': { | |
| 'participating_agents': list(agent_results.keys()), | |
| 'analysis_timestamp': datetime.datetime.now(datetime.timezone.utc).isoformat() | |
| } | |
| } | |
| return synthesis | |
| def _prioritize_actions( | |
| self, | |
| detection_actions: List[str], | |
| diagnosis_actions: List[str], | |
| predictive_actions: List[str] | |
| ) -> List[str]: | |
| """Combine and prioritize actions from multiple agents""" | |
| all_actions = detection_actions + diagnosis_actions + predictive_actions | |
| seen = set() | |
| unique_actions = [] | |
| for action in all_actions: | |
| if action not in seen: | |
| seen.add(action) | |
| unique_actions.append(action) | |
| return unique_actions[:5] | |
| # === Enhanced Reliability Engine === | |
| class EnhancedReliabilityEngine: | |
| """ | |
| Main engine for processing reliability events | |
| FIXED: Dependency injection for all components | |
| """ | |
| def __init__( | |
| self, | |
| orchestrator: Optional[OrchestrationManager] = None, | |
| policy_engine: Optional[PolicyEngine] = None, | |
| event_store: Optional[ThreadSafeEventStore] = None, | |
| anomaly_detector: Optional[AdvancedAnomalyDetector] = None, | |
| business_calculator: Optional[BusinessImpactCalculator] = None | |
| ): | |
| """ | |
| Initialize reliability engine with dependency injection | |
| FIXED: All dependencies injected for testability | |
| """ | |
| self.orchestrator = orchestrator or OrchestrationManager() | |
| self.policy_engine = policy_engine or PolicyEngine() | |
| self.event_store = event_store or ThreadSafeEventStore() | |
| self.anomaly_detector = anomaly_detector or AdvancedAnomalyDetector() | |
| self.business_calculator = business_calculator or BusinessImpactCalculator() | |
| self.performance_metrics = { | |
| 'total_incidents_processed': 0, | |
| 'multi_agent_analyses': 0, | |
| 'anomalies_detected': 0 | |
| } | |
| self._lock = threading.RLock() | |
| logger.info("Initialized EnhancedReliabilityEngine") | |
| async def process_event_enhanced( | |
| self, | |
| component: str, | |
| latency: float, | |
| error_rate: float, | |
| throughput: float = 1000, | |
| cpu_util: Optional[float] = None, | |
| memory_util: Optional[float] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| Process a reliability event through the complete analysis pipeline | |
| FIXED: Proper async/await throughout | |
| """ | |
| logger.info( | |
| f"Processing event for {component}: latency={latency}ms, " | |
| f"error_rate={error_rate*100:.1f}%" | |
| ) | |
| # Validate component ID | |
| is_valid, error_msg = validate_component_id(component) | |
| if not is_valid: | |
| return {'error': error_msg, 'status': 'INVALID'} | |
| # Create event | |
| try: | |
| event = ReliabilityEvent( | |
| component=component, | |
| latency_p99=latency, | |
| error_rate=error_rate, | |
| throughput=throughput, | |
| cpu_util=cpu_util, | |
| memory_util=memory_util, | |
| upstream_deps=["auth-service", "database"] if component == "api-service" else [] | |
| ) | |
| except Exception as e: | |
| logger.error(f"Event creation error: {e}", exc_info=True) | |
| return {'error': f'Invalid event data: {str(e)}', 'status': 'INVALID'} | |
| # Multi-agent analysis | |
| agent_analysis = await self.orchestrator.orchestrate_analysis(event) | |
| # Anomaly detection | |
| is_anomaly = self.anomaly_detector.detect_anomaly(event) | |
| # Determine severity based on agent confidence | |
| agent_confidence = 0.0 | |
| if agent_analysis and 'incident_summary' in agent_analysis: | |
| agent_confidence = agent_analysis.get('incident_summary', {}).get('anomaly_confidence', 0) | |
| else: | |
| agent_confidence = 0.8 if is_anomaly else 0.1 | |
| # Set event severity | |
| if agent_confidence > 0.8: | |
| severity = EventSeverity.CRITICAL | |
| elif agent_confidence > 0.6: | |
| severity = EventSeverity.HIGH | |
| elif agent_confidence > 0.4: | |
| severity = EventSeverity.MEDIUM | |
| else: | |
| severity = EventSeverity.LOW | |
| # Create mutable copy with updated severity | |
| event = event.model_copy(update={'severity': severity}) | |
| # Evaluate healing policies | |
| healing_actions = self.policy_engine.evaluate_policies(event) | |
| # Calculate business impact | |
| business_impact = self.business_calculator.calculate_impact(event) if is_anomaly else None | |
| # Store in vector database for similarity detection | |
| if thread_safe_index is not None and model is not None and is_anomaly: | |
| try: | |
| # FIXED: Non-blocking encoding with ProcessPoolExecutor | |
| analysis_text = agent_analysis.get('recommended_actions', ['No analysis'])[0] | |
| vector_text = f"{component} {latency} {error_rate} {analysis_text}" | |
| # Encode asynchronously | |
| loop = asyncio.get_event_loop() | |
| vec = await loop.run_in_executor( | |
| thread_safe_index._encoder_pool, | |
| model.encode, | |
| [vector_text] | |
| ) | |
| thread_safe_index.add_async(np.array(vec, dtype=np.float32), vector_text) | |
| except Exception as e: | |
| logger.error(f"Error storing vector: {e}", exc_info=True) | |
| # Build comprehensive result | |
| result = { | |
| "timestamp": event.timestamp.isoformat(), | |
| "component": component, | |
| "latency_p99": latency, | |
| "error_rate": error_rate, | |
| "throughput": throughput, | |
| "status": "ANOMALY" if is_anomaly else "NORMAL", | |
| "multi_agent_analysis": agent_analysis, | |
| "healing_actions": [action.value for action in healing_actions], | |
| "business_impact": business_impact, | |
| "severity": event.severity.value, | |
| "similar_incidents_count": thread_safe_index.get_count() if thread_safe_index and is_anomaly else 0, | |
| "processing_metadata": { | |
| "agents_used": agent_analysis.get('agent_metadata', {}).get('participating_agents', []), | |
| "analysis_confidence": agent_analysis.get('incident_summary', {}).get('anomaly_confidence', 0) | |
| } | |
| } | |
| # Store event in history | |
| self.event_store.add(event) | |
| # Update performance metrics | |
| with self._lock: | |
| self.performance_metrics['total_incidents_processed'] += 1 | |
| self.performance_metrics['multi_agent_analyses'] += 1 | |
| if is_anomaly: | |
| self.performance_metrics['anomalies_detected'] += 1 | |
| logger.info(f"Event processed: {result['status']} with {result['severity']} severity") | |
| # Enhance with Claude AI reasoning (optional layer) | |
| try: | |
| result = await self.enhance_with_claude(event, result) | |
| except Exception as e: | |
| logger.error(f"Failed to enhance with Claude: {e}") | |
| # Continue without enhancement | |
| return result | |
| async def enhance_with_claude( | |
| self, | |
| event: ReliabilityEvent, | |
| agent_results: Dict[str, Any] | |
| ) -> Dict[str, Any]: | |
| """ | |
| Enhance agent results with Claude AI reasoning | |
| This is a NON-INVASIVE layer - all existing logic stays intact. | |
| If Claude fails, original results are returned unchanged. | |
| """ | |
| try: | |
| # Build comprehensive context for Claude | |
| context_parts = [] | |
| # Add event summary | |
| context_parts.append("INCIDENT SUMMARY:") | |
| context_parts.append(f"Component: {event.component}") | |
| context_parts.append(f"Timestamp: {event.timestamp.isoformat()}") | |
| context_parts.append(f"Severity: {event.severity.value}") | |
| context_parts.append("") | |
| # Add metrics | |
| context_parts.append("METRICS:") | |
| context_parts.append(f"โข Latency P99: {event.latency_p99}ms") | |
| context_parts.append(f"โข Error Rate: {event.error_rate:.1%}") | |
| context_parts.append(f"โข Throughput: {event.throughput} req/s") | |
| if event.cpu_util: | |
| context_parts.append(f"โข CPU: {event.cpu_util:.1%}") | |
| if event.memory_util: | |
| context_parts.append(f"โข Memory: {event.memory_util:.1%}") | |
| context_parts.append("") | |
| # Add agent findings | |
| if agent_results: | |
| context_parts.append("AGENT ANALYSIS:") | |
| if 'multi_agent_analysis' in agent_results: | |
| analysis = agent_results['multi_agent_analysis'] | |
| context_parts.append(json.dumps(analysis, indent=2)) | |
| elif 'incident_summary' in agent_results: | |
| context_parts.append(json.dumps(agent_results['incident_summary'], indent=2)) | |
| context = "\n".join(context_parts) | |
| # Create prompt for Claude | |
| prompt = f"""{context} | |
| TASK: Provide an executive summary synthesizing all agent analyses. | |
| Include: | |
| 1. Concise incident description | |
| 2. Most likely root cause | |
| 3. Single best recovery action | |
| 4. Estimated impact and recovery time | |
| Be specific and actionable.""" | |
| system_prompt = """You are a senior Site Reliability Engineer synthesizing | |
| multiple AI agent analyses into clear, actionable guidance for incident response. | |
| Focus on clarity, accuracy, and decisive recommendations.""" | |
| # Get Claude's synthesis | |
| logger.info("Requesting Claude synthesis of agent results") | |
| claude_synthesis = claude_adapter.generate_completion( | |
| prompt=prompt, | |
| system_prompt=system_prompt | |
| ) | |
| # Add Claude's insights to results (non-destructive) | |
| agent_results['claude_synthesis'] = { | |
| 'summary': claude_synthesis, | |
| 'timestamp': datetime.datetime.now(datetime.timezone.utc).isoformat(), | |
| 'source': 'claude-opus-4' | |
| } | |
| logger.info("โ Claude synthesis added to results") | |
| return agent_results | |
| except Exception as e: | |
| logger.error(f"Claude enhancement failed: {e}", exc_info=True) | |
| # Return original results unchanged - system still works! | |
| return agent_results | |
| # === Initialize Engine (with dependency injection) === | |
| enhanced_engine = EnhancedReliabilityEngine() | |
| # === Rate Limiting === | |
| class RateLimiter: | |
| """Simple rate limiter for request throttling""" | |
| def __init__(self, max_per_minute: int = Constants.MAX_REQUESTS_PER_MINUTE): | |
| self.max_per_minute = max_per_minute | |
| self.requests: deque = deque(maxlen=max_per_minute) | |
| self._lock = threading.RLock() | |
| def is_allowed(self) -> Tuple[bool, str]: | |
| """Check if request is allowed""" | |
| with self._lock: | |
| now = datetime.datetime.now(datetime.timezone.utc) | |
| # Remove requests older than 1 minute | |
| one_minute_ago = now - datetime.timedelta(minutes=1) | |
| while self.requests and self.requests[0] < one_minute_ago: | |
| self.requests.popleft() | |
| # Check rate limit | |
| if len(self.requests) >= self.max_per_minute: | |
| return False, f"Rate limit exceeded: {self.max_per_minute} requests/minute" | |
| # Add current request | |
| self.requests.append(now) | |
| return True, "" | |
| rate_limiter = RateLimiter() | |
| # === Gradio UI === | |
| def create_enhanced_ui(): | |
| """ | |
| Create the comprehensive Gradio UI for the reliability framework | |
| FIXED: Uses native async handlers (no event loop creation) | |
| FIXED: Rate limiting on all endpoints | |
| """ | |
| with gr.Blocks(title="๐ง Enterprise Agentic Reliability Framework", theme="soft") as demo: | |
| gr.Markdown(""" | |
| # ๐ง Enterprise Agentic Reliability Framework | |
| **Multi-Agent AI System for Production Reliability** | |
| *Specialized AI agents working together to detect, diagnose, predict, and heal system issues* | |
| ๐ **Security Patched** | โก **Performance Optimized** | ๐งช **Production Ready** | |
| """) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| gr.Markdown("### ๐ Telemetry Input") | |
| component = gr.Dropdown( | |
| choices=["api-service", "auth-service", "payment-service", "database", "cache-service"], | |
| value="api-service", | |
| label="Component", | |
| info="Select the service being monitored" | |
| ) | |
| latency = gr.Slider( | |
| minimum=10, maximum=1000, value=100, step=1, | |
| label="Latency P99 (ms)", | |
| info=f"Alert threshold: >{Constants.LATENCY_WARNING}ms (adaptive)" | |
| ) | |
| error_rate = gr.Slider( | |
| minimum=0, maximum=0.5, value=0.02, step=0.001, | |
| label="Error Rate", | |
| info=f"Alert threshold: >{Constants.ERROR_RATE_WARNING}" | |
| ) | |
| throughput = gr.Number( | |
| value=1000, | |
| label="Throughput (req/sec)", | |
| info="Current request rate" | |
| ) | |
| cpu_util = gr.Slider( | |
| minimum=0, maximum=1, value=0.4, step=0.01, | |
| label="CPU Utilization", | |
| info="0.0 - 1.0 scale" | |
| ) | |
| memory_util = gr.Slider( | |
| minimum=0, maximum=1, value=0.3, step=0.01, | |
| label="Memory Utilization", | |
| info="0.0 - 1.0 scale" | |
| ) | |
| submit_btn = gr.Button("๐ Submit Telemetry Event", variant="primary", size="lg") | |
| with gr.Column(scale=2): | |
| gr.Markdown("### ๐ Multi-Agent Analysis") | |
| output_text = gr.Textbox( | |
| label="Agent Synthesis", | |
| placeholder="AI agents are analyzing...", | |
| lines=6 | |
| ) | |
| with gr.Accordion("๐ค Agent Specialists Analysis", open=False): | |
| gr.Markdown(""" | |
| **Specialized AI Agents:** | |
| - ๐ต๏ธ **Detective**: Anomaly detection & pattern recognition | |
| - ๐ **Diagnostician**: Root cause analysis & investigation | |
| - ๐ฎ **Predictive**: Future risk forecasting & trend analysis | |
| """) | |
| agent_insights = gr.JSON( | |
| label="Detailed Agent Findings", | |
| value={} | |
| ) | |
| with gr.Accordion("๐ฎ Predictive Analytics & Forecasting", open=False): | |
| gr.Markdown(""" | |
| **Future Risk Forecasting:** | |
| - ๐ Latency trends and thresholds | |
| - ๐จ Error rate predictions | |
| - ๐ฅ Resource utilization forecasts | |
| - โฐ Time-to-failure estimates | |
| """) | |
| predictive_insights = gr.JSON( | |
| label="Predictive Forecasts", | |
| value={} | |
| ) | |
| with gr.Accordion("๐ค Claude AI Synthesis", open=True): | |
| gr.Markdown(""" | |
| **Claude Opus 4.5 Executive Summary:** | |
| - ๐ Incident synthesis from all agents | |
| - ๐ฏ Root cause identification | |
| - ๐ก Recommended recovery actions | |
| - โฐ Impact and recovery time estimates | |
| """) | |
| claude_output = gr.Markdown( | |
| value="*Claude AI synthesis will appear here after incident analysis*", | |
| label="AI Executive Summary" | |
| ) | |
| gr.Markdown("### ๐ Recent Events (Last 15)") | |
| events_table = gr.Dataframe( | |
| headers=["Timestamp", "Component", "Latency", "Error Rate", "Throughput", "Severity", "Analysis"], | |
| label="Event History", | |
| wrap=True, | |
| ) | |
| with gr.Accordion("โน๏ธ Framework Capabilities", open=False): | |
| gr.Markdown(""" | |
| - **๐ค Multi-Agent AI**: Specialized agents for detection, diagnosis, prediction, and healing | |
| - **๐ฎ Predictive Analytics**: Forecast future risks and performance degradation | |
| - **๐ง Policy-Based Healing**: Automated recovery actions based on severity and context | |
| - **๐ฐ Business Impact**: Revenue and user impact quantification | |
| - **๐ฏ Adaptive Detection**: ML-powered thresholds that learn from your environment | |
| - **๐ Vector Memory**: FAISS-based incident memory for similarity detection | |
| - **โก Production Ready**: Circuit breakers, cooldowns, thread safety, and enterprise features | |
| - **๐ Security Patched**: All critical CVEs fixed (Gradio 5.50.0+, Requests 2.32.5+) | |
| """) | |
| with gr.Accordion("๐ง Healing Policies", open=False): | |
| policy_info = [] | |
| for policy in enhanced_engine.policy_engine.policies: | |
| if policy.enabled: | |
| actions = ", ".join([action.value for action in policy.actions]) | |
| policy_info.append( | |
| f"**{policy.name}** (Priority {policy.priority}): {actions}\n" | |
| f" - Cooldown: {policy.cool_down_seconds}s\n" | |
| f" - Max executions: {policy.max_executions_per_hour}/hour" | |
| ) | |
| gr.Markdown("\n\n".join(policy_info)) | |
| # FIXED: Native async handler (no event loop creation needed) | |
| async def submit_event_enhanced_async( | |
| component, latency, error_rate, throughput, cpu_util, memory_util | |
| ): | |
| """ | |
| Async event handler - uses Gradio's native async support | |
| CRITICAL FIX: No event loop creation - Gradio handles this | |
| FIXED: Rate limiting added | |
| FIXED: Comprehensive error handling | |
| """ | |
| try: | |
| # Rate limiting check | |
| allowed, rate_msg = rate_limiter.is_allowed() | |
| if not allowed: | |
| logger.warning(f"Rate limit exceeded") | |
| return rate_msg, {}, {}, "*Rate limit exceeded*", gr.Dataframe(value=[]) | |
| # Type conversion | |
| try: | |
| latency = float(latency) | |
| error_rate = float(error_rate) | |
| throughput = float(throughput) if throughput else 1000 | |
| cpu_util = float(cpu_util) if cpu_util else None | |
| memory_util = float(memory_util) if memory_util else None | |
| except (ValueError, TypeError) as e: | |
| error_msg = f"โ Invalid input types: {str(e)}" | |
| logger.warning(error_msg) | |
| return error_msg, {}, {}, "*Invalid input type*", gr.Dataframe(value=[]) | |
| # Input validation | |
| is_valid, error_msg = validate_inputs( | |
| latency, error_rate, throughput, cpu_util, memory_util | |
| ) | |
| if not is_valid: | |
| logger.warning(f"Invalid input: {error_msg}") | |
| return error_msg, {}, {}, "*Validation failed*", gr.Dataframe(value=[]) | |
| # FIXED: Direct async call - no event loop creation needed | |
| result = await enhanced_engine.process_event_enhanced( | |
| component, latency, error_rate, throughput, cpu_util, memory_util | |
| ) | |
| # Handle errors | |
| if 'error' in result: | |
| return f"โ {result['error']}", {}, {}, "*Error occurred*", gr.Dataframe(value=[]) | |
| # Build table data (THREAD-SAFE) | |
| table_data = [] | |
| for event in enhanced_engine.event_store.get_recent(15): | |
| table_data.append([ | |
| event.timestamp.strftime("%Y-%m-%d %H:%M:%S"), | |
| event.component, | |
| f"{event.latency_p99:.0f}ms", | |
| f"{event.error_rate:.3f}", | |
| f"{event.throughput:.0f}", | |
| event.severity.value.upper(), | |
| "Multi-agent analysis" | |
| ]) | |
| # Format output message | |
| status_emoji = "๐จ" if result["status"] == "ANOMALY" else "โ " | |
| output_msg = f"{status_emoji} **{result['status']}**\n" | |
| if "multi_agent_analysis" in result: | |
| analysis = result["multi_agent_analysis"] | |
| confidence = analysis.get('incident_summary', {}).get('anomaly_confidence', 0) | |
| output_msg += f"๐ฏ **Confidence**: {confidence*100:.1f}%\n" | |
| predictive_data = analysis.get('predictive_insights', {}) | |
| if predictive_data.get('critical_risk_count', 0) > 0: | |
| output_msg += f"๐ฎ **PREDICTIVE**: {predictive_data['critical_risk_count']} critical risks forecast\n" | |
| if analysis.get('recommended_actions'): | |
| actions_preview = ', '.join(analysis['recommended_actions'][:2]) | |
| output_msg += f"๐ก **Top Insights**: {actions_preview}\n" | |
| if result.get("business_impact"): | |
| impact = result["business_impact"] | |
| output_msg += ( | |
| f"๐ฐ **Business Impact**: \${impact['revenue_loss_estimate']:.2f} | " | |
| f"๐ฅ {impact['affected_users_estimate']} users | " | |
| f"๐จ {impact['severity_level']}\n" | |
| ) | |
| if result.get("healing_actions") and result["healing_actions"] != ["no_action"]: | |
| actions = ", ".join(result["healing_actions"]) | |
| output_msg += f"๐ง **Auto-Actions**: {actions}" | |
| agent_insights_data = result.get("multi_agent_analysis", {}) | |
| predictive_insights_data = agent_insights_data.get('predictive_insights', {}) | |
| # Extract Claude synthesis for display | |
| claude_synthesis = result.get('claude_synthesis', {}) | |
| claude_text = claude_synthesis.get('summary', '*No Claude synthesis available*') | |
| # Format Claude output beautifully | |
| claude_display = f""" | |
| ### ๐ค Claude Opus 4.5 Executive Analysis | |
| {claude_text} | |
| --- | |
| *Generated: {claude_synthesis.get('timestamp', 'N/A')}* | |
| *Model: {claude_synthesis.get('source', 'claude-opus-4')}* | |
| """ | |
| return ( | |
| output_msg, | |
| agent_insights_data, | |
| predictive_insights_data, | |
| claude_display, | |
| gr.Dataframe( | |
| headers=["Timestamp", "Component", "Latency", "Error Rate", "Throughput", "Severity", "Analysis"], | |
| value=table_data, | |
| wrap=True | |
| ) | |
| ) | |
| except Exception as e: | |
| error_msg = f"โ Error processing event: {str(e)}" | |
| logger.error(error_msg, exc_info=True) | |
| return error_msg, {}, {}, "*Processing error*", gr.Dataframe(value=[]) | |
| # FIXED: Use async handler directly | |
| submit_btn.click( | |
| fn=submit_event_enhanced_async, | |
| inputs=[component, latency, error_rate, throughput, cpu_util, memory_util], | |
| outputs=[output_text, agent_insights, predictive_insights, claude_output, events_table] | |
| ) | |
| return demo | |
| # === Main Entry Point === | |
| if __name__ == "__main__": | |
| logger.info("=" * 80) | |
| logger.info("Starting Enterprise Agentic Reliability Framework (PATCHED VERSION)") | |
| logger.info("=" * 80) | |
| logger.info(f"Python version: {os.sys.version}") | |
| logger.info(f"Total events in history: {enhanced_engine.event_store.count()}") | |
| logger.info(f"Vector index size: {thread_safe_index.get_count() if thread_safe_index else 0}") | |
| logger.info(f"Agents initialized: {len(enhanced_engine.orchestrator.agents)}") | |
| logger.info(f"Policies loaded: {len(enhanced_engine.policy_engine.policies)}") | |
| logger.info(f"Configuration: HF_TOKEN={'SET' if config.HF_TOKEN else 'NOT SET'}") | |
| logger.info(f"Rate limit: {Constants.MAX_REQUESTS_PER_MINUTE} requests/minute") | |
| logger.info("=" * 80) | |
| try: | |
| demo = create_enhanced_ui() | |
| logger.info("Launching Gradio UI on 0.0.0.0:7860...") | |
| demo.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=False, | |
| show_error=True | |
| ) | |
| except KeyboardInterrupt: | |
| logger.info("Received shutdown signal...") | |
| except Exception as e: | |
| logger.error(f"Application error: {e}", exc_info=True) | |
| finally: | |
| # Graceful shutdown | |
| logger.info("Shutting down gracefully...") | |
| if thread_safe_index: | |
| logger.info("Saving pending vectors before shutdown...") | |
| thread_safe_index.shutdown() | |
| logger.info("=" * 80) | |
| logger.info("Application shutdown complete") | |
| logger.info("=" * 80) |