Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prometheus histograms #1165

Merged
merged 9 commits into from
Jul 19, 2021
2 changes: 1 addition & 1 deletion elasticapm/contrib/django/middleware/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class ElasticAPMClientMiddlewareMixin(object):
@property
def client(self):
try:
app = apps.get_app_config("elasticapm.contrib.django")
app = apps.get_app_config("elasticapm")
return app.client
except LookupError:
return get_client()
Expand Down
131 changes: 95 additions & 36 deletions elasticapm/metrics/base_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def __init__(self, registry):
self._counters = {}
self._gauges = {}
self._timers = {}
self._histograms = {}
self._registry = registry
self._label_limit_logged = False

Expand Down Expand Up @@ -155,7 +156,10 @@ def timer(self, name, reset_on_collect=False, unit=None, **labels):
"""
return self._metric(self._timers, Timer, name, reset_on_collect, labels, unit)

def _metric(self, container, metric_class, name, reset_on_collect, labels, unit=None):
def histogram(self, name, reset_on_collect=False, unit=None, buckets=None, **labels):
return self._metric(self._histograms, Histogram, name, reset_on_collect, labels, unit, buckets=buckets)

def _metric(self, container, metric_class, name, reset_on_collect, labels, unit=None, **kwargs):
"""
Returns an existing or creates and returns a metric
:param container: the container for the metric
Expand All @@ -172,7 +176,10 @@ def _metric(self, container, metric_class, name, reset_on_collect, labels, unit=
if key not in container:
if any(pattern.match(name) for pattern in self._registry.ignore_patterns):
metric = noop_metric
elif len(self._gauges) + len(self._counters) + len(self._timers) >= DISTINCT_LABEL_LIMIT:
elif (
len(self._gauges) + len(self._counters) + len(self._timers) + len(self._histograms)
>= DISTINCT_LABEL_LIMIT
):
if not self._label_limit_logged:
self._label_limit_logged = True
logger.warning(
Expand All @@ -181,7 +188,7 @@ def _metric(self, container, metric_class, name, reset_on_collect, labels, unit=
)
metric = noop_metric
else:
metric = metric_class(name, reset_on_collect=reset_on_collect, unit=unit)
metric = metric_class(name, reset_on_collect=reset_on_collect, unit=unit, **kwargs)
container[key] = metric
return container[key]

Expand All @@ -202,33 +209,42 @@ def collect(self):
samples = defaultdict(dict)
if self._counters:
# iterate over a copy of the dict to avoid threading issues, see #717
for (name, labels), c in compat.iteritems(self._counters.copy()):
if c is not noop_metric:
val = c.val
if val or not c.reset_on_collect:
for (name, labels), counter in compat.iteritems(self._counters.copy()):
if counter is not noop_metric:
val = counter.val
if val or not counter.reset_on_collect:
samples[labels].update({name: {"value": val}})
if c.reset_on_collect:
c.reset()
if counter.reset_on_collect:
counter.reset()
if self._gauges:
for (name, labels), g in compat.iteritems(self._gauges.copy()):
if g is not noop_metric:
val = g.val
if val or not g.reset_on_collect:
samples[labels].update({name: {"value": val}})
if g.reset_on_collect:
g.reset()
for (name, labels), gauge in compat.iteritems(self._gauges.copy()):
if gauge is not noop_metric:
val = gauge.val
if val or not gauge.reset_on_collect:
samples[labels].update({name: {"value": val, "type": "gauge"}})
if gauge.reset_on_collect:
gauge.reset()
if self._timers:
for (name, labels), t in compat.iteritems(self._timers.copy()):
if t is not noop_metric:
val, count = t.val
if val or not t.reset_on_collect:
for (name, labels), timer in compat.iteritems(self._timers.copy()):
if timer is not noop_metric:
val, count = timer.val
if val or not timer.reset_on_collect:
sum_name = ".sum"
if t._unit:
sum_name += "." + t._unit
if timer._unit:
sum_name += "." + timer._unit
samples[labels].update({name + sum_name: {"value": val}})
samples[labels].update({name + ".count": {"value": count}})
if t.reset_on_collect:
t.reset()
if timer.reset_on_collect:
timer.reset()
if self._histograms:
for (name, labels), histo in compat.iteritems(self._histograms.copy()):
if histo is not noop_metric:
counts = histo.val
if counts or not histo.reset_on_collect:
samples[labels].update({name: {"counts": counts, "values": histo.buckets, "type": "histogram"}})
if histo.reset_on_collect:
histo.reset()

if samples:
for labels, sample in compat.iteritems(samples):
result = {"samples": sample, "timestamp": timestamp}
Expand Down Expand Up @@ -263,8 +279,16 @@ def before_yield(self, data):
return data


class Counter(object):
__slots__ = ("name", "_lock", "_initial_value", "_val", "reset_on_collect")
class BaseMetric(object):
__slots__ = ("name", "reset_on_collect")

def __init__(self, name, reset_on_collect=False, **kwargs):
self.name = name
self.reset_on_collect = reset_on_collect


class Counter(BaseMetric):
__slots__ = BaseMetric.__slots__ + ("_lock", "_initial_value", "_val")

def __init__(self, name, initial_value=0, reset_on_collect=False, unit=None):
"""
Expand All @@ -273,10 +297,9 @@ def __init__(self, name, initial_value=0, reset_on_collect=False, unit=None):
:param initial_value: initial value of the counter, defaults to 0
:param unit: unit of the observed counter. Unused for counters
"""
self.name = name
self._lock = threading.Lock()
self._val = self._initial_value = initial_value
self.reset_on_collect = reset_on_collect
super(Counter, self).__init__(name, reset_on_collect=reset_on_collect)

def inc(self, delta=1):
"""
Expand Down Expand Up @@ -318,18 +341,17 @@ def val(self, value):
self._val = value


class Gauge(object):
__slots__ = ("name", "_val", "reset_on_collect")
class Gauge(BaseMetric):
__slots__ = BaseMetric.__slots__ + ("_val",)

def __init__(self, name, reset_on_collect=False, unit=None):
"""
Creates a new gauge
:param name: label of the gauge
:param unit of the observed gauge. Unused for gauges
"""
self.name = name
self._val = None
self.reset_on_collect = reset_on_collect
super(Gauge, self).__init__(name, reset_on_collect=reset_on_collect)

@property
def val(self):
Expand All @@ -343,16 +365,15 @@ def reset(self):
self._val = 0


class Timer(object):
__slots__ = ("name", "_val", "_count", "_lock", "_unit", "reset_on_collect")
class Timer(BaseMetric):
__slots__ = BaseMetric.__slots__ + ("_val", "_count", "_lock", "_unit")

def __init__(self, name=None, reset_on_collect=False, unit=None):
self.name = name
self._val = 0
self._count = 0
self._unit = unit
self._lock = threading.Lock()
self.reset_on_collect = reset_on_collect
super(Timer, self).__init__(name, reset_on_collect=reset_on_collect)

def update(self, duration, count=1):
with self._lock:
Expand All @@ -375,6 +396,44 @@ def val(self, value):
self._val, self._count = value


class Histogram(BaseMetric):
DEFAULT_BUCKETS = (0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10, float("inf"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@axw what are the default buckets that APM Server creates for transaction duration metrics?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too many to list! APM Server uses HDRHistogram with 2 significant figures by default.


__slots__ = BaseMetric.__slots__ + ("_lock", "_buckets", "_counts", "_lock", "_unit")

def __init__(self, name=None, reset_on_collect=False, unit=None, buckets=None):
self._lock = threading.Lock()
self._buckets = buckets or Histogram.DEFAULT_BUCKETS
self._counts = [0] * len(self._buckets)
self._unit = unit
super(Histogram, self).__init__(name, reset_on_collect=reset_on_collect)

def update(self, value, count=1):
pos = 0
while value > self._buckets[pos]:
pos += 1
with self._lock:
self._counts[pos] += count

@property
def val(self):
with self._lock:
return self._counts

@val.setter
def val(self, value):
with self._lock:
self._counts = value

@property
def buckets(self):
return self._buckets

def reset(self):
with self._lock:
self._counts = [0] * len(self._buckets)


class NoopMetric(object):
"""
A no-op metric that implements the "interface" of both Counter and Gauge.
Expand Down
52 changes: 47 additions & 5 deletions elasticapm/metrics/sets/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ def before_collect(self):
metric_type = self.METRIC_MAP.get(metric.type, None)
if not metric_type:
continue
metric_type(self, metric.name, metric.samples)
metric_type(self, metric.name, metric.samples, metric.unit)

def _prom_counter_handler(self, name, samples):
def _prom_counter_handler(self, name, samples, unit):
# Counters can be converted 1:1 from Prometheus to our
# format. Each pair of samples represents a distinct labelset for a
# given name. The pair consists of the value, and a "created" timestamp.
Expand All @@ -58,7 +58,7 @@ def _prom_counter_handler(self, name, samples):
self._registry.client.config.prometheus_metrics_prefix + name, **total_sample.labels
).val = total_sample.value

def _prom_gauge_handler(self, name, samples):
def _prom_gauge_handler(self, name, samples, unit):
# Counters can be converted 1:1 from Prometheus to our
# format. Each sample represents a distinct labelset for a
# given name
Expand All @@ -67,7 +67,7 @@ def _prom_gauge_handler(self, name, samples):
self._registry.client.config.prometheus_metrics_prefix + name, **sample.labels
).val = sample.value

