API Reference

pytriton.triton.TritonConfig dataclass

Triton Inference Server configuration class for customization of server execution.

The arguments are optional. If a value is not provided, the defaults for Triton Inference Server are used. Please refer to https://github.com/triton-inference-server/server/ for more details.

Parameters:

Name Type Description Default
id Optional[str]

Identifier for this server.

None
log_verbose Optional[int]

Set verbose logging level. Zero (0) disables verbose logging and values >= 1 enable verbose logging.

None
log_file Optional[pathlib.Path]

Set the name of the log output file.

None
exit_timeout_secs Optional[int]

Timeout (in seconds) when exiting to wait for in-flight inferences to finish.

None
exit_on_error Optional[bool]

Exit the inference server if an error occurs during initialization.

None
strict_readiness Optional[bool]

If true, the /v2/health/ready endpoint indicates ready if the server is responsive and all models are available.

None
allow_http Optional[bool]

Allow the server to listen for HTTP requests.

None
http_address Optional[str]

The address for the http server to bind to. Default is 0.0.0.0.

None
http_port Optional[int]

The port for the server to listen on for HTTP requests. Default is 8000.

None
http_header_forward_pattern Optional[str]

The regular expression pattern that will be used for forwarding HTTP headers as inference request parameters.

None
http_thread_count Optional[int]

Number of threads handling HTTP requests.

None
allow_grpc Optional[bool]

Allow the server to listen for GRPC requests.

None
grpc_address Optional[str]

The address for the grpc server to bind to. Default is 0.0.0.0.

None
grpc_port Optional[int]

The port for the server to listen on for GRPC requests. Default is 8001.

None
grpc_header_forward_pattern Optional[str]

The regular expression pattern that will be used for forwarding GRPC headers as inference request parameters.

None
grpc_infer_allocation_pool_size Optional[int]

The maximum number of inference request/response objects that remain allocated for reuse. As long as the number of in-flight requests doesn't exceed this value there will be no allocation/deallocation of request/response objects.

None
grpc_use_ssl Optional[bool]

Use SSL authentication for GRPC requests. Default is false.

None
grpc_use_ssl_mutual Optional[bool]

Use mutual SSL authentication for GRPC requests. This option will preempt grpc_use_ssl if it is also specified. Default is false.

None
grpc_server_cert Optional[pathlib.Path]

File holding PEM-encoded server certificate. Ignored unless grpc_use_ssl is true.

None
grpc_server_key Optional[pathlib.Path]

Path to file holding PEM-encoded server key. Ignored unless grpc_use_ssl is true.

None
grpc_root_cert Optional[pathlib.Path]

Path to file holding PEM-encoded root certificate. Ignored unless grpc_use_ssl is true.

None
grpc_infer_response_compression_level Optional[str]

The compression level to be used while returning the inference response to the peer. Allowed values are none, low, medium and high. Default is none.

None
grpc_keepalive_time Optional[int]

The period (in milliseconds) after which a keepalive ping is sent on the transport.

None
grpc_keepalive_timeout Optional[int]

The period (in milliseconds) the sender of the keepalive ping waits for an acknowledgement.

None
grpc_keepalive_permit_without_calls Optional[bool]

Allows keepalive pings to be sent even if there are no calls in flight.

None
grpc_http2_max_pings_without_data Optional[int]

The maximum number of pings that can be sent when there is no data/header frame to be sent.

None
grpc_http2_min_recv_ping_interval_without_data Optional[int]

If there are no data/header frames being sent on the transport, this channel argument on the server side controls the minimum time (in milliseconds) that gRPC Core would expect between receiving successive pings.

None
grpc_http2_max_ping_strikes Optional[int]

Maximum number of bad pings that the server will tolerate before sending an HTTP2 GOAWAY frame and closing the transport.

None
grpc_restricted_protocol

Specify restricted GRPC protocol setting. The format of this flag is <protocols>,<key>=<value>, where <protocols> is a comma-separated list of protocols to be restricted, <key> is an additional header key to be checked when a GRPC request is received, and <value> is the value expected to be matched.

required
allow_metrics Optional[bool]

Allow the server to provide prometheus metrics.

None
allow_gpu_metrics Optional[bool]

Allow the server to provide GPU metrics.

None
allow_cpu_metrics Optional[bool]

Allow the server to provide CPU metrics.

None
metrics_interval_ms Optional[int]

Metrics will be collected once every metrics_interval_ms milliseconds.

None
metrics_port Optional[int]

The port reporting prometheus metrics.

None
metrics_address Optional[str]

The address for the metrics server to bind to. Default is the same as http_address.

None
allow_sagemaker Optional[bool]

Allow the server to listen for Sagemaker requests.

None
sagemaker_port Optional[int]

The port for the server to listen on for Sagemaker requests.

None
sagemaker_safe_port_range Optional[str]

Set the allowed port range for endpoints other than the SageMaker endpoints.

None
sagemaker_thread_count Optional[int]

Number of threads handling Sagemaker requests.

None
allow_vertex_ai Optional[bool]

Allow the server to listen for Vertex AI requests.

None
vertex_ai_port Optional[int]

The port for the server to listen on for Vertex AI requests.

None
vertex_ai_thread_count Optional[int]

Number of threads handling Vertex AI requests.

None
vertex_ai_default_model Optional[str]

The name of the model to use for single-model inference requests.

None
metrics_config Optional[List[str]]

Specify a metrics-specific configuration setting. The format of this flag is <setting>=<value>. It can be specified multiple times.

None
trace_config Optional[List[str]]

Specify a global or trace-mode-specific configuration setting. The format of this flag is <mode>,<setting>=<value>, where <mode> is either 'triton' or 'opentelemetry'. The default is 'triton'. To specify global trace settings (level, rate, count, or mode), the format would be <setting>=<value>. For 'triton' mode, the server will use Triton's Trace APIs. For 'opentelemetry' mode, the server will use OpenTelemetry's APIs to generate, collect and export traces for individual inference requests.

None
cache_config Optional[List[str]]

Specify a cache-specific configuration setting. The format of this flag is <cache_name>,<setting>=<value>, where <cache_name> is the name of the cache, such as 'local' or 'redis'. Example: local,size=1048576 will configure a 'local' cache implementation with a fixed buffer pool of size 1048576 bytes.

None
cache_directory Optional[str]

The global directory searched for cache shared libraries. Default is '/opt/tritonserver/caches'. This directory is expected to contain a cache implementation as a shared library with the name 'libtritoncache.so'.

None
buffer_manager_thread_count Optional[int]

The number of threads used to accelerate copies and other operations required to manage input and output tensor contents.

None
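
For illustration, a minimal construction sketch (the values below are examples; any field left unset falls back to the Triton Inference Server defaults):

from pytriton.triton import TritonConfig

config = TritonConfig(
    http_port=8000,
    grpc_port=8001,
    metrics_port=8002,
    log_verbose=1,
    exit_on_error=True,
)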

__post_init__()

Validate configuration for early error handling.

Source code in pytriton/triton.py
def __post_init__(self):
    """Validate configuration for early error handling."""
    if self.allow_http not in [True, None] and self.allow_grpc not in [True, None]:
        raise PyTritonValidationError("The `http` or `grpc` endpoint has to be allowed.")

from_env() classmethod

Creates TritonConfig from environment variables.

Environment variables should start with PYTRITON_TRITON_CONFIG_ prefix. For example:

PYTRITON_TRITON_CONFIG_GRPC_PORT=45436
PYTRITON_TRITON_CONFIG_LOG_VERBOSE=4
Typical use

triton_config = TritonConfig.from_env()

Returns:

Type Description
TritonConfig

TritonConfig class instantiated from environment variables.

Source code in pytriton/triton.py
@classmethod
def from_env(cls) -> "TritonConfig":
    """Creates TritonConfig from environment variables.

    Environment variables should start with `PYTRITON_TRITON_CONFIG_` prefix. For example:

        PYTRITON_TRITON_CONFIG_GRPC_PORT=45436
        PYTRITON_TRITON_CONFIG_LOG_VERBOSE=4

    Typical use:

        triton_config = TritonConfig.from_env()

    Returns:
        TritonConfig class instantiated from environment variables.
    """
    prefix = "PYTRITON_TRITON_CONFIG_"
    config = {name[len(prefix) :].lower(): value for name, value in os.environ.items() if name.startswith(prefix)}
    fields: Dict[str, dataclasses.Field] = {field.name: field for field in dataclasses.fields(cls)}
    unknown_config_parameters = {name: value for name, value in config.items() if name not in fields}
    for name, value in unknown_config_parameters.items():
        LOGGER.warning(
            f"Ignoring {name}={value} as could not find matching config field. "
            f"Available fields: {', '.join(map(str, fields))}"
        )

    def _cast_value(_field, _value):
        field_type = _field.type
        is_optional = typing_inspect.is_optional_type(field_type)
        if is_optional:
            field_type = field_type.__args__[0]
        return field_type(_value)

    config_with_casted_values = {
        name: _cast_value(fields[name], value) for name, value in config.items() if name in fields
    }
    return cls(**config_with_casted_values)
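
For example, a sketch of driving the configuration through environment variables (the values are illustrative):

import os

from pytriton.triton import TritonConfig

os.environ["PYTRITON_TRITON_CONFIG_GRPC_PORT"] = "45436"
os.environ["PYTRITON_TRITON_CONFIG_LOG_VERBOSE"] = "4"

config = TritonConfig.from_env()
assert config.grpc_port == 45436 and config.log_verbose == 4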

to_dict()

Map config object to dictionary.

Source code in pytriton/triton.py
def to_dict(self):
    """Map config object to dictionary."""
    return dataclasses.asdict(self)

pytriton.decorators

Inference callable decorators.

ConstantPadder(pad_value=0)

Padder that pads the given batches with a constant value.

Initialize the padder.

Parameters:

Name Type Description Default
pad_value int

Padding value. Defaults to 0.

0
Source code in pytriton/decorators.py
def __init__(self, pad_value=0):
    """Initialize the padder.

    Args:
        pad_value (int, optional): Padding value. Defaults to 0.
    """
    self.pad_value = pad_value

__call__(batches_list)

Pad the given batches with the specified value to a common size, enabling further batching into single arrays.

Parameters:

Name Type Description Default
batches_list List[Dict[str, np.ndarray]]

List of batches to pad.

required

Returns:

Type Description
InferenceResults

List[Dict[str, np.ndarray]]: List of padded batches.

Raises:

Type Description
PyTritonRuntimeError

If the input arrays for a given input name have different dtypes.

Source code in pytriton/decorators.py
def __call__(self, batches_list: InferenceResults) -> InferenceResults:
    """Pad the given batches with the specified value to pad size enabling further batching to single arrays.

    Args:
        batches_list (List[Dict[str, np.ndarray]]): List of batches to pad.

    Returns:
        List[Dict[str, np.ndarray]]: List of padded batches.

    Raises:
        PyTritonRuntimeError: If the input arrays for a given input name have different dtypes.
    """

    def _get_padded_shape(_batches: List[np.ndarray]) -> Tuple[int, ...]:
        """Get the shape of the padded array without batch axis."""
        return tuple(np.max([batch.shape[1:] for batch in _batches if batch is not None], axis=0))

    def _get_padded_dtype(_batches: List[np.ndarray]) -> np.dtype:
        dtypes = [batch.dtype for batch in _batches if batch is not None]
        result_dtype = dtypes[0]

        if not all(dtype.kind == result_dtype.kind for dtype in dtypes):
            raise PyTritonRuntimeError("All input arrays for given input name must have the same dtype.")

        # for bytes (encoded string) or unicode string need to obtain the max length
        if result_dtype.kind in "SU":
            order_and_kind = result_dtype.str[:2]
            max_len = max([int(dtype.str[2:]) for dtype in dtypes])
            result_dtype = f"{order_and_kind}{max_len}"
        else:
            if not all(dtype == result_dtype for dtype in dtypes):
                raise PyTritonRuntimeError("All input arrays for given input name must have the same dtype.")

        return np.dtype(result_dtype)

    input_names = list(
        collections.OrderedDict.fromkeys(input_name for batch in batches_list for input_name in batch.keys())
    )
    batches_by_name = {input_name: [batch.get(input_name) for batch in batches_list] for input_name in input_names}
    for input_batches in batches_by_name.values():
        result_shape, result_dtype = _get_padded_shape(input_batches), _get_padded_dtype(input_batches)
        for batch_idx, batch in enumerate(input_batches):
            if batch is not None:
                input_batches[batch_idx] = np.pad(
                    batch,
                    [(0, 0)] + [(0, b - a) for a, b in zip(batch.shape[1:], result_shape)],
                    mode="constant",
                    constant_values=self.pad_value if result_dtype.kind not in ["S", "U", "O"] else b"",
                ).astype(result_dtype)

    return [
        {name: batches[batch_idx] for name, batches in batches_by_name.items() if batches[batch_idx] is not None}
        for batch_idx in range(len(batches_list))
    ]
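
A small usage sketch padding two batches whose trailing dimensions differ (the input name and values are illustrative; in practice the padder is typically passed as pad_fn to @group_by_values):

import numpy as np

from pytriton.decorators import ConstantPadder

padder = ConstantPadder(pad_value=0)
padded = padder([
    {"scores": np.array([[1, 2, 3]])},  # shape (1, 3)
    {"scores": np.array([[4, 5]])},     # shape (1, 2), padded to (1, 3)
])
# padded[1]["scores"] -> array([[4, 5, 0]])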

ModelConfigDict()

Bases: MutableMapping

Dictionary for storing model configs for inference callable.

Create ModelConfigDict object.

Source code in pytriton/decorators.py
def __init__(self):
    """Create ModelConfigDict object."""
    self._data: Dict[str, TritonModelConfig] = {}
    self._keys: List[Callable] = []

__delitem__(infer_callable)

Delete model config for inference callable.

Source code in pytriton/decorators.py
def __delitem__(self, infer_callable: Callable):
    """Delete model config for inference callable."""
    key = self._get_model_config_key(infer_callable)
    del self._data[key]

__getitem__(infer_callable)

Get model config for inference callable.

Source code in pytriton/decorators.py
def __getitem__(self, infer_callable: Callable) -> TritonModelConfig:
    """Get model config for inference callable."""
    key = self._get_model_config_key(infer_callable)
    return self._data[key]

__iter__()

Iterate over inference callable keys.

Source code in pytriton/decorators.py
def __iter__(self):
    """Iterate over inference callable keys."""
    return iter(self._keys)

__len__()

Get number of inference callable keys.

Source code in pytriton/decorators.py
def __len__(self):
    """Get number of inference callable keys."""
    return len(self._data)

__setitem__(infer_callable, item)

Set model config for inference callable.

Source code in pytriton/decorators.py
def __setitem__(self, infer_callable: Callable, item: TritonModelConfig):
    """Set model config for inference callable."""
    self._keys.append(infer_callable)
    key = self._get_model_config_key(infer_callable)
    self._data[key] = item

TritonContext dataclass

Triton context definition class.

batch(wrapped, instance, args, kwargs)

Decorator for converting list of request dicts to dict of input batches.

Converts a list of request dicts to a dict of input batches. It passes **kwargs to the inference callable, where each named input contains a numpy array with the batch of requests received by the Triton server. We assume that each request has the same set of keys (you can use the group_by_keys decorator before the @batch decorator if your requests may have different sets of keys).

Source code in pytriton/decorators.py
@wrapt.decorator
def batch(wrapped, instance, args, kwargs):
    """Decorator for converting list of request dicts to dict of input batches.

    Converts list of request dicts to dict of input batches.
    It passes **kwargs to inference callable where each named input contains numpy array with batch of requests
    received by Triton server.
    We assume that each request has the same set of keys (you can use group_by_keys decorator before
    using @batch decorator if your requests may have different set of keys).
    """
    req_list = args[0]
    input_names = req_list[0].keys()

    for req_dict2 in req_list[1:]:
        if input_names != req_dict2.keys():
            raise PyTritonValidationError("Cannot batch requests with different set of inputs keys")

    inputs = {}
    for model_input in input_names:
        concatenated_input_data = np.concatenate([req[model_input] for req in req_list])
        inputs[model_input] = concatenated_input_data

    args = args[1:]
    new_kwargs = dict(kwargs)
    new_kwargs.update(inputs)
    outputs = wrapped(*args, **new_kwargs)

    outputs = convert_output(outputs, wrapped, instance)
    output_names = outputs.keys()

    out_list = []
    start_idx = 0
    for request in req_list:
        # get batch_size of first input for each request - assume that all inputs have same batch_size
        first_input = next(iter(request.values()))
        request_batch_size = first_input.shape[0]
        req_output_dict = {}
        for _output_ind, output_name in enumerate(output_names):
            req_output = outputs[output_name][start_idx : start_idx + request_batch_size, ...]
            req_output_dict[output_name] = req_output
        out_list.append(req_output_dict)
        start_idx += request_batch_size
    return out_list
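
A typical usage sketch; the input and output names below are hypothetical and must match the Tensor definitions passed to Triton.bind():

from pytriton.decorators import batch

@batch
def infer_fn(**inputs):
    # each named input is a numpy array with requests stacked along the batch axis
    result = inputs["input_a"] + inputs["input_b"]
    # the decorator splits the outputs back into per-request responses
    return {"result": result}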

convert_output(outputs, wrapped=None, instance=None, model_config=None)

Converts output from tuple or list to dictionary.

It is a utility function useful for mapping an output list into a dictionary of outputs. Currently, it is used in the @sample and @batch decorators (we assume that the user can return a list or tuple of outputs instead of a dictionary if this list matches the output list in the model config in size and order).

Source code in pytriton/decorators.py
def convert_output(
    outputs: Union[Dict, List, Tuple], wrapped=None, instance=None, model_config: Optional[TritonModelConfig] = None
):
    """Converts output from tuple ot list to dictionary.

    It is utility function useful for mapping output list into dictionary of outputs.
    Currently, it is used in @sample and @batch decorators (we assume that user can return list or tuple of outputs
    instead of dictionary if this list matches output list in model config (size and order).
    """
    if isinstance(outputs, dict):
        return outputs
    elif isinstance(outputs, (list, tuple)):
        if model_config is None:
            model_config = get_model_config(wrapped, instance)
        if len(outputs) != len(model_config.outputs):
            raise PyTritonValidationError("Outputs length different than config outputs length")
        outputs = {config_output.name: output for config_output, output in zip(model_config.outputs, outputs)}
        return outputs
    else:
        raise PyTritonValidationError(f"Unsupported output type {type(outputs)}.")

fill_optionals(**defaults)

This decorator ensures that any missing inputs in requests are filled with default values specified by the user.

Default values should be NumPy arrays without batch axis.

If you plan to group requests, e.g. with the @group_by_keys or @group_by_values decorators, provide default values for optional parameters at the beginning of the decorator stack. The other decorators can then group requests into bigger batches, resulting in better model performance.

Typical use

@fill_optionals()
@group_by_keys()
@batch
def infer_fun(**inputs):
    ...
    return outputs

Parameters:

Name Type Description Default
defaults

keyword arguments containing default values for missing inputs

{}

If you have default values for some optional parameters, it is a good idea to provide them at the very beginning, so the other decorators (e.g. @group_by_keys) can make bigger consistent groups.

Source code in pytriton/decorators.py
def fill_optionals(**defaults):
    """This decorator ensures that any missing inputs in requests are filled with default values specified by the user.

    Default values should be NumPy arrays without batch axis.

    If you plan to group requests ex. with
    [@group_by_keys][pytriton.decorators.group_by_keys] or
    [@group_by_vales][pytriton.decorators.group_by_values] decorators
    provide default values for optional parameters at the beginning of decorators stack.
    The other decorators can then group requests into bigger batches resulting in a better model performance.

    Typical use:

        @fill_optionals()
        @group_by_keys()
        @batch
        def infer_fun(**inputs):
            ...
            return outputs

    Args:
        defaults: keyword arguments containing default values for missing inputs


    If you have default values for some optional parameter it is good idea to provide them at the very beginning,
    so the other decorators (e.g. @group_by_keys) can make bigger consistent groups.
    """

    def _verify_defaults(model_config: TritonModelConfig):
        inputs = {spec.name: spec for spec in model_config.inputs}
        not_matching_default_names = sorted(set(defaults) - set(inputs))
        if not_matching_default_names:
            raise PyTritonBadParameterError(f"Could not found {', '.join(not_matching_default_names)} inputs")

        non_numpy_items = {k: v for k, v in defaults.items() if not isinstance(v, np.ndarray)}
        if non_numpy_items:
            raise PyTritonBadParameterError(
                f"Could not use {', '.join([f'{k}={v}' for k, v in non_numpy_items.items()])} defaults "
                "as they are not NumPy arrays"
            )

        not_matching_dtypes = {k: (v.dtype, inputs[k].dtype) for k, v in defaults.items() if v.dtype != inputs[k].dtype}
        if not_matching_dtypes:
            non_matching_dtypes_str_list = [
                f"{name}: dtype={have_dtype} expected_dtype={expected_dtype}"
                for name, (have_dtype, expected_dtype) in not_matching_dtypes.items()
            ]
            raise PyTritonBadParameterError(
                f"Could not use {', '.join(non_matching_dtypes_str_list)} "
                f"defaults as they have different than input signature dtypes"
            )

        def _shape_match(_have_shape, _expected_shape):
            return len(_have_shape) == len(_expected_shape) and all(
                e == -1 or h == e for h, e in zip(_have_shape, _expected_shape)
            )

        not_matching_shapes = {
            k: (v.shape, inputs[k].shape) for k, v in defaults.items() if not _shape_match(v.shape, inputs[k].shape)
        }
        if not_matching_shapes:
            non_matching_shapes_str_list = [
                f"{name}: shape={have_shape} expected_shape={expected_shape}"
                for name, (have_shape, expected_shape) in not_matching_shapes.items()
            ]
            raise PyTritonBadParameterError(
                f"Could not use {', '.join(non_matching_shapes_str_list)} "
                f"defaults as they have different than input signature shapes"
            )

    @wrapt.decorator
    def _wrapper(wrapped, instance, args, kwargs):
        model_config = get_model_config(wrapped, instance)
        _verify_defaults(model_config)
        # verification if not after group wrappers is in group wrappers

        (requests,) = args

        model_supports_batching = model_config.batching
        for request in requests:
            batch_size = get_inference_request_batch_size(request) if model_supports_batching else None
            for default_key, default_value in defaults.items():
                if default_key in request:
                    continue

                if model_supports_batching:
                    ones_reps = (1,) * default_value.ndim  # repeat once default_value on each axis
                    axis_reps = (batch_size,) + ones_reps  # ... except on batch axis. we repeat it batch_size times
                    default_value = np.tile(default_value, axis_reps)

                request[default_key] = default_value
        return wrapped(*args, **kwargs)

    return _wrapper
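
A concrete sketch with a hypothetical optional input named temperature; the default is a NumPy array without the batch axis and must match the dtype and shape declared for that input in the model binding:

import numpy as np

from pytriton.decorators import batch, fill_optionals

@fill_optionals(temperature=np.array([0.7], dtype=np.float32))
@batch
def infer_fn(**inputs):
    # requests missing "temperature" receive the default, tiled along the batch axis
    scaled = inputs["logits"] / inputs["temperature"]
    return {"scaled": scaled}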

first_value(*keys, squeeze_single_values=True, strict=True)

This decorator overwrites selected inputs with the first element of the given input.

It can be used in two ways:

  1. Wrapping a single request inference callable by chaining with @batch decorator:

        @batch
        @first_value("temperature")
        def infer_fn(**inputs):
            ...
            return result

  2. Wrapping a multiple requests inference callable:

        @first_value("temperature")
        def infer_fn(requests):
            ...
            return results

By default, the decorator squeezes single-value arrays to scalars. This behavior can be disabled by setting the squeeze_single_values flag to False.

By default, the decorator checks the equality of the values on the selected inputs. This behavior can be disabled by setting the strict flag to False.

The wrapper can only be used with models that support batching.

Parameters:

Name Type Description Default
keys str

The input keys selected for conversion.

()
squeeze_single_values

squeeze single value ND array to scalar values. Defaults to True.

True
strict bool

enable checking if all values on single selected input of request are equal. Defaults to True.

True

Raises:

Type Description
PyTritonRuntimeError

if not all values on a single selected input of the request are equal and the strict flag is set to True, or if the decorator is used with a model that does not support batching.

PyTritonBadParameterError

if any of the keys passed to the decorator are not allowed.

Source code in pytriton/decorators.py
def first_value(*keys: str, squeeze_single_values=True, strict: bool = True):
    """This decorator overwrites selected inputs with first element of the given input.

    It can be used in two ways:

    1. Wrapping a single request inference callable by chaining with @batch decorator:
        @batch
        @first_value("temperature")
        def infer_fn(**inputs):
            ...
            return result

    2. Wrapping a multiple requests inference callable:
        @first_value("temperature")
        def infer_fn(requests):
            ...
            return results

    By default, the decorator squeezes single value arrays to scalars.
    This behavior can be disabled by setting the `squeeze_single_values` flag to False.

    By default, the decorator checks the equality of the values on selected values.
    This behavior can be disabled by setting the `strict` flag to False.

    Wrapper can only be used with models that support batching.

    Args:
        keys: The input keys selected for conversion.
        squeeze_single_values: squeeze single value ND array to scalar values. Defaults to True.
        strict: enable checking if all values on single selected input of request are equal. Defaults to True.

    Raises:
        PyTritonRuntimeError: if not all values on a single selected input of the request are equal
        and the strict flag is set to True. Additionally, if the decorator is used with a model that doesn't support batching,
        PyTritonBadParameterError: if any of the keys passed to the decorator are not allowed.
    """
    if any(k in _SPECIAL_KEYS for k in keys):
        not_allowed_keys = [key for key in keys if key in _SPECIAL_KEYS]
        raise PyTritonBadParameterError(
            f"The keys {', '.join(not_allowed_keys)} are not allowed as keys for @first_value wrapper. "
            f"The set of not allowed keys are {', '.join(_SPECIAL_KEYS)}"
        )

    @wrapt.decorator
    def wrapper(wrapped, instance, args, kwargs):

        model_config = get_model_config(wrapped, instance)
        if not model_config.batching:
            raise PyTritonRuntimeError("The @first_value decorator can only be used with models that support batching.")

        def _replace_inputs_with_first_value(_request):
            for input_name in keys:
                if input_name not in _request:
                    continue

                values = _request[input_name]
                if strict:
                    # do not set axis for arrays with strings (object) or models not supporting batching
                    axis_of_uniqueness = None if values.dtype == object else 0
                    unique_values = np.unique(values, axis=axis_of_uniqueness)
                    if len(unique_values) > 1:
                        raise PyTritonRuntimeError(
                            f"The values on the {input_name!r} input are not equal. "
                            "To proceed, either disable strict mode in @first_value wrapper "
                            "or ensure that the values always are consistent. "
                            f"The current values of {input_name!r} are {_request[input_name]!r}."
                        )

                _first_value = values[0]
                if (
                    squeeze_single_values
                    and not np.isscalar(_first_value)
                    and all(dim == 1 for dim in _first_value.shape)
                ):
                    _dim_0_array = np.squeeze(_first_value)
                    _first_value = _dim_0_array[()]  # obtain scalar from 0-dim array with numpy type

                _request[input_name] = _first_value
            return _request

        inputs_names = set(kwargs) - set(_SPECIAL_KEYS)
        if inputs_names:
            kwargs = _replace_inputs_with_first_value(kwargs)
            return wrapped(*args, **kwargs)
        else:
            requests, *other_args = args
            requests = [_replace_inputs_with_first_value(request) for request in requests]
            return wrapped(requests, *other_args, **kwargs)

    return wrapper

get_inference_request_batch_size(inference_request)

Get batch size from triton request.

Parameters:

Name Type Description Default
inference_request InferenceRequest

Triton request.

required

Returns:

Name Type Description
int int

Batch size.

Source code in pytriton/decorators.py
def get_inference_request_batch_size(inference_request: InferenceRequest) -> int:
    """Get batch size from triton request.

    Args:
        inference_request (InferenceRequest): Triton request.

    Returns:
        int: Batch size.
    """
    first_input_value = next(iter(inference_request.values()))
    batch_size, *dims = first_input_value.shape
    return batch_size
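
For example (the input names are illustrative):

import numpy as np

from pytriton.decorators import get_inference_request_batch_size

request = {"input_a": np.zeros((8, 3)), "input_b": np.zeros((8, 5))}
assert get_inference_request_batch_size(request) == 8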

get_model_config(wrapped, instance)

Retrieves instance of TritonModelConfig from callable.

It is used internally in the convert_output function to get the output list from the model. You can use this in custom decorators if you need access to model_config information. If you use the @triton_context decorator you do not need this function (you can get model_config directly from triton_context by passing the function/callable to the dictionary getter).

Source code in pytriton/decorators.py
def get_model_config(wrapped, instance) -> TritonModelConfig:
    """Retrieves instance of TritonModelConfig from callable.

    It is internally used in convert_output function to get output list from model.
    You can use this in custom decorators if you need access to model_config information.
    If you use @triton_context decorator you do not need this function (you can get model_config directly
    from triton_context passing function/callable to dictionary getter).
    """
    return get_triton_context(wrapped, instance).model_configs[wrapped]

get_triton_context(wrapped, instance)

Retrieves triton context from callable.

It is used in @triton_context to get the triton context registered by the triton binding in the inference callable. If you use the @triton_context decorator you do not need this function.

Source code in pytriton/decorators.py
def get_triton_context(wrapped, instance) -> TritonContext:
    """Retrieves triton context from callable.

    It is used in @triton_context to get triton context registered by triton binding in inference callable.
    If you use @triton_context decorator you do not need this function.
    """
    caller = instance or wrapped
    if not hasattr(caller, "__triton_context__"):
        raise PyTritonValidationError("Wrapped function or object must bound with triton to get  __triton_context__")
    return caller.__triton_context__

group_by_keys(wrapped, instance, args, kwargs)

Group by keys.

Decorator prepares groups of requests with the same set of keys and calls the wrapped function for each group separately (it is convenient to use this decorator before batching, because the batching decorator requires a consistent set of inputs as it stacks them into batches).

Source code in pytriton/decorators.py
@wrapt.decorator
def group_by_keys(wrapped, instance, args, kwargs):
    """Group by keys.

    Decorator prepares groups of requests with the same set of keys and calls wrapped function
    for each group separately (it is convenient to use this decorator before batching, because the batching decorator
    requires consistent set of inputs as it stacks them into batches).
    """
    inputs = args[0]
    idx_inputs = [(idx, tuple(sorted(input.keys())), input) for idx, input in enumerate(inputs)]
    idx_inputs.sort(key=operator.itemgetter(1))
    idx_groups_res = []
    for _, group in itertools.groupby(idx_inputs, key=operator.itemgetter(1)):
        idx, _key, sample_list = zip(*group)
        args = (list(sample_list),) + args[1:]
        out = wrapped(*args, **kwargs)
        idx_groups_res.extend(zip(idx, out))

    idx_groups_res.sort(key=operator.itemgetter(0))
    res_flat = [r[1] for r in idx_groups_res]
    return res_flat

group_by_values(*keys, pad_fn=None)

Decorator for grouping requests by values of selected keys.

This function splits a batch into multiple sub-batches based on the values of the specified keys and calls the decorated function with each sub-batch. This is particularly useful when working with models that require dynamic parameters sent by the user.

For example, given an input of the form:

{"sentences": [b"Sentence1", b"Sentence2", b"Sentence3"], "param1": [1, 1, 2], "param2": [1, 1, 1]}

Using @group_by_values("param1", "param2") will split the batch into two sub-batches:

[
    {"sentences": [b"Sentence1", b"Sentence2"], "param1": [1, 1], "param2": [1, 1]},
    {"sentences": [b"Sentence3"], "param1": [2], "param2": [1]}
]

This decorator should be used after the @batch decorator.

Example usage

@batch
@group_by_values("param1", "param2")
def infer_fun(**inputs):
    ...
    return outputs

Parameters:

Name Type Description Default
*keys

List of keys to group by.

()
pad_fn typing.Optional[typing.Callable[[InferenceRequests], InferenceRequests]]

Optional function to pad the batch to the same size before merging again to a single batch.

None

Returns:

Type Description

The decorator function.

Source code in pytriton/decorators.py
def group_by_values(*keys, pad_fn: typing.Optional[typing.Callable[[InferenceRequests], InferenceRequests]] = None):
    """Decorator for grouping requests by values of selected keys.

    This function splits a batch into multiple sub-batches based on the specified keys values and
    calls the decorated function with each sub-batch. This is particularly useful when working with models
    that require dynamic parameters sent by the user.

    For example, given an input of the form:

        {"sentences": [b"Sentence1", b"Sentence2", b"Sentence3"], "param1": [1, 1, 2], "param2": [1, 1, 1]}

    Using @group_by_values("param1", "param2") will split the batch into two sub-batches:

        [
            {"sentences": [b"Sentence1", b"Sentence2"], "param1": [1, 1], "param2": [1, 1]},
            {"sentences": [b"Sentence3"], "param1": [2], "param2": [1]}
        ]

    This decorator should be used after the @batch decorator.

    Example usage:

        @batch
        @group_by_values("param1", "param2")
        def infer_fun(**inputs):
            ...
            return outputs

    Args:
        *keys: List of keys to group by.
        pad_fn: Optional function to pad the batch to the same size before merging again to a single batch.

    Returns:
        The decorator function.
    """

    def value_to_key(value):
        if isinstance(value, np.ndarray):
            if value.dtype == np.object_ or value.dtype.type == np.bytes_:
                return _serialize_byte_tensor(value)
            else:
                return value.tobytes()
        return value

    def _get_sort_key_for_sample(_request, _sample_idx: int):
        return tuple(value_to_key(_request[_key][_sample_idx]) for _key in keys)

    def _group_request(_request: InferenceRequest, _batch_size: int):
        idx_inputs = [(sample_idx, _get_sort_key_for_sample(_request, sample_idx)) for sample_idx in range(_batch_size)]
        idx_inputs.sort(key=operator.itemgetter(1))
        for _, group in itertools.groupby(idx_inputs, key=operator.itemgetter(1)):
            _samples_idxes, _ = zip(*group)
            grouped_request = {input_name: value[_samples_idxes, ...] for input_name, value in _request.items()}
            yield _samples_idxes, grouped_request

    @wrapt.decorator
    def _wrapper(wrapped, instance, args, kwargs):

        wrappers_stack = [
            callable_with_wrapper.wrapper
            for callable_with_wrapper in _get_wrapt_stack(wrapped)
            if callable_with_wrapper.wrapper is not None
        ]
        if batch in wrappers_stack:
            raise PyTritonRuntimeError("The @group_by_values decorator must be used after the @batch decorator.")

        request = {k: v for k, v in kwargs.items() if k not in _SPECIAL_KEYS}
        other_kwargs = {k: v for k, v in kwargs.items() if k in _SPECIAL_KEYS}

        batch_size = get_inference_request_batch_size(request)
        sample_indices_with_interim_result = []
        for sample_indices, _grouped_sub_request in _group_request(request, batch_size):
            interim_result = wrapped(*args, **_grouped_sub_request, **other_kwargs)
            sample_indices_with_interim_result.append((sample_indices, interim_result))

        if pad_fn is not None:
            indices, results = tuple(map(tuple, zip(*sample_indices_with_interim_result)))
            results = pad_fn(results)
            sample_indices_with_interim_result = tuple(zip(indices, results))

        _, first_result_data = sample_indices_with_interim_result[0]
        result = {
            output_name: np.zeros((batch_size,) + data.shape[1:], dtype=data.dtype)
            for output_name, data in first_result_data.items()
        }
        for indices, results in sample_indices_with_interim_result:
            for output_name, data in results.items():
                result[output_name][indices, ...] = data

        return result

    return _wrapper
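
A sketch combining the decorator with ConstantPadder as pad_fn, so that sub-batch results with different trailing dimensions can be merged back into a single batch (the input and output names are hypothetical):

from pytriton.decorators import ConstantPadder, batch, group_by_values

@batch
@group_by_values("param1", pad_fn=ConstantPadder(pad_value=0))
def infer_fn(**inputs):
    # called once per unique value of "param1" within the batch
    return {"out": inputs["x"] * inputs["param1"]}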

pad_batch(wrapped, instance, args, kwargs)

Add padding to the inputs batches.

Decorator appends the last row of the inputs multiple times to reach the desired batch size (the preferred batch size or the max batch size from the model config, whichever is closer to the current input size).

Source code in pytriton/decorators.py
@wrapt.decorator
def pad_batch(wrapped, instance, args, kwargs):
    """Add padding to the inputs batches.

    Decorator appends last rows to the inputs multiple times to get desired batch size (preferred batch size or
    max batch size from model config whatever is closer to current input size).
    """
    inputs = {k: v for k, v in kwargs.items() if k != "__triton_context__"}
    first_input = next(iter(inputs.values()))
    config = get_model_config(wrapped, instance)
    batch_sizes = (
        []
        if (config.batcher is None or config.batcher.preferred_batch_size is None)
        else sorted(config.batcher.preferred_batch_size)
    )
    batch_sizes.append(config.max_batch_size)
    batch_size = batch_sizes[bisect_left(batch_sizes, first_input.shape[0])]

    new_inputs = {
        input_name: np.repeat(
            input_array,
            np.concatenate(
                [np.ones(input_array.shape[0] - 1), np.array([batch_size - input_array.shape[0] + 1])]
            ).astype(np.int64),
            axis=0,
        )
        for input_name, input_array in inputs.items()
    }

    kwargs.update(new_inputs)
    return wrapped(*args, **kwargs)
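
A usage sketch, assuming the decorator is stacked under @batch so that it receives named input arrays (the names are hypothetical):

from pytriton.decorators import batch, pad_batch

@batch
@pad_batch
def infer_fn(**inputs):
    # inputs arrive padded up to the preferred or max batch size from the model config
    return {"out": inputs["input_a"] * 2}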

sample(wrapped, instance, args, kwargs)

Decorator used for non-batched inputs to convert a one-element list of requests into request kwargs.

Decorator takes the first request and converts it into named inputs. Useful with non-batching models - instead of a one-element list of requests, we get named inputs - kwargs.

Source code in pytriton/decorators.py
@wrapt.decorator
def sample(wrapped, instance, args, kwargs):
    """Decorator is used for non-batched inputs to convert from one element list of requests to request kwargs.

    Decorator takes first request and convert it into named inputs.
    Useful with non-batching models - instead of one element list of request, we will get named inputs - `kwargs`.
    """
    kwargs.update(args[0][0])
    outputs = wrapped(*args[1:], **kwargs)
    outputs = convert_output(outputs, wrapped, instance)
    return [outputs]
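
A usage sketch for a non-batching model (the input name is hypothetical):

from pytriton.decorators import sample

@sample
def infer_fn(**inputs):
    # the single request is passed as named input arrays
    return {"out": inputs["input_a"] * 2}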

triton_context(wrapped, instance, args, kwargs)

Adds triton context.

It gives you an additional argument passed to the function in **kwargs called 'triton_context'. You can read the model config from it and in the future possibly have some interaction with Triton.

Source code in pytriton/decorators.py
@wrapt.decorator
def triton_context(wrapped, instance, args, kwargs):
    """Adds triton context.

    It gives you additional argument passed to the function in **kwargs called 'triton_context'.
    You can read model config from it and in the future possibly have some interaction with triton.
    """
    kwargs[TRITON_CONTEXT_FIELD_NAME] = get_triton_context(wrapped, instance)
    return wrapped(*args, **kwargs)

pytriton.triton.Triton(*, config=None, workspace=None)

Triton Inference Server for Python models.

Initialize Triton Inference Server context for starting server and loading models.

Parameters:

Name Type Description Default
config Optional[TritonConfig]

TritonConfig object with optional customizations for Triton Inference Server. Configuration can also be passed through environment variables. See the TritonConfig.from_env() class method for details.

Order of precedence:

  • config defined through config parameter of init method.
  • config defined in environment variables
  • default TritonConfig values
None
workspace Union[Workspace, str, pathlib.Path, None]

workspace or path where the Triton Model Store and files used by pytriton will be created. If workspace is None, a random workspace will be created. The workspace will be deleted in Triton.stop().

None
Source code in pytriton/triton.py
def __init__(
    self, *, config: Optional[TritonConfig] = None, workspace: Union[Workspace, str, pathlib.Path, None] = None
):
    """Initialize Triton Inference Server context for starting server and loading models.

    Args:
        config: TritonConfig object with optional customizations for Triton Inference Server.
            Configuration can be passed also through environment variables.
            See [TritonConfig.from_env()][pytriton.triton.TritonConfig.from_env] class method for details.

            Order of precedence:

              - config defined through `config` parameter of init method.
              - config defined in environment variables
              - default TritonConfig values
        workspace: workspace or path where the Triton Model Store and files used by pytriton will be created.
            If workspace is `None` random workspace will be created.
            Workspace will be deleted in [Triton.stop()][pytriton.triton.Triton.stop].
    """

    def _without_none_values(_d):
        return {name: value for name, value in _d.items() if value is not None}

    default_config_dict = _without_none_values(TritonConfig().to_dict())
    env_config_dict = _without_none_values(TritonConfig.from_env().to_dict())
    explicit_config_dict = _without_none_values(config.to_dict() if config else {})
    config_dict = {**default_config_dict, **env_config_dict, **explicit_config_dict}
    self._config = TritonConfig(**config_dict)
    self._workspace = workspace if isinstance(workspace, Workspace) else Workspace(workspace)

    model_repository = TritonModelRepository(path=self._config.model_repository, workspace=self._workspace)
    self._model_manager = ModelManager(model_repository)

    self._triton_server_config = TritonServerConfig()
    config_data = self._config.to_dict()

    self._python_backend_config = PythonBackendConfig()
    backend_config_data = {
        "shm_default_byte_size": INITIAL_BACKEND_SHM_SIZE,
        "shm-growth-byte-size": GROWTH_BACKEND_SHM_SIZE,
    }
    for name, value in backend_config_data.items():
        if name not in PythonBackendConfig.allowed_keys() or value is None:
            continue

        self._python_backend_config[name] = value

    for name, value in config_data.items():
        if name not in TritonServerConfig.allowed_keys() or value is None:
            continue

        self._triton_server_config[name] = value

    self._triton_server_config["backend_config"] = self._python_backend_config.to_cli_string()

    self._triton_server_config["model_repository"] = model_repository.path.as_posix()
    self._triton_server_config["backend_directory"] = (TRITONSERVER_DIST_DIR / "backends").as_posix()
    if "cache_directory" not in self._triton_server_config:
        self._triton_server_config["cache_directory"] = (get_root_module_path() / "tritonserver/caches").as_posix()

    self._triton_server = TritonServer(
        path=(TRITONSERVER_DIST_DIR / "bin/tritonserver").as_posix(),
        libs_path=get_libs_path(),
        config=self._triton_server_config,
    )

    self._cv = th.Condition()
    with self._cv:
        self._stopped = True

    self.triton_context = TritonContext()
    url = (
        self._triton_server.get_endpoint("http")
        if (self._config.allow_http is None or self._config.allow_http)
        else self._triton_server.get_endpoint("grpc")
    )
    self._log_level_checker = _LogLevelChecker(url)

__enter__()

Enter the context.

Returns:

Type Description
Triton

A Triton object

Source code in pytriton/triton.py
def __enter__(self) -> "Triton":
    """Enter the context.

    Returns:
        A Triton object
    """
    return self

__exit__(*_)

Exit the context stopping the process and cleaning the workspace.

Parameters:

Name Type Description Default
*_

unused arguments

()
Source code in pytriton/triton.py
def __exit__(self, *_) -> None:
    """Exit the context stopping the process and cleaning the workspace.

    Args:
        *_: unused arguments
    """
    self.stop()

bind(model_name, infer_func, inputs, outputs, model_version=1, config=None, strict=False)

Create a model with the given name and bind the inference callable into Triton Inference Server.

More information about model configuration: https://github.com/triton-inference-server/server/blob/main/docs/user_guide/model_configuration.md

Parameters:

Name Type Description Default
infer_func Union[Callable, Sequence[Callable]]

Inference callable (or a list of inference callables for a multi-instance model) to handle request/response from Triton Inference Server

required
inputs Sequence[Tensor]

Definition of model inputs

required
outputs Sequence[Tensor]

Definition of model outputs

required
model_name str

Name under which the model is available in Triton Inference Server. It can only contain alphanumeric characters, dots, underscores and dashes.

required
model_version int

Version of model

1
config Optional[ModelConfig]

Model configuration for Triton Inference Server deployment

None
strict bool

Enable strict validation between model config outputs and inference function result

False
Source code in pytriton/triton.py
def bind(
    self,
    model_name: str,
    infer_func: Union[Callable, Sequence[Callable]],
    inputs: Sequence[Tensor],
    outputs: Sequence[Tensor],
    model_version: int = 1,
    config: Optional[ModelConfig] = None,
    strict: bool = False,
) -> None:
    """Create a model with given name and inference callable binding into Triton Inference Server.

    More information about model configuration:
    https://github.com/triton-inference-server/server/blob/main/docs/user_guide/model_configuration.md

    Args:
        infer_func: Inference callable to handle request/response from Triton Inference Server
        (or list of inference callable for multi instance model)
        inputs: Definition of model inputs
        outputs: Definition of model outputs
        model_name: Name under which model is available in Triton Inference Server. It can only contain
        alphanumeric characters, dots, underscores and dashes.
        model_version: Version of model
        config: Model configuration for Triton Inference Server deployment
        strict: Enable strict validation between model config outputs and inference function result
    """
    self._validate_model_name(model_name)
    model = Model(
        model_name=model_name,
        model_version=model_version,
        inference_fn=infer_func,
        inputs=inputs,
        outputs=outputs,
        config=config if config else ModelConfig(),
        workspace=self._workspace,
        triton_context=self.triton_context,
        strict=strict,
    )
    model.on_model_event(self._on_model_event)

    self._model_manager.add_model(model)
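
A binding sketch, assuming triton is a Triton instance (see the context-manager example above); the model name, tensor names, and infer_fn are hypothetical:

import numpy as np

from pytriton.decorators import batch
from pytriton.model_config.model_config import ModelConfig
from pytriton.model_config.tensor import Tensor

@batch
def infer_fn(**inputs):
    return {"output_a": inputs["input_a"] * 2}

triton.bind(
    model_name="ExampleModel",
    infer_func=infer_fn,
    inputs=[Tensor(name="input_a", dtype=np.float32, shape=(-1,))],
    outputs=[Tensor(name="output_a", dtype=np.float32, shape=(-1,))],
    config=ModelConfig(max_batch_size=16),
)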

is_alive()

Verify if the deployed models and server are alive.

Returns:

Type Description
bool

True if server and loaded models are alive, False otherwise.

Source code in pytriton/triton.py
def is_alive(self) -> bool:
    """Verify is deployed models and server are alive.

    Returns:
        True if server and loaded models are alive, False otherwise.
    """
    if not self._triton_server.is_alive():
        return False

    for model in self._model_manager.models:
        if not model.is_alive():
            return False
    return True

run()

Run Triton Inference Server.

Source code in pytriton/triton.py
def run(self) -> None:
    """Run Triton Inference Server."""
    if not self._triton_server.is_alive():
        self._model_manager.create_models()
        with self._cv:
            self._stopped = False
        LOGGER.debug("Starting Triton Inference")
        self._triton_server.register_on_exit(self._on_tritonserver_exit)
        atexit.register(self.stop)
        self._triton_server.start()
    self._wait_for_models()

serve(monitoring_period_sec=MONITORING_PERIOD_SEC)

Run Triton Inference Server and lock thread for serving requests/response.

Parameters:

Name Type Description Default
monitoring_period_sec float

the period for monitoring whether Triton and the models are available. Every monitoring_period_sec seconds the main thread wakes up, checks if the triton server and proxy backend are still alive, and sleeps again. If triton or the proxy is not alive, the method returns.

MONITORING_PERIOD_SEC
Source code in pytriton/triton.py
def serve(self, monitoring_period_sec: float = MONITORING_PERIOD_SEC) -> None:
    """Run Triton Inference Server and lock thread for serving requests/response.

    Args:
        monitoring_period_sec: the timeout of monitoring if Triton and models are available.
            Every monitoring_period_sec seconds main thread wakes up and check if triton server and proxy backend
            are still alive and sleep again. If triton or proxy is not alive - method returns.
    """
    self.run()
    with self._cv:
        while self.is_alive():
            self._cv.wait(timeout=monitoring_period_sec)
    self.stop()

stop()

Stop Triton Inference Server.

Source code in pytriton/triton.py
def stop(self) -> None:
    """Stop Triton Inference Server."""
    LOGGER.debug("Stopping Triton Inference server and proxy backends")
    with self._cv:
        if self._stopped:
            LOGGER.debug("Triton Inference already stopped.")
            return
        self._stopped = True
    self._triton_server.unregister_on_exit(self._on_tritonserver_exit)
    atexit.unregister(self.stop)
    self._triton_server.stop()
    self._model_manager.clean()
    self._workspace.clean()
    with self._cv:
        self._cv.notify_all()
    LOGGER.debug("Stopped Triton Inference server and proxy backends")
    self._log_level_checker.check(skip_update=True)

pytriton.model_config.tensor.Tensor dataclass

Model input and output definition for Triton deployment.

Parameters:

Name Type Description Default
shape tuple

Shape of the input/output tensor.

required
dtype Union[np.dtype, Type[np.dtype], Type[object]]

Data type of the input/output tensor.

required
name Optional[str]

Name of the input/output of model.

None
optional Optional[bool]

Flag to mark if input is optional.

False

__post_init__()

Override object values on post init or field override.

Source code in pytriton/model_config/tensor.py
def __post_init__(self):
    """Override object values on post init or field override."""
    if isinstance(self.dtype, np.dtype):
        object.__setattr__(self, "dtype", self.dtype.type)  # pytype: disable=attribute-error
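
For example (the names and shapes are illustrative; -1 marks a dynamic dimension):

import numpy as np

from pytriton.model_config.tensor import Tensor

input_a = Tensor(name="input_a", dtype=np.float32, shape=(-1,))
text = Tensor(name="text", dtype=bytes, shape=(1,))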

pytriton.model_config.common

Common structures for internal and external ModelConfig.

DeviceKind

Bases: enum.Enum

Device kind for model deployment.

Parameters:

Name Type Description Default
KIND_AUTO

Automatically select the device for model deployment.

required
KIND_CPU

Model is deployed on CPU.

required
KIND_GPU

Model is deployed on GPU.

required

DynamicBatcher dataclass

Dynamic batcher configuration.

More in Triton Inference Server documentation

Parameters:

Name Type Description Default
max_queue_delay_microseconds int

The maximum time, in microseconds, a request will be delayed in the scheduling queue to wait for additional requests for batching.

0
preferred_batch_size Optional[list]

Preferred batch sizes for dynamic batching.

None
preserve_ordering

Should the dynamic batcher preserve the ordering of responses to match the order of requests received by the scheduler.

False
priority_levels int

The number of priority levels to be enabled for the model.

0
default_priority_level int

The priority level used for requests that don't specify their priority.

0
default_queue_policy Optional[QueuePolicy]

The default queue policy used for requests.

None
priority_queue_policy Optional[Dict[int, QueuePolicy]]

Specify the queue policy for the priority level.

None

QueuePolicy dataclass

Model queue policy configuration.

More in Triton Inference Server documentation

Parameters:

Name Type Description Default
timeout_action TimeoutAction

The action applied to timed-out request.

TimeoutAction.REJECT
default_timeout_microseconds int

The default timeout for every request, in microseconds.

0
allow_timeout_override bool

Whether individual request can override the default timeout value.

False
max_queue_size int

The maximum queue size for holding requests.

0

TimeoutAction

Bases: enum.Enum

Timeout action definition for timeout_action QueuePolicy field.

Parameters:

Name Type Description Default
REJECT

Reject the request and return error message accordingly.

required
DELAY

Delay the request until all other requests at the same (or higher) priority levels that have not reached their timeouts are processed.

required

pytriton.model_config.model_config.ModelConfig dataclass

Additional model configuration for running model through Triton Inference Server.

Parameters:

Name Type Description Default
batching bool

Flag to enable/disable batching for model.

True
max_batch_size int

The maximal batch size that will be handled by the model.

4
batcher DynamicBatcher

Configuration of Dynamic Batching for the model.

dataclasses.field(default_factory=DynamicBatcher)
response_cache bool

Flag to enable/disable response cache for the model

False
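
A configuration sketch combining the structures above (the values are illustrative):

from pytriton.model_config.common import DynamicBatcher
from pytriton.model_config.model_config import ModelConfig

model_config = ModelConfig(
    batching=True,
    max_batch_size=16,
    batcher=DynamicBatcher(
        max_queue_delay_microseconds=100,
        preferred_batch_size=[8, 16],
    ),
    response_cache=False,
)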

pytriton.client.client

Clients for easy interaction with models deployed on the Triton Inference Server.

Typical usage example

with ModelClient("localhost", "MyModel") as client:
    result_dict = client.infer_sample(input_a=a, input_b=b)

Inference inputs can be provided either as positional or keyword arguments

result_dict = client.infer_sample(input1, input2)
result_dict = client.infer_sample(a=input1, b=input2)

Mixing of argument passing conventions is not supported and will raise PyTritonClientValueError.

AsyncioModelClient(url, model_name, model_version=None, *, lazy_init=True, init_timeout_s=None, inference_timeout_s=None)

Bases: BaseModelClient

Asyncio client for model deployed on the Triton Inference Server.

This client is based on Triton Inference Server Python clients and GRPC library:

  • tritonclient.http.aio.InferenceServerClient
  • tritonclient.grpc.aio.InferenceServerClient

It can wait for the server to be ready with the model loaded and then perform inference on it. AsyncioModelClient supports the asyncio context manager protocol.

Typical usage:

from pytriton.client import AsyncioModelClient
import numpy as np

input1_sample = np.random.rand(1, 3, 224, 224).astype(np.float32)
input2_sample = np.random.rand(1, 3, 224, 224).astype(np.float32)

async with AsyncioModelClient("localhost", "MyModel") as client:
    result_dict = await client.infer_sample(input1_sample, input2_sample)
    print(result_dict["output_name"])

Inits ModelClient for given model deployed on the Triton Inference Server.

If lazy_init argument is False, model configuration will be read from inference server during initialization.

Parameters:

Name Type Description Default
url str

The Triton Inference Server url, e.g. 'grpc://localhost:8001'. If no scheme is provided, the http scheme is used by default. If no port is provided, the default port for the given scheme is used: 8001 for grpc, 8000 for http.

required
model_name str

name of the model to interact with.

required
model_version Optional[str]

version of the model to interact with. If model_version is None, inference is performed on the latest version of the model. The latest version is the one with the numerically greatest version number.

None
lazy_init bool

if True, initialization is performed just before sending the first request to the inference server.

True
init_timeout_s Optional[float]

timeout in seconds for the server and model to be ready.

None

Raises:

Type Description
PyTritonClientModelUnavailableError

If model with given name (and version) is unavailable.

PyTritonClientTimeoutError

if lazy_init argument is False and wait time for server and model being ready exceeds init_timeout_s.

PyTritonClientUrlParseError

In case of problems with parsing url.
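
A sketch of eager initialization with an explicit timeout; the model name and gRPC address are assumptions.

import asyncio
from pytriton.client import AsyncioModelClient

async def main():
    # With lazy_init=False the model configuration is fetched when the context
    # is entered, waiting up to init_timeout_s for server and model readiness.
    async with AsyncioModelClient(
        "grpc://localhost:8001",
        "MyModel",
        lazy_init=False,
        init_timeout_s=120.0,
    ) as client:
        config = await client.model_config
        print(config.max_batch_size)

asyncio.run(main())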

Source code in pytriton/client/client.py
def __init__(
    self,
    url: str,
    model_name: str,
    model_version: Optional[str] = None,
    *,
    lazy_init: bool = True,
    init_timeout_s: Optional[float] = None,
    inference_timeout_s: Optional[float] = None,
):
    """Inits ModelClient for given model deployed on the Triton Inference Server.

    If `lazy_init` argument is False, model configuration will be read
    from inference server during initialization.

    Args:
        url: The Triton Inference Server url, e.g. 'grpc://localhost:8001'.
            In case no scheme is provided http scheme will be used as default.
            In case no port is provided default port for given scheme will be used -
            8001 for grpc scheme, 8000 for http scheme.
        model_name: name of the model to interact with.
        model_version: version of the model to interact with.
            If model_version is None inference on latest model will be performed.
            The latest versions of the model are numerically the greatest version numbers.
        lazy_init: if initialization should be performed just before sending first request to inference server.
        init_timeout_s: timeout for server and model being ready.

    Raises:
        PyTritonClientModelUnavailableError: If model with given name (and version) is unavailable.
        PyTritonClientTimeoutError: if `lazy_init` argument is False and wait time for server and model being ready exceeds `init_timeout_s`.
        PyTritonClientUrlParseError: In case of problems with parsing url.
    """
    super().__init__(
        url=url,
        model_name=model_name,
        model_version=model_version,
        lazy_init=lazy_init,
        init_timeout_s=init_timeout_s,
        inference_timeout_s=inference_timeout_s,
    )

model_config async property

Obtain configuration of model deployed on the Triton Inference Server.

Also waits for server to get into readiness state.

__aenter__() async

Create context for using AsyncioModelClient as a context manager.

Source code in pytriton/client/client.py
async def __aenter__(self):
    """Create context for use AsyncioModelClient as a context manager."""
    _LOGGER.debug("Entering AsyncioModelClient context")
    try:
        if not self._lazy_init:
            _LOGGER.debug("Waiting in AsyncioModelClient context for model to be ready")
            await self._wait_and_init_model_config(self._init_timeout_s)
            _LOGGER.debug("Model is ready in AsyncioModelClient context")
        return self
    except Exception as e:
        _LOGGER.error("Error occurred during AsyncioModelClient context initialization")
        await self.close()
        raise e

__aexit__(*_) async

Close resources used by AsyncioModelClient when exiting from context.

Source code in pytriton/client/client.py
async def __aexit__(self, *_):
    """Close resources used by AsyncioModelClient when exiting from context."""
    await self.close()
    _LOGGER.debug("Exiting AsyncioModelClient context")

close() async

Close resources used by _ModelClientBase.

Source code in pytriton/client/client.py
async def close(self):
    """Close resources used by _ModelClientBase."""
    _LOGGER.debug("Closing InferenceServerClient")
    await self._general_client.close()
    await self._infer_client.close()
    _LOGGER.debug("InferenceServerClient closed")

get_lib()

Get Triton Inference Server Python client library.

Source code in pytriton/client/client.py
def get_lib(self):
    """Get Triton Inference Server Python client library."""
    return {"grpc": tritonclient.grpc.aio, "http": tritonclient.http.aio}[self._scheme.lower()]

infer_batch(*inputs, parameters=None, headers=None, **named_inputs) async

Run asynchronous inference on batched data.

Typical usage:

async with AsyncioModelClient("localhost", "MyModel") as client:
    result_dict = await client.infer_batch(input1, input2)

Inference inputs can be provided either as positional or keyword arguments:

result_dict = await client.infer_batch(input1, input2)
result_dict = await client.infer_batch(a=input1, b=input2)

Mixing of argument passing conventions is not supported and will raise PyTritonClientValueError.

Parameters:

Name Type Description Default
*inputs

inference inputs provided as positional arguments.

()
parameters Optional[Dict[str, Union[str, int, bool]]]

custom inference parameters.

None
headers Optional[Dict[str, Union[str, int, bool]]]

custom inference headers.

None
**named_inputs

inference inputs provided as named arguments.

{}

Returns:

Type Description
Dict[str, np.ndarray]

dictionary with inference results, where dictionary keys are output names.

Raises:

Type Description
PyTritonClientValueError

if mixing of positional and named arguments passing detected.

PyTritonClientTimeoutError

if, on the first method call, lazy_init is False and the wait time for the server and model to be ready exceeds init_timeout_s, or the inference time exceeds timeout_s.

PyTritonClientModelDoesntSupportBatchingError

if model doesn't support batching.

PyTritonClientModelUnavailableError

If model with given name (and version) is unavailable.

PyTritonClientInferenceServerError

If error occurred on inference callable or Triton Inference Server side.
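
For instance, a batch of two samples can be sent as a single array whose first axis is the batch dimension (the model name and shapes are illustrative assumptions):

import numpy as np
from pytriton.client import AsyncioModelClient

batch = np.random.rand(2, 3, 224, 224).astype(np.float32)  # batch of 2 samples

async with AsyncioModelClient("localhost", "MyModel") as client:
    result_dict = await client.infer_batch(batch)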

Source code in pytriton/client/client.py
async def infer_batch(
    self,
    *inputs,
    parameters: Optional[Dict[str, Union[str, int, bool]]] = None,
    headers: Optional[Dict[str, Union[str, int, bool]]] = None,
    **named_inputs,
) -> Dict[str, np.ndarray]:
    """Run asynchronous inference on batched data.

    Typical usage:

    ```python
    async with AsyncioModelClient("localhost", "MyModel") as client:
        result_dict = await client.infer_batch(input1, input2)
    ```

    Inference inputs can be provided either as positional or keyword arguments:

    ```python
    result_dict = await client.infer_batch(input1, input2)
    result_dict = await client.infer_batch(a=input1, b=input2)
    ```

    Mixing of argument passing conventions is not supported and will raise PyTritonClientValueError.

    Args:
        *inputs: inference inputs provided as positional arguments.
        parameters: custom inference parameters.
        headers: custom inference headers.
        **named_inputs: inference inputs provided as named arguments.

    Returns:
        dictionary with inference results, where dictionary keys are output names.

    Raises:
        PyTritonClientValueError: if mixing of positional and named arguments passing detected.
        PyTritonClientTimeoutError:
            in case of first method call, `lazy_init` argument is False
            and wait time for server and model being ready exceeds `init_timeout_s`
            or inference time exceeds `timeout_s`.
        PyTritonClientModelDoesntSupportBatchingError: if model doesn't support batching.
        PyTritonClientModelUnavailableError: If model with given name (and version) is unavailable.
        PyTritonClientInferenceServerError: If error occurred on inference callable or Triton Inference Server side.
    """
    _verify_inputs_args(inputs, named_inputs)
    _verify_parameters(parameters)
    _verify_parameters(headers)

    _LOGGER.debug(f"Running inference for {self._model_name}")
    model_config = await self.model_config
    _LOGGER.debug(f"Model config for {self._model_name} obtained")

    model_supports_batching = model_config.max_batch_size > 0
    if not model_supports_batching:
        _LOGGER.error(f"Model {model_config.model_name} doesn't support batching")
        raise PyTritonClientModelDoesntSupportBatchingError(
            f"Model {model_config.model_name} doesn't support batching - use infer_sample method instead"
        )

    _LOGGER.debug(f"Running _infer for {self._model_name}")
    result = await self._infer(inputs or named_inputs, parameters, headers)
    _LOGGER.debug(f"_infer for {self._model_name} finished")
    return result

infer_sample(*inputs, parameters=None, headers=None, **named_inputs) async

Run asynchronous inference on single data sample.

Typical usage:

async with AsyncioModelClient("localhost", "MyModel") as client:
    result_dict = await client.infer_sample(input1, input2)

Inference inputs can be provided either as positional or keyword arguments:

result_dict = await client.infer_sample(input1, input2)
result_dict = await client.infer_sample(a=input1, b=input2)

Mixing of argument passing conventions is not supported and will raise PyTritonClientValueError.

Parameters:

Name Type Description Default
*inputs

inference inputs provided as positional arguments.

()
parameters Optional[Dict[str, Union[str, int, bool]]]

custom inference parameters.

None
headers Optional[Dict[str, Union[str, int, bool]]]

custom inference headers.

None
**named_inputs

inference inputs provided as named arguments.

{}

Returns:

Type Description
Dict[str, np.ndarray]

dictionary with inference results, where dictionary keys are output names.

Raises:

Type Description
PyTritonClientValueError

if mixing of positional and named arguments passing detected.

PyTritonClientTimeoutError

if, on the first method call, lazy_init is False and the wait time for the server and model to be ready exceeds init_timeout_s, or the inference time exceeds timeout_s.

PyTritonClientModelUnavailableError

If model with given name (and version) is unavailable.

PyTritonClientInferenceServerError

If error occurred on inference callable or Triton Inference Server side.

Source code in pytriton/client/client.py
async def infer_sample(
    self,
    *inputs,
    parameters: Optional[Dict[str, Union[str, int, bool]]] = None,
    headers: Optional[Dict[str, Union[str, int, bool]]] = None,
    **named_inputs,
) -> Dict[str, np.ndarray]:
    """Run asynchronous inference on single data sample.

    Typical usage:

    ```python
    async with AsyncioModelClient("localhost", "MyModel") as client:
        result_dict = await client.infer_sample(input1, input2)
    ```

    Inference inputs can be provided either as positional or keyword arguments:

    ```python
    result_dict = await client.infer_sample(input1, input2)
    result_dict = await client.infer_sample(a=input1, b=input2)
    ```

    Mixing of argument passing conventions is not supported and will raise PyTritonClientRuntimeError.

    Args:
        *inputs: inference inputs provided as positional arguments.
        parameters: custom inference parameters.
        headers: custom inference headers.
        **named_inputs: inference inputs provided as named arguments.

    Returns:
        dictionary with inference results, where dictionary keys are output names.

    Raises:
        PyTritonClientValueError: if mixing of positional and named arguments passing detected.
        PyTritonClientTimeoutError:
            in case of first method call, `lazy_init` argument is False
            and wait time for server and model being ready exceeds `init_timeout_s`
            or inference time exceeds `timeout_s`.
        PyTritonClientModelUnavailableError: If model with given name (and version) is unavailable.
        PyTritonClientInferenceServerError: If error occurred on inference callable or Triton Inference Server side.
    """
    _verify_inputs_args(inputs, named_inputs)
    _verify_parameters(parameters)
    _verify_parameters(headers)

    _LOGGER.debug(f"Running inference for {self._model_name}")
    model_config = await self.model_config
    _LOGGER.debug(f"Model config for {self._model_name} obtained")

    model_supports_batching = model_config.max_batch_size > 0
    if model_supports_batching:
        if inputs:
            inputs = tuple(data[np.newaxis, ...] for data in inputs)
        elif named_inputs:
            named_inputs = {name: data[np.newaxis, ...] for name, data in named_inputs.items()}

    _LOGGER.debug(f"Running _infer for {self._model_name}")
    result = await self._infer(inputs or named_inputs, parameters, headers)
    _LOGGER.debug(f"_infer for {self._model_name} finished")
    if model_supports_batching:
        result = {name: data[0] for name, data in result.items()}

    return result

wait_for_model(timeout_s) async

Asynchronously wait for the Triton Inference Server and the deployed model to be ready.

Parameters:

Name Type Description Default
timeout_s float

timeout in seconds for the server and model to reach the readiness state.

required

Raises:

Type Description
PyTritonClientTimeoutError

If server and model are not in readiness state before given timeout.

PyTritonClientModelUnavailableError

If model with given name (and version) is unavailable.

KeyboardInterrupt

If the hosting process receives SIGINT.
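
For example (the model name is an assumption):

from pytriton.client import AsyncioModelClient

async with AsyncioModelClient("localhost", "MyModel") as client:
    # Raises PyTritonClientTimeoutError if readiness takes longer than 300 s.
    await client.wait_for_model(timeout_s=300.0)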

Source code in pytriton/client/client.py
async def wait_for_model(self, timeout_s: float):
    """Asynchronous wait for Triton Inference Server and deployed on it model readiness.

    Args:
        timeout_s: timeout to server and model get into readiness state.

    Raises:
        PyTritonClientTimeoutError: If server and model are not in readiness state before given timeout.
        PyTritonClientModelUnavailableError: If model with given name (and version) is unavailable.
        KeyboardInterrupt: If hosting process receives SIGINT
    """
    _LOGGER.debug(f"Waiting for model {self._model_name} to be ready")
    async with async_timeout.timeout(self._init_timeout_s):
        await asyncio_wait_for_model_ready(
            self._general_client, self._model_name, self._model_version, timeout_s=timeout_s
        )
    _LOGGER.debug(f"Model {self._model_name} is ready")

BaseModelClient(url, model_name, model_version=None, *, lazy_init=True, init_timeout_s=None, inference_timeout_s=None)

Base client for model deployed on the Triton Inference Server.

Inits BaseModelClient for given model deployed on the Triton Inference Server.

Common usage:

with ModelClient("localhost", "BERT") as client:
    result_dict = client.infer_sample(input1_sample, input2_sample)

Parameters:

Name Type Description Default
url str

The Triton Inference Server url, e.g. grpc://localhost:8001. If no scheme is provided, the http scheme is used by default. If no port is provided, the default port for the given scheme is used: 8001 for grpc, 8000 for http.

required
model_name str

name of the model to interact with.

required
model_version Optional[str]

version of the model to interact with. If model_version is None, inference is performed on the latest version of the model. The latest version is the one with the numerically greatest version number.

None
lazy_init bool

if True, initialization is performed just before sending the first request to the inference server.

True
init_timeout_s Optional[float]

timeout in seconds for the server and model to be ready. If not passed, the default timeout of 300 seconds will be used.

None
inference_timeout_s Optional[float]

timeout in seconds for a single model inference request. If not passed, the default timeout of 60 seconds will be used.

None

Raises:

Type Description
PyTritonClientModelUnavailableError

If model with given name (and version) is unavailable.

PyTritonClientTimeoutError

if lazy_init argument is False and wait time for server and model being ready exceeds init_timeout_s.

PyTritonClientInvalidUrlError

If provided Triton Inference Server url is invalid.

Source code in pytriton/client/client.py
def __init__(
    self,
    url: str,
    model_name: str,
    model_version: Optional[str] = None,
    *,
    lazy_init: bool = True,
    init_timeout_s: Optional[float] = None,
    inference_timeout_s: Optional[float] = None,
):
    """Inits BaseModelClient for given model deployed on the Triton Inference Server.

    Common usage:

        ```
        with ModelClient("localhost", "BERT") as client
            result_dict = client.infer_sample(input1_sample, input2_sample)
        ```

    Args:
        url: The Triton Inference Server url, e.g. `grpc://localhost:8001`.
            In case no scheme is provided http scheme will be used as default.
            In case no port is provided default port for given scheme will be used -
            8001 for grpc scheme, 8000 for http scheme.
        model_name: name of the model to interact with.
        model_version: version of the model to interact with.
            If model_version is None inference on latest model will be performed.
            The latest versions of the model are numerically the greatest version numbers.
        lazy_init: if initialization should be performed just before sending first request to inference server.
        init_timeout_s: timeout in seconds for the server and model to be ready. If not passed, the default timeout of 300 seconds will be used.
        inference_timeout_s: timeout in seconds for a single model inference request. If not passed, the default timeout of 60 seconds will be used.

    Raises:
        PyTritonClientModelUnavailableError: If model with given name (and version) is unavailable.
        PyTritonClientTimeoutError:
            if `lazy_init` argument is False and wait time for server and model being ready exceeds `init_timeout_s`.
        PyTritonClientInvalidUrlError: If provided Triton Inference Server url is invalid.
    """
    self._init_timeout_s = _DEFAULT_SYNC_INIT_TIMEOUT_S if init_timeout_s is None else init_timeout_s
    self._inference_timeout_s = DEFAULT_INFERENCE_TIMEOUT_S if inference_timeout_s is None else inference_timeout_s
    self._network_timeout_s = min(_DEFAULT_NETWORK_TIMEOUT_S, self._init_timeout_s)

    self._general_client = self.create_client_from_url(url, network_timeout_s=self._network_timeout_s)
    self._infer_client = self.create_client_from_url(url, network_timeout_s=self._inference_timeout_s)

    self._model_name = model_name
    self._model_version = model_version

    self._request_id_generator = itertools.count(0)

    # Monkey patch __del__ method from client to catch error in client when instance is garbage collected.
    # This is needed because we are closing client in __exit__ method or in close method.
    # (InferenceClient uses gevent library which does not support closing twice from different threads)
    self._monkey_patch_client()

    self._model_config = None
    self._model_ready = None
    self._lazy_init: bool = lazy_init

    self._handle_lazy_init()

create_client_from_url(url, network_timeout_s=None)

Create Triton Inference Server client.

Parameters:

Name Type Description Default
url str

url of the server to connect to. If the url doesn't contain a scheme (e.g. "localhost:8001"), the http scheme is added. If the url doesn't contain a port (e.g. "localhost"), the default port for the given scheme is added.

required
network_timeout_s Optional[float]

timeout for client commands. Default value is 60.0 s.

None

Returns:

Type Description

Triton Inference Server client.

Raises:

Type Description
PyTritonClientInvalidUrlError

If provided Triton Inference Server url is invalid.
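
The scheme and port defaults described above resolve urls as in the sketch below (ModelClient shown; the model name is an assumption, and the same rules apply to the other clients):

from pytriton.client import ModelClient

# "localhost"         -> http://localhost:8000 (default scheme and port)
# "localhost:8000"    -> http://localhost:8000 (default scheme)
# "grpc://localhost"  -> grpc://localhost:8001 (default grpc port)
client = ModelClient("grpc://localhost", "MyModel")
client.close()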

Source code in pytriton/client/client.py
def create_client_from_url(self, url: str, network_timeout_s: Optional[float] = None):
    """Create Triton Inference Server client.

    Args:
        url: url of the server to connect to.
            If url doesn't contain scheme (e.g. "localhost:8001") http scheme is added.
            If url doesn't contain port (e.g. "localhost") default port for given scheme is added.
        network_timeout_s: timeout for client commands. Default value is 60.0 s.

    Returns:
        Triton Inference Server client.

    Raises:
        PyTritonClientInvalidUrlError: If provided Triton Inference Server url is invalid.
    """
    if not isinstance(url, str):
        raise PyTritonClientInvalidUrlError(f"Invalid url {url}. Url must be a string.")

    try:
        parsed_url = urllib.parse.urlparse(url)
        # change in py3.9+
        # https://github.com/python/cpython/commit/5a88d50ff013a64fbdb25b877c87644a9034c969
        if sys.version_info < (3, 9) and not parsed_url.scheme and "://" in parsed_url.path:
            raise ValueError(f"Invalid url {url}. Only grpc and http are supported.")
        if (sys.version_info < (3, 9) and not parsed_url.scheme and "://" not in parsed_url.path) or (
            sys.version_info >= (3, 9) and parsed_url.scheme and not parsed_url.netloc
        ):
            _LOGGER.debug(f"Adding http scheme to {url}")
            parsed_url = urllib.parse.urlparse(f"http://{url}")

        self._scheme = parsed_url.scheme.lower()
        if self._scheme not in ["grpc", "http"]:
            raise ValueError(f"Invalid scheme {self._scheme}. Only grpc and http are supported.")
        port = parsed_url.port or {"grpc": DEFAULT_GRPC_PORT, "http": DEFAULT_HTTP_PORT}[self._scheme]
    except ValueError as e:
        raise PyTritonClientInvalidUrlError(f"Invalid url {url}") from e

    self._triton_client_lib = self.get_lib()

    self._url = f"{parsed_url.hostname}:{port}"
    self._monkey_patch_client()

    if self._scheme == "grpc":
        # by default grpc client has very large number of timeout, thus we want to make it equal to http client timeout
        network_timeout_s = _DEFAULT_NETWORK_TIMEOUT_S if network_timeout_s is None else network_timeout_s
        _LOGGER.warning(
            f"tritonclient.grpc doesn't support timeout for other commands than infer. Ignoring network_timeout: {network_timeout_s}."
        )

    triton_client_init_kwargs = self._get_init_extra_args()

    _LOGGER.debug(
        f"Creating InferenceServerClient for {parsed_url.scheme}://{self._url} with {triton_client_init_kwargs}"
    )
    return self._triton_client_lib.InferenceServerClient(self._url, **triton_client_init_kwargs)

get_lib()

Returns tritonclient library for given scheme.

Source code in pytriton/client/client.py
def get_lib(self):
    """Returns tritonclient library for given scheme."""
    raise NotImplementedError

FuturesModelClient(url, model_name, model_version=None, *, max_workers=None, init_timeout_s=None, inference_timeout_s=None)

A client for interacting with a model deployed on the Triton Inference Server using concurrent.futures.

This client allows asynchronous inference requests using a thread pool executor. It can be used to perform inference on a model by providing input data and receiving the corresponding output data. The client can be used in a with statement to ensure proper resource management.

Example usage
with FuturesModelClient("localhost", "MyModel") as client:
    result_future = client.infer_sample(input1=input1_data, input2=input2_data)
    # do something else
    print(result_future.result())

Initializes the FuturesModelClient for a given model.

Parameters:

Name Type Description Default
url str

The Triton Inference Server url, e.g. grpc://localhost:8001.

required
model_name str

The name of the model to interact with.

required
model_version Optional[str]

The version of the model to interact with. If None, the latest version will be used.

None
max_workers Optional[int]

The maximum number of threads that can be used to execute the given calls. If None, min(32, os.cpu_count() + 4) threads will be used.

None
init_timeout_s Optional[float]

Timeout in seconds for the server and model to be ready. If not passed, the default timeout of 60 seconds will be used.

None
inference_timeout_s Optional[float]

Timeout in seconds for a single model inference request. If not passed, the default timeout of 60 seconds will be used.

None
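
A sketch of issuing several requests concurrently; the model name and input shape are assumptions.

import numpy as np
from pytriton.client import FuturesModelClient

sample = np.random.rand(3, 224, 224).astype(np.float32)

with FuturesModelClient("localhost", "MyModel", max_workers=4, inference_timeout_s=30.0) as client:
    # Each call returns immediately with a Future; results are collected afterwards.
    futures = [client.infer_sample(sample) for _ in range(8)]
    results = [future.result() for future in futures]
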
Source code in pytriton/client/client.py
def __init__(
    self,
    url: str,
    model_name: str,
    model_version: Optional[str] = None,
    *,
    max_workers: Optional[int] = None,
    init_timeout_s: Optional[float] = None,
    inference_timeout_s: Optional[float] = None,
):
    """Initializes the FuturesModelClient for a given model.

    Args:
        url: The Triton Inference Server url, e.g. `grpc://localhost:8001`.
        model_name: The name of the model to interact with.
        model_version: The version of the model to interact with. If None, the latest version will be used.
        max_workers: The maximum number of threads that can be used to execute the given calls. If None, the `min(32, os.cpu_count() + 4)` number of threads will be used.
        init_timeout_s: Timeout in seconds for server and model being ready. If non passed default 60 seconds timeout will be used.
        inference_timeout_s: Timeout in seconds for the single model inference request. If non passed default 60 seconds timeout will be used.
    """
    self._url = url
    self._model_name = model_name
    self._model_version = model_version
    self._threads = []
    self._max_workers = max_workers
    if self._max_workers is not None and self._max_workers <= 0:
        raise ValueError("max_workers must be greater than 0")
    kwargs = {}
    if self._max_workers is not None:
        kwargs["maxsize"] = self._max_workers
    self._queue = Queue(**kwargs)
    self._queue.put((_INIT, None, None))
    self._init_timeout_s = _DEFAULT_FUTURES_INIT_TIMEOUT_S if init_timeout_s is None else init_timeout_s
    self._inference_timeout_s = inference_timeout_s
    self._closed = False

__enter__()

Create context for using FuturesModelClient as a context manager.

Source code in pytriton/client/client.py
def __enter__(self):
    """Create context for using FuturesModelClient as a context manager."""
    return self

__exit__(exc_type, exc_value, traceback)

Close resources used by FuturesModelClient instance when exiting from the context.

Source code in pytriton/client/client.py
def __exit__(self, exc_type, exc_value, traceback):
    """Close resources used by FuturesModelClient instance when exiting from the context."""
    self.close()

close(wait=True)

Close resources used by FuturesModelClient.

This method closes the resources used by the FuturesModelClient instance, including the Triton Inference Server connections. Once this method is called, the FuturesModelClient instance should not be used again.

Parameters:

Name Type Description Default
wait

If True, then shutdown will not return until all running futures have finished executing.

True
Source code in pytriton/client/client.py
def close(self, wait=True):
    """Close resources used by FuturesModelClient.

    This method closes the resources used by the FuturesModelClient instance, including the Triton Inference Server connections.
    Once this method is called, the FuturesModelClient instance should not be used again.

    Args:
        wait: If True, then shutdown will not return until all running futures have finished executing.
    """
    if self._closed:
        _LOGGER.warning("FuturesModelClient is already closed")
        return
    _LOGGER.debug("Closing FuturesModelClient.")

    self._closed = True
    for _ in range(len(self._threads)):
        self._queue.put((_CLOSE, None, None))

    if wait:
        _LOGGER.debug("Waiting for futures to finish.")
        for thread in self._threads:
            thread.join()

infer_batch(*inputs, parameters=None, headers=None, **named_inputs)

Run asynchronous inference on batched data and return a Future object.

This method allows the user to perform inference on batched data by providing input data and receiving the corresponding output data. The method returns a Future object that wraps a dictionary of inference results, where dictionary keys are output names.

Example usage
with FuturesModelClient("localhost", "BERT") as client:
    future = client.infer_batch(input1_sample, input2_sample)
    # do something else
    print(future.result())

Inference inputs can be provided either as positional or keyword arguments:

future = client.infer_batch(input1, input2)
future = client.infer_batch(a=input1, b=input2)

Mixing of argument passing conventions is not supported and will raise PyTritonClientValueError.

Parameters:

Name Type Description Default
*inputs

Inference inputs provided as positional arguments.

()
parameters Optional[Dict[str, Union[str, int, bool]]]

Optional dictionary of inference parameters.

None
headers Optional[Dict[str, Union[str, int, bool]]]

Optional dictionary of HTTP headers for the inference request.

None
**named_inputs

Inference inputs provided as named arguments.

{}

Returns:

Type Description
Future

A Future object wrapping a dictionary of inference results, where dictionary keys are output names.

Raises:

Type Description
PyTritonClientClosedError

If the FuturesModelClient is closed.

Source code in pytriton/client/client.py
def infer_batch(
    self,
    *inputs,
    parameters: Optional[Dict[str, Union[str, int, bool]]] = None,
    headers: Optional[Dict[str, Union[str, int, bool]]] = None,
    **named_inputs,
) -> Future:
    """Run asynchronous inference on batched data and return a Future object.

    This method allows the user to perform inference on batched data by providing input data and receiving the corresponding output data.
    The method returns a Future object that wraps a dictionary of inference results, where dictionary keys are output names.

    Example usage:

        ```python
        with FuturesModelClient("localhost", "BERT") as client:
            future = client.infer_batch(input1_sample, input2_sample)
            # do something else
            print(future.result())
        ```

    Inference inputs can be provided either as positional or keyword arguments:

        ```python
        future = client.infer_batch(input1, input2)
        future = client.infer_batch(a=input1, b=input2)
        ```

    Mixing of argument passing conventions is not supported and will raise PyTritonClientValueError.

    Args:
        *inputs: Inference inputs provided as positional arguments.
        parameters: Optional dictionary of inference parameters.
        headers: Optional dictionary of HTTP headers for the inference request.
        **named_inputs: Inference inputs provided as named arguments.

    Returns:
        A Future object wrapping a dictionary of inference results, where dictionary keys are output names.

    Raises:
        PyTritonClientClosedError: If the FuturesModelClient is closed.
    """
    return self._execute(name=_INFER_BATCH, request=(inputs, parameters, headers, named_inputs))

infer_sample(*inputs, parameters=None, headers=None, **named_inputs)

Run asynchronous inference on a single data sample and return a Future object.

This method allows the user to perform inference on a single data sample by providing input data and receiving the corresponding output data. The method returns a Future object that wraps a dictionary of inference results, where dictionary keys are output names.

Example usage
with FuturesModelClient("localhost", "BERT") as client:
    result_future = client.infer_sample(input1=input1_data, input2=input2_data)
    # do something else
    print(result_future.result())

Inference inputs can be provided either as positional or keyword arguments:

future = client.infer_sample(input1, input2)
future = client.infer_sample(a=input1, b=input2)

Parameters:

Name Type Description Default
*inputs

Inference inputs provided as positional arguments.

()
parameters Optional[Dict[str, Union[str, int, bool]]]

Optional dictionary of inference parameters.

None
headers Optional[Dict[str, Union[str, int, bool]]]

Optional dictionary of HTTP headers for the inference request.

None
**named_inputs

Inference inputs provided as named arguments.

{}

Returns:

Type Description
Future

A Future object wrapping a dictionary of inference results, where dictionary keys are output names.

Raises:

Type Description
PyTritonClientClosedError

If the FuturesModelClient is closed.

Source code in pytriton/client/client.py
def infer_sample(
    self,
    *inputs,
    parameters: Optional[Dict[str, Union[str, int, bool]]] = None,
    headers: Optional[Dict[str, Union[str, int, bool]]] = None,
    **named_inputs,
) -> Future:
    """Run asynchronous inference on a single data sample and return a Future object.

    This method allows the user to perform inference on a single data sample by providing input data and receiving the
    corresponding output data. The method returns a Future object that wraps a dictionary of inference results, where dictionary keys are output names.

    Example usage:

        ```python
        with FuturesModelClient("localhost", "BERT") as client:
            result_future = client.infer_sample(input1=input1_data, input2=input2_data)
            # do something else
            print(result_future.result())
        ```

    Inference inputs can be provided either as positional or keyword arguments:

        ```python
        future = client.infer_sample(input1, input2)
        future = client.infer_sample(a=input1, b=input2)
        ```

    Args:
        *inputs: Inference inputs provided as positional arguments.
        parameters: Optional dictionary of inference parameters.
        headers: Optional dictionary of HTTP headers for the inference request.
        **named_inputs: Inference inputs provided as named arguments.

    Returns:
        A Future object wrapping a dictionary of inference results, where dictionary keys are output names.

    Raises:
        PyTritonClientClosedError: If the FuturesModelClient is closed.
    """
    return self._execute(
        name=_INFER_SAMPLE,
        request=(inputs, parameters, headers, named_inputs),
    )

model_config()

Obtain the configuration of the model deployed on the Triton Inference Server.

This method returns a Future object that will contain the TritonModelConfig object when it is ready. The client will wait up to init_timeout_s for the server to reach the readiness state before obtaining the model configuration.

Returns:

Type Description
Future

A Future object that will contain the TritonModelConfig object when it is ready.

Raises:

Type Description
PyTritonClientClosedError

If the FuturesModelClient is closed.
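
For example (the model name is an assumption):

from pytriton.client import FuturesModelClient

with FuturesModelClient("localhost", "MyModel") as client:
    config_future = client.model_config()
    config = config_future.result()  # blocks until the configuration is retrieved
    print(config.max_batch_size)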

Source code in pytriton/client/client.py
def model_config(self) -> Future:
    """Obtain the configuration of the model deployed on the Triton Inference Server.

    This method returns a Future object that will contain the TritonModelConfig object when it is ready.
    Client will wait init_timeout_s for the server to get into readiness state before obtaining the model configuration.

    Returns:
        A Future object that will contain the TritonModelConfig object when it is ready.

    Raises:
        PyTritonClientClosedError: If the FuturesModelClient is closed.
    """
    return self._execute(name=_MODEL_CONFIG)

wait_for_model(timeout_s)

Returns a Future object whose result will be None when the model is ready.

Typical usage:

with FuturesModelClient("localhost", "BERT") as client:
    future = client.wait_for_model(300.)
    # do something else
    future.result()   # waits the rest of timeout_s, then returns None
                      # if the model is ready, or raises PyTritonClientTimeoutError

Parameters:

Name Type Description Default
timeout_s float

The maximum amount of time to wait for the model to be ready, in seconds.

required

Returns:

Type Description
Future

A Future object whose result is None when the model is ready.

Source code in pytriton/client/client.py
def wait_for_model(self, timeout_s: float) -> Future:
    """Returns a Future object which result will be None when the model is ready.

    Typical usage:

        ```python
        with FuturesModelClient("localhost", "BERT") as client
            future = client.wait_for_model(300.)
            # do something else
            future.result()   # wait rest of timeout_s time
                              # till return None if model is ready
                              # or raise PyTritonClientTimeutError
        ```

    Args:
        timeout_s: The maximum amount of time to wait for the model to be ready, in seconds.

    Returns:
        A Future object which result is None when the model is ready.
    """
    return self._execute(
        name=_WAIT_FOR_MODEL,
        request=timeout_s,
    )

ModelClient(url, model_name, model_version=None, *, lazy_init=True, init_timeout_s=None, inference_timeout_s=None)

Bases: BaseModelClient

Synchronous client for model deployed on the Triton Inference Server.

Inits ModelClient for given model deployed on the Triton Inference Server.

If lazy_init argument is False, model configuration will be read from inference server during initialization.

Common usage:

with ModelClient("localhost", "BERT") as client
    result_dict = client.infer_sample(input1_sample, input2_sample)

Parameters:

Name Type Description Default
url str

The Triton Inference Server url, e.g. 'grpc://localhost:8001'. If no scheme is provided, the http scheme is used by default. If no port is provided, the default port for the given scheme is used: 8001 for grpc, 8000 for http.

required
model_name str

name of the model to interact with.

required
model_version Optional[str]

version of the model to interact with. If model_version is None, inference is performed on the latest version of the model. The latest version is the one with the numerically greatest version number.

None
lazy_init bool

if True, initialization is performed just before sending the first request to the inference server.

True
init_timeout_s Optional[float]

maximum time in seconds to wait in the retry loop that repeatedly checks whether the model is ready. It is applied at initialization time only when the lazy_init argument is False. By default, the retry loop runs at the first inference.

None
inference_timeout_s Optional[float]

timeout in seconds for the model inference process. If not passed, the default timeout of 60 seconds will be used. For the HTTP client it applies to any client request (getting the model config, checking whether the model is loaded), not only inference. For the GRPC client it applies only to inference.

None

Raises:

Type Description
PyTritonClientModelUnavailableError

If model with given name (and version) is unavailable.

PyTritonClientTimeoutError

if lazy_init argument is False and wait time for server and model being ready exceeds init_timeout_s.

PyTritonClientUrlParseError

In case of problems with parsing url.
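
A sketch of eager initialization over gRPC with explicit timeouts; the model name and input shape are assumptions.

import numpy as np
from pytriton.client import ModelClient

input_sample = np.random.rand(3, 224, 224).astype(np.float32)

with ModelClient(
    "grpc://localhost:8001",
    "MyModel",
    lazy_init=False,           # read the model configuration up front
    init_timeout_s=120.0,      # wait up to 120 s for server and model readiness
    inference_timeout_s=30.0,  # timeout for each inference request
) as client:
    result_dict = client.infer_sample(input_sample)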

Source code in pytriton/client/client.py
def __init__(
    self,
    url: str,
    model_name: str,
    model_version: Optional[str] = None,
    *,
    lazy_init: bool = True,
    init_timeout_s: Optional[float] = None,
    inference_timeout_s: Optional[float] = None,
):
    """Inits ModelClient for given model deployed on the Triton Inference Server.

    If `lazy_init` argument is False, model configuration will be read
    from inference server during initialization.

    Common usage:
    ```
    with ModelClient("localhost", "BERT") as client
        result_dict = client.infer_sample(input1_sample, input2_sample)
    ```

    Args:
        url: The Triton Inference Server url, e.g. 'grpc://localhost:8001'.
            In case no scheme is provided http scheme will be used as default.
            In case no port is provided default port for given scheme will be used -
            8001 for grpc scheme, 8000 for http scheme.
        model_name: name of the model to interact with.
        model_version: version of the model to interact with.
            If model_version is None inference on latest model will be performed.
            The latest versions of the model are numerically the greatest version numbers.
        lazy_init: if initialization should be performed just before sending first request to inference server.
        init_timeout_s: timeout for maximum waiting time in loop, which sends retry requests ask if model is ready. It is applied at initialization time only when `lazy_init` argument is False. Default is to do retry loop at first inference.
        inference_timeout_s: timeout in seconds for the model inference process.
            If non passed default 60 seconds timeout will be used.
            For HTTP client it is not only inference timeout but any client request timeout
            - get model config, is model loaded. For GRPC client it is only inference timeout.

    Raises:
        PyTritonClientModelUnavailableError: If model with given name (and version) is unavailable.
        PyTritonClientTimeoutError:
            if `lazy_init` argument is False and wait time for server and model being ready exceeds `init_timeout_s`.
        PyTritonClientUrlParseError: In case of problems with parsing url.
    """
    super().__init__(
        url=url,
        model_name=model_name,
        model_version=model_version,
        lazy_init=lazy_init,
        init_timeout_s=init_timeout_s,
        inference_timeout_s=inference_timeout_s,
    )

is_batching_supported property

Checks if the model supports batching.

Also waits for server to get into readiness state.

model_config: TritonModelConfig property

Obtain the configuration of the model deployed on the Triton Inference Server.

This method waits for the server to get into readiness state before obtaining the model configuration.

Returns:

Name Type Description
TritonModelConfig TritonModelConfig

configuration of the model deployed on the Triton Inference Server.

Raises:

Type Description
PyTritonClientTimeoutError

If the server and model are not in readiness state before the given timeout.

PyTritonClientModelUnavailableError

If the model with the given name (and version) is unavailable.

KeyboardInterrupt

If the hosting process receives SIGINT.

PyTritonClientClosedError

If the ModelClient is closed.
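
For example (the model name is an assumption):

from pytriton.client import ModelClient

with ModelClient("localhost", "MyModel") as client:
    config = client.model_config  # waits for server and model readiness
    if client.is_batching_supported:
        print(config.model_name, config.max_batch_size)
    else:
        print(config.model_name, "does not support batching")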

__enter__()

Create context for using ModelClient as a context manager.

Source code in pytriton/client/client.py
def __enter__(self):
    """Create context for using ModelClient as a context manager."""
    return self

__exit__(*_)

Close resources used by ModelClient instance when exiting from the context.

Source code in pytriton/client/client.py
def __exit__(self, *_):
    """Close resources used by ModelClient instance when exiting from the context."""
    self.close()

close()

Close resources used by ModelClient.

This method closes the resources used by the ModelClient instance, including the Triton Inference Server connections. Once this method is called, the ModelClient instance should not be used again.

Source code in pytriton/client/client.py
def close(self):
    """Close resources used by ModelClient.

    This method closes the resources used by the ModelClient instance,
    including the Triton Inference Server connections.
    Once this method is called, the ModelClient instance should not be used again.
    """
    _LOGGER.debug("Closing ModelClient")
    try:
        if self._general_client is not None:
            self._general_client.close()
        if self._infer_client is not None:
            self._infer_client.close()
        self._general_client = None
        self._infer_client = None
    except Exception as e:
        _LOGGER.warning("Error while closing ModelClient resources: %s", e)
        raise e

get_lib()

Returns tritonclient library for given scheme.

Source code in pytriton/client/client.py
def get_lib(self):
    """Returns tritonclient library for given scheme."""
    return {"grpc": tritonclient.grpc, "http": tritonclient.http}[self._scheme.lower()]

infer_batch(*inputs, parameters=None, headers=None, **named_inputs)

Run synchronous inference on batched data.

Typical usage
with ModelClient("localhost", "MyModel") as client:
    result_dict = client.infer_batch(input1, input2)

Inference inputs can be provided either as positional or keyword arguments:

result_dict = client.infer_batch(input1, input2)
result_dict = client.infer_batch(a=input1, b=input2)

Parameters:

Name Type Description Default
*inputs

Inference inputs provided as positional arguments.

()
parameters Optional[Dict[str, Union[str, int, bool]]]

Custom inference parameters.

None
headers Optional[Dict[str, Union[str, int, bool]]]

Custom inference headers.

None
**named_inputs

Inference inputs provided as named arguments.

{}

Returns:

Type Description
Dict[str, np.ndarray]

Dictionary with inference results, where dictionary keys are output names.

Raises:

Type Description
PyTritonClientValueError

If mixing of positional and named arguments passing detected.

PyTritonClientTimeoutError

If the wait time for the server and model being ready exceeds init_timeout_s (checked on the first call when lazy_init is False), or the inference request time exceeds inference_timeout_s.

PyTritonClientModelUnavailableError

If the model with the given name (and version) is unavailable.

PyTritonClientInferenceServerError

If an error occurred on the inference callable or Triton Inference Server side.

PyTritonClientModelDoesntSupportBatchingError

If the model doesn't support batching.
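
For example, a batch can be assembled by stacking samples along a new first axis (the model name and shapes are assumptions):

import numpy as np
from pytriton.client import ModelClient

samples = [np.random.rand(3, 224, 224).astype(np.float32) for _ in range(4)]
batch = np.stack(samples)  # shape (4, 3, 224, 224); axis 0 is the batch dimension

with ModelClient("localhost", "MyModel") as client:
    result_dict = client.infer_batch(batch)  # outputs also carry the batch dimension on axis 0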


Source code in pytriton/client/client.py
def infer_batch(
    self,
    *inputs,
    parameters: Optional[Dict[str, Union[str, int, bool]]] = None,
    headers: Optional[Dict[str, Union[str, int, bool]]] = None,
    **named_inputs,
) -> Dict[str, np.ndarray]:
    """Run synchronous inference on batched data.

    Typical usage:

        ```python
        with ModelClient("localhost", "MyModel") as client:
            result_dict = client.infer_batch(input1, input2)
        ```

    Inference inputs can be provided either as positional or keyword arguments:

        ```python
        result_dict = client.infer_batch(input1, input2)
        result_dict = client.infer_batch(a=input1, b=input2)
        ```

    Args:
        *inputs: Inference inputs provided as positional arguments.
        parameters: Custom inference parameters.
        headers: Custom inference headers.
        **named_inputs: Inference inputs provided as named arguments.

    Returns:
        Dictionary with inference results, where dictionary keys are output names.

    Raises:
        PyTritonClientValueError: If mixing of positional and named arguments passing detected.
        PyTritonClientTimeoutError: If the wait time for the server and model being ready exceeds `init_timeout_s` or
            inference request time exceeds `inference_timeout_s`.
        PyTritonClientModelUnavailableError: If the model with the given name (and version) is unavailable.
        PyTritonClientInferenceServerError: If an error occurred on the inference callable or Triton Inference Server side.
        PyTritonClientModelDoesntSupportBatchingError: If the model doesn't support batching.
        PyTritonClientValueError: if mixing of positional and named arguments passing detected.
        PyTritonClientTimeoutError:
            in case of first method call, `lazy_init` argument is False
            and wait time for server and model being ready exceeds `init_timeout_s` or
            inference time exceeds `inference_timeout_s` passed to `__init__`.
        PyTritonClientModelUnavailableError: If model with given name (and version) is unavailable.
        PyTritonClientInferenceServerError: If error occurred on inference callable or Triton Inference Server side,
    """
    _verify_inputs_args(inputs, named_inputs)
    _verify_parameters(parameters)
    _verify_parameters(headers)

    if not self.is_batching_supported:
        raise PyTritonClientModelDoesntSupportBatchingError(
            f"Model {self.model_config.model_name} doesn't support batching - use infer_sample method instead"
        )

    return self._infer(inputs or named_inputs, parameters, headers)

infer_sample(*inputs, parameters=None, headers=None, **named_inputs)

Run synchronous inference on a single data sample.

Typical usage
with ModelClient("localhost", "MyModel") as client:
    result_dict = client.infer_sample(input1, input2)

Inference inputs can be provided either as positional or keyword arguments:

result_dict = client.infer_sample(input1, input2)
result_dict = client.infer_sample(a=input1, b=input2)

Parameters:

Name Type Description Default
*inputs

Inference inputs provided as positional arguments.

()
parameters Optional[Dict[str, Union[str, int, bool]]]

Custom inference parameters.

None
headers Optional[Dict[str, Union[str, int, bool]]]

Custom inference headers.

None
**named_inputs

Inference inputs provided as named arguments.

{}

Returns:

Type Description
Dict[str, np.ndarray]

Dictionary with inference results, where dictionary keys are output names.

Raises:

Type Description
PyTritonClientValueError

If mixing of positional and named arguments passing detected.

PyTritonClientTimeoutError

If the wait time for the server and model being ready exceeds init_timeout_s or inference request time exceeds inference_timeout_s.

PyTritonClientModelUnavailableError

If the model with the given name (and version) is unavailable.

PyTritonClientInferenceServerError

If an error occurred on the inference callable or Triton Inference Server side.

Source code in pytriton/client/client.py
def infer_sample(
    self,
    *inputs,
    parameters: Optional[Dict[str, Union[str, int, bool]]] = None,
    headers: Optional[Dict[str, Union[str, int, bool]]] = None,
    **named_inputs,
) -> Dict[str, np.ndarray]:
    """Run synchronous inference on a single data sample.

    Typical usage:

        ```python
        with ModelClient("localhost", "MyModel") as client:
            result_dict = client.infer_sample(input1, input2)
        ```
    Inference inputs can be provided either as positional or keyword arguments:

        ```python
        result_dict = client.infer_sample(input1, input2)
        result_dict = client.infer_sample(a=input1, b=input2)
        ```

    Args:
        *inputs: Inference inputs provided as positional arguments.
        parameters: Custom inference parameters.
        headers: Custom inference headers.
        **named_inputs: Inference inputs provided as named arguments.

    Returns:
        Dictionary with inference results, where dictionary keys are output names.

    Raises:
        PyTritonClientValueError: If mixing of positional and named arguments passing detected.
        PyTritonClientTimeoutError: If the wait time for the server and model being ready exceeds `init_timeout_s` or
            inference request time exceeds `inference_timeout_s`.
        PyTritonClientModelUnavailableError: If the model with the given name (and version) is unavailable.
        PyTritonClientInferenceServerError: If an error occurred on the inference callable or Triton Inference Server side.
    """
    _verify_inputs_args(inputs, named_inputs)
    _verify_parameters(parameters)
    _verify_parameters(headers)

    if self.is_batching_supported:
        if inputs:
            inputs = tuple(data[np.newaxis, ...] for data in inputs)
        elif named_inputs:
            named_inputs = {name: data[np.newaxis, ...] for name, data in named_inputs.items()}

    result = self._infer(inputs or named_inputs, parameters, headers)
    if self.is_batching_supported:
        result = {name: data[0] for name, data in result.items()}

    return result

wait_for_model(timeout_s)

Wait for the Triton Inference Server and the deployed model to be ready.

Parameters:

Name Type Description Default
timeout_s float

timeout in seconds to wait for the server and model to be ready.

required

Raises:

Type Description
PyTritonClientTimeoutError

If the server and model are not ready before the given timeout.

PyTritonClientModelUnavailableError

If the model with the given name (and version) is unavailable.

KeyboardInterrupt

If the hosting process receives SIGINT.

PyTritonClientClosedError

If the ModelClient is closed.

Source code in pytriton/client/client.py
def wait_for_model(self, timeout_s: float):
    """Wait for the Triton Inference Server and the deployed model to be ready.

    Args:
        timeout_s: timeout in seconds to wait for the server and model to be ready.

    Raises:
        PyTritonClientTimeoutError: If the server and model are not ready before the given timeout.
        PyTritonClientModelUnavailableError: If the model with the given name (and version) is unavailable.
        KeyboardInterrupt: If the hosting process receives SIGINT.
        PyTritonClientClosedError: If the ModelClient is closed.
    """
    if self._general_client is None:
        raise PyTritonClientClosedError("ModelClient is closed")
    wait_for_model_ready(self._general_client, self._model_name, self._model_version, timeout_s=timeout_s)