| |
| """ |
| Mnemo v4: SLM-Inspired Architecture |
| ==================================== |
| |
| Implements key SLM architecture features with parameter adjustments |
| based on Mnemo benchmark findings. |
| |
| SLM Features Implemented: |
| 1. Three-Tiered Memory (Working β Token β Semantic) |
| 2. Promotion/Demotion Algorithms |
| 3. Neural Link Types (8 types with decay) |
| 4. Self-Tuning Parameters |
| 5. Memory Utility Predictor (NEW - from benchmarks) |
| |
| Key Parameter Adjustments (from benchmarks): |
| - Semantic threshold: 0.65 β 0.50 (SLM was too high) |
| - Quality acceptance: 0.30 β 0.50 (SLM too permissive) |
| - Promotion threshold: 0.65 β 0.55 (faster promotion) |
| - Link pruning: 60 days β 30 days (faster cleanup) |
| """ |
|
|
| import hashlib |
| import time |
| import re |
| import threading |
| import numpy as np |
| from typing import Dict, List, Optional, Tuple, Any, Set |
| from dataclasses import dataclass, field |
| from collections import defaultdict |
| from enum import Enum |
| import json |
|
|
| |
| try: |
| import faiss |
| HAS_FAISS = True |
| except ImportError: |
| HAS_FAISS = False |
|
|
| try: |
| import networkx as nx |
| HAS_NETWORKX = True |
| except ImportError: |
| HAS_NETWORKX = False |
|
|
| try: |
| from rank_bm25 import BM25Okapi |
| HAS_BM25 = True |
| except ImportError: |
| HAS_BM25 = False |
|
|
|
|
| |
| |
| |
|
|
class MemoryTier(Enum):
    """Three-tiered memory hierarchy from SLM.

    See TieredMemoryManager for the capacity/latency characteristics of
    each tier and the promotion/demotion rules between them.
    """
    WORKING = "working"    # hot, size-bounded tier for currently active items
    TOKEN = "token"        # mid tier: per-namespace ordered id "loops"
    SEMANTIC = "semantic"  # persistent base tier holding full memories
|
|
|
|
class LinkType(Enum):
    """Eight link types from the SLM Neural Link system.

    Per-type creation thresholds, initial strengths, decay rates and
    usage boosts are configured in LINK_PROPERTIES.
    """
    DIRECT_REFERENCE = "direct_reference"
    SEMANTIC_SIMILARITY = "semantic_similarity"
    CO_OCCURRENCE = "co_occurrence"
    HIERARCHICAL = "hierarchical"
    TEMPORAL = "temporal"
    CAUSAL = "causal"
    CROSS_DOMAIN = "cross_domain"
    ASSOCIATIVE = "associative"
|
|
|
|
| |
# Per-LinkType tuning table. Field meanings (see NeuralLinkManager):
#   creation_threshold: minimum similarity required to create this link type
#                       (NeuralLinkManager.create_link)
#   initial_strength:   strength assigned when the link is first created
#   decay_rate:         fractional strength loss per decay cycle
#                       (NeuralLinkManager.decay_links)
#   usage_boost:        strength added on traversal or re-creation
#                       (NeuralLinkManager.traverse_link / create_link)
LINK_PROPERTIES = {
    LinkType.DIRECT_REFERENCE: {
        "creation_threshold": 0.85,
        "initial_strength": 0.90,
        "decay_rate": 0.005,
        "usage_boost": 0.05
    },
    LinkType.SEMANTIC_SIMILARITY: {
        "creation_threshold": 0.50,
        "initial_strength": 0.75,
        "decay_rate": 0.01,
        "usage_boost": 0.03
    },
    LinkType.CO_OCCURRENCE: {
        "creation_threshold": 0.60,
        "initial_strength": 0.70,
        "decay_rate": 0.015,
        "usage_boost": 0.04
    },
    LinkType.HIERARCHICAL: {
        "creation_threshold": 0.80,
        "initial_strength": 0.85,
        "decay_rate": 0.003,
        "usage_boost": 0.02
    },
    LinkType.TEMPORAL: {
        "creation_threshold": 0.55,
        "initial_strength": 0.65,
        "decay_rate": 0.02,
        "usage_boost": 0.05
    },
    LinkType.CAUSAL: {
        "creation_threshold": 0.75,
        "initial_strength": 0.80,
        "decay_rate": 0.005,
        "usage_boost": 0.03
    },
    LinkType.CROSS_DOMAIN: {
        "creation_threshold": 0.70,
        "initial_strength": 0.65,
        "decay_rate": 0.008,
        "usage_boost": 0.04
    },
    LinkType.ASSOCIATIVE: {
        "creation_threshold": 0.45,
        "initial_strength": 0.60,
        "decay_rate": 0.025,
        "usage_boost": 0.06
    }
}
|
|
|
|
| |
| |
| |
|
|
@dataclass
class Memory:
    """Memory unit with SLM-style metadata."""
    id: str                 # unique id ("mem_" + md5 prefix of content)
    content: str            # raw text of the memory
    embedding: np.ndarray   # embedding vector; unit-norm when produced by Mnemo._get_embedding
    tier: MemoryTier = MemoryTier.SEMANTIC  # current tier assignment
    namespace: str = "default"              # grouping key for token loops and search filtering

    # Quality/relevance signals, each nominally in [0, 1]
    quality_score: float = 0.5    # content quality; decays while unused (TieredMemoryManager)
    relevance_score: float = 0.5  # EMA of feedback relevance (Mnemo.feedback)
    confidence: float = 0.5       # NOTE(review): never written in this file -- confirm intent

    # Usage tracking (drives promotion/demotion and decay)
    access_count: int = 0
    last_accessed: float = field(default_factory=time.time)
    created_at: float = field(default_factory=time.time)

    # Working-memory priority; decays each cycle and drives eviction
    priority: float = 1.0

    metadata: Dict = field(default_factory=dict)  # arbitrary caller-supplied info
|
|
|
|
@dataclass
class NeuralLink:
    """SLM Neural Link: a directed, typed edge between two memories."""
    source_id: str       # id of the memory the link starts from
    target_id: str       # id of the memory the link points to
    link_type: LinkType  # one of the eight SLM link types
    strength: float      # current strength; decays over time, boosted on use
    created_at: float = field(default_factory=time.time)
    last_traversed: float = field(default_factory=time.time)  # drives age-based pruning
    traversal_count: int = 0  # number of times this link was traversed
