From ec3053efd835d1776112447808712ba52f9c70ad Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Fri, 11 Feb 2022 14:24:38 -0600 Subject: [PATCH] Merge Asynchronous and Synchronous sum aggregations (#2379) * Merge Asynchronous and Synchronous sum aggregations Fixes #2353 * Don't use None for self._value * Remove temporality check from sum aggregate * Update opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py Co-authored-by: Aaron Abbott * Add cumulative test cases * Fix lint * Set value to None This is done in order to identify the situation when aggregate has not been called before collect is. * Refactor collect * Fix initial value * Update opentelemetry-sdk/tests/metrics/test_aggregation.py Co-authored-by: Aaron Abbott * Undo changes to lastvalueaggrgation * Fix test case Co-authored-by: Aaron Abbott --- .../opentelemetry/sdk/_metrics/aggregation.py | 78 +++++------ .../tests/metrics/test_aggregation.py | 125 +++++++----------- 2 files changed, 79 insertions(+), 124 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py index 9f1bee7a388..b2341d9063d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py @@ -35,12 +35,6 @@ _logger = getLogger(__name__) -class _InstrumentMonotonicityAwareAggregation: - def __init__(self, instrument_is_monotonic: bool): - self._instrument_is_monotonic = instrument_is_monotonic - super().__init__() - - class Aggregation(ABC, Generic[_PointVarT]): def __init__(self): self._lock = Lock() @@ -54,16 +48,27 @@ def collect(self) -> Optional[_PointVarT]: pass -class SynchronousSumAggregation( - _InstrumentMonotonicityAwareAggregation, Aggregation[Sum] -): - def __init__(self, instrument_is_monotonic: bool): - super().__init__(instrument_is_monotonic) - self._value = 0 +class SumAggregation(Aggregation[Sum]): + def __init__( + self, + instrument_is_monotonic: bool, + instrument_temporality: AggregationTemporality, + ): + super().__init__() + self._start_time_unix_nano = _time_ns() + self._instrument_temporality = instrument_temporality + self._instrument_is_monotonic = instrument_is_monotonic + + if self._instrument_temporality is AggregationTemporality.DELTA: + self._value = 0 + else: + self._value = None def aggregate(self, measurement: Measurement) -> None: with self._lock: + if self._value is None: + self._value = 0 self._value = self._value + measurement.value def collect(self) -> Optional[Sum]: @@ -73,47 +78,32 @@ def collect(self) -> Optional[Sum]: """ now = _time_ns() - with self._lock: - value = self._value - start_time_unix_nano = self._start_time_unix_nano - - self._value = 0 - self._start_time_unix_nano = now + 1 - - return Sum( - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=self._instrument_is_monotonic, - start_time_unix_nano=start_time_unix_nano, - time_unix_nano=now, - value=value, - ) + if self._instrument_temporality is AggregationTemporality.DELTA: + with self._lock: + value = self._value + start_time_unix_nano = self._start_time_unix_nano -class AsynchronousSumAggregation( - _InstrumentMonotonicityAwareAggregation, Aggregation[Sum] -): - def __init__(self, instrument_is_monotonic: bool): - super().__init__(instrument_is_monotonic) - self._value = None - self._start_time_unix_nano = _time_ns() + self._value = 0 + self._start_time_unix_nano = now + 1 - def aggregate(self, measurement: Measurement) -> None: - with self._lock: - self._value = measurement.value + return Sum( + aggregation_temporality=AggregationTemporality.DELTA, + is_monotonic=self._instrument_is_monotonic, + start_time_unix_nano=start_time_unix_nano, + time_unix_nano=now, + value=value, + ) - def collect(self) -> Optional[Sum]: - """ - Atomically return a point for the current value of the metric. - """ if self._value is None: return None return Sum( - start_time_unix_nano=self._start_time_unix_nano, - time_unix_nano=_time_ns(), - value=self._value, aggregation_temporality=AggregationTemporality.CUMULATIVE, is_monotonic=self._instrument_is_monotonic, + start_time_unix_nano=self._start_time_unix_nano, + time_unix_nano=now, + value=self._value, ) @@ -180,7 +170,7 @@ def aggregate(self, measurement: Measurement) -> None: self._bucket_counts[bisect_left(self._boundaries, value)] += 1 - def collect(self) -> Optional[Histogram]: + def collect(self) -> Histogram: """ Atomically return a point for the current value of the metric. """ diff --git a/opentelemetry-sdk/tests/metrics/test_aggregation.py b/opentelemetry-sdk/tests/metrics/test_aggregation.py index cd3bbea765b..f6d3ce3ca8c 100644 --- a/opentelemetry-sdk/tests/metrics/test_aggregation.py +++ b/opentelemetry-sdk/tests/metrics/test_aggregation.py @@ -20,39 +20,49 @@ from opentelemetry.sdk._metrics.aggregation import ( AggregationTemporality, - AsynchronousSumAggregation, ExplicitBucketHistogramAggregation, LastValueAggregation, - SynchronousSumAggregation, + SumAggregation, _convert_aggregation_temporality, - _InstrumentMonotonicityAwareAggregation, ) from opentelemetry.sdk._metrics.measurement import Measurement from opentelemetry.sdk._metrics.point import Gauge, Sum class TestSynchronousSumAggregation(TestCase): - def test_instrument_monotonicity_awareness(self): + def test_aggregate_delta(self): """ - `SynchronousSumAggregation` is aware of the instrument monotonicity + `SynchronousSumAggregation` aggregates data for sum metric points """ - synchronous_sum_aggregation = SynchronousSumAggregation(True) - self.assertIsInstance( - synchronous_sum_aggregation, - _InstrumentMonotonicityAwareAggregation, + synchronous_sum_aggregation = SumAggregation( + True, AggregationTemporality.DELTA ) - self.assertTrue(synchronous_sum_aggregation._instrument_is_monotonic) - synchronous_sum_aggregation = SynchronousSumAggregation(False) - self.assertFalse(synchronous_sum_aggregation._instrument_is_monotonic) + synchronous_sum_aggregation.aggregate(Measurement(1)) + synchronous_sum_aggregation.aggregate(Measurement(2)) + synchronous_sum_aggregation.aggregate(Measurement(3)) - def test_aggregate(self): + self.assertEqual(synchronous_sum_aggregation._value, 6) + + synchronous_sum_aggregation = SumAggregation( + True, AggregationTemporality.DELTA + ) + + synchronous_sum_aggregation.aggregate(Measurement(1)) + synchronous_sum_aggregation.aggregate(Measurement(-2)) + synchronous_sum_aggregation.aggregate(Measurement(3)) + + self.assertEqual(synchronous_sum_aggregation._value, 2) + + def test_aggregate_cumulative(self): """ `SynchronousSumAggregation` aggregates data for sum metric points """ - synchronous_sum_aggregation = SynchronousSumAggregation(True) + synchronous_sum_aggregation = SumAggregation( + True, AggregationTemporality.CUMULATIVE + ) synchronous_sum_aggregation.aggregate(Measurement(1)) synchronous_sum_aggregation.aggregate(Measurement(2)) @@ -60,7 +70,9 @@ def test_aggregate(self): self.assertEqual(synchronous_sum_aggregation._value, 6) - synchronous_sum_aggregation = SynchronousSumAggregation(True) + synchronous_sum_aggregation = SumAggregation( + True, AggregationTemporality.CUMULATIVE + ) synchronous_sum_aggregation.aggregate(Measurement(1)) synchronous_sum_aggregation.aggregate(Measurement(-2)) @@ -68,12 +80,14 @@ def test_aggregate(self): self.assertEqual(synchronous_sum_aggregation._value, 2) - def test_collect(self): + def test_collect_delta(self): """ `SynchronousSumAggregation` collects sum metric points """ - synchronous_sum_aggregation = SynchronousSumAggregation(True) + synchronous_sum_aggregation = SumAggregation( + True, AggregationTemporality.DELTA + ) synchronous_sum_aggregation.aggregate(Measurement(1)) first_sum = synchronous_sum_aggregation.collect() @@ -91,87 +105,37 @@ def test_collect(self): second_sum.start_time_unix_nano, first_sum.start_time_unix_nano ) - -class TestAsynchronousSumAggregation(TestCase): - def test_instrument_monotonicity_awareness(self): + def test_collect_cumulative(self): """ - `AsynchronousSumAggregation` is aware of the instrument monotonicity + `SynchronousSumAggregation` collects sum metric points """ - asynchronous_sum_aggregation = AsynchronousSumAggregation(True) - self.assertIsInstance( - asynchronous_sum_aggregation, - _InstrumentMonotonicityAwareAggregation, + sum_aggregation = SumAggregation( + True, AggregationTemporality.CUMULATIVE ) - self.assertTrue(asynchronous_sum_aggregation._instrument_is_monotonic) - - asynchronous_sum_aggregation = AsynchronousSumAggregation(False) - self.assertFalse(asynchronous_sum_aggregation._instrument_is_monotonic) - - def test_aggregate(self): - """ - `AsynchronousSumAggregation` aggregates data for sum metric points - """ - - asynchronous_sum_aggregation = AsynchronousSumAggregation(True) - - asynchronous_sum_aggregation.aggregate(Measurement(1)) - self.assertEqual(asynchronous_sum_aggregation._value, 1) - - asynchronous_sum_aggregation.aggregate(Measurement(2)) - self.assertEqual(asynchronous_sum_aggregation._value, 2) - - asynchronous_sum_aggregation.aggregate(Measurement(3)) - self.assertEqual(asynchronous_sum_aggregation._value, 3) - - asynchronous_sum_aggregation = AsynchronousSumAggregation(True) - - asynchronous_sum_aggregation.aggregate(Measurement(1)) - self.assertEqual(asynchronous_sum_aggregation._value, 1) - - asynchronous_sum_aggregation.aggregate(Measurement(-2)) - self.assertEqual(asynchronous_sum_aggregation._value, -2) - - asynchronous_sum_aggregation.aggregate(Measurement(3)) - self.assertEqual(asynchronous_sum_aggregation._value, 3) - - def test_collect(self): - """ - `AsynchronousSumAggregation` collects sum metric points - """ - - asynchronous_sum_aggregation = AsynchronousSumAggregation(True) - self.assertIsNone(asynchronous_sum_aggregation.collect()) - - asynchronous_sum_aggregation.aggregate(Measurement(1)) - first_sum = asynchronous_sum_aggregation.collect() + sum_aggregation.aggregate(Measurement(1)) + first_sum = sum_aggregation.collect() self.assertEqual(first_sum.value, 1) self.assertTrue(first_sum.is_monotonic) - asynchronous_sum_aggregation.aggregate(Measurement(1)) - second_sum = asynchronous_sum_aggregation.collect() + sum_aggregation.aggregate(Measurement(1)) + second_sum = sum_aggregation.collect() - self.assertEqual(second_sum.value, 1) + self.assertEqual(second_sum.value, 2) self.assertTrue(second_sum.is_monotonic) self.assertEqual( second_sum.start_time_unix_nano, first_sum.start_time_unix_nano ) - -class TestLastValueAggregation(TestCase): - def test_instrument_monotonicity_awareness(self): - """ - `LastValueAggregation` is not aware of the instrument monotonicity - """ - - sum_aggregation = LastValueAggregation() - self.assertNotIsInstance( - sum_aggregation, _InstrumentMonotonicityAwareAggregation + self.assertIsNone( + SumAggregation(True, AggregationTemporality.CUMULATIVE).collect() ) + +class TestLastValueAggregation(TestCase): def test_aggregate(self): """ `LastValueAggregation` collects data for gauge metric points with delta @@ -200,6 +164,7 @@ def test_collect(self): last_value_aggregation.aggregate(Measurement(1)) first_gauge = last_value_aggregation.collect() + self.assertIsInstance(first_gauge, Gauge) self.assertEqual(first_gauge.value, 1)