Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prometheus client metrics support #711

Closed
wants to merge 49 commits into from
Closed
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
e3cb524
Karapace metrics
libretto Jun 9, 2023
32ce060
Merge branch 'master' into karapace-metrics
libretto Jun 9, 2023
8dab84d
fixup issues
libretto Jun 10, 2023
2898e31
fixup issues
libretto Jun 10, 2023
c974579
fixup annotations issue
libretto Jun 12, 2023
7256f5d
fixup exception message
libretto Jun 12, 2023
ab6ae96
get rid of multiple instances of class
libretto Jun 12, 2023
733d1f2
fixup issue
libretto Jun 12, 2023
8751eea
change code to send raw data only
libretto Jun 16, 2023
53d3e4b
merge with master
libretto Jun 16, 2023
fedff8f
fixup
libretto Jun 22, 2023
31d16d4
Merge branch 'master' into karapace-metrics
libretto Jun 22, 2023
b70ae03
fixup code
libretto Jun 22, 2023
a0387a3
fixup
libretto Jun 22, 2023
358facc
fixup
libretto Jun 22, 2023
a064624
merge
libretto Jul 3, 2023
8533959
improve code by request
libretto Aug 8, 2023
ac48829
merge with main
libretto Aug 8, 2023
90e221c
add psutil typing support
libretto Aug 8, 2023
4c48576
fixup
libretto Aug 8, 2023
f9cb6d8
fixup
libretto Aug 8, 2023
765864b
Merge branch 'main' into karapace-metrics
libretto Aug 30, 2023
073aa16
merge with master
libretto Sep 2, 2023
0c73a1a
refactor
libretto Sep 2, 2023
c495c50
fixup
libretto Sep 2, 2023
6fb96d0
prometheus support
libretto Sep 6, 2023
1f6fce2
fixup requirements
libretto Sep 14, 2023
569d5ef
merge with master
libretto Dec 20, 2023
0386ca9
remove connections counter
libretto Dec 20, 2023
5f302a8
fixup lint
libretto Dec 20, 2023
c785e1a
skip the README.rst updates
libretto Dec 21, 2023
5da8ca8
fixup metrics stats usage
libretto Jan 10, 2024
6f2093e
Merge branch 'master' into prometheus2
libretto Jan 10, 2024
03f1189
merge with master
libretto Jan 25, 2024
2c06480
merge with master and fixup conflict
libretto Feb 13, 2024
7e71ecb
merge with master
libretto Feb 21, 2024
c66b48f
merge with master
libretto Apr 3, 2024
80d92c0
add unit tests
libretto May 22, 2024
dafa3f2
pylint fixes
libretto May 22, 2024
c7e6796
merge with main branch
libretto May 23, 2024
a687a33
fixup test issues
libretto May 30, 2024
8da1e33
fixup prometheus client issue
libretto May 30, 2024
2b019c8
improve tests
libretto May 31, 2024
44cd5b8
add test
libretto Jun 12, 2024
13abf60
merge with main
libretto Jun 13, 2024
c8fa58d
fixup Metrics issues
libretto Jun 13, 2024
fc90f72
fixup Metrics issues
libretto Jun 13, 2024
33e4155
fixup minor issue
libretto Jun 13, 2024
b2d4eaa
pylint issue
libretto Jun 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion karapace.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,12 @@
"registry_authfile": null,
"topic_name": "_schemas",
"protobuf_runtime_directory": "runtime",
"session_timeout_ms": 10000
"session_timeout_ms": 10000,
"stats_service": "statsd",
"metrics_extended": true,
"statsd_host": "127.0.0.1",
"statsd_port": 8125,
"prometheus_host": "127.0.0.1",
"prometheus_port": 8005,

}
57 changes: 57 additions & 0 deletions karapace/base_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""
karapace - basestats

Supports base class for statsd and prometheus protocols:

Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from abc import ABC, abstractmethod
from contextlib import contextmanager
from karapace.config import Config
from karapace.sentry import get_sentry_client
from typing import Final, Iterator

import time


class StatsClient(ABC):
@abstractmethod
def __init__(
self,
config: Config,
) -> None:
self.sentry_client: Final = get_sentry_client(sentry_config=config.get("sentry", None))

@contextmanager
def timing_manager(self, metric: str, tags: dict | None = None) -> Iterator[None]:
start_time = time.monotonic()
yield
self.timing(metric, time.monotonic() - start_time, tags)

@abstractmethod
def gauge(self, metric: str, value: float, tags: dict | None = None) -> None:
pass

@abstractmethod
def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> None:
pass

@abstractmethod
def timing(self, metric: str, value: float, tags: dict | None = None) -> None:
pass

def unexpected_exception(self, ex: Exception, where: str, tags: dict | None = None) -> None:
all_tags = {
"exception": ex.__class__.__name__,
"where": where,
}
all_tags.update(tags or {})
self.increase("exception", tags=all_tags)
scope_args = {**(tags or {}), "where": where}
self.sentry_client.unexpected_exception(error=ex, where=where, tags=scope_args)