|
|
|
|
@dataclass
class SearchResult:
    """Search result with per-strategy score breakdown."""
    id: str        # id of the matched memory
    content: str   # memory text
    score: float   # combined weighted score (semantic/bm25/links)
    tier: MemoryTier = MemoryTier.SEMANTIC  # tier at the time of retrieval
    link_path: List[str] = field(default_factory=list)  # NOTE(review): never populated here
    strategy_scores: Dict[str, float] = field(default_factory=dict)  # keys: semantic, bm25, links
    metadata: Dict = field(default_factory=dict)  # copied from the memory's metadata
|
|
|
|
| |
| |
| |
|
|
class MemoryUtilityPredictor:
    """
    Predicts whether memory injection will help or hurt.

    Key finding from benchmarks:
    - Within-conversation: Memory often HURTS (-3 to -12 pts)
    - Cross-session: Memory HELPS (+2 pts on dependent questions)
    """

    # Phrases suggesting the query depends on earlier material -> inject.
    # First match wins, so ordering determines the reported reason string.
    # BUG FIX: removed a duplicate "using your" and the redundant
    # "based on your" (already covered by the earlier "based on");
    # behavior is unchanged because earlier entries always matched first.
    INJECTION_SIGNALS = [
        "previous", "earlier", "before", "you said", "you mentioned",
        "as you", "based on", "using your", "your analysis", "your framework",
        "we discussed", "we analyzed", "refer to", "from your",
        "compare", "contrast", "synthesize", "combine", "integrate",
        "apply your",
        "you previously", "your earlier", "you have analyzed"
    ]

    # Phrases suggesting a fresh, self-contained question -> skip injection.
    SKIP_SIGNALS = [
        "this is a new", "new topic", "different subject",
        "what is", "define", "explain what"
    ]

    def __init__(self):
        # Running counters for observability (surfaced via Mnemo.get_stats).
        self.stats = {
            "predictions": 0,
            "inject_recommended": 0,
            "skip_recommended": 0,
            "skip_context_window": 0
        }

    def should_inject(self,
                      query: str,
                      context: str = "",
                      conversation_history: str = "",
                      model_confidence: float = 0.5) -> Tuple[bool, str, float]:
        """
        Predict if memory injection will help.

        Args:
            query: the user's current question.
            context: extra text scanned alongside the query for signals.
            conversation_history: prior conversation text; used to detect
                when the needed information already sits in the window.
            model_confidence: caller-supplied confidence; above 0.85 the
                model is trusted without memory.

        Returns:
            (should_inject, reason, confidence)
        """
        self.stats["predictions"] += 1
        combined = (query + " " + context).lower()

        # Explicit "fresh topic" phrasing overrides everything else.
        for signal in self.SKIP_SIGNALS:
            if signal in combined:
                self.stats["skip_recommended"] += 1
                return False, f"skip_signal:{signal}", 0.8

        for signal in self.INJECTION_SIGNALS:
            if signal in combined:
                # Even with a dependency cue, skip if the conversation
                # window already appears to contain the information.
                if self._context_has_info(query, conversation_history):
                    self.stats["skip_context_window"] += 1
                    return False, "context_window_sufficient", 0.7

                self.stats["inject_recommended"] += 1
                return True, f"inject_signal:{signal}", 0.85

        # Simple factual lookups don't benefit from memory.
        if self._is_simple_query(query):
            self.stats["skip_recommended"] += 1
            return False, "simple_query", 0.6

        # A confident model is left alone (injection often hurts).
        if model_confidence > 0.85:
            self.stats["skip_recommended"] += 1
            return False, "model_confident", 0.7

        # Default: benchmarks show injection hurts more often than it helps.
        self.stats["skip_recommended"] += 1
        return False, "no_signal", 0.5

    def _context_has_info(self, query: str, history: str) -> bool:
        """Check if the conversation history already covers the query.

        Requires a reasonably long history (>= 200 words) and >= 60% of the
        query's content keywords appearing somewhere in the history.
        """
        if not history or len(history.split()) < 200:
            return False

        query_keywords = set(query.lower().split()) - {
            "the", "a", "is", "are", "to", "of", "in", "for", "what", "how"
        }

        # BUG FIX: with no content keywords the overlap test was vacuously
        # true (0 >= 0); a stop-word-only query must not claim coverage.
        if not query_keywords:
            return False

        history_lower = history.lower()
        overlap = sum(1 for kw in query_keywords if kw in history_lower)

        return overlap >= len(query_keywords) * 0.6

    def _is_simple_query(self, query: str) -> bool:
        """Detect simple factual queries that don't need memory."""
        simple_patterns = [
            r"^what is\b", r"^who is\b", r"^when did\b",
            r"^where is\b", r"^how many\b", r"^define\b"
        ]
        query_lower = query.lower()
        return any(re.search(p, query_lower) for p in simple_patterns)
