diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py index 21fc500fac..271e256449 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py @@ -37,7 +37,7 @@ ) from opentelemetry.sdk.metrics._internal.exemplar import ( Exemplar, - ExemplarReservoirFactory, + ExemplarReservoirBuilder, ) from opentelemetry.sdk.metrics._internal.exponential_histogram.buckets import ( Buckets, @@ -88,18 +88,25 @@ class _Aggregation(ABC, Generic[_DataPointVarT]): def __init__( self, attributes: Attributes, - reservoir_factory: ExemplarReservoirFactory, + reservoir_builder: ExemplarReservoirBuilder, ): self._lock = Lock() self._attributes = attributes - self._reservoir = reservoir_factory() + self._reservoir = reservoir_builder() self._previous_point = None @abstractmethod - def aggregate(self, measurement: Measurement, should_sample_exemplar: bool = True + def aggregate( + self, measurement: Measurement, should_sample_exemplar: bool = True ) -> None: + """Aggregate a measurement. + + Args: + measurement: Measurement to aggregate + should_sample_exemplar: Whether the measurement should be sampled by the exemplars reservoir or not. + """ pass - + @abstractmethod def collect( self, @@ -109,18 +116,33 @@ def collect( pass def _collect_exemplars(self) -> Sequence[Exemplar]: - return self._reservoir.collect( - self._attributes - ) + """Returns the collected exemplars. - def sample_exemplar(self, measurement: Measurement) -> None: - self._reservoir.offer( + Returns: + The exemplars collected by the reservoir + """ + return self._reservoir.collect(self._attributes) + + def _sample_exemplar( + self, measurement: Measurement, should_sample_exemplar: bool + ) -> None: + """Offer the measurement to the exemplar reservoir for sampling. + + It should be called within the each :ref:`aggregate` call. + + Args: + measurement: The new measurement + should_sample_exemplar: Whether the measurement should be sampled by the exemplars reservoir or not. + """ + if should_sample_exemplar: + self._reservoir.offer( measurement.value, measurement.time_unix_nano, measurement.attributes, measurement.context, ) - + + class _DropAggregation(_Aggregation): def aggregate( self, measurement: Measurement, should_sample_exemplar: bool = True @@ -142,9 +164,9 @@ def __init__( instrument_is_monotonic: bool, instrument_aggregation_temporality: AggregationTemporality, start_time_unix_nano: int, - reservoir_factory: ExemplarReservoirFactory, + reservoir_builder: ExemplarReservoirBuilder, ): - super().__init__(attributes, reservoir_factory) + super().__init__(attributes, reservoir_builder) self._start_time_unix_nano = start_time_unix_nano self._instrument_aggregation_temporality = ( @@ -165,10 +187,8 @@ def aggregate( self._value = 0 self._value = self._value + measurement.value - - if should_sample_exemplar: - self.sample_exemplar(measurement) - + + self._sample_exemplar(measurement, should_sample_exemplar) def collect( self, @@ -383,9 +403,9 @@ class _LastValueAggregation(_Aggregation[GaugePoint]): def __init__( self, attributes: Attributes, - reservoir_factory: ExemplarReservoirFactory, + reservoir_builder: ExemplarReservoirBuilder, ): - super().__init__(attributes, reservoir_factory) + super().__init__(attributes, reservoir_builder) self._value = None def aggregate( @@ -393,8 +413,8 @@ def aggregate( ): with self._lock: self._value = measurement.value - if should_sample_exemplar: - self.sample_exemplar(measurement) + + self._sample_exemplar(measurement, should_sample_exemplar) def collect( self, @@ -410,7 +430,7 @@ def collect( value = self._value self._value = None - exemplars = self._collect_exemplars() + exemplars = self._collect_exemplars() return NumberDataPoint( attributes=self._attributes, @@ -427,7 +447,7 @@ def __init__( attributes: Attributes, instrument_aggregation_temporality: AggregationTemporality, start_time_unix_nano: int, - reservoir_factory: ExemplarReservoirFactory, + reservoir_builder: ExemplarReservoirBuilder, boundaries: Sequence[float] = ( 0.0, 5.0, @@ -449,8 +469,8 @@ def __init__( ): super().__init__( attributes, - reservoir_factory=partial( - reservoir_factory, boundaries=boundaries + reservoir_builder=partial( + reservoir_builder, boundaries=boundaries ), ) @@ -494,8 +514,7 @@ def aggregate( self._value[bisect_left(self._boundaries, measurement_value)] += 1 - if should_sample_exemplar: - self.sample_exemplar(measurement) + self._sample_exemplar(measurement, should_sample_exemplar) def collect( self, @@ -517,8 +536,6 @@ def collect( self._min = inf self._max = -inf - exemplars = self._collect_exemplars() - if ( self._instrument_aggregation_temporality is AggregationTemporality.DELTA @@ -542,7 +559,7 @@ def collect( return HistogramDataPoint( attributes=self._attributes, - exemplars=exemplars, + exemplars=self._collect_exemplars(), start_time_unix_nano=previous_collection_start_nano, time_unix_nano=collection_start_nano, count=sum(value), @@ -572,7 +589,7 @@ def collect( return HistogramDataPoint( attributes=self._attributes, - exemplars=exemplars, + exemplars=self._collect_exemplars(), start_time_unix_nano=self._start_time_unix_nano, time_unix_nano=collection_start_nano, count=sum(self._previous_value), @@ -602,7 +619,7 @@ class _ExponentialBucketHistogramAggregation(_Aggregation[HistogramPoint]): def __init__( self, attributes: Attributes, - reservoir_factory: ExemplarReservoirFactory, + reservoir_builder: ExemplarReservoirBuilder, instrument_aggregation_temporality: AggregationTemporality, start_time_unix_nano: int, # This is the default maximum number of buckets per positive or @@ -648,8 +665,8 @@ def __init__( super().__init__( attributes, - reservoir_factory=partial( - reservoir_factory, size=min(20, max_size) + reservoir_builder=partial( + reservoir_builder, size=min(20, max_size) ), ) @@ -794,8 +811,7 @@ def aggregate( # in _ExplicitBucketHistogramAggregation.aggregate value.increment_bucket(bucket_index) - if should_sample_exemplar: - self.sample_exemplar(measurement) + self._sample_exemplar(measurement, should_sample_exemplar) def collect( self, @@ -826,8 +842,6 @@ def collect( self._zero_count = 0 self._scale = None - exemplars = self._collect_exemplars() - if ( self._instrument_aggregation_temporality is AggregationTemporality.DELTA @@ -851,7 +865,7 @@ def collect( return ExponentialHistogramDataPoint( attributes=self._attributes, - exemplars=exemplars, + exemplars=self._collect_exemplars(), start_time_unix_nano=previous_collection_start_nano, time_unix_nano=collection_start_nano, count=count, @@ -1015,7 +1029,7 @@ def collect( return ExponentialHistogramDataPoint( attributes=self._attributes, - exemplars=exemplars, + exemplars=self._collect_exemplars(), start_time_unix_nano=self._start_time_unix_nano, time_unix_nano=collection_start_nano, count=self._previous_count, @@ -1187,7 +1201,7 @@ def _create_aggregation( instrument: Instrument, attributes: Attributes, reservoir_factory: Callable[ - [Type[_Aggregation]], ExemplarReservoirFactory + [Type[_Aggregation]], ExemplarReservoirBuilder ], start_time_unix_nano: int, ) -> _Aggregation: @@ -1218,7 +1232,7 @@ def _create_aggregation( instrument: Instrument, attributes: Attributes, reservoir_factory: Callable[ - [Type[_Aggregation]], ExemplarReservoirFactory + [Type[_Aggregation]], ExemplarReservoirBuilder ], start_time_unix_nano: int, ) -> _Aggregation: @@ -1227,7 +1241,7 @@ def _create_aggregation( if isinstance(instrument, Counter): return _SumAggregation( attributes, - reservoir_factory=reservoir_factory(_SumAggregation), + reservoir_builder=reservoir_factory(_SumAggregation), instrument_is_monotonic=True, instrument_aggregation_temporality=( AggregationTemporality.DELTA @@ -1237,7 +1251,7 @@ def _create_aggregation( if isinstance(instrument, UpDownCounter): return _SumAggregation( attributes, - reservoir_factory=reservoir_factory(_SumAggregation), + reservoir_builder=reservoir_factory(_SumAggregation), instrument_is_monotonic=False, instrument_aggregation_temporality=( AggregationTemporality.DELTA @@ -1248,7 +1262,7 @@ def _create_aggregation( if isinstance(instrument, ObservableCounter): return _SumAggregation( attributes, - reservoir_factory=reservoir_factory(_SumAggregation), + reservoir_builder=reservoir_factory(_SumAggregation), instrument_is_monotonic=True, instrument_aggregation_temporality=( AggregationTemporality.CUMULATIVE @@ -1259,7 +1273,7 @@ def _create_aggregation( if isinstance(instrument, ObservableUpDownCounter): return _SumAggregation( attributes, - reservoir_factory=reservoir_factory(_SumAggregation), + reservoir_builder=reservoir_factory(_SumAggregation), instrument_is_monotonic=False, instrument_aggregation_temporality=( AggregationTemporality.CUMULATIVE @@ -1270,7 +1284,7 @@ def _create_aggregation( if isinstance(instrument, Histogram): return _ExplicitBucketHistogramAggregation( attributes, - reservoir_factory=reservoir_factory( + reservoir_builder=reservoir_factory( _ExplicitBucketHistogramAggregation ), instrument_aggregation_temporality=( @@ -1282,13 +1296,13 @@ def _create_aggregation( if isinstance(instrument, ObservableGauge): return _LastValueAggregation( attributes, - reservoir_factory=reservoir_factory(_LastValueAggregation), + reservoir_builder=reservoir_factory(_LastValueAggregation), ) if isinstance(instrument, _Gauge): return _LastValueAggregation( attributes, - reservoir_factory=reservoir_factory(_LastValueAggregation), + reservoir_builder=reservoir_factory(_LastValueAggregation), ) # pylint: disable=broad-exception-raised @@ -1309,7 +1323,7 @@ def _create_aggregation( instrument: Instrument, attributes: Attributes, reservoir_factory: Callable[ - [Type[_Aggregation]], ExemplarReservoirFactory + [Type[_Aggregation]], ExemplarReservoirBuilder ], start_time_unix_nano: int, ) -> _Aggregation: @@ -1375,7 +1389,7 @@ def _create_aggregation( instrument: Instrument, attributes: Attributes, reservoir_factory: Callable[ - [Type[_Aggregation]], ExemplarReservoirFactory + [Type[_Aggregation]], ExemplarReservoirBuilder ], start_time_unix_nano: int, ) -> _Aggregation: @@ -1409,7 +1423,7 @@ def _create_aggregation( instrument: Instrument, attributes: Attributes, reservoir_factory: Callable[ - [Type[_Aggregation]], ExemplarReservoirFactory + [Type[_Aggregation]], ExemplarReservoirBuilder ], start_time_unix_nano: int, ) -> _Aggregation: @@ -1444,13 +1458,13 @@ def _create_aggregation( instrument: Instrument, attributes: Attributes, reservoir_factory: Callable[ - [Type[_Aggregation]], ExemplarReservoirFactory + [Type[_Aggregation]], ExemplarReservoirBuilder ], start_time_unix_nano: int, ) -> _Aggregation: return _LastValueAggregation( attributes, - reservoir_factory=reservoir_factory(_LastValueAggregation), + reservoir_builder=reservoir_factory(_LastValueAggregation), ) @@ -1462,7 +1476,7 @@ def _create_aggregation( instrument: Instrument, attributes: Attributes, reservoir_factory: Callable[ - [Type[_Aggregation]], ExemplarReservoirFactory + [Type[_Aggregation]], ExemplarReservoirBuilder ], start_time_unix_nano: int, ) -> _Aggregation: diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/__init__.py index f3032c5d1e..ee93dd1827 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/__init__.py @@ -22,7 +22,7 @@ from .exemplar_reservoir import ( AlignedHistogramBucketExemplarReservoir, ExemplarReservoir, - ExemplarReservoirFactory, + ExemplarReservoirBuilder, SimpleFixedSizeExemplarReservoir, ) @@ -34,6 +34,6 @@ "TraceBasedExemplarFilter", "AlignedHistogramBucketExemplarReservoir", "ExemplarReservoir", - "ExemplarReservoirFactory", + "ExemplarReservoirBuilder", "SimpleFixedSizeExemplarReservoir", ] diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_filter.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_filter.py index bbb26d3ed0..0e090f9e35 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_filter.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_filter.py @@ -38,7 +38,7 @@ def should_sample( value: Union[int, float], time_unix_nano: int, attributes: Attributes, - ctx: Context, + context: Context, ) -> bool: """Returns whether or not a reservoir should attempt to filter a measurement. @@ -46,7 +46,7 @@ def should_sample( value: The value of the measurement timestamp: A timestamp that best represents when the measurement was taken attributes: The complete set of measurement attributes - ctx: The Context of the measurement + context: The Context of the measurement """ raise NotImplementedError( "ExemplarFilter.should_sample is not implemented" @@ -65,7 +65,7 @@ def should_sample( value: Union[int, float], time_unix_nano: int, attributes: Attributes, - ctx: Context, + context: Context, ) -> bool: """Returns whether or not a reservoir should attempt to filter a measurement. @@ -73,7 +73,7 @@ def should_sample( value: The value of the measurement timestamp: A timestamp that best represents when the measurement was taken attributes: The complete set of measurement attributes - ctx: The Context of the measurement + context: The Context of the measurement """ return True @@ -92,7 +92,7 @@ def should_sample( value: Union[int, float], time_unix_nano: int, attributes: Attributes, - ctx: Context, + context: Context, ) -> bool: """Returns whether or not a reservoir should attempt to filter a measurement. @@ -100,7 +100,7 @@ def should_sample( value: The value of the measurement timestamp: A timestamp that best represents when the measurement was taken attributes: The complete set of measurement attributes - ctx: The Context of the measurement + context: The Context of the measurement """ return False @@ -118,7 +118,7 @@ def should_sample( value: Union[int, float], time_unix_nano: int, attributes: Attributes, - ctx: Context, + context: Context, ) -> bool: """Returns whether or not a reservoir should attempt to filter a measurement. @@ -126,9 +126,9 @@ def should_sample( value: The value of the measurement timestamp: A timestamp that best represents when the measurement was taken attributes: The complete set of measurement attributes - ctx: The Context of the measurement + context: The Context of the measurement """ - span = trace.get_current_span(ctx) + span = trace.get_current_span(context) if span == INVALID_SPAN: return False return span.get_span_context().trace_flags.sampled diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py index 9d356f8738..2ce5d2461a 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py @@ -42,9 +42,16 @@ def offer( value: Union[int, float], time_unix_nano: int, attributes: Attributes, - ctx: Context, + context: Context, ) -> None: - """Offers a measurement to be sampled.""" + """Offers a measurement to be sampled. + + Args: + value: Measured value + time_unix_nano: Measurement instant + attributes: Measurement attributes + context: Measurement context + """ raise NotImplementedError("ExemplarReservoir.offer is not implemented") @abstractmethod @@ -79,13 +86,20 @@ def offer( value: Union[int, float], time_unix_nano: int, attributes: Attributes, - ctx: Context, + context: Context, ) -> None: - """Offers a measurement to be sampled.""" + """Offers a measurement to be sampled. + + Args: + value: Measured value + time_unix_nano: Measurement instant + attributes: Measurement attributes + context: Measurement context + """ self.__value = value self.__time_unix_nano = time_unix_nano self.__attributes = attributes - span = trace.get_current_span(ctx) + span = trace.get_current_span(context) if span != INVALID_SPAN: span_context = span.get_span_context() self.__span_id = span_context.span_id @@ -97,8 +111,10 @@ def collect(self, point_attributes: Attributes) -> Exemplar | None: """May return an Exemplar and resets the bucket for the next sampling period.""" if not self.__offered: return None - + # filters out attributes from the measurement that are already included in the metric data point + # See the specification for more details: + # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exemplar filtered_attributes = ( { k: v @@ -120,6 +136,7 @@ def collect(self, point_attributes: Attributes) -> Exemplar | None: return exemplar def __reset(self) -> None: + """Reset the bucket state after a collection cycle.""" self.__value = 0 self.__attributes = {} self.__time_unix_nano = 0 @@ -128,6 +145,10 @@ def __reset(self) -> None: self.__offered = False +class BucketIndexError(ValueError): + """An exception raised when the bucket index cannot be found.""" + + class FixedSizeExemplarReservoirABC(ExemplarReservoir): """Abstract class for a reservoir with fixed size.""" @@ -160,9 +181,32 @@ def collect(self, point_attributes: Attributes) -> list[Exemplar]: self._reset() return [*exemplars] - def _reset(self) -> None: - """Reset the reservoir by resetting any stateful logic after a collection cycle.""" - pass + def offer( + self, + value: Union[int, float], + time_unix_nano: int, + attributes: Attributes, + context: Context, + ) -> None: + """Offers a measurement to be sampled. + + Args: + value: Measured value + time_unix_nano: Measurement instant + attributes: Measurement attributes + context: Measurement context + """ + try: + index = self._find_bucket_index( + value, time_unix_nano, attributes, context + ) + + self._reservoir_storage[index].offer( + value, time_unix_nano, attributes, context + ) + except BucketIndexError: + # Ignore invalid bucket index + pass @abstractmethod def _find_bucket_index( @@ -170,14 +214,31 @@ def _find_bucket_index( value: Union[int, float], time_unix_nano: int, attributes: Attributes, - ctx: Context, + context: Context, ) -> int: - """ - Determines the bucket index for the given measurement. - Should be implemented by subclasses based on specific strategies. + """Determines the bucket index for the given measurement. + + It should be implemented by subclasses based on specific strategies. + + Args: + value: Measured value + time_unix_nano: Measurement instant + attributes: Measurement attributes + context: Measurement context + + Returns: + The bucket index + + Raises: + BucketIndexError: If no bucket index can be found. """ pass + def _reset(self) -> None: + """Reset the reservoir by resetting any stateful logic after a collection cycle.""" + pass + + class SimpleFixedSizeExemplarReservoir(FixedSizeExemplarReservoirABC): """This reservoir uses an uniformly-weighted sampling algorithm based on the number of samples the reservoir has seen so far to determine if the offered measurements @@ -195,34 +256,22 @@ def _reset(self) -> None: super()._reset() self._measurements_seen = 0 - def offer( - self, - value: Union[int, float], - time_unix_nano: int, - attributes: Attributes, - ctx: Context, - ) -> None: - """Offers a measurement to be sampled.""" - index = self._find_bucket_index(value, time_unix_nano, attributes, ctx) - if index != -1: - self._reservoir_storage[index].offer( - value, time_unix_nano, attributes, ctx - ) - def _find_bucket_index( self, value: Union[int, float], time_unix_nano: int, attributes: Attributes, - ctx: Context, + context: Context, ) -> int: + self._measurements_seen += 1 if self._measurements_seen < self._size: - self._measurements_seen += 1 return self._measurements_seen - 1 index = randrange(0, self._measurements_seen) - self._measurements_seen += 1 - return index if index < self._size else -1 + if index < self._size: + return index + + raise BucketIndexError("Unable to find the bucket index.") class AlignedHistogramBucketExemplarReservoir(FixedSizeExemplarReservoirABC): @@ -243,12 +292,14 @@ def offer( value: Union[int, float], time_unix_nano: int, attributes: Attributes, - ctx: Context, + context: Context, ) -> None: """Offers a measurement to be sampled.""" - index = self._find_bucket_index(value, time_unix_nano, attributes, ctx) + index = self._find_bucket_index( + value, time_unix_nano, attributes, context + ) self._reservoir_storage[index].offer( - value, time_unix_nano, attributes, ctx + value, time_unix_nano, attributes, context ) def _find_bucket_index( @@ -256,7 +307,7 @@ def _find_bucket_index( value: Union[int, float], time_unix_nano: int, attributes: Attributes, - ctx: Context, + context: Context, ) -> int: for i, boundary in enumerate(self._boundaries): if value <= boundary: @@ -264,10 +315,10 @@ def _find_bucket_index( return len(self._boundaries) -ExemplarReservoirFactory: TypeAlias = Callable[ +ExemplarReservoirBuilder: TypeAlias = Callable[ [dict[str, Any]], ExemplarReservoir ] -ExemplarReservoirFactory.__doc__ = """ExemplarReservoir factory. +ExemplarReservoirBuilder.__doc__ = """ExemplarReservoir builder. It may receive the Aggregation parameters it is bounded to; e.g. the _ExplicitBucketHistogramAggregation will provide the boundaries. diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/view.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/view.py index 8d913cf8d0..e9176af33e 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/view.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/view.py @@ -27,7 +27,7 @@ ) from opentelemetry.sdk.metrics._internal.exemplar import ( AlignedHistogramBucketExemplarReservoir, - ExemplarReservoirFactory, + ExemplarReservoirBuilder, SimpleFixedSizeExemplarReservoir, ) @@ -36,7 +36,7 @@ def _default_reservoir_factory( aggregationType: Type[_Aggregation], -) -> ExemplarReservoirFactory: +) -> ExemplarReservoirBuilder: """Default reservoir factory per aggregation.""" if issubclass(aggregationType, _ExplicitBucketHistogramAggregation): return AlignedHistogramBucketExemplarReservoir @@ -115,7 +115,7 @@ def __init__( attribute_keys: Optional[Set[str]] = None, aggregation: Optional[Aggregation] = None, exemplar_reservoir_factory: Optional[ - Callable[[Type[_Aggregation]], ExemplarReservoirFactory] + Callable[[Type[_Aggregation]], ExemplarReservoirBuilder] ] = None, instrument_unit: Optional[str] = None, ): diff --git a/opentelemetry-sdk/tests/metrics/test_aggregation.py b/opentelemetry-sdk/tests/metrics/test_aggregation.py index 7b5809af4b..4c3f1518d9 100644 --- a/opentelemetry-sdk/tests/metrics/test_aggregation.py +++ b/opentelemetry-sdk/tests/metrics/test_aggregation.py @@ -50,7 +50,7 @@ from opentelemetry.util.types import Attributes from opentelemetry.sdk.metrics._internal.exemplar import ( AlignedHistogramBucketExemplarReservoir, - ExemplarReservoirFactory, + ExemplarReservoirBuilder, SimpleFixedSizeExemplarReservoir, ) diff --git a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py index 6d80a752bf..c2ad05e21c 100644 --- a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py +++ b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py @@ -48,13 +48,13 @@ from opentelemetry.sdk.metrics._internal.exemplar import ( AlignedHistogramBucketExemplarReservoir, ExemplarReservoir, - ExemplarReservoirFactory, + ExemplarReservoirBuilder, SimpleFixedSizeExemplarReservoir ) from typing import Callable, Optional, Set, Type, Any, Sequence -def generalized_reservoir_factory(size: int = 1, boundaries: Sequence[float] = None) -> Callable[[Type[_Aggregation]], ExemplarReservoirFactory]: - def factory(aggregationType: Type[_Aggregation]) -> ExemplarReservoirFactory: +def generalized_reservoir_factory(size: int = 1, boundaries: Sequence[float] = None) -> Callable[[Type[_Aggregation]], ExemplarReservoirBuilder]: + def factory(aggregationType: Type[_Aggregation]) -> ExemplarReservoirBuilder: if issubclass(aggregationType, _ExplicitBucketHistogramAggregation): return lambda **kwargs: AlignedHistogramBucketExemplarReservoir(boundaries=boundaries or [], **{k: v for k, v in kwargs.items() if k != 'boundaries'}) else: