| import asyncio
|
| import json
|
| import os
|
| import sys
|
| import time
|
| from unittest.mock import MagicMock, patch
|
| import pytest
|
|
|
|
|
| import types
|
def mock_module(name):
    """Register an empty placeholder module under ``name``.

    A fresh ``types.ModuleType`` is created and stored in ``sys.modules``
    unconditionally, so any module previously registered under the same
    name is replaced.

    Args:
        name: Fully qualified module name to register.

    Returns:
        The newly created placeholder module.
    """
    placeholder = sys.modules[name] = types.ModuleType(name)
    return placeholder
|
|
|
|
|
# --- Test environment bootstrap ---------------------------------------------
# Local import: this bootstrap section is the only user of importlib.
import importlib

# Put the current working directory on sys.path *before* probing for the real
# modules. Doing it afterwards makes every probe fail spuriously whenever the
# project root is not already importable (e.g. when this file is run as a
# script), which would silently replace real modules with stubs.
sys.path.insert(0, os.path.abspath("."))

# Modules that may be unavailable in the test environment, paired with the
# attribute that is replaced by a MagicMock when the module cannot be imported.
_OPTIONAL_MODULES = (
    ("mnemocore.core.engine", "HAIMEngine"),
    ("mnemocore.core.node", "MemoryNode"),
    ("mnemocore.core.qdrant_store", "QdrantStore"),
    ("mnemocore.core.async_storage", "AsyncRedisStorage"),
    ("mnemocore.meta.learning_journal", "LearningJournal"),
    ("aiohttp", "ClientSession"),
)

# Probe each module on its own: one missing dependency must not prevent the
# remaining real modules from being used, and an installed module (aiohttp
# included) must never be shadowed by a stub.
# NOTE: stubs stay registered in sys.modules for the rest of the process.
for _module_name, _attr_name in _OPTIONAL_MODULES:
    try:
        importlib.import_module(_module_name)
    except ImportError:
        # Only stub the parent package when it is genuinely absent, so a
        # real, importable package is never replaced by an empty module.
        _parent_name = _module_name.rpartition(".")[0]
        if _parent_name and _parent_name not in sys.modules:
            mock_module(_parent_name)
        setattr(mock_module(_module_name), _attr_name, MagicMock())

# Must come after the stubs are installed so the daemon's own imports resolve.
from mnemocore.subconscious.daemon import SubconsciousDaemon
|
|
|
async def _async_test_save_evolution_state_non_blocking():
    """Check that ``_save_evolution_state`` does not stall the event loop.

    Slow disk I/O is simulated by patching ``json.dump`` with a wrapper that
    sleeps for 0.2 s before delegating to the real function. While the save
    runs, a background ticker task repeatedly awaits ``asyncio.sleep(0.01)``
    and records the longest wake-up latency it observes above 0.05 s.

    Raises:
        AssertionError: If any ticker wake-up took 0.1 s or longer.
    """
    # Local imports: only this helper needs them.
    import inspect
    import tempfile

    daemon = SubconsciousDaemon()
    real_dump = json.dump

    def slow_dump(*args, **kwargs):
        # Deliberately blocking sleep: if the save runs on the event-loop
        # thread, the ticker below will observe the stall.
        time.sleep(0.2)
        return real_dump(*args, **kwargs)

    max_stall = 0.0
    keep_ticking = True

    async def ticker():
        nonlocal max_stall
        while keep_ticking:
            # perf_counter is monotonic; wall-clock adjustments cannot
            # produce a false stall reading.
            began = time.perf_counter()
            await asyncio.sleep(0.01)
            elapsed = time.perf_counter() - began
            if elapsed > 0.05:
                max_stall = max(max_stall, elapsed)

    # A private temporary directory keeps the test portable (no hard-coded
    # /tmp) and guarantees the state file is removed afterwards.
    with tempfile.TemporaryDirectory() as scratch_dir:
        state_path = os.path.join(scratch_dir, "test_evolution_perf.json")
        with patch("mnemocore.subconscious.daemon.EVOLUTION_STATE_PATH", state_path):
            with patch("json.dump", side_effect=slow_dump):
                ticker_task = asyncio.create_task(ticker())

                # Give the ticker a chance to start before measuring.
                await asyncio.sleep(0.05)

                started = time.perf_counter()
                try:
                    # Works for both sync and async implementations: await
                    # the result only when the call returned an awaitable.
                    outcome = daemon._save_evolution_state()
                    if inspect.isawaitable(outcome):
                        await outcome
                    finished = time.perf_counter()
                finally:
                    # Always stop the ticker, even if the save raised, so no
                    # task is left running in the background.
                    keep_ticking = False
                    await ticker_task

    print(f"Operation took: {finished - started:.4f}s")
    print(f"Max loop block: {max_stall:.4f}s")

    # Explicit raise rather than ``assert`` so the check survives ``python -O``.
    if max_stall >= 0.1:
        raise AssertionError(f"Event loop was blocked for {max_stall:.4f}s")
|
|
|
def test_save_evolution_state_non_blocking():
    """Pytest entry point: run the async non-blocking check via ``asyncio.run``."""
    check = _async_test_save_evolution_state_non_blocking()
    asyncio.run(check)


# Allow running the check directly, without pytest.
if __name__ == "__main__":
    test_save_evolution_state_non_blocking()
|
|
|