caikit.runtime.service_generation.data_stream_source

Attributes

_DATA_STREAM_SOURCE_TYPES

log

error

PluginFactory

Classes

DataStreamSourcePlugin

A DataStreamSourcePlugin is a pluggable source that defines the shape of the data object needed as well as the code for accessing the data from some source type.

FilePluginBase

Intermediate base class for file-based plugins with helper utilities

FileDataStreamSourcePlugin

Plugin for a single file

ListOfFilesDataStreamSourcePlugin

Plugin for a list of files

DirectoryDataStreamSourcePlugin

Plugin for a directory holding files

JsonDataStreamSourcePlugin

This plugin is for inline data, elements are provided in a list.

S3FilesDataStreamSourcePlugin

Unimplemented!

DataStreamPluginFactory

The DataStreamPluginFactory is responsible for holding a registry of plugin instances that will be used to create and manage data stream sources.

DataStreamSourceBase

This base class acts as a sentinel so that dynamically generated data stream source classes can be identified programmatically.

Functions

make_data_stream_source(...)

Dynamically create a data stream source message type that supports pulling an iterable of the given type from all valid data stream sources.

_make_data_stream_source_type_name(→ str)

Make the name for data stream source class that wraps the given type

Module Contents

caikit.runtime.service_generation.data_stream_source._DATA_STREAM_SOURCE_TYPES
caikit.runtime.service_generation.data_stream_source.log[source]
caikit.runtime.service_generation.data_stream_source.error
class caikit.runtime.service_generation.data_stream_source.DataStreamSourcePlugin(config: aconfig.Config, instance_name: str)[source]

Bases: caikit.core.toolkit.factory.FactoryConstructible

A DataStreamSourcePlugin is a pluggable source that defines the shape of the data object needed as well as the code for accessing the data from some source type.
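The plugin contract described above can be pictured with a small standalone sketch. It does not import caikit: `SketchPlugin`, `InlinePlugin`, `InlineList`, and the `field_number` config key are hypothetical stand-ins, and a plain dict and a generator take the place of `aconfig.Config` and `DataStream`.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Iterable, List, Type


@dataclass
class InlineList:
    """Hypothetical source message: the data is carried inline."""

    data: List[Any]


class SketchPlugin(ABC):
    """Stand-in for the plugin interface; not the caikit class."""

    def __init__(self, config: dict, instance_name: str):
        self._config = config
        self._instance_name = instance_name

    @abstractmethod
    def get_stream_message_type(self, element_type: type) -> Type:
        """Return the message class that describes the source."""

    @abstractmethod
    def to_data_stream(self, source_message: Any, element_type: type) -> Iterable:
        """Turn a source message instance into an iterable of elements."""

    @abstractmethod
    def get_field_number(self) -> int:
        """Return the field number, possibly read from the config."""


class InlinePlugin(SketchPlugin):
    def get_stream_message_type(self, element_type: type) -> Type:
        return InlineList

    def to_data_stream(self, source_message: InlineList, element_type: type) -> Iterable:
        # Coerce each raw element to the requested element type.
        return (element_type(item) for item in source_message.data)

    def get_field_number(self) -> int:
        # Assumed config key; real plugins define their own numbers.
        return self._config.get("field_number", 1)


plugin = InlinePlugin({"field_number": 7}, "inline")
stream = plugin.to_data_stream(InlineList(data=["1", "2", "3"]), int)
print(list(stream))
```

The point of the split is that the message type fixes the shape of the request, while `to_data_stream` holds the access logic, so new source kinds can be added without touching callers.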

_config
_instance_name
abstract get_stream_message_type(element_type: type) Type[caikit.core.data_model.base.DataBase][source]

Get the type of the dataobject class that will be used as the source information

abstract to_data_stream(source_message: Type[caikit.core.data_model.base.DataBase], element_type: type) caikit.core.data_model.streams.data_stream.DataStream[source]

Convert an instance of the source message type into a DataStream

abstract get_field_number() int[source]

Each plugin must define its field number which may be informed by self._config

get_field_name(element_type: type) str[source]

The name of the field that this plugin will use in the source oneof

static _to_element_type(element_type: type, raw_element: Any) Any[source]
static _to_element_partial(element_type: type) Callable[source]
class caikit.runtime.service_generation.data_stream_source.FilePluginBase(config: aconfig.Config, instance_name: str)[source]

Bases: DataStreamSourcePlugin

Intermediate base class for file-based plugins with helper utilities

classmethod _create_data_stream_from_file(fname: str, element_type: type) caikit.core.data_model.streams.data_stream.DataStream[source]

Create a data stream object by deducing file extension and reading the file accordingly
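A minimal sketch of extension-based dispatch, assuming a simple table from extension to reader function. The `READERS` table, the reader helpers, and the set of supported extensions are illustrative assumptions, and plain lists stand in for `DataStream`; the real method may support other formats.

```python
import csv
import json
import os
import tempfile
from typing import Any, Callable, Dict, List


def _read_json(path: str) -> List[Any]:
    with open(path, encoding="utf-8") as handle:
        return json.load(handle)


def _read_jsonl(path: str) -> List[Any]:
    with open(path, encoding="utf-8") as handle:
        return [json.loads(line) for line in handle if line.strip()]


def _read_csv(path: str) -> List[Dict[str, str]]:
    with open(path, newline="", encoding="utf-8") as handle:
        return list(csv.DictReader(handle))


# Hypothetical dispatch table keyed by lower-cased file extension.
READERS: Dict[str, Callable[[str], List[Any]]] = {
    ".json": _read_json,
    ".jsonl": _read_jsonl,
    ".csv": _read_csv,
}


def read_by_extension(path: str) -> List[Any]:
    extension = os.path.splitext(path)[1].lower()
    reader = READERS.get(extension)
    if reader is None:
        raise ValueError(f"Unsupported file extension: {extension!r}")
    return reader(path)


# Usage: write a small JSON Lines file and read it back.
with tempfile.TemporaryDirectory() as tmp_dir:
    sample = os.path.join(tmp_dir, "sample.jsonl")
    with open(sample, "w", encoding="utf-8") as handle:
        handle.write('{"a": 1}\n{"a": 2}\n')
    records = read_by_extension(sample)
print(records)
```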

classmethod _load_from_file_without_extension(fname, element_type: type) caikit.core.data_model.streams.data_stream.DataStream[source]

Similar to _create_data_stream_from_file, but we don’t have a file extension to work with. Attempt to create a data stream using one of a few well-known formats.

🌶🌶🌶️ on ordering here: File formats are loosely arranged in order of least-to-most-sketchy format validation.

  1. .json/.jsonl are pretty straightforward.

  2. Multipart files are a little iffy: the content-type header line can be omitted, in which case we check for a string and roll our own boundary parser. This could cause problems in the future for multi-yaml files that begin with

  3. CSV support simply assumes the first line of the file has the column headers, and may confidently return a stream even if that’s not the case.
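The ordering concern can be illustrated with a standalone sketch that tries the strictest parser first and falls back to CSV last. `sniff_records` is a hypothetical helper, multipart handling is deliberately left out, and the logic is an assumption about the general approach rather than a copy of the caikit implementation.

```python
import csv
import io
import json
from typing import Any, List


def sniff_records(text: str) -> List[Any]:
    # 1. Whole-document JSON is strict, so a successful parse is trustworthy.
    try:
        parsed = json.loads(text)
        if isinstance(parsed, list):
            return parsed
    except json.JSONDecodeError:
        pass

    # 2. JSON Lines: every non-empty line must parse on its own.
    lines = [line for line in text.splitlines() if line.strip()]
    try:
        if lines:
            return [json.loads(line) for line in lines]
    except json.JSONDecodeError:
        pass

    # 3. CSV has the weakest validation: the first line is assumed to hold
    #    the column headers, so almost any text "succeeds" here.
    return list(csv.DictReader(io.StringIO(text)))


print(sniff_records('[{"a": 1}]'))
print(sniff_records('{"a": 1}\n{"a": 2}'))
print(sniff_records("a,b\n1,2"))
print(sniff_records("hello\nworld"))
```

The last call shows why CSV goes last: text that is not CSV at all still yields a record, with `hello` treated as a header.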

static _get_resolved_source_path(input_path: str) str[source]

Get a fully resolved path, including any shared prefix

class caikit.runtime.service_generation.data_stream_source.FileDataStreamSourcePlugin(config: aconfig.Config, instance_name: str)[source]

Bases: FilePluginBase

Plugin for a single file

name = 'FileData'

This is the name of this constructible type that will be used by the factory to identify this class

get_field_name(element_type: type) str[source]

Partly for backwards compatibility, and partly to keep FileReference consistent with ListOfFiles/Directory

get_stream_message_type(*_, **__) Type[caikit.core.data_model.base.DataBase][source]

Get the type of the dataobject class that will be used as the source information

to_data_stream(source_message: caikit.interfaces.common.data_model.stream_sources.FileReference, element_type: type) caikit.core.data_model.streams.data_stream.DataStream[source]

Convert an instance of the source message type into a DataStream

get_field_number() int[source]

Each plugin must define its field number which may be informed by self._config

class caikit.runtime.service_generation.data_stream_source.ListOfFilesDataStreamSourcePlugin(config: aconfig.Config, instance_name: str)[source]

Bases: FilePluginBase

Plugin for a list of files

name = 'ListOfFiles'

This is the name of this constructible type that will be used by the factory to identify this class

get_field_name(element_type: type) str[source]

Partly for backwards compatibility, and partly to keep ListOfFiles consistent with File/Directory

get_stream_message_type(*_, **__) Type[caikit.core.data_model.base.DataBase][source]

Get the type of the dataobject class that will be used as the source information

to_data_stream(source_message: caikit.interfaces.common.data_model.stream_sources.ListOfFileReferences, element_type: type) caikit.core.data_model.streams.data_stream.DataStream[source]

Convert an instance of the source message type into a DataStream

get_field_number() int[source]

Each plugin must define its field number which may be informed by self._config

class caikit.runtime.service_generation.data_stream_source.DirectoryDataStreamSourcePlugin(config: aconfig.Config, instance_name: str)[source]

Bases: FilePluginBase

Plugin for a directory holding files

name = 'Directory'

This is the name of this constructible type that will be used by the factory to identify this class

get_stream_message_type(*_, **__) Type[caikit.core.data_model.base.DataBase][source]

Get the type of the dataobject class that will be used as the source information

to_data_stream(source_message: caikit.interfaces.common.data_model.stream_sources.Directory, element_type: type) caikit.core.data_model.streams.data_stream.DataStream[source]

Convert an instance of the source message type into a DataStream

get_field_number() int[source]

Each plugin must define its field number which may be informed by self._config

class caikit.runtime.service_generation.data_stream_source.JsonDataStreamSourcePlugin(config: aconfig.Config, instance_name: str)[source]

Bases: DataStreamSourcePlugin

This plugin is for inline data, elements are provided in a list.

This plugin has instantiation logic: it needs the stream’s element type so that it can generate a data model for List[element_type]
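As a rough illustration of generating and caching a per-element-type message class, the sketch below uses `dataclasses.make_dataclass` in place of the caikit data model machinery. The `ListOf...` naming and the module-level cache are assumptions for illustration only.

```python
from dataclasses import field, make_dataclass
from typing import Dict, List

# Hypothetical cache: element type -> generated message class.
_type_cache: Dict[type, type] = {}


def get_list_message_type(element_type: type) -> type:
    """Create (once) a class with a single `data: List[element_type]` field."""
    if element_type not in _type_cache:
        _type_cache[element_type] = make_dataclass(
            f"ListOf{element_type.__name__.capitalize()}",
            [("data", List[element_type], field(default_factory=list))],
        )
    return _type_cache[element_type]


IntList = get_list_message_type(int)
message = IntList(data=[1, 2, 3])
print(IntList.__name__, message.data)
```

Caching matters because generating the same class twice would produce two distinct types that compare unequal, even though they describe the same message.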

name = 'JsonData'

This is the name of this constructible type that will be used by the factory to identify this class

stream_source_type_cache: Dict[Type[caikit.core.data_model.base.DataBase], Type[caikit.core.data_model.base.DataBase]]
get_stream_message_type(element_type: type) Type[caikit.core.data_model.base.DataBase][source]

Get the type of the dataobject class that will be used as the source information

to_data_stream(source_message: Type[caikit.core.data_model.base.DataBase], *_, **__) caikit.core.data_model.streams.data_stream.DataStream[source]

source_message should be of type self.get_stream_message_type, so it _should_ contain an attribute named data, which is a list

get_field_number() int[source]

Each plugin must define its field number which may be informed by self._config

class caikit.runtime.service_generation.data_stream_source.S3FilesDataStreamSourcePlugin(config: aconfig.Config, instance_name: str)[source]

Bases: DataStreamSourcePlugin

Unimplemented!

name = 'S3Files'

This is the name of this constructible type that will be used by the factory to identify this class

get_stream_message_type(*_, **__) Type[caikit.core.data_model.base.DataBase][source]

Get the type of the dataobject class that will be used as the source information

to_data_stream(*_, **__) caikit.core.data_model.streams.data_stream.DataStream[source]

Convert an instance of the source message type into a DataStream

get_field_number() int[source]

Each plugin must define its field number which may be informed by self._config

class caikit.runtime.service_generation.data_stream_source.DataStreamPluginFactory(*args, **kwargs)[source]

Bases: caikit.core.toolkit.factory.ImportableFactory

The DataStreamPluginFactory is responsible for holding a registry of plugin instances that will be used to create and manage data stream sources

_plugins = None
get_plugins(plugins_config: aconfig.Config | None = None) List[DataStreamSourcePlugin][source]

Builds the set of plugins to use for a data stream source, based on plugins_config when one is given
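A registry-style factory of this kind can be sketched as follows. `SketchFactory`, `EchoPlugin`, and the `{"type": ..., "config": ...}` layout of each config entry are assumptions made for this example; the actual configuration schema is not shown on this page.

```python
from typing import Dict, List


class EchoPlugin:
    """Hypothetical plugin; `name` is the key the factory looks up."""

    name = "Echo"

    def __init__(self, config: dict, instance_name: str):
        self.config = config
        self.instance_name = instance_name


class SketchFactory:
    """Stand-in registry: maps a type name to a constructible class."""

    def __init__(self) -> None:
        self._registry: Dict[str, type] = {}

    def register(self, plugin_class: type) -> None:
        self._registry[plugin_class.name] = plugin_class

    def get_plugins(self, plugins_config: Dict[str, dict]) -> List[object]:
        plugins = []
        for instance_name, entry in plugins_config.items():
            # Assumed entry layout: {"type": <registered name>, "config": {...}}
            plugin_class = self._registry[entry["type"]]
            plugins.append(plugin_class(entry.get("config", {}), instance_name))
        return plugins


factory = SketchFactory()
factory.register(EchoPlugin)
plugins = factory.get_plugins(
    {"inline": {"type": "Echo", "config": {"field_number": 3}}}
)
print([plugin.instance_name for plugin in plugins])
```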

caikit.runtime.service_generation.data_stream_source.PluginFactory
class caikit.runtime.service_generation.data_stream_source.DataStreamSourceBase[source]

Bases: caikit.core.data_model.streams.data_stream.DataStream

This base class acts as a sentinel so that dynamically generated data stream source classes can be identified programmatically.

_generator()[source]
__getstate__() bytes[source]

A DataStreamSource is pickled by serializing its source representation. This is particularly useful when sharing data streams across subprocesses to run training in an isolated process.

__setstate__(pickle_bytes: bytes)[source]

Unpickling a DataStreamSource basically involves unpacking the serialized source representation. The catch is that the oneof is represented strangely in __dict__, so we need to explicitly set all oneof members.
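The pickling approach, serializing only the source representation instead of loaded data, can be sketched with a standalone class. `FileBackedStream` is hypothetical and uses JSON bytes as its serialized form; the oneof handling mentioned above is specific to the real message classes and is not reproduced here.

```python
import json
import pickle
from typing import List, Optional


class FileBackedStream:
    """Hypothetical stream that remembers where its data comes from."""

    def __init__(self, filenames: List[str]):
        self.filenames = list(filenames)
        self._loaded: Optional[list] = None  # heavy state, never pickled

    def __getstate__(self) -> bytes:
        # Serialize only the source description, not any loaded data.
        return json.dumps({"filenames": self.filenames}).encode("utf-8")

    def __setstate__(self, pickle_bytes: bytes) -> None:
        state = json.loads(pickle_bytes.decode("utf-8"))
        self.filenames = state["filenames"]
        self._loaded = None  # the receiving process reloads lazily


original = FileBackedStream(["train.jsonl"])
original._loaded = [1, 2, 3]
clone = pickle.loads(pickle.dumps(original))
print(clone.filenames, clone._loaded)
```

Keeping the pickled payload small is what makes it cheap to hand a stream to a subprocess, which can then re-read the data from the source on its own.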

property name_to_plugin_map
property _stream

The internal _stream is cached here so that the result of calling to_data_stream can be re-read, rather than requiring to_data_stream to be invoked on every read through the stream
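The caching behavior can be illustrated with `functools.cached_property`. `CachedSource` and its `conversions` counter are hypothetical and exist only to show that the conversion runs once; the real property may be implemented differently.

```python
from functools import cached_property
from typing import Iterator, List


class CachedSource:
    """Hypothetical source whose conversion result is computed once."""

    def __init__(self, values: List[int]):
        self._values = values
        self.conversions = 0  # counts how often the conversion runs

    def to_data_stream(self) -> List[int]:
        self.conversions += 1
        return list(self._values)

    @cached_property
    def _stream(self) -> List[int]:
        # The first access runs the conversion; later reads reuse the result.
        return self.to_data_stream()

    def __iter__(self) -> Iterator[int]:
        return iter(self._stream)


source = CachedSource([1, 2, 3])
first_pass = list(source)
second_pass = list(source)
print(first_pass, second_pass, source.conversions)
```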

to_data_stream() caikit.core.data_model.streams.data_stream.DataStream[source]

Convert to the target data stream type based on the source type

caikit.runtime.service_generation.data_stream_source.make_data_stream_source(data_element_type: type, plugin_factory: DataStreamPluginFactory = PluginFactory, plugins_config: aconfig.Config | None = None) Type[caikit.core.data_model.base.DataBase][source]

Dynamically create a data stream source message type that supports pulling an iterable of the given type from all valid data stream sources
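Dynamic creation of a source class can be sketched with the built-in `type()`. Everything here is a simplified stand-in: `SourceBase` plays the role of the sentinel base class, the `loaders` mapping replaces the plugin set, and the `DataStreamSource...` naming scheme is an assumption rather than the documented behavior of `_make_data_stream_source_type_name`.

```python
from typing import Callable, Dict, Iterable


class SourceBase:
    """Sentinel base: generated classes can be found with issubclass()."""

    loaders: Dict[str, Callable[[object], Iterable]] = {}

    def __init__(self, **kwargs: object):
        # Mimic a oneof: exactly one source field may be set.
        if len(kwargs) != 1:
            raise ValueError("Exactly one source field must be set")
        self.which, self.value = next(iter(kwargs.items()))

    def to_data_stream(self) -> Iterable:
        return self.loaders[self.which](self.value)


def make_source_type(element_type: type, loaders: Dict[str, Callable]) -> type:
    # Hypothetical naming scheme for the generated class.
    name = f"DataStreamSource{element_type.__name__.capitalize()}"
    attributes = {"loaders": dict(loaders), "element_type": element_type}
    return type(name, (SourceBase,), attributes)


IntSource = make_source_type(
    int, {"inline": lambda raw: [int(item) for item in raw]}
)
source = IntSource(inline=["1", "2", "3"])
print(IntSource.__name__, list(source.to_data_stream()))
```

Because every generated class derives from the sentinel, code elsewhere can detect a data stream source argument with a single `issubclass` check, without knowing the element type in advance.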

caikit.runtime.service_generation.data_stream_source._make_data_stream_source_type_name(data_element_type: Type) str[source]

Make the name for data stream source class that wraps the given type