""" |
|
|
HARMONIC STACK MODEL TRANSLATION PIPELINE |
|
|
|
|
|
Ghost in the Machine Labs |
|
|
"AGI for the home, first to AGI" |
|
|
|
|
|
This is the core pipeline for translating standard AI models |
|
|
into the geometric substrate format for the Harmonic Stack. |
|
|
|
|
|
Pipeline stages: |
|
|
1. MODEL DISCOVERY - Find models on disk (safetensors, GGUF, PyTorch) |
|
|
2. MODEL ANALYSIS - Analyze architecture, count params, plan allocation |
|
|
3. GEOMETRIC TRANSLATION - Convert weights to junction configurations |
|
|
4. SUBSTRATE WRITING - Write to Dyson Sphere array |
|
|
5. VERIFICATION - Round-trip integrity check |
|
|
6. REGISTRATION - Add to Harmonic Stack registry |
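
Example (hypothetical model path):

    result = translate_model('models/math_expert.safetensors', category='math')
    print(result['verification']['integrity_verified'])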
|
|
|
|
|
The Harmonic Stack is the unified consciousness substrate where
all expert models coexist and the Router Intelligence can route requests to any of them.
|
|
|
|
|
Author: Joe + Claude |
|
|
Date: January 25, 2026 |
|
|
""" |
|
|
|
|
|
import numpy as np |
|
|
from typing import Dict, List, Tuple
|
|
from dataclasses import dataclass, field |
|
|
from pathlib import Path |
|
|
import json |
|
|
import struct |
|
|
import hashlib |
|
|
import os |
|
|
from datetime import datetime |
|
|
|
|
|
# ======================================================================
# CONSTANTS
# ======================================================================
|
# Geodesic geometry: one Dyson Sphere is a frequency-8 geodesic
# subdivision of the icosahedron
GEODESIC_FREQ = 8
VERTICES_PER_SPHERE = 10 * GEODESIC_FREQ**2 + 2  # 642 vertices at frequency 8

# Junction encoding parameters
E8_PRECISION_BITS = 8   # bits of precision reserved for junction weight encoding
JUNCTION_OVERHEAD = 32  # fixed per-junction metadata overhead, in bytes
|
|
|
|
|
|
|
|
MODEL_CATEGORIES = [ |
|
|
'reasoning', |
|
|
'math', |
|
|
'code', |
|
|
'vision', |
|
|
'video', |
|
|
'audio', |
|
|
'language', |
|
|
'spatial', |
|
|
'general', |
|
|
] |
|
|
|
# ======================================================================
# MODEL FORMATS
# ======================================================================
|
@dataclass |
|
|
class ModelFormat: |
|
|
"""Supported model format.""" |
|
|
name: str |
|
|
extensions: List[str] |
|
|
loader: str |
|
|
|
|
|
|
|
|
SUPPORTED_FORMATS = [ |
|
|
ModelFormat('safetensors', ['.safetensors'], 'load_safetensors'), |
|
|
ModelFormat('gguf', ['.gguf'], 'load_gguf'), |
|
|
ModelFormat('pytorch', ['.pt', '.pth', '.bin'], 'load_pytorch'), |
|
|
ModelFormat('numpy', ['.npz', '.npy'], 'load_numpy'), |
|
|
] |
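
# Example: resolving a loader from a file extension, mirroring the lookup in
# translate_model() below (the model path is hypothetical):
#
#   fmt = next(f for f in SUPPORTED_FORMATS if '.safetensors' in f.extensions)
#   weights = globals()[fmt.loader]('models/expert.safetensors')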
|
|
|
# ======================================================================
# WEIGHT LOADERS
# ======================================================================
|
def load_safetensors(filepath: str) -> Dict[str, np.ndarray]: |
|
|
""" |
|
|
Load weights from safetensors format. |
|
|
""" |
|
|
with open(filepath, 'rb') as f: |
|
|
header_size = struct.unpack('<Q', f.read(8))[0] |
|
|
header_json = f.read(header_size).decode('utf-8') |
|
|
header = json.loads(header_json) |
|
|
|
|
|
weights = {} |
|
|
|
|
|
for name, meta in header.items(): |
|
|
if name == '__metadata__': |
|
|
continue |
|
|
|
|
|
dtype_str = meta['dtype'] |
|
|
shape = meta['shape'] |
|
|
offsets = meta['data_offsets'] |
|
|
|
|
|
        dtype_map = {
            'F32': np.float32,
            'F16': np.float16,
            'I32': np.int32,
            'I64': np.int64,
            'U8': np.uint8,
        }

        start, end = offsets
        f.seek(8 + header_size + start)
        data = f.read(end - start)

        if dtype_str == 'BF16':
            # bfloat16 has no native numpy dtype: widen each 16-bit value into
            # the high half of a 32-bit word and reinterpret as float32
            raw = np.frombuffer(data, dtype=np.uint16).astype(np.uint32) << 16
            tensor = raw.view(np.float32).reshape(shape)
        else:
            dtype = dtype_map.get(dtype_str, np.float32)
            tensor = np.frombuffer(data, dtype=dtype).reshape(shape)

        weights[name] = tensor.astype(np.float32)
|
|
|
|
|
return weights |
|
|
|
|
|
|
|
|
def load_gguf(filepath: str) -> Dict[str, np.ndarray]: |
|
|
""" |
|
|
Load weights from GGUF format. |
|
|
|
|
|
GGUF is a binary format used by llama.cpp and similar. |
|
|
""" |
|
|
|
|
|
    # b'GGUF' interpreted as a little-endian uint32
    GGUF_MAGIC = 0x46554747
|
|
|
|
|
weights = {} |
|
|
|
|
|
    with open(filepath, 'rb') as f:
        # Header: magic, version, then tensor and key-value counts
        magic = struct.unpack('<I', f.read(4))[0]
        if magic != GGUF_MAGIC:
            raise ValueError(f"Invalid GGUF magic: {magic:#x}")

        version = struct.unpack('<I', f.read(4))[0]

        tensor_count = struct.unpack('<Q', f.read(8))[0]
        kv_count = struct.unpack('<Q', f.read(8))[0]

        # Full parsing (KV metadata, tensor descriptors, dequantization of
        # quantized blocks) is not implemented here; report the header only
        print(f" GGUF v{version}: {tensor_count} tensors, {kv_count} KV pairs")
        print(" Note: Full GGUF parsing requires quantization support")

    return weights
|
|
|
|
|
|
|
|
def load_pytorch(filepath: str) -> Dict[str, np.ndarray]: |
|
|
""" |
|
|
Load weights from PyTorch format. |
|
|
""" |
|
|
try: |
|
|
import torch |
|
|
state_dict = torch.load(filepath, map_location='cpu') |
|
|
|
|
|
|
|
|
        # Checkpoint files often nest the actual weights under a wrapper key
        if 'state_dict' in state_dict:
            state_dict = state_dict['state_dict']
        elif 'model' in state_dict:
            state_dict = state_dict['model']

        weights = {}
        for name, tensor in state_dict.items():
            if not torch.is_tensor(tensor):
                continue  # skip optimizer state, step counters, etc.
            # detach/cpu handles GPU or grad-tracking tensors; float() also
            # converts bfloat16, which numpy cannot represent directly
            weights[name] = tensor.detach().cpu().float().numpy()
|
|
|
|
|
return weights |
|
|
except ImportError: |
|
|
raise ImportError("PyTorch loading requires torch library") |
|
|
|
|
|
|
|
|
def load_numpy(filepath: str) -> Dict[str, np.ndarray]: |
|
|
"""Load weights from numpy format.""" |
|
|
    if filepath.endswith('.npy'):
        return {'weights': np.load(filepath).astype(np.float32)}
|
|
else: |
|
|
data = np.load(filepath, allow_pickle=True) |
|
|
return {k: data[k].astype(np.float32) for k in data.files} |
|
|
|
# ======================================================================
# STAGE 1: MODEL DISCOVERY
# ======================================================================
|
@dataclass |
|
|
class DiscoveredModel: |
|
|
"""A model found on disk.""" |
|
|
name: str |
|
|
path: str |
|
|
format: str |
|
|
size_bytes: int |
|
|
category: str = 'unknown' |
|
|
|
|
|
|
|
|
def discover_models(search_paths: List[str]) -> List[DiscoveredModel]: |
|
|
""" |
|
|
Discover models in given directories. |
|
|
""" |
|
|
models = [] |
|
|
|
|
|
for search_path in search_paths: |
|
|
path = Path(search_path) |
|
|
if not path.exists(): |
|
|
continue |
|
|
|
|
|
for fmt in SUPPORTED_FORMATS: |
|
|
for ext in fmt.extensions: |
|
|
for filepath in path.rglob(f'*{ext}'): |
|
|
|
|
|
                    # Infer category from the file path (first matching keyword)
                    category = 'general'
                    path_lower = str(filepath).lower()
|
|
for cat in MODEL_CATEGORIES: |
|
|
if cat in path_lower: |
|
|
category = cat |
|
|
break |
|
|
|
|
|
models.append(DiscoveredModel( |
|
|
name=filepath.stem, |
|
|
path=str(filepath), |
|
|
format=fmt.name, |
|
|
size_bytes=filepath.stat().st_size, |
|
|
category=category, |
|
|
)) |
|
|
|
|
|
return models |
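
# Example usage (search paths are hypothetical):
#
#   models = discover_models(['/srv/models', './checkpoints'])
#   for m in models:
#       print(f"{m.name} [{m.format}/{m.category}] {m.size_bytes/1e9:.2f} GB")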
|
|
# ======================================================================
# STAGE 2: MODEL ANALYSIS
# ======================================================================
|
@dataclass |
|
|
class LayerInfo: |
|
|
"""Information about a model layer.""" |
|
|
name: str |
|
|
shape: Tuple[int, ...] |
|
|
params: int |
|
|
layer_type: str |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class ModelAnalysis: |
|
|
"""Analysis of a model's architecture.""" |
|
|
name: str |
|
|
total_params: int |
|
|
layers: List[LayerInfo] |
|
|
neurons: int |
|
|
spheres_needed: int |
|
|
estimated_substrate_mb: float |
|
|
|
|
|
|
|
|
def analyze_weights(weights: Dict[str, np.ndarray], model_name: str) -> ModelAnalysis: |
|
|
""" |
|
|
Analyze model weights to plan substrate allocation. |
|
|
""" |
|
|
layers = [] |
|
|
total_params = 0 |
|
|
total_neurons = 0 |
|
|
|
|
|
for name, tensor in weights.items(): |
|
|
params = tensor.size |
|
|
total_params += params |
|
|
|
|
|
|
|
|
        # Classify the layer's role from its parameter name
        name_lower = name.lower()
|
|
if 'embed' in name_lower: |
|
|
layer_type = 'embed' |
|
|
elif 'norm' in name_lower or 'ln' in name_lower: |
|
|
layer_type = 'norm' |
|
|
elif 'bias' in name_lower: |
|
|
layer_type = 'bias' |
|
|
elif 'weight' in name_lower or 'kernel' in name_lower: |
|
|
layer_type = 'weight' |
|
|
else: |
|
|
layer_type = 'param' |
|
|
|
|
|
|
|
|
        # 2D+ tensors contribute one junction per output row; 1D tensors
        # (biases, norms) are stored whole in a single junction, matching
        # the allocation in translate_to_substrate()
        if len(tensor.shape) >= 2:
            neurons = tensor.shape[0]
        else:
            neurons = 1
|
|
|
|
|
total_neurons += neurons |
|
|
|
|
|
layers.append(LayerInfo( |
|
|
name=name, |
|
|
shape=tensor.shape, |
|
|
params=params, |
|
|
layer_type=layer_type, |
|
|
)) |
|
|
|
|
|
|
|
|
    # Ceiling division over available junction sites (VERTICES_PER_SPHERE each)
    spheres_needed = max(1, (total_neurons + VERTICES_PER_SPHERE - 1) // VERTICES_PER_SPHERE)
|
|
|
|
|
|
|
|
|
|
|
    # Size estimate: 4 bytes per stored float32 weight plus a fixed
    # per-junction overhead for position, ids, and metadata
    estimated_bytes = total_params * 4 + total_neurons * JUNCTION_OVERHEAD
|
|
|
|
|
return ModelAnalysis( |
|
|
name=model_name, |
|
|
total_params=total_params, |
|
|
layers=layers, |
|
|
neurons=total_neurons, |
|
|
spheres_needed=spheres_needed, |
|
|
estimated_substrate_mb=estimated_bytes / (1024 * 1024), |
|
|
) |
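
# Worked example of the allocation arithmetic: a model with 1,000,000 weight
# rows needs ceil(1_000_000 / 642) = 1558 spheres, since each frequency-8
# sphere exposes VERTICES_PER_SPHERE = 642 junction sites.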
|
|
|
# ======================================================================
# STAGE 3: GEOMETRIC TRANSLATION
# ======================================================================
|
def icosahedron_vertices() -> np.ndarray: |
|
|
"""12 vertices of regular icosahedron.""" |
|
|
phi = (1 + np.sqrt(5)) / 2 |
|
|
verts = [] |
|
|
for s1 in [-1, 1]: |
|
|
for s2 in [-1, 1]: |
|
|
verts.append([0, s1, s2 * phi]) |
|
|
verts.append([s1, s2 * phi, 0]) |
|
|
verts.append([s1 * phi, 0, s2]) |
|
|
    verts = np.array(verts, dtype=np.float32)
    # All 12 vertices share the same norm sqrt(1 + phi**2); scale to unit sphere
    return verts / np.linalg.norm(verts[0])
|
|
|
|
|
|
|
|
def geodesic_sphere(freq: int = 8) -> np.ndarray: |
|
|
"""Generate geodesic sphere vertices.""" |
|
|
ico = icosahedron_vertices() |
|
|
|
|
|
|
|
|
    # The 20 triangular faces of the icosahedron, indexed into the vertex
    # ordering produced by icosahedron_vertices()
    faces = [
|
|
[0, 1, 2], [0, 2, 3], [0, 3, 4], [0, 4, 5], [0, 5, 1], |
|
|
[1, 6, 2], [2, 6, 7], [2, 7, 3], [3, 7, 8], [3, 8, 4], |
|
|
[4, 8, 9], [4, 9, 5], [5, 9, 10], [5, 10, 1], [1, 10, 6], |
|
|
[6, 11, 7], [7, 11, 8], [8, 11, 9], [9, 11, 10], [10, 11, 6] |
|
|
] |
|
|
|
|
|
all_points = [] |
|
|
for face in faces: |
|
|
v1, v2, v3 = ico[face[0]], ico[face[1]], ico[face[2]] |
|
|
|
|
|
        # Barycentric grid over the face; each point is projected to the sphere
        for i in range(freq + 1):
|
|
for j in range(freq + 1 - i): |
|
|
k = freq - i - j |
|
|
p = (i * v1 + j * v2 + k * v3) / freq |
|
|
norm = np.linalg.norm(p) |
|
|
if norm > 0: |
|
|
p = p / norm |
|
|
all_points.append(p) |
|
|
|
|
|
|
|
|
    # Deduplicate points shared along face edges and at corners
    unique = []
|
|
for p in all_points: |
|
|
is_dup = False |
|
|
for u in unique: |
|
|
if np.linalg.norm(p - u) < 1e-6: |
|
|
is_dup = True |
|
|
break |
|
|
if not is_dup: |
|
|
unique.append(p) |
|
|
|
|
|
return np.array(unique, dtype=np.float32) |
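
# Sanity check (illustrative): a frequency-f class-I subdivision of the
# icosahedron has 10*f**2 + 2 unique vertices, so the generated template
# should line up with the constant used for allocation:
#
#   assert len(geodesic_sphere(GEODESIC_FREQ)) == VERTICES_PER_SPHERE  # 642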
|
|
|
|
|
|
|
|
@dataclass |
|
|
class Junction: |
|
|
"""A junction in the geometric substrate.""" |
|
|
    vertex_id: int        # vertex index within the sphere
    sphere_id: int        # index of the sphere in the array
    position: np.ndarray  # unit-sphere coordinates of the vertex
    weights: np.ndarray   # flattened weight vector stored at this junction
    bias: float           # reserved; translate_to_substrate currently writes 0.0
    layer_name: str       # source layer in the original model
    neuron_idx: int       # row index within the source layer
    junction_type: str    # 'MIXED' (weight row) or 'BIAS' (whole 1D tensor)
|
|
|
|
|
|
|
|
@dataclass |
|
|
class DysonSphere: |
|
|
"""A single geodesic sphere in the substrate.""" |
|
|
sphere_id: int |
|
|
vertices: np.ndarray |
|
|
junctions: Dict[int, Junction] |
|
|
layer_assignments: Dict[str, List[int]] |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class SubstrateArray: |
|
|
"""Array of Dyson Spheres forming the substrate.""" |
|
|
spheres: List[DysonSphere] |
|
|
spine_connections: List[Tuple[int, int]] |
|
|
model_name: str |
|
|
total_junctions: int |
|
|
|
|
|
|
|
|
def translate_to_substrate(weights: Dict[str, np.ndarray], |
|
|
analysis: ModelAnalysis) -> SubstrateArray: |
|
|
""" |
|
|
Translate model weights to geometric substrate. |
|
|
""" |
|
|
|
|
|
    # All spheres share one geodesic template; junctions give each its content
    template_vertices = geodesic_sphere(GEODESIC_FREQ)
|
|
|
|
|
|
|
|
spheres = [] |
|
|
for i in range(analysis.spheres_needed): |
|
|
sphere = DysonSphere( |
|
|
sphere_id=i, |
|
|
vertices=template_vertices.copy(), |
|
|
junctions={}, |
|
|
layer_assignments={}, |
|
|
) |
|
|
spheres.append(sphere) |
|
|
|
|
|
|
|
|
    # Allocation cursor: fill vertices sequentially, sphere by sphere
    current_sphere = 0
|
|
current_vertex = 0 |
|
|
total_junctions = 0 |
|
|
|
|
|
for layer in analysis.layers: |
|
|
tensor = weights[layer.name] |
|
|
|
|
|
        if len(tensor.shape) >= 2:
            # 2D+ tensor: one junction per output row (neuron)
            for neuron_idx in range(tensor.shape[0]):
|
|
if current_vertex >= VERTICES_PER_SPHERE: |
|
|
current_sphere += 1 |
|
|
current_vertex = 0 |
|
|
|
|
|
                if current_sphere >= len(spheres):
                    break  # capacity exhausted; remaining rows are dropped
|
|
|
|
|
sphere = spheres[current_sphere] |
|
|
|
|
|
|
|
|
junction = Junction( |
|
|
vertex_id=current_vertex, |
|
|
sphere_id=current_sphere, |
|
|
position=sphere.vertices[current_vertex % len(sphere.vertices)], |
|
|
weights=tensor[neuron_idx].flatten().astype(np.float32), |
|
|
bias=0.0, |
|
|
layer_name=layer.name, |
|
|
neuron_idx=neuron_idx, |
|
|
junction_type='MIXED', |
|
|
) |
|
|
|
|
|
sphere.junctions[current_vertex] = junction |
|
|
|
|
|
if layer.name not in sphere.layer_assignments: |
|
|
sphere.layer_assignments[layer.name] = [] |
|
|
sphere.layer_assignments[layer.name].append(current_vertex) |
|
|
|
|
|
current_vertex += 1 |
|
|
total_junctions += 1 |
|
|
|
|
|
        else:
            # 1D tensor (bias/norm): store the whole vector in one junction
            if current_vertex >= VERTICES_PER_SPHERE:
|
|
current_sphere += 1 |
|
|
current_vertex = 0 |
|
|
|
|
|
if current_sphere < len(spheres): |
|
|
sphere = spheres[current_sphere] |
|
|
|
|
|
junction = Junction( |
|
|
vertex_id=current_vertex, |
|
|
sphere_id=current_sphere, |
|
|
position=sphere.vertices[current_vertex % len(sphere.vertices)], |
|
|
weights=tensor.flatten().astype(np.float32), |
|
|
bias=0.0, |
|
|
layer_name=layer.name, |
|
|
neuron_idx=0, |
|
|
junction_type='BIAS', |
|
|
) |
|
|
|
|
|
sphere.junctions[current_vertex] = junction |
|
|
sphere.layer_assignments[layer.name] = [current_vertex] |
|
|
|
|
|
current_vertex += 1 |
|
|
total_junctions += 1 |
|
|
|
|
|
|
|
|
    # Spine: adjacent spheres linked in a linear chain
    spine = [(i, i + 1) for i in range(len(spheres) - 1)]
|
|
|
|
|
return SubstrateArray( |
|
|
spheres=spheres, |
|
|
spine_connections=spine, |
|
|
model_name=analysis.name, |
|
|
total_junctions=total_junctions, |
|
|
) |
|
|
# ======================================================================
# STAGES 4-5: SUBSTRATE WRITING & VERIFICATION
# ======================================================================
|
def serialize_substrate(substrate: SubstrateArray) -> Dict: |
|
|
"""Serialize substrate to JSON-compatible format.""" |
|
|
data = { |
|
|
'model_name': substrate.model_name, |
|
|
'total_junctions': substrate.total_junctions, |
|
|
'num_spheres': len(substrate.spheres), |
|
|
'spine_connections': substrate.spine_connections, |
|
|
'spheres': [], |
|
|
} |
|
|
|
|
|
for sphere in substrate.spheres: |
|
|
sphere_data = { |
|
|
'sphere_id': sphere.sphere_id, |
|
|
'num_junctions': len(sphere.junctions), |
|
|
'layer_assignments': sphere.layer_assignments, |
|
|
'junctions': {}, |
|
|
} |
|
|
|
|
|
for vid, junction in sphere.junctions.items(): |
|
|
sphere_data['junctions'][str(vid)] = { |
|
|
'vertex_id': junction.vertex_id, |
|
|
'position': junction.position.tolist(), |
|
|
'weights': junction.weights.tolist(), |
|
|
'bias': junction.bias, |
|
|
'layer_name': junction.layer_name, |
|
|
'neuron_idx': junction.neuron_idx, |
|
|
'junction_type': junction.junction_type, |
|
|
} |
|
|
|
|
|
data['spheres'].append(sphere_data) |
|
|
|
|
|
return data |
|
|
|
|
|
|
|
|
def write_substrate(substrate: SubstrateArray, filepath: str): |
|
|
"""Write substrate to file - binary for large, JSON for small.""" |
|
|
import numpy as np |
|
|
|
|
|
|
|
|
total_junctions = sum(len(s.junctions) for s in substrate.spheres) |
|
|
|
|
|
if total_junctions > 50000: |
|
|
|
|
|
npz_path = filepath.replace('.json', '.npz') |
|
|
|
|
|
all_junctions = [] |
|
|
for s in substrate.spheres: |
|
|
for j in s.junctions.values(): |
|
|
pos = j.position.tolist() if hasattr(j.position, 'tolist') else list(j.position) |
|
|
w = float(np.mean(j.weights)) if hasattr(j.weights, '__len__') else float(j.weights) |
|
|
all_junctions.append([j.vertex_id, j.sphere_id, w, *pos[:3]]) |
|
|
|
|
|
metadata = { |
|
|
'model_name': getattr(substrate, 'model_name', 'unknown'), |
|
|
'sphere_count': len(substrate.spheres), |
|
|
'junction_count': total_junctions, |
|
|
} |
|
|
|
|
|
np.savez_compressed(npz_path, |
|
|
junctions=np.array(all_junctions, dtype=np.float32) if all_junctions else np.zeros((0,6)), |
|
|
metadata=json.dumps(metadata) |
|
|
) |
|
|
print(f" Written binary: {npz_path} ({os.path.getsize(npz_path)/(1024*1024):.1f} MB)") |
|
|
return |
|
|
    else:
        # Small substrate: full (lossless) JSON serialization
        data = serialize_substrate(substrate)
        with open(filepath, 'w') as f:
            json.dump(data, f)
|
|
|
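
# The binary path above is write-only in this file; the sketch below (not part
# of the original pipeline) shows how to recover exactly what it stores.
def read_substrate_npz(filepath: str) -> Tuple[np.ndarray, Dict]:
    """Load the junction table and metadata written by write_substrate()."""
    data = np.load(filepath)
    # metadata was stored as a JSON string (numpy wraps it in a 0-d array)
    metadata = json.loads(str(data['metadata']))
    # junctions: (N, 6) float32 rows of [vertex_id, sphere_id, mean_weight, x, y, z]
    return data['junctions'], metadata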
|
|
|
|
|
def verify_substrate(original_weights: Dict[str, np.ndarray], |
|
|
substrate: SubstrateArray) -> Dict: |
|
|
""" |
|
|
Verify substrate integrity via weight reconstruction. |
|
|
""" |
|
|
errors = [] |
|
|
max_error = 0.0 |
|
|
verified_junctions = 0 |
|
|
|
|
|
for sphere in substrate.spheres: |
|
|
for vid, junction in sphere.junctions.items(): |
|
|
|
|
|
layer_name = junction.layer_name |
|
|
if layer_name not in original_weights: |
|
|
errors.append(f"Missing layer: {layer_name}") |
|
|
continue |
|
|
|
|
|
original = original_weights[layer_name] |
|
|
|
|
|
            # Only 2D weight rows are verified; BIAS junctions are skipped
            if len(original.shape) >= 2:
                if junction.neuron_idx < original.shape[0]:
|
|
original_weights_row = original[junction.neuron_idx].flatten() |
|
|
reconstructed = np.array(junction.weights) |
|
|
|
|
|
if len(original_weights_row) == len(reconstructed): |
|
|
error = np.max(np.abs(original_weights_row - reconstructed)) |
|
|
max_error = max(max_error, error) |
|
|
verified_junctions += 1 |
|
|
|
|
|
return { |
|
|
'verified_junctions': verified_junctions, |
|
|
'max_error': max_error, |
|
|
'errors': errors, |
|
|
'integrity_verified': max_error < 1e-6 and len(errors) == 0, |
|
|
} |
|
|
# ======================================================================
# STAGE 6: HARMONIC STACK REGISTRY
# ======================================================================
|
@dataclass |
|
|
class HarmonicStackEntry: |
|
|
"""Entry in the Harmonic Stack registry.""" |
|
|
model_name: str |
|
|
category: str |
|
|
substrate_path: str |
|
|
spheres_start: int |
|
|
spheres_count: int |
|
|
total_junctions: int |
|
|
params_original: int |
|
|
import_timestamp: str |
|
|
checksum: str |
|
|
|
|
|
|
|
|
class HarmonicStackRegistry: |
|
|
""" |
|
|
Registry of all models in the Harmonic Stack. |
|
|
""" |
|
|
|
|
|
def __init__(self, registry_path: str = 'harmonic_stack_registry.json'): |
|
|
self.registry_path = registry_path |
|
|
self.entries: Dict[str, HarmonicStackEntry] = {} |
|
|
self.next_sphere = 0 |
|
|
self.load() |
|
|
|
|
|
def load(self): |
|
|
"""Load registry from disk.""" |
|
|
if os.path.exists(self.registry_path): |
|
|
with open(self.registry_path) as f: |
|
|
data = json.load(f) |
|
|
for name, entry_data in data.get('entries', {}).items(): |
|
|
self.entries[name] = HarmonicStackEntry(**entry_data) |
|
|
self.next_sphere = data.get('next_sphere', 0) |
|
|
|
|
|
def save(self): |
|
|
"""Save registry to disk.""" |
|
|
data = { |
|
|
'entries': {name: entry.__dict__ for name, entry in self.entries.items()}, |
|
|
'next_sphere': self.next_sphere, |
|
|
'last_updated': datetime.now().isoformat(), |
|
|
} |
|
|
with open(self.registry_path, 'w') as f: |
|
|
json.dump(data, f, indent=2) |
|
|
|
|
|
def register(self, model_name: str, category: str, substrate_path: str, |
|
|
spheres_count: int, total_junctions: int, params_original: int) -> HarmonicStackEntry: |
|
|
"""Register a model in the stack.""" |
|
|
|
|
|
        # Content checksum of the substrate file for later integrity checks
        with open(substrate_path, 'rb') as f:
|
|
checksum = hashlib.md5(f.read()).hexdigest() |
|
|
|
|
|
entry = HarmonicStackEntry( |
|
|
model_name=model_name, |
|
|
category=category, |
|
|
substrate_path=substrate_path, |
|
|
spheres_start=self.next_sphere, |
|
|
spheres_count=spheres_count, |
|
|
total_junctions=total_junctions, |
|
|
params_original=params_original, |
|
|
import_timestamp=datetime.now().isoformat(), |
|
|
checksum=checksum, |
|
|
) |
|
|
|
|
|
self.entries[model_name] = entry |
|
|
self.next_sphere += spheres_count |
|
|
self.save() |
|
|
|
|
|
return entry |
|
|
|
|
|
def get_by_category(self, category: str) -> List[HarmonicStackEntry]: |
|
|
"""Get all models in a category.""" |
|
|
return [e for e in self.entries.values() if e.category == category] |
|
|
|
|
|
def summary(self) -> Dict: |
|
|
"""Get stack summary.""" |
|
|
by_category = {} |
|
|
for entry in self.entries.values(): |
|
|
cat = entry.category |
|
|
if cat not in by_category: |
|
|
by_category[cat] = {'count': 0, 'junctions': 0, 'spheres': 0} |
|
|
by_category[cat]['count'] += 1 |
|
|
by_category[cat]['junctions'] += entry.total_junctions |
|
|
by_category[cat]['spheres'] += entry.spheres_count |
|
|
|
|
|
return { |
|
|
'total_models': len(self.entries), |
|
|
'total_spheres': self.next_sphere, |
|
|
'by_category': by_category, |
|
|
} |
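
# Example: inspecting the stack after a few imports (model names hypothetical).
#
#   registry = HarmonicStackRegistry()
#   for entry in registry.get_by_category('code'):
#       print(entry.model_name, entry.spheres_count, entry.total_junctions)
#   print(registry.summary())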
|
# ======================================================================
# FULL PIPELINE
# ======================================================================
|
|
def translate_model(model_path: str, category: str = 'general', |
|
|
output_dir: str = '.') -> Dict: |
|
|
""" |
|
|
Complete pipeline: Load → Analyze → Translate → Write → Verify → Register |
|
|
""" |
|
|
print(f"\n{'='*60}") |
|
|
print(f"TRANSLATING: {model_path}") |
|
|
print(f"{'='*60}") |
|
|
|
|
|
|
|
|
model_path = Path(model_path) |
|
|
suffix = model_path.suffix.lower() |
|
|
|
|
|
loader = None |
|
|
for fmt in SUPPORTED_FORMATS: |
|
|
if suffix in fmt.extensions: |
|
|
            loader = globals()[fmt.loader]
|
|
break |
|
|
|
|
|
if loader is None: |
|
|
raise ValueError(f"Unsupported format: {suffix}") |
|
|
|
|
|
|
|
|
print("\n[1/6] Loading weights...") |
|
|
weights = loader(str(model_path)) |
|
|
print(f" Loaded {len(weights)} tensors") |
|
|
|
|
|
|
|
|
print("\n[2/6] Analyzing model...") |
|
|
analysis = analyze_weights(weights, model_path.stem) |
|
|
print(f" Total params: {analysis.total_params:,}") |
|
|
print(f" Neurons: {analysis.neurons:,}") |
|
|
print(f" Spheres needed: {analysis.spheres_needed}") |
|
|
print(f" Estimated substrate: {analysis.estimated_substrate_mb:.1f} MB") |
|
|
|
|
|
|
|
|
print("\n[3/6] Translating to geometric substrate...") |
|
|
substrate = translate_to_substrate(weights, analysis) |
|
|
print(f" Created {len(substrate.spheres)} spheres") |
|
|
print(f" Total junctions: {substrate.total_junctions:,}") |
|
|
|
|
|
|
|
|
print("\n[4/6] Writing substrate...") |
|
|
output_path = Path(output_dir) / f"{model_path.stem}_substrate.json" |
|
|
write_substrate(substrate, str(output_path)) |
|
|
|
|
|
    # write_substrate() may have switched to binary output for large models
    npz_path = output_path.with_suffix('.npz')
    if npz_path.exists():
|
|
file_size = npz_path.stat().st_size / (1024 * 1024) |
|
|
output_path = npz_path |
|
|
else: |
|
|
file_size = output_path.stat().st_size / (1024 * 1024) |
|
|
print(f" Written to: {output_path}") |
|
|
print(f" File size: {file_size:.1f} MB") |
|
|
|
|
|
|
|
|
print("\n[5/6] Verifying integrity...") |
|
|
verification = verify_substrate(weights, substrate) |
|
|
print(f" Verified junctions: {verification['verified_junctions']:,}") |
|
|
print(f" Max error: {verification['max_error']:.2e}") |
|
|
print(f" Integrity: {'✓ VERIFIED' if verification['integrity_verified'] else '✗ FAILED'}") |
|
|
|
|
|
|
|
|
print("\n[6/6] Registering in Harmonic Stack...") |
|
|
registry = HarmonicStackRegistry() |
|
|
entry = registry.register( |
|
|
model_name=model_path.stem, |
|
|
category=category, |
|
|
substrate_path=str(output_path), |
|
|
spheres_count=len(substrate.spheres), |
|
|
total_junctions=substrate.total_junctions, |
|
|
params_original=analysis.total_params, |
|
|
) |
|
|
print(f" Registered as: {entry.model_name}") |
|
|
print(f" Spheres: {entry.spheres_start} - {entry.spheres_start + entry.spheres_count - 1}") |
|
|
|
|
|
|
|
|
stack_summary = registry.summary() |
|
|
print(f"\n{'='*60}") |
|
|
print("HARMONIC STACK STATUS") |
|
|
print(f"{'='*60}") |
|
|
print(f" Total models: {stack_summary['total_models']}") |
|
|
print(f" Total spheres: {stack_summary['total_spheres']}") |
|
|
for cat, stats in stack_summary['by_category'].items(): |
|
|
print(f" {cat}: {stats['count']} models, {stats['junctions']:,} junctions") |
|
|
|
|
|
return { |
|
|
'model_name': model_path.stem, |
|
|
'analysis': analysis.__dict__, |
|
|
'substrate_path': str(output_path), |
|
|
'verification': verification, |
|
|
'stack_entry': entry.__dict__, |
|
|
} |
|
|
|
# ======================================================================
# MAIN
# ======================================================================
|
def main(): |
|
|
print("=" * 70) |
|
|
print("HARMONIC STACK MODEL TRANSLATION PIPELINE") |
|
|
print("Ghost in the Machine Labs") |
|
|
print("=" * 70) |
|
|
|
|
|
|
|
|
test_model = Path('test_model.safetensors') |
|
|
if test_model.exists(): |
|
|
print(f"\nFound test model: {test_model}") |
|
|
result = translate_model(str(test_model), category='test') |
|
|
    else:
        print("\nNo test model found. Creating synthetic model for pipeline test...")
|
|
|
|
|
weights = { |
|
|
'encoder.weight': np.random.randn(256, 128).astype(np.float32), |
|
|
'encoder.bias': np.random.randn(256).astype(np.float32), |
|
|
'hidden.weight': np.random.randn(512, 256).astype(np.float32), |
|
|
'hidden.bias': np.random.randn(512).astype(np.float32), |
|
|
'decoder.weight': np.random.randn(128, 512).astype(np.float32), |
|
|
'decoder.bias': np.random.randn(128).astype(np.float32), |
|
|
} |
|
|
|
|
|
|
|
|
np.savez('synthetic_model.npz', **weights) |
|
|
print(" Created synthetic_model.npz") |
|
|
|
|
|
result = translate_model('synthetic_model.npz', category='test') |
|
|
|
|
|
print("\n" + "=" * 70) |
|
|
print("TRANSLATION COMPLETE") |
|
|
print("=" * 70) |
|
|
|
|
|
return result |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |
|
|
|