| | """Console logger utilities. |
| | |
| | Copied from https://github.com/HazyResearch/transformers/blob/master/src/utils/utils.py |
| | Copied from https://docs.python.org/3/howto/logging-cookbook.html#using-a-context-manager-for-selective-logging |
| | """ |
| |
|
| | import logging |
| | import fsspec |
| | import lightning |
| | import torch |
| | from timm.scheduler import CosineLRScheduler |
| | import argparse |
| | import numpy as np |
| | import random |
| | import os |
| |
|
| | def sample_categorical_logits(logits, dtype=torch.float64): |
| | |
| | gumbel_noise = -(1e-10 - (torch.rand_like(logits, dtype=dtype) + 1e-10).log()).log() |
| | return (logits + gumbel_noise).argmax(dim=-1) |
| |
|
| | def fsspec_exists(filename): |
| | """Check if a file exists using fsspec.""" |
| | fs, _ = fsspec.core.url_to_fs(filename) |
| | return fs.exists(filename) |
| |
|
| |
|
| | def fsspec_listdir(dirname): |
| | """Listdir in manner compatible with fsspec.""" |
| | fs, _ = fsspec.core.url_to_fs(dirname) |
| | return fs.ls(dirname) |
| |
|
| |
|
| | def fsspec_mkdirs(dirname, exist_ok=True): |
| | """Mkdirs in manner compatible with fsspec.""" |
| | fs, _ = fsspec.core.url_to_fs(dirname) |
| | fs.makedirs(dirname, exist_ok=exist_ok) |
| |
|
| |
|
| | def print_nans(tensor, name): |
| | if torch.isnan(tensor).any(): |
| | print(name, tensor) |
| |
|
| |
|
| | class CosineDecayWarmupLRScheduler( |
| | CosineLRScheduler, |
| | torch.optim.lr_scheduler._LRScheduler): |
| | |
| | def __init__(self, *args, **kwargs): |
| | super().__init__(*args, **kwargs) |
| | self._last_epoch = -1 |
| | self.step(epoch=0) |
| |
|
| | def step(self, epoch=None): |
| | if epoch is None: |
| | self._last_epoch += 1 |
| | else: |
| | self._last_epoch = epoch |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | if self.t_in_epochs: |
| | super().step(epoch=self._last_epoch) |
| | else: |
| | super().step_update(num_updates=self._last_epoch) |
| |
|
| |
|
| | class LoggingContext: |
| | """Context manager for selective logging.""" |
| | def __init__(self, logger, level=None, handler=None, close=True): |
| | self.logger = logger |
| | self.level = level |
| | self.handler = handler |
| | self.close = close |
| |
|
| | def __enter__(self): |
| | if self.level is not None: |
| | self.old_level = self.logger.level |
| | self.logger.setLevel(self.level) |
| | if self.handler: |
| | self.logger.addHandler(self.handler) |
| |
|
| | def __exit__(self, et, ev, tb): |
| | if self.level is not None: |
| | self.logger.setLevel(self.old_level) |
| | if self.handler: |
| | self.logger.removeHandler(self.handler) |
| | if self.handler and self.close: |
| | self.handler.close() |
| |
|
| |
|
| | def get_logger(name=__name__, level=logging.INFO) -> logging.Logger: |
| | """Initializes multi-GPU-friendly python logger.""" |
| |
|
| | logger = logging.getLogger(name) |
| | logger.setLevel(level) |
| |
|
| | |
| | |
| | for level in ('debug', 'info', 'warning', 'error', |
| | 'exception', 'fatal', 'critical'): |
| | setattr(logger, |
| | level, |
| | lightning.pytorch.utilities.rank_zero_only( |
| | getattr(logger, level))) |
| |
|
| | return logger |
| |
|
| | |
| | def str2bool(v): |
| | if isinstance(v, bool): |
| | return v |
| | if v.lower() in ('yes', 'true', 't', 'y', '1'): |
| | return True |
| | elif v.lower() in ('no', 'false', 'f', 'n', '0'): |
| | return False |
| | else: |
| | raise argparse.ArgumentTypeError('Boolean value expected.') |
| |
|
| |
|
| | def set_seed(seed, use_cuda): |
| | os.environ['PYTHONHASHSEED'] = str(seed) |
| | np.random.seed(seed) |
| | random.seed(seed) |
| | torch.manual_seed(seed) |
| | |
| | if use_cuda: |
| | torch.cuda.manual_seed(seed) |
| | torch.cuda.manual_seed_all(seed) |
| | print(f'=> Seed of the run set to {seed}') |
| |
|
| |
|