"""
Mathematical Foundation & Conceptual Documentation
--------------------------------------------------

CORE PRINCIPLE:
Combines Neural Turing Machines (external memory architectures) with evolutionary
algorithms to create adaptive memory systems that evolve both their architecture
and parameters through natural selection, enabling the discovery of effective
memory access patterns and computational structures.

MATHEMATICAL FOUNDATION:
========================

1. NEURAL TURING MACHINE DYNAMICS:
   Content-based addressing: w_t^c = softmax(β_t · K[M_t, k_t])
   Where:
   - w_t: attention weights over memory locations
   - β_t: key strength (focus parameter; a positive scalar per head)
   - K[M,k]: cosine similarity between memory M and key k
   - M_t: memory matrix at time t
   - k_t: generated key vector
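
   A minimal sketch of this addressing rule in PyTorch (one head over N slots,
   mirroring the address() helper implemented below):

       sim = (M @ k) / (M.norm(dim=-1) * k.norm() + 1e-8)  # cosine similarity, [N]
       w = torch.softmax(beta * sim, dim=-1)               # focused attention, [N]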

2. MEMORY OPERATIONS:
   Read:  r_t = Σ_i w_t^r[i] · M_t[i]
   Erase: M̃_t[i] = M_{t-1}[i] ⊙ (1 - w_t^w[i] · e_t)
   Add:   M_t[i] = M̃_t[i] + w_t^w[i] · a_t

   Where:
   - r_t: read vector
   - e_t: erase vector ∈ [0,1]^{memory_dim}
   - a_t: add vector ∈ ℝ^{memory_dim}
   - ⊙: element-wise product; w_t^w[i] is a scalar weight per slot
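
   Toy update with a single write head (sketch; M: [N, Dm], w_w/w_r: [N],
   e/a: [Dm], matching the batched implementation in step() below):

       M = M * (1 - w_w.unsqueeze(-1) * e.unsqueeze(0))  # erase
       M = M + w_w.unsqueeze(-1) * a.unsqueeze(0)        # add
       r = (w_r.unsqueeze(-1) * M).sum(dim=0)            # read, [Dm]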

3. EVOLUTIONARY FITNESS:
   F(individual) = α·task_performance + β·memory_efficiency + γ·stability

   Where:
   - task_performance: accuracy on computational tasks
   - memory_efficiency: 1/(1 + parameter_count/baseline)
   - stability: consistency across multiple runs
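
   Worked example (illustrative component scores; FitnessEvaluator below uses
   weights 0.5/0.3/0.2, with copy, recall, and efficiency as the components):

       F = 0.5·0.80 + 0.3·0.60 + 0.2·0.50 = 0.68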

4. GENETIC OPERATIONS:
   Architecture Crossover: A_child = random_blend(A_parent1, A_parent2)
   Parameter Mutation: θ'_i = θ_i + ε, ε ~ N(0, σ²), applied with probability p_mut
   Selection: P(selection) ∝ exp(F(individual)/T)

   Where T is the selection temperature.
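
   One way to realize this selection rule in code (sketch; f is a list of
   fitness values, T a positive temperature):

       p = torch.softmax(torch.tensor(f) / T, dim=0)  # P(select i) ∝ exp(f_i / T)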

5. POPULATION DYNAMICS:
   Elite Preservation: keep the top k% of individuals
   Tournament Selection: choose parents via tournaments
   Replacement Strategy: (μ + λ) evolution strategy

CONCEPTUAL REASONING:
====================

WHY EVOLUTIONARY + TURING MACHINES?
- Fixed NTM architectures may be suboptimal for specific tasks
- Manual architecture design is time-intensive and domain-specific
- Evolution can discover novel memory access patterns
- Natural selection optimizes both structure and parameters simultaneously

KEY INNOVATIONS:
1. **Evolvable Architecture**: memory size, heads, and controller complexity are all mutable
2. **Task-Adaptive Evolution**: fitness functions guide the search toward task-specific solutions
3. **Multi-Objective Optimization**: balances performance, efficiency, and stability
4. **Hierarchical Mutation**: separate rates for architecture and parameter changes
5. **Memory Access Pattern Evolution**: learns effective attention strategies

APPLICATIONS:
- Algorithmic learning (sorting, copying, associative recall)
- Adaptive control systems with memory requirements
- Meta-learning for memory-augmented architectures
- Neural architecture search for sequence modeling
- Continual learning with evolving memory structures

COMPLEXITY ANALYSIS:
- Individual Evaluation: O(T·(D² + N·H·Dm)) where T = sequence length, D = controller size,
  N = memory slots, H = heads, Dm = memory dimension
- Population Evolution: O(P·evaluation_cost) where P = population size
- Architecture Mutation: O(1) hyperparameter updates, plus re-initialization of the
  mutated model's weights
- Memory: O(P·(D² + N·Dm)) for population storage

BIOLOGICAL INSPIRATION:
- Neural plasticity and synaptic evolution
- Natural selection of neural circuits
- Memory consolidation and forgetting mechanisms
- Adaptive brain architecture development
"""

from __future__ import annotations

from copy import deepcopy
from dataclasses import dataclass
from typing import Dict, List, Optional

import torch
import torch.nn as nn
import torch.nn.functional as F


@dataclass
class NTMConfig:
    """Configuration for a Neural Turing Machine architecture.

    Defines the structure and hyperparameters for a single NTM individual
    in the evolutionary population. All parameters are evolvable.
    """
    input_dim: int
    output_dim: int
    controller_dim: int = 128
    controller_layers: int = 1  # reserved: the current controller is a single LSTMCell
    memory_slots: int = 128
    memory_dim: int = 32
    heads_read: int = 1
    heads_write: int = 1
    init_std: float = 0.1


class NeuralTuringMachine(nn.Module):
    """Neural Turing Machine with external memory and attention mechanisms.

    Implements the complete NTM architecture including:
    - LSTM controller for sequence processing
    - External memory matrix with read/write operations
    - Content-based addressing via cosine similarity
    - Differentiable memory operations (erase, add)

    Mathematical Details:
    - Controller processes input + read vectors: h_t = LSTM(x_t ⊕ r_{t-1}, h_{t-1})
    - Interface parameters: keys, strengths, erase/add vectors
    - Attention: w_t = softmax(β_t · cosine_sim(M_t, k_t))
    - Memory updates preserve differentiability for gradient-based learning
    """
    def __init__(self, cfg: NTMConfig):
        super().__init__()
        self.cfg = cfg
        R, W, Dm = cfg.heads_read, cfg.heads_write, cfg.memory_dim

        # Controller consumes the input concatenated with all previous read vectors.
        ctrl_in = cfg.input_dim + R * Dm
        self.controller = nn.LSTMCell(ctrl_in, cfg.controller_dim)

        # Interface emits, per read head: key (Dm) + strength (1); and per write
        # head: key (Dm) + strength (1) + erase (Dm) + add (Dm).
        iface_read = R * (Dm + 1)
        iface_write = W * (Dm + 1 + Dm + Dm)
        self.interface = nn.Linear(cfg.controller_dim, iface_read + iface_write)

        # Output combines the controller state with the current read vectors.
        self.output = nn.Linear(cfg.controller_dim + R * Dm, cfg.output_dim)

        self.reset_parameters()

    def reset_parameters(self):
        """Initialize parameters with appropriate distributions.

        Uses Xavier initialization for linear layers and orthogonal
        initialization for the LSTM recurrent weights to encourage stable
        training. The forget-gate bias is initialized to 1.0 for better
        gradient flow.
        """
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                nn.init.zeros_(m.bias)
            elif isinstance(m, nn.LSTMCell):
                nn.init.xavier_uniform_(m.weight_ih)
                nn.init.orthogonal_(m.weight_hh)
                nn.init.zeros_(m.bias_ih)
                nn.init.zeros_(m.bias_hh)

                # LSTM gate order is (input, forget, cell, output):
                # set the forget-gate slice of both biases to 1.0.
                hs = m.bias_ih.shape[0] // 4
                m.bias_ih.data[hs:2 * hs].fill_(1.0)
                m.bias_hh.data[hs:2 * hs].fill_(1.0)

    def initial_state(self, batch_size: int, device=None):
        """Initialize NTM state including memory, attention weights, and controller state.

        Args:
            batch_size: Number of parallel sequences
            device: Target device for tensors

        Returns:
            Dictionary containing:
            - M: Memory matrix [batch_size, memory_slots, memory_dim]
            - w_r: Read attention weights [batch_size, heads_read, memory_slots]
            - w_w: Write attention weights [batch_size, heads_write, memory_slots]
            - r: Read vectors [batch_size, heads_read, memory_dim]
            - h, c: LSTM controller states
        """
        cfg = self.cfg
        device = device or next(self.parameters()).device

        # Memory starts near zero, optionally with small Gaussian noise.
        M = torch.zeros(batch_size, cfg.memory_slots, cfg.memory_dim, device=device)
        if cfg.init_std > 0:
            M.normal_(0.0, cfg.init_std)

        # Attention starts uniform over all slots.
        w_r = torch.ones(batch_size, cfg.heads_read, cfg.memory_slots, device=device) / cfg.memory_slots
        w_w = torch.ones(batch_size, cfg.heads_write, cfg.memory_slots, device=device) / cfg.memory_slots

        r = torch.zeros(batch_size, cfg.heads_read, cfg.memory_dim, device=device)
        h = torch.zeros(batch_size, cfg.controller_dim, device=device)
        c = torch.zeros(batch_size, cfg.controller_dim, device=device)

        return {'M': M, 'w_r': w_r, 'w_w': w_w, 'r': r, 'h': h, 'c': c}

    def step(self, x: torch.Tensor, state: Dict[str, torch.Tensor]):
        """Execute one forward step of NTM computation.

        Complete NTM forward pass:
        1. Controller processes input + previous reads
        2. Interface generates memory operation parameters
        3. Content-based addressing computes attention weights
        4. Memory operations (read, erase, add)
        5. Output generation

        Args:
            x: Input tensor [batch_size, input_dim]
            state: Current NTM state dictionary

        Returns:
            y: Output tensor [batch_size, output_dim]
            new_state: Updated state dictionary
        """
        cfg = self.cfg
        B = x.shape[0]

        # 1. Controller sees the input concatenated with last step's read vectors.
        ctrl_in = torch.cat([x, state['r'].view(B, -1)], dim=-1)
        h, c = self.controller(ctrl_in, (state['h'], state['c']))

        # 2. Slice the flat interface vector into per-head parameters.
        iface = self.interface(h)
        R, W, Dm = cfg.heads_read, cfg.heads_write, cfg.memory_dim

        offset = 0
        k_r = iface[:, offset:offset + R * Dm].view(B, R, Dm)
        offset += R * Dm
        beta_r = F.softplus(iface[:, offset:offset + R])  # strengths must be positive
        offset += R

        k_w = iface[:, offset:offset + W * Dm].view(B, W, Dm)
        offset += W * Dm
        beta_w = F.softplus(iface[:, offset:offset + W])
        offset += W
        erase = torch.sigmoid(iface[:, offset:offset + W * Dm]).view(B, W, Dm)  # ∈ [0,1]
        offset += W * Dm
        add = torch.tanh(iface[:, offset:offset + W * Dm]).view(B, W, Dm)

        def address(M, k, beta, prev_weight=None):
            """Content-based addressing mechanism.

            Computes attention weights using cosine similarity between
            memory contents and generated keys, sharpened by a strength
            parameter.

            Mathematical Details:
            - Cosine similarity: sim(M[i], k) = (M[i] · k) / (||M[i]|| ||k||)
            - Focused attention: w = softmax(β · sim)
            - Optional momentum: adds a small fraction of the previous weights
              to the logits (a departure from the original NTM interpolation gate)

            Args:
                M: Memory matrix [batch_size, slots, memory_dim]
                k: Key vectors [batch_size, heads, memory_dim]
                beta: Strength parameters [batch_size, heads]
                prev_weight: Previous attention weights for momentum

            Returns:
                Attention weights [batch_size, heads, slots]
            """
            # Norms are clamped to avoid division by zero for empty slots/keys.
            M_norm = torch.norm(M, dim=-1, keepdim=True).clamp_min(1e-8)
            k_norm = torch.norm(k, dim=-1, keepdim=True).clamp_min(1e-8)

            # cos_sim[b, h, i] = <M[b, i], k[b, h]> / (||M[b, i]|| ||k[b, h]||)
            cos_sim = torch.sum(M.unsqueeze(1) * k.unsqueeze(2), dim=-1) / (
                M_norm.squeeze(-1).unsqueeze(1) * k_norm.squeeze(-1).unsqueeze(-1)
            )

            # Sharpen by strength; optionally nudge logits toward previous weights.
            content_logits = beta.unsqueeze(-1) * cos_sim
            if prev_weight is not None:
                content_logits = content_logits + 0.02 * prev_weight

            return F.softmax(content_logits, dim=-1)

        # 3. Address memory for the read and write heads.
        w_r = address(state['M'], k_r, beta_r, prev_weight=state.get('w_r'))
        w_w = address(state['M'], k_w, beta_w, prev_weight=state.get('w_w'))

        # 4a. Read: r[b, head] = Σ_i w_r[b, head, i] · M[b, i]
        r = torch.sum(w_r.unsqueeze(-1) * state['M'].unsqueeze(1), dim=2)

        # 4b. Write: erase then add, combining all write heads.
        M = state['M']
        if W > 0:
            erase_term = torch.prod(1 - w_w.unsqueeze(-1) * erase.unsqueeze(2), dim=1)
            M = M * erase_term

            add_term = torch.sum(w_w.unsqueeze(-1) * add.unsqueeze(2), dim=1)
            M = M + add_term

        # 5. Output from controller state plus current reads.
        y = self.output(torch.cat([h, r.view(B, -1)], dim=-1))

        new_state = {'M': M, 'w_r': w_r, 'w_w': w_w, 'r': r, 'h': h, 'c': c}
        return y, new_state

    def forward(self, x: torch.Tensor, state=None):
        """Forward pass for a single step or a full sequence.

        Handles both single-step operation (for interactive use) and
        sequence processing (for training/evaluation).

        Args:
            x: Input tensor [batch_size, input_dim] or [batch_size, seq_len, input_dim]
            state: Optional initial state (created if None)

        Returns:
            For a single step: (output, new_state)
            For a sequence: (output_sequence, final_state)
        """
        if x.dim() == 2:
            if state is None:
                state = self.initial_state(x.shape[0], x.device)
            return self.step(x, state)

        # Sequence input: unroll step-by-step over the time dimension.
        B, T, _ = x.shape
        if state is None:
            state = self.initial_state(B, x.device)

        outputs = []
        for t in range(T):
            y, state = self.step(x[:, t], state)
            outputs.append(y)

        return torch.stack(outputs, dim=1), state
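

# A minimal usage sketch (illustrative; the toy sizes below are assumptions,
# not tuned values): run a freshly initialized NTM over a random sequence.
def _ntm_usage_sketch():
    cfg = NTMConfig(input_dim=8, output_dim=8, memory_slots=32, memory_dim=16)
    ntm = NeuralTuringMachine(cfg)
    x = torch.randn(2, 5, cfg.input_dim)  # [batch, time, input_dim]
    y, state = ntm(x)                     # y: [batch, time, output_dim]
    return y, state

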
@dataclass
class EvolutionaryTuringConfig:
    """Configuration for evolutionary optimization of an NTM population.

    Defines hyperparameters for the evolutionary algorithm including
    population size, mutation rates, selection pressure, and fitness
    evaluation parameters.
    """
    population_size: int = 100
    mutation_rate: float = 0.1
    architecture_mutation_rate: float = 0.05
    elite_ratio: float = 0.2
    max_generations: int = 200
    input_dim: int = 8
    output_dim: int = 8
    device: str = 'cpu'
    seed: Optional[int] = None


class FitnessEvaluator:
    """Comprehensive fitness evaluation for NTM individuals.

    Evaluates NTM performance on multiple algorithmic tasks to assess
    general computational capability. Includes an efficiency penalty
    to encourage compact, effective architectures.

    Tasks:
    1. Copy Task: tests basic memory read/write capabilities
    2. Associative Recall: tests content-based memory access
    3. Efficiency: penalizes excessive parameters

    Mathematical Details:
    - Copy task measures sequence reproduction accuracy
    - Associative recall tests key-value pair memory
    - Composite fitness balances multiple objectives
    """
    def __init__(self, device: str = 'cpu'):
        self.device = device

    def copy_task(self, ntm: NeuralTuringMachine, seq_len: int = 8, batch_size: int = 16) -> float:
        """Evaluate NTM on the sequence copying task.

        The copy task is fundamental for testing memory capabilities:
        1. Present the input sequence
        2. Present a delimiter (end-of-sequence marker)
        3. Evaluate reproduction accuracy during a blank response phase

        Mathematical Details:
        - Input: x₁, x₂, ..., x_T, delimiter, followed by T blank steps
        - Target: reproduce x₁, x₂, ..., x_T during the blank steps
        - Loss: MSE between predicted and target sequences
        - Accuracy: 1 / (1 + loss) for a bounded score ∈ (0, 1]

        Args:
            ntm: NTM individual to evaluate
            seq_len: Length of sequences to copy
            batch_size: Number of parallel sequences

        Returns:
            Copy task accuracy score ∈ (0, 1]
        """
        with torch.no_grad():
            # Random binary input sequences.
            x = torch.randint(0, 2, (batch_size, seq_len, ntm.cfg.input_dim),
                              device=self.device, dtype=torch.float32)

            # Delimiter marks the end of the presentation phase.
            delimiter = torch.zeros(batch_size, 1, ntm.cfg.input_dim, device=self.device)
            delimiter[:, :, -1] = 1

            # Blank response phase during which the copy must be emitted.
            blanks = torch.zeros(batch_size, seq_len, ntm.cfg.input_dim, device=self.device)
            input_seq = torch.cat([x, delimiter, blanks], dim=1)

            try:
                output, _ = ntm(input_seq)

                # Score only the outputs produced during the response phase.
                T = seq_len
                D = ntm.cfg.output_dim
                pred = output[:, -T:, :D]

                # Compare over the dimensions shared by input and output.
                d = min(ntm.cfg.input_dim, D)
                loss = F.mse_loss(pred[..., :d], x[..., :d])
                return 1.0 / (1.0 + loss.item())
            except Exception:
                # Degenerate architectures score zero rather than crashing evolution.
                return 0.0

    def associative_recall(self, ntm: NeuralTuringMachine, num_pairs: int = 4) -> float:
        """Evaluate NTM on an associative memory recall task.

        Tests content-based memory access by storing key-value pairs
        and then querying with keys to retrieve the associated values.

        Task Structure:
        1. Store phase: present key-value pairs
        2. Query phase: present keys (with zeroed values)
        3. Evaluate: check whether the correct values are recalled

        Mathematical Details:
        - Keys: k₁, k₂, ..., kₙ (first half of the input dimension)
        - Values: v₁, v₂, ..., vₙ (second half of the input dimension)
        - Query: present [kᵢ, 0], expect output [0, vᵢ]
        - Score based on MSE between recalled and target values

        Args:
            ntm: NTM individual to evaluate
            num_pairs: Number of key-value pairs to store/recall

        Returns:
            Associative recall accuracy score ∈ (0, 1]
        """
        with torch.no_grad():
            batch_size = 8
            dim = ntm.cfg.input_dim

            # Random key-value pairs, each occupying half of the input vector.
            keys = torch.randn(batch_size, num_pairs, dim // 2, device=self.device)
            values = torch.randn(batch_size, num_pairs, dim // 2, device=self.device)
            pairs = torch.cat([keys, values], dim=-1)

            # Queries repeat the keys with zeroed value slots.
            test_keys = torch.cat([keys, torch.zeros_like(values)], dim=-1)
            expected_values = torch.cat([torch.zeros_like(keys), values], dim=-1)

            # Store phase followed by query phase.
            input_seq = torch.cat([pairs, test_keys], dim=1)
            target_seq = torch.cat([torch.zeros_like(pairs), expected_values], dim=1)

            try:
                output, _ = ntm(input_seq)

                # Score only the query-phase outputs.
                D = ntm.cfg.output_dim
                d = min(dim, D)
                loss = F.mse_loss(output[:, num_pairs:, :d], target_seq[:, num_pairs:, :d])
                return 1.0 / (1.0 + loss.item())
            except Exception:
                return 0.0

    def evaluate_fitness(self, ntm: NeuralTuringMachine) -> Dict[str, float]:
        """Comprehensive fitness evaluation across multiple criteria.

        Evaluates an individual on multiple tasks and efficiency metrics
        to encourage both performance and architectural parsimony.

        Fitness Components:
        1. Copy Task (50%): basic memory functionality
        2. Associative Recall (30%): content-based memory access
        3. Efficiency (20%): parameter count penalty

        Mathematical Details:
        - Each component scored ∈ [0, 1]
        - Efficiency = 1 / (1 + params/baseline), baseline = 100,000 parameters
        - Composite = weighted combination

        Args:
            ntm: NTM individual to evaluate

        Returns:
            Dictionary containing individual and composite fitness scores
        """
        copy_score = self.copy_task(ntm)
        recall_score = self.associative_recall(ntm)

        # Efficiency penalty relative to a 100k-parameter baseline.
        param_count = sum(p.numel() for p in ntm.parameters())
        efficiency = 1.0 / (1.0 + param_count / 100000)

        # Weighted composite, matching the fitness form in the module docstring.
        composite_score = 0.5 * copy_score + 0.3 * recall_score + 0.2 * efficiency

        return {
            'copy': copy_score,
            'recall': recall_score,
            'efficiency': efficiency,
            'composite': composite_score
        }
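

# Illustrative sketch: score one randomly initialized NTM with the evaluator
# above (the dimensions here are assumptions for the example, not tuned values).
def _fitness_sketch():
    evaluator = FitnessEvaluator(device='cpu')
    ntm = NeuralTuringMachine(NTMConfig(input_dim=8, output_dim=8))
    return evaluator.evaluate_fitness(ntm)  # keys: copy, recall, efficiency, composite

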
class EvolutionaryTuringMachine:
    """Evolutionary optimization system for Neural Turing Machine architectures.

    Implements a complete evolutionary algorithm for discovering strong
    NTM architectures and parameters through selection. Uses both
    architectural mutations (structure) and parameter mutations (weights).

    Evolutionary Operations:
    1. Selection: rank-based truncation with uniformly sampled elite parents
    2. Crossover: architectural blending of two parents
    3. Mutation: structure modification and parameter perturbation
    4. Replacement: elites survive alongside new offspring

    The system evolves both the neural architecture (memory size, heads,
    controller complexity) and the connection weights simultaneously.
    """
    def __init__(self, cfg: EvolutionaryTuringConfig):
        self.cfg = cfg
        self.evaluator = FitnessEvaluator(cfg.device)
        self.generation = 0
        self.best_fitness = 0.0
        self.population: List[NeuralTuringMachine] = []

        if cfg.seed is not None:
            torch.manual_seed(cfg.seed)

    def create_random_config(self) -> NTMConfig:
        """Generate a random NTM architecture configuration.

        Creates a diverse initial population by randomizing all
        architectural hyperparameters within reasonable bounds.

        Architectural Parameters (inclusive ranges):
        - Controller dimension: [64, 256]
        - Memory slots: [32, 256]
        - Memory dimension: [16, 64]
        - Read heads: [1, 4]; write heads: [1, 3]

        Returns:
            Random NTM configuration
        """
        # torch.randint's upper bound is exclusive, hence the +1 on each range.
        return NTMConfig(
            input_dim=self.cfg.input_dim,
            output_dim=self.cfg.output_dim,
            controller_dim=torch.randint(64, 257, (1,)).item(),
            controller_layers=torch.randint(1, 3, (1,)).item(),
            memory_slots=torch.randint(32, 257, (1,)).item(),
            memory_dim=torch.randint(16, 65, (1,)).item(),
            heads_read=torch.randint(1, 5, (1,)).item(),
            heads_write=torch.randint(1, 4, (1,)).item(),
            init_std=0.1
        )

    def mutate_architecture(self, cfg: NTMConfig) -> NTMConfig:
        """Apply architectural mutations to an NTM configuration.

        Modifies each structural parameter independently with probability
        architecture_mutation_rate, using small random perturbations.

        Mutation Operations:
        - Controller dimension: ±32 units (floor 32)
        - Memory slots: ±16 units (floor 16)
        - Memory dimension: ±8 units (floor 8)
        - Read/write heads: ±1 head (within [1, 4] and [1, 3])

        Args:
            cfg: Original NTM configuration

        Returns:
            Mutated NTM configuration
        """
        new_cfg = deepcopy(cfg)

        if torch.rand(1).item() < self.cfg.architecture_mutation_rate:
            new_cfg.controller_dim = max(32, new_cfg.controller_dim + torch.randint(-32, 33, (1,)).item())

        if torch.rand(1).item() < self.cfg.architecture_mutation_rate:
            new_cfg.memory_slots = max(16, new_cfg.memory_slots + torch.randint(-16, 17, (1,)).item())

        if torch.rand(1).item() < self.cfg.architecture_mutation_rate:
            new_cfg.memory_dim = max(8, new_cfg.memory_dim + torch.randint(-8, 9, (1,)).item())

        if torch.rand(1).item() < self.cfg.architecture_mutation_rate:
            new_cfg.heads_read = max(1, min(4, new_cfg.heads_read + torch.randint(-1, 2, (1,)).item()))

        if torch.rand(1).item() < self.cfg.architecture_mutation_rate:
            new_cfg.heads_write = max(1, min(3, new_cfg.heads_write + torch.randint(-1, 2, (1,)).item()))

        return new_cfg

    def mutate_parameters(self, ntm: NeuralTuringMachine) -> NeuralTuringMachine:
        """Apply parameter mutations to NTM weights.

        Performs Gaussian perturbations on network parameters with
        probability mutation_rate per parameter. Creates a new NTM
        instance to avoid modifying the original.

        Mathematical Details:
        - Each parameter p is mutated with probability mutation_rate
        - Mutation: p' = p + ε where ε ~ N(0, 0.01²)
        - Preserves the architecture; only the weights change

        Args:
            ntm: Original NTM individual

        Returns:
            New NTM with mutated parameters
        """
        new_ntm = NeuralTuringMachine(ntm.cfg).to(self.cfg.device)
        new_ntm.load_state_dict(deepcopy(ntm.state_dict()))

        with torch.no_grad():
            for p in new_ntm.parameters():
                # Boolean mask selects which entries receive Gaussian noise.
                mask = (torch.rand_like(p) < self.cfg.mutation_rate)
                p.add_(torch.randn_like(p) * 0.01 * mask)

        return new_ntm

    def crossover(self, parent1: NeuralTuringMachine, parent2: NeuralTuringMachine) -> NeuralTuringMachine:
        """Create offspring through architectural crossover.

        Combines architectural features from two parents by randomly
        selecting each architectural parameter from either parent.
        The offspring receives a fresh random weight initialization.

        Crossover Strategy:
        - Each architectural parameter chosen from parent1 or parent2 (50% each)
        - New weights initialized randomly (architectural crossover only)
        - Alternative: parameter-level crossover could be implemented instead

        Args:
            parent1: First parent NTM
            parent2: Second parent NTM

        Returns:
            Offspring NTM with hybrid architecture
        """
        cfg1, cfg2 = parent1.cfg, parent2.cfg

        # Uniform crossover: each field is inherited from a random parent.
        new_cfg = NTMConfig(
            input_dim=self.cfg.input_dim,
            output_dim=self.cfg.output_dim,
            controller_dim=cfg1.controller_dim if torch.rand(1).item() < 0.5 else cfg2.controller_dim,
            controller_layers=cfg1.controller_layers if torch.rand(1).item() < 0.5 else cfg2.controller_layers,
            memory_slots=cfg1.memory_slots if torch.rand(1).item() < 0.5 else cfg2.memory_slots,
            memory_dim=cfg1.memory_dim if torch.rand(1).item() < 0.5 else cfg2.memory_dim,
            heads_read=cfg1.heads_read if torch.rand(1).item() < 0.5 else cfg2.heads_read,
            heads_write=cfg1.heads_write if torch.rand(1).item() < 0.5 else cfg2.heads_write,
            init_std=0.1
        )

        # Offspring starts from freshly initialized weights for the hybrid architecture.
        return NeuralTuringMachine(new_cfg).to(self.cfg.device)

    def initialize_population(self):
        """Create an initial population of diverse random architectures.

        Generates population_size individuals with random architectural
        configurations to ensure diversity in the initial gene pool.
        """
        self.population = []
        for _ in range(self.cfg.population_size):
            cfg = self.create_random_config()
            self.population.append(NeuralTuringMachine(cfg).to(self.cfg.device))

    def evolve_generation(self) -> Dict[str, float]:
        """Execute one generation of evolutionary optimization.

        Complete generational evolution cycle:
        1. Evaluate all individuals in the population
        2. Select elite individuals for survival
        3. Generate offspring through crossover and mutation
        4. Replace non-elite individuals with offspring
        5. Update statistics and the generation counter

        Uses an elitist (μ + λ)-style strategy with elite preservation
        so the best solutions are never lost.

        Returns:
            Dictionary containing generation statistics
        """
        # 1. Evaluate every individual.
        fitness_scores = []
        for ntm in self.population:
            fitness = self.evaluator.evaluate_fitness(ntm)
            fitness_scores.append(fitness['composite'])

        # 2. Rank by fitness (descending).
        sorted_indices = sorted(range(len(fitness_scores)), key=lambda i: fitness_scores[i], reverse=True)

        # 3. Preserve the elite (at least one individual).
        elite_count = max(1, int(self.cfg.elite_ratio * self.cfg.population_size))
        elites = [self.population[i] for i in sorted_indices[:elite_count]]

        # 4. Fill the remainder of the population with offspring.
        new_population = elites.copy()

        while len(new_population) < self.cfg.population_size:
            if torch.rand(1).item() < 0.3 and len(elites) >= 2:
                # Crossover between two distinct elite parents.
                idx1, idx2 = torch.randperm(len(elites))[:2].tolist()
                child = self.crossover(elites[idx1], elites[idx2])
            else:
                # Mutation of a single elite parent.
                parent_idx = torch.randint(0, elite_count, (1,)).item()
                parent = elites[parent_idx]

                if torch.rand(1).item() < 0.5:
                    # Parameter-level mutation (architecture preserved).
                    child = self.mutate_parameters(parent)
                else:
                    # Architecture-level mutation (weights re-initialized).
                    new_cfg = self.mutate_architecture(parent.cfg)
                    child = NeuralTuringMachine(new_cfg).to(self.cfg.device)

            new_population.append(child)

        # 5. Replace the population and update statistics.
        self.population = new_population[:self.cfg.population_size]
        self.generation += 1

        best_fitness = max(fitness_scores)
        avg_fitness = sum(fitness_scores) / len(fitness_scores)
        self.best_fitness = max(self.best_fitness, best_fitness)

        return {
            'generation': self.generation,
            'best_fitness': best_fitness,
            'avg_fitness': avg_fitness,
            'best_ever': self.best_fitness
        }

    def run_evolution(self) -> List[Dict[str, float]]:
        """Execute a complete evolutionary optimization run.

        Runs evolution for max_generations, tracking progress and
        printing periodic updates. Returns the complete optimization
        history for analysis and visualization.

        Returns:
            List of per-generation statistics dictionaries
        """
        self.initialize_population()

        history = []
        for gen in range(self.cfg.max_generations):
            stats = self.evolve_generation()
            history.append(stats)

            # Periodic progress report.
            if gen % 10 == 0:
                print(f"Gen {gen}: Best={stats['best_fitness']:.4f}, Avg={stats['avg_fitness']:.4f}")

        return history

    def get_best_model(self) -> NeuralTuringMachine:
        """Retrieve the best individual from the current population.

        Evaluates all current individuals and returns the one with the
        highest composite fitness score.

        Returns:
            Best NTM individual in the population
        """
        fitness_scores = []
        for ntm in self.population:
            fitness = self.evaluator.evaluate_fitness(ntm)
            fitness_scores.append(fitness['composite'])

        best_idx = max(range(len(fitness_scores)), key=lambda i: fitness_scores[i])
        return self.population[best_idx]


def test_evolutionary_turing():
    """Comprehensive test of evolutionary NTM optimization."""
    print("Testing Evolutionary Turing Machine - Adaptive Memory Architecture Evolution")
    print("=" * 90)

    # Small population/generation budget so the test runs quickly.
    config = EvolutionaryTuringConfig(
        population_size=20,
        max_generations=30,
        input_dim=8,
        output_dim=8,
        mutation_rate=0.15,
        architecture_mutation_rate=0.1,
        elite_ratio=0.3,
        device='cpu'
    )

    system = EvolutionaryTuringMachine(config)

    print("Created Evolutionary Turing System:")
    print(f"  - Population size: {config.population_size}")
    print(f"  - Max generations: {config.max_generations}")
    print(f"  - Architecture mutation rate: {config.architecture_mutation_rate}")
    print(f"  - Parameter mutation rate: {config.mutation_rate}")
    print(f"  - Elite preservation: {config.elite_ratio * 100:.0f}%")

    # Exercise a single random individual first.
    print("\nTesting individual NTM...")
    test_config = system.create_random_config()
    test_ntm = NeuralTuringMachine(test_config).to(config.device)

    print("Random NTM architecture:")
    print(f"  - Controller: {test_config.controller_dim}D")
    print(f"  - Memory: {test_config.memory_slots} × {test_config.memory_dim}")
    print(f"  - Heads: {test_config.heads_read}R/{test_config.heads_write}W")

    fitness = system.evaluator.evaluate_fitness(test_ntm)
    print("\nFitness evaluation:")
    for task, score in fitness.items():
        print(f"  - {task.capitalize()}: {score:.3f}")

    # Exercise each evolutionary operator once.
    print("\nTesting evolutionary operations...")

    _mutated_ntm = system.mutate_parameters(test_ntm)
    print("✓ Parameter mutation successful")

    _mutated_config = system.mutate_architecture(test_config)
    print("✓ Architecture mutation successful")

    parent2_config = system.create_random_config()
    parent2 = NeuralTuringMachine(parent2_config).to(config.device)
    _offspring = system.crossover(test_ntm, parent2)
    print("✓ Crossover operation successful")

    # Full evolutionary run.
    print("\nRunning evolutionary optimization...")
    print("(This may take a few minutes)")

    history = system.run_evolution()

    print("\nEvolution completed!")
    print(f"  - Final generation: {system.generation}")
    print(f"  - Best fitness achieved: {system.best_fitness:.4f}")

    # Summarize improvement over the run.
    initial_fitness = history[0]['best_fitness']
    final_fitness = history[-1]['best_fitness']
    improvement = final_fitness - initial_fitness

    print("\nEvolution analysis:")
    print(f"  - Initial best fitness: {initial_fitness:.4f}")
    print(f"  - Final best fitness: {final_fitness:.4f}")
    print(f"  - Total improvement: {improvement:.4f}")
    print(f"  - Average per-generation improvement: {improvement / len(history):.4f}")

    # Inspect the best evolved individual.
    best_ntm = system.get_best_model()
    best_fitness = system.evaluator.evaluate_fitness(best_ntm)

    print("\nBest evolved architecture:")
    print(f"  - Controller: {best_ntm.cfg.controller_dim}D")
    print(f"  - Memory: {best_ntm.cfg.memory_slots} × {best_ntm.cfg.memory_dim}")
    print(f"  - Heads: {best_ntm.cfg.heads_read}R/{best_ntm.cfg.heads_write}W")
    print(f"  - Parameters: {sum(p.numel() for p in best_ntm.parameters()):,}")

    print("\nBest individual performance:")
    for task, score in best_fitness.items():
        print(f"  - {task.capitalize()}: {score:.4f}")

    print("\nEvolutionary Turing Machine test completed!")
    print("✓ Population initialization and diversity")
    print("✓ Fitness evaluation on algorithmic tasks")
    print("✓ Architectural and parameter mutations")
    print("✓ Crossover and offspring generation")
    print("✓ Elite preservation and selection")
    print("✓ Multi-generational evolution and improvement")

    return True


def architecture_evolution_demo():
    """Demonstrate architectural evolution patterns."""
    print("\n" + "=" * 70)
    print("ARCHITECTURE EVOLUTION DEMONSTRATION")
    print("=" * 70)

    config = EvolutionaryTuringConfig(population_size=10, max_generations=10)
    system = EvolutionaryTuringMachine(config)

    # Sample a handful of random architectures.
    architectures = []
    for _ in range(5):
        architectures.append(system.create_random_config())

    print("Initial architecture diversity:")
    for i, cfg in enumerate(architectures):
        # Rough size proxy: controller recurrence plus memory matrix.
        approx_params = cfg.controller_dim * cfg.controller_dim + cfg.memory_slots * cfg.memory_dim
        print(f"  Arch {i + 1}: {cfg.controller_dim}D controller, "
              f"{cfg.memory_slots}×{cfg.memory_dim} memory, ~{approx_params:,} params")

    # Show a few architectural mutations of the first sample.
    print("\nMutation examples:")
    base_cfg = architectures[0]
    for i in range(3):
        mutated = system.mutate_architecture(base_cfg)
        print(f"  Mutation {i + 1}: {mutated.controller_dim}D controller, "
              f"{mutated.memory_slots}×{mutated.memory_dim} memory")

    print("\nEvolution searches the architecture space through selection!")
    print("Larger controllers and memories often emerge for complex tasks")


if __name__ == "__main__":
    test_evolutionary_turing()
    architecture_evolution_demo()