def close(self) -> None:
self.sentry_client.close()
12 changes: 12 additions & 0 deletions karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ class Config(TypedDict):
name_strategy_validation: bool
master_election_strategy: str
protobuf_runtime_directory: str
stats_service: str
metrics_extended: bool
statsd_host: str
statsd_port: int
prometheus_host: str | None
prometheus_port: int | None

sentry: NotRequired[Mapping[str, object]]
tags: NotRequired[Mapping[str, object]]
Expand Down Expand Up @@ -150,6 +156,12 @@ class ConfigDefaults(Config, total=False):
"name_strategy_validation": True,
"master_election_strategy": "lowest",
"protobuf_runtime_directory": "runtime",
"stats_service": "statsd",
"metrics_extended": True,
"statsd_host": "127.0.0.1",
"statsd_port": 8125,
"prometheus_host": "127.0.0.1",
"prometheus_port": 8005,
}
SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD]

Expand Down
103 changes: 103 additions & 0 deletions karapace/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
"""
karapace - metrics
Supports collection of system metrics
list of supported metrics:
connections-active - The number of active HTTP(S) connections to server.
Data collected inside aiohttp request handler.

Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from karapace.base_stats import StatsClient
from karapace.config import Config
from karapace.prometheus import PrometheusClient
from karapace.statsd import StatsdClient

import threading


class MetricsException(Exception):
pass


class Singleton(type):
_instance: Singleton | None = None

def __call__(cls, *args: str, **kwargs: int) -> Singleton:
if cls._instance is None:
instance = super().__call__(*args, **kwargs)
cls._instance = instance
return cls._instance


class Metrics(metaclass=Singleton):
stats_client: StatsClient
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stats_client: StatsClient | None = None, I've run this locally with metrics_extended set to False and its failing because the stats_client its not going to be set

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed and unit test for this case is added.


def __init__(
self,
) -> None:
self.is_ready = False
self.lock = threading.Lock()
self.request_size_total = 0
self.request_count = 0

def setup(self, config: Config) -> None:
with self.lock:
if self.is_ready:
return

stats_service = config.get("stats_service")
if not config.get("metrics_extended"):
return
if stats_service == "statsd":
self.stats_client = StatsdClient(config=config)
elif stats_service == "prometheus":
self.stats_client = PrometheusClient(config=config)
else:
raise MetricsException('Config variable "stats_service" is not defined')
self.is_ready = True

def request(self, size: int) -> None:
if not self.is_ready or self.stats_client is None:
return
if not isinstance(self.stats_client, StatsClient):
raise RuntimeError("no StatsClient available")
self.stats_client.increase("request-size-total", size)
self.stats_client.increase("request-count", 1)

def response(self, size: int) -> None:
if not self.is_ready or self.stats_client is None:
return
if not isinstance(self.stats_client, StatsClient):
raise RuntimeError("no StatsClient available")
self.stats_client.increase("response-size-total", size)
self.stats_client.increase("response-count", 1)

def are_we_master(self, is_master: bool) -> None:
if not self.is_ready or self.stats_client is None:
return
if not isinstance(self.stats_client, StatsClient):
raise RuntimeError("no StatsClient available")
self.stats_client.gauge("master-slave-role", int(is_master))

def latency(self, latency_ms: float) -> None:
if not self.is_ready or self.stats_client is None:
return
if not isinstance(self.stats_client, StatsClient):
raise RuntimeError("no StatsClient available")
self.stats_client.timing("latency_ms", latency_ms)

def error(self) -> None:
if not self.is_ready or self.stats_client is None:
return
if not isinstance(self.stats_client, StatsClient):
raise RuntimeError("no StatsClient available")
self.stats_client.increase("error-total", 1)

def cleanup(self) -> None:
if self.stats_client:
self.stats_client.close()
if not self.is_ready:
return
69 changes: 69 additions & 0 deletions karapace/prometheus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""
karapace - prometheus

Supports telegraf's statsd protocol extension for 'key=value' tags:

https://github.com/influxdata/telegraf/tree/master/plugins/inputs/statsd

Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from karapace.base_stats import StatsClient
from karapace.config import Config
from prometheus_client import Counter, Gauge, start_http_server, Summary
from typing import Final

import logging

LOG = logging.getLogger(__name__)
HOST: Final = "127.0.0.1"
PORT: Final = 8005


class PrometheusException(Exception):
pass


class PrometheusClient(StatsClient):
server_is_active = False

def __init__(self, config: Config, host: str = HOST, port: int = PORT) -> None:
super().__init__(config)

_host = config.get("prometheus_host") if "prometheus_host" in config else host
_port = config.get("prometheus_port") if "prometheus_port" in config else port
if _host is None:
raise PrometheusException("prometheus_host host is undefined")
if _port is None:
raise PrometheusException("prometheus_host port is undefined")
if not self.server_is_active:
start_http_server(_port, _host)
self.server_is_active = True
else:
raise PrometheusException("Double instance of Prometheus interface")
self._gauge: dict[str, Gauge] = dict()
self._summary: dict[str, Summary] = dict()
self._counter: dict[str, Counter] = dict()

def gauge(self, metric: str, value: float, tags: dict | None = None) -> None:
m = self._gauge.get(metric)
if m is None:
m = Gauge(metric, metric)
self._gauge[metric] = m
m.set(value)

def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> None:
m = self._counter.get(metric)
if m is None:
m = Counter(metric, metric)
self._counter[metric] = m
m.inc(inc_value)

def timing(self, metric: str, value: float, tags: dict | None = None) -> None:
m = self._summary.get(metric)
if m is None:
m = Summary(metric, metric)
self._summary[metric] = m
m.observe(value)
18 changes: 15 additions & 3 deletions karapace/rapu.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from accept_types import get_best_match
from http import HTTPStatus
from karapace.config import Config, create_server_ssl_context
from karapace.statsd import StatsClient
from karapace.metrics import Metrics
from karapace.utils import json_decode, json_encode
from karapace.version import __version__
from typing import Callable, Dict, NoReturn, Optional, overload, Union
Expand Down Expand Up @@ -134,6 +134,8 @@ def __init__(
if content_type:
self.headers["Content-Type"] = content_type
super().__init__(f"HTTPResponse {status.value}")
if not is_success(status):
Metrics().error()

def ok(self) -> bool:
"""True if resposne has a 2xx status_code"""
Expand Down Expand Up @@ -166,7 +168,8 @@ def __init__(
self.app_request_metric = f"{app_name}_request"
self.app = self._create_aiohttp_application(config=config)
self.log = logging.getLogger(self.app_name)
self.stats = StatsClient(config=config)
Metrics().setup(config)
self.stats = Metrics().stats_client
self.app.on_shutdown.append(self.close_by_app)
self.not_ready_handler = not_ready_handler

Expand All @@ -183,7 +186,7 @@ async def close(self) -> None:
set as hook because the awaitables have to run inside the event loop
created by the aiohttp library.
"""
self.stats.close()
Metrics().cleanup()

