From a821311a954f027f4ae00f87611a73706c679f47 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Tue, 10 May 2022 10:15:05 -0600 Subject: [PATCH] Add attributes to aggregation constructor parameters (#2676) Fixes #2675 --- .../_internal/_view_instrument_match.py | 8 +-- .../sdk/_metrics/_internal/aggregation.py | 56 ++++++++++++------ .../tests/metrics/test_aggregation.py | 58 ++++++++++--------- 3 files changed, 75 insertions(+), 47 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/_view_instrument_match.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/_view_instrument_match.py index 3fdc65352dc..2f002a272ec 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/_view_instrument_match.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/_view_instrument_match.py @@ -60,12 +60,12 @@ def __init__( ) if not isinstance(self._view._aggregation, DefaultAggregation): self._aggregation = self._view._aggregation._create_aggregation( - self._instrument + self._instrument, None ) else: self._aggregation = self._instrument_class_aggregation[ self._instrument.__class__ - ]._create_aggregation(self._instrument) + ]._create_aggregation(self._instrument, None) def conflicts(self, other: "_ViewInstrumentMatch") -> bool: # pylint: disable=protected-access @@ -113,13 +113,13 @@ def consume_measurement(self, measurement: Measurement) -> None: ): aggregation = ( self._view._aggregation._create_aggregation( - self._instrument + self._instrument, attributes ) ) else: aggregation = self._instrument_class_aggregation[ self._instrument.__class__ - ]._create_aggregation(self._instrument) + ]._create_aggregation(self._instrument, attributes) self._attributes_aggregation[attributes] = aggregation self._attributes_aggregation[attributes].aggregate(measurement) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/aggregation.py index 95993c663ba..1d10b3f8ead 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/aggregation.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/aggregation.py @@ -39,6 +39,7 @@ ) from opentelemetry.sdk._metrics._internal.point import PointT, Sum from opentelemetry.util._time import _time_ns +from opentelemetry.util.types import Attributes _PointVarT = TypeVar("_PointVarT", bound=PointT) @@ -58,8 +59,9 @@ class AggregationTemporality(IntEnum): class _Aggregation(ABC, Generic[_PointVarT]): - def __init__(self): + def __init__(self, attributes: Attributes): self._lock = Lock() + self._attributes = attributes @abstractmethod def aggregate(self, measurement: Measurement) -> None: @@ -84,7 +86,9 @@ class Aggregation(ABC): """ @abstractmethod - def _create_aggregation(self, instrument: Instrument) -> _Aggregation: + def _create_aggregation( + self, instrument: Instrument, attributes: Attributes + ) -> _Aggregation: """Creates an aggregation""" @@ -107,37 +111,43 @@ class DefaultAggregation(Aggregation): ==================================================== ==================================== """ - def _create_aggregation(self, instrument: Instrument) -> _Aggregation: + def _create_aggregation( + self, instrument: Instrument, attributes: Attributes + ) -> _Aggregation: # pylint: disable=too-many-return-statements if isinstance(instrument, Counter): return _SumAggregation( + attributes, instrument_is_monotonic=True, instrument_temporality=AggregationTemporality.DELTA, ) if isinstance(instrument, UpDownCounter): return _SumAggregation( + attributes, instrument_is_monotonic=False, instrument_temporality=AggregationTemporality.DELTA, ) if isinstance(instrument, ObservableCounter): return _SumAggregation( + attributes, instrument_is_monotonic=True, instrument_temporality=AggregationTemporality.CUMULATIVE, ) if isinstance(instrument, ObservableUpDownCounter): return _SumAggregation( + attributes, instrument_is_monotonic=False, instrument_temporality=AggregationTemporality.CUMULATIVE, ) if isinstance(instrument, Histogram): - return _ExplicitBucketHistogramAggregation() + return _ExplicitBucketHistogramAggregation(attributes) if isinstance(instrument, ObservableGauge): - return _LastValueAggregation() + return _LastValueAggregation(attributes) raise Exception(f"Invalid instrument type {type(instrument)} found") @@ -145,10 +155,11 @@ def _create_aggregation(self, instrument: Instrument) -> _Aggregation: class _SumAggregation(_Aggregation[Sum]): def __init__( self, + attributes: Attributes, instrument_is_monotonic: bool, instrument_temporality: AggregationTemporality, ): - super().__init__() + super().__init__(attributes) self._start_time_unix_nano = _time_ns() self._instrument_temporality = instrument_temporality @@ -205,8 +216,8 @@ def collect(self) -> Optional[Sum]: class _LastValueAggregation(_Aggregation[Gauge]): - def __init__(self): - super().__init__() + def __init__(self, attributes: Attributes): + super().__init__(attributes) self._value = None def aggregate(self, measurement: Measurement): @@ -232,6 +243,7 @@ def collect(self) -> Optional[Gauge]: class _ExplicitBucketHistogramAggregation(_Aggregation[HistogramPoint]): def __init__( self, + attributes: Attributes, boundaries: Sequence[float] = ( 0.0, 5.0, @@ -246,7 +258,7 @@ def __init__( ), record_min_max: bool = True, ): - super().__init__() + super().__init__(attributes) self._boundaries = tuple(boundaries) self._bucket_counts = self._get_empty_bucket_counts() self._min = inf @@ -464,10 +476,13 @@ def __init__( self._boundaries = boundaries self._record_min_max = record_min_max - def _create_aggregation(self, instrument: Instrument) -> _Aggregation: + def _create_aggregation( + self, instrument: Instrument, attributes: Attributes + ) -> _Aggregation: return _ExplicitBucketHistogramAggregation( - boundaries=self._boundaries, - record_min_max=self._record_min_max, + attributes, + self._boundaries, + self._record_min_max, ) @@ -477,7 +492,9 @@ class SumAggregation(Aggregation): - The arithmetic sum of Measurement values. """ - def _create_aggregation(self, instrument: Instrument) -> _Aggregation: + def _create_aggregation( + self, instrument: Instrument, attributes: Attributes + ) -> _Aggregation: temporality = AggregationTemporality.UNSPECIFIED if isinstance(instrument, Synchronous): @@ -486,6 +503,7 @@ def _create_aggregation(self, instrument: Instrument) -> _Aggregation: temporality = AggregationTemporality.CUMULATIVE return _SumAggregation( + attributes, isinstance(instrument, (Counter, ObservableCounter)), temporality, ) @@ -499,12 +517,16 @@ class LastValueAggregation(Aggregation): - The timestamp of the last Measurement. """ - def _create_aggregation(self, instrument: Instrument) -> _Aggregation: - return _LastValueAggregation() + def _create_aggregation( + self, instrument: Instrument, attributes: Attributes + ) -> _Aggregation: + return _LastValueAggregation(attributes) class DropAggregation(Aggregation): """Using this aggregation will make all measurements be ignored.""" - def _create_aggregation(self, instrument: Instrument) -> _Aggregation: - return _DropAggregation() + def _create_aggregation( + self, instrument: Instrument, attributes: Attributes + ) -> _Aggregation: + return _DropAggregation(attributes) diff --git a/opentelemetry-sdk/tests/metrics/test_aggregation.py b/opentelemetry-sdk/tests/metrics/test_aggregation.py index 6fb49fce73a..e747e49cdba 100644 --- a/opentelemetry-sdk/tests/metrics/test_aggregation.py +++ b/opentelemetry-sdk/tests/metrics/test_aggregation.py @@ -60,7 +60,7 @@ def test_aggregate_delta(self): """ synchronous_sum_aggregation = _SumAggregation( - True, AggregationTemporality.DELTA + True, AggregationTemporality.DELTA, Mock() ) synchronous_sum_aggregation.aggregate(measurement(1)) @@ -70,7 +70,7 @@ def test_aggregate_delta(self): self.assertEqual(synchronous_sum_aggregation._value, 6) synchronous_sum_aggregation = _SumAggregation( - True, AggregationTemporality.DELTA + Mock(), True, AggregationTemporality.DELTA ) synchronous_sum_aggregation.aggregate(measurement(1)) @@ -85,7 +85,7 @@ def test_aggregate_cumulative(self): """ synchronous_sum_aggregation = _SumAggregation( - True, AggregationTemporality.CUMULATIVE + True, AggregationTemporality.CUMULATIVE, Mock() ) synchronous_sum_aggregation.aggregate(measurement(1)) @@ -95,7 +95,7 @@ def test_aggregate_cumulative(self): self.assertEqual(synchronous_sum_aggregation._value, 6) synchronous_sum_aggregation = _SumAggregation( - True, AggregationTemporality.CUMULATIVE + Mock(), True, AggregationTemporality.CUMULATIVE ) synchronous_sum_aggregation.aggregate(measurement(1)) @@ -110,7 +110,7 @@ def test_collect_delta(self): """ synchronous_sum_aggregation = _SumAggregation( - True, AggregationTemporality.DELTA + Mock(), True, AggregationTemporality.DELTA ) synchronous_sum_aggregation.aggregate(measurement(1)) @@ -135,7 +135,7 @@ def test_collect_cumulative(self): """ sum_aggregation = _SumAggregation( - True, AggregationTemporality.CUMULATIVE + True, AggregationTemporality.CUMULATIVE, Mock() ) sum_aggregation.aggregate(measurement(1)) @@ -167,7 +167,7 @@ def test_aggregate(self): temporality """ - last_value_aggregation = _LastValueAggregation() + last_value_aggregation = _LastValueAggregation(Mock()) last_value_aggregation.aggregate(measurement(1)) self.assertEqual(last_value_aggregation._value, 1) @@ -183,7 +183,7 @@ def test_collect(self): `LastValueAggregation` collects sum metric points """ - last_value_aggregation = _LastValueAggregation() + last_value_aggregation = _LastValueAggregation(Mock()) self.assertIsNone(last_value_aggregation.collect()) @@ -218,7 +218,7 @@ def test_aggregate(self): """ explicit_bucket_histogram_aggregation = ( - _ExplicitBucketHistogramAggregation(boundaries=[0, 2, 4]) + _ExplicitBucketHistogramAggregation(Mock(), boundaries=[0, 2, 4]) ) explicit_bucket_histogram_aggregation.aggregate(measurement(-1)) @@ -259,7 +259,7 @@ def test_min_max(self): """ explicit_bucket_histogram_aggregation = ( - _ExplicitBucketHistogramAggregation() + _ExplicitBucketHistogramAggregation(Mock()) ) explicit_bucket_histogram_aggregation.aggregate(measurement(-1)) @@ -272,7 +272,7 @@ def test_min_max(self): self.assertEqual(explicit_bucket_histogram_aggregation._max, 9999) explicit_bucket_histogram_aggregation = ( - _ExplicitBucketHistogramAggregation(record_min_max=False) + _ExplicitBucketHistogramAggregation(Mock(), record_min_max=False) ) explicit_bucket_histogram_aggregation.aggregate(measurement(-1)) @@ -290,7 +290,7 @@ def test_collect(self): """ explicit_bucket_histogram_aggregation = ( - _ExplicitBucketHistogramAggregation(boundaries=[0, 1, 2]) + _ExplicitBucketHistogramAggregation(Mock(), boundaries=[0, 1, 2]) ) explicit_bucket_histogram_aggregation.aggregate(measurement(1)) @@ -814,18 +814,18 @@ class TestAggregationFactory(TestCase): def test_sum_factory(self): counter = Counter("name", Mock(), Mock()) factory = SumAggregation() - aggregation = factory._create_aggregation(counter) + aggregation = factory._create_aggregation(counter, Mock()) self.assertIsInstance(aggregation, _SumAggregation) self.assertTrue(aggregation._instrument_is_monotonic) self.assertEqual( aggregation._instrument_temporality, AggregationTemporality.DELTA ) - aggregation2 = factory._create_aggregation(counter) + aggregation2 = factory._create_aggregation(counter, Mock()) self.assertNotEqual(aggregation, aggregation2) counter = UpDownCounter("name", Mock(), Mock()) factory = SumAggregation() - aggregation = factory._create_aggregation(counter) + aggregation = factory._create_aggregation(counter, Mock()) self.assertIsInstance(aggregation, _SumAggregation) self.assertFalse(aggregation._instrument_is_monotonic) self.assertEqual( @@ -834,7 +834,7 @@ def test_sum_factory(self): counter = ObservableCounter("name", Mock(), Mock(), None) factory = SumAggregation() - aggregation = factory._create_aggregation(counter) + aggregation = factory._create_aggregation(counter, Mock()) self.assertIsInstance(aggregation, _SumAggregation) self.assertTrue(aggregation._instrument_is_monotonic) self.assertEqual( @@ -851,19 +851,19 @@ def test_explicit_bucket_histogram_factory(self): ), record_min_max=False, ) - aggregation = factory._create_aggregation(histo) + aggregation = factory._create_aggregation(histo, Mock()) self.assertIsInstance(aggregation, _ExplicitBucketHistogramAggregation) self.assertFalse(aggregation._record_min_max) self.assertEqual(aggregation._boundaries, (0.0, 5.0)) - aggregation2 = factory._create_aggregation(histo) + aggregation2 = factory._create_aggregation(histo, Mock()) self.assertNotEqual(aggregation, aggregation2) def test_last_value_factory(self): counter = Counter("name", Mock(), Mock()) factory = LastValueAggregation() - aggregation = factory._create_aggregation(counter) + aggregation = factory._create_aggregation(counter, Mock()) self.assertIsInstance(aggregation, _LastValueAggregation) - aggregation2 = factory._create_aggregation(counter) + aggregation2 = factory._create_aggregation(counter, Mock()) self.assertNotEqual(aggregation, aggregation2) @@ -875,7 +875,7 @@ def setUpClass(cls): def test_counter(self): aggregation = self.default_aggregation._create_aggregation( - Counter("name", Mock(), Mock()) + Counter("name", Mock(), Mock()), Mock() ) self.assertIsInstance(aggregation, _SumAggregation) self.assertTrue(aggregation._instrument_is_monotonic) @@ -886,7 +886,7 @@ def test_counter(self): def test_up_down_counter(self): aggregation = self.default_aggregation._create_aggregation( - UpDownCounter("name", Mock(), Mock()) + UpDownCounter("name", Mock(), Mock()), Mock() ) self.assertIsInstance(aggregation, _SumAggregation) self.assertFalse(aggregation._instrument_is_monotonic) @@ -897,7 +897,8 @@ def test_up_down_counter(self): def test_observable_counter(self): aggregation = self.default_aggregation._create_aggregation( - ObservableCounter("name", Mock(), Mock(), callbacks=[Mock()]) + ObservableCounter("name", Mock(), Mock(), callbacks=[Mock()]), + Mock(), ) self.assertIsInstance(aggregation, _SumAggregation) self.assertTrue(aggregation._instrument_is_monotonic) @@ -909,7 +910,10 @@ def test_observable_counter(self): def test_observable_up_down_counter(self): aggregation = self.default_aggregation._create_aggregation( - ObservableUpDownCounter("name", Mock(), Mock(), callbacks=[Mock()]) + ObservableUpDownCounter( + "name", Mock(), Mock(), callbacks=[Mock()] + ), + Mock(), ) self.assertIsInstance(aggregation, _SumAggregation) self.assertFalse(aggregation._instrument_is_monotonic) @@ -925,7 +929,8 @@ def test_histogram(self): "name", Mock(), Mock(), - ) + ), + Mock(), ) self.assertIsInstance(aggregation, _ExplicitBucketHistogramAggregation) @@ -937,6 +942,7 @@ def test_observable_gauge(self): Mock(), Mock(), callbacks=[Mock()], - ) + ), + Mock(), ) self.assertIsInstance(aggregation, _LastValueAggregation)