From 89514cd8d5c110175ae6fde50741bf6651d6f004 Mon Sep 17 00:00:00 2001 From: Alex Boten Date: Thu, 24 Feb 2022 15:35:34 -0800 Subject: [PATCH] add factories for aggregations (#2482) * add factories for aggregations Fixes #2481 * fix lint * make classes/methods private * updates from feedback * remove Factory suffix * Apply suggestions from code review Co-authored-by: Diego Hurtado Co-authored-by: Diego Hurtado --- .../opentelemetry/sdk/_metrics/aggregation.py | 67 +++++++++++++- .../sdk/_metrics/metric_reader_storage.py | 12 +-- .../tests/metrics/test_aggregation.py | 89 ++++++++++++++++--- 3 files changed, 144 insertions(+), 24 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py index 9518434ee1e..9258cc8d7a5 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py @@ -20,6 +20,12 @@ from threading import Lock from typing import Generic, List, Optional, Sequence, TypeVar +from opentelemetry._metrics.instrument import ( + Asynchronous, + Instrument, + Synchronous, + _Monotonic, +) from opentelemetry.sdk._metrics.measurement import Measurement from opentelemetry.sdk._metrics.point import ( AggregationTemporality, @@ -35,7 +41,7 @@ _logger = getLogger(__name__) -class Aggregation(ABC, Generic[_PointVarT]): +class _Aggregation(ABC, Generic[_PointVarT]): def __init__(self): self._lock = Lock() @@ -48,7 +54,7 @@ def collect(self) -> Optional[_PointVarT]: pass -class SumAggregation(Aggregation[Sum]): +class _SumAggregation(_Aggregation[Sum]): def __init__( self, instrument_is_monotonic: bool, @@ -107,7 +113,7 @@ def collect(self) -> Optional[Sum]: ) -class LastValueAggregation(Aggregation[Gauge]): +class _LastValueAggregation(_Aggregation[Gauge]): def __init__(self): super().__init__() self._value = None @@ -129,7 +135,7 @@ def collect(self) -> Optional[Gauge]: ) -class ExplicitBucketHistogramAggregation(Aggregation[Histogram]): +class _ExplicitBucketHistogramAggregation(_Aggregation[Histogram]): def __init__( self, boundaries: Sequence[float] = ( @@ -311,3 +317,56 @@ def _convert_aggregation_temporality( aggregation_temporality=aggregation_temporality, ) return None + + +class _AggregationFactory(ABC): + @abstractmethod + def _create_aggregation(self, instrument: Instrument) -> _Aggregation: + """Creates an aggregation""" + + +class ExplicitBucketHistogramAggregation(_AggregationFactory): + def __init__( + self, + boundaries: Sequence[float] = ( + 0.0, + 5.0, + 10.0, + 25.0, + 50.0, + 75.0, + 100.0, + 250.0, + 500.0, + 1000.0, + ), + record_min_max: bool = True, + ) -> None: + self._boundaries = boundaries + self._record_min_max = record_min_max + + def _create_aggregation(self, instrument: Instrument) -> _Aggregation: + return _ExplicitBucketHistogramAggregation( + boundaries=self._boundaries, + record_min_max=self._record_min_max, + ) + + +class SumAggregation(_AggregationFactory): + def _create_aggregation(self, instrument: Instrument) -> _Aggregation: + + temporality = AggregationTemporality.UNSPECIFIED + if isinstance(instrument, Synchronous): + temporality = AggregationTemporality.DELTA + elif isinstance(instrument, Asynchronous): + temporality = AggregationTemporality.CUMULATIVE + + return _SumAggregation( + isinstance(instrument, _Monotonic), + temporality, + ) + + +class LastValueAggregation(_AggregationFactory): + def _create_aggregation(self, instrument: Instrument) -> _Aggregation: + return _LastValueAggregation() diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py index 18991f307c0..01724742f2c 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py @@ -21,9 +21,9 @@ ) from opentelemetry.sdk._metrics.aggregation import ( AggregationTemporality, - ExplicitBucketHistogramAggregation, - LastValueAggregation, - SumAggregation, + _ExplicitBucketHistogramAggregation, + _LastValueAggregation, + _SumAggregation, ) from opentelemetry.sdk._metrics.measurement import Measurement from opentelemetry.sdk._metrics.point import Metric @@ -76,11 +76,11 @@ def _get_or_init_view_instrument_match( if not matches: # TODO: the logic to select aggregation could be moved if isinstance(instrument, Counter): - agg = SumAggregation(True, AggregationTemporality.DELTA) + agg = _SumAggregation(True, AggregationTemporality.DELTA) elif isinstance(instrument, Histogram): - agg = ExplicitBucketHistogramAggregation() + agg = _ExplicitBucketHistogramAggregation() else: - agg = LastValueAggregation() + agg = _LastValueAggregation() matches.append( _ViewInstrumentMatch( resource=self._sdk_config.resource, diff --git a/opentelemetry-sdk/tests/metrics/test_aggregation.py b/opentelemetry-sdk/tests/metrics/test_aggregation.py index cbeb9bf4acd..8263e2e6bc8 100644 --- a/opentelemetry-sdk/tests/metrics/test_aggregation.py +++ b/opentelemetry-sdk/tests/metrics/test_aggregation.py @@ -20,12 +20,16 @@ from unittest import TestCase from unittest.mock import Mock +from opentelemetry.sdk._metrics import instrument from opentelemetry.sdk._metrics.aggregation import ( AggregationTemporality, ExplicitBucketHistogramAggregation, LastValueAggregation, SumAggregation, _convert_aggregation_temporality, + _ExplicitBucketHistogramAggregation, + _LastValueAggregation, + _SumAggregation, ) from opentelemetry.sdk._metrics.measurement import Measurement from opentelemetry.sdk._metrics.point import Gauge, Histogram, Sum @@ -44,7 +48,7 @@ def test_aggregate_delta(self): `SynchronousSumAggregation` aggregates data for sum metric points """ - synchronous_sum_aggregation = SumAggregation( + synchronous_sum_aggregation = _SumAggregation( True, AggregationTemporality.DELTA ) @@ -54,7 +58,7 @@ def test_aggregate_delta(self): self.assertEqual(synchronous_sum_aggregation._value, 6) - synchronous_sum_aggregation = SumAggregation( + synchronous_sum_aggregation = _SumAggregation( True, AggregationTemporality.DELTA ) @@ -69,7 +73,7 @@ def test_aggregate_cumulative(self): `SynchronousSumAggregation` aggregates data for sum metric points """ - synchronous_sum_aggregation = SumAggregation( + synchronous_sum_aggregation = _SumAggregation( True, AggregationTemporality.CUMULATIVE ) @@ -79,7 +83,7 @@ def test_aggregate_cumulative(self): self.assertEqual(synchronous_sum_aggregation._value, 6) - synchronous_sum_aggregation = SumAggregation( + synchronous_sum_aggregation = _SumAggregation( True, AggregationTemporality.CUMULATIVE ) @@ -94,7 +98,7 @@ def test_collect_delta(self): `SynchronousSumAggregation` collects sum metric points """ - synchronous_sum_aggregation = SumAggregation( + synchronous_sum_aggregation = _SumAggregation( True, AggregationTemporality.DELTA ) @@ -119,7 +123,7 @@ def test_collect_cumulative(self): `SynchronousSumAggregation` collects sum metric points """ - sum_aggregation = SumAggregation( + sum_aggregation = _SumAggregation( True, AggregationTemporality.CUMULATIVE ) @@ -140,7 +144,7 @@ def test_collect_cumulative(self): ) self.assertIsNone( - SumAggregation(True, AggregationTemporality.CUMULATIVE).collect() + _SumAggregation(True, AggregationTemporality.CUMULATIVE).collect() ) @@ -151,7 +155,7 @@ def test_aggregate(self): temporality """ - last_value_aggregation = LastValueAggregation() + last_value_aggregation = _LastValueAggregation() last_value_aggregation.aggregate(measurement(1)) self.assertEqual(last_value_aggregation._value, 1) @@ -167,7 +171,7 @@ def test_collect(self): `LastValueAggregation` collects sum metric points """ - last_value_aggregation = LastValueAggregation() + last_value_aggregation = _LastValueAggregation() self.assertIsNone(last_value_aggregation.collect()) @@ -198,7 +202,7 @@ def test_aggregate(self): """ explicit_bucket_histogram_aggregation = ( - ExplicitBucketHistogramAggregation(boundaries=[0, 2, 4]) + _ExplicitBucketHistogramAggregation(boundaries=[0, 2, 4]) ) explicit_bucket_histogram_aggregation.aggregate(measurement(-1)) @@ -239,7 +243,7 @@ def test_min_max(self): """ explicit_bucket_histogram_aggregation = ( - ExplicitBucketHistogramAggregation() + _ExplicitBucketHistogramAggregation() ) explicit_bucket_histogram_aggregation.aggregate(measurement(-1)) @@ -252,7 +256,7 @@ def test_min_max(self): self.assertEqual(explicit_bucket_histogram_aggregation._max, 9999) explicit_bucket_histogram_aggregation = ( - ExplicitBucketHistogramAggregation(record_min_max=False) + _ExplicitBucketHistogramAggregation(record_min_max=False) ) explicit_bucket_histogram_aggregation.aggregate(measurement(-1)) @@ -266,11 +270,11 @@ def test_min_max(self): def test_collect(self): """ - `ExplicitBucketHistogramAggregation` collects sum metric points + `_ExplicitBucketHistogramAggregation` collects sum metric points """ explicit_bucket_histogram_aggregation = ( - ExplicitBucketHistogramAggregation(boundaries=[0, 1, 2]) + _ExplicitBucketHistogramAggregation(boundaries=[0, 1, 2]) ) explicit_bucket_histogram_aggregation.aggregate(measurement(1)) @@ -749,3 +753,60 @@ def test_aggregation_temporality_to_delta(self): aggregation_temporality=AggregationTemporality.DELTA, ), ) + + +class TestAggregationFactory(TestCase): + def test_sum_factory(self): + counter = instrument.Counter("name", Mock(), Mock()) + factory = SumAggregation() + aggregation = factory._create_aggregation(counter) + self.assertIsInstance(aggregation, _SumAggregation) + self.assertTrue(aggregation._instrument_is_monotonic) + self.assertEqual( + aggregation._instrument_temporality, AggregationTemporality.DELTA + ) + aggregation2 = factory._create_aggregation(counter) + self.assertNotEqual(aggregation, aggregation2) + + counter = instrument.UpDownCounter("name", Mock(), Mock()) + factory = SumAggregation() + aggregation = factory._create_aggregation(counter) + self.assertIsInstance(aggregation, _SumAggregation) + self.assertFalse(aggregation._instrument_is_monotonic) + self.assertEqual( + aggregation._instrument_temporality, AggregationTemporality.DELTA + ) + + counter = instrument.ObservableCounter("name", Mock(), Mock(), None) + factory = SumAggregation() + aggregation = factory._create_aggregation(counter) + self.assertIsInstance(aggregation, _SumAggregation) + self.assertTrue(aggregation._instrument_is_monotonic) + self.assertEqual( + aggregation._instrument_temporality, + AggregationTemporality.CUMULATIVE, + ) + + def test_explicit_bucket_histogram_factory(self): + histo = instrument.Histogram("name", Mock(), Mock()) + factory = ExplicitBucketHistogramAggregation( + boundaries=( + 0.0, + 5.0, + ), + record_min_max=False, + ) + aggregation = factory._create_aggregation(histo) + self.assertIsInstance(aggregation, _ExplicitBucketHistogramAggregation) + self.assertFalse(aggregation._record_min_max) + self.assertEqual(aggregation._boundaries, (0.0, 5.0)) + aggregation2 = factory._create_aggregation(histo) + self.assertNotEqual(aggregation, aggregation2) + + def test_last_value_factory(self): + counter = instrument.Counter("name", Mock(), Mock()) + factory = LastValueAggregation() + aggregation = factory._create_aggregation(counter) + self.assertIsInstance(aggregation, _LastValueAggregation) + aggregation2 = factory._create_aggregation(counter) + self.assertNotEqual(aggregation, aggregation2)