Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge Asynchronous and Synchronous sum aggregations #2379

Merged
merged 13 commits into from
Feb 11, 2022
85 changes: 36 additions & 49 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@
_logger = getLogger(__name__)


class _InstrumentMonotonicityAwareAggregation:
def __init__(self, instrument_is_monotonic: bool):
self._instrument_is_monotonic = instrument_is_monotonic
super().__init__()


class Aggregation(ABC, Generic[_PointVarT]):
def __init__(self):
self._lock = Lock()
Expand All @@ -54,16 +48,27 @@ def collect(self) -> Optional[_PointVarT]:
pass


class SynchronousSumAggregation(
_InstrumentMonotonicityAwareAggregation, Aggregation[Sum]
):
def __init__(self, instrument_is_monotonic: bool):
super().__init__(instrument_is_monotonic)
self._value = 0
class SumAggregation(Aggregation[Sum]):
def __init__(
self,
instrument_is_monotonic: bool,
instrument_temporality: AggregationTemporality,
):
super().__init__()

self._start_time_unix_nano = _time_ns()
self._instrument_temporality = instrument_temporality
self._instrument_is_monotonic = instrument_is_monotonic

if self._instrument_temporality is AggregationTemporality.DELTA:
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
self._value = 0
else:
self._value = None
ocelotl marked this conversation as resolved.
Show resolved Hide resolved

ocelotl marked this conversation as resolved.
Show resolved Hide resolved
def aggregate(self, measurement: Measurement) -> None:
with self._lock:
if self._value is None:
self._value = 0
self._value = self._value + measurement.value

