| | |
| | import os.path as osp |
| | import pickle |
| | import shutil |
| | import tempfile |
| | from collections import OrderedDict |
| | from typing import Any, Dict, Generator, List, Optional, Tuple, Union |
| |
|
| | import numpy as np |
| | import torch |
| | from torch import Tensor |
| | from torch import distributed as torch_dist |
| | from torch._utils import (_flatten_dense_tensors, _take_tensors, |
| | _unflatten_dense_tensors) |
| | from torch.distributed import ProcessGroup |
| | from itertools import zip_longest, chain |
| | import mmengine |
| | from .utils import (get_world_size, get_rank, get_backend, get_dist_info, |
| | get_default_group, barrier, get_data_device, |
| | get_comm_device, cast_data_device) |
| | from mmengine.utils import digit_version |
| | from mmengine.utils.dl_utils import TORCH_VERSION |
| | from mmengine.device import is_npu_available |
| |
|
| |
|
| | def _get_reduce_op(name: str) -> torch_dist.ReduceOp: |
| | op_mappings = { |
| | 'sum': torch_dist.ReduceOp.SUM, |
| | 'product': torch_dist.ReduceOp.PRODUCT, |
| | 'min': torch_dist.ReduceOp.MIN, |
| | 'max': torch_dist.ReduceOp.MAX, |
| | 'band': torch_dist.ReduceOp.BAND, |
| | 'bor': torch_dist.ReduceOp.BOR, |
| | 'bxor': torch_dist.ReduceOp.BXOR, |
| | } |
| |
|
| | if name.lower() not in op_mappings: |
| | raise ValueError( |
| | f'reduce op should be one of {op_mappings.keys()}, bug got {name}') |
| |
|
| | return op_mappings[name.lower()] |
| |
|
| |
|
def all_reduce(data: Tensor,
               op: str = 'sum',
               group: Optional[ProcessGroup] = None) -> None:
    """All-reduce ``data`` in-place across every process in ``group``.

    After the call, ``data`` holds the element-wise reduction of the tensors
    from all ranks and is bitwise identical everywhere. In a non-distributed
    environment this is a no-op.

    Args:
        data (Tensor): Input and output of the collective. The function
            operates in-place.
        op (str): Reduction to apply: 'sum', 'mean', 'product', 'min',
            'max', 'band', 'bor' or 'bxor'. Defaults to 'sum'.
        group (ProcessGroup, optional): The process group to work on. If
            None, the default process group will be used. Defaults to None.
    """
    world_size = get_world_size(group)
    if world_size <= 1:
        # Nothing to reduce outside a distributed environment.
        return

    if group is None:
        group = get_default_group()

    # Communicate on the backend's preferred device, then copy the result
    # back onto whichever device the caller's tensor lives on.
    src_device = get_data_device(data)
    comm_device = get_comm_device(group)
    comm_tensor = cast_data_device(data, comm_device)

    wants_mean = op.lower() == 'mean'
    reduce_name = 'sum' if wants_mean else op
    torch_dist.all_reduce(comm_tensor, _get_reduce_op(reduce_name), group)
    if wants_mean:
        # 'mean' is emulated as an all-reduce 'sum' followed by division.
        comm_tensor = torch.true_divide(comm_tensor, world_size)

    # Write the reduced values back into the caller's tensor in-place.
    cast_data_device(comm_tensor, src_device, out=data)
| |
|
| |
|
def all_gather(data: Tensor,
               group: Optional[ProcessGroup] = None) -> List[Tensor]:
    """Gather ``data`` from every process in ``group`` into a list.

    Unlike ``torch.distributed.all_gather``, the gathered list is returned
    instead of being filled into a caller-provided ``gather_list``. In a
    non-distributed environment the result is simply ``[data]``.

    Args:
        data (Tensor): Tensor to be gathered.
        group (ProcessGroup, optional): The process group to work on. If
            None, the default process group will be used. Defaults to None.

    Returns:
        list[Tensor]: Tensors contributed by every rank, ordered by rank,
        cast back to the device of the input ``data``.
    """
    world_size = get_world_size(group)
    if world_size == 1:
        return [data]

    if group is None:
        group = get_default_group()

    src_device = get_data_device(data)
    comm_device = get_comm_device(group)
    comm_tensor = cast_data_device(data, comm_device)

    # One receive buffer per rank, allocated on the communication device.
    outputs = [
        torch.empty_like(data, device=comm_device) for _ in range(world_size)
    ]
    torch_dist.all_gather(outputs, comm_tensor, group)

    return cast_data_device(outputs, src_device)
