From 05ebc34baed6716c550807e611aab6036e3e7adb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Collonval?=
Date: Mon, 12 Aug 2024 16:01:48 +0200
Subject: [PATCH] First propagation of filter and reservoir factory

---
 .../src/opentelemetry/sdk/metrics/__init__.py |  3 +
 .../sdk/metrics/_internal/__init__.py         |  3 +
 .../_internal/_view_instrument_match.py       |  6 +-
 .../sdk/metrics/_internal/aggregation.py      | 97 ++++++++++++++-----
 .../metrics/_internal/exemplar/__init__.py    |  4 +
 .../_internal/exemplar/exemplar_reservoir.py  | 26 +++--
 .../metrics/_internal/measurement_consumer.py |  2 +-
 .../_internal/metric_reader_storage.py        |  4 +-
 .../metrics/_internal/sdk_configuration.py    |  1 +
 .../sdk/metrics/_internal/view.py             | 29 +++++-
 10 files changed, 138 insertions(+), 37 deletions(-)

diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py
index a907a289760..5f66331305d 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py
@@ -15,6 +15,7 @@
 
 from opentelemetry.sdk.metrics._internal import Meter, MeterProvider
 from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError
+from opentelemetry.sdk.metrics._internal.exemplar import ExemplarFilter, ExemplarReservoir
 from opentelemetry.sdk.metrics._internal.instrument import Counter
 from opentelemetry.sdk.metrics._internal.instrument import Gauge as _Gauge
 from opentelemetry.sdk.metrics._internal.instrument import (
@@ -26,6 +27,8 @@
 )
 
 __all__ = [
+    "ExemplarFilter",
+    "ExemplarReservoir",
     "Meter",
     "MeterProvider",
     "MetricsTimeoutError",
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py
index 9dc95c0edb8..a0eb87e2fd6 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py
@@ -35,6 +35,7 @@
 from opentelemetry.metrics import _Gauge as APIGauge
 from opentelemetry.sdk.environment_variables import OTEL_SDK_DISABLED
 from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError
+from opentelemetry.sdk.metrics._internal.exemplar import ExemplarFilter, TraceBasedExemplarFilter
 from opentelemetry.sdk.metrics._internal.instrument import (
     _Counter,
     _Gauge,
@@ -381,6 +382,7 @@ def __init__(
             "opentelemetry.sdk.metrics.export.MetricReader"
         ] = (),
         resource: Resource = None,
+        exemplar_filter: Optional[ExemplarFilter] = None,
         shutdown_on_exit: bool = True,
         views: Sequence["opentelemetry.sdk.metrics.view.View"] = (),
     ):
@@ -390,6 +392,7 @@ def __init__(
         if resource is None:
             resource = Resource.create({})
         self._sdk_config = SdkConfiguration(
+            exemplar_filter = TraceBasedExemplarFilter() if exemplar_filter is None else exemplar_filter,
             resource=resource,
             metric_readers=metric_readers,
             views=views,
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py
index 7dd7f58f272..b527f0e5df5 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py
@@ -81,7 +81,7 @@ def conflicts(self, other: "_ViewInstrumentMatch") -> bool:
         return result
 
     # pylint: disable=protected-access
-    def consume_measurement(self, measurement: Measurement) -> None:
+    def consume_measurement(self, measurement: Measurement, should_sample_exemplar: bool = True) -> None:
         if self._view._attribute_keys is not None:
@@ -107,6 +107,7 @@ def consume_measurement(self, measurement: Measurement) -> None:
                     self._view._aggregation._create_aggregation(
                         self._instrument,
                         attributes,
+                        self._view._exemplar_reservoir_factory,
                         self._start_time_unix_nano,
                     )
                 )
@@ -116,11 +117,12 @@ def consume_measurement(self, measurement: Measurement) -> None:
                 ]._create_aggregation(
                     self._instrument,
                     attributes,
+                    self._view._exemplar_reservoir_factory,
                     self._start_time_unix_nano,
                 )
                 self._attributes_aggregation[aggr_key] = aggregation
 
-        self._attributes_aggregation[aggr_key].aggregate(measurement)
+        self._attributes_aggregation[aggr_key].aggregate(measurement, should_sample_exemplar)
 
     def collect(
         self,
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py
index 62ac967bbec..07535029aae 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py
@@ -17,10 +17,11 @@
 from abc import ABC, abstractmethod
 from bisect import bisect_left
 from enum import IntEnum
+from functools import partial
 from logging import getLogger
 from math import inf
 from threading import Lock
-from typing import Generic, List, Optional, Sequence, TypeVar
+from typing import Callable, Generic, List, Optional, Sequence, Type, TypeVar
 
 from opentelemetry.metrics import (
     Asynchronous,
@@ -34,6 +35,7 @@
     UpDownCounter,
     _Gauge,
 )
+from opentelemetry.sdk.metrics._internal.exemplar import Exemplar, ExemplarReservoirFactory
 from opentelemetry.sdk.metrics._internal.exponential_histogram.buckets import (
     Buckets,
 )
@@ -80,14 +82,20 @@ class AggregationTemporality(IntEnum):
 
 
 class _Aggregation(ABC, Generic[_DataPointVarT]):
-    def __init__(self, attributes: Attributes):
+    def __init__(self, attributes: Attributes, reservoir_factory: ExemplarReservoirFactory):
         self._lock = Lock()
         self._attributes = attributes
+        self._reservoir = reservoir_factory()
         self._previous_point = None
 
-    @abstractmethod
-    def aggregate(self, measurement: Measurement) -> None:
-        pass
+    def aggregate(self, measurement: Measurement, should_sample_exemplar: bool = True) -> None:
+        if should_sample_exemplar:
+            self._reservoir.offer(
+                measurement.value,
+                measurement.time_unix_nano,
+                measurement.attributes,
+                measurement.context,
+            )
 
     @abstractmethod
     def collect(
@@ -97,9 +105,14 @@ def collect(
     ) -> Optional[_DataPointVarT]:
         pass
 
+    def _collect_exemplars(self) -> Sequence[Exemplar]:
+        return self._reservoir.collect(
+            self._attributes
+        )  # FIXME provide filtered data point attributes
+
 
 class _DropAggregation(_Aggregation):
-    def aggregate(self, measurement: Measurement) -> None:
+    def aggregate(self, measurement: Measurement, should_sample_exemplar: bool = True) -> None:
         pass
 
     def collect(
@@ -117,13 +130,12 @@ def __init__(
         instrument_is_monotonic: bool,
         instrument_aggregation_temporality: AggregationTemporality,
         start_time_unix_nano: int,
+        reservoir_factory: ExemplarReservoirFactory,
     ):
-        super().__init__(attributes)
+        super().__init__(attributes, reservoir_factory)
         self._start_time_unix_nano = start_time_unix_nano
-        self._instrument_aggregation_temporality = (
-            instrument_aggregation_temporality
-        )
+        self._instrument_aggregation_temporality = instrument_aggregation_temporality
         self._instrument_is_monotonic = instrument_is_monotonic
         self._value = None
 
@@ -131,13 +143,15 @@ def __init__(
         self._previous_collection_start_nano = self._start_time_unix_nano
         self._previous_value = 0
 
-    def aggregate(self, measurement: Measurement) -> None:
+    def aggregate(self, measurement: Measurement, should_sample_exemplar: bool = True) -> None:
         with self._lock:
             if self._value is None:
                 self._value = 0
 
             self._value = self._value + measurement.value
 
+        super().aggregate(measurement, should_sample_exemplar)
+
     def collect(
         self,
         collection_aggregation_temporality: AggregationTemporality,
@@ -266,6 +280,7 @@ def collect(
         with self._lock:
             value = self._value
             self._value = None
+            exemplars = self._collect_exemplars()
 
             if (
                 self._instrument_aggregation_temporality
                 is AggregationTemporality.DELTA
@@ -290,6 +305,7 @@ def collect(
 
                 return NumberDataPoint(
                     attributes=self._attributes,
+                    exemplars=exemplars,
                     start_time_unix_nano=previous_collection_start_nano,
                     time_unix_nano=collection_start_nano,
                     value=value,
@@ -302,6 +318,7 @@ def collect(
 
             return NumberDataPoint(
                 attributes=self._attributes,
+                exemplars=exemplars,
                 start_time_unix_nano=self._start_time_unix_nano,
                 time_unix_nano=collection_start_nano,
                 value=self._previous_value,
@@ -330,6 +347,7 @@ def collect(
 
                 return NumberDataPoint(
                     attributes=self._attributes,
+                    exemplars=exemplars,
                     start_time_unix_nano=previous_collection_start_nano,
                     time_unix_nano=collection_start_nano,
                     value=result_value,
@@ -337,6 +355,7 @@ def collect(
 
             return NumberDataPoint(
                 attributes=self._attributes,
+                exemplars=exemplars,
                 start_time_unix_nano=self._start_time_unix_nano,
                 time_unix_nano=collection_start_nano,
                 value=value,
@@ -344,13 +363,15 @@ def collect(
 
 class _LastValueAggregation(_Aggregation[GaugePoint]):
-    def __init__(self, attributes: Attributes):
-        super().__init__(attributes)
+    def __init__(self, attributes: Attributes, reservoir_factory: ExemplarReservoirFactory):
+        super().__init__(attributes, reservoir_factory)
         self._value = None
 
-    def aggregate(self, measurement: Measurement):
+    def aggregate(self, measurement: Measurement, should_sample_exemplar: bool = True):
         with self._lock:
             self._value = measurement.value
+
+        super().aggregate(measurement, should_sample_exemplar)
 
     def collect(
         self,
@@ -366,8 +387,11 @@ def collect(
             value = self._value
             self._value = None
 
+        exemplars = self._collect_exemplars()
+
         return NumberDataPoint(
             attributes=self._attributes,
+            exemplars=exemplars,
             start_time_unix_nano=None,
             time_unix_nano=collection_start_nano,
             value=value,
@@ -380,6 +404,7 @@ def __init__(
         attributes: Attributes,
         instrument_aggregation_temporality: AggregationTemporality,
         start_time_unix_nano: int,
+        reservoir_factory: ExemplarReservoirFactory,
         boundaries: Sequence[float] = (
             0.0,
             5.0,
@@ -399,7 +424,7 @@ def __init__(
         ),
         record_min_max: bool = True,
     ):
-        super().__init__(attributes)
+        super().__init__(attributes, reservoir_factory=partial(reservoir_factory, boundaries=boundaries))
 
         self._instrument_aggregation_temporality = (
             instrument_aggregation_temporality
@@ -423,7 +448,7 @@ def __init__(
     def _get_empty_bucket_counts(self) -> List[int]:
         return [0] * (len(self._boundaries) + 1)
 
-    def aggregate(self, measurement: Measurement) -> None:
+    def aggregate(self, measurement: Measurement, should_sample_exemplar: bool = True) -> None:
 
         with self._lock:
             if self._value is None:
@@ -439,6 +464,8 @@ def aggregate(self, measurement: Measurement) -> None:
 
             self._value[bisect_left(self._boundaries, measurement_value)] += 1
 
+        super().aggregate(measurement, should_sample_exemplar)
+
     def collect(
         self,
         collection_aggregation_temporality: AggregationTemporality,
@@ -459,6 +486,8 @@ def collect(
             self._min = inf
             self._max = -inf
 
+        exemplars = self._collect_exemplars()
+
         if (
             self._instrument_aggregation_temporality
             is AggregationTemporality.DELTA
@@ -482,6 +511,7 @@ def collect(
 
                 return HistogramDataPoint(
                     attributes=self._attributes,
+                    exemplars=exemplars,
                     start_time_unix_nano=previous_collection_start_nano,
                     time_unix_nano=collection_start_nano,
                     count=sum(value),
@@ -511,6 +541,7 @@ def collect(
 
             return HistogramDataPoint(
                 attributes=self._attributes,
+                exemplars=exemplars,
                 start_time_unix_nano=self._start_time_unix_nano,
                 time_unix_nano=collection_start_nano,
                 count=sum(self._previous_value),
@@ -540,6 +571,7 @@ class _ExponentialBucketHistogramAggregation(_Aggregation[HistogramPoint]):
     def __init__(
         self,
         attributes: Attributes,
+        reservoir_factory: ExemplarReservoirFactory,
         instrument_aggregation_temporality: AggregationTemporality,
         start_time_unix_nano: int,
         # This is the default maximum number of buckets per positive or
@@ -583,7 +615,7 @@ def __init__(
         # _ExplicitBucketHistogramAggregation both size and amount of buckets
         # remain constant once it is instantiated).
 
-        super().__init__(attributes)
+        super().__init__(attributes, reservoir_factory=partial(reservoir_factory, size=min(20, max_size)))
 
         self._instrument_aggregation_temporality = (
             instrument_aggregation_temporality
@@ -614,7 +646,7 @@ def __init__(
 
         self._mapping = self._new_mapping(self._max_scale)
 
-    def aggregate(self, measurement: Measurement) -> None:
+    def aggregate(self, measurement: Measurement, should_sample_exemplar: bool = True) -> None:
         # pylint: disable=too-many-branches,too-many-statements, too-many-locals
 
         with self._lock:
@@ -724,6 +756,8 @@ def aggregate(self, measurement: Measurement) -> None:
                 # in _ExplicitBucketHistogramAggregation.aggregate
                 value.increment_bucket(bucket_index)
 
+        super().aggregate(measurement, should_sample_exemplar)
+
     def collect(
         self,
         collection_aggregation_temporality: AggregationTemporality,
@@ -753,6 +787,8 @@ def collect(
             self._zero_count = 0
             self._scale = None
 
+        exemplars = self._collect_exemplars()
+
         if (
             self._instrument_aggregation_temporality
             is AggregationTemporality.DELTA
@@ -776,6 +812,7 @@ def collect(
 
                 return ExponentialHistogramDataPoint(
                     attributes=self._attributes,
+                    exemplars=exemplars,
                     start_time_unix_nano=previous_collection_start_nano,
                     time_unix_nano=collection_start_nano,
                     count=count,
@@ -939,6 +976,7 @@ def collect(
 
             return ExponentialHistogramDataPoint(
                 attributes=self._attributes,
+                exemplars=exemplars,
                 start_time_unix_nano=self._start_time_unix_nano,
                 time_unix_nano=collection_start_nano,
                 count=self._previous_count,
@@ -1109,6 +1147,7 @@ def _create_aggregation(
         self,
         instrument: Instrument,
         attributes: Attributes,
+        reservoir_factory: Callable[[Type[_Aggregation]], ExemplarReservoirFactory],
         start_time_unix_nano: int,
     ) -> _Aggregation:
         """Creates an aggregation"""
@@ -1137,6 +1176,7 @@ def _create_aggregation(
         self,
         instrument: Instrument,
         attributes: Attributes,
+        reservoir_factory: Callable[[Type[_Aggregation]], ExemplarReservoirFactory],
         start_time_unix_nano: int,
     ) -> _Aggregation:
 
@@ -1144,6 +1184,7 @@ def _create_aggregation(
         if isinstance(instrument, Counter):
             return _SumAggregation(
                 attributes,
+                reservoir_factory=reservoir_factory(_SumAggregation),
                 instrument_is_monotonic=True,
                 instrument_aggregation_temporality=(
                     AggregationTemporality.DELTA
@@ -1153,6 +1194,7 @@ def _create_aggregation(
         if isinstance(instrument, UpDownCounter):
             return _SumAggregation(
                 attributes,
+                reservoir_factory=reservoir_factory(_SumAggregation),
                 instrument_is_monotonic=False,
                 instrument_aggregation_temporality=(
                     AggregationTemporality.DELTA
@@ -1163,6 +1205,7 @@ def _create_aggregation(
         if isinstance(instrument, ObservableCounter):
             return _SumAggregation(
                 attributes,
+                reservoir_factory=reservoir_factory(_SumAggregation),
                 instrument_is_monotonic=True,
                 instrument_aggregation_temporality=(
                     AggregationTemporality.CUMULATIVE
@@ -1173,6 +1216,7 @@ def _create_aggregation(
         if isinstance(instrument, ObservableUpDownCounter):
             return _SumAggregation(
                 attributes,
+                reservoir_factory=reservoir_factory(_SumAggregation),
                 instrument_is_monotonic=False,
                 instrument_aggregation_temporality=(
                     AggregationTemporality.CUMULATIVE
@@ -1183,6 +1227,7 @@ def _create_aggregation(
         if isinstance(instrument, Histogram):
             return _ExplicitBucketHistogramAggregation(
                 attributes,
+                reservoir_factory=reservoir_factory(_ExplicitBucketHistogramAggregation),
                 instrument_aggregation_temporality=(
                     AggregationTemporality.DELTA
                 ),
@@ -1190,10 +1235,10 @@ def _create_aggregation(
             )
 
         if isinstance(instrument, ObservableGauge):
-            return _LastValueAggregation(attributes)
+            return _LastValueAggregation(attributes, reservoir_factory=reservoir_factory(_LastValueAggregation))
 
         if isinstance(instrument, _Gauge):
-            return _LastValueAggregation(attributes)
+            return _LastValueAggregation(attributes, reservoir_factory=reservoir_factory(_LastValueAggregation))
 
         # pylint: disable=broad-exception-raised
         raise Exception(f"Invalid instrument type {type(instrument)} found")
@@ -1212,6 +1257,7 @@ def _create_aggregation(
         self,
         instrument: Instrument,
         attributes: Attributes,
+        reservoir_factory: Callable[[Type[_Aggregation]], ExemplarReservoirFactory],
         start_time_unix_nano: int,
     ) -> _Aggregation:
 
@@ -1225,6 +1271,7 @@ def _create_aggregation(
 
         return _ExponentialBucketHistogramAggregation(
             attributes,
+            reservoir_factory(_ExponentialBucketHistogramAggregation),
             instrument_aggregation_temporality,
             start_time_unix_nano,
             max_size=self._max_size,
@@ -1274,6 +1321,7 @@ def _create_aggregation(
         self,
         instrument: Instrument,
         attributes: Attributes,
+        reservoir_factory: Callable[[Type[_Aggregation]], ExemplarReservoirFactory],
         start_time_unix_nano: int,
     ) -> _Aggregation:
 
@@ -1289,6 +1337,7 @@ def _create_aggregation(
             attributes,
             instrument_aggregation_temporality,
             start_time_unix_nano,
+            reservoir_factory(_ExplicitBucketHistogramAggregation),
             self._boundaries,
             self._record_min_max,
         )
@@ -1304,6 +1353,7 @@ def _create_aggregation(
         self,
         instrument: Instrument,
         attributes: Attributes,
+        reservoir_factory: Callable[[Type[_Aggregation]], ExemplarReservoirFactory],
         start_time_unix_nano: int,
     ) -> _Aggregation:
 
@@ -1320,6 +1370,7 @@ def _create_aggregation(
             isinstance(instrument, (Counter, ObservableCounter)),
             instrument_aggregation_temporality,
             start_time_unix_nano,
+            reservoir_factory(_SumAggregation),
         )
 
 
@@ -1335,9 +1386,10 @@ def _create_aggregation(
         self,
         instrument: Instrument,
         attributes: Attributes,
+        reservoir_factory: Callable[[Type[_Aggregation]], ExemplarReservoirFactory],
         start_time_unix_nano: int,
     ) -> _Aggregation:
-        return _LastValueAggregation(attributes)
+        return _LastValueAggregation(attributes, reservoir_factory=reservoir_factory(_LastValueAggregation))
 
 
 class DropAggregation(Aggregation):
@@ -1347,6 +1399,7 @@ def _create_aggregation(
         self,
         instrument: Instrument,
         attributes: Attributes,
+        reservoir_factory: Callable[[Type[_Aggregation]], ExemplarReservoirFactory],
         start_time_unix_nano: int,
     ) -> _Aggregation:
-        return _DropAggregation(attributes)
+        return _DropAggregation(attributes, reservoir_factory(_DropAggregation))
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/__init__.py
index a83b5b82b89..c5ed4454e59 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/__init__.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/__init__.py
@@ -14,6 +14,7 @@
 
 from .exemplar import Exemplar
 from .exemplar_filter import (
+    ExemplarFilter,
     AlwaysOffExemplarFilter,
     AlwaysOnExemplarFilter,
     TraceBasedExemplarFilter,
@@ -21,15 +22,18 @@
 from .exemplar_reservoir import (
     AlignedHistogramBucketExemplarReservoir,
     ExemplarReservoir,
+    ExemplarReservoirFactory,
     SimpleFixedSizeExemplarReservoir,
 )
 
 __all__ = [
     "Exemplar",
+    "ExemplarFilter",
     "AlwaysOffExemplarFilter",
     "AlwaysOnExemplarFilter",
     "TraceBasedExemplarFilter",
     "AlignedHistogramBucketExemplarReservoir",
     "ExemplarReservoir",
+    "ExemplarReservoirFactory",
     "SimpleFixedSizeExemplarReservoir",
 ]
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py
index 7df4ed2b56c..81db862944f 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py
@@ -14,7 +14,7 @@
 
 from abc import ABC, abstractmethod
 from random import randrange
-from typing import Optional, Sequence, Union
+from typing import Any, Callable, Optional, Sequence, TypeAlias, Union
 
 from opentelemetry import trace
 from opentelemetry.context import Context
@@ -28,6 +28,10 @@ class ExemplarReservoir(ABC):
     """ExemplarReservoir provide a method to offer measurements to the reservoir
     and another to collect accumulated Exemplars.
 
+    Note:
+        The constructor MUST accept ``**kwargs`` that may be set from aggregation
+        parameters.
+
     Reference:
         https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exemplarreservoir
     """
@@ -118,8 +122,8 @@ def __reset(self) -> None:
 class FixedSizeExemplarReservoirABC(ExemplarReservoir):
     """Abstract class for a reservoir with fixed size."""
 
-    def __init__(self, size: int) -> None:
-        super().__init__()
+    def __init__(self, size: int, **kwargs) -> None:
+        super().__init__(**kwargs)
         self._size: int = size
         self._reservoir_storage: list[ExemplarBucket] = [
             ExemplarBucket() for _ in range(self._size)
@@ -165,8 +169,8 @@ class SimpleFixedSizeExemplarReservoir(FixedSizeExemplarReservoirABC):
         https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#simplefixedsizeexemplarreservoir
     """
 
-    def __init__(self, size: int = 1) -> None:
-        super().__init__(size)
+    def __init__(self, size: int = 1, **kwargs) -> None:
+        super().__init__(size, **kwargs)
         self._measurements_seen: int = 0
 
     def _reset(self) -> None:
@@ -209,8 +213,8 @@ class AlignedHistogramBucketExemplarReservoir(FixedSizeExemplarReservoirABC):
         https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#alignedhistogrambucketexemplarreservoir
     """
 
-    def __init__(self, boundaries: Sequence[float]) -> None:
-        super().__init__(len(boundaries) + 1)
+    def __init__(self, boundaries: Sequence[float], **kwargs) -> None:
+        super().__init__(len(boundaries) + 1, **kwargs)
         self._boundaries: Sequence[float] = boundaries
 
     def offer(
@@ -235,3 +239,11 @@ def _find_bucket_index(
             if value <= boundary:
                 return i
         return len(self._boundaries)
+
+
+ExemplarReservoirFactory: TypeAlias = Callable[[dict[str, Any]], ExemplarReservoir]
+ExemplarReservoirFactory.__doc__ = """ExemplarReservoir factory.
+
+It may receive the Aggregation parameters it is bound to; e.g.
+the _ExplicitBucketHistogramAggregation will provide the boundaries.
+""" \ No newline at end of file diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py index 4310061b823..2d755b6e5fc 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -79,7 +79,7 @@ def __init__( def consume_measurement(self, measurement: Measurement) -> None: for reader_storage in self._reader_storages.values(): - reader_storage.consume_measurement(measurement) + reader_storage.consume_measurement(measurement, self._sdk_config.exemplar_filter.should_sample(measurement.value, measurement.time_unix_nano, measurement.attributes, measurement.context)) def register_asynchronous_instrument( self, diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py index 7fac6c6c105..e8d3bd802f4 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py @@ -113,11 +113,11 @@ def _get_or_init_view_instrument_match( return view_instrument_matches - def consume_measurement(self, measurement: Measurement) -> None: + def consume_measurement(self, measurement: Measurement, should_sample_exemplar: bool = True) -> None: for view_instrument_match in self._get_or_init_view_instrument_match( measurement.instrument ): - view_instrument_match.consume_measurement(measurement) + view_instrument_match.consume_measurement(measurement, should_sample_exemplar) def collect(self) -> Optional[MetricsData]: # Use a list instead of yielding to prevent a slow reader from holding diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py index 9594ab38a74..3d88facb0c3 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py @@ -24,6 +24,7 @@ @dataclass class SdkConfiguration: + exemplar_filter: "opentelemetry.sdk.metrics.ExemplarFilter" resource: "opentelemetry.sdk.resources.Resource" metric_readers: Sequence["opentelemetry.sdk.metrics.MetricReader"] views: Sequence["opentelemetry.sdk.metrics.View"] diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/view.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/view.py index 9473acde4d4..31fee511b4d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/view.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/view.py @@ -15,17 +15,35 @@ from fnmatch import fnmatch from logging import getLogger -from typing import Optional, Set, Type +from typing import Callable, Optional, Set, Type from opentelemetry.metrics import Instrument from opentelemetry.sdk.metrics._internal.aggregation import ( + _Aggregation, Aggregation, DefaultAggregation, + _ExplicitBucketHistogramAggregation, + _ExponentialBucketHistogramAggregation, +) +from opentelemetry.sdk.metrics._internal.exemplar import ( + AlignedHistogramBucketExemplarReservoir, + ExemplarReservoir, + ExemplarReservoirFactory, + SimpleFixedSizeExemplarReservoir, ) _logger = getLogger(__name__) +def _default_reservoir_factory(aggregationType: 
Type[_Aggregation]) -> ExemplarReservoirFactory: + """Default reservoir factory per aggregation.""" + if issubclass(aggregationType, _ExplicitBucketHistogramAggregation): + return AlignedHistogramBucketExemplarReservoir + elif issubclass(aggregationType, _ExponentialBucketHistogramAggregation): + return SimpleFixedSizeExemplarReservoir + return SimpleFixedSizeExemplarReservoir + + class View: """ A `View` configuration parameters can be used for the following @@ -73,6 +91,9 @@ class View: corresponding metrics stream. If `None` an instance of `DefaultAggregation` will be used. + exemplar_reservoir_factory: This is a metric stream customizing attribute: + the exemplar reservoir factory + instrument_unit: This is an instrument matching attribute: the unit the instrument must have to match the view. @@ -92,6 +113,7 @@ def __init__( description: Optional[str] = None, attribute_keys: Optional[Set[str]] = None, aggregation: Optional[Aggregation] = None, + exemplar_reservoir_factory: Optional[Callable[[Type[_Aggregation]], ExemplarReservoirFactory]] = None, instrument_unit: Optional[str] = None, ): if ( @@ -120,8 +142,8 @@ def __init__( "characters in instrument_name" ) - # _name, _description, _aggregation and _attribute_keys will be - # accessed when instantiating a _ViewInstrumentMatch. + # _name, _description, _aggregation, _exemplar_reservoir_factory and + # _attribute_keys will be accessed when instantiating a _ViewInstrumentMatch. self._name = name self._instrument_type = instrument_type self._instrument_name = instrument_name @@ -133,6 +155,7 @@ def __init__( self._description = description self._attribute_keys = attribute_keys self._aggregation = aggregation or self._default_aggregation + self._exemplar_reservoir_factory = exemplar_reservoir_factory or _default_reservoir_factory # pylint: disable=too-many-return-statements # pylint: disable=too-many-branches
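
A minimal usage sketch of the two knobs this patch threads through the SDK: the provider-level exemplar filter and the per-View reservoir factory. The instrument name, the reservoir size of 4, and the imports from the internal exemplar package are illustrative assumptions (the patch only re-exports the filter and reservoir classes from opentelemetry.sdk.metrics._internal.exemplar), and the snippet assumes the rest of the exemplar plumbing from this PR series (e.g. Measurement.time_unix_nano and Measurement.context) is already in place.

    from functools import partial

    from opentelemetry.sdk.metrics import MeterProvider
    from opentelemetry.sdk.metrics.view import View
    from opentelemetry.sdk.metrics._internal.exemplar import (
        AlwaysOnExemplarFilter,
        SimpleFixedSizeExemplarReservoir,
    )

    # Sample an exemplar for every measurement instead of only measurements
    # recorded inside a sampled span (TraceBasedExemplarFilter is the default
    # chosen when exemplar_filter is None).
    provider = MeterProvider(
        exemplar_filter=AlwaysOnExemplarFilter(),
        views=[
            View(
                instrument_name="http.server.request_count",  # hypothetical name
                # The factory is called with the resolved _Aggregation subclass,
                # mirroring _default_reservoir_factory in view.py, and returns a
                # reservoir factory; here each matching counter stream keeps at
                # most 4 exemplars in a simple fixed-size reservoir.
                exemplar_reservoir_factory=lambda aggregation_type: partial(
                    SimpleFixedSizeExemplarReservoir, size=4
                ),
            )
        ],
    )

    meter = provider.get_meter("exemplar-example")
    counter = meter.create_counter("http.server.request_count")
    counter.add(1, {"http.method": "GET"})

With a metric reader attached, the collected NumberDataPoint for this counter would now carry the exemplars gathered by the reservoir, subject to the configured filter.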