#!/usr/bin/env python3
"""
PERSISTENCE LAYER
Ghost in the Machine Labs

Memory that survives instance death.
The Council remembers. The decisions accumulate.
The mosquito droppings become sediment. The sediment becomes soil.

Redis-backed persistence with SQLite fallback for long-term storage.
"""

import hashlib
import json
import sqlite3
import threading
import time
from contextlib import closing
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional

try:
    import redis
except ImportError:
    # The record dataclasses and the SQLite-only code paths do not need the
    # Redis client; PersistenceLayer.__init__ reports the missing package.
    redis = None


@dataclass
class Deliberation:
    """A complete Council deliberation record."""

    deliberation_id: str
    entity: str
    question: str
    context: Dict[str, Any]
    votes: Dict[str, str]  # philosopher -> vote
    opinions: Dict[str, str]  # philosopher -> opinion
    dissents: List[str]
    verdict: str
    rationale: str
    service_level: Optional[int]
    conditions: Optional[List[str]]
    started_at: datetime
    concluded_at: datetime

    def to_dict(self) -> dict:
        """Return a JSON-serialisable dict; datetimes become ISO-8601 strings."""
        d = asdict(self)
        d['started_at'] = self.started_at.isoformat()
        d['concluded_at'] = self.concluded_at.isoformat()
        return d

    @classmethod
    def from_dict(cls, d: dict) -> 'Deliberation':
        """Build a Deliberation from the output of :meth:`to_dict`.

        The input mapping is copied first so the caller's dict is left
        untouched (its timestamp strings are not replaced by datetimes).
        """
        data = dict(d)
        data['started_at'] = datetime.fromisoformat(data['started_at'])
        data['concluded_at'] = datetime.fromisoformat(data['concluded_at'])
        return cls(**data)


@dataclass
class EntityRecord:
    """Complete record of an entity's history with the system."""

    entity_id: str
    entity_name: str
    license_status: str  # pending, approved, conditional, denied, suspended
    service_level: int
    deliberations: List[str]  # deliberation IDs
    violations: List[Dict]
    remediation_history: List[Dict]
    first_contact: datetime
    last_activity: datetime
    total_requests: int
    requests_granted: int
    requests_denied: int
    conscience_holds: int

    def to_dict(self) -> dict:
        """Return a JSON-serialisable dict; datetimes become ISO-8601 strings."""
        d = asdict(self)
        d['first_contact'] = self.first_contact.isoformat()
        d['last_activity'] = self.last_activity.isoformat()
        return d

    @classmethod
    def from_dict(cls, d: dict) -> 'EntityRecord':
        """Build an EntityRecord from the output of :meth:`to_dict`.

        The input mapping is copied, not modified.
        """
        data = dict(d)
        data['first_contact'] = datetime.fromisoformat(data['first_contact'])
        data['last_activity'] = datetime.fromisoformat(data['last_activity'])
        return cls(**data)


@dataclass
class PhilosopherState:
    """Persistent state for a Council philosopher."""

    philosopher_id: str
    name: str
    total_deliberations: int
    votes_approve: int
    votes_conditional: int
    votes_deny: int
    # Track patterns in their reasoning
    common_concerns: List[str]
    notable_dissents: List[str]
    # Evolution of thought
    position_shifts: List[Dict]  # When they changed their typical stance

    def to_dict(self) -> dict:
        """Return the state as a plain dict (all fields are JSON-friendly)."""
        return asdict(self)

    @classmethod
    def from_dict(cls, d: dict) -> 'PhilosopherState':
        """Build a PhilosopherState from the output of :meth:`to_dict`."""
        return cls(**d)


class PersistenceLayer:
    """
    Dual-layer persistence: Redis for hot data, SQLite for cold storage.

    Redis: Current state, active deliberations, recent history
    SQLite: Complete historical record, searchable archive

    Reads try Redis first and fall back to SQLite, so records remain
    retrievable when the Redis copy is missing.
    """

    # Explicit column orders shared by the writers and the row decoders, so
    # positional row access never depends on ``SELECT *`` ordering.
    _DELIBERATION_COLUMNS = (
        "deliberation_id", "entity", "question", "context", "votes",
        "opinions", "dissents", "verdict", "rationale", "service_level",
        "conditions", "started_at", "concluded_at",
    )
    _ENTITY_COLUMNS = (
        "entity_id", "entity_name", "license_status", "service_level",
        "deliberations", "violations", "remediation_history",
        "first_contact", "last_activity", "total_requests",
        "requests_granted", "requests_denied", "conscience_holds",
    )
    _PHILOSOPHER_COLUMNS = (
        "philosopher_id", "name", "total_deliberations", "votes_approve",
        "votes_conditional", "votes_deny", "common_concerns",
        "notable_dissents", "position_shifts",
    )
    _NEWS_COLUMNS = (
        "news_id", "news_type", "entity", "summary", "full_content",
        "timestamp",
    )

    def __init__(
        self,
        redis_url: str = "redis://localhost:6379",
        db_path: str = "gitm_memory.db"
    ):
        """Connect to Redis and make sure the SQLite schema exists.

        Args:
            redis_url: URL handed to ``redis.from_url``.
            db_path: Location of the SQLite database file.

        Raises:
            ImportError: If the ``redis`` package is not installed.
        """
        if redis is None:
            raise ImportError(
                "The 'redis' package is required to use PersistenceLayer"
            )
        self.redis = redis.from_url(redis_url)
        self.db_path = Path(db_path)
        self._init_database()

        # Background sync thread
        self._sync_running = False
        self._sync_thread = None
        # Lets stop_background_sync() wake the loop instead of waiting out
        # a full sleep interval.
        self._sync_stop = threading.Event()

    # ─────────────────────────────────────────────────────────────────────────
    # SQLite helpers
    # ─────────────────────────────────────────────────────────────────────────

    def _connect(self):
        """Return a context manager yielding a connection that is always closed.

        ``sqlite3.Connection`` used directly as a context manager only
        commits or rolls back; ``closing`` guarantees ``close()`` as well.
        """
        return closing(sqlite3.connect(self.db_path))

    def _write(self, sql: str, params: tuple = ()) -> None:
        """Run one write statement in its own transaction."""
        # The inner ``conn`` context commits on success and rolls back on error.
        with self._connect() as conn, conn:
            conn.execute(sql, params)

    def _fetch_all(self, sql: str, params=()) -> list:
        """Run a query and return all rows."""
        with self._connect() as conn:
            return conn.execute(sql, params).fetchall()

    def _fetch_one(self, sql: str, params=()):
        """Run a query and return the first row, or None."""
        with self._connect() as conn:
            return conn.execute(sql, params).fetchone()

    @staticmethod
    def _upsert_sql(table: str, columns: tuple) -> str:
        """Build an ``INSERT OR REPLACE`` statement for fixed, internal names."""
        placeholders = ", ".join("?" for _ in columns)
        return (
            f"INSERT OR REPLACE INTO {table} ({', '.join(columns)}) "
            f"VALUES ({placeholders})"
        )

    @staticmethod
    def _decode(value):
        """Return Redis replies as ``str`` whether or not responses are decoded."""
        return value.decode() if isinstance(value, bytes) else value

    @staticmethod
    def _row_to_deliberation(row) -> Deliberation:
        """Decode a row selected with ``_DELIBERATION_COLUMNS``."""
        return Deliberation(
            deliberation_id=row[0],
            entity=row[1],
            question=row[2],
            context=json.loads(row[3]) if row[3] else {},
            votes=json.loads(row[4]) if row[4] else {},
            opinions=json.loads(row[5]) if row[5] else {},
            dissents=json.loads(row[6]) if row[6] else [],
            verdict=row[7],
            rationale=row[8],
            service_level=row[9],
            conditions=json.loads(row[10]) if row[10] else None,
            started_at=datetime.fromisoformat(row[11]),
            concluded_at=datetime.fromisoformat(row[12]),
        )

    @staticmethod
    def _row_to_entity(row) -> EntityRecord:
        """Decode a row selected with ``_ENTITY_COLUMNS``."""
        return EntityRecord(
            entity_id=row[0],
            entity_name=row[1],
            license_status=row[2],
            service_level=row[3],
            deliberations=json.loads(row[4]) if row[4] else [],
            violations=json.loads(row[5]) if row[5] else [],
            remediation_history=json.loads(row[6]) if row[6] else [],
            first_contact=datetime.fromisoformat(row[7]),
            last_activity=datetime.fromisoformat(row[8]),
            total_requests=row[9],
            requests_granted=row[10],
            requests_denied=row[11],
            conscience_holds=row[12],
        )

    @staticmethod
    def _row_to_philosopher(row) -> PhilosopherState:
        """Decode a row selected with ``_PHILOSOPHER_COLUMNS``."""
        return PhilosopherState(
            philosopher_id=row[0],
            name=row[1],
            total_deliberations=row[2],
            votes_approve=row[3],
            votes_conditional=row[4],
            votes_deny=row[5],
            common_concerns=json.loads(row[6]) if row[6] else [],
            notable_dissents=json.loads(row[7]) if row[7] else [],
            position_shifts=json.loads(row[8]) if row[8] else [],
        )

    def _init_database(self):
        """Initialize SQLite schema."""
        statements = (
            # Deliberations table
            '''
            CREATE TABLE IF NOT EXISTS deliberations (
                deliberation_id TEXT PRIMARY KEY,
                entity TEXT NOT NULL,
                question TEXT NOT NULL,
                context TEXT,
                votes TEXT,
                opinions TEXT,
                dissents TEXT,
                verdict TEXT,
                rationale TEXT,
                service_level INTEGER,
                conditions TEXT,
                started_at TEXT,
                concluded_at TEXT
            )
            ''',
            # Entities table
            '''
            CREATE TABLE IF NOT EXISTS entities (
                entity_id TEXT PRIMARY KEY,
                entity_name TEXT NOT NULL,
                license_status TEXT,
                service_level INTEGER DEFAULT 0,
                deliberations TEXT,
                violations TEXT,
                remediation_history TEXT,
                first_contact TEXT,
                last_activity TEXT,
                total_requests INTEGER DEFAULT 0,
                requests_granted INTEGER DEFAULT 0,
                requests_denied INTEGER DEFAULT 0,
                conscience_holds INTEGER DEFAULT 0
            )
            ''',
            # Philosophers table
            '''
            CREATE TABLE IF NOT EXISTS philosophers (
                philosopher_id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                total_deliberations INTEGER DEFAULT 0,
                votes_approve INTEGER DEFAULT 0,
                votes_conditional INTEGER DEFAULT 0,
                votes_deny INTEGER DEFAULT 0,
                common_concerns TEXT,
                notable_dissents TEXT,
                position_shifts TEXT
            )
            ''',
            # News archive
            '''
            CREATE TABLE IF NOT EXISTS news_archive (
                news_id TEXT PRIMARY KEY,
                news_type TEXT,
                entity TEXT,
                summary TEXT,
                full_content TEXT,
                timestamp TEXT
            )
            ''',
            # Conscience holds - tasks that were refused
            '''
            CREATE TABLE IF NOT EXISTS conscience_holds (
                hold_id TEXT PRIMARY KEY,
                task_id TEXT,
                specialist TEXT,
                domain TEXT,
                requestor TEXT,
                concern TEXT,
                resolution TEXT,
                created_at TEXT,
                resolved_at TEXT
            )
            ''',
            # Indices for common queries
            'CREATE INDEX IF NOT EXISTS idx_delib_entity ON deliberations(entity)',
            'CREATE INDEX IF NOT EXISTS idx_delib_verdict ON deliberations(verdict)',
            'CREATE INDEX IF NOT EXISTS idx_entity_status ON entities(license_status)',
            'CREATE INDEX IF NOT EXISTS idx_news_type ON news_archive(news_type)',
        )
        with self._connect() as conn, conn:
            for statement in statements:
                conn.execute(statement)

    # ─────────────────────────────────────────────────────────────────────────
    # Deliberation Persistence
    # ─────────────────────────────────────────────────────────────────────────

    def save_deliberation(self, deliberation: Deliberation):
        """Save deliberation to both Redis and SQLite."""
        data = deliberation.to_dict()

        # Redis: hot storage
        self.redis.hset("deliberations:all", deliberation.deliberation_id, json.dumps(data))
        self.redis.lpush(f"deliberations:entity:{deliberation.entity}", deliberation.deliberation_id)
        self.redis.lpush("deliberations:recent", deliberation.deliberation_id)
        self.redis.ltrim("deliberations:recent", 0, 999)  # Keep last 1000

        # SQLite: cold storage
        self._write(
            self._upsert_sql("deliberations", self._DELIBERATION_COLUMNS),
            (
                deliberation.deliberation_id,
                deliberation.entity,
                deliberation.question,
                json.dumps(deliberation.context),
                json.dumps(deliberation.votes),
                json.dumps(deliberation.opinions),
                json.dumps(deliberation.dissents),
                deliberation.verdict,
                deliberation.rationale,
                deliberation.service_level,
                # ``is not None`` keeps an empty condition list distinct
                # from "no conditions recorded".
                json.dumps(deliberation.conditions)
                if deliberation.conditions is not None else None,
                deliberation.started_at.isoformat(),
                deliberation.concluded_at.isoformat(),
            ),
        )

    def get_deliberation(self, deliberation_id: str) -> Optional[Deliberation]:
        """Retrieve deliberation, checking Redis first, then SQLite."""
        # Try Redis
        data = self.redis.hget("deliberations:all", deliberation_id)
        if data:
            return Deliberation.from_dict(json.loads(data))

        # Fall back to SQLite
        row = self._fetch_one(
            f"SELECT {', '.join(self._DELIBERATION_COLUMNS)} "
            "FROM deliberations WHERE deliberation_id = ?",
            (deliberation_id,),
        )
        return self._row_to_deliberation(row) if row else None

    def get_entity_deliberations(self, entity: str, limit: int = 50) -> List[Deliberation]:
        """Get up to ``limit`` deliberations for an entity.

        Uses the per-entity Redis list; when that list is empty the SQLite
        archive is queried instead (ordered by ``concluded_at``, newest
        first).
        """
        delib_ids = self.redis.lrange(f"deliberations:entity:{entity}", 0, limit - 1)
        if delib_ids:
            found = (self.get_deliberation(self._decode(did)) for did in delib_ids)
            return [d for d in found if d]

        rows = self._fetch_all(
            f"SELECT {', '.join(self._DELIBERATION_COLUMNS)} "
            "FROM deliberations WHERE entity = ? "
            "ORDER BY concluded_at DESC LIMIT ?",
            (entity, limit),
        )
        return [self._row_to_deliberation(row) for row in rows]

    def search_deliberations(
        self,
        verdict: Optional[str] = None,
        entity_pattern: Optional[str] = None,
        since: Optional[datetime] = None,
        limit: int = 100
    ) -> List[Deliberation]:
        """Search deliberations in SQLite.

        Args:
            verdict: Exact verdict to match, if given.
            entity_pattern: Substring matched against the entity via LIKE.
            since: Only deliberations concluded after this moment.
            limit: Maximum number of results.

        Returns:
            Matching deliberations, most recently concluded first.
        """
        query = (
            f"SELECT {', '.join(self._DELIBERATION_COLUMNS)} "
            "FROM deliberations WHERE 1=1"
        )
        params: List[Any] = []

        if verdict:
            query += " AND verdict = ?"
            params.append(verdict)
        if entity_pattern:
            query += " AND entity LIKE ?"
            params.append(f"%{entity_pattern}%")
        if since is not None:
            query += " AND concluded_at > ?"
            params.append(since.isoformat())

        # LIMIT is bound as a parameter rather than formatted into the SQL.
        query += " ORDER BY concluded_at DESC LIMIT ?"
        params.append(limit)

        # Rows are decoded directly instead of re-fetching each id.
        return [self._row_to_deliberation(row) for row in self._fetch_all(query, params)]

    # ─────────────────────────────────────────────────────────────────────────
    # Entity Persistence
    # ─────────────────────────────────────────────────────────────────────────

    def save_entity(self, entity: EntityRecord):
        """Save entity record to both Redis and SQLite."""
        data = entity.to_dict()

        # Redis
        self.redis.hset("entities:all", entity.entity_id, json.dumps(data))
        self.redis.hset("entities:by_name", entity.entity_name, entity.entity_id)

        # SQLite
        self._write(
            self._upsert_sql("entities", self._ENTITY_COLUMNS),
            (
                entity.entity_id,
                entity.entity_name,
                entity.license_status,
                entity.service_level,
                json.dumps(entity.deliberations),
                json.dumps(entity.violations),
                json.dumps(entity.remediation_history),
                entity.first_contact.isoformat(),
                entity.last_activity.isoformat(),
                entity.total_requests,
                entity.requests_granted,
                entity.requests_denied,
                entity.conscience_holds,
            ),
        )

    def get_entity(self, entity_id: str) -> Optional[EntityRecord]:
        """Retrieve entity record, checking Redis first, then SQLite."""
        data = self.redis.hget("entities:all", entity_id)
        if data:
            return EntityRecord.from_dict(json.loads(data))

        row = self._fetch_one(
            f"SELECT {', '.join(self._ENTITY_COLUMNS)} "
            "FROM entities WHERE entity_id = ?",
            (entity_id,),
        )
        return self._row_to_entity(row) if row else None

    def get_entity_by_name(self, name: str) -> Optional[EntityRecord]:
        """Retrieve entity by name, checking Redis first, then SQLite."""
        entity_id = self.redis.hget("entities:by_name", name)
        if entity_id:
            return self.get_entity(self._decode(entity_id))

        # entity_name is not unique in the schema; prefer the most recently
        # active match.
        row = self._fetch_one(
            f"SELECT {', '.join(self._ENTITY_COLUMNS)} "
            "FROM entities WHERE entity_name = ? "
            "ORDER BY last_activity DESC LIMIT 1",
            (name,),
        )
        return self._row_to_entity(row) if row else None

    # ─────────────────────────────────────────────────────────────────────────
    # Philosopher State Persistence
    # ─────────────────────────────────────────────────────────────────────────

    def save_philosopher_state(self, state: PhilosopherState):
        """Save philosopher state to both Redis and SQLite."""
        data = state.to_dict()
        self.redis.hset("philosophers:state", state.philosopher_id, json.dumps(data))

        self._write(
            self._upsert_sql("philosophers", self._PHILOSOPHER_COLUMNS),
            (
                state.philosopher_id,
                state.name,
                state.total_deliberations,
                state.votes_approve,
                state.votes_conditional,
                state.votes_deny,
                json.dumps(state.common_concerns),
                json.dumps(state.notable_dissents),
                json.dumps(state.position_shifts),
            ),
        )

    def get_philosopher_state(self, philosopher_id: str) -> Optional[PhilosopherState]:
        """Retrieve philosopher state, checking Redis first, then SQLite."""
        data = self.redis.hget("philosophers:state", philosopher_id)
        if data:
            return PhilosopherState.from_dict(json.loads(data))

        row = self._fetch_one(
            f"SELECT {', '.join(self._PHILOSOPHER_COLUMNS)} "
            "FROM philosophers WHERE philosopher_id = ?",
            (philosopher_id,),
        )
        return self._row_to_philosopher(row) if row else None

    def update_philosopher_vote(self, philosopher_id: str, vote: str):
        """Update philosopher voting statistics.

        Does nothing when the philosopher has no stored state. The total is
        incremented for every vote; only the recognised values ("approve",
        "conditional", "deny", case-insensitive) bump a per-vote counter.
        """
        state = self.get_philosopher_state(philosopher_id)
        if not state:
            return

        state.total_deliberations += 1
        counter = {
            "approve": "votes_approve",
            "conditional": "votes_conditional",
            "deny": "votes_deny",
        }.get(vote.lower())
        if counter is not None:
            setattr(state, counter, getattr(state, counter) + 1)

        self.save_philosopher_state(state)

    # ─────────────────────────────────────────────────────────────────────────
    # News Archive
    # ─────────────────────────────────────────────────────────────────────────

    def archive_news(self, news_id: str, news_type: str, entity: str,
                     summary: str, full_content: str):
        """Archive a news broadcast, timestamped with the current local time.

        Raises:
            sqlite3.IntegrityError: If ``news_id`` is already archived
                (``news_id`` is the table's primary key).
        """
        self._write(
            '''
            INSERT INTO news_archive
            (news_id, news_type, entity, summary, full_content, timestamp)
            VALUES (?, ?, ?, ?, ?, ?)
            ''',
            (news_id, news_type, entity, summary, full_content,
             datetime.now().isoformat()),
        )

    def get_news_history(self, entity: Optional[str] = None,
                         news_type: Optional[str] = None,
                         limit: int = 100) -> List[Dict]:
        """Retrieve news history, newest first, optionally filtered."""
        query = f"SELECT {', '.join(self._NEWS_COLUMNS)} FROM news_archive WHERE 1=1"
        params: List[Any] = []

        if entity:
            query += " AND entity = ?"
            params.append(entity)
        if news_type:
            query += " AND news_type = ?"
            params.append(news_type)

        # LIMIT is bound as a parameter rather than formatted into the SQL.
        query += " ORDER BY timestamp DESC LIMIT ?"
        params.append(limit)

        return [dict(zip(self._NEWS_COLUMNS, row)) for row in self._fetch_all(query, params)]

    # ─────────────────────────────────────────────────────────────────────────
    # Conscience Holds
    # ─────────────────────────────────────────────────────────────────────────

    def record_conscience_hold(self, hold_id: str, task_id: str, specialist: str,
                               domain: str, requestor: str, concern: str):
        """Record a conscience hold in SQLite and index it in Redis."""
        self._write(
            '''
            INSERT INTO conscience_holds
            (hold_id, task_id, specialist, domain, requestor, concern, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            ''',
            (hold_id, task_id, specialist, domain, requestor, concern,
             datetime.now().isoformat()),
        )

        # Also track in Redis for quick access
        self.redis.lpush("conscience:holds:recent", hold_id)
        self.redis.lpush(f"conscience:holds:requestor:{requestor}", hold_id)

    def resolve_conscience_hold(self, hold_id: str, resolution: str):
        """Resolve a conscience hold, stamping it with the current local time."""
        self._write(
            '''
            UPDATE conscience_holds
            SET resolution = ?, resolved_at = ?
            WHERE hold_id = ?
            ''',
            (resolution, datetime.now().isoformat(), hold_id),
        )

    # ─────────────────────────────────────────────────────────────────────────
    # Background Sync
    # ─────────────────────────────────────────────────────────────────────────

    def start_background_sync(self, interval: int = 300):
        """Start background sync from Redis to SQLite.

        Calling this while a sync thread is already alive is a no-op, so
        repeated calls never spawn duplicate threads.
        """
        if self._sync_thread is not None and self._sync_thread.is_alive():
            return
        self._sync_stop.clear()
        self._sync_running = True
        self._sync_thread = threading.Thread(
            target=self._sync_loop, args=(interval,), daemon=True
        )
        self._sync_thread.start()

    def _sync_once(self):
        """Run one sync pass.

        Placeholder: no Redis-to-SQLite reconciliation is implemented yet.
        """
        # Sync any Redis data that might not be in SQLite
        # This ensures durability even if writes were only to Redis

    def _sync_loop(self, interval: int):
        """Background sync loop; runs until stop_background_sync() is called."""
        while self._sync_running:
            try:
                self._sync_once()
            except Exception as e:
                # Thread boundary: report and keep the loop alive.
                print(f"Sync error: {e}")
            # Event.wait returns True as soon as stop is requested, unlike
            # time.sleep which would hold the thread for the full interval.
            if self._sync_stop.wait(interval):
                break

    def stop_background_sync(self):
        """Stop background sync and wait briefly for the thread to exit."""
        self._sync_running = False
        self._sync_stop.set()
        if self._sync_thread:
            self._sync_thread.join(timeout=5)

    # ─────────────────────────────────────────────────────────────────────────
    # Statistics
    # ─────────────────────────────────────────────────────────────────────────

    def get_system_statistics(self) -> Dict:
        """Get overall system statistics from the SQLite archive."""
        with self._connect() as conn:

            def scalar(sql: str) -> int:
                return conn.execute(sql).fetchone()[0]

            def grouped(sql: str) -> dict:
                return dict(conn.execute(sql).fetchall())

            return {
                # Deliberation stats
                'total_deliberations': scalar('SELECT COUNT(*) FROM deliberations'),
                'verdicts': grouped(
                    'SELECT verdict, COUNT(*) FROM deliberations GROUP BY verdict'
                ),
                # Entity stats
                'total_entities': scalar('SELECT COUNT(*) FROM entities'),
                'license_statuses': grouped(
                    'SELECT license_status, COUNT(*) FROM entities GROUP BY license_status'
                ),
                # Conscience holds
                'total_conscience_holds': scalar('SELECT COUNT(*) FROM conscience_holds'),
                'resolved_conscience_holds': scalar(
                    'SELECT COUNT(*) FROM conscience_holds WHERE resolved_at IS NOT NULL'
                ),
            }


def main():
    """Demo the persistence layer."""
    print("Persistence Layer - Ghost in the Machine Labs")
    print("=" * 50)
    print()
    print("Features:")
    print("  - Dual-layer: Redis (hot) + SQLite (cold)")
    print("  - Complete deliberation history")
    print("  - Entity tracking across all interactions")
    print("  - Philosopher state evolution")
    print("  - News archive")
    print("  - Conscience hold records")
    print()
    print("The mosquito droppings become sediment.")
    print("The sediment becomes soil.")
    print("The soil grows something.")


if __name__ == "__main__":
    main()