| |
|
| |
|
def gather(data: Tensor,
           dst: int = 0,
           group: Optional[ProcessGroup] = None) -> List[Optional[Tensor]]:
    """Gather ``data`` from the whole group onto the ``dst`` process.

    The ``dst`` rank receives a list with one tensor per rank; every other
    rank receives an empty list. In a non-distributed environment the result
    is simply ``[data]``.

    Note:
        The ``NCCL`` backend does not support ``gather``; CUDA tensors are
        not supported.

    Args:
        data (Tensor): Tensor to be gathered.
        dst (int): Destination rank. Defaults to 0.
        group (ProcessGroup, optional): The process group to work on. If
            None, the default process group will be used. Defaults to None.

    Returns:
        list[Tensor]: On rank ``dst``, the gathered tensors ordered by rank;
        an empty list elsewhere.
    """
    world_size = get_world_size(group)
    if world_size == 1:
        return [data]

    if group is None:
        group = get_default_group()

    src_device = get_data_device(data)
    comm_device = get_comm_device(group)

    # Only the destination rank needs receive buffers.
    is_dst = get_rank(group) == dst
    if is_dst:
        outputs: List[Optional[Tensor]] = [
            torch.empty_like(data, device=comm_device)
            for _ in range(world_size)
        ]
    else:
        outputs = []

    torch_dist.gather(data, outputs, dst, group)

    if not is_dst:
        return outputs
    return cast_data_device(outputs, src_device)
| |
|
| |
|
def broadcast(data: Tensor,
              src: int = 0,
              group: Optional[ProcessGroup] = None) -> None:
    """Broadcast ``data`` from the ``src`` process to the whole group.

    ``data`` must have the same number of elements on every participating
    process. In a non-distributed environment this is a no-op.

    Args:
        data (Tensor): Tensor to send if this is rank ``src``; otherwise the
            tensor that receives the broadcast values, in-place.
        src (int): Source rank. Defaults to 0.
        group (ProcessGroup, optional): The process group to work on. If
            None, the default process group will be used. Defaults to None.
    """
    if get_world_size(group) <= 1:
        return

    if group is None:
        group = get_default_group()

    src_device = get_data_device(data)
    comm_device = get_comm_device(group)
    # Broadcast requires a contiguous tensor on the communication device.
    comm_tensor = cast_data_device(data, comm_device).contiguous()
    torch_dist.broadcast(comm_tensor, src, group)

    # The source already holds the data; only receivers copy back in-place.
    if get_rank(group) != src:
        cast_data_device(comm_tensor, src_device, data)
|
| |
|
def sync_random_seed(group: Optional[ProcessGroup] = None) -> int:
    """Synchronize a random seed to all processes.

    Rank 0 draws a random seed and broadcasts it, so that every rank can
    shuffle dataset indices in the same order and then pick disjoint shards.
    In a non-distributed environment the locally drawn seed is returned.

    Args:
        group (ProcessGroup, optional): The process group to work on. If
            None, the default process group will be used. Defaults to None.

    Returns:
        int: The seed shared by every rank in the group.
    """
    seed = np.random.randint(2**31)
    if get_world_size(group) == 1:
        return seed

    if group is None:
        group = get_default_group()

    comm_device = get_comm_device(group)

    # Only rank 0's seed matters; other ranks broadcast-receive into a
    # placeholder tensor.
    value = seed if get_rank(group) == 0 else 0
    seed_tensor = torch.tensor(value, dtype=torch.int32).to(comm_device)

    torch_dist.broadcast(seed_tensor, src=0, group=group)

    return seed_tensor.item()
