import os
import gc
import time
import json
import math
import collections
from datetime import datetime
from typing import Optional, List, Dict, Tuple, Callable, Any, Union

import torch
import numpy as np

from transformers import (
    is_datasets_available,
    is_torch_tpu_available,
    is_torch_xla_available,
)

from transformers.trainer_utils import (
    PredictionOutput,
    EvalPrediction,
    EvalLoopOutput,
    denumpify_detensorize,
    speed_metrics,
)

from transformers.utils import logging
from transformers.debug_utils import DebugOption

if is_datasets_available():
    import datasets

from transformers import Trainer

logger = logging.get_logger(__name__)


class ToMixin:
    def _optimizer_to(self, device: str = "cpu") -> None:
        """
        Move the optimizer state to the specified device.

        Args:
            device (str, optional): The device to move the optimizer state to. Defaults to "cpu".
        """
        for param in self.optimizer.state.values():
            if isinstance(param, torch.Tensor):
                param.data = param.data.to(device)
                if param._grad is not None:
                    param._grad.data = param._grad.data.to(device)
            elif isinstance(param, dict):
                for subparam in param.values():
                    if isinstance(subparam, torch.Tensor):
                        subparam.data = subparam.data.to(device)
                        if subparam._grad is not None:
                            subparam._grad.data = subparam._grad.data.to(device)

    def _scheduler_to(self, device: str = "cpu") -> None:
        """
        Move the scheduler state to the specified device.

        Args:
            device (str, optional): The device to move the scheduler state to. Defaults to "cpu".

        Returns:
            None
        """
        for param in self.lr_scheduler.__dict__.values():
            if isinstance(param, torch.Tensor):
                param.data = param.data.to(device)
                if param._grad is not None:
                    param._grad.data = param._grad.data.to(device)
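
# Illustrative use of ToMixin (a minimal sketch; ``trainer`` here stands for any
# Trainer subclass that mixes this in):
#
#     trainer._optimizer_to("cpu")   # move optimizer state tensors to the CPU
#     trainer._scheduler_to("cpu")   # move any scheduler tensors to the CPU
#     torch.cuda.empty_cache()       # release the now-unused GPU memory
#
# BaseReader.free_memory below wraps this sequence together with moving the
# model itself to the CPU.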


class BaseReader(Trainer, ToMixin):
    name: Optional[str] = None

    def __init__(
        self,
        *args,
        data_args: Dict[str, Any] = {},
        eval_examples: Optional[datasets.Dataset] = None,
        **kwargs,
    ):
        """
        Initializes the BaseReader.

        Args:
            *args: Positional arguments passed to Trainer.__init__.
            data_args (Dict[str, Any], optional): Additional arguments for data loading. Defaults to {}.
            eval_examples (Optional[datasets.Dataset], optional): Evaluation examples. Defaults to None.
            **kwargs: Keyword arguments passed to Trainer.__init__.
        """
        super().__init__(*args, **kwargs)
        self.data_args = data_args
        self.eval_examples = eval_examples

    def free_memory(self):
        """
        Move the model, optimizer and scheduler state to the CPU, empty the CUDA cache and garbage collect.

        This method is useful to free up GPU memory before checkpointing the model or saving it to disk.
        """
        self.model.to("cpu")
        self._optimizer_to("cpu")
        self._scheduler_to("cpu")
        torch.cuda.empty_cache()
        gc.collect()

    def postprocess(
        self,
        output: EvalLoopOutput,
        eval_examples: Optional[datasets.Dataset] = None,
        eval_dataset: Optional[datasets.Dataset] = None,
        mode: str = "evaluate",
    ) -> Union[Any, PredictionOutput]:
        """
        Postprocess the evaluation loop output.

        This method is called after the evaluation loop has finished and before the evaluation metrics are computed.
        It receives the output of the evaluation loop and can be used to modify it before it is passed to the compute_metrics function.

        Args:
            output (EvalLoopOutput): The output of the evaluation loop.
            eval_examples (Optional[datasets.Dataset], optional): The raw examples the predictions refer to. Defaults to None.
            eval_dataset (Optional[datasets.Dataset], optional): The tokenized dataset used by the loop. Defaults to None.
            mode (str, optional): Either "evaluate" or "predict". Defaults to "evaluate".

        Returns:
            Union[Any, PredictionOutput]: The modified output that will be passed to the compute_metrics function.
        """
        return output

    def evaluate(
        self,
        eval_dataset: Optional[datasets.Dataset] = None,
        eval_examples: Optional[datasets.Dataset] = None,
        ignore_keys: Optional[List[str]] = None,
        metric_key_prefix: str = "eval",
    ) -> Dict[str, float]:
        """
        Evaluate the model on the given dataset.

        Args:
            eval_dataset (Optional[datasets.Dataset], optional): The evaluation dataset. Defaults to None.
            eval_examples (Optional[datasets.Dataset], optional): The evaluation examples. Defaults to None.
            ignore_keys (Optional[List[str]], optional): Keys to ignore when calculating metrics. Defaults to None.
            metric_key_prefix (str, optional): The prefix for metric keys. Defaults to "eval".

        Returns:
            Dict[str, float]: The evaluation metrics.
        """
        self._memory_tracker.start()

        eval_dataset = self.eval_dataset if eval_dataset is None else eval_dataset
        eval_dataloader = self.get_eval_dataloader(eval_dataset)

        eval_examples = self.eval_examples if eval_examples is None else eval_examples

        start_time = time.time()

        # Temporarily disable metric computation: the loop only gathers raw
        # predictions, and metrics are computed after postprocessing.
        compute_metrics = self.compute_metrics
        self.compute_metrics = None

        eval_loop = self.prediction_loop if self.args.use_legacy_prediction_loop else self.evaluation_loop
        try:
            output = eval_loop(
                eval_dataloader,
                description="Evaluation",
                prediction_loss_only=True if compute_metrics is None else None,
                ignore_keys=ignore_keys,
                metric_key_prefix=metric_key_prefix,
            )
        finally:
            self.compute_metrics = compute_metrics

        if isinstance(eval_dataset, datasets.Dataset):
            eval_dataset.set_format(
                type=eval_dataset.format["type"],
                columns=list(eval_dataset.features.keys()),
            )

        eval_preds = self.postprocess(output, eval_examples, eval_dataset, mode="evaluate")

        metrics = {}
        if self.compute_metrics is not None:
            metrics = self.compute_metrics(eval_preds)

        metrics = denumpify_detensorize(metrics)

        # Prefix all metric keys with e.g. "eval_" so they are grouped correctly in logs.
        for key in list(metrics.keys()):
            if not key.startswith(f"{metric_key_prefix}_"):
                metrics[f"{metric_key_prefix}_{key}"] = metrics.pop(key)

        total_batch_size = self.args.eval_batch_size * self.args.world_size
        metrics.update(
            speed_metrics(
                metric_key_prefix,
                start_time,
                num_samples=output.num_samples,
                num_steps=math.ceil(output.num_samples / total_batch_size),
            )
        )

        self.log(metrics)

        filename = "eval_results.txt"
        eval_result_file = self.name + '_' + filename if self.name else filename
        with open(os.path.join(self.args.output_dir, eval_result_file), "w") as writer:
            logger.info("***** Eval results *****")
            writer.write("***** Eval results *****\n")
            writer.write(f"{datetime.now()}\n")
            for key in sorted(metrics.keys()):
                logger.info(f" {key} = {metrics[key]}")
                writer.write(f"{key} = {metrics[key]}\n")
            writer.write("\n")

        self.control = self.callback_handler.on_evaluate(
            self.args, self.state, self.control, metrics
        )

        self._memory_tracker.stop_and_update_metrics(metrics)

        return metrics

    def predict(
        self,
        test_dataset: datasets.Dataset,
        test_examples: Optional[datasets.Dataset] = None,
        ignore_keys: Optional[List[str]] = None,
        metric_key_prefix: str = "test",
        mode: str = "predict",
    ) -> PredictionOutput:
        """
        Predicts on the given test dataset and returns the predictions.

        Args:
            test_dataset (datasets.Dataset): The test dataset.
            test_examples (Optional[datasets.Dataset], optional): The test examples. Defaults to None.
            ignore_keys (Optional[List[str]], optional): Keys to ignore when calculating metrics. Defaults to None.
            metric_key_prefix (str, optional): The prefix for metric keys. Defaults to "test".
            mode (str, optional): The prediction mode, passed through to `postprocess`. Defaults to "predict".

        Returns:
            PredictionOutput: The predictions.
        """
        self._memory_tracker.start()

        test_dataloader = self.get_test_dataloader(test_dataset)
        start_time = time.time()

        # Temporarily disable metric computation while the prediction loop runs.
        compute_metrics = self.compute_metrics
        self.compute_metrics = None

        eval_loop = self.prediction_loop if self.args.use_legacy_prediction_loop else self.evaluation_loop
        try:
            output = eval_loop(
                test_dataloader,
                description="Prediction",
                ignore_keys=ignore_keys,
                metric_key_prefix=metric_key_prefix,
            )
        finally:
            self.compute_metrics = compute_metrics

        if isinstance(test_dataset, datasets.Dataset):
            test_dataset.set_format(
                type=test_dataset.format["type"],
                columns=list(test_dataset.features.keys()),
            )

        predictions = self.postprocess(output, test_examples, test_dataset, mode=mode)

        self._memory_tracker.stop_and_update_metrics(output.metrics)

        return predictions
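
# Example usage (a minimal sketch): a concrete reader subclasses BaseReader and
# overrides ``postprocess`` to turn raw model outputs into predictions before
# metrics are computed. The names below (ExtractiveReader, training_args,
# train_dataset, eval_dataset, eval_examples, compute_squad_metrics, model) are
# illustrative assumptions only.
#
#     class ExtractiveReader(BaseReader):
#         name = "extractive"
#
#         def postprocess(self, output, eval_examples=None, eval_dataset=None, mode="evaluate"):
#             # convert the logits in ``output.predictions`` into answer texts here,
#             # then hand an EvalPrediction to ``compute_metrics``
#             return EvalPrediction(predictions=output.predictions, label_ids=output.label_ids)
#
#     reader = ExtractiveReader(
#         model=model,
#         args=training_args,
#         train_dataset=train_dataset,
#         eval_dataset=eval_dataset,
#         eval_examples=eval_examples,
#         compute_metrics=compute_squad_metrics,
#     )
#     reader.train()
#     metrics = reader.evaluate()
#     reader.free_memory()  # release GPU memory once evaluation is done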