File size: 27,011 Bytes
330b6e4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
"""

Performance tests for concurrent user scenarios and load testing.



This module tests the chat agent's performance under various load conditions

including multiple concurrent users, high message throughput, and stress testing.

"""

import asyncio
import time
import threading
import statistics
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any, Tuple
import pytest
import requests
from unittest.mock import Mock, patch

from chat_agent.services.chat_agent import ChatAgent
from chat_agent.services.session_manager import SessionManager
from chat_agent.services.chat_history import ChatHistoryManager
from chat_agent.services.cache_service import CacheService
from chat_agent.utils.connection_pool import ConnectionPoolManager


class PerformanceMetrics:
    """Collects and analyzes performance metrics for load tests.

    Accumulates per-request response times plus success/error counts over a
    timed window, then summarizes them (mean, median, percentiles, throughput).
    """

    def __init__(self):
        self.response_times: List[float] = []  # per-request latencies, seconds
        self.error_count = 0
        self.success_count = 0
        self.start_time = None   # set by start_timing()
        self.end_time = None     # set by end_timing()
        self.concurrent_users = 0
        self.messages_per_second = 0

    def add_response_time(self, response_time: float):
        """Add a single response-time measurement (seconds)."""
        self.response_times.append(response_time)

    def add_success(self):
        """Record one successful operation."""
        self.success_count += 1

    def add_error(self):
        """Record one failed operation."""
        self.error_count += 1

    def start_timing(self):
        """Mark the start of the measured window."""
        self.start_time = time.time()

    def end_timing(self):
        """Mark the end of the measured window."""
        self.end_time = time.time()

    def get_statistics(self) -> Dict[str, Any]:
        """Get aggregate performance statistics.

        Returns:
            A dict with request counts and rates, and — when at least one
            response time was recorded — latency statistics and throughput.
        """
        total_requests = self.success_count + self.error_count

        if not self.response_times:
            # No latency samples. Still report the real counts so a run in
            # which every request failed is not mistaken for an empty run
            # (the previous version returned total_requests == 0 here).
            return {
                'total_requests': total_requests,
                'success_count': self.success_count,
                'error_count': self.error_count,
                'success_rate': (self.success_count / total_requests * 100) if total_requests > 0 else 0,
                'error_rate': (self.error_count / total_requests * 100) if total_requests > 0 else 0
            }

        # Compare against None explicitly: a timestamp of 0.0 is falsy but
        # would still be a valid start/end time.
        if self.end_time is not None and self.start_time is not None:
            total_time = self.end_time - self.start_time
        else:
            total_time = 0

        return {
            'total_requests': total_requests,
            'success_count': self.success_count,
            'error_count': self.error_count,
            'success_rate': (self.success_count / total_requests * 100) if total_requests > 0 else 0,
            'error_rate': (self.error_count / total_requests * 100) if total_requests > 0 else 0,
            'avg_response_time': statistics.mean(self.response_times),
            'median_response_time': statistics.median(self.response_times),
            'min_response_time': min(self.response_times),
            'max_response_time': max(self.response_times),
            'p95_response_time': self._percentile(self.response_times, 95),
            'p99_response_time': self._percentile(self.response_times, 99),
            'total_duration': total_time,
            'requests_per_second': total_requests / total_time if total_time > 0 else 0,
            'concurrent_users': self.concurrent_users
        }

    def _percentile(self, data: List[float], percentile: int) -> float:
        """Return the approximate given percentile of *data* (0 if empty).

        Uses a simple sorted-index estimate, clamped to the last element.
        """
        if not data:
            return 0
        sorted_data = sorted(data)
        index = int(len(sorted_data) * percentile / 100)
        return sorted_data[min(index, len(sorted_data) - 1)]


