Karapace metrics #652

Closed · wants to merge 26 commits (diff shown from 16 commits)
9 changes: 9 additions & 0 deletions README.rst
@@ -461,6 +461,15 @@ Keys to take special care are the ones needed to configure Kafka and advertised_
* - ``master_election_strategy``
- ``lowest``
- Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup)
* - ``metrics_mode``
- ``statsd``
- Statistics server mode. Karapace currently supports the statsd server
* - ``statsd_uri``
- ``127.0.0.1:8125``
- Host:Port of the statsd server
* - ``metrics_extended``
- ``true``
- Enable extended metrics. Extended metrics: connections_active, request_size_avg, request_size_max, response_size_avg, response_size_max
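For reference, these settings feed a plain StatsD server over UDP, one datagram per sample. The generic StatsD wire format for the metric types this PR emits looks like the following (a sketch; Karapace's StatsClient may encode names and tags slightly differently, e.g. Telegraf-style):

    connections-active:42|g    (gauge: the last reported value wins)
    error_total:1|c            (counter: summed, then reset at each flush)
    latency:12.5|ms            (timing: aggregated into millisecond percentiles)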


Authentication and authorization of Karapace Schema Registry REST API
5 changes: 4 additions & 1 deletion karapace.config.json
@@ -27,5 +27,8 @@
"registry_authfile": null,
"topic_name": "_schemas",
"protobuf_runtime_directory": "runtime",
"session_timeout_ms": 10000
"session_timeout_ms": 10000,
"metrics_mode": "statsd",
"statsd_uri": "127.0.0.1:8125",
"metrics_extended": true
}
3 changes: 3 additions & 0 deletions karapace/config.py
@@ -143,6 +143,9 @@ class ConfigDefaults(Config, total=False):
"karapace_registry": False,
"master_election_strategy": "lowest",
"protobuf_runtime_directory": "runtime",
"metrics_mode": "statsd",
"statsd_uri": "127.0.0.1:8125",
"metrics_extended": True,
libretto marked this conversation as resolved.
Show resolved Hide resolved
}
SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD]

133 changes: 133 additions & 0 deletions karapace/karapacemetrics.py
@@ -0,0 +1,133 @@
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: On the naming of this module, we're already in the karapace namespace, let's simply name the module metrics? (Resulting name would be karapace.metrics instead of repeating karapace.karapacemetrics).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

karapace - metrics
Supports collection of system metrics
list of supported metrics:
connections-active - The number of active HTTP(S) connections to the server.
Data is collected inside the aiohttp request handler.

Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from kafka.metrics import Metrics
Contributor: Remove.
from karapace.config import Config
from karapace.statsd import StatsClient

import psutil
import schedule
import threading
import time


class Singleton(type):
    _instance: Singleton | None = None

    def __call__(cls, *args: str, **kwargs: int) -> Singleton:
        if cls._instance is None:
            instance = super().__call__(*args, **kwargs)
            cls._instance = instance
        return cls._instance


class KarapaceMetrics(metaclass=Singleton):
    def __init__(self) -> None:
        self.active: object | None = None
        self.stats_client: StatsClient | None = None
        self.is_ready = False
        self.metrics = Metrics()
        self.stop_event = threading.Event()
        self.worker_thread = threading.Thread(target=self.worker)
        self.lock = threading.Lock()
        self.error_count = 0
        self.app_host = ""
        self.app_port = 8081

    def setup(self, stats_client: StatsClient, config: Config) -> None:
        self.active = config.get("metrics_extended")
        if not self.active:
            return
        with self.lock:
            if self.is_ready:
                return
            self.is_ready = True
        if not self.stats_client:
            self.stats_client = stats_client
        else:
            self.active = False
            return
        app_host = config.get("host")
        app_port = config.get("port")
        if app_host and app_port:
            self.app_host = app_host
            self.app_port = app_port
        else:
            raise RuntimeError("No application host or port defined in application")

        schedule.every(10).seconds.do(self.connections)
        self.worker_thread.start()

Contributor: Is the threading needed only for reading the connection count? Maybe push the logic out from this class and implement a method which would be called from the reader thread. Consider also thread safety.

    def connections(self, connections: int) -> None:
        ...
        self.stats_client.gauge("connections", connections)

Contributor Author (libretto): The thread job operates at a frequency of every 10 seconds, which is a relatively resource-efficient approach.

Contributor: @libretto Could you shed some further light on this question?

    Is the threading needed only for reading the connection count?

Contributor Author (libretto): For instance, if Karapace is handling 100 requests per second, the function that retrieves the connection count would be called with every request. In my solution, we acquire this data every 10 seconds, which means significantly fewer calls (about 1000 times fewer). While this approach might not be as precise, it's effective enough for our purposes.

Contributor Author (libretto): @aiven-anton @jjaakola-aiven could you check?
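
For comparison, a minimal sketch of the push-based variant suggested above — hypothetical names, not part of this PR — where the request handler updates a lock-protected count itself, so no polling thread or schedule job is needed:

    import threading

    from karapace.statsd import StatsClient


    class ConnectionGauge:
        """Hypothetical push-based alternative to the 10-second polling job."""

        def __init__(self, stats_client: StatsClient) -> None:
            self._lock = threading.Lock()
            self._active = 0
            self._stats_client = stats_client

        def connect(self) -> None:
            # Called when a connection is accepted.
            with self._lock:
                self._active += 1
                self._stats_client.gauge("connections-active", self._active)

        def disconnect(self) -> None:
            # Called when the connection is closed.
            with self._lock:
                self._active -= 1
                self._stats_client.gauge("connections-active", self._active)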


    def request(self, size: int) -> None:
        if not self.active:
            return
        if not isinstance(self.stats_client, StatsClient):
            raise RuntimeError("no StatsClient available")
        self.stats_client.gauge("request-size", size)
Contributor: Counter, name as request_size_total.

    The use of Gauge tries to emulate the behavior of a Prometheus counter? If StatsD is the backend, I think the correct choice would be the counter; it gets reset to 0 on every flush. A Prometheus counter does not, so there is definitely a difference. If Prometheus support is added, I think self.stats_client would be a different implementation suitable for Prometheus. This comment applies to the other counter comments too.

Contributor Author (libretto): I'm unclear about the necessity of using "request_size_total" when our aim is to create a graph depicting request size over time. As far as I can see, there isn't a metric that requires a counter. As far as I know, both Prometheus and StatsD have a similar gauge metric type.

Contributor: @libretto I think gauge simply doesn't make sense here?

    our aim is to create a graph depicting request size over time

The way to do that with Prometheus is to graph request_size_total / request_count; I'm unsure what makes sense with StatsD.

Contributor Author (libretto) (Sep 2, 2023): In order to calculate request_size_total / request_count, we would need to store request_count and perform the calculation for request_size_total on the Karapace side. However, I noticed your earlier comment where you mentioned that this approach might not be advisable. Consequently, I modified the code in this manner because both StatsD and Prometheus can visualize this data without the need for internal calculations. Perhaps this approach may be less efficient, but at least it avoids any calculations within Karapace.
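
To make the reviewer's position concrete, here is a hedged sketch of the counter-based emission being asked for (assuming a StatsClient.increase method, which the author also refers to in the error discussion below); the dashboard would then graph request_size_total / request_count per flush interval:

    def request(self, size: int) -> None:
        if not self.active:
            return
        if not isinstance(self.stats_client, StatsClient):
            raise RuntimeError("no StatsClient available")
        # Counters are summed by StatsD and reset to 0 at each flush,
        # so these yield per-interval totals rather than point samples.
        self.stats_client.increase("request_size_total", size)
        self.stats_client.increase("request_count", 1)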


    def response(self, size: int) -> None:
        if not self.active:
            return
        if not isinstance(self.stats_client, StatsClient):
            raise RuntimeError("no StatsClient available")
        self.stats_client.gauge("response-size", size)
Contributor: Counter, name as response_size_total.

Contributor Author (libretto) (Aug 8, 2023): why "Total"? why counter?


    def are_we_master(self, is_master: bool) -> None:
        if not self.active:
            return
        if not isinstance(self.stats_client, StatsClient):
            raise RuntimeError("no StatsClient available")
        self.stats_client.gauge("master-slave-role", int(is_master))

    def latency(self, latency_ms: float) -> None:
        if not self.active:
            return
        if not isinstance(self.stats_client, StatsClient):
            raise RuntimeError("no StatsClient available")
        self.stats_client.gauge("latency", latency_ms)

    def error(self) -> None:
        if not self.active:
            return
        if not isinstance(self.stats_client, StatsClient):
            raise RuntimeError("no StatsClient available")
        self.error_count += 1
        self.stats_client.gauge("error", self.error_count)
Contributor: Counter, name as error_total.

Contributor Author (libretto) (Aug 8, 2023): Our goal is to create a graph showing the request error rate, which represents the number of error requests per second. However, considering your previous advice that we shouldn't perform calculations or aggregation on the Karapace side, I'm currently uncertain about whether to use a gauge or a counter to achieve the desired rate graph as the output. If we use

    self.stats_client.increase("error_total", 1)

do we have the possibility to create an error-rate graph with this data on StatsD?

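A sketch of the counter variant under discussion: a StatsD server turns counters into per-flush totals (and typically a per-second rate) on its own, so emitting error_total as a counter would give the error-rate graph with no local aggregation in Karapace. This again assumes a StatsClient.increase method, as in the comment above:

    def error(self) -> None:
        if not self.active:
            return
        if not isinstance(self.stats_client, StatsClient):
            raise RuntimeError("no StatsClient available")
        # No local error_count needed: StatsD sums the increments per
        # flush interval and derives the rate server-side.
        self.stats_client.increase("error_total", 1)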

    def connections(self) -> None:
        if not self.active:
            return
        if not isinstance(self.stats_client, StatsClient):
            raise RuntimeError("no StatsClient available")
        connections = 0
        for conn in psutil.net_connections(kind="tcp"):
            if not conn.laddr:
                continue
            if conn.laddr[0] == self.app_host and conn.laddr[1] == self.app_port and conn.status == "ESTABLISHED":
                connections += 1
        self.stats_client.gauge("connections-active", connections)

    def worker(self) -> None:
        while not self.stop_event.is_set():
            schedule.run_pending()
            time.sleep(1)

    def cleanup(self) -> None:
        if not self.active:
            return
        self.stop_event.set()
        if self.worker_thread.is_alive():
            self.worker_thread.join()
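
As a quick illustration of the Singleton metaclass above (hypothetical usage, not part of the diff), every KarapaceMetrics() call returns the same cached instance, which is why rapu.py and schema_registry.py can call it at any site without passing a reference around:

    from karapace.karapacemetrics import KarapaceMetrics

    a = KarapaceMetrics()
    b = KarapaceMetrics()
    assert a is b  # the metaclass caches the first instance and returns it
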
15 changes: 15 additions & 0 deletions karapace/rapu.py
@@ -9,6 +9,7 @@
from accept_types import get_best_match
from http import HTTPStatus
from karapace.config import Config, create_server_ssl_context
from karapace.karapacemetrics import KarapaceMetrics
from karapace.statsd import StatsClient
from karapace.utils import json_decode, json_encode
from karapace.version import __version__
@@ -134,6 +135,8 @@ def __init__(
        if content_type:
            self.headers["Content-Type"] = content_type
        super().__init__(f"HTTPResponse {status.value}")
        if not is_success(status):
            KarapaceMetrics().error()

    def ok(self) -> bool:
        """True if response has a 2xx status_code"""
@@ -169,6 +172,7 @@ def __init__(
        self.stats = StatsClient(config=config)
        self.app.on_cleanup.append(self.close_by_app)
        self.not_ready_handler = not_ready_handler
        KarapaceMetrics().setup(self.stats, config)

    def _create_aiohttp_application(self, *, config: Config) -> aiohttp.web.Application:
        return aiohttp.web.Application(client_max_size=config["http_request_max_size"])
@@ -183,6 +187,7 @@ async def close(self) -> None:
        set as hook because the awaitables have to run inside the event loop
        created by the aiohttp library.
        """
        KarapaceMetrics().cleanup()
        self.stats.close()

@staticmethod
@@ -269,15 +274,22 @@ async def _handle_request(
            url=request.url,
            path_for_stats=path_for_stats,
        )

        try:
            if request.method == "OPTIONS":
                # self.metrics.request(0)
                origin = request.headers.get("Origin")
                if not origin:
                    raise HTTPResponse(body="OPTIONS missing Origin", status=HTTPStatus.BAD_REQUEST)
                headers = self.cors_and_server_headers_for_request(request=rapu_request, origin=origin)

                raise HTTPResponse(body=b"", status=HTTPStatus.OK, headers=headers)

            body = await request.read()
            if body:
                KarapaceMetrics().request(len(body))
            else:
                KarapaceMetrics().request(0)
            if json_request:
                if not body:
                    raise HTTPResponse(body="Missing request JSON body", status=HTTPStatus.BAD_REQUEST)
@@ -385,6 +397,7 @@
            )
            headers = {"Content-Type": "application/json"}
            resp = aiohttp.web.Response(body=body, status=status.value, headers=headers)

        except asyncio.CancelledError:
            self.log.debug("Client closed connection")
            raise
@@ -393,6 +406,8 @@
            self.log.exception("Unexpected error handling user request: %s %s", request.method, request.url)
            resp = aiohttp.web.Response(text="Internal Server Error", status=HTTPStatus.INTERNAL_SERVER_ERROR.value)
        finally:
            KarapaceMetrics().response(resp.content_length or 0)
            KarapaceMetrics().latency((time.monotonic() - start_time) * 1000)
            self.stats.timing(
                self.app_request_metric,
                time.monotonic() - start_time,
2 changes: 2 additions & 0 deletions karapace/schema_registry.py
@@ -22,6 +22,7 @@
VersionNotFoundException,
)
from karapace.in_memory_database import InMemoryDatabase
from karapace.karapacemetrics import KarapaceMetrics
from karapace.key_format import KeyFormatter
from karapace.master_coordinator import MasterCoordinator
from karapace.messaging import KarapaceProducer
@@ -123,6 +124,7 @@ async def get_master(self, ignore_readiness: bool = False) -> tuple[bool, str |
        elif not ignore_readiness and self.schema_reader.ready is False:
            LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready)
        else:
            KarapaceMetrics().are_we_master(are_we_master)
            return are_we_master, master_url
        await asyncio.sleep(1.0)

15 changes: 14 additions & 1 deletion karapace/statsd.py
@@ -19,6 +19,7 @@
import logging
import socket
import time
import urllib.parse

STATSD_HOST: Final = "127.0.0.1"
STATSD_PORT: Final = 8125
@@ -32,7 +33,19 @@ def __init__(
        host: str = STATSD_HOST,
        port: int = STATSD_PORT,
    ) -> None:
        self._dest_addr: Final = (host, port)
        _host = host
        _port = port

        if config.get("metrics_mode") == "statsd":
            statsd_uri = config.get("statsd_uri")
            if statsd_uri:
                srv = urllib.parse.urlsplit("//" + str(statsd_uri))
                if srv.hostname:
                    _host = str(srv.hostname)
                if srv.port:
                    _port = int(srv.port)

        self._dest_addr: Final = (_host, _port)
        self._socket: Final = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._tags: Final = config.get("tags", {})
        self.sentry_client: Final = get_sentry_client(sentry_config=config.get("sentry", None))
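
A quick demonstration of the urlsplit trick used above: prefixing the bare host:port string with "//" makes urllib parse it as a network location, so hostname and port come out cleanly:

    import urllib.parse

    srv = urllib.parse.urlsplit("//" + "127.0.0.1:8125")
    print(srv.hostname, srv.port)  # -> 127.0.0.1 8125
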
11 changes: 7 additions & 4 deletions requirements/requirements-dev.txt
@@ -53,9 +53,9 @@ commonmark==0.9.1
# via
# -r requirements.txt
# rich
configargparse==1.5.3
configargparse==1.5.5
# via locust
exceptiongroup==1.1.1
exceptiongroup==1.1.2
# via
# -r requirements.txt
# anyio
@@ -89,7 +89,7 @@ geventhttpclient==2.0.9
# via locust
greenlet==2.0.2
# via gevent
hypothesis==6.79.3
hypothesis==6.80.0
# via -r requirements-dev.in
idna==3.4
# via
@@ -150,6 +150,7 @@ protobuf==3.20.3
psutil==5.9.5
# via
# -r requirements-dev.in
# -r requirements.txt
# locust
# pytest-xdist
pygments==2.15.1
@@ -184,6 +185,8 @@ rich==12.5.1
# via -r requirements.txt
roundrobin==0.0.4
# via locust
schedule==1.2.0
# via -r requirements.txt
sentry-sdk==1.26.0
# via -r requirements-dev.in
six==1.16.0
@@ -202,7 +205,7 @@ tenacity==8.2.2
# via -r requirements.txt
tomli==2.0.1
# via pytest
typing-extensions==4.6.3
typing-extensions==4.7.1
# via
# -r requirements.txt
# locust
2 changes: 1 addition & 1 deletion requirements/requirements-typing.txt
@@ -22,7 +22,7 @@ types-cachetools==5.3.0.5
# via -r requirements-typing.in
types-jsonschema==4.17.0.8
# via -r requirements-typing.in
typing-extensions==4.6.3
typing-extensions==4.7.1
# via
# -c requirements-dev.txt
# mypy
3 changes: 3 additions & 0 deletions requirements/requirements.in
@@ -11,10 +11,13 @@ tenacity<9
typing-extensions
ujson<6
watchfiles<1
schedule
psutil
xxhash~=3.0
rich~=12.5.0
cachetools==5.2.0


# Patched dependencies
#
# Note: It is important to use commits to reference patched dependencies. This
8 changes: 6 additions & 2 deletions requirements/requirements.txt
@@ -30,7 +30,7 @@ charset-normalizer==3.1.0
# via aiohttp
commonmark==0.9.1
# via rich
exceptiongroup==1.1.1
exceptiongroup==1.1.2
# via anyio
frozenlist==1.3.3
# via
@@ -62,6 +62,8 @@ pkgutil-resolve-name==1.3.10
# via jsonschema
protobuf==3.20.3
# via -r requirements.in
psutil==5.9.5
# via -r requirements.in
pygments==2.15.1
# via rich
pyrsistent==0.19.3
@@ -70,6 +72,8 @@ python-dateutil==2.8.2
# via -r requirements.in
rich==12.5.1
# via -r requirements.in
schedule==1.2.0
# via -r requirements.in
six==1.16.0
# via
# isodate
@@ -78,7 +82,7 @@ sniffio==1.3.0
# via anyio
tenacity==8.2.2
# via -r requirements.in
typing-extensions==4.6.3
typing-extensions==4.7.1
# via
# -r requirements.in
# rich