From f4b38b32a737c4463da9d7609e21dace803bc827 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Fri, 14 Feb 2020 10:04:32 -0500 Subject: [PATCH 01/11] sdk/metrics/batcher: fix aggregator_for The passed class is not the exact time defined in the API but a subclass. Use issubclass instead of equal comparison. --- .../src/opentelemetry/sdk/metrics/export/batcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py index c81db0fe740..a54150fecf5 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py @@ -45,7 +45,7 @@ def aggregator_for(self, metric_type: Type[MetricT]) -> Aggregator: Aggregators keep track of and updates values when metrics get updated. """ # pylint:disable=R0201 - if metric_type == Counter: + if issubclass(metric_type, Counter): return CounterAggregator() # TODO: Add other aggregators return CounterAggregator() From 2b0967169dace6f284b33f82ac2880c1a583b46a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Fri, 14 Feb 2020 14:17:10 -0500 Subject: [PATCH 02/11] sdk/metrics: implement MinMaxSumCount aggregator This aggregator is the default aggregator for measure metrics and keeps the minimum, maximum, sum and count of those measures. --- .../sdk/metrics/export/aggregate.py | 50 +++++++++++ .../sdk/metrics/export/batcher.py | 5 +- .../tests/metrics/export/test_export.py | 89 +++++++++++++++++-- .../tests/metrics/test_metrics.py | 35 ++++---- 4 files changed, 157 insertions(+), 22 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index 642fe1cdfe4..5c55ba038ac 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -13,6 +13,7 @@ # limitations under the License. import abc +from collections import namedtuple class Aggregator(abc.ABC): @@ -56,3 +57,52 @@ def take_checkpoint(self): def merge(self, other): self.checkpoint += other.checkpoint + + +class MinMaxSumCountAggregator(Aggregator): + """Agregator for Measure metrics that keeps min, max, sum and count.""" + + _TYPE = namedtuple("minmaxsumcount", "min max sum count") + + @classmethod + def _min(cls, val1, val2): + if val1 is None and val2 is None: + return None + return min(val1 or val2, val2 or val1) + + @classmethod + def _max(cls, val1, val2): + if val1 is None and val2 is None: + return None + return max(val1 or val2, val2 or val1) + + @classmethod + def _sum(cls, val1, val2): + if val1 is None and val2 is None: + return None + return (val1 or 0) + (val2 or 0) + + def __init__(self): + super().__init__() + self.current = self._TYPE(None, None, None, 0) + self.checkpoint = self._TYPE(None, None, None, 0) + + def update(self, value): + self.current = self._TYPE( + self._min(self.current.min, value), + self._max(self.current.max, value), + self._sum(self.current.sum, value), + self.current.count + 1, + ) + + def take_checkpoint(self): + self.checkpoint = self.current + self.current = self._TYPE(None, None, None, 0) + + def merge(self, other): + self.checkpoint = self._TYPE( + self._min(self.checkpoint.min, other.checkpoint.min), + self._max(self.checkpoint.max, other.checkpoint.max), + self._sum(self.checkpoint.sum, other.checkpoint.sum), + self.checkpoint.count + other.checkpoint.count, + ) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py index a54150fecf5..86ddc3fcc16 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py @@ -15,11 +15,12 @@ import abc from typing import Sequence, Type -from opentelemetry.metrics import Counter, MetricT +from opentelemetry.metrics import Counter, Measure, MetricT from opentelemetry.sdk.metrics.export import MetricRecord from opentelemetry.sdk.metrics.export.aggregate import ( Aggregator, CounterAggregator, + MinMaxSumCountAggregator, ) @@ -47,6 +48,8 @@ def aggregator_for(self, metric_type: Type[MetricT]) -> Aggregator: # pylint:disable=R0201 if issubclass(metric_type, Counter): return CounterAggregator() + if issubclass(metric_type, Measure): + return MinMaxSumCountAggregator() # TODO: Add other aggregators return CounterAggregator() diff --git a/opentelemetry-sdk/tests/metrics/export/test_export.py b/opentelemetry-sdk/tests/metrics/export/test_export.py index 816bfcfca9c..5df6c6d08a0 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_export.py +++ b/opentelemetry-sdk/tests/metrics/export/test_export.py @@ -20,7 +20,10 @@ ConsoleMetricsExporter, MetricRecord, ) -from opentelemetry.sdk.metrics.export.aggregate import CounterAggregator +from opentelemetry.sdk.metrics.export.aggregate import ( + CounterAggregator, + MinMaxSumCountAggregator, +) from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher from opentelemetry.sdk.metrics.export.controller import PushController @@ -218,22 +221,21 @@ def test_ungrouped_batcher_process_not_stateful(self): ) -class TestAggregator(unittest.TestCase): - # TODO: test other aggregators once implemented - def test_counter_update(self): +class TestCounterAggregator(unittest.TestCase): + def test_update(self): counter = CounterAggregator() counter.update(1.0) counter.update(2.0) self.assertEqual(counter.current, 3.0) - def test_counter_checkpoint(self): + def test_checkpoint(self): counter = CounterAggregator() counter.update(2.0) counter.take_checkpoint() self.assertEqual(counter.current, 0) self.assertEqual(counter.checkpoint, 2.0) - def test_counter_merge(self): + def test_merge(self): counter = CounterAggregator() counter2 = CounterAggregator() counter.checkpoint = 1.0 @@ -242,6 +244,81 @@ def test_counter_merge(self): self.assertEqual(counter.checkpoint, 4.0) +class TestMinMaxSumCountAggregator(unittest.TestCase): + def test_update(self): + mmsc = MinMaxSumCountAggregator() + # test current values without any update + self.assertEqual( + mmsc.current, (None, None, None, 0), + ) + + # call update with some values + values = (3, 50, 3, 97) + for val in values: + mmsc.update(val) + + self.assertEqual( + mmsc.current, (min(values), max(values), sum(values), len(values)), + ) + + def test_checkpoint(self): + mmsc = MinMaxSumCountAggregator() + + # take checkpoint wihtout any update + mmsc.take_checkpoint() + self.assertEqual( + mmsc.checkpoint, (None, None, None, 0), + ) + + # call update with some values + values = (3, 50, 3, 97) + for val in values: + mmsc.update(val) + + mmsc.take_checkpoint() + self.assertEqual( + mmsc.checkpoint, + (min(values), max(values), sum(values), len(values)), + ) + + self.assertEqual( + mmsc.current, (None, None, None, 0), + ) + + def test_merge(self): + mmsc1 = MinMaxSumCountAggregator() + mmsc2 = MinMaxSumCountAggregator() + + checkpoint1 = MinMaxSumCountAggregator._TYPE(3, 150, 101, 3) + checkpoint2 = MinMaxSumCountAggregator._TYPE(1, 33, 44, 2) + + mmsc1.checkpoint = checkpoint1 + mmsc2.checkpoint = checkpoint2 + + mmsc1.merge(mmsc2) + + self.assertEqual( + mmsc1.checkpoint, + ( + min(checkpoint1.min, checkpoint2.min), + max(checkpoint1.max, checkpoint2.max), + checkpoint1.sum + checkpoint2.sum, + checkpoint1.count + checkpoint2.count, + ), + ) + + def test_merge_with_empty(self): + mmsc1 = MinMaxSumCountAggregator() + mmsc2 = MinMaxSumCountAggregator() + + checkpoint1 = MinMaxSumCountAggregator._TYPE(3, 150, 101, 3) + mmsc1.checkpoint = checkpoint1 + + mmsc1.merge(mmsc2) + + self.assertEqual(mmsc1.checkpoint, checkpoint1) + + class TestController(unittest.TestCase): def test_push_controller(self): meter = mock.Mock() diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index a887621b0cb..db7e2d8c850 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -90,8 +90,10 @@ def test_record_batch_multiple(self): meter.record_batch(label_set, record_tuples) self.assertEqual(counter.get_handle(label_set).aggregator.current, 1.0) self.assertEqual(gauge.get_handle(label_set).aggregator.current, 5.0) - # TODO: Fix when aggregator implemented for measure - self.assertEqual(measure.get_handle(label_set).aggregator.current, 3.0) + self.assertEqual( + measure.get_handle(label_set).aggregator.current, + (3.0, 3.0, 3.0, 1), + ) def test_record_batch_exists(self): meter = metrics.Meter() @@ -195,9 +197,13 @@ def test_record(self): kvp = {"key": "value"} label_set = meter.get_label_set(kvp) handle = metric.get_handle(label_set) - metric.record(3, label_set) - # TODO: Fix once other aggregators implemented - self.assertEqual(handle.aggregator.current, 3) + values = (37, 42, 7) + for val in values: + metric.record(val, label_set) + self.assertEqual( + handle.aggregator.current, + (min(values), max(values), sum(values), len(values)), + ) class TestCounterHandle(unittest.TestCase): @@ -263,33 +269,32 @@ def test_update(self, time_mock): self.assertEqual(handle.aggregator.current, 4.0) -# TODO: fix tests once aggregator implemented class TestMeasureHandle(unittest.TestCase): def test_record(self): - aggregator = export.aggregate.CounterAggregator() - handle = metrics.MeasureHandle(int, False, aggregator) + aggregator = export.aggregate.MinMaxSumCountAggregator() + handle = metrics.MeasureHandle(int, True, aggregator) handle.record(3) - self.assertEqual(handle.aggregator.current, 0) + self.assertEqual(handle.aggregator.current, (3, 3, 3, 1)) def test_record_disabled(self): - aggregator = export.aggregate.CounterAggregator() + aggregator = export.aggregate.MinMaxSumCountAggregator() handle = metrics.MeasureHandle(int, False, aggregator) handle.record(3) - self.assertEqual(handle.aggregator.current, 0) + self.assertEqual(handle.aggregator.current, (None, None, None, 0)) @mock.patch("opentelemetry.sdk.metrics.logger") def test_record_incorrect_type(self, logger_mock): - aggregator = export.aggregate.CounterAggregator() + aggregator = export.aggregate.MinMaxSumCountAggregator() handle = metrics.MeasureHandle(int, True, aggregator) handle.record(3.0) - self.assertEqual(handle.aggregator.current, 0) + self.assertEqual(handle.aggregator.current, (None, None, None, 0)) self.assertTrue(logger_mock.warning.called) @mock.patch("opentelemetry.sdk.metrics.time_ns") def test_update(self, time_mock): - aggregator = export.aggregate.CounterAggregator() + aggregator = export.aggregate.MinMaxSumCountAggregator() handle = metrics.MeasureHandle(int, True, aggregator) time_mock.return_value = 123 handle.update(4.0) self.assertEqual(handle.last_update_timestamp, 123) - self.assertEqual(handle.aggregator.current, 4.0) + self.assertEqual(handle.aggregator.current, (4.0, 4.0, 4.0, 1)) From 7b8ff910f70c7c17dd7d31693b2b530dbd25ad1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Fri, 14 Feb 2020 14:19:09 -0500 Subject: [PATCH 03/11] example/metrics: combine into a single file and add Measure example --- examples/metrics/simple_example.py | 106 +++++++++++++++++++++++++++++ examples/metrics/stateful.py | 72 -------------------- examples/metrics/stateless.py | 57 ---------------- 3 files changed, 106 insertions(+), 129 deletions(-) create mode 100644 examples/metrics/simple_example.py delete mode 100644 examples/metrics/stateful.py delete mode 100644 examples/metrics/stateless.py diff --git a/examples/metrics/simple_example.py b/examples/metrics/simple_example.py new file mode 100644 index 00000000000..75da80b73ac --- /dev/null +++ b/examples/metrics/simple_example.py @@ -0,0 +1,106 @@ +# Copyright 2019, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +This module serves as an example for a simple application using metrics +It shows: +- How to configure a meter passing a sateful or stateless. +- How to configure an exporter and how to create a controller. +- How to create some metrics intruments and how to capture data with them. +""" +import sys +import time + +from opentelemetry import metrics +from opentelemetry.sdk.metrics import Counter, Measure, Meter +from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter +from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher +from opentelemetry.sdk.metrics.export.controller import PushController + +batcher_mode = "stateful" + + +def usage(argv): + print("usage:") + print("{} [mode]".format(argv[0])) + print("mode: stateful (default) or stateless") + + +if len(sys.argv) >= 2: + batcher_mode = sys.argv[1] + if batcher_mode not in ("stateful", "stateless"): + print("bad mode specified.") + usage(sys.argv) + sys.exit(1) + +# Batcher used to collect all created metrics from meter ready for exporting +# Pass in True/False to indicate whether the batcher is stateful. +# True indicates the batcher computes checkpoints from over the process +# lifetime. +# False indicates the batcher computes checkpoints which describe the updates +# of a single collection period (deltas) +batcher = UngroupedBatcher(batcher_mode == "stateful") + +# If a batcher is not provided, a default batcher is used +# Meter is responsible for creating and recording metrics +metrics.set_preferred_meter_implementation(lambda _: Meter(batcher)) +meter = metrics.meter() + +# Exporter to export metrics to the console +exporter = ConsoleMetricsExporter() + +# A PushController collects metrics created from meter and exports it via the +# exporter every interval +controller = PushController(meter, exporter, 5) + +# Metric instruments allow to capture measurements +requests_counter = meter.create_metric( + "requests", "number of requests", 1, int, Counter, ("environment",) +) + +clicks_counter = meter.create_metric( + "clicks", "number of clicks", 1, int, Counter, ("environment",) +) + +requests_size = meter.create_metric( + "requests_size", "size of requests", 1, int, Measure, ("environment",) +) + +# Labelsets are used to identify key-values that are associated with a specific +# metric that you want to record. These are useful for pre-aggregation and can +# be used to store custom dimensions pertaining to a metric +staging_label_set = meter.get_label_set({"environment": "staging"}) +testing_label_set = meter.get_label_set({"environment": "testing"}) + +# Update the metric instruments using the direct calling convention +requests_size.record(100, staging_label_set) +requests_counter.add(25, staging_label_set) +# Sleep for 5 seconds, exported value should be 25 +time.sleep(5) + +requests_size.record(5000, staging_label_set) +requests_counter.add(50, staging_label_set) +# Exported value should be 75 +time.sleep(5) + +requests_size.record(2, testing_label_set) +requests_counter.add(35, testing_label_set) +# There should be two exported values 75 and 35, one for each labelset +time.sleep(5) + +clicks_counter.add(5, staging_label_set) +# There should be three exported values, labelsets can be reused for different +# metrics but will be recorded seperately, 75, 35 and 5 + +time.sleep(5) diff --git a/examples/metrics/stateful.py b/examples/metrics/stateful.py deleted file mode 100644 index c43f795e228..00000000000 --- a/examples/metrics/stateful.py +++ /dev/null @@ -1,72 +0,0 @@ -# Copyright 2019, OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -""" -This module serves as an example for a simple application using metrics -Examples show how to recording affects the collection of metrics to be exported -""" -import time - -from opentelemetry import metrics -from opentelemetry.sdk.metrics import Counter, Meter -from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter -from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher -from opentelemetry.sdk.metrics.export.controller import PushController - -# Batcher used to collect all created metrics from meter ready for exporting -# Pass in true/false to indicate whether the batcher is stateful. True -# indicates the batcher computes checkpoints from over the process lifetime. -# False indicates the batcher computes checkpoints which describe the updates -# of a single collection period (deltas) -batcher = UngroupedBatcher(True) -# If a batcher is not provded, a default batcher is used -# Meter is responsible for creating and recording metrics -metrics.set_preferred_meter_implementation(lambda _: Meter(batcher)) -meter = metrics.meter() -# exporter to export metrics to the console -exporter = ConsoleMetricsExporter() -# controller collects metrics created from meter and exports it via the -# exporter every interval -controller = PushController(meter, exporter, 5) - -counter = meter.create_metric( - "requests", "number of requests", 1, int, Counter, ("environment",) -) - -counter2 = meter.create_metric( - "clicks", "number of clicks", 1, int, Counter, ("environment",) -) - -# Labelsets are used to identify key-values that are associated with a specific -# metric that you want to record. These are useful for pre-aggregation and can -# be used to store custom dimensions pertaining to a metric -label_set = meter.get_label_set({"environment": "staging"}) -label_set2 = meter.get_label_set({"environment": "testing"}) - -counter.add(25, label_set) -# We sleep for 5 seconds, exported value should be 25 -time.sleep(5) - -counter.add(50, label_set) -# exported value should be 75 -time.sleep(5) - -counter.add(35, label_set2) -# should be two exported values 75 and 35, one for each labelset -time.sleep(5) - -counter2.add(5, label_set) -# should be three exported values, labelsets can be reused for different -# metrics but will be recorded seperately, 75, 35 and 5 -time.sleep(5) diff --git a/examples/metrics/stateless.py b/examples/metrics/stateless.py deleted file mode 100644 index 69213cbddd3..00000000000 --- a/examples/metrics/stateless.py +++ /dev/null @@ -1,57 +0,0 @@ -# Copyright 2019, OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -""" -This module serves as an example for a simple application using metrics -Examples show how to recording affects the collection of metrics to be exported -""" -import time - -from opentelemetry import metrics -from opentelemetry.sdk.metrics import Counter, Meter -from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter -from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher -from opentelemetry.sdk.metrics.export.controller import PushController - -# Batcher used to collect all created metrics from meter ready for exporting -# Pass in false for non-stateful batcher. Indicates the batcher computes -# checkpoints which describe the updates of a single collection period (deltas) -batcher = UngroupedBatcher(False) -# Meter is responsible for creating and recording metrics -metrics.set_preferred_meter_implementation(lambda _: Meter(batcher)) -meter = metrics.meter() -# exporter to export metrics to the console -exporter = ConsoleMetricsExporter() -# controller collects metrics created from meter and exports it via the -# exporter every interval -controller = PushController(meter, exporter, 5) - -counter = meter.create_metric( - "requests", "number of requests", 1, int, Counter, ("environment",) -) - -# Labelsets are used to identify key-values that are associated with a specific -# metric that you want to record. These are useful for pre-aggregation and can -# be used to store custom dimensions pertaining to a metric -label_set = meter.get_label_set({"environment": "staging"}) - -counter.add(25, label_set) -# We sleep for 5 seconds, exported value should be 25 -time.sleep(5) - -counter.add(50, label_set) -# exported value should be 50 due to non-stateful batcher -time.sleep(20) - -# Following exported values would be 0 From bc99f4866cf974222a6e262cb7a6f56b73ff28df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Fri, 14 Feb 2020 14:17:10 -0500 Subject: [PATCH 04/11] sdk/metrics: implement MinMaxSumCount aggregator This aggregator is the default aggregator for measure metrics and keeps the minimum, maximum, sum and count of those measures. --- .../tests/metrics/test_metrics.py | 39 +++++++++---------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index db7e2d8c850..a3816f9a1c4 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -237,36 +237,35 @@ def test_update(self, time_mock): self.assertEqual(handle.aggregator.current, 4.0) -# TODO: fix tests once aggregator implemented -class TestGaugeHandle(unittest.TestCase): - def test_set(self): - aggregator = export.aggregate.CounterAggregator() - handle = metrics.GaugeHandle(int, True, aggregator) - handle.set(3) - self.assertEqual(handle.aggregator.current, 3) +class TestMeasureHandle(unittest.TestCase): + def test_record(self): + aggregator = export.aggregate.MinMaxSumCountAggregator() + handle = metrics.MeasureHandle(int, True, aggregator) + handle.record(3) + self.assertEqual(handle.aggregator.current, (3, 3, 3, 1)) - def test_set_disabled(self): - aggregator = export.aggregate.CounterAggregator() - handle = metrics.GaugeHandle(int, False, aggregator) - handle.set(3) - self.assertEqual(handle.aggregator.current, 0) + def test_record_disabled(self): + aggregator = export.aggregate.MinMaxSumCountAggregator() + handle = metrics.MeasureHandle(int, False, aggregator) + handle.record(3) + self.assertEqual(handle.aggregator.current, (None, None, None, 0)) @mock.patch("opentelemetry.sdk.metrics.logger") - def test_set_incorrect_type(self, logger_mock): - aggregator = export.aggregate.CounterAggregator() - handle = metrics.GaugeHandle(int, True, aggregator) - handle.set(3.0) - self.assertEqual(handle.aggregator.current, 0) + def test_record_incorrect_type(self, logger_mock): + aggregator = export.aggregate.MinMaxSumCountAggregator() + handle = metrics.MeasureHandle(int, True, aggregator) + handle.record(3.0) + self.assertEqual(handle.aggregator.current, (None, None, None, 0)) self.assertTrue(logger_mock.warning.called) @mock.patch("opentelemetry.sdk.metrics.time_ns") def test_update(self, time_mock): - aggregator = export.aggregate.CounterAggregator() - handle = metrics.GaugeHandle(int, True, aggregator) + aggregator = export.aggregate.MinMaxSumCountAggregator() + handle = metrics.MeasureHandle(int, True, aggregator) time_mock.return_value = 123 handle.update(4.0) self.assertEqual(handle.last_update_timestamp, 123) - self.assertEqual(handle.aggregator.current, 4.0) + self.assertEqual(handle.aggregator.current, (4.0, 4.0, 4.0, 1)) class TestMeasureHandle(unittest.TestCase): From be2f0293f806cd2c1197b91f198a2a0b8a645942 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Wed, 12 Feb 2020 11:06:45 -0500 Subject: [PATCH 05/11] metrics: Remove Gauge instrument --- .../src/opentelemetry/metrics/__init__.py | 54 ++--------------- .../tests/metrics/test_metrics.py | 15 ----- .../src/opentelemetry/sdk/metrics/__init__.py | 40 ------------- .../tests/metrics/test_metrics.py | 60 +------------------ 4 files changed, 6 insertions(+), 163 deletions(-) diff --git a/opentelemetry-api/src/opentelemetry/metrics/__init__.py b/opentelemetry-api/src/opentelemetry/metrics/__init__.py index 5045c38eed9..55be581485f 100644 --- a/opentelemetry-api/src/opentelemetry/metrics/__init__.py +++ b/opentelemetry-api/src/opentelemetry/metrics/__init__.py @@ -47,13 +47,6 @@ def add(self, value: ValueT) -> None: value: The value to add to the handle. """ - def set(self, value: ValueT) -> None: - """No-op implementation of `GaugeHandle` set. - - Args: - value: The value to set to the handle. - """ - def record(self, value: ValueT) -> None: """No-op implementation of `MeasureHandle` record. @@ -71,15 +64,6 @@ def add(self, value: ValueT) -> None: """ -class GaugeHandle: - def set(self, value: ValueT) -> None: - """Sets the current value of the handle to ``value``. - - Args: - value: The value to set to the handle. - """ - - class MeasureHandle: def record(self, value: ValueT) -> None: """Records the given ``value`` to this handle. @@ -121,7 +105,7 @@ def get_handle(self, label_set: LabelSet) -> "object": Handles are useful to reduce the cost of repeatedly recording a metric with a pre-defined set of label values. All metric kinds (counter, - gauge, measure) support declaring a set of required label keys. The + measure) support declaring a set of required label keys. The values corresponding to these keys should be specified in every handle. "Unspecified" label values, in cases where a handle is requested but a value was not provided are permitted. @@ -150,14 +134,6 @@ def add(self, value: ValueT, label_set: LabelSet) -> None: label_set: `LabelSet` to associate with the returned handle. """ - def set(self, value: ValueT, label_set: LabelSet) -> None: - """No-op implementation of `Gauge` set. - - Args: - value: The value to set the gauge metric to. - label_set: `LabelSet` to associate with the returned handle. - """ - def record(self, value: ValueT, label_set: LabelSet) -> None: """No-op implementation of `Measure` record. @@ -183,28 +159,6 @@ def add(self, value: ValueT, label_set: LabelSet) -> None: """ -class Gauge(Metric): - """A gauge type metric that expresses a pre-calculated value. - - Gauge metrics have a value that is either ``Set`` by explicit - instrumentation or observed through a callback. This kind of metric - should be used when the metric cannot be expressed as a sum or because - the measurement interval is arbitrary. - """ - - def get_handle(self, label_set: LabelSet) -> "GaugeHandle": - """Gets a `GaugeHandle`.""" - return GaugeHandle() - - def set(self, value: ValueT, label_set: LabelSet) -> None: - """Sets the value of the gauge to ``value``. - - Args: - value: The value to set the gauge metric to. - label_set: `LabelSet` to associate with the returned handle. - """ - - class Measure(Metric): """A measure type metric that represent raw stats that are recorded. @@ -224,15 +178,15 @@ def record(self, value: ValueT, label_set: LabelSet) -> None: """ -MetricT = TypeVar("MetricT", Counter, Gauge, Measure) +MetricT = TypeVar("MetricT", Counter, Measure) # pylint: disable=unused-argument class Meter(abc.ABC): """An interface to allow the recording of metrics. - `Metric` s are used for recording pre-defined aggregation (gauge and - counter), or raw values (measure) in which the aggregation and labels + `Metric` s are used for recording pre-defined aggregation (counter), + or raw values (measure) in which the aggregation and labels for the exported metric are deferred. """ diff --git a/opentelemetry-api/tests/metrics/test_metrics.py b/opentelemetry-api/tests/metrics/test_metrics.py index 3ec0f81c718..b864282126a 100644 --- a/opentelemetry-api/tests/metrics/test_metrics.py +++ b/opentelemetry-api/tests/metrics/test_metrics.py @@ -56,17 +56,6 @@ def test_counter_add(self): label_set = metrics.LabelSet() counter.add(1, label_set) - def test_gauge(self): - gauge = metrics.Gauge() - label_set = metrics.LabelSet() - handle = gauge.get_handle(label_set) - self.assertIsInstance(handle, metrics.GaugeHandle) - - def test_gauge_set(self): - gauge = metrics.Gauge() - label_set = metrics.LabelSet() - gauge.set(1, label_set) - def test_measure(self): measure = metrics.Measure() label_set = metrics.LabelSet() @@ -85,10 +74,6 @@ def test_counter_handle(self): handle = metrics.CounterHandle() handle.add(1) - def test_gauge_handle(self): - handle = metrics.GaugeHandle() - handle.set(1) - def test_measure_handle(self): handle = metrics.MeasureHandle() handle.record(1) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py index 4c9231582c8..48a6b8bccb7 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py @@ -99,13 +99,6 @@ def add(self, value: metrics_api.ValueT) -> None: self.update(value) -class GaugeHandle(metrics_api.GaugeHandle, BaseHandle): - def set(self, value: metrics_api.ValueT) -> None: - """See `opentelemetry.metrics.GaugeHandle.set`.""" - if self._validate_update(value): - self.update(value) - - class MeasureHandle(metrics_api.MeasureHandle, BaseHandle): def record(self, value: metrics_api.ValueT) -> None: """See `opentelemetry.metrics.MeasureHandle.record`.""" @@ -197,39 +190,6 @@ def add(self, value: metrics_api.ValueT, label_set: LabelSet) -> None: UPDATE_FUNCTION = add -class Gauge(Metric, metrics_api.Gauge): - """See `opentelemetry.metrics.Gauge`. - """ - - HANDLE_TYPE = GaugeHandle - - def __init__( - self, - name: str, - description: str, - unit: str, - value_type: Type[metrics_api.ValueT], - meter: "Meter", - label_keys: Sequence[str] = (), - enabled: bool = True, - ): - super().__init__( - name, - description, - unit, - value_type, - meter, - label_keys=label_keys, - enabled=enabled, - ) - - def set(self, value: metrics_api.ValueT, label_set: LabelSet) -> None: - """See `opentelemetry.metrics.Gauge.set`.""" - self.get_handle(label_set).set(value) - - UPDATE_FUNCTION = set - - class Measure(Metric, metrics_api.Measure): """See `opentelemetry.metrics.Measure`.""" diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index a3816f9a1c4..ca3814ca1b5 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -82,14 +82,12 @@ def test_record_batch_multiple(self): counter = metrics.Counter( "name", "desc", "unit", float, meter, label_keys ) - gauge = metrics.Gauge("name", "desc", "unit", int, meter, label_keys) measure = metrics.Measure( "name", "desc", "unit", float, meter, label_keys ) - record_tuples = [(counter, 1.0), (gauge, 5), (measure, 3.0)] + record_tuples = [(counter, 1.0), (measure, 3.0)] meter.record_batch(label_set, record_tuples) self.assertEqual(counter.get_handle(label_set).aggregator.current, 1.0) - self.assertEqual(gauge.get_handle(label_set).aggregator.current, 5.0) self.assertEqual( measure.get_handle(label_set).aggregator.current, (3.0, 3.0, 3.0, 1), @@ -119,15 +117,6 @@ def test_create_metric(self): self.assertEqual(counter.value_type, int) self.assertEqual(counter.name, "name") - def test_create_gauge(self): - meter = metrics.Meter() - gauge = meter.create_metric( - "name", "desc", "unit", float, metrics.Gauge, () - ) - self.assertTrue(isinstance(gauge, metrics.Gauge)) - self.assertEqual(gauge.value_type, float) - self.assertEqual(gauge.name, "name") - def test_create_measure(self): meter = metrics.Meter() measure = meter.create_metric( @@ -155,7 +144,7 @@ def test_get_label_set_empty(self): class TestMetric(unittest.TestCase): def test_get_handle(self): meter = metrics.Meter() - metric_types = [metrics.Counter, metrics.Gauge, metrics.Measure] + metric_types = [metrics.Counter, metrics.Measure] for _type in metric_types: metric = _type("name", "desc", "unit", int, meter, ("key",)) kvp = {"key": "value"} @@ -176,20 +165,6 @@ def test_add(self): self.assertEqual(handle.aggregator.current, 5) -class TestGauge(unittest.TestCase): - def test_set(self): - meter = metrics.Meter() - metric = metrics.Gauge("name", "desc", "unit", int, meter, ("key",)) - kvp = {"key": "value"} - label_set = meter.get_label_set(kvp) - handle = metric.get_handle(label_set) - metric.set(3, label_set) - self.assertEqual(handle.aggregator.current, 3) - metric.set(2, label_set) - # TODO: Fix once other aggregators implemented - self.assertEqual(handle.aggregator.current, 5) - - class TestMeasure(unittest.TestCase): def test_record(self): meter = metrics.Meter() @@ -266,34 +241,3 @@ def test_update(self, time_mock): handle.update(4.0) self.assertEqual(handle.last_update_timestamp, 123) self.assertEqual(handle.aggregator.current, (4.0, 4.0, 4.0, 1)) - - -class TestMeasureHandle(unittest.TestCase): - def test_record(self): - aggregator = export.aggregate.MinMaxSumCountAggregator() - handle = metrics.MeasureHandle(int, True, aggregator) - handle.record(3) - self.assertEqual(handle.aggregator.current, (3, 3, 3, 1)) - - def test_record_disabled(self): - aggregator = export.aggregate.MinMaxSumCountAggregator() - handle = metrics.MeasureHandle(int, False, aggregator) - handle.record(3) - self.assertEqual(handle.aggregator.current, (None, None, None, 0)) - - @mock.patch("opentelemetry.sdk.metrics.logger") - def test_record_incorrect_type(self, logger_mock): - aggregator = export.aggregate.MinMaxSumCountAggregator() - handle = metrics.MeasureHandle(int, True, aggregator) - handle.record(3.0) - self.assertEqual(handle.aggregator.current, (None, None, None, 0)) - self.assertTrue(logger_mock.warning.called) - - @mock.patch("opentelemetry.sdk.metrics.time_ns") - def test_update(self, time_mock): - aggregator = export.aggregate.MinMaxSumCountAggregator() - handle = metrics.MeasureHandle(int, True, aggregator) - time_mock.return_value = 123 - handle.update(4.0) - self.assertEqual(handle.last_update_timestamp, 123) - self.assertEqual(handle.aggregator.current, (4.0, 4.0, 4.0, 1)) From c6a5c3cad2dd2e95f4569e7640aa77aef01fa716 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Mon, 17 Feb 2020 11:13:18 -0500 Subject: [PATCH 06/11] Add Observer metric instrument Observer instruments are used to capture a current set of values at a point in time [1]. This commit extends the Meter interface to allow to register an observer instrument by pasing a callback that will be executed at collection time. The logic inside collection is updated to consider these instruments and a new ObserverAggregator is implemented. [1] https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/api-metrics.md#observer-instruments --- examples/metrics/observer_example.py | 67 +++++++++++ .../src/opentelemetry/metrics/__init__.py | 70 +++++++++++- .../tests/metrics/test_metrics.py | 7 ++ .../src/opentelemetry/sdk/metrics/__init__.py | 108 +++++++++++++++++- .../sdk/metrics/export/aggregate.py | 29 +++++ .../sdk/metrics/export/batcher.py | 5 +- .../tests/metrics/export/test_export.py | 87 ++++++++++++++ .../tests/metrics/test_metrics.py | 107 ++++++++++++++++- tox.ini | 5 +- 9 files changed, 477 insertions(+), 8 deletions(-) create mode 100644 examples/metrics/observer_example.py diff --git a/examples/metrics/observer_example.py b/examples/metrics/observer_example.py new file mode 100644 index 00000000000..fad25926830 --- /dev/null +++ b/examples/metrics/observer_example.py @@ -0,0 +1,67 @@ +# Copyright 2020, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +This example shows how the Observer metric instrument can be used to capture +asynchronous metrics data. +""" +import psutil + +from opentelemetry import metrics +from opentelemetry.sdk.metrics import LabelSet, Meter +from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter +from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher +from opentelemetry.sdk.metrics.export.controller import PushController + +# Configure a stateful batcher +batcher = UngroupedBatcher(True) + +metrics.set_preferred_meter_implementation(lambda _: Meter(batcher)) +meter = metrics.meter() + +# Exporter to export metrics to the console +exporter = ConsoleMetricsExporter() + +# Configure a push controller +controller = PushController(meter, exporter, 2) + + +# Callback to gather cpu usage +def get_cpu_usage_callback(observer): + for (number, percent) in enumerate(psutil.cpu_percent(percpu=True)): + label_set = meter.get_label_set({"cpu_number": str(number)}) + observer.observe(percent, label_set) + + +meter.register_observer( + get_cpu_usage_callback, + "cpu_percent", + "per-cpu usage", + 1, + float, + ("cpu_number",), +) + + +# Callback to gather RAM memory usage +def get_ram_usage_callback(observer): + ram_percent = psutil.virtual_memory().percent + observer.observe(ram_percent, LabelSet()) + + +meter.register_observer( + get_ram_usage_callback, "ram_percent", "RAM memory usage", 1, float, (), +) + +input("Press a key to finish...\n") diff --git a/opentelemetry-api/src/opentelemetry/metrics/__init__.py b/opentelemetry-api/src/opentelemetry/metrics/__init__.py index 55be581485f..13c0efae099 100644 --- a/opentelemetry-api/src/opentelemetry/metrics/__init__.py +++ b/opentelemetry-api/src/opentelemetry/metrics/__init__.py @@ -178,7 +178,38 @@ def record(self, value: ValueT, label_set: LabelSet) -> None: """ -MetricT = TypeVar("MetricT", Counter, Measure) +class Observer: + """An observer type metric instrument used to capture a current set of values. + + + Observer instruments are asynchronous, a callback is invoked with the + observer instrument as argument allowing the user to capture multiple + values per collection interval. + """ + + def observe(self, value: ValueT, label_set: LabelSet) -> None: + """Captures ``value`` to the observer. + + Args: + value: The value to capture to this observer metric. + label_set: `LabelSet` associated to ``value``. + """ + + +class DefaultObserver(Observer): + """No-op implementation of ``Observer``.""" + + def observe(self, value: ValueT, label_set: LabelSet) -> None: + """Captures ``value`` to the observer. + + Args: + value: The value to capture to this observer metric. + label_set: `LabelSet` associated to ``value``. + """ + + +MetricT = TypeVar("MetricT", Counter, Measure, Observer) +ObserverCallbackT = Callable[[Observer], None] # pylint: disable=unused-argument @@ -234,6 +265,31 @@ def create_metric( Returns: A new ``metric_type`` metric with values of ``value_type``. """ + @abc.abstractmethod + def register_observer( + self, + callback: ObserverCallbackT, + name: str, + description: str, + unit: str, + value_type: Type[ValueT], + label_keys: Sequence[str] = (), + enabled: bool = True, + ) -> "Observer": + """Registers an ``Observer`` metric instrument. + + Args: + callback: Callback invoked each collection interval with the + observer as argument. + name: The name of the metric. + description: Human-readable description of the metric. + unit: Unit of the metric values. + value_type: The type of values being recorded by the metric. + label_keys: The keys for the labels with dynamic values. + enabled: Whether to report the metric by default. + Returns: A new ``Observer`` metric instrument. + """ + @abc.abstractmethod def get_label_set(self, labels: Dict[str, str]) -> "LabelSet": """Gets a `LabelSet` with the given labels. @@ -268,6 +324,18 @@ def create_metric( # pylint: disable=no-self-use return DefaultMetric() + def register_observer( + self, + callback: ObserverCallbackT, + name: str, + description: str, + unit: str, + value_type: Type[ValueT], + label_keys: Sequence[str] = (), + enabled: bool = True, + ) -> "Observer": + return DefaultObserver() + def get_label_set(self, labels: Dict[str, str]) -> "LabelSet": # pylint: disable=no-self-use return DefaultLabelSet() diff --git a/opentelemetry-api/tests/metrics/test_metrics.py b/opentelemetry-api/tests/metrics/test_metrics.py index b864282126a..cf7f2b9e44f 100644 --- a/opentelemetry-api/tests/metrics/test_metrics.py +++ b/opentelemetry-api/tests/metrics/test_metrics.py @@ -33,6 +33,13 @@ def test_create_metric(self): metric = self.meter.create_metric("", "", "", float, metrics.Counter) self.assertIsInstance(metric, metrics.DefaultMetric) + def test_register_observer(self): + callback = mock.Mock() + observer = self.meter.register_observer( + callback, "", "", "", int, (), True + ) + self.assertIsInstance(observer, metrics.DefaultObserver) + def test_get_label_set(self): metric = self.meter.get_label_set({}) self.assertIsInstance(metric, metrics.DefaultLabelSet) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py index 48a6b8bccb7..b680108322a 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py @@ -150,7 +150,7 @@ def get_handle(self, label_set: LabelSet) -> BaseHandle: return handle def __repr__(self): - return '{}(name="{}", description={})'.format( + return '{}(name="{}", description="{}")'.format( type(self).__name__, self.name, self.description ) @@ -202,11 +202,73 @@ def record(self, value: metrics_api.ValueT, label_set: LabelSet) -> None: UPDATE_FUNCTION = record +class Observer(metrics_api.Observer): + """See `opentelemetry.metrics.Observer`.""" + + def __init__( + self, + callback: metrics_api.ObserverCallbackT, + name: str, + description: str, + unit: str, + value_type: Type[metrics_api.ValueT], + meter: "Meter", + label_keys: Sequence[str] = (), + enabled: bool = True, + ): + self.callback = callback + self.name = name + self.description = description + self.unit = unit + self.value_type = value_type + self.meter = meter + self.label_keys = label_keys + self.enabled = enabled + + self.aggregators = {} + + def observe(self, value: metrics_api.ValueT, label_set: LabelSet) -> None: + if not self.enabled: + return + if not isinstance(value, self.value_type): + logger.warning( + "Invalid value passed for %s.", self.value_type.__name__ + ) + return + + if label_set not in self.aggregators: + # TODO: how to cleanup aggregators? + self.aggregators[label_set] = self.meter.batcher.aggregator_for( + self.__class__ + ) + aggregator = self.aggregators[label_set] + aggregator.update(value) + + def run(self) -> bool: + try: + self.callback(self) + # pylint: disable=broad-except + except Exception as exc: + logger.warning( + "Exception while executing observer callback: %s.", exc + ) + return False + return True + + def __repr__(self): + return '{}(name="{}", description="{}")'.format( + type(self).__name__, self.name, self.description + ) + + class Record: """Container class used for processing in the `Batcher`""" def __init__( - self, metric: Metric, label_set: LabelSet, aggregator: Aggregator + self, + metric: metrics_api.MetricT, + label_set: LabelSet, + aggregator: Aggregator, ): self.metric = metric self.label_set = label_set @@ -227,6 +289,7 @@ class Meter(metrics_api.Meter): def __init__(self, batcher: Batcher = UngroupedBatcher(True)): self.batcher = batcher self.metrics = set() + self.observers = set() def collect(self) -> None: """Collects all the metrics created with this `Meter` for export. @@ -235,6 +298,11 @@ def collect(self) -> None: each aggregator belonging to the metrics that were created with this meter instance. """ + + self._collect_metrics() + self._collect_observers() + + def _collect_metrics(self) -> None: for metric in self.metrics: if metric.enabled: for label_set, handle in metric.handles.items(): @@ -244,6 +312,19 @@ def collect(self) -> None: # Applies different batching logic based on type of batcher self.batcher.process(record) + def _collect_observers(self) -> None: + for observer in self.observers: + if not observer.enabled: + continue + + # TODO: capture timestamp? + if not observer.run(): + continue + + for label_set, aggregator in observer.aggregators.items(): + record = Record(observer, label_set, aggregator) + self.batcher.process(record) + def record_batch( self, label_set: LabelSet, @@ -277,6 +358,29 @@ def create_metric( self.metrics.add(metric) return metric + def register_observer( + self, + callback: metrics_api.ObserverCallbackT, + name: str, + description: str, + unit: str, + value_type: Type[metrics_api.ValueT], + label_keys: Sequence[str] = (), + enabled: bool = True, + ) -> metrics_api.Observer: + ob = Observer( + callback, + name, + description, + unit, + value_type, + self, + label_keys, + enabled, + ) + self.observers.add(ob) + return ob + def get_label_set(self, labels: Dict[str, str]): """See `opentelemetry.metrics.Meter.create_metric`. diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index 5c55ba038ac..f34143c3c2c 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -106,3 +106,32 @@ def merge(self, other): self._sum(self.checkpoint.sum, other.checkpoint.sum), self.checkpoint.count + other.checkpoint.count, ) + + +class ObserverAggregator(Aggregator): + """Same as MinMaxSumCount but also with last value.""" + + _TYPE = namedtuple("minmaxsumcountlast", "min max sum count last") + + def __init__(self): + super().__init__() + self.mmsc = MinMaxSumCountAggregator() + self.current = None + self.checkpoint = self._TYPE(None, None, None, 0, None) + + def update(self, value): + self.mmsc.update(value) + self.current = value + + def take_checkpoint(self): + self.mmsc.take_checkpoint() + self.checkpoint = self._TYPE(*(self.mmsc.checkpoint + (self.current,))) + + def merge(self, other): + self.mmsc.merge(other.mmsc) + self.checkpoint = self._TYPE( + *( + self.mmsc.checkpoint + + (other.checkpoint.last or self.checkpoint.last,) + ) + ) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py index 86ddc3fcc16..f4418c61399 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py @@ -15,12 +15,13 @@ import abc from typing import Sequence, Type -from opentelemetry.metrics import Counter, Measure, MetricT +from opentelemetry.metrics import Counter, Measure, MetricT, Observer from opentelemetry.sdk.metrics.export import MetricRecord from opentelemetry.sdk.metrics.export.aggregate import ( Aggregator, CounterAggregator, MinMaxSumCountAggregator, + ObserverAggregator, ) @@ -50,6 +51,8 @@ def aggregator_for(self, metric_type: Type[MetricT]) -> Aggregator: return CounterAggregator() if issubclass(metric_type, Measure): return MinMaxSumCountAggregator() + if issubclass(metric_type, Observer): + return ObserverAggregator() # TODO: Add other aggregators return CounterAggregator() diff --git a/opentelemetry-sdk/tests/metrics/export/test_export.py b/opentelemetry-sdk/tests/metrics/export/test_export.py index 5df6c6d08a0..c792ee93992 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_export.py +++ b/opentelemetry-sdk/tests/metrics/export/test_export.py @@ -23,6 +23,7 @@ from opentelemetry.sdk.metrics.export.aggregate import ( CounterAggregator, MinMaxSumCountAggregator, + ObserverAggregator, ) from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher from opentelemetry.sdk.metrics.export.controller import PushController @@ -319,6 +320,92 @@ def test_merge_with_empty(self): self.assertEqual(mmsc1.checkpoint, checkpoint1) +class TestObserverAggregator(unittest.TestCase): + def test_update(self): + observer = ObserverAggregator() + # test current values without any update + self.assertEqual( + observer.mmsc.current, (None, None, None, 0), + ) + self.assertIsNone(observer.current) + + # call update with some values + values = (3, 50, 3, 97, 27) + for val in values: + observer.update(val) + + self.assertEqual( + observer.mmsc.current, + (min(values), max(values), sum(values), len(values)), + ) + + self.assertEqual(observer.current, values[-1]) + + def test_checkpoint(self): + observer = ObserverAggregator() + + # take checkpoint wihtout any update + observer.take_checkpoint() + self.assertEqual( + observer.checkpoint, (None, None, None, 0, None), + ) + + # call update with some values + values = (3, 50, 3, 97) + for val in values: + observer.update(val) + + observer.take_checkpoint() + self.assertEqual( + observer.checkpoint, + (min(values), max(values), sum(values), len(values), values[-1]), + ) + + def test_merge(self): + observer1 = ObserverAggregator() + observer2 = ObserverAggregator() + + mmsc_checkpoint1 = MinMaxSumCountAggregator._TYPE(3, 150, 101, 3) + mmsc_checkpoint2 = MinMaxSumCountAggregator._TYPE(1, 33, 44, 2) + + checkpoint1 = ObserverAggregator._TYPE(*(mmsc_checkpoint1 + (23,))) + + checkpoint2 = ObserverAggregator._TYPE(*(mmsc_checkpoint2 + (27,))) + + observer1.mmsc.checkpoint = mmsc_checkpoint1 + observer2.mmsc.checkpoint = mmsc_checkpoint2 + + observer1.checkpoint = checkpoint1 + observer2.checkpoint = checkpoint2 + + observer1.merge(observer2) + + self.assertEqual( + observer1.checkpoint, + ( + min(checkpoint1.min, checkpoint2.min), + max(checkpoint1.max, checkpoint2.max), + checkpoint1.sum + checkpoint2.sum, + checkpoint1.count + checkpoint2.count, + checkpoint2.last, + ), + ) + + def test_merge_with_empty(self): + observer1 = ObserverAggregator() + observer2 = ObserverAggregator() + + mmsc_checkpoint1 = MinMaxSumCountAggregator._TYPE(3, 150, 101, 3) + checkpoint1 = ObserverAggregator._TYPE(*(mmsc_checkpoint1 + (23,))) + + observer1.mmsc.checkpoint = mmsc_checkpoint1 + observer1.checkpoint = checkpoint1 + + observer1.merge(observer2) + + self.assertEqual(observer1.checkpoint, checkpoint1) + + class TestController(unittest.TestCase): def test_push_controller(self): meter = mock.Mock() diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index ca3814ca1b5..c369ca6e55d 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -62,6 +62,23 @@ def test_collect_disabled_metric(self): meter.collect() self.assertFalse(batcher_mock.process.called) + def test_collect_observers(self): + meter = metrics.Meter() + batcher_mock = mock.Mock() + meter.batcher = batcher_mock + + def callback(observer): + self.assertIsInstance(observer, metrics_api.Observer) + observer.observe(45, meter.get_label_set(())) + + observer = metrics.Observer( + callback, "name", "desc", "unit", int, meter, (), True + ) + + meter.observers.add(observer) + meter.collect() + self.assertTrue(batcher_mock.process.called) + def test_record_batch(self): meter = metrics.Meter() label_keys = ("key1",) @@ -113,7 +130,7 @@ def test_create_metric(self): counter = meter.create_metric( "name", "desc", "unit", int, metrics.Counter, () ) - self.assertTrue(isinstance(counter, metrics.Counter)) + self.assertIsInstance(counter, metrics.Counter) self.assertEqual(counter.value_type, int) self.assertEqual(counter.name, "name") @@ -122,10 +139,30 @@ def test_create_measure(self): measure = meter.create_metric( "name", "desc", "unit", float, metrics.Measure, () ) - self.assertTrue(isinstance(measure, metrics.Measure)) + self.assertIsInstance(measure, metrics.Measure) self.assertEqual(measure.value_type, float) self.assertEqual(measure.name, "name") + def test_register_observer(self): + meter = metrics.Meter() + + callback = mock.Mock() + + observer = meter.register_observer( + callback, "name", "desc", "unit", int, (), True + ) + + self.assertIsInstance(observer, metrics_api.Observer) + self.assertEqual(len(meter.observers), 1) + + self.assertEqual(observer.callback, callback) + self.assertEqual(observer.name, "name") + self.assertEqual(observer.description, "desc") + self.assertEqual(observer.unit, "unit") + self.assertEqual(observer.value_type, int) + self.assertEqual(observer.label_keys, ()) + self.assertTrue(observer.enabled) + def test_get_label_set(self): meter = metrics.Meter() kvp = {"environment": "staging", "a": "z"} @@ -181,6 +218,72 @@ def test_record(self): ) +class TestObserver(unittest.TestCase): + def test_observe(self): + meter = metrics.Meter() + observer = metrics.Observer( + None, "name", "desc", "unit", int, meter, ("key",), True + ) + kvp = {"key": "value"} + label_set = meter.get_label_set(kvp) + values = (37, 42, 7, 21) + for val in values: + observer.observe(val, label_set) + self.assertEqual( + observer.aggregators[label_set].mmsc.current, + (min(values), max(values), sum(values), len(values)), + ) + + self.assertEqual(observer.aggregators[label_set].current, values[-1]) + + def test_observe_disabled(self): + meter = metrics.Meter() + observer = metrics.Observer( + None, "name", "desc", "unit", int, meter, ("key",), False + ) + kvp = {"key": "value"} + label_set = meter.get_label_set(kvp) + observer.observe(37, label_set) + self.assertEqual(len(observer.aggregators), 0) + + @mock.patch("opentelemetry.sdk.metrics.logger") + def test_observe_incorrect_type(self, logger_mock): + meter = metrics.Meter() + observer = metrics.Observer( + None, "name", "desc", "unit", int, meter, ("key",), True + ) + kvp = {"key": "value"} + label_set = meter.get_label_set(kvp) + observer.observe(37.0, label_set) + self.assertEqual(len(observer.aggregators), 0) + self.assertTrue(logger_mock.warning.called) + + def test_run(self): + meter = metrics.Meter() + + callback = mock.Mock() + observer = metrics.Observer( + callback, "name", "desc", "unit", int, meter, (), True + ) + + self.assertTrue(observer.run()) + callback.assert_called_once_with(observer) + + @mock.patch("opentelemetry.sdk.metrics.logger") + def test_run_exception(self, logger_mock): + meter = metrics.Meter() + + callback = mock.Mock() + callback.side_effect = Exception("We have a problem!") + + observer = metrics.Observer( + callback, "name", "desc", "unit", int, meter, (), True + ) + + self.assertFalse(observer.run()) + self.assertTrue(logger_mock.warning.called) + + class TestCounterHandle(unittest.TestCase): def test_add(self): aggregator = export.aggregate.CounterAggregator() diff --git a/tox.ini b/tox.ini index 51eda59d70c..5f982770606 100644 --- a/tox.ini +++ b/tox.ini @@ -115,6 +115,7 @@ deps = flake8 isort black + psutil commands_pre = python scripts/eachdist.py install --editable @@ -164,7 +165,7 @@ deps = docker-compose >= 1.25.2 pymongo ~= 3.1 -changedir = +changedir = ext/opentelemetry-ext-docker-tests/tests commands_pre = @@ -172,7 +173,7 @@ commands_pre = -e {toxinidir}/opentelemetry-sdk \ -e {toxinidir}/ext/opentelemetry-ext-pymongo - docker-compose up -d -commands = +commands = pytest {posargs} commands_post = From bc26ac47ee57153ae34c39a0526694357471326f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Wed, 19 Feb 2020 10:49:56 -0500 Subject: [PATCH 07/11] examples/metrics: miscellaneous updates - use explicit parameter names to make it more clear - fix comments (actually remove them) in simple_example --- examples/metrics/observer_example.py | 23 +++++++++++++-------- examples/metrics/record.py | 16 +++++++++++--- examples/metrics/simple_example.py | 31 +++++++++++++++++++--------- 3 files changed, 48 insertions(+), 22 deletions(-) diff --git a/examples/metrics/observer_example.py b/examples/metrics/observer_example.py index fad25926830..5940d149e23 100644 --- a/examples/metrics/observer_example.py +++ b/examples/metrics/observer_example.py @@ -25,7 +25,7 @@ from opentelemetry.sdk.metrics.export.controller import PushController # Configure a stateful batcher -batcher = UngroupedBatcher(True) +batcher = UngroupedBatcher(stateful=True) metrics.set_preferred_meter_implementation(lambda _: Meter(batcher)) meter = metrics.meter() @@ -34,7 +34,7 @@ exporter = ConsoleMetricsExporter() # Configure a push controller -controller = PushController(meter, exporter, 2) +controller = PushController(meter=meter, exporter=exporter, interval=2) # Callback to gather cpu usage @@ -45,12 +45,12 @@ def get_cpu_usage_callback(observer): meter.register_observer( - get_cpu_usage_callback, - "cpu_percent", - "per-cpu usage", - 1, - float, - ("cpu_number",), + callback=get_cpu_usage_callback, + name="cpu_percent", + description="per-cpu usage", + unit="1", + value_type=float, + label_keys=("cpu_number",), ) @@ -61,7 +61,12 @@ def get_ram_usage_callback(observer): meter.register_observer( - get_ram_usage_callback, "ram_percent", "RAM memory usage", 1, float, (), + callback=get_ram_usage_callback, + name="ram_percent", + description="RAM memory usage", + unit="1", + value_type=float, + label_keys=(), ) input("Press a key to finish...\n") diff --git a/examples/metrics/record.py b/examples/metrics/record.py index be68c8083ff..64902c0573b 100644 --- a/examples/metrics/record.py +++ b/examples/metrics/record.py @@ -30,15 +30,25 @@ exporter = ConsoleMetricsExporter() # controller collects metrics created from meter and exports it via the # exporter every interval -controller = PushController(meter, exporter, 5) +controller = PushController(meter=meter, exporter=exporter, interval=5) # Example to show how to record using the meter counter = meter.create_metric( - "requests", "number of requests", 1, int, Counter, ("environment",) + name="requests", + description="number of requests", + unit="1", + value_type=int, + metric_type=Counter, + label_keys=("environment",), ) counter2 = meter.create_metric( - "clicks", "number of clicks", 1, int, Counter, ("environment",) + name="clicks", + description="number of clicks", + unit="1", + value_type=int, + metric_type=Counter, + label_keys=("environment",), ) # Labelsets are used to identify key-values that are associated with a specific diff --git a/examples/metrics/simple_example.py b/examples/metrics/simple_example.py index 75da80b73ac..a5623ce6ad0 100644 --- a/examples/metrics/simple_example.py +++ b/examples/metrics/simple_example.py @@ -29,6 +29,7 @@ from opentelemetry.sdk.metrics.export.controller import PushController batcher_mode = "stateful" +stateful_bacher = False def usage(argv): @@ -50,7 +51,8 @@ def usage(argv): # lifetime. # False indicates the batcher computes checkpoints which describe the updates # of a single collection period (deltas) -batcher = UngroupedBatcher(batcher_mode == "stateful") +stateful_bacher = batcher_mode == "stateful" +batcher = UngroupedBatcher(stateful=stateful_bacher) # If a batcher is not provided, a default batcher is used # Meter is responsible for creating and recording metrics @@ -66,15 +68,30 @@ def usage(argv): # Metric instruments allow to capture measurements requests_counter = meter.create_metric( - "requests", "number of requests", 1, int, Counter, ("environment",) + name="requests", + description="number of requests", + unit="1", + value_type=int, + metric_type=Counter, + label_keys=("environment",), ) clicks_counter = meter.create_metric( - "clicks", "number of clicks", 1, int, Counter, ("environment",) + name="clicks", + description="number of clicks", + unit="1", + value_type=int, + metric_type=Counter, + label_keys=("environment",), ) requests_size = meter.create_metric( - "requests_size", "size of requests", 1, int, Measure, ("environment",) + name="requests_size", + description="size of requests", + unit="1", + value_type=int, + metric_type=Measure, + label_keys=("environment",), ) # Labelsets are used to identify key-values that are associated with a specific @@ -86,21 +103,15 @@ def usage(argv): # Update the metric instruments using the direct calling convention requests_size.record(100, staging_label_set) requests_counter.add(25, staging_label_set) -# Sleep for 5 seconds, exported value should be 25 time.sleep(5) requests_size.record(5000, staging_label_set) requests_counter.add(50, staging_label_set) -# Exported value should be 75 time.sleep(5) requests_size.record(2, testing_label_set) requests_counter.add(35, testing_label_set) -# There should be two exported values 75 and 35, one for each labelset time.sleep(5) clicks_counter.add(5, staging_label_set) -# There should be three exported values, labelsets can be reused for different -# metrics but will be recorded seperately, 75, 35 and 5 - time.sleep(5) From 3f8265d264cd008eda4307d1d2259572219418bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Wed, 19 Feb 2020 15:06:36 -0500 Subject: [PATCH 08/11] clarify units parameter --- opentelemetry-api/src/opentelemetry/metrics/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/opentelemetry-api/src/opentelemetry/metrics/__init__.py b/opentelemetry-api/src/opentelemetry/metrics/__init__.py index 13c0efae099..cf1abbb7b74 100644 --- a/opentelemetry-api/src/opentelemetry/metrics/__init__.py +++ b/opentelemetry-api/src/opentelemetry/metrics/__init__.py @@ -257,7 +257,8 @@ def create_metric( Args: name: The name of the metric. description: Human-readable description of the metric. - unit: Unit of the metric values. + unit: Unit of the metric values following the UCUM convention + (https://unitsofmeasure.org/ucum.html). value_type: The type of values being recorded by the metric. metric_type: The type of metric being created. label_keys: The keys for the labels with dynamic values. From 7436362d4669d78e527c031d7d282ac46c2f7eb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Thu, 20 Feb 2020 07:02:43 -0500 Subject: [PATCH 09/11] add unit descriptions for observers --- opentelemetry-api/src/opentelemetry/metrics/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/opentelemetry-api/src/opentelemetry/metrics/__init__.py b/opentelemetry-api/src/opentelemetry/metrics/__init__.py index cf1abbb7b74..69a3b92b455 100644 --- a/opentelemetry-api/src/opentelemetry/metrics/__init__.py +++ b/opentelemetry-api/src/opentelemetry/metrics/__init__.py @@ -284,7 +284,8 @@ def register_observer( observer as argument. name: The name of the metric. description: Human-readable description of the metric. - unit: Unit of the metric values. + unit: Unit of the metric values following the UCUM convention + (https://unitsofmeasure.org/ucum.html). value_type: The type of values being recorded by the metric. label_keys: The keys for the labels with dynamic values. enabled: Whether to report the metric by default. From df01d09d0ab7d5fa7d91d5512f38734c93bedc53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Thu, 27 Feb 2020 12:13:32 -0500 Subject: [PATCH 10/11] use ABC for observer --- opentelemetry-api/src/opentelemetry/metrics/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/opentelemetry-api/src/opentelemetry/metrics/__init__.py b/opentelemetry-api/src/opentelemetry/metrics/__init__.py index 69a3b92b455..a677d850101 100644 --- a/opentelemetry-api/src/opentelemetry/metrics/__init__.py +++ b/opentelemetry-api/src/opentelemetry/metrics/__init__.py @@ -178,7 +178,7 @@ def record(self, value: ValueT, label_set: LabelSet) -> None: """ -class Observer: +class Observer(abc.ABC): """An observer type metric instrument used to capture a current set of values. @@ -187,6 +187,7 @@ class Observer: values per collection interval. """ + @abc.abstractmethod def observe(self, value: ValueT, label_set: LabelSet) -> None: """Captures ``value`` to the observer. From f56e583c3b173e135f6c94d0d525b8b6430af7ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Mon, 2 Mar 2020 10:05:51 -0500 Subject: [PATCH 11/11] fix tests --- examples/metrics/simple_example.py | 1 - opentelemetry-api/src/opentelemetry/metrics/__init__.py | 7 ++----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/examples/metrics/simple_example.py b/examples/metrics/simple_example.py index 9c2cf78957e..2b8f5cfac8b 100644 --- a/examples/metrics/simple_example.py +++ b/examples/metrics/simple_example.py @@ -28,7 +28,6 @@ from opentelemetry.sdk.metrics.export.controller import PushController batcher_mode = "stateful" -stateful_bacher = False def usage(argv): diff --git a/opentelemetry-api/src/opentelemetry/metrics/__init__.py b/opentelemetry-api/src/opentelemetry/metrics/__init__.py index 62eff16b699..3ba9bcad009 100644 --- a/opentelemetry-api/src/opentelemetry/metrics/__init__.py +++ b/opentelemetry-api/src/opentelemetry/metrics/__init__.py @@ -212,10 +212,6 @@ def observe(self, value: ValueT, label_set: LabelSet) -> None: """ -MetricT = TypeVar("MetricT", Counter, Measure, Observer) -ObserverCallbackT = Callable[[Observer], None] - - class MeterProvider(abc.ABC): @abc.abstractmethod def get_meter( @@ -266,7 +262,8 @@ def get_meter( return DefaultMeter() -MetricT = TypeVar("MetricT", Counter, Measure) +MetricT = TypeVar("MetricT", Counter, Measure, Observer) +ObserverCallbackT = Callable[[Observer], None] # pylint: disable=unused-argument