| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | import queue
|
| | import collections
|
| | import threading
|
| |
|
| | __all__ = ['FutureResult', 'SlavePipe', 'SyncMaster']
|
| |
|
| |
|
| | class FutureResult(object):
|
| | """A thread-safe future implementation. Used only as one-to-one pipe."""
|
| |
|
| | def __init__(self):
|
| | self._result = None
|
| | self._lock = threading.Lock()
|
| | self._cond = threading.Condition(self._lock)
|
| |
|
| | def put(self, result):
|
| | with self._lock:
|
| | assert self._result is None, 'Previous result has\'t been fetched.'
|
| | self._result = result
|
| | self._cond.notify()
|
| |
|
| | def get(self):
|
| | with self._lock:
|
| | if self._result is None:
|
| | self._cond.wait()
|
| |
|
| | res = self._result
|
| | self._result = None
|
| | return res
|
| |
|
| |
|
| | _MasterRegistry = collections.namedtuple('MasterRegistry', ['result'])
|
| | _SlavePipeBase = collections.namedtuple('_SlavePipeBase', ['identifier', 'queue', 'result'])
|
| |
|
| |
|
| | class SlavePipe(_SlavePipeBase):
|
| | """Pipe for master-slave communication."""
|
| |
|
| | def run_slave(self, msg):
|
| | self.queue.put((self.identifier, msg))
|
| | ret = self.result.get()
|
| | self.queue.put(True)
|
| | return ret
|
| |
|
| |
|
| | class SyncMaster(object):
|
| | """An abstract `SyncMaster` object.
|
| |
|
| | - During the replication, as the data parallel will trigger an callback of each module, all slave devices should
|
| | call `register(id)` and obtain an `SlavePipe` to communicate with the master.
|
| | - During the forward pass, master device invokes `run_master`, all messages from slave devices will be collected,
|
| | and passed to a registered callback.
|
| | - After receiving the messages, the master device should gather the information and determine to message passed
|
| | back to each slave devices.
|
| | """
|
| |
|
| | def __init__(self, master_callback):
|
| | """
|
| |
|
| | Args:
|
| | master_callback: a callback to be invoked after having collected messages from slave devices.
|
| | """
|
| | self._master_callback = master_callback
|
| | self._queue = queue.Queue()
|
| | self._registry = collections.OrderedDict()
|
| | self._activated = False
|
| |
|
| | def __getstate__(self):
|
| | return {'master_callback': self._master_callback}
|
| |
|
| | def __setstate__(self, state):
|
| | self.__init__(state['master_callback'])
|
| |
|
| | def register_slave(self, identifier):
|
| | """
|
| | Register an slave device.
|
| |
|
| | Args:
|
| | identifier: an identifier, usually is the device id.
|
| |
|
| | Returns: a `SlavePipe` object which can be used to communicate with the master device.
|
| |
|
| | """
|
| | if self._activated:
|
| | assert self._queue.empty(), 'Queue is not clean before next initialization.'
|
| | self._activated = False
|
| | self._registry.clear()
|
| | future = FutureResult()
|
| | self._registry[identifier] = _MasterRegistry(future)
|
| | return SlavePipe(identifier, self._queue, future)
|
| |
|
| | def run_master(self, master_msg):
|
| | """
|
| | Main entry for the master device in each forward pass.
|
| | The messages were first collected from each devices (including the master device), and then
|
| | an callback will be invoked to compute the message to be sent back to each devices
|
| | (including the master device).
|
| |
|
| | Args:
|
| | master_msg: the message that the master want to send to itself. This will be placed as the first
|
| | message when calling `master_callback`. For detailed usage, see `_SynchronizedBatchNorm` for an example.
|
| |
|
| | Returns: the message to be sent back to the master device.
|
| |
|
| | """
|
| | self._activated = True
|
| |
|
| | intermediates = [(0, master_msg)]
|
| | for i in range(self.nr_slaves):
|
| | intermediates.append(self._queue.get())
|
| |
|
| | results = self._master_callback(intermediates)
|
| | assert results[0][0] == 0, 'The first result should belongs to the master.'
|
| |
|
| | for i, res in results:
|
| | if i == 0:
|
| | continue
|
| | self._registry[i].result.put(res)
|
| |
|
| | for i in range(self.nr_slaves):
|
| | assert self._queue.get() is True
|
| |
|
| | return results[0][1]
|
| |
|
| | @property
|
| | def nr_slaves(self):
|
| | return len(self._registry)
|
| |
|