| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| """ |
| All utilities not related to data handling. |
| """ |
|
|
| import enum |
| import json |
| import os |
| import platform |
| import subprocess |
| import tempfile |
| import warnings |
| from collections.abc import Callable |
| from dataclasses import asdict, dataclass |
| from decimal import Decimal, DivisionByZero, InvalidOperation |
| from typing import Any, Literal, Optional |
|
|
| import bitsandbytes |
| import datasets |
| import huggingface_hub |
| import numpy as np |
| import torch |
| import transformers |
| from torch import nn |
| from transformers import ( |
| AutoModelForCausalLM, |
| AutoTokenizer, |
| BitsAndBytesConfig, |
| PreTrainedModel, |
| get_cosine_schedule_with_warmup, |
| ) |
|
|
| import peft |
| from peft import PeftConfig, get_peft_model |
| from peft.optimizers import create_lorafa_optimizer, create_loraplus_optimizer |
| from peft.utils import SAFETENSORS_WEIGHTS_NAME, infer_device |
|
|
|
|
# Accelerator type as reported by PEFT; resolved once when the module is imported.
device = infer_device()

# Importing this module on anything other than CUDA or XPU is a hard error.
if device not in ("cuda", "xpu"):
    raise RuntimeError("CUDA or XPU is not available, currently only CUDA or XPU is supported")

# Directory containing this module; the default params file and all result directories live next to it.
_MODULE_DIR = os.path.dirname(__file__)

# Upper bound (500 * 2**20 bytes) on reserved accelerator memory tolerated by init_accelerator().
ACCELERATOR_MEMORY_INIT_THRESHOLD = 500 * 2**20
# Default training parameters; get_train_config() overlays experiment-specific values on top of them.
FILE_NAME_DEFAULT_TRAIN_PARAMS = os.path.join(_MODULE_DIR, "default_training_params.json")
# NOTE(review): not referenced in this module; presumably the per-experiment override file name -- confirm.
FILE_NAME_TRAIN_PARAMS = "training_params.json"
# log_results() writes here for successful runs on the "main" PEFT branch.
RESULT_PATH = os.path.join(_MODULE_DIR, "results")
# log_results() writes here for runs executed from a PEFT branch other than "main".
RESULT_PATH_TEST = os.path.join(_MODULE_DIR, "temporary_results")
# log_results() writes here for runs whose status is CANCELED.
RESULT_PATH_CANCELLED = os.path.join(_MODULE_DIR, "cancelled_results")
# Shared Hugging Face Hub client used by the model/dataset info helpers.
hf_api = huggingface_hub.HfApi()
# Fraction of max_steps used as warmup steps by the cosine learning rate schedule.
WARMUP_STEP_RATIO = 0.1
|
|
|
|
@dataclass
class TrainConfig:
    """Complete set of knobs that describe one training run.

    Every field is required. The values are sanity-checked right after construction (see `__post_init__`); the
    first violated constraint raises.

    Args:
        model_id: Identifier of the model to train
        dtype: Data type used for the model weights, one of "float32", "float16", "bfloat16", "int8", "int4"
        max_seq_length: Maximum sequence length (must not be negative)
        batch_size: Training batch size (must be positive)
        batch_size_eval: Batch size for eval/test, which can be much larger than the training batch size
        max_steps: Maximum number of training steps (must be positive)
        eval_steps: Number of steps between two evaluations (must be positive and not exceed max_steps)
        compile: Whether the model should be compiled
        use_gc: Whether gradient checkpointing should be used
        query_template: Template for the query, must contain the placeholder '{query}'
        seed: Random seed
        grad_norm_clip: Gradient norm clipping value (0 disables clipping, negative values are rejected)
        optimizer_type: Name of a torch optimizer (e.g. AdamW) or of a PEFT optimizer ("lora+", "lora-fa")
        optimizer_kwargs: Keyword arguments for the optimizer (lr etc.)
        lr_scheduler: Learning rate scheduler, only None or 'cosine' are accepted
        use_amp: Whether automatic mixed precision should be used
        autocast_adapter_dtype: Whether to cast the adapter dtype to float32, same argument as in PEFT
        generation_kwargs: Arguments forwarded to the transformers GenerationConfig (used in evaluation)
        attn_implementation: Attention implementation to use (if any), see the transformers docs
        init_kv_cache_prefix: Optional string that is neither validated nor used in this module
            (NOTE(review): presumably consumed by the training script -- confirm its meaning)
    """

    model_id: str
    dtype: Literal["float32", "float16", "bfloat16", "int8", "int4"]
    max_seq_length: int
    batch_size: int
    batch_size_eval: int
    max_steps: int
    eval_steps: int
    compile: bool
    use_gc: bool
    query_template: str
    seed: int
    grad_norm_clip: float
    optimizer_type: str
    optimizer_kwargs: dict[str, Any]
    lr_scheduler: Optional[Literal["cosine"]]
    use_amp: bool
    autocast_adapter_dtype: bool
    generation_kwargs: dict[str, Any]
    attn_implementation: Optional[str]
    init_kv_cache_prefix: Optional[str]

    def __post_init__(self) -> None:
        """Validate the field values.

        Raises:
            TypeError: If `model_id` is not a string.
            ValueError: If any other checked field has an unsupported value.
        """
        supported_dtypes = ("float32", "float16", "bfloat16", "int8", "int4")
        peft_optimizers = ("lora+", "lora-fa")
        supported_schedulers = (None, "cosine")

        if not isinstance(self.model_id, str):
            raise TypeError(f"Invalid model_id: {self.model_id}")
        if self.dtype not in supported_dtypes:
            raise ValueError(f"Invalid dtype: {self.dtype}")
        if self.max_seq_length < 0:
            raise ValueError(f"Invalid max_seq_length: {self.max_seq_length}")
        if self.batch_size <= 0:
            raise ValueError(f"Invalid batch_size: {self.batch_size}")
        if self.batch_size_eval <= 0:
            raise ValueError(f"Invalid eval batch_size: {self.batch_size_eval}")
        if self.max_steps <= 0:
            raise ValueError(f"Invalid max_steps: {self.max_steps}")
        if self.eval_steps <= 0:
            raise ValueError(f"Invalid eval_steps: {self.eval_steps}")
        if self.eval_steps > self.max_steps:
            raise ValueError(f"Invalid eval_steps: {self.eval_steps} > max_steps: {self.max_steps}")
        if self.grad_norm_clip < 0:
            raise ValueError(f"Invalid grad_norm_clip: {self.grad_norm_clip}")

        # The PEFT optimizers are special-cased, anything else has to be an attribute of torch.optim.
        is_peft_optimizer = self.optimizer_type in peft_optimizers
        if not (is_peft_optimizer or hasattr(torch.optim, self.optimizer_type)):
            raise ValueError(f"Invalid optimizer_type: {self.optimizer_type}")
        if self.lr_scheduler not in supported_schedulers:
            raise ValueError(f"Invalid lr_scheduler: {self.lr_scheduler}, must be None or 'cosine'")
        if "{query}" not in self.query_template:
            raise ValueError("Invalid query_template, must contain '{query}'")
