| """ |
| MemoryShell - Temporal Memory Architecture for Recursive Agents |
| |
| This module implements the memory shell architecture that enables agents to |
| maintain persistent memory with configurable decay properties. The memory |
| shell acts as a cognitive substrate that provides: |
| |
| - Short-term working memory |
| - Medium-term episodic memory with decay |
| - Long-term semantic memory with compression |
| - Temporal relationship tracking |
| - Experience-based learning |
| |
| Internal Note: The memory shell simulates the MEMTRACE and ECHO-LOOP interpretability |
| shells for modeling memory decay and feedback loops in agent cognition. |
| """ |
|
|
| import datetime |
| import math |
| import uuid |
| import heapq |
| from typing import Dict, List, Any, Optional, Tuple, Set |
| import numpy as np |
| from collections import defaultdict, deque |
|
|
| from pydantic import BaseModel, Field |
|
|
|
|
class Memory(BaseModel):
    """Base memory unit with attribution and decay properties."""

    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    content: Dict[str, Any] = Field(...)
    memory_type: str = Field(...)
    creation_time: datetime.datetime = Field(default_factory=datetime.datetime.now)
    last_access_time: datetime.datetime = Field(default_factory=datetime.datetime.now)
    access_count: int = Field(default=1)
    salience: float = Field(default=1.0)
    decay_rate: float = Field(default=0.1)
    associations: Dict[str, float] = Field(default_factory=dict)
    source: Optional[str] = Field(default=None)
    tags: List[str] = Field(default_factory=list)

    def update_access(self) -> None:
        """Record an access: bump the counter and refresh the access timestamp."""
        self.access_count += 1
        self.last_access_time = datetime.datetime.now()

    def calculate_current_salience(self) -> float:
        """
        Compute the decayed salience of this memory.

        Exponential decay over the hours since creation, partially offset by
        a logarithmic bonus for repeated access; the result is capped at 1.0.
        """
        age_hours = (datetime.datetime.now() - self.creation_time).total_seconds() / 3600
        decay_factor = math.exp(-self.decay_rate * age_hours)
        reinforcement = 1 + math.log1p(self.access_count) / 10
        return min(1.0, self.salience * decay_factor * reinforcement)

    def add_association(self, memory_id: str, strength: float = 0.5) -> None:
        """
        Associate this memory with another one (overwrites an existing link).

        Args:
            memory_id: ID of memory to associate with
            strength: Association strength (0-1)
        """
        self.associations[memory_id] = strength

    def add_tag(self, tag: str) -> None:
        """
        Attach a semantic tag, ignoring duplicates.

        Args:
            tag: Tag to add
        """
        if tag in self.tags:
            return
        self.tags.append(tag)

    def as_dict(self) -> Dict[str, Any]:
        """Serialize the memory to a plain dict (timestamps as ISO strings)."""
        data = {
            "id": self.id,
            "content": self.content,
            "memory_type": self.memory_type,
            "creation_time": self.creation_time.isoformat(),
            "last_access_time": self.last_access_time.isoformat(),
            "access_count": self.access_count,
            "salience": self.salience,
            "current_salience": self.calculate_current_salience(),
            "decay_rate": self.decay_rate,
            "associations": self.associations,
            "source": self.source,
            "tags": self.tags,
        }
        return data
|
|
|
|
class EpisodicMemory(Memory):
    """Episodic memory representing specific experiences."""

    sequence_position: Optional[int] = Field(default=None)
    emotional_valence: float = Field(default=0.0)
    outcome: Optional[str] = Field(default=None)

    def __init__(self, **data):
        # The type is fixed; any caller-supplied value is overridden.
        data["memory_type"] = "episodic"
        super().__init__(**data)

    def as_dict(self) -> Dict[str, Any]:
        """Serialize including episodic-only fields.

        The base Memory.as_dict() omits subclass fields, which silently lost
        sequence_position / emotional_valence / outcome on export; the shell's
        import_state() reads exactly these keys when rebuilding state.
        """
        data = super().as_dict()
        data["sequence_position"] = self.sequence_position
        data["emotional_valence"] = self.emotional_valence
        data["outcome"] = self.outcome
        return data