|
|
|
|
| |
| |
| |
|
|
class SelfTuner:
    """
    SLM Self-Tuning Parameter System.

    Records per-parameter success/failure outcomes and periodically nudges
    the parameter values toward better-performing settings.
    """

    def __init__(self):
        # Tunable thresholds, each kept within [0.1, 0.9] by auto_tune().
        self.parameters = {
            "similarity_threshold": 0.10,
            "quality_threshold": 0.35,
            "promotion_threshold": 0.55,
            "demotion_threshold": 0.70,
        }

        # parameter name -> list of {"value", "success", "timestamp"} samples
        self.performance_history = defaultdict(list)
        self.adjustment_count = 0

        # Step sizes per parameter (0.01 default for anything unlisted).
        self.learning_rates = {
            "similarity_threshold": 0.01,
            "quality_threshold": 0.02,
            "promotion_threshold": 0.05,
        }

    def record_outcome(self, param_name: str, value: float, success: bool):
        """Record one success/failure outcome for a parameter setting."""
        samples = self.performance_history[param_name]
        samples.append({
            "value": value,
            "success": success,
            "timestamp": time.time(),
        })

        # Keep only the most recent 100 samples per parameter.
        if len(samples) > 100:
            del samples[:-100]

    def should_adjust(self, param_name: str) -> bool:
        """True when a fresh batch of 10 samples has accumulated."""
        samples = self.performance_history.get(param_name)
        if not samples:
            return False
        count = len(samples)
        return count >= 10 and count % 10 == 0

    def get_adjustment(self, param_name: str) -> float:
        """Compute the signed step to apply, from the last 10 outcomes.

        Mostly-failing (< 50% success) -> step down by the learning rate;
        mostly-succeeding (> 80%) -> step up by half the learning rate;
        otherwise leave the parameter alone.
        """
        samples = self.performance_history.get(param_name, [])
        if len(samples) < 10:
            return 0.0

        window = samples[-10:]
        success_rate = sum(h["success"] for h in window) / len(window)
        step = self.learning_rates.get(param_name, 0.01)

        if success_rate < 0.5:
            return -step
        if success_rate > 0.8:
            return step * 0.5
        return 0.0

    def auto_tune(self):
        """Apply one tuning pass; returns [(name, old, new), ...] changes."""
        changes = []

        for name in self.parameters:
            if not self.should_adjust(name):
                continue
            delta = self.get_adjustment(name)
            if delta == 0:
                continue
            current = self.parameters[name]
            updated = max(0.1, min(0.9, current + delta))
            self.parameters[name] = updated
            changes.append((name, current, updated))
            self.adjustment_count += 1

        return changes
