agentcache / src /workers.py
Yash030's picture
feat: migrate agentmemory to agentcache namespace, endpoints, and tools
12a6c9a
Raw
History Blame Contribute Delete
5.21 kB
"""
Background worker threads for agentcache-python.
Start all workers via start_background_workers(kv).
Each worker runs as a daemon thread so it exits automatically when the process dies.
C5.1: SIGTERM and SIGINT handlers are registered here to flush the index
persistence debounce queue and run a WAL checkpoint before exit.
"""
import os
import signal
import sys
import threading
import time
import functions
# Module-level shutdown flag — set by the signal handler (_shutdown_handler)
# and checked by the background loops so they stop scheduling further work.
_shutting_down = threading.Event()
# Reference to the persistence object (set by start_background_workers).
# Stays None until then; _shutdown_handler calls .flush() on it when set.
_persistence_ref = None
# Reference to the kv object (set by start_background_workers).
# Stays None until then; _shutdown_handler calls .teardown() on it when set.
_kv_ref = None
def _shutdown_handler(signum, frame) -> None:  # noqa: ARG001
    """Handle SIGTERM/SIGINT gracefully (C5.1).

    Steps:
        1. Ignore further SIGTERM/SIGINT so a repeated signal (e.g. a second
           Ctrl-C) cannot re-enter this handler while a flush or checkpoint
           is still running.
        2. Set the global _shutting_down flag to stop background loops.
        3. Flush the debounce timer and save the index synchronously
           (_persistence_ref.flush()).
        4. Run a WAL checkpoint via StateKV.teardown() (_kv_ref.teardown()).
        5. Exit cleanly with code 0.

    Errors raised by the flush or the checkpoint are printed and do not
    prevent the remaining steps from running.

    Args:
        signum: Number of the signal being handled.
        frame: Stack frame passed in by the signal machinery (unused).

    Raises:
        SystemExit: Always, via sys.exit(0), after the cleanup steps.
    """
    # Python runs signal handlers on the main thread between bytecodes, so a
    # second signal would otherwise start a nested flush/teardown on top of
    # the one already in progress. Mask both signals for the rest of shutdown.
    for repeat_sig in (signal.SIGTERM, signal.SIGINT):
        try:
            signal.signal(repeat_sig, signal.SIG_IGN)
        except (OSError, ValueError) as e:
            # Best effort only — shutdown must proceed even if masking fails.
            print(f"[workers] Could not ignore repeated signals: {e}")

    sig_name = signal.Signals(signum).name
    print(f"\n[workers] Received {sig_name} — initiating graceful shutdown...")
    _shutting_down.set()

    # Flush in-flight persistence debounce timer and save immediately.
    # (The module globals are only read here, so no `global` statement is needed.)
    if _persistence_ref is not None:
        try:
            print("[workers] Flushing index persistence...")
            _persistence_ref.flush()
            print("[workers] Index persistence flushed.")
        except Exception as e:
            print(f"[workers] Error flushing persistence: {e}")

    # WAL checkpoint — flush WAL to the main DB file
    if _kv_ref is not None:
        try:
            print("[workers] Running WAL checkpoint...")
            _kv_ref.teardown()
            print("[workers] WAL checkpoint complete.")
        except Exception as e:
            print(f"[workers] Error during WAL checkpoint: {e}")

    print("[workers] Shutdown complete.")
    sys.exit(0)
def _register_signal_handlers() -> None:
    """Install _shutdown_handler for SIGTERM and SIGINT (C5.1).

    Does nothing when called from a thread other than the main thread,
    because Python only allows signal handlers to be registered from the
    main thread. A registration failure is reported on stdout instead of
    being raised.
    """
    on_main_thread = threading.current_thread() is threading.main_thread()
    if on_main_thread:
        try:
            for signo in (signal.SIGTERM, signal.SIGINT):
                signal.signal(signo, _shutdown_handler)
            print("[workers] Signal handlers registered (SIGTERM, SIGINT).")
        except (OSError, ValueError) as err:
            # Some environments (e.g. Windows without signal support) may raise here
            print(f"[workers] Could not register signal handlers: {err}")
def _auto_forget_loop(kv) -> None:
    """Periodically sweep and evict stale observations.

    Waits 10 seconds after start-up, then calls
    functions.auto_forget(kv, dry_run=False) about once an hour until the
    module-level _shutting_down event is set.

    The sweep is configurable via the AUTO_FORGET_ENABLED environment
    variable, which is re-read on every cycle. It is skipped when the value
    is "false", "0", "no" or "off" (case-insensitive, surrounding whitespace
    ignored); an unset variable or any other value leaves it enabled.

    Exceptions raised during a sweep are printed and the loop continues.

    Args:
        kv: Initialised StateKV instance, passed through to
            functions.auto_forget.
    """
    initial_delay_s = 10
    sweep_interval_s = 60 * 60  # 1 hour between sweeps
    disabled_values = {"false", "0", "no", "off"}

    # Event.wait() returns True the moment shutdown is requested, so the
    # thread stops right away instead of finishing a fixed sleep first.
    if _shutting_down.wait(initial_delay_s):
        return

    while not _shutting_down.is_set():
        try:
            setting = os.getenv("AUTO_FORGET_ENABLED", "").strip().lower()
            if setting not in disabled_values:
                print("[scheduler] Running auto_forget sweep...")
                res = functions.auto_forget(kv, dry_run=False)
                print(f"[scheduler] auto_forget sweep completed: {res}")
        except Exception as e:
            print(f"[scheduler] auto_forget loop error: {e}")

        # Sleep until the next sweep is due or shutdown is signalled,
        # whichever comes first; the while-condition handles the exit.
        _shutting_down.wait(sweep_interval_s)
def _rebuild_index(kv) -> None:
    """Rebuild the search index via functions.rebuild_index and log the result.

    Used as the target of the background "index-rebuild" thread. Any
    exception is caught and printed rather than propagated.
    """
    try:
        indexed = functions.rebuild_index(kv)
        outcome = f"[persistence] Rebuild completed: indexed {indexed} items."
        print(outcome)
    except Exception as err:
        print(f"[persistence] Rebuild failed: {err}")
def start_background_workers(kv) -> None:
    """Register signal handlers and launch the background daemon threads.

    Meant to be called once by create_app() after the DB and indexes are
    initialised. Every thread started here is a daemon thread, so none of
    them keeps the process alive once the main thread exits.

    Args:
        kv: Initialised StateKV instance.
    """
    global _kv_ref, _persistence_ref

    # Publish the objects the shutdown handler needs before it is installed.
    _kv_ref = kv
    _persistence_ref = functions._index_persistence

    # Graceful shutdown on SIGTERM/SIGINT (C5.1)
    _register_signal_handlers()

    # Decide whether the search index must be rebuilt (Step 5). The sync
    # check is only consulted when the index already holds entries.
    if functions._bm25_index.size == 0:
        rebuild_reason = "empty"
    elif not functions.verify_index_sync_on_boot(kv):
        rebuild_reason = "out of sync"
    else:
        rebuild_reason = None

    if rebuild_reason is not None:
        print(f"[persistence] Search index is {rebuild_reason}. Rebuilding in background thread...")
        threading.Thread(
            target=_rebuild_index,
            args=(kv,),
            daemon=True,
            name="index-rebuild",
        ).start()

    # Hourly auto-forget sweep
    threading.Thread(
        target=_auto_forget_loop,
        args=(kv,),
        daemon=True,
        name="auto-forget",
    ).start()