caikit.core.data_model

Common data model containing all data structures that are passed in and out of modules.

Submodules

Attributes

CAIKIT_DATA_MODEL

PredictionJobStatus

PACKAGE_COMMON

JsonDictValue

log

error

T

TrainingStatus

Classes

DataBase

Base class for all structures in the data model.

DataObjectBase

A DataObject is a data model class that is backed by a @dataclass.

JobStatus

Enum to track current status of a job

ProducerId

Information about a data structure and the module that produced it.

AugmentorBase

DataStream

A data stream is an iterable container class that is reentrant in the sense that it can be iterated over multiple times.

_UtfEncodeIOWrapper

Lil' wrapper class to convert a bytes buffer to a string buffer

Functions

dataobject(→ Callable[[_DataObjectBaseT], ...)

The @dataobject decorator can be used to define a Data Model object's schema inline with the definition of the python class.

render_dataobject_protos(interfaces_dir)

Write out protobufs files for all proto classes generated from dataobjects

import_enums(current_globals)

Add all enums and their reverse enum mappings to a module's global symbol table.

import_enum(→ Tuple[str, str])

Import a single enum into the global enum module by name

is_multipart_file(→ bool)

Returns true if the file appears to contain a multi-part form data request

stream_multipart_file(→ Iterator[Part])

Returns an iterator of Parts, where each Part comes with a content type and an io reader to stream the data from.

Package Contents

class caikit.core.data_model.DataBase[source]

Base class for all structures in the data model.

Notes:

All leaves in the hierarchy of derived classes should have a corresponding protobufs class defined in the interface definitions. If not, an exception will be thrown at runtime.

PROTO_CONVERSION_SPECIAL_TYPES
class OneofFieldVal[source]

Helper struct that backends can use to return information about values in oneofs along with which of the oneofs is currently valid

val: Any
which_oneof: str
__setattr__(name, val)[source]

Handle attribute setting for oneofs and named fields with delegation to backends as needed

classmethod get_proto_class() Type[google.protobuf.message.Message][source]
classmethod get_field_defaults() Type[google.protobuf.message.Message][source]

Get mapping of fields to default values. Mapping will not include fields without defaults

classmethod get_field_message_type(field_name: str) type | None[source]

Get the python type for the given field. This function relies on the metaclass to fill cls._fields_to_type. This is to avoid costly computation during runtime

Args:
field_name (str): Field name to check (AttributeError raised if name is invalid)

Returns:
field_type: type

The data model class type for the given field

classmethod from_backend(backend)[source]
property backend: DataModelBackendBase | None
which_oneof(oneof_name: str) str | None[source]

Get the name of the oneof field set for the given oneof or None if no field is set

classmethod _infer_which_oneof(oneof_name: str, oneof_val: Any) str | None[source]

Check each candidate field within the oneof to see if it’s a type match

NOTE: In the case where fields within a oneof have the same type, the first field whose type matches will be used!
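The first-match rule in the note above can be illustrated with a small standalone sketch. The `ONEOF_CANDIDATES` mapping and the `infer_which_oneof` function are hypothetical stand-ins written for this example; they are not caikit's implementation.

```python
from typing import Any, Dict, Optional

# Hypothetical oneof definition: candidate field name -> accepted python type.
# Dicts preserve insertion order, so "first match" is well defined.
ONEOF_CANDIDATES: Dict[str, type] = {
    "int_val": int,
    "text_val": str,
    "label_val": str,  # same type as text_val, so it is never inferred
}


def infer_which_oneof(val: Any) -> Optional[str]:
    """Return the first candidate field whose type matches val, or None."""
    for field_name, field_type in ONEOF_CANDIDATES.items():
        if isinstance(val, field_type):
            return field_name
    return None


print(infer_which_oneof(3))
print(infer_which_oneof("hello"))
print(infer_which_oneof(1.5))
```

In this sketch a string value always resolves to `text_val`, so a caller who wants `label_val` would have to name the field explicitly rather than rely on inference.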

_get_which_oneof_dict() Dict[str, str][source]
classmethod _get_type_for_field(field_name: str) type[source]

Helper class method to return the type hint for a particular field

classmethod _is_valid_type_for_field(field_name: str, val: Any) bool[source]

Check whether the given value is valid for the given field

classmethod from_binary_buffer(buf)[source]

Builds the data model object out of the binary string

Args:

buf: The binary buffer containing a serialized protobufs message

Returns:

A data model object instantiated from the protobufs message deserialized out of buf

classmethod from_proto(proto)[source]

Build a DataBase from protobufs.

Args:

proto: A protocol buffer message to build the object from.

Returns:

caikit.core.data_model.DataBase: A DataBase object.

classmethod from_json(json_str, ignore_unknown_fields=False)[source]

Build a DataBase from a given JSON string. Use google’s protobufs.json_format for deserialization

Args:
json_str (str or dict): A stringified JSON specification/dict of the data_model

ignore_unknown_fields (bool): If True, ignores unknown JSON fields

Returns:

caikit.core.data_model.DataBase: A DataBase object.

abstract classmethod from_file(file_obj: io.IOBase)[source]

Build a DataBase from a given file-like object.

Args:

file_obj (IOBase): A file object that contains some representation of the dataobject

Returns:

caikit.core.data_model.DataBase: A DataBase object.

to_proto()[source]

Return a new protobufs populated with the information in this data structure.

to_binary_buffer()[source]

Returns a binary buffer with a serialized protobufs message of this data model

fill_proto(proto)[source]

Populate a protobufs with the values from this data model object.

Args:

proto: A protocol buffer to be populated.

Returns:

protobufs: The filled protobufs.

Notes:

The protobufs is filled in place, so the argument and the return value are the same at the end of this call.

to_dict() dict[source]

Convert to a dictionary representation.

to_kwargs() dict[source]

Convert to flat dictionary representation. (Like .to_dict, but not recursive) This keeps the attribute names of any fields backed by oneofs, instead of using the internal oneof field name
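The difference between a recursive and a flat conversion can be sketched with plain dataclasses. This is only an analogy under stated assumptions: `Inner` and `Outer` are hypothetical classes, the standard library's `asdict` stands in for the recursive behaviour, and the oneof handling described above is not modelled.

```python
from dataclasses import asdict, dataclass, fields


@dataclass
class Inner:
    text: str


@dataclass
class Outer:
    inner: Inner
    count: int


obj = Outer(inner=Inner(text="hi"), count=2)

# Recursive conversion: nested objects become nested dicts
recursive = asdict(obj)

# Flat conversion: only the top level is unpacked, nested objects stay objects
flat = {f.name: getattr(obj, f.name) for f in fields(obj)}

print(recursive)
print(type(flat["inner"]).__name__)
```

A flat mapping of this kind can be passed back to the constructor as keyword arguments, for example `Outer(**flat)`.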

to_json(**kwargs) str[source]

Convert to a json representation.

abstract to_file(file_obj: io.IOBase) File | None[source]

Export a DataBaseObject into a file-like object file_obj. If the DataBase object has requirements around file name or file type it can return them via the optional “File” return object

Args:

file_obj (IOBase): A file object to be filled

Returns:

file_descriptor: Optional[caikit.interfaces.common.data_model.File]

__repr__()[source]

Human-friendly representation.

_field_to_dict_element(field)[source]

Convert field into a representation that can be placed into a dictionary. Recursively calls to_dict on other data model objects.

static get_class_for_proto(proto: google.protobuf.descriptor.Descriptor | google.protobuf.descriptor.FieldDescriptor | google.protobuf.descriptor.EnumDescriptor | google.protobuf.message.Message) Type[DataBase][source]

Look up the data model class corresponding to the given protobuf

If no data model is found, this raises an AttributeError

Args:
proto (Union[Descriptor, ProtoMessageType]): The proto name or descriptor to look up against

Returns:
dm_class (Type[DataBase]): The data model class corresponding to the given protobuf

static get_class_for_name(class_name: str) Type[DataBase][source]

Look up the data model class corresponding to the given name

This lookup attempts to encode various naming conventions that might be used, but it can fail in multiple ways:

  1. No class with the given name is known

  2. Multiple classes with the same name, but different qualified parents are found

A ValueError will be raised if either of the above happens

Args:
class_name (str): The name of the class either as a fully-qualified protobuf name or as the unqualified class name

Returns:
dm_class (Type[DataBase]): The data model class corresponding to the given name
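The two failure modes listed above can be sketched with a small name registry. Everything here is hypothetical and written for illustration: `REGISTRY`, `register`, `lookup_class` and the example classes are not part of caikit, and the real lookup may encode more naming conventions than this.

```python
from typing import Dict

# Hypothetical registry: fully-qualified name -> class
REGISTRY: Dict[str, type] = {}


def register(qualified_name: str, cls: type) -> None:
    REGISTRY[qualified_name] = cls


def lookup_class(class_name: str) -> type:
    """Resolve a fully-qualified or unqualified name to a registered class."""
    # A fully-qualified name resolves directly
    if class_name in REGISTRY:
        return REGISTRY[class_name]
    # Otherwise compare against the last dotted component of each entry
    matches = [
        cls for name, cls in REGISTRY.items()
        if name.rsplit(".", 1)[-1] == class_name
    ]
    if not matches:
        raise ValueError(f"No class named {class_name!r} is known")
    if len(matches) > 1:
        raise ValueError(f"Name {class_name!r} matches multiple classes")
    return matches[0]


class TokenA: ...
class TokenB: ...
class Span: ...


register("pkg_one.Token", TokenA)
register("pkg_two.Token", TokenB)
register("pkg_one.Span", Span)

print(lookup_class("Span").__name__)
print(lookup_class("pkg_two.Token").__name__)
```

In this sketch the unqualified name `Token` is ambiguous and raises a ValueError, while `pkg_two.Token` resolves cleanly.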

caikit.core.data_model.CAIKIT_DATA_MODEL = 'caikit_data_model'
class caikit.core.data_model.DataObjectBase[source]

Bases: caikit.core.data_model.base.DataBase

A DataObject is a data model class that is backed by a @dataclass.

Data model classes that use the @dataobject decorator must derive from this base class.

caikit.core.data_model.dataobject(*args, **kwargs) Callable[[_DataObjectBaseT], _DataObjectBaseT][source]

The @dataobject decorator can be used to define a Data Model object’s schema inline with the definition of the python class rather than needing to bind to a pre-compiled protobufs class. For example:

    @dataobject("foo.bar")
    class MyDataObject(DataObjectBase):
        '''My Custom Data Object'''
        foo: str
        bar: int

NOTE: The wrapped class must NOT inherit directly from DataBase. That inheritance will be added by this decorator, but if it is written directly, the metaclass that links protobufs to the class will be called before this decorator can auto-gen the protobufs class.

The dataobject decorator will not provide tools with enough information to perform type completion for constructions in an IDE, or static typechecking. In order to have that, the dataclass decorator may optionally be added, with the slight overhead of wasted effort in creating the “standard” __init__ function which then gets re-done by @dataobject. The dataclass must follow the dataobject decorator. For example:

    @dataobject("foo.bar")
    @dataclass
    class MyDataObject(DataObjectBase):
        '''My Custom Data Object'''
        foo: str
        bar: int

Kwargs:
package: str

The package name to use for the generated protobufs class

Returns:
decorator: Callable[[Type], Type[DataBase]]

The decorator function that will wrap the given class

caikit.core.data_model.render_dataobject_protos(interfaces_dir: str)[source]

Write out protobufs files for all proto classes generated from dataobjects to the target interfaces directory

Args:

interfaces_dir (str): The target directory (must already exist)

caikit.core.data_model.import_enums(current_globals)[source]

Add all enums and their reverse enum mappings to a module's global symbol table. Note that we also update __all__. In general, __all__ controls the names that come with a wildcard (*) import.

Examples tend to make stuff like this easier to understand. Let’s say the first name we hit is the Entity Mention Type. Then, after the first cycle through the loop below, you’ll see something like:

    '__all__': ['import_enums', 'EntityMentionType', 'EntityMentionTypeRev']
    'EntityMentionType': {"MENTT_UNSET": 0, "MENTT_NAM": 1, …, "MENTT_NONE": 4}
    'EntityMentionTypeRev': {"0": "MENTT_UNSET", "1": "MENTT_NAM", …, "4": "MENTT_NONE"}

Since this is called explicitly below, you can thank this function for automagically syncing your enums (as importable from this file) with the data model.

Args:
current_globals (dict): global dictionary from your data model package __init__ file.
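The effect on a module's symbol table can be sketched in a few lines. The helper `add_enum_to_globals` is hypothetical and written for this example; the enum values are a shortened subset of the illustration above, and the string keys in the reverse mapping follow that illustration rather than any confirmed detail of the implementation.

```python
from typing import Dict, Tuple


def add_enum_to_globals(
    name: str, values: Dict[str, int], target_globals: dict
) -> Tuple[str, str]:
    """Add a forward and a reverse mapping for one enum to target_globals."""
    rev_name = name + "Rev"
    # Forward mapping: label -> number
    target_globals[name] = dict(values)
    # Reverse mapping: number (as a string) -> label
    target_globals[rev_name] = {str(num): label for label, num in values.items()}
    # Keep __all__ in sync so wildcard imports pick up both names
    target_globals.setdefault("__all__", []).extend([name, rev_name])
    return name, rev_name


fake_globals = {"__all__": ["import_enums"]}
names = add_enum_to_globals(
    "EntityMentionType", {"MENTT_UNSET": 0, "MENTT_NAM": 1}, fake_globals
)
print(names)
print(fake_globals["__all__"])
```

A plain dict is used in place of a real `globals()` so the sketch has no side effects on the running module.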

caikit.core.data_model.import_enum(proto_enum: google.protobuf.internal.enum_type_wrapper.EnumTypeWrapper, enum_class: Type[enum.Enum] | None = None) Tuple[str, str][source]

Import a single enum into the global enum module by name

Args:

proto_enum (EnumTypeWrapper): The enum to import

enum_class (Optional[Type[Enum]]): A pre-existing enum class that this proto enum binds to

Returns:
name: str

The name of the enum global

rev_name: str

The name of the reversed enum global

class caikit.core.data_model.JobStatus(*args, **kwds)[source]

Bases: enum.Enum

Enum to track current status of a job

QUEUED = 1
RUNNING = 2
COMPLETED = 3
CANCELED = 4
ERRORED = 5
property is_terminal
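The page does not say which states `is_terminal` treats as terminal. A minimal sketch, assuming that COMPLETED, CANCELED and ERRORED are the terminal ones, could look like this. `SketchJobStatus` is a hypothetical copy of the enum made for illustration.

```python
from enum import Enum


class SketchJobStatus(Enum):
    QUEUED = 1
    RUNNING = 2
    COMPLETED = 3
    CANCELED = 4
    ERRORED = 5

    @property
    def is_terminal(self) -> bool:
        # Assumption: a job in one of these states will not change state again
        return self in (
            SketchJobStatus.COMPLETED,
            SketchJobStatus.CANCELED,
            SketchJobStatus.ERRORED,
        )


terminal_names = [status.name for status in SketchJobStatus if status.is_terminal]
print(terminal_names)
```

A property of this kind lets polling code stop waiting with a single check instead of comparing against each final state.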
caikit.core.data_model.PredictionJobStatus
caikit.core.data_model.PACKAGE_COMMON = 'caikit_data_model.common'
class caikit.core.data_model.ProducerId[source]

Bases: caikit.core.data_model.dataobject.DataObjectBase

Information about a data structure and the module that produced it.

name: str
version: str
__add__(other)[source]

Add two producer ids.

classmethod from_proto(proto)[source]

Overloaded implementation for efficiency vs base introspection

fill_proto(proto)[source]

Overloaded implementation for efficiency vs base introspection

class caikit.core.data_model.AugmentorBase(random_seed, produces_none=False)[source]
produces_none = False
augment(inp_obj)[source]

Take an object in, give an object back. Calls ._augment in the subclass.

Args:
inp_obj (str | caikit.core.data_model.DataBase): Object to be augmented.

Returns:
str | caikit.core.data_model.DataBase: Augmented object of same type as input inp_obj.

reset()[source]

Reset random number generation for the current augmentor. Note that this currently assumes the augmentor is using the builtin random generator leveraged by Python; if you end up using something else, you may want to override this or restructure this base class to allow resetting of random states based on seed type.
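The relationship between the seed, `augment` and `reset` can be sketched as follows. `SketchAugmentor` is a hypothetical class, not a subclass of AugmentorBase, and it keeps a private `random.Random` instance, which is an assumption made so the example has no global side effects.

```python
import random


class SketchAugmentor:
    """Hypothetical augmentor that swaps the case of one random character."""

    def __init__(self, random_seed: int):
        self._random_seed = random_seed
        self.reset()

    def reset(self) -> None:
        # Re-seed so the same sequence of random choices replays
        self._rng = random.Random(self._random_seed)

    def augment(self, inp_obj: str) -> str:
        # The public entry point delegates to the subclass-style hook
        return self._augment(inp_obj)

    def _augment(self, text: str) -> str:
        if not text:
            return text
        idx = self._rng.randrange(len(text))
        return text[:idx] + text[idx].swapcase() + text[idx + 1:]


augmentor = SketchAugmentor(random_seed=42)
first_run = [augmentor.augment("hello") for _ in range(3)]
augmentor.reset()
second_run = [augmentor.augment("hello") for _ in range(3)]
print(first_run == second_run)
```

After `reset`, the second run reproduces the first, which is the property that makes augmented data repeatable.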

caikit.core.data_model.JsonDictValue
caikit.core.data_model.is_multipart_file(file) bool[source]

Returns true if the file appears to contain a multi-part form data request

caikit.core.data_model.stream_multipart_file(file) Iterator[Part][source]

Returns an iterator of Parts, where each Part comes with a content type and an io reader to stream the data from.

NB: This only yields parts which are files, not other form fields.

caikit.core.data_model.log[source]
caikit.core.data_model.error
caikit.core.data_model.T
class caikit.core.data_model.DataStream(generator_func, *args, **kwargs)[source]

Bases: Generic[T]

A data stream is an iterable container class that is reentrant in the sense that it can be iterated over multiple times. The items produced by a data stream may be any python object and are called data items. The data items produced by an iterator over a data stream are generated lazily (unless the .eager method is called) so that each data item in a series of data streams is produced as it is accessed. This allows processing datasets that are too large to fit into memory. A number of functional style methods are provided for manipulating and munging data streams and the .stream method on modules can also be used to process data streams.

The DataStream class is really just a generic wrapper around functions that produce python iterators or generators.
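The idea of wrapping a generator function to get reentrancy can be shown in isolation. `ReentrantStream` and `count_up_to` below are hypothetical names for a minimal standalone sketch; they are not the DataStream implementation.

```python
from typing import Callable, Generic, Iterator, TypeVar

T = TypeVar("T")


class ReentrantStream(Generic[T]):
    """Hypothetical stand-in for the idea behind DataStream."""

    def __init__(self, generator_func: Callable[..., Iterator[T]], *args, **kwargs):
        # Store the function and its arguments rather than a live iterator
        self.generator_func = generator_func
        self._args = args
        self._kwargs = kwargs

    def __iter__(self) -> Iterator[T]:
        # Every iteration calls the function again and gets a fresh iterator
        return iter(self.generator_func(*self._args, **self._kwargs))


def count_up_to(limit: int) -> Iterator[int]:
    for value in range(limit):
        yield value


stream = ReentrantStream(count_up_to, 3)
first_pass = list(stream)
second_pass = list(stream)  # a bare generator would be exhausted here
print(first_pass, second_pass)
```

Because only the function is stored, no items are held in memory between passes; the cost is that the generator body runs again on each iteration.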

generator_func
classmethod from_iterable(data: Iterable[T]) DataStream[T][source]

Create a new data stream from a python iterable, such as a list or tuple. This data stream produces a single data item for each element of the iterable.

Args:
data (iterable): A list or tuple or other python iterable used to construct a new data stream where each data item is a single element of the iterable.

Returns:
DataStream: A new data stream that produces data items from the elements of data.

Examples:
>>> list_stream = DataStream.from_iterable([1, 2, 3])
>>> for data_item in list_stream:
...     print(data_item)
1
2
3
classmethod _from_iterable_generator(data: Iterable[T]) Iterator[T][source]
classmethod from_jsonl(filename: str) DataStream[Dict][source]

Creates a new data stream from a path to a file with JSON lines array, where each line is a valid JSON (python dict)

Args:
filename (str): A path to a UTF-8 encoded text file with JSON lines array, where each line is a valid JSON (python dict)

Returns:
DataStream: A new data stream that produces python dict items each containing a single JSON object corresponding to each line

Notes:

This class method returns a data stream over the valid JSON objects and each JSON object is on one line.

https://jsonlines.org/

Examples:
For a JSON lines file that looks like:

{"name": "Gilbert", "wins": [["straight", "7♣"], ["one pair", "10♥"]]}
{"name": "Alexa", "wins": [["two pair", "4♠"], ["two pair", "9♠"]]}
{"name": "May", "wins": []}
{"name": "Deloise", "wins": [["three of a kind", "5♣"]]}

>>> jsonl_data_stream = DataStream.from_jsonl('sample.jsonl')
>>> for data_item in jsonl_data_stream:
...     print(data_item)
{'name': 'Gilbert', 'wins': [['straight', '7♣'], ['one pair', '10♥']]}
{'name': 'Alexa', 'wins': [['two pair', '4♠'], ['two pair', '9♠']]}
{'name': 'May', 'wins': []}
{'name': 'Deloise', 'wins': [['three of a kind', '5♣']]}
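The line-by-line parsing that the JSON lines format implies can be sketched with the standard library alone. The function `iter_jsonl` is hypothetical, it reads from an in-memory buffer instead of a file path, and skipping blank lines is an assumption of this sketch.

```python
import io
import json
from typing import Dict, Iterator, TextIO


def iter_jsonl(handle: TextIO) -> Iterator[Dict]:
    """Lazily yield one parsed JSON object per non-empty line."""
    for line in handle:
        line = line.strip()
        if line:  # assumption: blank lines are skipped
            yield json.loads(line)


sample = io.StringIO(
    '{"name": "May", "wins": []}\n'
    '{"name": "Deloise", "wins": [["three of a kind", "5♣"]]}\n'
)
items = list(iter_jsonl(sample))
for item in items:
    print(item)
```

Each line is parsed only when the consumer asks for it, so the whole file never has to be loaded at once.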
classmethod _from_jsonl_generator(filename)[source]
classmethod from_json_array(filename: str) DataStream[Dict][source]

Creates a new data stream from a path to a file with JSON array, where each item is a valid JSON (python dict)

Args:
filename (str): A path to a UTF-8 encoded text file with JSON array, where each item is a valid JSON (python dict)

Returns:
DataStream: A new data stream that produces python dict items each containing a single JSON object specified by 'filename'

Notes:

This class method returns a data stream over the valid JSON objects of a single JSON array text file.

Examples:
For a JSON file that looks like:

[ {"a": 1, "b": 2, "c": false}, {"a": 2, "b": 3}, {"a": 3, "c": true} ]

>>> json_data_stream = DataStream.from_json_array('sample.json')
>>> for data_item in json_data_stream:
...     print(data_item)
{'a': 1, 'b': 2, 'c': False}
{'a': 2, 'b': 3}
{'a': 3, 'c': True}
classmethod _from_json_array_file_generator(filename)[source]
classmethod _from_json_array_buffer_generator(json_fh: IO, filename: str = '')[source]
classmethod from_csv(filename: str, *args, skip=0, **kwargs) DataStream[List][source]

Create a new data stream from a csv (comma separated value) file where each data item corresponds to a line of the csv file and consists of a list containing the comma separated values.

Args:
filename (str): A path to a csv file that has rows corresponding to data items and columns corresponding to the elements of each data item.

skip (int): Number of lines to skip at the beginning of the csv file. This is often useful for skipping a header line.

args, kwargs: Additional arguments passed to the csv.reader function. These can be used to specify the delimiter or other csv settings.

Returns:
DataStream: A data stream that produces a data item for each line of the csv file and where each element of the data item corresponds to a column in the csv file.

Examples:
For a sample.csv that looks like:

a, b, c
d, e, f

>>> csv_stream = DataStream.from_csv('sample.csv')
>>> for data_item in csv_stream:
...     print(data_item)
['a', 'b', 'c']
['d', 'e', 'f']
classmethod _from_csv_generator(filename, skip, *csv_args, **csv_kwargs)[source]
classmethod from_header_csv(filename: str, *args, **kwargs) DataStream[Dict][source]

Create a new data stream from a csv where the first row is a header and each subsequent row is an element. The yielded elements are dicts that pair the row values with the corresponding column headers.

Args:
filename (str): A path to a csv file that has rows corresponding to data items and columns corresponding to the elements of each data item.

args, kwargs: Additional arguments passed to the csv.reader function. These can be used to specify the delimiter or other csv settings.

Returns:
DataStream: A data stream that produces a data item for each line of the csv file and where each element of the stream is a dict representation of the fields.

Examples:
For a sample.csv that looks like:

foo, bar, baz
a, b, c
d, e, f

>>> csv_stream = DataStream.from_header_csv('sample.csv')
>>> for data_item in csv_stream:
...     print(data_item)
{"foo": "a", "bar": "b", "baz": "c"}
{"foo": "d", "bar": "e", "baz": "f"}
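Pairing row values with header names is also what the standard library's `csv.DictReader` does, so the behaviour can be sketched without caikit. This is an illustration on in-memory buffers, not a statement about how `from_header_csv` is implemented; `skipinitialspace` is a standard csv formatting parameter.

```python
import csv
import io

# Header row followed by two data rows
sample = io.StringIO("foo,bar,baz\na,b,c\nd,e,f\n")
rows = [dict(row) for row in csv.DictReader(sample)]
for row in rows:
    print(row)

# If the file has a space after each comma, skipinitialspace strips it
spaced = io.StringIO("foo, bar\na, b\n")
spaced_rows = [dict(row) for row in csv.DictReader(spaced, skipinitialspace=True)]
print(spaced_rows)
```

Without `skipinitialspace=True`, the second buffer would produce keys and values that keep their leading space, such as `' bar'` and `' b'`.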
classmethod _from_header_csv_generator(filename, *csv_args, **csv_kwargs)[source]
classmethod _from_header_csv_buffer_generator(fh: IO, *csv_args, **csv_kwargs)[source]
classmethod from_txt(filename: str) DataStream[str][source]

Create a new data stream from a path to a utf8 encoded text file where each data item corresponds to a single line of the file.

Args:
filename (str): A path to a UTF-8 encoded text file with each line corresponding to a data item.

Returns:
DataStream: A new data stream that produces string data items each containing a single line from the file specified by filename.

Notes:

This class method returns a data stream over the lines of a single text file. In order to construct a datastream over separate files, rather than lines, consider using .from_txt_collection.

Examples:
For a text file that looks like:

first line
second line
third line

>>> txt_line_stream = DataStream.from_txt('sample.txt')
>>> for data_item in txt_line_stream:
...     print(data_item)
first line
second line
third line
classmethod _from_txt_generator(filename)[source]
classmethod from_file(filename: str) DataStream[Dict | Tuple | str][source]

Loads up a DataStream from a file. Will call the correct DataStream.from_* static constructor based on the file extension.

The data items returned in the data stream are:

  • For JSON: dictionaries

  • For all other files (besides CSV for now): strings (1 per line)

Args:

filename (str): Name of file

Returns:

DataStream: Resulting datastream from file

classmethod _from_collection(dirname: str, extension: str, file_opener) DataStream[Dict | Tuple | str][source]

Create a new data stream from a path containing multiple files where each data item corresponds to the entire deserialized content of a single file. The file_opener function does the deserialization of individual files.

Args:
dirname (str): A directory path containing a number of UTF-8 encoded files with the given filename extension.

extension (str): Extension of the file. Note that all files are read in the same UTF-8 encoding.

file_opener (function): Function to deserialize a file on disk to memory

Returns:
DataStream: A new data stream that produces data items each containing the content of a single file found in dirname.

Notes:

Each data item in this data stream represents the entire content of a single file and is not split by line or otherwise.

classmethod _from_collection_generator(dirname, extension, file_opener)[source]
classmethod from_txt_collection(dirname: str, extension='txt') DataStream[str][source]

Create a new data stream from a path containing multiple utf8 encoded text files where each data item corresponds to the entire text contained in a single file.

Args:
dirname (str): A directory path containing a number of UTF-8 encoded text files with the .txt filename extension.

extension (str, optional): Extension of the text files. Note that all files are read in the same UTF-8 encoding. Defaults to 'txt'.

Returns:
DataStream: A new data stream that produces string data items each containing the text contained in a single .txt (or specified extension) file found in dirname.

Notes:

Each data item in this data stream represents the entire text contained in a single file and is not split by line or otherwise.

classmethod from_json_collection(dirname: str, extension='json') DataStream[Dict | Tuple | List][source]

Create a new data stream from a path containing multiple JSON files where each data item corresponds to the entire serialized JSON contained in a single file.

Args:
dirname (str): A directory path containing a number of UTF-8 encoded JSON files with the .json filename extension.

extension (str, optional): Extension of the JSON files. Note that all files are read in the same UTF-8 encoding. Defaults to 'json'.

Returns:
DataStream: A new data stream that produces data items each containing the deserialized JSON contained in a single .json (or specified extension) file found in dirname.

Notes:

Each data item in this data stream represents the entire JSON content of a single file and is not split by line or otherwise.

classmethod from_csv_collection(dirname: str) DataStream[Dict][source]

Create a new data stream by chaining the data streams of each file in a directory containing multiple csv files, where each file can have one or more data items.

Args:

dirname (str): A directory path containing a number of csv files

Returns:
DataStream: A new data stream that is chained from the data streams produced by reading (from_header_csv) all .csv files found in dirname. All data items are dicts.

classmethod _from_csv_collection_generator(dirname)[source]
classmethod from_jsonl_collection(dirname: str) DataStream[Dict][source]

Create a new data stream by chaining the data streams of each file in a directory containing multiple jsonl files, where each file can have one or more data items.

Args:

dirname (str): A directory path containing a number of jsonl files

Returns:
DataStream: A new data stream that is chained from the data streams produced by reading (from_jsonl) all .jsonl files found in dirname.

classmethod _from_jsonl_collection_generator(dirname)[source]
classmethod from_multipart_file(filename: str) DataStream[JsonDictValue][source]

Loads up a DataStream from a multipart file

The data items returned in the data stream are determined by the content type of each part in the multipart file, by calling the correct DataStream.from_* constructor.

Args:

filename (str): Name of file

Returns:

DataStream: Resulting datastream from file

train_test_split(test_split=0.25, seed=None) Tuple[DataStream[T], DataStream[T]][source]

Split the current datastream into train/test substreams.

Args:
test_split (float): The fraction of examples to assign to the test substream, in [0, 1]

seed (int | None): The seed for initializing the random assignment. If not provided, a randomly chosen seed will be used.

Returns:
tuple(DataStream, DataStream): Two substreams: a train set substream, and a test set substream
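One way to split a stream without storing per-item assignments is to replay the same seeded random sequence for both substreams. The sketch below assumes that approach; the page does not describe how caikit assigns items, and `_substream` and `sketch_train_test_split` are hypothetical names.

```python
import random
from typing import Iterator, List, Optional, Tuple, TypeVar

T = TypeVar("T")


def _substream(data: List[T], test_split: float, seed: int, want_test: bool) -> Iterator[T]:
    # Both substreams replay the same seeded sequence, so they agree on
    # which items belong to the test set
    rng = random.Random(seed)
    for item in data:
        is_test = rng.random() < test_split
        if is_test == want_test:
            yield item


def sketch_train_test_split(
    data: List[T], test_split: float = 0.25, seed: Optional[int] = None
) -> Tuple[List[T], List[T]]:
    if seed is None:
        # Pick one seed up front so both substreams share it
        seed = random.randrange(2**32)
    train = list(_substream(data, test_split, seed, want_test=False))
    test = list(_substream(data, test_split, seed, want_test=True))
    return train, test


data = list(range(100))
train, test = sketch_train_test_split(data, test_split=0.25, seed=7)
print(len(train) + len(test))
```

With this scheme the test fraction is only approximately `test_split`, since each item is assigned independently.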

chain() DataStream[source]

Chain multiple data streams together sequentially. The returned data stream produces the data items from each passed data stream in turn.

Args:
args (tuple(DataStream)): A tuple containing the data streams to chain, passed as variadic arguments.

Returns:
DataStream: A new data stream that produces the data items from the provided data streams sequentially.

filter(func=lambda data_item: ..., *args, **kwargs) DataStream[T][source]

Skip elements in the data stream as identified by a passed function.

Args:
func (callable(data_item)): The function used to identify data items that will be filtered. The function takes a single data item as an argument and returns True in order to keep the element and False in order to skip it. The default filter function removes falsy values.

Returns:
DataStream: A new data stream that produces the data items from the current data stream only when func evaluates to true.

shuffle(buffer_size, seed=None) DataStream[T][source]

Randomly shuffles the elements of this data stream. If buffer_size is smaller than the full size of the data stream, the result is a partial random shuffle, similar to TensorFlow's dataset shuffle. For instance, if your dataset contains 10,000 elements but buffer_size is set to 1,000, then shuffle will initially select a random element from only the first 1,000 elements in the buffer. Once an element is selected, its space in the buffer is replaced by the next (i.e. 1,001-st) element, maintaining the 1,000 element buffer.

Args:
buffer_size (int): The size of the buffer space, should be greater than 0

seed (int | None): The seed for initializing the random assignment. If not provided, a randomly chosen seed will be used.

Returns:

DataStream: A new, shuffled data stream.
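The buffer-based partial shuffle described above can be sketched as a standalone generator. `buffered_shuffle` is a hypothetical function written for illustration; how the buffer is drained at the end of the stream is an assumption of this sketch.

```python
import random
from typing import Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")


def buffered_shuffle(
    items: Iterable[T], buffer_size: int, seed: Optional[int] = None
) -> Iterator[T]:
    if buffer_size <= 0:
        raise ValueError("buffer_size must be greater than 0")
    rng = random.Random(seed)
    iterator = iter(items)
    buffer: List[T] = []
    # Fill the buffer with the first buffer_size elements
    for item in iterator:
        buffer.append(item)
        if len(buffer) == buffer_size:
            break
    # Emit a random buffered element and refill its slot with the next one
    for item in iterator:
        idx = rng.randrange(len(buffer))
        yield buffer[idx]
        buffer[idx] = item
    # Assumption: once the input is exhausted, emit the rest in random order
    rng.shuffle(buffer)
    yield from buffer


shuffled = list(buffered_shuffle(range(20), buffer_size=5, seed=1))
print(shuffled)
```

Only `buffer_size` elements are held in memory at a time, which is why an element can move at most a bounded distance toward the front of the stream.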

eager() DataStream[T][source]

Evaluate the data stream, place it into memory and return a new data stream over these static values. This is useful if your data stream can fit into memory, at least up to a certain point, and lazily re-evaluating the stream each time it is iterated over would be inefficient.

Returns:
DataStream: A new data stream that iterates over the evaluated, in-memory data items in this stream.

map(func, *args, **kwargs) DataStream[source]

Apply a function to each element in the data stream.

Args:
func (callable(*args, **kwargs)): A function that is lazily applied to each element in the data stream.

*args, **kwargs: Additional arguments to pass to func.

Returns:

DataStream: A new data stream with func applied to each element.

flatten() DataStream[source]

Convert a 2-level nested stream into a flattened stream

Returns:

DataStream: A new data stream with inner stream items ‘flattened’

zip() DataStream[source]

Combine the data items of multiple data streams together in tuples.

Args:
args (tuple(DataStream)): A tuple containing the data streams to be zipped, passed as variadic arguments.

Returns:

DataStream: A data stream that produces the zipped data items.

Notes:

A ValueError is raised when the stream is iterated over if any of the zipped data streams do not have the same length. Since streams are evaluated lazily, however, this error condition will only be detected and raised when the stream is being iterated over.
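The deferred length check described in the note can be sketched with a plain generator. `strict_zip` is a hypothetical function for illustration only; it shows why the mismatch surfaces during iteration rather than when the zipped stream is created.

```python
from typing import Any, Iterable, Iterator, Tuple

_SENTINEL = object()


def strict_zip(*streams: Iterable[Any]) -> Iterator[Tuple[Any, ...]]:
    iterators = [iter(stream) for stream in streams]
    while True:
        items = [next(iterator, _SENTINEL) for iterator in iterators]
        exhausted = [item is _SENTINEL for item in items]
        if all(exhausted):
            return  # every stream ended together
        if any(exhausted):
            raise ValueError("zipped streams do not have the same length")
        yield tuple(items)


pairs = list(strict_zip([1, 2], ["a", "b"]))
print(pairs)

# Creating the mismatched zip raises nothing; the error appears on iteration
lazy = strict_zip([1, 2], [10])
first = next(lazy)
print(first)
```

Calling `next(lazy)` a second time raises the ValueError, because only then does the shorter input run out.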

peek() T[source]

Returns the first element of the stream, or raises IndexError if stream is empty

augment(augmentor, aug_cycles, *, post_augment_func=None, augment_index=None, enforce_determinism=True) DataStream[T][source]
__add__(other)[source]

The addition operator for data streams is equivalent to calling .chain and combines this data stream with another sequentially.

__getitem__(idx) T[source]

Index or slice each data item. This is valuable for creating new data streams over the elements of a stream that produces tuples, lists, arrays, et cetera.

Args:
idx (int or slice): The index or slice to be applied to each data item.

Returns:
DataStream: A new data stream with data_item[idx] applied to each data item.

Notes:

This operation may be somewhat counterintuitive since data_stream[0] does not return the first element of the data stream and, instead, returns a new data stream that produces data_item[0] for each data item.

This operation may fail with a TypeError if the data items in the stream are not subscriptable.
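The per-item indexing semantics can be shown with a plain generator. `index_each` is a hypothetical helper that mirrors the described behaviour on ordinary lists; it is not the DataStream method itself.

```python
from typing import Any, Iterable, Iterator, Union


def index_each(items: Iterable[Any], idx: Union[int, slice]) -> Iterator[Any]:
    """Lazily apply item[idx] to every item."""
    for item in items:
        yield item[idx]


pairs = [("a", 1), ("b", 2), ("c", 3)]

labels = list(index_each(pairs, 0))            # first field of every item
heads = list(index_each(pairs, slice(0, 1)))   # a slice of every item
print(labels)
print(heads)
```

Applied to items that are not subscriptable, such as plain integers, the same generator raises a TypeError once it is iterated.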

__iter__()[source]

Return an iterator or generator over all of the data items in this data stream. Data streams are reentrant in the sense that they can be iterated over multiple times.

__len__()[source]

See property method self._length

property _length

Return the number of data items contained in this data stream. This requires that the data stream be iterated over, which may be time-consuming. This value is then stored internally so that subsequent calls do not iterate over the data stream again.

This is implemented as a cached_property so that subclasses of DataStream which implement their own __getstate__ and __setstate__ do not have to account for the existence of self._length

__or__(module)[source]

Feed this data stream into the .stream method of a module. This is syntactic sugar that allows modules to be chained like data_stream | module1 | module2 rather than the equivalent module2.stream(module1.stream(data_stream)).
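The pipe syntax can be reproduced in a few lines to show what the operator expands to. `PipeStream`, `Doubler` and `AddOne` are hypothetical classes for this sketch, and the stream here holds its items in a list for simplicity rather than evaluating lazily.

```python
from typing import Any, Iterable, Iterator


class PipeStream:
    def __init__(self, items: Iterable[Any]):
        self._items = list(items)

    def __iter__(self) -> Iterator[Any]:
        return iter(self._items)

    def __or__(self, module: Any) -> "PipeStream":
        # stream | module is sugar for module.stream(stream)
        return PipeStream(module.stream(self))


class Doubler:
    def stream(self, items: Iterable[int]) -> Iterator[int]:
        return (item * 2 for item in items)


class AddOne:
    def stream(self, items: Iterable[int]) -> Iterator[int]:
        return (item + 1 for item in items)


piped = list(PipeStream([1, 2, 3]) | Doubler() | AddOne())
nested = list(AddOne().stream(Doubler().stream(PipeStream([1, 2, 3]))))
print(piped, nested)
```

The piped form reads left to right in the order the modules run, while the nested form reads inside out.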

static _verify_dir(dirname)[source]
class caikit.core.data_model._UtfEncodeIOWrapper(bytes_stream: IO[bytes])

Bases: io.IOBase

Lil’ wrapper class to convert a bytes buffer to a string buffer

bytes_stream
read(*args, **kwargs)
readline(*args, **kwargs)

Read and return a line from the stream.

If size is specified, at most size bytes will be read.

The line terminator is always b'\n' for binary files; for text files, the newlines argument to open can be used to select the line terminator(s) recognized.

seek(*args, **kwargs)

Change the stream position to the given byte offset.

offset

The stream position, relative to ‘whence’.

whence

The relative position to seek from.

The offset is interpreted relative to the position indicated by whence. Values for whence are:

  • os.SEEK_SET or 0 – start of stream (the default); offset should be zero or positive

  • os.SEEK_CUR or 1 – current stream position; offset may be negative

  • os.SEEK_END or 2 – end of stream; offset is usually negative

Return the new absolute position.
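Reading a bytes buffer as text can also be done with the standard library's `io.TextIOWrapper`, which gives a feel for what a bytes-to-string wrapper provides. This is a related standard-library illustration, not a description of how _UtfEncodeIOWrapper is implemented.

```python
import io

# A bytes buffer holding UTF-8 encoded text
raw = io.BytesIO("first line\nsecond line\n".encode("utf-8"))

# Wrap it so that reads return str instead of bytes
text_stream = io.TextIOWrapper(raw, encoding="utf-8")

first = text_stream.readline()
rest = text_stream.read()
print(repr(first), repr(rest))
```

Text-oriented consumers such as `csv.reader` or `json.load` can then read from the wrapped stream directly.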

caikit.core.data_model.TrainingStatus