From 5a0dae96da92f26e523775678425b857e0ac3d79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Tue, 18 Feb 2020 22:59:43 -0500 Subject: [PATCH] Implement MinMaxSumCount aggregator (#422) Adding one of the core aggregators in the metrics API. This aggregator is the default aggregator for measure metrics and keeps the minimum, maximum, sum and count of those measures. --- examples/metrics/simple_example.py | 106 ++++++++++++++++++ examples/metrics/stateful.py | 72 ------------ examples/metrics/stateless.py | 57 ---------- .../sdk/metrics/export/aggregate.py | 50 +++++++++ .../sdk/metrics/export/batcher.py | 7 +- .../tests/metrics/export/test_export.py | 89 ++++++++++++++- .../tests/metrics/test_metrics.py | 35 +++--- 7 files changed, 264 insertions(+), 152 deletions(-) create mode 100644 examples/metrics/simple_example.py delete mode 100644 examples/metrics/stateful.py delete mode 100644 examples/metrics/stateless.py diff --git a/examples/metrics/simple_example.py b/examples/metrics/simple_example.py new file mode 100644 index 00000000000..75da80b73ac --- /dev/null +++ b/examples/metrics/simple_example.py @@ -0,0 +1,106 @@ +# Copyright 2019, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +This module serves as an example for a simple application using metrics +It shows: +- How to configure a meter passing a sateful or stateless. +- How to configure an exporter and how to create a controller. +- How to create some metrics intruments and how to capture data with them. +""" +import sys +import time + +from opentelemetry import metrics +from opentelemetry.sdk.metrics import Counter, Measure, Meter +from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter +from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher +from opentelemetry.sdk.metrics.export.controller import PushController + +batcher_mode = "stateful" + + +def usage(argv): + print("usage:") + print("{} [mode]".format(argv[0])) + print("mode: stateful (default) or stateless") + + +if len(sys.argv) >= 2: + batcher_mode = sys.argv[1] + if batcher_mode not in ("stateful", "stateless"): + print("bad mode specified.") + usage(sys.argv) + sys.exit(1) + +# Batcher used to collect all created metrics from meter ready for exporting +# Pass in True/False to indicate whether the batcher is stateful. +# True indicates the batcher computes checkpoints from over the process +# lifetime. +# False indicates the batcher computes checkpoints which describe the updates +# of a single collection period (deltas) +batcher = UngroupedBatcher(batcher_mode == "stateful") + +# If a batcher is not provided, a default batcher is used +# Meter is responsible for creating and recording metrics +metrics.set_preferred_meter_implementation(lambda _: Meter(batcher)) +meter = metrics.meter() + +# Exporter to export metrics to the console +exporter = ConsoleMetricsExporter() + +# A PushController collects metrics created from meter and exports it via the +# exporter every interval +controller = PushController(meter, exporter, 5) + +# Metric instruments allow to capture measurements +requests_counter = meter.create_metric( + "requests", "number of requests", 1, int, Counter, ("environment",) +) + +clicks_counter = meter.create_metric( + "clicks", "number of clicks", 1, int, Counter, ("environment",) +) + +requests_size = meter.create_metric( + "requests_size", "size of requests", 1, int, Measure, ("environment",) +) + +# Labelsets are used to identify key-values that are associated with a specific +# metric that you want to record. These are useful for pre-aggregation and can +# be used to store custom dimensions pertaining to a metric +staging_label_set = meter.get_label_set({"environment": "staging"}) +testing_label_set = meter.get_label_set({"environment": "testing"}) + +# Update the metric instruments using the direct calling convention +requests_size.record(100, staging_label_set) +requests_counter.add(25, staging_label_set) +# Sleep for 5 seconds, exported value should be 25 +time.sleep(5) + +requests_size.record(5000, staging_label_set) +requests_counter.add(50, staging_label_set) +# Exported value should be 75 +time.sleep(5) + +requests_size.record(2, testing_label_set) +requests_counter.add(35, testing_label_set) +# There should be two exported values 75 and 35, one for each labelset +time.sleep(5) + +clicks_counter.add(5, staging_label_set) +# There should be three exported values, labelsets can be reused for different +# metrics but will be recorded seperately, 75, 35 and 5 + +time.sleep(5) diff --git a/examples/metrics/stateful.py b/examples/metrics/stateful.py deleted file mode 100644 index c43f795e228..00000000000 --- a/examples/metrics/stateful.py +++ /dev/null @@ -1,72 +0,0 @@ -# Copyright 2019, OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -""" -This module serves as an example for a simple application using metrics -Examples show how to recording affects the collection of metrics to be exported -""" -import time - -from opentelemetry import metrics -from opentelemetry.sdk.metrics import Counter, Meter -from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter -from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher -from opentelemetry.sdk.metrics.export.controller import PushController - -# Batcher used to collect all created metrics from meter ready for exporting -# Pass in true/false to indicate whether the batcher is stateful. True -# indicates the batcher computes checkpoints from over the process lifetime. -# False indicates the batcher computes checkpoints which describe the updates -# of a single collection period (deltas) -batcher = UngroupedBatcher(True) -# If a batcher is not provded, a default batcher is used -# Meter is responsible for creating and recording metrics -metrics.set_preferred_meter_implementation(lambda _: Meter(batcher)) -meter = metrics.meter() -# exporter to export metrics to the console -exporter = ConsoleMetricsExporter() -# controller collects metrics created from meter and exports it via the -# exporter every interval -controller = PushController(meter, exporter, 5) - -counter = meter.create_metric( - "requests", "number of requests", 1, int, Counter, ("environment",) -) - -counter2 = meter.create_metric( - "clicks", "number of clicks", 1, int, Counter, ("environment",) -) - -# Labelsets are used to identify key-values that are associated with a specific -# metric that you want to record. These are useful for pre-aggregation and can -# be used to store custom dimensions pertaining to a metric -label_set = meter.get_label_set({"environment": "staging"}) -label_set2 = meter.get_label_set({"environment": "testing"}) - -counter.add(25, label_set) -# We sleep for 5 seconds, exported value should be 25 -time.sleep(5) - -counter.add(50, label_set) -# exported value should be 75 -time.sleep(5) - -counter.add(35, label_set2) -# should be two exported values 75 and 35, one for each labelset -time.sleep(5) - -counter2.add(5, label_set) -# should be three exported values, labelsets can be reused for different -# metrics but will be recorded seperately, 75, 35 and 5 -time.sleep(5) diff --git a/examples/metrics/stateless.py b/examples/metrics/stateless.py deleted file mode 100644 index 69213cbddd3..00000000000 --- a/examples/metrics/stateless.py +++ /dev/null @@ -1,57 +0,0 @@ -# Copyright 2019, OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -""" -This module serves as an example for a simple application using metrics -Examples show how to recording affects the collection of metrics to be exported -""" -import time - -from opentelemetry import metrics -from opentelemetry.sdk.metrics import Counter, Meter -from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter -from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher -from opentelemetry.sdk.metrics.export.controller import PushController - -# Batcher used to collect all created metrics from meter ready for exporting -# Pass in false for non-stateful batcher. Indicates the batcher computes -# checkpoints which describe the updates of a single collection period (deltas) -batcher = UngroupedBatcher(False) -# Meter is responsible for creating and recording metrics -metrics.set_preferred_meter_implementation(lambda _: Meter(batcher)) -meter = metrics.meter() -# exporter to export metrics to the console -exporter = ConsoleMetricsExporter() -# controller collects metrics created from meter and exports it via the -# exporter every interval -controller = PushController(meter, exporter, 5) - -counter = meter.create_metric( - "requests", "number of requests", 1, int, Counter, ("environment",) -) - -# Labelsets are used to identify key-values that are associated with a specific -# metric that you want to record. These are useful for pre-aggregation and can -# be used to store custom dimensions pertaining to a metric -label_set = meter.get_label_set({"environment": "staging"}) - -counter.add(25, label_set) -# We sleep for 5 seconds, exported value should be 25 -time.sleep(5) - -counter.add(50, label_set) -# exported value should be 50 due to non-stateful batcher -time.sleep(20) - -# Following exported values would be 0 diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index 642fe1cdfe4..5c55ba038ac 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -13,6 +13,7 @@ # limitations under the License. import abc +from collections import namedtuple class Aggregator(abc.ABC): @@ -56,3 +57,52 @@ def take_checkpoint(self): def merge(self, other): self.checkpoint += other.checkpoint + + +class MinMaxSumCountAggregator(Aggregator): + """Agregator for Measure metrics that keeps min, max, sum and count.""" + + _TYPE = namedtuple("minmaxsumcount", "min max sum count") + + @classmethod + def _min(cls, val1, val2): + if val1 is None and val2 is None: + return None + return min(val1 or val2, val2 or val1) + + @classmethod + def _max(cls, val1, val2): + if val1 is None and val2 is None: + return None + return max(val1 or val2, val2 or val1) + + @classmethod + def _sum(cls, val1, val2): + if val1 is None and val2 is None: + return None + return (val1 or 0) + (val2 or 0) + + def __init__(self): + super().__init__() + self.current = self._TYPE(None, None, None, 0) + self.checkpoint = self._TYPE(None, None, None, 0) + + def update(self, value): + self.current = self._TYPE( + self._min(self.current.min, value), + self._max(self.current.max, value), + self._sum(self.current.sum, value), + self.current.count + 1, + ) + + def take_checkpoint(self): + self.checkpoint = self.current + self.current = self._TYPE(None, None, None, 0) + + def merge(self, other): + self.checkpoint = self._TYPE( + self._min(self.checkpoint.min, other.checkpoint.min), + self._max(self.checkpoint.max, other.checkpoint.max), + self._sum(self.checkpoint.sum, other.checkpoint.sum), + self.checkpoint.count + other.checkpoint.count, + ) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py index c81db0fe740..86ddc3fcc16 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py @@ -15,11 +15,12 @@ import abc from typing import Sequence, Type -from opentelemetry.metrics import Counter, MetricT +from opentelemetry.metrics import Counter, Measure, MetricT from opentelemetry.sdk.metrics.export import MetricRecord from opentelemetry.sdk.metrics.export.aggregate import ( Aggregator, CounterAggregator, + MinMaxSumCountAggregator, ) @@ -45,8 +46,10 @@ def aggregator_for(self, metric_type: Type[MetricT]) -> Aggregator: Aggregators keep track of and updates values when metrics get updated. """ # pylint:disable=R0201 - if metric_type == Counter: + if issubclass(metric_type, Counter): return CounterAggregator() + if issubclass(metric_type, Measure): + return MinMaxSumCountAggregator() # TODO: Add other aggregators return CounterAggregator() diff --git a/opentelemetry-sdk/tests/metrics/export/test_export.py b/opentelemetry-sdk/tests/metrics/export/test_export.py index 816bfcfca9c..5df6c6d08a0 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_export.py +++ b/opentelemetry-sdk/tests/metrics/export/test_export.py @@ -20,7 +20,10 @@ ConsoleMetricsExporter, MetricRecord, ) -from opentelemetry.sdk.metrics.export.aggregate import CounterAggregator +from opentelemetry.sdk.metrics.export.aggregate import ( + CounterAggregator, + MinMaxSumCountAggregator, +) from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher from opentelemetry.sdk.metrics.export.controller import PushController @@ -218,22 +221,21 @@ def test_ungrouped_batcher_process_not_stateful(self): ) -class TestAggregator(unittest.TestCase): - # TODO: test other aggregators once implemented - def test_counter_update(self): +class TestCounterAggregator(unittest.TestCase): + def test_update(self): counter = CounterAggregator() counter.update(1.0) counter.update(2.0) self.assertEqual(counter.current, 3.0) - def test_counter_checkpoint(self): + def test_checkpoint(self): counter = CounterAggregator() counter.update(2.0) counter.take_checkpoint() self.assertEqual(counter.current, 0) self.assertEqual(counter.checkpoint, 2.0) - def test_counter_merge(self): + def test_merge(self): counter = CounterAggregator() counter2 = CounterAggregator() counter.checkpoint = 1.0 @@ -242,6 +244,81 @@ def test_counter_merge(self): self.assertEqual(counter.checkpoint, 4.0) +class TestMinMaxSumCountAggregator(unittest.TestCase): + def test_update(self): + mmsc = MinMaxSumCountAggregator() + # test current values without any update + self.assertEqual( + mmsc.current, (None, None, None, 0), + ) + + # call update with some values + values = (3, 50, 3, 97) + for val in values: + mmsc.update(val) + + self.assertEqual( + mmsc.current, (min(values), max(values), sum(values), len(values)), + ) + + def test_checkpoint(self): + mmsc = MinMaxSumCountAggregator() + + # take checkpoint wihtout any update + mmsc.take_checkpoint() + self.assertEqual( + mmsc.checkpoint, (None, None, None, 0), + ) + + # call update with some values + values = (3, 50, 3, 97) + for val in values: + mmsc.update(val) + + mmsc.take_checkpoint() + self.assertEqual( + mmsc.checkpoint, + (min(values), max(values), sum(values), len(values)), + ) + + self.assertEqual( + mmsc.current, (None, None, None, 0), + ) + + def test_merge(self): + mmsc1 = MinMaxSumCountAggregator() + mmsc2 = MinMaxSumCountAggregator() + + checkpoint1 = MinMaxSumCountAggregator._TYPE(3, 150, 101, 3) + checkpoint2 = MinMaxSumCountAggregator._TYPE(1, 33, 44, 2) + + mmsc1.checkpoint = checkpoint1 + mmsc2.checkpoint = checkpoint2 + + mmsc1.merge(mmsc2) + + self.assertEqual( + mmsc1.checkpoint, + ( + min(checkpoint1.min, checkpoint2.min), + max(checkpoint1.max, checkpoint2.max), + checkpoint1.sum + checkpoint2.sum, + checkpoint1.count + checkpoint2.count, + ), + ) + + def test_merge_with_empty(self): + mmsc1 = MinMaxSumCountAggregator() + mmsc2 = MinMaxSumCountAggregator() + + checkpoint1 = MinMaxSumCountAggregator._TYPE(3, 150, 101, 3) + mmsc1.checkpoint = checkpoint1 + + mmsc1.merge(mmsc2) + + self.assertEqual(mmsc1.checkpoint, checkpoint1) + + class TestController(unittest.TestCase): def test_push_controller(self): meter = mock.Mock() diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index a887621b0cb..db7e2d8c850 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -90,8 +90,10 @@ def test_record_batch_multiple(self): meter.record_batch(label_set, record_tuples) self.assertEqual(counter.get_handle(label_set).aggregator.current, 1.0) self.assertEqual(gauge.get_handle(label_set).aggregator.current, 5.0) - # TODO: Fix when aggregator implemented for measure - self.assertEqual(measure.get_handle(label_set).aggregator.current, 3.0) + self.assertEqual( + measure.get_handle(label_set).aggregator.current, + (3.0, 3.0, 3.0, 1), + ) def test_record_batch_exists(self): meter = metrics.Meter() @@ -195,9 +197,13 @@ def test_record(self): kvp = {"key": "value"} label_set = meter.get_label_set(kvp) handle = metric.get_handle(label_set) - metric.record(3, label_set) - # TODO: Fix once other aggregators implemented - self.assertEqual(handle.aggregator.current, 3) + values = (37, 42, 7) + for val in values: + metric.record(val, label_set) + self.assertEqual( + handle.aggregator.current, + (min(values), max(values), sum(values), len(values)), + ) class TestCounterHandle(unittest.TestCase): @@ -263,33 +269,32 @@ def test_update(self, time_mock): self.assertEqual(handle.aggregator.current, 4.0) -# TODO: fix tests once aggregator implemented class TestMeasureHandle(unittest.TestCase): def test_record(self): - aggregator = export.aggregate.CounterAggregator() - handle = metrics.MeasureHandle(int, False, aggregator) + aggregator = export.aggregate.MinMaxSumCountAggregator() + handle = metrics.MeasureHandle(int, True, aggregator) handle.record(3) - self.assertEqual(handle.aggregator.current, 0) + self.assertEqual(handle.aggregator.current, (3, 3, 3, 1)) def test_record_disabled(self): - aggregator = export.aggregate.CounterAggregator() + aggregator = export.aggregate.MinMaxSumCountAggregator() handle = metrics.MeasureHandle(int, False, aggregator) handle.record(3) - self.assertEqual(handle.aggregator.current, 0) + self.assertEqual(handle.aggregator.current, (None, None, None, 0)) @mock.patch("opentelemetry.sdk.metrics.logger") def test_record_incorrect_type(self, logger_mock): - aggregator = export.aggregate.CounterAggregator() + aggregator = export.aggregate.MinMaxSumCountAggregator() handle = metrics.MeasureHandle(int, True, aggregator) handle.record(3.0) - self.assertEqual(handle.aggregator.current, 0) + self.assertEqual(handle.aggregator.current, (None, None, None, 0)) self.assertTrue(logger_mock.warning.called) @mock.patch("opentelemetry.sdk.metrics.time_ns") def test_update(self, time_mock): - aggregator = export.aggregate.CounterAggregator() + aggregator = export.aggregate.MinMaxSumCountAggregator() handle = metrics.MeasureHandle(int, True, aggregator) time_mock.return_value = 123 handle.update(4.0) self.assertEqual(handle.last_update_timestamp, 123) - self.assertEqual(handle.aggregator.current, 4.0) + self.assertEqual(handle.aggregator.current, (4.0, 4.0, 4.0, 1))