Skip to content

Commit

Permalink
Merge Asynchronous and Synchronous sum aggregations
Browse files Browse the repository at this point in the history
Fixes #2353
  • Loading branch information
ocelotl committed Jan 15, 2022
1 parent fb1063c commit 011e60e
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 144 deletions.
81 changes: 37 additions & 44 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,6 @@
_logger = getLogger(__name__)


class _InstrumentMonotonicityAwareAggregation:
def __init__(self, instrument_is_monotonic: bool):
self._instrument_is_monotonic = instrument_is_monotonic
super().__init__()


class Aggregation(ABC, Generic[_PointVarT]):
def __init__(self):
self._lock = Lock()
Expand All @@ -53,57 +47,56 @@ def collect(self) -> Optional[_PointVarT]:
pass


class SynchronousSumAggregation(
_InstrumentMonotonicityAwareAggregation, Aggregation[Sum]
):
def __init__(self, instrument_is_monotonic: bool):
super().__init__(instrument_is_monotonic)
self._value = 0
class SumAggregation(Aggregation[Sum]):
def __init__(
self,
instrument_is_monotonic: bool,
instrument_temporality: AggregationTemporality,
):
super().__init__()
self._instrument_is_monotonic = instrument_is_monotonic

if instrument_temporality is AggregationTemporality.DELTA:
self._value = 0

else:
self._value = None

self._start_time_unix_nano = _time_ns()
self._instrument_temporality = instrument_temporality

def aggregate(self, measurement: Measurement) -> None:
with self._lock:
self._value = self._value + measurement.value

if self._instrument_temporality is AggregationTemporality.DELTA:
with self._lock:
self._value = self._value + measurement.value
else:
with self._lock:
self._value = measurement.value

def collect(self) -> Optional[Sum]:
"""
Atomically return a point for the current value of the metric and
reset the aggregation value.
"""
now = _time_ns()

with self._lock:
value = self._value
start_time_unix_nano = self._start_time_unix_nano
if self._instrument_temporality is AggregationTemporality.DELTA:
now = _time_ns()

self._value = 0
self._start_time_unix_nano = now + 1
with self._lock:
value = self._value
start_time_unix_nano = self._start_time_unix_nano

return Sum(
aggregation_temporality=AggregationTemporality.DELTA,
is_monotonic=self._instrument_is_monotonic,
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=now,
value=value,
)
self._value = 0
self._start_time_unix_nano = now + 1

return Sum(
aggregation_temporality=AggregationTemporality.DELTA,
is_monotonic=self._instrument_is_monotonic,
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=now,
value=value,
)

class AsynchronousSumAggregation(
_InstrumentMonotonicityAwareAggregation, Aggregation[Sum]
):
def __init__(self, instrument_is_monotonic: bool):
super().__init__(instrument_is_monotonic)
self._value = None
self._start_time_unix_nano = _time_ns()

def aggregate(self, measurement: Measurement) -> None:
with self._lock:
self._value = measurement.value

def collect(self) -> Optional[Sum]:
"""
Atomically return a point for the current value of the metric.
"""
if self._value is None:
return None

Expand Down
111 changes: 11 additions & 100 deletions opentelemetry-sdk/tests/metrics/test_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,33 @@
from unittest import TestCase

from opentelemetry.sdk._metrics.aggregation import (
AsynchronousSumAggregation,
AggregationTemporality,
ExplicitBucketHistogramAggregation,
LastValueAggregation,
SynchronousSumAggregation,
_InstrumentMonotonicityAwareAggregation,
SumAggregation,
)
from opentelemetry.sdk._metrics.measurement import Measurement


class TestSynchronousSumAggregation(TestCase):
def test_instrument_monotonicity_awareness(self):
"""
`SynchronousSumAggregation` is aware of the instrument monotonicity
"""

synchronous_sum_aggregation = SynchronousSumAggregation(True)
self.assertIsInstance(
synchronous_sum_aggregation,
_InstrumentMonotonicityAwareAggregation,
)
self.assertTrue(synchronous_sum_aggregation._instrument_is_monotonic)

synchronous_sum_aggregation = SynchronousSumAggregation(False)
self.assertFalse(synchronous_sum_aggregation._instrument_is_monotonic)

def test_aggregate(self):
"""
`SynchronousSumAggregation` aggregates data for sum metric points
"""