|
|
|
|
def validate_experiment_path(path: str) -> str:
    """Check that `path` points to an experiment directory and return the experiment name.

    The path is normalized before its structure is checked, so that equivalent spellings such as
    `./experiments/<peft-method>/<experiment-name>/` are accepted as well.

    Args:
        path: Path to the experiment directory, relative to the directory that contains `experiments`.

    Returns:
        The experiment name, i.e. `<peft-method>/<experiment-name>` joined with the OS path separator.

    Raises:
        FileNotFoundError: If the default training params file or `path` itself does not exist.
        ValueError: If the normalized path does not consist of exactly `experiments/<peft-method>/<experiment-name>`.
    """
    if not os.path.exists(FILE_NAME_DEFAULT_TRAIN_PARAMS):
        raise FileNotFoundError(
            f"Missing default training params file '{FILE_NAME_DEFAULT_TRAIN_PARAMS}' in the ./experiments directory"
        )
    if not os.path.exists(path):
        raise FileNotFoundError(f"Path {path} does not exist")

    # normpath drops a leading "./", trailing separators and duplicate separators, all of which would otherwise
    # produce extra (empty or ".") components and make a perfectly valid path fail the structure check below.
    path_parts = os.path.normpath(path).split(os.path.sep)
    if (len(path_parts) != 3) or (path_parts[0] != "experiments"):
        raise ValueError(
            f"Path {path} does not have the correct structure, should be ./experiments/<peft-method>/<experiment-name>"
        )

    # The experiment name is the path without the leading "experiments" component.
    return os.path.join(*path_parts[1:])
|
|
|
|
def get_train_config(path: str) -> TrainConfig:
    """Build the train config from the default parameters and an optional override file.

    Args:
        path: Path to a JSON file with experiment-specific parameters. If the file does not exist, only the defaults
            are used.

    Returns:
        The `TrainConfig` created from the defaults, with values from `path` taking precedence.
    """
    with open(FILE_NAME_DEFAULT_TRAIN_PARAMS) as default_file:
        defaults = json.load(default_file)

    overrides = {}
    if os.path.exists(path):
        with open(path) as override_file:
            overrides = json.load(override_file)

    # Later entries win, i.e. experiment-specific values replace the defaults.
    return TrainConfig(**{**defaults, **overrides})
