| from typing import Any, Dict, List, Tuple |
|
|
| import torch |
|
|
|
|
| class ResolutionSampler: |
| def __init__(self, batch_size: int = 1, dim_keys: Dict[str, Tuple[int, ...]] = None) -> None: |
| self.batch_size = batch_size |
| self.dim_keys = dim_keys |
| assert dim_keys is not None, "dim_keys must be provided" |
|
|
| self._chosen_leader_key = None |
| self._unsatisfied_buckets: Dict[Tuple[int, ...], List[Dict[Any, Any]]] = {} |
| self._satisfied_buckets: List[Dict[Any, Any]] = [] |
|
|
| def consume(self, *dict_items: Dict[Any, Any]) -> None: |
| if self._chosen_leader_key is None: |
| self._determine_leader_item(*dict_items) |
| self._update_buckets(*dict_items) |
|
|
| def get_batch(self) -> List[Dict[str, Any]]: |
| return list(zip(*self._satisfied_buckets.pop(-1))) |
|
|
| @property |
| def is_ready(self) -> bool: |
| return len(self._satisfied_buckets) > 0 |
|
|
| def _determine_leader_item(self, *dict_items: Dict[Any, Any]) -> None: |
| num_observed = 0 |
| for dict_item in dict_items: |
| for key in self.dim_keys.keys(): |
| if key in dict_item.keys(): |
| self._chosen_leader_key = key |
| if not torch.is_tensor(dict_item[key]): |
| raise ValueError(f"Leader key {key} must be a tensor") |
| num_observed += 1 |
| if num_observed > 1: |
| raise ValueError( |
| f"Only one leader key is allowed in provided list of data dictionaries. Found {num_observed} leader keys" |
| ) |
| if self._chosen_leader_key is None: |
| raise ValueError("No leader key found in provided list of data dictionaries") |
|
|
| def _update_buckets(self, *dict_items: Dict[Any, Any]) -> None: |
| chosen_value = [ |
| dict_item[self._chosen_leader_key] |
| for dict_item in dict_items |
| if self._chosen_leader_key in dict_item.keys() |
| ] |
| if len(chosen_value) == 0: |
| raise ValueError(f"Leader key {self._chosen_leader_key} not found in provided list of data dictionaries") |
| chosen_value = chosen_value[0] |
| dims = tuple(chosen_value.size(x) for x in self.dim_keys[self._chosen_leader_key]) |
| if dims not in self._unsatisfied_buckets: |
| self._unsatisfied_buckets[dims] = [] |
| self._unsatisfied_buckets[dims].append(dict_items) |
| if len(self._unsatisfied_buckets[dims]) == self.batch_size: |
| self._satisfied_buckets.append(self._unsatisfied_buckets.pop(dims)) |
|
|