File size: 3,442 Bytes
1330e26 9c720d9 1330e26 9c720d9 1330e26 9c720d9 1330e26 9c720d9 1330e26 9c720d9 1330e26 9c720d9 1330e26 9c720d9 1330e26 9c720d9 1330e26 9c720d9 1330e26 9c720d9 1330e26 9c720d9 1330e26 9c720d9 1330e26 9c720d9 1330e26 9c720d9 1330e26 9c720d9 1330e26 9c720d9 1330e26 9c720d9 1330e26 9c720d9 1330e26 9c720d9 1330e26 9c720d9 1330e26 9c720d9 1330e26 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 | import mlflow
import pandas as pd
import time
from functools import lru_cache
import logging
logger = logging.getLogger(__name__)
class MLflowCache:
    """TTL-based in-memory cache for MLflow experiment and run lookups.

    Each value is stored under a string key together with its insertion
    timestamp; entries older than ``ttl`` seconds count as expired and are
    refetched from MLflow on the next access.
    """

    def __init__(self, ttl: int = 300):  # 5 minutes TTL
        self._cache = {}       # key -> cached value
        self._timestamps = {}  # key -> time.time() at insertion
        self.ttl = ttl

    def _is_expired(self, key: str) -> bool:
        """Return True if *key* is unknown or older than the TTL."""
        if key not in self._timestamps:
            return True
        return time.time() - self._timestamps[key] > self.ttl

    def _set_cache(self, key: str, value) -> None:
        """Store *value* under *key* and record the insertion time."""
        self._cache[key] = value
        self._timestamps[key] = time.time()

    def get_cached_all_runs(self, experiment_name: str) -> pd.DataFrame:
        """Return all runs of *experiment_name*, served from cache when fresh.

        Returns an empty DataFrame when the experiment does not exist or the
        MLflow call fails; errors are logged, never raised.
        """
        cache_key = f"all_runs_{experiment_name}"
        if not self._is_expired(cache_key) and cache_key in self._cache:
            logger.info(f"Using cache for experiment {experiment_name}")
            return self._cache[cache_key]
        try:
            # Resolve the experiment first; a missing experiment is not an error.
            experiment = mlflow.get_experiment_by_name(experiment_name)
            if experiment is None:
                return pd.DataFrame()
            # Search runs and cache the result for subsequent calls.
            runs = mlflow.search_runs(experiment_ids=[experiment.experiment_id])
            self._set_cache(cache_key, runs)
            logger.info(f"Cache updated for experiment {experiment_name} ({len(runs)} runs)")
            return runs
        except Exception as e:
            logger.error(f"Error fetching runs for experiment {experiment_name}: {e}")
            return pd.DataFrame()

    def get_cached_experiment(self, experiment_name: str):
        """Return the MLflow experiment object (or None on error), with TTL cache."""
        cache_key = f"experiment_{experiment_name}"
        if not self._is_expired(cache_key) and cache_key in self._cache:
            logger.info(f"Using cache for experiment {experiment_name}")
            return self._cache[cache_key]
        try:
            experiment = mlflow.get_experiment_by_name(experiment_name)
            self._set_cache(cache_key, experiment)
            return experiment
        except Exception as e:
            logger.error(f"Error fetching experiment {experiment_name}: {e}")
            return None

    def clear_cache(self) -> None:
        """Drop every cached entry and its timestamp."""
        self._cache.clear()
        self._timestamps.clear()
        logger.info("Cache cleared")

    def clear_experiment_cache(self, experiment_name: str) -> None:
        """Drop only the entries belonging to *experiment_name*.

        Fix: the previous substring match (``experiment_name in key``) also
        evicted unrelated experiments whose name contains this one (e.g.
        clearing "exp" evicted "exp2"); match the exact cache keys instead.
        """
        keys_to_remove = [
            key
            for key in (f"all_runs_{experiment_name}", f"experiment_{experiment_name}")
            if key in self._cache
        ]
        for key in keys_to_remove:
            self._cache.pop(key, None)
            self._timestamps.pop(key, None)
        logger.info(f"Cache cleared for experiment {experiment_name}")
# Global cache instance — module-level singleton shared by all importers,
# using the default 5-minute TTL.
mlflow_cache = MLflowCache()
@lru_cache(maxsize=128)
def get_cached_experiment_list():
    """Return the list of MLflow experiment names, memoized via lru_cache.

    Falls back to a hard-coded default list when the MLflow server cannot
    be reached (the error is logged, not raised).
    """
    try:
        return [experiment.name for experiment in mlflow.search_experiments()]
    except Exception as e:
        logger.error(f"Error fetching experiment list: {e}")
        return ["AutoGluon_Experiments", "FLAML_Experiments", "H2O_Experiments"]
|