|
|
|
|
def init_accelerator() -> int:
    """Seed torch, touch the accelerator and verify that it is not already heavily used.

    Returns:
        The reserved accelerator memory (as reported by `max_memory_reserved`) after the peak stats were reset.

    Raises:
        RuntimeError: If more than `ACCELERATOR_MEMORY_INIT_THRESHOLD` bytes are reserved at start.
    """
    backend = getattr(torch, device, torch.cuda)
    torch.manual_seed(0)
    backend.reset_peak_memory_stats()
    backend.manual_seed_all(0)
    # Move a throwaway layer to the device before measuring (presumably to make sure the device is initialized).
    nn.Linear(1, 1).to(device)

    reserved_at_start = backend.max_memory_reserved()
    if reserved_at_start > ACCELERATOR_MEMORY_INIT_THRESHOLD:
        raise RuntimeError(
            f"{device} memory usage at start is too high: {reserved_at_start // 2**20}MB, please ensure that no other "
            f"processes are running on {device}."
        )

    # Start from fresh peak statistics so that the returned value serves as the baseline.
    backend.reset_peak_memory_stats()
    return backend.max_memory_reserved()
|
|
|
|
def get_tokenizer(*, model_id: str, max_seq_length: int):
    """Load the tokenizer for `model_id` and configure it for training.

    Args:
        model_id: Identifier passed to `AutoTokenizer.from_pretrained`.
        max_seq_length: Value assigned to the tokenizer's `model_max_length`.

    Returns:
        The tokenizer; if it has no pad token, the EOS token is used as pad token.
    """
    tok = AutoTokenizer.from_pretrained(model_id)
    tok.model_max_length = max_seq_length
    if tok.pad_token:
        return tok

    # No (truthy) pad token configured, fall back to the EOS token.
    tok.pad_token = tok.eos_token
    return tok
|
|
|
|
| def get_base_model( |
| *, |
| model_id: str, |
| dtype: Literal["float32", "float16", "bfloat16", "int8", "int4"], |
| attn_implementation: Optional[str], |
| use_gc: bool, |
| ) -> PreTrainedModel: |
| kwargs: dict[str, Any] = { |
| "pretrained_model_name_or_path": model_id, |
| "device_map": device, |
| "attn_implementation": attn_implementation, |
| } |
| if dtype == "int4": |
| quant_config = BitsAndBytesConfig(load_in_4bit=True) |
| kwargs["quantization_config"] = quant_config |
| elif dtype == "int8": |
| quant_config = BitsAndBytesConfig(load_in_8bit=True) |
| kwargs["quantization_config"] = quant_config |
| elif dtype == "bfloat16": |
| kwargs["dtype"] = torch.bfloat16 |
| elif dtype == "float16": |
| kwargs["dtype"] = torch.float16 |
| elif dtype != "float32": |
| raise ValueError(f"Invalid dtype: {dtype}") |
|
|
| model = AutoModelForCausalLM.from_pretrained(**kwargs) |
| if use_gc: |
| model.enable_input_require_grads() |
| model.gradient_checkpointing_enable() |
|
|
| return model |
|
|
|
|
def get_model(
    *,
    model_id: str,
    dtype: Literal["float32", "float16", "bfloat16", "int8", "int4"],
    compile: bool,
    use_gc: bool,
    attn_implementation: Optional[str],
    peft_config: Optional[PeftConfig],
    autocast_adapter_dtype: bool,
) -> nn.Module:
    """Create the model to train: the base model, optionally wrapped by PEFT and optionally compiled.

    Args:
        model_id: Forwarded to `get_base_model`.
        dtype: Forwarded to `get_base_model`.
        compile: If True, the model is wrapped with `torch.compile(..., dynamic=True)`.
        use_gc: Forwarded to `get_base_model`.
        attn_implementation: Forwarded to `get_base_model`.
        peft_config: PEFT config to apply; if None, the plain base model is used.
        autocast_adapter_dtype: Forwarded to `get_peft_model`.

    Returns:
        The resulting model.
    """
    base_model = get_base_model(model_id=model_id, dtype=dtype, attn_implementation=attn_implementation, use_gc=use_gc)
    model = (
        base_model
        if peft_config is None
        else get_peft_model(base_model, peft_config, autocast_adapter_dtype=autocast_adapter_dtype)
    )
    return torch.compile(model, dynamic=True) if compile else model
|
|
|
|
class DummyScheduler:
    """Stand-in for a learning rate scheduler that never changes the learning rate.

    It offers the two methods this module's callers rely on, `get_last_lr` and `step`.
    """

    def __init__(self, lr):
        # the constant learning rate that is reported on every call
        self.lr = lr

    def get_last_lr(self):
        """Return the constant learning rate wrapped in a single-element list."""
        constant_lrs = [self.lr]
        return constant_lrs

    def step(self):
        """Do nothing, there is no schedule to advance."""
        return None
