diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py index 18b43d7f725..256fdc668b4 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from dataclasses import replace from abc import ABC, abstractmethod from bisect import bisect_left from logging import getLogger @@ -199,3 +200,56 @@ def collect(self) -> Optional[Histogram]: explicit_bounds=self._boundaries, aggregation_temporality=AggregationTemporality.DELTA, ) + + +def _convert_aggregation_temporality( + previous_point: Optional[_PointVarT], + current_point: _PointVarT, + aggregation_temporality: int, +) -> _PointVarT: + + previous_point_type = type(previous_point) + current_point_type = type(current_point) + + if previous_point is not None and type(previous_point) is not type( + current_point + ): + _logger.warning( + "convert_aggregation_temporality called with mismatched " + "point types: %s and %s", + previous_point_type, + current_point_type, + ) + + return current_point + + if current_point_type is Sum: + if previous_point is None: + + return replace( + current_point, aggregation_temporality=aggregation_temporality + ) + + if current_point.aggregation_temporality is aggregation_temporality: + return current_point + + if aggregation_temporality == AggregationTemporality.DELTA: + value = current_point.value - previous_point.value + + else: + value = current_point.value + previous_point.value + + is_monotonic = ( + previous_point.is_monotonic and current_point.is_monotonic + ) + + return Sum( + aggregation_temporality=aggregation_temporality, + is_monotonic=is_monotonic, + start_time_unix_nano=previous_point.start_time_unix_nano, + time_unix_nano=current_point.time_unix_nano, + value=value, + ) + + elif current_point_type is Gauge: + return current_point diff --git a/opentelemetry-sdk/tests/metrics/test_aggregation.py b/opentelemetry-sdk/tests/metrics/test_aggregation.py index 6ca443e7a98..ffffb45fef7 100644 --- a/opentelemetry-sdk/tests/metrics/test_aggregation.py +++ b/opentelemetry-sdk/tests/metrics/test_aggregation.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. - +from dataclasses import replace from math import inf from time import sleep from unittest import TestCase @@ -23,8 +23,14 @@ LastValueAggregation, SynchronousSumAggregation, _InstrumentMonotonicityAwareAggregation, + _convert_aggregation_temporality ) +from opentelemetry.sdk._metrics.aggregation import AggregationTemporality from opentelemetry.sdk._metrics.measurement import Measurement +from opentelemetry.sdk._metrics.point import ( + Gauge, + Sum, +) class TestSynchronousSumAggregation(TestCase): @@ -308,3 +314,38 @@ def test_collect(self): self.assertGreater( second_histogram.time_unix_nano, first_histogram.time_unix_nano ) + + +class TestConvertAggregationTemporality(TestCase): + """ + Test aggregation temporality conversion algorithm + """ + + def test_mismatched_point_types(self): + + current_point = Sum(0, 0, 0, AggregationTemporality.DELTA, False) + + self.assertIs( + _convert_aggregation_temporality( + Gauge(0, 0), current_point, AggregationTemporality.DELTA + ), + current_point, + ) + + def test_current_point_sum_previous_point_none(self): + + point_arguments = (0, 0, 0, AggregationTemporality.DELTA, False) + + current_point = Sum(*point_arguments) + + self.assertEqual( + _convert_aggregation_temporality( + None, + current_point, + AggregationTemporality.CUMULATIVE + ), + replace( + current_point, + aggregation_temporality=AggregationTemporality.CUMULATIVE + ) + )