harmonic-stack-v1 / parallel_core /inference_engine.py
LovingGraceTech's picture
Add parallel core: inference_engine.py
0ef300d verified
#!/usr/bin/env python3
"""
GEOMETRIC SUBSTRATE INFERENCE ENGINE
Run inference through translated models in the Harmonic Stack.
This is where the geometric architecture proves its value:
- Junctions act as neurons with geodesic connectivity
- Weights stored at vertices, connections follow sphere edges
- E8-aligned geometry enables efficient signal propagation
Author: Ghost in the Machine Labs
"""
import numpy as np
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass
import json
from pathlib import Path
import time
# =============================================================================
# ACTIVATION FUNCTIONS
# =============================================================================
def relu(x: np.ndarray) -> np.ndarray:
return np.maximum(0, x)
def gelu(x: np.ndarray) -> np.ndarray:
return 0.5 * x * (1 + np.tanh(np.sqrt(2 / np.pi) * (x + 0.044715 * x**3)))
def silu(x: np.ndarray) -> np.ndarray:
"""SiLU/Swish activation."""
return x * (1 / (1 + np.exp(-x)))
def softmax(x: np.ndarray, axis: int = -1) -> np.ndarray:
exp_x = np.exp(x - np.max(x, axis=axis, keepdims=True))
return exp_x / np.sum(exp_x, axis=axis, keepdims=True)
ACTIVATIONS = {
'relu': relu,
'gelu': gelu,
'silu': silu,
'swish': silu,
'softmax': softmax,
'none': lambda x: x,
}
# =============================================================================
# JUNCTION (Geometric Neuron)
# =============================================================================
@dataclass
class Junction:
"""
A junction in the geometric substrate.
This is the fundamental compute unit - equivalent to a neuron
but positioned on a geodesic sphere vertex.
"""
vertex_id: int
position: np.ndarray # 3D position on sphere
weights: np.ndarray # Connection weights
bias: float
activation: str
def forward(self, inputs: np.ndarray) -> float:
"""Compute junction output."""
# Weighted sum
if len(self.weights) != len(inputs):
# Pad or truncate
if len(self.weights) > len(inputs):
inputs = np.pad(inputs, (0, len(self.weights) - len(inputs)))
else:
inputs = inputs[:len(self.weights)]
z = np.dot(self.weights, inputs) + self.bias
# Activation
act_fn = ACTIVATIONS.get(self.activation, relu)
return float(act_fn(np.array([z]))[0])
# =============================================================================
# DYSON SPHERE (Layer Container)
# =============================================================================
class DysonSphere:
"""
A geodesic sphere containing junctions.
Each sphere typically holds one layer of the network.
Junctions are positioned at sphere vertices.
"""
def __init__(self, sphere_id: int, vertices: np.ndarray):
self.sphere_id = sphere_id
self.vertices = vertices
self.junctions: Dict[int, Junction] = {}
self.layer_map: Dict[str, List[int]] = {} # layer_name -> vertex_ids
def add_junction(self, vertex_id: int, weights: np.ndarray,
bias: float = 0.0, activation: str = 'relu',
layer_name: str = 'default'):
"""Add a junction at a vertex."""
self.junctions[vertex_id] = Junction(
vertex_id=vertex_id,
position=self.vertices[vertex_id],
weights=weights,
bias=bias,
activation=activation,
)
if layer_name not in self.layer_map:
self.layer_map[layer_name] = []
self.layer_map[layer_name].append(vertex_id)
def forward(self, inputs: np.ndarray, layer_name: str = None) -> np.ndarray:
"""Forward pass through sphere junctions."""
if layer_name and layer_name in self.layer_map:
vertex_ids = self.layer_map[layer_name]
else:
vertex_ids = sorted(self.junctions.keys())
outputs = []
for vid in vertex_ids:
if vid in self.junctions:
out = self.junctions[vid].forward(inputs)
outputs.append(out)
return np.array(outputs)
# =============================================================================
# SUBSTRATE ARRAY (Full Model)
# =============================================================================
class SubstrateArray:
"""
Array of Dyson Spheres forming a complete model.
Spheres are connected via the spine bus for
inter-layer communication.
"""
def __init__(self):
self.spheres: Dict[int, DysonSphere] = {}
self.spine: List[Tuple[int, int]] = [] # (from_sphere, to_sphere)
self.layer_order: List[Tuple[int, str]] = [] # (sphere_id, layer_name)
self.model_name: str = ""
def add_sphere(self, sphere: DysonSphere):
"""Add a sphere to the array."""
self.spheres[sphere.sphere_id] = sphere
def connect_spine(self, from_sphere: int, to_sphere: int):
"""Connect two spheres via spine."""
self.spine.append((from_sphere, to_sphere))
def set_layer_order(self, order: List[Tuple[int, str]]):
"""Set the order of layers for forward pass."""
self.layer_order = order
def forward(self, inputs: np.ndarray) -> np.ndarray:
"""
Forward pass through entire substrate.
Signals flow through spheres in layer_order,
with spine connections routing between spheres.
"""
x = inputs
for sphere_id, layer_name in self.layer_order:
if sphere_id in self.spheres:
sphere = self.spheres[sphere_id]
x = sphere.forward(x, layer_name)
return x
# =============================================================================
# SUBSTRATE LOADER
# =============================================================================
def load_substrate(filepath: str) -> SubstrateArray:
"""
Load a translated model substrate from JSON or binary NPZ.
"""
import numpy as np
from pathlib import Path
filepath = Path(filepath)
npz_path = filepath.with_suffix('.npz')
# Try binary first
if npz_path.exists():
data = np.load(npz_path, allow_pickle=True)
return {
'junctions': data['junctions'],
'metadata': json.loads(str(data['metadata'])),
'format': 'binary'
}
# Fall back to JSON
with open(filepath) as f:
data = json.load(f)
array = SubstrateArray()
array.model_name = data.get('model_name', 'unknown')
# Load spheres
for sphere_data in data.get('spheres', []):
sphere_id = sphere_data['sphere_id']
# Reconstruct vertices (simplified - use template)
n_junctions = sphere_data.get('num_junctions', 0)
vertices = np.random.randn(max(n_junctions + 10, 642), 3)
vertices = vertices / np.linalg.norm(vertices, axis=1, keepdims=True)
sphere = DysonSphere(sphere_id, vertices)
# Load junctions
for vid_str, junction_data in sphere_data.get('junctions', {}).items():
vid = int(vid_str)
sphere.add_junction(
vertex_id=vid,
weights=np.array(junction_data['weights'], dtype=np.float32),
bias=junction_data.get('bias', 0.0),
activation=junction_data.get('activation', 'relu'),
layer_name=junction_data.get('layer_name', 'default'),
)
array.add_sphere(sphere)
# Set up spine connections
for conn in data.get('spine_connections', []):
if isinstance(conn, list) and len(conn) == 2:
array.connect_spine(conn[0], conn[1])
# Infer layer order from spheres
layer_order = []
for sphere_id in sorted(array.spheres.keys()):
sphere = array.spheres[sphere_id]
for layer_name in sphere.layer_map.keys():
layer_order.append((sphere_id, layer_name))
array.set_layer_order(layer_order)
return array
# =============================================================================
# INFERENCE ENGINE
# =============================================================================
class InferenceEngine:
"""
High-level inference engine for Harmonic Stack.
Manages loaded substrates and routes queries
to appropriate models.
"""
def __init__(self):
self.substrates: Dict[str, SubstrateArray] = {}
self.load_times: Dict[str, float] = {}
def load_model(self, name: str, filepath: str) -> bool:
"""Load a substrate model."""
try:
start = time.time()
substrate = load_substrate(filepath)
self.substrates[name] = substrate
self.load_times[name] = time.time() - start
return True
except Exception as e:
print(f"Error loading {name}: {e}")
return False
def unload_model(self, name: str):
"""Unload a model to free memory."""
if name in self.substrates:
del self.substrates[name]
del self.load_times[name]
def infer(self, model_name: str, inputs: np.ndarray) -> np.ndarray:
"""
Run inference on a loaded model.
"""
if model_name not in self.substrates:
raise ValueError(f"Model not loaded: {model_name}")
substrate = self.substrates[model_name]
return substrate.forward(inputs)
def batch_infer(self, model_name: str, batch: List[np.ndarray]) -> List[np.ndarray]:
"""Run inference on a batch of inputs."""
return [self.infer(model_name, x) for x in batch]
def status(self) -> Dict:
"""Get engine status."""
return {
'loaded_models': list(self.substrates.keys()),
'total_spheres': sum(
len(s.spheres) for s in self.substrates.values()
),
'total_junctions': sum(
sum(len(sphere.junctions) for sphere in s.spheres.values())
for s in self.substrates.values()
),
'load_times': self.load_times,
}
# =============================================================================
# BENCHMARK
# =============================================================================
def benchmark_inference(engine: InferenceEngine, model_name: str,
input_size: int = 128, n_runs: int = 100) -> Dict:
"""Benchmark inference speed."""
# Generate random inputs
inputs = [np.random.randn(input_size).astype(np.float32) for _ in range(n_runs)]
# Warmup
for _ in range(10):
engine.infer(model_name, inputs[0])
# Timed runs
start = time.time()
for inp in inputs:
engine.infer(model_name, inp)
elapsed = time.time() - start
return {
'model': model_name,
'input_size': input_size,
'n_runs': n_runs,
'total_time': elapsed,
'avg_time_ms': (elapsed / n_runs) * 1000,
'throughput': n_runs / elapsed,
}
# =============================================================================
# MAIN
# =============================================================================
def main():
print("=" * 60)
print("GEOMETRIC SUBSTRATE INFERENCE ENGINE")
print("Ghost in the Machine Labs")
print("=" * 60)
engine = InferenceEngine()
# Try to load test substrate
test_substrate = Path('test_model_substrate.json')
if test_substrate.exists():
print(f"\nLoading test substrate...")
if engine.load_model('test', str(test_substrate)):
print(f" Loaded in {engine.load_times['test']:.3f}s")
# Show status
status = engine.status()
print(f"\nEngine status:")
print(f" Loaded models: {status['loaded_models']}")
print(f" Total spheres: {status['total_spheres']}")
print(f" Total junctions: {status['total_junctions']}")
# Test inference
print(f"\nTest inference:")
test_input = np.random.randn(64).astype(np.float32)
output = engine.infer('test', test_input)
print(f" Input shape: {test_input.shape}")
print(f" Output shape: {output.shape}")
print(f" Output range: [{output.min():.3f}, {output.max():.3f}]")
# Benchmark
print(f"\nBenchmark:")
results = benchmark_inference(engine, 'test', input_size=64, n_runs=100)
print(f" Avg time: {results['avg_time_ms']:.2f} ms")
print(f" Throughput: {results['throughput']:.0f} inferences/sec")
else:
print("\nNo test substrate found.")
print("Run harmonic_stack_pipeline.py first to create one.")
print("\n" + "=" * 60)
print("INFERENCE ENGINE READY")
print("=" * 60)
return engine
if __name__ == "__main__":
main()