|
|
|
|
| |
| |
| |
|
|
class TieredMemoryManager:
    """
    SLM Three-Tiered Memory Hierarchy

    Working Memory (32MB, <1ms):
    - Currently active info
    - Priority decay: 0.95/minute
    - Eviction threshold: 0.2

    Token Memory (100-250 items, 1-10ms):
    - Compressed representations
    - Loop-based organization
    - Merging at 0.8 similarity

    Semantic Memory (persistent, 10-100ms):
    - Full knowledge representations
    - Partition-based organization

    Implementation notes:
    - TOKEN-tier memories are stored in ``semantic_memory`` with
      tier == MemoryTier.TOKEN; ``token_loops`` holds only their ids.
    - A memory promoted into working memory is NOT removed from
      ``semantic_memory``, so ``get_all_memories()`` can see it via both maps.
    """

    # Capacity limits
    WORKING_MEMORY_SIZE = 50
    TOKEN_LOOP_CAPACITY = 100
    TOKEN_LOOP_MAX = 250        # NOTE(review): declared but never enforced here

    PRIORITY_DECAY = 0.95       # multiplicative working-memory priority decay per cycle
    EVICTION_THRESHOLD = 0.2    # working-memory priority floor before eviction
    LOOP_MERGE_THRESHOLD = 0.8  # NOTE(review): declared; no merge logic implemented yet

    # Long-term decay/pruning of semantic memories
    MEMORY_DECAY_RATE = 0.01    # quality decay per unused day (capped at 10% per pass)
    MEMORY_PRUNE_THRESHOLD = 0.15  # prune below this quality ...
    MEMORY_STALE_DAYS = 30         # ... and unused for at least this long

    def __init__(self, tuner: SelfTuner):
        """
        Args:
            tuner: shared SelfTuner providing promotion/demotion thresholds.
        """
        self.tuner = tuner

        # WORKING tier: id -> Memory (bounded by WORKING_MEMORY_SIZE)
        self.working_memory: Dict[str, Memory] = {}
        # TOKEN tier: namespace -> ordered list of memory ids (FIFO loop)
        self.token_loops: Dict[str, List[str]] = defaultdict(list)
        # SEMANTIC tier (also backing store for TOKEN): id -> Memory
        self.semantic_memory: Dict[str, Memory] = {}

        self.stats = {
            "promotions": 0,
            "demotions": 0,
            "evictions": 0,
            "memories_decayed": 0,
            "memories_pruned": 0
        }

    def add_to_tier(self, memory: Memory, tier: MemoryTier):
        """Add memory to a specific tier."""
        memory.tier = tier

        if tier == MemoryTier.WORKING:
            self._add_to_working(memory)
        elif tier == MemoryTier.TOKEN:
            self._add_to_token(memory)
        else:
            self.semantic_memory[memory.id] = memory

    def _add_to_working(self, memory: Memory):
        """Add to working memory, evicting the lowest-priority item if full."""
        if len(self.working_memory) >= self.WORKING_MEMORY_SIZE:
            self._evict_from_working()

        # BUG FIX: the tier flag was previously not updated here, so a memory
        # promoted via try_promote() kept its stale TOKEN/SEMANTIC label
        # (unlike _add_to_token, which does update the flag).
        memory.tier = MemoryTier.WORKING
        memory.priority = 1.0
        self.working_memory[memory.id] = memory

    def _add_to_token(self, memory: Memory):
        """Add to the namespace's token loop (bounded FIFO)."""
        loop = self.token_loops[memory.namespace]

        if len(loop) >= self.TOKEN_LOOP_CAPACITY:
            # Loop full: demote the oldest entry back to plain SEMANTIC.
            oldest_id = loop.pop(0)
            if oldest_id in self.semantic_memory:
                self.semantic_memory[oldest_id].tier = MemoryTier.SEMANTIC

        # ROBUSTNESS: avoid duplicate ids in the loop (a memory can return
        # to a loop it was never removed from when it was promoted).
        if memory.id not in loop:
            loop.append(memory.id)
        self.semantic_memory[memory.id] = memory
        memory.tier = MemoryTier.TOKEN

    def _evict_from_working(self):
        """Evict the single lowest-priority working item into the token tier."""
        if not self.working_memory:
            return

        min_id = min(self.working_memory, key=lambda k: self.working_memory[k].priority)
        evicted = self.working_memory.pop(min_id)

        self._add_to_token(evicted)
        self.stats["evictions"] += 1

    def decay_priorities(self):
        """Apply SLM priority decay (0.95 per cycle), then evict stragglers.

        BUG FIX: the original evicted inside ``for ... in working_memory.values()``,
        mutating the dict during iteration (raises RuntimeError in CPython).
        Now all priorities are decayed first, then the lowest-priority items
        are evicted until none remain below EVICTION_THRESHOLD.
        """
        for memory in self.working_memory.values():
            memory.priority *= self.PRIORITY_DECAY

        while self.working_memory and any(
            m.priority < self.EVICTION_THRESHOLD
            for m in self.working_memory.values()
        ):
            self._evict_from_working()

    def calculate_promotion_score(self, memory: Memory, query_relevance: float) -> float:
        """
        SLM Promotion Score:
        PromotionScore = (QueryRelevance * 0.6) + (AccessFrequency * 0.3) + (RecencyScore * 0.1)
        """
        # Access frequency saturates at 10 accesses.
        access_freq = min(memory.access_count / 10, 1.0)

        # Recency falls linearly to 0 over 24 hours since last access.
        age_hours = (time.time() - memory.last_accessed) / 3600
        recency = max(0, 1 - (age_hours / 24))

        return (query_relevance * 0.6) + (access_freq * 0.3) + (recency * 0.1)

    def calculate_demotion_score(self, memory: Memory, query_relevance: float) -> float:
        """
        SLM Demotion Score:
        DemotionScore = (1-QueryRelevance)*0.5 + (1-AccessFrequency)*0.3 + (Age/MAX_AGE)*0.2
        """
        access_freq = min(memory.access_count / 10, 1.0)

        # Age saturates at one week (168 hours) since creation.
        age_hours = (time.time() - memory.created_at) / 3600
        age_score = min(age_hours / 168, 1.0)

        return ((1 - query_relevance) * 0.5) + ((1 - access_freq) * 0.3) + (age_score * 0.2)

    def try_promote(self, memory_id: str, query_relevance: float) -> bool:
        """Try to promote a memory one tier up (SEMANTIC->TOKEN or TOKEN->WORKING).

        Returns True when a promotion occurred.
        """
        if memory_id not in self.semantic_memory:
            return False

        memory = self.semantic_memory[memory_id]
        score = self.calculate_promotion_score(memory, query_relevance)
        threshold = self.tuner.parameters["promotion_threshold"]

        if score > threshold:
            if memory.tier == MemoryTier.SEMANTIC:
                self._add_to_token(memory)
                self.stats["promotions"] += 1
                return True
            elif memory.tier == MemoryTier.TOKEN:
                self._add_to_working(memory)
                self.stats["promotions"] += 1
                return True

        return False

    def try_demote(self, memory_id: str, query_relevance: float) -> bool:
        """Try to demote a working-memory item back to the token tier.

        Demotion only happens under capacity pressure (working memory > 80%
        full) AND when the demotion score exceeds the tuned threshold.
        """
        if memory_id in self.working_memory:
            memory = self.working_memory[memory_id]
            score = self.calculate_demotion_score(memory, query_relevance)
            threshold = self.tuner.parameters["demotion_threshold"]

            capacity_pressure = len(self.working_memory) / self.WORKING_MEMORY_SIZE

            if score > threshold and capacity_pressure > 0.8:
                self.working_memory.pop(memory_id)
                self._add_to_token(memory)
                self.stats["demotions"] += 1
                return True

        return False

    def get_all_memories(self) -> Dict[str, Memory]:
        """Get all memories across tiers (working entries win on id collision)."""
        return {**self.semantic_memory, **self.working_memory}

    def decay_memories(self) -> int:
        """
        Apply gentle quality decay to unused semantic memories.
        Memories that are accessed stay fresh; unused ones gradually decay.
        Returns the number of memories affected.
        """
        now = time.time()
        affected = 0

        for memory in self.semantic_memory.values():
            days_unused = (now - memory.last_accessed) / 86400

            if days_unused > 1:
                # Decay proportionally to idle time, capped at 10% per pass.
                decay_factor = min(days_unused * self.MEMORY_DECAY_RATE, 0.1)
                memory.quality_score *= (1 - decay_factor)
                affected += 1

        return affected

    def prune_stale_memories(self) -> Tuple[int, List[str]]:
        """
        Remove memories that are both below the quality threshold AND unused
        for longer than MEMORY_STALE_DAYS.

        BUG FIX: pruned ids previously lingered in ``token_loops`` and
        potentially ``working_memory``; both references are now cleaned up.

        Returns:
            (count_pruned, list_of_pruned_ids)
        """
        now = time.time()
        to_prune = []

        for mem_id, memory in self.semantic_memory.items():
            days_unused = (now - memory.last_accessed) / 86400

            if (memory.quality_score < self.MEMORY_PRUNE_THRESHOLD and
                    days_unused > self.MEMORY_STALE_DAYS):
                to_prune.append(mem_id)

        pruned_ids = []
        for mem_id in to_prune:
            memory = self.semantic_memory.pop(mem_id)
            loop = self.token_loops.get(memory.namespace)
            if loop is not None and mem_id in loop:
                loop.remove(mem_id)
            self.working_memory.pop(mem_id, None)
            pruned_ids.append(mem_id)

        return len(pruned_ids), pruned_ids

    def refresh_memory(self, memory_id: str):
        """Mark a memory as freshly accessed (resets decay clock)."""
        if memory_id in self.semantic_memory:
            self.semantic_memory[memory_id].last_accessed = time.time()
        elif memory_id in self.working_memory:
            self.working_memory[memory_id].last_accessed = time.time()

    def get_tier_stats(self) -> Dict:
        """Get tier occupancy and movement statistics."""
        return {
            "working_memory_count": len(self.working_memory),
            "working_memory_capacity": self.WORKING_MEMORY_SIZE,
            "token_loops": {ns: len(ids) for ns, ids in self.token_loops.items()},
            "semantic_memory_count": len(self.semantic_memory),
            "promotions": self.stats["promotions"],
            "demotions": self.stats["demotions"],
            "evictions": self.stats["evictions"]
        }
