# harmonic-stack-v1 / codebook_expansion.py
# Uploader: LovingGraceTech
# Commit 1a79866 (verified) — v3: Dynamic learning from solved tasks
"""
Dynamic Codebook Expansion
===========================
Geometric learning from solved tasks.
When the static codebook can't recognize a pattern, the substrate still
processes the signal — it sees geometry the codebook doesn't name yet.
This module captures those geometric signatures, pairs them with working
solutions when they arrive, and recalls them for future similar tasks.
The codebook grows from evidence, not enumeration.
Three phases:
1. RECORD — On fallback, capture geometric signature as "pending"
2. LEARN — When orchestrator solves the task, pair code with signature
3. RECALL — For new tasks, match against learned signatures before fallback
Ghost in the Machine Labs — AGI for the home
"""
import json
import time
import hashlib
import os
import numpy as np
from dataclasses import dataclass, field, asdict
from typing import List, Dict, Optional, Tuple
from pathlib import Path
# =============================================================================
# GEOMETRIC SIGNATURE — condensed fingerprint from encoder bands
# =============================================================================
@dataclass
class GeometricSignature:
    """
    64-float fingerprint extracted from the encoder's 8 bands.

    Not the full 1024-signal — just the semantically meaningful
    features that distinguish one transformation type from another.

    Each band contributes 8 floats:
        Band 1 (shape): aspect ratio, area, dimension parity
        Band 2 (color): histogram peaks, unique count, entropy
        Band 3 (symmetry): H/V/diagonal/rotational flags
        Band 4 (frequency): tiling period, repetition density
        Band 5 (boundary): edge density, gradient magnitude
        Band 6 (objects): count, avg size, size variance
        Band 7 (transform): dimension ratio, color shift, spatial op
        Band 8 (hash): low-res structural hash
    """
    vector: List[float]  # 64 floats
    task_hash: str = ""  # hash of the task data, used for exact matching

    def to_numpy(self) -> np.ndarray:
        """Return the signature vector as a float32 NumPy array."""
        return np.array(self.vector, dtype=np.float32)

    def cosine_similarity(self, other: 'GeometricSignature') -> float:
        """
        Cosine similarity between two signatures.

        Returns 0.0 when either vector is (numerically) zero, and also
        when the two vectors do not have the same shape — signatures of
        different lengths are treated as "not similar" instead of letting
        np.dot raise.
        """
        a = self.to_numpy()
        b = other.to_numpy()
        if a.shape != b.shape:
            return 0.0
        na = np.linalg.norm(a)
        nb = np.linalg.norm(b)
        # Guard against division by zero for all-zero signatures.
        if na < 1e-10 or nb < 1e-10:
            return 0.0
        return float(np.dot(a, b) / (na * nb))
