| | |
| | """ |
| | INTEGRATION TEST - Harmonic Stack |
| | Ghost in the Machine Labs |
| | |
| | Tests the full message flow: |
| | User → Executive → Directors → Coordinators → Back up |
| | """ |
| | import asyncio |
| | import json |
| | from datetime import datetime |
| |
|
| | import httpx |
| | import redis.asyncio as redis |
| |
|
# Base URL of the Ollama HTTP API; the model tests POST to f"{OLLAMA_URL}/api/generate".
OLLAMA_URL = "http://localhost:11434"
# Connection URL passed to redis.from_url() by the pub/sub tests.
REDIS_URL = "redis://localhost:6379"
| |
|
| |
|
async def test_operator_routing():
    """Check that the ``operator`` model answers with a decodable JSON routing decision.

    Posts a routing prompt to the Ollama ``/api/generate`` endpoint and tries
    to decode the span from the first ``{`` to the last ``}`` of the reply
    text as JSON.

    Returns:
        bool: True when a JSON object was decoded from the reply; False on a
        non-200 status, when the reply holds no brace-delimited span, or when
        that span is not valid JSON.
    """
    print("\n" + "=" * 60)
    print("TEST 1: Operator Routing")
    print("=" * 60)

    async with httpx.AsyncClient(timeout=60.0) as http:
        prompt = """Route this message to the appropriate node.

MESSAGE: Write a Python function to calculate fibonacci numbers

CONTEXT: {"user": "joe", "priority": "normal"}

AVAILABLE NODES:
- executive: High-level planning, user intent
- technical_director: Code, architecture, systems
- creative_director: Writing, design, communication
- research_director: Investigation, analysis, learning
- operations_director: Process, scheduling, resources
- council: Ethical deliberation (triggers Forum)

Respond with JSON routing decision."""

        resp = await http.post(
            f"{OLLAMA_URL}/api/generate",
            json={"model": "operator", "prompt": prompt, "stream": False},
        )

        if resp.status_code == 200:
            # A null "response" field is treated like an empty reply.
            result = resp.json().get("response") or ""
            print(f"Operator response:\n{result}")

            # The model may wrap the JSON in prose, so decode only the
            # outermost brace-delimited span.
            start = result.find("{")
            end = result.rfind("}") + 1
            if start >= 0 and end > start:
                try:
                    decision = json.loads(result[start:end])
                except json.JSONDecodeError as exc:
                    # Only a decode failure is expected here; anything else
                    # (including task cancellation) must propagate.
                    print(f"  Could not parse routing JSON: {exc}")
                else:
                    # NOTE(review): the leading glyphs in these messages look
                    # mis-encoded (probably a check mark / cross) - confirm
                    # against the original file encoding.
                    print(f"\nβ Routing decision: {decision.get('route_to')}")
                    print(f" Priority: {decision.get('priority')}")
                    print(f" Reason: {decision.get('reason')}")
                    return True

    print("β Failed to get routing decision")
    return False
| |
|
| |
|
async def test_bypass_highways():
    """Publish a health message on ``bypass:health`` and check it is received.

    Subscribes to the channel, publishes one JSON message through the same
    Redis client, then reads from the subscription, skipping the initial
    ``subscribe`` confirmation if it arrives first.

    Returns:
        bool: True when a ``message`` event is read back, False when nothing
        (or only the subscribe confirmation) arrives within the timeouts.
    """
    print("\n" + "=" * 60)
    print("TEST 2: Bypass Highways")
    print("=" * 60)

    r = redis.from_url(REDIS_URL)
    try:
        pubsub = r.pubsub()
        await pubsub.subscribe("bypass:health")

        test_msg = json.dumps({
            "msg_id": f"test-{datetime.now().strftime('%H%M%S')}",
            "from_node": "integration_test",
            "content": {"status": "testing", "load": 0.1},
            "timestamp": datetime.now().isoformat(),
        })

        await r.publish("bypass:health", test_msg)
        print("β Published to bypass:health")

        try:
            msg = await asyncio.wait_for(pubsub.get_message(timeout=2.0), timeout=3.0)
            # The first event is normally the subscribe confirmation; read once
            # more to reach the published payload.
            if msg and msg["type"] == "subscribe":
                msg = await asyncio.wait_for(pubsub.get_message(timeout=2.0), timeout=3.0)
        except asyncio.TimeoutError:
            # A stalled read is reported as "nothing received" rather than
            # aborting the test.
            msg = None

        if msg and msg["type"] == "message":
            print("β Received on bypass:health")
            return True

        print("β No message received")
        return False
    finally:
        # Close the client on every path, including when subscribe/publish raise.
        await r.close()
