diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py index 1742579e4b5..52d48e3eec8 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py @@ -11,7 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging +from logging import getLogger from os import environ from typing import Optional, Sequence from grpc import ChannelCredentials, Compression @@ -40,9 +40,10 @@ from opentelemetry.sdk._metrics.export import ( MetricExporter, MetricExportResult, + MetricsData, ) -logger = logging.getLogger(__name__) +_logger = getLogger(__name__) class OTLPMetricExporter( @@ -79,90 +80,125 @@ def __init__( ) def _translate_data( - self, data: Sequence[Metric] + self, data: MetricsData ) -> ExportMetricsServiceRequest: - sdk_resource_scope_metrics = {} - for metric in data: - resource = metric.resource - scope_map = sdk_resource_scope_metrics.get(resource, {}) - if not scope_map: - sdk_resource_scope_metrics[resource] = scope_map + resource_instrumentation_scope_pb2_scope_metrics = {} - scope_metrics = scope_map.get(metric.instrumentation_scope) + for resource_metrics in data.resource_metrics: - if not scope_metrics: - if metric.instrumentation_scope is not None: - scope_map[metric.instrumentation_scope] = pb2.ScopeMetrics( + resource = resource_metrics.resource + + instrumentation_scope_pb2_scope_metrics = ( + resource_instrumentation_scope_pb2_scope_metrics.get( + resource, {} + ) + ) + + if not instrumentation_scope_pb2_scope_metrics: + resource_instrumentation_scope_pb2_scope_metrics[ + resource + ] = instrumentation_scope_pb2_scope_metrics + + for scope_metrics in resource_metrics.scope_metrics: + + instrumentation_scope = scope_metrics.scope + + if instrumentation_scope is None: + instrumentation_scope_pb2_scope_metrics[ + None + ] = pb2.ScopeMetrics() + + else: + instrumentation_scope_pb2_scope_metrics[ + instrumentation_scope + ] = pb2.ScopeMetrics( scope=InstrumentationScope( - name=metric.instrumentation_scope.name, - version=metric.instrumentation_scope.version, + name=instrumentation_scope.name, + version=instrumentation_scope.version, ) ) - else: - scope_map[ - metric.instrumentation_scope - ] = pb2.ScopeMetrics() - scope_metrics = scope_map.get(metric.instrumentation_scope) + pb2_scope_metrics = instrumentation_scope_pb2_scope_metrics[ + instrumentation_scope + ] + + for metric in scope_metrics.metrics: + pb2_metric = pb2.Metric( + name=metric.name, + description=metric.description, + unit=metric.unit, + ) + + for data_point in metric.data.data_points: + + if isinstance(metric.data, Gauge): + pt = pb2.NumberDataPoint( + attributes=self._translate_attributes( + data_point.attributes + ), + time_unix_nano=data_point.time_unix_nano, + ) + if isinstance(data_point.value, int): + pt.as_int = data_point.value + else: + pt.as_double = data_point.value + pb2_metric.gauge.data_points.append(pt) + + elif isinstance(metric.data, Histogram): + pt = pb2.HistogramDataPoint( + attributes=self._translate_attributes( + data_point.attributes + ), + time_unix_nano=data_point.time_unix_nano, + start_time_unix_nano=( + data_point.start_time_unix_nano + ), + count=data_point.count, + sum=data_point.sum, + bucket_counts=data_point.bucket_counts, + explicit_bounds=data_point.explicit_bounds, + ) + pb2_metric.histogram.aggregation_temporality = ( + metric.data.aggregation_temporality + ) + pb2_metric.histogram.data_points.append(pt) + elif isinstance(metric.data, Sum): + pt = pb2.NumberDataPoint( + attributes=self._translate_attributes( + data_point.attributes + ), + start_time_unix_nano=( + data_point.start_time_unix_nano + ), + time_unix_nano=data_point.time_unix_nano, + ) + if isinstance(data_point.value, int): + pt.as_int = data_point.value + else: + pt.as_double = data_point.value + # note that because sum is a message type, the + # fields must be set individually rather than + # instantiating a pb2.Sum and setting it once + pb2_metric.sum.aggregation_temporality = ( + metric.data.aggregation_temporality + ) + pb2_metric.sum.is_monotonic = ( + metric.data.is_monotonic + ) + pb2_metric.sum.data_points.append(pt) + else: + _logger.warn( + "unsupported datapoint type %s", + metric.point + ) + continue + + pb2_scope_metrics.metrics.append(pb2_metric) - pbmetric = pb2.Metric( - name=metric.name, - description=metric.description, - unit=metric.unit, - ) - if isinstance(metric.point, Gauge): - pt = pb2.NumberDataPoint( - attributes=self._translate_attributes(metric.attributes), - time_unix_nano=metric.point.time_unix_nano, - ) - if isinstance(metric.point.value, int): - pt.as_int = metric.point.value - else: - pt.as_double = metric.point.value - pbmetric.gauge.data_points.append(pt) - elif isinstance(metric.point, Histogram): - pt = pb2.HistogramDataPoint( - attributes=self._translate_attributes(metric.attributes), - time_unix_nano=metric.point.time_unix_nano, - start_time_unix_nano=metric.point.start_time_unix_nano, - count=sum(metric.point.bucket_counts), - sum=metric.point.sum, - bucket_counts=metric.point.bucket_counts, - explicit_bounds=metric.point.explicit_bounds, - ) - pbmetric.histogram.aggregation_temporality = ( - metric.point.aggregation_temporality - ) - pbmetric.histogram.data_points.append(pt) - elif isinstance(metric.point, Sum): - pt = pb2.NumberDataPoint( - attributes=self._translate_attributes(metric.attributes), - start_time_unix_nano=metric.point.start_time_unix_nano, - time_unix_nano=metric.point.time_unix_nano, - ) - if isinstance(metric.point.value, int): - pt.as_int = metric.point.value - else: - pt.as_double = metric.point.value - # note that because sum is a message type, the fields must be - # set individually rather than instantiating a pb2.Sum and setting - # it once - pbmetric.sum.aggregation_temporality = ( - metric.point.aggregation_temporality - ) - pbmetric.sum.is_monotonic = metric.point.is_monotonic - pbmetric.sum.data_points.append(pt) - else: - logger.warn("unsupported datapoint type %s", metric.point) - continue - - scope_metrics.metrics.append( - pbmetric, - ) return ExportMetricsServiceRequest( resource_metrics=get_resource_data( - sdk_resource_scope_metrics, + resource_instrumentation_scope_pb2_scope_metrics, pb2.ResourceMetrics, "metrics", ) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/metrics/test_otlp_metrics_exporter.py index 9edf193374c..35dee657ae1 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/metrics/test_otlp_metrics_exporter.py @@ -20,6 +20,7 @@ from google.rpc.error_details_pb2 import RetryInfo from grpc import StatusCode, server +from opentelemetry.sdk.resources import Resource from opentelemetry.exporter.otlp.proto.grpc._metric_exporter import ( OTLPMetricExporter, ) @@ -44,13 +45,20 @@ AggregationTemporality, Histogram, MetricExportResult, + ResourceMetrics, + MetricsData, + ScopeMetrics, + Metric, + HistogramDataPoint +) +from opentelemetry.sdk.util.instrumentation import ( + InstrumentationScope as SDKInstrumentationScope ) from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_METRICS_INSECURE, ) from opentelemetry.test.metrictestutil import ( _generate_gauge, - _generate_metric, _generate_sum, ) @@ -112,23 +120,138 @@ def setUp(self): self.server.start() + histogram = Metric( + name="histogram", + description="foo", + unit="s", + data=Histogram( + data_points=[ + HistogramDataPoint( + attributes={"a": 1, "b": True}, + start_time_unix_nano=1641946016139533244, + time_unix_nano=1641946016139533244, + count=5, + sum=67, + bucket_counts=[1, 4], + explicit_bounds=[10.0, 20.0], + min=8, + max=18, + ) + ], + aggregation_temporality=AggregationTemporality.DELTA, + ), + ) + self.metrics = { - "sum_int": _generate_sum("sum_int", 33), - "sum_double": _generate_sum("sum_double", 2.98), - "gauge_int": _generate_gauge("gauge_int", 9000), - "gauge_double": _generate_gauge("gauge_double", 52.028), - "histogram": _generate_metric( - "histogram", - Histogram( - aggregation_temporality=AggregationTemporality.DELTA, - bucket_counts=[1, 4], - explicit_bounds=[10.0, 20.0], - max=18, - min=8, - start_time_unix_nano=1641946016139533244, - sum=67, - time_unix_nano=1641946016139533244, - ), + "sum_int": MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=Resource( + attributes={"a": 1, "b": False}, + schema_url="resource_schema_url" + ), + scope_metrics=[ + ScopeMetrics( + scope=SDKInstrumentationScope( + name="first_name", + version="first_version", + schema_url="insrumentation_scope_schema_url" + ), + metrics=[_generate_sum("sum_int", 33)], + schema_url="instrumentation_scope_schema_url" + ) + ], + schema_url="resource_schema_url" + ) + ] + ), + "sum_double": MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=Resource( + attributes={"a": 1, "b": False}, + schema_url="resource_schema_url" + ), + scope_metrics=[ + ScopeMetrics( + scope=SDKInstrumentationScope( + name="first_name", + version="first_version", + schema_url="insrumentation_scope_schema_url" + ), + metrics=[_generate_sum("sum_double", 2.98)], + schema_url="instrumentation_scope_schema_url" + ) + ], + schema_url="resource_schema_url" + ) + ] + ), + "gauge_int": MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=Resource( + attributes={"a": 1, "b": False}, + schema_url="resource_schema_url" + ), + scope_metrics=[ + ScopeMetrics( + scope=SDKInstrumentationScope( + name="first_name", + version="first_version", + schema_url="insrumentation_scope_schema_url" + ), + metrics=[_generate_gauge("gauge_int", 9000)], + schema_url="instrumentation_scope_schema_url" + ) + ], + schema_url="resource_schema_url" + ) + ] + ), + "gauge_double": MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=Resource( + attributes={"a": 1, "b": False}, + schema_url="resource_schema_url" + ), + scope_metrics=[ + ScopeMetrics( + scope=SDKInstrumentationScope( + name="first_name", + version="first_version", + schema_url="insrumentation_scope_schema_url" + ), + metrics=[_generate_gauge("gauge_double", 52.028)], + schema_url="instrumentation_scope_schema_url" + ) + ], + schema_url="resource_schema_url" + ) + ] + ), + "histogram": MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=Resource( + attributes={"a": 1, "b": False}, + schema_url="resource_schema_url" + ), + scope_metrics=[ + ScopeMetrics( + scope=SDKInstrumentationScope( + name="first_name", + version="first_version", + schema_url="insrumentation_scope_schema_url" + ), + metrics=[histogram], + schema_url="instrumentation_scope_schema_url" + ) + ], + schema_url="resource_schema_url" + ) + ] ), } @@ -246,7 +369,7 @@ def test_unavailable(self, mock_sleep, mock_expo): MetricsServiceServicerUNAVAILABLE(), self.server ) self.assertEqual( - self.exporter.export([self.metrics["sum_int"]]), + self.exporter.export(self.metrics["sum_int"]), MetricExportResult.FAILURE, ) mock_sleep.assert_called_with(1) @@ -261,7 +384,7 @@ def test_unavailable_delay(self, mock_sleep, mock_expo): MetricsServiceServicerUNAVAILABLEDelay(), self.server ) self.assertEqual( - self.exporter.export([self.metrics["sum_int"]]), + self.exporter.export(self.metrics["sum_int"]), MetricExportResult.FAILURE, ) mock_sleep.assert_called_with(4) @@ -271,7 +394,7 @@ def test_success(self): MetricsServiceServicerSUCCESS(), self.server ) self.assertEqual( - self.exporter.export([self.metrics["sum_int"]]), + self.exporter.export(self.metrics["sum_int"]), MetricExportResult.SUCCESS, ) @@ -280,7 +403,7 @@ def test_failure(self): MetricsServiceServicerALREADY_EXISTS(), self.server ) self.assertEqual( - self.exporter.export([self.metrics["sum_int"]]), + self.exporter.export(self.metrics["sum_int"]), MetricExportResult.FAILURE, ) @@ -339,7 +462,7 @@ def test_translate_sum_int(self): ] ) # pylint: disable=protected-access - actual = self.exporter._translate_data([self.metrics["sum_int"]]) + actual = self.exporter._translate_data(self.metrics["sum_int"]) self.assertEqual(expected, actual) def test_translate_sum_double(self): @@ -397,7 +520,7 @@ def test_translate_sum_double(self): ] ) # pylint: disable=protected-access - actual = self.exporter._translate_data([self.metrics["sum_double"]]) + actual = self.exporter._translate_data(self.metrics["sum_double"]) self.assertEqual(expected, actual) def test_translate_gauge_int(self): @@ -452,7 +575,7 @@ def test_translate_gauge_int(self): ] ) # pylint: disable=protected-access - actual = self.exporter._translate_data([self.metrics["gauge_int"]]) + actual = self.exporter._translate_data(self.metrics["gauge_int"]) self.assertEqual(expected, actual) def test_translate_gauge_double(self): @@ -507,7 +630,7 @@ def test_translate_gauge_double(self): ] ) # pylint: disable=protected-access - actual = self.exporter._translate_data([self.metrics["gauge_double"]]) + actual = self.exporter._translate_data(self.metrics["gauge_double"]) self.assertEqual(expected, actual) def test_translate_histogram(self): @@ -569,5 +692,5 @@ def test_translate_histogram(self): ] ) # pylint: disable=protected-access - actual = self.exporter._translate_data([self.metrics["histogram"]]) + actual = self.exporter._translate_data(self.metrics["histogram"]) self.assertEqual(expected, actual) diff --git a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py index 3bf6e54c98b..2a9459bb18b 100644 --- a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py +++ b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py @@ -67,7 +67,7 @@ from json import dumps from logging import getLogger from re import IGNORECASE, UNICODE, compile -from typing import Dict, Iterable, Sequence, Tuple, Union +from typing import Dict, Sequence, Tuple, Union from prometheus_client.core import ( REGISTRY, @@ -80,20 +80,23 @@ from opentelemetry.sdk._metrics.export import ( Gauge, Histogram, - Metric, + HistogramDataPoint, MetricReader, Sum, + MetricsData ) _logger = getLogger(__name__) -def _convert_buckets(metric: Metric) -> Sequence[Tuple[str, int]]: +def _convert_buckets( + bucket_counts: Sequence[int], explicit_bounds: Sequence[float] +) -> Sequence[Tuple[str, int]]: buckets = [] total_count = 0 for upper_bound, count in zip( - chain(metric.point.explicit_bounds, ["+Inf"]), - metric.point.bucket_counts, + chain(explicit_bounds, ["+Inf"]), + bucket_counts, ): total_count += count buckets.append((f"{upper_bound}", total_count)) @@ -117,13 +120,13 @@ def __init__(self, prefix: str = "") -> None: def _receive_metrics( self, - metrics: Iterable[Metric], + metrics_data: MetricsData, timeout_millis: float = 10_000, **kwargs, ) -> None: - if metrics is None: + if metrics_data is None: return - self._collector.add_metrics_data(metrics) + self._collector.add_metrics_data(metrics_data) def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: REGISTRY.unregister(self._collector) @@ -139,14 +142,14 @@ class _CustomCollector: def __init__(self, prefix: str = ""): self._prefix = prefix self._callback = None - self._metrics_to_export = deque() + self._metrics_datas = deque() self._non_letters_digits_underscore_re = compile( r"[^\w]", UNICODE | IGNORECASE ) - def add_metrics_data(self, export_records: Sequence[Metric]) -> None: + def add_metrics_data(self, metrics_data: MetricsData) -> None: """Add metrics to Prometheus data""" - self._metrics_to_export.append(export_records) + self._metrics_datas.append(metrics_data) def collect(self) -> None: """Collect fetches the metrics from OpenTelemetry @@ -159,11 +162,10 @@ def collect(self) -> None: metric_family_id_metric_family = {} - while self._metrics_to_export: - for export_record in self._metrics_to_export.popleft(): - self._translate_to_prometheus( - export_record, metric_family_id_metric_family - ) + while self._metrics_datas: + self._translate_to_prometheus( + self._metrics_datas.popleft(), metric_family_id_metric_family + ) if metric_family_id_metric_family: for metric_family in metric_family_id_metric_family.values(): @@ -171,84 +173,142 @@ def collect(self) -> None: def _translate_to_prometheus( self, - metric: Metric, + metrics_data: MetricsData, metric_family_id_metric_family: Dict[str, PrometheusMetric], ): - label_values = [] - label_keys = [] - for key, value in metric.attributes.items(): - label_keys.append(self._sanitize(key)) - label_values.append(self._check_value(value)) - - metric_name = "" - if self._prefix != "": - metric_name = self._prefix + "_" - metric_name += self._sanitize(metric.name) - - description = metric.description or "" - - metric_family_id = "|".join( - [metric_name, description, "%".join(label_keys), metric.unit] - ) - - if isinstance(metric.point, Sum): - - metric_family_id = "|".join( - [metric_family_id, CounterMetricFamily.__name__] - ) - - if metric_family_id not in metric_family_id_metric_family.keys(): - metric_family_id_metric_family[ - metric_family_id - ] = CounterMetricFamily( - name=metric_name, - documentation=description, - labels=label_keys, - unit=metric.unit, + metrics = [] + + for resource_metrics in metrics_data.resource_metrics: + for scope_metrics in resource_metrics.scope_metrics: + for metric in scope_metrics.metrics: + metrics.append(metric) + + for metric in metrics: + # pylint: disable=too-many-locals,too-many-branches + label_keyss = [] + label_valuess = [] + values = [] + + for number_data_point in metric.data.data_points: + label_keys = [] + label_values = [] + + for key, value in number_data_point.attributes.items(): + label_keys.append(self._sanitize(key)) + label_values.append(self._check_value(value)) + + label_keyss.append(label_keys) + label_valuess.append(label_values) + if isinstance(number_data_point, HistogramDataPoint): + values.append( + { + "bucket_counts": number_data_point.bucket_counts, + "explicit_bounds": ( + number_data_point.explicit_bounds + ), + "sum": number_data_point.sum, + } + ) + else: + values.append(number_data_point.value) + + metric_name = "" + if self._prefix != "": + metric_name = self._prefix + "_" + metric_name += self._sanitize(metric.name) + + description = metric.description or "" + + pre_metric_family_ids = [] + + for label_keys in label_keyss: + pre_metric_family_ids.append( + "|".join( + [ + metric_name, + description, + "%".join(label_keys), + metric.unit, + ] + ) ) - metric_family_id_metric_family[metric_family_id].add_metric( - labels=label_values, value=metric.point.value - ) - elif isinstance(metric.point, Gauge): - - metric_family_id = "|".join( - [metric_family_id, GaugeMetricFamily.__name__] - ) - - if metric_family_id not in metric_family_id_metric_family.keys(): - metric_family_id_metric_family[ - metric_family_id - ] = GaugeMetricFamily( - name=metric_name, - documentation=description, - labels=label_keys, - unit=metric.unit, - ) - metric_family_id_metric_family[metric_family_id].add_metric( - labels=label_values, value=metric.point.value - ) - elif isinstance(metric.point, Histogram): - - metric_family_id = "|".join( - [metric_family_id, HistogramMetricFamily.__name__] - ) - if metric_family_id not in metric_family_id_metric_family.keys(): - metric_family_id_metric_family[ - metric_family_id - ] = HistogramMetricFamily( - name=metric_name, - documentation=description, - labels=label_keys, - unit=metric.unit, - ) - metric_family_id_metric_family[metric_family_id].add_metric( - labels=label_values, - buckets=_convert_buckets(metric), - sum_value=metric.point.sum, - ) - else: - _logger.warning("Unsupported metric type. %s", type(metric.point)) + for pre_metric_family_id, label_values, value in zip( + pre_metric_family_ids, label_valuess, values + ): + if isinstance(metric.data, Sum): + + metric_family_id = "|".join( + [pre_metric_family_id, CounterMetricFamily.__name__] + ) + + if ( + metric_family_id + not in metric_family_id_metric_family.keys() + ): + metric_family_id_metric_family[ + metric_family_id + ] = CounterMetricFamily( + name=metric_name, + documentation=description, + labels=label_keys, + unit=metric.unit, + ) + metric_family_id_metric_family[metric_family_id].add_metric( + labels=label_values, value=value + ) + elif isinstance(metric.data, Gauge): + + metric_family_id = "|".join( + [pre_metric_family_id, GaugeMetricFamily.__name__] + ) + + if ( + metric_family_id + not in metric_family_id_metric_family.keys() + ): + metric_family_id_metric_family[ + metric_family_id + ] = GaugeMetricFamily( + name=metric_name, + documentation=description, + labels=label_keys, + unit=metric.unit, + ) + metric_family_id_metric_family[metric_family_id].add_metric( + labels=label_values, value=value + ) + elif isinstance(metric.data, Histogram): + + metric_family_id = "|".join( + [pre_metric_family_id, HistogramMetricFamily.__name__] + ) + + if ( + metric_family_id + not in metric_family_id_metric_family.keys() + ): + metric_family_id_metric_family[ + metric_family_id + ] = HistogramMetricFamily( + name=metric_name, + documentation=description, + labels=label_keys, + unit=metric.unit, + ) + metric_family_id_metric_family[ + metric_family_id + ].add_metric( + labels=label_values, + buckets=_convert_buckets( + value["bucket_counts"], value["explicit_bounds"] + ), + sum_value=value["sum"], + ) + else: + _logger.warning( + "Unsupported metric data. %s", type(metric.data) + ) def _sanitize(self, key: str) -> str: """sanitize the given metric name or label according to Prometheus rule. diff --git a/exporter/opentelemetry-exporter-prometheus/tests/test_prometheus_exporter.py b/exporter/opentelemetry-exporter-prometheus/tests/test_prometheus_exporter.py index e5dcb796834..394600a1548 100644 --- a/exporter/opentelemetry-exporter-prometheus/tests/test_prometheus_exporter.py +++ b/exporter/opentelemetry-exporter-prometheus/tests/test_prometheus_exporter.py @@ -12,9 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import unittest from textwrap import dedent -from unittest import mock +from unittest import TestCase +from unittest.mock import Mock, patch from prometheus_client import generate_latest from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily @@ -23,19 +23,26 @@ PrometheusMetricReader, _CustomCollector, ) -from opentelemetry.sdk._metrics.export import AggregationTemporality, Histogram +from opentelemetry.sdk._metrics.export import ( + AggregationTemporality, + Histogram, + HistogramDataPoint, + Metric, + MetricsData, + ResourceMetrics, + ScopeMetrics +) from opentelemetry.test.metrictestutil import ( _generate_gauge, - _generate_metric, _generate_sum, _generate_unsupported_metric, ) -class TestPrometheusMetricReader(unittest.TestCase): +class TestPrometheusMetricReader(TestCase): def setUp(self): - self._mock_registry_register = mock.Mock() - self._registry_register_patch = mock.patch( + self._mock_registry_register = Mock() + self._registry_register_patch = patch( "prometheus_client.core.REGISTRY.register", side_effect=self._mock_registry_register, ) @@ -49,7 +56,7 @@ def test_constructor(self): self.assertTrue(self._mock_registry_register.called) def test_shutdown(self): - with mock.patch( + with patch( "prometheus_client.core.REGISTRY.unregister" ) as registry_unregister_patch: exporter = PrometheusMetricReader() @@ -57,23 +64,45 @@ def test_shutdown(self): self.assertTrue(registry_unregister_patch.called) def test_histogram_to_prometheus(self): - record = _generate_metric( - "test@name", - Histogram( - aggregation_temporality=AggregationTemporality.CUMULATIVE, - bucket_counts=[1, 3, 2], - explicit_bounds=[123.0, 456.0], - start_time_unix_nano=1641946016139533244, - max=457, - min=1, - sum=579.0, - time_unix_nano=1641946016139533244, + metric = Metric( + name="test@name", + description="foo", + unit="s", + data=Histogram( + data_points=[ + HistogramDataPoint( + attributes={"histo": 1}, + start_time_unix_nano=1641946016139533244, + time_unix_nano=1641946016139533244, + count=6, + sum=579.0, + bucket_counts=[1, 3, 2], + explicit_bounds=[123.0, 456.0], + min=1, + max=457, + ) + ], + aggregation_temporality=AggregationTemporality.DELTA, ), - attributes={"histo": 1}, + ) + metrics_data = MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=Mock(), + scope_metrics=[ + ScopeMetrics( + scope=Mock(), + metrics=[metric], + schema_url=Mock() + ) + ], + schema_url=Mock() + ) + ] ) collector = _CustomCollector("testprefix") - collector.add_metrics_data([record]) + collector.add_metrics_data(metrics_data) result_bytes = generate_latest(collector) result = result_bytes.decode("utf-8") self.assertEqual( @@ -93,15 +122,32 @@ def test_histogram_to_prometheus(self): def test_sum_to_prometheus(self): labels = {"environment@": "staging", "os": "Windows"} - record = _generate_sum( + metric = _generate_sum( "test@sum", 123, attributes=labels, description="testdesc", unit="testunit", ) + + metrics_data = MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=Mock(), + scope_metrics=[ + ScopeMetrics( + scope=Mock(), + metrics=[metric], + schema_url=Mock() + ) + ], + schema_url=Mock() + ) + ] + ) + collector = _CustomCollector("testprefix") - collector.add_metrics_data([record]) + collector.add_metrics_data(metrics_data) for prometheus_metric in collector.collect(): self.assertEqual(type(prometheus_metric), CounterMetricFamily) @@ -121,15 +167,32 @@ def test_sum_to_prometheus(self): def test_gauge_to_prometheus(self): labels = {"environment@": "dev", "os": "Unix"} - record = _generate_gauge( + metric = _generate_gauge( "test@gauge", 123, attributes=labels, description="testdesc", unit="testunit", ) + + metrics_data = MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=Mock(), + scope_metrics=[ + ScopeMetrics( + scope=Mock(), + metrics=[metric], + schema_url=Mock() + ) + ], + schema_url=Mock() + ) + ] + ) + collector = _CustomCollector("testprefix") - collector.add_metrics_data([record]) + collector.add_metrics_data(metrics_data) for prometheus_metric in collector.collect(): self.assertEqual(type(prometheus_metric), GaugeMetricFamily) @@ -170,15 +233,30 @@ def test_sanitize(self): def test_list_labels(self): labels = {"environment@": ["1", "2", "3"], "os": "Unix"} - record = _generate_gauge( + metric = _generate_gauge( "test@gauge", 123, attributes=labels, description="testdesc", unit="testunit", ) + metrics_data = MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=Mock(), + scope_metrics=[ + ScopeMetrics( + scope=Mock(), + metrics=[metric], + schema_url=Mock() + ) + ], + schema_url=Mock() + ) + ] + ) collector = _CustomCollector("testprefix") - collector.add_metrics_data([record]) + collector.add_metrics_data(metrics_data) for prometheus_metric in collector.collect(): self.assertEqual(type(prometheus_metric), GaugeMetricFamily) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/_view_instrument_match.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/_view_instrument_match.py index 2f002a272ec..c3f0b4ebf82 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/_view_instrument_match.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/_view_instrument_match.py @@ -22,16 +22,11 @@ Aggregation, DefaultAggregation, _Aggregation, - _convert_aggregation_temporality, - _PointVarT, _SumAggregation, ) from opentelemetry.sdk._metrics._internal.export import AggregationTemporality from opentelemetry.sdk._metrics._internal.measurement import Measurement -from opentelemetry.sdk._metrics._internal.point import Metric -from opentelemetry.sdk._metrics._internal.sdk_configuration import ( - SdkConfiguration, -) +from opentelemetry.sdk._metrics._internal.point import DataPointT from opentelemetry.sdk._metrics._internal.view import View _logger = getLogger(__name__) @@ -42,17 +37,12 @@ def __init__( self, view: View, instrument: Instrument, - sdk_config: SdkConfiguration, - instrument_class_temporality: Dict[type, AggregationTemporality], instrument_class_aggregation: Dict[type, Aggregation], ): self._view = view self._instrument = instrument - self._sdk_config = sdk_config self._attributes_aggregation: Dict[frozenset, _Aggregation] = {} - self._attributes_previous_point: Dict[frozenset, _PointVarT] = {} self._lock = Lock() - self._instrument_class_temporality = instrument_class_temporality self._instrument_class_aggregation = instrument_class_aggregation self._name = self._view._name or self._instrument.name self._description = ( @@ -124,46 +114,15 @@ def consume_measurement(self, measurement: Measurement) -> None: self._attributes_aggregation[attributes].aggregate(measurement) - def collect(self) -> Iterable[Metric]: + def collect( + self, aggregation_temporality: AggregationTemporality + ) -> Iterable[DataPointT]: + data_points = [] with self._lock: - for ( - attributes, - aggregation, - ) in self._attributes_aggregation.items(): - - previous_point = self._attributes_previous_point.get( - attributes - ) - - current_point = aggregation.collect() - - # pylint: disable=assignment-from-none - - self._attributes_previous_point[ - attributes - ] = _convert_aggregation_temporality( - previous_point, - current_point, - AggregationTemporality.CUMULATIVE, + for aggregation in self._attributes_aggregation.values(): + data_points.append( + aggregation.collect(aggregation_temporality) ) - if current_point is not None: - - yield Metric( - attributes=dict(attributes), - description=self._description, - instrumentation_scope=( - self._instrument.instrumentation_scope - ), - name=self._name, - resource=self._sdk_config.resource, - unit=self._instrument.unit, - point=_convert_aggregation_temporality( - previous_point, - current_point, - self._instrument_class_temporality[ - self._instrument.__class__ - ], - ), - ) + return data_points diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/aggregation.py index 1d10b3f8ead..052588c3763 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/aggregation.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/aggregation.py @@ -14,7 +14,6 @@ from abc import ABC, abstractmethod from bisect import bisect_left -from dataclasses import replace from enum import IntEnum from logging import getLogger from math import inf @@ -37,11 +36,15 @@ from opentelemetry.sdk._metrics._internal.point import ( Histogram as HistogramPoint, ) -from opentelemetry.sdk._metrics._internal.point import PointT, Sum +from opentelemetry.sdk._metrics._internal.point import ( + HistogramDataPoint, + NumberDataPoint, + Sum, +) from opentelemetry.util._time import _time_ns from opentelemetry.util.types import Attributes -_PointVarT = TypeVar("_PointVarT", bound=PointT) +_DataPointVarT = TypeVar("_DataPointVarT", NumberDataPoint, HistogramDataPoint) _logger = getLogger(__name__) @@ -58,17 +61,21 @@ class AggregationTemporality(IntEnum): CUMULATIVE = 2 -class _Aggregation(ABC, Generic[_PointVarT]): +class _Aggregation(ABC, Generic[_DataPointVarT]): def __init__(self, attributes: Attributes): self._lock = Lock() self._attributes = attributes + self._previous_point = None @abstractmethod def aggregate(self, measurement: Measurement) -> None: pass @abstractmethod - def collect(self) -> Optional[_PointVarT]: + def collect( + self, + aggregation_temporality: AggregationTemporality, + ) -> Optional[_DataPointVarT]: pass @@ -76,7 +83,10 @@ class _DropAggregation(_Aggregation): def aggregate(self, measurement: Measurement) -> None: pass - def collect(self) -> Optional[_PointVarT]: + def collect( + self, + aggregation_temporality: AggregationTemporality, + ) -> Optional[_DataPointVarT]: pass @@ -176,7 +186,9 @@ def aggregate(self, measurement: Measurement) -> None: self._value = 0 self._value = self._value + measurement.value - def collect(self) -> Optional[Sum]: + def collect( + self, aggregation_temporality: AggregationTemporality + ) -> Optional[NumberDataPoint]: """ Atomically return a point for the current value of the metric and reset the aggregation value. @@ -192,28 +204,50 @@ def collect(self) -> Optional[Sum]: self._value = 0 self._start_time_unix_nano = now + 1 - return Sum( - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=self._instrument_is_monotonic, - start_time_unix_nano=start_time_unix_nano, - time_unix_nano=now, - value=value, - ) + else: - with self._lock: - if self._value is None: - return None - value = self._value - self._value = None + with self._lock: + if self._value is None: + return None + value = self._value + self._value = None + start_time_unix_nano = self._start_time_unix_nano - return Sum( - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=self._instrument_is_monotonic, - start_time_unix_nano=self._start_time_unix_nano, + current_point = NumberDataPoint( + attributes=self._attributes, + start_time_unix_nano=start_time_unix_nano, time_unix_nano=now, value=value, ) + if self._previous_point is None: + self._previous_point = current_point + return current_point + + if self._instrument_temporality is aggregation_temporality: + # Output DELTA for a synchronous instrument + # Output CUMULATIVE for an asynchronous instrument + return current_point + + if aggregation_temporality is AggregationTemporality.DELTA: + # Output temporality DELTA for an asynchronous instrument + value = current_point.value - self._previous_point.value + output_start_time_unix_nano = self._previous_point.time_unix_nano + + else: + # Output CUMULATIVE for a synchronous instrument + value = current_point.value + self._previous_point.value + output_start_time_unix_nano = ( + self._previous_point.start_time_unix_nano + ) + + return NumberDataPoint( + attributes=self._attributes, + start_time_unix_nano=output_start_time_unix_nano, + time_unix_nano=current_point.time_unix_nano, + value=value, + ) + class _LastValueAggregation(_Aggregation[Gauge]): def __init__(self, attributes: Attributes): @@ -224,7 +258,10 @@ def aggregate(self, measurement: Measurement): with self._lock: self._value = measurement.value - def collect(self) -> Optional[Gauge]: + def collect( + self, + aggregation_temporality: AggregationTemporality, + ) -> Optional[_DataPointVarT]: """ Atomically return a point for the current value of the metric. """ @@ -234,7 +271,9 @@ def collect(self) -> Optional[Gauge]: value = self._value self._value = None - return Gauge( + return NumberDataPoint( + attributes=self._attributes, + start_time_unix_nano=0, time_unix_nano=_time_ns(), value=value, ) @@ -266,6 +305,11 @@ def __init__( self._sum = 0 self._record_min_max = record_min_max self._start_time_unix_nano = _time_ns() + # It is assumed that the "natural" aggregation temporality for a + # Histogram instrument is DELTA, like the "natural" aggregation + # temporality for a Counter is DELTA and the "natural" aggregation + # temporality for an ObservableCounter is CUMULATIVE. + self._instrument_temporality = AggregationTemporality.DELTA def _get_empty_bucket_counts(self) -> List[int]: return [0] * (len(self._boundaries) + 1) @@ -282,18 +326,24 @@ def aggregate(self, measurement: Measurement) -> None: self._bucket_counts[bisect_left(self._boundaries, value)] += 1 - def collect(self) -> HistogramPoint: + def collect( + self, + aggregation_temporality: AggregationTemporality, + ) -> Optional[_DataPointVarT]: """ Atomically return a point for the current value of the metric. """ now = _time_ns() with self._lock: - value = self._bucket_counts + if not any(self._bucket_counts): + return None + + bucket_counts = self._bucket_counts start_time_unix_nano = self._start_time_unix_nano - histogram_sum = self._sum - histogram_max = self._max - histogram_min = self._min + sum_ = self._sum + max_ = self._max + min_ = self._min self._bucket_counts = self._get_empty_bucket_counts() self._start_time_unix_nano = now + 1 @@ -301,146 +351,63 @@ def collect(self) -> HistogramPoint: self._min = inf self._max = -inf - return HistogramPoint( - aggregation_temporality=AggregationTemporality.DELTA, - bucket_counts=tuple(value), - explicit_bounds=self._boundaries, - max=histogram_max, - min=histogram_min, + current_point = HistogramDataPoint( + attributes=self._attributes, start_time_unix_nano=start_time_unix_nano, - sum=histogram_sum, time_unix_nano=now, + count=sum(bucket_counts), + sum=sum_, + bucket_counts=tuple(bucket_counts), + explicit_bounds=self._boundaries, + min=min_, + max=max_, ) - -# pylint: disable=too-many-return-statements,too-many-branches -def _convert_aggregation_temporality( - previous_point: Optional[_PointVarT], - current_point: _PointVarT, - aggregation_temporality: AggregationTemporality, -) -> _PointVarT: - """Converts `current_point` to the requested `aggregation_temporality` - given the `previous_point`. - - `previous_point` must have `CUMULATIVE` temporality. `current_point` may - have `DELTA` or `CUMULATIVE` temporality. - - The output point will have temporality `aggregation_temporality`. Since - `GAUGE` points have no temporality, they are returned unchanged. - """ - - current_point_type = type(current_point) - - if current_point_type is Gauge: - return current_point - - if ( - previous_point is not None - and current_point is not None - and type(previous_point) is not type(current_point) - ): - _logger.warning( - "convert_aggregation_temporality called with mismatched " - "point types: %s and %s", - type(previous_point), - current_point_type, - ) - - return current_point - - if current_point_type is Sum: - if previous_point is None: - # Output CUMULATIVE for a synchronous instrument - # There is no previous value, return the delta point as a - # cumulative - return replace( - current_point, aggregation_temporality=aggregation_temporality - ) - if previous_point.aggregation_temporality is not ( - AggregationTemporality.CUMULATIVE - ): - raise Exception( - "previous_point aggregation temporality must be CUMULATIVE" - ) - - if current_point.aggregation_temporality is aggregation_temporality: - # Output DELTA for a synchronous instrument - # Output CUMULATIVE for an asynchronous instrument + if self._previous_point is None: + self._previous_point = current_point return current_point - if aggregation_temporality is AggregationTemporality.DELTA: - # Output temporality DELTA for an asynchronous instrument - value = current_point.value - previous_point.value - output_start_time_unix_nano = previous_point.time_unix_nano - - else: - # Output CUMULATIVE for a synchronous instrument - value = current_point.value + previous_point.value - output_start_time_unix_nano = previous_point.start_time_unix_nano - - is_monotonic = ( - previous_point.is_monotonic and current_point.is_monotonic - ) - - return Sum( - start_time_unix_nano=output_start_time_unix_nano, - time_unix_nano=current_point.time_unix_nano, - value=value, - aggregation_temporality=aggregation_temporality, - is_monotonic=is_monotonic, - ) - - if current_point_type is HistogramPoint: - if previous_point is None: - return replace( - current_point, aggregation_temporality=aggregation_temporality - ) - if previous_point.aggregation_temporality is not ( - AggregationTemporality.CUMULATIVE - ): - raise Exception( - "previous_point aggregation temporality must be CUMULATIVE" - ) - - if current_point.aggregation_temporality is aggregation_temporality: + if self._instrument_temporality is aggregation_temporality: return current_point max_ = current_point.max min_ = current_point.min if aggregation_temporality is AggregationTemporality.CUMULATIVE: - start_time_unix_nano = previous_point.start_time_unix_nano - sum_ = current_point.sum + previous_point.sum + start_time_unix_nano = self._previous_point.start_time_unix_nano + sum_ = current_point.sum + self._previous_point.sum # Only update min/max on delta -> cumulative - max_ = max(current_point.max, previous_point.max) - min_ = min(current_point.min, previous_point.min) + max_ = max(current_point.max, self._previous_point.max) + min_ = min(current_point.min, self._previous_point.min) bucket_counts = [ curr_count + prev_count for curr_count, prev_count in zip( - current_point.bucket_counts, previous_point.bucket_counts + current_point.bucket_counts, + self._previous_point.bucket_counts, ) ] else: - start_time_unix_nano = previous_point.time_unix_nano - sum_ = current_point.sum - previous_point.sum + start_time_unix_nano = self._previous_point.time_unix_nano + sum_ = current_point.sum - self._previous_point.sum bucket_counts = [ curr_count - prev_count for curr_count, prev_count in zip( - current_point.bucket_counts, previous_point.bucket_counts + current_point.bucket_counts, + self._previous_point.bucket_counts, ) ] - return HistogramPoint( - aggregation_temporality=aggregation_temporality, - bucket_counts=bucket_counts, - explicit_bounds=current_point.explicit_bounds, - max=max_, - min=min_, + return HistogramDataPoint( + attributes=self._attributes, start_time_unix_nano=start_time_unix_nano, - sum=sum_, time_unix_nano=current_point.time_unix_nano, + count=sum(bucket_counts), + sum=sum_, + bucket_counts=tuple(bucket_counts), + explicit_bounds=current_point.explicit_bounds, + min=min_, + max=max_, ) - return None class ExplicitBucketHistogramAggregation(Aggregation): diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py index d8a753ba7bc..ce2fc4a4d1e 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py @@ -19,7 +19,7 @@ from os import environ, linesep from sys import stdout from threading import Event, RLock, Thread -from typing import IO, Callable, Dict, Iterable, List, Optional, Sequence +from typing import IO, Callable, Dict, Iterable, Optional, Sequence from typing_extensions import final @@ -262,7 +262,7 @@ def _set_collect_callback( @abstractmethod def _receive_metrics( self, - metrics: Iterable["opentelemetry.sdk._metrics.export.Metric"], + metrics_data: "opentelemetry.sdk._metrics.export.MetricsData", timeout_millis: float = 10_000, **kwargs, ) -> None: @@ -300,24 +300,28 @@ def __init__( preferred_aggregation=preferred_aggregation, ) self._lock = RLock() - self._metrics: List["opentelemetry.sdk._metrics.export.Metric"] = [] + self._metrics_data: ( + "opentelemetry.sdk._metrics.export.MetricsData" + ) = None - def get_metrics(self) -> List["opentelemetry.sdk._metrics.export.Metric"]: + def get_metrics_data( + self, + ) -> ("opentelemetry.sdk._metrics.export.MetricsData"): """Reads and returns current metrics from the SDK""" with self._lock: self.collect() - metrics = self._metrics - self._metrics = [] - return metrics + metrics_data = self._metrics_data + self._metrics_data = None + return metrics_data def _receive_metrics( self, - metrics: Iterable["opentelemetry.sdk._metrics.export.Metric"], + metrics_data: "opentelemetry.sdk._metrics.export.MetricsData", timeout_millis: float = 10_000, **kwargs, ) -> None: with self._lock: - self._metrics = list(metrics) + self._metrics_data = metrics_data def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: pass @@ -389,15 +393,15 @@ def _ticker(self) -> None: def _receive_metrics( self, - metrics: Iterable["opentelemetry.sdk._metrics.export.Metric"], + metrics_data: "opentelemetry.sdk._metrics.export.MetricsData", timeout_millis: float = 10_000, **kwargs, ) -> None: - if metrics is None: + if metrics_data is None: return token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) try: - self._exporter.export(metrics, timeout_millis=timeout_millis) + self._exporter.export(metrics_data, timeout_millis=timeout_millis) except Exception as e: # pylint: disable=broad-except,invalid-name _logger.exception("Exception while exporting metrics %s", str(e)) detach(token) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader_storage.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader_storage.py index b163f414afb..421ae6a1e0b 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader_storage.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader_storage.py @@ -14,19 +14,36 @@ from logging import getLogger from threading import RLock -from typing import Dict, Iterable, List +from typing import Dict, List -from opentelemetry._metrics import Asynchronous, Instrument +from opentelemetry._metrics import ( + Asynchronous, + Counter, + Instrument, + ObservableCounter, +) from opentelemetry.sdk._metrics._internal._view_instrument_match import ( _ViewInstrumentMatch, ) from opentelemetry.sdk._metrics._internal.aggregation import ( Aggregation, ExplicitBucketHistogramAggregation, + _DropAggregation, + _ExplicitBucketHistogramAggregation, + _LastValueAggregation, + _SumAggregation, ) from opentelemetry.sdk._metrics._internal.export import AggregationTemporality from opentelemetry.sdk._metrics._internal.measurement import Measurement -from opentelemetry.sdk._metrics._internal.point import Metric +from opentelemetry.sdk._metrics._internal.point import ( + Gauge, + Histogram, + Metric, + MetricsData, + ResourceMetrics, + ScopeMetrics, + Sum, +) from opentelemetry.sdk._metrics._internal.sdk_configuration import ( SdkConfiguration, ) @@ -81,10 +98,6 @@ def _get_or_init_view_instrument_match( _ViewInstrumentMatch( view=_DEFAULT_VIEW, instrument=instrument, - sdk_config=self._sdk_config, - instrument_class_temporality=( - self._instrument_class_temporality - ), instrument_class_aggregation=( self._instrument_class_aggregation ), @@ -102,10 +115,9 @@ def consume_measurement(self, measurement: Measurement) -> None: ): view_instrument_match.consume_measurement(measurement) - def collect(self) -> Iterable[Metric]: + def collect(self) -> MetricsData: # Use a list instead of yielding to prevent a slow reader from holding # SDK locks - metrics: List[Metric] = [] # While holding the lock, new _ViewInstrumentMatch can't be added from # another thread (so we are sure we collect all existing view). @@ -116,13 +128,89 @@ def collect(self) -> Iterable[Metric]: # streams produced by the SDK, but we still align the output timestamps # for a single instrument. with self._lock: + + scope_metrics: List[ScopeMetrics] = [] + for ( - view_instrument_matches - ) in self._instrument_view_instrument_matches.values(): + instrument, + view_instrument_matches, + ) in self._instrument_view_instrument_matches.items(): + aggregation_temporality = self._instrument_class_temporality[ + instrument.__class__ + ] + + metrics: List[Metric] = [] + for view_instrument_match in view_instrument_matches: - metrics.extend(view_instrument_match.collect()) - return metrics + if isinstance( + # pylint: disable=protected-access + view_instrument_match._aggregation, + _SumAggregation, + ): + data = Sum( + aggregation_temporality=aggregation_temporality, + data_points=view_instrument_match.collect( + aggregation_temporality + ), + is_monotonic=isinstance( + instrument, (Counter, ObservableCounter) + ), + ) + elif isinstance( + # pylint: disable=protected-access + view_instrument_match._aggregation, + _LastValueAggregation, + ): + data = Gauge( + data_points=view_instrument_match.collect( + aggregation_temporality + ) + ) + elif isinstance( + # pylint: disable=protected-access + view_instrument_match._aggregation, + _ExplicitBucketHistogramAggregation, + ): + data = Histogram( + data_points=view_instrument_match.collect( + aggregation_temporality + ), + aggregation_temporality=aggregation_temporality, + ) + elif isinstance( + # pylint: disable=protected-access + view_instrument_match._aggregation, + _DropAggregation, + ): + continue + + metrics.append( + Metric( + # pylint: disable=protected-access + name=view_instrument_match._name, + description=view_instrument_match._description, + unit=view_instrument_match._instrument.unit, + data=data, + ) + ) + scope_metrics.append( + ScopeMetrics( + scope=instrument.instrumentation_scope, + metrics=metrics, + schema_url=instrument.instrumentation_scope.schema_url, + ) + ) + + return MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=self._sdk_config.resource, + scope_metrics=scope_metrics, + schema_url=self._sdk_config.resource.schema_url, + ) + ] + ) def _handle_view_instrument_match( self, @@ -140,10 +228,6 @@ def _handle_view_instrument_match( new_view_instrument_match = _ViewInstrumentMatch( view=view, instrument=instrument, - sdk_config=self._sdk_config, - instrument_class_temporality=( - self._instrument_class_temporality - ), instrument_class_aggregation=( self._instrument_class_aggregation ), diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/point.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/point.py index eca9783eeea..4e57f7a377b 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/point.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/point.py @@ -25,18 +25,39 @@ from opentelemetry.util.types import Attributes +@dataclass(frozen=True) +class NumberDataPoint: + """Single data point in a timeseries that describes the time-varying scalar + value of a metric. + """ + + attributes: Attributes + start_time_unix_nano: int + time_unix_nano: int + value: Union[int, float] + + @dataclass(frozen=True) class Sum: """Represents the type of a scalar metric that is calculated as a sum of all reported measurements over a time interval.""" + data_points: Sequence[NumberDataPoint] aggregation_temporality: ( "opentelemetry.sdk._metrics.export.AggregationTemporality" ) is_monotonic: bool - start_time_unix_nano: int - time_unix_nano: int - value: Union[int, float] + + def to_json(self) -> str: + return dumps( + { + "data_points": dumps( + [asdict(data_point) for data_point in self.data_points] + ), + "aggregation_temporality": self.aggregation_temporality, + "is_monotonic": self.is_monotonic, + } + ) @dataclass(frozen=True) @@ -45,8 +66,33 @@ class Gauge: value for every data point. It should be used for an unknown aggregation.""" + data_points: Sequence[NumberDataPoint] + + def to_json(self) -> str: + return dumps( + { + "data_points": dumps( + [asdict(data_point) for data_point in self.data_points] + ) + } + ) + + +@dataclass(frozen=True) +class HistogramDataPoint: + """Single data point in a timeseries that describes the time-varying scalar + value of a metric. + """ + + attributes: Attributes + start_time_unix_nano: int time_unix_nano: int - value: Union[int, float] + count: int + sum: Union[int, float] + bucket_counts: Sequence[int] + explicit_bounds: Sequence[float] + min: float + max: float @dataclass(frozen=True) @@ -54,52 +100,67 @@ class Histogram: """Represents the type of a metric that is calculated by aggregating as a histogram of all reported measurements over a time interval.""" + data_points: Sequence[HistogramDataPoint] aggregation_temporality: ( "opentelemetry.sdk._metrics.export.AggregationTemporality" ) - bucket_counts: Sequence[int] - explicit_bounds: Sequence[float] - max: int - min: int - start_time_unix_nano: int - sum: Union[int, float] - time_unix_nano: int + def to_json(self) -> str: + return dumps( + { + "data_points": dumps( + [asdict(data_point) for data_point in self.data_points] + ), + "aggregation_temporality": self.aggregation_temporality, + } + ) -PointT = Union[Sum, Gauge, Histogram] + +DataT = Union[Sum, Gauge, Histogram] +DataPointT = Union[NumberDataPoint, HistogramDataPoint] @dataclass(frozen=True) class Metric: - """Represents a metric point in the OpenTelemetry data model to be exported - - Concrete metric types contain all the information as in the OTLP proto definitions - (https://github.com/open-telemetry/opentelemetry-proto/blob/b43e9b18b76abf3ee040164b55b9c355217151f3/opentelemetry/proto/metrics/v1/metrics.proto#L37) but are flattened as much as possible. - """ + """Represents a metric point in the OpenTelemetry data model to be + exported.""" - # common fields to all metric kinds - attributes: Attributes - description: str - instrumentation_scope: InstrumentationScope name: str - resource: Resource + description: str unit: str - point: PointT - """Contains non-common fields for the given metric""" + data: DataT def to_json(self) -> str: return dumps( { - "attributes": self.attributes if self.attributes else "", - "description": self.description if self.description else "", - "instrumentation_scope": repr(self.instrumentation_scope) - if self.instrumentation_scope - else "", "name": self.name, - "resource": repr(self.resource.attributes) - if self.resource - else "", + "description": self.description if self.description else "", "unit": self.unit if self.unit else "", - "point": asdict(self.point) if self.point else "", + "data": self.data.to_json(), } ) + + +@dataclass(frozen=True) +class ScopeMetrics: + """A collection of Metrics produced by a scope""" + + scope: InstrumentationScope + metrics: Sequence[Metric] + schema_url: str + + +@dataclass(frozen=True) +class ResourceMetrics: + """A collection of ScopeMetrics from a Resource""" + + resource: Resource + scope_metrics: Sequence[ScopeMetrics] + schema_url: str + + +@dataclass(frozen=True) +class MetricsData: + """An array of ResourceMetrics""" + + resource_metrics: Sequence[ResourceMetrics] diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py index 107f73f79d5..becdd9d1c3c 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py @@ -28,17 +28,23 @@ # The point module is not in the export directory to avoid a circular import. from opentelemetry.sdk._metrics._internal.point import ( # noqa: F401 + DataPointT, + DataT, Gauge, Histogram, + HistogramDataPoint, Metric, - PointT, + NumberDataPoint, Sum, + ScopeMetrics, + ResourceMetrics, + MetricsData, ) __all__ = [] for key, value in globals().copy().items(): if not key.startswith("_"): - if _version_info.minor == 6 and key == "PointT": + if _version_info.minor == 6 and key == "DataT": continue value.__module__ = __name__ __all__.append(key) diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py b/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py index 5534e85090a..121b37c31ed 100644 --- a/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py @@ -31,8 +31,15 @@ def test_disable_default_views(self): counter.add(10, {"label": "value1"}) counter.add(10, {"label": "value2"}) counter.add(10, {"label": "value3"}) - - self.assertEqual(reader.get_metrics(), []) + self.assertEqual( + ( + reader.get_metrics_data() + .resource_metrics[0] + .scope_metrics[0] + .metrics + ), + [], + ) def test_disable_default_views_add_custom(self): reader = InMemoryMetricReader() @@ -51,6 +58,16 @@ def test_disable_default_views_add_custom(self): counter.add(10, {"label": "value3"}) histogram.record(12, {"label": "value"}) - metrics = reader.get_metrics() - self.assertEqual(len(metrics), 1) - self.assertEqual(metrics[0].name, "testhist") + metrics = reader.get_metrics_data() + self.assertEqual(len(metrics.resource_metrics), 1) + self.assertEqual(len(metrics.resource_metrics[0].scope_metrics), 2) + self.assertEqual( + len(metrics.resource_metrics[0].scope_metrics[0].metrics), 0 + ) + self.assertEqual( + len(metrics.resource_metrics[0].scope_metrics[1].metrics), 1 + ) + self.assertEqual( + metrics.resource_metrics[0].scope_metrics[1].metrics[0].name, + "testhist", + ) diff --git a/opentelemetry-sdk/tests/metrics/test_aggregation.py b/opentelemetry-sdk/tests/metrics/test_aggregation.py index e747e49cdba..e71f99d9eed 100644 --- a/opentelemetry-sdk/tests/metrics/test_aggregation.py +++ b/opentelemetry-sdk/tests/metrics/test_aggregation.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from dataclasses import replace -from logging import WARNING from math import inf from time import sleep from typing import Union @@ -29,15 +27,15 @@ UpDownCounter, ) from opentelemetry.sdk._metrics._internal.aggregation import ( - _convert_aggregation_temporality, _ExplicitBucketHistogramAggregation, _LastValueAggregation, _SumAggregation, ) from opentelemetry.sdk._metrics._internal.measurement import Measurement -from opentelemetry.sdk._metrics.export import AggregationTemporality, Gauge -from opentelemetry.sdk._metrics.export import Histogram as HistogramPoint -from opentelemetry.sdk._metrics.export import Sum +from opentelemetry.sdk._metrics.export import ( + AggregationTemporality, + NumberDataPoint, +) from opentelemetry.sdk._metrics.view import ( DefaultAggregation, ExplicitBucketHistogramAggregation, @@ -110,20 +108,44 @@ def test_collect_delta(self): """ synchronous_sum_aggregation = _SumAggregation( - Mock(), True, AggregationTemporality.DELTA + {}, True, AggregationTemporality.DELTA + ) + + synchronous_sum_aggregation.aggregate(measurement(1)) + first_sum = synchronous_sum_aggregation.collect( + AggregationTemporality.CUMULATIVE + ) + + self.assertEqual(first_sum.value, 1) + + synchronous_sum_aggregation.aggregate(measurement(1)) + second_sum = synchronous_sum_aggregation.collect( + AggregationTemporality.CUMULATIVE + ) + + self.assertEqual(second_sum.value, 2) + + self.assertEqual( + second_sum.start_time_unix_nano, first_sum.start_time_unix_nano + ) + + synchronous_sum_aggregation = _SumAggregation( + {}, True, AggregationTemporality.DELTA ) synchronous_sum_aggregation.aggregate(measurement(1)) - first_sum = synchronous_sum_aggregation.collect() + first_sum = synchronous_sum_aggregation.collect( + AggregationTemporality.DELTA + ) self.assertEqual(first_sum.value, 1) - self.assertTrue(first_sum.is_monotonic) synchronous_sum_aggregation.aggregate(measurement(1)) - second_sum = synchronous_sum_aggregation.collect() + second_sum = synchronous_sum_aggregation.collect( + AggregationTemporality.DELTA + ) self.assertEqual(second_sum.value, 1) - self.assertTrue(second_sum.is_monotonic) self.assertGreater( second_sum.start_time_unix_nano, first_sum.start_time_unix_nano @@ -131,32 +153,30 @@ def test_collect_delta(self): def test_collect_cumulative(self): """ - `SynchronousSumAggregation` collects sum metric points + `SynchronousSumAggregation` collects number data points """ sum_aggregation = _SumAggregation( - True, AggregationTemporality.CUMULATIVE, Mock() + {}, True, AggregationTemporality.CUMULATIVE ) sum_aggregation.aggregate(measurement(1)) - first_sum = sum_aggregation.collect() + first_sum = sum_aggregation.collect(AggregationTemporality.CUMULATIVE) self.assertEqual(first_sum.value, 1) - self.assertTrue(first_sum.is_monotonic) # should have been reset after first collect sum_aggregation.aggregate(measurement(1)) - second_sum = sum_aggregation.collect() + second_sum = sum_aggregation.collect(AggregationTemporality.CUMULATIVE) self.assertEqual(second_sum.value, 1) - self.assertTrue(second_sum.is_monotonic) self.assertEqual( second_sum.start_time_unix_nano, first_sum.start_time_unix_nano ) # if no point seen for a whole interval, should return None - third_sum = sum_aggregation.collect() + third_sum = sum_aggregation.collect(AggregationTemporality.CUMULATIVE) self.assertIsNone(third_sum) @@ -180,35 +200,44 @@ def test_aggregate(self): def test_collect(self): """ - `LastValueAggregation` collects sum metric points + `LastValueAggregation` collects number data points """ last_value_aggregation = _LastValueAggregation(Mock()) - self.assertIsNone(last_value_aggregation.collect()) + self.assertIsNone( + last_value_aggregation.collect(AggregationTemporality.CUMULATIVE) + ) last_value_aggregation.aggregate(measurement(1)) - first_gauge = last_value_aggregation.collect() - self.assertIsInstance(first_gauge, Gauge) + first_number_data_point = last_value_aggregation.collect( + AggregationTemporality.CUMULATIVE + ) + self.assertIsInstance(first_number_data_point, NumberDataPoint) - self.assertEqual(first_gauge.value, 1) + self.assertEqual(first_number_data_point.value, 1) last_value_aggregation.aggregate(measurement(1)) # CI fails the last assertion without this sleep(0.1) - second_gauge = last_value_aggregation.collect() + second_number_data_point = last_value_aggregation.collect( + AggregationTemporality.CUMULATIVE + ) - self.assertEqual(second_gauge.value, 1) + self.assertEqual(second_number_data_point.value, 1) self.assertGreater( - second_gauge.time_unix_nano, first_gauge.time_unix_nano + second_number_data_point.time_unix_nano, + first_number_data_point.time_unix_nano, ) # if no observation seen for the interval, it should return None - third_gauge = last_value_aggregation.collect() - self.assertIsNone(third_gauge) + third_number_data_point = last_value_aggregation.collect( + AggregationTemporality.CUMULATIVE + ) + self.assertIsNone(third_number_data_point) class TestExplicitBucketHistogramAggregation(TestCase): @@ -249,7 +278,9 @@ def test_aggregate(self): explicit_bucket_histogram_aggregation._bucket_counts[3], 1 ) - histo = explicit_bucket_histogram_aggregation.collect() + histo = explicit_bucket_histogram_aggregation.collect( + AggregationTemporality.CUMULATIVE + ) self.assertEqual(histo.sum, 14) def test_min_max(self): @@ -294,7 +325,9 @@ def test_collect(self): ) explicit_bucket_histogram_aggregation.aggregate(measurement(1)) - first_histogram = explicit_bucket_histogram_aggregation.collect() + first_histogram = explicit_bucket_histogram_aggregation.collect( + AggregationTemporality.CUMULATIVE + ) self.assertEqual(first_histogram.bucket_counts, (0, 1, 0, 0)) self.assertEqual(first_histogram.sum, 1) @@ -303,510 +336,42 @@ def test_collect(self): sleep(0.1) explicit_bucket_histogram_aggregation.aggregate(measurement(1)) - second_histogram = explicit_bucket_histogram_aggregation.collect() + second_histogram = explicit_bucket_histogram_aggregation.collect( + AggregationTemporality.CUMULATIVE + ) - self.assertEqual(second_histogram.bucket_counts, (0, 1, 0, 0)) - self.assertEqual(second_histogram.sum, 1) + self.assertEqual(second_histogram.bucket_counts, (0, 2, 0, 0)) + self.assertEqual(second_histogram.sum, 2) self.assertGreater( second_histogram.time_unix_nano, first_histogram.time_unix_nano ) - -class TestConvertAggregationTemporality(TestCase): - """ - Test aggregation temporality conversion algorithm - """ - - def test_previous_point_non_cumulative(self): - - with self.assertRaises(Exception): - - _convert_aggregation_temporality( - Sum( - start_time_unix_nano=0, - time_unix_nano=0, - value=0, - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=False, - ), - Sum( - start_time_unix_nano=0, - time_unix_nano=0, - value=0, - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=False, - ), - AggregationTemporality.DELTA, - ), - - def test_mismatched_point_types(self): - - current_point = Sum( - start_time_unix_nano=0, - time_unix_nano=0, - value=0, - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=False, - ) - - with self.assertLogs(level=WARNING): - self.assertIs( - _convert_aggregation_temporality( - Gauge(time_unix_nano=0, value=0), - current_point, - AggregationTemporality.DELTA, - ), - current_point, - ) - - with self.assertRaises(AssertionError): - with self.assertLogs(level=WARNING): - self.assertIs( - _convert_aggregation_temporality( - Gauge(time_unix_nano=0, value=0), - None, - AggregationTemporality.DELTA, - ), - current_point, - ) - - def test_current_point_sum_previous_point_none(self): - - current_point = Sum( - start_time_unix_nano=0, - time_unix_nano=0, - value=0, - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=False, - ) - - self.assertEqual( - _convert_aggregation_temporality( - None, current_point, AggregationTemporality.CUMULATIVE - ), - replace( - current_point, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - ), - ) - - def test_current_point_sum_current_point_same_aggregation_temporality( - self, - ): - - current_point = Sum( - start_time_unix_nano=0, - time_unix_nano=0, - value=0, - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=False, - ) - - self.assertEqual( - _convert_aggregation_temporality( - Sum( - start_time_unix_nano=0, - time_unix_nano=0, - value=0, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=False, - ), - current_point, - AggregationTemporality.DELTA, - ), - current_point, - ) - - current_point = Sum( - start_time_unix_nano=0, - time_unix_nano=0, - value=0, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=False, - ) - - self.assertEqual( - _convert_aggregation_temporality( - Sum( - start_time_unix_nano=0, - time_unix_nano=0, - value=0, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=False, - ), - current_point, - AggregationTemporality.CUMULATIVE, - ), - current_point, - ) - - def test_current_point_sum_aggregation_temporality_delta(self): - - self.assertEqual( - _convert_aggregation_temporality( - Sum( - start_time_unix_nano=1, - time_unix_nano=2, - value=3, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=False, - ), - Sum( - start_time_unix_nano=4, - time_unix_nano=5, - value=6, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=False, - ), - AggregationTemporality.DELTA, - ), - Sum( - start_time_unix_nano=2, - time_unix_nano=5, - value=3, - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=False, - ), - ) - - self.assertEqual( - _convert_aggregation_temporality( - Sum( - start_time_unix_nano=1, - time_unix_nano=2, - value=3, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=True, - ), - Sum( - start_time_unix_nano=4, - time_unix_nano=5, - value=6, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=False, - ), - AggregationTemporality.DELTA, - ), - Sum( - start_time_unix_nano=2, - time_unix_nano=5, - value=3, - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=False, - ), - ) - - self.assertEqual( - _convert_aggregation_temporality( - Sum( - start_time_unix_nano=1, - time_unix_nano=2, - value=3, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=True, - ), - Sum( - start_time_unix_nano=4, - time_unix_nano=5, - value=6, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=True, - ), - AggregationTemporality.DELTA, - ), - Sum( - start_time_unix_nano=2, - time_unix_nano=5, - value=3, - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=True, - ), - ) - - def test_current_point_sum_aggregation_temporality_cumulative(self): - - self.assertEqual( - _convert_aggregation_temporality( - Sum( - start_time_unix_nano=1, - time_unix_nano=2, - value=3, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=False, - ), - Sum( - start_time_unix_nano=4, - time_unix_nano=5, - value=6, - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=False, - ), - AggregationTemporality.CUMULATIVE, - ), - Sum( - start_time_unix_nano=1, - time_unix_nano=5, - value=9, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=False, - ), - ) - - self.assertEqual( - _convert_aggregation_temporality( - Sum( - start_time_unix_nano=1, - time_unix_nano=2, - value=3, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=True, - ), - Sum( - start_time_unix_nano=4, - time_unix_nano=5, - value=6, - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=False, - ), - AggregationTemporality.CUMULATIVE, - ), - Sum( - start_time_unix_nano=1, - time_unix_nano=5, - value=9, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=False, - ), - ) - - self.assertEqual( - _convert_aggregation_temporality( - Sum( - start_time_unix_nano=1, - time_unix_nano=2, - value=3, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=True, - ), - Sum( - start_time_unix_nano=4, - time_unix_nano=5, - value=6, - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=True, - ), - AggregationTemporality.CUMULATIVE, - ), - Sum( - start_time_unix_nano=1, - time_unix_nano=5, - value=9, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=True, - ), - ) - - def test_current_point_gauge(self): - - current_point = Gauge(time_unix_nano=1, value=0) - self.assertEqual( - _convert_aggregation_temporality( - Gauge(time_unix_nano=0, value=0), - current_point, - AggregationTemporality.CUMULATIVE, - ), - current_point, - ) - - -class TestHistogramConvertAggregationTemporality(TestCase): - def test_previous_point_none(self): - - current_point = HistogramPoint( - aggregation_temporality=AggregationTemporality.DELTA, - bucket_counts=[0, 2, 1, 2, 0], - explicit_bounds=[0, 5, 10, 25], - max=24, - min=2, - start_time_unix_nano=0, - sum=70, - time_unix_nano=1, - ) - - self.assertEqual( - _convert_aggregation_temporality( - None, current_point, AggregationTemporality.CUMULATIVE - ), - replace( - current_point, - aggregation_temporality=AggregationTemporality.CUMULATIVE, - ), - ) - - def test_previous_point_non_cumulative(self): - - with self.assertRaises(Exception): - - _convert_aggregation_temporality( - HistogramPoint( - aggregation_temporality=AggregationTemporality.DELTA, - bucket_counts=[0, 2, 1, 2, 0], - explicit_bounds=[0, 5, 10, 25], - max=24, - min=2, - start_time_unix_nano=0, - sum=70, - time_unix_nano=1, - ), - HistogramPoint( - aggregation_temporality=AggregationTemporality.DELTA, - bucket_counts=[0, 1, 3, 0, 0], - explicit_bounds=[0, 5, 10, 25], - max=9, - min=2, - start_time_unix_nano=1, - sum=35, - time_unix_nano=2, - ), - AggregationTemporality.DELTA, - ), - - def test_same_aggregation_temporality_cumulative(self): - current_point = HistogramPoint( - aggregation_temporality=AggregationTemporality.CUMULATIVE, - bucket_counts=[0, 3, 4, 2, 0], - explicit_bounds=[0, 5, 10, 25], - max=24, - min=1, - start_time_unix_nano=0, - sum=105, - time_unix_nano=2, - ) - self.assertEqual( - _convert_aggregation_temporality( - HistogramPoint( - aggregation_temporality=AggregationTemporality.CUMULATIVE, - bucket_counts=[0, 2, 1, 2, 0], - explicit_bounds=[0, 5, 10, 25], - max=24, - min=1, - start_time_unix_nano=0, - sum=70, - time_unix_nano=1, - ), - current_point, - AggregationTemporality.CUMULATIVE, - ), - current_point, + explicit_bucket_histogram_aggregation = ( + _ExplicitBucketHistogramAggregation(Mock(), boundaries=[0, 1, 2]) ) - def test_same_aggregation_temporality_delta(self): - current_point = HistogramPoint( - aggregation_temporality=AggregationTemporality.DELTA, - bucket_counts=[0, 1, 3, 0, 0], - explicit_bounds=[0, 5, 10, 25], - max=9, - min=1, - start_time_unix_nano=1, - sum=35, - time_unix_nano=2, + explicit_bucket_histogram_aggregation.aggregate(measurement(1)) + first_histogram = explicit_bucket_histogram_aggregation.collect( + AggregationTemporality.DELTA ) - self.assertEqual( - _convert_aggregation_temporality( - HistogramPoint( - aggregation_temporality=AggregationTemporality.CUMULATIVE, - bucket_counts=[0, 3, 4, 2, 0], - explicit_bounds=[0, 5, 10, 25], - max=24, - min=1, - start_time_unix_nano=0, - sum=105, - time_unix_nano=2, - ), - current_point, - AggregationTemporality.DELTA, - ), - current_point, - ) + self.assertEqual(first_histogram.bucket_counts, (0, 1, 0, 0)) + self.assertEqual(first_histogram.sum, 1) - def test_aggregation_temporality_to_cumulative(self): - current_point = HistogramPoint( - aggregation_temporality=AggregationTemporality.DELTA, - bucket_counts=[0, 1, 3, 0, 0], - explicit_bounds=[0, 5, 10, 25], - max=24, - min=1, - start_time_unix_nano=1, - sum=35, - time_unix_nano=2, - ) + # CI fails the last assertion without this + sleep(0.1) - self.assertEqual( - _convert_aggregation_temporality( - HistogramPoint( - aggregation_temporality=AggregationTemporality.CUMULATIVE, - bucket_counts=[0, 2, 1, 2, 0], - explicit_bounds=[0, 5, 10, 25], - max=24, - min=1, - start_time_unix_nano=0, - sum=70, - time_unix_nano=1, - ), - current_point, - AggregationTemporality.CUMULATIVE, - ), - HistogramPoint( - aggregation_temporality=AggregationTemporality.CUMULATIVE, - bucket_counts=[0, 3, 4, 2, 0], - explicit_bounds=[0, 5, 10, 25], - max=24, - min=1, - start_time_unix_nano=0, - sum=105, - time_unix_nano=2, - ), + explicit_bucket_histogram_aggregation.aggregate(measurement(1)) + second_histogram = explicit_bucket_histogram_aggregation.collect( + AggregationTemporality.DELTA ) - def test_aggregation_temporality_to_delta(self): - current_point = HistogramPoint( - aggregation_temporality=AggregationTemporality.CUMULATIVE, - bucket_counts=[0, 3, 4, 2, 0], - explicit_bounds=[0, 5, 10, 25], - max=22, - min=3, - start_time_unix_nano=0, - sum=105, - time_unix_nano=2, - ) + self.assertEqual(second_histogram.bucket_counts, (0, 1, 0, 0)) + self.assertEqual(second_histogram.sum, 1) - self.assertEqual( - _convert_aggregation_temporality( - HistogramPoint( - aggregation_temporality=AggregationTemporality.CUMULATIVE, - bucket_counts=[0, 2, 1, 2, 0], - explicit_bounds=[0, 5, 10, 25], - max=24, - min=1, - start_time_unix_nano=0, - sum=70, - time_unix_nano=1, - ), - current_point, - AggregationTemporality.DELTA, - ), - HistogramPoint( - aggregation_temporality=AggregationTemporality.DELTA, - bucket_counts=[0, 1, 3, 0, 0], - explicit_bounds=[0, 5, 10, 25], - max=22, - min=3, - start_time_unix_nano=1, - sum=35, - time_unix_nano=2, - ), + self.assertGreater( + second_histogram.time_unix_nano, first_histogram.time_unix_nano ) diff --git a/opentelemetry-sdk/tests/metrics/test_backward_compat.py b/opentelemetry-sdk/tests/metrics/test_backward_compat.py index f9e543db512..b59313c1d96 100644 --- a/opentelemetry-sdk/tests/metrics/test_backward_compat.py +++ b/opentelemetry-sdk/tests/metrics/test_backward_compat.py @@ -100,8 +100,14 @@ def test_observable_callback(self): # produce some data meter_provider.get_meter("foo").create_counter("mycounter").add(12) try: - metrics = reader.get_metrics() + metrics_data = reader.get_metrics_data() except Exception: self.fail() - self.assertEqual(len(metrics), 1) + self.assertEqual(len(metrics_data.resource_metrics), 1) + self.assertEqual( + len(metrics_data.resource_metrics[0].scope_metrics), 1 + ) + self.assertEqual( + len(metrics_data.resource_metrics[0].scope_metrics[0].metrics), 1 + ) diff --git a/opentelemetry-sdk/tests/metrics/test_import.py b/opentelemetry-sdk/tests/metrics/test_import.py index 1a31ab3b885..43c65337dc7 100644 --- a/opentelemetry-sdk/tests/metrics/test_import.py +++ b/opentelemetry-sdk/tests/metrics/test_import.py @@ -46,15 +46,18 @@ def test_import_export(self): from opentelemetry.sdk._metrics.export import ( # noqa: F401 AggregationTemporality, ConsoleMetricExporter, + DataPointT, + DataT, Gauge, Histogram, + HistogramDataPoint, InMemoryMetricReader, Metric, MetricExporter, MetricExportResult, MetricReader, + NumberDataPoint, PeriodicExportingMetricReader, - PointT, Sum, ) except Exception as error: diff --git a/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py index 15be13ac9af..46e5cb048af 100644 --- a/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py @@ -21,10 +21,9 @@ AggregationTemporality, InMemoryMetricReader, Metric, + NumberDataPoint, Sum, ) -from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.util.instrumentation import InstrumentationScope class TestInMemoryMetricReader(TestCase): @@ -32,21 +31,23 @@ def test_no_metrics(self): mock_collect_callback = Mock(return_value=[]) reader = InMemoryMetricReader() reader._set_collect_callback(mock_collect_callback) - self.assertEqual(reader.get_metrics(), []) + self.assertEqual(reader.get_metrics_data(), []) mock_collect_callback.assert_called_once() def test_converts_metrics_to_list(self): metric = Metric( - attributes={"myattr": "baz"}, - description="", - instrumentation_scope=InstrumentationScope("testmetrics"), name="foo", - resource=Resource.create(), + description="", unit="", - point=Sum( - start_time_unix_nano=1647626444152947792, - time_unix_nano=1647626444153163239, - value=72.3309814450449, + data=Sum( + data_points=[ + NumberDataPoint( + attributes={"myattr": "baz"}, + start_time_unix_nano=1647626444152947792, + time_unix_nano=1647626444153163239, + value=72.3309814450449, + ) + ], aggregation_temporality=AggregationTemporality.CUMULATIVE, is_monotonic=True, ), @@ -55,9 +56,9 @@ def test_converts_metrics_to_list(self): reader = InMemoryMetricReader() reader._set_collect_callback(mock_collect_callback) - returned_metrics = reader.get_metrics() + returned_metrics = reader.get_metrics_data() mock_collect_callback.assert_called_once() - self.assertIsInstance(returned_metrics, list) + self.assertIsInstance(returned_metrics, tuple) self.assertEqual(len(returned_metrics), 1) self.assertIs(returned_metrics[0], metric) @@ -76,6 +77,31 @@ def test_integration(self): counter1.add(1, {"foo": "1"}) counter1.add(1, {"foo": "2"}) - metrics = reader.get_metrics() - # should be 3 metrics, one from the observable gauge and one for each labelset from the counter - self.assertEqual(len(metrics), 3) + metrics = reader.get_metrics_data() + # should be 3 number data points, one from the observable gauge and one + # for each labelset from the counter + self.assertEqual(len(metrics.resource_metrics[0].scope_metrics), 2) + self.assertEqual( + len(metrics.resource_metrics[0].scope_metrics[0].metrics), 1 + ) + self.assertEqual( + len(metrics.resource_metrics[0].scope_metrics[1].metrics), 1 + ) + self.assertEqual( + len( + metrics.resource_metrics[0] + .scope_metrics[1] + .metrics[0] + .data.data_points + ), + 1, + ) + self.assertEqual( + len( + metrics.resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points + ), + 2, + ) diff --git a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py index cd1a8481ee3..33c901de6ad 100644 --- a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py +++ b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py @@ -21,6 +21,9 @@ ObservableCounter, UpDownCounter, ) +from opentelemetry.sdk._metrics._internal.aggregation import ( + _LastValueAggregation, +) from opentelemetry.sdk._metrics._internal.measurement import Measurement from opentelemetry.sdk._metrics._internal.metric_reader_storage import ( _DEFAULT_VIEW, @@ -106,9 +109,9 @@ def test_creates_view_instrument_matches( def test_forwards_calls_to_view_instrument_match( self, MockViewInstrumentMatch: Mock ): - view_instrument_match1 = Mock() - view_instrument_match2 = Mock() - view_instrument_match3 = Mock() + view_instrument_match1 = Mock(_aggregation=_LastValueAggregation({})) + view_instrument_match2 = Mock(_aggregation=_LastValueAggregation({})) + view_instrument_match3 = Mock(_aggregation=_LastValueAggregation({})) MockViewInstrumentMatch.side_effect = [ view_instrument_match1, view_instrument_match2, @@ -163,7 +166,60 @@ def test_forwards_calls_to_view_instrument_match( view_instrument_match1.collect.assert_called_once() view_instrument_match2.collect.assert_called_once() view_instrument_match3.collect.assert_called_once() - self.assertEqual(result, all_metrics) + self.assertEqual( + ( + result.resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + ), + all_metrics[0], + ) + self.assertEqual( + ( + result.resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[1] + ), + all_metrics[1], + ) + self.assertEqual( + ( + result.resource_metrics[0] + .scope_metrics[0] + .metrics[1] + .data.data_points[0] + ), + all_metrics[2], + ) + self.assertEqual( + ( + result.resource_metrics[0] + .scope_metrics[0] + .metrics[1] + .data.data_points[1] + ), + all_metrics[3], + ) + self.assertEqual( + ( + result.resource_metrics[0] + .scope_metrics[1] + .metrics[0] + .data.data_points[0] + ), + all_metrics[4], + ) + self.assertEqual( + ( + result.resource_metrics[0] + .scope_metrics[1] + .metrics[0] + .data.data_points[1] + ), + all_metrics[5], + ) @patch( "opentelemetry.sdk._metrics._internal." @@ -256,7 +312,15 @@ def test_drop_aggregation(self): ) metric_reader_storage.consume_measurement(Measurement(1, counter)) - self.assertEqual([], metric_reader_storage.collect()) + self.assertEqual( + [], + ( + metric_reader_storage.collect() + .resource_metrics[0] + .scope_metrics[0] + .metrics + ), + ) def test_conflicting_view_configuration(self): diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index b20dd567a26..eb38b8356cc 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -506,18 +506,19 @@ def test_duplicate_instrument_aggregate_data(self): metrics = exporter.metrics[0] - self.assertEqual(len(metrics), 2) + scope_metrics = metrics.resource_metrics[0].scope_metrics + self.assertEqual(len(scope_metrics), 2) - metric_0 = metrics[0] + metric_0 = scope_metrics[0].metrics[0] self.assertEqual(metric_0.name, "counter") self.assertEqual(metric_0.unit, "unit") self.assertEqual(metric_0.description, "description") - self.assertEqual(metric_0.point.value, 3) + self.assertEqual(metric_0.data.data_points[0].value, 3) - metric_1 = metrics[1] + metric_1 = scope_metrics[1].metrics[0] self.assertEqual(metric_1.name, "counter") self.assertEqual(metric_1.unit, "unit") self.assertEqual(metric_1.description, "description") - self.assertEqual(metric_1.point.value, 7) + self.assertEqual(metric_1.data.data_points[0].value, 7) diff --git a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py index 531ef90c955..597bd37259c 100644 --- a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py @@ -23,10 +23,10 @@ Metric, MetricExporter, MetricExportResult, + NumberDataPoint, PeriodicExportingMetricReader, Sum, ) -from opentelemetry.sdk.resources import Resource from opentelemetry.test.concurrency_test import ConcurrencyTestBase from opentelemetry.util._time import _time_ns @@ -54,29 +54,34 @@ def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: metrics_list = [ Metric( name="sum_name", - attributes={}, description="", - instrumentation_scope=None, - resource=Resource.create(), unit="", - point=Sum( - start_time_unix_nano=_time_ns(), - time_unix_nano=_time_ns(), - value=2, + data=Sum( + data_points=[ + NumberDataPoint( + attributes={}, + start_time_unix_nano=_time_ns(), + time_unix_nano=_time_ns(), + value=2, + ) + ], aggregation_temporality=1, is_monotonic=True, ), ), Metric( name="gauge_name", - attributes={}, description="", - instrumentation_scope=None, - resource=Resource.create(), unit="", - point=Gauge( - time_unix_nano=_time_ns(), - value=2, + data=Gauge( + data_points=[ + NumberDataPoint( + attributes={}, + start_time_unix_nano=_time_ns(), + time_unix_nano=_time_ns(), + value=2, + ) + ] ), ), ] diff --git a/opentelemetry-sdk/tests/metrics/test_point.py b/opentelemetry-sdk/tests/metrics/test_point.py index 70b2235f168..7488c85c995 100644 --- a/opentelemetry-sdk/tests/metrics/test_point.py +++ b/opentelemetry-sdk/tests/metrics/test_point.py @@ -14,22 +14,22 @@ from unittest import TestCase -from opentelemetry.sdk._metrics.export import Gauge, Histogram, Metric, Sum -from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.util.instrumentation import InstrumentationScope +from opentelemetry.sdk._metrics.export import ( + Gauge, + Histogram, + HistogramDataPoint, + Metric, + NumberDataPoint, + Sum, +) -def _create_metric(value): +def _create_metric(data): return Metric( - attributes={"attr-key": "test-val"}, - description="test-description", - instrumentation_scope=InstrumentationScope( - name="name", version="version" - ), name="test-name", - resource=Resource({"resource-key": "resource-val"}), + description="test-description", unit="test-unit", - point=value, + data=data, ) @@ -38,40 +38,62 @@ def test_sum(self): self.maxDiff = None point = _create_metric( Sum( + data_points=[ + NumberDataPoint( + attributes={"attr-key": "test-val"}, + start_time_unix_nano=10, + time_unix_nano=20, + value=9, + ) + ], aggregation_temporality=2, is_monotonic=True, - start_time_unix_nano=10, - time_unix_nano=20, - value=9, ) ) self.assertEqual( - '{"attributes": {"attr-key": "test-val"}, "description": "test-description", "instrumentation_scope": "InstrumentationScope(name, version, None)", "name": "test-name", "resource": "BoundedAttributes({\'resource-key\': \'resource-val\'}, maxlen=None)", "unit": "test-unit", "point": {"aggregation_temporality": 2, "is_monotonic": true, "start_time_unix_nano": 10, "time_unix_nano": 20, "value": 9}}', + '{"name": "test-name", "description": "test-description", "unit": "test-unit", "data": "{\\"data_points\\": \\"[{\\\\\\"attributes\\\\\\": {\\\\\\"attr-key\\\\\\": \\\\\\"test-val\\\\\\"}, \\\\\\"start_time_unix_nano\\\\\\": 10, \\\\\\"time_unix_nano\\\\\\": 20, \\\\\\"value\\\\\\": 9}]\\", \\"aggregation_temporality\\": 2, \\"is_monotonic\\": true}"}', point.to_json(), ) def test_gauge(self): - point = _create_metric(Gauge(time_unix_nano=40, value=20)) + point = _create_metric( + Gauge( + data_points=[ + NumberDataPoint( + attributes={"attr-key": "test-val"}, + start_time_unix_nano=10, + time_unix_nano=20, + value=9, + ) + ] + ) + ) self.assertEqual( - '{"attributes": {"attr-key": "test-val"}, "description": "test-description", "instrumentation_scope": "InstrumentationScope(name, version, None)", "name": "test-name", "resource": "BoundedAttributes({\'resource-key\': \'resource-val\'}, maxlen=None)", "unit": "test-unit", "point": {"time_unix_nano": 40, "value": 20}}', + '{"name": "test-name", "description": "test-description", "unit": "test-unit", "data": "{\\"data_points\\": \\"[{\\\\\\"attributes\\\\\\": {\\\\\\"attr-key\\\\\\": \\\\\\"test-val\\\\\\"}, \\\\\\"start_time_unix_nano\\\\\\": 10, \\\\\\"time_unix_nano\\\\\\": 20, \\\\\\"value\\\\\\": 9}]\\"}"}', point.to_json(), ) def test_histogram(self): point = _create_metric( Histogram( + data_points=[ + HistogramDataPoint( + attributes={"attr-key": "test-val"}, + start_time_unix_nano=50, + time_unix_nano=60, + count=1, + sum=0.8, + bucket_counts=[0, 0, 1, 0], + explicit_bounds=[0.1, 0.5, 0.9, 1], + min=0.8, + max=0.8, + ) + ], aggregation_temporality=1, - bucket_counts=[0, 0, 1, 0], - explicit_bounds=[0.1, 0.5, 0.9, 1], - max=0.8, - min=0.8, - start_time_unix_nano=50, - sum=0.8, - time_unix_nano=60, ) ) self.maxDiff = None self.assertEqual( - '{"attributes": {"attr-key": "test-val"}, "description": "test-description", "instrumentation_scope": "InstrumentationScope(name, version, None)", "name": "test-name", "resource": "BoundedAttributes({\'resource-key\': \'resource-val\'}, maxlen=None)", "unit": "test-unit", "point": {"aggregation_temporality": 1, "bucket_counts": [0, 0, 1, 0], "explicit_bounds": [0.1, 0.5, 0.9, 1], "max": 0.8, "min": 0.8, "start_time_unix_nano": 50, "sum": 0.8, "time_unix_nano": 60}}', + '{"name": "test-name", "description": "test-description", "unit": "test-unit", "data": "{\\"data_points\\": \\"[{\\\\\\"attributes\\\\\\": {\\\\\\"attr-key\\\\\\": \\\\\\"test-val\\\\\\"}, \\\\\\"start_time_unix_nano\\\\\\": 50, \\\\\\"time_unix_nano\\\\\\": 60, \\\\\\"count\\\\\\": 1, \\\\\\"sum\\\\\\": 0.8, \\\\\\"bucket_counts\\\\\\": [0, 0, 1, 0], \\\\\\"explicit_bounds\\\\\\": [0.1, 0.5, 0.9, 1], \\\\\\"min\\\\\\": 0.8, \\\\\\"max\\\\\\": 0.8}]\\", \\"aggregation_temporality\\": 1}"}', point.to_json(), ) diff --git a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py index 9f05d2bfee0..25c9a31e892 100644 --- a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py +++ b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py @@ -27,7 +27,7 @@ from opentelemetry.sdk._metrics._internal.sdk_configuration import ( SdkConfiguration, ) -from opentelemetry.sdk._metrics.export import AggregationTemporality, Metric +from opentelemetry.sdk._metrics.export import AggregationTemporality from opentelemetry.sdk._metrics.view import ( DefaultAggregation, DropAggregation, @@ -63,12 +63,6 @@ def test_consume_measurement(self): attribute_keys={"a", "c"}, ), instrument=instrument1, - sdk_config=self.sdk_configuration, - instrument_class_temporality=MagicMock( - **{ - "__getitem__.return_value": AggregationTemporality.CUMULATIVE - } - ), instrument_class_aggregation=MagicMock( **{"__getitem__.return_value": DefaultAggregation()} ), @@ -110,12 +104,6 @@ def test_consume_measurement(self): aggregation=self.mock_aggregation_factory, ), instrument=instrument1, - sdk_config=self.sdk_configuration, - instrument_class_temporality=MagicMock( - **{ - "__getitem__.return_value": AggregationTemporality.CUMULATIVE - } - ), instrument_class_aggregation=MagicMock( **{"__getitem__.return_value": DefaultAggregation()} ), @@ -147,12 +135,6 @@ def test_consume_measurement(self): attribute_keys={}, ), instrument=instrument1, - sdk_config=self.sdk_configuration, - instrument_class_temporality=MagicMock( - **{ - "__getitem__.return_value": AggregationTemporality.CUMULATIVE - } - ), instrument_class_aggregation=MagicMock( **{"__getitem__.return_value": DefaultAggregation()} ), @@ -177,12 +159,6 @@ def test_consume_measurement(self): attribute_keys={}, ), instrument=instrument1, - sdk_config=self.sdk_configuration, - instrument_class_temporality=MagicMock( - **{ - "__getitem__.return_value": AggregationTemporality.CUMULATIVE - } - ), instrument_class_aggregation=MagicMock( **{"__getitem__.return_value": DefaultAggregation()} ), @@ -196,24 +172,22 @@ def test_consume_measurement(self): ) def test_collect(self): - instrument1 = Mock( - name="instrument1", description="description", unit="unit" + instrument1 = Counter( + "instrument1", + Mock(), + Mock(), + description="description", + unit="unit", ) instrument1.instrumentation_scope = self.mock_instrumentation_scope view_instrument_match = _ViewInstrumentMatch( view=View( instrument_name="instrument1", name="name", - aggregation=self.mock_aggregation_factory, + aggregation=DefaultAggregation(), attribute_keys={"a", "c"}, ), instrument=instrument1, - sdk_config=self.sdk_configuration, - instrument_class_temporality=MagicMock( - **{ - "__getitem__.return_value": AggregationTemporality.CUMULATIVE - } - ), instrument_class_aggregation=MagicMock( **{"__getitem__.return_value": DefaultAggregation()} ), @@ -226,18 +200,16 @@ def test_collect(self): attributes={"c": "d", "f": "g"}, ) ) - self.assertEqual( - next(view_instrument_match.collect()), - Metric( - attributes={"c": "d"}, - description="description", - instrumentation_scope=self.mock_instrumentation_scope, - name="name", - resource=self.mock_resource, - unit="unit", - point=None, - ), + + number_data_points = view_instrument_match.collect( + AggregationTemporality.CUMULATIVE ) + self.assertEqual(len(number_data_points), 1) + + number_data_point = number_data_points[0] + + self.assertEqual(number_data_point.attributes, frozenset({("c", "d")})) + self.assertEqual(number_data_point.value, 0) def test_setting_aggregation(self): instrument1 = Counter( @@ -256,12 +228,6 @@ def test_setting_aggregation(self): attribute_keys={"a", "c"}, ), instrument=instrument1, - sdk_config=self.sdk_configuration, - instrument_class_temporality=MagicMock( - **{ - "__getitem__.return_value": AggregationTemporality.CUMULATIVE - } - ), instrument_class_aggregation={Counter: LastValueAggregation()}, ) diff --git a/tests/opentelemetry-test-utils/src/opentelemetry/test/metrictestutil.py b/tests/opentelemetry-test-utils/src/opentelemetry/test/metrictestutil.py index b11e1646246..d89b4480a0b 100644 --- a/tests/opentelemetry-test-utils/src/opentelemetry/test/metrictestutil.py +++ b/tests/opentelemetry-test-utils/src/opentelemetry/test/metrictestutil.py @@ -13,69 +13,72 @@ # limitations under the License. -from collections import OrderedDict - from opentelemetry.attributes import BoundedAttributes from opentelemetry.sdk._metrics.export import ( AggregationTemporality, Gauge, Metric, + NumberDataPoint, Sum, ) -from opentelemetry.sdk.resources import Resource as SDKResource -from opentelemetry.sdk.util.instrumentation import InstrumentationScope def _generate_metric( - name, point, attributes=None, description=None, unit=None + name, data, attributes=None, description=None, unit=None ) -> Metric: - if not attributes: - attributes = BoundedAttributes(attributes={"a": 1, "b": True}) - if not description: + if description is None: description = "foo" - if not unit: + if unit is None: unit = "s" return Metric( - resource=SDKResource(OrderedDict([("a", 1), ("b", False)])), - instrumentation_scope=InstrumentationScope( - "first_name", "first_version" - ), - attributes=attributes, - description=description, name=name, + description=description, unit=unit, - point=point, + data=data, ) def _generate_sum( - name, val, attributes=None, description=None, unit=None + name, value, attributes=None, description=None, unit=None ) -> Sum: + if attributes is None: + attributes = BoundedAttributes(attributes={"a": 1, "b": True}) return _generate_metric( name, Sum( + data_points=[ + NumberDataPoint( + attributes=attributes, + start_time_unix_nano=1641946015139533244, + time_unix_nano=1641946016139533244, + value=value, + ) + ], aggregation_temporality=AggregationTemporality.CUMULATIVE, is_monotonic=True, - start_time_unix_nano=1641946015139533244, - time_unix_nano=1641946016139533244, - value=val, ), - attributes=attributes, description=description, unit=unit, ) def _generate_gauge( - name, val, attributes=None, description=None, unit=None + name, value, attributes=None, description=None, unit=None ) -> Gauge: + if attributes is None: + attributes = BoundedAttributes(attributes={"a": 1, "b": True}) return _generate_metric( name, Gauge( - time_unix_nano=1641946016139533244, - value=val, + data_points=[ + NumberDataPoint( + attributes=attributes, + start_time_unix_nano=1641946015139533244, + time_unix_nano=1641946016139533244, + value=value, + ) + ], ), - attributes=attributes, description=description, unit=unit, ) @@ -87,7 +90,6 @@ def _generate_unsupported_metric( return _generate_metric( name, None, - attributes=attributes, description=description, unit=unit, )