| from functools import wraps |
|
|
|
|
| class Callback: |
| """ |
| Base class and interface for callback mechanism |
| |
| This class can be used directly for monitoring file transfers by |
| providing ``callback=Callback(hooks=...)`` (see the ``hooks`` argument, |
| below), or subclassed for more specialised behaviour. |
| |
| Parameters |
| ---------- |
| size: int (optional) |
| Nominal quantity for the value that corresponds to a complete |
| transfer, e.g., total number of tiles or total number of |
| bytes |
| value: int (0) |
| Starting internal counter value |
| hooks: dict or None |
| A dict of named functions to be called on each update. The signature |
| of these must be ``f(size, value, **kwargs)`` |
| """ |
|
|
| def __init__(self, size=None, value=0, hooks=None, **kwargs): |
| self.size = size |
| self.value = value |
| self.hooks = hooks or {} |
| self.kw = kwargs |
|
|
| def __enter__(self): |
| return self |
|
|
| def __exit__(self, *exc_args): |
| self.close() |
|
|
| def close(self): |
| """Close callback.""" |
|
|
| def branched(self, path_1, path_2, **kwargs): |
| """ |
| Return callback for child transfers |
| |
| If this callback is operating at a higher level, e.g., put, which may |
| trigger transfers that can also be monitored. The function returns a callback |
| that has to be passed to the child method, e.g., put_file, |
| as `callback=` argument. |
| |
| The implementation uses `callback.branch` for compatibility. |
| When implementing callbacks, it is recommended to override this function instead |
| of `branch` and avoid calling `super().branched(...)`. |
| |
| Prefer using this function over `branch`. |
| |
| Parameters |
| ---------- |
| path_1: str |
| Child's source path |
| path_2: str |
| Child's destination path |
| **kwargs: |
| Arbitrary keyword arguments |
| |
| Returns |
| ------- |
| callback: Callback |
| A callback instance to be passed to the child method |
| """ |
| self.branch(path_1, path_2, kwargs) |
| |
| return kwargs.pop("callback", DEFAULT_CALLBACK) |
|
|
| def branch_coro(self, fn): |
| """ |
| Wraps a coroutine, and pass a new child callback to it. |
| """ |
|
|
| @wraps(fn) |
| async def func(path1, path2: str, **kwargs): |
| with self.branched(path1, path2, **kwargs) as child: |
| return await fn(path1, path2, callback=child, **kwargs) |
|
|
| return func |
|
|
| def set_size(self, size): |
| """ |
| Set the internal maximum size attribute |
| |
| Usually called if not initially set at instantiation. Note that this |
| triggers a ``call()``. |
| |
| Parameters |
| ---------- |
| size: int |
| """ |
| self.size = size |
| self.call() |
|
|
| def absolute_update(self, value): |
| """ |
| Set the internal value state |
| |
| Triggers ``call()`` |
| |
| Parameters |
| ---------- |
| value: int |
| """ |
| self.value = value |
| self.call() |
|
|
| def relative_update(self, inc=1): |
| """ |
| Delta increment the internal counter |
| |
| Triggers ``call()`` |
| |
| Parameters |
| ---------- |
| inc: int |
| """ |
| self.value += inc |
| self.call() |
|
|
| def call(self, hook_name=None, **kwargs): |
| """ |
| Execute hook(s) with current state |
| |
| Each function is passed the internal size and current value |
| |
| Parameters |
| ---------- |
| hook_name: str or None |
| If given, execute on this hook |
| kwargs: passed on to (all) hook(s) |
| """ |
| if not self.hooks: |
| return |
| kw = self.kw.copy() |
| kw.update(kwargs) |
| if hook_name: |
| if hook_name not in self.hooks: |
| return |
| return self.hooks[hook_name](self.size, self.value, **kw) |
| for hook in self.hooks.values() or []: |
| hook(self.size, self.value, **kw) |
|
|
| def wrap(self, iterable): |
| """ |
| Wrap an iterable to call ``relative_update`` on each iterations |
| |
| Parameters |
| ---------- |
| iterable: Iterable |
| The iterable that is being wrapped |
| """ |
| for item in iterable: |
| self.relative_update() |
| yield item |
|
|
| def branch(self, path_1, path_2, kwargs): |
| """ |
| Set callbacks for child transfers |
| |
| If this callback is operating at a higher level, e.g., put, which may |
| trigger transfers that can also be monitored. The passed kwargs are |
| to be *mutated* to add ``callback=``, if this class supports branching |
| to children. |
| |
| Parameters |
| ---------- |
| path_1: str |
| Child's source path |
| path_2: str |
| Child's destination path |
| kwargs: dict |
| arguments passed to child method, e.g., put_file. |
| |
| Returns |
| ------- |
| |
| """ |
| return None |
|
|
| def no_op(self, *_, **__): |
| pass |
|
|
| def __getattr__(self, item): |
| """ |
| If undefined methods are called on this class, nothing happens |
| """ |
| return self.no_op |
|
|
| @classmethod |
| def as_callback(cls, maybe_callback=None): |
| """Transform callback=... into Callback instance |
| |
| For the special value of ``None``, return the global instance of |
| ``NoOpCallback``. This is an alternative to including |
| ``callback=DEFAULT_CALLBACK`` directly in a method signature. |
| """ |
| if maybe_callback is None: |
| return DEFAULT_CALLBACK |
| return maybe_callback |
|
|
|
|
| class NoOpCallback(Callback): |
| """ |
| This implementation of Callback does exactly nothing |
| """ |
|
|
| def call(self, *args, **kwargs): |
| return None |
|
|
|
|
| class DotPrinterCallback(Callback): |
| """ |
| Simple example Callback implementation |
| |
| Almost identical to Callback with a hook that prints a char; here we |
| demonstrate how the outer layer may print "#" and the inner layer "." |
| """ |
|
|
| def __init__(self, chr_to_print="#", **kwargs): |
| self.chr = chr_to_print |
| super().__init__(**kwargs) |
|
|
| def branch(self, path_1, path_2, kwargs): |
| """Mutate kwargs to add new instance with different print char""" |
| kwargs["callback"] = DotPrinterCallback(".") |
|
|
| def call(self, **kwargs): |
| """Just outputs a character""" |
| print(self.chr, end="") |
|
|
|
|
| class TqdmCallback(Callback): |
| """ |
| A callback to display a progress bar using tqdm |
| |
| Parameters |
| ---------- |
| tqdm_kwargs : dict, (optional) |
| Any argument accepted by the tqdm constructor. |
| See the `tqdm doc <https://tqdm.github.io/docs/tqdm/#__init__>`_. |
| Will be forwarded to `tqdm_cls`. |
| tqdm_cls: (optional) |
| subclass of `tqdm.tqdm`. If not passed, it will default to `tqdm.tqdm`. |
| |
| Examples |
| -------- |
| >>> import fsspec |
| >>> from fsspec.callbacks import TqdmCallback |
| >>> fs = fsspec.filesystem("memory") |
| >>> path2distant_data = "/your-path" |
| >>> fs.upload( |
| ".", |
| path2distant_data, |
| recursive=True, |
| callback=TqdmCallback(), |
| ) |
| |
| You can forward args to tqdm using the ``tqdm_kwargs`` parameter. |
| |
| >>> fs.upload( |
| ".", |
| path2distant_data, |
| recursive=True, |
| callback=TqdmCallback(tqdm_kwargs={"desc": "Your tqdm description"}), |
| ) |
| |
| You can also customize the progress bar by passing a subclass of `tqdm`. |
| |
| .. code-block:: python |
| |
| class TqdmFormat(tqdm): |
| '''Provides a `total_time` format parameter''' |
| @property |
| def format_dict(self): |
| d = super().format_dict |
| total_time = d["elapsed"] * (d["total"] or 0) / max(d["n"], 1) |
| d.update(total_time=self.format_interval(total_time) + " in total") |
| return d |
| |
| >>> with TqdmCallback( |
| tqdm_kwargs={ |
| "desc": "desc", |
| "bar_format": "{total_time}: {percentage:.0f}%|{bar}{r_bar}", |
| }, |
| tqdm_cls=TqdmFormat, |
| ) as callback: |
| fs.upload(".", path2distant_data, recursive=True, callback=callback) |
| """ |
|
|
| def __init__(self, tqdm_kwargs=None, *args, **kwargs): |
| try: |
| from tqdm import tqdm |
|
|
| except ImportError as exce: |
| raise ImportError( |
| "Using TqdmCallback requires tqdm to be installed" |
| ) from exce |
|
|
| self._tqdm_cls = kwargs.pop("tqdm_cls", tqdm) |
| self._tqdm_kwargs = tqdm_kwargs or {} |
| self.tqdm = None |
| super().__init__(*args, **kwargs) |
|
|
| def call(self, *args, **kwargs): |
| if self.tqdm is None: |
| self.tqdm = self._tqdm_cls(total=self.size, **self._tqdm_kwargs) |
| self.tqdm.total = self.size |
| self.tqdm.update(self.value - self.tqdm.n) |
|
|
| def close(self): |
| if self.tqdm is not None: |
| self.tqdm.close() |
| self.tqdm = None |
|
|
| def __del__(self): |
| return self.close() |
|
|
|
|
| DEFAULT_CALLBACK = _DEFAULT_CALLBACK = NoOpCallback() |
|
|