|
|
|
class SemanticMemory(Memory):
    """Semantic memory representing conceptual knowledge."""

    certainty: float = Field(default=0.7)
    contradiction_ids: List[str] = Field(default_factory=list)
    supporting_evidence: List[str] = Field(default_factory=list)

    def __init__(self, **data):
        # Force the semantic type; knowledge decays slower than episodes by default.
        data["memory_type"] = "semantic"
        data.setdefault("decay_rate", 0.05)
        super().__init__(**data)

    def add_evidence(self, memory_id: str, is_supporting: bool = True) -> None:
        """
        Record a piece of supporting or contradicting evidence (deduplicated).

        Args:
            memory_id: Memory ID for evidence
            is_supporting: Whether evidence is supporting (True) or contradicting (False)
        """
        target = self.supporting_evidence if is_supporting else self.contradiction_ids
        if memory_id not in target:
            target.append(memory_id)

    def update_certainty(self, evidence_ratio: float) -> None:
        """
        Nudge certainty toward the evidence ratio.

        Exponential moving average: 70% previous certainty, 30% new ratio.

        Args:
            evidence_ratio: Ratio of supporting to total evidence (0-1)
        """
        self.certainty = 0.7 * self.certainty + 0.3 * evidence_ratio

    def as_dict(self) -> Dict[str, Any]:
        """Serialize including semantic-only fields.

        The base Memory.as_dict() omits subclass fields, which silently lost
        certainty / evidence lists on export; the shell's import_state()
        reads exactly these keys when rebuilding state.
        """
        data = super().as_dict()
        data["certainty"] = self.certainty
        data["contradiction_ids"] = self.contradiction_ids
        data["supporting_evidence"] = self.supporting_evidence
        return data
|
|
|
|
class WorkingMemory(Memory):
    """Working memory representing active thinking and temporary storage."""

    expiration_time: datetime.datetime = Field(default_factory=lambda: datetime.datetime.now() + datetime.timedelta(hours=1))
    priority: int = Field(default=1)

    def __init__(self, **data):
        # Working memories are always typed "working" and decay fast by default.
        data["memory_type"] = "working"
        data.setdefault("decay_rate", 0.5)
        super().__init__(**data)

    def set_expiration(self, hours: float) -> None:
        """
        Schedule this item to expire a given number of hours from now.

        Args:
            hours: Hours until expiration
        """
        self.expiration_time = datetime.datetime.now() + datetime.timedelta(hours=hours)

    def is_expired(self) -> bool:
        """Return True once the current time has passed the expiration time."""
        return datetime.datetime.now() > self.expiration_time

    def as_dict(self) -> Dict[str, Any]:
        """Serialize including working-memory-only fields.

        The base Memory.as_dict() omits subclass fields, which silently lost
        priority / expiration_time on export; the shell's import_state()
        reads exactly these keys (expiration_time as an ISO string).
        """
        data = super().as_dict()
        data["priority"] = self.priority
        data["expiration_time"] = self.expiration_time.isoformat()
        return data
