Skip to content

Commit

Permalink
Apply suggestions from review
Browse files Browse the repository at this point in the history
  • Loading branch information
fcollonval committed Sep 2, 2024
1 parent ed02f8b commit 4f5efa7
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
)
from opentelemetry.sdk.metrics._internal.exemplar import (
Exemplar,
ExemplarReservoirFactory,
ExemplarReservoirBuilder,
)
from opentelemetry.sdk.metrics._internal.exponential_histogram.buckets import (
Buckets,
Expand Down Expand Up @@ -88,18 +88,25 @@ class _Aggregation(ABC, Generic[_DataPointVarT]):
def __init__(
self,
attributes: Attributes,
reservoir_factory: ExemplarReservoirFactory,
reservoir_builder: ExemplarReservoirBuilder,
):
self._lock = Lock()
self._attributes = attributes
self._reservoir = reservoir_factory()
self._reservoir = reservoir_builder()
self._previous_point = None

@abstractmethod
def aggregate(self, measurement: Measurement, should_sample_exemplar: bool = True
def aggregate(
self, measurement: Measurement, should_sample_exemplar: bool = True
) -> None:
"""Aggregate a measurement.
Args:
measurement: Measurement to aggregate
should_sample_exemplar: Whether the measurement should be sampled by the exemplars reservoir or not.
"""
pass

@abstractmethod
def collect(
self,
Expand All @@ -109,18 +116,33 @@ def collect(
pass

def _collect_exemplars(self) -> Sequence[Exemplar]:
return self._reservoir.collect(
self._attributes
)
"""Returns the collected exemplars.
def sample_exemplar(self, measurement: Measurement) -> None:
self._reservoir.offer(
Returns:
The exemplars collected by the reservoir
"""
return self._reservoir.collect(self._attributes)

def _sample_exemplar(
self, measurement: Measurement, should_sample_exemplar: bool
) -> None:
"""Offer the measurement to the exemplar reservoir for sampling.
It should be called within the each :ref:`aggregate` call.
Args:
measurement: The new measurement
should_sample_exemplar: Whether the measurement should be sampled by the exemplars reservoir or not.
"""
if should_sample_exemplar:
self._reservoir.offer(
measurement.value,
measurement.time_unix_nano,
measurement.attributes,
measurement.context,
)



class _DropAggregation(_Aggregation):
def aggregate(
self, measurement: Measurement, should_sample_exemplar: bool = True
Expand All @@ -142,9 +164,9 @@ def __init__(
instrument_is_monotonic: bool,
instrument_aggregation_temporality: AggregationTemporality,
start_time_unix_nano: int,
reservoir_factory: ExemplarReservoirFactory,
reservoir_builder: ExemplarReservoirBuilder,
):
super().__init__(attributes, reservoir_factory)
super().__init__(attributes, reservoir_builder)

self._start_time_unix_nano = start_time_unix_nano
self._instrument_aggregation_temporality = (
Expand All @@ -165,10 +187,8 @@ def aggregate(
self._value = 0

self._value = self._value + measurement.value

if should_sample_exemplar:
self.sample_exemplar(measurement)


self._sample_exemplar(measurement, should_sample_exemplar)

def collect(
self,
Expand Down Expand Up @@ -383,18 +403,18 @@ class _LastValueAggregation(_Aggregation[GaugePoint]):
def __init__(
self,
attributes: Attributes,
reservoir_factory: ExemplarReservoirFactory,
reservoir_builder: ExemplarReservoirBuilder,
):
super().__init__(attributes, reservoir_factory)
super().__init__(attributes, reservoir_builder)
self._value = None

def aggregate(
self, measurement: Measurement, should_sample_exemplar: bool = True
):
with self._lock:
self._value = measurement.value
if should_sample_exemplar:
self.sample_exemplar(measurement)

self._sample_exemplar(measurement, should_sample_exemplar)

def collect(
self,
Expand All @@ -410,7 +430,7 @@ def collect(
value = self._value
self._value = None

exemplars = self._collect_exemplars()
exemplars = self._collect_exemplars()

return NumberDataPoint(
attributes=self._attributes,
Expand All @@ -427,7 +447,7 @@ def __init__(
attributes: Attributes,
instrument_aggregation_temporality: AggregationTemporality,
start_time_unix_nano: int,
reservoir_factory: ExemplarReservoirFactory,
reservoir_builder: ExemplarReservoirBuilder,
boundaries: Sequence[float] = (
0.0,
5.0,
Expand All @@ -449,8 +469,8 @@ def __init__(
):
super().__init__(
attributes,
reservoir_factory=partial(
reservoir_factory, boundaries=boundaries
reservoir_builder=partial(
reservoir_builder, boundaries=boundaries
),
)

Expand Down Expand Up @@ -494,8 +514,7 @@ def aggregate(

self._value[bisect_left(self._boundaries, measurement_value)] += 1

if should_sample_exemplar:
self.sample_exemplar(measurement)
self._sample_exemplar(measurement, should_sample_exemplar)

def collect(
self,
Expand All @@ -517,8 +536,6 @@ def collect(
self._min = inf
self._max = -inf

exemplars = self._collect_exemplars()

if (
self._instrument_aggregation_temporality
is AggregationTemporality.DELTA
Expand All @@ -542,7 +559,7 @@ def collect(

return HistogramDataPoint(
attributes=self._attributes,
exemplars=exemplars,
exemplars=self._collect_exemplars(),
start_time_unix_nano=previous_collection_start_nano,
time_unix_nano=collection_start_nano,
count=sum(value),
Expand Down Expand Up @@ -572,7 +589,7 @@ def collect(

return HistogramDataPoint(
attributes=self._attributes,
exemplars=exemplars,
exemplars=self._collect_exemplars(),
start_time_unix_nano=self._start_time_unix_nano,
time_unix_nano=collection_start_nano,
count=sum(self._previous_value),
Expand Down Expand Up @@ -602,7 +619,7 @@ class _ExponentialBucketHistogramAggregation(_Aggregation[HistogramPoint]):
def __init__(
self,
attributes: Attributes,
reservoir_factory: ExemplarReservoirFactory,
reservoir_builder: ExemplarReservoirBuilder,
instrument_aggregation_temporality: AggregationTemporality,
start_time_unix_nano: int,
# This is the default maximum number of buckets per positive or
Expand Down Expand Up @@ -648,8 +665,8 @@ def __init__(

super().__init__(
attributes,
reservoir_factory=partial(
reservoir_factory, size=min(20, max_size)
reservoir_builder=partial(
reservoir_builder, size=min(20, max_size)
),
)

Expand Down Expand Up @@ -794,8 +811,7 @@ def aggregate(
# in _ExplicitBucketHistogramAggregation.aggregate
value.increment_bucket(bucket_index)

if should_sample_exemplar:
self.sample_exemplar(measurement)
self._sample_exemplar(measurement, should_sample_exemplar)

def collect(
self,
Expand Down Expand Up @@ -826,8 +842,6 @@ def collect(
self._zero_count = 0
self._scale = None

exemplars = self._collect_exemplars()

if (
self._instrument_aggregation_temporality
is AggregationTemporality.DELTA
Expand All @@ -851,7 +865,7 @@ def collect(

return ExponentialHistogramDataPoint(
attributes=self._attributes,
exemplars=exemplars,
exemplars=self._collect_exemplars(),
start_time_unix_nano=previous_collection_start_nano,
time_unix_nano=collection_start_nano,
count=count,
Expand Down Expand Up @@ -1015,7 +1029,7 @@ def collect(

return ExponentialHistogramDataPoint(
attributes=self._attributes,
exemplars=exemplars,
exemplars=self._collect_exemplars(),
start_time_unix_nano=self._start_time_unix_nano,
time_unix_nano=collection_start_nano,
count=self._previous_count,
Expand Down Expand Up @@ -1187,7 +1201,7 @@ def _create_aggregation(
instrument: Instrument,
attributes: Attributes,
reservoir_factory: Callable[
[Type[_Aggregation]], ExemplarReservoirFactory
[Type[_Aggregation]], ExemplarReservoirBuilder
],
start_time_unix_nano: int,
) -> _Aggregation:
Expand Down Expand Up @@ -1218,7 +1232,7 @@ def _create_aggregation(
instrument: Instrument,
attributes: Attributes,
reservoir_factory: Callable[
[Type[_Aggregation]], ExemplarReservoirFactory
[Type[_Aggregation]], ExemplarReservoirBuilder
],
start_time_unix_nano: int,
) -> _Aggregation:
Expand All @@ -1227,7 +1241,7 @@ def _create_aggregation(
if isinstance(instrument, Counter):
return _SumAggregation(
attributes,
reservoir_factory=reservoir_factory(_SumAggregation),
reservoir_builder=reservoir_factory(_SumAggregation),
instrument_is_monotonic=True,
instrument_aggregation_temporality=(
AggregationTemporality.DELTA
Expand All @@ -1237,7 +1251,7 @@ def _create_aggregation(
if isinstance(instrument, UpDownCounter):
return _SumAggregation(
attributes,
reservoir_factory=reservoir_factory(_SumAggregation),
reservoir_builder=reservoir_factory(_SumAggregation),
instrument_is_monotonic=False,
instrument_aggregation_temporality=(
AggregationTemporality.DELTA
Expand All @@ -1248,7 +1262,7 @@ def _create_aggregation(
if isinstance(instrument, ObservableCounter):
return _SumAggregation(
attributes,
reservoir_factory=reservoir_factory(_SumAggregation),
reservoir_builder=reservoir_factory(_SumAggregation),
instrument_is_monotonic=True,
instrument_aggregation_temporality=(
AggregationTemporality.CUMULATIVE
Expand All @@ -1259,7 +1273,7 @@ def _create_aggregation(
if isinstance(instrument, ObservableUpDownCounter):
return _SumAggregation(
attributes,
reservoir_factory=reservoir_factory(_SumAggregation),
reservoir_builder=reservoir_factory(_SumAggregation),
instrument_is_monotonic=False,
instrument_aggregation_temporality=(
AggregationTemporality.CUMULATIVE
Expand All @@ -1270,7 +1284,7 @@ def _create_aggregation(
if isinstance(instrument, Histogram):
return _ExplicitBucketHistogramAggregation(
attributes,
reservoir_factory=reservoir_factory(
reservoir_builder=reservoir_factory(
_ExplicitBucketHistogramAggregation
),
instrument_aggregation_temporality=(
Expand All @@ -1282,13 +1296,13 @@ def _create_aggregation(
if isinstance(instrument, ObservableGauge):
return _LastValueAggregation(
attributes,
reservoir_factory=reservoir_factory(_LastValueAggregation),
reservoir_builder=reservoir_factory(_LastValueAggregation),
)

if isinstance(instrument, _Gauge):
return _LastValueAggregation(
attributes,
reservoir_factory=reservoir_factory(_LastValueAggregation),
reservoir_builder=reservoir_factory(_LastValueAggregation),
)

# pylint: disable=broad-exception-raised
Expand All @@ -1309,7 +1323,7 @@ def _create_aggregation(
instrument: Instrument,
attributes: Attributes,
reservoir_factory: Callable[
[Type[_Aggregation]], ExemplarReservoirFactory
[Type[_Aggregation]], ExemplarReservoirBuilder
],
start_time_unix_nano: int,
) -> _Aggregation:
Expand Down Expand Up @@ -1375,7 +1389,7 @@ def _create_aggregation(
instrument: Instrument,
attributes: Attributes,
reservoir_factory: Callable[
[Type[_Aggregation]], ExemplarReservoirFactory
[Type[_Aggregation]], ExemplarReservoirBuilder
],
start_time_unix_nano: int,
) -> _Aggregation:
Expand Down Expand Up @@ -1409,7 +1423,7 @@ def _create_aggregation(
instrument: Instrument,
attributes: Attributes,
reservoir_factory: Callable[
[Type[_Aggregation]], ExemplarReservoirFactory
[Type[_Aggregation]], ExemplarReservoirBuilder
],
start_time_unix_nano: int,
) -> _Aggregation:
Expand Down Expand Up @@ -1444,13 +1458,13 @@ def _create_aggregation(
instrument: Instrument,
attributes: Attributes,
reservoir_factory: Callable[
[Type[_Aggregation]], ExemplarReservoirFactory
[Type[_Aggregation]], ExemplarReservoirBuilder
],
start_time_unix_nano: int,
) -> _Aggregation:
return _LastValueAggregation(
attributes,
reservoir_factory=reservoir_factory(_LastValueAggregation),
reservoir_builder=reservoir_factory(_LastValueAggregation),
)


Expand All @@ -1462,7 +1476,7 @@ def _create_aggregation(
instrument: Instrument,
attributes: Attributes,
reservoir_factory: Callable[
[Type[_Aggregation]], ExemplarReservoirFactory
[Type[_Aggregation]], ExemplarReservoirBuilder
],
start_time_unix_nano: int,
) -> _Aggregation:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from .exemplar_reservoir import (
AlignedHistogramBucketExemplarReservoir,
ExemplarReservoir,
ExemplarReservoirFactory,
ExemplarReservoirBuilder,
SimpleFixedSizeExemplarReservoir,
)

Expand All @@ -34,6 +34,6 @@
"TraceBasedExemplarFilter",
"AlignedHistogramBucketExemplarReservoir",
"ExemplarReservoir",
"ExemplarReservoirFactory",
"ExemplarReservoirBuilder",
"SimpleFixedSizeExemplarReservoir",
]
Loading

0 comments on commit 4f5efa7

Please sign in to comment.