Karapace metrics #652

Closed
wants to merge 26 commits into from
Changes from 22 commits
10 changes: 9 additions & 1 deletion README.rst
@@ -461,7 +461,15 @@ Keys to take special care are the ones needed to configure Kafka and advertised_
* - ``master_election_strategy``
- ``lowest``
- Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup)

* - ``metrics_extended``
- ``true``
- Enable extended metrics: connections_active, [request|response]_size
* - ``statsd_host``
- ``127.0.0.1``
- Host of statsd server
* - ``statsd_port``
- ``8125``
- Port of statsd server

Authentication and authorization of Karapace Schema Registry REST API
=====================================================================
5 changes: 4 additions & 1 deletion karapace.config.json
@@ -27,5 +27,8 @@
"registry_authfile": null,
"topic_name": "_schemas",
"protobuf_runtime_directory": "runtime",
"session_timeout_ms": 10000
"session_timeout_ms": 10000,
"metrics_extended": true,
"statsd_host": "127.0.0.1",
"statsd_port": 8125
}
6 changes: 6 additions & 0 deletions karapace/config.py
@@ -75,6 +75,9 @@ class Config(TypedDict):
karapace_registry: bool
master_election_strategy: str
protobuf_runtime_directory: str
metrics_extended: bool
statsd_host: str
statsd_port: int

sentry: NotRequired[Mapping[str, object]]
tags: NotRequired[Mapping[str, object]]
@@ -144,6 +147,9 @@ class ConfigDefaults(Config, total=False):
"karapace_registry": False,
"master_election_strategy": "lowest",
"protobuf_runtime_directory": "runtime",
"metrics_extended": True,
"statsd_host": "127.0.0.1",
"statsd_port": 8125,
}
SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD]

119 changes: 119 additions & 0 deletions karapace/karapacemetrics.py
@@ -0,0 +1,119 @@
"""
Contributor:
issue: On the naming of this module, we're already in the karapace namespace, let's simply name the module metrics? (Resulting name would be karapace.metrics instead of repeating karapace.karapacemetrics.)

Contributor Author:
fixed

karapace - metrics
Supports collection of system metrics.
List of supported metrics:
connections-active - The number of active HTTP(S) connections to the server.
Data is collected inside the aiohttp request handler.

Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from karapace.config import Config
from karapace.statsd import StatsClient

import os
import psutil
import schedule
import threading
import time


class Singleton(type):
    _instance: Singleton | None = None

    def __call__(cls, *args: object, **kwargs: object) -> Singleton:
        if cls._instance is None:
            instance = super().__call__(*args, **kwargs)
            cls._instance = instance
        return cls._instance
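The metaclass above can be exercised on its own; here is a minimal self-contained sketch (with a hypothetical `Metrics` stand-in for `KarapaceMetrics`) showing that repeated construction yields one shared instance:

```python
# Minimal sketch of the Singleton metaclass above; `Metrics` is a
# hypothetical stand-in for KarapaceMetrics, not the real class.
class Singleton(type):
    _instance = None

    def __call__(cls, *args, **kwargs):
        if cls._instance is None:
            # The assignment lands on `cls`, so each class using this
            # metaclass gets its own singleton instance.
            cls._instance = super().__call__(*args, **kwargs)
        return cls._instance


class Metrics(metaclass=Singleton):
    def __init__(self) -> None:
        self.errors = 0


a = Metrics()
b = Metrics()  # __init__ does not run again; `a` is returned
```

This is why call sites such as `KarapaceMetrics().error()` can be sprinkled through the codebase without passing an instance around.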


class KarapaceMetrics(metaclass=Singleton):
    def __init__(self) -> None:
        self.active = False
        self.stats_client: StatsClient | None = None
        self.is_ready = False
        self.stop_event = threading.Event()
        self.worker_thread = threading.Thread(target=self.worker)
        self.lock = threading.Lock()

    def setup(self, stats_client: StatsClient, config: Config) -> None:
        self.active = config.get("metrics_extended") or False
        if not self.active:
            return
        with self.lock:
            if self.is_ready:
                return
            self.is_ready = True
            if not self.stats_client:
                self.stats_client = stats_client
            else:
                self.active = False
                return

        schedule.every(10).seconds.do(self.connections)
        self.worker_thread.start()
Contributor:
Is the threading needed only for reading the connection count? Maybe push the logic out from this class and implement a method which would be called from the reader thread. Consider also thread safety.

    def connections(self, connections: int) -> None:
        ...
        self.stats_client.gauge("connections", connections)

Contributor Author:
The thread job operates at a frequency of every 10 seconds, which is a relatively resource-efficient approach.

Contributor:
@libretto Could you shed some further light on this question?

    Is the threading needed only for reading the connection count?

Contributor Author:
For instance, if Karapace is handling 100 requests per second, the function that retrieves the connection count would be called with every request. In my solution, we acquire this data every 10 seconds, which means significantly fewer calls (about 1000 times fewer). While this approach might not be as precise, it's effective enough for our purposes.

Contributor Author:
@aiven-anton @jjaakola-aiven could you check?


    def request(self, size: int) -> None:
        if not self.active:
            return
        if not isinstance(self.stats_client, StatsClient):
            raise RuntimeError("no StatsClient available")
        self.stats_client.gauge("request-size", size)
Contributor:
Counter, name as request_size_total.

The use of Gauge tries to emulate the behavior of a Prometheus counter? If StatsD is the backend I think the correct choice would be the counter; it does get reset to 0 on every flush. A Prometheus counter does not, so there is definitely a difference. If Prometheus support is added I think self.stats_client would be a different implementation suitable for Prometheus. This comment applies to the other counter comments too.

Contributor Author:
I'm unclear about the necessity of using "request_size_total" when our aim is to create a graph depicting request size over time. As far as I can see, there isn't a metric that requires a counter. As far as I know, both Prometheus and StatsD have a similar gauge metric type.

Contributor:
@libretto I think gauge simply doesn't make sense here?

    our aim is to create a graph depicting request size over time

The way to do that with Prometheus is to graph request_size_total / request_count; I'm unsure what makes sense with StatsD.

Contributor Author (libretto, Sep 2, 2023):
In order to calculate "request_size_total / request_count", we would need to store "request_count" and perform the calculation for "request_size_total" on the karapace side. However, I noticed here your comment where you mentioned that this approach might not be advisable. Consequently, I modified the code in this manner because both StatsD and Prometheus can visualize this data without the need for internal calculations. Perhaps this approach may be less efficient, but at least it avoids any calculations within karapace.

    def response(self, size: int) -> None:
        if not self.active:
            return
        if not isinstance(self.stats_client, StatsClient):
            raise RuntimeError("no StatsClient available")
        self.stats_client.gauge("response-size", size)
Contributor:
Counter, name as response_size_total.

Contributor Author (libretto, Aug 8, 2023):
why "Total"? why counter?


    def are_we_master(self, is_master: bool) -> None:
        if not self.active:
            return
        if not isinstance(self.stats_client, StatsClient):
            raise RuntimeError("no StatsClient available")
        self.stats_client.gauge("master-slave-role", int(is_master))

    def latency(self, latency_ms: float) -> None:
        if not self.active:
            return
        if not isinstance(self.stats_client, StatsClient):
            raise RuntimeError("no StatsClient available")
        self.stats_client.timing("latency_ms", latency_ms)

    def error(self) -> None:
        if not self.active:
            return
        if not isinstance(self.stats_client, StatsClient):
            raise RuntimeError("no StatsClient available")
        self.stats_client.increase("error_total", 1)

    def connections(self) -> None:
        if not self.active:
            return
        if not isinstance(self.stats_client, StatsClient):
            raise RuntimeError("no StatsClient available")
        connections = 0
        karapace_proc = psutil.Process(os.getpid())

        for conn in karapace_proc.connections(kind="tcp"):
            if conn.laddr and conn.status == "ESTABLISHED":
                connections += 1
        self.stats_client.gauge("connections-active", connections)

    def worker(self) -> None:
        while True:
            if self.stop_event.is_set():
                break
            schedule.run_pending()
            time.sleep(1)

    def cleanup(self) -> None:
        if not self.active:
            return
        self.stop_event.set()
        if self.worker_thread.is_alive():
            self.worker_thread.join()
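The `worker`/`stop_event`/`cleanup` trio above is a common background-scheduler pattern. A stdlib-only sketch of the same idea (hypothetical `PeriodicWorker` class, using `Event.wait` as an interruptible sleep instead of the third-party `schedule` library):

```python
import threading


class PeriodicWorker:
    """Runs `fn` every `interval` seconds until stop() is called."""

    def __init__(self, fn, interval: float) -> None:
        self._fn = fn
        self._interval = interval
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        # Event.wait doubles as an interruptible sleep: it returns True
        # as soon as stop() sets the event, ending the loop immediately
        # instead of waiting out the full interval.
        while not self._stop_event.wait(self._interval):
            self._fn()

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop_event.set()
        self._thread.join()


ticks: list[int] = []
w = PeriodicWorker(lambda: ticks.append(1), interval=0.01)
w.start()
import time
time.sleep(0.1)
w.stop()
```

Compared to the diff's `time.sleep(1)` poll loop, waiting on the event directly makes shutdown latency bounded by the interval check rather than a fixed sleep.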
14 changes: 14 additions & 0 deletions karapace/rapu.py
@@ -9,6 +9,7 @@
from accept_types import get_best_match
from http import HTTPStatus
from karapace.config import Config, create_server_ssl_context
from karapace.karapacemetrics import KarapaceMetrics
from karapace.statsd import StatsClient
from karapace.utils import json_decode, json_encode
from karapace.version import __version__
@@ -134,6 +135,8 @@ def __init__(
        if content_type:
            self.headers["Content-Type"] = content_type
        super().__init__(f"HTTPResponse {status.value}")
        if not is_success(status):
            KarapaceMetrics().error()

    def ok(self) -> bool:
        """True if response has a 2xx status_code"""
@@ -169,6 +172,7 @@ def __init__(
        self.stats = StatsClient(config=config)
        self.app.on_cleanup.append(self.close_by_app)
        self.not_ready_handler = not_ready_handler
        KarapaceMetrics().setup(self.stats, config)

    def _create_aiohttp_application(self, *, config: Config) -> aiohttp.web.Application:
        return aiohttp.web.Application(client_max_size=config["http_request_max_size"])
@@ -183,6 +187,7 @@ async def close(self) -> None:
        set as hook because the awaitables have to run inside the event loop
        created by the aiohttp library.
        """
        KarapaceMetrics().cleanup()
        self.stats.close()

    @staticmethod
@@ -269,15 +274,21 @@ async def _handle_request(
            url=request.url,
            path_for_stats=path_for_stats,
        )

        try:
            if request.method == "OPTIONS":
                origin = request.headers.get("Origin")
                if not origin:
                    raise HTTPResponse(body="OPTIONS missing Origin", status=HTTPStatus.BAD_REQUEST)
                headers = self.cors_and_server_headers_for_request(request=rapu_request, origin=origin)

                raise HTTPResponse(body=b"", status=HTTPStatus.OK, headers=headers)

            body = await request.read()
            if body:
                KarapaceMetrics().request(len(body))
            else:
                KarapaceMetrics().request(0)
            if json_request:
                if not body:
                    raise HTTPResponse(body="Missing request JSON body", status=HTTPStatus.BAD_REQUEST)
@@ -385,6 +396,7 @@
            )
            headers = {"Content-Type": "application/json"}
            resp = aiohttp.web.Response(body=body, status=status.value, headers=headers)

        except asyncio.CancelledError:
            self.log.debug("Client closed connection")
            raise
@@ -393,6 +405,8 @@
            self.log.exception("Unexpected error handling user request: %s %s", request.method, request.url)
            resp = aiohttp.web.Response(text="Internal Server Error", status=HTTPStatus.INTERNAL_SERVER_ERROR.value)
        finally:
            KarapaceMetrics().response(resp.content_length)
            KarapaceMetrics().latency((time.monotonic() - start_time) * 1000)
            self.stats.timing(
                self.app_request_metric,
                time.monotonic() - start_time,
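The latency instrumentation in the `finally` block uses a monotonic clock and converts the elapsed time to milliseconds. A standalone sketch of the same measurement, with a hypothetical `measure_ms` helper:

```python
import time


def measure_ms(fn):
    """Return (result, elapsed_ms), timed with a monotonic clock as in
    the handler above (immune to wall-clock adjustments)."""
    start = time.monotonic()
    result = fn()
    elapsed_ms = (time.monotonic() - start) * 1000
    return result, elapsed_ms


result, ms = measure_ms(lambda: sum(range(1000)))
```

Wrapping the timed call this way keeps the `(end - start) * 1000` arithmetic in one place instead of repeating it at each instrumentation site.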
2 changes: 2 additions & 0 deletions karapace/schema_registry.py
@@ -22,6 +22,7 @@
VersionNotFoundException,
)
from karapace.in_memory_database import InMemoryDatabase
from karapace.karapacemetrics import KarapaceMetrics
from karapace.key_format import KeyFormatter
from karapace.master_coordinator import MasterCoordinator
from karapace.messaging import KarapaceProducer
@@ -123,6 +124,7 @@ async def get_master(self, ignore_readiness: bool = False) -> tuple[bool, str |
            elif not ignore_readiness and self.schema_reader.ready is False:
                LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready)
            else:
                KarapaceMetrics().are_we_master(are_we_master)
                return are_we_master, master_url
            await asyncio.sleep(1.0)

4 changes: 3 additions & 1 deletion karapace/statsd.py
@@ -32,7 +32,9 @@ def __init__(
        host: str = STATSD_HOST,
        port: int = STATSD_PORT,
    ) -> None:
        self._dest_addr: Final = (host, port)
        _host = config.get("statsd_host") if "statsd_host" in config else host
        _port = config.get("statsd_port") if "statsd_port" in config else port
        self._dest_addr: Final = (_host, _port)
        self._socket: Final = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._tags: Final = config.get("tags", {})
        self.sentry_client: Final = get_sentry_client(sentry_config=config.get("sentry", None))
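The host/port fallback added above can also be expressed with `dict.get`'s default argument, which is equivalent to the `config.get(k) if k in config else fallback` conditional in the diff. A sketch with a hypothetical partial config:

```python
# Hypothetical partial config: statsd_host is absent, statsd_port is set.
config = {"statsd_port": 9125}

# dict.get(key, default) returns the stored value when the key is present
# and the default otherwise, matching the conditional form above.
host = config.get("statsd_host", "127.0.0.1")
port = config.get("statsd_port", 8125)
```

Using the two-argument `get` avoids looking the key up twice.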