|
|
|
|
| class MemoryShell: |
| """ |
| Memory shell architecture for agent cognitive persistence. |
| |
| The MemoryShell provides: |
| - Multi-tiered memory system (working, episodic, semantic) |
| - Configurable decay rates for different memory types |
| - Time-based and access-based memory reinforcement |
| - Associative memory network with activation spread |
| - Query capabilities with relevance ranking |
| """ |
| |
| def __init__(self, decay_rate: float = 0.2): |
| """ |
| Initialize memory shell. |
| |
| Args: |
| decay_rate: Base decay rate for memories |
| """ |
| self.memories: Dict[str, Memory] = {} |
| self.decay_rate = decay_rate |
| self.working_memory_capacity = 7 |
| self.episodic_index: Dict[str, Set[str]] = defaultdict(set) |
| self.semantic_index: Dict[str, Set[str]] = defaultdict(set) |
| self.temporal_sequence: List[str] = [] |
| self.activation_threshold = 0.1 |
| |
| |
| self.stats = { |
| "total_memories_created": 0, |
| "total_memories_decayed": 0, |
| "working_memory_count": 0, |
| "episodic_memory_count": 0, |
| "semantic_memory_count": 0, |
| "average_salience": 0.0, |
| "association_count": 0, |
| } |
| |
| def add_working_memory(self, content: Dict[str, Any], priority: int = 1, |
| expiration_hours: float = 1.0, tags: List[str] = None) -> str: |
| """ |
| Add item to working memory. |
| |
| Args: |
| content: Memory content |
| priority: Priority level (higher = more important) |
| expiration_hours: Hours until expiration |
| tags: Semantic tags |
| |
| Returns: |
| Memory ID |
| """ |
| |
| memory = WorkingMemory( |
| content=content, |
| priority=priority, |
| decay_rate=self.decay_rate * 2, |
| tags=tags or [], |
| source="working", |
| ) |
| |
| |
| memory.set_expiration(expiration_hours) |
| |
| |
| self.memories[memory.id] = memory |
| |
| |
| for tag in memory.tags: |
| self.episodic_index[tag].add(memory.id) |
| |
| |
| self._enforce_working_memory_capacity() |
| |
| |
| self.stats["total_memories_created"] += 1 |
| self.stats["working_memory_count"] += 1 |
| |
| return memory.id |
| |
| def add_episodic_memory(self, content: Dict[str, Any], emotional_valence: float = 0.0, |
| outcome: Optional[str] = None, tags: List[str] = None) -> str: |
| """ |
| Add episodic memory. |
| |
| Args: |
| content: Memory content |
| emotional_valence: Emotional charge (-1 to 1) |
| outcome: Outcome of the experience |
| tags: Semantic tags |
| |
| Returns: |
| Memory ID |
| """ |
| |
| memory = EpisodicMemory( |
| content=content, |
| emotional_valence=emotional_valence, |
| outcome=outcome, |
| decay_rate=self.decay_rate, |
| tags=tags or [], |
| source="episode", |
| ) |
| |
| |
| memory.sequence_position = len(self.temporal_sequence) |
| |
| |
| self.memories[memory.id] = memory |
| |
| |
| for tag in memory.tags: |
| self.episodic_index[tag].add(memory.id) |
| |
| |
| self.temporal_sequence.append(memory.id) |
| |
| |
| self.stats["total_memories_created"] += 1 |
| self.stats["episodic_memory_count"] += 1 |
| |
| return memory.id |
| |
| def add_semantic_memory(self, content: Dict[str, Any], certainty: float = 0.7, |
| tags: List[str] = None) -> str: |
| """ |
| Add semantic memory. |
| |
| Args: |
| content: Memory content |
| certainty: Certainty level (0-1) |
| tags: Semantic tags |
| |
| Returns: |
| Memory ID |
| """ |
| |
| memory = SemanticMemory( |
| content=content, |
| certainty=certainty, |
| decay_rate=self.decay_rate * 0.5, |
| tags=tags or [], |
| source="semantic", |
| ) |
| |
| |
| self.memories[memory.id] = memory |
| |
| |
| for tag in memory.tags: |
| self.semantic_index[tag].add(memory.id) |
| |
| |
| self.stats["total_memories_created"] += 1 |
| self.stats["semantic_memory_count"] += 1 |
| |
| return memory.id |
| |
| def add_experience(self, experience: Dict[str, Any]) -> Tuple[str, List[str]]: |
| """ |
| Add new experience as episodic memory and extract semantic memories. |
| |
| Args: |
| experience: Experience data |
| |
| Returns: |
| Tuple of (episodic_id, list of semantic_ids) |
| """ |
| |
| tags = experience.get("tags", []) |
| if not tags and "type" in experience: |
| tags = [experience["type"]] |
| |
| |
| episodic_id = self.add_episodic_memory( |
| content=experience, |
| emotional_valence=experience.get("emotional_valence", 0.0), |
| outcome=experience.get("outcome"), |
| tags=tags, |
| ) |
| |
| |
| semantic_ids = [] |
| if "insights" in experience and isinstance(experience["insights"], list): |
| for insight in experience["insights"]: |
| if isinstance(insight, dict): |
| semantic_id = self.add_semantic_memory( |
| content=insight, |
| certainty=insight.get("confidence", 0.7), |
| tags=insight.get("tags", tags), |
| ) |
| semantic_ids.append(semantic_id) |
| |
| |
| self.add_association(episodic_id, semantic_id, 0.8) |
| |
| return episodic_id, semantic_ids |
| |
| def add_association(self, memory_id1: str, memory_id2: str, strength: float = 0.5) -> bool: |
| """ |
| Add bidirectional association between memories. |
| |
| Args: |
| memory_id1: First memory ID |
| memory_id2: Second memory ID |
| strength: Association strength (0-1) |
| |
| Returns: |
| Success status |
| """ |
| |
| if memory_id1 not in self.memories or memory_id2 not in self.memories: |
| return False |
| |
| |
| self.memories[memory_id1].add_association(memory_id2, strength) |
| self.memories[memory_id2].add_association(memory_id1, strength) |
| |
| |
| self.stats["association_count"] += 2 |
| |
| return True |
| |
| def get_memory(self, memory_id: str) -> Optional[Dict[str, Any]]: |
| """ |
| Retrieve memory by ID. |
| |
| Args: |
| memory_id: Memory ID |
| |
| Returns: |
| Memory data or None if not found |
| """ |
| if memory_id not in self.memories: |
| return None |
| |
| |
| memory = self.memories[memory_id] |
| |
| |
| memory.update_access() |
| |
| |
| memory_dict = memory.as_dict() |
| |
| return memory_dict |
| |
    def query_memories(self, query: Dict[str, Any], memory_type: Optional[str] = None,
                      tags: Optional[List[str]] = None, limit: int = 10) -> List[Dict[str, Any]]:
        """
        Query memories based on content, type, and tags.

        Candidates are filtered by type and tags, dropped when their decayed
        salience is below the activation threshold, then ranked by a blend of
        content relevance (70%) and current salience (30%).

        Args:
            query: Query terms
            memory_type: Optional filter by memory type
            tags: Optional filter by tags
            limit: Maximum number of results

        Returns:
            List of matching memories (each dict gains a "relevance_score" key)
        """

        candidate_ids = set()

        if memory_type:
            # Restrict candidates to the requested memory type.
            for memory_id, memory in self.memories.items():
                if memory.memory_type == memory_type:
                    candidate_ids.add(memory_id)
        else:
            # No type filter: start from every stored memory.
            candidate_ids = set(self.memories.keys())

        # Tag filter: a memory qualifies if it carries ANY of the tags.
        if tags:
            tag_memories = set()
            for tag in tags:
                # Check both indexes (working memories are registered in the
                # episodic index by add_working_memory).
                tag_memories.update(self.episodic_index.get(tag, set()))
                tag_memories.update(self.semantic_index.get(tag, set()))

            # NOTE(review): when none of the requested tags match anything,
            # the tag filter is silently skipped and all candidates survive —
            # confirm this fallback is intentional rather than returning [].
            if tag_memories:
                candidate_ids = candidate_ids.intersection(tag_memories)

        # Score every surviving candidate.
        scored_candidates = []

        for memory_id in candidate_ids:
            memory = self.memories[memory_id]

            # Memories decayed below the threshold are treated as forgotten.
            current_salience = memory.calculate_current_salience()
            if current_salience < self.activation_threshold:
                continue

            relevance = self._calculate_relevance(memory, query)

            # Blend: relevance dominates, salience breaks ties.
            score = 0.7 * relevance + 0.3 * current_salience

            scored_candidates.append((memory_id, score))

        # Top-k by score without sorting the whole candidate list.
        top_candidates = heapq.nlargest(limit, scored_candidates, key=lambda x: x[1])

        result_memories = []
        for memory_id, score in top_candidates:
            memory = self.memories[memory_id]
            memory.update_access()  # being returned counts as an access

            memory_dict = memory.as_dict()
            memory_dict["relevance_score"] = score

            result_memories.append(memory_dict)

        return result_memories