| |
|
| |
|
| | def _object_to_tensor(obj: Any) -> Tuple[Tensor, Tensor]: |
| | """Serialize picklable python object to tensor.""" |
| | byte_storage = torch.ByteStorage.from_buffer(pickle.dumps(obj)) |
| | |
| | |
| | |
| | byte_tensor = torch.ByteTensor(byte_storage) |
| | local_size = torch.LongTensor([byte_tensor.numel()]) |
| | return byte_tensor, local_size |
| |
|
| |
|
| | def _tensor_to_object(tensor: Tensor, tensor_size: int) -> Any: |
| | """Deserialize tensor to picklable python object.""" |
| | buf = tensor.cpu().numpy().tobytes()[:tensor_size] |
| | return pickle.loads(buf) |
| |
|
| |
|
def _broadcast_object_list(object_list: List[Any],
                           src: int = 0,
                           group: Optional[ProcessGroup] = None) -> None:
    """Broadcast picklable objects in ``object_list`` to the whole group.

    Similar to :func:`broadcast`, but Python objects can be passed in. Note
    that all objects in ``object_list`` must be picklable in order to be
    broadcasted.

    Args:
        object_list (list[Any]): On rank ``src``, the objects to broadcast.
            On other ranks, a list of the same length whose entries are
            overwritten in-place with the received objects.
        src (int): Source rank. Defaults to 0.
        group (ProcessGroup, optional): The process group to work on. If
            None, the default process group will be used. Defaults to None.
    """
    if torch_dist.distributed_c10d._rank_not_in_group(group):
        return

    my_rank = get_rank()
    # The source serializes every object; the other ranks only allocate a
    # placeholder for the incoming per-object sizes.
    if my_rank == src:
        tensor_list, size_list = zip(
            *[_object_to_tensor(obj) for obj in object_list])
        object_sizes_tensor = torch.cat(size_list)
    else:
        object_sizes_tensor = torch.empty(len(object_list), dtype=torch.long)

    # Device-bound backends (nccl/hccl/cncl/mccl) require communicated
    # tensors to live on the matching accelerator device; the current CUDA
    # (or npu/mlu/musa) device must have been set per-rank by the caller.
    group_backend = get_backend(group)
    is_nccl_backend = group_backend == torch_dist.Backend.NCCL
    current_device = torch.device('cpu')
    is_hccl_backend = group_backend == 'hccl'
    is_cncl_backend = group_backend == 'cncl'
    is_mccl_backend = group_backend == 'mccl'
    if is_hccl_backend:
        current_device = torch.device('npu', torch.npu.current_device())
        object_sizes_tensor = object_sizes_tensor.to(current_device)
    elif is_cncl_backend:
        current_device = torch.device('mlu', torch.mlu.current_device())
        object_sizes_tensor = object_sizes_tensor.to(current_device)
    elif is_mccl_backend:
        current_device = torch.device('musa', torch.musa.current_device())
        object_sizes_tensor = object_sizes_tensor.to(current_device)
    elif is_nccl_backend:
        current_device = torch.device('cuda', torch.cuda.current_device())
        object_sizes_tensor = object_sizes_tensor.to(current_device)

    # Broadcast object sizes first so receivers can allocate the payload.
    torch_dist.broadcast(object_sizes_tensor, src=src, group=group)

    # Concatenate (on src) or allocate (elsewhere) the serialized payload.
    if my_rank == src:
        object_tensor = torch.cat(tensor_list)
    else:
        object_tensor = torch.empty(
            torch.sum(object_sizes_tensor).int().item(),
            dtype=torch.uint8,
        )

    # BUGFIX: the payload must also be moved for the 'mccl' backend.
    # Previously only nccl/hccl/cncl were listed here, so under mccl the
    # size tensor was broadcast on the MUSA device while the payload stayed
    # on CPU, breaking the second broadcast.
    if (is_nccl_backend or is_hccl_backend or is_cncl_backend
            or is_mccl_backend):
        object_tensor = object_tensor.to(current_device)
    torch_dist.broadcast(object_tensor, src=src, group=group)

    # Deserialize on receiving ranks only; ``src`` already holds the
    # original objects.
    offset = 0
    if my_rank != src:
        for i, obj_size in enumerate(object_sizes_tensor):
            obj_view = object_tensor[offset:offset + obj_size]
            obj_view = obj_view.type(torch.uint8)
            if obj_view.device != torch.device('cpu'):
                obj_view = obj_view.cpu()
            offset += obj_size
            object_list[i] = _tensor_to_object(obj_view, obj_size)
|
| |
|
def broadcast_object_list(data: List[Any],
                          src: int = 0,
                          group: Optional[ProcessGroup] = None) -> None:
    """Broadcast picklable objects in ``data`` to the whole group, in-place.

    Similar to :func:`broadcast`, but arbitrary picklable Python objects can
    be passed. Only the objects on rank ``src`` are sent, but every rank
    must provide a list of the same length. In a non-distributed
    environment this is a no-op.

    Note:
        For NCCL-based process groups, the serialized tensors are moved to
        ``torch.cuda.current_device()`` before communication; each rank must
        have set its own GPU via ``torch.cuda.set_device()``.

    Args:
        data (List[Any]): List of picklable objects to broadcast; receiving
            ranks have their entries overwritten.
        src (int): Source rank from which to broadcast. Defaults to 0.
        group (ProcessGroup, optional): The process group to work on. If
            None, the default process group will be used. Defaults to None.
    """
    assert isinstance(data, list)

    if get_world_size(group) <= 1:
        return

    if group is None:
        group = get_default_group()

    # torch's native implementation exists from 1.8.0, but the fallback is
    # kept for older torch and for NPU devices.
    use_native = (digit_version(TORCH_VERSION) >= digit_version('1.8.0')
                  and not is_npu_available())
    if use_native:
        torch_dist.broadcast_object_list(data, src, group)
    else:
        _broadcast_object_list(data, src, group)
