Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prometheus client metrics support #711

Closed
wants to merge 49 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
e3cb524
Karapace metrics
libretto Jun 9, 2023
32ce060
Merge branch 'master' into karapace-metrics
libretto Jun 9, 2023
8dab84d
fixup issues
libretto Jun 10, 2023
2898e31
fixup issues
libretto Jun 10, 2023
c974579
fixup annotations issue
libretto Jun 12, 2023
7256f5d
fixup exception message
libretto Jun 12, 2023
ab6ae96
get rid of multiple instances of class
libretto Jun 12, 2023
733d1f2
fixup issue
libretto Jun 12, 2023
8751eea
change code to send raw data only
libretto Jun 16, 2023
53d3e4b
merge with master
libretto Jun 16, 2023
fedff8f
fixup
libretto Jun 22, 2023
31d16d4
Merge branch 'master' into karapace-metrics
libretto Jun 22, 2023
b70ae03
fixup code
libretto Jun 22, 2023
a0387a3
fixup
libretto Jun 22, 2023
358facc
fixup
libretto Jun 22, 2023
a064624
merge
libretto Jul 3, 2023
8533959
improve code by request
libretto Aug 8, 2023
ac48829
merge with main
libretto Aug 8, 2023
90e221c
add psutil typing support
libretto Aug 8, 2023
4c48576
fixup
libretto Aug 8, 2023
f9cb6d8
fixup
libretto Aug 8, 2023
765864b
Merge branch 'main' into karapace-metrics
libretto Aug 30, 2023
073aa16
merge with master
libretto Sep 2, 2023
0c73a1a
refactor
libretto Sep 2, 2023
c495c50
fixup
libretto Sep 2, 2023
6fb96d0
prometheus support
libretto Sep 6, 2023
1f6fce2
fixup requirements
libretto Sep 14, 2023
569d5ef
merge with master
libretto Dec 20, 2023
0386ca9
remove connections counter
libretto Dec 20, 2023
5f302a8
fixup lint
libretto Dec 20, 2023
c785e1a
skip the README.rst updates
libretto Dec 21, 2023
5da8ca8
fixup metrics stats usage
libretto Jan 10, 2024
6f2093e
Merge branch 'master' into prometheus2
libretto Jan 10, 2024
03f1189
merge with master
libretto Jan 25, 2024
2c06480
merge with master and fixup conflict
libretto Feb 13, 2024
7e71ecb
merge with master
libretto Feb 21, 2024
c66b48f
merge with master
libretto Apr 3, 2024
80d92c0
add unit tests
libretto May 22, 2024
dafa3f2
pylint fixes
libretto May 22, 2024
c7e6796
merge with main branch
libretto May 23, 2024
a687a33
fixup test issues
libretto May 30, 2024
8da1e33
fixup prometheus client issue
libretto May 30, 2024
2b019c8
improve tests
libretto May 31, 2024
44cd5b8
add test
libretto Jun 12, 2024
13abf60
merge with main
libretto Jun 13, 2024
c8fa58d
fixup Metrics issues
libretto Jun 13, 2024
fc90f72
fixup Metrics issues
libretto Jun 13, 2024
33e4155
fixup minor issue
libretto Jun 13, 2024
b2d4eaa
pylint issue
libretto Jun 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion karapace.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,12 @@
"registry_authfile": null,
"topic_name": "_schemas",
"protobuf_runtime_directory": "runtime",
"session_timeout_ms": 10000
"session_timeout_ms": 10000,
"stats_service": "statsd",
"metrics_extended": true,
"statsd_host": "127.0.0.1",
"statsd_port": 8125,
"prometheus_host": "127.0.0.1",
"prometheus_port": 8005

}
57 changes: 57 additions & 0 deletions karapace/base_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""
karapace - basestats

