| |
| """ |
| Benchmarks for Stack 2.9 - Throughput Tests |
| Concurrency and throughput benchmarks. |
| """ |
|
|
| import pytest |
| import sys |
| import time |
| import threading |
| from pathlib import Path |
| from unittest.mock import MagicMock, patch |
|
|
| |
| sys.path.insert(0, str(Path(__file__).parent.parent / "stack_cli")) |
|
|
| from stack_cli.agent import StackAgent, create_agent |
|
|
|
|
| class TestConcurrentQueries: |
| """Test concurrent query handling.""" |
|
|
| def test_sequential_throughput(self): |
| """Test sequential query throughput.""" |
| with patch('stack_cli.context.create_context_manager'): |
| with patch('stack_cli.tools.get_tool') as mock_get_tool: |
| mock_tool = MagicMock(return_value={"success": True}) |
| mock_get_tool.return_value = mock_tool |
| |
| agent = StackAgent() |
| |
| start = time.time() |
| |
| for i in range(20): |
| agent.process(f"query {i}") |
| |
| elapsed = time.time() - start |
| throughput = 20 / elapsed |
| |
| |
| assert throughput > 5 |
|
|
| def test_rapid_fire_queries(self): |
| """Test rapid fire query submission.""" |
| with patch('stack_cli.context.create_context_manager'): |
| with patch('stack_cli.tools.get_tool') as mock_get_tool: |
| mock_tool = MagicMock(return_value={"success": True}) |
| mock_get_tool.return_value = mock_tool |
| |
| agent = StackAgent() |
| |
| |
| results = [] |
| start = time.time() |
| |
| for i in range(10): |
| results.append(agent.process(f"rapid {i}")) |
| |
| elapsed = time.time() - start |
| |
| assert len(results) == 10 |
| assert elapsed < 3.0 |
|
|
|
|
| class TestThreadSafety: |
| """Test thread safety.""" |
|
|
| def test_concurrent_agent_creation(self): |
| """Test concurrent agent creation.""" |
| agents = [] |
| |
| def create_agent_thread(): |
| with patch('stack_cli.context.create_context_manager'): |
| agents.append(create_agent()) |
| |
| threads = [] |
| |
| for _ in range(5): |
| t = threading.Thread(target=create_agent_thread) |
| threads.append(t) |
| t.start() |
| |
| for t in threads: |
| t.join() |
| |
| assert len(agents) == 5 |
|
|
| def test_concurrent_tool_access(self): |
| """Test concurrent tool access.""" |
| from stack_cli.tools import get_tool, list_tools |
| |
| results = [] |
| |
| def access_tools(): |
| for _ in range(10): |
| tool = get_tool("read") |
| tools = list_tools() |
| results.append((tool is not None, len(tools) > 0)) |
| |
| threads = [] |
| |
| for _ in range(3): |
| t = threading.Thread(target=access_tools) |
| threads.append(t) |
| t.start() |
| |
| for t in threads: |
| t.join() |
| |
| |
| assert all(success for success, _ in results) |
|
|
|
|
| class TestBatchProcessing: |
| """Test batch processing capabilities.""" |
|
|
| def test_batch_file_operations(self): |
| """Test batch file operations.""" |
| with patch('stack_cli.context.create_context_manager'): |
| with patch('stack_cli.tools.get_tool') as mock_get_tool: |
| mock_tool = MagicMock(return_value={"success": True}) |
| mock_get_tool.return_value = mock_tool |
| |
| files = [f"file{i}.py" for i in range(10)] |
| |
| start = time.time() |
| |
| for f in files: |
| get_tool("read")(path=f) |
| |
| elapsed = time.time() - start |
| |
| assert elapsed < 2.0 |
|
|
| def test_batch_tool_chains(self): |
| """Test batch tool chains.""" |
| with patch('stack_cli.context.create_context_manager'): |
| with patch('stack_cli.tools.get_tool') as mock_get_tool: |
| mock_tool = MagicMock(return_value={"success": True}) |
| mock_get_tool.return_value = mock_tool |
| |
| chains = [ |
| ("read", {"path": "a.py"}), |
| ("write", {"path": "b.py", "content": "x"}), |
| ("grep", {"path": ".", "pattern": "test"}), |
| ] |
| |
| start = time.time() |
| |
| for tool_name, params in chains * 5: |
| get_tool(tool_name)(**params) |
| |
| elapsed = time.time() - start |
| |
| assert elapsed < 2.0 |
|
|
|
|
| class TestThroughputMetrics: |
| """Test throughput metrics.""" |
|
|
| def test_queries_per_second(self): |
| """Test queries per second throughput.""" |
| with patch('stack_cli.context.create_context_manager'): |
| with patch('stack_cli.tools.get_tool') as mock_get_tool: |
| mock_tool = MagicMock(return_value={"success": True}) |
| mock_get_tool.return_value = mock_tool |
| |
| agent = StackAgent() |
| |
| start = time.time() |
| query_count = 30 |
| |
| for i in range(query_count): |
| agent.process(f"query {i}") |
| |
| elapsed = time.time() - start |
| qps = query_count / elapsed |
| |
| |
| assert qps > 3, f"QPS too low: {qps}" |
|
|
| def test_tools_per_second(self): |
| """Test tools per second throughput.""" |
| with patch('stack_cli.context.create_context_manager'): |
| with patch('stack_cli.tools.get_tool') as mock_get_tool: |
| mock_tool = MagicMock(return_value={"success": True}) |
| mock_get_tool.return_value = mock_tool |
| |
| start = time.time() |
| tool_count = 100 |
| |
| for i in range(tool_count): |
| get_tool("read")(path=f"file{i}.py") |
| |
| elapsed = time.time() - start |
| tps = tool_count / elapsed |
| |
| |
| assert tps > 50 |
|
|
|
|
| class TestConcurrentContext: |
| """Test concurrent context operations.""" |
|
|
| def test_concurrent_context_updates(self): |
| """Test concurrent context updates.""" |
| from stack_cli.context import SessionMemory |
| |
| session = SessionMemory() |
| errors = [] |
| |
| def update_session(i): |
| try: |
| session.add_message("user", f"message {i}") |
| session.add_tool_usage("read", {"success": True}) |
| except Exception as e: |
| errors.append(e) |
| |
| threads = [] |
| |
| for i in range(10): |
| t = threading.Thread(target=update_session, args=(i,)) |
| threads.append(t) |
| t.start() |
| |
| for t in threads: |
| t.join() |
| |
| |
| assert len(errors) == 0 |
| assert len(session.messages) == 10 |
|
|
|
|
| class TestResourceUtilization: |
| """Test resource utilization.""" |
|
|
| def test_memory_usage_stable(self): |
| """Test memory usage remains stable.""" |
| with patch('stack_cli.context.create_context_manager'): |
| with patch('stack_cli.tools.get_tool') as mock_get_tool: |
| mock_tool = MagicMock(return_value={"success": True}) |
| mock_get_tool.return_value = mock_tool |
| |
| agent = StackAgent() |
| |
| |
| for i in range(100): |
| agent.process(f"query {i}") |
| |
| |
| |
| assert len(agent.conversation_history) <= 200 |
|
|
| def test_context_growth_bounded(self): |
| """Test context growth is bounded.""" |
| with patch('stack_cli.context.create_context_manager'): |
| with patch('stack_cli.tools.get_tool') as mock_get_tool: |
| mock_tool = MagicMock(return_value={"success": True}) |
| mock_get_tool.return_value = mock_tool |
| |
| agent = StackAgent() |
| session = agent.context_manager.session |
| |
| |
| for i in range(50): |
| session.add_message("user", f"msg {i}") |
| session.add_tool_usage("read", {"success": True}) |
| |
| summary = session.get_summary() |
| |
| |
| assert summary["messages_count"] == 50 |
|
|
|
|
| if __name__ == "__main__": |
| pytest.main([__file__, "-v"]) |
|
|