#!/usr/bin/env python3
"""
GEOMETRIC SUBSTRATE INFERENCE ENGINE

Run inference through translated models in the Harmonic Stack.

This is where the geometric architecture proves its value:
- Junctions act as neurons with geodesic connectivity
- Weights stored at vertices, connections follow sphere edges
- E8-aligned geometry enables efficient signal propagation

Author: Ghost in the Machine Labs
"""

import numpy as np
from typing import Dict, List, Tuple, Optional, Any, Union
from dataclasses import dataclass
import json
from pathlib import Path
import time


# =============================================================================
# ACTIVATION FUNCTIONS
# =============================================================================

def relu(x: np.ndarray) -> np.ndarray:
    """Rectified linear unit: element-wise ``max(0, x)``."""
    return np.maximum(0, x)


def gelu(x: np.ndarray) -> np.ndarray:
    """GELU activation using the tanh approximation."""
    return 0.5 * x * (1 + np.tanh(np.sqrt(2 / np.pi) * (x + 0.044715 * x**3)))


def silu(x: np.ndarray) -> np.ndarray:
    """SiLU/Swish activation."""
    return x * (1 / (1 + np.exp(-x)))


def softmax(x: np.ndarray, axis: int = -1) -> np.ndarray:
    """Softmax along *axis*; the maximum is subtracted first to avoid overflow."""
    exp_x = np.exp(x - np.max(x, axis=axis, keepdims=True))
    return exp_x / np.sum(exp_x, axis=axis, keepdims=True)


# Name -> callable lookup used by Junction.forward.
ACTIVATIONS = {
    'relu': relu,
    'gelu': gelu,
    'silu': silu,
    'swish': silu,
    'softmax': softmax,
    'none': lambda x: x,
}


# =============================================================================
# JUNCTION (Geometric Neuron)
# =============================================================================

@dataclass
class Junction:
    """
    A junction in the geometric substrate.

    This is the fundamental compute unit - equivalent to a neuron
    but positioned on a geodesic sphere vertex.
    """
    vertex_id: int
    position: np.ndarray  # 3D position on sphere
    weights: np.ndarray   # Connection weights
    bias: float
    activation: str

    def forward(self, inputs: np.ndarray) -> float:
        """Compute junction output.

        The input is zero-padded or truncated to the length of ``weights``
        before the weighted sum, so a size mismatch never raises.

        Args:
            inputs: 1-D sequence of input values.

        Returns:
            The activated weighted sum as a Python float.
        """
        n_weights = len(self.weights)
        n_inputs = len(inputs)
        if n_weights > n_inputs:
            inputs = np.pad(inputs, (0, n_weights - n_inputs))
        elif n_weights < n_inputs:
            inputs = inputs[:n_weights]

        z = np.dot(self.weights, inputs) + self.bias

        # Unknown activation names silently fall back to ReLU.
        # NOTE: the activation is applied to a single-element array, so
        # 'softmax' always yields 1.0 here.
        act_fn = ACTIVATIONS.get(self.activation, relu)
        return float(act_fn(np.array([z]))[0])


# =============================================================================
# DYSON SPHERE (Layer Container)
# =============================================================================

class DysonSphere:
    """
    A geodesic sphere containing junctions.

    Each sphere typically holds one layer of the network.
    Junctions are positioned at sphere vertices.
    """

    def __init__(self, sphere_id: int, vertices: np.ndarray):
        self.sphere_id = sphere_id
        self.vertices = vertices
        self.junctions: Dict[int, Junction] = {}
        self.layer_map: Dict[str, List[int]] = {}  # layer_name -> vertex_ids

    def add_junction(self, vertex_id: int, weights: np.ndarray,
                     bias: float = 0.0, activation: str = 'relu',
                     layer_name: str = 'default'):
        """Add a junction at a vertex.

        Registers the junction under ``vertex_id`` (replacing any junction
        already stored there) and appends the id to ``layer_map[layer_name]``.

        Raises:
            IndexError: if ``vertex_id`` is outside ``self.vertices``.
        """
        self.junctions[vertex_id] = Junction(
            vertex_id=vertex_id,
            position=self.vertices[vertex_id],
            weights=weights,
            bias=bias,
            activation=activation,
        )
        self.layer_map.setdefault(layer_name, []).append(vertex_id)

    def forward(self, inputs: np.ndarray,
                layer_name: Optional[str] = None) -> np.ndarray:
        """Forward pass through sphere junctions.

        Args:
            inputs: Input vector handed unchanged to every selected junction.
            layer_name: Layer to evaluate. When falsy or not registered in
                ``layer_map``, every junction is evaluated in ascending
                vertex-id order instead.

        Returns:
            1-D array with one output per evaluated junction.
        """
        if layer_name and layer_name in self.layer_map:
            vertex_ids = self.layer_map[layer_name]
        else:
            vertex_ids = sorted(self.junctions.keys())

        outputs = [
            self.junctions[vid].forward(inputs)
            for vid in vertex_ids
            if vid in self.junctions
        ]
        return np.array(outputs)


# =============================================================================
# SUBSTRATE ARRAY (Full Model)
# =============================================================================

class SubstrateArray:
    """
    Array of Dyson Spheres forming a complete model.

    Spheres are connected via the spine bus for inter-layer communication.
    Spine connections are recorded here; ``forward`` itself walks
    ``layer_order`` and does not consult ``spine``.
    """

    def __init__(self):
        self.spheres: Dict[int, DysonSphere] = {}
        self.spine: List[Tuple[int, int]] = []  # (from_sphere, to_sphere)
        self.layer_order: List[Tuple[int, str]] = []  # (sphere_id, layer_name)
        self.model_name: str = ""

    def add_sphere(self, sphere: DysonSphere):
        """Add a sphere to the array."""
        self.spheres[sphere.sphere_id] = sphere

    def connect_spine(self, from_sphere: int, to_sphere: int):
        """Connect two spheres via spine."""
        self.spine.append((from_sphere, to_sphere))

    def set_layer_order(self, order: List[Tuple[int, str]]):
        """Set the order of layers for forward pass."""
        self.layer_order = order

    def forward(self, inputs: np.ndarray) -> np.ndarray:
        """
        Forward pass through entire substrate.

        The output of each ``(sphere_id, layer_name)`` entry in
        ``layer_order`` becomes the input of the next one. Entries whose
        sphere id is not registered are skipped. With an empty
        ``layer_order`` the input is returned unchanged.
        """
        x = inputs
        for sphere_id, layer_name in self.layer_order:
            sphere = self.spheres.get(sphere_id)
            if sphere is not None:
                x = sphere.forward(x, layer_name)
        return x


# =============================================================================
# SUBSTRATE LOADER
# =============================================================================

def _substrate_from_dict(data: Dict[str, Any]) -> SubstrateArray:
    """Build a SubstrateArray from the decoded JSON substrate description."""
    array = SubstrateArray()
    array.model_name = data.get('model_name', 'unknown')

    # Load spheres
    for sphere_data in data.get('spheres', []):
        sphere_id = sphere_data['sphere_id']
        junctions = sphere_data.get('junctions', {})

        # Reconstruct vertices (simplified - use template).
        # The vertex table must cover every junction id, otherwise
        # add_junction raises IndexError for ids beyond the template size.
        n_junctions = sphere_data.get('num_junctions', 0)
        highest_vid = max((int(v) for v in junctions), default=-1)
        n_vertices = max(n_junctions + 10, 642, highest_vid + 1)
        vertices = np.random.randn(n_vertices, 3)
        vertices = vertices / np.linalg.norm(vertices, axis=1, keepdims=True)

        sphere = DysonSphere(sphere_id, vertices)

        # Load junctions
        for vid_str, junction_data in junctions.items():
            sphere.add_junction(
                vertex_id=int(vid_str),
                weights=np.array(junction_data['weights'], dtype=np.float32),
                bias=junction_data.get('bias', 0.0),
                activation=junction_data.get('activation', 'relu'),
                layer_name=junction_data.get('layer_name', 'default'),
            )

        array.add_sphere(sphere)

    # Set up spine connections; malformed entries are ignored.
    for conn in data.get('spine_connections', []):
        if isinstance(conn, (list, tuple)) and len(conn) == 2:
            array.connect_spine(conn[0], conn[1])

    # Infer layer order from spheres: ascending sphere id, then the order in
    # which layer names were first registered on each sphere.
    array.set_layer_order([
        (sphere_id, layer_name)
        for sphere_id in sorted(array.spheres)
        for layer_name in array.spheres[sphere_id].layer_map
    ])

    return array


