# Multi-AutoML-Interface / src/mlflow_cache.py
# Author: PedroM2626 (commit 9c720d9)
# feat: add support for multiple AutoML frameworks (TPOT, H2O, AutoGluon, FLAML)
# including data preprocessing and MLflow integration.
import mlflow
import pandas as pd
import time
from functools import lru_cache
import logging
logger = logging.getLogger(__name__)
class MLflowCache:
    """In-memory TTL cache for MLflow experiment and run lookups.

    Results of MLflow tracking-server queries are kept for ``ttl`` seconds
    to avoid repeated round-trips. Keys follow two fixed patterns:
    ``all_runs_<experiment_name>`` and ``experiment_<experiment_name>``.
    Not thread-safe; intended for single-threaded app use.
    """

    def __init__(self, ttl: int = 300):  # 5 minutes TTL by default
        self._cache: dict = {}       # cache_key -> cached value
        self._timestamps: dict = {}  # cache_key -> time.time() when stored
        self.ttl = ttl

    def _is_expired(self, key: str) -> bool:
        """Return True if *key* was never stored or is older than the TTL."""
        if key not in self._timestamps:
            return True
        return time.time() - self._timestamps[key] > self.ttl

    def _set_cache(self, key: str, value) -> None:
        """Store *value* under *key* and record the insertion time."""
        self._cache[key] = value
        self._timestamps[key] = time.time()

    def get_cached_all_runs(self, experiment_name: str) -> pd.DataFrame:
        """Return all runs of *experiment_name*, serving from cache when fresh.

        Returns an empty DataFrame when the experiment does not exist or
        the MLflow query raises; errors are logged, never propagated.
        """
        cache_key = f"all_runs_{experiment_name}"
        if not self._is_expired(cache_key) and cache_key in self._cache:
            logger.info("Using cache for experiment %s", experiment_name)
            return self._cache[cache_key]
        try:
            # Resolve the experiment first; a missing experiment is not an error.
            experiment = mlflow.get_experiment_by_name(experiment_name)
            if experiment is None:
                return pd.DataFrame()
            runs = mlflow.search_runs(experiment_ids=[experiment.experiment_id])
            self._set_cache(cache_key, runs)
            logger.info(
                "Cache updated for experiment %s (%d runs)", experiment_name, len(runs)
            )
            return runs
        except Exception as e:
            logger.error("Error fetching runs for experiment %s: %s", experiment_name, e)
            return pd.DataFrame()

    def get_cached_experiment(self, experiment_name: str):
        """Return the MLflow experiment object (or None), using the cache.

        Note: a ``None`` result (experiment absent) is cached too, so the
        lookup is not retried until the TTL elapses.
        """
        cache_key = f"experiment_{experiment_name}"
        if not self._is_expired(cache_key) and cache_key in self._cache:
            logger.info("Using cache for experiment %s", experiment_name)
            return self._cache[cache_key]
        try:
            experiment = mlflow.get_experiment_by_name(experiment_name)
            self._set_cache(cache_key, experiment)
            return experiment
        except Exception as e:
            logger.error("Error fetching experiment %s: %s", experiment_name, e)
            return None

    def clear_cache(self) -> None:
        """Drop every cached entry and its timestamp."""
        self._cache.clear()
        self._timestamps.clear()
        logger.info("Cache cleared")

    def clear_experiment_cache(self, experiment_name: str) -> None:
        """Drop cached entries belonging to *experiment_name* only.

        Matches the two exact keys used by this class rather than doing a
        substring scan, so clearing "exp" no longer also evicts entries
        for "exp2" or "my_exp".
        """
        for key in (f"all_runs_{experiment_name}", f"experiment_{experiment_name}"):
            self._cache.pop(key, None)
            self._timestamps.pop(key, None)
        logger.info("Cache cleared for experiment %s", experiment_name)
# Global cache instance shared by every importer of this module.
# Created at import time with the default 5-minute TTL.
mlflow_cache = MLflowCache()
@lru_cache(maxsize=1)
def _fetch_experiment_names() -> tuple:
    """Fetch all experiment names from MLflow (successful results cached).

    lru_cache never caches raised exceptions, so a failed fetch is retried
    on the next call instead of being pinned forever.
    """
    return tuple(exp.name for exp in mlflow.search_experiments())


def get_cached_experiment_list():
    """Return the list of MLflow experiment names, with caching.

    On any fetch error, logs it and returns a hardcoded fallback list.
    Unlike the previous implementation, the fallback itself is never
    cached, so the real experiment list is fetched again once MLflow
    recovers.
    """
    try:
        return list(_fetch_experiment_names())
    except Exception as e:
        logger.error("Error fetching experiment list: %s", e)
        return ["AutoGluon_Experiments", "FLAML_Experiments", "H2O_Experiments"]


# Preserve the lru_cache control surface callers may rely on.
get_cached_experiment_list.cache_clear = _fetch_experiment_names.cache_clear
get_cached_experiment_list.cache_info = _fetch_experiment_names.cache_info