diff --git a/container/prometheus/prometheus.yml b/container/prometheus/prometheus.yml index f62e8082a..ec2855f90 100644 --- a/container/prometheus/prometheus.yml +++ b/container/prometheus/prometheus.yml @@ -8,17 +8,17 @@ rule_files: # A scrape configuration scrape_configs: - - job_name: karapace-registry + - job_name: karapace-schema-registry metrics_path: /metrics static_configs: - targets: - - karapace-registry:8081 + - karapace-schema-registry:8081 - - job_name: karapace-rest + - job_name: karapace-rest-proxy metrics_path: /metrics static_configs: - targets: - - karapace-rest:8082 + - karapace-rest-proxy:8082 - job_name: statsd-exporter metrics_path: /metrics diff --git a/src/schema_registry/__main__.py b/src/schema_registry/__main__.py index 7ff513584..d7a3b7b07 100644 --- a/src/schema_registry/__main__.py +++ b/src/schema_registry/__main__.py @@ -7,6 +7,7 @@ from schema_registry.factory import create_karapace_application, karapace_schema_registry_lifespan import schema_registry.factory +import schema_registry.middlewares.prometheus import schema_registry.routers.compatibility import schema_registry.routers.config import schema_registry.routers.health @@ -32,6 +33,7 @@ schema_registry_container.wire( modules=[ __name__, + schema_registry.middlewares.prometheus, schema_registry.factory, schema_registry.user, schema_registry.routers.health, diff --git a/src/schema_registry/middlewares/__init__.py b/src/schema_registry/middlewares/__init__.py index 8df42b04c..ab41b36ef 100644 --- a/src/schema_registry/middlewares/__init__.py +++ b/src/schema_registry/middlewares/__init__.py @@ -7,6 +7,7 @@ from fastapi import FastAPI, HTTPException, Request, Response from fastapi.responses import JSONResponse from karapace.content_type import check_schema_headers +from schema_registry.middlewares.prometheus import setup_prometheus_middleware def setup_middlewares(app: FastAPI) -> None: @@ -32,3 +33,5 @@ async def set_content_types(request: Request, call_next: Callable[[Request], Awa response = await call_next(request) response.headers["Content-Type"] = response_content_type return response + + setup_prometheus_middleware(app=app) diff --git a/src/schema_registry/middlewares/prometheus.py b/src/schema_registry/middlewares/prometheus.py new file mode 100644 index 000000000..d9624aeb1 --- /dev/null +++ b/src/schema_registry/middlewares/prometheus.py @@ -0,0 +1,53 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from collections.abc import Awaitable, Callable +from dependency_injector.wiring import inject, Provide +from fastapi import Depends, FastAPI, Request, Response +from karapace.instrumentation.prometheus import PrometheusInstrumentation +from schema_registry.container import SchemaRegistryContainer + +import logging +import time + +LOG = logging.getLogger(__name__) + + +@inject +async def prometheus_middleware( + request: Request, + call_next: Callable[[Request], Awaitable[Response]], + prometheus: PrometheusInstrumentation = Depends(Provide[SchemaRegistryContainer.karapace_container.prometheus]), +) -> Response: + # Set start time for request + setattr(request.state, prometheus.START_TIME_REQUEST_KEY, time.monotonic()) + + # Extract request labels + path = request.url.path + method = request.method + + # Increment requests in progress before response handler + prometheus.karapace_http_requests_in_progress.labels(method=method, path=path).inc() + + # Call request handler + response: Response = await call_next(request) + + # Instrument request duration + prometheus.karapace_http_requests_duration_seconds.labels(method=method, path=path).observe( + time.monotonic() - getattr(request.state, prometheus.START_TIME_REQUEST_KEY) + ) + + # Instrument total requests + prometheus.karapace_http_requests_total.labels(method=method, path=path, status=response.status_code).inc() + + # Decrement requests in progress after response handler + prometheus.karapace_http_requests_in_progress.labels(method=method, path=path).dec() + + return response + + +def setup_prometheus_middleware(app: FastAPI) -> None: + LOG.info("Setting up prometheus middleware for metrics") + app.middleware("http")(prometheus_middleware) diff --git a/tests/unit/schema_registry/__init__.py b/tests/unit/schema_registry/__init__.py new file mode 100644 index 000000000..f53be7121 --- /dev/null +++ b/tests/unit/schema_registry/__init__.py @@ -0,0 +1,4 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" diff --git a/tests/unit/schema_registry/middlewares/__init__.py b/tests/unit/schema_registry/middlewares/__init__.py new file mode 100644 index 000000000..f53be7121 --- /dev/null +++ b/tests/unit/schema_registry/middlewares/__init__.py @@ -0,0 +1,4 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" diff --git a/tests/unit/schema_registry/middlewares/test_prometheus.py b/tests/unit/schema_registry/middlewares/test_prometheus.py new file mode 100644 index 000000000..c304d819e --- /dev/null +++ b/tests/unit/schema_registry/middlewares/test_prometheus.py @@ -0,0 +1,80 @@ +""" +schema_registry - prometheus instrumentation middleware tests + +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from _pytest.logging import LogCaptureFixture +from fastapi import FastAPI, Request, Response +from karapace.instrumentation.prometheus import PrometheusInstrumentation +from schema_registry.middlewares.prometheus import prometheus_middleware, setup_prometheus_middleware +from starlette.datastructures import State +from unittest.mock import AsyncMock, call, MagicMock, patch + +import logging + + +def test_setup_prometheus_middleware(caplog: LogCaptureFixture) -> None: + app = AsyncMock(spec=FastAPI) + with caplog.at_level(logging.INFO, logger="schema_registry.middlewares.prometheus"): + setup_prometheus_middleware(app=app) + + for log in caplog.records: + assert log.name == "schema_registry.middlewares.prometheus" + assert log.levelname == "INFO" + assert log.message == "Setting up prometheus middleware for metrics" + + app.middleware.assert_called_once_with("http") + app.middleware.return_value.assert_called_once_with(prometheus_middleware) + + +async def test_prometheus_middleware() -> None: + response_mock = AsyncMock(spec=Response) + response_mock.status_code = 200 + + call_next = AsyncMock() + call_next.return_value = response_mock + + request = AsyncMock(spec=Request) + request.state = MagicMock(spec=State) + + prometheus = MagicMock(spec=PrometheusInstrumentation, START_TIME_REQUEST_KEY="start_time") + + with patch("schema_registry.middlewares.prometheus.time.monotonic", return_value=1): + response = await prometheus_middleware(request=request, call_next=call_next, prometheus=prometheus) + + # Check that the `start_time` for the request is set + assert hasattr(request.state, "start_time") + assert getattr(request.state, "start_time") == 1 + + # Check that `karapace_http_requests_in_progress` metric is incremented/decremented + prometheus.karapace_http_requests_in_progress.labels.assert_has_calls( + [ + call(method=request.method, path=request.url.path), + call().inc(), + call(method=request.method, path=request.url.path), + call().dec(), + ] + ) + + # Check that `karapace_http_requests_duration_seconds` metric is observed + prometheus.karapace_http_requests_duration_seconds.labels.assert_has_calls( + [ + call(method=request.method, path=request.url.path), + call().observe(0), + ] + ) + + # Check that the request handler is called + call_next.assert_awaited_once_with(request) + + # Check that `karapace_http_requests_total` metric is incremented/decremented + prometheus.karapace_http_requests_total.labels.assert_has_calls( + [ + call(method=request.method, path=request.url.path, status=response.status_code), + call().inc(), + ] + ) + + assert response == response_mock diff --git a/tests/unit/test_schema_registry_api.py b/tests/unit/schema_registry/test_schema_registry_api.py similarity index 100% rename from tests/unit/test_schema_registry_api.py rename to tests/unit/schema_registry/test_schema_registry_api.py