|
|
|
|
| |
| |
| |
|
|
class NeuralLinkManager:
    """
    SLM Neural Link Pathway System

    Creates and manages typed connections between memories.

    Links are directed (source -> target) and keyed by
    "source:target:link_type"; reverse links must be created explicitly.
    """

    # Path-finding (DFS) limits
    MAX_PATH_DEPTH = 4          # maximum hops explored
    MIN_PATH_STRENGTH = 0.40    # abandon paths whose accumulated strength falls below this
    PATH_STRENGTH_DECAY = 0.9   # extra per-hop attenuation of path strength
    MAX_BRANCHING = 12          # max outgoing links explored per node

    # Pruning: a link is removed only when it is BOTH weak AND stale
    PRUNE_STRENGTH_THRESHOLD = 0.25
    PRUNE_AGE_DAYS = 30

    def __init__(self):
        # link_id -> NeuralLink
        self.links: Dict[str, NeuralLink] = {}
        # memory id -> set of link ids leaving / entering that memory
        self.outgoing: Dict[str, Set[str]] = defaultdict(set)
        self.incoming: Dict[str, Set[str]] = defaultdict(set)

        self.stats = {
            "links_created": 0,
            "links_pruned": 0,
            "traversals": 0
        }

    def _link_id(self, source: str, target: str, link_type: LinkType) -> str:
        """Generate the canonical link ID for a (source, target, type) triple."""
        return f"{source}:{target}:{link_type.value}"

    def create_link(self, source_id: str, target_id: str,
                    link_type: LinkType, similarity: float) -> Optional[str]:
        """
        Create link if similarity exceeds type-specific threshold.

        SLM LinkScore = (VectorSimilarity * 0.6) + (CoOccurrence * 0.25) + (DomainRelatedness * 0.15)
        Simplified here to just similarity.

        Returns the link id, or None if similarity was below the threshold.
        Re-creating an existing link boosts its strength instead.
        """
        props = LINK_PROPERTIES[link_type]

        if similarity < props["creation_threshold"]:
            return None

        link_id = self._link_id(source_id, target_id, link_type)

        if link_id in self.links:
            # Existing link: apply the usage boost, capped at 1.0
            self.links[link_id].strength = min(
                1.0,
                self.links[link_id].strength + props["usage_boost"]
            )
            return link_id

        # New link starts at the type's configured initial strength
        link = NeuralLink(
            source_id=source_id,
            target_id=target_id,
            link_type=link_type,
            strength=props["initial_strength"]
        )

        self.links[link_id] = link
        self.outgoing[source_id].add(link_id)
        self.incoming[target_id].add(link_id)
        self.stats["links_created"] += 1

        return link_id

    def traverse_link(self, link_id: str) -> Optional[NeuralLink]:
        """Traverse a link, strengthening it; returns None for unknown ids."""
        if link_id not in self.links:
            return None

        link = self.links[link_id]
        link.traversal_count += 1
        link.last_traversed = time.time()

        # Traversal reinforces the link (capped at 1.0)
        props = LINK_PROPERTIES[link.link_type]
        link.strength = min(1.0, link.strength + props["usage_boost"])

        self.stats["traversals"] += 1
        return link

    def find_paths(self, source_id: str, target_id: str,
                   max_depth: Optional[int] = None) -> List[List[str]]:
        """Find acyclic paths between memories (SLM path finding).

        Depth-first search bounded by max_depth (default MAX_PATH_DEPTH),
        MIN_PATH_STRENGTH on the accumulated product of link strengths,
        and MAX_BRANCHING fan-out per node.
        """
        max_depth = max_depth or self.MAX_PATH_DEPTH
        paths = []

        def dfs(current: str, target: str, path: List[str],
                strength: float, depth: int):
            # Prune: too deep or the path has become too weak
            if depth > max_depth or strength < self.MIN_PATH_STRENGTH:
                return

            if current == target:
                paths.append(path.copy())
                return

            # NOTE(review): set iteration order is arbitrary, so WHICH links
            # survive the MAX_BRANCHING cut is nondeterministic.
            link_ids = list(self.outgoing.get(current, set()))[:self.MAX_BRANCHING]

            for link_id in link_ids:
                link = self.links.get(link_id)
                if link and link.target_id not in path:  # avoid cycles
                    new_strength = strength * link.strength * self.PATH_STRENGTH_DECAY
                    path.append(link.target_id)
                    dfs(link.target_id, target, path, new_strength, depth + 1)
                    path.pop()

        dfs(source_id, target_id, [source_id], 1.0, 0)
        return paths

    def get_connected(self, memory_id: str, link_types: Optional[List[LinkType]] = None) -> List[str]:
        """Get ids of memories this one links to (optionally filtered by type)."""
        connected = []

        for link_id in self.outgoing.get(memory_id, set()):
            link = self.links.get(link_id)
            if link:
                if link_types is None or link.link_type in link_types:
                    connected.append(link.target_id)

        return connected

    def decay_links(self) -> None:
        """Apply one decay cycle to all links (type-specific decay rate)."""
        for link in self.links.values():
            props = LINK_PROPERTIES[link.link_type]
            link.strength *= (1 - props["decay_rate"])

    def prune_weak_links(self) -> int:
        """Prune links that are below the strength threshold AND untraversed
        for longer than PRUNE_AGE_DAYS. Returns the number pruned."""
        to_prune = []
        now = time.time()
        age_threshold = self.PRUNE_AGE_DAYS * 24 * 3600

        for link_id, link in self.links.items():
            age = now - link.last_traversed
            if link.strength < self.PRUNE_STRENGTH_THRESHOLD and age > age_threshold:
                to_prune.append(link_id)

        for link_id in to_prune:
            link = self.links.pop(link_id)
            self.outgoing[link.source_id].discard(link_id)
            self.incoming[link.target_id].discard(link_id)
            self.stats["links_pruned"] += 1

        return len(to_prune)

    def remove_links_for_memory(self, memory_id: str) -> int:
        """Remove all links touching a memory (called when it is pruned).

        Returns the number of links removed.
        """
        to_remove = []

        # Collect first: cannot delete from self.links while iterating it
        for link_id, link in self.links.items():
            if link.source_id == memory_id or link.target_id == memory_id:
                to_remove.append(link_id)

        for link_id in to_remove:
            link = self.links.pop(link_id)
            self.outgoing[link.source_id].discard(link_id)
            self.incoming[link.target_id].discard(link_id)
            self.stats["links_pruned"] += 1

        # Drop the (now empty) adjacency entries for the memory itself
        if memory_id in self.outgoing:
            del self.outgoing[memory_id]
        if memory_id in self.incoming:
            del self.incoming[memory_id]

        return len(to_remove)

    def get_stats(self) -> Dict:
        """Return link counts (total and by type) plus running counters."""
        return {
            "total_links": len(self.links),
            "links_by_type": {
                lt.value: sum(1 for l in self.links.values() if l.link_type == lt)
                for lt in LinkType
            },
            **self.stats
        }
