-
Notifications
You must be signed in to change notification settings - Fork 222
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Prometheus histograms #1165
Prometheus histograms #1165
Changes from 4 commits
e5107df
3bc05d5
dd85ad9
7626a96
ab2327b
99ffc0d
6c88c43
5d328c7
ff607b1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -121,6 +121,7 @@ def __init__(self, registry): | |
self._counters = {} | ||
self._gauges = {} | ||
self._timers = {} | ||
self._histograms = {} | ||
self._registry = registry | ||
self._label_limit_logged = False | ||
|
||
|
@@ -155,7 +156,10 @@ def timer(self, name, reset_on_collect=False, unit=None, **labels): | |
""" | ||
return self._metric(self._timers, Timer, name, reset_on_collect, labels, unit) | ||
|
||
def _metric(self, container, metric_class, name, reset_on_collect, labels, unit=None): | ||
def histogram(self, name, reset_on_collect=False, unit=None, buckets=None, **labels): | ||
return self._metric(self._histograms, Histogram, name, reset_on_collect, labels, unit, buckets=buckets) | ||
|
||
def _metric(self, container, metric_class, name, reset_on_collect, labels, unit=None, **kwargs): | ||
""" | ||
Returns an existing or creates and returns a metric | ||
:param container: the container for the metric | ||
|
@@ -172,7 +176,10 @@ def _metric(self, container, metric_class, name, reset_on_collect, labels, unit= | |
if key not in container: | ||
if any(pattern.match(name) for pattern in self._registry.ignore_patterns): | ||
metric = noop_metric | ||
elif len(self._gauges) + len(self._counters) + len(self._timers) >= DISTINCT_LABEL_LIMIT: | ||
elif ( | ||
len(self._gauges) + len(self._counters) + len(self._timers) + len(self._histograms) | ||
>= DISTINCT_LABEL_LIMIT | ||
): | ||
if not self._label_limit_logged: | ||
self._label_limit_logged = True | ||
logger.warning( | ||
|
@@ -181,7 +188,7 @@ def _metric(self, container, metric_class, name, reset_on_collect, labels, unit= | |
) | ||
metric = noop_metric | ||
else: | ||
metric = metric_class(name, reset_on_collect=reset_on_collect, unit=unit) | ||
metric = metric_class(name, reset_on_collect=reset_on_collect, unit=unit, **kwargs) | ||
container[key] = metric | ||
return container[key] | ||
|
||
|
@@ -202,33 +209,42 @@ def collect(self): | |
samples = defaultdict(dict) | ||
if self._counters: | ||
# iterate over a copy of the dict to avoid threading issues, see #717 | ||
for (name, labels), c in compat.iteritems(self._counters.copy()): | ||
if c is not noop_metric: | ||
val = c.val | ||
if val or not c.reset_on_collect: | ||
samples[labels].update({name: {"value": val}}) | ||
if c.reset_on_collect: | ||
c.reset() | ||
for (name, labels), counter in compat.iteritems(self._counters.copy()): | ||
if counter is not noop_metric: | ||
val = counter.val | ||
if val or not counter.reset_on_collect: | ||
samples[labels].update({name: {"value": val, "type": "counter"}}) | ||
if counter.reset_on_collect: | ||
counter.reset() | ||
if self._gauges: | ||
for (name, labels), g in compat.iteritems(self._gauges.copy()): | ||
if g is not noop_metric: | ||
val = g.val | ||
if val or not g.reset_on_collect: | ||
samples[labels].update({name: {"value": val}}) | ||
if g.reset_on_collect: | ||
g.reset() | ||
for (name, labels), gauge in compat.iteritems(self._gauges.copy()): | ||
if gauge is not noop_metric: | ||
val = gauge.val | ||
if val or not gauge.reset_on_collect: | ||
samples[labels].update({name: {"value": val, "type": "gauge"}}) | ||
if gauge.reset_on_collect: | ||
gauge.reset() | ||
if self._timers: | ||
for (name, labels), t in compat.iteritems(self._timers.copy()): | ||
if t is not noop_metric: | ||
val, count = t.val | ||
if val or not t.reset_on_collect: | ||
for (name, labels), timer in compat.iteritems(self._timers.copy()): | ||
if timer is not noop_metric: | ||
val, count = timer.val | ||
if val or not timer.reset_on_collect: | ||
sum_name = ".sum" | ||
if t._unit: | ||
sum_name += "." + t._unit | ||
if timer._unit: | ||
sum_name += "." + timer._unit | ||
samples[labels].update({name + sum_name: {"value": val}}) | ||
samples[labels].update({name + ".count": {"value": count}}) | ||
if t.reset_on_collect: | ||
t.reset() | ||
if timer.reset_on_collect: | ||
timer.reset() | ||
if self._histograms: | ||
for (name, labels), histo in compat.iteritems(self._histograms.copy()): | ||
if histo is not noop_metric: | ||
counts = histo.val | ||
if counts or not histo.reset_on_collect: | ||
samples[labels].update({name: {"counts": counts, "values": histo.buckets, "type": "histogram"}}) | ||
if histo.reset_on_collect: | ||
histo.reset() | ||
|
||
if samples: | ||
for labels, sample in compat.iteritems(samples): | ||
result = {"samples": sample, "timestamp": timestamp} | ||
|
@@ -263,8 +279,16 @@ def before_yield(self, data): | |
return data | ||
|
||
|
||
class Counter(object): | ||
__slots__ = ("name", "_lock", "_initial_value", "_val", "reset_on_collect") | ||
class BaseMetric(object): | ||
__slots__ = ("name", "reset_on_collect") | ||
|
||
def __init__(self, name, reset_on_collect=False, **kwargs): | ||
self.name = name | ||
self.reset_on_collect = reset_on_collect | ||
|
||
|
||
class Counter(BaseMetric): | ||
__slots__ = BaseMetric.__slots__ + ("_lock", "_initial_value", "_val") | ||
|
||
def __init__(self, name, initial_value=0, reset_on_collect=False, unit=None): | ||
""" | ||
|
@@ -273,10 +297,9 @@ def __init__(self, name, initial_value=0, reset_on_collect=False, unit=None): | |
:param initial_value: initial value of the counter, defaults to 0 | ||
:param unit: unit of the observed counter. Unused for counters | ||
""" | ||
self.name = name | ||
self._lock = threading.Lock() | ||
self._val = self._initial_value = initial_value | ||
self.reset_on_collect = reset_on_collect | ||
super(Counter, self).__init__(name, reset_on_collect=reset_on_collect) | ||
|
||
def inc(self, delta=1): | ||
""" | ||
|
@@ -318,18 +341,17 @@ def val(self, value): | |
self._val = value | ||
|
||
|
||
class Gauge(object): | ||
__slots__ = ("name", "_val", "reset_on_collect") | ||
class Gauge(BaseMetric): | ||
__slots__ = BaseMetric.__slots__ + ("_val",) | ||
|
||
def __init__(self, name, reset_on_collect=False, unit=None): | ||
""" | ||
Creates a new gauge | ||
:param name: label of the gauge | ||
:param unit of the observed gauge. Unused for gauges | ||
""" | ||
self.name = name | ||
self._val = None | ||
self.reset_on_collect = reset_on_collect | ||
super(Gauge, self).__init__(name, reset_on_collect=reset_on_collect) | ||
|
||
@property | ||
def val(self): | ||
|
@@ -343,16 +365,15 @@ def reset(self): | |
self._val = 0 | ||
|
||
|
||
class Timer(object): | ||
__slots__ = ("name", "_val", "_count", "_lock", "_unit", "reset_on_collect") | ||
class Timer(BaseMetric): | ||
__slots__ = BaseMetric.__slots__ + ("_val", "_count", "_lock", "_unit") | ||
|
||
def __init__(self, name=None, reset_on_collect=False, unit=None): | ||
self.name = name | ||
self._val = 0 | ||
self._count = 0 | ||
self._unit = unit | ||
self._lock = threading.Lock() | ||
self.reset_on_collect = reset_on_collect | ||
super(Timer, self).__init__(name, reset_on_collect=reset_on_collect) | ||
|
||
def update(self, duration, count=1): | ||
with self._lock: | ||
|
@@ -375,6 +396,46 @@ def val(self, value): | |
self._val, self._count = value | ||
|
||
|
||
class Histogram(BaseMetric): | ||
DEFAULT_BUCKETS = (0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10, float("inf")) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @axw what are the default buckets that APM Server creates for transaction duration metrics? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Too many to list! APM Server uses HDRHistogram with 2 significant figures by default. |
||
|
||
__slots__ = BaseMetric.__slots__ + ("_lock", "_buckets", "_counts", "_lock", "_unit") | ||
|
||
def __init__(self, name=None, reset_on_collect=False, unit=None, buckets=None): | ||
self._lock = threading.Lock() | ||
self._buckets = buckets or Histogram.DEFAULT_BUCKETS | ||
if self._buckets[-1] < float("inf"): | ||
self._buckets.append(float("inf")) | ||
self._counts = [0] * len(self._buckets) | ||
self._unit = unit | ||
super(Histogram, self).__init__(name, reset_on_collect=reset_on_collect) | ||
|
||
def update(self, value, count=1): | ||
pos = 0 | ||
while value > self._buckets[pos]: | ||
pos += 1 | ||
with self._lock: | ||
self._counts[pos] += count | ||
|
||
@property | ||
def val(self): | ||
with self._lock: | ||
return self._counts | ||
|
||
@val.setter | ||
def val(self, value): | ||
with self._lock: | ||
self._counts = value | ||
|
||
@property | ||
def buckets(self): | ||
return self._buckets | ||
|
||
def reset(self): | ||
with self._lock: | ||
self._counts = [0] * len(self._buckets) | ||
|
||
|
||
class NoopMetric(object): | ||
""" | ||
A no-op metric that implements the "interface" of both Counter and Gauge. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,9 +46,9 @@ def before_collect(self): | |
metric_type = self.METRIC_MAP.get(metric.type, None) | ||
if not metric_type: | ||
continue | ||
metric_type(self, metric.name, metric.samples) | ||
metric_type(self, metric.name, metric.samples, metric.unit) | ||
|
||
def _prom_counter_handler(self, name, samples): | ||
def _prom_counter_handler(self, name, samples, unit): | ||
# Counters can be converted 1:1 from Prometheus to our | ||
# format. Each pair of samples represents a distinct labelset for a | ||
# given name. The pair consists of the value, and a "created" timestamp. | ||
|
@@ -58,7 +58,7 @@ def _prom_counter_handler(self, name, samples): | |
self._registry.client.config.prometheus_metrics_prefix + name, **total_sample.labels | ||
).val = total_sample.value | ||
|
||
def _prom_gauge_handler(self, name, samples): | ||
def _prom_gauge_handler(self, name, samples, unit): | ||
# Counters can be converted 1:1 from Prometheus to our | ||
# format. Each sample represents a distinct labelset for a | ||
# given name | ||
|
@@ -67,7 +67,7 @@ def _prom_gauge_handler(self, name, samples): | |
self._registry.client.config.prometheus_metrics_prefix + name, **sample.labels | ||
).val = sample.value | ||
|
||
def _prom_summary_handler(self, name, samples): | ||
def _prom_summary_handler(self, name, samples, unit): | ||
# Prometheus Summaries are analogous to our Timers, having | ||
# a count and a sum. A prometheus summary is represented by | ||
# three values. The list of samples for a given name can be | ||
|
@@ -79,7 +79,40 @@ def _prom_summary_handler(self, name, samples): | |
count_sample.value, | ||
) | ||
|
||
METRIC_MAP = {"counter": _prom_counter_handler, "gauge": _prom_gauge_handler, "summary": _prom_summary_handler} | ||
def _prom_histogram_handler(self, name, samples, unit): | ||
# Prometheus histograms are structured as a series of counts | ||
# with an "le" label. The count of each label signifies all | ||
# observations with a lower-or-equal value with respect to | ||
# the "le" label value. | ||
# After the le-samples, 3 more samples follow, with an overall | ||
# count, overall sum, and creation timestamp. | ||
sample_pos = 0 | ||
prev_val = 0 | ||
counts = [] | ||
values = [] | ||
name = self._registry.client.config.prometheus_metrics_prefix + name | ||
while sample_pos < len(samples): | ||
sample = samples[sample_pos] | ||
if "le" in sample.labels: | ||
values.append(float(sample.labels["le"])) | ||
counts.append(sample.value - prev_val) | ||
prev_val = sample.value | ||
sample_pos += 1 | ||
|
||
else: | ||
# we reached the end of one set of buckets/values, this is the "count" sample | ||
self.histogram(name, unit=unit, buckets=values, **sample.labels).val = counts | ||
prev_val = 0 | ||
counts = [] | ||
values = [] | ||
sample_pos += 3 # skip sum/created samples | ||
|
||
METRIC_MAP = { | ||
"counter": _prom_counter_handler, | ||
"gauge": _prom_gauge_handler, | ||
"summary": _prom_summary_handler, | ||
"histogram": _prom_histogram_handler, | ||
} | ||
Comment on lines
+89
to
+115
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In APM Server we have some code for handling OpenTelemetry Metrics, following Prometheus's histogram_quantile method: https://github.com/elastic/apm-server/blob/f7c7b0c6873d40f533c072b3d9751c8fe285e63e/processor/otel/metrics.go#L191-L252 I think we should probably follow the same approach here:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is that the same strategy that metricbeat is using to convert prometheus histograms? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Good question. Looking at https://github.com/elastic/beats/blob/master/x-pack/metricbeat/module/prometheus/collector/histogram.go, no - there's some similarity, but it's not the same. Also I miswrote the rules above, now corrected. Metricbeat is doing the following:
@exekias WDYT about changing Metricbeat's implementation to what I outlined in #1165 (comment)? Or otherwise, can you elaborate on the rationale behind the current implementation? We should probably all do the same thing. In retrospect, it would be helpful if Elasticsearch accepted bucket ranges 😅
t-digest. Prometheus histograms are general and may have negative values, which won't work with HDRHistogram. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. At some point, we've discussed whether to encode in _meta if the histogram is optimized for tdigest or hdrhistogram. Do you remember where we landed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That was in an email thread with Lens folks. I think everyone agreed that it was a reasonable idea, but I haven't taken the idea any further yet. Lens will default to (like Elasticsearch) tdigest, and there's an issue open to support hdrhistogram: elastic/kibana#98499. I've thought a little more about that since, and I think we would only ever end up setting it for known fields like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I never thought about negative There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the underlying measurement can be negative (energy flow, temperature, monetary values etc), the bucket limits also have to be negative, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did a little bit more of research, and negative buckets are indeed a possibility, although so badly supported that I don't think they can be used for real: https://github.com/prometheus/client_golang/blob/cb5c8ff250fc376cc4088d824a262565fde83612/prometheus/histogram.go#L50-L56 Anyway I'm happy to adjust the implementation, as it won't change anything for positive buckets. 
Any thoughts on why we put +Inf on the last bucket value? My intent with interpolating a higher value was to move the needle as much as the previous midpoints do. Again... if many values are ending in the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I assume this question is for me, based on the description in my initial comment. It's not
My thinking is that since we don't have any insight into that last bucket, so reporting anything greater than the second last bucket's upper limit risks overstating the value. Reporting the second last bucket's upper limit is a bit more conservative, ensuring that we report a valid lower bound for percentile queries that fall beyond the configured buckets. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the detailed explanation, I've created elastic/beats#26903 to adjust Beats implementation 👍 |
||
|
||
|
||
def grouper(iterable, n, fillvalue=None): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should omit
"type": "counter"
if the counter is resetting; just omit "type" altogether for now. Counter metrics are expected to be monotonically increasing.