|
|
|
|
def get_optimizer_and_scheduler(
    model, *, optimizer_type: str, max_steps: int, lr_scheduler_arg: Optional[Literal["cosine"]], **optimizer_kwargs
) -> tuple[torch.optim.Optimizer, Any]:
    """Create the optimizer and the learning rate scheduler.

    Args:
        model: The model whose parameters are optimized.
        optimizer_type: "lora+" or "lora-fa" for the corresponding PEFT optimizers, otherwise the name of a class in
            `torch.optim`.
        max_steps: Total number of training steps, used by the cosine schedule.
        lr_scheduler_arg: "cosine" for a cosine schedule with `WARMUP_STEP_RATIO * max_steps` warmup steps, or None
            for a constant learning rate.
        **optimizer_kwargs: Keyword arguments passed on to the optimizer (lr etc.).

    Returns:
        A tuple of the optimizer and the scheduler. Without a schedule, the scheduler is a `DummyScheduler`.

    Raises:
        ValueError: If `lr_scheduler_arg` is neither None nor "cosine".
    """
    if optimizer_type == "lora+":
        optimizer = create_loraplus_optimizer(model, optimizer_cls=torch.optim.AdamW, **optimizer_kwargs)
    elif optimizer_type == "lora-fa":
        optimizer = create_lorafa_optimizer(model, **optimizer_kwargs)
    else:
        cls = getattr(torch.optim, optimizer_type)
        optimizer = cls(model.parameters(), **optimizer_kwargs)

    if lr_scheduler_arg == "cosine":
        warmup_steps = int(WARMUP_STEP_RATIO * max_steps)
        lr_scheduler = get_cosine_schedule_with_warmup(
            optimizer, num_warmup_steps=warmup_steps, num_training_steps=max_steps
        )
    elif lr_scheduler_arg is None:
        if "lr" in optimizer_kwargs:
            constant_lr = optimizer_kwargs["lr"]
        else:
            # lr was not passed explicitly, so the optimizer runs with its own default; report that value instead
            # of failing with a KeyError.
            constant_lr = optimizer.param_groups[0]["lr"]
        lr_scheduler = DummyScheduler(constant_lr)
    else:
        raise ValueError(f"Invalid lr_scheduler argument: {lr_scheduler_arg}")

    return optimizer, lr_scheduler
|
|
|
|
class BucketIterator:
    """
    Iterator that yields batches of data from a torch Dataset, grouped in buckets by sequence length

    The iterator will yield batches of size `batch_size`, where the samples in each batch are sorted by sequence length.
    This is done to minimize the amount of padding required for each batch. To avoid sorting the entire dataset and thus
    introducing a bias, the dataset is first split into buckets of size `batch_size * bucket_factor`.

    Every sample is yielded exactly once per iteration; the last bucket and the last batch of a bucket may be smaller
    than the nominal size.

    Args:
        ds: The torch Dataset to iterate over. It must support `len()` and slicing, where a slice returns a mapping
            from column name to a list of values and has a column called "input_ids".
        batch_size: The batch size
        bucket_factor: The factor by which to multiply the batch size to determine the bucket size
        delete_cols: The columns to delete from the dataset before yielding a batch

    Raises:
        ValueError: If `bucket_factor` is not greater than 0.
    """

    def __init__(self, ds, *, batch_size: int, bucket_factor: int, delete_cols: list[str]) -> None:
        # An explicit exception instead of an assert, as asserts are skipped when Python runs with -O.
        if not bucket_factor > 0:
            raise ValueError("bucket_factor must be greater than 0")

        self.ds = ds
        self.batch_size = batch_size
        self.bucket_factor = bucket_factor
        self.delete_cols = set(delete_cols)

    def _batch_iterator(self, bucket):
        """Yield the batches of a single bucket, with the samples ordered from longest to shortest."""
        tokens_per_sample_bucket = torch.tensor([len(ids) for ids in bucket["input_ids"]])
        # plain Python ints, so that they can index any kind of sequence
        order = torch.argsort(tokens_per_sample_bucket, descending=True).tolist()
        # batches are returned as the same mapping type that the dataset produced
        cls = type(bucket)
        columns = {k: [v[i] for i in order] for k, v in bucket.items() if k not in self.delete_cols}
        # use the number of sorted indices, "input_ids" itself could be among the deleted columns
        num_samples = len(order)
        for start in range(0, num_samples, self.batch_size):
            yield cls({k: v[start : start + self.batch_size] for k, v in columns.items()})

    def __iter__(self):
        bucket_size = self.batch_size * self.bucket_factor
        # The last start index produced by range() already covers a trailing, smaller bucket because slicing is
        # clamped to the dataset length. No separate remainder handling is needed -- doing it again would yield the
        # trailing samples twice.
        for start in range(0, len(self.ds), bucket_size):
            yield from self._batch_iterator(self.ds[start : start + bucket_size])
