# Scraped page header (not part of the script): Hugging Face Spaces, status Running; file size: 3,869 bytes; revision 89c1a11.
"""
Cleanup script for ChromaDB vectors belonging to inactive users.

By default, a user is considered inactive if they have not logged in for
30 days. Users without last_login are skipped to avoid deleting vectors for
legacy accounts that predate activity tracking.

Run manually:
    python backend/scripts/vector_cleanup.py

Environment (optional; the values shown are examples):
    VECTOR_CLEANUP_INACTIVE_DAYS=30   inactivity threshold in days (default: 30)
    VECTOR_CLEANUP_DRY_RUN=true       log eligible users without deleting (default: false)
"""
from __future__ import annotations
import logging
import os
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from sqlalchemy import inspect, or_, text
# Allow running this file directly from the repository root.
# BACKEND_DIR is the parent of the directory that contains this file
# (parents[1] of the resolved path). It is inserted at the front of sys.path,
# once, so that the `app.*` imports that follow can be resolved when the script
# is started as a plain file.
BACKEND_DIR = Path(__file__).resolve().parents[1]
if str(BACKEND_DIR) not in sys.path:
    sys.path.insert(0, str(BACKEND_DIR))
from app.database import SessionLocal # noqa: E402
from app.models import User # noqa: E402
from app.rag.vectorstore import delete_user_collection # noqa: E402
# Root logging is configured at import time (INFO level, timestamped lines);
# the script then logs through its own named logger.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("vector_cleanup")
def _env_bool(name: str, default: bool = False) -> bool:
value = os.getenv(name)
if value is None:
return default
return value.strip().lower() in {"1", "true", "yes", "on"}
def ensure_last_login_column() -> None:
    """Create ``users.last_login`` when the ``users`` table lacks it.

    Meant for SQLite installs that do not run migrations: the table's columns
    are inspected through the session's bind, and a ``DATETIME`` column named
    ``last_login`` is added and committed only if it is not already present.
    The session opened here is always closed before returning.
    """
    session = SessionLocal()
    try:
        existing = {
            col["name"]
            for col in inspect(session.get_bind()).get_columns("users")
        }
        if "last_login" in existing:
            # Nothing to do; the finally clause still closes the session.
            return
        logger.info("Adding missing users.last_login column")
        session.execute(text("ALTER TABLE users ADD COLUMN last_login DATETIME"))
        session.commit()
    finally:
        session.close()
def cleanup_inactive_user_vectors(
    inactive_days: int | None = None,
    dry_run: bool | None = None,
) -> dict[str, int]:
    """Delete Chroma collections for users inactive past the threshold.

    Users whose ``last_login`` is older than ``now (UTC) - inactive_days`` have
    their vector collection removed via ``delete_user_collection``. Users with
    no ``last_login`` are counted and skipped, never deleted. A failure while
    deleting one user's collection is logged and counted, and the run
    continues with the next user.

    Args:
        inactive_days: Days without a login after which a user is treated as
            inactive. When ``None``, the value is read from
            ``VECTOR_CLEANUP_INACTIVE_DAYS`` (default ``30``). An explicit
            ``0`` is honoured rather than replaced by the default.
        dry_run: When true, eligible users are logged and counted but nothing
            is deleted. When ``None``, the flag is read from
            ``VECTOR_CLEANUP_DRY_RUN`` (default false).

    Returns:
        Counters keyed by ``scanned``, ``eligible``, ``deleted``,
        ``skipped_no_login`` and ``failed``.

    Raises:
        ValueError: If the resolved threshold is negative, or if
            ``VECTOR_CLEANUP_INACTIVE_DAYS`` cannot be parsed as an integer.
    """
    ensure_last_login_column()

    # Compare against None instead of using `or`: an explicit 0 is falsy and
    # would otherwise be silently replaced by the environment/default value.
    if inactive_days is None:
        days = int(os.getenv("VECTOR_CLEANUP_INACTIVE_DAYS", "30"))
    else:
        days = inactive_days
    if days < 0:
        # A negative threshold moves the cutoff into the future, which would
        # mark every user with a recorded login as inactive.
        raise ValueError(f"inactive_days must be >= 0, got {days}")

    is_dry_run = _env_bool("VECTOR_CLEANUP_DRY_RUN", False) if dry_run is None else dry_run
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)

    stats = {
        "scanned": 0,
        "eligible": 0,
        "deleted": 0,
        "skipped_no_login": 0,
        "failed": 0,
    }

    db = SessionLocal()
    try:
        # Rows without last_login are fetched as well so they can be reported
        # in the stats; they are skipped inside the loop.
        users = db.query(User).filter(
            or_(User.last_login.is_(None), User.last_login < cutoff)
        ).all()

        for user in users:
            stats["scanned"] += 1

            if user.last_login is None:
                stats["skipped_no_login"] += 1
                logger.info(
                    "Skipping user %s because last_login is missing",
                    user.id,
                )
                continue

            stats["eligible"] += 1
            # Label used only in the log line below. str() keeps a non-string
            # id from raising here, outside the try block.
            # NOTE(review): presumably mirrors the collection name that
            # delete_user_collection derives; confirm against app.rag.vectorstore.
            collection_label = f"user_{str(user.id).replace('-', '_')}"[:63]
            logger.info(
                "User %s inactive since %s; deleting collection=%s dry_run=%s",
                user.id,
                user.last_login,
                collection_label,
                is_dry_run,
            )

            if is_dry_run:
                continue

            try:
                delete_user_collection(user.id)
                stats["deleted"] += 1
            except Exception as exc:  # defensive script boundary
                stats["failed"] += 1
                logger.warning(
                    "Failed deleting vector collection for user %s: %s",
                    user.id,
                    exc,
                    exc_info=True,
                )

        logger.info("Vector cleanup complete: %s", stats)
        return stats
    finally:
        db.close()
# Script entry point: run a single cleanup pass. No arguments are passed, so the
# inactivity threshold and the dry-run flag are taken from
# VECTOR_CLEANUP_INACTIVE_DAYS and VECTOR_CLEANUP_DRY_RUN (or their defaults).
# The returned stats dict is not used here.
if __name__ == "__main__":
    cleanup_inactive_user_vectors()
|