diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py index 5048d146b49..9f1bee7a388 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py @@ -14,6 +14,7 @@ from abc import ABC, abstractmethod from bisect import bisect_left +from dataclasses import replace from logging import getLogger from math import inf from threading import Lock @@ -200,3 +201,80 @@ def collect(self) -> Optional[Histogram]: aggregation_temporality=AggregationTemporality.DELTA, sum=self._sum, ) + + +def _convert_aggregation_temporality( + previous_point: Optional[_PointVarT], + current_point: _PointVarT, + aggregation_temporality: AggregationTemporality, +) -> _PointVarT: + """Converts `current_point` to the requested `aggregation_temporality` + given the `previous_point`. + + `previous_point` must have `CUMULATIVE` temporality. `current_point` may + have `DELTA` or `CUMULATIVE` temporality. + + The output point will have temporality `aggregation_temporality`. Since + `GAUGE` points have no temporality, they are returned unchanged. + """ + + current_point_type = type(current_point) + + if current_point_type is Gauge: + return current_point + + if previous_point is not None and type(previous_point) is not type( + current_point + ): + _logger.warning( + "convert_aggregation_temporality called with mismatched " + "point types: %s and %s", + type(previous_point), + current_point_type, + ) + + return current_point + + if current_point_type is Sum: + if previous_point is None: + # Output CUMULATIVE for a synchronous instrument + # There is no previous value, return the delta point as a + # cumulative + return replace( + current_point, aggregation_temporality=aggregation_temporality + ) + if previous_point.aggregation_temporality is not ( + AggregationTemporality.CUMULATIVE + ): + raise Exception( + "previous_point aggregation temporality must be CUMULATIVE" + ) + + if current_point.aggregation_temporality is aggregation_temporality: + # Output DELTA for a synchronous instrument + # Output CUMULATIVE for an asynchronous instrument + return current_point + + if aggregation_temporality is AggregationTemporality.DELTA: + # Output temporality DELTA for an asynchronous instrument + value = current_point.value - previous_point.value + output_start_time_unix_nano = previous_point.time_unix_nano + + else: + # Output CUMULATIVE for a synchronous instrument + value = current_point.value + previous_point.value + output_start_time_unix_nano = previous_point.start_time_unix_nano + + is_monotonic = ( + previous_point.is_monotonic and current_point.is_monotonic + ) + + return Sum( + start_time_unix_nano=output_start_time_unix_nano, + time_unix_nano=current_point.time_unix_nano, + value=value, + aggregation_temporality=aggregation_temporality, + is_monotonic=is_monotonic, + ) + + return None diff --git a/opentelemetry-sdk/tests/metrics/test_aggregation.py b/opentelemetry-sdk/tests/metrics/test_aggregation.py index 535d8d44d56..cd3bbea765b 100644 --- a/opentelemetry-sdk/tests/metrics/test_aggregation.py +++ b/opentelemetry-sdk/tests/metrics/test_aggregation.py @@ -12,19 +12,23 @@ # See the License for the specific language governing permissions and # limitations under the License. - +from dataclasses import replace +from logging import WARNING from math import inf from time import sleep from unittest import TestCase from opentelemetry.sdk._metrics.aggregation import ( + AggregationTemporality, AsynchronousSumAggregation, ExplicitBucketHistogramAggregation, LastValueAggregation, SynchronousSumAggregation, + _convert_aggregation_temporality, _InstrumentMonotonicityAwareAggregation, ) from opentelemetry.sdk._metrics.measurement import Measurement +from opentelemetry.sdk._metrics.point import Gauge, Sum class TestSynchronousSumAggregation(TestCase): @@ -311,3 +315,299 @@ def test_collect(self): self.assertGreater( second_histogram.time_unix_nano, first_histogram.time_unix_nano ) + + +class TestConvertAggregationTemporality(TestCase): + """ + Test aggregation temporality conversion algorithm + """ + + def test_previous_point_non_cumulative(self): + + with self.assertRaises(Exception): + + _convert_aggregation_temporality( + Sum( + start_time_unix_nano=0, + time_unix_nano=0, + value=0, + aggregation_temporality=AggregationTemporality.DELTA, + is_monotonic=False, + ), + Sum( + start_time_unix_nano=0, + time_unix_nano=0, + value=0, + aggregation_temporality=AggregationTemporality.DELTA, + is_monotonic=False, + ), + AggregationTemporality.DELTA, + ), + + def test_mismatched_point_types(self): + + current_point = Sum( + start_time_unix_nano=0, + time_unix_nano=0, + value=0, + aggregation_temporality=AggregationTemporality.DELTA, + is_monotonic=False, + ) + + with self.assertLogs(level=WARNING): + self.assertIs( + _convert_aggregation_temporality( + Gauge(time_unix_nano=0, value=0), + current_point, + AggregationTemporality.DELTA, + ), + current_point, + ) + + def test_current_point_sum_previous_point_none(self): + + current_point = Sum( + start_time_unix_nano=0, + time_unix_nano=0, + value=0, + aggregation_temporality=AggregationTemporality.DELTA, + is_monotonic=False, + ) + + self.assertEqual( + _convert_aggregation_temporality( + None, current_point, AggregationTemporality.CUMULATIVE + ), + replace( + current_point, + aggregation_temporality=AggregationTemporality.CUMULATIVE, + ), + ) + + def test_current_point_sum_current_point_same_aggregation_temporality( + self, + ): + + current_point = Sum( + start_time_unix_nano=0, + time_unix_nano=0, + value=0, + aggregation_temporality=AggregationTemporality.DELTA, + is_monotonic=False, + ) + + self.assertEqual( + _convert_aggregation_temporality( + Sum( + start_time_unix_nano=0, + time_unix_nano=0, + value=0, + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=False, + ), + current_point, + AggregationTemporality.DELTA, + ), + current_point, + ) + + current_point = Sum( + start_time_unix_nano=0, + time_unix_nano=0, + value=0, + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=False, + ) + + self.assertEqual( + _convert_aggregation_temporality( + Sum( + start_time_unix_nano=0, + time_unix_nano=0, + value=0, + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=False, + ), + current_point, + AggregationTemporality.CUMULATIVE, + ), + current_point, + ) + + def test_current_point_sum_aggregation_temporality_delta(self): + + self.assertEqual( + _convert_aggregation_temporality( + Sum( + start_time_unix_nano=1, + time_unix_nano=2, + value=3, + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=False, + ), + Sum( + start_time_unix_nano=4, + time_unix_nano=5, + value=6, + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=False, + ), + AggregationTemporality.DELTA, + ), + Sum( + start_time_unix_nano=2, + time_unix_nano=5, + value=3, + aggregation_temporality=AggregationTemporality.DELTA, + is_monotonic=False, + ), + ) + + self.assertEqual( + _convert_aggregation_temporality( + Sum( + start_time_unix_nano=1, + time_unix_nano=2, + value=3, + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=True, + ), + Sum( + start_time_unix_nano=4, + time_unix_nano=5, + value=6, + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=False, + ), + AggregationTemporality.DELTA, + ), + Sum( + start_time_unix_nano=2, + time_unix_nano=5, + value=3, + aggregation_temporality=AggregationTemporality.DELTA, + is_monotonic=False, + ), + ) + + self.assertEqual( + _convert_aggregation_temporality( + Sum( + start_time_unix_nano=1, + time_unix_nano=2, + value=3, + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=True, + ), + Sum( + start_time_unix_nano=4, + time_unix_nano=5, + value=6, + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=True, + ), + AggregationTemporality.DELTA, + ), + Sum( + start_time_unix_nano=2, + time_unix_nano=5, + value=3, + aggregation_temporality=AggregationTemporality.DELTA, + is_monotonic=True, + ), + ) + + def test_current_point_sum_aggregation_temporality_cumulative(self): + + self.assertEqual( + _convert_aggregation_temporality( + Sum( + start_time_unix_nano=1, + time_unix_nano=2, + value=3, + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=False, + ), + Sum( + start_time_unix_nano=4, + time_unix_nano=5, + value=6, + aggregation_temporality=AggregationTemporality.DELTA, + is_monotonic=False, + ), + AggregationTemporality.CUMULATIVE, + ), + Sum( + start_time_unix_nano=1, + time_unix_nano=5, + value=9, + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=False, + ), + ) + + self.assertEqual( + _convert_aggregation_temporality( + Sum( + start_time_unix_nano=1, + time_unix_nano=2, + value=3, + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=True, + ), + Sum( + start_time_unix_nano=4, + time_unix_nano=5, + value=6, + aggregation_temporality=AggregationTemporality.DELTA, + is_monotonic=False, + ), + AggregationTemporality.CUMULATIVE, + ), + Sum( + start_time_unix_nano=1, + time_unix_nano=5, + value=9, + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=False, + ), + ) + + self.assertEqual( + _convert_aggregation_temporality( + Sum( + start_time_unix_nano=1, + time_unix_nano=2, + value=3, + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=True, + ), + Sum( + start_time_unix_nano=4, + time_unix_nano=5, + value=6, + aggregation_temporality=AggregationTemporality.DELTA, + is_monotonic=True, + ), + AggregationTemporality.CUMULATIVE, + ), + Sum( + start_time_unix_nano=1, + time_unix_nano=5, + value=9, + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=True, + ), + ) + + def test_current_point_gauge(self): + + current_point = Gauge(time_unix_nano=1, value=0) + self.assertEqual( + _convert_aggregation_temporality( + Gauge(time_unix_nano=0, value=0), + current_point, + AggregationTemporality.CUMULATIVE, + ), + current_point, + )