Spaces:
Running
Running
| """ | |
| Background worker threads for agentcache-python. | |
| Start all workers via start_background_workers(kv). | |
| Each worker runs as a daemon thread so it exits automatically when the process dies. | |
| C5.1: SIGTERM and SIGINT handlers are registered here to flush the index | |
| persistence debounce queue and run a WAL checkpoint before exit. | |
| """ | |
| import os | |
| import signal | |
| import sys | |
| import threading | |
| import time | |
| import functions | |
| # Module-level shutdown flag — set by signal handlers | |
| _shutting_down = threading.Event() | |
| # Reference to the persistence object (set by start_background_workers) | |
| _persistence_ref = None | |
| # Reference to the kv object (set by start_background_workers) | |
| _kv_ref = None | |
| def _shutdown_handler(signum, frame) -> None: # noqa: ARG001 | |
| """Handle SIGTERM/SIGINT gracefully (C5.1). | |
| Steps: | |
| 1. Set the global _shutting_down flag to stop background loops. | |
| 2. Flush the debounce timer and save the index synchronously. | |
| 3. Run a WAL checkpoint via StateKV.teardown(). | |
| 4. Exit cleanly with code 0. | |
| """ | |
| sig_name = signal.Signals(signum).name | |
| print(f"\n[workers] Received {sig_name} — initiating graceful shutdown...") | |
| _shutting_down.set() | |
| # Flush in-flight persistence debounce timer and save immediately | |
| global _persistence_ref | |
| if _persistence_ref is not None: | |
| try: | |
| print("[workers] Flushing index persistence...") | |
| _persistence_ref.flush() | |
| print("[workers] Index persistence flushed.") | |
| except Exception as e: | |
| print(f"[workers] Error flushing persistence: {e}") | |
| # WAL checkpoint — flush WAL to the main DB file | |
| global _kv_ref | |
| if _kv_ref is not None: | |
| try: | |
| print("[workers] Running WAL checkpoint...") | |
| _kv_ref.teardown() | |
| print("[workers] WAL checkpoint complete.") | |
| except Exception as e: | |
| print(f"[workers] Error during WAL checkpoint: {e}") | |
| print("[workers] Shutdown complete.") | |
| sys.exit(0) | |
| def _register_signal_handlers() -> None: | |
| """Register SIGTERM and SIGINT handlers (C5.1). | |
| Skipped if the calling thread is not the main thread, because Python | |
| only allows signal handlers to be registered from the main thread. | |
| """ | |
| if threading.current_thread() is not threading.main_thread(): | |
| return | |
| try: | |
| signal.signal(signal.SIGTERM, _shutdown_handler) | |
| signal.signal(signal.SIGINT, _shutdown_handler) | |
| print("[workers] Signal handlers registered (SIGTERM, SIGINT).") | |
| except (OSError, ValueError) as e: | |
| # Some environments (e.g. Windows without signal support) may raise here | |
| print(f"[workers] Could not register signal handlers: {e}") | |
| def _auto_forget_loop(kv) -> None: | |
| """Periodically sweep and evict stale observations (configurable via AUTO_FORGET_ENABLED).""" | |
| time.sleep(10) | |
| while not _shutting_down.is_set(): | |
| try: | |
| if os.getenv("AUTO_FORGET_ENABLED") != "false": | |
| print("[scheduler] Running auto_forget sweep...") | |
| res = functions.auto_forget(kv, dry_run=False) | |
| print(f"[scheduler] auto_forget sweep completed: {res}") | |
| except Exception as e: | |
| print(f"[scheduler] auto_forget loop error: {e}") | |
| # Sleep in 10-second chunks so we notice _shutting_down quickly | |
| for _ in range(360): # 360 × 10s = 1 hour | |
| if _shutting_down.is_set(): | |
| break | |
| time.sleep(10) | |
| def _rebuild_index(kv) -> None: | |
| """Rebuild the BM25/vector index from scratch in a background thread.""" | |
| try: | |
| count = functions.rebuild_index(kv) | |
| print(f"[persistence] Rebuild completed: indexed {count} items.") | |
| except Exception as ex: | |
| print(f"[persistence] Rebuild failed: {ex}") | |
| def start_background_workers(kv) -> None: | |
| """Start all background daemon threads and register signal handlers. | |
| Called once by create_app() after the DB and indexes are initialised. | |
| Workers are daemon threads — they die automatically when the main process exits. | |
| Args: | |
| kv: Initialised StateKV instance. | |
| """ | |
| global _kv_ref, _persistence_ref | |
| _kv_ref = kv | |
| # Capture the persistence reference for the shutdown handler | |
| _persistence_ref = functions._index_persistence | |
| # Register graceful shutdown signal handlers (C5.1) | |
| _register_signal_handlers() | |
| # Rebuild search index if empty or out of sync (Step 5) | |
| index_empty = functions._bm25_index.size == 0 | |
| index_in_sync = True | |
| if not index_empty: | |
| index_in_sync = functions.verify_index_sync_on_boot(kv) | |
| if index_empty or not index_in_sync: | |
| reason = "empty" if index_empty else "out of sync" | |
| print(f"[persistence] Search index is {reason}. Rebuilding in background thread...") | |
| t_rebuild = threading.Thread( | |
| target=_rebuild_index, | |
| args=(kv,), | |
| daemon=True, | |
| name="index-rebuild", | |
| ) | |
| t_rebuild.start() | |
| # Auto-forget sweep | |
| t_forget = threading.Thread( | |
| target=_auto_forget_loop, | |
| args=(kv,), | |
| daemon=True, | |
| name="auto-forget", | |
| ) | |
| t_forget.start() | |