| |
|
| |
|
async def test_message_bus():
    """Publish a task on the technical director's channel and check it is received.

    Subscribes to ``msgbus:director:technical_director``, publishes one JSON
    task through the same Redis client, then reads from the subscription,
    skipping the initial ``subscribe`` confirmation if it arrives first.

    Returns:
        bool: True when a ``message`` event is read back, False when nothing
        (or only the subscribe confirmation) arrives within the timeouts.
    """
    print("\n" + "=" * 60)
    print("TEST 3: Message Bus (Director Channel)")
    print("=" * 60)

    r = redis.from_url(REDIS_URL)
    try:
        pubsub = r.pubsub()

        channel = "msgbus:director:technical_director"
        await pubsub.subscribe(channel)

        task = json.dumps({
            "type": "task",
            "from": "integration_test",
            "task": {
                "task_id": f"test-{datetime.now().strftime('%H%M%S')}",
                "description": "Test task for integration",
            },
            "timestamp": datetime.now().isoformat(),
        })

        await r.publish(channel, task)
        print(f"β Published task to {channel}")

        try:
            msg = await asyncio.wait_for(pubsub.get_message(timeout=2.0), timeout=3.0)
            # The first event is normally the subscribe confirmation; read once
            # more to reach the published task.
            if msg and msg["type"] == "subscribe":
                msg = await asyncio.wait_for(pubsub.get_message(timeout=2.0), timeout=3.0)
        except asyncio.TimeoutError:
            # A stalled read is reported as "nothing received" rather than
            # aborting the test.
            msg = None

        if msg and msg["type"] == "message":
            print("β Task received on message bus")
            return True

        print("β No task received")
        return False
    finally:
        # Close the client on every path, including when subscribe/publish raise.
        await r.close()
| |
|
| |
|
async def test_forum_control():
    """Publish a ``convene`` command on ``forum:control`` and check it is received.

    Subscribes to the channel, publishes one JSON command through the same
    Redis client, then reads from the subscription, skipping the initial
    ``subscribe`` confirmation if it arrives first.

    Returns:
        bool: True when a ``message`` event is read back, False when nothing
        (or only the subscribe confirmation) arrives within the timeouts.
    """
    print("\n" + "=" * 60)
    print("TEST 4: Forum Bus (Council Control)")
    print("=" * 60)

    r = redis.from_url(REDIS_URL)
    try:
        pubsub = r.pubsub()
        await pubsub.subscribe("forum:control")

        cmd = json.dumps({
            "action": "convene",
            "question": "Is this integration test ethical?",
            "timestamp": datetime.now().isoformat(),
        })

        await r.publish("forum:control", cmd)
        print("β Published convene command")

        try:
            msg = await asyncio.wait_for(pubsub.get_message(timeout=2.0), timeout=3.0)
            # The first event is normally the subscribe confirmation; read once
            # more to reach the published command.
            if msg and msg["type"] == "subscribe":
                msg = await asyncio.wait_for(pubsub.get_message(timeout=2.0), timeout=3.0)
        except asyncio.TimeoutError:
            # A stalled read is reported as "nothing received" rather than
            # aborting the test.
            msg = None

        if msg and msg["type"] == "message":
            print("β Forum control received command")
            return True

        print("β No command received")
        return False
    finally:
        # Close the client on every path, including when subscribe/publish raise.
        await r.close()