def load_substrate(
    filepath: Union[str, Path],
    *,
    prefer_binary: bool = True,
) -> Union[SubstrateArray, Dict[str, Any]]:
    """
    Load a translated model substrate from JSON or binary NPZ.

    Args:
        filepath: Path to the substrate file. A sibling file with the same
            stem and a ``.npz`` suffix is treated as the binary form.
        prefer_binary: When True (default) and the ``.npz`` sibling exists,
            the binary file is read and returned as a raw dict with the keys
            ``'junctions'``, ``'metadata'`` and ``'format'``. When False the
            JSON file is always parsed.

    Returns:
        A ``SubstrateArray`` for the JSON path, or the raw dict described
        above for the binary path. The raw dict cannot be run through
        ``InferenceEngine``; pass ``prefer_binary=False`` when a runnable
        substrate is required.
    """
    filepath = Path(filepath)
    npz_path = filepath.with_suffix('.npz')

    # Try binary first
    if prefer_binary and npz_path.exists():
        # SECURITY NOTE: allow_pickle=True executes arbitrary pickled code
        # embedded in the file. Only load .npz files from trusted sources.
        # The context manager closes the underlying archive handle.
        with np.load(npz_path, allow_pickle=True) as data:
            return {
                'junctions': data['junctions'],
                'metadata': json.loads(str(data['metadata'])),
                'format': 'binary',
            }

    # Fall back to JSON
    with open(filepath, encoding='utf-8') as f:
        data = json.load(f)

    return _substrate_from_dict(data)


# =============================================================================
# INFERENCE ENGINE
# =============================================================================

class InferenceEngine:
    """
    High-level inference engine for Harmonic Stack.

    Manages loaded substrates and routes queries to appropriate models.
    """

    def __init__(self):
        self.substrates: Dict[str, SubstrateArray] = {}
        self.load_times: Dict[str, float] = {}

    def load_model(self, name: str, filepath: str) -> bool:
        """Load a substrate model.

        Returns:
            True on success; False (after printing the error) on any failure.
        """
        try:
            start = time.time()
            # Force the JSON path: the binary branch of load_substrate
            # returns a raw dict that has no forward() and would break
            # infer() and status() later on.
            substrate = load_substrate(filepath, prefer_binary=False)
            self.substrates[name] = substrate
            self.load_times[name] = time.time() - start
            return True
        except Exception as e:
            print(f"Error loading {name}: {e}")
            return False

    def unload_model(self, name: str):
        """Unload a model to free memory. Unknown names are ignored."""
        self.substrates.pop(name, None)
        self.load_times.pop(name, None)

    def infer(self, model_name: str, inputs: np.ndarray) -> np.ndarray:
        """
        Run inference on a loaded model.

        Raises:
            ValueError: if ``model_name`` has not been loaded.
        """
        if model_name not in self.substrates:
            raise ValueError(f"Model not loaded: {model_name}")

        return self.substrates[model_name].forward(inputs)

    def batch_infer(self, model_name: str,
                    batch: List[np.ndarray]) -> List[np.ndarray]:
        """Run inference on a batch of inputs."""
        return [self.infer(model_name, x) for x in batch]

    def status(self) -> Dict:
        """Get engine status.

        Returns:
            Dict with the loaded model names, total sphere and junction
            counts, and a copy of the per-model load times.
        """
        return {
            'loaded_models': list(self.substrates.keys()),
            'total_spheres': sum(
                len(s.spheres) for s in self.substrates.values()
            ),
            'total_junctions': sum(
                sum(len(sphere.junctions) for sphere in s.spheres.values())
                for s in self.substrates.values()
            ),
            # Copy so callers cannot mutate the engine's bookkeeping.
            'load_times': dict(self.load_times),
        }