class ConcurrentUserSimulator:
    """Simulates concurrent users driving the chat API for load testing.

    Each simulated user creates a session, sends a burst of messages, and
    deletes the session. Aggregate latency/success metrics are collected in
    a PerformanceMetrics instance on ``self.metrics``.
    """

    def __init__(self, base_url: str = "http://localhost:5000"):
        self.base_url = base_url
        self.session = requests.Session()  # shared connection pool for all requests
        self.metrics = PerformanceMetrics()

    def simulate_user_session(self, user_id: str, num_messages: int = 10) -> Dict[str, Any]:
        """Simulate a single user chat session.

        Args:
            user_id: Unique user identifier.
            num_messages: Number of messages to send.

        Returns:
            Per-session metrics: messages sent/failed, response times,
            session duration, plus ``session_creation_failed`` or
            ``session_error`` flags on failure.
        """
        session_metrics = {
            'user_id': user_id,
            'messages_sent': 0,
            'messages_failed': 0,
            'response_times': [],
            'session_duration': 0
        }

        session_start = time.time()

        try:
            # Create session
            session_data = {
                'language': 'python',
                'metadata': {'test_user': user_id}
            }

            response = self.session.post(
                f"{self.base_url}/api/v1/chat/sessions",
                json=session_data,
                headers={'Authorization': f'Bearer test-token-{user_id}'},
                timeout=30
            )

            if response.status_code != 201:
                session_metrics['session_creation_failed'] = True
                return session_metrics

            session_id = response.json()['session_id']

            # Send messages; only message requests are counted in the metrics
            for i in range(num_messages):
                message_data = {
                    'content': f'Test message {i+1} from user {user_id}',
                    'language': 'python'
                }

                start_time = time.time()
                try:
                    response = self.session.post(
                        f"{self.base_url}/api/v1/chat/sessions/{session_id}/messages",
                        json=message_data,
                        headers={'Authorization': f'Bearer test-token-{user_id}'},
                        timeout=30
                    )
                    response_time = time.time() - start_time

                    if response.status_code == 200:
                        session_metrics['messages_sent'] += 1
                        session_metrics['response_times'].append(response_time)
                        self.metrics.add_response_time(response_time)
                        self.metrics.add_success()
                    else:
                        session_metrics['messages_failed'] += 1
                        self.metrics.add_error()

                except requests.RequestException:
                    session_metrics['messages_failed'] += 1
                    self.metrics.add_error()

                # Small delay between messages to mimic user pacing
                time.sleep(0.1)

            # Clean up session; timeout so a hung server cannot stall the
            # worker thread past the per-session deadline.
            self.session.delete(
                f"{self.base_url}/api/v1/chat/sessions/{session_id}",
                headers={'Authorization': f'Bearer test-token-{user_id}'},
                timeout=30
            )

        except Exception as e:
            session_metrics['session_error'] = str(e)

        session_metrics['session_duration'] = time.time() - session_start
        return session_metrics

    def run_concurrent_test(self, num_users: int, messages_per_user: int = 10) -> Dict[str, Any]:
        """Run a concurrent-user load test.

        Args:
            num_users: Number of concurrent simulated users.
            messages_per_user: Messages each user sends.

        Returns:
            Dict with test configuration, session/message success rates,
            aggregate performance metrics, and the raw per-user results.
        """
        self.metrics = PerformanceMetrics()  # fresh metrics per test run
        self.metrics.concurrent_users = num_users
        self.metrics.start_timing()

        user_sessions = []

        # Cap the worker count so very large tests do not spawn one thread
        # per user.
        with ThreadPoolExecutor(max_workers=min(num_users, 50)) as executor:
            # Submit all user sessions
            futures = []
            for i in range(num_users):
                user_id = f"test_user_{i}"
                future = executor.submit(self.simulate_user_session, user_id, messages_per_user)
                futures.append(future)

            # Collect results
            for future in as_completed(futures):
                try:
                    session_result = future.result(timeout=120)  # 2 minute timeout per session
                    user_sessions.append(session_result)
                except Exception as e:
                    user_sessions.append({'error': str(e)})
                    self.metrics.add_error()

        self.metrics.end_timing()

        # Analyze results
        successful_sessions = [s for s in user_sessions if 'error' not in s and not s.get('session_creation_failed')]
        failed_sessions = len(user_sessions) - len(successful_sessions)

        total_messages_sent = sum(s.get('messages_sent', 0) for s in successful_sessions)
        total_messages_failed = sum(s.get('messages_failed', 0) for s in successful_sessions)

        return {
            'test_config': {
                'concurrent_users': num_users,
                'messages_per_user': messages_per_user,
                'total_expected_messages': num_users * messages_per_user
            },
            'session_results': {
                'successful_sessions': len(successful_sessions),
                'failed_sessions': failed_sessions,
                'session_success_rate': len(successful_sessions) / num_users * 100
            },
            'message_results': {
                'total_messages_sent': total_messages_sent,
                'total_messages_failed': total_messages_failed,
                'message_success_rate': total_messages_sent / (total_messages_sent + total_messages_failed) * 100 if (total_messages_sent + total_messages_failed) > 0 else 0
            },
            'performance_metrics': self.metrics.get_statistics(),
            'user_sessions': user_sessions
        }