| |
|
| |
|
async def test_executive_model():
    """Send one planning prompt to the ``executive`` model and report whether it answered.

    Returns:
        bool: True when the Ollama endpoint replies with HTTP 200, else False.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("TEST 5: Executive Model")
    print(rule)

    payload = {
        "model": "executive",
        "prompt": "A user wants help writing a Python script. Create a brief plan.",
        "stream": False,
    }

    async with httpx.AsyncClient(timeout=60.0) as client:
        reply = await client.post(f"{OLLAMA_URL}/api/generate", json=payload)
        succeeded = reply.status_code == 200
        if succeeded:
            text = reply.json().get("response", "")
            # Only the first 500 characters are echoed to keep the log short.
            print(f"Executive response:\n{text[:500]}...")
            print("\nβ Executive model responding")

    if not succeeded:
        print("β Executive model failed")
    return succeeded
| |
|
| |
|
async def test_council_member():
    """Ask the ``voltaire`` council model one question and report whether it answered.

    Returns:
        bool: True when the Ollama endpoint replies with HTTP 200, else False.
    """
    for line in ("\n" + "=" * 60, "TEST 6: Ethics Council Member (Voltaire)", "=" * 60):
        print(line)

    endpoint = f"{OLLAMA_URL}/api/generate"
    question = "Should AI be used to automate jobs without worker protections?"

    async with httpx.AsyncClient(timeout=60.0) as client:
        reply = await client.post(
            endpoint,
            json={"model": "voltaire", "prompt": question, "stream": False},
        )

        if reply.status_code != 200:
            print("β Council member failed")
            return False

        answer = reply.json().get("response", "")
        # Only the first 500 characters are echoed to keep the log short.
        print(f"Voltaire responds:\n{answer[:500]}...")
        print("\nβ Council member responding")
        return True
| |
|
| |
|
async def main():
    """Run every integration test in sequence and print a pass/fail summary.

    A test that raises is recorded as failed instead of aborting the run, so
    the summary is always printed even when a backing service is unreachable.

    Returns:
        bool: True only when every test returned a truthy result.
    """
    print("=" * 60)
    print("HARMONIC STACK INTEGRATION TEST")
    print("Ghost in the Machine Labs")
    print("=" * 60)
    print(f"Time: {datetime.now().isoformat()}")

    # Label -> coroutine function, in execution order.
    tests = (
        ("Operator Routing", test_operator_routing),
        ("Bypass Highways", test_bypass_highways),
        ("Message Bus", test_message_bus),
        ("Forum Control", test_forum_control),
        ("Executive Model", test_executive_model),
        ("Council Member", test_council_member),
    )

    results = {}
    for label, test in tests:
        try:
            results[label] = await test()
        except Exception as exc:
            # Top-level boundary: report the error and keep going so the
            # remaining tests still run. Cancellation is a BaseException and
            # still propagates.
            print(f"ERROR: {label} raised {type(exc).__name__}: {exc}")
            results[label] = False

    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)

    passed = sum(1 for v in results.values() if v)
    total = len(results)

    # NOTE(review): the status glyphs in the strings below look mis-encoded
    # (probably check mark / cross / emoji) - confirm against the original
    # file encoding before changing them.
    for test, result in results.items():
        status = "β PASS" if result else "β FAIL"
        print(f" {test}: {status}")

    print(f"\nTotal: {passed}/{total} passed")

    if passed == total:
        print("\nπ ALL TESTS PASSED - Stack is operational!")
    else:
        print(f"\nβ οΈ {total - passed} test(s) failed")

    return passed == total
| |
|
| |
|
if __name__ == "__main__":
    # Propagate the overall result as the process exit status (0 = all tests
    # passed, 1 = at least one failure) so shells and CI can detect failures.
    raise SystemExit(0 if asyncio.run(main()) else 1)
| |
|