diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py index db0b4563dda..842d98b34e5 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py @@ -34,12 +34,6 @@ _logger = getLogger(__name__) -class _InstrumentMonotonicityAwareAggregation: - def __init__(self, instrument_is_monotonic: bool): - self._instrument_is_monotonic = instrument_is_monotonic - super().__init__() - - class Aggregation(ABC, Generic[_PointVarT]): def __init__(self): self._lock = Lock() @@ -53,57 +47,56 @@ def collect(self) -> Optional[_PointVarT]: pass -class SynchronousSumAggregation( - _InstrumentMonotonicityAwareAggregation, Aggregation[Sum] -): - def __init__(self, instrument_is_monotonic: bool): - super().__init__(instrument_is_monotonic) - self._value = 0 +class SumAggregation(Aggregation[Sum]): + def __init__( + self, + instrument_is_monotonic: bool, + instrument_temporality: AggregationTemporality, + ): + super().__init__() + self._instrument_is_monotonic = instrument_is_monotonic + + if instrument_temporality is AggregationTemporality.DELTA: + self._value = 0 + + else: + self._value = None + self._start_time_unix_nano = _time_ns() + self._instrument_temporality = instrument_temporality def aggregate(self, measurement: Measurement) -> None: - with self._lock: - self._value = self._value + measurement.value + + if self._instrument_temporality is AggregationTemporality.DELTA: + with self._lock: + self._value = self._value + measurement.value + else: + with self._lock: + self._value = measurement.value def collect(self) -> Optional[Sum]: """ Atomically return a point for the current value of the metric and reset the aggregation value. """ - now = _time_ns() - - with self._lock: - value = self._value - start_time_unix_nano = self._start_time_unix_nano + if self._instrument_temporality is AggregationTemporality.DELTA: + now = _time_ns() - self._value = 0 - self._start_time_unix_nano = now + 1 + with self._lock: + value = self._value + start_time_unix_nano = self._start_time_unix_nano - return Sum( - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=self._instrument_is_monotonic, - start_time_unix_nano=start_time_unix_nano, - time_unix_nano=now, - value=value, - ) + self._value = 0 + self._start_time_unix_nano = now + 1 + return Sum( + aggregation_temporality=AggregationTemporality.DELTA, + is_monotonic=self._instrument_is_monotonic, + start_time_unix_nano=start_time_unix_nano, + time_unix_nano=now, + value=value, + ) -class AsynchronousSumAggregation( - _InstrumentMonotonicityAwareAggregation, Aggregation[Sum] -): - def __init__(self, instrument_is_monotonic: bool): - super().__init__(instrument_is_monotonic) - self._value = None - self._start_time_unix_nano = _time_ns() - - def aggregate(self, measurement: Measurement) -> None: - with self._lock: - self._value = measurement.value - - def collect(self) -> Optional[Sum]: - """ - Atomically return a point for the current value of the metric. - """ if self._value is None: return None diff --git a/opentelemetry-sdk/tests/metrics/test_aggregation.py b/opentelemetry-sdk/tests/metrics/test_aggregation.py index bc8858b4744..cae1df091b2 100644 --- a/opentelemetry-sdk/tests/metrics/test_aggregation.py +++ b/opentelemetry-sdk/tests/metrics/test_aggregation.py @@ -18,37 +18,23 @@ from unittest import TestCase from opentelemetry.sdk._metrics.aggregation import ( - AsynchronousSumAggregation, + AggregationTemporality, ExplicitBucketHistogramAggregation, LastValueAggregation, - SynchronousSumAggregation, - _InstrumentMonotonicityAwareAggregation, + SumAggregation, ) from opentelemetry.sdk._metrics.measurement import Measurement class TestSynchronousSumAggregation(TestCase): - def test_instrument_monotonicity_awareness(self): - """ - `SynchronousSumAggregation` is aware of the instrument monotonicity - """ - - synchronous_sum_aggregation = SynchronousSumAggregation(True) - self.assertIsInstance( - synchronous_sum_aggregation, - _InstrumentMonotonicityAwareAggregation, - ) - self.assertTrue(synchronous_sum_aggregation._instrument_is_monotonic) - - synchronous_sum_aggregation = SynchronousSumAggregation(False) - self.assertFalse(synchronous_sum_aggregation._instrument_is_monotonic) - def test_aggregate(self): """ `SynchronousSumAggregation` aggregates data for sum metric points """ - synchronous_sum_aggregation = SynchronousSumAggregation(True) + synchronous_sum_aggregation = SumAggregation( + True, AggregationTemporality.DELTA + ) synchronous_sum_aggregation.aggregate(Measurement(1)) synchronous_sum_aggregation.aggregate(Measurement(2)) @@ -56,7 +42,9 @@ def test_aggregate(self): self.assertEqual(synchronous_sum_aggregation._value, 6) - synchronous_sum_aggregation = SynchronousSumAggregation(True) + synchronous_sum_aggregation = SumAggregation( + True, AggregationTemporality.DELTA + ) synchronous_sum_aggregation.aggregate(Measurement(1)) synchronous_sum_aggregation.aggregate(Measurement(-2)) @@ -69,7 +57,9 @@ def test_collect(self): `SynchronousSumAggregation` collects sum metric points """ - synchronous_sum_aggregation = SynchronousSumAggregation(True) + synchronous_sum_aggregation = SumAggregation( + True, AggregationTemporality.DELTA + ) synchronous_sum_aggregation.aggregate(Measurement(1)) first_sum = synchronous_sum_aggregation.collect() @@ -88,86 +78,7 @@ def test_collect(self): ) -class TestAsynchronousSumAggregation(TestCase): - def test_instrument_monotonicity_awareness(self): - """ - `AsynchronousSumAggregation` is aware of the instrument monotonicity - """ - - asynchronous_sum_aggregation = AsynchronousSumAggregation(True) - self.assertIsInstance( - asynchronous_sum_aggregation, - _InstrumentMonotonicityAwareAggregation, - ) - self.assertTrue(asynchronous_sum_aggregation._instrument_is_monotonic) - - asynchronous_sum_aggregation = AsynchronousSumAggregation(False) - self.assertFalse(asynchronous_sum_aggregation._instrument_is_monotonic) - - def test_aggregate(self): - """ - `AsynchronousSumAggregation` aggregates data for sum metric points - """ - - asynchronous_sum_aggregation = AsynchronousSumAggregation(True) - - asynchronous_sum_aggregation.aggregate(Measurement(1)) - self.assertEqual(asynchronous_sum_aggregation._value, 1) - - asynchronous_sum_aggregation.aggregate(Measurement(2)) - self.assertEqual(asynchronous_sum_aggregation._value, 2) - - asynchronous_sum_aggregation.aggregate(Measurement(3)) - self.assertEqual(asynchronous_sum_aggregation._value, 3) - - asynchronous_sum_aggregation = AsynchronousSumAggregation(True) - - asynchronous_sum_aggregation.aggregate(Measurement(1)) - self.assertEqual(asynchronous_sum_aggregation._value, 1) - - asynchronous_sum_aggregation.aggregate(Measurement(-2)) - self.assertEqual(asynchronous_sum_aggregation._value, -2) - - asynchronous_sum_aggregation.aggregate(Measurement(3)) - self.assertEqual(asynchronous_sum_aggregation._value, 3) - - def test_collect(self): - """ - `AsynchronousSumAggregation` collects sum metric points - """ - - asynchronous_sum_aggregation = AsynchronousSumAggregation(True) - - self.assertIsNone(asynchronous_sum_aggregation.collect()) - - asynchronous_sum_aggregation.aggregate(Measurement(1)) - first_sum = asynchronous_sum_aggregation.collect() - - self.assertEqual(first_sum.value, 1) - self.assertTrue(first_sum.is_monotonic) - - asynchronous_sum_aggregation.aggregate(Measurement(1)) - second_sum = asynchronous_sum_aggregation.collect() - - self.assertEqual(second_sum.value, 1) - self.assertTrue(second_sum.is_monotonic) - - self.assertEqual( - second_sum.start_time_unix_nano, first_sum.start_time_unix_nano - ) - - class TestLastValueAggregation(TestCase): - def test_instrument_monotonicity_awareness(self): - """ - `LastValueAggregation` is not aware of the instrument monotonicity - """ - - sum_aggregation = LastValueAggregation() - self.assertNotIsInstance( - sum_aggregation, _InstrumentMonotonicityAwareAggregation - ) - def test_aggregate(self): """ `LastValueAggregation` collects data for gauge metric points with delta