|
|
|
|
| |
| |
| |
|
|
class Mnemo:
    """
    Mnemo v4: SLM-Inspired Memory System

    Implements:
    - Three-tiered memory hierarchy
    - Neural link pathways (8 types)
    - Self-tuning parameters
    - Memory utility prediction

    With parameter adjustments based on Mnemo benchmarks.
    """

    # NOTE(review): STOP_WORDS is currently unreferenced within this class;
    # keyword filtering in MemoryUtilityPredictor uses its own inline set.
    STOP_WORDS = {"a", "an", "the", "is", "are", "was", "were", "be", "been",
                  "to", "of", "in", "for", "on", "with", "at", "by", "from",
                  "and", "but", "or", "not", "this", "that", "i", "me", "my"}

    def __init__(self, embedding_dim: int = 384):
        """Initialize subsystems, retrieval indexes, cache and counters."""
        self.embedding_dim = embedding_dim

        # Core SLM-inspired subsystems
        self.tuner = SelfTuner()
        self.memory_manager = TieredMemoryManager(self.tuner)
        self.link_manager = NeuralLinkManager()
        self.utility_predictor = MemoryUtilityPredictor()

        # Parallel arrays: _ids[i] corresponds to _embeddings[i], to row i
        # of the FAISS index, and to _tokenized_docs[i].
        self._embeddings: List[np.ndarray] = []
        self._ids: List[str] = []

        # Optional FAISS inner-product index (embeddings are normalized in
        # _get_embedding, so inner product behaves like cosine similarity)
        if HAS_FAISS:
            self.index = faiss.IndexFlatIP(embedding_dim)
        else:
            self.index = None

        # Optional BM25 lexical index (rebuilt on every add())
        self.bm25 = None
        self._tokenized_docs: List[List[str]] = []

        # Optional graph backend; NOTE(review): never populated in this
        # class -- links are tracked by NeuralLinkManager instead.
        if HAS_NETWORKX:
            self.graph = nx.DiGraph()
        else:
            self.graph = None

        # Embedding cache, guarded by a lock for thread-safe access
        self._cache: Dict[str, Any] = {}
        self._cache_lock = threading.Lock()

        # Operation counters (see get_stats())
        self.stats = {
            "adds": 0,
            "adds_rejected": 0,
            "searches": 0,
            "cache_hits": 0,
            "cache_misses": 0
        }

    def _get_embedding(self, text: str) -> np.ndarray:
        """Generate embedding (hash-based for POC).

        Builds a bag-of-words vector where each word hashes to one dimension
        with weight 1/(position+1), then L2-normalizes. Results are cached.
        NOTE(review): hash(word) is salted per interpreter process
        (PYTHONHASHSEED), so embeddings are NOT reproducible across runs.
        """
        cache_key = f"emb:{hashlib.md5(text.encode()).hexdigest()}"

        with self._cache_lock:
            if cache_key in self._cache:
                self.stats["cache_hits"] += 1
                return self._cache[cache_key]
            self.stats["cache_misses"] += 1

        # Hash-bucket each word; earlier words get larger weight
        embedding = np.zeros(self.embedding_dim, dtype=np.float32)
        words = text.lower().split()
        for i, word in enumerate(words):
            idx = hash(word) % self.embedding_dim
            embedding[idx] += 1.0 / (i + 1)

        # L2-normalize (guarding against the all-zero vector)
        norm = np.linalg.norm(embedding)
        if norm > 0:
            embedding = embedding / norm

        with self._cache_lock:
            self._cache[cache_key] = embedding

        return embedding

    def _estimate_quality(self, content: str) -> float:
        """Estimate content quality (SLM quality gates).

        Heuristics: penalize very short text and vague filler words; reward
        longer text, causal/explanatory language, and numeric specifics.
        Result is clamped to [0, 1].
        """
        score = 0.5
        words = len(content.split())

        if words < 5:
            score -= 0.3
        elif words > 20:
            score += 0.1

        # Explanatory/causal language suggests substance
        if any(r in content.lower() for r in ["because", "therefore", "shows"]):
            score += 0.2

        # Presence of numbers suggests specificity
        if re.search(r'\d+', content):
            score += 0.1

        # Vague filler words lower confidence in the content
        if any(v in content.lower() for v in ["something", "stuff", "maybe"]):
            score -= 0.2

        return max(0.0, min(1.0, score))

    def should_inject(self, query: str, context: str = "",
                      conversation_history: str = "",
                      model_confidence: float = 0.5) -> bool:
        """
        Memory Utility Predictor - should we inject memory?

        Based on benchmark findings that memory often hurts performance.
        Thin wrapper: discards the predictor's reason/confidence and
        returns only the boolean decision.
        """
        should, reason, confidence = self.utility_predictor.should_inject(
            query, context, conversation_history, model_confidence
        )
        return should

    def add(self, content: str, namespace: str = "default",
            metadata: Dict = None, skip_quality_check: bool = False) -> Optional[str]:
        """Add memory with SLM quality gates.

        Returns the new memory id, or None when the content fails the
        self-tuned quality threshold (unless skip_quality_check is True).
        The id is derived from an md5 of the content, so re-adding
        identical content reuses the same id.
        """
        quality = self._estimate_quality(content)
        threshold = self.tuner.parameters["quality_threshold"]

        if not skip_quality_check and quality < threshold:
            self.stats["adds_rejected"] += 1
            self.tuner.record_outcome("quality_threshold", threshold, False)
            return None

        memory_id = f"mem_{hashlib.md5(content.encode()).hexdigest()[:8]}"
        embedding = self._get_embedding(content)

        memory = Memory(
            id=memory_id,
            content=content,
            embedding=embedding,
            namespace=namespace,
            quality_score=quality,
            metadata=metadata or {}
        )

        # New memories always start at the SEMANTIC tier; promotion happens
        # later based on access patterns (see search()).
        self.memory_manager.add_to_tier(memory, MemoryTier.SEMANTIC)

        # Keep the parallel vector-store arrays in sync
        self._embeddings.append(embedding)
        self._ids.append(memory_id)

        if HAS_FAISS and self.index is not None:
            self.index.add(embedding.reshape(1, -1))

        # NOTE(review): BM25 index is rebuilt from scratch on every add --
        # O(total docs) per insertion.
        tokens = content.lower().split()
        self._tokenized_docs.append(tokens)
        if HAS_BM25:
            self.bm25 = BM25Okapi(self._tokenized_docs)

        # Wire the new memory into the link graph
        self._create_links_for_new_memory(memory_id, embedding)

        self.stats["adds"] += 1
        self.tuner.record_outcome("quality_threshold", threshold, True)

        return memory_id

    def _create_links_for_new_memory(self, memory_id: str, embedding: np.ndarray):
        """Create bidirectional similarity links to the 5 nearest memories.

        NOTE(review): brute-force O(n) dot products per insertion.
        """
        if len(self._ids) < 2:
            return

        # Score every existing memory against the new embedding
        similarities = []
        for other_id, other_emb in zip(self._ids, self._embeddings):
            if other_id != memory_id:
                sim = float(np.dot(embedding, other_emb))
                similarities.append((other_id, sim))

        similarities.sort(key=lambda x: x[1], reverse=True)

        # Link both directions for the top-5 neighbors; create_link applies
        # the per-type creation threshold internally.
        for other_id, sim in similarities[:5]:
            self.link_manager.create_link(
                memory_id, other_id, LinkType.SEMANTIC_SIMILARITY, sim
            )
            self.link_manager.create_link(
                other_id, memory_id, LinkType.SEMANTIC_SIMILARITY, sim
            )

    def search(self, query: str, top_k: int = 5,
               namespace: Optional[str] = None,
               use_links: bool = True) -> List[SearchResult]:
        """
        Search with multi-strategy retrieval + neural links.

        Combines vector similarity (weight 0.5), BM25 (0.3) and link-based
        spreading activation (0.2); returns up to top_k results scoring at
        or above the self-tuned similarity threshold. As a side effect,
        matched memories are touched (access_count / last_accessed) and
        considered for tier promotion.
        """
        if not self.memory_manager.semantic_memory:
            return []

        self.stats["searches"] += 1
        query_embedding = self._get_embedding(query)
        threshold = self.tuner.parameters["similarity_threshold"]

        # --- Strategy 1: vector similarity (FAISS or brute-force) ---
        semantic_scores = {}
        if HAS_FAISS and self.index is not None and self.index.ntotal > 0:
            k = min(top_k * 3, self.index.ntotal)
            scores, indices = self.index.search(query_embedding.reshape(1, -1), k)
            for score, idx in zip(scores[0], indices[0]):
                if 0 <= idx < len(self._ids):
                    semantic_scores[self._ids[idx]] = float(score)
        else:
            for mem_id, emb in zip(self._ids, self._embeddings):
                semantic_scores[mem_id] = float(np.dot(query_embedding, emb))

        # --- Strategy 2: BM25 lexical scores, normalized by the max ---
        bm25_scores = {}
        if HAS_BM25 and self.bm25 is not None:
            tokens = query.lower().split()
            scores = self.bm25.get_scores(tokens)
            max_score = max(scores) if len(scores) > 0 and max(scores) > 0 else 1
            for idx, score in enumerate(scores):
                if score > 0.1 * max_score:  # drop near-zero matches
                    bm25_scores[self._ids[idx]] = float(score / max_score)

        # --- Strategy 3: spreading activation over neural links ---
        link_scores = {}
        if use_links:
            # Expand from the 3 best vector hits to their linked neighbors
            top_semantic = sorted(semantic_scores.items(), key=lambda x: x[1], reverse=True)[:3]
            for mem_id, _ in top_semantic:
                connected = self.link_manager.get_connected(mem_id)
                for conn_id in connected[:5]:
                    link_scores[conn_id] = link_scores.get(conn_id, 0) + 0.3

        # Union of candidates from all strategies
        all_ids = set(semantic_scores.keys()) | set(bm25_scores.keys()) | set(link_scores.keys())

        if namespace:
            # Namespace filter (every memory lives in semantic_memory)
            all_ids = {mid for mid in all_ids
                       if mid in self.memory_manager.semantic_memory
                       and self.memory_manager.semantic_memory[mid].namespace == namespace}

        results = []
        for mem_id in all_ids:
            strat = {
                "semantic": semantic_scores.get(mem_id, 0),
                "bm25": bm25_scores.get(mem_id, 0),
                "links": link_scores.get(mem_id, 0)
            }

            # Fixed strategy weights: semantic 0.5, bm25 0.3, links 0.2
            combined = (
                strat["semantic"] * 0.5 +
                strat["bm25"] * 0.3 +
                strat["links"] * 0.2
            )

            memory = self.memory_manager.semantic_memory.get(mem_id)
            if memory and combined >= threshold:
                # Touch the memory so decay/promotion see recent use
                memory.access_count += 1
                memory.last_accessed = time.time()

                # Side effect: high-scoring memories may move up a tier
                self.memory_manager.try_promote(mem_id, combined)

                results.append(SearchResult(
                    id=mem_id,
                    content=memory.content,
                    score=combined,
                    tier=memory.tier,
                    strategy_scores=strat,
                    metadata=memory.metadata
                ))

                self.tuner.record_outcome("similarity_threshold", threshold, True)
            else:
                self.tuner.record_outcome("similarity_threshold", threshold, False)

        results.sort(key=lambda x: x.score, reverse=True)
        return results[:top_k]

    def get_context(self, query: str, top_k: int = 3,
                    namespace: Optional[str] = None) -> str:
        """Get formatted context for prompt injection.

        Returns an empty string when nothing relevant is found.
        """
        results = self.search(query, top_k=top_k, namespace=namespace)

        if not results:
            return ""

        parts = ["[RELEVANT CONTEXT FROM MEMORY]"]
        for r in results:
            # Tag non-semantic results with their tier for visibility
            tier_marker = f"[{r.tier.value.upper()}]" if r.tier != MemoryTier.SEMANTIC else ""
            parts.append(f"β’ {tier_marker} {r.content}")
        parts.append("[END CONTEXT]\n")

        return "\n".join(parts)

    def feedback(self, query: str, memory_id: str, relevance: float):
        """Record feedback for learning.

        Args:
            query: the query that produced the result (currently unused).
            memory_id: id of the memory being rated.
            relevance: in [-1, 1]; clamped. Blended into the memory's
                relevance EMA and nudges outgoing link strengths.
        """
        relevance = max(-1, min(1, relevance))

        if memory_id in self.memory_manager.semantic_memory:
            memory = self.memory_manager.semantic_memory[memory_id]

            # Exponential moving average, mapping [-1, 1] -> [0, 1]
            memory.relevance_score = 0.7 * memory.relevance_score + 0.3 * ((relevance + 1) / 2)

            # Propagate the signal into outgoing link strengths
            for link_id in self.link_manager.outgoing.get(memory_id, set()):
                link = self.link_manager.links.get(link_id)
                if link:
                    link.strength = max(0, min(1, link.strength + relevance * 0.05))

    def maintenance_cycle(self):
        """Run SLM maintenance operations.

        Order: working-memory priority decay -> link decay -> link pruning
        -> memory quality decay -> stale memory pruning (with link cleanup)
        -> parameter auto-tuning. Returns a summary dict.
        """
        self.memory_manager.decay_priorities()

        self.link_manager.decay_links()

        links_pruned = self.link_manager.prune_weak_links()

        memories_decayed = self.memory_manager.decay_memories()
        self.memory_manager.stats["memories_decayed"] += memories_decayed

        memories_pruned, pruned_ids = self.memory_manager.prune_stale_memories()
        self.memory_manager.stats["memories_pruned"] += memories_pruned

        # Remove links touching pruned memories so the graph stays clean
        for mem_id in pruned_ids:
            self.link_manager.remove_links_for_memory(mem_id)

        adjustments = self.tuner.auto_tune()

        return {
            "links_pruned": links_pruned,
            "memories_decayed": memories_decayed,
            "memories_pruned": memories_pruned,
            "parameter_adjustments": adjustments
        }

    def get_stats(self) -> Dict:
        """Get comprehensive statistics across all subsystems."""
        return {
            "memories": {
                "total": len(self.memory_manager.semantic_memory),
                **self.memory_manager.get_tier_stats()
            },
            "links": self.link_manager.get_stats(),
            "utility_predictor": self.utility_predictor.stats,
            "tuner": {
                "parameters": self.tuner.parameters,
                "adjustments": self.tuner.adjustment_count
            },
            "operations": self.stats
        }

    def clear(self):
        """Clear all memory (tiers, links, indexes, caches).

        NOTE(review): the self.stats operation counters and the tuner's
        learned parameters are NOT reset here -- confirm that is intended.
        """
        self.memory_manager = TieredMemoryManager(self.tuner)
        self.link_manager = NeuralLinkManager()
        self._embeddings.clear()
        self._ids.clear()
        self._tokenized_docs.clear()
        self.bm25 = None
        self._cache.clear()

        if HAS_FAISS:
            self.index = faiss.IndexFlatIP(self.embedding_dim)

    def __len__(self):
        # Memory count == size of the semantic (base) tier
        return len(self.memory_manager.semantic_memory)

    def __repr__(self):
        return f"Mnemo(memories={len(self)}, links={len(self.link_manager.links)})"
