LovingGraceTech's picture
Add release: integration_test.py
1b5e554 verified
#!/usr/bin/env python3
"""
INTEGRATION TEST - Harmonic Stack
Ghost in the Machine Labs
Tests the full message flow:
User β†’ Executive β†’ Directors β†’ Coordinators β†’ Back up
"""
import asyncio
import json
from datetime import datetime
import httpx
import redis.asyncio as redis
OLLAMA_URL = "http://localhost:11434"
REDIS_URL = "redis://localhost:6379"
async def test_operator_routing():
"""Test Operator routing decision"""
print("\n" + "=" * 60)
print("TEST 1: Operator Routing")
print("=" * 60)
async with httpx.AsyncClient(timeout=60.0) as http:
prompt = """Route this message to the appropriate node.
MESSAGE: Write a Python function to calculate fibonacci numbers
CONTEXT: {"user": "joe", "priority": "normal"}
AVAILABLE NODES:
- executive: High-level planning, user intent
- technical_director: Code, architecture, systems
- creative_director: Writing, design, communication
- research_director: Investigation, analysis, learning
- operations_director: Process, scheduling, resources
- council: Ethical deliberation (triggers Forum)
Respond with JSON routing decision."""
resp = await http.post(
f"{OLLAMA_URL}/api/generate",
json={"model": "operator", "prompt": prompt, "stream": False}
)
if resp.status_code == 200:
result = resp.json().get("response", "")
print(f"Operator response:\n{result}")
# Try to parse JSON
try:
start = result.find("{")
end = result.rfind("}") + 1
if start >= 0 and end > start:
decision = json.loads(result[start:end])
print(f"\nβœ“ Routing decision: {decision.get('route_to')}")
print(f" Priority: {decision.get('priority')}")
print(f" Reason: {decision.get('reason')}")
return True
except:
pass
print("βœ— Failed to get routing decision")
return False
async def test_bypass_highways():
"""Test bypass highway communication"""
print("\n" + "=" * 60)
print("TEST 2: Bypass Highways")
print("=" * 60)
r = redis.from_url(REDIS_URL)
pubsub = r.pubsub()
# Subscribe to health highway
await pubsub.subscribe("bypass:health")
# Publish a health ping
test_msg = json.dumps({
"msg_id": f"test-{datetime.now().strftime('%H%M%S')}",
"from_node": "integration_test",
"content": {"status": "testing", "load": 0.1},
"timestamp": datetime.now().isoformat()
})
await r.publish("bypass:health", test_msg)
print(f"βœ“ Published to bypass:health")
# Check message received
msg = await asyncio.wait_for(pubsub.get_message(timeout=2.0), timeout=3.0)
if msg and msg["type"] == "subscribe":
msg = await asyncio.wait_for(pubsub.get_message(timeout=2.0), timeout=3.0)
if msg and msg["type"] == "message":
print(f"βœ“ Received on bypass:health")
await r.close()
return True
print("βœ— No message received")
await r.close()
return False
async def test_message_bus():
"""Test message bus director communication"""
print("\n" + "=" * 60)
print("TEST 3: Message Bus (Director Channel)")
print("=" * 60)
r = redis.from_url(REDIS_URL)
pubsub = r.pubsub()
# Subscribe to technical director channel
channel = "msgbus:director:technical_director"
await pubsub.subscribe(channel)
# Publish a task
task = json.dumps({
"type": "task",
"from": "integration_test",
"task": {
"task_id": f"test-{datetime.now().strftime('%H%M%S')}",
"description": "Test task for integration"
},
"timestamp": datetime.now().isoformat()
})
await r.publish(channel, task)
print(f"βœ“ Published task to {channel}")
# Verify
msg = await asyncio.wait_for(pubsub.get_message(timeout=2.0), timeout=3.0)
if msg and msg["type"] == "subscribe":
msg = await asyncio.wait_for(pubsub.get_message(timeout=2.0), timeout=3.0)
if msg and msg["type"] == "message":
print(f"βœ“ Task received on message bus")
await r.close()
return True
print("βœ— No task received")
await r.close()
return False
async def test_forum_control():
"""Test Forum Bus control channel"""
print("\n" + "=" * 60)
print("TEST 4: Forum Bus (Council Control)")
print("=" * 60)
r = redis.from_url(REDIS_URL)
pubsub = r.pubsub()
# Subscribe to forum control
await pubsub.subscribe("forum:control")
# Send convene command
cmd = json.dumps({
"action": "convene",
"question": "Is this integration test ethical?",
"timestamp": datetime.now().isoformat()
})
await r.publish("forum:control", cmd)
print(f"βœ“ Published convene command")
# Verify
msg = await asyncio.wait_for(pubsub.get_message(timeout=2.0), timeout=3.0)
if msg and msg["type"] == "subscribe":
msg = await asyncio.wait_for(pubsub.get_message(timeout=2.0), timeout=3.0)
if msg and msg["type"] == "message":
print(f"βœ“ Forum control received command")
await r.close()
return True
print("βœ— No command received")
await r.close()
return False
async def test_executive_model():
"""Test Executive model response"""
print("\n" + "=" * 60)
print("TEST 5: Executive Model")
print("=" * 60)
async with httpx.AsyncClient(timeout=60.0) as http:
resp = await http.post(
f"{OLLAMA_URL}/api/generate",
json={
"model": "executive",
"prompt": "A user wants help writing a Python script. Create a brief plan.",
"stream": False
}
)
if resp.status_code == 200:
result = resp.json().get("response", "")
print(f"Executive response:\n{result[:500]}...")
print(f"\nβœ“ Executive model responding")
return True
print("βœ— Executive model failed")
return False
async def test_council_member():
"""Test a council member"""
print("\n" + "=" * 60)
print("TEST 6: Ethics Council Member (Voltaire)")
print("=" * 60)
async with httpx.AsyncClient(timeout=60.0) as http:
resp = await http.post(
f"{OLLAMA_URL}/api/generate",
json={
"model": "voltaire",
"prompt": "Should AI be used to automate jobs without worker protections?",
"stream": False
}
)
if resp.status_code == 200:
result = resp.json().get("response", "")
print(f"Voltaire responds:\n{result[:500]}...")
print(f"\nβœ“ Council member responding")
return True
print("βœ— Council member failed")
return False
async def main():
"""Run all integration tests"""
print("=" * 60)
print("HARMONIC STACK INTEGRATION TEST")
print("Ghost in the Machine Labs")
print("=" * 60)
print(f"Time: {datetime.now().isoformat()}")
results = {}
# Run tests
results["Operator Routing"] = await test_operator_routing()
results["Bypass Highways"] = await test_bypass_highways()
results["Message Bus"] = await test_message_bus()
results["Forum Control"] = await test_forum_control()
results["Executive Model"] = await test_executive_model()
results["Council Member"] = await test_council_member()
# Summary
print("\n" + "=" * 60)
print("SUMMARY")
print("=" * 60)
passed = sum(1 for v in results.values() if v)
total = len(results)
for test, result in results.items():
status = "βœ“ PASS" if result else "βœ— FAIL"
print(f" {test}: {status}")
print(f"\nTotal: {passed}/{total} passed")
if passed == total:
print("\nπŸŽ‰ ALL TESTS PASSED - Stack is operational!")
else:
print(f"\n⚠️ {total - passed} test(s) failed")
return passed == total
if __name__ == "__main__":
asyncio.run(main())