Provides the abstract base class shared by the statsd and Prometheus stats clients.

Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from abc import ABC, abstractmethod
from contextlib import contextmanager
from karapace.config import Config
from karapace.sentry import get_sentry_client
from typing import Final, Iterator

import time


class StatsClient(ABC):
    """Abstract base class for metric backends.

    Concrete subclasses implement :meth:`gauge`, :meth:`increase` and
    :meth:`timing`.  The base class adds a timing context manager and
    exception reporting through a sentry client.
    """

    @abstractmethod
    def __init__(
        self,
        config: Config,
    ) -> None:
        """Create the sentry client from the optional ``"sentry"`` config entry.

        Declared abstract so the base class cannot be instantiated directly;
        subclasses are expected to call ``super().__init__(config)``.
        """
        self.sentry_client: Final = get_sentry_client(sentry_config=config.get("sentry", None))

    @contextmanager
    def timing_manager(self, metric: str, tags: dict | None = None) -> Iterator[None]:
        """Time the body of a ``with`` block and report it via :meth:`timing`.

        The elapsed time is measured with ``time.monotonic()`` (seconds).  It
        is reported even when the body raises, so failing operations are not
        silently missing from the timing data; the exception then propagates
        unchanged.
        """
        start_time = time.monotonic()
        try:
            yield
        finally:
            self.timing(metric, time.monotonic() - start_time, tags)

    @abstractmethod
    def gauge(self, metric: str, value: float, tags: dict | None = None) -> None:
        """Report the current ``value`` of ``metric``."""

    @abstractmethod
    def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> None:
        """Increase the counter ``metric`` by ``inc_value``."""

    @abstractmethod
    def timing(self, metric: str, value: float, tags: dict | None = None) -> None:
        """Report one timing observation ``value`` for ``metric``."""

    def unexpected_exception(self, ex: Exception, where: str, tags: dict | None = None) -> None:
        """Count an unexpected exception and forward it to the sentry client.

        The ``"exception"`` counter is increased with tags carrying the
        exception class name and ``where``; caller-supplied ``tags`` are merged
        on top and may override those two keys.  For the sentry call the
        order is reversed: ``where`` overrides any ``"where"`` key in ``tags``.
        """
        all_tags = {
            "exception": ex.__class__.__name__,
            "where": where,
        }
        all_tags.update(tags or {})
        self.increase("exception", tags=all_tags)
        scope_args = {**(tags or {}), "where": where}
        self.sentry_client.unexpected_exception(error=ex, where=where, tags=scope_args)

    def close(self) -> None:
        """Close the sentry client."""
        self.sentry_client.close()
12 changes: 12 additions & 0 deletions karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ class Config(TypedDict):
name_strategy_validation: bool
master_election_strategy: str
protobuf_runtime_directory: str
stats_service: str
metrics_extended: bool
statsd_host: str
statsd_port: int
prometheus_host: str | None
prometheus_port: int | None

sentry: NotRequired[Mapping[str, object]]
tags: NotRequired[Mapping[str, object]]
Expand Down Expand Up @@ -150,6 +156,12 @@ class ConfigDefaults(Config, total=False):
"name_strategy_validation": True,
"master_election_strategy": "lowest",
"protobuf_runtime_directory": "runtime",
"stats_service": "statsd",
"metrics_extended": True,
"statsd_host": "127.0.0.1",
"statsd_port": 8125,
"prometheus_host": "127.0.0.1",
"prometheus_port": 8005,
}
SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD]