@pytest.fixture
def performance_metrics():
    """Provide a fresh PerformanceMetrics collector for each test."""
    metrics = PerformanceMetrics()
    return metrics


@pytest.fixture
def mock_services():
    """Yield a bundle of mocked service dependencies backed by a fake Redis."""
    with patch('redis.Redis') as redis_cls:
        fake_redis = Mock()
        fake_redis.ping.return_value = True
        redis_cls.return_value = fake_redis

        yield {
            'redis_client': fake_redis,
            'session_manager': Mock(spec=SessionManager),
            'chat_history_manager': Mock(spec=ChatHistoryManager),
            'cache_service': Mock(spec=CacheService),
        }


class TestConcurrentUsers:
    """Test concurrent user scenarios (thread pools, pools, memory)."""

    def test_single_user_performance(self, mock_services, performance_metrics):
        """Test single user performance baseline."""
        # Simulate single user with multiple messages
        num_messages = 50

        performance_metrics.start_timing()

        for i in range(num_messages):
            start_time = time.time()

            # Simulate message processing
            time.sleep(0.01)  # Simulate processing time

            response_time = time.time() - start_time
            performance_metrics.add_response_time(response_time)
            performance_metrics.add_success()

        performance_metrics.end_timing()

        stats = performance_metrics.get_statistics()

        # Assertions for single user performance
        assert stats['success_count'] == num_messages
        assert stats['error_count'] == 0
        assert stats['success_rate'] == 100.0
        assert stats['avg_response_time'] < 0.1  # Should be fast for single user

    def test_concurrent_session_creation(self, mock_services):
        """Test concurrent session creation performance."""
        num_concurrent_sessions = 20

        def create_session(user_id: str) -> Tuple[str, float]:
            """Mock a session-creation round trip; return (id, duration)."""
            start_time = time.time()

            # Mock session creation
            session_id = f"session_{user_id}_{int(time.time())}"
            time.sleep(0.05)  # Simulate database operation

            duration = time.time() - start_time
            return session_id, duration

        # Test concurrent session creation
        with ThreadPoolExecutor(max_workers=num_concurrent_sessions) as executor:
            futures = []
            start_time = time.time()

            for i in range(num_concurrent_sessions):
                future = executor.submit(create_session, f"user_{i}")
                futures.append(future)

            results = []
            for future in as_completed(futures):
                session_id, duration = future.result()
                results.append((session_id, duration))

            total_time = time.time() - start_time

        # Analyze results
        assert len(results) == num_concurrent_sessions

        durations = [duration for _, duration in results]
        avg_duration = sum(durations) / len(durations)
        max_duration = max(durations)

        # Performance assertions
        assert avg_duration < 0.2  # Average session creation should be fast
        assert max_duration < 0.5   # Even slowest should be reasonable
        assert total_time < 2.0     # Total time should be much less than sequential

    def test_concurrent_message_processing(self, mock_services):
        """Test concurrent message processing performance."""
        num_concurrent_messages = 30

        def process_message(message_id: str) -> Tuple[str, float, bool]:
            """Mock processing one message; return (id, duration, success)."""
            start_time = time.time()

            try:
                # Deterministic per-message jitter of 0.02-0.0299 seconds.
                # (The previous version used hash(message_id), whose value
                # for strings changes between runs under PYTHONHASHSEED
                # randomization, making timings irreproducible.)
                msg_index = int(message_id.rsplit('_', 1)[1])
                processing_time = 0.02 + (msg_index % 100) / 10000
                time.sleep(processing_time)

                duration = time.time() - start_time
                return message_id, duration, True

            except Exception:
                duration = time.time() - start_time
                return message_id, duration, False

        # Test concurrent message processing
        with ThreadPoolExecutor(max_workers=15) as executor:
            futures = []
            start_time = time.time()

            for i in range(num_concurrent_messages):
                future = executor.submit(process_message, f"msg_{i}")
                futures.append(future)

            results = []
            for future in as_completed(futures):
                message_id, duration, success = future.result()
                results.append((message_id, duration, success))

            total_time = time.time() - start_time

        # Analyze results
        successful_results = [r for r in results if r[2]]
        failed_results = [r for r in results if not r[2]]

        assert len(results) == num_concurrent_messages
        assert len(successful_results) == num_concurrent_messages  # All should succeed
        assert len(failed_results) == 0

        durations = [duration for _, duration, _ in successful_results]
        avg_duration = sum(durations) / len(durations)

        # Performance assertions
        assert avg_duration < 0.2   # Average processing should be reasonable
        assert total_time < 5.0     # Total time should be much less than sequential

    def test_memory_usage_under_load(self, mock_services):
        """Test memory usage under concurrent load."""
        import psutil
        import os

        process = psutil.Process(os.getpid())
        initial_memory = process.memory_info().rss / 1024 / 1024  # MB

        # Simulate high load scenario
        num_sessions = 50
        messages_per_session = 20

        # Create mock data structures to simulate memory usage
        sessions = {}
        messages = {}

        for session_id in range(num_sessions):
            sessions[f"session_{session_id}"] = {
                'user_id': f"user_{session_id}",
                'language': 'python',
                'created_at': time.time(),
                'messages': []
            }

            for msg_id in range(messages_per_session):
                message_key = f"session_{session_id}_msg_{msg_id}"
                messages[message_key] = {
                    'content': f"Test message {msg_id} " * 50,  # Larger message
                    'timestamp': time.time(),
                    'metadata': {'test': True}
                }
                sessions[f"session_{session_id}"]['messages'].append(message_key)

        peak_memory = process.memory_info().rss / 1024 / 1024  # MB

        # Clean up
        del sessions
        del messages

        final_memory = process.memory_info().rss / 1024 / 1024  # MB

        memory_increase = peak_memory - initial_memory
        memory_cleanup = peak_memory - final_memory

        # Memory usage assertions
        assert memory_increase < 100  # Should not use more than 100MB for test data
        assert memory_cleanup > 0     # Memory should be freed after cleanup

    def test_database_connection_pool_performance(self, mock_services):
        """Test database connection pool under concurrent load."""
        from chat_agent.utils.connection_pool import DatabaseConnectionPool

        # Mock database URL
        database_url = "sqlite:///:memory:"

        pool = DatabaseConnectionPool(database_url, pool_size=5, max_overflow=10)

        def execute_query(query_id: str) -> Tuple[str, float, bool]:
            """Run one mock query via the pool; return (id, duration, success)."""
            start_time = time.time()

            try:
                with pool.get_connection() as conn:
                    # Simulate database query
                    time.sleep(0.01)
                    conn.execute("SELECT 1").fetchone()

                duration = time.time() - start_time
                return query_id, duration, True

            except Exception:
                duration = time.time() - start_time
                return query_id, duration, False

        # Test concurrent database access with more workers than pool_size
        # so that overflow/contention behaviour is exercised.
        num_concurrent_queries = 25

        with ThreadPoolExecutor(max_workers=15) as executor:
            futures = []

            for i in range(num_concurrent_queries):
                future = executor.submit(execute_query, f"query_{i}")
                futures.append(future)

            results = []
            for future in as_completed(futures):
                query_id, duration, success = future.result()
                results.append((query_id, duration, success))

        # Analyze results
        successful_queries = [r for r in results if r[2]]
        failed_queries = [r for r in results if not r[2]]

        assert len(successful_queries) == num_concurrent_queries
        assert len(failed_queries) == 0

        durations = [duration for _, duration, _ in successful_queries]
        avg_duration = sum(durations) / len(durations)
        max_duration = max(durations)

        # Performance assertions
        assert avg_duration < 0.1   # Database queries should be fast
        assert max_duration < 0.5   # Even with connection pool contention

        # Check pool status
        pool_status = pool.get_pool_status()
        assert pool_status['pool_size'] >= 0
        assert pool_status['checked_out'] >= 0

    def test_redis_connection_pool_performance(self, mock_services):
        """Test Redis connection pool under concurrent load."""
        # This test would require actual Redis connection
        # For now, we'll test the mock behavior

        redis_client = mock_services['redis_client']

        def redis_operation(operation_id: str) -> Tuple[str, float, bool]:
            """Run a set/get/delete round trip; return (id, duration, success)."""
            start_time = time.time()

            try:
                # Simulate Redis operations
                redis_client.set(f"key_{operation_id}", f"value_{operation_id}")
                value = redis_client.get(f"key_{operation_id}")
                redis_client.delete(f"key_{operation_id}")

                duration = time.time() - start_time
                return operation_id, duration, True

            except Exception:
                duration = time.time() - start_time
                return operation_id, duration, False

        # Test concurrent Redis operations
        num_concurrent_ops = 30

        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = []

            for i in range(num_concurrent_ops):
                future = executor.submit(redis_operation, f"op_{i}")
                futures.append(future)

            results = []
            for future in as_completed(futures):
                op_id, duration, success = future.result()
                results.append((op_id, duration, success))

        # Analyze results
        successful_ops = [r for r in results if r[2]]

        assert len(successful_ops) == num_concurrent_ops

        durations = [duration for _, duration, _ in successful_ops]
        avg_duration = sum(durations) / len(durations)

        # Performance assertions (for mocked Redis)
        assert avg_duration < 0.01  # Mocked operations should be very fast