|
|
|
|
| |
| |
| |
|
|
def demo():
    """Run a small end-to-end demonstration of the Mnemo v4 system.

    BUG FIXES: the original used identical mojibake glyphs for both branches
    of the pass/fail markers (making the conditionals useless) and contained
    a string literal broken across two lines (a syntax error). Both are
    replaced with plain ASCII output.
    """
    print("=" * 70)
    print("MNEMO v4: SLM-INSPIRED ARCHITECTURE")
    print("=" * 70)

    m = Mnemo()
    print(f"\n[OK] Initialized: {m}")

    # Show the self-tuned parameter starting points
    print("\nTuned Parameters (adjusted from SLM):")
    for param, value in m.tuner.parameters.items():
        print(f"  {param}: {value}")

    # Add sample memories through the quality gate
    print("\nAdding memories...")
    memories = [
        "User prefers Python because it has clean syntax and good libraries",
        "Previous analysis showed gender bias in Victorian psychiatry diagnoses",
        "Framework has 5 checkpoints for detecting historical medical bias",
        "The project deadline is March 15th for the API redesign",
        "User's coffee preference is cappuccino with oat milk"
    ]

    for mem in memories:
        result = m.add(mem)
        status = "[OK]" if result else "[REJECTED]"
        print(f"  {status} {mem[:50]}...")

    # Exercise the utility predictor against expected outcomes
    print("\nMemory Utility Predictions:")
    tests = [
        ("What is Python?", False),
        ("Based on your previous analysis...", True),
        ("Compare to your earlier findings", True),
        ("This is a NEW topic", False),
    ]

    for query, expected in tests:
        result = m.should_inject(query)
        status = "[PASS]" if result == expected else "[FAIL]"
        action = "INJECT" if result else "SKIP"
        print(f"  {status} {action}: {query}")

    # Multi-strategy search
    print("\nSearch Results:")
    results = m.search("previous analysis framework", top_k=3)
    for r in results:
        print(f"  [{r.tier.value}] score={r.score:.3f}: {r.content[:50]}...")

    # Neural link summary
    print("\nNeural Links:")
    link_stats = m.link_manager.get_stats()
    print(f"  Total links: {link_stats['total_links']}")
    for lt, count in link_stats['links_by_type'].items():
        if count > 0:
            print(f"    {lt}: {count}")

    # Aggregate statistics
    print("\nFull Statistics:")
    stats = m.get_stats()
    print(f"  Memories: {stats['memories']['total']}")
    print(f"  Working memory: {stats['memories']['working_memory_count']}")
    print(f"  Links: {stats['links']['total_links']}")
    print(f"  Utility predictions: {stats['utility_predictor']['predictions']}")

    print("\n" + "=" * 70)
    print("Demo complete!")
    print("=" * 70)
|
|
|
|
| if __name__ == "__main__": |
| demo() |
|
|