caikit.runtime.http_server
==========================

.. py:module:: caikit.runtime.http_server


Submodules
----------

.. toctree::
   :maxdepth: 1

   /autoapi/caikit/runtime/http_server/__main__/index
   /autoapi/caikit/runtime/http_server/http_server/index
   /autoapi/caikit/runtime/http_server/pydantic_wrapper/index
   /autoapi/caikit/runtime/http_server/request_aborter/index
   /autoapi/caikit/runtime/http_server/utils/index


Attributes
----------

.. autoapisummary::

   caikit.runtime.http_server.HEALTH_ENDPOINT
   caikit.runtime.http_server.MODEL_MANAGEMENT_ENDPOINT
   caikit.runtime.http_server.MODELS_INFO_ENDPOINT
   caikit.runtime.http_server.RUNTIME_INFO_ENDPOINT
   caikit.runtime.http_server.TRAINING_MANAGEMENT_ENDPOINT


Classes
-------

.. autoapisummary::

   caikit.runtime.http_server.RuntimeHTTPServer


Functions
---------

.. autoapisummary::

   caikit.runtime.http_server.dataobject_to_pydantic
   caikit.runtime.http_server.pydantic_to_dataobject


Package Contents
----------------

.. py:data:: HEALTH_ENDPOINT
   :value: '/health'


.. py:data:: MODEL_MANAGEMENT_ENDPOINT
   :value: '/management/models'


.. py:data:: MODELS_INFO_ENDPOINT
   :value: '/info/models'


.. py:data:: RUNTIME_INFO_ENDPOINT
   :value: '/info/version'


.. py:data:: TRAINING_MANAGEMENT_ENDPOINT
   :value: '/management/trainings'


.. py:class:: RuntimeHTTPServer(tls_config_override: Optional[aconfig.Config] = None)

   Bases: :py:obj:`caikit.runtime.server_base.RuntimeServerBase`


   An implementation of a FastAPI server that serves caikit runtimes


   .. py:attribute:: app


   .. py:attribute:: _openapi_defs


   .. py:attribute:: global_predict_servicer
      :value: None


   .. py:attribute:: global_train_servicer
      :value: None


   .. py:attribute:: model_management_servicer
      :value: None


   .. py:attribute:: training_management_servicer
      :value: None


   .. py:attribute:: info_servicer


   .. py:attribute:: server


   .. py:attribute:: _uvicorn_server_thread
      :value: None


   .. py:method:: __del__()


   .. py:method:: start(blocking: bool = True)

      Boot the http server. Can be non-blocking, or block until shutdown

      Args:
          blocking (boolean): Whether to block until shutdown


   .. py:method:: stop()

      Stop the server, with an optional grace period.

      Args:
          grace_period_seconds (Union[float, int]): Grace period for service shutdown.
              Defaults to application config


   .. py:method:: _model_info(model_ids: py_to_proto.dataclass_to_proto.Annotated[List[str], Query(default_factory=list)]) -> Dict[str, Any]

      Create wrapper for get_models_info so model_ids can be marked as a query parameter


   .. py:method:: _health_check() -> str
      :staticmethod:


   .. py:method:: _deploy_model(context: fastapi.Request) -> fastapi.Response
      :async:


      POST handler for deploying a model


   .. py:method:: _undeploy_model(model_id: py_to_proto.dataclass_to_proto.Annotated[str, fastapi.Query]) -> fastapi.Response
      :async:


      DELETE handler for undeploying a model


   .. py:method:: _get_training_status(training_id: py_to_proto.dataclass_to_proto.Annotated[str, fastapi.Query]) -> fastapi.Response

      GET handler for fetching a training


   .. py:method:: _cancel_training(training_id: py_to_proto.dataclass_to_proto.Annotated[str, fastapi.Query]) -> fastapi.Response

      DELETE handler for cancelling a training


   .. py:method:: _get_prediction_job_result(prediction_id: py_to_proto.dataclass_to_proto.Annotated[str, fastapi.Query]) -> fastapi.Response

      GET handler for fetching a prediction job result


   .. py:method:: _get_prediction_job_status(prediction_id: py_to_proto.dataclass_to_proto.Annotated[str, fastapi.Query]) -> fastapi.Response

      GET handler for fetching a prediction job status


   .. py:method:: _cancel_prediction_job(prediction_id: py_to_proto.dataclass_to_proto.Annotated[str, fastapi.Query]) -> fastapi.Response

      DELETE handler for cancelling a prediction job


   .. py:method:: _bind_routes(service: caikit.runtime.service_factory.ServicePackage)

      Bind all caikit rpcs as routes to the given app


   .. py:method:: _bind_model_management_routes()

      Bind the routes for deploy/undeploy


   .. py:method:: _bind_training_management_routes()

      Bind the routes for get/cancel trainings


   .. py:method:: _train_add_unary_input_unary_output_handler(rpc: caikit.runtime.service_generation.rpcs.ModuleClassTrainRPC)

      Add a unary:unary request handler for this RPC signature


   .. py:method:: _add_unary_input_unary_output_handler(rpc: caikit.runtime.service_generation.rpcs.TaskPredictRPC)

      Add a unary:unary request handler for this RPC signature


   .. py:method:: _add_prediction_job_management_handler(rpc: caikit.runtime.service_generation.rpcs.TaskPredictRPC)

      Bind the routes for get/cancel/result of a prediction


   .. py:method:: _add_prediction_job_unary_input_handler(rpc: caikit.runtime.service_generation.rpcs.TaskPredictionJobRPC)

      Add a unary:unary request handler to start a prediction job for this RPC


   .. py:method:: _add_unary_input_stream_output_handler(rpc: caikit.runtime.service_generation.rpcs.TaskPredictRPC)


   .. py:method:: _handle_exception(err: Exception) -> Optional[dict]
      :staticmethod:


      Common exception handling. This function will return a dict with "id",
      "code", and "details" if the exception should be handled with a
      returned error body. If None is returned, the exception should be
      re-raised.


   .. py:method:: _run_in_thread()

      Run the server in an isolated thread


   .. py:method:: _get_model_id(request: Type[pydantic.BaseModel]) -> str

      Get the model id from the payload


   .. py:method:: _get_request_params(rpc: caikit.runtime.service_generation.rpcs.CaikitRPCBase, request: Type[pydantic.BaseModel]) -> Dict[str, Any]

      Get the request params based on the RPC's req params, also
      convert to DM objects


   .. py:method:: _get_request_dataobject(rpc: caikit.runtime.service_generation.rpcs.CaikitRPCBase) -> Type[caikit.core.data_model.DataBase]

      Get the dataobject request for the given rpc


   .. py:method:: _get_response_dataobject(rpc: caikit.runtime.service_generation.rpcs.CaikitRPCBase) -> Type[caikit.core.data_model.DataBase]
      :staticmethod:


      Get the dataobject response for the given rpc


   .. py:method:: _format_file_response(dm_class: Type[caikit.core.data_model.DataBase]) -> fastapi.Response
      :staticmethod:


      Convert a dm_class into a fastapi file Response


   .. py:method:: _get_request_openapi(pydantic_model: Union[pydantic.BaseModel, Type, Type[pydantic.BaseModel]])

      Helper to generate the openapi schema for a given request


   .. py:method:: _get_response_openapi(dm_class: Type[caikit.core.data_model.DataBase], pydantic_model: Union[Type, Type[pydantic.BaseModel]], response_code: int = 200)

      Helper to generate the openapi schema for a given response


   .. py:method:: _tls_files() -> Iterable[_TlsFiles]

      This contextmanager ensures that the tls config values are files on
      disk since SslContext requires files

      Returns:
          tls_files (_TlsFiles): The set of configured TLS files


   .. py:method:: _patch_openapi_spec()

      FastAPI does not have a way to dynamically add openapi defs for
      specific paths. This means we must wait till the very end to update
      the def values. This does allow for adding context specific fields
      though which is beneficial.


   .. py:method:: _patch_exit_handler()

      🌶️🌶️🌶️ Here there are dragons! 🌶️🌶️🌶️

      uvicorn will explicitly set the interrupt handler to
      ``server.handle_exit`` when ``server.run()`` is called. That will
      override any other signal handlers that we may have tried to set.

      To work around this, we:

      1. Register ``server.handle_exit`` as a SIGINT/SIGTERM signal handler
         ourselves, so that it is invoked on interrupt and terminate
      2. Set ``server.handle_exit`` to the existing SIGINT signal handler, so
         that when the uvicorn server explicitly overrides the signal handler
         for SIGINT and SIGTERM to this, it has no effect.

      Since uvicorn overrides SIGINT and SIGTERM with a single common
      handler, any special handlers added for SIGTERM but not SIGINT will
      not be invoked.


.. py:function:: dataobject_to_pydantic(dm_class: Type[caikit.core.data_model.base.DataBase]) -> Type[pydantic.BaseModel]

   Make a pydantic model based on the given proto message by using the data
   model class annotations to mirror as a pydantic model


.. py:function:: pydantic_to_dataobject(pydantic_model: pydantic.BaseModel) -> caikit.core.data_model.base.DataBase

   Convert pydantic objects to our DM objects
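
The two-step workaround described for ``_patch_exit_handler`` can be hard to follow from prose alone. The following is a minimal sketch of that idea using only the standard library ``signal`` module. It is not the caikit implementation: ``FakeServer``, ``add_signal_handler``, ``patch_exit_handler``, and ``demo`` are hypothetical names introduced for illustration, and ``FakeServer.run`` only imitates the handler-overriding behavior that the docstring attributes to uvicorn.

```python
import signal


class FakeServer:
    """Hypothetical stand-in for a uvicorn-style server; not the real API."""

    def __init__(self):
        self.exit_requested = False

    def handle_exit(self, sig, frame):
        # A real server would begin a graceful shutdown here
        self.exit_requested = True

    def run(self):
        # Imitate the described behavior: on run, the server installs
        # its handle_exit for both SIGINT and SIGTERM
        for sig in (signal.SIGINT, signal.SIGTERM):
            signal.signal(sig, self.handle_exit)


def add_signal_handler(sig, handler):
    """Hypothetical helper: run `handler`, then the previously installed one."""
    previous = signal.getsignal(sig)

    def chained(signum, frame):
        handler(signum, frame)
        # SIG_DFL, SIG_IGN, and None are not callable, so they are skipped
        if callable(previous):
            previous(signum, frame)

    signal.signal(sig, chained)


def patch_exit_handler(server):
    original = server.handle_exit
    # Step 1: register the server's own exit handler ourselves
    for sig in (signal.SIGINT, signal.SIGTERM):
        add_signal_handler(sig, original)
    # Step 2: point handle_exit at the SIGINT handler that is now installed,
    # so the server's later override re-installs the same chained handler
    server.handle_exit = signal.getsignal(signal.SIGINT)


def demo():
    calls = []
    signals = (signal.SIGINT, signal.SIGTERM)
    saved = {sig: signal.getsignal(sig) for sig in signals}
    try:
        # Application-level handlers that exist before the server is patched
        signal.signal(signal.SIGINT, lambda s, f: calls.append("app-int"))
        signal.signal(signal.SIGTERM, lambda s, f: calls.append("app-term"))

        server = FakeServer()
        patch_exit_handler(server)
        server.run()

        # Call the installed handlers directly instead of raising signals
        signal.getsignal(signal.SIGINT)(signal.SIGINT, None)
        calls.append(server.exit_requested)
        signal.getsignal(signal.SIGTERM)(signal.SIGTERM, None)
        return calls
    finally:
        # Restore whatever was installed before the demo ran
        for sig, handler in saved.items():
            if handler is not None:
                signal.signal(sig, handler)
```

In this sketch the SIGTERM-only handler (``"app-term"``) is never called once the server has run, because both signals end up sharing the SIGINT chain. That mirrors the caveat in the docstring about handlers added for SIGTERM but not SIGINT. Note that ``signal.signal`` can only be called from the main thread of the main interpreter.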