Spaces:
Build error
Build error
| """ | |
| Pytest configuration and shared fixtures | |
| """ | |
| import pytest | |
| import tempfile | |
| import os | |
| from datetime import datetime, timezone | |
| from models import ReliabilityEvent, EventSeverity, HealingPolicy, HealingAction, PolicyCondition | |
| from healing_policies import PolicyEngine | |
| from app import ( | |
| ThreadSafeEventStore, | |
| AdvancedAnomalyDetector, | |
| BusinessImpactCalculator, | |
| SimplePredictiveEngine, | |
| EnhancedReliabilityEngine, | |
| OrchestrationManager | |
| ) | |
| def sample_event(): | |
| """Create a sample reliability event for testing""" | |
| return ReliabilityEvent( | |
| component="test-service", | |
| latency_p99=250.0, | |
| error_rate=0.08, | |
| throughput=1000.0, | |
| cpu_util=0.75, | |
| memory_util=0.65, | |
| severity=EventSeverity.MEDIUM | |
| ) | |
| def critical_event(): | |
| """Create a critical reliability event""" | |
| return ReliabilityEvent( | |
| component="critical-service", | |
| latency_p99=600.0, | |
| error_rate=0.35, | |
| throughput=500.0, | |
| cpu_util=0.95, | |
| memory_util=0.92, | |
| severity=EventSeverity.CRITICAL | |
| ) | |
| def normal_event(): | |
| """Create a normal (healthy) reliability event""" | |
| return ReliabilityEvent( | |
| component="healthy-service", | |
| latency_p99=80.0, | |
| error_rate=0.01, | |
| throughput=2000.0, | |
| cpu_util=0.40, | |
| memory_util=0.35, | |
| severity=EventSeverity.LOW | |
| ) | |
| def sample_policy(): | |
| """Create a sample healing policy""" | |
| return HealingPolicy( | |
| name="test_policy", | |
| conditions=[ | |
| PolicyCondition(metric="latency_p99", operator="gt", threshold=300.0) | |
| ], | |
| actions=[HealingAction.RESTART_CONTAINER], | |
| priority=2, | |
| cool_down_seconds=60, | |
| max_executions_per_hour=5 | |
| ) | |
| def policy_engine(): | |
| """Create a fresh policy engine for testing""" | |
| return PolicyEngine(max_cooldown_history=100, max_execution_history=100) | |
| def event_store(): | |
| """Create a fresh event store""" | |
| return ThreadSafeEventStore(max_size=100) | |
| def anomaly_detector(): | |
| """Create a fresh anomaly detector""" | |
| return AdvancedAnomalyDetector() | |
| def business_calculator(): | |
| """Create a business impact calculator""" | |
| return BusinessImpactCalculator() | |
| def predictive_engine(): | |
| """Create a predictive engine""" | |
| return SimplePredictiveEngine(history_window=20) | |
| def temp_dir(): | |
| """Create a temporary directory for test files""" | |
| with tempfile.TemporaryDirectory() as tmpdir: | |
| yield tmpdir | |
| def mock_faiss_index(temp_dir): | |
| """Create a mock FAISS index for testing""" | |
| # This would require FAISS to be installed | |
| # For now, return None to allow tests to skip FAISS operations | |
| return None | |
| async def reliability_engine( | |
| policy_engine, | |
| event_store, | |
| anomaly_detector, | |
| business_calculator | |
| ): | |
| """Create a fully initialized reliability engine""" | |
| orchestrator = OrchestrationManager() | |
| engine = EnhancedReliabilityEngine( | |
| orchestrator=orchestrator, | |
| policy_engine=policy_engine, | |
| event_store=event_store, | |
| anomaly_detector=anomaly_detector, | |
| business_calculator=business_calculator | |
| ) | |
| return engine |