Skip to content

Commit

Permalink
Merge branch 'main' into issue_2300
Browse files Browse the repository at this point in the history
  • Loading branch information
lzchen authored Feb 11, 2022
2 parents 96c1113 + ec3053e commit d6ebc41
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 124 deletions.
78 changes: 34 additions & 44 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@
_logger = getLogger(__name__)


class _InstrumentMonotonicityAwareAggregation:
def __init__(self, instrument_is_monotonic: bool):
self._instrument_is_monotonic = instrument_is_monotonic
super().__init__()


class Aggregation(ABC, Generic[_PointVarT]):
def __init__(self):
self._lock = Lock()
Expand All @@ -54,16 +48,27 @@ def collect(self) -> Optional[_PointVarT]:
pass


class SynchronousSumAggregation(
_InstrumentMonotonicityAwareAggregation, Aggregation[Sum]
):
def __init__(self, instrument_is_monotonic: bool):
super().__init__(instrument_is_monotonic)
self._value = 0
class SumAggregation(Aggregation[Sum]):
def __init__(
self,
instrument_is_monotonic: bool,
instrument_temporality: AggregationTemporality,
):
super().__init__()

self._start_time_unix_nano = _time_ns()
self._instrument_temporality = instrument_temporality
self._instrument_is_monotonic = instrument_is_monotonic

if self._instrument_temporality is AggregationTemporality.DELTA:
self._value = 0
else:
self._value = None

def aggregate(self, measurement: Measurement) -> None:
with self._lock:
if self._value is None:
self._value = 0
self._value = self._value + measurement.value

