Spaces:
Runtime error
Runtime error
| """ | |
| Integration tests for comprehensive error handling and logging. | |
| Tests the complete error handling flow across all chat agent components | |
| including circuit breaker integration, fallback responses, and logging. | |
| """ | |
| import pytest | |
| import time | |
| import json | |
| import tempfile | |
| import os | |
| from unittest.mock import Mock, patch, MagicMock | |
| from pathlib import Path | |
| from chat_agent.services.groq_client import GroqClient, GroqError, GroqRateLimitError | |
| from chat_agent.services.chat_agent import ChatAgent | |
| from chat_agent.services.language_context import LanguageContextManager | |
| from chat_agent.services.session_manager import SessionManager | |
| from chat_agent.services.chat_history import ChatHistoryManager | |
| from chat_agent.utils.error_handler import ChatAgentError, ErrorCategory, ErrorSeverity | |
| from chat_agent.utils.circuit_breaker import CircuitState, get_circuit_breaker_manager | |
| from chat_agent.utils.logging_config import setup_logging | |
class TestGroqClientErrorHandling:
    """Exercise GroqClient error handling together with its circuit breaker.

    NOTE(review): the test methods accept ``mock_chatgroq`` / ``mock_groq``
    arguments, but nothing visible in this file supplies them (no ``@patch``
    decorators and no fixtures). Presumably they are provided by a conftest
    or by decorators that were lost upstream -- confirm before relying on
    these tests.
    """

    # Environment variables overridden for the duration of each test.
    _ENV_OVERRIDES = {'LOG_LEVEL': 'DEBUG', 'GROQ_API_KEY': 'test-api-key'}

    def setup_method(self):
        """Create a scratch directory, set the test environment, build the client."""
        self.temp_dir = tempfile.mkdtemp()
        # Remember the previous values so teardown can undo the overrides
        # instead of leaking them into every test that runs afterwards.
        self._saved_env = {name: os.environ.get(name) for name in self._ENV_OVERRIDES}
        os.environ.update(self._ENV_OVERRIDES)
        self.groq_client = GroqClient()

    def teardown_method(self):
        """Restore the environment and remove the scratch directory."""
        import shutil

        for name, previous in getattr(self, '_saved_env', {}).items():
            if previous is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = previous
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_api_rate_limit_with_circuit_breaker(self, mock_chatgroq, mock_groq):
        """Test rate limit handling with circuit breaker protection."""
        from chat_agent.services.groq_client import ChatMessage, LanguageContext

        # Mock rate limit error
        mock_chatgroq.return_value.invoke.side_effect = Exception("Rate limit exceeded (429)")

        # Create test data
        prompt = "Test prompt"
        chat_history = []
        language_context = LanguageContext("python", "Test template", "python")

        # Each call must either return the fallback text or raise a
        # rate-limit error (some calls may still raise before the circuit opens).
        for _ in range(3):
            try:
                response = self.groq_client.generate_response(prompt, chat_history, language_context)
            except Exception as e:
                assert "rate limit" in str(e).lower()
            else:
                # Checked outside the ``try`` body so that a failing assertion
                # is not swallowed by the ``except Exception`` handler above.
                assert "high demand" in response.lower()

        # Only verify that the client exposes its breaker. The circuit might
        # not be open yet due to fallback handling, so the exact state is
        # deliberately not asserted.
        _ = self.groq_client.circuit_breaker

    def test_api_authentication_error(self, mock_chatgroq, mock_groq):
        """Test authentication error handling."""
        from chat_agent.services.groq_client import ChatMessage, LanguageContext

        # Mock authentication error
        mock_chatgroq.return_value.invoke.side_effect = Exception("Authentication failed (401)")

        # Create test data
        prompt = "Test prompt"
        chat_history = []
        language_context = LanguageContext("python", "Test template", "python")

        # Should return a non-empty fallback response for authentication errors
        response = self.groq_client.generate_response(prompt, chat_history, language_context)
        assert isinstance(response, str)
        assert len(response) > 0

    def test_streaming_with_error_handling(self, mock_groq):
        """Test streaming response with error handling."""
        from chat_agent.services.groq_client import ChatMessage, LanguageContext

        # Mock a streaming response whose iteration fails
        mock_response = Mock()
        mock_response.__iter__ = Mock(side_effect=Exception("Network error"))
        mock_groq.return_value.chat.completions.create.return_value = mock_response

        # Create test data
        prompt = "Test prompt"
        chat_history = []
        language_context = LanguageContext("python", "Test template", "python")

        # Should yield fallback response chunks
        chunks = list(self.groq_client.stream_response(prompt, chat_history, language_context))
        assert len(chunks) > 0

        full_response = "".join(chunks)
        assert len(full_response) > 0
        # Should contain fallback content
        assert any(word in full_response.lower() for word in ['programming', 'tips', 'try'])
class TestChatAgentErrorHandling:
    """Error-handling tests for ChatAgent built on fully mocked collaborators."""

    def setup_method(self):
        """Configure logging and build a ChatAgent around spec'd mocks."""
        self.loggers = setup_logging("test_chat_agent", "DEBUG")

        # One spec'd mock per collaborator the agent depends on
        self.mock_groq_client = Mock(spec=GroqClient)
        self.mock_language_manager = Mock(spec=LanguageContextManager)
        self.mock_session_manager = Mock(spec=SessionManager)
        self.mock_history_manager = Mock(spec=ChatHistoryManager)

        self.chat_agent = ChatAgent(
            self.mock_groq_client,
            self.mock_language_manager,
            self.mock_session_manager,
            self.mock_history_manager,
        )

    def _stub_python_session(self):
        """Make the session manager return a mocked session whose language is python."""
        from chat_agent.models.chat_session import ChatSession

        session = Mock(spec=ChatSession)
        session.language = "python"
        self.mock_session_manager.get_session.return_value = session
        return session

    def test_session_error_handling(self):
        """A missing session surfaces as ChatAgentError."""
        from chat_agent.services.session_manager import SessionNotFoundError

        # Session lookup fails
        self.mock_session_manager.get_session.side_effect = SessionNotFoundError("Session not found")

        with pytest.raises(ChatAgentError) as raised:
            self.chat_agent.process_message("invalid-session", "Test message")

        # The raised error must be the agent's own error type
        assert isinstance(raised.value, ChatAgentError)

    def test_chat_history_error_handling(self):
        """A failure while storing history surfaces as ChatAgentError."""
        from chat_agent.services.chat_history import ChatHistoryError

        # Session validation succeeds
        self._stub_python_session()
        self.mock_language_manager.get_language.return_value = "python"

        # Storing the message fails
        self.mock_history_manager.store_message.side_effect = ChatHistoryError("Database error")

        with pytest.raises(ChatAgentError) as raised:
            self.chat_agent.process_message("test-session", "Test message")

        assert isinstance(raised.value, ChatAgentError)

    def test_groq_client_error_handling(self):
        """A Groq rate-limit error still yields a well-formed result dict."""
        from chat_agent.models.message import Message

        # Session, language and history all work
        self._stub_python_session()
        language_manager = self.mock_language_manager
        language_manager.get_language.return_value = "python"
        language_manager.get_language_prompt_template.return_value = "Test template"

        stored = Mock(spec=Message)
        stored.id = "msg-123"
        history_manager = self.mock_history_manager
        history_manager.store_message.return_value = stored
        history_manager.get_recent_history.return_value = []

        # Only the Groq call fails
        self.mock_groq_client.generate_response.side_effect = GroqRateLimitError("Rate limit exceeded")

        outcome = self.chat_agent.process_message("test-session", "Test message")

        # The response structure must still be valid
        assert isinstance(outcome, dict)
        assert 'response' in outcome
        assert 'session_id' in outcome

    def test_language_switch_error_handling(self):
        """Switching to a language that fails validation raises ChatAgentError."""
        self._stub_python_session()

        # Language validation rejects the requested language
        self.mock_language_manager.validate_language.return_value = False

        with pytest.raises(ChatAgentError) as raised:
            self.chat_agent.switch_language("test-session", "invalid-language")

        assert "Unsupported language" in str(raised.value)