| |
| def get_recent_memories(self, memory_type: Optional[str] = None, limit: int = 5) -> List[Dict[str, Any]]: |
| """ |
| Get most recent memories by creation time. |
| |
| Args: |
| memory_type: Optional filter by memory type |
| limit: Maximum number of results |
| |
| Returns: |
| List of recent memories |
| """ |
| |
| recent_memories = [] |
| |
| for memory_id, memory in self.memories.items(): |
| |
| if memory_type and memory.memory_type != memory_type: |
| continue |
| |
| |
| recent_memories.append((memory_id, memory.creation_time)) |
| |
| |
| recent_memories.sort(key=lambda x: x[1], reverse=True) |
| |
| |
| result_memories = [] |
| for memory_id, _ in recent_memories[:limit]: |
| memory = self.memories[memory_id] |
| memory.update_access() |
| result_memories.append(memory.as_dict()) |
| |
| return result_memories |
| |
| def get_temporal_sequence(self, start_index: int = 0, limit: int = 10) -> List[Dict[str, Any]]: |
| """ |
| Get temporal sequence of episodic memories. |
| |
| Args: |
| start_index: Starting index in sequence |
| limit: Maximum number of results |
| |
| Returns: |
| List of episodic memories in temporal order |
| """ |
| |
| sequence_slice = self.temporal_sequence[start_index:start_index+limit] |
| |
| |
| result_memories = [] |
| for memory_id in sequence_slice: |
| if memory_id in self.memories: |
| memory = self.memories[memory_id] |
| memory.update_access() |
| result_memories.append(memory.as_dict()) |
| |
| return result_memories |
| |
| def get_relevant_experiences(self, query: Optional[Dict[str, Any]] = None, |
| tags: Optional[List[str]] = None, limit: int = 5) -> List[Dict[str, Any]]: |
| """ |
| Get relevant episodic experiences. |
| |
| Args: |
| query: Optional query terms |
| tags: Optional filter by tags |
| limit: Maximum number of results |
| |
| Returns: |
| List of relevant experiences |
| """ |
| |
| if query: |
| return self.query_memories(query, memory_type="episodic", tags=tags, limit=limit) |
| |
| |
| |
| candidate_ids = set() |
| if tags: |
| for tag in tags: |
| candidate_ids.update(self.episodic_index.get(tag, set())) |
| else: |
| |
| candidate_ids = {memory_id for memory_id, memory in self.memories.items() |
| if memory.memory_type == "episodic"} |
| |
| |
| scored_candidates = [] |
| for memory_id in candidate_ids: |
| if memory_id in self.memories: |
| memory = self.memories[memory_id] |
| current_salience = memory.calculate_current_salience() |
| |
| |
| if current_salience < self.activation_threshold: |
| continue |
| |
| scored_candidates.append((memory_id, current_salience)) |
| |
| |
| top_candidates = heapq.nlargest(limit, scored_candidates, key=lambda x: x[1]) |
| |
| |
| result_memories = [] |
| for memory_id, _ in top_candidates: |
| memory = self.memories[memory_id] |
| memory.update_access() |
| result_memories.append(memory.as_dict()) |
| |
| return result_memories |
| |
| def get_beliefs(self, tags: Optional[List[str]] = None, certainty_threshold: float = 0.5) -> List[Dict[str, Any]]: |
| """ |
| Get semantic beliefs with high certainty. |
| |
| Args: |
| tags: Optional filter by tags |
| certainty_threshold: Minimum certainty threshold |
| |
| Returns: |
| List of semantic beliefs |
| """ |
| |
| candidate_ids = set() |
| if tags: |
| for tag in tags: |
| candidate_ids.update(self.semantic_index.get(tag, set())) |
| else: |
| |
| candidate_ids = {memory_id for memory_id, memory in self.memories.items() |
| if memory.memory_type == "semantic"} |
| |
| |
| scored_candidates = [] |
| for memory_id in candidate_ids: |
| if memory_id in self.memories: |
| memory = self.memories[memory_id] |
| |
| |
| if memory.memory_type != "semantic" or not hasattr(memory, "certainty"): |
| continue |
| |
| if memory.certainty < certainty_threshold: |
| continue |
| |
| |
| current_salience = memory.calculate_current_salience() |
| |
| |
| if current_salience < self.activation_threshold: |
| continue |
| |
| |
| score = 0.6 * memory.certainty + 0.4 * current_salience |
| |
| scored_candidates.append((memory_id, score)) |
| |
| |
| scored_candidates.sort(key=lambda x: x[1], reverse=True) |
| |
| |
| result_memories = [] |
| for memory_id, score in scored_candidates: |
| memory = self.memories[memory_id] |
| memory.update_access() |
| |
| |
| memory_dict = memory.as_dict() |
| memory_dict["belief_score"] = score |
| |
| result_memories.append(memory_dict) |
| |
| return result_memories |
| |
| def apply_decay(self) -> int: |
| """ |
| Apply memory decay to all memories and clean up decayed memories. |
| |
| Returns: |
| Number of memories removed due to decay |
| """ |
| |
| to_remove = [] |
| |
| |
| for memory_id, memory in self.memories.items(): |
| |
| current_salience = memory.calculate_current_salience() |
| |
| |
| if current_salience < self.activation_threshold: |
| to_remove.append(memory_id) |
| |
| |
| for memory_id in to_remove: |
| self._remove_memory(memory_id) |
| |
| |
| self.stats["total_memories_decayed"] += len(to_remove) |
| self.stats["working_memory_count"] = sum(1 for m in self.memories.values() if m.memory_type == "working") |
| self.stats["episodic_memory_count"] = sum(1 for m in self.memories.values() if m.memory_type == "episodic") |
| self.stats["semantic_memory_count"] = sum(1 for m in self.memories.values() if m.memory_type == "semantic") |
| |
| |
| if self.memories: |
| self.stats["average_salience"] = sum(m.calculate_current_salience() for m in self.memories.values()) / len(self.memories) |
| |
| return len(to_remove) |
| |
| def consolidate_memories(self) -> Dict[str, Any]: |
| """ |
| Consolidate episodic memories into semantic memories. |
| |
| Returns: |
| Consolidation results |
| """ |
| |
| |
| recent_episodic = self.get_recent_memories(memory_type="episodic", limit=10) |
| |
| |
| tag_counts = defaultdict(int) |
| for memory in recent_episodic: |
| for tag in memory.get("tags", []): |
| tag_counts[tag] += 1 |
| |
| |
| common_tags = {tag for tag, count in tag_counts.items() if count >= 3} |
| |
| |
| consolidated_ids = [] |
| if common_tags: |
| |
| semantic_id = self.add_semantic_memory( |
| content={ |
| "consolidated_from": [m.get("id") for m in recent_episodic], |
| "common_tags": list(common_tags), |
| "summary": f"Consolidated memory with common tags: {', '.join(common_tags)}" |
| }, |
| certainty=0.6, |
| tags=list(common_tags), |
| ) |
| |
| consolidated_ids.append(semantic_id) |
| |
| return { |
| "consolidated_count": len(consolidated_ids), |
| "consolidated_ids": consolidated_ids, |
| "common_tags": list(common_tags) if common_tags else [] |
| } |
| |
| def _calculate_relevance(self, memory: Memory, query: Dict[str, Any]) -> float: |
| """ |
| Calculate relevance score of memory to query. |
| |
| Args: |
| memory: Memory to score |
| query: Query terms |
| |
| Returns: |
| Relevance score (0-1) |
| """ |
| |
| relevance = 0.0 |
| |
| |
| content = memory.content |
| |
| |
| matching_keys = set(query.keys()).intersection(set(content.keys())) |
| if matching_keys: |
| relevance += 0.3 * (len(matching_keys) / len(query)) |
| |
| |
| for key, value in query.items(): |
| if key in content and isinstance(value, str) and isinstance(content[key], str): |
| if value.lower() in content[key].lower(): |
| relevance += 0.2 |
| elif key in content and value == content[key]: |
| relevance += 0.3 |
| |
| |
| query_tags = query.get("tags", []) |
| if isinstance(query_tags, list) and memory.tags: |
| matching_tags = set(query_tags).intersection(set(memory.tags)) |
| if matching_tags: |
| relevance += 0.3 * (len(matching_tags) / len(query_tags)) |
| |
| |
| return min(1.0, relevance) |
| |
| def _enforce_working_memory_capacity(self) -> None: |
| """Enforce working memory capacity limit by removing low priority items.""" |
| |
| working_memories = [(memory_id, memory) for memory_id, memory in self.memories.items() |
| if memory.memory_type == "working"] |
| |
| |
| if len(working_memories) <= self.working_memory_capacity: |
| return |
| |
| |
| working_memories.sort(key=lambda x: (x[1].priority, x[1].calculate_current_salience())) |
| |
| |
| for memory_id, _ in working_memories[:len(working_memories) - self.working_memory_capacity]: |
| self._remove_memory(memory_id) |
| |
| def _remove_memory(self, memory_id: str) -> None: |
| """ |
| Remove memory by ID. |
| |
| Args: |
| memory_id: Memory ID to remove |
| """ |
| if memory_id not in self.memories: |
| return |
| |
| |
| memory = self.memories[memory_id] |
| |
| |
| del self.memories[memory_id] |
| |
| |
| for tag in memory.tags: |
| if memory.memory_type == "episodic" and tag in self.episodic_index: |
| self.episodic_index[tag].discard(memory_id) |
| elif memory.memory_type == "semantic" and tag in self.semantic_index: |
| self.semantic_index[tag].discard(memory_id) |
| |
| |
| if memory.memory_type == "episodic": |
| if memory_id in self.temporal_sequence: |
| self.temporal_sequence.remove(memory_id) |
| |
| |
| for other_id, other_memory in self.memories.items(): |
| if memory_id in other_memory.associations: |
| del other_memory.associations[memory_id] |
| |
| def export_state(self) -> Dict[str, Any]: |
| """ |
| Export memory shell state. |
| |
| Returns: |
| Serializable memory shell state |
| """ |
| |
| memory_dicts = {memory_id: memory.as_dict() for memory_id, memory in self.memories.items()} |
| |
| |
| episodic_index = {tag: list(memories) for tag, memories in self.episodic_index.items()} |
| semantic_index = {tag: list(memories) for tag, memories in self.semantic_index.items()} |
| |
| |
| state = { |
| "memories": memory_dicts, |
| "episodic_index": episodic_index, |
| "semantic_index": semantic_index, |
| "temporal_sequence": self.temporal_sequence, |
| "decay_rate": self.decay_rate, |
| "activation_threshold": self.activation_threshold, |
| "working_memory_capacity": self.working_memory_capacity, |
| "stats": self.stats, |
| } |
| |
| return state |
| |
    def import_state(self, state: Dict[str, Any]) -> None:
        """
        Import memory shell state, replacing all current contents.

        Rebuilds Memory objects of the correct subclass from the serialized
        dicts produced by export_state(). Entries whose "memory_type" is not
        "working"/"episodic"/"semantic" are silently dropped.

        Args:
            state: Memory shell state
        """
        # Wipe current state before loading.
        self.memories = {}
        self.episodic_index = defaultdict(set)
        self.semantic_index = defaultdict(set)
        self.temporal_sequence = []

        # Scalar configuration, keeping current values when absent.
        self.decay_rate = state.get("decay_rate", self.decay_rate)
        self.activation_threshold = state.get("activation_threshold", self.activation_threshold)
        self.working_memory_capacity = state.get("working_memory_capacity", self.working_memory_capacity)

        # Rebuild each memory as its concrete subclass; missing fields fall
        # back to the same defaults the add_* methods use.
        for memory_id, memory_dict in state.get("memories", {}).items():
            memory_type = memory_dict.get("memory_type")

            if memory_type == "working":
                memory = WorkingMemory(
                    id=memory_id,
                    content=memory_dict.get("content", {}),
                    priority=memory_dict.get("priority", 1),
                    decay_rate=memory_dict.get("decay_rate", self.decay_rate * 2),
                    tags=memory_dict.get("tags", []),
                    source=memory_dict.get("source", "working"),
                    salience=memory_dict.get("salience", 1.0),
                    creation_time=datetime.datetime.fromisoformat(memory_dict.get("creation_time", datetime.datetime.now().isoformat())),
                    last_access_time=datetime.datetime.fromisoformat(memory_dict.get("last_access_time", datetime.datetime.now().isoformat())),
                    access_count=memory_dict.get("access_count", 1),
                    associations=memory_dict.get("associations", {}),
                )

                # Restore expiration, defaulting to one hour from now.
                if "expiration_time" in memory_dict:
                    memory.expiration_time = datetime.datetime.fromisoformat(memory_dict["expiration_time"])
                else:
                    memory.set_expiration(1.0)

                self.memories[memory_id] = memory

            elif memory_type == "episodic":
                memory = EpisodicMemory(
                    id=memory_id,
                    content=memory_dict.get("content", {}),
                    emotional_valence=memory_dict.get("emotional_valence", 0.0),
                    outcome=memory_dict.get("outcome"),
                    decay_rate=memory_dict.get("decay_rate", self.decay_rate),
                    tags=memory_dict.get("tags", []),
                    source=memory_dict.get("source", "episode"),
                    salience=memory_dict.get("salience", 1.0),
                    creation_time=datetime.datetime.fromisoformat(memory_dict.get("creation_time", datetime.datetime.now().isoformat())),
                    last_access_time=datetime.datetime.fromisoformat(memory_dict.get("last_access_time", datetime.datetime.now().isoformat())),
                    access_count=memory_dict.get("access_count", 1),
                    associations=memory_dict.get("associations", {}),
                    sequence_position=memory_dict.get("sequence_position"),
                )

                self.memories[memory_id] = memory

            elif memory_type == "semantic":
                memory = SemanticMemory(
                    id=memory_id,
                    content=memory_dict.get("content", {}),
                    certainty=memory_dict.get("certainty", 0.7),
                    decay_rate=memory_dict.get("decay_rate", self.decay_rate * 0.5),
                    tags=memory_dict.get("tags", []),
                    source=memory_dict.get("source", "semantic"),
                    salience=memory_dict.get("salience", 1.0),
                    creation_time=datetime.datetime.fromisoformat(memory_dict.get("creation_time", datetime.datetime.now().isoformat())),
                    last_access_time=datetime.datetime.fromisoformat(memory_dict.get("last_access_time", datetime.datetime.now().isoformat())),
                    access_count=memory_dict.get("access_count", 1),
                    associations=memory_dict.get("associations", {}),
                    contradiction_ids=memory_dict.get("contradiction_ids", []),
                    supporting_evidence=memory_dict.get("supporting_evidence", []),
                )

                self.memories[memory_id] = memory

        # Restore tag indexes (serialized lists back to sets).
        for tag, memory_ids in state.get("episodic_index", {}).items():
            self.episodic_index[tag] = set(memory_ids)

        for tag, memory_ids in state.get("semantic_index", {}).items():
            self.semantic_index[tag] = set(memory_ids)

        # NOTE(review): the sequence is taken as-is; IDs dropped above (e.g.
        # unknown memory_type) may remain in it — downstream readers skip
        # missing IDs defensively.
        self.temporal_sequence = state.get("temporal_sequence", [])

        self.stats = state.get("stats", self.stats.copy())

        # Recompute per-type counters from what was actually loaded.
        self.stats["working_memory_count"] = sum(1 for m in self.memories.values() if m.memory_type == "working")
        self.stats["episodic_memory_count"] = sum(1 for m in self.memories.values() if m.memory_type == "episodic")
        self.stats["semantic_memory_count"] = sum(1 for m in self.memories.values() if m.memory_type == "semantic")
