Skip to content

Commit

Permalink
Implement temporality conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
ocelotl committed Feb 1, 2022
1 parent d71cf68 commit d2f7c99
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 1 deletion.
54 changes: 54 additions & 0 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import replace
from abc import ABC, abstractmethod
from bisect import bisect_left
from logging import getLogger
Expand Down Expand Up @@ -199,3 +200,56 @@ def collect(self) -> Optional[Histogram]:
explicit_bounds=self._boundaries,
aggregation_temporality=AggregationTemporality.DELTA,
)


def _convert_aggregation_temporality(
previous_point: Optional[_PointVarT],
current_point: _PointVarT,
aggregation_temporality: int,
) -> _PointVarT:

previous_point_type = type(previous_point)
current_point_type = type(current_point)

if previous_point is not None and type(previous_point) is not type(
current_point
):
_logger.warning(
"convert_aggregation_temporality called with mismatched "
"point types: %s and %s",
previous_point_type,
current_point_type,
)

return current_point

if current_point_type is Sum:
if previous_point is None:

return replace(
current_point, aggregation_temporality=aggregation_temporality
)

if current_point.aggregation_temporality is aggregation_temporality:
return current_point

if aggregation_temporality == AggregationTemporality.DELTA:
value = current_point.value - previous_point.value

else:
value = current_point.value + previous_point.value

is_monotonic = (
previous_point.is_monotonic and current_point.is_monotonic
)

return Sum(
aggregation_temporality=aggregation_temporality,
is_monotonic=is_monotonic,
start_time_unix_nano=previous_point.start_time_unix_nano,
time_unix_nano=current_point.time_unix_nano,
value=value,
)

elif current_point_type is Gauge:
return current_point
43 changes: 42 additions & 1 deletion opentelemetry-sdk/tests/metrics/test_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.


from dataclasses import replace
from math import inf
from time import sleep
from unittest import TestCase
Expand All @@ -23,8 +23,14 @@
LastValueAggregation,
SynchronousSumAggregation,
_InstrumentMonotonicityAwareAggregation,
_convert_aggregation_temporality
)
from opentelemetry.sdk._metrics.aggregation import AggregationTemporality
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.point import (
Gauge,
Sum,
)


class TestSynchronousSumAggregation(TestCase):
Expand Down Expand Up @@ -308,3 +314,38 @@ def test_collect(self):
self.assertGreater(
second_histogram.time_unix_nano, first_histogram.time_unix_nano
)


class TestConvertAggregationTemporality(TestCase):
"""
Test aggregation temporality conversion algorithm
"""

def test_mismatched_point_types(self):

current_point = Sum(0, 0, 0, AggregationTemporality.DELTA, False)

self.assertIs(
_convert_aggregation_temporality(
Gauge(0, 0), current_point, AggregationTemporality.DELTA
),
current_point,
)

def test_current_point_sum_previous_point_none(self):

point_arguments = (0, 0, 0, AggregationTemporality.DELTA, False)

current_point = Sum(*point_arguments)

self.assertEqual(
_convert_aggregation_temporality(
None,
current_point,
AggregationTemporality.CUMULATIVE
),
replace(
current_point,
aggregation_temporality=AggregationTemporality.CUMULATIVE
)
)

0 comments on commit d2f7c99

Please sign in to comment.