class TestCircuitBreakerIntegration:
    """Circuit breaker behaviour when several named breakers share one manager."""

    def setup_method(self):
        """Fetch the circuit breaker manager used by every test in this class."""
        self.circuit_manager = get_circuit_breaker_manager()

    def test_multiple_service_circuit_breakers(self):
        """Breakers for different services open and close independently."""
        from chat_agent.utils.circuit_breaker import CircuitBreakerConfig

        def groq_function():
            return "groq_response"

        def db_function():
            raise Exception("Database error")

        # One breaker per service, each with its own thresholds
        groq_config = CircuitBreakerConfig(failure_threshold=3, recovery_timeout=1)
        groq_breaker = self.circuit_manager.create_breaker("groq_service", groq_config)
        db_config = CircuitBreakerConfig(failure_threshold=2, recovery_timeout=2)
        db_breaker = self.circuit_manager.create_breaker("database_service", db_config)

        # The healthy service passes straight through
        assert groq_breaker.call(groq_function) == "groq_response"
        assert groq_breaker.is_closed

        # The failing service reaches its threshold of two failures
        for _ in range(2):
            with pytest.raises(Exception):
                db_breaker.call(db_function)

        assert db_breaker.is_open
        # The other breaker is unaffected by the database failures
        assert groq_breaker.is_closed

    def test_circuit_breaker_statistics(self):
        """Request, success and failure counters are tracked per breaker."""
        from chat_agent.utils.circuit_breaker import CircuitBreakerConfig

        def success_function():
            return "success"

        def failure_function():
            raise ValueError("failure")

        stats_config = CircuitBreakerConfig(failure_threshold=2)
        breaker = self.circuit_manager.create_breaker("stats_test", stats_config)

        # Success, failure, success
        breaker.call(success_function)
        try:
            breaker.call(failure_function)
        except ValueError:
            pass
        breaker.call(success_function)

        # Counters on the breaker itself
        breaker_stats = breaker.get_stats()
        assert breaker_stats.total_requests == 3
        assert breaker_stats.total_successes == 2
        assert breaker_stats.total_failures == 1

        # The same numbers are reachable through the manager
        manager_stats = self.circuit_manager.get_all_stats()
        assert "stats_test" in manager_stats
        assert manager_stats["stats_test"].total_requests == 3
class TestLoggingIntegration:
    """Test logging integration across components."""

    def setup_method(self):
        """Set up logging with ``Path`` in the logging config module patched.

        The patch is meant to make path joins performed by ``setup_logging``
        resolve inside a per-test temporary directory.
        """
        self.temp_dir = tempfile.mkdtemp()
        log_root = Path(self.temp_dir)

        def _join_under_temp(_mocked_path, other):
            # unittest.mock calls a function installed as a magic method with
            # the *mock* as its first argument. Naming that argument ``self``
            # (as a lambda here once did) shadows the test instance, so
            # ``self.temp_dir`` resolved to an auto-created mock attribute
            # instead of the real temporary directory. Close over the real
            # directory instead.
            return log_root / other

        with patch('chat_agent.utils.logging_config.Path') as mock_path:
            mock_path.return_value.mkdir = Mock()
            mock_path.return_value.__truediv__ = _join_under_temp
            self.loggers = setup_logging("test_integration", "DEBUG")

    def teardown_method(self):
        """Remove the temporary directory created by setup_method."""
        import shutil

        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_structured_error_logging(self):
        """Test structured error logging across components."""
        error_logger = self.loggers['error']

        # Create test error with context
        error = ChatAgentError(
            message="Test integration error",
            category=ErrorCategory.API_ERROR,
            severity=ErrorSeverity.HIGH,
            context={
                'session_id': 'test-session-123',
                'operation': 'process_message',
                'duration': 2.5
            }
        )

        # Log error with extra context
        error_logger.error("Integration test error", extra={
            'error_code': error.error_code,
            'category': error.category.value,
            'severity': error.severity.value,
            'context': error.context,
            'session_id': 'test-session-123'
        })

        # Only checks that the logger has handlers attached; the written
        # output itself is not inspected here.
        assert len(error_logger.handlers) > 0

    def test_performance_logging_integration(self):
        """Test performance logging integration."""
        from chat_agent.utils.logging_config import get_performance_logger

        perf_logger = get_performance_logger('integration_test')

        # Log various operations
        perf_logger.log_operation("test_operation", 1.5, {
            'session_id': 'test-session',
            'language': 'python',
            'message_length': 100
        })
        perf_logger.log_api_call("/api/chat", "POST", 200, 0.8, {
            'session_id': 'test-session'
        })

        # Verify the performance logger carries the requested name
        assert perf_logger.logger.name.endswith('integration_test')
