""" Default Mode Network — Vitalis FSI Runs during idle time. Replays high-valence experiences, forms abstractions, and crystallizes meta-rules. This is background cognition — what happens when she's not actively working on a task. """ import time import threading import numpy as np from pathlib import Path from src.dream_engine.helix_memory import HelixMemory from src.dream_engine.consolidator import DreamEngine from src.valence.valence_engine import ValenceEngine class DefaultModeNetwork(threading.Thread): INTERVAL = 45.0 TOP_K = 5 def __init__(self, helix_path: Path, valence: ValenceEngine, mind=None, interval: float = None): super().__init__(daemon=True, name="DMN") self.helix = HelixMemory(helix_path) self.dreamer = DreamEngine(self.helix) self.valence = valence self.mind = mind self.interval = interval or self.INTERVAL self._cycles = 0 self._rules_formed = 0 def _high_valence_entries(self): if not self.helix.entries: return [] scored = [] for entry in self.helix.entries: _, proto, usage, _ = entry val, _ = self.valence.evaluate(proto.astype(np.float32)) scored.append((val * 0.6 + usage * 0.001, entry)) scored.sort(key=lambda x: x[0], reverse=True) return [e for _, e in scored[:self.TOP_K]] def _cycle(self): entries = self._high_valence_entries() if not entries: return # Re-ingest high-valence prototypes into dream buffer for _, proto, _, meta in entries: self.dreamer.ingest(proto, meta={"dmn_replay": True, **meta}) # Consolidate consolidated = self.dreamer.dream(force=True) # Run abstraction if mind is available if consolidated and self.mind: try: formed = self.mind.abstraction.run_abstraction_cycle({}) self._rules_formed += len(formed) except Exception: pass self._cycles += 1 def run(self): while True: try: self._cycle() except Exception: pass time.sleep(self.interval) def report(self) -> dict: return { "cycles": self._cycles, "rules_formed": self._rules_formed, "helix_codes": len(self.helix.entries), }