|
|
|
|
def upload_checkpoint_to_bucket(model: nn.Module, experiment_name: str, bucket_name: str):
    """Save the model to a temporary directory and upload all resulting files to a bucket.

    This is best effort: any exception is caught and reported via `print` instead of being raised.

    Args:
        model: The model to save, it must provide `save_pretrained`.
        experiment_name: Used as prefix (`<experiment_name>/<file name>`) of the uploaded files.
        bucket_name: The bucket that the files are added to.
    """
    try:
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True, delete=True) as checkpoint_dir:
            model.save_pretrained(checkpoint_dir)
            # pairs of (local file, destination inside the bucket)
            uploads = []
            for file_name in os.listdir(checkpoint_dir):
                uploads.append((os.path.join(checkpoint_dir, file_name), f"{experiment_name}/{file_name}"))
            huggingface_hub.batch_bucket_files(bucket_name, add=uploads)
    except Exception as exc:
        print(f"Failed to upload model checkpoint to hub: {exc}")
|
|
|
|
def get_file_size(
    model: nn.Module, *, peft_config: Optional[PeftConfig], clean: bool, print_fn: Callable[..., None]
) -> int:
    """Determine the size in bytes of the model checkpoint.

    With a PEFT config, the model is saved to a temporary directory and the size of the safetensors file is measured.
    Without one, the model is not saved; the size is estimated as number of parameters times the element size of the
    first parameter.

    Args:
        model: The model to measure.
        peft_config: The PEFT config, or None for a fully fine-tuned model.
        clean: Whether the temporary directory is deleted again. If not, its location is reported through `print_fn`.
        print_fn: Function used for informational messages (failures are reported with the builtin `print`).

    Returns:
        The file size in bytes, or 99999999 if it could not be determined.
    """
    fallback_size = 99999999

    if peft_config is None:
        print_fn("Not saving the fully fine-tuned model because it's too big, estimating the size instead")
        try:
            num_params = model.num_parameters()
            bytes_per_param = next(model.parameters()).element_size()
            return num_params * bytes_per_param
        except Exception as exc:
            print(f"Failed to determine file size for fully finetuned model because of: {exc}")
            return fallback_size

    file_size = fallback_size
    try:
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True, delete=clean) as tmp_dir:
            model.save_pretrained(tmp_dir)
            weights_file = os.path.join(tmp_dir, SAFETENSORS_WEIGHTS_NAME)
            file_size = os.stat(weights_file).st_size
            if not clean:
                print_fn(f"Saved PEFT checkpoint to {tmp_dir}")
    except Exception as exc:
        print(f"Failed to save PEFT checkpoint due to the following error: {exc}")
    return file_size
|
|
|
|
| |
| |
| |
|
|
|
|
def parse_answer(text: str) -> Optional[str]:
    r"""
    Extract the final answer from a label or a prediction.

    Such a text can look like this:

    Question: If the magnitude of vector v is equal to 4, what is the dot product of vector v with itself?. Think step
    by step
    Answer: The dot product of a vector with itself is equal to the square of its magnitude. So, the dot product of
    vector v with itself is equal to $4^2 = \boxed{16}$.The answer is: 16

    Here, the extracted answer is '16'.

    The first known delimiter (in order of preference) that occurs in the text is used, and the answer is whatever
    follows its last occurrence, up to the end of that line. If no delimiter is found, None is returned.
    """
    # ordered by preference, the first one that is contained in the text wins
    delimiters = (
        "The answer is: ",
        "The answer is ",
        "The final answer is: ",
        "The final answer is ",
        "#### ",
    )
    cleaned = text.strip().rstrip(".!?")
    delimiter = next((candidate for candidate in delimiters if candidate in cleaned), None)
    if delimiter is None:
        return None

    # everything after the last occurrence of the delimiter
    tail = cleaned.rpartition(delimiter)[-1].strip()
    # only the first line of it
    first_line = tail.split("\n", 1)[0]
    # without surrounding whitespace, punctuation and $ or % signs
    return first_line.strip(" .!?$%")
|
|
|
|
def convert_to_decimal(s: Optional[str]) -> Optional[Decimal]:
    """
    Turn the string representation of a number into a Decimal.

    Supported are:
    - plain numbers (e.g., "13", "65.33")
    - fractions with a single slash (e.g., "20/14")

    Returns None if the input is None, if it cannot be parsed, if there is more than one slash, or if the denominator
    of a fraction is zero.
    """
    if s is None:
        return None

    try:
        text = s.strip()
        if "/" not in text:
            # not a fraction, parse it directly
            return Decimal(text)

        pieces = text.split("/")
        if len(pieces) != 2:
            return None
        numerator, denominator = (Decimal(piece.strip()) for piece in pieces)
        if denominator == 0:
            return None
        return numerator / denominator
    except (DivisionByZero, InvalidOperation, ValueError):
        return None