def collect(self) -> Optional[Sum]:
Expand All @@ -73,47 +78,32 @@ def collect(self) -> Optional[Sum]:
"""
now = _time_ns()

with self._lock:
value = self._value
start_time_unix_nano = self._start_time_unix_nano

self._value = 0
self._start_time_unix_nano = now + 1

return Sum(
aggregation_temporality=AggregationTemporality.DELTA,
is_monotonic=self._instrument_is_monotonic,
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=now,
value=value,
)
if self._instrument_temporality is AggregationTemporality.DELTA:

with self._lock:
value = self._value
start_time_unix_nano = self._start_time_unix_nano

class AsynchronousSumAggregation(
_InstrumentMonotonicityAwareAggregation, Aggregation[Sum]
):
def __init__(self, instrument_is_monotonic: bool):
super().__init__(instrument_is_monotonic)
self._value = None
self._start_time_unix_nano = _time_ns()
self._value = 0
self._start_time_unix_nano = now + 1

def aggregate(self, measurement: Measurement) -> None:
with self._lock:
self._value = measurement.value
return Sum(
aggregation_temporality=AggregationTemporality.DELTA,
is_monotonic=self._instrument_is_monotonic,
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=now,
value=value,
)

def collect(self) -> Optional[Sum]:
"""
Atomically return a point for the current value of the metric.
"""
if self._value is None:
return None

return Sum(
start_time_unix_nano=self._start_time_unix_nano,
time_unix_nano=_time_ns(),
value=self._value,
aggregation_temporality=AggregationTemporality.CUMULATIVE,
is_monotonic=self._instrument_is_monotonic,
start_time_unix_nano=self._start_time_unix_nano,
time_unix_nano=now,
value=self._value,
)


Expand Down Expand Up @@ -180,7 +170,7 @@ def aggregate(self, measurement: Measurement) -> None:

self._bucket_counts[bisect_left(self._boundaries, value)] += 1

def collect(self) -> Optional[Histogram]:
def collect(self) -> Histogram:
"""
Atomically return a point for the current value of the metric.
"""
Expand Down
125 changes: 45 additions & 80 deletions opentelemetry-sdk/tests/metrics/test_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,60 +20,74 @@

from opentelemetry.sdk._metrics.aggregation import (
AggregationTemporality,
AsynchronousSumAggregation,
ExplicitBucketHistogramAggregation,
LastValueAggregation,
SynchronousSumAggregation,
SumAggregation,
_convert_aggregation_temporality,
_InstrumentMonotonicityAwareAggregation,
)
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.point import Gauge, Sum


class TestSynchronousSumAggregation(TestCase):
def test_instrument_monotonicity_awareness(self):
def test_aggregate_delta(self):
"""
`SynchronousSumAggregation` is aware of the instrument monotonicity
`SynchronousSumAggregation` aggregates data for sum metric points
"""

synchronous_sum_aggregation = SynchronousSumAggregation(True)
self.assertIsInstance(
synchronous_sum_aggregation,
_InstrumentMonotonicityAwareAggregation,
synchronous_sum_aggregation = SumAggregation(
True, AggregationTemporality.DELTA
)
self.assertTrue(synchronous_sum_aggregation._instrument_is_monotonic)

synchronous_sum_aggregation = SynchronousSumAggregation(False)
self.assertFalse(synchronous_sum_aggregation._instrument_is_monotonic)
synchronous_sum_aggregation.aggregate(Measurement(1))
synchronous_sum_aggregation.aggregate(Measurement(2))
synchronous_sum_aggregation.aggregate(Measurement(3))

def test_aggregate(self):
self.assertEqual(synchronous_sum_aggregation._value, 6)

synchronous_sum_aggregation = SumAggregation(
True, AggregationTemporality.DELTA
)

synchronous_sum_aggregation.aggregate(Measurement(1))
synchronous_sum_aggregation.aggregate(Measurement(-2))
synchronous_sum_aggregation.aggregate(Measurement(3))

self.assertEqual(synchronous_sum_aggregation._value, 2)

def test_aggregate_cumulative(self):
"""
`SynchronousSumAggregation` aggregates data for sum metric points
"""

synchronous_sum_aggregation = SynchronousSumAggregation(True)
synchronous_sum_aggregation = SumAggregation(
True, AggregationTemporality.CUMULATIVE
)

synchronous_sum_aggregation.aggregate(Measurement(1))
synchronous_sum_aggregation.aggregate(Measurement(2))
synchronous_sum_aggregation.aggregate(Measurement(3))

self.assertEqual(synchronous_sum_aggregation._value, 6)

synchronous_sum_aggregation = SynchronousSumAggregation(True)
synchronous_sum_aggregation = SumAggregation(
True, AggregationTemporality.CUMULATIVE
)

synchronous_sum_aggregation.aggregate(Measurement(1))
synchronous_sum_aggregation.aggregate(Measurement(-2))
synchronous_sum_aggregation.aggregate(Measurement(3))

self.assertEqual(synchronous_sum_aggregation._value, 2)

def test_collect(self):
def test_collect_delta(self):
"""
`SynchronousSumAggregation` collects sum metric points
"""

synchronous_sum_aggregation = SynchronousSumAggregation(True)
synchronous_sum_aggregation = SumAggregation(
True, AggregationTemporality.DELTA
)

synchronous_sum_aggregation.aggregate(Measurement(1))
first_sum = synchronous_sum_aggregation.collect()
Expand All @@ -91,87 +105,37 @@ def test_collect(self):
second_sum.start_time_unix_nano, first_sum.start_time_unix_nano
)


class TestAsynchronousSumAggregation(TestCase):
def test_instrument_monotonicity_awareness(self):
def test_collect_cumulative(self):
"""
`AsynchronousSumAggregation` is aware of the instrument monotonicity
`SynchronousSumAggregation` collects sum metric points
"""

asynchronous_sum_aggregation = AsynchronousSumAggregation(True)
self.assertIsInstance(
asynchronous_sum_aggregation,
_InstrumentMonotonicityAwareAggregation,
sum_aggregation = SumAggregation(
True, AggregationTemporality.CUMULATIVE
)
self.assertTrue(asynchronous_sum_aggregation._instrument_is_monotonic)

asynchronous_sum_aggregation = AsynchronousSumAggregation(False)
self.assertFalse(asynchronous_sum_aggregation._instrument_is_monotonic)

def test_aggregate(self):
"""
`AsynchronousSumAggregation` aggregates data for sum metric points
"""

asynchronous_sum_aggregation = AsynchronousSumAggregation(True)

asynchronous_sum_aggregation.aggregate(Measurement(1))
self.assertEqual(asynchronous_sum_aggregation._value, 1)

asynchronous_sum_aggregation.aggregate(Measurement(2))
self.assertEqual(asynchronous_sum_aggregation._value, 2)

asynchronous_sum_aggregation.aggregate(Measurement(3))
self.assertEqual(asynchronous_sum_aggregation._value, 3)

asynchronous_sum_aggregation = AsynchronousSumAggregation(True)

asynchronous_sum_aggregation.aggregate(Measurement(1))
self.assertEqual(asynchronous_sum_aggregation._value, 1)

asynchronous_sum_aggregation.aggregate(Measurement(-2))
self.assertEqual(asynchronous_sum_aggregation._value, -2)

asynchronous_sum_aggregation.aggregate(Measurement(3))
self.assertEqual(asynchronous_sum_aggregation._value, 3)

def test_collect(self):
"""
`AsynchronousSumAggregation` collects sum metric points
"""

asynchronous_sum_aggregation = AsynchronousSumAggregation(True)

self.assertIsNone(asynchronous_sum_aggregation.collect())

asynchronous_sum_aggregation.aggregate(Measurement(1))
first_sum = asynchronous_sum_aggregation.collect()
sum_aggregation.aggregate(Measurement(1))
first_sum = sum_aggregation.collect()

self.assertEqual(first_sum.value, 1)
self.assertTrue(first_sum.is_monotonic)

asynchronous_sum_aggregation.aggregate(Measurement(1))
second_sum = asynchronous_sum_aggregation.collect()
sum_aggregation.aggregate(Measurement(1))
second_sum = sum_aggregation.collect()

self.assertEqual(second_sum.value, 1)
self.assertEqual(second_sum.value, 2)
self.assertTrue(second_sum.is_monotonic)

self.assertEqual(
second_sum.start_time_unix_nano, first_sum.start_time_unix_nano
)


class TestLastValueAggregation(TestCase):
def test_instrument_monotonicity_awareness(self):
"""
`LastValueAggregation` is not aware of the instrument monotonicity
"""

sum_aggregation = LastValueAggregation()
self.assertNotIsInstance(
sum_aggregation, _InstrumentMonotonicityAwareAggregation
self.assertIsNone(
SumAggregation(True, AggregationTemporality.CUMULATIVE).collect()
)


class TestLastValueAggregation(TestCase):
def test_aggregate(self):
"""
`LastValueAggregation` collects data for gauge metric points with delta
Expand Down Expand Up @@ -200,6 +164,7 @@ def test_collect(self):

last_value_aggregation.aggregate(Measurement(1))
first_gauge = last_value_aggregation.collect()
self.assertIsInstance(first_gauge, Gauge)

self.assertEqual(first_gauge.value, 1)

Expand Down

0 comments on commit d6ebc41

Please sign in to comment.