harmonic-stack-v1 / geometric_codebook.py
LovingGraceTech's picture
v3: Static pattern detection + code generation
22d295b verified
#!/usr/bin/env python3
"""
GEOMETRIC CODEBOOK - Signal-to-Language Decoder
Ghost in the Machine Labs
The missing piece: converts fused substrate geometric outputs
into actionable text (analysis) and Python code (solve functions).
Architecture:
Grid → GeometricEncoder → substrate signal (numpy)
Substrate processes → output signal (numpy)
Output signal → GeometricDecoder → {analysis text, hypothesis, Python code}
The codebook works by detecting geometric PRIMITIVES in the substrate
output — the torsions, symmetries, and relational patterns that the
sensor panels identified — and mapping them to transformation OPERATIONS
that can be expressed as code.
This is fabrication, not training. Each primitive→operation mapping is
a direct geometric relationship, not a learned weight.
"""
import numpy as np
from typing import List, Dict, Tuple, Optional, Any
from dataclasses import dataclass, field
from enum import Enum
# =============================================================================
# GEOMETRIC PRIMITIVES — the vocabulary of transformations
# =============================================================================
class GeoPrimitive(Enum):
"""
Primitives detected by substrate sensor panels.
Each maps to one or more ARC transformation operations.
"""
# Spatial
TILE_REPEAT = "tile_repeat" # Pattern repeats in space
MIRROR_H = "mirror_horizontal" # Horizontal reflection
MIRROR_V = "mirror_vertical" # Vertical reflection
MIRROR_DIAG = "mirror_diagonal" # Diagonal reflection
ROTATE_90 = "rotate_90"
ROTATE_180 = "rotate_180"
ROTATE_270 = "rotate_270"
TRANSLATE = "translate" # Shift pattern
SCALE_UP = "scale_up" # Enlarge grid/pattern
SCALE_DOWN = "scale_down" # Shrink grid/pattern
# Chromatic (color operations)
COLOR_MAP = "color_map" # Recolor by mapping
COLOR_FILL = "color_fill" # Flood fill region
COLOR_SWAP = "color_swap" # Swap two colors
COLOR_COUNT = "color_count" # Count-based operation
MAJORITY_COLOR = "majority_color" # Most frequent color
BOUNDARY_COLOR = "boundary_color" # Color at edges
# Structural
EXTRACT_SHAPE = "extract_shape" # Pull out a shape
MASK_OVERLAY = "mask_overlay" # Apply mask/overlay
CROP = "crop" # Trim to content
PAD = "pad" # Add border
PARTITION = "partition" # Split into regions
GRAVITY = "gravity" # Objects fall/slide
# Relational
SORT_BY_SIZE = "sort_by_size" # Order by region size
ALIGN = "align" # Align objects
CONNECT = "connect" # Draw lines between
ENCLOSE = "enclose" # Draw boundary around
# Pattern
DENOISING = "denoising" # Remove noise
COMPLETE_PATTERN = "complete_pattern" # Fill in missing
BOOLEAN_OP = "boolean_op" # AND/OR/XOR grids
CONDITIONAL = "conditional" # If-then color rules
@dataclass
class DetectedPrimitive:
"""A primitive detected in the substrate output with confidence."""
primitive: GeoPrimitive
confidence: float
params: Dict[str, Any] = field(default_factory=dict)
# Geometric evidence from substrate
sensor_source: str = "" # Which sensor type detected it
energy: float = 0.0 # Signal energy at detection
harmonic_alignment: float = 0.0 # How aligned with field
# =============================================================================
# GRID ENCODER — ARC grids to substrate signals
# =============================================================================
class GridEncoder:
"""
Encode ARC grids as geometric signals for the substrate.
Rather than flattening to bytes, we encode the GEOMETRIC PROPERTIES
of the grid that the sensor panels are designed to detect:
- Spatial frequency (tiling/repetition)
- Color distribution (chromatic spectrum)
- Shape boundaries (structural edges)
- Symmetry axes
- Relational positions
"""
SIGNAL_SIZE = 1024
@staticmethod
def encode_grid(grid: List[List[int]]) -> np.ndarray:
"""Encode a single grid into geometric signal."""
g = np.array(grid, dtype=np.float32)
h, w = g.shape
signal = np.zeros(GridEncoder.SIGNAL_SIZE, dtype=np.float32)
idx = 0
# === Band 1: Shape signature (0-127) ===
# Dimensions encoded as ratios
signal[idx] = h / 30.0; idx += 1
signal[idx] = w / 30.0; idx += 1
signal[idx] = h * w / 900.0; idx += 1
signal[idx] = h / w if w > 0 else 1.0; idx += 1
# Flattened grid (normalized to 0-1)
flat = g.flatten() / 9.0
n = min(len(flat), 124)
signal[idx:idx+n] = flat[:n]
idx = 128
# === Band 2: Color spectrum (128-255) ===
# Color histogram (10 colors, 0-9)
for c in range(10):
count = np.sum(g == c)
signal[idx + c] = count / (h * w)
idx += 10
# Color adjacency matrix (which colors touch which)
for r in range(h):
for c in range(w):
val = int(g[r, c])
# Right neighbor
if c + 1 < w:
n_val = int(g[r, c + 1])
if val != n_val:
pair_idx = 128 + 10 + val * 10 + n_val
if pair_idx < 256:
signal[pair_idx] += 1.0 / (h * w)
# Down neighbor
if r + 1 < h:
n_val = int(g[r + 1, c])
if val != n_val:
pair_idx = 128 + 10 + val * 10 + n_val
if pair_idx < 256:
signal[pair_idx] += 1.0 / (h * w)
idx = 256
# === Band 3: Symmetry signatures (256-383) ===
# Horizontal symmetry
if h > 1:
h_sym = np.mean(g == g[::-1, :])
signal[idx] = h_sym
idx += 1
# Vertical symmetry
if w > 1:
v_sym = np.mean(g == g[:, ::-1])
signal[idx] = v_sym
idx += 1
# Diagonal symmetry (if square)
if h == w:
d_sym = np.mean(g == g.T)
signal[idx] = d_sym
idx += 1
# 90° rotational symmetry (if square)
if h == w:
r90 = np.rot90(g)
signal[idx] = np.mean(g == r90)
idx += 1
idx = 384
# === Band 4: Spatial frequency (384-511) ===
# Row-wise and column-wise repetition patterns
for r in range(min(h, 30)):
row = g[r, :]
# Check for period-N repetition
for period in range(1, min(w, 8)):
if w % period == 0:
tiles = row.reshape(-1, period)
if len(tiles) > 1 and np.all(tiles == tiles[0]):
signal[384 + r * 4 + min(period-1, 3)] = 1.0
break
idx = 512
# === Band 5: Boundary/edge features (512-639) ===
# Edge detection (Sobel-like)
if h > 2 and w > 2:
for r in range(1, min(h-1, 16)):
for c in range(1, min(w-1, 8)):
# Gradient magnitude
gx = float(g[r, c+1]) - float(g[r, c-1])
gy = float(g[r+1, c]) - float(g[r-1, c])
mag = np.sqrt(gx**2 + gy**2)
eidx = 512 + r * 8 + c
if eidx < 640:
signal[eidx] = mag / 12.73 # max gradient = 9*sqrt(2)
idx = 640
# === Band 6: Object detection (640-767) ===
# Connected component count per color
visited = np.zeros_like(g, dtype=bool)
obj_count = 0
for r in range(h):
for c in range(w):
if not visited[r, c]:
color = g[r, c]
if color != 0: # Skip background
# BFS
stack = [(r, c)]
size = 0
while stack:
cr, cc = stack.pop()
if 0 <= cr < h and 0 <= cc < w and not visited[cr, cc] and g[cr, cc] == color:
visited[cr, cc] = True
size += 1
stack.extend([(cr+1,cc),(cr-1,cc),(cr,cc+1),(cr,cc-1)])
if size > 0 and 640 + obj_count < 768:
signal[640 + obj_count] = size / (h * w)
obj_count += 1
signal[767] = obj_count / 30.0 # total object count
idx = 768
# === Band 7: Transformation hints (768-895) ===
# Reserved for encoding input→output relationships
# (filled by encode_pair)
idx = 896
# === Band 8: Raw hash (896-1023) ===
# Deterministic hash for exact matching
raw = g.tobytes()
for i in range(min(128, len(raw))):
signal[896 + i] = (raw[i] - 128.0) / 128.0
return signal
@staticmethod
def encode_pair(input_grid: List[List[int]],
output_grid: List[List[int]]) -> np.ndarray:
"""
Encode an input→output pair, capturing the TRANSFORMATION.
Band 7 (768-895) encodes the relationship:
- Dimension change ratios
- Color mapping
- Spatial operation signature
"""
sig = GridEncoder.encode_grid(input_grid)
ig = np.array(input_grid, dtype=np.float32)
og = np.array(output_grid, dtype=np.float32)
ih, iw = ig.shape
oh, ow = og.shape
idx = 768
# Dimension relationships
sig[idx] = oh / ih if ih > 0 else 1.0; idx += 1 # height ratio
sig[idx] = ow / iw if iw > 0 else 1.0; idx += 1 # width ratio
sig[idx] = (oh * ow) / (ih * iw) if ih * iw > 0 else 1.0; idx += 1 # area ratio
sig[idx] = 1.0 if oh == ih and ow == iw else 0.0; idx += 1 # same size?
# Color transformation
in_colors = set(ig.flatten().astype(int))
out_colors = set(og.flatten().astype(int))
sig[idx] = len(in_colors) / 10.0; idx += 1
sig[idx] = len(out_colors) / 10.0; idx += 1
sig[idx] = len(in_colors & out_colors) / max(len(in_colors | out_colors), 1); idx += 1
sig[idx] = 1.0 if in_colors == out_colors else 0.0; idx += 1
# If same size, compute cell-wise diff
if ih == oh and iw == ow:
diff = (ig != og).astype(np.float32)
sig[idx] = np.mean(diff); idx += 1 # fraction changed
sig[idx] = np.sum(diff) / max(ih * iw, 1); idx += 1 # changed count ratio
# Where did changes happen? Edge vs center
if ih > 2 and iw > 2:
edge_mask = np.zeros_like(diff)
edge_mask[0, :] = 1; edge_mask[-1, :] = 1
edge_mask[:, 0] = 1; edge_mask[:, -1] = 1
edge_changes = np.sum(diff * edge_mask)
center_changes = np.sum(diff * (1 - edge_mask))
total_changes = edge_changes + center_changes
sig[idx] = edge_changes / max(total_changes, 1); idx += 1
sig[idx] = center_changes / max(total_changes, 1); idx += 1
else:
idx += 4
# Tiling check: does output = tiled input?
if oh > ih and ow > iw and oh % ih == 0 and ow % iw == 0:
tile_h = oh // ih
tile_w = ow // iw
tiled = np.tile(ig, (tile_h, tile_w))
sig[idx] = np.mean(tiled == og); idx += 1 # simple tile match
sig[idx] = tile_h / 10.0; idx += 1
sig[idx] = tile_w / 10.0; idx += 1
# Check alternating tile (flip every other)
alt_tiled = np.zeros_like(og)
for tr in range(tile_h):
for tc in range(tile_w):
block = ig.copy()
if tr % 2 == 1:
block = block[::-1, :]
if tc % 2 == 1:
block = block[:, ::-1]
alt_tiled[tr*ih:(tr+1)*ih, tc*iw:(tc+1)*iw] = block
sig[idx] = np.mean(alt_tiled == og); idx += 1 # alternating tile match
else:
idx += 4
# Rotation check (if square)
if ih == iw:
r90 = np.rot90(ig)
r180 = np.rot90(ig, 2)
r270 = np.rot90(ig, 3)
if oh == ih and ow == iw:
sig[idx] = np.mean(r90 == og); idx += 1
sig[idx] = np.mean(r180 == og); idx += 1
sig[idx] = np.mean(r270 == og); idx += 1
else:
idx += 3
else:
idx += 3
# Mirror check
if oh == ih and ow == iw:
sig[idx] = np.mean(ig[::-1, :] == og); idx += 1 # flip H
sig[idx] = np.mean(ig[:, ::-1] == og); idx += 1 # flip V
return sig
@staticmethod
def encode_task(task: Dict) -> np.ndarray:
"""
Encode full ARC task (all training pairs) into composite signal.
Averages pair signals to find the CONSISTENT transformation
pattern across all examples.
"""
train = task.get('train', [])
if not train:
return np.zeros(GridEncoder.SIGNAL_SIZE, dtype=np.float32)
signals = []
for pair in train:
sig = GridEncoder.encode_pair(pair['input'], pair['output'])
signals.append(sig)
# Consensus: average across pairs
# High-confidence features will be consistent, noise will cancel
composite = np.mean(signals, axis=0).astype(np.float32)
# Variance across pairs (low = consistent = confident)
if len(signals) > 1:
variance = np.var(signals, axis=0)
# Boost consistent features, dampen noisy ones
consistency = 1.0 / (1.0 + variance * 10)
composite *= consistency
return composite
# =============================================================================
# PRIMITIVE DETECTOR — reads substrate output to find operations
# =============================================================================
class PrimitiveDetector:
"""
Detect geometric primitives from substrate output signal.
The substrate's sensor panels have already done the hard work —
detecting spatial frequencies, symmetries, boundaries, etc.
The detector reads those activations and maps them to named
transformation primitives.
"""
# Thresholds for detection
CONFIDENCE_THRESHOLD = 0.3
@staticmethod
def detect(input_signal: np.ndarray,
output_signal: np.ndarray,
task: Dict) -> List[DetectedPrimitive]:
"""
Detect all primitives from substrate processing results.
Uses both the encoded signal AND the raw task data to
cross-validate detections.
"""
primitives = []
train = task.get('train', [])
if not train:
return primitives
# Analyze all training pairs for consensus
pair_primitives = []
for pair in train:
pp = PrimitiveDetector._detect_pair(pair['input'], pair['output'])
pair_primitives.append(set(p.primitive for p in pp))
primitives.extend(pp)
# Consensus: only keep primitives detected in ALL pairs
if pair_primitives:
consensus = pair_primitives[0]
for ps in pair_primitives[1:]:
consensus &= ps
if consensus:
# Filter to consensus + boost confidence
consensus_prims = []
for p in primitives:
if p.primitive in consensus:
p.confidence = min(1.0, p.confidence * 1.5) # boost
consensus_prims.append(p)
# Deduplicate: keep highest confidence per primitive type
best = {}
for p in consensus_prims:
if p.primitive not in best or p.confidence > best[p.primitive].confidence:
best[p.primitive] = p
primitives = list(best.values())
else:
# No perfect consensus — keep all and deduplicate by confidence
best = {}
for p in primitives:
if p.primitive not in best or p.confidence > best[p.primitive].confidence:
best[p.primitive] = p
primitives = list(best.values())
# Sort by confidence
primitives.sort(key=lambda p: p.confidence, reverse=True)
return primitives
@staticmethod
def _detect_pair(input_grid: List[List[int]],
output_grid: List[List[int]]) -> List[DetectedPrimitive]:
"""Detect primitives for a single input→output pair."""
prims = []
ig = np.array(input_grid, dtype=np.float32)
og = np.array(output_grid, dtype=np.float32)
ih, iw = ig.shape
oh, ow = og.shape
# --- TILING ---
if oh > ih and ow > iw and oh % ih == 0 and ow % iw == 0:
tile_h, tile_w = oh // ih, ow // iw
# Simple tile
tiled = np.tile(ig, (tile_h, tile_w))
match = np.mean(tiled == og)
if match > 0.8:
prims.append(DetectedPrimitive(
GeoPrimitive.TILE_REPEAT, match,
{'tile_h': tile_h, 'tile_w': tile_w, 'alternating': False},
'spatial'))
# Alternating tile: tile columns, alternate block-rows
# between original and column-reversed
alt = np.zeros_like(og)
for tr in range(tile_h):
block = ig.copy()
if tr % 2 == 1:
block = block[:, ::-1] # reverse columns on odd blocks
row_tile = np.tile(block, (1, tile_w))
alt[tr*ih:(tr+1)*ih, :] = row_tile
alt_match = np.mean(alt == og)
if alt_match > match and alt_match > 0.8:
prims.append(DetectedPrimitive(
GeoPrimitive.TILE_REPEAT, alt_match,
{'tile_h': tile_h, 'tile_w': tile_w, 'alternating': True},
'spatial'))
# --- SCALING ---
if oh > ih and ow > iw:
sh, sw = oh / ih, ow / iw
if sh == sw and sh == int(sh):
scale = int(sh)
scaled = np.repeat(np.repeat(ig, scale, axis=0), scale, axis=1)
if scaled.shape == og.shape:
match = np.mean(scaled == og)
if match > 0.8:
prims.append(DetectedPrimitive(
GeoPrimitive.SCALE_UP, match,
{'factor': scale}, 'spatial'))
if oh < ih and ow < iw and ih % oh == 0 and iw % ow == 0:
prims.append(DetectedPrimitive(
GeoPrimitive.SCALE_DOWN, 0.7,
{'factor_h': ih // oh, 'factor_w': iw // ow}, 'spatial'))
# --- ROTATION (same size, square) ---
if ih == iw and oh == ow and ih == oh:
for k, prim in [(1, GeoPrimitive.ROTATE_90),
(2, GeoPrimitive.ROTATE_180),
(3, GeoPrimitive.ROTATE_270)]:
rotated = np.rot90(ig, k)
match = np.mean(rotated == og)
if match > 0.9:
prims.append(DetectedPrimitive(prim, match, {}, 'symmetry'))
# --- MIRROR ---
if ih == oh and iw == ow:
# Horizontal flip
flipped_h = ig[::-1, :]
match_h = np.mean(flipped_h == og)
if match_h > 0.9:
prims.append(DetectedPrimitive(
GeoPrimitive.MIRROR_H, match_h, {}, 'symmetry'))
# Vertical flip
flipped_v = ig[:, ::-1]
match_v = np.mean(flipped_v == og)
if match_v > 0.9:
prims.append(DetectedPrimitive(
GeoPrimitive.MIRROR_V, match_v, {}, 'symmetry'))
# Transpose
if ih == iw:
transposed = ig.T
match_t = np.mean(transposed == og)
if match_t > 0.9:
prims.append(DetectedPrimitive(
GeoPrimitive.MIRROR_DIAG, match_t, {}, 'symmetry'))
# --- COLOR MAP ---
if ih == oh and iw == ow:
# Check if there's a consistent color→color mapping
mapping = {}
consistent = True
for r in range(ih):
for c in range(iw):
ic = int(ig[r, c])
oc = int(og[r, c])
if ic in mapping:
if mapping[ic] != oc:
consistent = False
break
else:
mapping[ic] = oc
if not consistent:
break
if consistent and mapping:
is_identity = all(k == v for k, v in mapping.items())
if not is_identity:
prims.append(DetectedPrimitive(
GeoPrimitive.COLOR_MAP, 1.0,
{'mapping': mapping}, 'chromatic'))
# --- COLOR SWAP ---
if ih == oh and iw == ow:
diff_positions = ig != og
changed_in = set(ig[diff_positions].astype(int).tolist())
changed_out = set(og[diff_positions].astype(int).tolist())
if len(changed_in) == 2 and changed_in == changed_out:
colors = list(changed_in)
prims.append(DetectedPrimitive(
GeoPrimitive.COLOR_SWAP, 0.95,
{'color_a': colors[0], 'color_b': colors[1]}, 'chromatic'))
# --- CROP ---
if oh < ih or ow < iw:
# Check if output is a sub-region of input
for r in range(ih - oh + 1):
for c in range(iw - ow + 1):
region = ig[r:r+oh, c:c+ow]
if np.array_equal(region, og):
prims.append(DetectedPrimitive(
GeoPrimitive.CROP, 1.0,
{'top': r, 'left': c}, 'structural'))
break
else:
continue
break
# --- GRAVITY ---
if ih == oh and iw == ow:
# Check if non-zero cells "fell" downward
for c in range(iw):
in_col = ig[:, c]
out_col = og[:, c]
in_vals = in_col[in_col != 0]
out_vals = out_col[out_col != 0]
if len(in_vals) > 0 and np.array_equal(sorted(in_vals), sorted(out_vals)):
# Check if output has all non-zero at bottom
out_nonzero = np.where(out_col != 0)[0]
if len(out_nonzero) > 0 and out_nonzero[-1] == ih - 1:
if np.all(np.diff(out_nonzero) == 1):
prims.append(DetectedPrimitive(
GeoPrimitive.GRAVITY, 0.8,
{'direction': 'down'}, 'structural'))
break
# --- BOOLEAN OP ---
# (Detects when output = some combination of input regions)
# This is complex — will expand in later versions
return prims
# =============================================================================
# CODE GENERATOR — primitives to Python solve()
# =============================================================================
class CodeGenerator:
"""
Generate Python solve() functions from detected primitives.
Each primitive has a direct code template. Compositions
chain templates together.
"""
TEMPLATES = {
GeoPrimitive.TILE_REPEAT: {
'simple': """def solve(input_grid):
grid = np.array(input_grid)
h, w = grid.shape
tile_h, tile_w = {tile_h}, {tile_w}
result = np.tile(grid, (tile_h, tile_w))
return result.tolist()""",
'alternating': """def solve(input_grid):
grid = np.array(input_grid)
h, w = grid.shape
tile_h, tile_w = {tile_h}, {tile_w}
result = np.zeros((h * tile_h, w * tile_w), dtype=int)
for tr in range(tile_h):
block = grid.copy()
if tr % 2 == 1:
block = block[:, ::-1]
row_tile = np.tile(block, (1, tile_w))
result[tr*h:(tr+1)*h, :] = row_tile
return result.tolist()""",
},
GeoPrimitive.SCALE_UP: """def solve(input_grid):
grid = np.array(input_grid)
factor = {factor}
result = np.repeat(np.repeat(grid, factor, axis=0), factor, axis=1)
return result.tolist()""",
GeoPrimitive.ROTATE_90: """def solve(input_grid):
grid = np.array(input_grid)
result = np.rot90(grid, 1)
return result.tolist()""",
GeoPrimitive.ROTATE_180: """def solve(input_grid):
grid = np.array(input_grid)
result = np.rot90(grid, 2)
return result.tolist()""",
GeoPrimitive.ROTATE_270: """def solve(input_grid):
grid = np.array(input_grid)
result = np.rot90(grid, 3)
return result.tolist()""",
GeoPrimitive.MIRROR_H: """def solve(input_grid):
grid = np.array(input_grid)
result = grid[::-1, :]
return result.tolist()""",
GeoPrimitive.MIRROR_V: """def solve(input_grid):
grid = np.array(input_grid)
result = grid[:, ::-1]
return result.tolist()""",
GeoPrimitive.MIRROR_DIAG: """def solve(input_grid):
grid = np.array(input_grid)
result = grid.T
return result.tolist()""",
GeoPrimitive.COLOR_MAP: """def solve(input_grid):
grid = np.array(input_grid)
result = grid.copy()
mapping = {mapping}
for old_c, new_c in mapping.items():
result[grid == old_c] = new_c
return result.tolist()""",
GeoPrimitive.COLOR_SWAP: """def solve(input_grid):
grid = np.array(input_grid)
result = grid.copy()
a, b = {color_a}, {color_b}
result[grid == a] = b
result[grid == b] = a
return result.tolist()""",
GeoPrimitive.CROP: """def solve(input_grid):
grid = np.array(input_grid)
# Find bounding box of non-zero content
rows = np.any(grid != 0, axis=1)
cols = np.any(grid != 0, axis=0)
if not rows.any():
return input_grid
rmin, rmax = np.where(rows)[0][[0, -1]]
cmin, cmax = np.where(cols)[0][[0, -1]]
result = grid[rmin:rmax+1, cmin:cmax+1]
return result.tolist()""",
GeoPrimitive.GRAVITY: """def solve(input_grid):
grid = np.array(input_grid)
h, w = grid.shape
result = np.zeros_like(grid)
for c in range(w):
col = grid[:, c]
nonzero = col[col != 0]
result[h-len(nonzero):h, c] = nonzero
return result.tolist()""",
}
@staticmethod
def generate(primitives: List[DetectedPrimitive]) -> Optional[str]:
"""Generate solve() function from detected primitives."""
if not primitives:
return None
# Take highest confidence primitive
best = primitives[0]
template = CodeGenerator.TEMPLATES.get(best.primitive)
if template is None:
return None
# Handle templates with variants
if isinstance(template, dict):
if best.params.get('alternating'):
template = template.get('alternating', template.get('simple'))
else:
template = template.get('simple')
# Fill parameters
try:
code = template.format(**best.params)
except (KeyError, IndexError):
code = template
# Ensure numpy import
if 'np.' in code and 'import numpy' not in code:
code = "import numpy as np\n\n" + code
return code
@staticmethod
def generate_analysis(primitives: List[DetectedPrimitive],
task: Dict) -> str:
"""Generate text analysis from detected primitives."""
if not primitives:
return "No transformation pattern detected with sufficient confidence."
train = task.get('train', [])
ig = np.array(train[0]['input']) if train else np.array([[]])
og = np.array(train[0]['output']) if train else np.array([[]])
lines = []
lines.append(f"Input: {ig.shape[0]}x{ig.shape[1]} → Output: {og.shape[0]}x{og.shape[1]}")
for p in primitives:
desc = f"Detected {p.primitive.value} (confidence: {p.confidence:.2f})"
if p.params:
params_str = ", ".join(f"{k}={v}" for k, v in p.params.items())
desc += f" [{params_str}]"
lines.append(desc)
# Primary transformation description
best = primitives[0]
descriptions = {
GeoPrimitive.TILE_REPEAT: "The output tiles the input pattern across a larger grid",
GeoPrimitive.SCALE_UP: "The output scales up each cell of the input",
GeoPrimitive.ROTATE_90: "The output is the input rotated 90° counterclockwise",
GeoPrimitive.ROTATE_180: "The output is the input rotated 180°",
GeoPrimitive.ROTATE_270: "The output is the input rotated 270° counterclockwise",
GeoPrimitive.MIRROR_H: "The output flips the input horizontally (upside down)",
GeoPrimitive.MIRROR_V: "The output flips the input vertically (left-right)",
GeoPrimitive.MIRROR_DIAG: "The output transposes the input (diagonal mirror)",
GeoPrimitive.COLOR_MAP: "Each color in the input maps to a specific color in the output",
GeoPrimitive.COLOR_SWAP: "Two specific colors are swapped",
GeoPrimitive.CROP: "The output extracts a sub-region from the input",
GeoPrimitive.GRAVITY: "Non-zero cells fall to the bottom of each column",
}
lines.append(f"\nRule: {descriptions.get(best.primitive, best.primitive.value)}")
if best.params:
lines.append(f"Parameters: {best.params}")
return "\n".join(lines)
@staticmethod
def generate_hypothesis(primitives: List[DetectedPrimitive]) -> str:
"""Generate algorithm hypothesis from detected primitives."""
if not primitives:
return "Insufficient geometric evidence to form hypothesis."
best = primitives[0]
hypotheses = {
GeoPrimitive.TILE_REPEAT: "Tile the input grid {tile_h}x{tile_w} times. "
"If alternating: flip vertically on odd rows, horizontally on odd columns.",
GeoPrimitive.SCALE_UP: "Scale each cell to a {factor}x{factor} block.",
GeoPrimitive.ROTATE_90: "Apply np.rot90(grid, 1).",
GeoPrimitive.ROTATE_180: "Apply np.rot90(grid, 2).",
GeoPrimitive.ROTATE_270: "Apply np.rot90(grid, 3).",
GeoPrimitive.MIRROR_H: "Flip grid vertically: grid[::-1, :]",
GeoPrimitive.MIRROR_V: "Flip grid horizontally: grid[:, ::-1]",
GeoPrimitive.MIRROR_DIAG: "Transpose: grid.T",
GeoPrimitive.COLOR_MAP: "Apply color mapping: {mapping}",
GeoPrimitive.COLOR_SWAP: "Swap colors {color_a} ↔ {color_b}",
GeoPrimitive.CROP: "Extract bounding box of non-zero content.",
GeoPrimitive.GRAVITY: "For each column, move non-zero cells to bottom.",
}
template = hypotheses.get(best.primitive, str(best.primitive.value))
try:
return template.format(**best.params)
except (KeyError, IndexError):
return template
# =============================================================================
# INTEGRATED DECODER — full pipeline
# =============================================================================
class GeometricDecoder:
"""
Complete signal→text/code decoder.
This replaces _signal_to_response in the fused service.
"""
def __init__(self):
self.encoder = GridEncoder()
self.detector = PrimitiveDetector()
self.generator = CodeGenerator()
def decode_for_analysis(self, task: Dict,
substrate_output: np.ndarray) -> str:
"""Decode substrate output to text analysis."""
input_signal = self.encoder.encode_task(task)
primitives = self.detector.detect(input_signal, substrate_output, task)
return self.generator.generate_analysis(primitives, task)
def decode_for_hypothesis(self, task: Dict,
substrate_output: np.ndarray) -> str:
"""Decode substrate output to algorithm hypothesis."""
input_signal = self.encoder.encode_task(task)
primitives = self.detector.detect(input_signal, substrate_output, task)
return self.generator.generate_hypothesis(primitives)
def decode_for_code(self, task: Dict,
substrate_output: np.ndarray) -> Optional[str]:
"""Decode substrate output to Python solve() function."""
input_signal = self.encoder.encode_task(task)
primitives = self.detector.detect(input_signal, substrate_output, task)
return self.generator.generate(primitives)
def solve_task(self, task: Dict) -> Optional[str]:
"""
Full pipeline: task → encode → detect → code.
Bypasses substrate for direct geometric solving.
This is the one-pass fabrication path.
"""
train = task.get('train', [])
if not train:
return None
# Detect primitives directly from task geometry
primitives = self.detector.detect(
np.zeros(1024), np.zeros(1024), task)
if not primitives:
return None
return self.generator.generate(primitives)
# =============================================================================
# STANDALONE TEST
# =============================================================================
def test_codebook():
"""Test the codebook against known ARC patterns."""
decoder = GeometricDecoder()
# Test 1: Tiling (task 00576224)
task_tile = {
'train': [
{'input': [[7, 9], [4, 3]],
'output': [[7,9,7,9,7,9],[4,3,4,3,4,3],
[9,7,9,7,9,7],[3,4,3,4,3,4],
[7,9,7,9,7,9],[4,3,4,3,4,3]]},
{'input': [[8, 6], [6, 4]],
'output': [[8,6,8,6,8,6],[6,4,6,4,6,4],
[6,8,6,8,6,8],[4,6,4,6,4,6],
[8,6,8,6,8,6],[6,4,6,4,6,4]]},
],
'test': [
{'input': [[3, 2], [7, 8]],
'output': [[3,2,3,2,3,2],[7,8,7,8,7,8],
[2,3,2,3,2,3],[8,7,8,7,8,7],
[3,2,3,2,3,2],[7,8,7,8,7,8]]},
]
}
print("=" * 70)
print(" GEOMETRIC CODEBOOK TEST")
print("=" * 70)
# Test analysis
print("\n--- Task: Alternating Tile (00576224) ---")
analysis = decoder.decode_for_analysis(task_tile, np.zeros(1024))
print(f"Analysis:\n{analysis}")
hypothesis = decoder.decode_for_hypothesis(task_tile, np.zeros(1024))
print(f"\nHypothesis: {hypothesis}")
code = decoder.decode_for_code(task_tile, np.zeros(1024))
print(f"\nGenerated code:\n{code}")
# Validate
if code:
print("\n--- Validation ---")
namespace = {'np': np}
exec(code, namespace)
solve = namespace['solve']
for i, test in enumerate(task_tile['test']):
result = solve(test['input'])
expected = test['output']
match = result == expected
print(f" Test {i+1}: {'PASS ✓' if match else 'FAIL ✗'}")
if not match:
print(f" Expected: {expected[:2]}...")
print(f" Got: {result[:2]}...")
# Test 2: Simple rotation
print("\n--- Task: 90° Rotation ---")
task_rot = {
'train': [
{'input': [[1, 2], [3, 4]],
'output': [[2, 4], [1, 3]]},
{'input': [[5, 6], [7, 8]],
'output': [[6, 8], [5, 7]]},
],
'test': [
{'input': [[9, 1], [2, 3]],
'output': [[1, 3], [9, 2]]},
]
}
code = decoder.solve_task(task_rot)
if code:
print(f"Code: {code.strip().split(chr(10))[-1]}")
namespace = {'np': np}
exec(code, namespace)
result = namespace['solve'](task_rot['test'][0]['input'])
match = result == task_rot['test'][0]['output']
print(f" Test: {'PASS ✓' if match else 'FAIL ✗'}")
# Test 3: Color map
print("\n--- Task: Color Mapping ---")
task_color = {
'train': [
{'input': [[1, 2, 3], [1, 2, 3]],
'output': [[4, 5, 6], [4, 5, 6]]},
{'input': [[3, 1, 2], [2, 3, 1]],
'output': [[6, 4, 5], [5, 6, 4]]},
],
'test': [
{'input': [[2, 1, 3], [3, 2, 1]],
'output': [[5, 4, 6], [6, 5, 4]]},
]
}
code = decoder.solve_task(task_color)
if code:
print(f"Detected mapping")
namespace = {'np': np}
exec(code, namespace)
result = namespace['solve'](task_color['test'][0]['input'])
match = result == task_color['test'][0]['output']
print(f" Test: {'PASS ✓' if match else 'FAIL ✗'}")
print("\n" + "=" * 70)
if __name__ == "__main__":
test_codebook()