synchronous_sum_aggregation = SynchronousSumAggregation(True)
synchronous_sum_aggregation = SumAggregation(
True, AggregationTemporality.DELTA
)

synchronous_sum_aggregation.aggregate(Measurement(1))
synchronous_sum_aggregation.aggregate(Measurement(2))
synchronous_sum_aggregation.aggregate(Measurement(3))

self.assertEqual(synchronous_sum_aggregation._value, 6)

synchronous_sum_aggregation = SynchronousSumAggregation(True)
synchronous_sum_aggregation = SumAggregation(
True, AggregationTemporality.DELTA
)

synchronous_sum_aggregation.aggregate(Measurement(1))
synchronous_sum_aggregation.aggregate(Measurement(-2))
Expand All @@ -69,7 +57,9 @@ def test_collect(self):
`SynchronousSumAggregation` collects sum metric points
"""

synchronous_sum_aggregation = SynchronousSumAggregation(True)
synchronous_sum_aggregation = SumAggregation(
True, AggregationTemporality.DELTA
)

synchronous_sum_aggregation.aggregate(Measurement(1))
first_sum = synchronous_sum_aggregation.collect()
Expand All @@ -88,86 +78,7 @@ def test_collect(self):
)


class TestAsynchronousSumAggregation(TestCase):
def test_instrument_monotonicity_awareness(self):
"""
`AsynchronousSumAggregation` is aware of the instrument monotonicity
"""

asynchronous_sum_aggregation = AsynchronousSumAggregation(True)
self.assertIsInstance(
asynchronous_sum_aggregation,
_InstrumentMonotonicityAwareAggregation,
)
self.assertTrue(asynchronous_sum_aggregation._instrument_is_monotonic)

asynchronous_sum_aggregation = AsynchronousSumAggregation(False)
self.assertFalse(asynchronous_sum_aggregation._instrument_is_monotonic)

def test_aggregate(self):
"""
`AsynchronousSumAggregation` aggregates data for sum metric points
"""

asynchronous_sum_aggregation = AsynchronousSumAggregation(True)

asynchronous_sum_aggregation.aggregate(Measurement(1))
self.assertEqual(asynchronous_sum_aggregation._value, 1)

asynchronous_sum_aggregation.aggregate(Measurement(2))
self.assertEqual(asynchronous_sum_aggregation._value, 2)

asynchronous_sum_aggregation.aggregate(Measurement(3))
self.assertEqual(asynchronous_sum_aggregation._value, 3)

asynchronous_sum_aggregation = AsynchronousSumAggregation(True)

asynchronous_sum_aggregation.aggregate(Measurement(1))
self.assertEqual(asynchronous_sum_aggregation._value, 1)

asynchronous_sum_aggregation.aggregate(Measurement(-2))
self.assertEqual(asynchronous_sum_aggregation._value, -2)

asynchronous_sum_aggregation.aggregate(Measurement(3))
self.assertEqual(asynchronous_sum_aggregation._value, 3)

def test_collect(self):
"""
`AsynchronousSumAggregation` collects sum metric points
"""

asynchronous_sum_aggregation = AsynchronousSumAggregation(True)

self.assertIsNone(asynchronous_sum_aggregation.collect())

asynchronous_sum_aggregation.aggregate(Measurement(1))
first_sum = asynchronous_sum_aggregation.collect()

self.assertEqual(first_sum.value, 1)
self.assertTrue(first_sum.is_monotonic)

asynchronous_sum_aggregation.aggregate(Measurement(1))
second_sum = asynchronous_sum_aggregation.collect()

self.assertEqual(second_sum.value, 1)
self.assertTrue(second_sum.is_monotonic)

self.assertEqual(
second_sum.start_time_unix_nano, first_sum.start_time_unix_nano
)


class TestLastValueAggregation(TestCase):
def test_instrument_monotonicity_awareness(self):
"""
`LastValueAggregation` is not aware of the instrument monotonicity
"""

sum_aggregation = LastValueAggregation()
self.assertNotIsInstance(
sum_aggregation, _InstrumentMonotonicityAwareAggregation
)

def test_aggregate(self):
"""
`LastValueAggregation` collects data for gauge metric points with delta
Expand Down

0 comments on commit 011e60e

Please sign in to comment.