def _prom_summary_handler(self, name, samples):
def _prom_summary_handler(self, name, samples, unit):
# Prometheus Summaries are analogous to our Timers, having
# a count and a sum. A prometheus summary is represented by
# three values. The list of samples for a given name can be
Expand All @@ -79,7 +79,49 @@ def _prom_summary_handler(self, name, samples):
count_sample.value,
)

METRIC_MAP = {"counter": _prom_counter_handler, "gauge": _prom_gauge_handler, "summary": _prom_summary_handler}
def _prom_histogram_handler(self, name, samples, unit):
# Prometheus histograms are structured as a series of counts
# with an "le" label. The count of each label signifies all
# observations with a lower-or-equal value with respect to
# the "le" label value.
# After the le-samples, 3 more samples follow, with an overall
# count, overall sum, and creation timestamp.
sample_pos = 0
name = self._registry.client.config.prometheus_metrics_prefix + name
hist_samples = []
while sample_pos < len(samples):
sample = samples[sample_pos]
if "le" in sample.labels:
hist_samples.append((float(sample.labels["le"]), sample))
sample_pos += 1
else:
# we reached the end of one set of buckets/values, this is the "count" sample
counts = []
values = []
prev_sample = None
prev_le = 0
for i, (le, hist_sample) in enumerate(hist_samples):
if i == 0:
val = le if le < 0 else le / 2.0
elif le == float("+Inf"):
val = prev_le
else:
val = prev_le + (le - prev_le) / 2.0
values.append(val)
counts.append(int(hist_sample.value - (prev_sample.value if prev_sample else 0)))
prev_le = le
prev_sample = hist_sample
self.histogram(name, unit=unit, buckets=values, **sample.labels).val = counts

