caikit.runtime.http_server

Submodules

Attributes

HEALTH_ENDPOINT

MODEL_MANAGEMENT_ENDPOINT

MODELS_INFO_ENDPOINT

RUNTIME_INFO_ENDPOINT

TRAINING_MANAGEMENT_ENDPOINT

Classes

RuntimeHTTPServer

An implementation of a FastAPI server that serves caikit runtimes

Functions

dataobject_to_pydantic(→ Type[pydantic.BaseModel])

Make a pydantic model based on the given proto message by using the data

pydantic_to_dataobject(...)

Convert pydantic objects to our DM objects

Package Contents

caikit.runtime.http_server.HEALTH_ENDPOINT = '/health'
caikit.runtime.http_server.MODEL_MANAGEMENT_ENDPOINT = '/management/models'
caikit.runtime.http_server.MODELS_INFO_ENDPOINT = '/info/models'
caikit.runtime.http_server.RUNTIME_INFO_ENDPOINT = '/info/version'
caikit.runtime.http_server.TRAINING_MANAGEMENT_ENDPOINT = '/management/trainings'
class caikit.runtime.http_server.RuntimeHTTPServer(tls_config_override: aconfig.Config | None = None)[source]

Bases: caikit.runtime.server_base.RuntimeServerBase

An implementation of a FastAPI server that serves caikit runtimes

app
_openapi_defs
global_predict_servicer = None
global_train_servicer = None
model_management_servicer = None
training_management_servicer = None
info_servicer
server
_uvicorn_server_thread = None
__del__()[source]
start(blocking: bool = True)[source]

Boot the http server. Can be non-blocking, or block until shutdown

Args:

blocking (boolean): Whether to block until shutdown

stop()[source]

Stop the server, with an optional grace period.

Args:
grace_period_seconds (Union[float, int]): Grace period for service shutdown.

Defaults to application config

_model_info(model_ids: py_to_proto.dataclass_to_proto.Annotated[List[str], Query(default_factory=list)]) Dict[str, Any][source]

Create wrapper for get_models_info so model_ids can be marked as a query parameter

static _health_check() str[source]
async _deploy_model(context: fastapi.Request) fastapi.Response[source]

POST handler for deploying a model

async _undeploy_model(model_id: py_to_proto.dataclass_to_proto.Annotated[str, fastapi.Query]) fastapi.Response[source]

DELETE handler for undeploying a model

_get_training_status(training_id: py_to_proto.dataclass_to_proto.Annotated[str, fastapi.Query]) fastapi.Response[source]

GET handler for fetching a training

_cancel_training(training_id: py_to_proto.dataclass_to_proto.Annotated[str, fastapi.Query]) fastapi.Response[source]

DELETE handler for undeploying a model

_get_prediction_job_result(prediction_id: py_to_proto.dataclass_to_proto.Annotated[str, fastapi.Query]) fastapi.Response[source]

GET handler for fetching a prediction job result

_get_prediction_job_status(prediction_id: py_to_proto.dataclass_to_proto.Annotated[str, fastapi.Query]) fastapi.Response[source]

GET handler for fetching a prediction job status

_cancel_prediction_job(prediction_id: py_to_proto.dataclass_to_proto.Annotated[str, fastapi.Query]) fastapi.Response[source]

DELETE handler for cancelling a prediction job

_bind_routes(service: caikit.runtime.service_factory.ServicePackage)[source]

Bind all caikit rpcs as routes to the given app

_bind_model_management_routes()[source]

Bind the routes for deploy/undeploy

_bind_training_management_routes()[source]

Bind the routes for get/cancel trainings

_train_add_unary_input_unary_output_handler(rpc: caikit.runtime.service_generation.rpcs.ModuleClassTrainRPC)[source]

Add a unary:unary request handler for this RPC signature

_add_unary_input_unary_output_handler(rpc: caikit.runtime.service_generation.rpcs.TaskPredictRPC)[source]

Add a unary:unary request handler for this RPC signature

_add_prediction_job_management_handler(rpc: caikit.runtime.service_generation.rpcs.TaskPredictRPC)[source]

Bind the routes for get/cancel/result of a prediction

_add_prediction_job_unary_input_handler(rpc: caikit.runtime.service_generation.rpcs.TaskPredictionJobRPC)[source]

Add a unary:unary request handler to start a prediction job for this RPC

_add_unary_input_stream_output_handler(rpc: caikit.runtime.service_generation.rpcs.TaskPredictRPC)[source]
static _handle_exception(err: Exception) dict | None[source]

Common exception handling. This function will return a dict with “id,” “code,” and “details” if the exception should be handled with a returned error body. If None is returned, the exception should be re-raised.

_run_in_thread()[source]

Run the server in an isolated thread

_get_model_id(request: Type[pydantic.BaseModel]) str[source]

Get the model id from the payload

_get_request_params(rpc: caikit.runtime.service_generation.rpcs.CaikitRPCBase, request: Type[pydantic.BaseModel]) Dict[str, Any][source]

Get the request params based on the RPC’s req params, also convert to DM objects

_get_request_dataobject(rpc: caikit.runtime.service_generation.rpcs.CaikitRPCBase) Type[caikit.core.data_model.DataBase][source]

Get the dataobject request for the given rpc

static _get_response_dataobject(rpc: caikit.runtime.service_generation.rpcs.CaikitRPCBase) Type[caikit.core.data_model.DataBase][source]

Get the dataobject response for the given rpc

static _format_file_response(dm_class: Type[caikit.core.data_model.DataBase]) fastapi.Response[source]

Convert a dm_class into a fastapi file Response

_get_request_openapi(pydantic_model: pydantic.BaseModel | Type | Type[pydantic.BaseModel])[source]

Helper to generate the openapi schema for a given request

_get_response_openapi(dm_class: Type[caikit.core.data_model.DataBase], pydantic_model: Type | Type[pydantic.BaseModel], response_code: int = 200)[source]

Helper to generate the openapi schema for a given response

_tls_files() Iterable[_TlsFiles][source]

This contextmanager ensures that the tls config values are files on disk since SslContext requires files

Returns:

tls_files (_TlsFiles): The set of configured TLS files

_patch_openapi_spec()[source]

FastAPI does not have a way to dynamically add openapi defs for specific paths. This means we must wait till the very end to update the def values. This does allow for adding context specific fields though which is beneficial.

_patch_exit_handler()[source]

🌶️🌶️🌶️ Here there are dragons! 🌶️🌶️🌶️ uvicorn will explicitly set the interrupt handler to server.handle_exit when server.run() is called. That will override any other signal handlers that we may have tried to set.

To work around this, we: 1. Register server.handle_exit as a SIGINT/SIGTERM signal handler ourselves, so that it

is invoked on interrupt and terminate

  1. Set server.handle_exit to the existing SIGINT signal handler, so that when the uvicorn

server explicitly overrides the signal handler for SIGINT and SIGTERM to this, it has no effect.

Since uvicorn overrides SIGINT and SIGTERM with a single common handler, any special

handlers added for SIGTERM but not SIGINT will not be invoked.

caikit.runtime.http_server.dataobject_to_pydantic(dm_class: Type[caikit.core.data_model.base.DataBase]) Type[pydantic.BaseModel][source]

Make a pydantic model based on the given proto message by using the data model class annotations to mirror as a pydantic model

caikit.runtime.http_server.pydantic_to_dataobject(pydantic_model: pydantic.BaseModel) caikit.core.data_model.base.DataBase[source]

Convert pydantic objects to our DM objects