|
| |
|
def all_reduce_dict(data: Dict[str, Tensor],
                    op: str = 'sum',
                    group: Optional[ProcessGroup] = None) -> None:
    """All-reduce every tensor in ``data`` across the group, in-place.

    The tensors are flattened and concatenated (in sorted-key order) so a
    single collective call covers the whole dict, then split back into the
    original shapes. In a non-distributed environment this is a no-op.

    The code is modified from https://github.com/Megvii-
    BaseDetection/YOLOX/blob/main/yolox/utils/allreduce_norm.py.

    Args:
        data (dict[str, Tensor]): Data to be reduced.
        op (str): Reduction to apply: 'sum', 'mean', 'product', 'min',
            'max', 'band', 'bor' or 'bxor'. Defaults to 'sum'.
        group (ProcessGroup, optional): The process group to work on. If
            None, the default process group will be used. Defaults to None.
    """
    assert isinstance(data, dict)

    world_size = get_world_size(group)
    if world_size <= 1:
        return

    if group is None:
        group = get_default_group()

    # Sorted keys guarantee every rank packs the tensors in the same order.
    keys = sorted(data.keys())
    shapes = [data[k].shape for k in keys]
    numels = [data[k].numel() for k in keys]

    if digit_version(TORCH_VERSION) == digit_version('1.5.0'):
        # NOTE(review): the float() cast is kept verbatim for torch 1.5.0 —
        # presumably a dtype limitation of all_reduce on concatenated
        # tensors in that release; confirm before removing.
        flat = torch.cat([data[k].flatten().float() for k in keys])
    else:
        flat = torch.cat([data[k].flatten() for k in keys])

    all_reduce(flat, op=op, group=group)

    # Split the reduced buffer back into per-key tensors of the original
    # shapes and write them into the caller's dict.
    for key, chunk, shape in zip(keys, torch.split(flat, numels), shapes):
        data[key] = chunk.reshape(shape)
| |
|
| |
|
def _all_gather_object(object_list: List[Any],
                       obj: Any,
                       group: Optional[ProcessGroup] = None) -> None:
    """Gather picklable objects from the whole group into a list.

    Similar to :func:`all_gather`, but Python objects can be passed in.
    Note that the object must be picklable in order to be gathered.

    Args:
        object_list (list[Any]): Output list. It should be correctly sized as
            the size of the group for this collective and will contain the
            output.
        object (Any): Pickable Python object to be broadcast from current
            process.
        group (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used. Defaults to None.

    Returns:
        None. If the calling rank is part of this group, the output of the
        collective will be populated into the input ``object_list``. If the
        calling rank is not part of the group, the passed in ``object_list``
        will be unmodified.
    """
    # Ranks outside the group must not touch the collective.
    if torch_dist.distributed_c10d._rank_not_in_group(group):
        return

    # Serialize the local object into a uint8 tensor plus its byte count.
    input_tensor, local_size = _object_to_tensor(obj)
    group_backend = get_backend(group)
    current_device = torch.device('cpu')
    is_nccl_backend = group_backend == torch_dist.Backend.NCCL
    is_mccl_backend = group_backend == 'mccl'
    if is_nccl_backend:
        # NCCL communicates on the current CUDA device; the caller is
        # responsible for having set a distinct device per rank.
        current_device = torch.device('cuda', torch.cuda.current_device())
        input_tensor = input_tensor.to(current_device)
        local_size = local_size.to(current_device)
    elif is_mccl_backend:
        # Same requirement for the MUSA ('mccl') backend.
        current_device = torch.device('musa', torch.musa.current_device())
        input_tensor = input_tensor.to(current_device)
        local_size = local_size.to(current_device)

    # First collective: gather every rank's payload size so all ranks can
    # agree on a common (maximum) buffer length.
    group_size = get_world_size(group=group)
    object_sizes_tensor = torch.zeros(
        group_size, dtype=torch.long, device=current_device)
    object_size_list = [
        object_sizes_tensor[i].unsqueeze(dim=0) for i in range(group_size)
    ]
    torch_dist.all_gather(object_size_list, local_size, group=group)
    max_object_size = int(max(object_size_list).item())

    # Pad the local payload to the maximum size so all ranks contribute
    # equally-sized tensors; extra bytes are ignored on deserialization.
    input_tensor.resize_(max_object_size)
    coalesced_output_tensor = torch.empty(
        max_object_size * group_size, dtype=torch.uint8, device=current_device)
    # Non-overlapping views into one coalesced buffer, one slot per rank.
    output_tensors = [
        coalesced_output_tensor[max_object_size * i:max_object_size * (i + 1)]
        for i in range(group_size)
    ]
    # Second collective: gather the (padded) serialized payloads.
    torch_dist.all_gather(output_tensors, input_tensor, group=group)

    # Deserialize each rank's slot, truncating to its true size.
    for i, tensor in enumerate(output_tensors):
        tensor = tensor.type(torch.uint8)
        if tensor.device != torch.device('cpu'):
            tensor = tensor.cpu()
        tensor_size = object_size_list[i]
        object_list[i] = _tensor_to_object(tensor, tensor_size)