|
|
|
|
def _normalize_answer(parsed: Optional[str]) -> str | float | None:
    """Map a parsed answer to a comparable value: a float if it is numeric, else the string itself (or None)."""
    as_decimal = convert_to_decimal(parsed)
    if as_decimal is not None:
        # Decimals are converted to float so that different spellings of the same number compare equal, e.g. the
        # fraction '20/35' and '0.5714285714285714'.
        return float(as_decimal)
    return parsed


def get_accuracy(*, predictions: list[str], responses: list[str]) -> float:
    """Compute the fraction of predictions whose final answer matches the one of the reference response.

    Answers are extracted with `parse_answer`. Numeric answers (including fractions) are compared as floats, all other
    answers as strings. A prediction without a parseable answer counts as incorrect.

    Args:
        predictions: The generated texts.
        responses: The reference texts, one per prediction.

    Returns:
        The accuracy, a value between 0 and 1.

    Raises:
        ValueError: If the number of predictions and responses differs, or if no answer can be parsed from one of the
            responses.
        ZeroDivisionError: If the inputs are empty.
    """
    if len(predictions) != len(responses):
        raise ValueError(f"Prediction length mismatch: {len(predictions)} != {len(responses)}")

    y_true: list[str | float | None] = []
    y_pred: list[str | float | None] = []

    for prediction, response in zip(predictions, responses):
        parsed_prediction = parse_answer(prediction)
        parsed_response = parse_answer(response)
        if parsed_response is None:
            raise ValueError(f"Error encountered while trying to parse response: {response}")

        # Prediction and reference go through the very same normalization. Each one is judged by its own parsed
        # value; the reference must not depend on whether the prediction could be parsed.
        y_pred.append(_normalize_answer(parsed_prediction))
        y_true.append(_normalize_answer(parsed_response))

    correct: list[bool] = [
        (true is not None) and (pred is not None) and (true == pred) for true, pred in zip(y_true, y_pred)
    ]
    accuracy = sum(correct) / len(correct)
    return accuracy
|
|
|
|
| |
| |
| |
|
|
|
|
def get_base_model_info(model_id: str) -> Optional[huggingface_hub.ModelInfo]:
    """Fetch the Hub info of the model, or None (plus a warning) if that fails for whatever reason."""
    try:
        info = hf_api.model_info(model_id)
    except Exception as exc:
        warnings.warn(f"Could not retrieve model info, failed with error {exc}")
        info = None
    return info
|
|
|
|
def get_dataset_info(dataset_id: str) -> Optional[huggingface_hub.DatasetInfo]:
    """Fetch the Hub info of the dataset, or None (plus a warning) if that fails for whatever reason."""
    try:
        info = hf_api.dataset_info(dataset_id)
    except Exception as exc:
        warnings.warn(f"Could not retrieve dataset info, failed with error {exc}")
        info = None
    return info
|
|
|
|
def get_git_hash(module) -> Optional[str]:
    """Return the git commit hash of the checkout that a package was imported from.

    Args:
        module: An imported package (it needs to have `__path__` and `__file__`).

    Returns:
        The hash of HEAD, or None if the package is located in site-packages or if the hash cannot be determined
        (e.g. the package directory is not part of a git repository or git is not available).
    """
    if "site-packages" in module.__path__[0]:
        # regular installation, there is no checkout to inspect
        return None

    try:
        raw_hash = subprocess.check_output(["git", "rev-parse", "HEAD"], cwd=os.path.dirname(module.__file__))
    except (subprocess.CalledProcessError, OSError):
        # The hash is only collected as additional information; not being able to determine it (not a git
        # repository, git executable missing, ...) must not abort the caller.
        return None
    return raw_hash.decode().strip()
|
|
|
|
def get_package_info() -> dict[str, Optional[str]]:
    """Collect the versions and commit hashes of transformers, peft, datasets, bitsandbytes, and torch.

    Returns:
        A dict with the keys `<package>-version` and `<package>-commit-hash` for each of the packages.
    """
    tracked_packages = (
        ("transformers", transformers),
        ("peft", peft),
        ("datasets", datasets),
        ("bitsandbytes", bitsandbytes),
        ("torch", torch),
    )
    package_info: dict[str, Optional[str]] = {}
    for name, module in tracked_packages:
        package_info[f"{name}-version"] = module.__version__
        package_info[f"{name}-commit-hash"] = get_git_hash(module)
    return package_info
|
|
|
|
def get_system_info() -> dict[str, str]:
    """Describe the machine the code is running on.

    Returns:
        A dict with the platform's system, release, version, machine and processor, and the name of the first
        accelerator device.
    """
    accelerator_type = infer_device()
    backend = getattr(torch, accelerator_type, torch.cuda)
    return {
        "system": platform.system(),
        "release": platform.release(),
        "version": platform.version(),
        "machine": platform.machine(),
        "processor": platform.processor(),
        "accelerator": backend.get_device_name(0),
    }
