caikit.core.data_model.streams.data_stream

Data streams for lazily loading, munging and passing data through multiple modules.

Attributes

log

error

T

Classes

DataStream

A data stream is an iterable container class that is reentrant in the sense that it can be iterated over multiple times.

_UtfEncodeIOWrapper

Lil' wrapper class to convert a bytes buffer to a string buffer

Module Contents

caikit.core.data_model.streams.data_stream.log
caikit.core.data_model.streams.data_stream.error
caikit.core.data_model.streams.data_stream.T
class caikit.core.data_model.streams.data_stream.DataStream(generator_func, *args, **kwargs)

Bases: Generic[T]

A data stream is an iterable container class that is reentrant in the sense that it can be iterated over multiple times. The items produced by a data stream may be any python object and are called data items. The data items produced by an iterator over a data stream are generated lazily (unless the .eager method is called), so that each data item in a series of data streams is produced only when it is accessed. This allows processing datasets that are too large to fit into memory. A number of functional-style methods are provided for manipulating and munging data streams, and the .stream method on modules can also be used to process data streams.

The DataStream class is really just a generic wrapper around functions that produce python iterators or generators.
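The reentrant, lazy behavior described above can be illustrated with a minimal sketch. MiniStream and count_up are hypothetical names, not caikit APIs; the only idea carried over from the text is that the stream stores a generator function (plus its arguments) and calls it again on every iteration.

```python
from typing import Callable, Generic, Iterator, TypeVar

T = TypeVar("T")


class MiniStream(Generic[T]):
    """Hypothetical, stripped-down stand-in for DataStream (not caikit's code).

    It stores a generator *function* and its arguments, and calls that
    function anew on every iteration, which is what makes it reentrant.
    """

    def __init__(self, generator_func: Callable[..., Iterator[T]], *args, **kwargs):
        self.generator_func = generator_func
        self.args = args
        self.kwargs = kwargs

    def __iter__(self) -> Iterator[T]:
        # A fresh generator is created on each pass, so nothing gets "used up".
        return iter(self.generator_func(*self.args, **self.kwargs))


def count_up(n: int) -> Iterator[int]:
    # Items are produced lazily, one at a time, as they are requested.
    for i in range(n):
        yield i


stream = MiniStream(count_up, 3)
first_pass = list(stream)
second_pass = list(stream)  # works again: the wrapper is reentrant

plain_gen = count_up(3)
gen_first = list(plain_gen)
gen_second = list(plain_gen)  # a bare generator is exhausted after one pass
```

The last two lines show the contrast: a generator object can only be consumed once, which is the limitation that wrapping the generator function avoids.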

generator_func
classmethod from_iterable(data: Iterable[T]) → DataStream[T]

Create a new data stream from a python iterable, such as a list or tuple. This data stream produces a single data item for each element of the iterable.

Args:
data (iterable): A list or tuple or other python iterable used to construct a new data stream, where each element of the iterable becomes a single data item.

Returns:
DataStream: A new data stream that produces data items from the elements of data.

Examples:
>>> list_stream = DataStream.from_iterable([1, 2, 3])
>>> for data_item in list_stream:
...     print(data_item)
1
2
3
classmethod _from_iterable_generator(data: Iterable[T]) → Iterator[T]
classmethod from_jsonl(filename: str) → DataStream[Dict]

Create a new data stream from a path to a JSON Lines file, where each line is a valid JSON object (a python dict).

Args:
filename (str): A path to a utf8 encoded text file containing a JSON Lines array, where each line is a valid JSON object (a python dict).

Returns:
DataStream: A new data stream that produces python dict items, each containing the single JSON object on one line of the file.

Notes:

This class method returns a data stream over the valid JSON objects in the file, where each JSON object is on its own line.

https://jsonlines.org/

Examples:
For a JSON lines file that looks like:

{"name": "Gilbert", "wins": [["straight", "7♣"], ["one pair", "10♥"]]}
{"name": "Alexa", "wins": [["two pair", "4♠"], ["two pair", "9♠"]]}
{"name": "May", "wins": []}
{"name": "Deloise", "wins": [["three of a kind", "5♣"]]}

>>> jsonl_data_stream = DataStream.from_jsonl('sample.jsonl')
>>> for data_item in jsonl_data_stream:
...     print(data_item)
{'name': 'Gilbert', 'wins': [['straight', '7♣'], ['one pair', '10♥']]}
{'name': 'Alexa', 'wins': [['two pair', '4♠'], ['two pair', '9♠']]}
{'name': 'May', 'wins': []}
{'name': 'Deloise', 'wins': [['three of a kind', '5♣']]}
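Parsing JSON Lines comes down to one json.loads call per line. The sketch below shows the idea with the standard library only; iter_jsonl is a hypothetical helper, and skipping blank lines is a choice made here, not documented caikit behavior.

```python
import io
import json
from typing import IO, Dict, Iterator


def iter_jsonl(fh: IO[str]) -> Iterator[Dict]:
    """Lazily yield one parsed JSON object per non-blank line."""
    for line in fh:
        line = line.strip()
        if line:  # tolerate blank lines, e.g. a trailing newline at EOF
            yield json.loads(line)


# io.StringIO stands in for open("sample.jsonl", encoding="utf8").
sample = io.StringIO(
    '{"name": "May", "wins": []}\n'
    "\n"
    '{"name": "Deloise", "wins": [["three of a kind", "5\u2663"]]}\n'
)
records = list(iter_jsonl(sample))
```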
classmethod _from_jsonl_generator(filename)
classmethod from_json_array(filename: str) → DataStream[Dict]

Create a new data stream from a path to a file containing a JSON array, where each item is a valid JSON object (a python dict).

Args:
filename (str): A path to a utf8 encoded text file containing a JSON array, where each item is a valid JSON object (a python dict).

Returns:
DataStream: A new data stream that produces python dict items, each containing a single JSON object from the array in the file specified by filename.

Notes:

This class method returns a data stream over the valid JSON objects of a single JSON array text file.

Examples:
For a JSON file that looks like:

[{"a": 1, "b": 2, "c": false}, {"a": 2, "b": 3}, {"a": 3, "c": true}]

>>> json_data_stream = DataStream.from_json_array('sample.json')
>>> for data_item in json_data_stream:
...     print(data_item)
{'a': 1, 'b': 2, 'c': False}
{'a': 2, 'b': 3}
{'a': 3, 'c': True}
classmethod _from_json_array_file_generator(filename)
classmethod _from_json_array_buffer_generator(json_fh: IO, filename: str = '')
classmethod from_csv(filename: str, *args, skip=0, **kwargs) → DataStream[List]

Create a new data stream from a csv (comma separated value) file where each data item corresponds to a line of the csv file and consists of a list containing the comma separated values.

Args:
filename (str): A path to a csv file that has rows corresponding to data items and columns corresponding to the elements of each data item.

skip (int): Number of lines to skip at the beginning of the csv file. This is often useful for skipping a header line.

args, kwargs: Additional arguments passed to the csv.reader function. These can be used to specify the delimiter or other csv settings.

Returns:
DataStream: A data stream that produces a data item for each line of the csv file, where each element of the data item corresponds to a column in the csv file.

Examples:

For a sample.csv that looks like:

a,b,c
d,e,f

>>> csv_stream = DataStream.from_csv('sample.csv')
>>> for data_item in csv_stream:
...     print(data_item)
['a', 'b', 'c']
['d', 'e', 'f']
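The skip and argument pass-through behavior can be sketched with csv.reader and itertools.islice. iter_csv_rows is a hypothetical helper, not caikit's implementation; note that islice skips csv *rows*, which equals lines unless a quoted field spans several lines.

```python
import csv
import io
import itertools
from typing import IO, Iterator, List


def iter_csv_rows(fh: IO[str], *csv_args, skip: int = 0, **csv_kwargs) -> Iterator[List[str]]:
    """Yield each csv row as a list of strings, after skipping `skip` rows.

    Extra positional and keyword arguments are forwarded to csv.reader.
    """
    reader = csv.reader(fh, *csv_args, **csv_kwargs)
    # islice drops the first `skip` rows (e.g. a header) without reading ahead.
    yield from itertools.islice(reader, skip, None)


# A semicolon-delimited file with a header line, held in memory for the demo.
sample = io.StringIO("col1;col2;col3\na;b;c\nd;e;f\n")
rows = list(iter_csv_rows(sample, skip=1, delimiter=";"))
```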
classmethod _from_csv_generator(filename, skip, *csv_args, **csv_kwargs)
classmethod from_header_csv(filename: str, *args, **kwargs) → DataStream[Dict]

Create a new data stream from a csv file where the first row is a header and each subsequent row is an element. The yielded elements are dicts, each pairing the row's values with the corresponding column headers.

Args:
filename (str): A path to a csv file that has rows corresponding to data items and columns corresponding to the elements of each data item.

args, kwargs: Additional arguments passed to the csv.reader function. These can be used to specify the delimiter or other csv settings.

Returns:
DataStream: A data stream that produces a data item for each line of the csv file after the header, where each element of the stream is a dict representation of the fields.

Examples:

For a sample.csv that looks like:

foo,bar,baz
a,b,c
d,e,f

>>> csv_stream = DataStream.from_header_csv('sample.csv')
>>> for data_item in csv_stream:
...     print(data_item)
{'foo': 'a', 'bar': 'b', 'baz': 'c'}
{'foo': 'd', 'bar': 'e', 'baz': 'f'}
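A common pitfall with header csv files is whitespace after the commas: csv parsing keeps it, so a header written as `foo, bar, baz` yields keys such as `' bar'`. The standard-library sketch below shows the effect of the csv dialect option skipinitialspace. Since from_header_csv forwards extra arguments to the csv reader, passing skipinitialspace=True there should behave the same way, although that is an assumption about how the arguments are forwarded.

```python
import csv
import io

text = "foo, bar, baz\na, b, c\nd, e, f\n"

# Default dialect: the space after each comma is kept in keys and values.
raw_rows = [dict(row) for row in csv.DictReader(io.StringIO(text))]

# skipinitialspace=True drops whitespace immediately after each delimiter.
clean_rows = [
    dict(row) for row in csv.DictReader(io.StringIO(text), skipinitialspace=True)
]
```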
classmethod _from_header_csv_generator(filename, *csv_args, **csv_kwargs)
classmethod _from_header_csv_buffer_generator(fh: IO, *csv_args, **csv_kwargs)
classmethod from_txt(filename: str) → DataStream[str]

Create a new data stream from a path to a utf8 encoded text file where each data item corresponds to a single line of the file.

Args:
filename (str): A path to a utf8 encoded text file with each line corresponding to a data item.

Returns:
DataStream: A new data stream that produces string data items, each containing a single line from the file specified by filename.

Notes:

This class method returns a data stream over the lines of a single text file. To construct a data stream over separate files, rather than lines, consider using .from_txt_collection.

Examples:
For a text file that looks like:

first line
second line
third line

>>> txt_line_stream = DataStream.from_txt('sample.txt')
>>> for data_item in txt_line_stream:
...     print(data_item)
first line
second line
third line
classmethod _from_txt_generator(filename)
classmethod from_file(filename: str) → DataStream[Dict | Tuple | str]

Load a DataStream from a file by calling the appropriate DataStream.from_* class method constructor based on the file extension.

The data items returned in the data stream are:

For JSON: dictionaries

For all other files (besides CSV, for now): strings (one per line)

Args:

filename (str): Name of file

Returns:

DataStream: Resulting data stream from the file

classmethod _from_collection(dirname: str, extension: str, file_opener) → DataStream[Dict | Tuple | str]

Create a new data stream from a path containing multiple files, where each data item corresponds to the entire deserialized content of a single file. The file_opener function performs the deserialization of individual files.

Args:
dirname (str): A directory path containing a number of utf8 encoded files with the given filename extension.

extension (str): Extension of the files to read. Note that all files are read in the same utf8 encoding.

file_opener (function): Function that deserializes a file on disk into memory.

Returns:
DataStream: A new data stream that produces data items, each containing the deserialized content of a single file found in dirname.

Notes:

Each data item in this data stream represents the entire content of a single file and is not split by line or otherwise.
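The directory walk can be sketched with os.listdir: keep the files whose names end in the extension and hand each one to file_opener. iter_collection and read_json are hypothetical helpers. Whether caikit passes a path or an open file handle to file_opener, and in what order it visits files, is not stated here, so this sketch assumes a path and sorts the names for deterministic output.

```python
import json
import os
import tempfile
from typing import Any, Callable, Iterator


def iter_collection(dirname: str, extension: str, file_opener: Callable[[str], Any]) -> Iterator[Any]:
    """Lazily yield file_opener(path) for every file named *.<extension>."""
    suffix = "." + extension
    for name in sorted(os.listdir(dirname)):  # sorted: a choice of this sketch
        path = os.path.join(dirname, name)
        if os.path.isfile(path) and name.endswith(suffix):
            yield file_opener(path)  # one data item per whole file


def read_json(path: str) -> Any:
    # Deserialize the entire file into a single python object.
    with open(path, encoding="utf8") as fh:
        return json.load(fh)


with tempfile.TemporaryDirectory() as tmp:
    for i, payload in enumerate([{"a": 1}, [1, 2]]):
        with open(os.path.join(tmp, f"doc{i}.json"), "w", encoding="utf8") as fh:
            json.dump(payload, fh)
    with open(os.path.join(tmp, "notes.txt"), "w", encoding="utf8") as fh:
        fh.write("ignored: wrong extension")
    items = list(iter_collection(tmp, "json", read_json))
```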

classmethod _from_collection_generator(dirname, extension, file_opener)
classmethod from_txt_collection(dirname: str, extension='txt') → DataStream[str]

Create a new data stream from a path containing multiple utf8 encoded text files where each data item corresponds to the entire text contained in a single file.

Args:
dirname (str): A directory path containing a number of utf8 encoded text files with the .txt (or specified) filename extension.

extension (str, optional): Extension of the text files. Note that all files are read in the same utf8 encoding. Defaults to 'txt'.

Returns:
DataStream: A new data stream that produces string data items, each containing the text contained in a single .txt (or specified extension) file found in dirname.

Notes:

Each data item in this data stream represents the entire text contained in a single file and is not split by line or otherwise.

classmethod from_json_collection(dirname: str, extension='json') → DataStream[Dict | Tuple | List]

Create a new data stream from a path containing multiple JSON files where each data item corresponds to the entire serialized JSON contained in a single file.

Args:
dirname (str): A directory path containing a number of utf8 encoded JSON files with the .json (or specified) filename extension.

extension (str, optional): Extension of the JSON files. Note that all files are read in the same utf8 encoding. Defaults to 'json'.

Returns:
DataStream: A new data stream that produces data items, each containing the deserialized JSON content of a single .json (or specified extension) file found in dirname.

Notes:

Each data item in this data stream represents the entire JSON content of a single file and is not split by line or otherwise.

classmethod from_csv_collection(dirname: str) → DataStream[Dict]

Create a new data stream by chaining together the data streams read from each file in a path containing multiple csv files, where each file can contain one or more data items.

Args:

dirname (str): A directory path containing a number of csv files

Returns:
DataStream: A new data stream chained from the data streams produced by reading (with from_header_csv) every .csv file found in dirname. All data items are dicts.

classmethod _from_csv_collection_generator(dirname)
classmethod from_jsonl_collection(dirname: str) → DataStream[Dict]

Create a new data stream by chaining together the data streams read from each file in a path containing multiple jsonl files, where each file can contain one or more data items.

Args:

dirname (str): A directory path containing a number of jsonl files

Returns:
DataStream: A new data stream chained from the data streams produced by reading (with from_jsonl) every .jsonl file found in dirname.

classmethod _from_jsonl_collection_generator(dirname)
classmethod from_multipart_file(filename: str) → DataStream[JsonDictValue]

Load a DataStream from a multipart file.

The data items returned in the data stream are determined by the content type of each part in the multipart file, by calling the appropriate DataStream.from_* constructor.

Args:

filename (str): Name of file

Returns:

DataStream: Resulting data stream from the file

train_test_split(test_split=0.25, seed=None) → Tuple[DataStream[T], DataStream[T]]

Split the current datastream into train/test substreams.

Args:
test_split (float): The fraction of examples to assign to the test substream, in [0, 1].

seed (int | None): The seed for initializing the random assignment. If not provided, a randomly chosen seed will be used.

Returns:
tuple(DataStream, DataStream): Two substreams: a train set substream and a test set substream.
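Because data streams are lazy and reentrant, the train and test substreams have to make the same random decision for every item on every pass. One way to get that, shown in this sketch, is to share a single seed and re-seed a private random.Random at the start of each pass. This illustrates the idea only and is not caikit's implementation; _pick and split_sketch are hypothetical, and the per-item coin flip makes the test fraction only approximately test_split.

```python
import functools
import random
from typing import Callable, Iterable, Iterator, Optional, Tuple, TypeVar

T = TypeVar("T")


def _pick(items: Iterable[T], test_split: float, seed: int, want_test: bool) -> Iterator[T]:
    # A generator seeded identically on every call repeats the same draws,
    # so each item always lands in the same substream.
    rng = random.Random(seed)
    for item in items:
        # One draw per item, kept or not, keeps both substreams in sync.
        if (rng.random() < test_split) == want_test:
            yield item


def split_sketch(
    items: Iterable[T], test_split: float = 0.25, seed: Optional[int] = None
) -> Tuple[Callable[[], Iterator[T]], Callable[[], Iterator[T]]]:
    if not 0.0 <= test_split <= 1.0:
        raise ValueError("test_split must be in [0, 1]")
    if seed is None:
        seed = random.randrange(2**32)  # choose once, share between substreams
    train = functools.partial(_pick, items, test_split, seed, False)
    test = functools.partial(_pick, items, test_split, seed, True)
    return train, test


data = list(range(100))
make_train, make_test = split_sketch(data, test_split=0.25, seed=7)
train, test = list(make_train()), list(make_test())
```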

chain() → DataStream

Chain multiple data streams together sequentially. The returned data stream produces the data items from each passed data stream in turn.

Args:
args (tuple(DataStream)): A tuple containing the data streams to chain, passed as variadic arguments.

Returns:
DataStream: A new data stream that produces the data items from the provided data streams sequentially.

filter(func=lambda data_item: ..., *args, **kwargs) → DataStream[T]

Skip elements in the data stream as identified by a passed function.

Args:
func (callable(data_item)): The function used to identify the data items to keep. The function takes a single data item as an argument and returns True to keep the element and False to skip it. The default filter function removes falsy values.

Returns:
DataStream: A new data stream that produces the data items from the current data stream only when func evaluates to true.
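The keep/skip rule and the falsy default can be sketched as a plain generator. filter_items is hypothetical; forwarding the extra *args and **kwargs to func after the data item is an assumption based on the signature above, not documented behavior.

```python
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def filter_items(
    items: Iterable[T],
    func: Callable[..., bool] = lambda data_item: bool(data_item),
    *args,
    **kwargs,
) -> Iterator[T]:
    for item in items:
        # Truthy result: keep the item. Falsy result: skip it.
        if func(item, *args, **kwargs):
            yield item


default_kept = list(filter_items([0, 1, "", "a", None, [], [2]]))
multiples = list(filter_items(range(10), lambda x, m: x % m == 0, 3))
```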

shuffle(buffer_size, seed=None) → DataStream[T]

Randomly shuffle the elements of this data stream. If buffer_size is smaller than the size of the full data stream, this is a partial random shuffle, similar to TensorFlow's dataset shuffle. For instance, if your dataset contains 10,000 elements but buffer_size is set to 1,000, then shuffle will initially select a random element from only the first 1,000 elements in the buffer. Once an element is selected, its space in the buffer is replaced by the next (i.e. 1,001st) element, maintaining the 1,000-element buffer.

Args:
buffer_size (int): The size of the buffer space; should be greater than 0.

seed (int | None): The seed for initializing the random assignment. If not provided, a randomly chosen seed will be used.

Returns:
DataStream: A new data stream over the shuffled data items.
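The buffered shuffle described above can be sketched as follows. buffered_shuffle is a hypothetical helper written from the description, not caikit's code; in particular, draining the leftover buffer with random.shuffle once the input is exhausted is this sketch's choice.

```python
import random
from typing import Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")


def buffered_shuffle(items: Iterable[T], buffer_size: int, seed: Optional[int] = None) -> Iterator[T]:
    if buffer_size <= 0:
        raise ValueError("buffer_size should be greater than 0")
    rng = random.Random(seed)
    buffer: List[T] = []
    for item in items:
        if len(buffer) < buffer_size:
            buffer.append(item)  # fill phase: no output yet
            continue
        # Buffer is full: emit a random element and refill its slot with the
        # next input element, so the buffer stays at buffer_size.
        idx = rng.randrange(buffer_size)
        yield buffer[idx]
        buffer[idx] = item
    # Input exhausted: emit whatever is left in random order.
    rng.shuffle(buffer)
    yield from buffer


shuffled = list(buffered_shuffle(range(50), buffer_size=10, seed=0))
again = list(buffered_shuffle(range(50), buffer_size=10, seed=0))
tiny = list(buffered_shuffle(range(5), buffer_size=1, seed=3))
```

With buffer_size=1 there is only ever one candidate, so the output keeps the input order; with a buffer at least as large as the stream, the result is a full shuffle.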

eager() → DataStream[T]

Evaluate the data stream, place it into memory and return a new data stream over these static values. This is useful if your data stream fits into memory (at least up to a certain point) and it would be inefficient to lazily, and potentially repeatedly, re-evaluate the stream each time it is iterated over.

Returns:
DataStream: A new data stream that iterates over the evaluated, in-memory data items in this stream.
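The trade-off behind eager can be made concrete by counting how often an expensive per-item function runs. In this sketch, which uses plain Python and hypothetical names, a lazy pipeline recomputes every item on each pass, while materializing the results once (the essence of eager) computes each item only once.

```python
from typing import Iterable, Iterator, List

call_count = 0


def expensive_square(x: int) -> int:
    global call_count
    call_count += 1  # count how often the "expensive" work actually runs
    return x * x


def lazy_pass(source: Iterable[int]) -> Iterator[int]:
    # Like a lazily mapped stream: the work is redone on every pass.
    return (expensive_square(x) for x in source)


source = [1, 2, 3]
for _ in range(2):
    list(lazy_pass(source))
lazy_calls = call_count  # three items evaluated on each of two passes

call_count = 0
cached: List[int] = list(lazy_pass(source))  # the "eager" step: evaluate once
for _ in range(2):
    list(iter(cached))  # later passes only walk the in-memory list
eager_calls = call_count
```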

map(func, *args, **kwargs) → DataStream

Apply a function to each element in the data stream.

Args:
func (callable(*args, **kwargs)): A function that is lazily applied to each element in the data stream.

*args, **kwargs: Additional arguments to pass to func.

Returns:

DataStream: A new data stream with func applied to each element.
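A minimal sketch of a lazy map with extra arguments follows. map_items is hypothetical, and the calling convention it uses (the data item first, then the extra arguments) is an assumption; the text above only says the extra arguments are passed to func.

```python
from typing import Any, Callable, Iterable, Iterator


def map_items(items: Iterable[Any], func: Callable[..., Any], *args, **kwargs) -> Iterator[Any]:
    # Assumed convention: func(data_item, *args, **kwargs), applied lazily.
    for item in items:
        yield func(item, *args, **kwargs)


scaled = list(
    map_items([1, 2, 3], lambda x, factor, offset=0: x * factor + offset, 10, offset=1)
)
```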

flatten() → DataStream

Convert a 2-level nested stream into a flattened stream

Returns:

DataStream: A new data stream with inner stream items ‘flattened’
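"2-level" means exactly one level of nesting is removed. The generator below is a hypothetical sketch of that behavior; deeper structure inside the inner items is left untouched.

```python
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def flatten_once(nested: Iterable[Iterable[T]]) -> Iterator[T]:
    for inner in nested:
        yield from inner  # only the outer level of nesting is removed


flat = list(flatten_once([[1, 2], [3], []]))
still_nested = list(flatten_once([[[1]], [[2, 3]]]))
```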

zip() → DataStream

Combine the data items of multiple data streams together in tuples.

Args:
args (tuple(DataStream)): A tuple containing the data streams to be zipped, passed as variadic arguments.

Returns:

DataStream: A data stream that produces the zipped data items.

Notes:

A ValueError is raised if the zipped data streams do not all have the same length. Because streams are evaluated lazily, however, this error condition is only detected and raised when the stream is iterated over.
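The lazy length check can be sketched with a sentinel value: the mismatch is only noticed when one input runs out before the others. zip_same_length is a hypothetical helper, not caikit's code; on Python 3.10 and later, the built-in zip(..., strict=True) raises ValueError in the same situation.

```python
from typing import Iterable, Iterator, Tuple

_MISSING = object()  # unique marker meaning "this iterator is exhausted"


def zip_same_length(*iterables: Iterable) -> Iterator[Tuple]:
    iterators = [iter(it) for it in iterables]
    while True:
        row = [next(it, _MISSING) for it in iterators]
        exhausted = [value is _MISSING for value in row]
        if all(exhausted):
            return  # every input ended together: a clean finish
        if any(exhausted):
            raise ValueError("zipped data streams have different lengths")
        yield tuple(row)


pairs = list(zip_same_length([1, 2], "ab"))

mismatched = zip_same_length([1, 2], [10])  # no error yet: nothing has run
first = next(mismatched)
try:
    next(mismatched)
    raised = False
except ValueError:
    raised = True
```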

peek() → T

Return the first element of the stream, or raise IndexError if the stream is empty.

augment(augmentor, aug_cycles, *, post_augment_func=None, augment_index=None, enforce_determinism=True) → DataStream[T]
__add__(other)

The addition operator for data streams is equivalent to calling .chain and combines this data stream with another sequentially.

__getitem__(idx) → T

Index or slice each data item. This is valuable for creating new data streams over the elements of a stream that produces tuples, lists, arrays, et cetera.

Args:
idx (int or slice): The index or slice to be applied to each data item.

Returns:
DataStream: A new data stream with data_item[idx] applied to each data item.

Notes:

This operation may be somewhat counterintuitive, since data_stream[0] does not return the first element of the data stream; instead, it returns a new data stream that produces data_item[0] for each data item.

This operation may fail with a TypeError if the data items in the stream are not subscriptable.
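The difference between per-item indexing and ordinary sequence indexing is easiest to see side by side. index_each is a hypothetical stand-in for what data_stream[idx] is documented to do, applied here to a plain list.

```python
from typing import Any, Iterable, Iterator, Union


def index_each(items: Iterable[Any], idx: Union[int, slice]) -> Iterator[Any]:
    """Apply item[idx] to every data item, mirroring data_stream[idx]."""
    for item in items:
        yield item[idx]  # raises TypeError if an item is not subscriptable


pairs = [("cat", 0), ("dog", 1)]
labels = list(index_each(pairs, 1))            # second field of every pair
firsts = list(index_each(pairs, slice(0, 1)))  # a slice of each item, not of the stream
first_pair = pairs[0]                          # ordinary indexing picks one item
```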

__iter__()

Return an iterator or generator over all of the data items in this data stream. Data streams are reentrant in the sense that they can be iterated over multiple times.

__len__()

See property method self._length

property _length

Return the number of data items contained in this data stream. This requires that the data stream be iterated over, which may be time-consuming. This value is then stored internally so that subsequent calls do not iterate over the data stream again.

This is implemented as a cached_property so that subclasses of DataStream which implement their own __getstate__ and __setstate__ do not have to account for the existence of self._length
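The one-time length computation can be sketched with functools.cached_property from the standard library. CountingStream is a hypothetical class that counts full passes, to show that the second len() call does not iterate again.

```python
import functools
from typing import Callable, Iterator


class CountingStream:
    """Hypothetical stream that caches its length after one full pass."""

    def __init__(self, generator_func: Callable[[], Iterator]):
        self.generator_func = generator_func
        self.passes = 0

    def __iter__(self) -> Iterator:
        self.passes += 1  # record every full or partial iteration
        return iter(self.generator_func())

    @functools.cached_property
    def _length(self) -> int:
        # Computed by one full iteration, then stored on the instance.
        return sum(1 for _ in self)

    def __len__(self) -> int:
        return self._length


s = CountingStream(lambda: iter(range(4)))
first = len(s)
second = len(s)
```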

__or__(module)

Feed this data stream into the .stream method of a module. This is syntactic sugar that allows modules to be chained like data_stream | module1 | module2 rather than the equivalent module2.stream(module1.stream(data_stream)).
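The operator overloading behind this syntax can be sketched with two hypothetical modules that expose a .stream method, plus a minimal Pipe class standing in for a data stream. Because Python evaluates `a | m1 | m2` left to right, it becomes m2.stream(m1.stream(a)). Unlike a real data stream, Pipe wraps a one-shot generator after the first `|`, so it is not reentrant.

```python
from typing import Iterable, Iterator


class Upper:
    """Hypothetical module whose .stream lazily upper-cases each item."""

    def stream(self, items: Iterable[str]) -> Iterator[str]:
        return (s.upper() for s in items)


class Exclaim:
    """Hypothetical module whose .stream appends "!" to each item."""

    def stream(self, items: Iterable[str]) -> Iterator[str]:
        return (s + "!" for s in items)


class Pipe:
    """Minimal stand-in for a data stream that supports `stream | module`."""

    def __init__(self, items: Iterable[str]):
        self.items = items

    def __iter__(self) -> Iterator[str]:
        return iter(self.items)

    def __or__(self, module) -> "Pipe":
        # `a | m` becomes m.stream(a); wrapping keeps further `|` working.
        return Pipe(module.stream(self))


piped = list(Pipe(["a", "b"]) | Upper() | Exclaim())
nested = list(Exclaim().stream(Upper().stream(["a", "b"])))
```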

static _verify_dir(dirname)
class caikit.core.data_model.streams.data_stream._UtfEncodeIOWrapper(bytes_stream: IO[bytes])

Bases: io.IOBase

Lil’ wrapper class to convert a bytes buffer to a string buffer
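The standard library offers a comparable bytes-to-text adapter, io.TextIOWrapper, which can be handed to csv or json readers. The sketch below applies it to an in-memory bytes buffer; it illustrates the general idea and is not how _UtfEncodeIOWrapper itself is implemented.

```python
import csv
import io

# An in-memory bytes buffer standing in for any binary stream.
raw = io.BytesIO("name,suit\nMay,\u2663\n".encode("utf-8"))

# TextIOWrapper decodes UTF-8 bytes to str as they are read; newline=""
# is what the csv module documentation recommends for its input.
text = io.TextIOWrapper(raw, encoding="utf-8", newline="")
rows = list(csv.reader(text))
```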

bytes_stream
read(*args, **kwargs)
readline(*args, **kwargs)

Read and return a line from the stream.

If size is specified, at most size bytes will be read.

The line terminator is always b'\n' for binary files; for text files, the newline argument to open can be used to select the line terminator(s) recognized.

seek(*args, **kwargs)

Change the stream position to the given byte offset.

offset

The stream position, relative to ‘whence’.

whence

The relative position to seek from.

The offset is interpreted relative to the position indicated by whence. Values for whence are:

  • os.SEEK_SET or 0 – start of stream (the default); offset should be zero or positive

  • os.SEEK_CUR or 1 – current stream position; offset may be negative

  • os.SEEK_END or 2 – end of stream; offset is usually negative

Return the new absolute position.