| |
|
| |
|
def all_gather_object(data: Any,
                      group: Optional[ProcessGroup] = None) -> List[Any]:
    """Gather picklable objects from every process in ``group`` into a list.

    Similar to :func:`all_gather`, but arbitrary picklable Python objects
    can be passed. Unlike ``torch.distributed.all_gather_object``, the
    gathered list is returned rather than filled into a caller-provided
    list. In a non-distributed environment the result is simply ``[data]``.

    Note:
        For NCCL-based process groups, serialized tensors are moved to
        ``torch.cuda.current_device()`` before communication; each rank must
        have set its own GPU via ``torch.cuda.set_device()``.

    Args:
        data (Any): Picklable Python object contributed by this process.
        group (ProcessGroup, optional): The process group to work on. If
            None, the default process group will be used. Defaults to None.

    Returns:
        list[Any]: The objects contributed by every rank, ordered by rank.
    """
    world_size = get_world_size(group)
    if world_size == 1:
        return [data]

    if group is None:
        group = get_default_group()

    # One slot per rank; filled in-place by the collective below.
    gathered: List[Any] = [None for _ in range(world_size)]

    if digit_version(TORCH_VERSION) >= digit_version('1.8.0'):
        torch_dist.all_gather_object(gathered, data, group)
    else:
        # Fallback for torch < 1.8.0, which lacks all_gather_object.
        _all_gather_object(gathered, data, group)

    return gathered
| |
|
| |
|
| | def _validate_output_list_for_rank(my_rank: int, dst: int, |
| | gather_list: Optional[list]) -> None: |
| | """Validate whether ``gather_list`` is None in non-dst ranks.""" |
| | if dst == my_rank: |
| | if not gather_list: |
| | raise ValueError( |
| | 'Argument ``gather_list`` must be specified on destination ' |
| | 'rank.') |
| | elif gather_list: |
| | raise ValueError('Argument ``gather_list`` must NOT be specified ' |
| | 'on non-destination ranks.') |
| |
|
| |
|
def _gather_object(obj: Any,
                   object_gather_list=None,
                   dst: int = 0,
                   group: Optional[ProcessGroup] = None) -> None:
    """Gathers picklable objects from the whole group in a single process.

    Similar to :func:`gather`, but Python objects can be passed in. Note that
    the object must be picklable in order to be gathered.

    Args:
        obj (Any): Input object. Must be picklable.
        object_gather_list (list[Any], optional): Output list. On the ``dst``
            rank, it should be correctly sized as the size of the group for
            this collective and will contain the output. Must be ``None`` on
            non-dst ranks. Defaults to None.
        dst (int): Destination rank. Defaults to 0.
        group: (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used. Defaults to None.
    """
    # Ranks outside the group must not touch the collective.
    if torch_dist.distributed_c10d._rank_not_in_group(group):
        return

    # Only ``dst`` may (and must) supply an output list.
    my_rank = get_rank()
    _validate_output_list_for_rank(my_rank, dst, object_gather_list)
    # Serialize the local object into a uint8 tensor plus its byte count.
    input_tensor, local_size = _object_to_tensor(obj)
    group_backend = get_backend(group)
    current_device = torch.device('cpu')
    is_nccl_backend = group_backend == torch_dist.Backend.NCCL
    is_mccl_backend = group_backend == 'mccl'
    if is_nccl_backend:
        # NCCL communicates on the current CUDA device; the caller must have
        # set a distinct device per rank.
        current_device = torch.device('cuda', torch.cuda.current_device())
        input_tensor = input_tensor.to(current_device)
        local_size = local_size.to(current_device)
    elif is_mccl_backend:
        # Same requirement for the MUSA ('mccl') backend.
        current_device = torch.device('musa', torch.musa.current_device())
        input_tensor = input_tensor.to(current_device)
        local_size = local_size.to(current_device)

    # All-gather every rank's payload size so all ranks agree on a common
    # (maximum) buffer length.
    group_size = get_world_size(group=group)
    object_sizes_tensor = torch.zeros(
        group_size, dtype=torch.long, device=current_device)
    object_size_list = [
        object_sizes_tensor[i].unsqueeze(dim=0) for i in range(group_size)
    ]
    torch_dist.all_gather(object_size_list, local_size, group=group)
    max_object_size = int(max(object_size_list).item())

    # Pad the local payload to the maximum size; extra bytes are ignored on
    # deserialization.
    input_tensor.resize_(max_object_size)

    # Only the destination allocates receive buffers: non-overlapping views
    # into one coalesced tensor, one slot per rank.
    if my_rank == dst:
        coalesced_output_tensor = torch.empty(
            max_object_size * group_size,
            dtype=torch.uint8,
            device=current_device)
        output_tensors = [
            coalesced_output_tensor[max_object_size * i:max_object_size *
                                    (i + 1)] for i in range(group_size)
        ]

    torch_dist.gather(
        input_tensor,
        gather_list=output_tensors if my_rank == dst else None,
        dst=dst,
        group=group,
    )
    # Non-destination ranks are done; only ``dst`` deserializes the slots,
    # truncating each to its true size.
    if my_rank != dst:
        return
    for i, tensor in enumerate(output_tensors):
        tensor = tensor.type(torch.uint8)
        tensor_size = object_size_list[i]
        object_gather_list[i] = _tensor_to_object(tensor, tensor_size)