class SignatureExtractor:
    """
    Extract GeometricSignature from an ARC task.

    Uses the same geometric features the encoder captures,
    but compressed to 64 dimensions for fast similarity matching.
    """

    SIGNATURE_SIZE = 64  # 8 bands × 8 features

    @staticmethod
    def extract(task: Dict) -> 'GeometricSignature':
        """
        Extract signature from complete task (all training pairs).

        The per-pair feature vectors are averaged into one consensus
        signature, and the training pairs are hashed (first 16 hex chars
        of their SHA-256) for exact matching. A task without training
        pairs yields an all-zero signature with an empty hash.
        """
        train = task.get('train', [])
        if not train:
            return GeometricSignature(
                vector=[0.0] * SignatureExtractor.SIGNATURE_SIZE)

        # Accumulate features across all training pairs
        all_features = [
            SignatureExtractor._extract_pair(pair['input'], pair['output'])
            for pair in train
        ]

        # Average across pairs (consensus signature)
        avg = np.mean(all_features, axis=0).tolist()

        # Task hash for exact matching
        task_hash = hashlib.sha256(
            json.dumps(train, sort_keys=True).encode()
        ).hexdigest()[:16]
        return GeometricSignature(vector=avg, task_hash=task_hash)

    @staticmethod
    def _extract_pair(input_grid: List[List[int]],
                      output_grid: List[List[int]]) -> np.ndarray:
        """
        Extract 64 features from a single input→output pair.

        Returns a float32 array of length SIGNATURE_SIZE.

        Raises:
            ValueError: if either grid is empty or not 2-dimensional.
        """
        ig = np.array(input_grid, dtype=np.float32)
        og = np.array(output_grid, dtype=np.float32)
        for label, g in (('input', ig), ('output', og)):
            if g.ndim != 2 or g.size == 0:
                raise ValueError(
                    f"{label} grid must be a non-empty 2-D grid, "
                    f"got shape {g.shape}")

        features = np.zeros(SignatureExtractor.SIGNATURE_SIZE,
                            dtype=np.float32)
        SignatureExtractor._band_shape(ig, og, features)
        SignatureExtractor._band_color(ig, og, features)
        SignatureExtractor._band_symmetry(ig, og, features)
        SignatureExtractor._band_frequency(ig, og, features)
        SignatureExtractor._band_boundary(ig, og, features)
        SignatureExtractor._band_objects(ig, og, features)
        SignatureExtractor._band_transform(ig, og, features)
        SignatureExtractor._band_hash(ig, og, features)
        return features

    @staticmethod
    def _band_shape(ig: np.ndarray, og: np.ndarray,
                    features: np.ndarray) -> None:
        """Band 1 (0-7): grid dimensions, areas and output/input ratios."""
        ih, iw = ig.shape
        oh, ow = og.shape
        features[0] = ih / 30.0
        features[1] = iw / 30.0
        features[2] = oh / 30.0
        features[3] = ow / 30.0
        features[4] = (ih * iw) / 900.0  # input area
        features[5] = (oh * ow) / 900.0  # output area
        # Grids are validated as non-empty, so ih and iw are > 0 here.
        features[6] = oh / ih  # height ratio
        features[7] = ow / iw  # width ratio

    @staticmethod
    def _band_color(ig: np.ndarray, og: np.ndarray,
                    features: np.ndarray) -> None:
        """Band 2 (8-15): color counts, entropy and dominant-color share."""
        i_colors = set(ig.astype(int).ravel().tolist())
        o_colors = set(og.astype(int).ravel().tolist())
        features[8] = len(i_colors) / 10.0               # input unique colors
        features[9] = len(o_colors) / 10.0               # output unique colors
        features[10] = len(i_colors & o_colors) / 10.0   # shared colors
        features[11] = len(i_colors ^ o_colors) / 10.0   # changed colors

        for idx, g in enumerate((ig, og)):
            _, counts = np.unique(g, return_counts=True)
            probs = counts / counts.sum()
            # Color entropy (information content); epsilon avoids log2(0).
            entropy = -np.sum(probs * np.log2(probs + 1e-10))
            features[12 + idx] = entropy / 3.32  # normalize by log2(10)
            # Dominant color fraction
            features[14 + idx] = counts.max() / counts.sum()

    @staticmethod
    def _band_symmetry(ig: np.ndarray, og: np.ndarray,
                       features: np.ndarray) -> None:
        """Band 3 (16-23): fraction of cells matching their mirror image."""
        ih, iw = ig.shape
        oh, ow = og.shape
        if ih > 1:
            features[16] = float(np.mean(ig == ig[::-1, :]))  # input H sym
        if iw > 1:
            features[17] = float(np.mean(ig == ig[:, ::-1]))  # input V sym
        if oh > 1:
            features[18] = float(np.mean(og == og[::-1, :]))  # output H sym
        if ow > 1:
            features[19] = float(np.mean(og == og[:, ::-1]))  # output V sym
        if ih == iw:
            features[20] = float(np.mean(ig == ig.T))  # input diag
        if oh == ow:
            features[21] = float(np.mean(og == og.T))  # output diag
        # Input→output symmetry preservation
        features[22] = abs(features[16] - features[18])  # H sym change
        features[23] = abs(features[17] - features[19])  # V sym change

    @staticmethod
    def _row_period(g: np.ndarray) -> float:
        """Smallest horizontal tile width (< 8) repeating across every row, /8."""
        h, w = g.shape
        for period in range(1, min(w, 8)):
            if w % period:
                continue
            tiles = g.reshape(h, -1, period)
            if np.all(tiles == tiles[:, 0:1, :]):
                return period / 8.0
        return 0.0

    @staticmethod
    def _col_period(g: np.ndarray) -> float:
        """Smallest vertical tile height (< 8) repeating down the grid, /8."""
        h, w = g.shape
        for period in range(1, min(h, 8)):
            if h % period:
                continue
            tiles = g.reshape(-1, period, w)
            if np.all(tiles == tiles[0:1, :, :]):
                return period / 8.0
        return 0.0

    @staticmethod
    def _band_frequency(ig: np.ndarray, og: np.ndarray,
                        features: np.ndarray) -> None:
        """Band 4 (24-31): repetition periods and size-change indicators."""
        ih, iw = ig.shape
        oh, ow = og.shape
        features[24] = SignatureExtractor._row_period(ig)
        features[25] = SignatureExtractor._col_period(ig)
        features[26] = SignatureExtractor._row_period(og)
        features[27] = SignatureExtractor._col_period(og)

        # Size-change type: grow, shrink, or same
        features[28] = 1.0 if oh > ih else (-1.0 if oh < ih else 0.0)
        features[29] = 1.0 if ow > iw else (-1.0 if ow < iw else 0.0)

        # Tiling divisibility
        features[30] = 1.0 if (oh % ih == 0 and ow % iw == 0 and
                               (oh > ih or ow > iw)) else 0.0
        features[31] = 1.0 if (ih % oh == 0 and iw % ow == 0 and
                               (ih > oh or iw > ow)) else 0.0

    @staticmethod
    def _band_boundary(ig: np.ndarray, og: np.ndarray,
                       features: np.ndarray) -> None:
        """Band 5 (32-39): edge density, border variety, non-zero density."""
        # Edge density: fraction of horizontally/vertically adjacent cell
        # pairs whose colors differ.
        for idx, g in enumerate((ig, og)):
            h, w = g.shape
            edges = (np.count_nonzero(g[:, :-1] != g[:, 1:]) +
                     np.count_nonzero(g[:-1, :] != g[1:, :]))
            total = h * (w - 1) + (h - 1) * w
            features[32 + idx] = edges / max(total, 1)
        # Edge density change
        features[34] = features[33] - features[32]

        # Border uniformity (are edges all one color?)
        for idx, g in enumerate((ig, og)):
            border = np.concatenate([g[0, :], g[-1, :], g[:, 0], g[:, -1]])
            features[35 + idx] = len(np.unique(border)) / 10.0

        # Non-zero fraction
        features[37] = float(np.count_nonzero(ig)) / max(ig.size, 1)
        features[38] = float(np.count_nonzero(og)) / max(og.size, 1)
        features[39] = features[38] - features[37]  # density change

    @staticmethod
    def _component_sizes(g: np.ndarray) -> List[int]:
        """Sizes of 4-connected same-color regions of non-zero cells."""
        h, w = g.shape
        visited = np.zeros(g.shape, dtype=bool)
        sizes = []
        for r in range(h):
            for c in range(w):
                if visited[r, c] or g[r, c] == 0:
                    continue
                # Iterative flood fill using an explicit stack (DFS order).
                color = g[r, c]
                stack = [(r, c)]
                size = 0
                while stack:
                    cr, cc = stack.pop()
                    if not (0 <= cr < h and 0 <= cc < w):
                        continue
                    if visited[cr, cc] or g[cr, cc] != color:
                        continue
                    visited[cr, cc] = True
                    size += 1
                    stack.extend(((cr + 1, cc), (cr - 1, cc),
                                  (cr, cc + 1), (cr, cc - 1)))
                sizes.append(size)
        return sizes

    @staticmethod
    def _band_objects(ig: np.ndarray, og: np.ndarray,
                      features: np.ndarray) -> None:
        """Band 6 (40-47): object count and size statistics per grid."""
        for idx, g in enumerate((ig, og)):
            h, w = g.shape
            sizes = SignatureExtractor._component_sizes(g)
            base = idx * 4
            features[40 + base] = len(sizes) / 30.0  # object count
            if sizes:
                features[41 + base] = np.mean(sizes) / (h * w)  # avg size
                features[42 + base] = np.std(sizes) / (h * w)   # size spread
                features[43 + base] = max(sizes) / (h * w)      # largest object

    @staticmethod
    def _band_transform(ig: np.ndarray, og: np.ndarray,
                        features: np.ndarray) -> None:
        """Band 7 (48-55): overlap, rotation/mirror match, color mapping."""
        ih, iw = ig.shape
        oh, ow = og.shape

        # Direct overlap (how much of input appears unchanged in output),
        # measured on the top-left region both grids share.
        min_h = min(ih, oh)
        min_w = min(iw, ow)
        features[48] = float(
            np.mean(ig[:min_h, :min_w] == og[:min_h, :min_w]))

        # Rotation checks
        if ih == ow and iw == oh:
            for k, fidx in ((1, 49), (2, 50), (3, 51)):
                rotated = np.rot90(ig, k)
                if rotated.shape == og.shape:
                    features[fidx] = float(np.mean(rotated == og))
        elif ih == oh and iw == ow:
            features[50] = float(np.mean(np.rot90(ig, 2) == og))

        if ih == oh and iw == ow:
            # Mirror checks
            features[52] = float(np.mean(ig[::-1, :] == og))  # H flip
            features[53] = float(np.mean(ig[:, ::-1] == og))  # V flip

            # Color mapping consistency: does every input color always
            # map to the same output color at the same position?
            mapping = {}
            consistent = True
            cell_pairs = zip(ig.astype(int).ravel().tolist(),
                             og.astype(int).ravel().tolist())
            for ic, oc in cell_pairs:
                if mapping.setdefault(ic, oc) != oc:
                    consistent = False
                    break
            features[54] = 1.0 if consistent and mapping else 0.0
            features[55] = len(mapping) / 10.0 if consistent else 0.0

    @staticmethod
    def _band_hash(ig: np.ndarray, og: np.ndarray,
                   features: np.ndarray) -> None:
        """Band 8 (56-63): most frequent color of each of 2×2 quadrants."""
        for idx, g in enumerate((ig, og)):
            h, w = g.shape
            # Quadrant size; at least 1 so that 1-row/1-column grids still
            # produce a first quadrant.
            mh, mw = (h // 2) or 1, (w // 2) or 1
            for qi in range(2):
                for qj in range(2):
                    rs, cs = qi * mh, qj * mw
                    quad = g[rs:min(rs + mh, h), cs:min(cs + mw, w)]
                    if quad.size == 0:
                        # A 1-row or 1-column grid has no second quadrant
                        # along that axis; leave the feature at 0 rather
                        # than calling argmax on an empty array.
                        continue
                    vals, counts = np.unique(quad, return_counts=True)
                    features[56 + idx * 4 + qi * 2 + qj] = (
                        vals[counts.argmax()] / 9.0)
# =============================================================================
# CODEBOOK ENTRY — a learned signature→code pairing
# =============================================================================
@dataclass
class CodebookEntry:
    """A learned geometric pattern → code mapping."""
    signature: 'GeometricSignature'
    code: str                  # Python solve() function
    task_id: str = ""          # ARC task ID if known
    learned_at: float = 0.0    # Unix timestamp
    hit_count: int = 0         # Times this entry has been recalled
    last_hit: float = 0.0      # Last recall timestamp
    validated: bool = False    # Has this been validated against training?
    description: str = ""      # Human-readable description of the pattern

    def to_dict(self) -> dict:
        """
        Flatten the entry into a plain dict.

        The signature is stored as two top-level keys, 'signature' (the
        vector) and 'task_hash'; all other fields keep their own names.
        """
        return {
            'signature': self.signature.vector,
            'task_hash': self.signature.task_hash,
            'code': self.code,
            'task_id': self.task_id,
            'learned_at': self.learned_at,
            'hit_count': self.hit_count,
            'last_hit': self.last_hit,
            'validated': self.validated,
            'description': self.description,
        }

    @classmethod
    def from_dict(cls, d: dict) -> 'CodebookEntry':
        """
        Rebuild an entry from the dict layout produced by to_dict().

        'signature' and 'code' are required (KeyError if missing); every
        other key falls back to the field's default.
        """
        sig = GeometricSignature(
            vector=d['signature'],
            task_hash=d.get('task_hash', ''),
        )
        # cls(...) rather than a hard-coded class name so subclasses get
        # instances of their own type.
        return cls(
            signature=sig,
            code=d['code'],
            task_id=d.get('task_id', ''),
            learned_at=d.get('learned_at', 0.0),
            hit_count=d.get('hit_count', 0),
            last_hit=d.get('last_hit', 0.0),
            validated=d.get('validated', False),
            description=d.get('description', ''),
        )
# =============================================================================
# CODEBOOK STORE — persistent storage
# =============================================================================
class CodebookStore:
    """
    JSON-backed persistent storage for learned codebook entries.

    File format:
        {
            "version": 1,
            "entries": [...],
            "pending": {...},
            "stats": {...}
        }
    """

    def __init__(self, path: str = "codebook_learned.json"):
        """Create the store and load any existing data from *path*."""
        self.path = Path(path)
        self.entries: List['CodebookEntry'] = []
        self.pending: Dict[str, Dict] = {}  # task_hash → task data
        self.stats = {
            'total_learned': 0,
            'total_recalled': 0,
            'total_pending': 0,
            'total_rejected': 0,
        }
        self._load()

    def _load(self):
        """
        Load from disk.

        A missing file is not an error. A file that cannot be parsed, or
        whose structure is not the expected one, is reported and leaves
        the store empty with its current stats.
        """
        if not self.path.exists():
            return
        try:
            with open(self.path, encoding="utf-8") as f:
                data = json.load(f)
            entries = [CodebookEntry.from_dict(e)
                       for e in data.get('entries', [])]
            pending = data.get('pending', {})
            # Merge over the defaults so counters missing from the file
            # still exist when they are incremented later.
            stats = {**self.stats, **data.get('stats', {})}
        except (json.JSONDecodeError, KeyError,
                TypeError, AttributeError) as e:
            print(f"[CODEBOOK-EXPAND] Error loading {self.path}: {e}")
            self.entries = []
            self.pending = {}
            return
        self.entries = entries
        self.pending = pending
        self.stats = stats
        print(f"[CODEBOOK-EXPAND] Loaded {len(self.entries)} learned entries, "
              f"{len(self.pending)} pending")

    def _save(self):
        """Persist to disk."""
        data = {
            'version': 1,
            'entries': [e.to_dict() for e in self.entries],
            'pending': self.pending,
            'stats': self.stats,
        }
        # Write to a sibling temp file, then swap it into place.
        # Path.replace overwrites an existing target on every platform,
        # whereas Path.rename fails on Windows if the target exists.
        tmp = self.path.with_suffix('.tmp')
        with open(tmp, 'w', encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        tmp.replace(self.path)

    def add_pending(self, task_hash: str, task: Dict,
                    signature: 'GeometricSignature'):
        """Record a task that the static codebook couldn't handle."""
        self.pending[task_hash] = {
            'task': task,
            'signature': signature.vector,
            'recorded_at': time.time(),
        }
        self.stats['total_pending'] += 1
        self._save()

    def add_entry(self, entry: 'CodebookEntry'):
        """
        Store a validated codebook entry.

        An existing entry with the same non-empty task hash is replaced;
        otherwise the entry is appended and 'total_learned' is bumped.
        Either way the matching pending record (if any) is removed.
        """
        task_hash = entry.signature.task_hash
        replaced = False
        # An empty hash means "no hash" (find_match ignores it too), so
        # it must not be used to dedupe unrelated entries.
        if task_hash:
            for i, existing in enumerate(self.entries):
                if existing.signature.task_hash == task_hash:
                    self.entries[i] = entry
                    replaced = True
                    break
        if not replaced:
            self.entries.append(entry)
            self.stats['total_learned'] += 1
        # Remove from pending if present (on the update path as well).
        self.pending.pop(task_hash, None)
        self._save()

    def _record_hit(self, entry: 'CodebookEntry'):
        """Update recall bookkeeping for *entry* and persist it."""
        entry.hit_count += 1
        entry.last_hit = time.time()
        self.stats['total_recalled'] += 1
        self._save()

    def find_match(self, signature: 'GeometricSignature',
                   threshold: float = 0.85) -> Optional['CodebookEntry']:
        """
        Find the best matching entry by cosine similarity.

        An entry whose non-empty task hash equals the query's hash wins
        outright. Otherwise the entry with the highest similarity strictly
        above *threshold* is returned.

        Returns None if no entry exceeds threshold.
        """
        # First: exact hash match
        for entry in self.entries:
            if (entry.signature.task_hash and
                    entry.signature.task_hash == signature.task_hash):
                self._record_hit(entry)
                return entry

        # Second: similarity match
        best_entry = None
        best_sim = threshold
        for entry in self.entries:
            sim = signature.cosine_similarity(entry.signature)
            if sim > best_sim:
                best_sim = sim
                best_entry = entry
        if best_entry is not None:
            self._record_hit(best_entry)
            print(f"[CODEBOOK-EXPAND] Dynamic match: similarity={best_sim:.3f}, "
                  f"entry={best_entry.task_id}")
        return best_entry

    def get_stats(self) -> Dict:
        """Return codebook statistics."""
        return {
            **self.stats,
            'stored_entries': len(self.entries),
            'pending_tasks': len(self.pending),
            'avg_hits': (float(np.mean([e.hit_count for e in self.entries]))
                         if self.entries else 0),
        }
# =============================================================================
# CODE ABSTRACTOR — extract reusable templates from specific solutions
# =============================================================================
class CodeAbstractor:
    """
    Extract reusable code patterns from task-specific solutions.

    A raw solve() function might have hardcoded values that are specific
    to one task. The abstractor identifies what can be parameterized
    to make the code work on structurally similar tasks.

    Strategy:
        - Detect color constants → replace with input-derived color detection
        - Detect dimension constants → replace with input.shape-derived values
        - Detect hardcoded grids → replace with pattern matching
        - If code is already generic (operates on input_grid without
          constants), leave it as-is.

    NOTE: none of the rewriting strategies above is implemented yet;
    abstract() currently always returns the code it was given.
    """

    @staticmethod
    def _passes_training(code: str, train: List[Dict]) -> bool:
        """Return True if code's solve() reproduces every training output."""
        import copy  # local: only needed for the defensive input copy
        try:
            namespace = {'np': np}
            # SECURITY: exec runs the supplied source with full interpreter
            # privileges. Only feed it code from a trusted producer.
            exec(code, namespace)
            solve = namespace.get('solve')
            if not solve:
                return False
            for pair in train:
                # Hand solve() a copy so an in-place solution cannot
                # modify the task data.
                result = solve(copy.deepcopy(pair['input']))
                if result is None:
                    return False
                if isinstance(result, np.ndarray):
                    result = result.tolist()
                if result != pair['output']:
                    return False
            return True
        except Exception:
            # Best-effort probe: code that fails to run simply does not pass.
            return False

    @staticmethod
    def abstract(code: str, task: Dict) -> str:
        """
        Attempt to make a solve() function more generic.

        Returns the code unchanged if it's already abstract enough,
        or a modified version with hardcoded values replaced. At present
        no modification is implemented, so the input code is returned in
        every case.
        """
        if not code or 'def solve' not in code:
            return code
        train = task.get('train', [])
        if not train:
            return code

        # Simple heuristic: if the code works on all training pairs already,
        # it's probably generic enough. Don't break what works.
        if CodeAbstractor._passes_training(code, train):
            return code  # Already works — don't abstract

        return code  # Return as-is if we can't improve it

    @staticmethod
    def describe(code: str, task: Dict) -> str:
        """
        Generate a human-readable description of what the code does.

        The description is derived from the first training pair only
        (size change and color-set change); *code* itself is not inspected.
        """
        train = task.get('train', [])
        if not train:
            return "Unknown transformation"

        pair = train[0]
        ig = np.array(pair['input'])
        og = np.array(pair['output'])
        ih, iw = ig.shape
        oh, ow = og.shape
        parts = []

        # Size change
        if oh > ih or ow > iw:
            parts.append(f"Expands {ih}×{iw} → {oh}×{ow}")
        elif oh < ih or ow < iw:
            parts.append(f"Shrinks {ih}×{iw} → {oh}×{ow}")
        else:
            parts.append(f"Same size {ih}×{iw}")

        # Color change
        i_colors = set(ig.flatten().astype(int).tolist())
        o_colors = set(og.flatten().astype(int).tolist())
        if i_colors != o_colors:
            def fmt(colors):
                # Plain sorted ints: stable output, no NumPy scalar reprs.
                return "{" + ", ".join(str(c) for c in sorted(colors)) + "}"
            parts.append(
                f"Colors change: {fmt(i_colors)} → {fmt(o_colors)}")

        return "; ".join(parts) if parts else "Geometric transformation"
# =============================================================================
# SOLUTION VALIDATOR — gate for quality control
# =============================================================================
class SolutionValidator:
    """
    Validate that a solve() function actually works on training data.

    This is the gate. No garbage gets into the learned codebook.
    """

    @staticmethod
    def validate(code: str, task: Dict) -> Tuple[bool, str]:
        """
        Validate code against all training pairs.

        The code must define solve() (or transform()), and calling it on
        each training input must reproduce the training output exactly.
        NumPy results are converted to nested lists before comparison.
        Validation stops at the first failing pair.

        Returns (passed: bool, message: str)
        """
        import copy  # local: only needed for the defensive input copy

        train = task.get('train', [])
        if not train:
            return False, "No training data"
        if not code or ('def solve' not in code and
                        'def transform' not in code):
            return False, "No solve() or transform() function found"

        try:
            namespace = {'np': np}
            # SECURITY: exec runs the supplied source with full interpreter
            # privileges. Only feed it code from a trusted producer.
            exec(code, namespace)
            solve = namespace.get('solve') or namespace.get('transform')
        except Exception as e:
            return False, f"Code compilation failed: {e}"
        if not callable(solve):
            return False, "solve() or transform() not defined after exec"

        passed = 0
        total = len(train)
        for i, pair in enumerate(train):
            try:
                # Pass a copy: a solution that edits its argument in place
                # must not alter the task data (which is hashed and
                # fingerprinted after validation).
                result = solve(copy.deepcopy(pair['input']))
                expected = pair['output']
                if result is None:
                    return False, f"Pair {i}: solve() returned None"

                # Normalize to list
                if isinstance(result, np.ndarray):
                    result = result.tolist()
                elif (isinstance(result, list) and result and
                      isinstance(result[0], np.ndarray)):
                    result = [r.tolist() if isinstance(r, np.ndarray) else r
                              for r in result]

                if result != expected:
                    return False, (f"Pair {i}: mismatch. "
                                   f"Got {str(result)[:100]}... "
                                   f"Expected {str(expected)[:100]}...")
                passed += 1
            except Exception as e:
                return False, f"Pair {i}: runtime error: {e}"

        return True, f"Passed {passed}/{total} training pairs"
# =============================================================================
# DYNAMIC CODEBOOK — the integrated expansion system
# =============================================================================
class DynamicCodebook:
    """
    The complete dynamic codebook expansion system.

    Integrates:
        - SignatureExtractor (task → geometric fingerprint)
        - CodebookStore (persistence)
        - SolutionValidator (quality gate)
        - CodeAbstractor (generalization)

    Usage:
        dc = DynamicCodebook("/path/to/codebook_learned.json")

        # On fallback:
        dc.record_miss(task)

        # When solution arrives:
        dc.learn(task, code, task_id="abc123")

        # Before fallback, check dynamic:
        entry = dc.recall(task)
        if entry:
            return entry.code
    """

    def __init__(self, store_path: str = "codebook_learned.json"):
        """Open (or create) the store at *store_path* and wire the helpers."""
        self.store = CodebookStore(store_path)
        self.extractor = SignatureExtractor()
        self.validator = SolutionValidator()
        self.abstractor = CodeAbstractor()

    def record_miss(self, task: Dict) -> 'GeometricSignature':
        """
        Record a task that the static codebook couldn't handle.

        Stores the geometric signature as "pending" for later pairing
        when a solution arrives.

        Returns the signature for reference.
        """
        sig = self.extractor.extract(task)
        self.store.add_pending(sig.task_hash, task, sig)
        print(f"[CODEBOOK-EXPAND] Recorded pending: hash={sig.task_hash}")
        return sig

    def learn(self, task: Dict, code: str,
              task_id: str = "") -> Tuple[bool, str]:
        """
        Learn a new codebook entry from a validated solution.

        Validates the code, extracts signature, abstracts if possible,
        and stores the pairing. A rejected solution only bumps the
        'total_rejected' counter.

        Returns (success: bool, message: str)
        """
        # Validate
        passed, msg = self.validator.validate(code, task)
        if not passed:
            self.store.stats['total_rejected'] += 1
            # Persist the counter like every other stat change; otherwise
            # it is lost unless some later operation happens to save.
            self.store._save()
            print(f"[CODEBOOK-EXPAND] Rejected: {msg}")
            return False, f"Validation failed: {msg}"

        # Extract signature
        sig = self.extractor.extract(task)
        # Attempt abstraction
        abstract_code = self.abstractor.abstract(code, task)
        # Generate description
        description = self.abstractor.describe(abstract_code, task)

        # Create entry
        entry = CodebookEntry(
            signature=sig,
            code=abstract_code,
            task_id=task_id,
            learned_at=time.time(),
            validated=True,
            description=description,
        )
        self.store.add_entry(entry)
        print(f"[CODEBOOK-EXPAND] Learned: task={task_id}, "
              f"hash={sig.task_hash}, desc={description}")
        return True, f"Learned: {description}"

    def recall(self, task: Dict,
               threshold: float = 0.85) -> Optional['CodebookEntry']:
        """
        Check if a similar task has been solved before.

        Returns the best matching entry, or None.
        """
        sig = self.extractor.extract(task)
        return self.store.find_match(sig, threshold)

    def get_code(self, task: Dict, threshold: float = 0.85) -> Optional[str]:
        """
        Convenience: recall and return just the code, or None.

        The recalled code is re-validated against this task's training
        pairs and only returned if it passes.
        """
        entry = self.recall(task, threshold)
        if not entry:
            return None
        # Re-validate against this specific task before returning
        passed, _ = self.validator.validate(entry.code, task)
        if passed:
            return entry.code
        # Similar signature but code doesn't work — not a true match
        print(f"[CODEBOOK-EXPAND] Signature matched but code failed "
              f"validation for new task")
        return None

    def get_stats(self) -> Dict:
        """Return expansion statistics."""
        return self.store.get_stats()

    def get_entries_summary(self) -> List[Dict]:
        """Return summary of all learned entries."""
        return [
            {
                'task_id': e.task_id,
                'description': e.description,
                'learned_at': e.learned_at,
                'hit_count': e.hit_count,
                'validated': e.validated,
            }
            for e in self.store.entries
        ]
# =============================================================================
# STANDALONE TEST
# =============================================================================
def test_expansion():
    """
    Test the dynamic codebook expansion system.

    Runs record → learn → recall (exact and similar) → reject → reload
    against a store file inside a temporary directory, printing progress
    and asserting on each phase.
    """
    import tempfile

    print("=" * 70)
    print(" DYNAMIC CODEBOOK EXPANSION TEST")
    print("=" * 70)

    # Use a temp directory so the store starts from a non-existent file
    # (an empty pre-created file is not valid JSON) and so every file the
    # store writes is cleaned up afterwards.
    with tempfile.TemporaryDirectory() as tmp_dir:
        test_path = os.path.join(tmp_dir, "codebook_test.json")
        dc = DynamicCodebook(test_path)

        # --- Test 1: Record a miss ---
        print("\n--- Phase 1: Record miss ---")
        task_unknown = {
            'train': [
                {'input': [[1, 0, 1], [0, 1, 0], [1, 0, 1]],
                 'output': [[1, 0, 1, 1, 0, 1], [0, 1, 0, 0, 1, 0],
                            [1, 0, 1, 1, 0, 1]]},
                {'input': [[2, 0], [0, 2]],
                 'output': [[2, 0, 2, 0], [0, 2, 0, 2]]},
            ],
            'test': [
                {'input': [[3, 0, 3], [0, 3, 0]],
                 'output': [[3, 0, 3, 3, 0, 3], [0, 3, 0, 0, 3, 0]]},
            ]
        }
        sig = dc.record_miss(task_unknown)
        print(f" Signature hash: {sig.task_hash}")
        print(f" Pending count: {dc.store.stats['total_pending']}")
        assert len(dc.store.pending) == 1, "Should have 1 pending"
        print(" Record: PASS ✓")

        # --- Test 2: Learn from solution ---
        print("\n--- Phase 2: Learn from solution ---")
        # This code doubles the grid horizontally
        solution_code = (
            "def solve(input_grid):\n"
            "    import numpy as np\n"
            "    g = np.array(input_grid)\n"
            "    return np.tile(g, (1, 2)).tolist()\n"
        )
        success, msg = dc.learn(task_unknown, solution_code,
                                task_id="test_001")
        print(f" Result: {msg}")
        assert success, f"Should succeed: {msg}"
        assert len(dc.store.entries) == 1, "Should have 1 entry"
        print(f" Stored entries: {len(dc.store.entries)}")
        print(" Learn: PASS ✓")

        # --- Test 3: Recall for same task ---
        print("\n--- Phase 3: Recall (exact match) ---")
        code = dc.get_code(task_unknown)
        assert code is not None, "Should find exact match"
        print(f" Retrieved code: {code.strip().split(chr(10))[0]}...")
        print(" Recall exact: PASS ✓")

        # --- Test 4: Recall for similar task ---
        print("\n--- Phase 4: Recall (similar task) ---")
        task_similar = {
            'train': [
                {'input': [[5, 0, 5], [0, 5, 0], [5, 0, 5]],
                 'output': [[5, 0, 5, 5, 0, 5], [0, 5, 0, 0, 5, 0],
                            [5, 0, 5, 5, 0, 5]]},
                {'input': [[7, 0], [0, 7]],
                 'output': [[7, 0, 7, 0], [0, 7, 0, 7]]},
            ],
            'test': [
                {'input': [[4, 0, 4], [0, 4, 0]],
                 'output': [[4, 0, 4, 4, 0, 4], [0, 4, 0, 0, 4, 0]]},
            ]
        }
        code = dc.get_code(task_similar)
        if code:
            # Validate on the similar task
            namespace = {'np': np}
            exec(code, namespace)
            result = namespace['solve'](task_similar['test'][0]['input'])
            expected = task_similar['test'][0]['output']
            match = result == expected
            print(f" Similar task match: {match}")
            if match:
                print(" Recall similar: PASS ✓")
            else:
                print(" Recall similar: FAIL ✗ (code doesn't generalize)")
        else:
            print(" No match found (below threshold)")
            print(" Recall similar: SKIP (expected — different colors)")

        # --- Test 5: Reject bad code ---
        print("\n--- Phase 5: Reject bad solution ---")
        bad_code = (
            "def solve(input_grid):\n"
            "    return [[0]]\n"
        )
        success, msg = dc.learn(task_unknown, bad_code, task_id="bad_001")
        assert not success, "Should reject"
        print(f" Rejected: {msg}")
        print(" Reject: PASS ✓")

        # --- Test 6: Persistence ---
        print("\n--- Phase 6: Persistence ---")
        dc2 = DynamicCodebook(test_path)
        assert len(dc2.store.entries) == 1, "Should load 1 entry from disk"
        print(f" Loaded {len(dc2.store.entries)} entries from disk")
        print(" Persistence: PASS ✓")

        # --- Stats ---
        print("\n--- Stats ---")
        stats = dc.get_stats()
        for k, v in stats.items():
            print(f" {k}: {v}")

    print("\n" + "=" * 70)
    print(" ALL TESTS PASSED")
    print("=" * 70)
# Run the standalone self-test when this file is executed as a script.
if __name__ == "__main__":
    test_expansion()