|
|
|
|
@dataclass
class MetaInfo:
    """Information about the environment in which an experiment was run.

    Args:
        package_info: Versions and commit hashes of the relevant packages, see `get_package_info`
        system_info: Description of the system, see `get_system_info`
        pytorch_info: The PyTorch build configuration as a string
    """

    package_info: dict[str, Optional[str]]
    system_info: dict[str, str]
    pytorch_info: str
|
|
|
|
def get_meta_info() -> MetaInfo:
    """Gather package, system and PyTorch build information into a `MetaInfo`."""
    package_info = get_package_info()
    system_info = get_system_info()
    pytorch_info = torch.__config__.show()
    return MetaInfo(package_info=package_info, system_info=system_info, pytorch_info=pytorch_info)
|
|
|
|
def get_peft_branch() -> str:
    """Return the name of the git branch that is checked out in the directory PEFT was imported from.

    The branch is determined by running `git rev-parse --abbrev-ref HEAD` in the PEFT package directory; errors of
    that call are not handled here.
    """
    git_command = ["git", "rev-parse", "--abbrev-ref", "HEAD"]
    peft_dir = os.path.dirname(peft.__file__)
    raw_branch = subprocess.check_output(git_command, cwd=peft_dir)
    return raw_branch.decode().strip()
|
|
|
|
class TrainStatus(enum.Enum):
    """Outcome of a training run; the values are the strings that end up in the result log."""

    FAILED = "failed"
    SUCCESS = "success"
    CANCELED = "canceled"
|
|
|
|
@dataclass
class TrainResult:
    """Everything a training run reports back for logging.

    Args:
        status: How the run ended
        train_time: The training time (printed with the unit "s" by `log_to_console`)
        accelerator_memory_reserved_log: Measurements of the reserved accelerator memory taken during the run
        losses: The recorded loss values
        metrics: The recorded metrics, stored in the result log as is
        error_msg: The error message, stored in the result log as is
        num_trainable_params: Number of trainable parameters
        num_total_params: Total number of parameters
    """

    status: TrainStatus
    train_time: float
    accelerator_memory_reserved_log: list[int]
    losses: list[float]
    metrics: list[Any]
    error_msg: str
    num_trainable_params: int
    num_total_params: int
|
|
|
|
def log_to_console(log_data: dict[str, Any], print_fn: Callable[..., None]) -> None:
    """Report the most important numbers of a run.

    Args:
        log_data: The log data; the memory numbers, train time and file size are read from its "train_info" entry, the
            total time from its "run_info" entry.
        print_fn: Called once per reported line.
    """
    # All values are looked up before anything is printed, so a missing key results in no partial output.
    train_info = log_data["train_info"]
    memory_max = train_info["accelerator_memory_max"]
    memory_avg = train_info["accelerator_memory_reserved_avg"]
    memory_99th = train_info["accelerator_memory_reserved_99th"]
    train_time = train_info["train_time"]
    total_time = log_data["run_info"]["total_time"]
    checkpoint_size = train_info["file_size"]

    # memory and file size are stored in bytes and reported in MB (2**20 bytes)
    report = (
        f"accelerator memory max: {memory_max // 2**20}MB",
        f"accelerator memory reserved avg: {memory_avg // 2**20}MB",
        f"accelerator memory reserved 99th percentile: {memory_99th // 2**20}MB",
        f"train time: {train_time}s",
        f"total time: {total_time:.2f}s",
        f"file size of checkpoint: {checkpoint_size / 2**20:.1f}MB",
    )
    for line in report:
        print_fn(line)
|
|
|
|
def log_to_file(
    *, log_data: dict, save_dir: str, experiment_name: str, timestamp: str, print_fn: Callable[..., None]
) -> None:
    """Write the log data as JSON file into `save_dir`.

    Args:
        log_data: The (JSON serializable) data to store.
        save_dir: The target directory.
        experiment_name: Determines the file name, path separators are replaced by '--'.
        timestamp: Appended to the file name (with ':' replaced by '-') unless `save_dir` is the regular result
            directory.
        print_fn: Used to report where the file was saved.
    """
    is_regular_result = save_dir.endswith(RESULT_PATH)
    base_name = experiment_name.replace(os.path.sep, "--")
    if not is_regular_result:
        # Outside of the regular result directory, the timestamp becomes part of the name (presumably so that
        # repeated runs of the same experiment do not overwrite each other -- confirm).
        base_name = f"{base_name}--{timestamp.replace(':', '-')}"

    target = os.path.join(save_dir, f"{base_name}.json")
    with open(target, "w") as f:
        json.dump(log_data, f, indent=2)
    print_fn(f"Saved log to: {target}")
