#!/usr/bin/env python3
"""
END-TO-END TEST - Harmonic Stack
Ghost in the Machine Labs

Full flow: User → Operator → Director → Coordinator → Result
"""

import asyncio
import json
from datetime import datetime

import httpx
import redis.asyncio as redis

OLLAMA_URL = "http://localhost:11434"
REDIS_URL = "redis://localhost:6379"

# Route used when the operator's reply cannot be parsed into a decision.
_DEFAULT_ROUTE = "technical_director"

# Director node -> coordinator domain that executes its tasks.
_DOMAIN_MAP = {
    "technical_director": "code",
    "creative_director": "writing",
    "research_director": "research",
    "operations_director": "system",
}


def _parse_route_target(route_text):
    """Return the ``route_to`` node named in the operator's reply, or None.

    The reply is free-form model output, so the JSON object is located by
    taking everything from the first ``{`` to the last ``}``.
    """
    start = route_text.find("{")
    end = route_text.rfind("}") + 1
    try:
        decision = json.loads(route_text[start:end])
    except ValueError:  # includes json.JSONDecodeError
        return None
    if not isinstance(decision, dict):
        return None
    target = decision.get("route_to")
    if isinstance(target, str) and target:
        return target
    return None


async def _wait_for_task_result(pubsub, timeout):
    """Poll *pubsub* until a ``task_result`` message arrives or *timeout* expires.

    Returns the decoded result dict, or None on timeout. Messages that are
    not valid JSON objects of type ``task_result`` are skipped, so they can
    never be mistaken for a result.
    """
    # NOTE(review): any task_result on the channel is accepted; the task_id
    # is not compared because the coordinator's reply contract is not
    # visible here.
    loop = asyncio.get_running_loop()
    # The loop clock is monotonic, so wall-clock adjustments cannot shorten
    # or extend the wait.
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        msg = await pubsub.get_message(timeout=1.0)
        if not msg or msg["type"] != "message":
            continue
        data = msg["data"]
        try:
            if isinstance(data, bytes):
                data = data.decode()
            payload = json.loads(data)
        except (ValueError, TypeError):
            # Undecodable or non-JSON traffic on the channel; keep waiting.
            continue
        if isinstance(payload, dict) and payload.get("type") == "task_result":
            return payload
    return None


async def e2e_test():
    """Run full end-to-end test.

    Asks the ``operator`` model (via Ollama) to route a fixed user request,
    publishes a task on the matching coordinator channel in Redis, then waits
    up to 60 seconds for a ``task_result`` message on the technical
    director's channel.

    Returns:
        True if a task result was received, False on timeout.
    """
    print("=" * 60)
    print("HARMONIC STACK END-TO-END TEST")
    print("Ghost in the Machine Labs")
    print("=" * 60)
    print(f"Time: {datetime.now().isoformat()}\n")

    r = redis.from_url(REDIS_URL)
    http = httpx.AsyncClient(timeout=120.0)
    pubsub = None

    try:
        # User request
        user_request = "Write a Python function to calculate factorial"
        print(f"USER REQUEST: {user_request}\n")

        # STEP 1: Operator routes the request
        print("-" * 60)
        print("STEP 1: OPERATOR ROUTING")
        print("-" * 60)

        route_prompt = f"""Route this message to the appropriate node.

MESSAGE: {user_request}

AVAILABLE NODES:
- executive: High-level planning
- technical_director: Code, architecture
- creative_director: Writing, design
- research_director: Investigation, analysis
- operations_director: Process, scheduling
- council: Ethical deliberation

Respond with JSON: {{"route_to": "node", "priority": 1-5, "reason": "why"}}"""

        resp = await http.post(
            f"{OLLAMA_URL}/api/generate",
            json={"model": "operator", "prompt": route_prompt, "stream": False},
        )
        route_result = resp.json().get("response", "")
        print(f"Operator: {route_result[:200]}")

        # Parse routing decision, falling back to the default director when
        # the model's reply contains no usable JSON decision.
        target = _parse_route_target(route_result)
        if target is not None:
            print(f"\n→ Routed to: {target}")
        else:
            target = _DEFAULT_ROUTE
            print(f"\n→ Default route: {target}")

        # STEP 2: Director receives and delegates to Coordinator
        print("\n" + "-" * 60)
        print("STEP 2: DIRECTOR → COORDINATOR")
        print("-" * 60)

        # Subscribe to coordinator result channel BEFORE publishing the task,
        # so a fast reply cannot be missed.
        pubsub = r.pubsub()
        await pubsub.subscribe("msgbus:director:technical_director")

        # Map director to coordinator domain
        coordinator_domain = _DOMAIN_MAP.get(target, "code")

        # Send task to coordinator
        task = {
            "type": "task",
            "task_id": f"e2e-{datetime.now().strftime('%H%M%S')}",
            "from": "technical_director",
            "specification": user_request,
            "constraints": ["Clean code", "Include docstring"],
            "quality_criteria": ["Working code", "Proper error handling"],
            "context": {"user": "joe", "priority": "normal"},
            "priority": 2,
        }

        await r.publish(f"msgbus:coordinator:{coordinator_domain}", json.dumps(task))
        print(f"Task {task['task_id']} sent to coordinator_{coordinator_domain}")

        # STEP 3: Wait for Coordinator result
        print("\n" + "-" * 60)
        print("STEP 3: COORDINATOR EXECUTION")
        print("-" * 60)
        print("Waiting for result...")

        result = await _wait_for_task_result(pubsub, timeout=60)

        if result:
            print("\n✓ Result received!")
            print(f" Task ID: {result.get('task_id')}")
            print(f" From: {result.get('from')}")
            print(f" Status: {result.get('status')}")
            print(f" Quality: {result.get('quality')}")
            print(f" Time: {result.get('ms')}ms")
            print("\n Output preview:")
            # Coerce so a null or non-string output cannot break the preview.
            output = str(result.get("output") or "")
            print(f" {output[:500]}..." if len(output) > 500 else f" {output}")
        else:
            print("\n✗ Timeout waiting for result")
            print(" (Is coordinators.py running?)")
    finally:
        # Cleanup runs even when the HTTP call or a Redis operation raises,
        # so connections are never leaked.
        if pubsub is not None:
            await pubsub.close()
        await r.aclose()
        await http.aclose()

    # Summary
    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f" User Request: {user_request}")
    print(f" Routed To: {target}")
    print(f" Coordinator: {coordinator_domain}")
    print(f" Result: {'SUCCESS' if result else 'TIMEOUT'}")

    if result:
        print("\n🎉 END-TO-END TEST PASSED!")
    else:
        print("\n⚠️ Test incomplete - start coordinators.py first")

    return result is not None


if __name__ == "__main__":
    asyncio.run(e2e_test())