Expand Down
130 changes: 130 additions & 0 deletions karapace/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
"""
karapace - metrics
Supports collection of system metrics
list of supported metrics:
connections-active - The number of active HTTP(S) connections to server.
Data collected inside aiohttp request handler.

Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from karapace.base_stats import StatsClient
from karapace.config import Config
from karapace.prometheus import PrometheusClient
from karapace.statsd import StatsdClient

import threading


class MetricsException(Exception):
    """Error raised by the metrics module, e.g. for an unusable configuration."""


class Singleton(type):
    """Metaclass that gives each class using it exactly one instance.

    The first call to the class constructs the instance and stores it on the
    class as ``_instance``; later calls return that object and ignore their
    arguments.
    """

    _instance: Singleton | None = None

    def __call__(cls, *args: str, **kwargs: int) -> Singleton:
        """Return the class's own singleton, creating it on first use."""
        # Read the class's own namespace rather than ``cls._instance``: plain
        # attribute lookup walks the MRO, so a subclass of an already
        # instantiated singleton would otherwise get the parent's instance.
        instance = cls.__dict__.get("_instance")
        if instance is None:
            instance = super().__call__(*args, **kwargs)
            cls._instance = instance
        return instance


class Metrics(metaclass=Singleton):
    """Facade through which the application reports metrics.

    The class uses the ``Singleton`` metaclass, so every ``Metrics()`` call
    yields the same object.  Until :meth:`setup` has installed a backend, and
    again after :meth:`cleanup`, every reporting method is a silent no-op.
    """

    def __init__(
        self,
    ) -> None:
        self.stats_client: StatsClient | None = None
        self.is_ready = False
        # Guards the state transitions made by setup() and cleanup().
        self.lock = threading.Lock()
        # Not updated by any method of this class; kept so existing readers
        # of these attributes keep working.
        self.request_size_total = 0
        self.request_count = 0

    def setup(self, config: Config) -> None:
        """Create the backend selected by ``config["stats_service"]``.

        Does nothing when already set up or when ``metrics_extended`` is
        falsy.

        Raises:
            MetricsException: ``stats_service`` is neither ``"statsd"`` nor
                ``"prometheus"``.
        """
        with self.lock:
            if self.is_ready:
                return
            if not config.get("metrics_extended"):
                return
            stats_service = config.get("stats_service")
            if stats_service == "statsd":
                self.stats_client = StatsdClient(config=config)
            elif stats_service == "prometheus":
                self.stats_client = PrometheusClient(config=config)
            else:
                raise MetricsException('Config variable "stats_service" is not defined')
            self.is_ready = True

    def _active_client(self) -> StatsClient | None:
        """Return the backend to report to, or ``None`` when reporting is off.

        Raises:
            RuntimeError: something that is not a ``StatsClient`` was stored
                in ``stats_client``.
        """
        if not self.is_ready or self.stats_client is None:
            return None
        if not isinstance(self.stats_client, StatsClient):
            raise RuntimeError("no StatsClient available")
        return self.stats_client

    def request(self, size: int) -> None:
        """Record one request of ``size`` in the request counters."""
        client = self._active_client()
        if client is None:
            return
        client.increase("request-size-total", size)
        client.increase("request-count", 1)

    def response(self, size: int) -> None:
        """Record one response of ``size`` in the response counters."""
        client = self._active_client()
        if client is None:
            return
        client.increase("response-size-total", size)
        client.increase("response-count", 1)

    def are_we_master(self, is_master: bool) -> None:
        """Report the master role as a gauge: 1 when master, 0 otherwise."""
        client = self._active_client()
        if client is None:
            return
        client.gauge("master-slave-role", int(is_master))

    def latency(self, latency_ms: float) -> None:
        """Report one ``latency_ms`` timing observation."""
        client = self._active_client()
        if client is None:
            return
        client.timing("latency_ms", latency_ms)

    def gauge(self, metric: str, value: float, tags: dict | None = None) -> None:
        """Forward a gauge value to the backend."""
        client = self._active_client()
        if client is None:
            return
        client.gauge(metric, value, tags)

    def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> None:
        """Forward a counter increment to the backend."""
        client = self._active_client()
        if client is None:
            return
        client.increase(metric, inc_value, tags)

    def timing(self, metric: str, value: float, tags: dict | None = None) -> None:
        """Forward a timing observation to the backend."""
        client = self._active_client()
        if client is None:
            return
        client.timing(metric, value, tags)

    def unexpected_exception(self, ex: Exception, where: str, tags: dict | None = None) -> None:
        """Forward an unexpected exception to the backend."""
        client = self._active_client()
        if client is None:
            return
        client.unexpected_exception(ex, where, tags)

    def error(self) -> None:
        """Increase the ``error-total`` counter by one."""
        client = self._active_client()
        if client is None:
            return
        client.increase("error-total", 1)

    def cleanup(self) -> None:
        """Close the backend and return to the not-set-up state.

        Resetting ``stats_client`` and ``is_ready`` keeps later reporting
        calls from reaching a closed client and lets :meth:`setup` run again.
        """
        with self.lock:
            client = self.stats_client
            self.stats_client = None
            self.is_ready = False
        # Close outside the lock; only genuine StatsClient objects are closed.
        if client and isinstance(client, StatsClient):
            client.close()
