From b0a70ff1f6513d04d8e2aad8da7ba9c40228ce8e Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Mon, 9 May 2022 15:54:36 -0600 Subject: [PATCH] Refactor metric format Fixes #2646 --- .../_internal/_view_instrument_match.py | 59 +- .../sdk/_metrics/_internal/aggregation.py | 251 ++++---- .../_internal/metric_reader_storage.py | 81 ++- .../sdk/_metrics/_internal/point.py | 106 +-- .../sdk/_metrics/export/__init__.py | 7 +- .../tests/metrics/test_aggregation.py | 603 +++--------------- .../tests/metrics/test_import.py | 5 +- .../metrics/test_in_memory_metric_reader.py | 28 +- .../metrics/test_metric_reader_storage.py | 16 +- .../tests/metrics/test_metrics.py | 4 +- .../test_periodic_exporting_metric_reader.py | 33 +- opentelemetry-sdk/tests/metrics/test_point.py | 72 ++- .../metrics/test_view_instrument_match.py | 68 +- .../src/opentelemetry/test/metrictestutil.py | 58 +- 14 files changed, 492 insertions(+), 899 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/_view_instrument_match.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/_view_instrument_match.py index 2f002a272ec..c3f0b4ebf82 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/_view_instrument_match.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/_view_instrument_match.py @@ -22,16 +22,11 @@ Aggregation, DefaultAggregation, _Aggregation, - _convert_aggregation_temporality, - _PointVarT, _SumAggregation, ) from opentelemetry.sdk._metrics._internal.export import AggregationTemporality from opentelemetry.sdk._metrics._internal.measurement import Measurement -from opentelemetry.sdk._metrics._internal.point import Metric -from opentelemetry.sdk._metrics._internal.sdk_configuration import ( - SdkConfiguration, -) +from opentelemetry.sdk._metrics._internal.point import DataPointT from opentelemetry.sdk._metrics._internal.view import View _logger = getLogger(__name__) @@ -42,17 +37,12 @@ def __init__( self, view: View, instrument: Instrument, - sdk_config: SdkConfiguration, - instrument_class_temporality: Dict[type, AggregationTemporality], instrument_class_aggregation: Dict[type, Aggregation], ): self._view = view self._instrument = instrument - self._sdk_config = sdk_config self._attributes_aggregation: Dict[frozenset, _Aggregation] = {} - self._attributes_previous_point: Dict[frozenset, _PointVarT] = {} self._lock = Lock() - self._instrument_class_temporality = instrument_class_temporality self._instrument_class_aggregation = instrument_class_aggregation self._name = self._view._name or self._instrument.name self._description = ( @@ -124,46 +114,15 @@ def consume_measurement(self, measurement: Measurement) -> None: self._attributes_aggregation[attributes].aggregate(measurement) - def collect(self) -> Iterable[Metric]: + def collect( + self, aggregation_temporality: AggregationTemporality + ) -> Iterable[DataPointT]: + data_points = [] with self._lock: - for ( - attributes, - aggregation, - ) in self._attributes_aggregation.items(): - - previous_point = self._attributes_previous_point.get( - attributes - ) - - current_point = aggregation.collect() - - # pylint: disable=assignment-from-none - - self._attributes_previous_point[ - attributes - ] = _convert_aggregation_temporality( - previous_point, - current_point, - AggregationTemporality.CUMULATIVE, + for aggregation in self._attributes_aggregation.values(): + data_points.append( + aggregation.collect(aggregation_temporality) ) - if current_point is not None: - - yield Metric( - 
attributes=dict(attributes), - description=self._description, - instrumentation_scope=( - self._instrument.instrumentation_scope - ), - name=self._name, - resource=self._sdk_config.resource, - unit=self._instrument.unit, - point=_convert_aggregation_temporality( - previous_point, - current_point, - self._instrument_class_temporality[ - self._instrument.__class__ - ], - ), - ) + return data_points diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/aggregation.py index 1d10b3f8ead..052588c3763 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/aggregation.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/aggregation.py @@ -14,7 +14,6 @@ from abc import ABC, abstractmethod from bisect import bisect_left -from dataclasses import replace from enum import IntEnum from logging import getLogger from math import inf @@ -37,11 +36,15 @@ from opentelemetry.sdk._metrics._internal.point import ( Histogram as HistogramPoint, ) -from opentelemetry.sdk._metrics._internal.point import PointT, Sum +from opentelemetry.sdk._metrics._internal.point import ( + HistogramDataPoint, + NumberDataPoint, + Sum, +) from opentelemetry.util._time import _time_ns from opentelemetry.util.types import Attributes -_PointVarT = TypeVar("_PointVarT", bound=PointT) +_DataPointVarT = TypeVar("_DataPointVarT", NumberDataPoint, HistogramDataPoint) _logger = getLogger(__name__) @@ -58,17 +61,21 @@ class AggregationTemporality(IntEnum): CUMULATIVE = 2 -class _Aggregation(ABC, Generic[_PointVarT]): +class _Aggregation(ABC, Generic[_DataPointVarT]): def __init__(self, attributes: Attributes): self._lock = Lock() self._attributes = attributes + self._previous_point = None @abstractmethod def aggregate(self, measurement: Measurement) -> None: pass @abstractmethod - def collect(self) -> Optional[_PointVarT]: + def collect( + self, + aggregation_temporality: AggregationTemporality, + ) -> Optional[_DataPointVarT]: pass @@ -76,7 +83,10 @@ class _DropAggregation(_Aggregation): def aggregate(self, measurement: Measurement) -> None: pass - def collect(self) -> Optional[_PointVarT]: + def collect( + self, + aggregation_temporality: AggregationTemporality, + ) -> Optional[_DataPointVarT]: pass @@ -176,7 +186,9 @@ def aggregate(self, measurement: Measurement) -> None: self._value = 0 self._value = self._value + measurement.value - def collect(self) -> Optional[Sum]: + def collect( + self, aggregation_temporality: AggregationTemporality + ) -> Optional[NumberDataPoint]: """ Atomically return a point for the current value of the metric and reset the aggregation value. 
@@ -192,28 +204,50 @@ def collect(self) -> Optional[Sum]: self._value = 0 self._start_time_unix_nano = now + 1 - return Sum( - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=self._instrument_is_monotonic, - start_time_unix_nano=start_time_unix_nano, - time_unix_nano=now, - value=value, - ) + else: - with self._lock: - if self._value is None: - return None - value = self._value - self._value = None + with self._lock: + if self._value is None: + return None + value = self._value + self._value = None + start_time_unix_nano = self._start_time_unix_nano - return Sum( - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=self._instrument_is_monotonic, - start_time_unix_nano=self._start_time_unix_nano, + current_point = NumberDataPoint( + attributes=self._attributes, + start_time_unix_nano=start_time_unix_nano, time_unix_nano=now, value=value, ) + if self._previous_point is None: + self._previous_point = current_point + return current_point + + if self._instrument_temporality is aggregation_temporality: + # Output DELTA for a synchronous instrument + # Output CUMULATIVE for an asynchronous instrument + return current_point + + if aggregation_temporality is AggregationTemporality.DELTA: + # Output temporality DELTA for an asynchronous instrument + value = current_point.value - self._previous_point.value + output_start_time_unix_nano = self._previous_point.time_unix_nano + + else: + # Output CUMULATIVE for a synchronous instrument + value = current_point.value + self._previous_point.value + output_start_time_unix_nano = ( + self._previous_point.start_time_unix_nano + ) + + return NumberDataPoint( + attributes=self._attributes, + start_time_unix_nano=output_start_time_unix_nano, + time_unix_nano=current_point.time_unix_nano, + value=value, + ) + class _LastValueAggregation(_Aggregation[Gauge]): def __init__(self, attributes: Attributes): @@ -224,7 +258,10 @@ def aggregate(self, measurement: Measurement): with self._lock: self._value = measurement.value - def collect(self) -> Optional[Gauge]: + def collect( + self, + aggregation_temporality: AggregationTemporality, + ) -> Optional[_DataPointVarT]: """ Atomically return a point for the current value of the metric. """ @@ -234,7 +271,9 @@ def collect(self) -> Optional[Gauge]: value = self._value self._value = None - return Gauge( + return NumberDataPoint( + attributes=self._attributes, + start_time_unix_nano=0, time_unix_nano=_time_ns(), value=value, ) @@ -266,6 +305,11 @@ def __init__( self._sum = 0 self._record_min_max = record_min_max self._start_time_unix_nano = _time_ns() + # It is assumed that the "natural" aggregation temporality for a + # Histogram instrument is DELTA, like the "natural" aggregation + # temporality for a Counter is DELTA and the "natural" aggregation + # temporality for an ObservableCounter is CUMULATIVE. + self._instrument_temporality = AggregationTemporality.DELTA def _get_empty_bucket_counts(self) -> List[int]: return [0] * (len(self._boundaries) + 1) @@ -282,18 +326,24 @@ def aggregate(self, measurement: Measurement) -> None: self._bucket_counts[bisect_left(self._boundaries, value)] += 1 - def collect(self) -> HistogramPoint: + def collect( + self, + aggregation_temporality: AggregationTemporality, + ) -> Optional[_DataPointVarT]: """ Atomically return a point for the current value of the metric. 
""" now = _time_ns() with self._lock: - value = self._bucket_counts + if not any(self._bucket_counts): + return None + + bucket_counts = self._bucket_counts start_time_unix_nano = self._start_time_unix_nano - histogram_sum = self._sum - histogram_max = self._max - histogram_min = self._min + sum_ = self._sum + max_ = self._max + min_ = self._min self._bucket_counts = self._get_empty_bucket_counts() self._start_time_unix_nano = now + 1 @@ -301,146 +351,63 @@ def collect(self) -> HistogramPoint: self._min = inf self._max = -inf - return HistogramPoint( - aggregation_temporality=AggregationTemporality.DELTA, - bucket_counts=tuple(value), - explicit_bounds=self._boundaries, - max=histogram_max, - min=histogram_min, + current_point = HistogramDataPoint( + attributes=self._attributes, start_time_unix_nano=start_time_unix_nano, - sum=histogram_sum, time_unix_nano=now, + count=sum(bucket_counts), + sum=sum_, + bucket_counts=tuple(bucket_counts), + explicit_bounds=self._boundaries, + min=min_, + max=max_, ) - -# pylint: disable=too-many-return-statements,too-many-branches -def _convert_aggregation_temporality( - previous_point: Optional[_PointVarT], - current_point: _PointVarT, - aggregation_temporality: AggregationTemporality, -) -> _PointVarT: - """Converts `current_point` to the requested `aggregation_temporality` - given the `previous_point`. - - `previous_point` must have `CUMULATIVE` temporality. `current_point` may - have `DELTA` or `CUMULATIVE` temporality. - - The output point will have temporality `aggregation_temporality`. Since - `GAUGE` points have no temporality, they are returned unchanged. - """ - - current_point_type = type(current_point) - - if current_point_type is Gauge: - return current_point - - if ( - previous_point is not None - and current_point is not None - and type(previous_point) is not type(current_point) - ): - _logger.warning( - "convert_aggregation_temporality called with mismatched " - "point types: %s and %s", - type(previous_point), - current_point_type, - ) - - return current_point - - if current_point_type is Sum: - if previous_point is None: - # Output CUMULATIVE for a synchronous instrument - # There is no previous value, return the delta point as a - # cumulative - return replace( - current_point, aggregation_temporality=aggregation_temporality - ) - if previous_point.aggregation_temporality is not ( - AggregationTemporality.CUMULATIVE - ): - raise Exception( - "previous_point aggregation temporality must be CUMULATIVE" - ) - - if current_point.aggregation_temporality is aggregation_temporality: - # Output DELTA for a synchronous instrument - # Output CUMULATIVE for an asynchronous instrument + if self._previous_point is None: + self._previous_point = current_point return current_point - if aggregation_temporality is AggregationTemporality.DELTA: - # Output temporality DELTA for an asynchronous instrument - value = current_point.value - previous_point.value - output_start_time_unix_nano = previous_point.time_unix_nano - - else: - # Output CUMULATIVE for a synchronous instrument - value = current_point.value + previous_point.value - output_start_time_unix_nano = previous_point.start_time_unix_nano - - is_monotonic = ( - previous_point.is_monotonic and current_point.is_monotonic - ) - - return Sum( - start_time_unix_nano=output_start_time_unix_nano, - time_unix_nano=current_point.time_unix_nano, - value=value, - aggregation_temporality=aggregation_temporality, - is_monotonic=is_monotonic, - ) - - if current_point_type is HistogramPoint: - if previous_point is 
None: - return replace( - current_point, aggregation_temporality=aggregation_temporality - ) - if previous_point.aggregation_temporality is not ( - AggregationTemporality.CUMULATIVE - ): - raise Exception( - "previous_point aggregation temporality must be CUMULATIVE" - ) - - if current_point.aggregation_temporality is aggregation_temporality: + if self._instrument_temporality is aggregation_temporality: return current_point max_ = current_point.max min_ = current_point.min if aggregation_temporality is AggregationTemporality.CUMULATIVE: - start_time_unix_nano = previous_point.start_time_unix_nano - sum_ = current_point.sum + previous_point.sum + start_time_unix_nano = self._previous_point.start_time_unix_nano + sum_ = current_point.sum + self._previous_point.sum # Only update min/max on delta -> cumulative - max_ = max(current_point.max, previous_point.max) - min_ = min(current_point.min, previous_point.min) + max_ = max(current_point.max, self._previous_point.max) + min_ = min(current_point.min, self._previous_point.min) bucket_counts = [ curr_count + prev_count for curr_count, prev_count in zip( - current_point.bucket_counts, previous_point.bucket_counts + current_point.bucket_counts, + self._previous_point.bucket_counts, ) ] else: - start_time_unix_nano = previous_point.time_unix_nano - sum_ = current_point.sum - previous_point.sum + start_time_unix_nano = self._previous_point.time_unix_nano + sum_ = current_point.sum - self._previous_point.sum bucket_counts = [ curr_count - prev_count for curr_count, prev_count in zip( - current_point.bucket_counts, previous_point.bucket_counts + current_point.bucket_counts, + self._previous_point.bucket_counts, ) ] - return HistogramPoint( - aggregation_temporality=aggregation_temporality, - bucket_counts=bucket_counts, - explicit_bounds=current_point.explicit_bounds, - max=max_, - min=min_, + return HistogramDataPoint( + attributes=self._attributes, start_time_unix_nano=start_time_unix_nano, - sum=sum_, time_unix_nano=current_point.time_unix_nano, + count=sum(bucket_counts), + sum=sum_, + bucket_counts=tuple(bucket_counts), + explicit_bounds=current_point.explicit_bounds, + min=min_, + max=max_, ) - return None class ExplicitBucketHistogramAggregation(Aggregation): diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader_storage.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader_storage.py index b163f414afb..a39283149e0 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader_storage.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader_storage.py @@ -16,17 +16,31 @@ from threading import RLock from typing import Dict, Iterable, List -from opentelemetry._metrics import Asynchronous, Instrument +from opentelemetry._metrics import ( + Asynchronous, + Counter, + Instrument, + ObservableCounter, +) from opentelemetry.sdk._metrics._internal._view_instrument_match import ( _ViewInstrumentMatch, ) from opentelemetry.sdk._metrics._internal.aggregation import ( Aggregation, ExplicitBucketHistogramAggregation, + _DropAggregation, + _ExplicitBucketHistogramAggregation, + _LastValueAggregation, + _SumAggregation, ) from opentelemetry.sdk._metrics._internal.export import AggregationTemporality from opentelemetry.sdk._metrics._internal.measurement import Measurement -from opentelemetry.sdk._metrics._internal.point import Metric +from opentelemetry.sdk._metrics._internal.point import ( + Gauge, + Histogram, + Metric, + Sum, +) from 
opentelemetry.sdk._metrics._internal.sdk_configuration import (
     SdkConfiguration,
 )
@@ -81,10 +95,6 @@ def _get_or_init_view_instrument_match(
                 _ViewInstrumentMatch(
                     view=_DEFAULT_VIEW,
                     instrument=instrument,
-                    sdk_config=self._sdk_config,
-                    instrument_class_temporality=(
-                        self._instrument_class_temporality
-                    ),
                     instrument_class_aggregation=(
                         self._instrument_class_aggregation
                     ),
@@ -117,10 +127,59 @@ def collect(self) -> Iterable[Metric]:
         # for a single instrument.
         with self._lock:
             for (
-                view_instrument_matches
-            ) in self._instrument_view_instrument_matches.values():
+                instrument,
+                view_instrument_matches,
+            ) in self._instrument_view_instrument_matches.items():
+                aggregation_temporality = self._instrument_class_temporality[
+                    instrument.__class__
+                ]
+
                 for view_instrument_match in view_instrument_matches:
-                    metrics.extend(view_instrument_match.collect())
+
+                    if isinstance(
+                        view_instrument_match._aggregation, _SumAggregation
+                    ):
+                        data = Sum(
+                            aggregation_temporality=aggregation_temporality,
+                            data_points=view_instrument_match.collect(
+                                aggregation_temporality
+                            ),
+                            is_monotonic=isinstance(
+                                instrument, (Counter, ObservableCounter)
+                            ),
+                        )
+                    elif isinstance(
+                        view_instrument_match._aggregation,
+                        _LastValueAggregation,
+                    ):
+                        data = Gauge(
+                            data_points=view_instrument_match.collect(
+                                aggregation_temporality
+                            )
+                        )
+                    elif isinstance(
+                        view_instrument_match._aggregation,
+                        _ExplicitBucketHistogramAggregation,
+                    ):
+                        data = Histogram(
+                            data_points=view_instrument_match.collect(
+                                aggregation_temporality
+                            ),
+                            aggregation_temporality=aggregation_temporality,
+                        )
+                    elif isinstance(
+                        view_instrument_match._aggregation, _DropAggregation
+                    ):
+                        continue
+
+                    metrics.append(
+                        Metric(
+                            name=view_instrument_match._name,
+                            description=view_instrument_match._description,
+                            unit=view_instrument_match._instrument.unit,
+                            data=data,
+                        )
+                    )
 
         return metrics
 
@@ -140,10 +199,6 @@ def _handle_view_instrument_match(
             new_view_instrument_match = _ViewInstrumentMatch(
                 view=view,
                 instrument=instrument,
-                sdk_config=self._sdk_config,
-                instrument_class_temporality=(
-                    self._instrument_class_temporality
-                ),
                 instrument_class_aggregation=(
                     self._instrument_class_aggregation
                 ),
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/point.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/point.py
index eca9783eeea..7b40f98ad0a 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/point.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/point.py
@@ -14,29 +14,48 @@
 # pylint: disable=unused-import
 
 from dataclasses import asdict, dataclass
 from json import dumps
 from typing import Sequence, Union
 
 # This kind of import is needed to avoid Sphinx errors.
 import opentelemetry.sdk._metrics._internal
-from opentelemetry.sdk.resources import Resource
-from opentelemetry.sdk.util.instrumentation import InstrumentationScope
 from opentelemetry.util.types import Attributes
 
 
+@dataclass(frozen=True)
+class NumberDataPoint:
+    """Single data point in a timeseries that describes the time-varying scalar
+    value of a metric.
+    """
+
+    attributes: Attributes
+    start_time_unix_nano: int
+    time_unix_nano: int
+    value: Union[int, float]
+
+
 @dataclass(frozen=True)
 class Sum:
     """Represents the type of a scalar metric that is calculated as a sum of
     all reported measurements over a time interval."""
 
+    data_points: Sequence[NumberDataPoint]
     aggregation_temporality: (
         "opentelemetry.sdk._metrics.export.AggregationTemporality"
     )
     is_monotonic: bool
-    start_time_unix_nano: int
-    time_unix_nano: int
-    value: Union[int, float]
+
+    def to_json(self) -> str:
+        return dumps(
+            {
+                "data_points": dumps(
+                    [asdict(data_point) for data_point in self.data_points]
+                ),
+                "aggregation_temporality": self.aggregation_temporality,
+                "is_monotonic": self.is_monotonic,
+            }
+        )
 
 
 @dataclass(frozen=True)
@@ -45,8 +64,33 @@ class Gauge:
     value for every data point. It should be used for an unknown
     aggregation."""
 
+    data_points: Sequence[NumberDataPoint]
+
+    def to_json(self) -> str:
+        return dumps(
+            {
+                "data_points": dumps(
+                    [asdict(data_point) for data_point in self.data_points]
+                )
+            }
+        )
+
+
+@dataclass(frozen=True)
+class HistogramDataPoint:
+    """Single data point in a timeseries that describes the time-varying
+    distribution of values recorded by a Histogram.
+    """
+
+    attributes: Attributes
+    start_time_unix_nano: int
     time_unix_nano: int
-    value: Union[int, float]
+    count: int
+    sum: Union[int, float]
+    bucket_counts: Sequence[int]
+    explicit_bounds: Sequence[float]
+    min: float
+    max: float
 
 
 @dataclass(frozen=True)
@@ -54,52 +98,42 @@ class Histogram:
     """Represents the type of a metric that is calculated by aggregating as a
     histogram of all reported measurements over a time interval."""
 
+    data_points: Sequence[HistogramDataPoint]
     aggregation_temporality: (
         "opentelemetry.sdk._metrics.export.AggregationTemporality"
     )
-    bucket_counts: Sequence[int]
-    explicit_bounds: Sequence[float]
-    max: int
-    min: int
-    start_time_unix_nano: int
-    sum: Union[int, float]
-    time_unix_nano: int
+
+    def to_json(self) -> str:
+        return dumps(
+            {
+                "data_points": dumps(
+                    [asdict(data_point) for data_point in self.data_points]
+                ),
+                "aggregation_temporality": self.aggregation_temporality,
+            }
+        )
 
 
-PointT = Union[Sum, Gauge, Histogram]
+DataT = Union[Sum, Gauge, Histogram]
+DataPointT = Union[NumberDataPoint, HistogramDataPoint]
 
 
 @dataclass(frozen=True)
 class Metric:
-    """Represents a metric point in the OpenTelemetry data model to be exported
+    """Represents a metric point in the OpenTelemetry data model to be
+    exported."""
 
-    Concrete metric types contain all the information as in the OTLP proto definitions
-    (https://github.com/open-telemetry/opentelemetry-proto/blob/b43e9b18b76abf3ee040164b55b9c355217151f3/opentelemetry/proto/metrics/v1/metrics.proto#L37) but are flattened as much as possible.
- """ - - # common fields to all metric kinds - attributes: Attributes - description: str - instrumentation_scope: InstrumentationScope name: str - resource: Resource + description: str unit: str - point: PointT - """Contains non-common fields for the given metric""" + data: DataT def to_json(self) -> str: return dumps( { - "attributes": self.attributes if self.attributes else "", - "description": self.description if self.description else "", - "instrumentation_scope": repr(self.instrumentation_scope) - if self.instrumentation_scope - else "", "name": self.name, - "resource": repr(self.resource.attributes) - if self.resource - else "", + "description": self.description if self.description else "", "unit": self.unit if self.unit else "", - "point": asdict(self.point) if self.point else "", + "data": self.data.to_json(), } ) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py index 107f73f79d5..fdc0df4b923 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py @@ -28,17 +28,20 @@ # The point module is not in the export directory to avoid a circular import. from opentelemetry.sdk._metrics._internal.point import ( # noqa: F401 + DataPointT, + DataT, Gauge, Histogram, + HistogramDataPoint, Metric, - PointT, + NumberDataPoint, Sum, ) __all__ = [] for key, value in globals().copy().items(): if not key.startswith("_"): - if _version_info.minor == 6 and key == "PointT": + if _version_info.minor == 6 and key == "DataT": continue value.__module__ = __name__ __all__.append(key) diff --git a/opentelemetry-sdk/tests/metrics/test_aggregation.py b/opentelemetry-sdk/tests/metrics/test_aggregation.py index e747e49cdba..e71f99d9eed 100644 --- a/opentelemetry-sdk/tests/metrics/test_aggregation.py +++ b/opentelemetry-sdk/tests/metrics/test_aggregation.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from dataclasses import replace -from logging import WARNING from math import inf from time import sleep from typing import Union @@ -29,15 +27,15 @@ UpDownCounter, ) from opentelemetry.sdk._metrics._internal.aggregation import ( - _convert_aggregation_temporality, _ExplicitBucketHistogramAggregation, _LastValueAggregation, _SumAggregation, ) from opentelemetry.sdk._metrics._internal.measurement import Measurement -from opentelemetry.sdk._metrics.export import AggregationTemporality, Gauge -from opentelemetry.sdk._metrics.export import Histogram as HistogramPoint -from opentelemetry.sdk._metrics.export import Sum +from opentelemetry.sdk._metrics.export import ( + AggregationTemporality, + NumberDataPoint, +) from opentelemetry.sdk._metrics.view import ( DefaultAggregation, ExplicitBucketHistogramAggregation, @@ -110,20 +108,44 @@ def test_collect_delta(self): """ synchronous_sum_aggregation = _SumAggregation( - Mock(), True, AggregationTemporality.DELTA + {}, True, AggregationTemporality.DELTA + ) + + synchronous_sum_aggregation.aggregate(measurement(1)) + first_sum = synchronous_sum_aggregation.collect( + AggregationTemporality.CUMULATIVE + ) + + self.assertEqual(first_sum.value, 1) + + synchronous_sum_aggregation.aggregate(measurement(1)) + second_sum = synchronous_sum_aggregation.collect( + AggregationTemporality.CUMULATIVE + ) + + self.assertEqual(second_sum.value, 2) + + self.assertEqual( + second_sum.start_time_unix_nano, first_sum.start_time_unix_nano + ) + + synchronous_sum_aggregation = _SumAggregation( + {}, True, AggregationTemporality.DELTA ) synchronous_sum_aggregation.aggregate(measurement(1)) - first_sum = synchronous_sum_aggregation.collect() + first_sum = synchronous_sum_aggregation.collect( + AggregationTemporality.DELTA + ) self.assertEqual(first_sum.value, 1) - self.assertTrue(first_sum.is_monotonic) synchronous_sum_aggregation.aggregate(measurement(1)) - second_sum = synchronous_sum_aggregation.collect() + second_sum = synchronous_sum_aggregation.collect( + AggregationTemporality.DELTA + ) self.assertEqual(second_sum.value, 1) - self.assertTrue(second_sum.is_monotonic) self.assertGreater( second_sum.start_time_unix_nano, first_sum.start_time_unix_nano @@ -131,32 +153,30 @@ def test_collect_delta(self): def test_collect_cumulative(self): """ - `SynchronousSumAggregation` collects sum metric points + `SynchronousSumAggregation` collects number data points """ sum_aggregation = _SumAggregation( - True, AggregationTemporality.CUMULATIVE, Mock() + {}, True, AggregationTemporality.CUMULATIVE ) sum_aggregation.aggregate(measurement(1)) - first_sum = sum_aggregation.collect() + first_sum = sum_aggregation.collect(AggregationTemporality.CUMULATIVE) self.assertEqual(first_sum.value, 1) - self.assertTrue(first_sum.is_monotonic) # should have been reset after first collect sum_aggregation.aggregate(measurement(1)) - second_sum = sum_aggregation.collect() + second_sum = sum_aggregation.collect(AggregationTemporality.CUMULATIVE) self.assertEqual(second_sum.value, 1) - self.assertTrue(second_sum.is_monotonic) self.assertEqual( second_sum.start_time_unix_nano, first_sum.start_time_unix_nano ) # if no point seen for a whole interval, should return None - third_sum = sum_aggregation.collect() + third_sum = sum_aggregation.collect(AggregationTemporality.CUMULATIVE) self.assertIsNone(third_sum) @@ -180,35 +200,44 @@ def test_aggregate(self): def test_collect(self): """ - `LastValueAggregation` collects sum metric points + `LastValueAggregation` collects number data points """ 
last_value_aggregation = _LastValueAggregation(Mock()) - self.assertIsNone(last_value_aggregation.collect()) + self.assertIsNone( + last_value_aggregation.collect(AggregationTemporality.CUMULATIVE) + ) last_value_aggregation.aggregate(measurement(1)) - first_gauge = last_value_aggregation.collect() - self.assertIsInstance(first_gauge, Gauge) + first_number_data_point = last_value_aggregation.collect( + AggregationTemporality.CUMULATIVE + ) + self.assertIsInstance(first_number_data_point, NumberDataPoint) - self.assertEqual(first_gauge.value, 1) + self.assertEqual(first_number_data_point.value, 1) last_value_aggregation.aggregate(measurement(1)) # CI fails the last assertion without this sleep(0.1) - second_gauge = last_value_aggregation.collect() + second_number_data_point = last_value_aggregation.collect( + AggregationTemporality.CUMULATIVE + ) - self.assertEqual(second_gauge.value, 1) + self.assertEqual(second_number_data_point.value, 1) self.assertGreater( - second_gauge.time_unix_nano, first_gauge.time_unix_nano + second_number_data_point.time_unix_nano, + first_number_data_point.time_unix_nano, ) # if no observation seen for the interval, it should return None - third_gauge = last_value_aggregation.collect() - self.assertIsNone(third_gauge) + third_number_data_point = last_value_aggregation.collect( + AggregationTemporality.CUMULATIVE + ) + self.assertIsNone(third_number_data_point) class TestExplicitBucketHistogramAggregation(TestCase): @@ -249,7 +278,9 @@ def test_aggregate(self): explicit_bucket_histogram_aggregation._bucket_counts[3], 1 ) - histo = explicit_bucket_histogram_aggregation.collect() + histo = explicit_bucket_histogram_aggregation.collect( + AggregationTemporality.CUMULATIVE + ) self.assertEqual(histo.sum, 14) def test_min_max(self): @@ -294,7 +325,9 @@ def test_collect(self): ) explicit_bucket_histogram_aggregation.aggregate(measurement(1)) - first_histogram = explicit_bucket_histogram_aggregation.collect() + first_histogram = explicit_bucket_histogram_aggregation.collect( + AggregationTemporality.CUMULATIVE + ) self.assertEqual(first_histogram.bucket_counts, (0, 1, 0, 0)) self.assertEqual(first_histogram.sum, 1) @@ -303,510 +336,42 @@ def test_collect(self): sleep(0.1) explicit_bucket_histogram_aggregation.aggregate(measurement(1)) - second_histogram = explicit_bucket_histogram_aggregation.collect() + second_histogram = explicit_bucket_histogram_aggregation.collect( + AggregationTemporality.CUMULATIVE + ) - self.assertEqual(second_histogram.bucket_counts, (0, 1, 0, 0)) - self.assertEqual(second_histogram.sum, 1) + self.assertEqual(second_histogram.bucket_counts, (0, 2, 0, 0)) + self.assertEqual(second_histogram.sum, 2) self.assertGreater( second_histogram.time_unix_nano, first_histogram.time_unix_nano ) - -class TestConvertAggregationTemporality(TestCase): - """ - Test aggregation temporality conversion algorithm - """ - - def test_previous_point_non_cumulative(self): - - with self.assertRaises(Exception): - - _convert_aggregation_temporality( - Sum( - start_time_unix_nano=0, - time_unix_nano=0, - value=0, - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=False, - ), - Sum( - start_time_unix_nano=0, - time_unix_nano=0, - value=0, - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=False, - ), - AggregationTemporality.DELTA, - ), - - def test_mismatched_point_types(self): - - current_point = Sum( - start_time_unix_nano=0, - time_unix_nano=0, - value=0, - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=False, 
- ) - - with self.assertLogs(level=WARNING): - self.assertIs( - _convert_aggregation_temporality( - Gauge(time_unix_nano=0, value=0), - current_point, - AggregationTemporality.DELTA, - ), - current_point, - ) - - with self.assertRaises(AssertionError): - with self.assertLogs(level=WARNING): - self.assertIs( - _convert_aggregation_temporality( - Gauge(time_unix_nano=0, value=0), - None, - AggregationTemporality.DELTA, - ), - current_point, - ) - - def test_current_point_sum_previous_point_none(self): - - current_point = Sum( - start_time_unix_nano=0, - time_unix_nano=0, - value=0, - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=False, - ) - - self.assertEqual( - _convert_aggregation_temporality( - None, current_point, AggregationTemporality.CUMULATIVE - ), - replace( - current_point, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - ), - ) - - def test_current_point_sum_current_point_same_aggregation_temporality( - self, - ): - - current_point = Sum( - start_time_unix_nano=0, - time_unix_nano=0, - value=0, - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=False, - ) - - self.assertEqual( - _convert_aggregation_temporality( - Sum( - start_time_unix_nano=0, - time_unix_nano=0, - value=0, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=False, - ), - current_point, - AggregationTemporality.DELTA, - ), - current_point, - ) - - current_point = Sum( - start_time_unix_nano=0, - time_unix_nano=0, - value=0, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=False, - ) - - self.assertEqual( - _convert_aggregation_temporality( - Sum( - start_time_unix_nano=0, - time_unix_nano=0, - value=0, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=False, - ), - current_point, - AggregationTemporality.CUMULATIVE, - ), - current_point, - ) - - def test_current_point_sum_aggregation_temporality_delta(self): - - self.assertEqual( - _convert_aggregation_temporality( - Sum( - start_time_unix_nano=1, - time_unix_nano=2, - value=3, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=False, - ), - Sum( - start_time_unix_nano=4, - time_unix_nano=5, - value=6, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=False, - ), - AggregationTemporality.DELTA, - ), - Sum( - start_time_unix_nano=2, - time_unix_nano=5, - value=3, - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=False, - ), - ) - - self.assertEqual( - _convert_aggregation_temporality( - Sum( - start_time_unix_nano=1, - time_unix_nano=2, - value=3, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=True, - ), - Sum( - start_time_unix_nano=4, - time_unix_nano=5, - value=6, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=False, - ), - AggregationTemporality.DELTA, - ), - Sum( - start_time_unix_nano=2, - time_unix_nano=5, - value=3, - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=False, - ), - ) - - self.assertEqual( - _convert_aggregation_temporality( - Sum( - start_time_unix_nano=1, - time_unix_nano=2, - value=3, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=True, - ), - Sum( - start_time_unix_nano=4, - time_unix_nano=5, - value=6, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=True, - ), - AggregationTemporality.DELTA, - ), - Sum( - start_time_unix_nano=2, - time_unix_nano=5, - value=3, - 
aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=True, - ), - ) - - def test_current_point_sum_aggregation_temporality_cumulative(self): - - self.assertEqual( - _convert_aggregation_temporality( - Sum( - start_time_unix_nano=1, - time_unix_nano=2, - value=3, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=False, - ), - Sum( - start_time_unix_nano=4, - time_unix_nano=5, - value=6, - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=False, - ), - AggregationTemporality.CUMULATIVE, - ), - Sum( - start_time_unix_nano=1, - time_unix_nano=5, - value=9, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=False, - ), - ) - - self.assertEqual( - _convert_aggregation_temporality( - Sum( - start_time_unix_nano=1, - time_unix_nano=2, - value=3, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=True, - ), - Sum( - start_time_unix_nano=4, - time_unix_nano=5, - value=6, - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=False, - ), - AggregationTemporality.CUMULATIVE, - ), - Sum( - start_time_unix_nano=1, - time_unix_nano=5, - value=9, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=False, - ), - ) - - self.assertEqual( - _convert_aggregation_temporality( - Sum( - start_time_unix_nano=1, - time_unix_nano=2, - value=3, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=True, - ), - Sum( - start_time_unix_nano=4, - time_unix_nano=5, - value=6, - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=True, - ), - AggregationTemporality.CUMULATIVE, - ), - Sum( - start_time_unix_nano=1, - time_unix_nano=5, - value=9, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=True, - ), - ) - - def test_current_point_gauge(self): - - current_point = Gauge(time_unix_nano=1, value=0) - self.assertEqual( - _convert_aggregation_temporality( - Gauge(time_unix_nano=0, value=0), - current_point, - AggregationTemporality.CUMULATIVE, - ), - current_point, - ) - - -class TestHistogramConvertAggregationTemporality(TestCase): - def test_previous_point_none(self): - - current_point = HistogramPoint( - aggregation_temporality=AggregationTemporality.DELTA, - bucket_counts=[0, 2, 1, 2, 0], - explicit_bounds=[0, 5, 10, 25], - max=24, - min=2, - start_time_unix_nano=0, - sum=70, - time_unix_nano=1, - ) - - self.assertEqual( - _convert_aggregation_temporality( - None, current_point, AggregationTemporality.CUMULATIVE - ), - replace( - current_point, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - ), - ) - - def test_previous_point_non_cumulative(self): - - with self.assertRaises(Exception): - - _convert_aggregation_temporality( - HistogramPoint( - aggregation_temporality=AggregationTemporality.DELTA, - bucket_counts=[0, 2, 1, 2, 0], - explicit_bounds=[0, 5, 10, 25], - max=24, - min=2, - start_time_unix_nano=0, - sum=70, - time_unix_nano=1, - ), - HistogramPoint( - aggregation_temporality=AggregationTemporality.DELTA, - bucket_counts=[0, 1, 3, 0, 0], - explicit_bounds=[0, 5, 10, 25], - max=9, - min=2, - start_time_unix_nano=1, - sum=35, - time_unix_nano=2, - ), - AggregationTemporality.DELTA, - ), - - def test_same_aggregation_temporality_cumulative(self): - current_point = HistogramPoint( - aggregation_temporality=AggregationTemporality.CUMULATIVE, - bucket_counts=[0, 3, 4, 2, 0], - explicit_bounds=[0, 5, 10, 25], - max=24, - min=1, - start_time_unix_nano=0, - sum=105, - time_unix_nano=2, - ) - 
self.assertEqual( - _convert_aggregation_temporality( - HistogramPoint( - aggregation_temporality=AggregationTemporality.CUMULATIVE, - bucket_counts=[0, 2, 1, 2, 0], - explicit_bounds=[0, 5, 10, 25], - max=24, - min=1, - start_time_unix_nano=0, - sum=70, - time_unix_nano=1, - ), - current_point, - AggregationTemporality.CUMULATIVE, - ), - current_point, + explicit_bucket_histogram_aggregation = ( + _ExplicitBucketHistogramAggregation(Mock(), boundaries=[0, 1, 2]) ) - def test_same_aggregation_temporality_delta(self): - current_point = HistogramPoint( - aggregation_temporality=AggregationTemporality.DELTA, - bucket_counts=[0, 1, 3, 0, 0], - explicit_bounds=[0, 5, 10, 25], - max=9, - min=1, - start_time_unix_nano=1, - sum=35, - time_unix_nano=2, + explicit_bucket_histogram_aggregation.aggregate(measurement(1)) + first_histogram = explicit_bucket_histogram_aggregation.collect( + AggregationTemporality.DELTA ) - self.assertEqual( - _convert_aggregation_temporality( - HistogramPoint( - aggregation_temporality=AggregationTemporality.CUMULATIVE, - bucket_counts=[0, 3, 4, 2, 0], - explicit_bounds=[0, 5, 10, 25], - max=24, - min=1, - start_time_unix_nano=0, - sum=105, - time_unix_nano=2, - ), - current_point, - AggregationTemporality.DELTA, - ), - current_point, - ) + self.assertEqual(first_histogram.bucket_counts, (0, 1, 0, 0)) + self.assertEqual(first_histogram.sum, 1) - def test_aggregation_temporality_to_cumulative(self): - current_point = HistogramPoint( - aggregation_temporality=AggregationTemporality.DELTA, - bucket_counts=[0, 1, 3, 0, 0], - explicit_bounds=[0, 5, 10, 25], - max=24, - min=1, - start_time_unix_nano=1, - sum=35, - time_unix_nano=2, - ) + # CI fails the last assertion without this + sleep(0.1) - self.assertEqual( - _convert_aggregation_temporality( - HistogramPoint( - aggregation_temporality=AggregationTemporality.CUMULATIVE, - bucket_counts=[0, 2, 1, 2, 0], - explicit_bounds=[0, 5, 10, 25], - max=24, - min=1, - start_time_unix_nano=0, - sum=70, - time_unix_nano=1, - ), - current_point, - AggregationTemporality.CUMULATIVE, - ), - HistogramPoint( - aggregation_temporality=AggregationTemporality.CUMULATIVE, - bucket_counts=[0, 3, 4, 2, 0], - explicit_bounds=[0, 5, 10, 25], - max=24, - min=1, - start_time_unix_nano=0, - sum=105, - time_unix_nano=2, - ), + explicit_bucket_histogram_aggregation.aggregate(measurement(1)) + second_histogram = explicit_bucket_histogram_aggregation.collect( + AggregationTemporality.DELTA ) - def test_aggregation_temporality_to_delta(self): - current_point = HistogramPoint( - aggregation_temporality=AggregationTemporality.CUMULATIVE, - bucket_counts=[0, 3, 4, 2, 0], - explicit_bounds=[0, 5, 10, 25], - max=22, - min=3, - start_time_unix_nano=0, - sum=105, - time_unix_nano=2, - ) + self.assertEqual(second_histogram.bucket_counts, (0, 1, 0, 0)) + self.assertEqual(second_histogram.sum, 1) - self.assertEqual( - _convert_aggregation_temporality( - HistogramPoint( - aggregation_temporality=AggregationTemporality.CUMULATIVE, - bucket_counts=[0, 2, 1, 2, 0], - explicit_bounds=[0, 5, 10, 25], - max=24, - min=1, - start_time_unix_nano=0, - sum=70, - time_unix_nano=1, - ), - current_point, - AggregationTemporality.DELTA, - ), - HistogramPoint( - aggregation_temporality=AggregationTemporality.DELTA, - bucket_counts=[0, 1, 3, 0, 0], - explicit_bounds=[0, 5, 10, 25], - max=22, - min=3, - start_time_unix_nano=1, - sum=35, - time_unix_nano=2, - ), + self.assertGreater( + second_histogram.time_unix_nano, first_histogram.time_unix_nano ) diff --git 
a/opentelemetry-sdk/tests/metrics/test_import.py b/opentelemetry-sdk/tests/metrics/test_import.py index 1a31ab3b885..43c65337dc7 100644 --- a/opentelemetry-sdk/tests/metrics/test_import.py +++ b/opentelemetry-sdk/tests/metrics/test_import.py @@ -46,15 +46,18 @@ def test_import_export(self): from opentelemetry.sdk._metrics.export import ( # noqa: F401 AggregationTemporality, ConsoleMetricExporter, + DataPointT, + DataT, Gauge, Histogram, + HistogramDataPoint, InMemoryMetricReader, Metric, MetricExporter, MetricExportResult, MetricReader, + NumberDataPoint, PeriodicExportingMetricReader, - PointT, Sum, ) except Exception as error: diff --git a/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py index 15be13ac9af..1b7c56bec0a 100644 --- a/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py @@ -21,10 +21,9 @@ AggregationTemporality, InMemoryMetricReader, Metric, + NumberDataPoint, Sum, ) -from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.util.instrumentation import InstrumentationScope class TestInMemoryMetricReader(TestCase): @@ -37,16 +36,18 @@ def test_no_metrics(self): def test_converts_metrics_to_list(self): metric = Metric( - attributes={"myattr": "baz"}, - description="", - instrumentation_scope=InstrumentationScope("testmetrics"), name="foo", - resource=Resource.create(), + description="", unit="", - point=Sum( - start_time_unix_nano=1647626444152947792, - time_unix_nano=1647626444153163239, - value=72.3309814450449, + data=Sum( + data_points=[ + NumberDataPoint( + attributes={"myattr": "baz"}, + start_time_unix_nano=1647626444152947792, + time_unix_nano=1647626444153163239, + value=72.3309814450449, + ) + ], aggregation_temporality=AggregationTemporality.CUMULATIVE, is_monotonic=True, ), @@ -77,5 +78,8 @@ def test_integration(self): counter1.add(1, {"foo": "2"}) metrics = reader.get_metrics() - # should be 3 metrics, one from the observable gauge and one for each labelset from the counter - self.assertEqual(len(metrics), 3) + # should be 3 number data points, one from the observable gauge and one + # for each labelset from the counter + self.assertEqual(len(metrics), 2) + self.assertEqual(len(metrics[0].data.data_points), 2) + self.assertEqual(len(metrics[1].data.data_points), 1) diff --git a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py index cd1a8481ee3..04ec07dd6c8 100644 --- a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py +++ b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py @@ -21,6 +21,9 @@ ObservableCounter, UpDownCounter, ) +from opentelemetry.sdk._metrics._internal.aggregation import ( + _LastValueAggregation, +) from opentelemetry.sdk._metrics._internal.measurement import Measurement from opentelemetry.sdk._metrics._internal.metric_reader_storage import ( _DEFAULT_VIEW, @@ -106,9 +109,9 @@ def test_creates_view_instrument_matches( def test_forwards_calls_to_view_instrument_match( self, MockViewInstrumentMatch: Mock ): - view_instrument_match1 = Mock() - view_instrument_match2 = Mock() - view_instrument_match3 = Mock() + view_instrument_match1 = Mock(_aggregation=_LastValueAggregation({})) + view_instrument_match2 = Mock(_aggregation=_LastValueAggregation({})) + view_instrument_match3 = Mock(_aggregation=_LastValueAggregation({})) MockViewInstrumentMatch.side_effect = [ 
view_instrument_match1, view_instrument_match2, @@ -163,7 +166,12 @@ def test_forwards_calls_to_view_instrument_match( view_instrument_match1.collect.assert_called_once() view_instrument_match2.collect.assert_called_once() view_instrument_match3.collect.assert_called_once() - self.assertEqual(result, all_metrics) + self.assertEqual(result[0].data.data_points[0], all_metrics[0]) + self.assertEqual(result[0].data.data_points[1], all_metrics[1]) + self.assertEqual(result[1].data.data_points[0], all_metrics[2]) + self.assertEqual(result[1].data.data_points[1], all_metrics[3]) + self.assertEqual(result[2].data.data_points[0], all_metrics[4]) + self.assertEqual(result[2].data.data_points[1], all_metrics[5]) @patch( "opentelemetry.sdk._metrics._internal." diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index b20dd567a26..9097ca3d01b 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -513,11 +513,11 @@ def test_duplicate_instrument_aggregate_data(self): self.assertEqual(metric_0.name, "counter") self.assertEqual(metric_0.unit, "unit") self.assertEqual(metric_0.description, "description") - self.assertEqual(metric_0.point.value, 3) + self.assertEqual(metric_0.data.data_points[0].value, 3) metric_1 = metrics[1] self.assertEqual(metric_1.name, "counter") self.assertEqual(metric_1.unit, "unit") self.assertEqual(metric_1.description, "description") - self.assertEqual(metric_1.point.value, 7) + self.assertEqual(metric_1.data.data_points[0].value, 7) diff --git a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py index 531ef90c955..597bd37259c 100644 --- a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py @@ -23,10 +23,10 @@ Metric, MetricExporter, MetricExportResult, + NumberDataPoint, PeriodicExportingMetricReader, Sum, ) -from opentelemetry.sdk.resources import Resource from opentelemetry.test.concurrency_test import ConcurrencyTestBase from opentelemetry.util._time import _time_ns @@ -54,29 +54,34 @@ def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: metrics_list = [ Metric( name="sum_name", - attributes={}, description="", - instrumentation_scope=None, - resource=Resource.create(), unit="", - point=Sum( - start_time_unix_nano=_time_ns(), - time_unix_nano=_time_ns(), - value=2, + data=Sum( + data_points=[ + NumberDataPoint( + attributes={}, + start_time_unix_nano=_time_ns(), + time_unix_nano=_time_ns(), + value=2, + ) + ], aggregation_temporality=1, is_monotonic=True, ), ), Metric( name="gauge_name", - attributes={}, description="", - instrumentation_scope=None, - resource=Resource.create(), unit="", - point=Gauge( - time_unix_nano=_time_ns(), - value=2, + data=Gauge( + data_points=[ + NumberDataPoint( + attributes={}, + start_time_unix_nano=_time_ns(), + time_unix_nano=_time_ns(), + value=2, + ) + ] ), ), ] diff --git a/opentelemetry-sdk/tests/metrics/test_point.py b/opentelemetry-sdk/tests/metrics/test_point.py index 70b2235f168..7488c85c995 100644 --- a/opentelemetry-sdk/tests/metrics/test_point.py +++ b/opentelemetry-sdk/tests/metrics/test_point.py @@ -14,22 +14,22 @@ from unittest import TestCase -from opentelemetry.sdk._metrics.export import Gauge, Histogram, Metric, Sum -from opentelemetry.sdk.resources import Resource -from 
opentelemetry.sdk.util.instrumentation import InstrumentationScope +from opentelemetry.sdk._metrics.export import ( + Gauge, + Histogram, + HistogramDataPoint, + Metric, + NumberDataPoint, + Sum, +) -def _create_metric(value): +def _create_metric(data): return Metric( - attributes={"attr-key": "test-val"}, - description="test-description", - instrumentation_scope=InstrumentationScope( - name="name", version="version" - ), name="test-name", - resource=Resource({"resource-key": "resource-val"}), + description="test-description", unit="test-unit", - point=value, + data=data, ) @@ -38,40 +38,62 @@ def test_sum(self): self.maxDiff = None point = _create_metric( Sum( + data_points=[ + NumberDataPoint( + attributes={"attr-key": "test-val"}, + start_time_unix_nano=10, + time_unix_nano=20, + value=9, + ) + ], aggregation_temporality=2, is_monotonic=True, - start_time_unix_nano=10, - time_unix_nano=20, - value=9, ) ) self.assertEqual( - '{"attributes": {"attr-key": "test-val"}, "description": "test-description", "instrumentation_scope": "InstrumentationScope(name, version, None)", "name": "test-name", "resource": "BoundedAttributes({\'resource-key\': \'resource-val\'}, maxlen=None)", "unit": "test-unit", "point": {"aggregation_temporality": 2, "is_monotonic": true, "start_time_unix_nano": 10, "time_unix_nano": 20, "value": 9}}', + '{"name": "test-name", "description": "test-description", "unit": "test-unit", "data": "{\\"data_points\\": \\"[{\\\\\\"attributes\\\\\\": {\\\\\\"attr-key\\\\\\": \\\\\\"test-val\\\\\\"}, \\\\\\"start_time_unix_nano\\\\\\": 10, \\\\\\"time_unix_nano\\\\\\": 20, \\\\\\"value\\\\\\": 9}]\\", \\"aggregation_temporality\\": 2, \\"is_monotonic\\": true}"}', point.to_json(), ) def test_gauge(self): - point = _create_metric(Gauge(time_unix_nano=40, value=20)) + point = _create_metric( + Gauge( + data_points=[ + NumberDataPoint( + attributes={"attr-key": "test-val"}, + start_time_unix_nano=10, + time_unix_nano=20, + value=9, + ) + ] + ) + ) self.assertEqual( - '{"attributes": {"attr-key": "test-val"}, "description": "test-description", "instrumentation_scope": "InstrumentationScope(name, version, None)", "name": "test-name", "resource": "BoundedAttributes({\'resource-key\': \'resource-val\'}, maxlen=None)", "unit": "test-unit", "point": {"time_unix_nano": 40, "value": 20}}', + '{"name": "test-name", "description": "test-description", "unit": "test-unit", "data": "{\\"data_points\\": \\"[{\\\\\\"attributes\\\\\\": {\\\\\\"attr-key\\\\\\": \\\\\\"test-val\\\\\\"}, \\\\\\"start_time_unix_nano\\\\\\": 10, \\\\\\"time_unix_nano\\\\\\": 20, \\\\\\"value\\\\\\": 9}]\\"}"}', point.to_json(), ) def test_histogram(self): point = _create_metric( Histogram( + data_points=[ + HistogramDataPoint( + attributes={"attr-key": "test-val"}, + start_time_unix_nano=50, + time_unix_nano=60, + count=1, + sum=0.8, + bucket_counts=[0, 0, 1, 0], + explicit_bounds=[0.1, 0.5, 0.9, 1], + min=0.8, + max=0.8, + ) + ], aggregation_temporality=1, - bucket_counts=[0, 0, 1, 0], - explicit_bounds=[0.1, 0.5, 0.9, 1], - max=0.8, - min=0.8, - start_time_unix_nano=50, - sum=0.8, - time_unix_nano=60, ) ) self.maxDiff = None self.assertEqual( - '{"attributes": {"attr-key": "test-val"}, "description": "test-description", "instrumentation_scope": "InstrumentationScope(name, version, None)", "name": "test-name", "resource": "BoundedAttributes({\'resource-key\': \'resource-val\'}, maxlen=None)", "unit": "test-unit", "point": {"aggregation_temporality": 1, "bucket_counts": [0, 0, 1, 0], "explicit_bounds": [0.1, 0.5, 0.9, 1], 
"max": 0.8, "min": 0.8, "start_time_unix_nano": 50, "sum": 0.8, "time_unix_nano": 60}}', + '{"name": "test-name", "description": "test-description", "unit": "test-unit", "data": "{\\"data_points\\": \\"[{\\\\\\"attributes\\\\\\": {\\\\\\"attr-key\\\\\\": \\\\\\"test-val\\\\\\"}, \\\\\\"start_time_unix_nano\\\\\\": 50, \\\\\\"time_unix_nano\\\\\\": 60, \\\\\\"count\\\\\\": 1, \\\\\\"sum\\\\\\": 0.8, \\\\\\"bucket_counts\\\\\\": [0, 0, 1, 0], \\\\\\"explicit_bounds\\\\\\": [0.1, 0.5, 0.9, 1], \\\\\\"min\\\\\\": 0.8, \\\\\\"max\\\\\\": 0.8}]\\", \\"aggregation_temporality\\": 1}"}', point.to_json(), ) diff --git a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py index 9f05d2bfee0..25c9a31e892 100644 --- a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py +++ b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py @@ -27,7 +27,7 @@ from opentelemetry.sdk._metrics._internal.sdk_configuration import ( SdkConfiguration, ) -from opentelemetry.sdk._metrics.export import AggregationTemporality, Metric +from opentelemetry.sdk._metrics.export import AggregationTemporality from opentelemetry.sdk._metrics.view import ( DefaultAggregation, DropAggregation, @@ -63,12 +63,6 @@ def test_consume_measurement(self): attribute_keys={"a", "c"}, ), instrument=instrument1, - sdk_config=self.sdk_configuration, - instrument_class_temporality=MagicMock( - **{ - "__getitem__.return_value": AggregationTemporality.CUMULATIVE - } - ), instrument_class_aggregation=MagicMock( **{"__getitem__.return_value": DefaultAggregation()} ), @@ -110,12 +104,6 @@ def test_consume_measurement(self): aggregation=self.mock_aggregation_factory, ), instrument=instrument1, - sdk_config=self.sdk_configuration, - instrument_class_temporality=MagicMock( - **{ - "__getitem__.return_value": AggregationTemporality.CUMULATIVE - } - ), instrument_class_aggregation=MagicMock( **{"__getitem__.return_value": DefaultAggregation()} ), @@ -147,12 +135,6 @@ def test_consume_measurement(self): attribute_keys={}, ), instrument=instrument1, - sdk_config=self.sdk_configuration, - instrument_class_temporality=MagicMock( - **{ - "__getitem__.return_value": AggregationTemporality.CUMULATIVE - } - ), instrument_class_aggregation=MagicMock( **{"__getitem__.return_value": DefaultAggregation()} ), @@ -177,12 +159,6 @@ def test_consume_measurement(self): attribute_keys={}, ), instrument=instrument1, - sdk_config=self.sdk_configuration, - instrument_class_temporality=MagicMock( - **{ - "__getitem__.return_value": AggregationTemporality.CUMULATIVE - } - ), instrument_class_aggregation=MagicMock( **{"__getitem__.return_value": DefaultAggregation()} ), @@ -196,24 +172,22 @@ def test_consume_measurement(self): ) def test_collect(self): - instrument1 = Mock( - name="instrument1", description="description", unit="unit" + instrument1 = Counter( + "instrument1", + Mock(), + Mock(), + description="description", + unit="unit", ) instrument1.instrumentation_scope = self.mock_instrumentation_scope view_instrument_match = _ViewInstrumentMatch( view=View( instrument_name="instrument1", name="name", - aggregation=self.mock_aggregation_factory, + aggregation=DefaultAggregation(), attribute_keys={"a", "c"}, ), instrument=instrument1, - sdk_config=self.sdk_configuration, - instrument_class_temporality=MagicMock( - **{ - "__getitem__.return_value": AggregationTemporality.CUMULATIVE - } - ), instrument_class_aggregation=MagicMock( **{"__getitem__.return_value": DefaultAggregation()} ), @@ 
-226,18 +200,16 @@ def test_collect(self):
                 attributes={"c": "d", "f": "g"},
             )
         )
-        self.assertEqual(
-            next(view_instrument_match.collect()),
-            Metric(
-                attributes={"c": "d"},
-                description="description",
-                instrumentation_scope=self.mock_instrumentation_scope,
-                name="name",
-                resource=self.mock_resource,
-                unit="unit",
-                point=None,
-            ),
+
+        number_data_points = view_instrument_match.collect(
+            AggregationTemporality.CUMULATIVE
         )
+        self.assertEqual(len(number_data_points), 1)
+
+        number_data_point = number_data_points[0]
+
+        self.assertEqual(number_data_point.attributes, frozenset({("c", "d")}))
+        self.assertEqual(number_data_point.value, 0)
 
     def test_setting_aggregation(self):
         instrument1 = Counter(
@@ -256,12 +228,6 @@ def test_setting_aggregation(self):
                 attribute_keys={"a", "c"},
             ),
             instrument=instrument1,
-            sdk_config=self.sdk_configuration,
-            instrument_class_temporality=MagicMock(
-                **{
-                    "__getitem__.return_value": AggregationTemporality.CUMULATIVE
-                }
-            ),
             instrument_class_aggregation={Counter: LastValueAggregation()},
         )
diff --git a/tests/opentelemetry-test-utils/src/opentelemetry/test/metrictestutil.py b/tests/opentelemetry-test-utils/src/opentelemetry/test/metrictestutil.py
index b11e1646246..a18cc274067 100644
--- a/tests/opentelemetry-test-utils/src/opentelemetry/test/metrictestutil.py
+++ b/tests/opentelemetry-test-utils/src/opentelemetry/test/metrictestutil.py
@@ -13,71 +13,74 @@
 #  limitations under the License.
 
 
-from collections import OrderedDict
-
 from opentelemetry.attributes import BoundedAttributes
 from opentelemetry.sdk._metrics.export import (
     AggregationTemporality,
     Gauge,
     Metric,
+    NumberDataPoint,
     Sum,
 )
-from opentelemetry.sdk.resources import Resource as SDKResource
-from opentelemetry.sdk.util.instrumentation import InstrumentationScope
 
 
 def _generate_metric(
-    name, point, attributes=None, description=None, unit=None
+    name, data, attributes=None, description=None, unit=None
 ) -> Metric:
-    if not attributes:
-        attributes = BoundedAttributes(attributes={"a": 1, "b": True})
-    if not description:
+    if description is None:
         description = "foo"
-    if not unit:
+    if unit is None:
         unit = "s"
     return Metric(
-        resource=SDKResource(OrderedDict([("a", 1), ("b", False)])),
-        instrumentation_scope=InstrumentationScope(
-            "first_name", "first_version"
-        ),
-        attributes=attributes,
-        description=description,
         name=name,
+        description=description,
         unit=unit,
-        point=point,
+        data=data,
     )
 
 
 def _generate_sum(
-    name, val, attributes=None, description=None, unit=None
-) -> Sum:
+    name, value, attributes=None, description=None, unit=None
+) -> Metric:
+    if attributes is None:
+        attributes = BoundedAttributes(attributes={"a": 1, "b": True})
     return _generate_metric(
         name,
         Sum(
+            data_points=[
+                NumberDataPoint(
+                    attributes=attributes,
+                    start_time_unix_nano=1641946015139533244,
+                    time_unix_nano=1641946016139533244,
+                    value=value,
+                )
+            ],
            aggregation_temporality=AggregationTemporality.CUMULATIVE,
            is_monotonic=True,
-            start_time_unix_nano=1641946015139533244,
-            time_unix_nano=1641946016139533244,
-            value=val,
         ),
-        attributes=attributes,
         description=description,
         unit=unit,
     )
 
 
 def _generate_gauge(
-    name, val, attributes=None, description=None, unit=None
-) -> Gauge:
+    name, value, attributes=None, description=None, unit=None
+) -> Metric:
+    if attributes is None:
+        attributes = BoundedAttributes(attributes={"a": 1, "b": True})
     return _generate_metric(
         name,
         Gauge(
-            time_unix_nano=1641946016139533244,
-            value=val,
+            data_points=[
+                NumberDataPoint(
+                    attributes=attributes,
+                    start_time_unix_nano=1641946015139533244,
+                    time_unix_nano=1641946016139533244,
+                    value=value,
+                )
+            ],
         ),
-        attributes=attributes,
         description=description,
         unit=unit,
     )
 
 
@@ -87,7 +90,6 @@ def _generate_unsupported_metric(
     return _generate_metric(
         name,
         None,
-        attributes=attributes,
         description=description,
         unit=unit,
     )
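
A minimal sketch of how the reshaped data model is consumed after this
refactor, using only the classes this patch defines and the public import path
exercised by the updated tests; the metric name, attributes, timestamps and
values are illustrative, not anything the patch prescribes:

    from opentelemetry.sdk._metrics.export import (
        AggregationTemporality,
        Metric,
        NumberDataPoint,
        Sum,
    )

    # A Metric now carries a single `data` object (Sum, Gauge or Histogram),
    # and that object carries one data point per attribute set, instead of
    # the SDK emitting a separate Metric per point as before.
    metric = Metric(
        name="request.count",
        description="",
        unit="1",
        data=Sum(
            data_points=[
                NumberDataPoint(
                    attributes={"route": "/home"},
                    start_time_unix_nano=1_000_000_000,
                    time_unix_nano=2_000_000_000,
                    value=42,
                ),
            ],
            aggregation_temporality=AggregationTemporality.CUMULATIVE,
            is_monotonic=True,
        ),
    )
    print(metric.to_json())

Design note: the temporality conversion that previously lived in the
free-standing _convert_aggregation_temporality function now happens inside
each _Aggregation.collect(aggregation_temporality) call, which keeps the
previous point per attribute set and adds or subtracts it as needed, so
readers simply request the temporality they want at collection time.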