
Feature/queue metrics #860

Merged: 18 commits, Jan 11, 2023
12 changes: 11 additions & 1 deletion mlserver/batching/adaptive.py
@@ -18,6 +18,8 @@

from .requests import BatchedRequests

from prometheus_client import Histogram


class AdaptiveBatcher:
def __init__(self, model: MLModel):
@@ -31,6 +33,9 @@ def __init__(self, model: MLModel):
self.__requests: Optional[Queue[Tuple[str, InferenceRequest]]] = None
self._async_responses: Dict[str, Future[InferenceResponse]] = {}
self._batching_task = None
self.batch_request_queue_size = Histogram(
"batch_request_queue", "counter of request queue batch size"
)

async def predict(self, req: InferenceRequest) -> InferenceResponse:
internal_id, _ = await self._queue_request(req)
@@ -51,7 +56,7 @@ async def _queue_request(
req: InferenceRequest,
) -> Tuple[str, Awaitable[InferenceResponse]]:
internal_id = generate_uuid()

self._batch_queue_monitor()
await self._requests.put((internal_id, req))

loop = asyncio.get_running_loop()
@@ -60,6 +65,11 @@

return internal_id, async_response

def _batch_queue_monitor(self):
"""Monitorize batch queue size"""
batch_queue_size = self._requests.qsize()
self.batch_request_queue_size.observe(batch_queue_size)

async def _wait_response(self, internal_id: str) -> InferenceResponse:
async_response = self._async_responses[internal_id]

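For context, a minimal standalone sketch (invented metric and variable names, not part of this PR) of the pattern `AdaptiveBatcher` uses: observe the current queue depth on a `prometheus_client` `Histogram` just before each request is enqueued.

```python
# Sketch only: shows how a prometheus_client Histogram records queue-size
# observations, mirroring _batch_queue_monitor() being called right before
# the put() in _queue_request(). All names here are invented.
import asyncio
from prometheus_client import Histogram, generate_latest

queue_depth = Histogram("example_request_queue", "queue depth at enqueue time")

async def enqueue_all() -> None:
    requests: asyncio.Queue = asyncio.Queue()
    for i in range(5):
        queue_depth.observe(requests.qsize())  # depth before adding the new item
        await requests.put(i)
    # The scrape output now contains the histogram buckets plus _count/_sum.
    print(generate_latest().decode())

asyncio.run(enqueue_all())
```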
23 changes: 19 additions & 4 deletions mlserver/parallel/dispatcher.py
@@ -1,6 +1,6 @@
import asyncio

from typing import Dict, List
from typing import Dict, List, Tuple
from itertools import cycle
from multiprocessing import Queue
from concurrent.futures import ThreadPoolExecutor
@@ -17,6 +17,7 @@
ModelRequestMessage,
ModelResponseMessage,
)
from prometheus_client import Histogram


class Dispatcher:
@@ -28,6 +29,11 @@ def __init__(self, workers: Dict[int, Worker], responses: Queue):
self._process_responses_task = None
self._executor = ThreadPoolExecutor()
self._async_responses: Dict[str, Future[ModelResponseMessage]] = {}
self.parallel_request_queue_size = Histogram(
"parallel_request_queue",
"counter of request queue size for workers",
["workerpid"],
)

def start(self):
self._active = True
@@ -79,18 +85,27 @@ async def _process_response(self, response: ModelResponseMessage):
async def dispatch_request(
self, request_message: ModelRequestMessage
) -> ModelResponseMessage:
worker = self._get_worker()
worker, wpid = self._get_worker()
self._workers_queue_monitor(worker, wpid)
worker.send_request(request_message)

return await self._dispatch(request_message)

def _get_worker(self) -> Worker:
def _get_worker(self) -> Tuple[Worker, int]:
"""
Get next available worker.
By default, this is just a round-robin through all the workers.
"""
worker_pid = next(self._workers_round_robin)
return self._workers[worker_pid]
return self._workers[worker_pid], worker_pid

def _workers_queue_monitor(self, worker: Worker, worker_pid: int):
"""Get metrics from every worker request queue"""
queue_size = worker._requests.qsize()

self.parallel_request_queue_size.labels(workerpid=str(worker_pid)).observe(
float(queue_size)
)

async def dispatch_update(
self, model_update: ModelUpdateMessage
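The dispatcher change combines two ideas: the existing round-robin worker selection via `itertools.cycle`, and a new per-worker labelled `Histogram`. A minimal sketch of that combination, with invented names and pretend queue depths:

```python
# Sketch only (names and values invented): round-robin over worker PIDs and
# a Histogram with a "workerpid" label, so each worker gets its own series.
from itertools import cycle
from prometheus_client import Histogram

queue_depths = {101: 0, 102: 3, 103: 1}  # pid -> pretend queue size
round_robin = cycle(queue_depths)

parallel_queue = Histogram(
    "example_parallel_request_queue",
    "request queue size per worker",
    ["workerpid"],
)

for _ in range(6):
    pid = next(round_robin)  # same selection strategy as _get_worker()
    parallel_queue.labels(workerpid=str(pid)).observe(float(queue_depths[pid]))
```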
27 changes: 27 additions & 0 deletions runtimes/alibi-explain/tests/conftest.py
@@ -30,11 +30,38 @@

from .helpers.tf_model import get_tf_mnist_model_uri, TFMNISTModel
from .helpers.run_async import run_async_as_sync
from prometheus_client.registry import REGISTRY
from starlette_exporter import PrometheusMiddleware

TESTS_PATH = Path(os.path.dirname(__file__))
_ANCHOR_IMAGE_DIR = TESTS_PATH / ".data" / "mnist_anchor_image"


@pytest.fixture(autouse=True)
def prometheus_registry() -> Iterable:
"""
Fixture used to ensure the registry is cleaned on each run.
Otherwise, `py-grpc-prometheus` will complain that metrics already exist.

TODO: Open issue in `py-grpc-prometheus` to check whether a metric exists
before creating it.
For an example on how to do this, see `starlette_exporter`'s implementation

https://github.com/stephenhillier/starlette_exporter/blob/947d4d631dd9a6a8c1071b45573c5562acba4834/starlette_exporter/middleware.py#L67
"""
# NOTE: Since the `REGISTRY` object is global, this fixture is NOT
# thread-safe!!
collectors = list(REGISTRY._collector_to_names.keys())
for collector in collectors:
REGISTRY.unregister(collector)

# Clean metrics from `starlette_exporter` as well, as otherwise they won't
# get re-created
PrometheusMiddleware._metrics.clear()

yield REGISTRY


@pytest.fixture
async def inference_pool(settings: Settings) -> AsyncIterable[InferencePool]:
pool = InferencePool(settings)
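The reason this fixture (and the equivalent ones in the other conftest files below) must clear the global registry: `prometheus_client` raises a `ValueError` ("Duplicated timeseries") when a metric name is registered twice on the same registry, which is what happens when the server metrics are re-created test after test. A small illustrative sketch, independent of MLServer:

```python
# Sketch only: demonstrates the duplicate-registration error that the
# autouse fixture above avoids by unregistering all collectors between tests.
from prometheus_client import Counter, REGISTRY

first = Counter("example_duplicate", "demo counter")
try:
    Counter("example_duplicate", "demo counter")
except ValueError as exc:
    print(exc)  # Duplicated timeseries in CollectorRegistry: ...

REGISTRY.unregister(first)                    # what the fixture does for every collector
Counter("example_duplicate", "demo counter")  # now succeeds again
```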
30 changes: 29 additions & 1 deletion runtimes/mlflow/tests/conftest.py
@@ -4,7 +4,7 @@
import asyncio
import numpy as np
import pandas as pd

from typing import Iterable
from sklearn.dummy import DummyClassifier
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
@@ -17,11 +17,39 @@

from .torch_fixtures import MNISTDataModule, LightningMNISTClassifier

from prometheus_client.registry import REGISTRY
from starlette_exporter import PrometheusMiddleware

TESTS_PATH = os.path.dirname(__file__)
TESTDATA_PATH = os.path.join(TESTS_PATH, "testdata")
TESTDATA_CACHE_PATH = os.path.join(TESTDATA_PATH, ".cache")


@pytest.fixture(autouse=True)
def prometheus_registry() -> Iterable:
"""
Fixture used to ensure the registry is cleaned on each run.
Otherwise, `py-grpc-prometheus` will complain that metrics already exist.

TODO: Open issue in `py-grpc-prometheus` to check whether a metric exists
before creating it.
For an example on how to do this, see `starlette_exporter`'s implementation

https://github.com/stephenhillier/starlette_exporter/blob/947d4d631dd9a6a8c1071b45573c5562acba4834/starlette_exporter/middleware.py#L67
"""
# NOTE: Since the `REGISTRY` object is global, this fixture is NOT
# thread-safe!!
collectors = list(REGISTRY._collector_to_names.keys())
for collector in collectors:
REGISTRY.unregister(collector)

# Clean metrics from `starlette_exporter` as well, as otherwise they won't
# get re-created
PrometheusMiddleware._metrics.clear()

yield REGISTRY


@pytest.fixture
def event_loop():
# By default use uvloop for tests
27 changes: 27 additions & 0 deletions tests/conftest.py
@@ -7,6 +7,8 @@
from unittest.mock import Mock
from mlserver.handlers import DataPlane, ModelRepositoryHandlers
from mlserver.registry import MultiModelRegistry
from prometheus_client.registry import REGISTRY, CollectorRegistry
from starlette_exporter import PrometheusMiddleware
from mlserver.repository import (
ModelRepository,
SchemalessModelRepository,
@@ -47,6 +49,31 @@ def logger():
return logger


@pytest.fixture(autouse=True)
def prometheus_registry() -> CollectorRegistry:
"""
Fixture used to ensure the registry is cleaned on each run.
Otherwise, `py-grpc-prometheus` will complain that metrics already exist.

TODO: Open issue in `py-grpc-prometheus` to check whether a metric exists
before creating it.
For an example on how to do this, see `starlette_exporter`'s implementation

https://github.com/stephenhillier/starlette_exporter/blob/947d4d631dd9a6a8c1071b45573c5562acba4834/starlette_exporter/middleware.py#L67
"""
# NOTE: Since the `REGISTRY` object is global, this fixture is NOT
# thread-safe!!
collectors = list(REGISTRY._collector_to_names.keys())
for collector in collectors:
REGISTRY.unregister(collector)

# Clean metrics from `starlette_exporter` as well, as otherwise they won't
# get re-created
PrometheusMiddleware._metrics.clear()

yield REGISTRY


@pytest.fixture
def event_loop():
# By default use uvloop for tests
29 changes: 27 additions & 2 deletions tests/grpc/conftest.py
@@ -4,7 +4,6 @@
from grpc import aio
from typing import AsyncGenerator, Dict
from google.protobuf import json_format
from prometheus_client.registry import CollectorRegistry

from mlserver.parallel import InferencePool
from mlserver.batching import load_batching
@@ -14,10 +13,11 @@
from mlserver.grpc import dataplane_pb2 as pb
from mlserver.grpc.dataplane_pb2_grpc import GRPCInferenceServiceStub
from mlserver.grpc import GRPCServer
from prometheus_client.registry import REGISTRY, CollectorRegistry
from starlette_exporter import PrometheusMiddleware

from ..conftest import TESTDATA_PATH
from ..fixtures import SumModel
from ..metrics.conftest import prometheus_registry # noqa: F401

TESTDATA_GRPC_PATH = os.path.join(TESTDATA_PATH, "grpc")

@@ -30,6 +30,31 @@ def _read_testdata_pb(payload_path: str, pb_klass):
return model_infer_request


@pytest.fixture
def delete_registry() -> CollectorRegistry:
"""
Fixture used to ensure the registry is cleaned on each run.
Otherwise, `py-grpc-prometheus` will complain that metrics already exist.

TODO: Open issue in `py-grpc-prometheus` to check whether a metric exists
before creating it.
For an example on how to do this, see `starlette_exporter`'s implementation

https://github.com/stephenhillier/starlette_exporter/blob/947d4d631dd9a6a8c1071b45573c5562acba4834/starlette_exporter/middleware.py#L67
"""
# NOTE: Since the `REGISTRY` object is global, this fixture is NOT
# thread-safe!!
collectors = list(REGISTRY._collector_to_names.keys())
for collector in collectors:
REGISTRY.unregister(collector)

# Clean metrics from `starlette_exporter` as well, as otherwise they won't
# get re-created
PrometheusMiddleware._metrics.clear()

yield REGISTRY


@pytest.fixture
async def model_registry(
sum_model_settings: ModelSettings, inference_pool: InferencePool
11 changes: 9 additions & 2 deletions tests/grpc/test_model_repository.py
@@ -17,6 +17,7 @@
from mlserver.grpc.model_repository_pb2_grpc import ModelRepositoryServiceStub
from mlserver.grpc import dataplane_pb2 as pb
from mlserver.grpc import model_repository_pb2 as mr_pb
from .conftest import delete_registry # noqa: F401


@pytest.fixture
@@ -83,7 +84,10 @@ async def test_model_repository_unload(


async def test_model_repository_load(
inference_service_stub, model_repository_service_stub, sum_model_settings
inference_service_stub,
delete_registry, # noqa: F811
model_repository_service_stub,
sum_model_settings,
):
await model_repository_service_stub.RepositoryModelUnload(
mr_pb.RepositoryModelLoadRequest(model_name=sum_model_settings.name)
@@ -100,8 +104,11 @@


async def test_model_repository_load_error(
inference_service_stub, model_repository_service_stub, sum_model_settings
inference_service_stub,
model_repository_service_stub,
sum_model_settings,
):

with pytest.raises(grpc.RpcError) as err:
load_request = mr_pb.RepositoryModelLoadRequest(model_name="my-model")
await model_repository_service_stub.RepositoryModelLoad(load_request)
21 changes: 16 additions & 5 deletions tests/grpc/test_servicers.py
@@ -1,6 +1,7 @@
import pytest
import grpc


from mlserver.cloudevents import (
CLOUDEVENTS_HEADER_SPECVERSION_DEFAULT,
CLOUDEVENTS_HEADER_SPECVERSION,
@@ -13,6 +14,7 @@
)
from mlserver.raw import pack, unpack
from mlserver import __version__
from .conftest import delete_registry # noqa: F401


async def test_server_live(inference_service_stub):
@@ -60,7 +62,10 @@ async def test_model_metadata(inference_service_stub, sum_model_settings):
"model_name,model_version", [("sum-model", "v1.2.3"), ("sum-model", None)]
)
async def test_model_infer(
inference_service_stub, model_infer_request, model_name, model_version
inference_service_stub,
model_infer_request,
model_name,
model_version,
):
model_infer_request.model_name = model_name
if model_version is not None:
@@ -104,7 +109,9 @@ async def test_model_infer_raw_contents(inference_service_stub, model_infer_requ


async def test_model_infer_headers(
inference_service_stub, model_infer_request, sum_model_settings
inference_service_stub,
model_infer_request,
sum_model_settings,
):
model_infer_request.model_name = sum_model_settings.name
model_infer_request.ClearField("model_version")
@@ -136,7 +143,8 @@ async def test_model_infer_error(inference_service_stub, model_infer_request):


async def test_model_repository_index(
inference_service_stub, grpc_repository_index_request
inference_service_stub,
grpc_repository_index_request,
):
index = await inference_service_stub.RepositoryIndex(grpc_repository_index_request)

@@ -153,11 +161,14 @@ async def test_model_repository_unload(inference_service_stub, sum_model_setting
)


async def test_model_repository_load(inference_service_stub, sum_model_settings):
async def test_model_repository_load(
inference_service_stub,
delete_registry, # noqa: F811
sum_model_settings,
):
await inference_service_stub.RepositoryModelUnload(
pb.RepositoryModelLoadRequest(model_name=sum_model_settings.name)
)

load_request = pb.RepositoryModelLoadRequest(model_name=sum_model_settings.name)
await inference_service_stub.RepositoryModelLoad(load_request)