def collect(self) -> Optional[Sum]:
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -73,66 +78,48 @@ def collect(self) -> Optional[Sum]:
"""
now = _time_ns()

with self._lock:
value = self._value
start_time_unix_nano = self._start_time_unix_nano

self._value = 0
self._start_time_unix_nano = now + 1

return Sum(
aggregation_temporality=AggregationTemporality.DELTA,
is_monotonic=self._instrument_is_monotonic,
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=now,
value=value,
)
if self._instrument_temporality is AggregationTemporality.DELTA:

with self._lock:
value = self._value
start_time_unix_nano = self._start_time_unix_nano

class AsynchronousSumAggregation(
_InstrumentMonotonicityAwareAggregation, Aggregation[Sum]
):
def __init__(self, instrument_is_monotonic: bool):
super().__init__(instrument_is_monotonic)
self._value = None
self._start_time_unix_nano = _time_ns()
self._value = 0
self._start_time_unix_nano = now + 1

def aggregate(self, measurement: Measurement) -> None:
with self._lock:
self._value = measurement.value
return Sum(
aabmass marked this conversation as resolved.
Show resolved Hide resolved
aggregation_temporality=AggregationTemporality.DELTA,
is_monotonic=self._instrument_is_monotonic,
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=now,
value=value,
)

def collect(self) -> Optional[Sum]:
"""
Atomically return a point for the current value of the metric.
"""
if self._value is None:
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
return None

return Sum(
start_time_unix_nano=self._start_time_unix_nano,
time_unix_nano=_time_ns(),
value=self._value,
aggregation_temporality=AggregationTemporality.CUMULATIVE,
is_monotonic=self._instrument_is_monotonic,
start_time_unix_nano=self._start_time_unix_nano,
time_unix_nano=now,
value=self._value,
)


class LastValueAggregation(Aggregation[Gauge]):
def __init__(self):
super().__init__()
self._value = None
self._value = 0

def aggregate(self, measurement: Measurement):
with self._lock:
self._value = measurement.value

def collect(self) -> Optional[Gauge]:
def collect(self) -> Gauge:
"""
Atomically return a point for the current value of the metric.
"""
if self._value is None:
return None

ocelotl marked this conversation as resolved.
Show resolved Hide resolved
return Gauge(
time_unix_nano=_time_ns(),
value=self._value,
Expand Down Expand Up @@ -180,7 +167,7 @@ def aggregate(self, measurement: Measurement) -> None:

self._bucket_counts[bisect_left(self._boundaries, value)] += 1

def collect(self) -> Optional[Histogram]:
def collect(self) -> Histogram:
"""
Atomically return a point for the current value of the metric.
"""
Expand Down
122 changes: 41 additions & 81 deletions opentelemetry-sdk/tests/metrics/test_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,60 +20,74 @@

from opentelemetry.sdk._metrics.aggregation import (
AggregationTemporality,
AsynchronousSumAggregation,
ExplicitBucketHistogramAggregation,
LastValueAggregation,
SynchronousSumAggregation,
SumAggregation,
_convert_aggregation_temporality,
_InstrumentMonotonicityAwareAggregation,
)
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.point import Gauge, Sum


class TestSynchronousSumAggregation(TestCase):
def test_instrument_monotonicity_awareness(self):
def test_aggregate_delta(self):
"""
`SynchronousSumAggregation` is aware of the instrument monotonicity
`SynchronousSumAggregation` aggregates data for sum metric points
"""

synchronous_sum_aggregation = SynchronousSumAggregation(True)
self.assertIsInstance(
synchronous_sum_aggregation,
_InstrumentMonotonicityAwareAggregation,
synchronous_sum_aggregation = SumAggregation(
True, AggregationTemporality.DELTA
)
self.assertTrue(synchronous_sum_aggregation._instrument_is_monotonic)

synchronous_sum_aggregation = SynchronousSumAggregation(False)
self.assertFalse(synchronous_sum_aggregation._instrument_is_monotonic)
synchronous_sum_aggregation.aggregate(Measurement(1))
synchronous_sum_aggregation.aggregate(Measurement(2))
synchronous_sum_aggregation.aggregate(Measurement(3))

def test_aggregate(self):
self.assertEqual(synchronous_sum_aggregation._value, 6)

synchronous_sum_aggregation = SumAggregation(
True, AggregationTemporality.DELTA
)

synchronous_sum_aggregation.aggregate(Measurement(1))
synchronous_sum_aggregation.aggregate(Measurement(-2))
synchronous_sum_aggregation.aggregate(Measurement(3))

self.assertEqual(synchronous_sum_aggregation._value, 2)

def test_aggregate_cumulative(self):
"""
`SynchronousSumAggregation` aggregates data for sum metric points
"""

synchronous_sum_aggregation = SynchronousSumAggregation(True)
synchronous_sum_aggregation = SumAggregation(
True, AggregationTemporality.CUMULATIVE
)

synchronous_sum_aggregation.aggregate(Measurement(1))
synchronous_sum_aggregation.aggregate(Measurement(2))
synchronous_sum_aggregation.aggregate(Measurement(3))

self.assertEqual(synchronous_sum_aggregation._value, 6)

synchronous_sum_aggregation = SynchronousSumAggregation(True)
synchronous_sum_aggregation = SumAggregation(
True, AggregationTemporality.CUMULATIVE
)

synchronous_sum_aggregation.aggregate(Measurement(1))
synchronous_sum_aggregation.aggregate(Measurement(-2))
synchronous_sum_aggregation.aggregate(Measurement(3))

self.assertEqual(synchronous_sum_aggregation._value, 2)

def test_collect(self):
def test_collect_delta(self):
"""
`SynchronousSumAggregation` collects sum metric points
"""

synchronous_sum_aggregation = SynchronousSumAggregation(True)
synchronous_sum_aggregation = SumAggregation(
True, AggregationTemporality.DELTA
)

synchronous_sum_aggregation.aggregate(Measurement(1))
first_sum = synchronous_sum_aggregation.collect()
Expand All @@ -91,69 +105,25 @@ def test_collect(self):
second_sum.start_time_unix_nano, first_sum.start_time_unix_nano
)


class TestAsynchronousSumAggregation(TestCase):
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
def test_instrument_monotonicity_awareness(self):
def test_collect_cumulative(self):
"""
`AsynchronousSumAggregation` is aware of the instrument monotonicity
`SynchronousSumAggregation` collects sum metric points
"""

asynchronous_sum_aggregation = AsynchronousSumAggregation(True)
self.assertIsInstance(
asynchronous_sum_aggregation,
_InstrumentMonotonicityAwareAggregation,
synchronous_sum_aggregation = SumAggregation(
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
True, AggregationTemporality.CUMULATIVE
)
self.assertTrue(asynchronous_sum_aggregation._instrument_is_monotonic)

asynchronous_sum_aggregation = AsynchronousSumAggregation(False)
self.assertFalse(asynchronous_sum_aggregation._instrument_is_monotonic)

def test_aggregate(self):
"""
`AsynchronousSumAggregation` aggregates data for sum metric points
"""

asynchronous_sum_aggregation = AsynchronousSumAggregation(True)

asynchronous_sum_aggregation.aggregate(Measurement(1))
self.assertEqual(asynchronous_sum_aggregation._value, 1)

asynchronous_sum_aggregation.aggregate(Measurement(2))
self.assertEqual(asynchronous_sum_aggregation._value, 2)

asynchronous_sum_aggregation.aggregate(Measurement(3))
self.assertEqual(asynchronous_sum_aggregation._value, 3)

asynchronous_sum_aggregation = AsynchronousSumAggregation(True)

asynchronous_sum_aggregation.aggregate(Measurement(1))
self.assertEqual(asynchronous_sum_aggregation._value, 1)

asynchronous_sum_aggregation.aggregate(Measurement(-2))
self.assertEqual(asynchronous_sum_aggregation._value, -2)

asynchronous_sum_aggregation.aggregate(Measurement(3))
self.assertEqual(asynchronous_sum_aggregation._value, 3)

def test_collect(self):
"""
`AsynchronousSumAggregation` collects sum metric points
"""

asynchronous_sum_aggregation = AsynchronousSumAggregation(True)

self.assertIsNone(asynchronous_sum_aggregation.collect())

asynchronous_sum_aggregation.aggregate(Measurement(1))
first_sum = asynchronous_sum_aggregation.collect()
synchronous_sum_aggregation.aggregate(Measurement(1))
first_sum = synchronous_sum_aggregation.collect()

self.assertEqual(first_sum.value, 1)
self.assertTrue(first_sum.is_monotonic)

asynchronous_sum_aggregation.aggregate(Measurement(1))
second_sum = asynchronous_sum_aggregation.collect()
synchronous_sum_aggregation.aggregate(Measurement(1))
second_sum = synchronous_sum_aggregation.collect()

self.assertEqual(second_sum.value, 1)
self.assertEqual(second_sum.value, 2)
self.assertTrue(second_sum.is_monotonic)

self.assertEqual(
Expand All @@ -162,16 +132,6 @@ def test_collect(self):


class TestLastValueAggregation(TestCase):
def test_instrument_monotonicity_awareness(self):
"""
`LastValueAggregation` is not aware of the instrument monotonicity
"""

sum_aggregation = LastValueAggregation()
self.assertNotIsInstance(
sum_aggregation, _InstrumentMonotonicityAwareAggregation
)

def test_aggregate(self):
"""
`LastValueAggregation` collects data for gauge metric points with delta
Expand All @@ -196,7 +156,7 @@ def test_collect(self):

last_value_aggregation = LastValueAggregation()

self.assertIsNone(last_value_aggregation.collect())
self.assertIsInstance(last_value_aggregation.collect(), Gauge)

last_value_aggregation.aggregate(Measurement(1))
first_gauge = last_value_aggregation.collect()
Expand Down