|
|
|
|
|
""" |
|
|
GEOMETRIC CODEBOOK - Signal-to-Language Decoder |
|
|
Ghost in the Machine Labs |
|
|
|
|
|
The missing piece: converts fused substrate geometric outputs |
|
|
into actionable text (analysis) and Python code (solve functions). |
|
|
|
|
|
Architecture: |
|
|
Grid → GeometricEncoder → substrate signal (numpy) |
|
|
Substrate processes → output signal (numpy) |
|
|
Output signal → GeometricDecoder → {analysis text, hypothesis, Python code} |
|
|
|
|
|
The codebook works by detecting geometric PRIMITIVES in the substrate |
|
|
output — the torsions, symmetries, and relational patterns that the |
|
|
sensor panels identified — and mapping them to transformation OPERATIONS |
|
|
that can be expressed as code. |
|
|
|
|
|
This is fabrication, not training. Each primitive→operation mapping is |
|
|
a direct geometric relationship, not a learned weight. |
|
|
""" |
|
|
|
|
|
import numpy as np |
|
|
from typing import List, Dict, Tuple, Optional, Any |
|
|
from dataclasses import dataclass, field |
|
|
from enum import Enum |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class GeoPrimitive(Enum):
    """
    Catalogue of named transformation primitives.

    Each member's value is its lowercase string identifier. Only a subset
    of these members is produced by the detector and has a code template
    elsewhere in this module; the rest are declared for completeness.
    """

    # --- Spatial layout: tiling, mirroring, rotation, translation, scaling ---
    TILE_REPEAT = "tile_repeat"
    MIRROR_H = "mirror_horizontal"
    MIRROR_V = "mirror_vertical"
    MIRROR_DIAG = "mirror_diagonal"
    ROTATE_90 = "rotate_90"
    ROTATE_180 = "rotate_180"
    ROTATE_270 = "rotate_270"
    TRANSLATE = "translate"
    SCALE_UP = "scale_up"
    SCALE_DOWN = "scale_down"

    # --- Colour operations ---
    COLOR_MAP = "color_map"
    COLOR_FILL = "color_fill"
    COLOR_SWAP = "color_swap"
    COLOR_COUNT = "color_count"
    MAJORITY_COLOR = "majority_color"
    BOUNDARY_COLOR = "boundary_color"

    # --- Structural operations on shapes / regions ---
    EXTRACT_SHAPE = "extract_shape"
    MASK_OVERLAY = "mask_overlay"
    CROP = "crop"
    PAD = "pad"
    PARTITION = "partition"
    GRAVITY = "gravity"

    # --- Relations between objects ---
    SORT_BY_SIZE = "sort_by_size"
    ALIGN = "align"
    CONNECT = "connect"
    ENCLOSE = "enclose"

    # --- Pattern-level / logical operations ---
    DENOISING = "denoising"
    COMPLETE_PATTERN = "complete_pattern"
    BOOLEAN_OP = "boolean_op"
    CONDITIONAL = "conditional"
