#!/usr/bin/env python3 """ INTEGRATION TEST - Harmonic Stack Ghost in the Machine Labs Tests the full message flow: User → Executive → Directors → Coordinators → Back up """ import asyncio import json from datetime import datetime import httpx import redis.asyncio as redis OLLAMA_URL = "http://localhost:11434" REDIS_URL = "redis://localhost:6379" async def test_operator_routing(): """Test Operator routing decision""" print("\n" + "=" * 60) print("TEST 1: Operator Routing") print("=" * 60) async with httpx.AsyncClient(timeout=60.0) as http: prompt = """Route this message to the appropriate node. MESSAGE: Write a Python function to calculate fibonacci numbers CONTEXT: {"user": "joe", "priority": "normal"} AVAILABLE NODES: - executive: High-level planning, user intent - technical_director: Code, architecture, systems - creative_director: Writing, design, communication - research_director: Investigation, analysis, learning - operations_director: Process, scheduling, resources - council: Ethical deliberation (triggers Forum) Respond with JSON routing decision.""" resp = await http.post( f"{OLLAMA_URL}/api/generate", json={"model": "operator", "prompt": prompt, "stream": False} ) if resp.status_code == 200: result = resp.json().get("response", "") print(f"Operator response:\n{result}") # Try to parse JSON try: start = result.find("{") end = result.rfind("}") + 1 if start >= 0 and end > start: decision = json.loads(result[start:end]) print(f"\n✓ Routing decision: {decision.get('route_to')}") print(f" Priority: {decision.get('priority')}") print(f" Reason: {decision.get('reason')}") return True except: pass print("✗ Failed to get routing decision") return False async def test_bypass_highways(): """Test bypass highway communication""" print("\n" + "=" * 60) print("TEST 2: Bypass Highways") print("=" * 60) r = redis.from_url(REDIS_URL) pubsub = r.pubsub() # Subscribe to health highway await pubsub.subscribe("bypass:health") # Publish a health ping test_msg = json.dumps({ "msg_id": f"test-{datetime.now().strftime('%H%M%S')}", "from_node": "integration_test", "content": {"status": "testing", "load": 0.1}, "timestamp": datetime.now().isoformat() }) await r.publish("bypass:health", test_msg) print(f"✓ Published to bypass:health") # Check message received msg = await asyncio.wait_for(pubsub.get_message(timeout=2.0), timeout=3.0) if msg and msg["type"] == "subscribe": msg = await asyncio.wait_for(pubsub.get_message(timeout=2.0), timeout=3.0) if msg and msg["type"] == "message": print(f"✓ Received on bypass:health") await r.close() return True print("✗ No message received") await r.close() return False async def test_message_bus(): """Test message bus director communication""" print("\n" + "=" * 60) print("TEST 3: Message Bus (Director Channel)") print("=" * 60) r = redis.from_url(REDIS_URL) pubsub = r.pubsub() # Subscribe to technical director channel channel = "msgbus:director:technical_director" await pubsub.subscribe(channel) # Publish a task task = json.dumps({ "type": "task", "from": "integration_test", "task": { "task_id": f"test-{datetime.now().strftime('%H%M%S')}", "description": "Test task for integration" }, "timestamp": datetime.now().isoformat() }) await r.publish(channel, task) print(f"✓ Published task to {channel}") # Verify msg = await asyncio.wait_for(pubsub.get_message(timeout=2.0), timeout=3.0) if msg and msg["type"] == "subscribe": msg = await asyncio.wait_for(pubsub.get_message(timeout=2.0), timeout=3.0) if msg and msg["type"] == "message": print(f"✓ Task received on message bus") await r.close() return True print("✗ No task received") await r.close() return False async def test_forum_control(): """Test Forum Bus control channel""" print("\n" + "=" * 60) print("TEST 4: Forum Bus (Council Control)") print("=" * 60) r = redis.from_url(REDIS_URL) pubsub = r.pubsub() # Subscribe to forum control await pubsub.subscribe("forum:control") # Send convene command cmd = json.dumps({ "action": "convene", "question": "Is this integration test ethical?", "timestamp": datetime.now().isoformat() }) await r.publish("forum:control", cmd) print(f"✓ Published convene command") # Verify msg = await asyncio.wait_for(pubsub.get_message(timeout=2.0), timeout=3.0) if msg and msg["type"] == "subscribe": msg = await asyncio.wait_for(pubsub.get_message(timeout=2.0), timeout=3.0) if msg and msg["type"] == "message": print(f"✓ Forum control received command") await r.close() return True print("✗ No command received") await r.close() return False async def test_executive_model(): """Test Executive model response""" print("\n" + "=" * 60) print("TEST 5: Executive Model") print("=" * 60) async with httpx.AsyncClient(timeout=60.0) as http: resp = await http.post( f"{OLLAMA_URL}/api/generate", json={ "model": "executive", "prompt": "A user wants help writing a Python script. Create a brief plan.", "stream": False } ) if resp.status_code == 200: result = resp.json().get("response", "") print(f"Executive response:\n{result[:500]}...") print(f"\n✓ Executive model responding") return True print("✗ Executive model failed") return False async def test_council_member(): """Test a council member""" print("\n" + "=" * 60) print("TEST 6: Ethics Council Member (Voltaire)") print("=" * 60) async with httpx.AsyncClient(timeout=60.0) as http: resp = await http.post( f"{OLLAMA_URL}/api/generate", json={ "model": "voltaire", "prompt": "Should AI be used to automate jobs without worker protections?", "stream": False } ) if resp.status_code == 200: result = resp.json().get("response", "") print(f"Voltaire responds:\n{result[:500]}...") print(f"\n✓ Council member responding") return True print("✗ Council member failed") return False async def main(): """Run all integration tests""" print("=" * 60) print("HARMONIC STACK INTEGRATION TEST") print("Ghost in the Machine Labs") print("=" * 60) print(f"Time: {datetime.now().isoformat()}") results = {} # Run tests results["Operator Routing"] = await test_operator_routing() results["Bypass Highways"] = await test_bypass_highways() results["Message Bus"] = await test_message_bus() results["Forum Control"] = await test_forum_control() results["Executive Model"] = await test_executive_model() results["Council Member"] = await test_council_member() # Summary print("\n" + "=" * 60) print("SUMMARY") print("=" * 60) passed = sum(1 for v in results.values() if v) total = len(results) for test, result in results.items(): status = "✓ PASS" if result else "✗ FAIL" print(f" {test}: {status}") print(f"\nTotal: {passed}/{total} passed") if passed == total: print("\n🎉 ALL TESTS PASSED - Stack is operational!") else: print(f"\n⚠️ {total - passed} test(s) failed") return passed == total if __name__ == "__main__": asyncio.run(main())