caikit.runtime.service_generation.data_stream_source
====================================================

.. py:module:: caikit.runtime.service_generation.data_stream_source


Attributes
----------

.. autoapisummary::

   caikit.runtime.service_generation.data_stream_source._DATA_STREAM_SOURCE_TYPES
   caikit.runtime.service_generation.data_stream_source.log
   caikit.runtime.service_generation.data_stream_source.error
   caikit.runtime.service_generation.data_stream_source.PluginFactory


Classes
-------

.. autoapisummary::

   caikit.runtime.service_generation.data_stream_source.DataStreamSourcePlugin
   caikit.runtime.service_generation.data_stream_source.FilePluginBase
   caikit.runtime.service_generation.data_stream_source.FileDataStreamSourcePlugin
   caikit.runtime.service_generation.data_stream_source.ListOfFilesDataStreamSourcePlugin
   caikit.runtime.service_generation.data_stream_source.DirectoryDataStreamSourcePlugin
   caikit.runtime.service_generation.data_stream_source.JsonDataStreamSourcePlugin
   caikit.runtime.service_generation.data_stream_source.S3FilesDataStreamSourcePlugin
   caikit.runtime.service_generation.data_stream_source.DataStreamPluginFactory
   caikit.runtime.service_generation.data_stream_source.DataStreamSourceBase


Functions
---------

.. autoapisummary::

   caikit.runtime.service_generation.data_stream_source.make_data_stream_source
   caikit.runtime.service_generation.data_stream_source._make_data_stream_source_type_name


Module Contents
---------------

.. py:data:: _DATA_STREAM_SOURCE_TYPES

.. py:data:: log

.. py:data:: error

.. py:class:: DataStreamSourcePlugin(config: aconfig.Config, instance_name: str)

   Bases: :py:obj:`caikit.core.toolkit.factory.FactoryConstructible`

   A DataStreamSourcePlugin is a pluggable source. It defines the shape of the
   data object it needs and the code for accessing the data from some source
   type.

   .. py:attribute:: _config

   .. py:attribute:: _instance_name

   .. py:method:: get_stream_message_type(element_type: type) -> Type[caikit.core.data_model.base.DataBase]
      :abstractmethod:

      Get the type of the data object class that will be used as the source
      information.

   .. py:method:: to_data_stream(source_message: Type[caikit.core.data_model.base.DataBase], element_type: type) -> caikit.core.data_model.streams.data_stream.DataStream
      :abstractmethod:

      Convert an instance of the source message type into a DataStream.

   .. py:method:: get_field_number() -> int
      :abstractmethod:

      Each plugin must define its field number, which may be informed by
      ``self._config``.

   .. py:method:: get_field_name(element_type: type) -> str

      The name of the field that this plugin will use in the source oneof.

   .. py:method:: _to_element_type(element_type: type, raw_element: Any) -> Any
      :staticmethod:

   .. py:method:: _to_element_partial(element_type: type) -> Callable
      :staticmethod:

.. py:class:: FilePluginBase(config: aconfig.Config, instance_name: str)

   Bases: :py:obj:`DataStreamSourcePlugin`

   Intermediate base class for file-based plugins, with helper utilities.

   .. py:method:: _create_data_stream_from_file(fname: str, element_type: type) -> caikit.core.data_model.streams.data_stream.DataStream
      :classmethod:

      Create a data stream object by deducing the file extension and reading
      the file accordingly.

   .. py:method:: _load_from_file_without_extension(fname, element_type: type) -> caikit.core.data_model.streams.data_stream.DataStream
      :classmethod:

      Similar to ``_create_data_stream_from_file``, but there is no file
      extension to work with. Attempt to create a data stream using one of a
      few well-known formats.

      🌶🌶🌶️ on ordering here: file formats are loosely arranged in order of
      least-to-most-sketchy format validation.

      1. ``.json``/``.jsonl`` are pretty straightforward.
      2. Multipart files are a little iffy: the content-type header line can be
         omitted, in which case we check for a ``--`` string and roll our own
         boundary parser. This could cause problems in the future for
         multi-yaml files that begin with ``---``.
      3. CSV support simply assumes that the first line of the file holds the
         column headers, and may confidently return a stream even if that is
         not the case.

   .. py:method:: _get_resolved_source_path(input_path: str) -> str
      :staticmethod:

      Get a fully resolved path, including any shared prefix.

.. py:class:: FileDataStreamSourcePlugin(config: aconfig.Config, instance_name: str)

   Bases: :py:obj:`FilePluginBase`

   Plugin for a single file.

   .. py:attribute:: name
      :value: 'FileData'

      The name of this constructible type, which the factory uses to identify
      this class.

   .. py:method:: get_field_name(element_type: type) -> str

      Partly for backwards compatibility, and partly to keep FileReference
      consistent with ListOfFiles/Directory.

   .. py:method:: get_stream_message_type(*_, **__) -> Type[caikit.core.data_model.base.DataBase]

      Get the type of the data object class that will be used as the source
      information.

   .. py:method:: to_data_stream(source_message: caikit.interfaces.common.data_model.stream_sources.FileReference, element_type: type) -> caikit.core.data_model.streams.data_stream.DataStream

      Convert an instance of the source message type into a DataStream.

   .. py:method:: get_field_number() -> int

      Each plugin must define its field number, which may be informed by
      ``self._config``.

.. py:class:: ListOfFilesDataStreamSourcePlugin(config: aconfig.Config, instance_name: str)

   Bases: :py:obj:`FilePluginBase`

   Plugin for a list of files.

   .. py:attribute:: name
      :value: 'ListOfFiles'

      The name of this constructible type, which the factory uses to identify
      this class.

   .. py:method:: get_field_name(element_type: type) -> str

      Partly for backwards compatibility, and partly to keep ListOfFiles
      consistent with File/Directory.

   .. py:method:: get_stream_message_type(*_, **__) -> Type[caikit.core.data_model.base.DataBase]

      Get the type of the data object class that will be used as the source
      information.

   .. py:method:: to_data_stream(source_message: caikit.interfaces.common.data_model.stream_sources.ListOfFileReferences, element_type: type) -> caikit.core.data_model.streams.data_stream.DataStream

      Convert an instance of the source message type into a DataStream.

   .. py:method:: get_field_number() -> int

      Each plugin must define its field number, which may be informed by
      ``self._config``.

.. py:class:: DirectoryDataStreamSourcePlugin(config: aconfig.Config, instance_name: str)

   Bases: :py:obj:`FilePluginBase`

   Plugin for a directory holding files.

   .. py:attribute:: name
      :value: 'Directory'

      The name of this constructible type, which the factory uses to identify
      this class.

   .. py:method:: get_stream_message_type(*_, **__) -> Type[caikit.core.data_model.base.DataBase]

      Get the type of the data object class that will be used as the source
      information.

   .. py:method:: to_data_stream(source_message: caikit.interfaces.common.data_model.stream_sources.Directory, element_type: type) -> caikit.core.data_model.streams.data_stream.DataStream

      Convert an instance of the source message type into a DataStream.

   .. py:method:: get_field_number() -> int

      Each plugin must define its field number, which may be informed by
      ``self._config``.

.. py:class:: JsonDataStreamSourcePlugin(config: aconfig.Config, instance_name: str)

   Bases: :py:obj:`DataStreamSourcePlugin`

   This plugin is for inline data: the elements are provided in a list.

   This plugin has instantiation logic. It needs the stream's element type so
   that it can generate a data model for ``List[element_type]``.

   .. py:attribute:: name
      :value: 'JsonData'

      The name of this constructible type, which the factory uses to identify
      this class.

   .. py:attribute:: stream_source_type_cache
      :type: Dict[Type[caikit.core.data_model.base.DataBase], Type[caikit.core.data_model.base.DataBase]]

   .. py:method:: get_stream_message_type(element_type: type) -> Type[caikit.core.data_model.base.DataBase]

      Get the type of the data object class that will be used as the source
      information.

   .. py:method:: to_data_stream(source_message: Type[caikit.core.data_model.base.DataBase], *_, **__) -> caikit.core.data_model.streams.data_stream.DataStream

      ``source_message`` should be an instance of the type returned by
      ``self.get_stream_message_type``. It *should* therefore contain an
      attribute named ``data``, which is a list.

   .. py:method:: get_field_number() -> int

      Each plugin must define its field number, which may be informed by
      ``self._config``.

.. py:class:: S3FilesDataStreamSourcePlugin(config: aconfig.Config, instance_name: str)

   Bases: :py:obj:`DataStreamSourcePlugin`

   Unimplemented!

   .. py:attribute:: name
      :value: 'S3Files'

      The name of this constructible type, which the factory uses to identify
      this class.

   .. py:method:: get_stream_message_type(*_, **__) -> Type[caikit.core.data_model.base.DataBase]

      Get the type of the data object class that will be used as the source
      information.

   .. py:method:: to_data_stream(*_, **__) -> caikit.core.data_model.streams.data_stream.DataStream

      Convert an instance of the source message type into a DataStream.

   .. py:method:: get_field_number() -> int

      Each plugin must define its field number, which may be informed by
      ``self._config``.

.. py:class:: DataStreamPluginFactory(*args, **kwargs)

   Bases: :py:obj:`caikit.core.toolkit.factory.ImportableFactory`

   The DataStreamPluginFactory holds a registry of plugin instances. These
   instances are used to create and manage data stream sources.

   .. py:attribute:: _plugins
      :value: None

   .. py:method:: get_plugins(plugins_config: Optional[aconfig.Config] = None) -> List[DataStreamSourcePlugin]

      Builds the set of plugins to use for a data stream source of type
      element_type.

.. py:data:: PluginFactory

.. py:class:: DataStreamSourceBase

   Bases: :py:obj:`caikit.core.data_model.streams.data_stream.DataStream`

   This base class acts as a sentinel so that dynamically generated data
   stream source classes can be identified programmatically.

   .. py:method:: _generator()

   .. py:method:: __getstate__() -> bytes

      A DataStreamSource is pickled by serializing its source representation.
      This is particularly useful when sharing data streams across
      subprocesses, so that training can run in an isolated process.

   .. py:method:: __setstate__(pickle_bytes: bytes)

      Unpickling a DataStreamSource unpacks the serialized source
      representation. The catch is that the oneof is represented unusually in
      ``__dict__``, so we need to explicitly set all oneof members.

   .. py:property:: name_to_plugin_map

   .. py:property:: _stream

      The internal ``_stream`` is cached here so that the result of calling
      ``to_data_stream`` can be re-read. This avoids invoking
      ``to_data_stream`` on every read through the stream.

   .. py:method:: to_data_stream() -> caikit.core.data_model.streams.data_stream.DataStream

      Convert to the target data stream type based on the source type.

.. py:function:: make_data_stream_source(data_element_type: type, plugin_factory: DataStreamPluginFactory = PluginFactory, plugins_config: Optional[aconfig.Config] = None) -> Type[caikit.core.data_model.base.DataBase]

   Dynamically create a data stream source message type that supports pulling
   an iterable of the given type from all valid data stream sources.

.. py:function:: _make_data_stream_source_type_name(data_element_type: Type) -> str

   Make the name for the data stream source class that wraps the given type.
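
The contract documented for ``DataStreamSourcePlugin`` has four parts: an abstract message type, an abstract conversion to a stream, an abstract field number, and a concrete field name. Together with the name-based lookup that ``DataStreamPluginFactory`` performs, it can be sketched without caikit. Everything here is a hypothetical stand-in:

* ``SketchPlugin``, ``InlineList``, ``InlineListPlugin``, ``REGISTRY`` and ``build_plugins`` are illustrative names.
* Plain lists replace ``DataStream``, and a dict replaces ``aconfig.Config``.
* The field-name rule and the duplicate field-number check are assumptions, not caikit's actual behaviour.

.. code-block:: python

   from abc import ABC, abstractmethod
   from dataclasses import dataclass
   from typing import Any, Dict, Iterable, List, Type


   class SketchPlugin(ABC):
       """Hypothetical stand-in for the DataStreamSourcePlugin contract."""

       name: str = ""  # key the registry uses to find this plugin type

       def __init__(self, config: Dict[str, Any], instance_name: str):
           self._config = config
           self._instance_name = instance_name

       @abstractmethod
       def get_stream_message_type(self, element_type: type) -> type:
           """Return the message class that describes this kind of source."""

       @abstractmethod
       def to_data_stream(self, source_message: Any, element_type: type) -> Iterable[Any]:
           """Turn one source message into an iterable of elements."""

       @abstractmethod
       def get_field_number(self) -> int:
           """Return this plugin's field number in the source oneof."""

       def get_field_name(self, element_type: type) -> str:
           # Hypothetical naming rule; the real default may differ
           return f"{self.name.lower()}_{element_type.__name__.lower()}"


   @dataclass
   class InlineList:
       """Hypothetical source message that carries its elements inline."""

       data: List[Any]


   class InlineListPlugin(SketchPlugin):
       name = "InlineList"

       def get_stream_message_type(self, element_type: type) -> type:
           return InlineList

       def to_data_stream(self, source_message: InlineList, element_type: type) -> List[Any]:
           # Coerce each raw element to the requested element type
           return [element_type(item) for item in source_message.data]

       def get_field_number(self) -> int:
           # The config may override the default field number
           return self._config.get("field_number", 1)


   # Hypothetical name -> class registry standing in for DataStreamPluginFactory
   REGISTRY: Dict[str, Type[SketchPlugin]] = {InlineListPlugin.name: InlineListPlugin}


   def build_plugins(plugins_config: Dict[str, Dict[str, Any]]) -> List[SketchPlugin]:
       """Instantiate one plugin per config entry; field numbers must be unique."""
       plugins = [
           REGISTRY[entry["type"]](entry.get("config", {}), instance_name)
           for instance_name, entry in plugins_config.items()
       ]
       numbers = [plugin.get_field_number() for plugin in plugins]
       if len(numbers) != len(set(numbers)):
           raise ValueError("duplicate field numbers in the source oneof")
       return plugins

For example, ``build_plugins({"inline": {"type": "InlineList", "config": {"field_number": 3}}})`` returns one ``InlineListPlugin``. Calling its ``to_data_stream(InlineList(data=["1", "2"]), int)`` yields ``[1, 2]``. The uniqueness check reflects the protobuf rule that every field of a message, including each oneof member, needs its own field number.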
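
The extensionless fallback in ``FilePluginBase._load_from_file_without_extension`` tries formats in order of increasingly loose validation. A minimal sketch of that ordering follows. It makes two assumptions: the content has already been read into a string, and each multipart part body is JSON. ``sniff_and_load`` is a hypothetical name. The real method reads from a file and returns a ``DataStream``, not a ``(format, rows)`` tuple.

.. code-block:: python

   import csv
   import io
   import json
   from typing import Any, List, Tuple


   def sniff_and_load(text: str) -> Tuple[str, List[Any]]:
       """Hypothetical sketch: try formats from strictest to loosest validation."""
       text = text.replace("\r\n", "\n")  # normalize line endings first

       # 1a. JSON: the whole document is one array of elements
       try:
           parsed = json.loads(text)
           if isinstance(parsed, list):
               return "json", parsed
       except json.JSONDecodeError:
           pass

       # 1b. JSON Lines: every non-blank line must parse on its own
       lines = [line for line in text.splitlines() if line.strip()]
       try:
           return "jsonl", [json.loads(line) for line in lines]
       except json.JSONDecodeError:
           pass

       # 2. Multipart with no content-type header: guess the boundary from a
       #    leading "--" line (a multi-document YAML file starting with "---"
       #    would also land here)
       if lines and lines[0].startswith("--"):
           boundary = lines[0].strip()
           bodies = []
           for part in text.split(boundary):
               part = part.strip()
               if not part or part == "--":
                   continue  # skip the preamble and the closing "--" marker
               # Per-part headers, if any, end at the first blank line
               head, sep, body = part.partition("\n\n")
               bodies.append(json.loads(body if sep else head))
           return "multipart", bodies

       # 3. CSV: assume the first row is the header, which may "succeed"
       #    on text that is not really CSV at all
       return "csv", list(csv.DictReader(io.StringIO(text)))

The multipart branch makes the docstring's caveat concrete. Once both JSON checks have failed, any text whose first non-blank line starts with ``--`` is treated as multipart. That includes a multi-document YAML file beginning with ``---``.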
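
``JsonDataStreamSourcePlugin`` generates a data model for ``List[element_type]`` and keeps the generated types in ``stream_source_type_cache``. The sketch below mimics that behaviour with ``dataclasses.make_dataclass``. Three details are assumptions, not caikit's implementation: the class name ``JsonSourceSketch``, the generated type name, and the choice of a class-level cache shared by all instances.

.. code-block:: python

   from dataclasses import field, make_dataclass
   from typing import Any, Dict, List


   class JsonSourceSketch:
       """Hypothetical stand-in for JsonDataStreamSourcePlugin's type cache."""

       # Assumed class-level cache: element type -> generated wrapper type
       stream_source_type_cache: Dict[type, type] = {}

       def get_stream_message_type(self, element_type: type) -> type:
           cached = self.stream_source_type_cache.get(element_type)
           if cached is None:
               # Generate a List[element_type] wrapper with a single `data` field
               cached = make_dataclass(
                   f"JsonData_{element_type.__name__}",
                   [("data", List[element_type], field(default_factory=list))],
               )
               self.stream_source_type_cache[element_type] = cached
           return cached

       def to_data_stream(self, source_message: Any, *_: Any, **__: Any) -> List[Any]:
           # The source message is expected to expose its elements as `.data`
           return list(source_message.data)

Caching presumably matters because the generated type should be the same object across calls. With the cache, a second request for ``int`` returns the identical class rather than a fresh, incompatible one.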
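
``DataStreamSourceBase`` pickles a source by serializing only its source representation, and it caches the materialized stream in ``_stream``. The sketch below shows that pattern under two assumptions. A JSON-serializable dict serves as the source representation, whereas caikit serializes its data model message. A plain list stands in for ``DataStream``. ``PicklableSourceSketch`` and the ``jsondata`` key are hypothetical.

.. code-block:: python

   import json
   from typing import Any, Dict, Iterator, List, Optional


   class PicklableSourceSketch:
       """Hypothetical stand-in for DataStreamSourceBase pickling and caching."""

       def __init__(self, source: Dict[str, Any]):
           # One populated oneof member, e.g. {"jsondata": {"data": [1, 2, 3]}}
           self.source = source
           self._cached_stream: Optional[List[Any]] = None

       def _build_stream(self) -> List[Any]:
           # Stand-in for a plugin's to_data_stream; only inline data is handled
           return list(self.source["jsondata"]["data"])

       @property
       def _stream(self) -> List[Any]:
           # Build the stream once, then re-read the cached result
           if self._cached_stream is None:
               self._cached_stream = self._build_stream()
           return self._cached_stream

       def __iter__(self) -> Iterator[Any]:
           return iter(self._stream)

       def __getstate__(self) -> bytes:
           # Serialize only the source description, never the materialized data
           return json.dumps(self.source).encode("utf-8")

       def __setstate__(self, pickle_bytes: bytes) -> None:
           # Restore the source description; the stream is rebuilt lazily
           self.source = json.loads(pickle_bytes.decode("utf-8"))
           self._cached_stream = None

With this class defined at module level, ``pickle.loads(pickle.dumps(src))`` produces an equivalent source whose stream is rebuilt lazily on first read. A training subprocess therefore receives only the lightweight source description, not the loaded data.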
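
``make_data_stream_source`` builds a new message type per element type, with one oneof member per plugin. ``to_data_stream`` appears to dispatch on whichever member is set. The sketch below approximates that with ``type()`` and makes several substitutions:

* Plain callables stand in for plugins.
* Keyword arguments stand in for the oneof.
* An empty ``SourceBaseSketch`` class plays the sentinel role of ``DataStreamSourceBase``.
* The naming scheme in ``_type_name_sketch`` is hypothetical; the real ``_make_data_stream_source_type_name`` may format names differently.

The module-level ``_DATA_STREAM_SOURCE_TYPES`` attribute suggests that generated types are cached, so the sketch assumes a cache keyed on element type.

.. code-block:: python

   from typing import Any, Callable, Dict, Iterable, List

   # Hypothetical cache: element type -> generated source class
   _SOURCE_TYPE_CACHE: Dict[type, type] = {}


   class SourceBaseSketch:
       """Hypothetical sentinel base marking generated source classes."""


   def _type_name_sketch(element_type: type) -> str:
       # Hypothetical naming scheme; the real helper may differ
       return f"DataStreamSource_{element_type.__name__}"


   def make_source_type_sketch(
       element_type: type, plugins: Dict[str, Callable[[Any], Iterable[Any]]]
   ) -> type:
       """Build, once per element type, a class whose single set field picks a plugin."""
       if element_type in _SOURCE_TYPE_CACHE:
           return _SOURCE_TYPE_CACHE[element_type]

       def __init__(self, **oneof: Any) -> None:
           # Emulate a oneof: exactly one source field may be set
           if len(oneof) != 1:
               raise ValueError("exactly one source field must be set")
           self.which_oneof, self.value = next(iter(oneof.items()))

       def to_data_stream(self) -> List[Any]:
           # Dispatch to the plugin registered under the populated field name
           plugin = self.name_to_plugin_map[self.which_oneof]
           return [element_type(raw) for raw in plugin(self.value)]

       cls = type(
           _type_name_sketch(element_type),
           (SourceBaseSketch,),
           {
               "__init__": __init__,
               "to_data_stream": to_data_stream,
               "name_to_plugin_map": dict(plugins),
           },
       )
       _SOURCE_TYPE_CACHE[element_type] = cls
       return cls

For instance, ``SourceInt = make_source_type_sketch(int, {"csv": lambda s: s.split(",")})`` creates a class named ``DataStreamSource_int``. Calling ``SourceInt(csv="1,2,3").to_data_stream()`` returns ``[1, 2, 3]``. Because the cache ignores the ``plugins`` argument on later calls, a real implementation would need to revisit this simplification if plugin configuration can change.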