Skip to content

Commit

Permalink
add factories for aggregations (#2482)
Browse files Browse the repository at this point in the history
* add factories for aggregations

Fixes #2481

* fix lint

* make classes/methods private

* updates from feedback

* remove Factory suffix

* Apply suggestions from code review

Co-authored-by: Diego Hurtado <[email protected]>

Co-authored-by: Diego Hurtado <[email protected]>
  • Loading branch information
codeboten and ocelotl authored Feb 24, 2022
1 parent cd4c5e7 commit 89514cd
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 24 deletions.
67 changes: 63 additions & 4 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
from threading import Lock
from typing import Generic, List, Optional, Sequence, TypeVar

from opentelemetry._metrics.instrument import (
Asynchronous,
Instrument,
Synchronous,
_Monotonic,
)
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.point import (
AggregationTemporality,
Expand All @@ -35,7 +41,7 @@
_logger = getLogger(__name__)


class Aggregation(ABC, Generic[_PointVarT]):
class _Aggregation(ABC, Generic[_PointVarT]):
def __init__(self):
self._lock = Lock()

Expand All @@ -48,7 +54,7 @@ def collect(self) -> Optional[_PointVarT]:
pass


class SumAggregation(Aggregation[Sum]):
class _SumAggregation(_Aggregation[Sum]):
def __init__(
self,
instrument_is_monotonic: bool,
Expand Down Expand Up @@ -107,7 +113,7 @@ def collect(self) -> Optional[Sum]:
)


class LastValueAggregation(Aggregation[Gauge]):
class _LastValueAggregation(_Aggregation[Gauge]):
def __init__(self):
super().__init__()
self._value = None
Expand All @@ -129,7 +135,7 @@ def collect(self) -> Optional[Gauge]:
)


class ExplicitBucketHistogramAggregation(Aggregation[Histogram]):
class _ExplicitBucketHistogramAggregation(_Aggregation[Histogram]):
def __init__(
self,
boundaries: Sequence[float] = (
Expand Down Expand Up @@ -311,3 +317,56 @@ def _convert_aggregation_temporality(
aggregation_temporality=aggregation_temporality,
)
return None


class _AggregationFactory(ABC):
@abstractmethod
def _create_aggregation(self, instrument: Instrument) -> _Aggregation:
"""Creates an aggregation"""


class ExplicitBucketHistogramAggregation(_AggregationFactory):
def __init__(
self,
boundaries: Sequence[float] = (
0.0,
5.0,
10.0,
25.0,
50.0,
75.0,
100.0,
250.0,
500.0,
1000.0,
),
record_min_max: bool = True,
) -> None:
self._boundaries = boundaries
self._record_min_max = record_min_max

def _create_aggregation(self, instrument: Instrument) -> _Aggregation:
return _ExplicitBucketHistogramAggregation(
boundaries=self._boundaries,
record_min_max=self._record_min_max,
)


class SumAggregation(_AggregationFactory):
def _create_aggregation(self, instrument: Instrument) -> _Aggregation:

temporality = AggregationTemporality.UNSPECIFIED
if isinstance(instrument, Synchronous):
temporality = AggregationTemporality.DELTA
elif isinstance(instrument, Asynchronous):
temporality = AggregationTemporality.CUMULATIVE

return _SumAggregation(
isinstance(instrument, _Monotonic),
temporality,
)


class LastValueAggregation(_AggregationFactory):
def _create_aggregation(self, instrument: Instrument) -> _Aggregation:
return _LastValueAggregation()
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
)
from opentelemetry.sdk._metrics.aggregation import (
AggregationTemporality,
ExplicitBucketHistogramAggregation,
LastValueAggregation,
SumAggregation,
_ExplicitBucketHistogramAggregation,
_LastValueAggregation,
_SumAggregation,
)
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.point import Metric
Expand Down Expand Up @@ -76,11 +76,11 @@ def _get_or_init_view_instrument_match(
if not matches:
# TODO: the logic to select aggregation could be moved
if isinstance(instrument, Counter):
agg = SumAggregation(True, AggregationTemporality.DELTA)
agg = _SumAggregation(True, AggregationTemporality.DELTA)
elif isinstance(instrument, Histogram):
agg = ExplicitBucketHistogramAggregation()
agg = _ExplicitBucketHistogramAggregation()
else:
agg = LastValueAggregation()
agg = _LastValueAggregation()
matches.append(
_ViewInstrumentMatch(
resource=self._sdk_config.resource,
Expand Down
89 changes: 75 additions & 14 deletions opentelemetry-sdk/tests/metrics/test_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@
from unittest import TestCase
from unittest.mock import Mock

from opentelemetry.sdk._metrics import instrument
from opentelemetry.sdk._metrics.aggregation import (
AggregationTemporality,
ExplicitBucketHistogramAggregation,
LastValueAggregation,
SumAggregation,
_convert_aggregation_temporality,
_ExplicitBucketHistogramAggregation,
_LastValueAggregation,
_SumAggregation,
)
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.point import Gauge, Histogram, Sum
Expand All @@ -44,7 +48,7 @@ def test_aggregate_delta(self):
`SynchronousSumAggregation` aggregates data for sum metric points
"""

synchronous_sum_aggregation = SumAggregation(
synchronous_sum_aggregation = _SumAggregation(
True, AggregationTemporality.DELTA
)

Expand All @@ -54,7 +58,7 @@ def test_aggregate_delta(self):

self.assertEqual(synchronous_sum_aggregation._value, 6)

synchronous_sum_aggregation = SumAggregation(
synchronous_sum_aggregation = _SumAggregation(
True, AggregationTemporality.DELTA
)

Expand All @@ -69,7 +73,7 @@ def test_aggregate_cumulative(self):
`SynchronousSumAggregation` aggregates data for sum metric points
"""

synchronous_sum_aggregation = SumAggregation(
synchronous_sum_aggregation = _SumAggregation(
True, AggregationTemporality.CUMULATIVE
)

Expand All @@ -79,7 +83,7 @@ def test_aggregate_cumulative(self):

self.assertEqual(synchronous_sum_aggregation._value, 6)

synchronous_sum_aggregation = SumAggregation(
synchronous_sum_aggregation = _SumAggregation(
True, AggregationTemporality.CUMULATIVE
)

Expand All @@ -94,7 +98,7 @@ def test_collect_delta(self):
`SynchronousSumAggregation` collects sum metric points
"""

synchronous_sum_aggregation = SumAggregation(
synchronous_sum_aggregation = _SumAggregation(
True, AggregationTemporality.DELTA
)

Expand All @@ -119,7 +123,7 @@ def test_collect_cumulative(self):
`SynchronousSumAggregation` collects sum metric points
"""

sum_aggregation = SumAggregation(
sum_aggregation = _SumAggregation(
True, AggregationTemporality.CUMULATIVE
)

Expand All @@ -140,7 +144,7 @@ def test_collect_cumulative(self):
)

self.assertIsNone(
SumAggregation(True, AggregationTemporality.CUMULATIVE).collect()
_SumAggregation(True, AggregationTemporality.CUMULATIVE).collect()
)


Expand All @@ -151,7 +155,7 @@ def test_aggregate(self):
temporality
"""

last_value_aggregation = LastValueAggregation()
last_value_aggregation = _LastValueAggregation()

last_value_aggregation.aggregate(measurement(1))
self.assertEqual(last_value_aggregation._value, 1)
Expand All @@ -167,7 +171,7 @@ def test_collect(self):
`LastValueAggregation` collects sum metric points
"""

last_value_aggregation = LastValueAggregation()
last_value_aggregation = _LastValueAggregation()

self.assertIsNone(last_value_aggregation.collect())

Expand Down Expand Up @@ -198,7 +202,7 @@ def test_aggregate(self):
"""

explicit_bucket_histogram_aggregation = (
ExplicitBucketHistogramAggregation(boundaries=[0, 2, 4])
_ExplicitBucketHistogramAggregation(boundaries=[0, 2, 4])
)

explicit_bucket_histogram_aggregation.aggregate(measurement(-1))
Expand Down Expand Up @@ -239,7 +243,7 @@ def test_min_max(self):
"""

explicit_bucket_histogram_aggregation = (
ExplicitBucketHistogramAggregation()
_ExplicitBucketHistogramAggregation()
)

explicit_bucket_histogram_aggregation.aggregate(measurement(-1))
Expand All @@ -252,7 +256,7 @@ def test_min_max(self):
self.assertEqual(explicit_bucket_histogram_aggregation._max, 9999)

explicit_bucket_histogram_aggregation = (
ExplicitBucketHistogramAggregation(record_min_max=False)
_ExplicitBucketHistogramAggregation(record_min_max=False)
)

explicit_bucket_histogram_aggregation.aggregate(measurement(-1))
Expand All @@ -266,11 +270,11 @@ def test_min_max(self):

def test_collect(self):
"""
`ExplicitBucketHistogramAggregation` collects sum metric points
`_ExplicitBucketHistogramAggregation` collects sum metric points
"""

explicit_bucket_histogram_aggregation = (
ExplicitBucketHistogramAggregation(boundaries=[0, 1, 2])
_ExplicitBucketHistogramAggregation(boundaries=[0, 1, 2])
)

explicit_bucket_histogram_aggregation.aggregate(measurement(1))
Expand Down Expand Up @@ -749,3 +753,60 @@ def test_aggregation_temporality_to_delta(self):
aggregation_temporality=AggregationTemporality.DELTA,
),
)


class TestAggregationFactory(TestCase):
def test_sum_factory(self):
counter = instrument.Counter("name", Mock(), Mock())
factory = SumAggregation()
aggregation = factory._create_aggregation(counter)
self.assertIsInstance(aggregation, _SumAggregation)
self.assertTrue(aggregation._instrument_is_monotonic)
self.assertEqual(
aggregation._instrument_temporality, AggregationTemporality.DELTA
)
aggregation2 = factory._create_aggregation(counter)
self.assertNotEqual(aggregation, aggregation2)

counter = instrument.UpDownCounter("name", Mock(), Mock())
factory = SumAggregation()
aggregation = factory._create_aggregation(counter)
self.assertIsInstance(aggregation, _SumAggregation)
self.assertFalse(aggregation._instrument_is_monotonic)
self.assertEqual(
aggregation._instrument_temporality, AggregationTemporality.DELTA
)

counter = instrument.ObservableCounter("name", Mock(), Mock(), None)
factory = SumAggregation()
aggregation = factory._create_aggregation(counter)
self.assertIsInstance(aggregation, _SumAggregation)
self.assertTrue(aggregation._instrument_is_monotonic)
self.assertEqual(
aggregation._instrument_temporality,
AggregationTemporality.CUMULATIVE,
)

def test_explicit_bucket_histogram_factory(self):
histo = instrument.Histogram("name", Mock(), Mock())
factory = ExplicitBucketHistogramAggregation(
boundaries=(
0.0,
5.0,
),
record_min_max=False,
)
aggregation = factory._create_aggregation(histo)
self.assertIsInstance(aggregation, _ExplicitBucketHistogramAggregation)
self.assertFalse(aggregation._record_min_max)
self.assertEqual(aggregation._boundaries, (0.0, 5.0))
aggregation2 = factory._create_aggregation(histo)
self.assertNotEqual(aggregation, aggregation2)

def test_last_value_factory(self):
counter = instrument.Counter("name", Mock(), Mock())
factory = LastValueAggregation()
aggregation = factory._create_aggregation(counter)
self.assertIsInstance(aggregation, _LastValueAggregation)
aggregation2 = factory._create_aggregation(counter)
self.assertNotEqual(aggregation, aggregation2)

0 comments on commit 89514cd

Please sign in to comment.