| |
|
| |
|
def gather_object(data: Any,
                  dst: int = 0,
                  group: Optional[ProcessGroup] = None) -> Optional[List[Any]]:
    """Gather picklable objects from the whole group onto the ``dst`` rank.

    Similar to :func:`gather`, but arbitrary picklable Python objects can be
    passed. Unlike ``torch.distributed.gather_object``, the gathered list is
    returned rather than filled into a caller-provided list. In a
    non-distributed environment the result is simply ``[data]``.

    Note:
        The ``NCCL`` backend does not support ``gather_object``.

    Args:
        data (Any): Picklable Python object contributed by this process.
        dst (int): Destination rank. Defaults to 0.
        group (ProcessGroup, optional): The process group to work on. If
            None, the default process group will be used. Defaults to None.

    Returns:
        list[Any] | None: On rank ``dst``, the objects contributed by every
        rank, ordered by rank; ``None`` on other ranks.
    """
    world_size = get_world_size(group)
    if world_size == 1:
        return [data]

    if group is None:
        group = get_default_group()

    # Only the destination rank provides (and receives) the output list.
    is_dst = get_rank(group) == dst
    output: Optional[List[Any]] = [None] * world_size if is_dst else None

    if digit_version(TORCH_VERSION) >= digit_version('1.8.0'):
        torch_dist.gather_object(data, output, dst, group)
    else:
        # Fallback for torch < 1.8.0, which lacks gather_object.
        _gather_object(data, output, dst, group)

    return output
| |
|
| |
|
def collect_results(results: list,
                    size: int,
                    device: str = 'cpu',
                    tmpdir: Optional[str] = None) -> Optional[list]:
    """Collect results from all ranks in a distributed environment.

    Dispatches to a device-based collector ('gpu'/'npu') or to the
    file-based CPU collector.

    Args:
        results (list[object]): Result list containing the picklable result
            parts produced by this rank.
        size (int): Size of the results, commonly equal to length of
            the results.
        device (str): Device name. Optional values are 'cpu', 'gpu' or
            'npu'. Defaults to 'cpu'.
        tmpdir (str | None): Temporary directory used by the CPU collector.
            If None, a temporary directory is created. Must be None when
            ``device`` is 'gpu' or 'npu'. Defaults to None.

    Returns:
        list | None: The collected results on rank 0, ``None`` elsewhere.
    """
    if device not in ('gpu', 'cpu', 'npu'):
        raise NotImplementedError(
            f"device must be 'cpu' , 'gpu' or 'npu', but got {device}")

    if device == 'cpu':
        return collect_results_cpu(results, size, tmpdir)

    # Device-based collection communicates directly and never spills to
    # disk, so a tmpdir makes no sense here.
    assert tmpdir is None, f'tmpdir should be None when device is {device}'
    return _collect_results_device(results, size)