hist_samples = []
sample_pos += 3 # skip sum/created samples

METRIC_MAP = {
"counter": _prom_counter_handler,
"gauge": _prom_gauge_handler,
"summary": _prom_summary_handler,
"histogram": _prom_histogram_handler,
}


def grouper(iterable, n, fillvalue=None):
Expand Down
20 changes: 20 additions & 0 deletions tests/metrics/base_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,26 @@ def test_metrics_counter(elasticapm_client):
assert data["samples"]["x"]["value"] == 0


def test_metrics_histogram(elasticapm_client):
metricset = MetricsSet(MetricsRegistry(elasticapm_client))
hist = metricset.histogram("x", buckets=[1, 10, 100])
assert len(hist.buckets) == 4
assert hist.buckets[3] == float("inf")

hist.update(0.3)
hist.update(1)
hist.update(5)
hist.update(20)
hist.update(100)
hist.update(1000)

data = list(metricset.collect())
assert len(data) == 1
d = data[0]
assert d["samples"]["x"]["counts"] == [2, 1, 2, 1]
assert d["samples"]["x"]["values"] == [1, 10, 100, float("inf")]


def test_metrics_labels(elasticapm_client):
metricset = MetricsSet(MetricsRegistry(elasticapm_client))
metricset.counter("x", mylabel="a").inc()
Expand Down
31 changes: 31 additions & 0 deletions tests/metrics/prometheus_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,34 @@ def test_summary(elasticapm_client):
assert data[2]["samples"]["prometheus.metrics.summary_with_labels.count"]["value"] == 1.0
assert data[2]["samples"]["prometheus.metrics.summary_with_labels.sum"]["value"] == 11.0
assert data[2]["tags"] == {"alabel": "bar", "anotherlabel": "bazzinga"}


def test_histogram(elasticapm_client):
metricset = PrometheusMetrics(MetricsRegistry(elasticapm_client))
histo = prometheus_client.Histogram("test", "test histogram", buckets=[1, 10, 100, float("inf")])
histo_with_labels = prometheus_client.Histogram(
"testwithlabel", "test histogram with labels", ["alabel", "anotherlabel"], buckets=[1, 10, 100, float("inf")]
)
histo.observe(0.5)
histo.observe(0.6)
histo.observe(1.5)
histo.observe(26)
histo.observe(42)
histo.observe(12)
histo.observe(105)
histo_with_labels.labels(alabel="foo", anotherlabel="baz").observe(1)
histo_with_labels.labels(alabel="foo", anotherlabel="baz").observe(10)
histo_with_labels.labels(alabel="foo", anotherlabel="baz").observe(100)
histo_with_labels.labels(alabel="foo", anotherlabel="bazzinga").observe(1000)
data = list(metricset.collect())
assert len(data) == 3, data
assert data[0]["samples"]["prometheus.metrics.test"]["values"] == [0.5, 5.5, 55.0, 100.0]
assert data[0]["samples"]["prometheus.metrics.test"]["counts"] == [2, 1, 3, 1]

assert data[1]["samples"]["prometheus.metrics.testwithlabel"]["values"] == [0.5, 5.5, 55.0, 100.0]
assert data[1]["samples"]["prometheus.metrics.testwithlabel"]["counts"] == [1, 1, 1, 0]
assert data[1]["tags"] == {"alabel": "foo", "anotherlabel": "baz"}

assert data[2]["samples"]["prometheus.metrics.testwithlabel"]["values"] == [0.5, 5.5, 55.0, 100.0]
assert data[2]["samples"]["prometheus.metrics.testwithlabel"]["counts"] == [0, 0, 0, 1]
assert data[2]["tags"] == {"alabel": "foo", "anotherlabel": "bazzinga"}