122 changes: 122 additions & 0 deletions karapace/prometheus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
"""
karapace - prometheus

Exposes Karapace metrics over HTTP so that a Prometheus server can scrape them,
using the prometheus_client library:

https://github.com/prometheus/client_python

Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from karapace.base_stats import StatsClient
from karapace.config import Config
from prometheus_client import Counter, Gauge, REGISTRY, Summary
from prometheus_client.exposition import make_wsgi_app
from socketserver import ThreadingMixIn
from typing import Any, Final
from wsgiref.simple_server import make_server, WSGIRequestHandler, WSGIServer

import logging
import socket
import threading

# Module-level logger named after this module.
LOG = logging.getLogger(__name__)
# Loopback host and port constants.
# NOTE(review): neither constant is referenced by the code visible in this
# module; PrometheusClient reads "prometheus_host"/"prometheus_port" from the
# config instead — confirm whether these are still needed.
HOST: Final = "127.0.0.1"
PORT: Final = 8005


class PrometheusException(Exception):
    """Error raised when the Prometheus client cannot be set up."""


class ThreadingWSGIServer(ThreadingMixIn, WSGIServer):
    """WSGI HTTP server that serves every request on its own thread."""

    # Worker threads are daemonic, i.e. "fire and forget".  Since Python 3.7
    # ``ThreadingMixIn`` collects all non-daemon threads in a list so it can
    # join them at server close; daemon threads avoid that memory leak.
    daemon_threads = True


class _SilentHandler(WSGIRequestHandler):
    """Request handler that keeps per-request messages out of the log."""

    # ``format`` shadows a builtin, but the name is fixed by the overridden method.
    # pylint: disable=W0622

    def log_message(self, format: str, *args: Any) -> None:
        """Discard the message; nothing is logged."""
        return None


def get_family(address: bytes | str | None, port: int | str | None) -> tuple[socket.AddressFamily, str]:
    """Resolve ``address``/``port`` and describe the first result.

    Returns the address family of the first ``socket.getaddrinfo`` entry
    together with the host part of that entry's socket address.
    """
    first_entry = next(iter(socket.getaddrinfo(address, port)))
    # Each getaddrinfo entry is (family, type, proto, canonname, sockaddr).
    family, _socktype, _proto, _canonname, sockaddr = first_entry
    host = sockaddr[0]
    return family, host


class PrometheusClient(StatsClient):
    """Stats client that exposes metrics through a Prometheus scrape endpoint.

    Constructing the client starts a threaded WSGI server on the configured
    ``prometheus_host``/``prometheus_port`` serving the global
    ``prometheus_client`` ``REGISTRY``.  Metrics are created lazily the first
    time each name is reported.  ``tags`` are accepted for interface
    compatibility but are not exported.
    """

    # True while some instance's HTTP server is running; shared by all instances.
    server_is_active: bool = False

    def __init__(self, config: Config) -> None:
        """Start the scrape endpoint.

        Raises:
            PrometheusException: host or port is missing from ``config``, or
                another instance's server is already active.
        """
        super().__init__(config)
        # NOTE: the lock is created per instance, so it does not serialise
        # concurrent constructors against each other.
        self.lock = threading.Lock()
        self.thread: threading.Thread | None = None
        # Caches are keyed by the sanitised metric name and created before
        # anything below can raise.
        self._gauge: dict[str, Gauge] = {}
        self._summary: dict[str, Summary] = {}
        self._counter: dict[str, Counter] = {}
        with self.lock:
            _host = config.get("prometheus_host", None)
            _port = config.get("prometheus_port", None)
            if _host is None:
                raise PrometheusException("prometheus_host host is undefined")
            if _port is None:
                raise PrometheusException("prometheus_port port is undefined")
            if PrometheusClient.server_is_active:
                raise PrometheusException("Double instance of Prometheus interface")

            # The HTTP server is built here rather than through the
            # prometheus_client helper so that it can be stopped again.
            class TmpServer(ThreadingWSGIServer):
                pass

            TmpServer.address_family, addr = get_family(_host, _port)
            app = make_wsgi_app(REGISTRY)
            self.httpd = make_server(addr, _port, app, TmpServer, handler_class=_SilentHandler)
            self.thread = threading.Thread(target=self.httpd.serve_forever)
            self.thread.daemon = True
            self.thread.start()
            PrometheusClient.server_is_active = True

    @staticmethod
    def _metric_name(metric: str) -> str:
        """Turn ``metric`` into a valid Prometheus metric name.

        Prometheus names must match ``[a-zA-Z_:][a-zA-Z0-9_:]*``, whereas
        callers use statsd-style names such as ``"request-size-total"``.
        Every other character becomes ``_``, and a leading ``_`` is added
        when the result is empty or starts with a digit.
        """
        cleaned = "".join(ch if (ch.isascii() and ch.isalnum()) or ch in "_:" else "_" for ch in metric)
        if not cleaned or cleaned[0].isdigit():
            cleaned = f"_{cleaned}"
        return cleaned

    def gauge(self, metric: str, value: float, tags: dict | None = None) -> None:
        """Set the gauge ``metric`` to ``value`` (``tags`` are ignored)."""
        name = self._metric_name(metric)
        m = self._gauge.get(name)
        if m is None:
            m = Gauge(name, metric)
            self._gauge[name] = m
        m.set(value)

    def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> None:
        """Increase the counter ``metric`` by ``inc_value`` (``tags`` are ignored)."""
        name = self._metric_name(metric)
        m = self._counter.get(name)
        if m is None:
            m = Counter(name, metric)
            self._counter[name] = m
        m.inc(inc_value)

    def timing(self, metric: str, value: float, tags: dict | None = None) -> None:
        """Add ``value`` as an observation to the summary ``metric`` (``tags`` are ignored)."""
        name = self._metric_name(metric)
        m = self._summary.get(name)
        if m is None:
            m = Summary(name, metric)
            self._summary[name] = m
        m.observe(value)

    def stop_server(self) -> None:
        """Shut the HTTP server down and wait for its serving thread to end."""
        self.httpd.shutdown()
        self.httpd.server_close()
        if isinstance(self.thread, threading.Thread):
            self.thread.join()

    def close(self) -> None:
        """Stop the endpoint and release everything this client registered.

        Collectors created by this client are removed from the global
        ``REGISTRY``; otherwise a later client in the same process would fail
        with a duplicate-timeseries error when re-creating the same metric.
        The base-class close is then called so the sentry client is closed too.
        """
        with self.lock:
            if self.server_is_active:
                self.stop_server()
                PrometheusClient.server_is_active = False
            for cache in (self._gauge, self._summary, self._counter):
                for collector in cache.values():
                    REGISTRY.unregister(collector)
                cache.clear()
        super().close()
Loading
Loading