| |
|
| |
|
def collect_results_cpu(result_part: list,
                        size: int,
                        tmpdir: Optional[str] = None) -> Optional[list]:
    """Collect results under cpu mode.

    On cpu mode, this function will save the results on different gpus to
    ``tmpdir`` and collect them by the rank 0 worker.

    Args:
        result_part (list): Result list containing result parts
            to be collected. Each item of ``result_part`` should be a picklable
            object.
        size (int): Size of the results, commonly equal to length of
            the results.
        tmpdir (str | None): Temporal directory for collected results to
            store. If set to None, it will create a random temporal directory
            for it. Defaults to None. Must be a directory shared by all
            ranks (e.g. on a network file system) when set.

    Returns:
        list or None: The collected results on rank 0; ``None`` on all
        other ranks.

    Examples:
        >>> # distributed environment
        >>> # We have 2 process groups, 2 ranks.
        >>> import mmengine.dist as dist
        >>> if dist.get_rank() == 0:
                data = ['foo', {1: 2}]
            else:
                data = [24, {'a': 'b'}]
        >>> size = 4
        >>> output = dist.collect_results_cpu(data, size)
        >>> output
        ['foo', 24, {1: 2}, {'a': 'b'}]  # rank 0
        None  # rank 1
    """
    rank, world_size = get_dist_info()
    # Single process: nothing to synchronize, just truncate to `size`.
    if world_size == 1:
        return result_part[:size]

    # Create a tmp dir if it is not provided, and broadcast its path from
    # rank 0 to all other ranks so every rank writes into the same directory.
    if tmpdir is None:
        MAX_LEN = 512
        # The path is shipped as a fixed-width uint8 tensor; 32 is the ASCII
        # code for whitespace, used as right-padding that is stripped below.
        dir_tensor = torch.full((MAX_LEN, ), 32, dtype=torch.uint8)
        if rank == 0:
            mmengine.mkdir_or_exist('.dist_test')
            tmpdir = tempfile.mkdtemp(dir='.dist_test')
            tmpdir = torch.tensor(
                bytearray(tmpdir.encode()), dtype=torch.uint8)
            dir_tensor[:len(tmpdir)] = tmpdir
        broadcast(dir_tensor, 0)
        # Decode the padded tensor back to a path string on every rank.
        tmpdir = dir_tensor.numpy().tobytes().decode().rstrip()
    else:
        mmengine.mkdir_or_exist(tmpdir)

    # Each rank dumps its own part to a rank-indexed pickle file.
    with open(osp.join(tmpdir, f'part_{rank}.pkl'), 'wb') as f:
        pickle.dump(result_part, f, protocol=2)

    # Wait until every rank has finished writing before rank 0 reads.
    barrier()

    # Only rank 0 collects; the other ranks are done.
    if rank != 0:
        return None
    else:
        # Load results of all parts from the shared tmp dir.
        part_list = []
        for i in range(world_size):
            path = osp.join(tmpdir, f'part_{i}.pkl')
            # A missing part file means `tmpdir` is not actually visible to
            # that rank (e.g. not on a shared filesystem).
            if not osp.exists(path):
                raise FileNotFoundError(
                    f'{tmpdir} is not an shared directory for '
                    f'rank {i}, please make sure {tmpdir} is a shared '
                    'directory for all ranks!')
            with open(path, 'rb') as f:
                part_list.append(pickle.load(f))
        # Interleave the parts rank by rank (sample i of rank 0, sample i of
        # rank 1, ...) to restore the original dataset order; zip_longest
        # pads shorter parts with None, which are filtered out.
        ordered_results = []
        zipped_results = zip_longest(*part_list)
        ordered_results = [
            i for i in chain.from_iterable(zipped_results) if i is not None
        ]
        # The dataloader may pad some samples; truncate to the real size.
        ordered_results = ordered_results[:size]
        # Remove the tmp dir now that everything has been collected.
        shutil.rmtree(tmpdir)
        return ordered_results
| |
|
| |
|
def _collect_results_device(result_part: list, size: int) -> Optional[list]:
    """Collect results under gpu or npu mode."""
    rank, world_size = get_dist_info()
    # Trivial case: a single process already holds everything.
    if world_size == 1:
        return result_part[:size]

    # Gather every rank's part via object collectives (works with backends
    # such as NCCL that lack a plain `gather`).
    part_list = all_gather_object(result_part)

    # Only rank 0 assembles the final list.
    if rank != 0:
        return None

    # Interleave parts rank by rank to restore dataset order; zip_longest
    # pads ragged parts with None, which are dropped before truncating to
    # the true size (the dataloader may have padded some samples).
    interleaved = chain.from_iterable(zip_longest(*part_list))
    collected = [item for item in interleaved if item is not None]
    return collected[:size]
