"""
GPU Manager for SPARKNET

Handles GPU allocation, monitoring, and resource management.
"""

import os
from contextlib import contextmanager
from typing import Any, Dict, List, Optional

import pynvml
import torch
from loguru import logger


class GPUManager:
    """Manages GPU resources for model deployment and monitoring."""

    def __init__(self, primary_gpu: int = 0, fallback_gpus: Optional[List[int]] = None):
        """
        Initialize GPU Manager.

        Args:
            primary_gpu: Primary GPU device ID (default: 0)
            fallback_gpus: List of fallback GPU IDs (default: [1, 2, 3])
        """
        self.primary_gpu = primary_gpu
        self.fallback_gpus = fallback_gpus or [1, 2, 3]
        self.initialized = False

        try:
            pynvml.nvmlInit()
            self.initialized = True
            logger.info("GPU Manager initialized with NVML")
        except Exception as e:
            logger.warning(f"Failed to initialize NVML: {e}")

        self.available_gpus = self._detect_gpus()
        logger.info(f"Detected {len(self.available_gpus)} GPUs: {self.available_gpus}")

    def _detect_gpus(self) -> List[int]:
        """Detect available CUDA GPUs."""
        if not torch.cuda.is_available():
            logger.warning("CUDA not available!")
            return []

        gpu_count = torch.cuda.device_count()
        return list(range(gpu_count))

    def get_gpu_info(self, gpu_id: int) -> Dict[str, Any]:
        """
        Get detailed information about a GPU.

        Args:
            gpu_id: GPU device ID

        Returns:
            Dictionary with GPU information
        """
        if not self.initialized:
            return {"error": "NVML not initialized"}

        try:
            handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_id)
            mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
            utilization = pynvml.nvmlDeviceGetUtilizationRates(handle)
            temperature = pynvml.nvmlDeviceGetTemperature(handle, pynvml.NVML_TEMPERATURE_GPU)
            name = pynvml.nvmlDeviceGetName(handle)
            # Older pynvml releases return the device name as bytes.
            if isinstance(name, bytes):
                name = name.decode("utf-8")

            return {
                "gpu_id": gpu_id,
                "name": name,
                "memory_total": mem_info.total,
                "memory_used": mem_info.used,
                "memory_free": mem_info.free,
                "memory_percent": (mem_info.used / mem_info.total) * 100,
                "gpu_utilization": utilization.gpu,
                "memory_utilization": utilization.memory,
                "temperature": temperature,
            }
        except Exception as e:
            logger.error(f"Error getting GPU {gpu_id} info: {e}")
            return {"error": str(e)}

    def get_all_gpu_info(self) -> List[Dict[str, Any]]:
        """Get information for all available GPUs."""
        return [self.get_gpu_info(gpu_id) for gpu_id in self.available_gpus]

    def get_free_memory(self, gpu_id: int) -> int:
        """
        Get free memory on a GPU in bytes.

        Args:
            gpu_id: GPU device ID

        Returns:
            Free memory in bytes
        """
        info = self.get_gpu_info(gpu_id)
        return info.get("memory_free", 0)

    def select_best_gpu(self, min_memory_gb: float = 8.0) -> Optional[int]:
        """
        Select the best available GPU based on free memory.

        Args:
            min_memory_gb: Minimum required free memory in GB

        Returns:
            GPU ID, or None if no suitable GPU is found
        """
        min_memory_bytes = min_memory_gb * 1024**3

        # Prefer the primary GPU if it has enough free memory.
        if self.primary_gpu in self.available_gpus:
            free_mem = self.get_free_memory(self.primary_gpu)
            if free_mem >= min_memory_bytes:
                logger.info(f"Selected primary GPU {self.primary_gpu} ({free_mem / 1024**3:.2f} GB free)")
                return self.primary_gpu

        # Otherwise try the fallback GPUs in order.
        for gpu_id in self.fallback_gpus:
            if gpu_id in self.available_gpus:
                free_mem = self.get_free_memory(gpu_id)
                if free_mem >= min_memory_bytes:
                    logger.info(f"Selected fallback GPU {gpu_id} ({free_mem / 1024**3:.2f} GB free)")
                    return gpu_id

        logger.warning(f"No GPU found with {min_memory_gb} GB free memory")
        return None

    def set_device(self, gpu_id: int):
        """
        Set the CUDA device.

        Note: CUDA_VISIBLE_DEVICES is exported mainly for child processes; once
        CUDA has been initialized in this process it has no further effect here,
        and torch.cuda.set_device() performs the actual switch.

        Args:
            gpu_id: GPU device ID
        """
        if gpu_id not in self.available_gpus:
            raise ValueError(f"GPU {gpu_id} not available")

        os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
        torch.cuda.set_device(gpu_id)
        logger.info(f"Set CUDA device to GPU {gpu_id}")

    @contextmanager
    def gpu_context(self, gpu_id: Optional[int] = None, min_memory_gb: float = 8.0):
        """
        Context manager for GPU allocation.

        Args:
            gpu_id: Specific GPU ID, or None for auto-selection
            min_memory_gb: Minimum required free memory in GB

        Yields:
            GPU device ID
        """
        if gpu_id is None:
            gpu_id = self.select_best_gpu(min_memory_gb)
            if gpu_id is None:
                raise RuntimeError("No suitable GPU available")

        original_device = os.environ.get("CUDA_VISIBLE_DEVICES")

        try:
            self.set_device(gpu_id)
            yield gpu_id
        finally:
            # Restore CUDA_VISIBLE_DEVICES, or unset it if it was not set before.
            if original_device is not None:
                os.environ["CUDA_VISIBLE_DEVICES"] = original_device
            else:
                os.environ.pop("CUDA_VISIBLE_DEVICES", None)

            if torch.cuda.is_available():
                torch.cuda.empty_cache()
                logger.debug("Cleared CUDA cache")
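
    # Illustrative usage of gpu_context (a sketch, not executed here; assumes a
    # machine with at least one CUDA GPU that has enough free memory, and
    # `model` is a placeholder for any torch.nn.Module):
    #
    #     manager = get_gpu_manager()
    #     with manager.gpu_context(min_memory_gb=8.0) as gpu_id:
    #         model = model.to(f"cuda:{gpu_id}")
    #
    # On exit, the previous CUDA_VISIBLE_DEVICES value is restored and the
    # CUDA cache is cleared.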

    def clear_cache(self, gpu_id: Optional[int] = None):
        """
        Clear CUDA cache for a specific GPU or all GPUs.

        Args:
            gpu_id: GPU device ID or None for all GPUs
        """
        if gpu_id is not None:
            with torch.cuda.device(gpu_id):
                torch.cuda.empty_cache()
            logger.info(f"Cleared cache for GPU {gpu_id}")
        else:
            torch.cuda.empty_cache()
            logger.info("Cleared cache for all GPUs")

    def monitor(self) -> str:
        """
        Get a formatted monitoring string for all GPUs.

        Returns:
            Formatted string with GPU status
        """
        info_list = self.get_all_gpu_info()

        lines = ["GPU Status:"]
        for info in info_list:
            if "error" in info:
                lines.append(f"  GPU {info.get('gpu_id', '?')}: Error - {info['error']}")
            else:
                lines.append(
                    f"  GPU {info['gpu_id']}: {info['name']} | "
                    f"Memory: {info['memory_used'] / 1024**3:.2f}/{info['memory_total'] / 1024**3:.2f} GB "
                    f"({info['memory_percent']:.1f}%) | "
                    f"Utilization: {info['gpu_utilization']}% | "
                    f"Temp: {info['temperature']}°C"
                )

        return "\n".join(lines)
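
    # Example monitor() output (format only; device name and numbers are illustrative):
    #
    #     GPU Status:
    #       GPU 0: <device name> | Memory: 12.34/40.00 GB (30.9%) | Utilization: 45% | Temp: 61°C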

    def __del__(self):
        """Clean up NVML on deletion."""
        if self.initialized:
            try:
                pynvml.nvmlShutdown()
            except Exception:
                pass


_gpu_manager: Optional[GPUManager] = None


def get_gpu_manager() -> GPUManager:
    """Get or create the global GPU manager instance."""
    global _gpu_manager
    if _gpu_manager is None:
        _gpu_manager = GPUManager()
    return _gpu_manager
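

# Minimal usage sketch for manual testing (assumes a CUDA-capable machine; the
# 8 GB threshold and the test tensor are illustrative choices, not part of the
# SPARKNET API).
if __name__ == "__main__":
    manager = get_gpu_manager()
    print(manager.monitor())

    if manager.available_gpus:
        # Auto-select a GPU with enough free memory, allocate a small tensor on
        # it, then let the context manager restore the environment and clear
        # the CUDA cache on exit.
        with manager.gpu_context(min_memory_gb=8.0) as gpu_id:
            x = torch.randn(1024, 1024, device=f"cuda:{gpu_id}")
            logger.info(f"Allocated test tensor of shape {tuple(x.shape)} on GPU {gpu_id}")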