LovingGraceTech's picture
Add release: e2e_test.py
8e57b1d verified
#!/usr/bin/env python3
"""
END-TO-END TEST - Harmonic Stack
Ghost in the Machine Labs
Full flow: User → Operator → Director → Coordinator → Result
"""
import asyncio
import json
from datetime import datetime
import httpx
import redis.asyncio as redis
OLLAMA_URL = "http://localhost:11434"
REDIS_URL = "redis://localhost:6379"
async def e2e_test():
"""Run full end-to-end test"""
print("=" * 60)
print("HARMONIC STACK END-TO-END TEST")
print("Ghost in the Machine Labs")
print("=" * 60)
print(f"Time: {datetime.now().isoformat()}\n")
r = redis.from_url(REDIS_URL)
http = httpx.AsyncClient(timeout=120.0)
# User request
user_request = "Write a Python function to calculate factorial"
print(f"USER REQUEST: {user_request}\n")
# STEP 1: Operator routes the request
print("-" * 60)
print("STEP 1: OPERATOR ROUTING")
print("-" * 60)
route_prompt = f"""Route this message to the appropriate node.
MESSAGE: {user_request}
AVAILABLE NODES:
- executive: High-level planning
- technical_director: Code, architecture
- creative_director: Writing, design
- research_director: Investigation, analysis
- operations_director: Process, scheduling
- council: Ethical deliberation
Respond with JSON: {{"route_to": "node", "priority": 1-5, "reason": "why"}}"""
resp = await http.post(
f"{OLLAMA_URL}/api/generate",
json={"model": "operator", "prompt": route_prompt, "stream": False}
)
route_result = resp.json().get("response", "")
print(f"Operator: {route_result[:200]}")
# Parse routing decision
try:
start = route_result.find("{")
end = route_result.rfind("}") + 1
decision = json.loads(route_result[start:end])
target = decision.get("route_to", "technical_director")
print(f"\n→ Routed to: {target}")
except:
target = "technical_director"
print(f"\n→ Default route: {target}")
# STEP 2: Director receives and delegates to Coordinator
print("\n" + "-" * 60)
print("STEP 2: DIRECTOR → COORDINATOR")
print("-" * 60)
# Subscribe to coordinator result channel
pubsub = r.pubsub()
await pubsub.subscribe("msgbus:director:technical_director")
# Map director to coordinator domain
domain_map = {
"technical_director": "code",
"creative_director": "writing",
"research_director": "research",
"operations_director": "system"
}
coordinator_domain = domain_map.get(target, "code")
# Send task to coordinator
task = {
"type": "task",
"task_id": f"e2e-{datetime.now().strftime('%H%M%S')}",
"from": "technical_director",
"specification": user_request,
"constraints": ["Clean code", "Include docstring"],
"quality_criteria": ["Working code", "Proper error handling"],
"context": {"user": "joe", "priority": "normal"},
"priority": 2
}
await r.publish(f"msgbus:coordinator:{coordinator_domain}", json.dumps(task))
print(f"Task {task['task_id']} sent to coordinator_{coordinator_domain}")
# STEP 3: Wait for Coordinator result
print("\n" + "-" * 60)
print("STEP 3: COORDINATOR EXECUTION")
print("-" * 60)
print("Waiting for result...")
result = None
timeout = 60 # seconds
start_time = datetime.now()
while (datetime.now() - start_time).total_seconds() < timeout:
msg = await pubsub.get_message(timeout=1.0)
if msg and msg["type"] == "message":
data = msg["data"]
if isinstance(data, bytes):
data = data.decode()
try:
result = json.loads(data)
if result.get("type") == "task_result":
break
except:
pass
if result:
print(f"\n✓ Result received!")
print(f" Task ID: {result.get('task_id')}")
print(f" From: {result.get('from')}")
print(f" Status: {result.get('status')}")
print(f" Quality: {result.get('quality')}")
print(f" Time: {result.get('ms')}ms")
print(f"\n Output preview:")
output = result.get("output", "")
print(f" {output[:500]}..." if len(output) > 500 else f" {output}")
else:
print("\n✗ Timeout waiting for result")
print(" (Is coordinators.py running?)")
# Cleanup
await pubsub.close()
await r.aclose()
await http.aclose()
# Summary
print("\n" + "=" * 60)
print("SUMMARY")
print("=" * 60)
print(f" User Request: {user_request}")
print(f" Routed To: {target}")
print(f" Coordinator: {coordinator_domain}")
print(f" Result: {'SUCCESS' if result else 'TIMEOUT'}")
if result:
print("\n🎉 END-TO-END TEST PASSED!")
else:
print("\n⚠️ Test incomplete - start coordinators.py first")
return result is not None
if __name__ == "__main__":
asyncio.run(e2e_test())