|
|
|
|
def _summarize_hub_info(info) -> dict[str, Optional[str]]:
    """Reduce a Hub info object (or None) to its commit sha and ISO formatted creation date."""
    if info is None:
        return {"sha": None, "created_at": None}

    created_at = info.created_at
    return {
        "sha": info.sha,
        # NOTE(review): a missing creation date is tolerated so that it cannot prevent the results from being logged
        "created_at": created_at.isoformat() if created_at is not None else None,
    }


def log_results(
    *,
    experiment_name: str,
    train_result: TrainResult,
    accelerator_memory_init: int,
    time_total: float,
    file_size: int,
    model_info: Optional[huggingface_hub.ModelInfo],
    datasets_info: dict[str, Optional[huggingface_hub.DatasetInfo]],
    start_date: str,
    train_config: TrainConfig,
    peft_config: Optional[PeftConfig],
    print_fn: Callable[..., None],
) -> None:
    """Assemble the log data of a run, print a summary and store everything as JSON file.

    The target directory depends on the outcome: canceled runs go to `RESULT_PATH_CANCELLED`, runs on a PEFT branch
    other than "main" go to `RESULT_PATH_TEST`, successful runs on "main" go to `RESULT_PATH`, and everything else
    goes to a newly created temporary directory.

    Args:
        experiment_name: The name of the experiment, also used for the file name.
        train_result: The result of the training run.
        accelerator_memory_init: The accelerator memory reserved before training; subtracted from the final value to
            obtain the reported max memory.
        time_total: The total run time.
        file_size: The size of the checkpoint in bytes.
        model_info: Hub info of the base model, if available.
        datasets_info: Hub info of the datasets (if available), by name.
        start_date: The start date of the run, used as creation date and as timestamp for the file name.
        train_config: The train config of the run.
        peft_config: The PEFT config of the run, None for full fine-tuning.
        print_fn: Function used for status messages. The summary is always printed with the builtin `print`.
    """
    device = infer_device()
    torch_accelerator_module = getattr(torch, device, torch.cuda)
    accelerator_memory_final = torch_accelerator_module.max_memory_reserved()

    memory_log = train_result.accelerator_memory_reserved_log
    if len(memory_log) > 0:
        accelerator_memory_avg = int(sum(memory_log) / len(memory_log))
        accelerator_memory_reserved_99th = int(np.percentile(memory_log, 99))
    else:
        # No measurement was recorded, e.g. because the run ended before the first one. Report 0 instead of failing
        # with a division by zero, which would prevent the results from being logged at all.
        accelerator_memory_avg = 0
        accelerator_memory_reserved_99th = 0

    meta_info = get_meta_info()
    model_info_log = _summarize_hub_info(model_info)
    dataset_info_log = {key: _summarize_hub_info(dataset_info) for key, dataset_info in datasets_info.items()}

    peft_branch = get_peft_branch()

    if train_result.status == TrainStatus.CANCELED:
        save_dir = RESULT_PATH_CANCELLED
        print_fn("Experiment run was categorized as canceled")
    elif peft_branch != "main":
        save_dir = RESULT_PATH_TEST
        print_fn(f"Experiment run was categorized as a test run on branch {peft_branch}")
    elif train_result.status == TrainStatus.SUCCESS:
        save_dir = RESULT_PATH
        print_fn("Experiment run was categorized as successful run")
    else:
        save_dir = tempfile.mkdtemp()
        print_fn(f"Experiment could not be categorized, writing results to {save_dir}. Please open an issue on PEFT.")

    if peft_config is None:
        peft_config_dict: Optional[dict[str, Any]] = None
    else:
        # sets cannot be serialized to JSON, store them as lists
        peft_config_dict = {
            key: list(value) if isinstance(value, set) else value for key, value in peft_config.to_dict().items()
        }

    log_data = {
        "run_info": {
            "created_at": start_date,
            "total_time": time_total,
            "experiment_name": experiment_name,
            "peft_branch": peft_branch,
            "train_config": asdict(train_config),
            "peft_config": peft_config_dict,
            "error_msg": train_result.error_msg,
        },
        "train_info": {
            "accelerator_memory_reserved_avg": accelerator_memory_avg,
            "accelerator_memory_max": (accelerator_memory_final - accelerator_memory_init),
            "accelerator_memory_reserved_99th": accelerator_memory_reserved_99th,
            "train_time": train_result.train_time,
            "file_size": file_size,
            "num_trainable_params": train_result.num_trainable_params,
            "num_total_params": train_result.num_total_params,
            "status": train_result.status.value,
            "metrics": train_result.metrics,
        },
        "meta_info": {
            "model_info": model_info_log,
            "dataset_info": dataset_info_log,
            **asdict(meta_info),
        },
    }

    # The summary is printed with the builtin print regardless of print_fn; print_fn is only used for the messages
    # about where the log is saved.
    log_to_console(log_data, print_fn=print)
    log_to_file(
        log_data=log_data, save_dir=save_dir, experiment_name=experiment_name, timestamp=start_date, print_fn=print_fn
    )
|
|