|
|
|
|
|
|
|
|
@dataclass
class DetectedPrimitive:
    """One detected primitive together with its confidence and parameters."""

    # Which primitive was recognised.
    primitive: GeoPrimitive
    # Detection score; the detector in this module produces values in (0, 1].
    confidence: float
    # Primitive-specific parameters (e.g. tile counts, colour mapping);
    # a fresh dict per instance.
    params: Dict[str, Any] = field(default_factory=dict)

    # Label of the detector family that produced the record. The detector in
    # this module uses 'spatial', 'symmetry', 'chromatic' and 'structural'.
    sensor_source: str = ""
    # NOTE(review): `energy` and `harmonic_alignment` are never set by code
    # visible in this module; presumably substrate-side metrics — confirm.
    energy: float = 0.0
    harmonic_alignment: float = 0.0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class GridEncoder:
    """
    Encode ARC grids as fixed-length float32 feature vectors ("signals").

    Rather than flattening to bytes, the encoder writes geometric properties
    of the grid into fixed 128-slot bands of a ``SIGNAL_SIZE`` vector:

    ========= ============================================================
    slots     content
    ========= ============================================================
    0-127     dimensions (4 slots) + first 124 cell values / 9
    128-255   colour histogram (10 slots) + colour adjacency counts
    256-383   symmetry scores (up/down, left/right, transpose, rot90)
    384-511   per-row periodicity flags (4 slots per row, 30 rows max)
    512-639   central-difference edge magnitudes
    640-767   connected-component sizes; slot 767 holds the object count
    768-895   input→output relationship (written by ``encode_pair`` only)
    896-1023  first 128 raw bytes of the float32 grid, rescaled to [-1, 1)
    ========= ============================================================

    Grids that are empty or not 2-D encode to an all-zero signal.
    """

    SIGNAL_SIZE = 1024

    @staticmethod
    def _as_grid(grid) -> Optional[np.ndarray]:
        """Return ``grid`` as a 2-D float32 array, or None if empty / not 2-D."""
        arr = np.array(grid, dtype=np.float32)
        if arr.ndim != 2 or arr.size == 0:
            return None
        return arr

    @staticmethod
    def encode_grid(grid: List[List[int]]) -> np.ndarray:
        """
        Encode a single grid into a geometric signal.

        Args:
            grid: rectangular list of rows of integer colours. The
                normalisation constants suggest values 0-9 and sides up
                to 30 (assumption — not enforced here).

        Returns:
            float32 array of length ``SIGNAL_SIZE``. All zeros when the grid
            is empty or not two-dimensional.
        """
        signal = np.zeros(GridEncoder.SIGNAL_SIZE, dtype=np.float32)
        g = GridEncoder._as_grid(grid)
        if g is None:
            # Avoids the division by h*w == 0 (NaN signal) on empty grids.
            return signal

        h, w = g.shape
        cells = h * w

        # --- Band 0-127: dimensions, then the leading raw cell values ---
        signal[0] = h / 30.0
        signal[1] = w / 30.0
        signal[2] = cells / 900.0
        signal[3] = h / w
        flat = g.flatten() / 9.0
        lead = min(len(flat), 124)
        signal[4:4 + lead] = flat[:lead]

        # --- Band 128-255: colour histogram and colour-pair adjacency ---
        for color in range(10):
            signal[128 + color] = np.sum(g == color) / cells

        pair_base = 128 + 10
        weight = 1.0 / cells
        for r in range(h):
            for c in range(w):
                val = int(g[r, c])
                neighbours = []
                if c + 1 < w:
                    neighbours.append(int(g[r, c + 1]))   # right neighbour
                if r + 1 < h:
                    neighbours.append(int(g[r + 1, c]))   # lower neighbour
                for n_val in neighbours:
                    if val == n_val:
                        continue
                    pair_idx = pair_base + val * 10 + n_val
                    # Pairs that do not fit inside the band are dropped.
                    if pair_base <= pair_idx < 256:
                        signal[pair_idx] += weight

        # --- Band 256-383: symmetry scores (fraction of matching cells) ---
        if h > 1:
            signal[256] = np.mean(g == g[::-1, :])
        if w > 1:
            signal[257] = np.mean(g == g[:, ::-1])
        if h == w:
            signal[258] = np.mean(g == g.T)
            signal[259] = np.mean(g == np.rot90(g))

        # --- Band 384-511: smallest exact period of each row ---
        for r in range(min(h, 30)):
            row = g[r, :]
            for period in range(1, min(w, 8)):
                if w % period != 0:
                    continue
                tiles = row.reshape(-1, period)
                if len(tiles) > 1 and np.all(tiles == tiles[0]):
                    # Periods of 4 and above share the fourth slot.
                    signal[384 + r * 4 + min(period - 1, 3)] = 1.0
                    break

        # --- Band 512-639: central-difference gradient magnitude ---
        if h > 2 and w > 2:
            for r in range(1, min(h - 1, 16)):
                for c in range(1, min(w - 1, 8)):
                    gx = float(g[r, c + 1]) - float(g[r, c - 1])
                    gy = float(g[r + 1, c]) - float(g[r - 1, c])
                    eidx = 512 + r * 8 + c
                    if eidx < 640:
                        # 12.73 ≈ 9·√2, the largest magnitude for values 0-9.
                        signal[eidx] = np.sqrt(gx ** 2 + gy ** 2) / 12.73

        # --- Band 640-767: 4-connected same-colour components (non-zero) ---
        visited = np.zeros_like(g, dtype=bool)
        obj_count = 0
        for r in range(h):
            for c in range(w):
                if visited[r, c]:
                    continue
                color = g[r, c]
                if color == 0:
                    continue
                stack = [(r, c)]
                size = 0
                while stack:
                    cr, cc = stack.pop()
                    if (0 <= cr < h and 0 <= cc < w
                            and not visited[cr, cc] and g[cr, cc] == color):
                        visited[cr, cc] = True
                        size += 1
                        stack.extend([(cr + 1, cc), (cr - 1, cc),
                                      (cr, cc + 1), (cr, cc - 1)])
                slot = 640 + obj_count
                if slot < 767:          # slot 767 is reserved for the count
                    signal[slot] = size / cells
                obj_count += 1
        signal[767] = obj_count / 30.0

        # Band 768-895 is left at zero here; encode_pair fills it.

        # --- Band 896-1023: leading raw bytes of the float32 buffer ---
        raw = np.frombuffer(g.tobytes(), dtype=np.uint8)[:128]
        signal[896:896 + len(raw)] = (raw.astype(np.float32) - 128.0) / 128.0

        return signal

    @staticmethod
    def encode_pair(input_grid: List[List[int]],
                    output_grid: List[List[int]]) -> np.ndarray:
        """
        Encode an input→output pair, capturing the TRANSFORMATION.

        The result is ``encode_grid(input_grid)`` with slots 768-788 filled:

        - 768-771: height ratio, width ratio, area ratio, same-shape flag
        - 772-775: input colour count / 10, output colour count / 10,
          Jaccard overlap of the colour sets, identical-colour-set flag
        - 776-779: changed-cell fraction (twice), share of changes on the
          border, share of changes in the interior
        - 780-783: plain tiling match, tile rows / 10, tile cols / 10,
          match for tiling with blocks flipped up/down on odd tile rows and
          left/right on odd tile columns
        - 784-786: match against the input rotated by 90°, 180°, 270°
        - 787-788: match against the up/down and left/right flipped input

        Every feature has a fixed slot; features that do not apply stay 0.
        If either grid is empty or not 2-D, slots 768+ are left at zero.
        """
        sig = GridEncoder.encode_grid(input_grid)
        ig = GridEncoder._as_grid(input_grid)
        og = GridEncoder._as_grid(output_grid)
        if ig is None or og is None:
            return sig

        ih, iw = ig.shape
        oh, ow = og.shape
        same_shape = (ih == oh and iw == ow)

        # --- 768-771: dimension relationship ---
        sig[768] = oh / ih
        sig[769] = ow / iw
        sig[770] = (oh * ow) / (ih * iw)
        sig[771] = 1.0 if same_shape else 0.0

        # --- 772-775: colour-set relationship ---
        in_colors = set(ig.flatten().astype(int))
        out_colors = set(og.flatten().astype(int))
        sig[772] = len(in_colors) / 10.0
        sig[773] = len(out_colors) / 10.0
        sig[774] = len(in_colors & out_colors) / max(len(in_colors | out_colors), 1)
        sig[775] = 1.0 if in_colors == out_colors else 0.0

        # --- 776-779: cell-wise differences (same-shape pairs only) ---
        if same_shape:
            diff = (ig != og).astype(np.float32)
            sig[776] = np.mean(diff)
            sig[777] = np.sum(diff) / max(ih * iw, 1)
            if ih > 2 and iw > 2:
                edge_mask = np.zeros_like(diff)
                edge_mask[0, :] = 1
                edge_mask[-1, :] = 1
                edge_mask[:, 0] = 1
                edge_mask[:, -1] = 1
                edge_changes = np.sum(diff * edge_mask)
                center_changes = np.sum(diff * (1 - edge_mask))
                total_changes = edge_changes + center_changes
                sig[778] = edge_changes / max(total_changes, 1)
                sig[779] = center_changes / max(total_changes, 1)

        # --- 780-783: tiling ---
        if oh > ih and ow > iw and oh % ih == 0 and ow % iw == 0:
            tile_h = oh // ih
            tile_w = ow // iw
            sig[780] = np.mean(np.tile(ig, (tile_h, tile_w)) == og)
            sig[781] = tile_h / 10.0
            sig[782] = tile_w / 10.0

            alt_tiled = np.zeros_like(og)
            for tr in range(tile_h):
                for tc in range(tile_w):
                    block = ig
                    if tr % 2 == 1:
                        block = block[::-1, :]
                    if tc % 2 == 1:
                        block = block[:, ::-1]
                    alt_tiled[tr * ih:(tr + 1) * ih, tc * iw:(tc + 1) * iw] = block
            sig[783] = np.mean(alt_tiled == og)

        # --- 784-786: rotations (square, same-shape pairs only) ---
        if same_shape and ih == iw:
            for k in (1, 2, 3):
                sig[783 + k] = np.mean(np.rot90(ig, k) == og)

        # --- 787-788: mirrors ---
        if same_shape:
            sig[787] = np.mean(ig[::-1, :] == og)
            sig[788] = np.mean(ig[:, ::-1] == og)

        return sig

    @staticmethod
    def encode_task(task: Dict) -> np.ndarray:
        """
        Encode a full ARC task (all training pairs) into one composite signal.

        The pair signals are averaged; when there is more than one pair each
        slot is additionally scaled by ``1 / (1 + 10 * variance)`` so that
        features that disagree between pairs are damped.

        Args:
            task: dict with a ``'train'`` list of ``{'input', 'output'}`` dicts.

        Returns:
            float32 array of length ``SIGNAL_SIZE``; all zeros if the task
            has no training pairs.
        """
        train = task.get('train', [])
        if not train:
            return np.zeros(GridEncoder.SIGNAL_SIZE, dtype=np.float32)

        signals = [GridEncoder.encode_pair(pair['input'], pair['output'])
                   for pair in train]

        composite = np.mean(signals, axis=0).astype(np.float32)
        if len(signals) > 1:
            variance = np.var(signals, axis=0)
            composite *= 1.0 / (1.0 + variance * 10)

        return composite
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class PrimitiveDetector:
    """
    Detect transformation primitives for an ARC task.

    Detection in this class is performed directly on the raw training grids:
    each input→output pair is tested against a fixed list of candidate
    transformations, and the per-pair results are combined across the task.
    The encoded signals passed to ``detect`` are accepted for interface
    compatibility but are not read by the current implementation.
    """

    # NOTE(review): not referenced by the detection code in this class; every
    # detection produced here already has confidence >= 0.7.
    CONFIDENCE_THRESHOLD = 0.3

    @staticmethod
    def detect(input_signal: np.ndarray,
               output_signal: np.ndarray,
               task: Dict) -> List[DetectedPrimitive]:
        """
        Detect primitives across all training pairs of ``task``.

        Args:
            input_signal: encoded task signal (currently unused).
            output_signal: substrate output signal (currently unused).
            task: dict with a ``'train'`` list of ``{'input', 'output'}`` dicts.

        Returns:
            At most one ``DetectedPrimitive`` per primitive kind, best first.
            If some primitives were found in *every* pair, only those are
            returned and their confidence is multiplied by 1.5 (capped at
            1.0); otherwise the best detection of every kind is returned
            un-boosted. Empty list when the task has no training pairs.
        """
        train = task.get('train', [])
        if not train:
            return []

        per_pair = [PrimitiveDetector._detect_pair(pair['input'], pair['output'])
                    for pair in train]
        consensus = set.intersection(
            *({d.primitive for d in found} for found in per_pair))

        candidates = [d for found in per_pair for d in found
                      if not consensus or d.primitive in consensus]

        # Keep the strongest detection of each kind, judged on the *raw*
        # confidence; on equal confidence the earliest detection is kept.
        best: Dict[GeoPrimitive, DetectedPrimitive] = {}
        for d in candidates:
            held = best.get(d.primitive)
            if held is None or d.confidence > held.confidence:
                best[d.primitive] = d

        # A colour map observed on a single pair may miss colours that only
        # occur in other pairs, so combine them; contradictory maps mean
        # there is no global colour map at all.
        if GeoPrimitive.COLOR_MAP in best:
            merged = PrimitiveDetector._merge_color_maps(
                d.params.get('mapping', {}) for d in candidates
                if d.primitive is GeoPrimitive.COLOR_MAP)
            if merged is None:
                del best[GeoPrimitive.COLOR_MAP]
            else:
                best[GeoPrimitive.COLOR_MAP].params = {'mapping': merged}

        # Rank before boosting: the boost saturates at 1.0, so ranking on
        # boosted values would let e.g. an unverified 0.7 detection tie with
        # an exact 1.0 match. sorted() is stable, so ties keep detection order.
        ranked = sorted(best.values(), key=lambda d: d.confidence, reverse=True)
        if consensus:
            for d in ranked:
                d.confidence = min(1.0, d.confidence * 1.5)
        return ranked

    @staticmethod
    def _merge_color_maps(mappings) -> Optional[Dict[int, int]]:
        """Union several colour maps; return None if any two disagree."""
        merged: Dict[int, int] = {}
        for mapping in mappings:
            for src, dst in mapping.items():
                if merged.setdefault(src, dst) != dst:
                    return None
        return merged

    @staticmethod
    def _detect_pair(input_grid: List[List[int]],
                     output_grid: List[List[int]]) -> List[DetectedPrimitive]:
        """
        Detect primitives for a single input→output pair.

        Candidates are tested in a fixed order (tiling, scaling, rotations,
        mirrors, colour map, colour swap, crop, gravity); that order is also
        the tie-break order used by ``detect``. Returns an empty list when
        either grid is empty or not two-dimensional.
        """
        ig = np.array(input_grid, dtype=np.float32)
        og = np.array(output_grid, dtype=np.float32)
        if ig.ndim != 2 or og.ndim != 2 or ig.size == 0 or og.size == 0:
            return []

        prims: List[DetectedPrimitive] = []
        prims += PrimitiveDetector._detect_tiling(ig, og)
        prims += PrimitiveDetector._detect_scaling(ig, og)
        prims += PrimitiveDetector._detect_symmetry(ig, og)
        prims += PrimitiveDetector._detect_color(ig, og)
        prims += PrimitiveDetector._detect_crop(ig, og)
        prims += PrimitiveDetector._detect_gravity(ig, og)
        return prims

    @staticmethod
    def _detect_tiling(ig: np.ndarray, og: np.ndarray) -> List[DetectedPrimitive]:
        """Plain tiling, and tiling with left/right-mirrored odd tile rows."""
        ih, iw = ig.shape
        oh, ow = og.shape
        if not (oh > ih and ow > iw and oh % ih == 0 and ow % iw == 0):
            return []

        found = []
        tile_h, tile_w = oh // ih, ow // iw

        match = float(np.mean(np.tile(ig, (tile_h, tile_w)) == og))
        if match > 0.8:
            found.append(DetectedPrimitive(
                GeoPrimitive.TILE_REPEAT, match,
                {'tile_h': tile_h, 'tile_w': tile_w, 'alternating': False},
                'spatial'))

        # Alternating variant: every odd tile row uses the mirrored input.
        alt = np.zeros_like(og)
        for tr in range(tile_h):
            block = ig[:, ::-1] if tr % 2 == 1 else ig
            alt[tr * ih:(tr + 1) * ih, :] = np.tile(block, (1, tile_w))
        alt_match = float(np.mean(alt == og))
        if alt_match > match and alt_match > 0.8:
            found.append(DetectedPrimitive(
                GeoPrimitive.TILE_REPEAT, alt_match,
                {'tile_h': tile_h, 'tile_w': tile_w, 'alternating': True},
                'spatial'))
        return found

    @staticmethod
    def _detect_scaling(ig: np.ndarray, og: np.ndarray) -> List[DetectedPrimitive]:
        """Uniform integer up-scaling (verified) and down-scaling (by shape only)."""
        ih, iw = ig.shape
        oh, ow = og.shape
        found = []

        if (oh > ih and ow > iw and oh % ih == 0 and ow % iw == 0
                and oh // ih == ow // iw):
            scale = oh // ih
            scaled = np.repeat(np.repeat(ig, scale, axis=0), scale, axis=1)
            match = float(np.mean(scaled == og))
            if match > 0.8:
                found.append(DetectedPrimitive(
                    GeoPrimitive.SCALE_UP, match, {'factor': scale}, 'spatial'))

        if oh < ih and ow < iw and ih % oh == 0 and iw % ow == 0:
            # Only the shapes are checked here, hence the fixed 0.7 score.
            found.append(DetectedPrimitive(
                GeoPrimitive.SCALE_DOWN, 0.7,
                {'factor_h': ih // oh, 'factor_w': iw // ow}, 'spatial'))
        return found

    @staticmethod
    def _detect_symmetry(ig: np.ndarray, og: np.ndarray) -> List[DetectedPrimitive]:
        """Rotations, axis mirrors and transpose for same-shape pairs."""
        if ig.shape != og.shape:
            return []
        square = ig.shape[0] == ig.shape[1]
        found = []

        if square:
            for k, prim in ((1, GeoPrimitive.ROTATE_90),
                            (2, GeoPrimitive.ROTATE_180),
                            (3, GeoPrimitive.ROTATE_270)):
                match = float(np.mean(np.rot90(ig, k) == og))
                if match > 0.9:
                    found.append(DetectedPrimitive(prim, match, {}, 'symmetry'))

        match_h = float(np.mean(ig[::-1, :] == og))      # rows reversed
        if match_h > 0.9:
            found.append(DetectedPrimitive(
                GeoPrimitive.MIRROR_H, match_h, {}, 'symmetry'))

        match_v = float(np.mean(ig[:, ::-1] == og))      # columns reversed
        if match_v > 0.9:
            found.append(DetectedPrimitive(
                GeoPrimitive.MIRROR_V, match_v, {}, 'symmetry'))

        if square:
            match_t = float(np.mean(ig.T == og))
            if match_t > 0.9:
                found.append(DetectedPrimitive(
                    GeoPrimitive.MIRROR_DIAG, match_t, {}, 'symmetry'))
        return found

    @staticmethod
    def _detect_color(ig: np.ndarray, og: np.ndarray) -> List[DetectedPrimitive]:
        """Cell-wise colour mapping and two-colour swap for same-shape pairs."""
        if ig.shape != og.shape:
            return []
        found = []

        # A colour map exists when every input colour always becomes the
        # same output colour; the identity map is not reported.
        mapping: Dict[int, int] = {}
        consistent = True
        for ic, oc in zip(ig.astype(int).ravel().tolist(),
                          og.astype(int).ravel().tolist()):
            if mapping.setdefault(ic, oc) != oc:
                consistent = False
                break
        if consistent and mapping and any(k != v for k, v in mapping.items()):
            found.append(DetectedPrimitive(
                GeoPrimitive.COLOR_MAP, 1.0, {'mapping': mapping}, 'chromatic'))

        changed = ig != og
        changed_in = set(ig[changed].astype(int).tolist())
        changed_out = set(og[changed].astype(int).tolist())
        if len(changed_in) == 2 and changed_in == changed_out:
            color_a, color_b = sorted(changed_in)   # deterministic order
            found.append(DetectedPrimitive(
                GeoPrimitive.COLOR_SWAP, 0.95,
                {'color_a': color_a, 'color_b': color_b}, 'chromatic'))
        return found

    @staticmethod
    def _detect_crop(ig: np.ndarray, og: np.ndarray) -> List[DetectedPrimitive]:
        """Report the first (row-major) position where the output occurs in the input."""
        ih, iw = ig.shape
        oh, ow = og.shape
        if not (oh < ih or ow < iw):
            return []
        # The ranges are empty when the output is larger along either axis.
        for r in range(ih - oh + 1):
            for c in range(iw - ow + 1):
                if np.array_equal(ig[r:r + oh, c:c + ow], og):
                    return [DetectedPrimitive(
                        GeoPrimitive.CROP, 1.0,
                        {'top': r, 'left': c}, 'structural')]
        return []

    @staticmethod
    def _detect_gravity(ig: np.ndarray, og: np.ndarray) -> List[DetectedPrimitive]:
        """
        Downward gravity heuristic for same-shape pairs.

        Reported as soon as one column keeps its multiset of non-zero values
        while those values form one contiguous run ending at the bottom row
        of the output. Other columns are not checked.
        """
        if ig.shape != og.shape:
            return []
        height, width = ig.shape
        for c in range(width):
            in_col = ig[:, c]
            out_col = og[:, c]
            in_vals = in_col[in_col != 0]
            out_rows = np.flatnonzero(out_col)
            if len(in_vals) == 0:
                continue
            if not np.array_equal(np.sort(in_vals), np.sort(out_col[out_rows])):
                continue
            if out_rows[-1] == height - 1 and np.all(np.diff(out_rows) == 1):
                return [DetectedPrimitive(
                    GeoPrimitive.GRAVITY, 0.8,
                    {'direction': 'down'}, 'structural')]
        return []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CodeGenerator:
    """
    Turn detected primitives into text and Python ``solve()`` source code.

    ``TEMPLATES`` maps a primitive to a ``str.format`` template (or, for
    tiling, to a dict of named template variants). Placeholders are filled
    from ``DetectedPrimitive.params``. No composition of several primitives
    is performed: exactly one primitive is rendered.
    """

    TEMPLATES = {
        GeoPrimitive.TILE_REPEAT: {
            'simple': """def solve(input_grid):
    grid = np.array(input_grid)
    h, w = grid.shape
    tile_h, tile_w = {tile_h}, {tile_w}
    result = np.tile(grid, (tile_h, tile_w))
    return result.tolist()""",

            'alternating': """def solve(input_grid):
    grid = np.array(input_grid)
    h, w = grid.shape
    tile_h, tile_w = {tile_h}, {tile_w}
    result = np.zeros((h * tile_h, w * tile_w), dtype=int)
    for tr in range(tile_h):
        block = grid.copy()
        if tr % 2 == 1:
            block = block[:, ::-1]
        row_tile = np.tile(block, (1, tile_w))
        result[tr*h:(tr+1)*h, :] = row_tile
    return result.tolist()""",
        },

        GeoPrimitive.SCALE_UP: """def solve(input_grid):
    grid = np.array(input_grid)
    factor = {factor}
    result = np.repeat(np.repeat(grid, factor, axis=0), factor, axis=1)
    return result.tolist()""",

        GeoPrimitive.ROTATE_90: """def solve(input_grid):
    grid = np.array(input_grid)
    result = np.rot90(grid, 1)
    return result.tolist()""",

        GeoPrimitive.ROTATE_180: """def solve(input_grid):
    grid = np.array(input_grid)
    result = np.rot90(grid, 2)
    return result.tolist()""",

        GeoPrimitive.ROTATE_270: """def solve(input_grid):
    grid = np.array(input_grid)
    result = np.rot90(grid, 3)
    return result.tolist()""",

        GeoPrimitive.MIRROR_H: """def solve(input_grid):
    grid = np.array(input_grid)
    result = grid[::-1, :]
    return result.tolist()""",

        GeoPrimitive.MIRROR_V: """def solve(input_grid):
    grid = np.array(input_grid)
    result = grid[:, ::-1]
    return result.tolist()""",

        GeoPrimitive.MIRROR_DIAG: """def solve(input_grid):
    grid = np.array(input_grid)
    result = grid.T
    return result.tolist()""",

        GeoPrimitive.COLOR_MAP: """def solve(input_grid):
    grid = np.array(input_grid)
    result = grid.copy()
    mapping = {mapping}
    for old_c, new_c in mapping.items():
        result[grid == old_c] = new_c
    return result.tolist()""",

        GeoPrimitive.COLOR_SWAP: """def solve(input_grid):
    grid = np.array(input_grid)
    result = grid.copy()
    a, b = {color_a}, {color_b}
    result[grid == a] = b
    result[grid == b] = a
    return result.tolist()""",

        # NOTE: this template crops to the bounding box of non-zero cells; it
        # has no placeholders, so detected 'top'/'left' offsets are not used.
        GeoPrimitive.CROP: """def solve(input_grid):
    grid = np.array(input_grid)
    # Find bounding box of non-zero content
    rows = np.any(grid != 0, axis=1)
    cols = np.any(grid != 0, axis=0)
    if not rows.any():
        return input_grid
    rmin, rmax = np.where(rows)[0][[0, -1]]
    cmin, cmax = np.where(cols)[0][[0, -1]]
    result = grid[rmin:rmax+1, cmin:cmax+1]
    return result.tolist()""",

        GeoPrimitive.GRAVITY: """def solve(input_grid):
    grid = np.array(input_grid)
    h, w = grid.shape
    result = np.zeros_like(grid)
    for c in range(w):
        col = grid[:, c]
        nonzero = col[col != 0]
        result[h-len(nonzero):h, c] = nonzero
    return result.tolist()""",
    }

    # One-sentence rule descriptions used by generate_analysis.
    _DESCRIPTIONS = {
        GeoPrimitive.TILE_REPEAT: "The output tiles the input pattern across a larger grid",
        GeoPrimitive.SCALE_UP: "The output scales up each cell of the input",
        GeoPrimitive.ROTATE_90: "The output is the input rotated 90° counterclockwise",
        GeoPrimitive.ROTATE_180: "The output is the input rotated 180°",
        GeoPrimitive.ROTATE_270: "The output is the input rotated 270° counterclockwise",
        GeoPrimitive.MIRROR_H: "The output mirrors the input across the horizontal axis (upside down)",
        GeoPrimitive.MIRROR_V: "The output mirrors the input across the vertical axis (left-right)",
        GeoPrimitive.MIRROR_DIAG: "The output transposes the input (diagonal mirror)",
        GeoPrimitive.COLOR_MAP: "Each color in the input maps to a specific color in the output",
        GeoPrimitive.COLOR_SWAP: "Two specific colors are swapped",
        GeoPrimitive.CROP: "The output extracts a sub-region from the input",
        GeoPrimitive.GRAVITY: "Non-zero cells fall to the bottom of each column",
    }

    @staticmethod
    def _render(detected: DetectedPrimitive) -> Optional[str]:
        """
        Render the template of one primitive.

        Returns None when the primitive has no template or when its params
        lack a placeholder the template needs (a template with unfilled
        placeholders would be code that fails at runtime).
        """
        template = CodeGenerator.TEMPLATES.get(detected.primitive)
        if template is None:
            return None

        if isinstance(template, dict):
            # Variant dict: pick 'alternating' when flagged, else 'simple'.
            variant = 'alternating' if detected.params.get('alternating') else 'simple'
            template = template.get(variant, template.get('simple'))
            if template is None:
                return None

        try:
            code = template.format(**detected.params)
        except (KeyError, IndexError):
            return None

        # The templates rely on numpy being bound to `np`.
        if 'np.' in code and 'import numpy' not in code:
            code = "import numpy as np\n\n" + code
        return code

    @staticmethod
    def generate(primitives: List[DetectedPrimitive]) -> Optional[str]:
        """
        Generate ``solve()`` source from detected primitives.

        Args:
            primitives: detections ordered best-first.

        Returns:
            Source code for the highest-ranked primitive that can actually be
            rendered, or None when the list is empty or nothing is renderable.
        """
        if not primitives:
            return None

        for candidate in primitives:
            code = CodeGenerator._render(candidate)
            if code is not None:
                return code
        return None

    @staticmethod
    def generate_analysis(primitives: List[DetectedPrimitive],
                          task: Dict) -> str:
        """
        Generate a multi-line text analysis from detected primitives.

        The first line reports the dimensions of the first training pair,
        followed by one line per detection and a closing rule description
        (plus parameters) for the top-ranked detection.
        """
        if not primitives:
            return "No transformation pattern detected with sufficient confidence."

        train = task.get('train', [])
        ig = np.array(train[0]['input']) if train else np.array([[]])
        og = np.array(train[0]['output']) if train else np.array([[]])

        lines = [f"Input: {ig.shape[0]}x{ig.shape[1]} → Output: {og.shape[0]}x{og.shape[1]}"]

        for p in primitives:
            desc = f"Detected {p.primitive.value} (confidence: {p.confidence:.2f})"
            if p.params:
                params_str = ", ".join(f"{k}={v}" for k, v in p.params.items())
                desc += f" [{params_str}]"
            lines.append(desc)

        best = primitives[0]
        rule = CodeGenerator._DESCRIPTIONS.get(best.primitive, best.primitive.value)
        lines.append(f"\nRule: {rule}")
        if best.params:
            lines.append(f"Parameters: {best.params}")

        return "\n".join(lines)

    @staticmethod
    def generate_hypothesis(primitives: List[DetectedPrimitive]) -> str:
        """
        Generate a one-line algorithm hypothesis for the top-ranked primitive.

        Placeholders are filled from the primitive's params; if a parameter
        is missing the unformatted text is returned instead.
        """
        if not primitives:
            return "Insufficient geometric evidence to form hypothesis."

        best = primitives[0]

        hypotheses = {
            GeoPrimitive.TILE_REPEAT: "Tile the input grid {tile_h}x{tile_w} times.",
            GeoPrimitive.SCALE_UP: "Scale each cell to a {factor}x{factor} block.",
            GeoPrimitive.ROTATE_90: "Apply np.rot90(grid, 1).",
            GeoPrimitive.ROTATE_180: "Apply np.rot90(grid, 2).",
            GeoPrimitive.ROTATE_270: "Apply np.rot90(grid, 3).",
            GeoPrimitive.MIRROR_H: "Flip grid vertically: grid[::-1, :]",
            GeoPrimitive.MIRROR_V: "Flip grid horizontally: grid[:, ::-1]",
            GeoPrimitive.MIRROR_DIAG: "Transpose: grid.T",
            GeoPrimitive.COLOR_MAP: "Apply color mapping: {mapping}",
            GeoPrimitive.COLOR_SWAP: "Swap colors {color_a} ↔ {color_b}",
            GeoPrimitive.CROP: "Extract bounding box of non-zero content.",
            GeoPrimitive.GRAVITY: "For each column, move non-zero cells to bottom.",
        }

        template = hypotheses.get(best.primitive, str(best.primitive.value))
        if best.primitive is GeoPrimitive.TILE_REPEAT and best.params.get('alternating'):
            # Describe what the 'alternating' code template really does.
            template += " On odd tile rows the tile is mirrored left-right (grid[:, ::-1])."

        try:
            return template.format(**best.params)
        except (KeyError, IndexError):
            return template
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class GeometricDecoder:
    """
    Facade that ties encoder, detector and generator together.

    Per the original author's note, this is meant to replace
    ``_signal_to_response`` in the fused service.
    """

    def __init__(self):
        """Create the three pipeline stages."""
        self.encoder = GridEncoder()
        self.detector = PrimitiveDetector()
        self.generator = CodeGenerator()

    def _primitives_for(self, task: Dict,
                        substrate_output: np.ndarray) -> List[DetectedPrimitive]:
        """Encode the task, then run detection with the substrate output."""
        encoded = self.encoder.encode_task(task)
        return self.detector.detect(encoded, substrate_output, task)

    def decode_for_analysis(self, task: Dict,
                            substrate_output: np.ndarray) -> str:
        """Return the text analysis for ``task``."""
        found = self._primitives_for(task, substrate_output)
        return self.generator.generate_analysis(found, task)

    def decode_for_hypothesis(self, task: Dict,
                              substrate_output: np.ndarray) -> str:
        """Return the algorithm hypothesis for ``task``."""
        found = self._primitives_for(task, substrate_output)
        return self.generator.generate_hypothesis(found)

    def decode_for_code(self, task: Dict,
                        substrate_output: np.ndarray) -> Optional[str]:
        """Return generated ``solve()`` source for ``task``, or None."""
        found = self._primitives_for(task, substrate_output)
        return self.generator.generate(found)

    def solve_task(self, task: Dict) -> Optional[str]:
        """
        Direct path: task → detect → code, without a substrate.

        Detection is called with two zero vectors of length 1024 in place
        of real signals. Returns None when the task has no training pairs
        or nothing was detected.
        """
        if not task.get('train', []):
            return None

        placeholder_in = np.zeros(1024)
        placeholder_out = np.zeros(1024)
        found = self.detector.detect(placeholder_in, placeholder_out, task)

        return self.generator.generate(found) if found else None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_codebook():
    """
    Run the codebook on three hand-written ARC-style tasks and print results.

    Covers an alternating tiling, a 90° rotation and a colour mapping. The
    generated source is executed and checked against each task's test pair;
    outcomes are printed, nothing is returned or asserted.
    """
    decoder = GeometricDecoder()
    rule = "=" * 70

    def load_solver(source):
        # Executes source produced by this module's own templates.
        scope = {'np': np}
        exec(source, scope)
        return scope['solve']

    def verdict(ok):
        return 'PASS ✓' if ok else 'FAIL ✗'

    task_tile = {
        'train': [
            {'input': [[7, 9], [4, 3]],
             'output': [[7, 9, 7, 9, 7, 9], [4, 3, 4, 3, 4, 3],
                        [9, 7, 9, 7, 9, 7], [3, 4, 3, 4, 3, 4],
                        [7, 9, 7, 9, 7, 9], [4, 3, 4, 3, 4, 3]]},
            {'input': [[8, 6], [6, 4]],
             'output': [[8, 6, 8, 6, 8, 6], [6, 4, 6, 4, 6, 4],
                        [6, 8, 6, 8, 6, 8], [4, 6, 4, 6, 4, 6],
                        [8, 6, 8, 6, 8, 6], [6, 4, 6, 4, 6, 4]]},
        ],
        'test': [
            {'input': [[3, 2], [7, 8]],
             'output': [[3, 2, 3, 2, 3, 2], [7, 8, 7, 8, 7, 8],
                        [2, 3, 2, 3, 2, 3], [8, 7, 8, 7, 8, 7],
                        [3, 2, 3, 2, 3, 2], [7, 8, 7, 8, 7, 8]]},
        ],
    }

    print(rule)
    print(" GEOMETRIC CODEBOOK TEST")
    print(rule)

    # ---- Task 1: alternating tile, through the full decode_* API ----
    print("\n--- Task: Alternating Tile (00576224) ---")
    analysis = decoder.decode_for_analysis(task_tile, np.zeros(1024))
    print(f"Analysis:\n{analysis}")

    hypothesis = decoder.decode_for_hypothesis(task_tile, np.zeros(1024))
    print(f"\nHypothesis: {hypothesis}")

    code = decoder.decode_for_code(task_tile, np.zeros(1024))
    print(f"\nGenerated code:\n{code}")

    if code:
        print("\n--- Validation ---")
        solve = load_solver(code)
        for number, case in enumerate(task_tile['test'], start=1):
            result = solve(case['input'])
            expected = case['output']
            match = result == expected
            print(f" Test {number}: {verdict(match)}")
            if not match:
                print(f" Expected: {expected[:2]}...")
                print(f" Got: {result[:2]}...")

    # ---- Task 2: rotation, through the direct solve_task path ----
    print("\n--- Task: 90° Rotation ---")
    task_rot = {
        'train': [
            {'input': [[1, 2], [3, 4]],
             'output': [[2, 4], [1, 3]]},
            {'input': [[5, 6], [7, 8]],
             'output': [[6, 8], [5, 7]]},
        ],
        'test': [
            {'input': [[9, 1], [2, 3]],
             'output': [[1, 3], [9, 2]]},
        ],
    }

    code = decoder.solve_task(task_rot)
    if code:
        # Show only the last line of the generated source.
        last_line = code.strip().rsplit("\n", 1)[-1]
        print(f"Code: {last_line}")
        case = task_rot['test'][0]
        result = load_solver(code)(case['input'])
        print(f" Test: {verdict(result == case['output'])}")

    # ---- Task 3: colour mapping ----
    print("\n--- Task: Color Mapping ---")
    task_color = {
        'train': [
            {'input': [[1, 2, 3], [1, 2, 3]],
             'output': [[4, 5, 6], [4, 5, 6]]},
            {'input': [[3, 1, 2], [2, 3, 1]],
             'output': [[6, 4, 5], [5, 6, 4]]},
        ],
        'test': [
            {'input': [[2, 1, 3], [3, 2, 1]],
             'output': [[5, 4, 6], [6, 5, 4]]},
        ],
    }

    code = decoder.solve_task(task_color)
    if code:
        print("Detected mapping")
        case = task_color['test'][0]
        result = load_solver(code)(case['input'])
        print(f" Test: {verdict(result == case['output'])}")

    print("\n" + rule)
|
|
|
|
|
|
|
|
# Script entry point: run the built-in demonstration when executed directly.
if __name__ == "__main__":
    test_codebook()
|
|
|