Stack-2-9-finetuned / samples /benchmarks /test_throughput.py
walidsobhie-code
refactor: Squeeze folders further - cleaner structure
65888d5
#!/usr/bin/env python3
"""
Benchmarks for Stack 2.9 - Throughput Tests
Concurrency and throughput benchmarks.
"""
import pytest
import sys
import time
import threading
from pathlib import Path
from unittest.mock import MagicMock, patch
# Add stack_cli to path
sys.path.insert(0, str(Path(__file__).parent.parent / "stack_cli"))
from stack_cli.agent import StackAgent, create_agent
class TestConcurrentQueries:
"""Test concurrent query handling."""
def test_sequential_throughput(self):
"""Test sequential query throughput."""
with patch('stack_cli.context.create_context_manager'):
with patch('stack_cli.tools.get_tool') as mock_get_tool:
mock_tool = MagicMock(return_value={"success": True})
mock_get_tool.return_value = mock_tool
agent = StackAgent()
start = time.time()
for i in range(20):
agent.process(f"query {i}")
elapsed = time.time() - start
throughput = 20 / elapsed
# Should handle at least 5 queries per second
assert throughput > 5
def test_rapid_fire_queries(self):
"""Test rapid fire query submission."""
with patch('stack_cli.context.create_context_manager'):
with patch('stack_cli.tools.get_tool') as mock_get_tool:
mock_tool = MagicMock(return_value={"success": True})
mock_get_tool.return_value = mock_tool
agent = StackAgent()
# Submit quickly
results = []
start = time.time()
for i in range(10):
results.append(agent.process(f"rapid {i}"))
elapsed = time.time() - start
assert len(results) == 10
assert elapsed < 3.0
class TestThreadSafety:
"""Test thread safety."""
def test_concurrent_agent_creation(self):
"""Test concurrent agent creation."""
agents = []
def create_agent_thread():
with patch('stack_cli.context.create_context_manager'):
agents.append(create_agent())
threads = []
for _ in range(5):
t = threading.Thread(target=create_agent_thread)
threads.append(t)
t.start()
for t in threads:
t.join()
assert len(agents) == 5
def test_concurrent_tool_access(self):
"""Test concurrent tool access."""
from stack_cli.tools import get_tool, list_tools
results = []
def access_tools():
for _ in range(10):
tool = get_tool("read")
tools = list_tools()
results.append((tool is not None, len(tools) > 0))
threads = []
for _ in range(3):
t = threading.Thread(target=access_tools)
threads.append(t)
t.start()
for t in threads:
t.join()
# All accesses should succeed
assert all(success for success, _ in results)
class TestBatchProcessing:
"""Test batch processing capabilities."""
def test_batch_file_operations(self):
"""Test batch file operations."""
with patch('stack_cli.context.create_context_manager'):
with patch('stack_cli.tools.get_tool') as mock_get_tool:
mock_tool = MagicMock(return_value={"success": True})
mock_get_tool.return_value = mock_tool
files = [f"file{i}.py" for i in range(10)]
start = time.time()
for f in files:
get_tool("read")(path=f)
elapsed = time.time() - start
assert elapsed < 2.0
def test_batch_tool_chains(self):
"""Test batch tool chains."""
with patch('stack_cli.context.create_context_manager'):
with patch('stack_cli.tools.get_tool') as mock_get_tool:
mock_tool = MagicMock(return_value={"success": True})
mock_get_tool.return_value = mock_tool
chains = [
("read", {"path": "a.py"}),
("write", {"path": "b.py", "content": "x"}),
("grep", {"path": ".", "pattern": "test"}),
]
start = time.time()
for tool_name, params in chains * 5:
get_tool(tool_name)(**params)
elapsed = time.time() - start
assert elapsed < 2.0
class TestThroughputMetrics:
"""Test throughput metrics."""
def test_queries_per_second(self):
"""Test queries per second throughput."""
with patch('stack_cli.context.create_context_manager'):
with patch('stack_cli.tools.get_tool') as mock_get_tool:
mock_tool = MagicMock(return_value={"success": True})
mock_get_tool.return_value = mock_tool
agent = StackAgent()
start = time.time()
query_count = 30
for i in range(query_count):
agent.process(f"query {i}")
elapsed = time.time() - start
qps = query_count / elapsed
# Should achieve reasonable QPS
assert qps > 3, f"QPS too low: {qps}"
def test_tools_per_second(self):
"""Test tools per second throughput."""
with patch('stack_cli.context.create_context_manager'):
with patch('stack_cli.tools.get_tool') as mock_get_tool:
mock_tool = MagicMock(return_value={"success": True})
mock_get_tool.return_value = mock_tool
start = time.time()
tool_count = 100
for i in range(tool_count):
get_tool("read")(path=f"file{i}.py")
elapsed = time.time() - start
tps = tool_count / elapsed
# Should be very fast
assert tps > 50
class TestConcurrentContext:
"""Test concurrent context operations."""
def test_concurrent_context_updates(self):
"""Test concurrent context updates."""
from stack_cli.context import SessionMemory
session = SessionMemory()
errors = []
def update_session(i):
try:
session.add_message("user", f"message {i}")
session.add_tool_usage("read", {"success": True})
except Exception as e:
errors.append(e)
threads = []
for i in range(10):
t = threading.Thread(target=update_session, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
# Should complete without errors
assert len(errors) == 0
assert len(session.messages) == 10
class TestResourceUtilization:
"""Test resource utilization."""
def test_memory_usage_stable(self):
"""Test memory usage remains stable."""
with patch('stack_cli.context.create_context_manager'):
with patch('stack_cli.tools.get_tool') as mock_get_tool:
mock_tool = MagicMock(return_value={"success": True})
mock_get_tool.return_value = mock_tool
agent = StackAgent()
# Process many queries
for i in range(100):
agent.process(f"query {i}")
# History should not grow unbounded
# (Old entries may be truncated in real implementation)
assert len(agent.conversation_history) <= 200
def test_context_growth_bounded(self):
"""Test context growth is bounded."""
with patch('stack_cli.context.create_context_manager'):
with patch('stack_cli.tools.get_tool') as mock_get_tool:
mock_tool = MagicMock(return_value={"success": True})
mock_get_tool.return_value = mock_tool
agent = StackAgent()
session = agent.context_manager.session
# Add many operations
for i in range(50):
session.add_message("user", f"msg {i}")
session.add_tool_usage("read", {"success": True})
summary = session.get_summary()
# Counts should be accurate
assert summary["messages_count"] == 50
if __name__ == "__main__":
pytest.main([__file__, "-v"])