# =============================================================================
# BENCHMARK
# =============================================================================

def benchmark_inference(engine: InferenceEngine, model_name: str,
                        input_size: int = 128, n_runs: int = 100) -> Dict:
    """Benchmark inference speed.

    Runs 10 untimed warmup inferences, then times ``n_runs`` inferences on
    fresh random float32 inputs of length ``input_size``.

    Raises:
        ValueError: if ``n_runs`` is not positive, or if the model is not
            loaded (raised by ``engine.infer``).
    """
    if n_runs <= 0:
        raise ValueError(f"n_runs must be positive, got {n_runs}")

    # Generate random inputs
    inputs = [np.random.randn(input_size).astype(np.float32)
              for _ in range(n_runs)]

    # Warmup
    for _ in range(10):
        engine.infer(model_name, inputs[0])

    # Timed runs; perf_counter is monotonic and high-resolution.
    start = time.perf_counter()
    for inp in inputs:
        engine.infer(model_name, inp)
    elapsed = time.perf_counter() - start

    return {
        'model': model_name,
        'input_size': input_size,
        'n_runs': n_runs,
        'total_time': elapsed,
        'avg_time_ms': (elapsed / n_runs) * 1000,
        # Guard against a zero reading from a coarse clock.
        'throughput': n_runs / elapsed if elapsed > 0 else float('inf'),
    }


# =============================================================================
# MAIN
# =============================================================================

def main():
    """Demo entry point: load the test substrate if present and exercise it.

    Returns:
        The ``InferenceEngine`` instance that was created.
    """
    print("=" * 60)
    print("GEOMETRIC SUBSTRATE INFERENCE ENGINE")
    print("Ghost in the Machine Labs")
    print("=" * 60)

    engine = InferenceEngine()

    # Try to load test substrate
    test_substrate = Path('test_model_substrate.json')

    if test_substrate.exists():
        print("\nLoading test substrate...")
        loaded = engine.load_model('test', str(test_substrate))
        if loaded:
            print(f" Loaded in {engine.load_times['test']:.3f}s")

        # Show status
        status = engine.status()
        print("\nEngine status:")
        print(f" Loaded models: {status['loaded_models']}")
        print(f" Total spheres: {status['total_spheres']}")
        print(f" Total junctions: {status['total_junctions']}")

        # Only run inference when the load succeeded; otherwise infer()
        # would raise "Model not loaded".
        if loaded:
            # Test inference
            print("\nTest inference:")
            test_input = np.random.randn(64).astype(np.float32)
            output = engine.infer('test', test_input)
            print(f" Input shape: {test_input.shape}")
            print(f" Output shape: {output.shape}")
            print(f" Output range: [{output.min():.3f}, {output.max():.3f}]")

            # Benchmark
            print("\nBenchmark:")
            results = benchmark_inference(engine, 'test',
                                          input_size=64, n_runs=100)
            print(f" Avg time: {results['avg_time_ms']:.2f} ms")
            print(f" Throughput: {results['throughput']:.0f} inferences/sec")
    else:
        print("\nNo test substrate found.")
        print("Run harmonic_stack_pipeline.py first to create one.")

    print("\n" + "=" * 60)
    print("INFERENCE ENGINE READY")
    print("=" * 60)

    return engine


if __name__ == "__main__":
    main()