Prometheus client metrics support #711
Closed
Changes from 43 commits
49 commits
e3cb524 Karapace metrics (libretto)
32ce060 Merge branch 'master' into karapace-metrics (libretto)
8dab84d fixup issues (libretto)
2898e31 fixup issues (libretto)
c974579 fixup annotations issue (libretto)
7256f5d fixup exception message (libretto)
ab6ae96 get rid of multiple instances of class (libretto)
733d1f2 fixup issue (libretto)
8751eea change code to send raw data only (libretto)
53d3e4b merge with master (libretto)
fedff8f fixup (libretto)
31d16d4 Merge branch 'master' into karapace-metrics (libretto)
b70ae03 fixup code (libretto)
a0387a3 fixup (libretto)
358facc fixup (libretto)
a064624 merge (libretto)
8533959 improve code by request (libretto)
ac48829 merge with main (libretto)
90e221c add psutil typing support (libretto)
4c48576 fixup (libretto)
f9cb6d8 fixup (libretto)
765864b Merge branch 'main' into karapace-metrics (libretto)
073aa16 merge with master (libretto)
0c73a1a refactor (libretto)
c495c50 fixup (libretto)
6fb96d0 prometheus support (libretto)
1f6fce2 fixup requirements (libretto)
569d5ef merge with master (libretto)
0386ca9 remove connections counter (libretto)
5f302a8 fixup lint (libretto)
c785e1a skip the README.rst updates (libretto)
5da8ca8 fixup metrics stats usage (libretto)
6f2093e Merge branch 'master' into prometheus2 (libretto)
03f1189 merge with master (libretto)
2c06480 merge with master and fixup conflict (libretto)
7e71ecb merge with master (libretto)
c66b48f merge with master (libretto)
80d92c0 add unit tests (libretto)
dafa3f2 pylint fixes (libretto)
c7e6796 merge with main branch (libretto)
a687a33 fixup test issues (libretto)
8da1e33 fixup prometheus client issue (libretto)
2b019c8 improve tests (libretto)
44cd5b8 add test (libretto)
13abf60 merge with main (libretto)
c8fa58d fixup Metrics issues (libretto)
fc90f72 fixup Metrics issues (libretto)
33e4155 fixup minor issue (libretto)
b2d4eaa pylint issue (libretto)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
""" | ||
karapace - basestats | ||
|
||
Base class for the statsd and prometheus stats clients. | ||
|
||
Copyright (c) 2023 Aiven Ltd | ||
See LICENSE for details | ||
""" | ||
from __future__ import annotations | ||
|
||
from abc import ABC, abstractmethod | ||
from contextlib import contextmanager | ||
from karapace.config import Config | ||
from karapace.sentry import get_sentry_client | ||
from typing import Final, Iterator | ||
|
||
import time | ||
|
||
|
||
class StatsClient(ABC): | ||
@abstractmethod | ||
def __init__( | ||
self, | ||
config: Config, | ||
) -> None: | ||
self.sentry_client: Final = get_sentry_client(sentry_config=config.get("sentry", None)) | ||
|
||
@contextmanager | ||
def timing_manager(self, metric: str, tags: dict | None = None) -> Iterator[None]: | ||
start_time = time.monotonic() | ||
yield | ||
self.timing(metric, time.monotonic() - start_time, tags) | ||
|
||
@abstractmethod | ||
def gauge(self, metric: str, value: float, tags: dict | None = None) -> None: | ||
pass | ||
|
||
@abstractmethod | ||
def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> None: | ||
pass | ||
|
||
@abstractmethod | ||
def timing(self, metric: str, value: float, tags: dict | None = None) -> None: | ||
pass | ||
|
||
def unexpected_exception(self, ex: Exception, where: str, tags: dict | None = None) -> None: | ||
all_tags = { | ||
"exception": ex.__class__.__name__, | ||
"where": where, | ||
} | ||
all_tags.update(tags or {}) | ||
self.increase("exception", tags=all_tags) | ||
scope_args = {**(tags or {}), "where": where} | ||
self.sentry_client.unexpected_exception(error=ex, where=where, tags=scope_args) | ||
|
||
def close(self) -> None: | ||
self.sentry_client.close() |
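
The base class above leaves `gauge`, `increase`, and `timing` abstract and builds `timing_manager` on top of `timing`. The following is a minimal sketch of that pattern, with the `Config` and Sentry wiring left out. `SketchStatsClient` and `InMemoryStatsClient` are hypothetical names used only for illustration and are not part of Karapace.

```python
import time
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import Dict, Iterator, List, Optional


class SketchStatsClient(ABC):
    """Simplified stand-in for the StatsClient base class (assumption: no Sentry, no Config)."""

    @contextmanager
    def timing_manager(self, metric: str, tags: Optional[dict] = None) -> Iterator[None]:
        # Measure the wall-clock duration of the with-body using a monotonic clock.
        start_time = time.monotonic()
        yield
        self.timing(metric, time.monotonic() - start_time, tags)

    @abstractmethod
    def increase(self, metric: str, inc_value: int = 1, tags: Optional[dict] = None) -> None:
        ...

    @abstractmethod
    def timing(self, metric: str, value: float, tags: Optional[dict] = None) -> None:
        ...


class InMemoryStatsClient(SketchStatsClient):
    """Hypothetical backend that keeps metrics in dictionaries instead of sending them."""

    def __init__(self) -> None:
        self.counters: Dict[str, int] = {}
        self.timings: Dict[str, List[float]] = {}

    def increase(self, metric: str, inc_value: int = 1, tags: Optional[dict] = None) -> None:
        self.counters[metric] = self.counters.get(metric, 0) + inc_value

    def timing(self, metric: str, value: float, tags: Optional[dict] = None) -> None:
        self.timings.setdefault(metric, []).append(value)


client = InMemoryStatsClient()
with client.timing_manager("lookup"):
    sum(range(1000))  # placeholder for the work being timed
client.increase("request-count")
client.increase("request-count", 2)
```

As in the diff, the duration is recorded only when the `with` body finishes without raising, because there is no `try`/`finally` around the `yield`.
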
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
""" | ||
karapace - metrics | ||
Supports collection of system metrics | ||
list of supported metrics: | ||
connections-active - The number of active HTTP(S) connections to server. | ||
Data collected inside aiohttp request handler. | ||
|
||
Copyright (c) 2023 Aiven Ltd | ||
See LICENSE for details | ||
""" | ||
from __future__ import annotations | ||
|
||
from karapace.base_stats import StatsClient | ||
from karapace.config import Config | ||
from karapace.prometheus import PrometheusClient | ||
from karapace.statsd import StatsdClient | ||
|
||
import threading | ||
|
||
|
||
class MetricsException(Exception): | ||
pass | ||
|
||
|
||
class Singleton(type): | ||
_instance: Singleton | None = None | ||
|
||
def __call__(cls, *args: str, **kwargs: int) -> Singleton: | ||
if cls._instance is None: | ||
instance = super().__call__(*args, **kwargs) | ||
cls._instance = instance | ||
return cls._instance | ||
|
||
|
||
class Metrics(metaclass=Singleton): | ||
stats_client: StatsClient | ||
|
||
def __init__( | ||
self, | ||
) -> None: | ||
self.is_ready = False | ||
self.lock = threading.Lock() | ||
self.request_size_total = 0 | ||
self.request_count = 0 | ||
|
||
def setup(self, config: Config) -> None: | ||
with self.lock: | ||
if self.is_ready: | ||
return | ||
|
||
if not config.get("metrics_extended"): | ||
return | ||
stats_service = config.get("stats_service") | ||
if stats_service == "statsd": | ||
self.stats_client = StatsdClient(config=config) | ||
elif stats_service == "prometheus": | ||
self.stats_client = PrometheusClient(config=config) | ||
else: | ||
raise MetricsException('Config variable "stats_service" is not defined') | ||
self.is_ready = True | ||
|
||
def request(self, size: int) -> None: | ||
if not self.is_ready or self.stats_client is None: | ||
return | ||
if not isinstance(self.stats_client, StatsClient): | ||
raise RuntimeError("no StatsClient available") | ||
self.stats_client.increase("request-size-total", size) | ||
self.stats_client.increase("request-count", 1) | ||
|
||
def response(self, size: int) -> None: | ||
if not self.is_ready or self.stats_client is None: | ||
return | ||
if not isinstance(self.stats_client, StatsClient): | ||
raise RuntimeError("no StatsClient available") | ||
self.stats_client.increase("response-size-total", size) | ||
self.stats_client.increase("response-count", 1) | ||
|
||
def are_we_master(self, is_master: bool) -> None: | ||
if not self.is_ready or self.stats_client is None: | ||
return | ||
if not isinstance(self.stats_client, StatsClient): | ||
raise RuntimeError("no StatsClient available") | ||
self.stats_client.gauge("master-slave-role", int(is_master)) | ||
|
||
def latency(self, latency_ms: float) -> None: | ||
if not self.is_ready or self.stats_client is None: | ||
return | ||
if not isinstance(self.stats_client, StatsClient): | ||
raise RuntimeError("no StatsClient available") | ||
self.stats_client.timing("latency_ms", latency_ms) | ||
|
||
def error(self) -> None: | ||
if not self.is_ready or self.stats_client is None: | ||
return | ||
if not isinstance(self.stats_client, StatsClient): | ||
raise RuntimeError("no StatsClient available") | ||
self.stats_client.increase("error-total", 1) | ||
|
||
def cleanup(self) -> None: | ||
if self.stats_client: | ||
self.stats_client.close() | ||
if not self.is_ready: | ||
return |
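
The `Singleton` metaclass in this file overrides `__call__` so that constructing `Metrics` more than once returns the same object. A small sketch of that behaviour, assuming a hypothetical `Registry` class in place of `Metrics`:

```python
class Singleton(type):
    """Metaclass that creates at most one instance per class."""

    _instance = None

    def __call__(cls, *args, **kwargs):
        # The first call builds the instance; later calls return the cached one.
        if cls._instance is None:
            cls._instance = super().__call__(*args, **kwargs)
        return cls._instance


class Registry(metaclass=Singleton):
    """Hypothetical class used only to observe the metaclass behaviour."""

    init_calls = 0

    def __init__(self) -> None:
        # Counts how many times __init__ actually runs.
        Registry.init_calls += 1


first = Registry()
second = Registry()
```

Both names refer to the same object and `__init__` runs only once. Note that the check and assignment in `__call__` are not guarded by a lock, so this sketch makes no claim about concurrent first-time construction.
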
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
""" | ||
karapace - prometheus | ||
|
||
Exposes metrics over HTTP for Prometheus to scrape, using prometheus_client. | ||
|
||
Copyright (c) 2023 Aiven Ltd | ||
See LICENSE for details | ||
""" | ||
from __future__ import annotations | ||
|
||
from karapace.base_stats import StatsClient | ||
from karapace.config import Config | ||
from prometheus_client import Counter, Gauge, REGISTRY, Summary | ||
from prometheus_client.exposition import make_wsgi_app | ||
from socketserver import ThreadingMixIn | ||
from typing import Final | ||
from wsgiref.simple_server import make_server, WSGIRequestHandler, WSGIServer | ||
|
||
import logging | ||
import socket | ||
import threading | ||
|
||
LOG = logging.getLogger(__name__) | ||
HOST: Final = "127.0.0.1" | ||
PORT: Final = 8005 | ||
|
||
|
||
class PrometheusException(Exception): | ||
pass | ||
|
||
|
||
class ThreadingWSGIServer(ThreadingMixIn, WSGIServer): | ||
"""Thread per request HTTP server.""" | ||
|
||
# Make worker threads "fire and forget". Beginning with Python 3.7 this | ||
# prevents a memory leak because ``ThreadingMixIn`` starts to gather all | ||
# non-daemon threads in a list in order to join on them at server close. | ||
daemon_threads = True | ||
|
||
|
||
class _SilentHandler(WSGIRequestHandler): | ||
"""WSGI handler that does not log requests.""" | ||
|
||
# pylint: disable=W0622 | ||
def log_message(self, format, *args): | ||
"""Log nothing.""" | ||
|
||
|
||
def get_family(address, port): | ||
infos = socket.getaddrinfo(address, port) | ||
family, _, _, _, sockaddr = next(iter(infos)) | ||
return family, sockaddr[0] | ||
|
||
|
||
class PrometheusClient(StatsClient): | ||
server_is_active: bool = False | ||
|
||
def __init__(self, config: Config) -> None: | ||
super().__init__(config) | ||
self.lock = threading.Lock() | ||
self.httpd = None | ||
self.thread = None | ||
with self.lock: | ||
_host = config.get("prometheus_host", None) | ||
_port = config.get("prometheus_port", None) | ||
if _host is None: | ||
raise PrometheusException("prometheus_host is undefined") | ||
if _port is None: | ||
raise PrometheusException("prometheus_port is undefined") | ||
if not PrometheusClient.server_is_active: | ||
# Wrap the httpd server creation from prometheus_client so that the server can be stopped later. | ||
self.start_server(_host, _port) | ||
|
||
PrometheusClient.server_is_active = True | ||
else: | ||
raise PrometheusException("Double instance of Prometheus interface") | ||
self._gauge: dict[str, Gauge] = dict() | ||
self._summary: dict[str, Summary] = dict() | ||
self._counter: dict[str, Counter] = dict() | ||
|
||
def gauge(self, metric: str, value: float, tags: dict | None = None) -> None: | ||
m = self._gauge.get(metric) | ||
if m is None: | ||
m = Gauge(metric, metric) | ||
self._gauge[metric] = m | ||
m.set(value) | ||
|
||
def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> None: | ||
m = self._counter.get(metric) | ||
if m is None: | ||
m = Counter(metric, metric) | ||
self._counter[metric] = m | ||
m.inc(inc_value) | ||
|
||
def timing(self, metric: str, value: float, tags: dict | None = None) -> None: | ||
m = self._summary.get(metric) | ||
if m is None: | ||
m = Summary(metric, metric) | ||
self._summary[metric] = m | ||
m.observe(value) | ||
|
||
def start_server(self, addr: str, port: int) -> None: | ||
class TmpServer(ThreadingWSGIServer): | ||
pass | ||
|
||
TmpServer.address_family, addr = get_family(addr, port) | ||
app = make_wsgi_app(REGISTRY) | ||
self.httpd = make_server(addr, port, app, TmpServer, handler_class=_SilentHandler) | ||
self.thread = threading.Thread(target=self.httpd.serve_forever) | ||
self.thread.daemon = True | ||
self.thread.start() | ||
|
||
def stop_server(self) -> None: | ||
self.httpd.shutdown() | ||
self.httpd.server_close() | ||
self.thread.join() | ||
|
||
def close(self): | ||
with self.lock: | ||
if self.server_is_active: | ||
self.stop_server() | ||
PrometheusClient.server_is_active = False |
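
The `start_server` / `stop_server` pair above runs a threaded WSGI server in a daemon thread and later shuts it down and joins the thread. The lifecycle can be sketched with the standard library alone. In this sketch `demo_app` is a hypothetical WSGI app standing in for `make_wsgi_app(REGISTRY)`, and port `0` (an assumption, the diff uses the configured host and port) asks the operating system for a free port.

```python
import threading
import urllib.request
from socketserver import ThreadingMixIn
from wsgiref.simple_server import WSGIRequestHandler, WSGIServer, make_server


class ThreadingWSGIServer(ThreadingMixIn, WSGIServer):
    """Thread-per-request HTTP server with fire-and-forget worker threads."""

    daemon_threads = True


class SilentHandler(WSGIRequestHandler):
    """Request handler that suppresses the default access log."""

    def log_message(self, format, *args):  # pylint: disable=redefined-builtin
        pass


def demo_app(environ, start_response):
    # Hypothetical stand-in for the Prometheus exposition app.
    body = b"demo_metric 1.0\n"
    headers = [("Content-Type", "text/plain"), ("Content-Length", str(len(body)))]
    start_response("200 OK", headers)
    return [body]


# Start: bind the server, then serve requests from a background daemon thread.
httpd = make_server("127.0.0.1", 0, demo_app, ThreadingWSGIServer, handler_class=SilentHandler)
port = httpd.server_address[1]
thread = threading.Thread(target=httpd.serve_forever, daemon=True)
thread.start()

# Scrape once, bypassing any proxy configured in the environment.
opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
with opener.open(f"http://127.0.0.1:{port}/") as response:
    scraped = response.read()

# Stop: end serve_forever, release the socket, and wait for the thread to exit.
httpd.shutdown()
httpd.server_close()
thread.join()
```

Keeping a reference to both the server and the thread is what makes the clean shutdown possible, which appears to be the reason the diff wraps server creation instead of relying on a helper that does not expose them.
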
`stats_client: StatsClient | None = None`: I've run this locally with `metrics_extended` set to `False`, and it's failing because `stats_client` is never set.
Fixed, and a unit test for this case has been added.
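
The failure described in this thread can be illustrated with a small sketch. It assumes a hypothetical `RecordingClient` in place of a real stats client and a simplified `MetricsSketch` in place of `Metrics`. Giving `stats_client` a class-level default of `None` means the guard and `cleanup` can read the attribute even when `setup` returned early.

```python
from typing import List, Optional, Tuple


class RecordingClient:
    """Hypothetical stand-in for a StatsClient; it only records calls."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, int]] = []

    def increase(self, metric: str, inc_value: int = 1) -> None:
        self.calls.append((metric, inc_value))

    def close(self) -> None:
        self.calls.append(("close", 0))


class MetricsSketch:
    # Default to None so the attribute exists even if setup() never assigns it.
    stats_client: Optional[RecordingClient] = None

    def __init__(self) -> None:
        self.is_ready = False

    def setup(self, metrics_extended: bool) -> None:
        if not metrics_extended:
            return  # metrics disabled: stats_client stays None
        self.stats_client = RecordingClient()
        self.is_ready = True

    def request(self, size: int) -> None:
        if not self.is_ready or self.stats_client is None:
            return
        self.stats_client.increase("request-size-total", size)
        self.stats_client.increase("request-count", 1)

    def cleanup(self) -> None:
        if self.stats_client is not None:
            self.stats_client.close()


# With metrics disabled, every call is a no-op instead of an AttributeError.
disabled = MetricsSketch()
disabled.setup(metrics_extended=False)
disabled.request(128)
disabled.cleanup()

# With metrics enabled, calls reach the client.
enabled = MetricsSketch()
enabled.setup(metrics_extended=True)
enabled.request(128)
enabled.cleanup()
```

Without the `= None` default, the bare annotation declares the attribute for type checkers but does not create it, so reading `self.stats_client` before `setup` assigns it raises `AttributeError`.
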