| |
|
| |
|
def collect_results_gpu(result_part: list, size: int) -> Optional[list]:
    """Collect results under gpu mode.

    On gpu mode, this function encodes the results into device tensors and
    collects them with device communication (delegating to the shared
    device-side collection helper).

    Args:
        result_part (list[object]): Result list containing result parts
            to be collected. Each item of ``result_part`` should be a
            picklable object.
        size (int): Size of the results, commonly equal to length of
            the results.

    Returns:
        list or None: The collected results on rank 0; ``None`` on all
        other ranks.

    Examples:
        >>> # distributed environment
        >>> # We have 2 process groups, 2 ranks.
        >>> import mmengine.dist as dist
        >>> if dist.get_rank() == 0:
                data = ['foo', {1: 2}]
            else:
                data = [24, {'a': 'b'}]
        >>> size = 4
        >>> output = dist.collect_results_gpu(data, size)
        >>> output
        ['foo', 24, {1: 2}, {'a': 'b'}]  # rank 0
        None  # rank 1
    """
    return _collect_results_device(result_part, size)
| |
|
| |
|
def _all_reduce_coalesced(tensors: List[torch.Tensor],
                          bucket_size_mb: int = -1,
                          op: str = 'sum',
                          group: Optional[ProcessGroup] = None) -> None:
    """All-reduce a sequence of tensors as a whole.

    Tensors are flattened into buckets, each bucket is all-reduced with a
    single collective call, and the synced values are copied back in place.

    Args:
        tensors (List[torch.Tensor]): A sequence of tensors to be
            all-reduced.
        bucket_size_mb (int): The limit of each chunk in megabytes
            for grouping tensors into chunks. Defaults to -1.
        op (str): Operation to reduce data. Defaults to 'sum'. Optional values
            are 'sum', 'mean' and 'produce', 'min', 'max', 'band', 'bor' and
            'bxor'.
        group (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used. Defaults to None.
    """
    if bucket_size_mb > 0:
        # Size-limited bucketing: let torch split the tensors into chunks of
        # at most `bucket_size_mb` megabytes.
        bytes_per_bucket = bucket_size_mb * 1024 * 1024
        buckets = _take_tensors(tensors, bytes_per_bucket)
    else:
        # No size limit: group tensors by type so each bucket can be
        # flattened into a single dense tensor.
        grouped = OrderedDict()
        for tensor in tensors:
            grouped.setdefault(tensor.type(), []).append(tensor)
        buckets = grouped.values()

    for bucket in buckets:
        # One collective per bucket, then scatter the reduced values back
        # into the original tensors in place.
        flat = _flatten_dense_tensors(bucket)
        all_reduce(flat, op=op, group=group)
        unflattened = _unflatten_dense_tensors(flat, bucket)
        for original, reduced in zip(bucket, unflattened):
            original.copy_(reduced)
| |
|
| |
|
def all_reduce_params(params: Union[List, Generator[torch.Tensor, None, None]],
                      coalesce: bool = True,
                      bucket_size_mb: int = -1,
                      op: str = 'sum',
                      group: Optional[ProcessGroup] = None) -> None:
    """All-reduce parameters.

    Args:
        params (List or Generator[torch.Tensor, None, None]): List of
            parameters or buffers of a model.
        coalesce (bool, optional): Whether to reduce parameters as a whole.
            Defaults to True.
        bucket_size_mb (int, optional): Size of bucket, the unit is MB.
            Defaults to -1.
        op (str): Operation to reduce data. Defaults to 'sum'. Optional values
            are 'sum', 'mean' and 'produce', 'min', 'max', 'band', 'bor' and
            'bxor'.
        group (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used. Defaults to None.

    Examples:
        >>> import torch
        >>> import mmengine.dist as dist

        >>> # non-distributed environment
        >>> data = [torch.arange(2), torch.arange(3)]
        >>> dist.all_reduce_params(data)
        >>> data
        [tensor([0, 1]), tensor([0, 1, 2])]

        >>> # distributed environment
        >>> # We have 2 process groups, 2 ranks.
        >>> if dist.get_rank() == 0:
        ...     data = [torch.tensor([1, 2]), torch.tensor([3, 4])]
        ... else:
        ...     data = [torch.tensor([2, 3]), torch.tensor([4, 5])]

        >>> dist.all_reduce_params(data)
        >>> data
        [torch.tensor([3, 5]), torch.tensor([7, 9])]
    """
    # Nothing to reduce in a single-process world.
    if get_world_size(group) == 1:
        return
    # Materialize the (possibly generator) input and reduce the raw .data
    # tensors in place.
    tensor_data = [param.data for param in params]
    if not coalesce:
        # Issue one collective per tensor.
        for tensor in tensor_data:
            all_reduce(tensor, op=op, group=group)
    else:
        # Flatten tensors into buckets to cut down on collective calls.
        _all_reduce_coalesced(tensor_data, bucket_size_mb, op=op, group=group)
| |
|