From 1f85268a6f00de4a771e0aaa8663ca6fbc07c575 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Thu, 18 Nov 2021 00:18:02 -0600 Subject: [PATCH] Adds Aggregation and instruments as part of Metrics SDK (#2234) * Adds metrics API (#1887) * Adds metric prototype Fixes #1835 * Fix docs * Add API metrics doc * Add missing docs * Add files * Adding docs * Refactor to _initialize * Refactor initialize * Add more documentation * Add exporter test * Add process * Fix tests * Try to add aggregator_class argument Tests are failing here * Fix instrument parent classes * Test default aggregator * WIP * Add prototype test * Tests passing again * Use right counters * All tests passing * Rearrange instrument storage * Fix tests * Add HTTP server test * WIP * WIP * Add prototype * WIP * Fail the test * WIP * WIP * WIP * WIP * Add views * Discard instruments via views * Fix tests * WIP * WIP * Fix lint * WIP * Fix test * Fix lint * Fix method * Fix lint * Mypy workaround * Skip if 3.6 * Fix lint * Add reason * Fix 3.6 * Fix run * Fix lint * Remove SDK metrics * Remove SDK docs * Remove metrics * Remove assertnotraises mixin * Revert sdk docs conf * Remove SDK env var changes * Fix unit checking * Define positional-only arguments * Add Metrics plans * Add API tests * WIP * WIP test * WIP * WIP * WIP * Set provider test passing * Use a fixture * Add test for get_provider * Rename tests * WIP * WIP * WIP * WIP * Remove non specific requirement * Add meter requirements * Put all meter provider tests in one file * Add meter tests * Make attributes be passed as a dictionary * Make some interfaces private * Log an error instead * Remove ASCII flag * Add CHANGELOG entry * Add instrument tests * All tests passing * Add test * Add name tests * Add unit tests * Add description tests * Add counter tests * Add more tests * Add Histogram tests * Add observable gauge tests * Add updowncounter tests * Add observableupdowncounter tests * Fix lint * Fix docs * Fix lint * Ignore mypy * Remove useless pylint skip * Remove useless pylint skip * Remove useless pylint skip * Remove useless pylint skip * Remove useless pylint skip * Add locks to meter and meterprovider * Add lock to instruments * Fix fixmes * Fix lint * Add documentation placeholder * Remove blank line as requested. * Do not override Rlock * Remove unecessary super calls * Add missing super calls * Remove plan files * Add missing parameters * Rename observe to callback * Fix lint * Rename to secure_instrument_name * Remove locks * Fix lint * Remove args and kwargs * Remove implementation that gives meters access to meter provider * Allow creating async instruments with either a callback function or generator * add additional test with callback form of observable counter * add a test/example that reads measurements from proc stat * implement cpu time integration test with generator too Co-authored-by: Aaron Abbott * Make measurement a concrete class (#2153) * Make Measurement a concrete class * comments * update changelog * Return proxy instruments from ProxyMeter (#2169) * Merge main 4 (#2236) * Add MeterProvider and Meter to the SDK Fixes #2200 * Add FIXMEs * Fix docstring * Add FIXME * Fix meter return * Log an error if a force flush fails * Add FIXME * Fix lint * Remove SDK API module * Unregister * Fix API names * Return _DefaultMeter * Remove properties * Pass MeterProvider as a parameter to __init__ * Add FIXMEs * Add FIXMEs * Fix lint * Add Aggregation to the metrics SDK Fixes #2229 * lint fix wip * Fix lint * Add proto to setup.cfg * Add timestamp for last value * Rename modules to be private * Fix paths * Set value in concrete classes init * Fix test * Fix lint * Remove temporalities * Use frozenset as key * Test instruments * Handle min, max and sum in explicit bucket histogram aggregator * Add test for negative values * Remove collect method from aggregations * Add make_point_and_reset * Remove add implementation * Remove _Synchronous * Update opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py Co-authored-by: Aaron Abbott * Requested fixes * Remove NoneAggregation * Add changelog entry * Fix tests * Fix boundaries * More fixes * Update CHANGELOG.md Co-authored-by: Srikanth Chekuri Co-authored-by: Aaron Abbott Co-authored-by: Srikanth Chekuri --- CHANGELOG.md | 3 + .../opentelemetry/sdk/_metrics/aggregation.py | 137 +++++++++++++++ .../opentelemetry/sdk/_metrics/instrument.py | 160 ++++++++++++++++++ .../tests/metrics/test_aggregation.py | 126 ++++++++++++++ 4 files changed, 426 insertions(+) create mode 100644 opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py create mode 100644 opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py create mode 100644 opentelemetry-sdk/tests/metrics/test_aggregation.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 53f742dce3d..c7e3c2fce16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.7.0-0.26b0...HEAD) +- Adds Aggregation and instruments as part of Metrics SDK + ([#2234](https://github.com/open-telemetry/opentelemetry-python/pull/2234)) + ## [1.7.1-0.26b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.7.0-0.26b0) - 2021-11-11 diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py new file mode 100644 index 00000000000..456e4471621 --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py @@ -0,0 +1,137 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod +from collections import OrderedDict +from logging import getLogger +from math import inf + +from opentelemetry._metrics.instrument import _Monotonic +from opentelemetry.util._time import _time_ns + +_logger = getLogger(__name__) + + +class Aggregation(ABC): + @property + def value(self): + return self._value # pylint: disable=no-member + + @abstractmethod + def aggregate(self, value): + pass + + @abstractmethod + def make_point_and_reset(self): + """ + Atomically return a point for the current value of the metric and reset the internal state. + """ + + +class SumAggregation(Aggregation): + """ + This aggregation collects data for the SDK sum metric point. + """ + + def __init__(self, instrument): + self._value = 0 + + def aggregate(self, value): + self._value = self._value + value + + def make_point_and_reset(self): + pass + + +class LastValueAggregation(Aggregation): + + """ + This aggregation collects data for the SDK sum metric point. + """ + + def __init__(self, instrument): + self._value = None + self._timestamp = _time_ns() + + def aggregate(self, value): + self._value = value + self._timestamp = _time_ns() + + def make_point_and_reset(self): + pass + + +class ExplicitBucketHistogramAggregation(Aggregation): + + """ + This aggregation collects data for the SDK sum metric point. + """ + + def __init__( + self, + instrument, + *args, + boundaries=(0, 5, 10, 25, 50, 75, 100, 250, 500, 1000), + record_min_max=True, + ): + super().__init__() + self._value = OrderedDict([(key, 0) for key in (*boundaries, inf)]) + self._min = inf + self._max = -inf + self._sum = 0 + self._instrument = instrument + self._record_min_max = record_min_max + + @property + def min(self): + if not self._record_min_max: + _logger.warning("Min is not being recorded") + + return self._min + + @property + def max(self): + if not self._record_min_max: + _logger.warning("Max is not being recorded") + + return self._max + + @property + def sum(self): + if isinstance(self._instrument, _Monotonic): + return self._sum + + _logger.warning( + "Sum is not filled out when the associated " + "instrument is not monotonic" + ) + return None + + def aggregate(self, value): + if self._record_min_max: + self._min = min(self._min, value) + self._max = max(self._max, value) + + if isinstance(self._instrument, _Monotonic): + self._sum += value + + for key in self._value.keys(): + + if value < key: + self._value[key] = self._value[key] + value + + break + + def make_point_and_reset(self): + pass diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py new file mode 100644 index 00000000000..fc63311ce7b --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py @@ -0,0 +1,160 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=function-redefined +# pylint: disable=dangerous-default-value +# Classes in this module use dictionaries as default arguments. This is +# considered dangerous by pylint because the default dictionary is shared by +# all instances. Implementations of these classes must not make any change to +# this default dictionary in __init__. + +from opentelemetry._metrics.instrument import ( + Counter, + Histogram, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, +) +from opentelemetry.sdk._metrics.aggregation import ( + ExplicitBucketHistogramAggregation, + LastValueAggregation, + SumAggregation, +) + + +class _Instrument: + def __init__( + self, + name, + unit="", + description="", + aggregation=None, + aggregation_config={}, + ): + self._attributes_aggregations = {} + self._aggregation = aggregation + self._aggregation_config = aggregation_config + aggregation(self, **aggregation_config) + + +class Counter(_Instrument, Counter): + def __init__( + self, + name, + unit="", + description="", + aggregation=SumAggregation, + aggregation_config={}, + ): + super().__init__( + name, + unit=unit, + description=description, + aggregation=aggregation, + aggregation_config=aggregation_config, + ) + + +class UpDownCounter(_Instrument, UpDownCounter): + def __init__( + self, + name, + unit="", + description="", + aggregation=SumAggregation, + aggregation_config={}, + ): + super().__init__( + name, + unit=unit, + description=description, + aggregation=aggregation, + aggregation_config=aggregation_config, + ) + + +class ObservableCounter(_Instrument, ObservableCounter): + def __init__( + self, + name, + callback, + unit="", + description="", + aggregation=SumAggregation, + aggregation_config={}, + ): + super().__init__( + name, + unit=unit, + description=description, + aggregation=aggregation, + aggregation_config=aggregation_config, + ) + + +class ObservableUpDownCounter(_Instrument, ObservableUpDownCounter): + def __init__( + self, + name, + callback, + unit="", + description="", + aggregation=SumAggregation, + aggregation_config={}, + ): + super().__init__( + name, + unit=unit, + description=description, + aggregation=aggregation, + aggregation_config=aggregation_config, + ) + + +class Histogram(_Instrument, Histogram): + def __init__( + self, + name, + unit="", + description="", + aggregation=ExplicitBucketHistogramAggregation, + aggregation_config={}, + ): + super().__init__( + name, + unit=unit, + description=description, + aggregation=aggregation, + aggregation_config=aggregation_config, + ) + + +class ObservableGauge(_Instrument, ObservableGauge): + def __init__( + self, + name, + callback, + unit="", + description="", + aggregation=LastValueAggregation, + aggregation_config={}, + ): + super().__init__( + name, + unit=unit, + description=description, + aggregation=aggregation, + aggregation_config=aggregation_config, + ) diff --git a/opentelemetry-sdk/tests/metrics/test_aggregation.py b/opentelemetry-sdk/tests/metrics/test_aggregation.py new file mode 100644 index 00000000000..1c4fa1420e6 --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/test_aggregation.py @@ -0,0 +1,126 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from logging import WARNING +from math import inf +from unittest import TestCase +from unittest.mock import Mock + +from opentelemetry.sdk._metrics.aggregation import ( + ExplicitBucketHistogramAggregation, + LastValueAggregation, + SumAggregation, +) + + +class TestSumAggregation(TestCase): + def test_aggregate(self): + """ + `SumAggregation` collects data for sum metric points + """ + + sum_aggregation = SumAggregation(Mock()) + + sum_aggregation.aggregate(1) + sum_aggregation.aggregate(2) + sum_aggregation.aggregate(3) + + self.assertEqual(sum_aggregation.value, 6) + + sum_aggregation = SumAggregation(Mock()) + + sum_aggregation.aggregate(1) + sum_aggregation.aggregate(-2) + sum_aggregation.aggregate(3) + + self.assertEqual(sum_aggregation.value, 2) + + +class TestLastValueAggregation(TestCase): + def test_aggregate(self): + """ + `LastValueAggregation` collects data for gauge metric points with delta + temporality + """ + + last_value_aggregation = LastValueAggregation(Mock()) + + last_value_aggregation.aggregate(1) + self.assertEqual(last_value_aggregation.value, 1) + + last_value_aggregation.aggregate(2) + self.assertEqual(last_value_aggregation.value, 2) + + last_value_aggregation.aggregate(3) + self.assertEqual(last_value_aggregation.value, 3) + + +class TestExplicitBucketHistogramAggregation(TestCase): + def test_aggregate(self): + """ + `ExplicitBucketHistogramAggregation` collects data for explicit_bucket_histogram metric points + """ + + explicit_bucket_histogram_aggregation = ( + ExplicitBucketHistogramAggregation(Mock()) + ) + + explicit_bucket_histogram_aggregation.aggregate(-1) + explicit_bucket_histogram_aggregation.aggregate(2) + explicit_bucket_histogram_aggregation.aggregate(7) + explicit_bucket_histogram_aggregation.aggregate(8) + explicit_bucket_histogram_aggregation.aggregate(9999) + + self.assertEqual(explicit_bucket_histogram_aggregation.value[0], -1) + self.assertEqual(explicit_bucket_histogram_aggregation.value[5], 2) + self.assertEqual(explicit_bucket_histogram_aggregation.value[10], 15) + self.assertEqual( + explicit_bucket_histogram_aggregation.value[inf], 9999 + ) + + def test_min_max(self): + """ + `record_min_max` indicates the aggregator to record the minimum and + maximum value in the population + """ + + explicit_bucket_histogram_aggregation = ( + ExplicitBucketHistogramAggregation(Mock()) + ) + + explicit_bucket_histogram_aggregation.aggregate(-1) + explicit_bucket_histogram_aggregation.aggregate(2) + explicit_bucket_histogram_aggregation.aggregate(7) + explicit_bucket_histogram_aggregation.aggregate(8) + explicit_bucket_histogram_aggregation.aggregate(9999) + + self.assertEqual(explicit_bucket_histogram_aggregation.min, -1) + self.assertEqual(explicit_bucket_histogram_aggregation.max, 9999) + + explicit_bucket_histogram_aggregation = ( + ExplicitBucketHistogramAggregation(Mock(), record_min_max=False) + ) + + explicit_bucket_histogram_aggregation.aggregate(-1) + explicit_bucket_histogram_aggregation.aggregate(2) + explicit_bucket_histogram_aggregation.aggregate(7) + explicit_bucket_histogram_aggregation.aggregate(8) + explicit_bucket_histogram_aggregation.aggregate(9999) + + with self.assertLogs(level=WARNING): + self.assertEqual(explicit_bucket_histogram_aggregation.min, inf) + + with self.assertLogs(level=WARNING): + self.assertEqual(explicit_bucket_histogram_aggregation.max, -inf)