caikit.core.data_model.streams.data_stream
Data streams for lazily loading, munging and passing data through multiple modules.
Attributes
Classes
- DataStream: A data stream is an iterable container class that is reentrant in the sense that it can be iterated over multiple times.
- _UtfEncodeIOWrapper: Lil' wrapper class to convert a bytes buffer to a string buffer.
Module Contents
- caikit.core.data_model.streams.data_stream.error
- caikit.core.data_model.streams.data_stream.T
- class caikit.core.data_model.streams.data_stream.DataStream(generator_func, *args, **kwargs)[source]
Bases: Generic[T]
A data stream is an iterable container class that is reentrant in the sense that it can be iterated over multiple times. The items produced by a data stream may be any python object and are called data items. The data items produced by an iterator over a data stream are generated lazily (unless the .eager method is called) so that each data item in a series of data streams is produced as it is accessed. This allows processing datasets that are too large to fit into memory. A number of functional style methods are provided for manipulating and munging data streams, and the .stream method on modules can also be used to process data streams.
The DataStream class is really just a generic wrapper around functions that produce python iterators or generators.
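The idea of wrapping a generator function can be illustrated with a minimal sketch. `TinyStream` and `count_up` below are hypothetical names written for this example and are not the caikit implementation; the sketch only assumes what the description states, namely that the stream stores a generator function with its arguments and calls it again on every iteration.

```python
from typing import Callable, Generic, Iterator, TypeVar

T = TypeVar("T")


class TinyStream(Generic[T]):
    """Hypothetical stand-in for a reentrant, lazily evaluated stream."""

    def __init__(self, generator_func: Callable[..., Iterator[T]], *args, **kwargs):
        # Store the recipe for producing items, not the items themselves.
        self.generator_func = generator_func
        self.args = args
        self.kwargs = kwargs

    def __iter__(self) -> Iterator[T]:
        # Calling the function again yields a fresh iterator, which is
        # what makes the stream safe to iterate more than once.
        return self.generator_func(*self.args, **self.kwargs)


def count_up(limit: int) -> Iterator[int]:
    for value in range(limit):
        yield value


stream = TinyStream(count_up, 3)
first_pass = list(stream)
second_pass = list(stream)

# A bare generator, by contrast, is exhausted after one pass.
bare = count_up(3)
bare_first = list(bare)
bare_second = list(bare)
```

Both passes over `stream` see the same items, while the second pass over the bare generator sees nothing.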
- generator_func
- classmethod from_iterable(data: Iterable[T]) DataStream[T][source]
Create a new data stream from a python iterable, such as a list or tuple. This data stream produces a single data item for each element of the iterable.
- Args:
- data (iterable): A list or tuple or other python iterable used to
construct a new data stream where each element becomes a single data item.
- Returns:
- DataStream: A new data stream that produces data items from the
elements of data.
- Examples:
>>> list_stream = DataStream.from_iterable([1, 2, 3])
>>> for data_item in list_stream:
...     print(data_item)
1
2
3
- classmethod from_jsonl(filename: str) DataStream[Dict][source]
Create a new data stream from a path to a JSON lines file, where each line is a valid JSON object (python dict).
- Args:
- filename (str): A path to a utf8 encoded text file in JSON lines
format, where each line is a valid JSON object (python dict)
- Returns:
- DataStream: A new data stream that produces python dict items each
containing a single JSON object corresponding to each line
- Notes:
This class method returns a data stream over the valid JSON objects and each JSON object is on one line.
- Examples:
- For a JSON lines file that looks like:
{"name": "Gilbert", "wins": [["straight", "7♣"], ["one pair", "10♥"]]}
{"name": "Alexa", "wins": [["two pair", "4♠"], ["two pair", "9♠"]]}
{"name": "May", "wins": []}
{"name": "Deloise", "wins": [["three of a kind", "5♣"]]}
>>> jsonl_data_stream = DataStream.from_jsonl('sample.jsonl')
>>> for data_item in jsonl_data_stream:
...     print(data_item)
{'name': 'Gilbert', 'wins': [['straight', '7♣'], ['one pair', '10♥']]}
{'name': 'Alexa', 'wins': [['two pair', '4♠'], ['two pair', '9♠']]}
{'name': 'May', 'wins': []}
{'name': 'Deloise', 'wins': [['three of a kind', '5♣']]}
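For readers who want to see what line-by-line JSON loading looks like without caikit, here is a small standard-library sketch. The helper name `iter_jsonl` is hypothetical, and skipping blank lines is an assumption of this sketch rather than documented behavior of from_jsonl.

```python
import json
import os
import tempfile
from typing import Dict, Iterator


def iter_jsonl(filename: str) -> Iterator[Dict]:
    """Yield one parsed JSON object per non-empty line (illustrative helper)."""
    with open(filename, encoding="utf8") as handle:
        for line in handle:
            # Assumption of this sketch: blank lines are ignored.
            if line.strip():
                yield json.loads(line)


# Write a small sample file so the sketch is self-contained.
sample_path = os.path.join(tempfile.mkdtemp(), "sample.jsonl")
with open(sample_path, "w", encoding="utf8") as handle:
    handle.write('{"name": "May", "wins": []}\n')
    handle.write('{"name": "Deloise", "wins": [["three of a kind", "5♣"]]}\n')

records = list(iter_jsonl(sample_path))
```

Because the helper is a generator, only one line is held in memory at a time while iterating.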
- classmethod from_json_array(filename: str) DataStream[Dict][source]
Create a new data stream from a path to a file containing a JSON array, where each item is a valid JSON object (python dict).
- Args:
- filename (str): A path to a utf8 encoded text file with a JSON array,
where each item is a valid JSON object (python dict)
- Returns:
- DataStream: A new data stream that produces python dict items each
containing a single JSON object from the array in the file specified by filename
- Notes:
This class method returns a data stream over the valid JSON objects of a single JSON array text file.
- Examples:
- For a JSON file that looks like:
[ { "a": 1, "b": 2, "c": false }, { "a": 2, "b": 3 }, { "a": 3, "c": true } ]
>>> json_data_stream = DataStream.from_json_array('sample.json')
>>> for data_item in json_data_stream:
...     print(data_item)
{'a': 1, 'b': 2, 'c': False}
{'a': 2, 'b': 3}
{'a': 3, 'c': True}
- classmethod from_csv(filename: str, *args, skip=0, **kwargs) DataStream[List][source]
Create a new data stream from a csv (comma separated value) file where each data item corresponds to a line of the csv file and consists of a list containing the comma separated values.
- Args:
- filename (str): A path to a csv file that has rows corresponding to
data items and columns corresponding to the elements of each data item.
- skip (int): Number of lines to skip at the beginning of the csv
file. This is often useful for skipping a header line.
- args, kwargs: Additional arguments passed to the csv.reader function.
These can be used to specify the delimiter or other csv settings.
- Returns:
- DataStream: A data stream that produces a data item for each line of
the csv file and where each element of the data item corresponds to a column in the csv file.
- Examples:
- For a sample.csv that looks like:
a, b, c
d, e, f
>>> csv_stream = DataStream.from_csv('sample.csv')
>>> for data_item in csv_stream:
...     print(data_item)
['a', 'b', 'c']
['d', 'e', 'f']
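The skip argument and the pass-through of csv.reader settings can be sketched with the standard library alone. `iter_csv_rows` is a hypothetical helper, not the caikit code; it reads from an open text handle instead of a filename to stay self-contained, and it counts skipped rows as parsed csv rows, which is an assumption.

```python
import csv
import io
from itertools import islice
from typing import Iterator, List, TextIO


def iter_csv_rows(handle: TextIO, *args, skip: int = 0, **kwargs) -> Iterator[List[str]]:
    """Yield each csv row as a list, ignoring the first `skip` rows."""
    reader = csv.reader(handle, *args, **kwargs)
    # islice drops the leading rows lazily, without reading the whole file.
    yield from islice(reader, skip, None)


# skip=1 drops the header row.
rows = list(iter_csv_rows(io.StringIO("x,y,z\na,b,c\nd,e,f\n"), skip=1))

# Extra keyword arguments reach csv.reader, here a custom delimiter.
pipe_rows = list(iter_csv_rows(io.StringIO("a|b\nc|d\n"), delimiter="|"))
```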
- classmethod from_header_csv(filename: str, *args, **kwargs) DataStream[Dict][source]
Create a new data stream from a csv where the first row is a header and each subsequent row is an element. The yielded elements are dicts that pair each row value with the corresponding column header.
- Args:
- filename (str): A path to a csv file that has rows corresponding to
data items and columns corresponding to the elements of each data item.
- args, kwargs: Additional arguments passed to the csv.reader function.
These can be used to specify the delimiter or other csv settings.
- Returns:
- DataStream: A data stream that produces a data item for each line of
the csv file and where each element of the stream is a dict representation of the fields.
- Examples:
- For a sample.csv that looks like:
foo, bar, baz
a, b, c
d, e, f
>>> csv_stream = DataStream.from_header_csv('sample.csv')
>>> for data_item in csv_stream:
...     print(data_item)
{'foo': 'a', 'bar': 'b', 'baz': 'c'}
{'foo': 'd', 'bar': 'e', 'baz': 'f'}
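The pairing of header names with row values can be shown in a few lines of standard-library Python. This is only a sketch of the idea, using an in-memory string in place of a file; it makes no claim about how from_header_csv is implemented.

```python
import csv
import io

text = "foo,bar,baz\na,b,c\nd,e,f\n"
reader = csv.reader(io.StringIO(text))

# The first row holds the column names.
header = next(reader)

# Every later row is zipped with the header to form one dict per row.
items = [dict(zip(header, row)) for row in reader]
```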
- classmethod from_txt(filename: str) DataStream[str][source]
Create a new data stream from a path to a utf8 encoded text file where each data item corresponds to a single line of the file.
- Args:
- filename (str): A path to a utf8 encoded text file with each line
corresponding to a data item.
- Returns:
- DataStream: A new data stream that produces string data items each
containing a single line from the file specified by filename.
- Notes:
This class method returns a data stream over the lines of a single text file. In order to construct a datastream over separate files, rather than lines, consider using .from_txt_collection.
- Examples:
- For a text file that looks like:
first line
second line
third line
>>> txt_line_stream = DataStream.from_txt('sample.txt')
>>> for data_item in txt_line_stream:
...     print(data_item)
first line
second line
third line
- classmethod from_file(filename: str) DataStream[Dict | Tuple | str][source]
Load a DataStream from a file by calling the appropriate DataStream.from_* constructor based on the file extension.
The data items returned in the data stream are:
- For JSON: dictionaries
- For all other files (besides CSV for now): strings (1 per line)
- Args:
filename (str): Name of file
- Returns:
DataStream: Resulting datastream from file
- classmethod _from_collection(dirname: str, extension: str, file_opener) DataStream[Dict | Tuple | str][source]
Create a new data stream from a path containing multiple files where each data item corresponds to the entire deserialized content of a single file. The file_opener function deserializes each individual file.
- Args:
- dirname (str): A directory path containing a number of utf8 encoded
files with the given filename extension.
- extension (str): Extension of the file. Note that all files are read
in the same utf8 encoding.
- file_opener (function): Function to deserialize a file on disk to
memory
- Returns:
- DataStream: A new data stream that produces string data items each
containing the text contained in a single file found in dirname.
- Notes:
Each data item in this data stream represents the entire text contained in a single file and is not split by line or otherwise.
- classmethod from_txt_collection(dirname: str, extension='txt') DataStream[str][source]
Create a new data stream from a path containing multiple utf8 encoded text files where each data item corresponds to the entire text contained in a single file.
- Args:
- dirname (str): A directory path containing a number of utf8 encoded
text files with the .txt filename extension.
- extension: str (Optional)
Optional extension of the text file. Note that all files are read in the same utf8 encoding. Defaults to ‘txt’
- Returns:
- DataStream: A new data stream that produces string data items each
containing the text contained in a single .txt (or specified extension) file found in dirname.
- Notes:
Each data item in this data stream represents the entire text contained in a single file and is not split by line or otherwise.
- classmethod from_json_collection(dirname: str, extension='json') DataStream[Dict | Tuple | List][source]
Create a new data stream from a path containing multiple JSON files where each data item corresponds to the entire serialized JSON contained in a single file.
- Args:
- dirname (str): A directory path containing a number of utf8 encoded
JSON files with the .json filename extension.
- extension: str (Optional)
Optional extension of the JSON file. Note that all files are read in the same utf8 encoding. Defaults to 'json'
- Returns:
- DataStream: A new data stream that produces data items each
containing the deserialized JSON content of a single .json (or specified extension) file found in dirname.
- Notes:
Each data item in this data stream represents the entire content of a single file and is not split by line or otherwise.
- classmethod from_csv_collection(dirname: str) DataStream[Dict][source]
Create a new data stream by chaining the data streams read from each csv file in a directory, where each file can contain one or more data items.
- Args:
dirname (str): A directory path containing a number of csv files
- Returns:
- DataStream: A new data stream that is chained from the data streams
created by reading (from_header_csv) all .csv files found in dirname. All data items are dicts.
- classmethod from_jsonl_collection(dirname: str) DataStream[Dict][source]
Create a new data stream by chaining the data streams read from each jsonl file in a directory, where each file can contain one or more data items.
- Args:
dirname (str): A directory path containing a number of jsonl files
- Returns:
- DataStream: A new data stream that is chained from the data streams
created by reading (from_jsonl) all .jsonl files found in dirname.
- classmethod from_multipart_file(filename: str) DataStream[JsonDictValue][source]
Load a DataStream from a multipart file.
The data items returned in the data stream are determined by the content type of each part in the multipart file, which selects the DataStream.from_* constructor to call.
- Args:
filename (str): Name of file
- Returns:
DataStream: Resulting datastream from file
- train_test_split(test_split=0.25, seed=None) Tuple[DataStream[T], DataStream[T]][source]
Split the current datastream into train/test substreams.
- Args:
- test_split (float): The fraction of examples to assign to the test
substream, in [0, 1]
- seed (int | None): The seed for initializing the random assignment.
If not provided, a randomly chosen seed will be used.
- Returns:
- tuple(DataStream, DataStream): Two substreams: a train set
substream, and a test set substream
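One way a seeded, repeatable split can work on lazily produced data is sketched below. This is an illustration under stated assumptions, not the caikit algorithm: `make_split` is a hypothetical helper, each item is assigned to the test side with probability test_split, and the seed is required here so that both substreams can replay the same random draws.

```python
import random
from typing import Callable, Iterator, Sequence, Tuple, TypeVar

T = TypeVar("T")


def make_split(
    data: Sequence[T], test_split: float = 0.25, seed: int = 0
) -> Tuple[Callable[[], Iterator[T]], Callable[[], Iterator[T]]]:
    """Return two generator functions: one for train items, one for test items."""

    def pick(want_test: bool) -> Iterator[T]:
        # Re-seeding on every pass replays the same random draws, so the
        # two substreams always agree on which items belong to which side.
        rng = random.Random(seed)
        for item in data:
            is_test = rng.random() < test_split
            if is_test == want_test:
                yield item

    return (lambda: pick(False)), (lambda: pick(True))


train_func, test_func = make_split(range(100), test_split=0.25, seed=42)
train_items = list(train_func())
test_items = list(test_func())
test_items_again = list(test_func())
```

With this approach the test fraction is only approximately test_split, since every item is assigned independently.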
- chain() DataStream[source]
Chain multiple data streams together sequentially. The returned data stream produces the data items from each passed data stream in turn.
- Args:
- args (tuple(DataStream)): A tuple containing the data streams to
chain, passed as variadic arguments.
- Returns:
- DataStream: A new data stream that produces the data items from the
provided data streams sequentially.
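Sequential chaining behaves like itertools.chain from the standard library, which the short sketch below uses on plain lists. The lists stand in for data streams; nothing here is caikit-specific.

```python
from itertools import chain

first = [1, 2]
second = ["a", "b"]
third = [3.0]

# itertools.chain consumes each iterable fully before moving to the next.
chained = list(chain(first, second, third))
```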
- filter(func=lambda data_item: ..., *args, **kwargs) DataStream[T][source]
Skip elements in the data stream as identified by a passed function.
- Args:
- func (callable(data_item)): The function used to identify data items
that will be filtered. The function takes a single data item as an argument and returns True in order to keep the element and False in order to skip it. The default filter function removes falsey values.
- Returns:
- DataStream: A new data stream that produces the data items from the
current data stream only when func evaluates to true.
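The keep-or-skip contract of the filter function, including a default that drops falsey values, can be sketched as a plain generator. `keep_if` is a hypothetical helper name, and using `bool` as the default predicate is this sketch's way of reproducing the documented default.

```python
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def keep_if(items: Iterable[T], func: Callable[[T], object] = bool) -> Iterator[T]:
    """Lazily yield only the items for which func returns a truthy value."""
    for item in items:
        if func(item):
            yield item


# Default predicate: falsey values such as 0, "" and None are skipped.
no_falsey = list(keep_if([0, 1, "", "text", None, [2]]))

# Custom predicate: return True to keep an item.
evens = list(keep_if(range(6), lambda n: n % 2 == 0))
```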
- shuffle(buffer_size, seed=None) DataStream[T][source]
Randomly shuffles the elements of this data stream. If buffer_size is smaller than the full size of the data stream, the result is a partial random shuffle, similar to TensorFlow's dataset shuffle. For instance, if your dataset contains 10,000 elements but buffer_size is set to 1,000, then shuffle will initially select a random element from only the first 1,000 elements in the buffer. Once an element is selected, its space in the buffer is replaced by the next (i.e. 1,001st) element, maintaining the 1,000 element buffer.
- Args:
- buffer_size (int): the size of the buffer space, should be greater
than 0
- seed (int | None): The seed for initializing the random assignment.
If not provided, a randomly chosen seed will be used.
- Returns:
DataStream: A new data stream that produces the shuffled data items.
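The buffer mechanism described above can be sketched as a generator. `buffered_shuffle` is a hypothetical helper that follows the description (fill a buffer, emit a random slot, refill that slot with the next element); draining the leftover buffer in random order at the end is an assumption of this sketch.

```python
import random
from typing import Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")


def buffered_shuffle(
    items: Iterable[T], buffer_size: int, seed: Optional[int] = None
) -> Iterator[T]:
    """Yield items in a partially shuffled order using a fixed-size buffer."""
    if buffer_size < 1:
        raise ValueError("buffer_size must be greater than 0")
    rng = random.Random(seed)
    buffer: List[T] = []
    for item in items:
        if len(buffer) < buffer_size:
            # Fill phase: load the first buffer_size items.
            buffer.append(item)
            continue
        # Emit a random buffered item and put the new item in its slot.
        slot = rng.randrange(buffer_size)
        yield buffer[slot]
        buffer[slot] = item
    # Drain phase: the source is exhausted, so empty the buffer in random order.
    rng.shuffle(buffer)
    yield from buffer


shuffled = list(buffered_shuffle(range(10), buffer_size=3, seed=7))
same_again = list(buffered_shuffle(range(10), buffer_size=3, seed=7))
unchanged = list(buffered_shuffle(range(10), buffer_size=1))
```

A buffer of size 1 leaves the order untouched, and a buffer at least as large as the data gives a full shuffle; sizes in between trade memory for randomness.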
- eager() DataStream[T][source]
Evaluate the data stream, place it into memory and return a new data stream over these static values. This is useful if your data stream can fit into memory, at least up to a certain point, and it would be inefficient to lazily re-evaluate the stream each time it is iterated over.
- Returns:
- DataStream: A new data stream that iterates over the evaluated,
in-memory data items in this stream.
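The trade-off between lazy and eager evaluation can be seen with a generator that records how often it is started. The names below are hypothetical, and a plain list stands in for the in-memory stream that .eager returns.

```python
calls = []


def expensive_source():
    """Generator that records every time it is started."""
    calls.append("started")
    for value in (1, 2, 3):
        yield value * 10


# Lazy: each pass over the data restarts the generator.
lazy_first = list(expensive_source())
lazy_second = list(expensive_source())
lazy_runs = len(calls)

# Eager: evaluate once, then iterate the stored values as often as needed.
cached = list(expensive_source())
eager_first = list(cached)
eager_second = list(cached)
total_runs = len(calls)
```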
- map(func, *args, **kwargs) DataStream[source]
Apply a function to each element in the data stream.
- flatten() DataStream[source]
Convert a 2-level nested stream into a flattened stream
- Returns:
DataStream: A new data stream with inner stream items ‘flattened’
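Flattening one level of nesting corresponds to itertools.chain.from_iterable in the standard library. The sketch below uses nested lists in place of nested streams; leaving deeper levels untouched is the behavior of this sketch and an assumption about flatten.

```python
from itertools import chain

nested = [[1, 2], [3], [], [4, 5]]

# chain.from_iterable walks the outer sequence lazily and yields inner items.
flat = list(chain.from_iterable(nested))

# Only one level is removed; deeper nesting is left as it is.
partly_flat = list(chain.from_iterable([[1, [2, 3]], [4]]))
```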
- zip() DataStream[source]
Combine the data items of multiple data streams together in tuples.
- Args:
- args (tuple(DataStream)): A tuple containing the data streams to be
zipped, passed as variadic arguments.
- Returns:
DataStream: A data stream that produces the zipped data items.
- Notes:
A ValueError is raised when the stream is iterated over if any of the zipped data streams do not have the same length. Since streams are evaluated lazily, however, this error condition will only be detected and raised when the stream is being iterated over.
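The lazy length check described in the note can be sketched with a generator that compares the streams item by item. `strict_zip` is a hypothetical helper, not the caikit implementation; it shows why the ValueError can only surface during iteration.

```python
from typing import Iterable, Iterator, Tuple

_DONE = object()  # sentinel marking an exhausted iterator


def strict_zip(*streams: Iterable) -> Iterator[Tuple]:
    """Yield tuples of items; raise ValueError if the lengths differ."""
    iterators = [iter(stream) for stream in streams]
    while True:
        items = [next(iterator, _DONE) for iterator in iterators]
        finished = [item is _DONE for item in items]
        if all(finished):
            return
        if any(finished):
            raise ValueError("zipped streams do not have the same length")
        yield tuple(items)


pairs = list(strict_zip([1, 2], ["a", "b"]))

# Building the generator does not raise; the error appears during iteration.
mismatched = strict_zip([1, 2, 3], ["a"])
first_pair = next(mismatched)
try:
    next(mismatched)
    error_seen = False
except ValueError:
    error_seen = True
```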
- augment(augmentor, aug_cycles, *, post_augment_func=None, augment_index=None, enforce_determinism=True) DataStream[T][source]
- __add__(other)[source]
The addition operator for data streams is equivalent to calling .chain and combines this data stream with another sequentially.
- __getitem__(idx) T[source]
Index or slice each data item. This is valuable for creating new data streams over the elements of a stream that produces tuples, lists, arrays, et cetera.
- Args:
- idx (int or slice): The index or slice to be applied to each data
item.
- Returns:
- DataStream: A new data stream with data_item[idx] applied to each
data item.
- Notes:
This operation may be somewhat counterintuitive: data_stream[0] does not return the first element of the data stream. Instead, it returns a new data stream that produces data_item[0] for each data item.
This operation may fail with a TypeError if the data items in the stream are not subscriptable.
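The per-item indexing semantics can be sketched with a generator over a list of tuples. `index_each` is a hypothetical helper standing in for data_stream[idx].

```python
from typing import Iterable, Iterator, Union


def index_each(items: Iterable, idx: Union[int, slice]) -> Iterator:
    """Lazily yield data_item[idx] for every data item."""
    for data_item in items:
        yield data_item[idx]


rows = [("a", 1, True), ("b", 2, False)]

# An integer index picks one element out of every data item.
labels = list(index_each(rows, 0))

# A slice keeps part of every data item.
tails = list(index_each(rows, slice(1, None)))

# Items that cannot be subscripted fail only once they are reached.
try:
    list(index_each([1, 2], 0))
    failed = False
except TypeError:
    failed = True
```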
- __iter__()[source]
Return an iterator or generator over all of the data items in this data stream. Data streams are reentrant in the sense that they can be iterated over multiple times.
- property _length
Return the number of data items contained in this data stream. This requires that the data stream be iterated over, which may be time-consuming. This value is then stored internally so that subsequent calls do not iterate over the data stream again.
This is implemented as a cached_property so that subclasses of DataStream which implement their own __getstate__ and __setstate__ do not have to account for the existence of self._length
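The caching behavior can be illustrated with functools.cached_property (Python 3.8 or later). `CountedStream` is a hypothetical class for this sketch; the `passes` counter exists only to show that the data is walked once.

```python
from functools import cached_property


class CountedStream:
    """Hypothetical stream that counts its items once and remembers the result."""

    def __init__(self, generator_func):
        self.generator_func = generator_func
        self.passes = 0  # how many times the data has been walked

    def __iter__(self):
        self.passes += 1
        return self.generator_func()

    @cached_property
    def _length(self) -> int:
        # Runs on first access only; the result is then stored on the instance.
        return sum(1 for _ in self)


stream = CountedStream(lambda: iter(range(5)))
first = stream._length
second = stream._length
```

Since the cached value only appears in the instance dictionary after the first access, no attribute needs to be initialized in the constructor.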
- class caikit.core.data_model.streams.data_stream._UtfEncodeIOWrapper(bytes_stream: IO[bytes])[source]
Bases: io.IOBase
Lil' wrapper class to convert a bytes buffer to a string buffer
- bytes_stream
- readline(*args, **kwargs)[source]
Read and return a line from the stream.
If size is specified, at most size bytes will be read.
The line terminator is always b'\n' for binary files; for text files, the newlines argument to open can be used to select the line terminator(s) recognized.
- seek(*args, **kwargs)[source]
Change the stream position to the given byte offset.
- offset
The stream position, relative to ‘whence’.
- whence
The relative position to seek from.
The offset is interpreted relative to the position indicated by whence. Values for whence are:
os.SEEK_SET or 0 – start of stream (the default); offset should be zero or positive
os.SEEK_CUR or 1 – current stream position; offset may be negative
os.SEEK_END or 2 – end of stream; offset is usually negative
Return the new absolute position.
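The readline and seek semantics above are those of the standard io module, so they can be tried on an io.BytesIO buffer. This sketch does not use _UtfEncodeIOWrapper itself; the final decode step only illustrates turning bytes read from such a buffer into a string.

```python
import io
import os

buffer = io.BytesIO(b"first line\nsecond line\n")

# readline reads up to and including the b"\n" terminator.
line = buffer.readline()

# SEEK_SET: absolute offset from the start of the stream.
position = buffer.seek(0, os.SEEK_SET)

# SEEK_CUR: offset relative to the current position.
position_cur = buffer.seek(6, os.SEEK_CUR)
word = buffer.read(4)

# SEEK_END: offset relative to the end, usually negative.
position_end = buffer.seek(-5, os.SEEK_END)
tail = buffer.read()

# Decoding converts the bytes that were read into a string.
decoded = line.decode("utf8")
```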