class TestLoadTesting:
    """Load testing scenarios (sustained and spike load)."""

    @pytest.mark.slow
    def test_sustained_load(self, mock_services):
        """Test sustained load over time."""
        duration_seconds = 30  # 30 second test
        requests_per_second = 10

        metrics = PerformanceMetrics()
        metrics.start_timing()

        def generate_load():
            """Issue simulated requests at roughly requests_per_second."""
            end_time = time.time() + duration_seconds
            request_count = 0

            while time.time() < end_time:
                start_time = time.time()

                # Simulate request processing
                time.sleep(0.01)  # Simulate work

                response_time = time.time() - start_time
                metrics.add_response_time(response_time)
                metrics.add_success()

                request_count += 1

                # Control request rate; clamp at 0 so a long request cannot
                # produce a negative sleep (ValueError).
                elapsed = time.time() - start_time
                time.sleep(max(0, (1.0 / requests_per_second) - elapsed))

        # Run load test
        generate_load()
        metrics.end_timing()

        stats = metrics.get_statistics()

        # Assertions for sustained load
        expected_requests = duration_seconds * requests_per_second
        assert stats['total_requests'] >= expected_requests * 0.9  # Allow 10% variance
        assert stats['success_rate'] >= 95.0  # 95% success rate minimum
        assert stats['avg_response_time'] < 0.1  # Average response time

    @pytest.mark.slow
    def test_spike_load(self, mock_services):
        """Test handling of sudden load spikes."""
        normal_load_rps = 5
        spike_load_rps = 50
        spike_duration = 10  # seconds

        metrics = PerformanceMetrics()
        metrics.start_timing()

        def simulate_spike():
            """Run normal load, a spike, then normal load again."""
            # Normal load for 5 seconds
            end_normal = time.time() + 5
            while time.time() < end_normal:
                start_time = time.time()
                time.sleep(0.01)

                response_time = time.time() - start_time
                metrics.add_response_time(response_time)
                metrics.add_success()

                # max(0, ...) guards against a negative sleep if the rps
                # constant is ever raised above 100 (previously unguarded).
                time.sleep(max(0, 1.0 / normal_load_rps - 0.01))

            # Spike load for 10 seconds
            end_spike = time.time() + spike_duration
            while time.time() < end_spike:
                start_time = time.time()
                time.sleep(0.01)

                response_time = time.time() - start_time
                metrics.add_response_time(response_time)

                if response_time < 0.5:  # Consider successful if under 500ms
                    metrics.add_success()
                else:
                    metrics.add_error()

                time.sleep(max(0, 1.0 / spike_load_rps - 0.01))

            # Return to normal load for 5 seconds
            end_normal2 = time.time() + 5
            while time.time() < end_normal2:
                start_time = time.time()
                time.sleep(0.01)

                response_time = time.time() - start_time
                metrics.add_response_time(response_time)
                metrics.add_success()

                time.sleep(max(0, 1.0 / normal_load_rps - 0.01))

        simulate_spike()
        metrics.end_timing()

        stats = metrics.get_statistics()

        # Assertions for spike handling
        assert stats['success_rate'] >= 80.0  # Should handle most requests even during spike
        assert stats['p95_response_time'] < 1.0  # 95th percentile should be reasonable


if __name__ == "__main__":
    # Run a simple load test
    simulator = ConcurrentUserSimulator()
    
    print("Running concurrent user test...")
    results = simulator.run_concurrent_test(num_users=10, messages_per_user=5)
    
    print("\nTest Results:")
    print(f"Concurrent Users: {results['test_config']['concurrent_users']}")
    print(f"Messages per User: {results['test_config']['messages_per_user']}")
    print(f"Session Success Rate: {results['session_results']['session_success_rate']:.1f}%")
    print(f"Message Success Rate: {results['message_results']['message_success_rate']:.1f}%")
    print(f"Average Response Time: {results['performance_metrics']['avg_response_time']:.3f}s")
    print(f"95th Percentile Response Time: {results['performance_metrics']['p95_response_time']:.3f}s")
    print(f"Requests per Second: {results['performance_metrics']['requests_per_second']:.1f}")