Module infinibatch.datasets
"""
This module contains common datasets, which are implemented as convenience functions that compose underlying Infinibatch iterators.
"""
from .iterators import create_source_iterator, CheckpointableIterator, SelectManyIterator, PrefetchIterator, BufferedShuffleIterator, BlockwiseShuffleIterator, MapIterator
from typing import List, Union, Iterable, Iterator, Callable, Any, Optional, Dict
import os, sys
def bump_seed(seed: Optional[int], step=1):
    """
    Helper to bump a random seed by step if it is not None.
    """
    return None if seed is None else seed + step
def chunked_dataset_iterator(chunk_refs: List, read_chunk_fn: Callable[[Any], Iterator], buffer_size: int,
                             train: bool=True,
                             seed: Optional[int]=None, shuffle: bool=True, use_windowed: bool=False,
                             transform: Callable[[Any],Any]=None,
                             prefetch: bool=False,
                             num_instances: int=1, instance_rank: int=0) -> CheckpointableIterator:
    """
    Dataset reading data from gzipped chunks.

    If train=True, the chunks are assigned to the instances in strides (round-robin) and the data is repeated infinitely in permutations.
    Otherwise, the chunks are split among the instances in consecutive blocks and the data is not repeated.
    This means that, when using this dataset for inference on multiple GPUs, the outputs can be restored to the
    original order of the data items by collecting the list of outputs from each GPU and concatenating these
    lists in order of increasing rank. When using MPI, this can be achieved by a gather operation that yields
    one list of outputs per GPU, followed by flattening the lists back into a single list.

    Args:
        chunk_refs: references (such as path names) to chunk files
        read_chunk_fn: function(chunk_ref) -> Iterator to read a chunk's content into an iterator over its items, e.g. read a file and split into text lines
        train: see above
        shuffle: if True, the data is shuffled; if train is False, then shuffle must be False as well
        buffer_size: size of the shuffling buffer, in number of samples / data items
        transform: transform to be applied to each data item (transform(Any) -> Any)
        prefetch: if True, insert a prefetch iterator with buffer_size
        seed: random seed (or None)
        num_instances: number of instances of this dataset. Meant for use with multi-process data loading, e.g., in distributed training.
        instance_rank: rank of this instance of the dataset. Meant for use with multi-process data loading, e.g., in distributed training.
        use_windowed: temporary option to switch back to windowed shuffling via BufferedShuffleIterator (default False). Will go away once shown that we don't need it anymore.
    """
    if not train and shuffle:
        raise ValueError('shuffling is not supported when train=False')
    # set up the chunk reader
    chunks = create_source_iterator(chunk_refs, train=train, seed=seed, shuffle=shuffle, num_instances=num_instances, instance_rank=instance_rank)
    # set up the item reader
    samples = SelectManyIterator(source_iterator=chunks, collection_selector=read_chunk_fn)  # type: CheckpointableIterator
    # wrap the I/O operation in a prefetch iterator
    if prefetch:
        samples = PrefetchIterator(samples, buffer_size)
    # set up the item randomizer
    if shuffle:
        if use_windowed:
            samples = BufferedShuffleIterator(samples, buffer_size, bump_seed(seed, 1))
        else:
            samples = BlockwiseShuffleIterator(samples, buffer_size, bump_seed(seed, 1))
    # apply transform, if given
    if transform is not None:
        samples = MapIterator(samples, transform)
    # this is what we are serving out
    return samples
Functions
def bump_seed(seed: Union[int, NoneType], step=1)
Helper to bump a random seed by step if it is not None.
def chunked_dataset_iterator(chunk_refs: List, read_chunk_fn: Callable[[Any], Iterator], buffer_size: int, train: bool = True, seed: Union[int, NoneType] = None, shuffle: bool = True, use_windowed: bool = False, transform: Callable[[Any], Any] = None, prefetch: bool = False, num_instances: int = 1, instance_rank: int = 0) -> CheckpointableIterator

Dataset reading data from gzipped chunks.
If train=True, the chunks are assigned to the instances in strides (round-robin) and the data is repeated infinitely in permutations; a short sketch after the argument list below makes this concrete. Otherwise, the chunks are split among the instances in consecutive blocks and the data is not repeated. This means that, when using this dataset for inference on multiple GPUs, the outputs can be restored to the original order of the data items by collecting the list of outputs from each GPU and concatenating these lists in order of increasing rank. When using MPI, this can be achieved by a gather operation that yields one list of outputs per GPU, followed by flattening the lists back into a single list.
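To illustrate that last step, here is a minimal sketch of the gather-and-flatten pattern, assuming mpi4py is available; the helper run_inference_on_shard and the variable names are illustrative and not part of this module.

# Hedged sketch: restore dataset order from per-rank inference outputs (assumes mpi4py).
from mpi4py import MPI

comm = MPI.COMM_WORLD
local_outputs = run_inference_on_shard(comm.Get_rank())   # hypothetical helper; outputs in local order
gathered = comm.gather(local_outputs, root=0)              # on rank 0: one list of outputs per rank
if comm.Get_rank() == 0:
    # with train=False, each rank read a consecutive block of chunks,
    # so concatenating in rank order restores the original data order
    all_outputs = [item for rank_outputs in gathered for item in rank_outputs]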
Args
chunk_refs
- references (such as path names) to chunk files
read_chunk_fn
- function(chunk_ref) -> Iterator to read a chunk's content into an iterator over its items, e.g. read a file and split into text lines
train
- see above
shuffle
- if True, the data is shuffled; if train is False, then shuffle must be False as well
buffer_size
- size of the shuffling buffer, in number of samples / data items
transform
- transform to be applied to each data item (transform(Any) -> Any)
prefetch
- if True, insert a prefetch iterator with buffer_size
seed
- random seed (or None)
num_instances
- number of instances of this dataset. Meant for use with multi-process data loading, e.g., in distributed training.
instance_rank
- rank of this instance of the dataset. Meant for use with multi-process data loading, e.g., in distributed training.
use_windowed
- temporary option to switch back to windowed shuffling via BufferedShuffleIterator (default False). Will go away once shown that we don't need it anymore.
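The following rough sketch only makes the "strides" vs. "consecutive blocks" distinction above concrete; it is not the actual partitioning logic inside create_source_iterator, and it ignores chunk counts that do not divide evenly.

# Illustration only -- not the library's implementation.
chunk_refs = ['chunk_%d.gz' % i for i in range(8)]   # hypothetical chunk file names
num_instances, instance_rank = 4, 1

# train=True: strided (round-robin) assignment; rank 1 of 4 sees chunks 1 and 5,
# and the whole set is re-permuted and traversed again indefinitely
strided_chunks = chunk_refs[instance_rank::num_instances]

# train=False: consecutive blocks; rank 1 of 4 sees chunks 2 and 3, traversed once
block_size = len(chunk_refs) // num_instances
block_chunks = chunk_refs[instance_rank * block_size : (instance_rank + 1) * block_size]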
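Finally, a hedged end-to-end usage sketch, assuming the chunks are gzipped text files with one data item per line; the data/*.gz pattern and the read_chunk helper are assumptions for illustration, not part of the library.

import glob, gzip
from infinibatch.datasets import chunked_dataset_iterator

def read_chunk(path):
    # read one gzipped chunk and yield each line as a separate data item
    with gzip.open(path, 'rt', encoding='utf-8') as f:
        for line in f:
            yield line.rstrip('\n')

ds = chunked_dataset_iterator(chunk_refs=sorted(glob.glob('data/*.gz')),  # assumed chunk location
                              read_chunk_fn=read_chunk,
                              buffer_size=100000,
                              train=True, seed=1,
                              num_instances=2, instance_rank=0)

for i, item in enumerate(ds):
    print(item)
    if i >= 2:       # train=True repeats the data indefinitely, so stop explicitly
        break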