@staticmethod
def cors_and_server_headers_for_request(*, request, origin="*"): # pylint: disable=unused-argument
Expand Down Expand Up @@ -269,15 +272,21 @@ async def _handle_request(
url=request.url,
path_for_stats=path_for_stats,
)

try:
if request.method == "OPTIONS":
origin = request.headers.get("Origin")
if not origin:
raise HTTPResponse(body="OPTIONS missing Origin", status=HTTPStatus.BAD_REQUEST)
headers = self.cors_and_server_headers_for_request(request=rapu_request, origin=origin)

raise HTTPResponse(body=b"", status=HTTPStatus.OK, headers=headers)

body = await request.read()
if body:
Metrics().request(len(body))
else:
Metrics().request(0)
if json_request:
if not body:
raise HTTPResponse(body="Missing request JSON body", status=HTTPStatus.BAD_REQUEST)
Expand Down Expand Up @@ -393,6 +402,7 @@ async def _handle_request(
)
headers = {"Content-Type": "application/json"}
resp = aiohttp.web.Response(body=body, status=status.value, headers=headers)

except asyncio.CancelledError:
self.log.debug("Client closed connection")
raise
Expand All @@ -401,6 +411,8 @@ async def _handle_request(
self.log.exception("Unexpected error handling user request: %s %s", request.method, request.url)
resp = aiohttp.web.Response(text="Internal Server Error", status=HTTPStatus.INTERNAL_SERVER_ERROR.value)
finally:
Metrics().response(resp.content_length)
Metrics().latency((time.monotonic() - start_time) * 1000)
self.stats.timing(
self.app_request_metric,
time.monotonic() - start_time,
Expand Down
5 changes: 3 additions & 2 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@
from karapace.kafka.consumer import KafkaConsumer
from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode
from karapace.master_coordinator import MasterCoordinator
from karapace.metrics import Metrics
from karapace.offset_watcher import OffsetWatcher
from karapace.protobuf.schema import ProtobufSchema
from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents
from karapace.statsd import StatsClient
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject
from karapace.utils import json_decode, JSONDecodeError
from threading import Event, Thread
Expand Down Expand Up @@ -126,7 +126,8 @@ def __init__(
self.topic_replication_factor = self.config["replication_factor"]
self.consumer: KafkaConsumer | None = None
self._offset_watcher = offset_watcher
self.stats = StatsClient(config=config)
Metrics().setup(config=config)
self.stats = Metrics().stats_client

# Thread synchronization objects
# - offset is used by the REST API to wait until this thread has
Expand Down
2 changes: 2 additions & 0 deletions karapace/schema_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from karapace.key_format import KeyFormatter
from karapace.master_coordinator import MasterCoordinator
from karapace.messaging import KarapaceProducer
from karapace.metrics import Metrics
from karapace.offset_watcher import OffsetWatcher
from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema
from karapace.schema_reader import KafkaSchemaReader
Expand Down Expand Up @@ -123,6 +124,7 @@ async def get_master(self, ignore_readiness: bool = False) -> tuple[bool, str |
elif not ignore_readiness and self.schema_reader.ready is False:
LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready)
else:
Metrics().are_we_master(are_we_master)
return are_we_master, master_url
await asyncio.sleep(1.0)

Expand Down
Loading
Loading