from functools import wraps
from typing import Callable, Union, Tuple, Any

import torch
from torch import Tensor
from torch import distributed as dist

from .context_managers import RandContext


def cached(func: Callable[..., Tensor]):
    """
    A decorator that turns a model call function into a cache-compatible version.
    :param func: A function that calls the model and returns a representation tensor.
    :return: A function that returns 1) representation leaf tensors for cache construction, 2) a closure function for
    the 2nd forward and the cached backward. Call 2) with 1) as argument after calling backward on the loss Tensor.
    """
    @wraps(func)
    def cache_func(*args, **kwargs):
        # Capture the RNG state so the 2nd forward pass can replay the same randomness (e.g. dropout).
        rnd_state = RandContext()
        with torch.no_grad():
            reps_no_grad = func(*args, **kwargs)
        if isinstance(reps_no_grad, Tensor):
            reps_no_grad = (reps_no_grad, )
        else:
            assert all(isinstance(v, Tensor) for v in reps_no_grad)
        # Detach into leaf tensors; the loss backward will deposit its gradients in their .grad fields.
        leaf_reps = tuple(t.detach().requires_grad_() for t in reps_no_grad)

        @wraps(func)
        def forward_backward_func(cache_reps: Union[Tensor, Tuple[Tensor, ...]]):
            with rnd_state:
                reps = func(*args, **kwargs)
            if isinstance(reps, Tensor):
                reps = (reps,)
            if isinstance(cache_reps, Tensor):
                cache_reps = (cache_reps,)
            assert len(reps) == len(cache_reps)

            # Dot each fresh representation with the gradient cached on the corresponding leaf tensor,
            # so backward() on the surrogate propagates exactly those gradients into the model.
            surrogate = sum(map(lambda u, v: torch.dot(u.flatten(), v.grad.flatten()), reps, cache_reps), 0)
            surrogate.backward()

        return leaf_reps + (forward_backward_func,)
    return cache_func
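

# A minimal usage sketch of `cached` (illustrative only; `encoder`, `query_batch`,
# `passage_batch` and `contrastive_loss` are names assumed to exist in user code,
# not part of this module):
#
#     @cached
#     def call_model(model, batch):
#         return model(**batch)
#
#     q_reps, q_closure = call_model(encoder, query_batch)
#     p_reps, p_closure = call_model(encoder, passage_batch)
#
#     loss = contrastive_loss(q_reps, p_reps)
#     loss.backward()      # fills q_reps.grad / p_reps.grad on the detached leaf tensors
#
#     q_closure(q_reps)    # 2nd forward + backward through the model for the query side
#     p_closure(p_reps)    # 2nd forward + backward through the model for the passage side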


def _cat_tensor_list(xx):
    if isinstance(xx, list) and len(xx) > 0 and all(isinstance(x, Tensor) for x in xx):
        return torch.cat(xx)
    else:
        return xx


def cat_input_tensor(func: Callable[..., Tensor]):
    """
    A decorator that concatenates positional and keyword arguments of type List[Tensor] into a single Tensor
    along dimension 0. This comes in handy when dealing with representation tensors produced by multiple
    cached forward passes.
    :param func: A loss function
    :return: Decorated loss function for cached results.
    """
    @wraps(func)
    def cat_f(*args, **kwargs):
        args_cat = [_cat_tensor_list(x) for x in args]
        kwargs_cat = dict((k, _cat_tensor_list(v)) for k, v in kwargs.items())
        return func(*args_cat, **kwargs_cat)
    return cat_f
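

# A minimal usage sketch of `cat_input_tensor` (illustrative only; the loss body below is
# an assumption, not part of this module): when the cached forward is run chunk by chunk,
# collect the per-chunk representation tensors in lists and let the decorated loss
# concatenate them along dim 0 before computing the loss.
#
#     @cat_input_tensor
#     def contrastive_loss(q, p):
#         scores = q @ p.t()
#         labels = torch.arange(scores.size(0), device=scores.device)
#         return torch.nn.functional.cross_entropy(scores, labels)
#
#     loss = contrastive_loss([q_rep_chunk_1, q_rep_chunk_2], [p_rep_chunk_1, p_rep_chunk_2])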


def _maybe_gather_tensor(t: Any, axis: int):
    if not isinstance(t, Tensor):
        return t
    # all_gather returns tensors detached from the autograd graph; put the local tensor back
    # in its slot so gradients can still flow to it on this rank.
    gathered = [torch.empty_like(t) for _ in range(dist.get_world_size())]
    dist.all_gather(gathered, t)
    gathered[dist.get_rank()] = t
    return torch.cat(gathered, dim=axis)


def gather_input_tensor(func: Callable[..., Tensor], axis=0):
    """
    A decorator that all-gathers positional and keyword arguments of type Tensor across all distributed
    processes and concatenates them along `axis`. Intended to be used with a distributed contrastive
    learning loss.
    :param func: A loss function
    :param axis: The axis along which the gathered tensors are concatenated.
    :return: Decorated loss function for distributed training.
    """
    @wraps(func)
    def f(*args, **kwargs):
        args_gathered = [_maybe_gather_tensor(x, axis=axis) for x in args]
        kwargs_gathered = dict((k, _maybe_gather_tensor(v, axis=axis)) for k, v in kwargs.items())
        return func(*args_gathered, **kwargs_gathered)
    return f
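

# A minimal sketch for distributed training (illustrative only; names are assumptions):
# stacking `@cat_input_tensor` on top of `@gather_input_tensor` first concatenates the local
# chunk lists, then all-gathers the concatenated tensors across ranks before the loss body runs.
#
#     @cat_input_tensor
#     @gather_input_tensor
#     def contrastive_loss(q, p):
#         ...


if __name__ == "__main__":
    # Self-contained, single-process smoke test of `cached` + `cat_input_tensor`
    # (illustrative only; `toy_encoder` and `toy_loss` are ad-hoc names, not part of the
    # library). Because this module uses a relative import, run it as a module,
    # e.g. `python -m <package>.functional`.
    import torch.nn.functional as F

    torch.manual_seed(0)
    toy_encoder = torch.nn.Linear(8, 4)

    @cached
    def call_encoder(x):
        return toy_encoder(x)

    @cat_input_tensor
    def toy_loss(q, p):
        scores = q @ p.t()
        labels = torch.arange(scores.size(0))
        return F.cross_entropy(scores, labels)

    query_chunks = torch.randn(4, 8).split(2)
    passage_chunks = torch.randn(4, 8).split(2)

    # 1st (no-grad) forward over each chunk; collect leaf representations and closures.
    q_out = [call_encoder(c) for c in query_chunks]
    p_out = [call_encoder(c) for c in passage_chunks]

    # Loss over all chunks at once; backward fills the .grad of the leaf representations.
    loss = toy_loss([r for r, _ in q_out], [r for r, _ in p_out])
    loss.backward()

    # 2nd forward per chunk, pushing the cached gradients into the encoder parameters.
    for rep, closure in q_out + p_out:
        closure(rep)

    print("loss:", loss.item())
    print("encoder weight grad norm:", toy_encoder.weight.grad.norm().item())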