Skip to content

Commit

Permalink
Add aggregation temporality conversion algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
ocelotl committed Jan 18, 2022
1 parent ced22b9 commit e794769
Show file tree
Hide file tree
Showing 2 changed files with 260 additions and 20 deletions.
13 changes: 7 additions & 6 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import replace
from abc import ABC, abstractmethod
from bisect import bisect_left
from dataclasses import replace
from logging import getLogger
from math import inf
from threading import Lock
Expand Down Expand Up @@ -208,7 +208,6 @@ def _convert_aggregation_temporality(
aggregation_temporality: int,
) -> _PointVarT:

previous_point_type = type(previous_point)
current_point_type = type(current_point)

if previous_point is not None and type(previous_point) is not type(
Expand All @@ -217,7 +216,7 @@ def _convert_aggregation_temporality(
_logger.warning(
"convert_aggregation_temporality called with mismatched "
"point types: %s and %s",
previous_point_type,
type(previous_point),
current_point_type,
)

Expand All @@ -244,12 +243,14 @@ def _convert_aggregation_temporality(
)

return Sum(
aggregation_temporality=aggregation_temporality,
is_monotonic=is_monotonic,
start_time_unix_nano=previous_point.start_time_unix_nano,
time_unix_nano=current_point.time_unix_nano,
value=value,
aggregation_temporality=aggregation_temporality,
is_monotonic=is_monotonic,
)

elif current_point_type is Gauge:
if current_point_type is Gauge:
return current_point

return None
267 changes: 253 additions & 14 deletions opentelemetry-sdk/tests/metrics/test_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,16 @@
from unittest import TestCase

from opentelemetry.sdk._metrics.aggregation import (
AggregationTemporality,
AsynchronousSumAggregation,
ExplicitBucketHistogramAggregation,
LastValueAggregation,
SynchronousSumAggregation,
_convert_aggregation_temporality,
_InstrumentMonotonicityAwareAggregation,
_convert_aggregation_temporality
)
from opentelemetry.sdk._metrics.aggregation import AggregationTemporality
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.point import (
Gauge,
Sum,
)
from opentelemetry.sdk._metrics.point import Gauge, Sum


class TestSynchronousSumAggregation(TestCase):
Expand Down Expand Up @@ -323,7 +320,13 @@ class TestConvertAggregationTemporality(TestCase):

def test_mismatched_point_types(self):

current_point = Sum(0, 0, 0, AggregationTemporality.DELTA, False)
current_point = Sum(
start_time_unix_nano=0,
time_unix_nano=0,
value=0,
aggregation_temporality=AggregationTemporality.DELTA,
is_monotonic=False,
)

self.assertIs(
_convert_aggregation_temporality(
Expand All @@ -334,18 +337,254 @@ def test_mismatched_point_types(self):

def test_current_point_sum_previous_point_none(self):

point_arguments = (0, 0, 0, AggregationTemporality.DELTA, False)
current_point = Sum(
start_time_unix_nano=0,
time_unix_nano=0,
value=0,
aggregation_temporality=AggregationTemporality.DELTA,
is_monotonic=False,
)

self.assertEqual(
_convert_aggregation_temporality(
None, current_point, AggregationTemporality.CUMULATIVE
),
replace(
current_point,
aggregation_temporality=AggregationTemporality.CUMULATIVE,
),
)

def test_current_point_sum_current_point_same_aggregation_temporality(
self,
):

current_point = Sum(*point_arguments)
current_point = Sum(
start_time_unix_nano=0,
time_unix_nano=0,
value=0,
aggregation_temporality=AggregationTemporality.DELTA,
is_monotonic=False,
)

self.assertEqual(
_convert_aggregation_temporality(
None,
Sum(
start_time_unix_nano=0,
time_unix_nano=0,
value=0,
aggregation_temporality=AggregationTemporality.DELTA,
is_monotonic=False,
),
current_point,
AggregationTemporality.CUMULATIVE
AggregationTemporality.DELTA,
),
replace(
current_point,
)

current_point = Sum(
start_time_unix_nano=0,
time_unix_nano=0,
value=0,
aggregation_temporality=AggregationTemporality.CUMULATIVE,
is_monotonic=False,
)

self.assertEqual(
_convert_aggregation_temporality(
Sum(
start_time_unix_nano=0,
time_unix_nano=0,
value=0,
aggregation_temporality=AggregationTemporality.CUMULATIVE,
is_monotonic=False,
),
current_point,
aggregation_temporality=AggregationTemporality.CUMULATIVE
)
AggregationTemporality.CUMULATIVE,
),
current_point,
)

def test_current_point_sum_aggregation_temporality_delta(self):

self.assertEqual(
_convert_aggregation_temporality(
Sum(
start_time_unix_nano=1,
time_unix_nano=2,
value=3,
aggregation_temporality=AggregationTemporality.CUMULATIVE,
is_monotonic=False,
),
Sum(
start_time_unix_nano=4,
time_unix_nano=5,
value=6,
aggregation_temporality=AggregationTemporality.CUMULATIVE,
is_monotonic=False,
),
AggregationTemporality.DELTA,
),
Sum(
start_time_unix_nano=1,
time_unix_nano=5,
value=3,
aggregation_temporality=AggregationTemporality.DELTA,
is_monotonic=False,
),
)

self.assertEqual(
_convert_aggregation_temporality(
Sum(
start_time_unix_nano=1,
time_unix_nano=2,
value=3,
aggregation_temporality=AggregationTemporality.CUMULATIVE,
is_monotonic=True,
),
Sum(
start_time_unix_nano=4,
time_unix_nano=5,
value=6,
aggregation_temporality=AggregationTemporality.CUMULATIVE,
is_monotonic=False,
),
AggregationTemporality.DELTA,
),
Sum(
start_time_unix_nano=1,
time_unix_nano=5,
value=3,
aggregation_temporality=AggregationTemporality.DELTA,
is_monotonic=False,
),
)

self.assertEqual(
_convert_aggregation_temporality(
Sum(
start_time_unix_nano=1,
time_unix_nano=2,
value=3,
aggregation_temporality=AggregationTemporality.CUMULATIVE,
is_monotonic=True,
),
Sum(
start_time_unix_nano=4,
time_unix_nano=5,
value=6,
aggregation_temporality=AggregationTemporality.CUMULATIVE,
is_monotonic=True,
),
AggregationTemporality.DELTA,
),
Sum(
start_time_unix_nano=1,
time_unix_nano=5,
value=3,
aggregation_temporality=AggregationTemporality.DELTA,
is_monotonic=True,
),
)

def test_current_point_sum_aggregation_temporality_cumulative(self):

self.assertEqual(
_convert_aggregation_temporality(
Sum(
start_time_unix_nano=1,
time_unix_nano=2,
value=3,
aggregation_temporality=AggregationTemporality.DELTA,
is_monotonic=False,
),
Sum(
start_time_unix_nano=4,
time_unix_nano=5,
value=6,
aggregation_temporality=AggregationTemporality.DELTA,
is_monotonic=False,
),
AggregationTemporality.CUMULATIVE,
),
Sum(
start_time_unix_nano=1,
time_unix_nano=5,
value=9,
aggregation_temporality=AggregationTemporality.CUMULATIVE,
is_monotonic=False,
),
)

self.assertEqual(
_convert_aggregation_temporality(
Sum(
start_time_unix_nano=1,
time_unix_nano=2,
value=3,
aggregation_temporality=AggregationTemporality.DELTA,
is_monotonic=True,
),
Sum(
start_time_unix_nano=4,
time_unix_nano=5,
value=6,
aggregation_temporality=AggregationTemporality.DELTA,
is_monotonic=False,
),
AggregationTemporality.CUMULATIVE,
),
Sum(
start_time_unix_nano=1,
time_unix_nano=5,
value=9,
aggregation_temporality=AggregationTemporality.CUMULATIVE,
is_monotonic=False,
),
)

self.assertEqual(
_convert_aggregation_temporality(
Sum(
start_time_unix_nano=1,
time_unix_nano=2,
value=3,
aggregation_temporality=AggregationTemporality.DELTA,
is_monotonic=True,
),
Sum(
start_time_unix_nano=4,
time_unix_nano=5,
value=6,
aggregation_temporality=AggregationTemporality.DELTA,
is_monotonic=True,
),
AggregationTemporality.CUMULATIVE,
),
Sum(
start_time_unix_nano=1,
time_unix_nano=5,
value=9,
aggregation_temporality=AggregationTemporality.CUMULATIVE,
is_monotonic=True,
),
)

def test_current_point_gauge(self):

current_point = (Gauge(0, 0),)
self.assertEqual(
_convert_aggregation_temporality(
Sum(
start_time_unix_nano=1,
time_unix_nano=2,
value=3,
aggregation_temporality=AggregationTemporality.DELTA,
is_monotonic=True,
),
current_point,
AggregationTemporality.CUMULATIVE,
),
current_point,
)

0 comments on commit e794769

Please sign in to comment.