diff --git a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py
index 3bf6e54c98b..ed66b24f12d 100644
--- a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py
+++ b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py
@@ -80,6 +80,7 @@ from opentelemetry.sdk._metrics.export import (
     Gauge,
     Histogram,
+    HistogramDataPoint,
     Metric,
     MetricReader,
     Sum,
@@ -88,12 +89,14 @@ _logger = getLogger(__name__)
 
 
-def _convert_buckets(metric: Metric) -> Sequence[Tuple[str, int]]:
+def _convert_buckets(
+    bucket_counts: Sequence[int], explicit_bounds: Sequence[float]
+) -> Sequence[Tuple[str, int]]:
     buckets = []
     total_count = 0
     for upper_bound, count in zip(
-        chain(metric.point.explicit_bounds, ["+Inf"]),
-        metric.point.bucket_counts,
+        chain(explicit_bounds, ["+Inf"]),
+        bucket_counts,
     ):
         total_count += count
         buckets.append((f"{upper_bound}", total_count))
@@ -174,11 +177,31 @@ def _translate_to_prometheus(
         metric: Metric,
         metric_family_id_metric_family: Dict[str, PrometheusMetric],
     ):
-        label_values = []
-        label_keys = []
-        for key, value in metric.attributes.items():
-            label_keys.append(self._sanitize(key))
-            label_values.append(self._check_value(value))
+        # pylint: disable=too-many-locals,too-many-branches
+        label_keyss = []
+        label_valuess = []
+        values = []
+
+        for number_data_point in metric.data.data_points:
+            label_keys = []
+            label_values = []
+
+            for key, value in number_data_point.attributes.items():
+                label_keys.append(self._sanitize(key))
+                label_values.append(self._check_value(value))
+
+            label_keyss.append(label_keys)
+            label_valuess.append(label_values)
+            if isinstance(number_data_point, HistogramDataPoint):
+                values.append(
+                    {
+                        "bucket_counts": number_data_point.bucket_counts,
+                        "explicit_bounds": number_data_point.explicit_bounds,
+                        "sum": number_data_point.sum,
+                    }
+                )
+            else:
+                values.append(number_data_point.value)
 
         metric_name = ""
         if self._prefix != "":
@@ -187,68 +210,94 @@
 
         description = metric.description or ""
 
-        metric_family_id = "|".join(
-            [metric_name, description, "%".join(label_keys), metric.unit]
-        )
+        pre_metric_family_ids = []
+
+        for label_keys in label_keyss:
+            pre_metric_family_ids.append(
+                "|".join(
+                    [
+                        metric_name,
+                        description,
+                        "%".join(label_keys),
+                        metric.unit,
+                    ]
+                )
+            )
 
-        if isinstance(metric.point, Sum):
+        for pre_metric_family_id, label_values, value in zip(
+            pre_metric_family_ids, label_valuess, values
+        ):
+            if isinstance(metric.data, Sum):
 
-            metric_family_id = "|".join(
-                [metric_family_id, CounterMetricFamily.__name__]
-            )
+                metric_family_id = "|".join(
+                    [pre_metric_family_id, CounterMetricFamily.__name__]
+                )
 
-            if metric_family_id not in metric_family_id_metric_family.keys():
-                metric_family_id_metric_family[
+                if (
                     metric_family_id
-                ] = CounterMetricFamily(
-                    name=metric_name,
-                    documentation=description,
-                    labels=label_keys,
-                    unit=metric.unit,
+                    not in metric_family_id_metric_family.keys()
+                ):
+                    metric_family_id_metric_family[
+                        metric_family_id
+                    ] = CounterMetricFamily(
+                        name=metric_name,
+                        documentation=description,
+                        labels=label_keys,
+                        unit=metric.unit,
+                    )
+                metric_family_id_metric_family[metric_family_id].add_metric(
+                    labels=label_values, value=value
                 )
-            metric_family_id_metric_family[metric_family_id].add_metric(
-                labels=label_values, value=metric.point.value
-            )
-        elif isinstance(metric.point, Gauge):
+            elif isinstance(metric.data, Gauge):
 
-            metric_family_id = "|".join(
-                [metric_family_id, GaugeMetricFamily.__name__]
-            )
+                metric_family_id = "|".join(
+                    [pre_metric_family_id, GaugeMetricFamily.__name__]
+                )
 
-            if metric_family_id not in metric_family_id_metric_family.keys():
-                metric_family_id_metric_family[
+                if (
                     metric_family_id
-                ] = GaugeMetricFamily(
-                    name=metric_name,
-                    documentation=description,
-                    labels=label_keys,
-                    unit=metric.unit,
+                    not in metric_family_id_metric_family.keys()
+                ):
+                    metric_family_id_metric_family[
+                        metric_family_id
+                    ] = GaugeMetricFamily(
+                        name=metric_name,
+                        documentation=description,
+                        labels=label_keys,
+                        unit=metric.unit,
+                    )
+                metric_family_id_metric_family[metric_family_id].add_metric(
+                    labels=label_values, value=value
                 )
-            metric_family_id_metric_family[metric_family_id].add_metric(
-                labels=label_values, value=metric.point.value
-            )
-        elif isinstance(metric.point, Histogram):
+            elif isinstance(metric.data, Histogram):
 
-            metric_family_id = "|".join(
-                [metric_family_id, HistogramMetricFamily.__name__]
-            )
+                metric_family_id = "|".join(
+                    [pre_metric_family_id, HistogramMetricFamily.__name__]
+                )
 
-            if metric_family_id not in metric_family_id_metric_family.keys():
-                metric_family_id_metric_family[
+                if (
                     metric_family_id
-                ] = HistogramMetricFamily(
-                    name=metric_name,
-                    documentation=description,
-                    labels=label_keys,
-                    unit=metric.unit,
+                    not in metric_family_id_metric_family.keys()
+                ):
+                    metric_family_id_metric_family[
+                        metric_family_id
+                    ] = HistogramMetricFamily(
+                        name=metric_name,
+                        documentation=description,
+                        labels=label_keys,
+                        unit=metric.unit,
+                    )
+                metric_family_id_metric_family[metric_family_id].add_metric(
+                    labels=label_values,
+                    buckets=_convert_buckets(
+                        value["bucket_counts"], value["explicit_bounds"]
+                    ),
+                    sum_value=value["sum"],
+                )
+            else:
+                _logger.warning(
+                    "Unsupported metric data. %s", type(metric.data)
                 )
-            metric_family_id_metric_family[metric_family_id].add_metric(
-                labels=label_values,
-                buckets=_convert_buckets(metric),
-                sum_value=metric.point.sum,
-            )
-        else:
-            _logger.warning("Unsupported metric type. %s", type(metric.point))
 
     def _sanitize(self, key: str) -> str:
         """sanitize the given metric name or label according to Prometheus rule.
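Review note: `_convert_buckets` now takes the raw bucket data instead of a whole `Metric`. A minimal standalone sketch of the accumulation it performs, with hypothetical counts and bounds (this mirrors the diffed helper line for line):

    from itertools import chain

    def convert_buckets(bucket_counts, explicit_bounds):
        # Prometheus expects cumulative bucket values, so a running
        # total is accumulated per upper bound, ending with "+Inf".
        buckets, total_count = [], 0
        for upper_bound, count in zip(
            chain(explicit_bounds, ["+Inf"]), bucket_counts
        ):
            total_count += count
            buckets.append((f"{upper_bound}", total_count))
        return buckets

    assert convert_buckets([1, 3, 2], [123.0, 456.0]) == [
        ("123.0", 1), ("456.0", 4), ("+Inf", 6)
    ]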
diff --git a/exporter/opentelemetry-exporter-prometheus/tests/test_prometheus_exporter.py b/exporter/opentelemetry-exporter-prometheus/tests/test_prometheus_exporter.py
index e5dcb796834..2694badd069 100644
--- a/exporter/opentelemetry-exporter-prometheus/tests/test_prometheus_exporter.py
+++ b/exporter/opentelemetry-exporter-prometheus/tests/test_prometheus_exporter.py
@@ -23,10 +23,14 @@
     PrometheusMetricReader,
     _CustomCollector,
 )
-from opentelemetry.sdk._metrics.export import AggregationTemporality, Histogram
+from opentelemetry.sdk._metrics.export import (
+    AggregationTemporality,
+    Histogram,
+    HistogramDataPoint,
+    Metric,
+)
 from opentelemetry.test.metrictestutil import (
     _generate_gauge,
-    _generate_metric,
     _generate_sum,
     _generate_unsupported_metric,
 )
@@ -57,19 +61,26 @@ def test_shutdown(self):
         self.assertTrue(registry_unregister_patch.called)
 
     def test_histogram_to_prometheus(self):
-        record = _generate_metric(
-            "test@name",
-            Histogram(
-                aggregation_temporality=AggregationTemporality.CUMULATIVE,
-                bucket_counts=[1, 3, 2],
-                explicit_bounds=[123.0, 456.0],
-                start_time_unix_nano=1641946016139533244,
-                max=457,
-                min=1,
-                sum=579.0,
-                time_unix_nano=1641946016139533244,
+        record = Metric(
+            name="test@name",
+            description="foo",
+            unit="s",
+            data=Histogram(
+                data_points=[
+                    HistogramDataPoint(
+                        attributes={"histo": 1},
+                        start_time_unix_nano=1641946016139533244,
+                        time_unix_nano=1641946016139533244,
+                        count=6,
+                        sum=579.0,
+                        bucket_counts=[1, 3, 2],
+                        explicit_bounds=[123.0, 456.0],
+                        min=1,
+                        max=457,
+                    )
+                ],
+                aggregation_temporality=AggregationTemporality.DELTA,
             ),
-            attributes={"histo": 1},
         )
 
         collector = _CustomCollector("testprefix")
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/_view_instrument_match.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/_view_instrument_match.py
index 2f002a272ec..c3f0b4ebf82 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/_view_instrument_match.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/_view_instrument_match.py
@@ -22,16 +22,11 @@
     Aggregation,
     DefaultAggregation,
     _Aggregation,
-    _convert_aggregation_temporality,
-    _PointVarT,
     _SumAggregation,
 )
 from opentelemetry.sdk._metrics._internal.export import AggregationTemporality
 from opentelemetry.sdk._metrics._internal.measurement import Measurement
-from opentelemetry.sdk._metrics._internal.point import Metric
-from opentelemetry.sdk._metrics._internal.sdk_configuration import (
-    SdkConfiguration,
-)
+from opentelemetry.sdk._metrics._internal.point import DataPointT
 from opentelemetry.sdk._metrics._internal.view import View
 
 _logger = getLogger(__name__)
@@ -42,17 +37,12 @@ def __init__(
         self,
         view: View,
         instrument: Instrument,
-        sdk_config: SdkConfiguration,
-        instrument_class_temporality: Dict[type, AggregationTemporality],
         instrument_class_aggregation: Dict[type, Aggregation],
     ):
         self._view = view
         self._instrument = instrument
-        self._sdk_config = sdk_config
         self._attributes_aggregation: Dict[frozenset, _Aggregation] = {}
-        self._attributes_previous_point: Dict[frozenset, _PointVarT] = {}
         self._lock = Lock()
-        self._instrument_class_temporality = instrument_class_temporality
         self._instrument_class_aggregation = instrument_class_aggregation
         self._name = self._view._name or self._instrument.name
         self._description = (
@@ -124,46 +114,15 @@ def consume_measurement(self, measurement: Measurement) -> None:
 
         self._attributes_aggregation[attributes].aggregate(measurement)
 
-    def collect(self) -> Iterable[Metric]:
+    def collect(
+        self, aggregation_temporality: AggregationTemporality
+    ) -> Iterable[DataPointT]:
+        data_points = []
         with self._lock:
-            for (
-                attributes,
-                aggregation,
-            ) in self._attributes_aggregation.items():
-
-                previous_point = self._attributes_previous_point.get(
-                    attributes
-                )
-
-                current_point = aggregation.collect()
-
-                # pylint: disable=assignment-from-none
-
-                self._attributes_previous_point[
-                    attributes
-                ] = _convert_aggregation_temporality(
-                    previous_point,
-                    current_point,
-                    AggregationTemporality.CUMULATIVE,
+            for aggregation in self._attributes_aggregation.values():
+                data_points.append(
+                    aggregation.collect(aggregation_temporality)
                 )
 
-                if current_point is not None:
-
-                    yield Metric(
-                        attributes=dict(attributes),
-                        description=self._description,
-                        instrumentation_scope=(
-                            self._instrument.instrumentation_scope
-                        ),
-                        name=self._name,
-                        resource=self._sdk_config.resource,
-                        unit=self._instrument.unit,
-                        point=_convert_aggregation_temporality(
-                            previous_point,
-                            current_point,
-                            self._instrument_class_temporality[
-                                self._instrument.__class__
-                            ],
-                        ),
-                    )
+        return data_points
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/aggregation.py
index 1d10b3f8ead..052588c3763 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/aggregation.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/aggregation.py
@@ -14,7 +14,6 @@
 
 from abc import ABC, abstractmethod
 from bisect import bisect_left
-from dataclasses import replace
 from enum import IntEnum
 from logging import getLogger
 from math import inf
@@ -37,11 +36,15 @@
 from opentelemetry.sdk._metrics._internal.point import (
     Histogram as HistogramPoint,
 )
-from opentelemetry.sdk._metrics._internal.point import PointT, Sum
+from opentelemetry.sdk._metrics._internal.point import (
+    HistogramDataPoint,
+    NumberDataPoint,
+    Sum,
+)
 from opentelemetry.util._time import _time_ns
 from opentelemetry.util.types import Attributes
 
-_PointVarT = TypeVar("_PointVarT", bound=PointT)
+_DataPointVarT = TypeVar("_DataPointVarT", NumberDataPoint, HistogramDataPoint)
 
 _logger = getLogger(__name__)
 
@@ -58,17 +61,21 @@ class AggregationTemporality(IntEnum):
     CUMULATIVE = 2
 
 
-class _Aggregation(ABC, Generic[_PointVarT]):
+class _Aggregation(ABC, Generic[_DataPointVarT]):
     def __init__(self, attributes: Attributes):
         self._lock = Lock()
         self._attributes = attributes
+        self._previous_point = None
 
     @abstractmethod
     def aggregate(self, measurement: Measurement) -> None:
         pass
 
     @abstractmethod
-    def collect(self) -> Optional[_PointVarT]:
+    def collect(
+        self,
+        aggregation_temporality: AggregationTemporality,
+    ) -> Optional[_DataPointVarT]:
         pass
 
 
@@ -76,7 +83,10 @@ class _DropAggregation(_Aggregation):
     def aggregate(self, measurement: Measurement) -> None:
         pass
 
-    def collect(self) -> Optional[_PointVarT]:
+    def collect(
+        self,
+        aggregation_temporality: AggregationTemporality,
+    ) -> Optional[_DataPointVarT]:
         pass
 
 
@@ -176,7 +186,9 @@ def aggregate(self, measurement: Measurement) -> None:
                 self._value = 0
             self._value = self._value + measurement.value
 
-    def collect(self) -> Optional[Sum]:
+    def collect(
+        self, aggregation_temporality: AggregationTemporality
+    ) -> Optional[NumberDataPoint]:
         """
         Atomically return a point for the current value of the metric and
         reset the aggregation value.
@@ -192,28 +204,50 @@
                 self._value = 0
                 self._start_time_unix_nano = now + 1
 
-            return Sum(
-                aggregation_temporality=AggregationTemporality.DELTA,
-                is_monotonic=self._instrument_is_monotonic,
-                start_time_unix_nano=start_time_unix_nano,
-                time_unix_nano=now,
-                value=value,
-            )
+        else:
 
-        with self._lock:
-            if self._value is None:
-                return None
-            value = self._value
-            self._value = None
+            with self._lock:
+                if self._value is None:
+                    return None
+                value = self._value
+                self._value = None
+                start_time_unix_nano = self._start_time_unix_nano
 
-        return Sum(
-            aggregation_temporality=AggregationTemporality.CUMULATIVE,
-            is_monotonic=self._instrument_is_monotonic,
-            start_time_unix_nano=self._start_time_unix_nano,
+        current_point = NumberDataPoint(
+            attributes=self._attributes,
+            start_time_unix_nano=start_time_unix_nano,
             time_unix_nano=now,
             value=value,
         )
 
+        if self._previous_point is None:
+            self._previous_point = current_point
+            return current_point
+
+        if self._instrument_temporality is aggregation_temporality:
+            # Output DELTA for a synchronous instrument
+            # Output CUMULATIVE for an asynchronous instrument
+            return current_point
+
+        if aggregation_temporality is AggregationTemporality.DELTA:
+            # Output temporality DELTA for an asynchronous instrument
+            value = current_point.value - self._previous_point.value
+            output_start_time_unix_nano = self._previous_point.time_unix_nano
+
+        else:
+            # Output CUMULATIVE for a synchronous instrument
+            value = current_point.value + self._previous_point.value
+            output_start_time_unix_nano = (
+                self._previous_point.start_time_unix_nano
+            )
+
+        return NumberDataPoint(
+            attributes=self._attributes,
+            start_time_unix_nano=output_start_time_unix_nano,
+            time_unix_nano=current_point.time_unix_nano,
+            value=value,
+        )
+
 
 class _LastValueAggregation(_Aggregation[Gauge]):
     def __init__(self, attributes: Attributes):
@@ -224,7 +258,10 @@ def aggregate(self, measurement: Measurement):
         with self._lock:
             self._value = measurement.value
 
-    def collect(self) -> Optional[Gauge]:
+    def collect(
+        self,
+        aggregation_temporality: AggregationTemporality,
+    ) -> Optional[_DataPointVarT]:
         """
         Atomically return a point for the current value of the metric.
         """
@@ -234,7 +271,9 @@ def collect(self) -> Optional[Gauge]:
             value = self._value
             self._value = None
 
-        return Gauge(
+        return NumberDataPoint(
+            attributes=self._attributes,
+            start_time_unix_nano=0,
             time_unix_nano=_time_ns(),
             value=value,
         )
@@ -266,6 +305,11 @@ def __init__(
         self._sum = 0
         self._record_min_max = record_min_max
         self._start_time_unix_nano = _time_ns()
+        # It is assumed that the "natural" aggregation temporality for a
+        # Histogram instrument is DELTA, like the "natural" aggregation
+        # temporality for a Counter is DELTA and the "natural" aggregation
+        # temporality for an ObservableCounter is CUMULATIVE.
+        self._instrument_temporality = AggregationTemporality.DELTA
 
     def _get_empty_bucket_counts(self) -> List[int]:
         return [0] * (len(self._boundaries) + 1)
@@ -282,18 +326,24 @@ def aggregate(self, measurement: Measurement) -> None:
 
             self._bucket_counts[bisect_left(self._boundaries, value)] += 1
 
-    def collect(self) -> HistogramPoint:
+    def collect(
+        self,
+        aggregation_temporality: AggregationTemporality,
+    ) -> Optional[_DataPointVarT]:
        """
        Atomically return a point for the current value of the metric.
""" now = _time_ns() with self._lock: - value = self._bucket_counts + if not any(self._bucket_counts): + return None + + bucket_counts = self._bucket_counts start_time_unix_nano = self._start_time_unix_nano - histogram_sum = self._sum - histogram_max = self._max - histogram_min = self._min + sum_ = self._sum + max_ = self._max + min_ = self._min self._bucket_counts = self._get_empty_bucket_counts() self._start_time_unix_nano = now + 1 @@ -301,146 +351,63 @@ def collect(self) -> HistogramPoint: self._min = inf self._max = -inf - return HistogramPoint( - aggregation_temporality=AggregationTemporality.DELTA, - bucket_counts=tuple(value), - explicit_bounds=self._boundaries, - max=histogram_max, - min=histogram_min, + current_point = HistogramDataPoint( + attributes=self._attributes, start_time_unix_nano=start_time_unix_nano, - sum=histogram_sum, time_unix_nano=now, + count=sum(bucket_counts), + sum=sum_, + bucket_counts=tuple(bucket_counts), + explicit_bounds=self._boundaries, + min=min_, + max=max_, ) - -# pylint: disable=too-many-return-statements,too-many-branches -def _convert_aggregation_temporality( - previous_point: Optional[_PointVarT], - current_point: _PointVarT, - aggregation_temporality: AggregationTemporality, -) -> _PointVarT: - """Converts `current_point` to the requested `aggregation_temporality` - given the `previous_point`. - - `previous_point` must have `CUMULATIVE` temporality. `current_point` may - have `DELTA` or `CUMULATIVE` temporality. - - The output point will have temporality `aggregation_temporality`. Since - `GAUGE` points have no temporality, they are returned unchanged. - """ - - current_point_type = type(current_point) - - if current_point_type is Gauge: - return current_point - - if ( - previous_point is not None - and current_point is not None - and type(previous_point) is not type(current_point) - ): - _logger.warning( - "convert_aggregation_temporality called with mismatched " - "point types: %s and %s", - type(previous_point), - current_point_type, - ) - - return current_point - - if current_point_type is Sum: - if previous_point is None: - # Output CUMULATIVE for a synchronous instrument - # There is no previous value, return the delta point as a - # cumulative - return replace( - current_point, aggregation_temporality=aggregation_temporality - ) - if previous_point.aggregation_temporality is not ( - AggregationTemporality.CUMULATIVE - ): - raise Exception( - "previous_point aggregation temporality must be CUMULATIVE" - ) - - if current_point.aggregation_temporality is aggregation_temporality: - # Output DELTA for a synchronous instrument - # Output CUMULATIVE for an asynchronous instrument + if self._previous_point is None: + self._previous_point = current_point return current_point - if aggregation_temporality is AggregationTemporality.DELTA: - # Output temporality DELTA for an asynchronous instrument - value = current_point.value - previous_point.value - output_start_time_unix_nano = previous_point.time_unix_nano - - else: - # Output CUMULATIVE for a synchronous instrument - value = current_point.value + previous_point.value - output_start_time_unix_nano = previous_point.start_time_unix_nano - - is_monotonic = ( - previous_point.is_monotonic and current_point.is_monotonic - ) - - return Sum( - start_time_unix_nano=output_start_time_unix_nano, - time_unix_nano=current_point.time_unix_nano, - value=value, - aggregation_temporality=aggregation_temporality, - is_monotonic=is_monotonic, - ) - - if current_point_type is HistogramPoint: - if previous_point is 
-        if previous_point is None:
-            return replace(
-                current_point, aggregation_temporality=aggregation_temporality
-            )
-        if previous_point.aggregation_temporality is not (
-            AggregationTemporality.CUMULATIVE
-        ):
-            raise Exception(
-                "previous_point aggregation temporality must be CUMULATIVE"
-            )
-
-        if current_point.aggregation_temporality is aggregation_temporality:
+        if self._instrument_temporality is aggregation_temporality:
             return current_point
 
         max_ = current_point.max
         min_ = current_point.min
 
         if aggregation_temporality is AggregationTemporality.CUMULATIVE:
-            start_time_unix_nano = previous_point.start_time_unix_nano
-            sum_ = current_point.sum + previous_point.sum
+            start_time_unix_nano = self._previous_point.start_time_unix_nano
+            sum_ = current_point.sum + self._previous_point.sum
             # Only update min/max on delta -> cumulative
-            max_ = max(current_point.max, previous_point.max)
-            min_ = min(current_point.min, previous_point.min)
+            max_ = max(current_point.max, self._previous_point.max)
+            min_ = min(current_point.min, self._previous_point.min)
             bucket_counts = [
                 curr_count + prev_count
                 for curr_count, prev_count in zip(
-                    current_point.bucket_counts, previous_point.bucket_counts
+                    current_point.bucket_counts,
+                    self._previous_point.bucket_counts,
                 )
             ]
         else:
-            start_time_unix_nano = previous_point.time_unix_nano
-            sum_ = current_point.sum - previous_point.sum
+            start_time_unix_nano = self._previous_point.time_unix_nano
+            sum_ = current_point.sum - self._previous_point.sum
             bucket_counts = [
                 curr_count - prev_count
                 for curr_count, prev_count in zip(
-                    current_point.bucket_counts, previous_point.bucket_counts
+                    current_point.bucket_counts,
+                    self._previous_point.bucket_counts,
                 )
             ]
 
-        return HistogramPoint(
-            aggregation_temporality=aggregation_temporality,
-            bucket_counts=bucket_counts,
-            explicit_bounds=current_point.explicit_bounds,
-            max=max_,
-            min=min_,
+        return HistogramDataPoint(
+            attributes=self._attributes,
             start_time_unix_nano=start_time_unix_nano,
-            sum=sum_,
             time_unix_nano=current_point.time_unix_nano,
+            count=sum(bucket_counts),
+            sum=sum_,
+            bucket_counts=tuple(bucket_counts),
+            explicit_bounds=current_point.explicit_bounds,
+            min=min_,
+            max=max_,
        )
 
-    return None
 
 class ExplicitBucketHistogramAggregation(Aggregation):
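Review note: `_convert_aggregation_temporality` is gone; each aggregation now keeps its own `_previous_point` and converts on `collect`. The sum arithmetic, reduced to a sketch with hypothetical numbers (taken from the logic added above):

    # Native DELTA (synchronous counter) asked for CUMULATIVE output:
    # add the previous cumulative value to the new delta.
    previous_cumulative_value = 7
    current_delta_value = 3
    cumulative_value = current_delta_value + previous_cumulative_value  # 10

    # Native CUMULATIVE (asynchronous counter) asked for DELTA output:
    # subtract the previous value; the new start time is the previous
    # point's collection time.
    previous_value = 7
    current_value = 10
    delta_value = current_value - previous_value  # 3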
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py
index d8a753ba7bc..a940618208b 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py
@@ -19,7 +19,7 @@
 from os import environ, linesep
 from sys import stdout
 from threading import Event, RLock, Thread
-from typing import IO, Callable, Dict, Iterable, List, Optional, Sequence
+from typing import IO, Callable, Dict, Iterable, Optional, Sequence
 
 from typing_extensions import final
 
@@ -262,7 +262,7 @@ def _set_collect_callback(
     @abstractmethod
     def _receive_metrics(
         self,
-        metrics: Iterable["opentelemetry.sdk._metrics.export.Metric"],
+        metrics_data: "opentelemetry.sdk._metrics.export.MetricsData",
         timeout_millis: float = 10_000,
         **kwargs,
     ) -> None:
@@ -300,24 +300,28 @@ def __init__(
             preferred_aggregation=preferred_aggregation,
         )
         self._lock = RLock()
-        self._metrics: List["opentelemetry.sdk._metrics.export.Metric"] = []
+        self._metrics_data: (
+            "opentelemetry.sdk._metrics.export.MetricsData"
+        ) = None
 
-    def get_metrics(self) -> List["opentelemetry.sdk._metrics.export.Metric"]:
+    def get_metrics_data(self) -> (
+        "opentelemetry.sdk._metrics.export.MetricsData"
+    ):
         """Reads and returns current metrics from the SDK"""
         with self._lock:
             self.collect()
-            metrics = self._metrics
-            self._metrics = []
-        return metrics
+            metrics_data = self._metrics_data
+            self._metrics_data = None
+        return metrics_data
 
     def _receive_metrics(
         self,
-        metrics: Iterable["opentelemetry.sdk._metrics.export.Metric"],
+        metrics_data: "opentelemetry.sdk._metrics.export.MetricsData",
         timeout_millis: float = 10_000,
         **kwargs,
     ) -> None:
         with self._lock:
-            self._metrics = list(metrics)
+            self._metrics_data = metrics_data
 
     def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
         pass
@@ -389,15 +393,17 @@ def _ticker(self) -> None:
 
     def _receive_metrics(
         self,
-        metrics: Iterable["opentelemetry.sdk._metrics.export.Metric"],
+        metrics_data: "opentelemetry.sdk._metrics.export.MetricsData",
         timeout_millis: float = 10_000,
         **kwargs,
     ) -> None:
-        if metrics is None:
+        if metrics_data is None:
             return
         token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
         try:
-            self._exporter.export(metrics, timeout_millis=timeout_millis)
+            self._exporter.export(
+                metrics_data, timeout_millis=timeout_millis
+            )
         except Exception as e:  # pylint: disable=broad-except,invalid-name
             _logger.exception("Exception while exporting metrics %s", str(e))
         detach(token)
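Review note: `InMemoryMetricReader.get_metrics()` becomes `get_metrics_data()` and returns one `MetricsData` tree instead of a flat list. A rough usage sketch, assuming the in-development `_metrics` SDK API at this stage (the meter/counter calls mirror the tests in this diff):

    from opentelemetry.sdk._metrics import MeterProvider
    from opentelemetry.sdk._metrics.export import InMemoryMetricReader

    reader = InMemoryMetricReader()
    provider = MeterProvider(metric_readers=[reader])
    provider.get_meter("example").create_counter("mycounter").add(12)

    metrics_data = reader.get_metrics_data()
    # Navigate resource -> scope -> metric -> data -> data points.
    point = (
        metrics_data.resource_metrics[0]
        .scope_metrics[0]
        .metrics[0]
        .data.data_points[0]
    )
    print(point.value)  # 12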
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader_storage.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader_storage.py
index b163f414afb..7e2ba5156d3 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader_storage.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader_storage.py
@@ -14,19 +14,36 @@
 from logging import getLogger
 from threading import RLock
-from typing import Dict, Iterable, List
+from typing import Dict, List
 
-from opentelemetry._metrics import Asynchronous, Instrument
+from opentelemetry._metrics import (
+    Asynchronous,
+    Counter,
+    Instrument,
+    ObservableCounter,
+)
 from opentelemetry.sdk._metrics._internal._view_instrument_match import (
     _ViewInstrumentMatch,
 )
 from opentelemetry.sdk._metrics._internal.aggregation import (
     Aggregation,
     ExplicitBucketHistogramAggregation,
+    _DropAggregation,
+    _ExplicitBucketHistogramAggregation,
+    _LastValueAggregation,
+    _SumAggregation,
 )
 from opentelemetry.sdk._metrics._internal.export import AggregationTemporality
 from opentelemetry.sdk._metrics._internal.measurement import Measurement
-from opentelemetry.sdk._metrics._internal.point import Metric
+from opentelemetry.sdk._metrics._internal.point import (
+    Gauge,
+    Histogram,
+    Metric,
+    Sum,
+    MetricsData,
+    ScopeMetrics,
+    ResourceMetrics
+)
 from opentelemetry.sdk._metrics._internal.sdk_configuration import (
     SdkConfiguration,
 )
@@ -81,10 +98,6 @@ def _get_or_init_view_instrument_match(
                 _ViewInstrumentMatch(
                     view=_DEFAULT_VIEW,
                     instrument=instrument,
-                    sdk_config=self._sdk_config,
-                    instrument_class_temporality=(
-                        self._instrument_class_temporality
-                    ),
                     instrument_class_aggregation=(
                         self._instrument_class_aggregation
                     ),
@@ -102,10 +115,9 @@ def consume_measurement(self, measurement: Measurement) -> None:
         ):
             view_instrument_match.consume_measurement(measurement)
 
-    def collect(self) -> Iterable[Metric]:
+    def collect(self) -> MetricsData:
         # Use a list instead of yielding to prevent a slow reader from holding
         # SDK locks
-        metrics: List[Metric] = []
 
         # While holding the lock, new _ViewInstrumentMatch can't be added from
         # another thread (so we are sure we collect all existing view).
@@ -116,13 +128,89 @@
         # streams produced by the SDK, but we still align the output timestamps
         # for a single instrument.
         with self._lock:
+
+            scope_metrics: List[ScopeMetrics] = []
+
             for (
-                view_instrument_matches
-            ) in self._instrument_view_instrument_matches.values():
+                instrument,
+                view_instrument_matches,
+            ) in self._instrument_view_instrument_matches.items():
+                aggregation_temporality = self._instrument_class_temporality[
+                    instrument.__class__
+                ]
+
+                metrics: List[Metric] = []
+
                 for view_instrument_match in view_instrument_matches:
-                    metrics.extend(view_instrument_match.collect())
 
-        return metrics
+                    if isinstance(
+                        # pylint: disable=protected-access
+                        view_instrument_match._aggregation,
+                        _SumAggregation,
+                    ):
+                        data = Sum(
+                            aggregation_temporality=aggregation_temporality,
+                            data_points=view_instrument_match.collect(
+                                aggregation_temporality
+                            ),
+                            is_monotonic=isinstance(
+                                instrument, (Counter, ObservableCounter)
+                            ),
+                        )
+                    elif isinstance(
+                        # pylint: disable=protected-access
+                        view_instrument_match._aggregation,
+                        _LastValueAggregation,
+                    ):
+                        data = Gauge(
+                            data_points=view_instrument_match.collect(
+                                aggregation_temporality
+                            )
+                        )
+                    elif isinstance(
+                        # pylint: disable=protected-access
+                        view_instrument_match._aggregation,
+                        _ExplicitBucketHistogramAggregation,
+                    ):
+                        data = Histogram(
+                            data_points=view_instrument_match.collect(
+                                aggregation_temporality
+                            ),
+                            aggregation_temporality=aggregation_temporality,
+                        )
+                    elif isinstance(
+                        # pylint: disable=protected-access
+                        view_instrument_match._aggregation,
+                        _DropAggregation,
+                    ):
+                        continue
+
+                    metrics.append(
+                        Metric(
+                            # pylint: disable=protected-access
+                            name=view_instrument_match._name,
+                            description=view_instrument_match._description,
+                            unit=view_instrument_match._instrument.unit,
+                            data=data,
+                        )
+                    )
+                scope_metrics.append(
+                    ScopeMetrics(
+                        scope=instrument.instrumentation_scope,
+                        metrics=metrics,
+                        schema_url=instrument.instrumentation_scope.schema_url
+                    )
+                )
+
+        return MetricsData(
+            resource_metrics=[
+                ResourceMetrics(
+                    resource=self._sdk_config.resource,
+                    scope_metrics=scope_metrics,
+                    schema_url=self._sdk_config.resource.schema_url
+                )
+            ]
+        )
 
     def _handle_view_instrument_match(
         self,
@@ -140,10 +228,6 @@
             new_view_instrument_match = _ViewInstrumentMatch(
                 view=view,
                 instrument=instrument,
-                sdk_config=self._sdk_config,
-                instrument_class_temporality=(
-                    self._instrument_class_temporality
-                ),
                 instrument_class_aggregation=(
                     self._instrument_class_aggregation
                 ),
+ """ + + attributes: Attributes + start_time_unix_nano: int + time_unix_nano: int + value: Union[int, float] @dataclass(frozen=True) @@ -30,13 +42,22 @@ class Sum: """Represents the type of a scalar metric that is calculated as a sum of all reported measurements over a time interval.""" + data_points: Sequence[NumberDataPoint] aggregation_temporality: ( "opentelemetry.sdk._metrics.export.AggregationTemporality" ) is_monotonic: bool - start_time_unix_nano: int - time_unix_nano: int - value: Union[int, float] + + def to_json(self) -> str: + return dumps( + { + "data_points": dumps( + [asdict(data_point) for data_point in self.data_points] + ), + "aggregation_temporality": self.aggregation_temporality, + "is_monotonic": self.is_monotonic, + } + ) @dataclass(frozen=True) @@ -45,8 +66,33 @@ class Gauge: value for every data point. It should be used for an unknown aggregation.""" + data_points: Sequence[NumberDataPoint] + + def to_json(self) -> str: + return dumps( + { + "data_points": dumps( + [asdict(data_point) for data_point in self.data_points] + ) + } + ) + + +@dataclass(frozen=True) +class HistogramDataPoint: + """Single data point in a timeseries that describes the time-varying scalar + value of a metric. + """ + + attributes: Attributes + start_time_unix_nano: int time_unix_nano: int - value: Union[int, float] + count: int + sum: Union[int, float] + bucket_counts: Sequence[int] + explicit_bounds: Sequence[float] + min: float + max: float @dataclass(frozen=True) @@ -54,52 +100,67 @@ class Histogram: """Represents the type of a metric that is calculated by aggregating as a histogram of all reported measurements over a time interval.""" + data_points: Sequence[HistogramDataPoint] aggregation_temporality: ( "opentelemetry.sdk._metrics.export.AggregationTemporality" ) - bucket_counts: Sequence[int] - explicit_bounds: Sequence[float] - max: int - min: int - start_time_unix_nano: int - sum: Union[int, float] - time_unix_nano: int + + def to_json(self) -> str: + return dumps( + { + "data_points": dumps( + [asdict(data_point) for data_point in self.data_points] + ), + "aggregation_temporality": self.aggregation_temporality, + } + ) -PointT = Union[Sum, Gauge, Histogram] +DataT = Union[Sum, Gauge, Histogram] +DataPointT = Union[NumberDataPoint, HistogramDataPoint] @dataclass(frozen=True) class Metric: - """Represents a metric point in the OpenTelemetry data model to be exported + """Represents a metric point in the OpenTelemetry data model to be + exported.""" - Concrete metric types contain all the information as in the OTLP proto definitions - (https://github.com/open-telemetry/opentelemetry-proto/blob/b43e9b18b76abf3ee040164b55b9c355217151f3/opentelemetry/proto/metrics/v1/metrics.proto#L37) but are flattened as much as possible. 
- """ - - # common fields to all metric kinds - attributes: Attributes - description: str - instrumentation_scope: InstrumentationScope name: str - resource: Resource + description: str unit: str - point: PointT - """Contains non-common fields for the given metric""" + data: DataT def to_json(self) -> str: return dumps( { - "attributes": self.attributes if self.attributes else "", - "description": self.description if self.description else "", - "instrumentation_scope": repr(self.instrumentation_scope) - if self.instrumentation_scope - else "", "name": self.name, - "resource": repr(self.resource.attributes) - if self.resource - else "", + "description": self.description if self.description else "", "unit": self.unit if self.unit else "", - "point": asdict(self.point) if self.point else "", + "data": self.data.to_json(), } ) + + +@dataclass(frozen=True) +class ScopeMetrics: + """A collection of Metrics produced by a scope""" + + scope: InstrumentationScope + metrics: Sequence[Metric] + schema_url: str + + +@dataclass(frozen=True) +class ResourceMetrics: + """A collection of ScopeMetrics from a Resource""" + + resource: Resource + scope_metrics: Sequence[ScopeMetrics] + schema_url: str + + +@dataclass(frozen=True) +class MetricsData: + """An array of ResourceMetrics""" + + resource_metrics: Sequence[ResourceMetrics] diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py index 107f73f79d5..fdc0df4b923 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py @@ -28,17 +28,20 @@ # The point module is not in the export directory to avoid a circular import. from opentelemetry.sdk._metrics._internal.point import ( # noqa: F401 + DataPointT, + DataT, Gauge, Histogram, + HistogramDataPoint, Metric, - PointT, + NumberDataPoint, Sum, ) __all__ = [] for key, value in globals().copy().items(): if not key.startswith("_"): - if _version_info.minor == 6 and key == "PointT": + if _version_info.minor == 6 and key == "DataT": continue value.__module__ = __name__ __all__.append(key) diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py b/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py index 5534e85090a..e3f5d1a601f 100644 --- a/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py @@ -32,7 +32,7 @@ def test_disable_default_views(self): counter.add(10, {"label": "value2"}) counter.add(10, {"label": "value3"}) - self.assertEqual(reader.get_metrics(), []) + self.assertEqual(reader.get_metrics_data(), []) def test_disable_default_views_add_custom(self): reader = InMemoryMetricReader() @@ -51,6 +51,16 @@ def test_disable_default_views_add_custom(self): counter.add(10, {"label": "value3"}) histogram.record(12, {"label": "value"}) - metrics = reader.get_metrics() - self.assertEqual(len(metrics), 1) - self.assertEqual(metrics[0].name, "testhist") + metrics = reader.get_metrics_data() + self.assertEqual(len(metrics.resource_metrics), 1) + self.assertEqual(len(metrics.resource_metrics[0].scope_metrics), 2) + self.assertEqual( + len(metrics.resource_metrics[0].scope_metrics[0].metrics), 0 + ) + self.assertEqual( + len(metrics.resource_metrics[0].scope_metrics[1].metrics), 1 + ) + self.assertEqual( + 
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py
index 107f73f79d5..fdc0df4b923 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py
@@ -28,17 +28,20 @@
 # The point module is not in the export directory to avoid a circular import.
 from opentelemetry.sdk._metrics._internal.point import (  # noqa: F401
+    DataPointT,
+    DataT,
     Gauge,
     Histogram,
+    HistogramDataPoint,
     Metric,
-    PointT,
+    NumberDataPoint,
     Sum,
 )
 
 __all__ = []
 for key, value in globals().copy().items():
     if not key.startswith("_"):
-        if _version_info.minor == 6 and key == "PointT":
+        if _version_info.minor == 6 and key == "DataT":
             continue
         value.__module__ = __name__
         __all__.append(key)
diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py b/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py
index 5534e85090a..e3f5d1a601f 100644
--- a/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py
+++ b/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py
@@ -32,7 +32,7 @@ def test_disable_default_views(self):
         counter.add(10, {"label": "value2"})
         counter.add(10, {"label": "value3"})
 
-        self.assertEqual(reader.get_metrics(), [])
+        self.assertEqual(reader.get_metrics_data(), [])
 
     def test_disable_default_views_add_custom(self):
         reader = InMemoryMetricReader()
@@ -51,6 +51,16 @@ def test_disable_default_views_add_custom(self):
         counter.add(10, {"label": "value3"})
         histogram.record(12, {"label": "value"})
 
-        metrics = reader.get_metrics()
-        self.assertEqual(len(metrics), 1)
-        self.assertEqual(metrics[0].name, "testhist")
+        metrics = reader.get_metrics_data()
+        self.assertEqual(len(metrics.resource_metrics), 1)
+        self.assertEqual(len(metrics.resource_metrics[0].scope_metrics), 2)
+        self.assertEqual(
+            len(metrics.resource_metrics[0].scope_metrics[0].metrics), 0
+        )
+        self.assertEqual(
+            len(metrics.resource_metrics[0].scope_metrics[1].metrics), 1
+        )
+        self.assertEqual(
+            metrics.resource_metrics[0].scope_metrics[1].metrics[0].name,
+            "testhist"
+        )
diff --git a/opentelemetry-sdk/tests/metrics/test_aggregation.py b/opentelemetry-sdk/tests/metrics/test_aggregation.py
index e747e49cdba..e71f99d9eed 100644
--- a/opentelemetry-sdk/tests/metrics/test_aggregation.py
+++ b/opentelemetry-sdk/tests/metrics/test_aggregation.py
@@ -12,8 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from dataclasses import replace
-from logging import WARNING
 from math import inf
 from time import sleep
 from typing import Union
@@ -29,15 +27,15 @@
     UpDownCounter,
 )
 from opentelemetry.sdk._metrics._internal.aggregation import (
-    _convert_aggregation_temporality,
     _ExplicitBucketHistogramAggregation,
     _LastValueAggregation,
     _SumAggregation,
 )
 from opentelemetry.sdk._metrics._internal.measurement import Measurement
-from opentelemetry.sdk._metrics.export import AggregationTemporality, Gauge
-from opentelemetry.sdk._metrics.export import Histogram as HistogramPoint
-from opentelemetry.sdk._metrics.export import Sum
+from opentelemetry.sdk._metrics.export import (
+    AggregationTemporality,
+    NumberDataPoint,
+)
 from opentelemetry.sdk._metrics.view import (
     DefaultAggregation,
     ExplicitBucketHistogramAggregation,
@@ -110,20 +108,44 @@ def test_collect_delta(self):
         """
 
         synchronous_sum_aggregation = _SumAggregation(
-            Mock(), True, AggregationTemporality.DELTA
+            {}, True, AggregationTemporality.DELTA
+        )
+
+        synchronous_sum_aggregation.aggregate(measurement(1))
+        first_sum = synchronous_sum_aggregation.collect(
+            AggregationTemporality.CUMULATIVE
+        )
+
+        self.assertEqual(first_sum.value, 1)
+
+        synchronous_sum_aggregation.aggregate(measurement(1))
+        second_sum = synchronous_sum_aggregation.collect(
+            AggregationTemporality.CUMULATIVE
+        )
+
+        self.assertEqual(second_sum.value, 2)
+
+        self.assertEqual(
+            second_sum.start_time_unix_nano, first_sum.start_time_unix_nano
+        )
+
+        synchronous_sum_aggregation = _SumAggregation(
+            {}, True, AggregationTemporality.DELTA
         )
 
         synchronous_sum_aggregation.aggregate(measurement(1))
-        first_sum = synchronous_sum_aggregation.collect()
+        first_sum = synchronous_sum_aggregation.collect(
+            AggregationTemporality.DELTA
+        )
 
         self.assertEqual(first_sum.value, 1)
-        self.assertTrue(first_sum.is_monotonic)
 
         synchronous_sum_aggregation.aggregate(measurement(1))
-        second_sum = synchronous_sum_aggregation.collect()
+        second_sum = synchronous_sum_aggregation.collect(
+            AggregationTemporality.DELTA
+        )
 
         self.assertEqual(second_sum.value, 1)
-        self.assertTrue(second_sum.is_monotonic)
 
         self.assertGreater(
             second_sum.start_time_unix_nano, first_sum.start_time_unix_nano
@@ -131,32 +153,30 @@ def test_collect_delta(self):
 
     def test_collect_cumulative(self):
         """
-        `SynchronousSumAggregation` collects sum metric points
+        `SynchronousSumAggregation` collects number data points
         """
 
         sum_aggregation = _SumAggregation(
-            True, AggregationTemporality.CUMULATIVE, Mock()
+            {}, True, AggregationTemporality.CUMULATIVE
         )
 
         sum_aggregation.aggregate(measurement(1))
-        first_sum = sum_aggregation.collect()
+        first_sum = sum_aggregation.collect(AggregationTemporality.CUMULATIVE)
 
         self.assertEqual(first_sum.value, 1)
-        self.assertTrue(first_sum.is_monotonic)
 
         # should have been reset after first collect
         sum_aggregation.aggregate(measurement(1))
-        second_sum = sum_aggregation.collect()
+        second_sum = sum_aggregation.collect(AggregationTemporality.CUMULATIVE)
 
         self.assertEqual(second_sum.value, 1)
-        self.assertTrue(second_sum.is_monotonic)
 
         self.assertEqual(
             second_sum.start_time_unix_nano, first_sum.start_time_unix_nano
         )
 
         # if no point seen for a whole interval, should return None
-        third_sum = sum_aggregation.collect()
+        third_sum = sum_aggregation.collect(AggregationTemporality.CUMULATIVE)
         self.assertIsNone(third_sum)
 
@@ -180,35 +200,44 @@ def test_aggregate(self):
 
     def test_collect(self):
         """
-        `LastValueAggregation` collects sum metric points
+        `LastValueAggregation` collects number data points
         """
 
         last_value_aggregation = _LastValueAggregation(Mock())
 
-        self.assertIsNone(last_value_aggregation.collect())
+        self.assertIsNone(
+            last_value_aggregation.collect(AggregationTemporality.CUMULATIVE)
+        )
 
         last_value_aggregation.aggregate(measurement(1))
-        first_gauge = last_value_aggregation.collect()
-        self.assertIsInstance(first_gauge, Gauge)
+        first_number_data_point = last_value_aggregation.collect(
+            AggregationTemporality.CUMULATIVE
+        )
+        self.assertIsInstance(first_number_data_point, NumberDataPoint)
 
-        self.assertEqual(first_gauge.value, 1)
+        self.assertEqual(first_number_data_point.value, 1)
 
         last_value_aggregation.aggregate(measurement(1))
 
         # CI fails the last assertion without this
         sleep(0.1)
 
-        second_gauge = last_value_aggregation.collect()
+        second_number_data_point = last_value_aggregation.collect(
+            AggregationTemporality.CUMULATIVE
+        )
 
-        self.assertEqual(second_gauge.value, 1)
+        self.assertEqual(second_number_data_point.value, 1)
 
         self.assertGreater(
-            second_gauge.time_unix_nano, first_gauge.time_unix_nano
+            second_number_data_point.time_unix_nano,
+            first_number_data_point.time_unix_nano,
         )
 
         # if no observation seen for the interval, it should return None
-        third_gauge = last_value_aggregation.collect()
-        self.assertIsNone(third_gauge)
+        third_number_data_point = last_value_aggregation.collect(
+            AggregationTemporality.CUMULATIVE
+        )
+        self.assertIsNone(third_number_data_point)
 
 
 class TestExplicitBucketHistogramAggregation(TestCase):
@@ -249,7 +278,9 @@ def test_aggregate(self):
             explicit_bucket_histogram_aggregation._bucket_counts[3], 1
         )
 
-        histo = explicit_bucket_histogram_aggregation.collect()
+        histo = explicit_bucket_histogram_aggregation.collect(
+            AggregationTemporality.CUMULATIVE
+        )
         self.assertEqual(histo.sum, 14)
 
     def test_min_max(self):
@@ -294,7 +325,9 @@ def test_collect(self):
         )
 
         explicit_bucket_histogram_aggregation.aggregate(measurement(1))
-        first_histogram = explicit_bucket_histogram_aggregation.collect()
+        first_histogram = explicit_bucket_histogram_aggregation.collect(
+            AggregationTemporality.CUMULATIVE
+        )
 
         self.assertEqual(first_histogram.bucket_counts, (0, 1, 0, 0))
         self.assertEqual(first_histogram.sum, 1)
@@ -303,510 +336,42 @@ def test_collect(self):
         sleep(0.1)
 
         explicit_bucket_histogram_aggregation.aggregate(measurement(1))
-        second_histogram = explicit_bucket_histogram_aggregation.collect()
+        second_histogram = explicit_bucket_histogram_aggregation.collect(
+            AggregationTemporality.CUMULATIVE
+        )
 
-        self.assertEqual(second_histogram.bucket_counts, (0, 1, 0, 0))
-        self.assertEqual(second_histogram.sum, 1)
+        self.assertEqual(second_histogram.bucket_counts, (0, 2, 0, 0))
+        self.assertEqual(second_histogram.sum, 2)
 
         self.assertGreater(
             second_histogram.time_unix_nano, first_histogram.time_unix_nano
         )
 
-
-class TestConvertAggregationTemporality(TestCase):
-    """
-    Test aggregation temporality conversion algorithm
-    """
-
-    def test_previous_point_non_cumulative(self):
-
-        with self.assertRaises(Exception):
-
-            _convert_aggregation_temporality(
-                Sum(
-                    start_time_unix_nano=0,
-                    time_unix_nano=0,
-                    value=0,
-                    aggregation_temporality=AggregationTemporality.DELTA,
-                    is_monotonic=False,
-                ),
-                Sum(
-                    start_time_unix_nano=0,
-                    time_unix_nano=0,
-                    value=0,
-                    aggregation_temporality=AggregationTemporality.DELTA,
-                    is_monotonic=False,
-                ),
-                AggregationTemporality.DELTA,
-            ),
-
-    def test_mismatched_point_types(self):
-
-        current_point = Sum(
-            start_time_unix_nano=0,
-            time_unix_nano=0,
-            value=0,
-            aggregation_temporality=AggregationTemporality.DELTA,
-            is_monotonic=False,
-        )
-
-        with self.assertLogs(level=WARNING):
-            self.assertIs(
-                _convert_aggregation_temporality(
-                    Gauge(time_unix_nano=0, value=0),
-                    current_point,
-                    AggregationTemporality.DELTA,
-                ),
-                current_point,
-            )
-
-        with self.assertRaises(AssertionError):
-            with self.assertLogs(level=WARNING):
-                self.assertIs(
-                    _convert_aggregation_temporality(
-                        Gauge(time_unix_nano=0, value=0),
-                        None,
-                        AggregationTemporality.DELTA,
-                    ),
-                    current_point,
-                )
-
-    def test_current_point_sum_previous_point_none(self):
-
-        current_point = Sum(
-            start_time_unix_nano=0,
-            time_unix_nano=0,
-            value=0,
-            aggregation_temporality=AggregationTemporality.DELTA,
-            is_monotonic=False,
-        )
-
-        self.assertEqual(
-            _convert_aggregation_temporality(
-                None, current_point, AggregationTemporality.CUMULATIVE
-            ),
-            replace(
-                current_point,
-                aggregation_temporality=AggregationTemporality.CUMULATIVE,
-            ),
-        )
-
-    def test_current_point_sum_current_point_same_aggregation_temporality(
-        self,
-    ):
-
-        current_point = Sum(
-            start_time_unix_nano=0,
-            time_unix_nano=0,
-            value=0,
-            aggregation_temporality=AggregationTemporality.DELTA,
-            is_monotonic=False,
-        )
-
-        self.assertEqual(
-            _convert_aggregation_temporality(
-                Sum(
-                    start_time_unix_nano=0,
-                    time_unix_nano=0,
-                    value=0,
-                    aggregation_temporality=AggregationTemporality.CUMULATIVE,
-                    is_monotonic=False,
-                ),
-                current_point,
-                AggregationTemporality.DELTA,
-            ),
-            current_point,
-        )
-
-        current_point = Sum(
-            start_time_unix_nano=0,
-            time_unix_nano=0,
-            value=0,
-            aggregation_temporality=AggregationTemporality.CUMULATIVE,
-            is_monotonic=False,
-        )
-
-        self.assertEqual(
-            _convert_aggregation_temporality(
-                Sum(
-                    start_time_unix_nano=0,
-                    time_unix_nano=0,
-                    value=0,
-                    aggregation_temporality=AggregationTemporality.CUMULATIVE,
-                    is_monotonic=False,
-                ),
-                current_point,
-                AggregationTemporality.CUMULATIVE,
-            ),
-            current_point,
-        )
-
-    def test_current_point_sum_aggregation_temporality_delta(self):
-
-        self.assertEqual(
-            _convert_aggregation_temporality(
-                Sum(
-                    start_time_unix_nano=1,
-                    time_unix_nano=2,
-                    value=3,
-                    aggregation_temporality=AggregationTemporality.CUMULATIVE,
-                    is_monotonic=False,
-                ),
-                Sum(
-                    start_time_unix_nano=4,
-                    time_unix_nano=5,
-                    value=6,
-                    aggregation_temporality=AggregationTemporality.CUMULATIVE,
-                    is_monotonic=False,
-                ),
-                AggregationTemporality.DELTA,
-            ),
-            Sum(
-                start_time_unix_nano=2,
-                time_unix_nano=5,
-                value=3,
-                aggregation_temporality=AggregationTemporality.DELTA,
-                is_monotonic=False,
-            ),
-        )
-
-        self.assertEqual(
-            _convert_aggregation_temporality(
-                Sum(
-                    start_time_unix_nano=1,
-                    time_unix_nano=2,
-                    value=3,
-                    aggregation_temporality=AggregationTemporality.CUMULATIVE,
-                    is_monotonic=True,
-                ),
-                Sum(
-                    start_time_unix_nano=4,
-                    time_unix_nano=5,
-                    value=6,
-                    aggregation_temporality=AggregationTemporality.CUMULATIVE,
-                    is_monotonic=False,
-                ),
-                AggregationTemporality.DELTA,
-            ),
-            Sum(
-                start_time_unix_nano=2,
-                time_unix_nano=5,
-                value=3,
-                aggregation_temporality=AggregationTemporality.DELTA,
-                is_monotonic=False,
-            ),
-        )
-
-        self.assertEqual(
-            _convert_aggregation_temporality(
-                Sum(
-                    start_time_unix_nano=1,
-                    time_unix_nano=2,
-                    value=3,
-                    aggregation_temporality=AggregationTemporality.CUMULATIVE,
-                    is_monotonic=True,
-                ),
-                Sum(
-                    start_time_unix_nano=4,
-                    time_unix_nano=5,
-                    value=6,
-                    aggregation_temporality=AggregationTemporality.CUMULATIVE,
-                    is_monotonic=True,
-                ),
-                AggregationTemporality.DELTA,
-            ),
-            Sum(
-                start_time_unix_nano=2,
-                time_unix_nano=5,
-                value=3,
-                aggregation_temporality=AggregationTemporality.DELTA,
-                is_monotonic=True,
-            ),
-        )
-
-    def test_current_point_sum_aggregation_temporality_cumulative(self):
-
-        self.assertEqual(
-            _convert_aggregation_temporality(
-                Sum(
-                    start_time_unix_nano=1,
-                    time_unix_nano=2,
-                    value=3,
-                    aggregation_temporality=AggregationTemporality.CUMULATIVE,
-                    is_monotonic=False,
-                ),
-                Sum(
-                    start_time_unix_nano=4,
-                    time_unix_nano=5,
-                    value=6,
-                    aggregation_temporality=AggregationTemporality.DELTA,
-                    is_monotonic=False,
-                ),
-                AggregationTemporality.CUMULATIVE,
-            ),
-            Sum(
-                start_time_unix_nano=1,
-                time_unix_nano=5,
-                value=9,
-                aggregation_temporality=AggregationTemporality.CUMULATIVE,
-                is_monotonic=False,
-            ),
-        )
-
-        self.assertEqual(
-            _convert_aggregation_temporality(
-                Sum(
-                    start_time_unix_nano=1,
-                    time_unix_nano=2,
-                    value=3,
-                    aggregation_temporality=AggregationTemporality.CUMULATIVE,
-                    is_monotonic=True,
-                ),
-                Sum(
-                    start_time_unix_nano=4,
-                    time_unix_nano=5,
-                    value=6,
-                    aggregation_temporality=AggregationTemporality.DELTA,
-                    is_monotonic=False,
-                ),
-                AggregationTemporality.CUMULATIVE,
-            ),
-            Sum(
-                start_time_unix_nano=1,
-                time_unix_nano=5,
-                value=9,
-                aggregation_temporality=AggregationTemporality.CUMULATIVE,
-                is_monotonic=False,
-            ),
-        )
-
-        self.assertEqual(
-            _convert_aggregation_temporality(
-                Sum(
-                    start_time_unix_nano=1,
-                    time_unix_nano=2,
-                    value=3,
-                    aggregation_temporality=AggregationTemporality.CUMULATIVE,
-                    is_monotonic=True,
-                ),
-                Sum(
-                    start_time_unix_nano=4,
-                    time_unix_nano=5,
-                    value=6,
-                    aggregation_temporality=AggregationTemporality.DELTA,
-                    is_monotonic=True,
-                ),
-                AggregationTemporality.CUMULATIVE,
-            ),
-            Sum(
-                start_time_unix_nano=1,
-                time_unix_nano=5,
-                value=9,
-                aggregation_temporality=AggregationTemporality.CUMULATIVE,
-                is_monotonic=True,
-            ),
-        )
-
-    def test_current_point_gauge(self):
-
-        current_point = Gauge(time_unix_nano=1, value=0)
-        self.assertEqual(
-            _convert_aggregation_temporality(
-                Gauge(time_unix_nano=0, value=0),
-                current_point,
-                AggregationTemporality.CUMULATIVE,
-            ),
-            current_point,
-        )
-
-
-class TestHistogramConvertAggregationTemporality(TestCase):
-    def test_previous_point_none(self):
-
-        current_point = HistogramPoint(
-            aggregation_temporality=AggregationTemporality.DELTA,
-            bucket_counts=[0, 2, 1, 2, 0],
-            explicit_bounds=[0, 5, 10, 25],
-            max=24,
-            min=2,
-            start_time_unix_nano=0,
-            sum=70,
-            time_unix_nano=1,
-        )
-
-        self.assertEqual(
-            _convert_aggregation_temporality(
-                None, current_point, AggregationTemporality.CUMULATIVE
-            ),
-            replace(
-                current_point,
-                aggregation_temporality=AggregationTemporality.CUMULATIVE,
-            ),
-        )
-
-    def test_previous_point_non_cumulative(self):
-
-        with self.assertRaises(Exception):
-
-            _convert_aggregation_temporality(
-                HistogramPoint(
-                    aggregation_temporality=AggregationTemporality.DELTA,
-                    bucket_counts=[0, 2, 1, 2, 0],
-                    explicit_bounds=[0, 5, 10, 25],
-                    max=24,
-                    min=2,
-                    start_time_unix_nano=0,
-                    sum=70,
-                    time_unix_nano=1,
-                ),
-                HistogramPoint(
-                    aggregation_temporality=AggregationTemporality.DELTA,
-                    bucket_counts=[0, 1, 3, 0, 0],
-                    explicit_bounds=[0, 5, 10, 25],
-                    max=9,
-                    min=2,
-                    start_time_unix_nano=1,
-                    sum=35,
-                    time_unix_nano=2,
-                ),
-                AggregationTemporality.DELTA,
-            ),
-
-    def test_same_aggregation_temporality_cumulative(self):
-        current_point = HistogramPoint(
-            aggregation_temporality=AggregationTemporality.CUMULATIVE,
-            bucket_counts=[0, 3, 4, 2, 0],
-            explicit_bounds=[0, 5, 10, 25],
-            max=24,
-            min=1,
-            start_time_unix_nano=0,
-            sum=105,
-            time_unix_nano=2,
-        )
-        self.assertEqual(
-            _convert_aggregation_temporality(
-                HistogramPoint(
-                    aggregation_temporality=AggregationTemporality.CUMULATIVE,
-                    bucket_counts=[0, 2, 1, 2, 0],
-                    explicit_bounds=[0, 5, 10, 25],
-                    max=24,
-                    min=1,
-                    start_time_unix_nano=0,
-                    sum=70,
-                    time_unix_nano=1,
-                ),
-                current_point,
-                AggregationTemporality.CUMULATIVE,
-            ),
-            current_point,
+        explicit_bucket_histogram_aggregation = (
+            _ExplicitBucketHistogramAggregation(Mock(), boundaries=[0, 1, 2])
         )
 
-    def test_same_aggregation_temporality_delta(self):
-        current_point = HistogramPoint(
-            aggregation_temporality=AggregationTemporality.DELTA,
-            bucket_counts=[0, 1, 3, 0, 0],
-            explicit_bounds=[0, 5, 10, 25],
-            max=9,
-            min=1,
-            start_time_unix_nano=1,
-            sum=35,
-            time_unix_nano=2,
+        explicit_bucket_histogram_aggregation.aggregate(measurement(1))
+        first_histogram = explicit_bucket_histogram_aggregation.collect(
+            AggregationTemporality.DELTA
         )
-        self.assertEqual(
-            _convert_aggregation_temporality(
-                HistogramPoint(
-                    aggregation_temporality=AggregationTemporality.CUMULATIVE,
-                    bucket_counts=[0, 3, 4, 2, 0],
-                    explicit_bounds=[0, 5, 10, 25],
-                    max=24,
-                    min=1,
-                    start_time_unix_nano=0,
-                    sum=105,
-                    time_unix_nano=2,
-                ),
-                current_point,
-                AggregationTemporality.DELTA,
-            ),
-            current_point,
-        )
+        self.assertEqual(first_histogram.bucket_counts, (0, 1, 0, 0))
+        self.assertEqual(first_histogram.sum, 1)
 
-    def test_aggregation_temporality_to_cumulative(self):
-        current_point = HistogramPoint(
-            aggregation_temporality=AggregationTemporality.DELTA,
-            bucket_counts=[0, 1, 3, 0, 0],
-            explicit_bounds=[0, 5, 10, 25],
-            max=24,
-            min=1,
-            start_time_unix_nano=1,
-            sum=35,
-            time_unix_nano=2,
-        )
+        # CI fails the last assertion without this
+        sleep(0.1)
 
-        self.assertEqual(
-            _convert_aggregation_temporality(
-                HistogramPoint(
-                    aggregation_temporality=AggregationTemporality.CUMULATIVE,
-                    bucket_counts=[0, 2, 1, 2, 0],
-                    explicit_bounds=[0, 5, 10, 25],
-                    max=24,
-                    min=1,
-                    start_time_unix_nano=0,
-                    sum=70,
-                    time_unix_nano=1,
-                ),
-                current_point,
-                AggregationTemporality.CUMULATIVE,
-            ),
-            HistogramPoint(
-                aggregation_temporality=AggregationTemporality.CUMULATIVE,
-                bucket_counts=[0, 3, 4, 2, 0],
-                explicit_bounds=[0, 5, 10, 25],
-                max=24,
-                min=1,
-                start_time_unix_nano=0,
-                sum=105,
-                time_unix_nano=2,
-            ),
+        explicit_bucket_histogram_aggregation.aggregate(measurement(1))
+        second_histogram = explicit_bucket_histogram_aggregation.collect(
+            AggregationTemporality.DELTA
         )
 
-    def test_aggregation_temporality_to_delta(self):
-        current_point = HistogramPoint(
-            aggregation_temporality=AggregationTemporality.CUMULATIVE,
-            bucket_counts=[0, 3, 4, 2, 0],
-            explicit_bounds=[0, 5, 10, 25],
-            max=22,
-            min=3,
-            start_time_unix_nano=0,
-            sum=105,
-            time_unix_nano=2,
-        )
+        self.assertEqual(second_histogram.bucket_counts, (0, 1, 0, 0))
+        self.assertEqual(second_histogram.sum, 1)
 
-        self.assertEqual(
-            _convert_aggregation_temporality(
-                HistogramPoint(
-                    aggregation_temporality=AggregationTemporality.CUMULATIVE,
-                    bucket_counts=[0, 2, 1, 2, 0],
-                    explicit_bounds=[0, 5, 10, 25],
-                    max=24,
-                    min=1,
-                    start_time_unix_nano=0,
-                    sum=70,
-                    time_unix_nano=1,
-                ),
-                current_point,
-                AggregationTemporality.DELTA,
-            ),
-            HistogramPoint(
-                aggregation_temporality=AggregationTemporality.DELTA,
-                bucket_counts=[0, 1, 3, 0, 0],
-                explicit_bounds=[0, 5, 10, 25],
-                max=22,
-                min=3,
-                start_time_unix_nano=1,
-                sum=35,
-                time_unix_nano=2,
-            ),
+        self.assertGreater(
+            second_histogram.time_unix_nano, first_histogram.time_unix_nano
         )
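Review note: histogram temporality conversion now lives inside `_ExplicitBucketHistogramAggregation.collect`. Its bucket merge, as a standalone sketch using the same hypothetical counts the removed tests exercised:

    previous_cumulative = [0, 2, 1, 2, 0]
    current_delta = [0, 1, 3, 0, 0]

    # DELTA -> CUMULATIVE: add bucket counts pairwise; the point's
    # `count` is the sum of the merged buckets.
    merged = [
        curr + prev for curr, prev in zip(current_delta, previous_cumulative)
    ]
    assert merged == [0, 3, 4, 2, 0]
    assert sum(merged) == 9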
diff --git a/opentelemetry-sdk/tests/metrics/test_backward_compat.py b/opentelemetry-sdk/tests/metrics/test_backward_compat.py
index f9e543db512..b59313c1d96 100644
--- a/opentelemetry-sdk/tests/metrics/test_backward_compat.py
+++ b/opentelemetry-sdk/tests/metrics/test_backward_compat.py
@@ -100,8 +100,14 @@ def test_observable_callback(self):
         # produce some data
         meter_provider.get_meter("foo").create_counter("mycounter").add(12)
         try:
-            metrics = reader.get_metrics()
+            metrics_data = reader.get_metrics_data()
         except Exception:
             self.fail()
 
-        self.assertEqual(len(metrics), 1)
+        self.assertEqual(len(metrics_data.resource_metrics), 1)
+        self.assertEqual(
+            len(metrics_data.resource_metrics[0].scope_metrics), 1
+        )
+        self.assertEqual(
+            len(metrics_data.resource_metrics[0].scope_metrics[0].metrics), 1
+        )
diff --git a/opentelemetry-sdk/tests/metrics/test_import.py b/opentelemetry-sdk/tests/metrics/test_import.py
index 1a31ab3b885..43c65337dc7 100644
--- a/opentelemetry-sdk/tests/metrics/test_import.py
+++ b/opentelemetry-sdk/tests/metrics/test_import.py
@@ -46,15 +46,18 @@ def test_import_export(self):
             from opentelemetry.sdk._metrics.export import (  # noqa: F401
                 AggregationTemporality,
                 ConsoleMetricExporter,
+                DataPointT,
+                DataT,
                 Gauge,
                 Histogram,
+                HistogramDataPoint,
                 InMemoryMetricReader,
                 Metric,
                 MetricExporter,
                 MetricExportResult,
                 MetricReader,
+                NumberDataPoint,
                 PeriodicExportingMetricReader,
-                PointT,
                 Sum,
             )
         except Exception as error:
diff --git a/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py
index 15be13ac9af..7893fe643cc 100644
--- a/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py
+++ b/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py
@@ -21,10 +21,9 @@
     AggregationTemporality,
     InMemoryMetricReader,
     Metric,
+    NumberDataPoint,
     Sum,
 )
-from opentelemetry.sdk.resources import Resource
-from opentelemetry.sdk.util.instrumentation import InstrumentationScope
 
 
 class TestInMemoryMetricReader(TestCase):
@@ -32,21 +31,23 @@ def test_no_metrics(self):
         mock_collect_callback = Mock(return_value=[])
         reader = InMemoryMetricReader()
         reader._set_collect_callback(mock_collect_callback)
-        self.assertEqual(reader.get_metrics(), [])
+        self.assertEqual(reader.get_metrics_data(), [])
         mock_collect_callback.assert_called_once()
 
     def test_converts_metrics_to_list(self):
         metric = Metric(
-            attributes={"myattr": "baz"},
-            description="",
-            instrumentation_scope=InstrumentationScope("testmetrics"),
             name="foo",
-            resource=Resource.create(),
+            description="",
             unit="",
-            point=Sum(
-                start_time_unix_nano=1647626444152947792,
-                time_unix_nano=1647626444153163239,
-                value=72.3309814450449,
+            data=Sum(
+                data_points=[
+                    NumberDataPoint(
+                        attributes={"myattr": "baz"},
+                        start_time_unix_nano=1647626444152947792,
+                        time_unix_nano=1647626444153163239,
+                        value=72.3309814450449,
+                    )
+                ],
                 aggregation_temporality=AggregationTemporality.CUMULATIVE,
                 is_monotonic=True,
             ),
@@ -55,9 +56,9 @@ def test_converts_metrics_to_list(self):
         reader = InMemoryMetricReader()
diff --git a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py
index cd1a8481ee3..49b1e0d9a90 100644
--- a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py
+++ b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py
@@ -21,6 +21,9 @@
     ObservableCounter,
     UpDownCounter,
 )
+from opentelemetry.sdk._metrics._internal.aggregation import (
+    _LastValueAggregation,
+)
 from opentelemetry.sdk._metrics._internal.measurement import Measurement
 from opentelemetry.sdk._metrics._internal.metric_reader_storage import (
     _DEFAULT_VIEW,
@@ -106,9 +109,9 @@ def test_creates_view_instrument_matches(
     def test_forwards_calls_to_view_instrument_match(
         self, MockViewInstrumentMatch: Mock
     ):
-        view_instrument_match1 = Mock()
-        view_instrument_match2 = Mock()
-        view_instrument_match3 = Mock()
+        view_instrument_match1 = Mock(_aggregation=_LastValueAggregation({}))
+        view_instrument_match2 = Mock(_aggregation=_LastValueAggregation({}))
+        view_instrument_match3 = Mock(_aggregation=_LastValueAggregation({}))
         MockViewInstrumentMatch.side_effect = [
             view_instrument_match1,
             view_instrument_match2,
@@ -163,7 +166,48 @@ def test_forwards_calls_to_view_instrument_match(
         view_instrument_match1.collect.assert_called_once()
         view_instrument_match2.collect.assert_called_once()
         view_instrument_match3.collect.assert_called_once()
-        self.assertEqual(result, all_metrics)
+        self.assertEqual(
+            (
+                result.resource_metrics[0].scope_metrics[0].metrics[0].
+                data.data_points[0]
+            ),
+            all_metrics[0]
+        )
+        self.assertEqual(
+            (
+                result.resource_metrics[0].scope_metrics[0].metrics[0].
+                data.data_points[1]
+            ),
+            all_metrics[1]
+        )
+        self.assertEqual(
+            (
+                result.resource_metrics[0].scope_metrics[0].metrics[1].
+                data.data_points[0]
+            ),
+            all_metrics[2]
+        )
+        self.assertEqual(
+            (
+                result.resource_metrics[0].scope_metrics[0].metrics[1].
+                data.data_points[1]
+            ),
+            all_metrics[3]
+        )
+        self.assertEqual(
+            (
+                result.resource_metrics[0].scope_metrics[1].metrics[0].
+                data.data_points[0]
+            ),
+            all_metrics[4]
+        )
+        self.assertEqual(
+            (
+                result.resource_metrics[0].scope_metrics[1].metrics[0].
+                data.data_points[1]
+            ),
+            all_metrics[5]
+        )
 
     @patch(
         "opentelemetry.sdk._metrics._internal."
@@ -256,7 +300,13 @@ def test_drop_aggregation(self):
         )
         metric_reader_storage.consume_measurement(Measurement(1, counter))
-        self.assertEqual([], metric_reader_storage.collect())
+        self.assertEqual(
+            [],
+            (
+                metric_reader_storage.collect().
+                resource_metrics[0].scope_metrics[0].metrics
+            )
+        )
 
     def test_conflicting_view_configuration(self):
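The mocks above now seed a concrete _aggregation because the storage inspects each match's aggregation when assembling point data into Gauge- or Sum-shaped output (that is why _LastValueAggregation is imported at the top of this file). A hedged sketch of the property the tests rely on:

    from unittest.mock import Mock
    from opentelemetry.sdk._metrics._internal.aggregation import (
        _LastValueAggregation,
    )

    # A bare Mock() has a Mock for _aggregation, which isinstance checks
    # cannot classify; a real aggregation instance keeps them honest.
    match = Mock(_aggregation=_LastValueAggregation({}))
    assert isinstance(match._aggregation, _LastValueAggregation)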
diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py
index b20dd567a26..eb38b8356cc 100644
--- a/opentelemetry-sdk/tests/metrics/test_metrics.py
+++ b/opentelemetry-sdk/tests/metrics/test_metrics.py
@@ -506,18 +506,19 @@ def test_duplicate_instrument_aggregate_data(self):
 
         metrics = exporter.metrics[0]
 
-        self.assertEqual(len(metrics), 2)
+        scope_metrics = metrics.resource_metrics[0].scope_metrics
+        self.assertEqual(len(scope_metrics), 2)
 
-        metric_0 = metrics[0]
+        metric_0 = scope_metrics[0].metrics[0]
         self.assertEqual(metric_0.name, "counter")
         self.assertEqual(metric_0.unit, "unit")
         self.assertEqual(metric_0.description, "description")
-        self.assertEqual(metric_0.point.value, 3)
+        self.assertEqual(metric_0.data.data_points[0].value, 3)
 
-        metric_1 = metrics[1]
+        metric_1 = scope_metrics[1].metrics[0]
         self.assertEqual(metric_1.name, "counter")
         self.assertEqual(metric_1.unit, "unit")
         self.assertEqual(metric_1.description, "description")
-        self.assertEqual(metric_1.point.value, 7)
+        self.assertEqual(metric_1.data.data_points[0].value, 7)
diff --git a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py
index 531ef90c955..597bd37259c 100644
--- a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py
+++ b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py
@@ -23,10 +23,10 @@
     Metric,
     MetricExporter,
     MetricExportResult,
+    NumberDataPoint,
     PeriodicExportingMetricReader,
     Sum,
 )
-from opentelemetry.sdk.resources import Resource
 from opentelemetry.test.concurrency_test import ConcurrencyTestBase
 from opentelemetry.util._time import _time_ns
 
@@ -54,29 +54,34 @@ def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
 metrics_list = [
     Metric(
         name="sum_name",
-        attributes={},
         description="",
-        instrumentation_scope=None,
-        resource=Resource.create(),
         unit="",
-        point=Sum(
-            start_time_unix_nano=_time_ns(),
-            time_unix_nano=_time_ns(),
-            value=2,
+        data=Sum(
+            data_points=[
+                NumberDataPoint(
+                    attributes={},
+                    start_time_unix_nano=_time_ns(),
+                    time_unix_nano=_time_ns(),
+                    value=2,
+                )
+            ],
             aggregation_temporality=1,
             is_monotonic=True,
        ),
     ),
     Metric(
         name="gauge_name",
-        attributes={},
         description="",
-        instrumentation_scope=None,
-        resource=Resource.create(),
         unit="",
-        point=Gauge(
-            time_unix_nano=_time_ns(),
-            value=2,
+        data=Gauge(
+            data_points=[
+                NumberDataPoint(
+                    attributes={},
+                    start_time_unix_nano=_time_ns(),
+                    time_unix_nano=_time_ns(),
+                    value=2,
+                )
+            ]
         ),
     ),
 ]
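The bare aggregation_temporality=1 above is the integer value of AggregationTemporality.DELTA, and 2 is CUMULATIVE (as used in test_point.py below); a one-line check, assuming the enum keeps its OTLP-aligned values:

    from opentelemetry.sdk._metrics.export import AggregationTemporality

    assert AggregationTemporality.DELTA == 1
    assert AggregationTemporality.CUMULATIVE == 2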
diff --git a/opentelemetry-sdk/tests/metrics/test_point.py b/opentelemetry-sdk/tests/metrics/test_point.py
index 70b2235f168..7488c85c995 100644
--- a/opentelemetry-sdk/tests/metrics/test_point.py
+++ b/opentelemetry-sdk/tests/metrics/test_point.py
@@ -14,22 +14,22 @@
 
 from unittest import TestCase
 
-from opentelemetry.sdk._metrics.export import Gauge, Histogram, Metric, Sum
-from opentelemetry.sdk.resources import Resource
-from opentelemetry.sdk.util.instrumentation import InstrumentationScope
+from opentelemetry.sdk._metrics.export import (
+    Gauge,
+    Histogram,
+    HistogramDataPoint,
+    Metric,
+    NumberDataPoint,
+    Sum,
+)
 
 
-def _create_metric(value):
+def _create_metric(data):
     return Metric(
-        attributes={"attr-key": "test-val"},
-        description="test-description",
-        instrumentation_scope=InstrumentationScope(
-            name="name", version="version"
-        ),
         name="test-name",
-        resource=Resource({"resource-key": "resource-val"}),
+        description="test-description",
         unit="test-unit",
-        point=value,
+        data=data,
     )
 
@@ -38,40 +38,62 @@ def test_sum(self):
         self.maxDiff = None
         point = _create_metric(
             Sum(
+                data_points=[
+                    NumberDataPoint(
+                        attributes={"attr-key": "test-val"},
+                        start_time_unix_nano=10,
+                        time_unix_nano=20,
+                        value=9,
+                    )
+                ],
                 aggregation_temporality=2,
                 is_monotonic=True,
-                start_time_unix_nano=10,
-                time_unix_nano=20,
-                value=9,
             )
         )
         self.assertEqual(
-            '{"attributes": {"attr-key": "test-val"}, "description": "test-description", "instrumentation_scope": "InstrumentationScope(name, version, None)", "name": "test-name", "resource": "BoundedAttributes({\'resource-key\': \'resource-val\'}, maxlen=None)", "unit": "test-unit", "point": {"aggregation_temporality": 2, "is_monotonic": true, "start_time_unix_nano": 10, "time_unix_nano": 20, "value": 9}}',
+            '{"name": "test-name", "description": "test-description", "unit": "test-unit", "data": "{\\"data_points\\": \\"[{\\\\\\"attributes\\\\\\": {\\\\\\"attr-key\\\\\\": \\\\\\"test-val\\\\\\"}, \\\\\\"start_time_unix_nano\\\\\\": 10, \\\\\\"time_unix_nano\\\\\\": 20, \\\\\\"value\\\\\\": 9}]\\", \\"aggregation_temporality\\": 2, \\"is_monotonic\\": true}"}',
             point.to_json(),
         )
 
     def test_gauge(self):
-        point = _create_metric(Gauge(time_unix_nano=40, value=20))
+        point = _create_metric(
+            Gauge(
+                data_points=[
+                    NumberDataPoint(
+                        attributes={"attr-key": "test-val"},
+                        start_time_unix_nano=10,
+                        time_unix_nano=20,
+                        value=9,
+                    )
+                ]
+            )
+        )
         self.assertEqual(
-            '{"attributes": {"attr-key": "test-val"}, "description": "test-description", "instrumentation_scope": "InstrumentationScope(name, version, None)", "name": "test-name", "resource": "BoundedAttributes({\'resource-key\': \'resource-val\'}, maxlen=None)", "unit": "test-unit", "point": {"time_unix_nano": 40, "value": 20}}',
+            '{"name": "test-name", "description": "test-description", "unit": "test-unit", "data": "{\\"data_points\\": \\"[{\\\\\\"attributes\\\\\\": {\\\\\\"attr-key\\\\\\": \\\\\\"test-val\\\\\\"}, \\\\\\"start_time_unix_nano\\\\\\": 10, \\\\\\"time_unix_nano\\\\\\": 20, \\\\\\"value\\\\\\": 9}]\\"}"}',
             point.to_json(),
         )
 
     def test_histogram(self):
         point = _create_metric(
             Histogram(
+                data_points=[
+                    HistogramDataPoint(
+                        attributes={"attr-key": "test-val"},
+                        start_time_unix_nano=50,
+                        time_unix_nano=60,
+                        count=1,
+                        sum=0.8,
+                        bucket_counts=[0, 0, 1, 0],
+                        explicit_bounds=[0.1, 0.5, 0.9, 1],
+                        min=0.8,
+                        max=0.8,
+                    )
+                ],
                 aggregation_temporality=1,
-                bucket_counts=[0, 0, 1, 0],
-                explicit_bounds=[0.1, 0.5, 0.9, 1],
-                max=0.8,
-                min=0.8,
-                start_time_unix_nano=50,
-                sum=0.8,
-                time_unix_nano=60,
             )
         )
         self.maxDiff = None
         self.assertEqual(
-            '{"attributes": {"attr-key": "test-val"}, "description": "test-description", "instrumentation_scope": "InstrumentationScope(name, version, None)", "name": "test-name", "resource": "BoundedAttributes({\'resource-key\': \'resource-val\'}, maxlen=None)", "unit": "test-unit", "point": {"aggregation_temporality": 1, "bucket_counts": [0, 0, 1, 0], "explicit_bounds": [0.1, 0.5, 0.9, 1], "max": 0.8, "min": 0.8, "start_time_unix_nano": 50, "sum": 0.8, "time_unix_nano": 60}}',
+            '{"name": "test-name", "description": "test-description", "unit": "test-unit", "data": "{\\"data_points\\": \\"[{\\\\\\"attributes\\\\\\": {\\\\\\"attr-key\\\\\\": \\\\\\"test-val\\\\\\"}, \\\\\\"start_time_unix_nano\\\\\\": 50, \\\\\\"time_unix_nano\\\\\\": 60, \\\\\\"count\\\\\\": 1, \\\\\\"sum\\\\\\": 0.8, \\\\\\"bucket_counts\\\\\\": [0, 0, 1, 0], \\\\\\"explicit_bounds\\\\\\": [0.1, 0.5, 0.9, 1], \\\\\\"min\\\\\\": 0.8, \\\\\\"max\\\\\\": 0.8}]\\", \\"aggregation_temporality\\": 1}"}',
             point.to_json(),
         )
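The expected strings above show that to_json() now serializes data, and the data_points inside it, as nested JSON-encoded strings, so decoding takes three json.loads passes. A hedged sketch, reusing the point Metric built in test_sum:

    import json

    outer = json.loads(point.to_json())            # {"name": ..., "data": "..."}
    data = json.loads(outer["data"])               # {"data_points": "...", ...}
    data_points = json.loads(data["data_points"])  # [{"attributes": ..., "value": 9}]
    assert data_points[0]["value"] == 9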
"test-unit", "data": "{\\"data_points\\": \\"[{\\\\\\"attributes\\\\\\": {\\\\\\"attr-key\\\\\\": \\\\\\"test-val\\\\\\"}, \\\\\\"start_time_unix_nano\\\\\\": 50, \\\\\\"time_unix_nano\\\\\\": 60, \\\\\\"count\\\\\\": 1, \\\\\\"sum\\\\\\": 0.8, \\\\\\"bucket_counts\\\\\\": [0, 0, 1, 0], \\\\\\"explicit_bounds\\\\\\": [0.1, 0.5, 0.9, 1], \\\\\\"min\\\\\\": 0.8, \\\\\\"max\\\\\\": 0.8}]\\", \\"aggregation_temporality\\": 1}"}', point.to_json(), ) diff --git a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py index 9f05d2bfee0..25c9a31e892 100644 --- a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py +++ b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py @@ -27,7 +27,7 @@ from opentelemetry.sdk._metrics._internal.sdk_configuration import ( SdkConfiguration, ) -from opentelemetry.sdk._metrics.export import AggregationTemporality, Metric +from opentelemetry.sdk._metrics.export import AggregationTemporality from opentelemetry.sdk._metrics.view import ( DefaultAggregation, DropAggregation, @@ -63,12 +63,6 @@ def test_consume_measurement(self): attribute_keys={"a", "c"}, ), instrument=instrument1, - sdk_config=self.sdk_configuration, - instrument_class_temporality=MagicMock( - **{ - "__getitem__.return_value": AggregationTemporality.CUMULATIVE - } - ), instrument_class_aggregation=MagicMock( **{"__getitem__.return_value": DefaultAggregation()} ), @@ -110,12 +104,6 @@ def test_consume_measurement(self): aggregation=self.mock_aggregation_factory, ), instrument=instrument1, - sdk_config=self.sdk_configuration, - instrument_class_temporality=MagicMock( - **{ - "__getitem__.return_value": AggregationTemporality.CUMULATIVE - } - ), instrument_class_aggregation=MagicMock( **{"__getitem__.return_value": DefaultAggregation()} ), @@ -147,12 +135,6 @@ def test_consume_measurement(self): attribute_keys={}, ), instrument=instrument1, - sdk_config=self.sdk_configuration, - instrument_class_temporality=MagicMock( - **{ - "__getitem__.return_value": AggregationTemporality.CUMULATIVE - } - ), instrument_class_aggregation=MagicMock( **{"__getitem__.return_value": DefaultAggregation()} ), @@ -177,12 +159,6 @@ def test_consume_measurement(self): attribute_keys={}, ), instrument=instrument1, - sdk_config=self.sdk_configuration, - instrument_class_temporality=MagicMock( - **{ - "__getitem__.return_value": AggregationTemporality.CUMULATIVE - } - ), instrument_class_aggregation=MagicMock( **{"__getitem__.return_value": DefaultAggregation()} ), @@ -196,24 +172,22 @@ def test_consume_measurement(self): ) def test_collect(self): - instrument1 = Mock( - name="instrument1", description="description", unit="unit" + instrument1 = Counter( + "instrument1", + Mock(), + Mock(), + description="description", + unit="unit", ) instrument1.instrumentation_scope = self.mock_instrumentation_scope view_instrument_match = _ViewInstrumentMatch( view=View( instrument_name="instrument1", name="name", - aggregation=self.mock_aggregation_factory, + aggregation=DefaultAggregation(), attribute_keys={"a", "c"}, ), instrument=instrument1, - sdk_config=self.sdk_configuration, - instrument_class_temporality=MagicMock( - **{ - "__getitem__.return_value": AggregationTemporality.CUMULATIVE - } - ), instrument_class_aggregation=MagicMock( **{"__getitem__.return_value": DefaultAggregation()} ), @@ -226,18 +200,16 @@ def test_collect(self): attributes={"c": "d", "f": "g"}, ) ) - self.assertEqual( - next(view_instrument_match.collect()), - Metric( - 
attributes={"c": "d"}, - description="description", - instrumentation_scope=self.mock_instrumentation_scope, - name="name", - resource=self.mock_resource, - unit="unit", - point=None, - ), + + number_data_points = view_instrument_match.collect( + AggregationTemporality.CUMULATIVE ) + self.assertEqual(len(number_data_points), 1) + + number_data_point = number_data_points[0] + + self.assertEqual(number_data_point.attributes, frozenset({("c", "d")})) + self.assertEqual(number_data_point.value, 0) def test_setting_aggregation(self): instrument1 = Counter( @@ -256,12 +228,6 @@ def test_setting_aggregation(self): attribute_keys={"a", "c"}, ), instrument=instrument1, - sdk_config=self.sdk_configuration, - instrument_class_temporality=MagicMock( - **{ - "__getitem__.return_value": AggregationTemporality.CUMULATIVE - } - ), instrument_class_aggregation={Counter: LastValueAggregation()}, ) diff --git a/tests/opentelemetry-test-utils/src/opentelemetry/test/metrictestutil.py b/tests/opentelemetry-test-utils/src/opentelemetry/test/metrictestutil.py index b11e1646246..d89b4480a0b 100644 --- a/tests/opentelemetry-test-utils/src/opentelemetry/test/metrictestutil.py +++ b/tests/opentelemetry-test-utils/src/opentelemetry/test/metrictestutil.py @@ -13,69 +13,72 @@ # limitations under the License. -from collections import OrderedDict - from opentelemetry.attributes import BoundedAttributes from opentelemetry.sdk._metrics.export import ( AggregationTemporality, Gauge, Metric, + NumberDataPoint, Sum, ) -from opentelemetry.sdk.resources import Resource as SDKResource -from opentelemetry.sdk.util.instrumentation import InstrumentationScope def _generate_metric( - name, point, attributes=None, description=None, unit=None + name, data, attributes=None, description=None, unit=None ) -> Metric: - if not attributes: - attributes = BoundedAttributes(attributes={"a": 1, "b": True}) - if not description: + if description is None: description = "foo" - if not unit: + if unit is None: unit = "s" return Metric( - resource=SDKResource(OrderedDict([("a", 1), ("b", False)])), - instrumentation_scope=InstrumentationScope( - "first_name", "first_version" - ), - attributes=attributes, - description=description, name=name, + description=description, unit=unit, - point=point, + data=data, ) def _generate_sum( - name, val, attributes=None, description=None, unit=None + name, value, attributes=None, description=None, unit=None ) -> Sum: + if attributes is None: + attributes = BoundedAttributes(attributes={"a": 1, "b": True}) return _generate_metric( name, Sum( + data_points=[ + NumberDataPoint( + attributes=attributes, + start_time_unix_nano=1641946015139533244, + time_unix_nano=1641946016139533244, + value=value, + ) + ], aggregation_temporality=AggregationTemporality.CUMULATIVE, is_monotonic=True, - start_time_unix_nano=1641946015139533244, - time_unix_nano=1641946016139533244, - value=val, ), - attributes=attributes, description=description, unit=unit, ) def _generate_gauge( - name, val, attributes=None, description=None, unit=None + name, value, attributes=None, description=None, unit=None ) -> Gauge: + if attributes is None: + attributes = BoundedAttributes(attributes={"a": 1, "b": True}) return _generate_metric( name, Gauge( - time_unix_nano=1641946016139533244, - value=val, + data_points=[ + NumberDataPoint( + attributes=attributes, + start_time_unix_nano=1641946015139533244, + time_unix_nano=1641946016139533244, + value=value, + ) + ], ), - attributes=attributes, description=description, unit=unit, ) @@ -87,7 +90,6 @@ def 
     return _generate_metric(
         name,
         None,
-        attributes=attributes,
         description=description,
         unit=unit,
     )
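One behavioral nuance in the _generate_* helpers above: switching from truthiness checks to `is None` means an explicitly empty description or unit is now preserved instead of being silently replaced by the defaults. A minimal illustration of the difference:

    def old_default(unit):
        if not unit:      # pre-change behavior
            unit = "s"
        return unit

    def new_default(unit):
        if unit is None:  # post-change behavior
            unit = "s"
        return unit

    assert old_default("") == "s"   # empty string was lost
    assert new_default("") == ""    # empty string is preserved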