class TestEndToEndErrorScenarios:
    """End-to-end failure scenarios: total outage, partial outage, recovery."""

    def setup_method(self):
        """Configure logging, then create the mocked service layer."""
        self.loggers = setup_logging("e2e_test", "DEBUG")
        self.setup_realistic_mocks()

    def setup_realistic_mocks(self):
        """Create mocks for the storage backends and every agent collaborator."""
        # Storage backends
        self.mock_db = Mock()
        self.mock_redis = Mock()

        # Service collaborators, each constrained to its real interface
        self.mock_groq_client = Mock(spec=GroqClient)
        self.mock_language_manager = Mock(spec=LanguageContextManager)
        self.mock_session_manager = Mock(spec=SessionManager)
        self.mock_history_manager = Mock(spec=ChatHistoryManager)

    def _stub_python_session(self):
        """Make session lookup succeed with a mocked python-language session."""
        from chat_agent.models.chat_session import ChatSession

        session = Mock(spec=ChatSession)
        session.language = "python"
        self.mock_session_manager.get_session.return_value = session
        self.mock_language_manager.get_language.return_value = "python"

    def _build_agent(self):
        """Assemble a ChatAgent from the mocked collaborators."""
        return ChatAgent(
            self.mock_groq_client,
            self.mock_language_manager,
            self.mock_session_manager,
            self.mock_history_manager,
        )

    def test_complete_service_failure_scenario(self):
        """When both history and Groq fail, the agent raises ChatAgentError."""
        # The session itself exists
        self._stub_python_session()

        # Every downstream service is down
        self.mock_history_manager.store_message.side_effect = Exception("Database down")
        self.mock_groq_client.generate_response.side_effect = Exception("API down")

        agent = self._build_agent()

        with pytest.raises(ChatAgentError):
            agent.process_message("test-session", "Test message")

    def test_partial_service_failure_scenario(self):
        """With Groq returning fallback text, the agent passes that text through."""
        from chat_agent.models.message import Message

        # Session and language services work
        self._stub_python_session()
        self.mock_language_manager.get_language_prompt_template.return_value = "Test template"

        # History service works
        stored = Mock(spec=Message)
        stored.id = "msg-123"
        self.mock_history_manager.store_message.return_value = stored
        self.mock_history_manager.get_recent_history.return_value = []

        # The Groq client answers with its fallback text
        self.mock_groq_client.generate_response.return_value = "Fallback response from circuit breaker"

        agent = self._build_agent()
        outcome = agent.process_message("test-session", "Test message")

        assert isinstance(outcome, dict)
        assert 'response' in outcome
        assert outcome['response'] == "Fallback response from circuit breaker"

    def test_recovery_after_failure(self):
        """An open breaker closes again once the service recovers."""
        from chat_agent.utils.circuit_breaker import CircuitBreakerConfig

        # Short recovery timeout keeps the test fast
        recovery_config = CircuitBreakerConfig(
            failure_threshold=2,
            recovery_timeout=0.1,
            success_threshold=1,
        )
        breaker = get_circuit_breaker_manager().create_breaker("recovery_test", recovery_config)

        # Fails on the first two calls, succeeds afterwards
        self.call_count = 0

        def flaky_function():
            self.call_count += 1
            if self.call_count > 2:
                return "Service recovered"
            raise Exception("Service temporarily down")

        # Two failures reach the threshold and open the circuit
        for _ in range(2):
            with pytest.raises(Exception):
                breaker.call(flaky_function)
        assert breaker.is_open

        # Wait out the recovery timeout
        time.sleep(0.2)

        # The next call succeeds and closes the circuit
        assert breaker.call(flaky_function) == "Service recovered"
        assert breaker.is_closed
if __name__ == '__main__':
    # pytest.main returns the run's exit code rather than exiting itself;
    # propagate it so that running this file as a script reports failures
    # to the calling shell or CI job instead of always exiting with 0.
    raise SystemExit(pytest.main([__file__]))