Feature/queue metrics #860

Merged
merged 18 commits on Jan 11, 2023
Changes from 4 commits
10 changes: 9 additions & 1 deletion mlserver/batching/adaptive.py
@@ -18,6 +18,8 @@

from .requests import BatchedRequests

from prometheus_client import Histogram


class AdaptiveBatcher:
def __init__(self, model: MLModel):
@@ -31,6 +33,7 @@ def __init__(self, model: MLModel):
self.__requests: Optional[Queue[Tuple[str, InferenceRequest]]] = None
self._async_responses: Dict[str, Future[InferenceResponse]] = {}
self._batching_task = None
self.batch_queue_request_count = Histogram("batch_queue_request_counter", "counter of request queue batch size")

async def predict(self, req: InferenceRequest) -> InferenceResponse:
internal_id, _ = await self._queue_request(req)
@@ -51,14 +54,19 @@ async def _queue_request(
req: InferenceRequest,
) -> Tuple[str, Awaitable[InferenceResponse]]:
internal_id = generate_uuid()

self._batch_queue_monitor()
await self._requests.put((internal_id, req))

loop = asyncio.get_running_loop()
async_response = loop.create_future()
self._async_responses[internal_id] = async_response

return internal_id, async_response

def _batch_queue_monitor(self):
"""Monitorize batch queue size"""
batch_queue_size = self._requests.qsize()
self.batch_queue_request_count.observe(batch_queue_size)

async def _wait_response(self, internal_id: str) -> InferenceResponse:
async_response = self._async_responses[internal_id]
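For context, the metric added above is a plain `prometheus_client` Histogram that samples the batch queue depth each time a request is enqueued. A minimal, self-contained sketch of that behaviour (the dedicated registry and the observed values below are illustrative, not part of this PR):

from prometheus_client import CollectorRegistry, Histogram

# Illustrative: a private registry so this sketch does not clash with the
# global REGISTRY used by the server's own metrics.
registry = CollectorRegistry()

batch_queue_request_count = Histogram(
    "batch_queue_request_counter",
    "counter of request queue batch size",
    registry=registry,
)

# _batch_queue_monitor() effectively does this on every _queue_request() call:
for queue_size in (0, 2, 5):
    batch_queue_request_count.observe(queue_size)

# The histogram exposes *_count, *_sum and *_bucket samples for scraping.
assert registry.get_sample_value("batch_queue_request_counter_count") == 3.0
assert registry.get_sample_value("batch_queue_request_counter_sum") == 7.0

Because `observe()` runs at enqueue time rather than on a timer, the resulting series reflects the queue depth as seen by each incoming request.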
19 changes: 17 additions & 2 deletions mlserver/parallel/dispatcher.py
@@ -17,6 +17,7 @@
ModelRequestMessage,
ModelResponseMessage,
)
from prometheus_client import Histogram


class Dispatcher:
@@ -28,6 +29,11 @@ def __init__(self, workers: Dict[int, Worker], responses: Queue):
self._process_responses_task = None
self._executor = ThreadPoolExecutor()
self._async_responses: Dict[str, Future[ModelResponseMessage]] = {}
self.queue_request_count = Histogram(
"worker_queue_request",
"counter of request queue size for workers",
['workerpid']
)

def start(self):
self._active = True
@@ -79,7 +85,8 @@ async def _process_response(self, response: ModelResponseMessage):
async def dispatch_request(
self, request_message: ModelRequestMessage
) -> ModelResponseMessage:
worker = self._get_worker()
worker, wpid = self._get_worker()
self._workers_queue_monitor(worker, wpid)
worker.send_request(request_message)

return await self._dispatch(request_message)
@@ -90,7 +97,15 @@ def _get_worker(self) -> Worker:
By default, this is just a round-robin through all the workers.
"""
worker_pid = next(self._workers_round_robin)
return self._workers[worker_pid]
return self._workers[worker_pid], worker_pid

def _workers_queue_monitor(self, worker: Worker, worker_pid: int):
"""Get metrics from every worker request queue"""
queue_size = worker._requests.qsize()

self.queue_request_count.labels(workerpid=str(worker_pid)).observe(
float(queue_size)
)

async def dispatch_update(
self, model_update: ModelUpdateMessage
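The dispatcher metric above differs from the batching one mainly in that it is labelled with the worker pid, so each worker's queue depth becomes its own time series. A short sketch of what the label provides (the registry and pids here are illustrative, not taken from the PR):

from prometheus_client import CollectorRegistry, Histogram

# Illustrative private registry; the PR registers against the global REGISTRY.
registry = CollectorRegistry()

queue_request_count = Histogram(
    "worker_queue_request",
    "counter of request queue size for workers",
    ["workerpid"],
    registry=registry,
)

# _workers_queue_monitor() observes the selected worker's queue size under its
# pid, yielding series such as worker_queue_request_sum{workerpid="101"}.
queue_request_count.labels(workerpid="101").observe(3.0)
queue_request_count.labels(workerpid="102").observe(0.0)

assert registry.get_sample_value(
    "worker_queue_request_count", {"workerpid": "101"}
) == 1.0

Note that `_get_worker()` now returns a `(worker, pid)` tuple so the pid is available for the label; its `-> Worker` annotation in the unchanged context line no longer matches the new return value.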
9 changes: 8 additions & 1 deletion tests/batching/test_adaptive.py
@@ -10,9 +10,11 @@
from mlserver.utils import generate_uuid

from .conftest import TestRequestSender

from ..metrics.conftest import prometheus_registry
from prometheus_client.registry import CollectorRegistry

async def test_batch_requests(
prometheus_registry: CollectorRegistry,
adaptive_batcher: AdaptiveBatcher,
send_request: TestRequestSender,
):
@@ -30,6 +32,7 @@


async def test_batch_requests_timeout(
prometheus_registry: CollectorRegistry,
adaptive_batcher: AdaptiveBatcher,
send_request: TestRequestSender,
):
@@ -48,6 +51,7 @@


async def test_batcher(
prometheus_registry: CollectorRegistry,
adaptive_batcher: AdaptiveBatcher,
send_request: TestRequestSender,
sum_model: MLModel,
@@ -72,6 +76,7 @@


async def test_batcher_propagates_errors(
prometheus_registry: CollectorRegistry,
adaptive_batcher: AdaptiveBatcher,
send_request: TestRequestSender,
mocker,
@@ -98,6 +103,7 @@ async def _async_exception():


async def test_batcher_cancels_responses(
prometheus_registry: CollectorRegistry,
adaptive_batcher: AdaptiveBatcher,
mocker,
):
@@ -184,6 +190,7 @@ async def _async_exception():
],
)
async def test_predict(
prometheus_registry: CollectorRegistry,
requests: List[InferenceRequest],
adaptive_batcher: AdaptiveBatcher,
sum_model: MLModel,
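The test changes above only thread a `prometheus_registry` fixture (defined in `tests/metrics/conftest.py`, which is not shown in this diff) through each test, so that re-creating the batching Histogram between tests does not trip Prometheus' duplicate-registration check. As a hedged sketch of the read-back pattern such a fixture enables, assuming it yields the registry the Histogram was registered against, the helper below is hypothetical and not part of the PR:

from prometheus_client.registry import CollectorRegistry


def assert_batch_queue_observed(registry: CollectorRegistry) -> None:
    """Hypothetical helper, not part of the PR: check that the adaptive
    batcher recorded at least one queue-size observation."""
    count = registry.get_sample_value("batch_queue_request_counter_count")
    assert count is not None and count >= 1.0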
5 changes: 4 additions & 1 deletion tests/batching/test_hooks.py
@@ -3,9 +3,11 @@
from mlserver.model import MLModel
from mlserver.types import InferenceRequest, InferenceResponse
from mlserver.batching.hooks import load_batching

from ..metrics.conftest import prometheus_registry
from prometheus_client.registry import CollectorRegistry

async def test_batching_predict(
prometheus_registry: CollectorRegistry,
sum_model: MLModel, inference_request: InferenceRequest
):
await load_batching(sum_model)
@@ -28,6 +30,7 @@ async def test_batching_predict(
],
)
async def test_load_batching_disabled(
prometheus_registry: CollectorRegistry,
max_batch_size: int, max_batch_time: float, sum_model: MLModel
):
sum_model.settings.max_batch_size = max_batch_size
26 changes: 26 additions & 0 deletions tests/grpc/conftest.py
@@ -14,6 +14,8 @@
from mlserver.grpc import dataplane_pb2 as pb
from mlserver.grpc.dataplane_pb2_grpc import GRPCInferenceServiceStub
from mlserver.grpc import GRPCServer
from prometheus_client.registry import REGISTRY, CollectorRegistry
from starlette_exporter import PrometheusMiddleware

from ..conftest import TESTDATA_PATH
from ..fixtures import SumModel
@@ -29,6 +31,30 @@ def _read_testdata_pb(payload_path: str, pb_klass):

return model_infer_request

@pytest.fixture()
def delete_registry() -> CollectorRegistry:
"""
Fixture used to ensure the registry is cleaned on each run.
Otherwise, `py-grpc-prometheus` will complain that metrics already exist.

TODO: Open issue in `py-grpc-prometheus` to check whether a metric exists
before creating it.
For an example on how to do this, see `starlette_exporter`'s implementation

https://github.com/stephenhillier/starlette_exporter/blob/947d4d631dd9a6a8c1071b45573c5562acba4834/starlette_exporter/middleware.py#L67
"""
# NOTE: Since the `REGISTRY` object is global, this fixture is NOT
# thread-safe!!
collectors = list(REGISTRY._collector_to_names.keys())
for collector in collectors:
REGISTRY.unregister(collector)

# Clean metrics from `starlette_exporter` as well, as otherwise they won't
# get re-created
PrometheusMiddleware._metrics.clear()

yield REGISTRY


@pytest.fixture
async def model_registry(
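For reference, a minimal sketch of the failure mode `delete_registry` works around: registering a second collector under an already-used name in the global registry raises a ValueError, which is what happens when the gRPC server metrics are re-created across test runs. The metric name below is illustrative.

from prometheus_client import REGISTRY, Counter

Counter("grpc_requests_seen", "illustrative metric")
try:
    Counter("grpc_requests_seen", "illustrative metric")  # same name, same registry
except ValueError as err:
    print(err)  # Duplicated timeseries in CollectorRegistry: {...}

# The fixture sidesteps this by unregistering every collector before each test
# (and clearing starlette_exporter's cached metrics), at the cost of relying on
# the global, non-thread-safe REGISTRY.
for collector in list(REGISTRY._collector_to_names.keys()):
    REGISTRY.unregister(collector)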
11 changes: 8 additions & 3 deletions tests/grpc/test_model_repository.py
@@ -17,6 +17,8 @@
from mlserver.grpc.model_repository_pb2_grpc import ModelRepositoryServiceStub
from mlserver.grpc import dataplane_pb2 as pb
from mlserver.grpc import model_repository_pb2 as mr_pb
from ..metrics.conftest import prometheus_registry
from prometheus_client.registry import CollectorRegistry


@pytest.fixture
@@ -30,7 +32,7 @@ async def model_repository_service_stub(
) -> AsyncGenerator[ModelRepositoryServiceStub, None]:
async with aio.insecure_channel(f"{settings.host}:{settings.grpc_port}") as channel:
yield ModelRepositoryServiceStub(channel)


def test_repositoryindexrequest_to_types(grpc_repository_index_request):
repository_index_request = RepositoryIndexRequestConverter.to_types(
Expand Down Expand Up @@ -83,7 +85,7 @@ async def test_model_repository_unload(


async def test_model_repository_load(
inference_service_stub, model_repository_service_stub, sum_model_settings
inference_service_stub, model_repository_service_stub, sum_model_settings, prometheus_registry: CollectorRegistry
):
await model_repository_service_stub.RepositoryModelUnload(
mr_pb.RepositoryModelLoadRequest(model_name=sum_model_settings.name)
@@ -99,9 +101,12 @@
assert response.name == sum_model_settings.name




async def test_model_repository_load_error(
inference_service_stub, model_repository_service_stub, sum_model_settings
prometheus_registry: CollectorRegistry, inference_service_stub, model_repository_service_stub, sum_model_settings,
):

with pytest.raises(grpc.RpcError) as err:
load_request = mr_pb.RepositoryModelLoadRequest(model_name="my-model")
await model_repository_service_stub.RepositoryModelLoad(load_request)
31 changes: 17 additions & 14 deletions tests/grpc/test_servicers.py
@@ -1,5 +1,9 @@
import pytest
import grpc
from .conftest import delete_registry
from ..metrics.conftest import prometheus_registry
from prometheus_client.registry import CollectorRegistry


from mlserver.cloudevents import (
CLOUDEVENTS_HEADER_SPECVERSION_DEFAULT,
@@ -15,28 +19,28 @@
from mlserver import __version__


async def test_server_live(inference_service_stub):
async def test_server_live(prometheus_registry: CollectorRegistry, inference_service_stub):
req = pb.ServerLiveRequest()
response = await inference_service_stub.ServerLive(req)

assert response.live


async def test_server_ready(inference_service_stub):
async def test_server_ready(prometheus_registry: CollectorRegistry, inference_service_stub):
req = pb.ServerReadyRequest()
response = await inference_service_stub.ServerReady(req)

assert response.ready


async def test_model_ready(inference_service_stub, sum_model):
async def test_model_ready(prometheus_registry: CollectorRegistry, inference_service_stub, sum_model):
req = pb.ModelReadyRequest(name=sum_model.name, version=sum_model.version)
response = await inference_service_stub.ModelReady(req)

assert response.ready


async def test_server_metadata(inference_service_stub):
async def test_server_metadata(prometheus_registry: CollectorRegistry, inference_service_stub):
req = pb.ServerMetadataRequest()
response = await inference_service_stub.ServerMetadata(req)

@@ -45,7 +49,7 @@ async def test_server_metadata(inference_service_stub):
assert response.extensions == []


async def test_model_metadata(inference_service_stub, sum_model_settings):
async def test_model_metadata(prometheus_registry: CollectorRegistry, inference_service_stub, sum_model_settings):
req = pb.ModelMetadataRequest(
name=sum_model_settings.name, version=sum_model_settings.parameters.version
)
@@ -60,7 +64,7 @@ async def test_model_metadata(inference_service_stub, sum_model_settings):
"model_name,model_version", [("sum-model", "v1.2.3"), ("sum-model", None)]
)
async def test_model_infer(
inference_service_stub, model_infer_request, model_name, model_version
prometheus_registry: CollectorRegistry, inference_service_stub, model_infer_request, model_name, model_version
):
model_infer_request.model_name = model_name
if model_version is not None:
@@ -76,7 +80,7 @@ async def test_model_infer(
assert prediction.outputs[0].contents == expected


async def test_model_infer_raw_contents(inference_service_stub, model_infer_request):
async def test_model_infer_raw_contents(prometheus_registry: CollectorRegistry, inference_service_stub, model_infer_request):
# Prepare request with raw contents
for input_tensor in model_infer_request.inputs:
request_input = InferInputTensorConverter.to_types(input_tensor)
@@ -104,7 +108,7 @@


async def test_model_infer_headers(
inference_service_stub, model_infer_request, sum_model_settings
prometheus_registry: CollectorRegistry, inference_service_stub, model_infer_request, sum_model_settings
):
model_infer_request.model_name = sum_model_settings.name
model_infer_request.ClearField("model_version")
@@ -126,7 +130,7 @@ async def test_model_infer_headers(
assert trailing_metadata[key] == value


async def test_model_infer_error(inference_service_stub, model_infer_request):
async def test_model_infer_error(prometheus_registry: CollectorRegistry, inference_service_stub, model_infer_request):
with pytest.raises(grpc.RpcError) as err:
model_infer_request.model_name = "my-model"
await inference_service_stub.ModelInfer(model_infer_request)
@@ -136,14 +140,14 @@ async def test_model_infer_error(inference_service_stub, model_infer_request):


async def test_model_repository_index(
inference_service_stub, grpc_repository_index_request
prometheus_registry: CollectorRegistry, inference_service_stub, grpc_repository_index_request
):
index = await inference_service_stub.RepositoryIndex(grpc_repository_index_request)

assert len(index.models) == 1


async def test_model_repository_unload(inference_service_stub, sum_model_settings):
async def test_model_repository_unload(prometheus_registry: CollectorRegistry, inference_service_stub, sum_model_settings):
unload_request = pb.RepositoryModelUnloadRequest(model_name=sum_model_settings.name)
await inference_service_stub.RepositoryModelUnload(unload_request)

@@ -153,11 +157,10 @@ async def test_model_repository_unload(inference_service_stub, sum_model_setting
)


async def test_model_repository_load(inference_service_stub, sum_model_settings):
async def test_model_repository_load(prometheus_registry: CollectorRegistry, inference_service_stub, delete_registry: CollectorRegistry, sum_model_settings):
await inference_service_stub.RepositoryModelUnload(
pb.RepositoryModelLoadRequest(model_name=sum_model_settings.name)
)

load_request = pb.RepositoryModelLoadRequest(model_name=sum_model_settings.name)
await inference_service_stub.RepositoryModelLoad(load_request)

@@ -168,7 +171,7 @@ async def test_model_repository_load(inference_service_stub, sum_model_settings)
assert response.name == sum_model_settings.name


async def test_model_repository_load_error(inference_service_stub, sum_model_settings):
async def test_model_repository_load_error(prometheus_registry: CollectorRegistry, inference_service_stub, sum_model_settings):
with pytest.raises(grpc.RpcError) as err:
load_request = pb.RepositoryModelLoadRequest(model_name="my-model")
await inference_service_stub.RepositoryModelLoad(load_request)
4 changes: 4 additions & 0 deletions tests/kafka/test_server.py
@@ -7,9 +7,12 @@
from mlserver.settings import Settings
from mlserver.cloudevents import CLOUDEVENTS_HEADER_ID
from mlserver.kafka.handlers import KafkaMessage, MLSERVER_MODEL_NAME_HEADER
from ..metrics.conftest import prometheus_registry
from prometheus_client.registry import CollectorRegistry


async def test_infer(
prometheus_registry: CollectorRegistry,
kafka_producer: AIOKafkaProducer,
kafka_consumer: AIOKafkaConsumer,
kafka_settings: Settings,
@@ -31,6 +34,7 @@


async def test_infer_error(
prometheus_registry: CollectorRegistry,
kafka_producer: AIOKafkaProducer,
kafka_consumer: AIOKafkaConsumer,
kafka_settings: Settings,