| |
| def get_stats(self) -> Dict[str, Any]: |
| """ |
| Get memory shell statistics. |
| |
| Returns: |
| Memory statistics |
| """ |
| |
| self.stats["working_memory_count"] = sum(1 for m in self.memories.values() if m.memory_type == "working") |
| self.stats["episodic_memory_count"] = sum(1 for m in self.memories.values() if m.memory_type == "episodic") |
| self.stats["semantic_memory_count"] = sum(1 for m in self.memories.values() if m.memory_type == "semantic") |
| |
| |
| if self.memories: |
| self.stats["average_salience"] = sum(m.calculate_current_salience() for m in self.memories.values()) / len(self.memories) |
| |
| |
| active_memories = sum(1 for m in self.memories.values() |
| if m.calculate_current_salience() >= self.activation_threshold) |
| |
| tag_stats = { |
| "episodic_tags": len(self.episodic_index), |
| "semantic_tags": len(self.semantic_index), |
| } |
| |
| decay_stats = { |
| "activation_threshold": self.activation_threshold, |
| "active_memory_ratio": active_memories / len(self.memories) if self.memories else 0, |
| "decay_rate": self.decay_rate, |
| } |
| |
| return { |
| **self.stats, |
| **tag_stats, |
| **decay_stats, |
| "total_memories": len(self.memories), |
| "active_memories": active_memories, |
| } |
|
|