diff --git a/docs/conf.py b/docs/conf.py index 03fad0837f0..85a328f04b1 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -97,6 +97,7 @@ # https://github.com/sphinx-doc/sphinx/pull/3744 nitpick_ignore = [ ("py:class", "ValueT"), + ("py:class", "opentelemetry.sdk._metrics.export.Mapping"), # Even if wrapt is added to intersphinx_mapping, sphinx keeps failing # with "class reference target not found: ObjectProxy". ("py:class", "ObjectProxy"), diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py index 693eb261dba..32b82cfaa4e 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py @@ -11,9 +11,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging +from logging import getLogger from os import environ -from typing import Optional, Sequence +from typing import Optional, Sequence, Dict from grpc import ChannelCredentials, Compression from opentelemetry.exporter.otlp.proto.grpc.exporter import ( OTLPExporterMixin, @@ -40,9 +40,14 @@ from opentelemetry.sdk._metrics.export import ( MetricExporter, MetricExportResult, + MappingMetricT, +) +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util.instrumentation import ( + InstrumentationScope as SDKInstrumentationScope, ) -logger = logging.getLogger(__name__) +_logger = getLogger(__name__) class OTLPMetricExporter( @@ -79,87 +84,109 @@ def __init__( ) def _translate_data( - self, data: Sequence[Metric] + self, data: MappingMetricT ) -> ExportMetricsServiceRequest: - sdk_resource_scope_metrics = {} - - for metric in data: - resource = metric.resource - scope_map = sdk_resource_scope_metrics.get(resource, {}) - if not scope_map: - sdk_resource_scope_metrics[resource] = scope_map - - scope_metrics = scope_map.get(metric.instrumentation_scope) - - if not scope_metrics: - if metric.instrumentation_scope is not None: - scope_map[metric.instrumentation_scope] = pb2.ScopeMetrics( - scope=InstrumentationScope( - name=metric.instrumentation_scope.name, - version=metric.instrumentation_scope.version, + + sdk_resource_scope_metrics: Dict[ + Resource, Dict[SDKInstrumentationScope, pb2.ScopeMetrics] + ] = {} + + for resource, instrumentation_scope_metrics in data.items(): + + if resource not in sdk_resource_scope_metrics: + sdk_resource_scope_metrics[resource] = {} + + for ( + instrumentation_scope, + metrics, + ) in instrumentation_scope_metrics.items(): + if instrumentation_scope not in sdk_resource_scope_metrics: + if instrumentation_scope is None: + sdk_resource_scope_metrics[resource][ + instrumentation_scope + ] = pb2.ScopeMetrics() + else: + sdk_resource_scope_metrics[resource][ + instrumentation_scope + ] = pb2.ScopeMetrics( + scope=InstrumentationScope( + name=instrumentation_scope.name, + version=instrumentation_scope.version, + ) ) + scope_metrics = sdk_resource_scope_metrics[resource][ + instrumentation_scope + ] + + for metric in metrics: + + pbmetric = pb2.Metric( + name=metric.name, + description=metric.description, + unit=metric.unit, ) - else: - scope_map[ - metric.instrumentation_scope - ] = pb2.ScopeMetrics() + if isinstance(metric.point, Gauge): + pt = pb2.NumberDataPoint( + attributes=self._translate_attributes( + metric.point.attributes + ), + time_unix_nano=metric.point.time_unix_nano, + ) + if isinstance(metric.point.value, int): + pt.as_int = metric.point.value + else: + pt.as_double = metric.point.value + pbmetric.gauge.data_points.append(pt) + elif isinstance(metric.point, Histogram): + pt = pb2.HistogramDataPoint( + attributes=self._translate_attributes( + metric.point.attributes + ), + time_unix_nano=metric.point.time_unix_nano, + start_time_unix_nano=( + metric.point.start_time_unix_nano + ), + count=sum(metric.point.bucket_counts), + sum=metric.point.sum, + bucket_counts=metric.point.bucket_counts, + explicit_bounds=metric.point.explicit_bounds, + ) + pbmetric.histogram.aggregation_temporality = ( + metric.point.aggregation_temporality + ) + pbmetric.histogram.data_points.append(pt) + elif isinstance(metric.point, Sum): + pt = pb2.NumberDataPoint( + attributes=self._translate_attributes( + metric.point.attributes + ), + start_time_unix_nano=( + metric.point.start_time_unix_nano + ), + time_unix_nano=metric.point.time_unix_nano, + ) + if isinstance(metric.point.value, int): + pt.as_int = metric.point.value + else: + pt.as_double = metric.point.value + # note that because sum is a message type, the fields + # must be set individually rather than instantiating a + # pb2.Sum and setting it once + pbmetric.sum.aggregation_temporality = ( + metric.point.aggregation_temporality + ) + pbmetric.sum.is_monotonic = metric.point.is_monotonic + pbmetric.sum.data_points.append(pt) + else: + _logger.warn( + "unsupported datapoint type %s", metric.point + ) + continue - scope_metrics = scope_map.get(metric.instrumentation_scope) + scope_metrics.metrics.append( + pbmetric, + ) - pbmetric = pb2.Metric( - name=metric.name, - description=metric.description, - unit=metric.unit, - ) - if isinstance(metric.point, Gauge): - pt = pb2.NumberDataPoint( - attributes=self._translate_attributes(metric.attributes), - time_unix_nano=metric.point.time_unix_nano, - ) - if isinstance(metric.point.value, int): - pt.as_int = metric.point.value - else: - pt.as_double = metric.point.value - pbmetric.gauge.data_points.append(pt) - elif isinstance(metric.point, Histogram): - pt = pb2.HistogramDataPoint( - attributes=self._translate_attributes(metric.attributes), - time_unix_nano=metric.point.time_unix_nano, - start_time_unix_nano=metric.point.start_time_unix_nano, - count=sum(metric.point.bucket_counts), - sum=metric.point.sum, - bucket_counts=metric.point.bucket_counts, - explicit_bounds=metric.point.explicit_bounds, - ) - pbmetric.histogram.aggregation_temporality = ( - metric.point.aggregation_temporality - ) - pbmetric.histogram.data_points.append(pt) - elif isinstance(metric.point, Sum): - pt = pb2.NumberDataPoint( - attributes=self._translate_attributes(metric.attributes), - start_time_unix_nano=metric.point.start_time_unix_nano, - time_unix_nano=metric.point.time_unix_nano, - ) - if isinstance(metric.point.value, int): - pt.as_int = metric.point.value - else: - pt.as_double = metric.point.value - # note that because sum is a message type, the fields must be - # set individually rather than instantiating a pb2.Sum and setting - # it once - pbmetric.sum.aggregation_temporality = ( - metric.point.aggregation_temporality - ) - pbmetric.sum.is_monotonic = metric.point.is_monotonic - pbmetric.sum.data_points.append(pt) - else: - logger.warn("unsupported datapoint type %s", metric.point) - continue - - scope_metrics.metrics.append( - pbmetric, - ) return ExportMetricsServiceRequest( resource_metrics=get_resource_data( sdk_resource_scope_metrics, @@ -170,7 +197,7 @@ def _translate_data( def export( self, - metrics: Sequence[Metric], + metrics: MappingMetricT, timeout_millis: float = 10_000, **kwargs, ) -> MetricExportResult: diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/metrics/test_otlp_metrics_exporter.py index af5d39ff597..1a8a02caefb 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/metrics/test_otlp_metrics_exporter.py @@ -41,13 +41,27 @@ Resource as OTLPResource, ) from opentelemetry.sdk._metrics.export import MetricExportResult -from opentelemetry.sdk._metrics.point import AggregationTemporality, Histogram +from opentelemetry.sdk._metrics.metric_reader import ( + Metric as MetricReaderMetric, +) +from opentelemetry.sdk._metrics.point import ( + AggregationTemporality, + Gauge, + Histogram, + Sum, +) from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_METRICS_INSECURE, ) +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.sdk.util.instrumentation import ( + InstrumentationScope as SDKInstrumentationScope, +) from opentelemetry.test.metrictestutil import ( + _generate_attributes, _generate_gauge, _generate_metric, + _generate_metric_reader_metric, _generate_sum, ) @@ -110,21 +124,32 @@ def setUp(self): self.server.start() self.metrics = { - "sum_int": _generate_sum("sum_int", 33), - "sum_double": _generate_sum("sum_double", 2.98), - "gauge_int": _generate_gauge("gauge_int", 9000), - "gauge_double": _generate_gauge("gauge_double", 52.028), - "histogram": _generate_metric( - "histogram", - Histogram( - aggregation_temporality=AggregationTemporality.DELTA, - bucket_counts=[1, 4], - explicit_bounds=[10.0, 20.0], - max=18, - min=8, - start_time_unix_nano=1641946016139533244, - sum=67, - time_unix_nano=1641946016139533244, + "sum_int": _generate_metric_reader_metric( + _generate_sum("sum_int", 33) + ), + "sum_double": _generate_metric_reader_metric( + _generate_sum("sum_double", 2.98) + ), + "gauge_int": _generate_metric_reader_metric( + _generate_gauge("gauge_int", 9000) + ), + "gauge_double": _generate_metric_reader_metric( + _generate_gauge("gauge_double", 52.028) + ), + "histogram": _generate_metric_reader_metric( + _generate_metric( + "histogram", + Histogram( + attributes=_generate_attributes(), + aggregation_temporality=AggregationTemporality.DELTA, + bucket_counts=[1, 4], + explicit_bounds=[10.0, 20.0], + max=18, + min=8, + start_time_unix_nano=1641946016139533244, + sum=67, + time_unix_nano=1641946016139533244, + ), ), ), } @@ -243,7 +268,7 @@ def test_unavailable(self, mock_sleep, mock_expo): MetricsServiceServicerUNAVAILABLE(), self.server ) self.assertEqual( - self.exporter.export([self.metrics["sum_int"]]), + self.exporter.export(self.metrics["sum_int"]), MetricExportResult.FAILURE, ) mock_sleep.assert_called_with(1) @@ -258,7 +283,7 @@ def test_unavailable_delay(self, mock_sleep, mock_expo): MetricsServiceServicerUNAVAILABLEDelay(), self.server ) self.assertEqual( - self.exporter.export([self.metrics["sum_int"]]), + self.exporter.export(self.metrics["sum_int"]), MetricExportResult.FAILURE, ) mock_sleep.assert_called_with(4) @@ -268,7 +293,7 @@ def test_success(self): MetricsServiceServicerSUCCESS(), self.server ) self.assertEqual( - self.exporter.export([self.metrics["sum_int"]]), + self.exporter.export(self.metrics["sum_int"]), MetricExportResult.SUCCESS, ) @@ -277,7 +302,7 @@ def test_failure(self): MetricsServiceServicerALREADY_EXISTS(), self.server ) self.assertEqual( - self.exporter.export([self.metrics["sum_int"]]), + self.exporter.export(self.metrics["sum_int"]), MetricExportResult.FAILURE, ) @@ -336,7 +361,7 @@ def test_translate_sum_int(self): ] ) # pylint: disable=protected-access - actual = self.exporter._translate_data([self.metrics["sum_int"]]) + actual = self.exporter._translate_data(self.metrics["sum_int"]) self.assertEqual(expected, actual) def test_translate_sum_double(self): @@ -394,7 +419,7 @@ def test_translate_sum_double(self): ] ) # pylint: disable=protected-access - actual = self.exporter._translate_data([self.metrics["sum_double"]]) + actual = self.exporter._translate_data(self.metrics["sum_double"]) self.assertEqual(expected, actual) def test_translate_gauge_int(self): @@ -449,7 +474,7 @@ def test_translate_gauge_int(self): ] ) # pylint: disable=protected-access - actual = self.exporter._translate_data([self.metrics["gauge_int"]]) + actual = self.exporter._translate_data(self.metrics["gauge_int"]) self.assertEqual(expected, actual) def test_translate_gauge_double(self): @@ -504,7 +529,7 @@ def test_translate_gauge_double(self): ] ) # pylint: disable=protected-access - actual = self.exporter._translate_data([self.metrics["gauge_double"]]) + actual = self.exporter._translate_data(self.metrics["gauge_double"]) self.assertEqual(expected, actual) def test_translate_histogram(self): @@ -566,5 +591,225 @@ def test_translate_histogram(self): ] ) # pylint: disable=protected-access - actual = self.exporter._translate_data([self.metrics["histogram"]]) + actual = self.exporter._translate_data(self.metrics["histogram"]) + self.assertEqual(expected, actual) + + def test_translate_several_metrics(self): + # pylint: disable=too-many-locals + + resource_0 = Resource({SERVICE_NAME: "resource_0"}, "") + resource_1 = Resource({SERVICE_NAME: "resource_1"}, "") + + instrumentation_scope_0 = SDKInstrumentationScope( + "instrumentation_scope_0", None, None + ) + instrumentation_scope_1 = SDKInstrumentationScope( + "instrumentation_scope_1", None, None + ) + + metric_reader_metric_0 = MetricReaderMetric( + description="description_0", + name="counter1", + unit="s", + point=Sum( + attributes={"foo": "1"}, + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=True, + start_time_unix_nano=1651529022025608591, + time_unix_nano=1651529022025645974, + value=1, + ), + ) + + metric_reader_metric_1 = MetricReaderMetric( + description="description_1", + name="counter1", + unit="m", + point=Sum( + attributes={"foo": "2"}, + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=True, + start_time_unix_nano=1651529022025618863, + time_unix_nano=1651529022025668946, + value=1, + ), + ) + + metric_reader_metric_2 = MetricReaderMetric( + description="description_2", + name="observable_gauge1", + unit="Hz", + point=Gauge( + attributes={"foo": "3"}, + time_unix_nano=1651529022025687868, + value=12, + ), + ) + + metric_reader_metric_3 = MetricReaderMetric( + description="description_3", + name="observable_gauge1", + unit="Hz", + point=Gauge( + attributes={"foo": "4"}, + time_unix_nano=1651529022025687868, + value=12, + ), + ) + + metrics = { + resource_0: { + instrumentation_scope_0: [ + metric_reader_metric_0, + metric_reader_metric_1, + ], + instrumentation_scope_1: [metric_reader_metric_3], + }, + resource_1: {instrumentation_scope_1: [metric_reader_metric_2]}, + } + + pb2_metric_0 = pb2.Metric( + name="counter1", + unit="s", + description="description_0", + sum=pb2.Sum( + data_points=[ + pb2.NumberDataPoint( + attributes=[ + KeyValue( + key="foo", + value=AnyValue(string_value="1"), + ), + ], + start_time_unix_nano=1651529022025608591, + time_unix_nano=1651529022025645974, + as_int=1, + ) + ], + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=True, + ), + ) + + pb2_metric_1 = pb2.Metric( + name="counter1", + unit="m", + description="description_1", + sum=pb2.Sum( + data_points=[ + pb2.NumberDataPoint( + attributes=[ + KeyValue( + key="foo", + value=AnyValue(string_value="2"), + ), + ], + start_time_unix_nano=1651529022025618863, + time_unix_nano=1651529022025668946, + as_int=1, + ) + ], + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=True, + ), + ) + + pb2_metric_2 = pb2.Metric( + name="observable_gauge1", + unit="Hz", + description="description_2", + gauge=pb2.Gauge( + data_points=[ + pb2.NumberDataPoint( + attributes=[ + KeyValue( + key="foo", value=AnyValue(string_value="3") + ) + ], + time_unix_nano=1651529022025687868, + as_int=12, + ) + ] + ), + ) + + pb2_metric_3 = pb2.Metric( + name="observable_gauge1", + unit="Hz", + description="description_3", + gauge=pb2.Gauge( + data_points=[ + pb2.NumberDataPoint( + attributes=[ + KeyValue( + key="foo", + value=AnyValue(string_value="4"), + ), + ], + time_unix_nano=1651529022025687868, + as_int=12, + ) + ] + ), + ) + + pb2_resource_0 = OTLPResource( + attributes=[ + KeyValue( + key="service.name", + value=AnyValue(string_value="resource_0"), + ) + ] + ) + + pb2_resource_1 = OTLPResource( + attributes=[ + KeyValue( + key="service.name", + value=AnyValue(string_value="resource_1"), + ) + ] + ) + + pb2_instrumentation_scope_0 = InstrumentationScope( + name="instrumentation_scope_0" + ) + pb2_instrumentation_scope_1 = InstrumentationScope( + name="instrumentation_scope_1" + ) + # pylint: disable=protected-access + actual = self.exporter._translate_data(metrics) + expected = ExportMetricsServiceRequest( + resource_metrics=[ + pb2.ResourceMetrics( + resource=pb2_resource_0, + scope_metrics=[ + pb2.ScopeMetrics( + scope=pb2_instrumentation_scope_0, + metrics=[ + pb2_metric_0, + pb2_metric_1, + ], + ), + pb2.ScopeMetrics( + scope=pb2_instrumentation_scope_1, + metrics=[ + pb2_metric_3, + ], + ), + ], + ), + pb2.ResourceMetrics( + resource=pb2_resource_1, + scope_metrics=[ + pb2.ScopeMetrics( + scope=pb2_instrumentation_scope_1, + metrics=[ + pb2_metric_2, + ], + ) + ], + ), + ] + ) self.assertEqual(expected, actual) diff --git a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py index 448a04bf293..1bec0b0d742 100644 --- a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py +++ b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py @@ -77,6 +77,10 @@ ) from prometheus_client.core import Metric as PrometheusMetric +from opentelemetry.sdk._metrics._internal.export import MappingMetricT +from opentelemetry.sdk._metrics.metric_reader import ( + Metric as MetricReaderMetric, +) from opentelemetry.sdk._metrics.metric_reader import MetricReader from opentelemetry.sdk._metrics.point import Gauge, Histogram, Metric, Sum @@ -112,13 +116,17 @@ def __init__(self, prefix: str = "") -> None: def _receive_metrics( self, - metrics: Iterable[Metric], + metrics: MappingMetricT, timeout_millis: float = 10_000, **kwargs, ) -> None: if metrics is None: return - self._collector.add_metrics_data(metrics) + for instrumentation_scope_metric_reader_metrics in metrics.values(): + for metric_reader_metrics in ( + instrumentation_scope_metric_reader_metrics + ).values(): + self._collector.add_metrics_data(metric_reader_metrics) def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: REGISTRY.unregister(self._collector) @@ -139,7 +147,9 @@ def __init__(self, prefix: str = ""): r"[^\w]", UNICODE | IGNORECASE ) - def add_metrics_data(self, export_records: Sequence[Metric]) -> None: + def add_metrics_data( + self, export_records: Iterable[MetricReaderMetric] + ) -> None: """Add metrics to Prometheus data""" self._metrics_to_export.append(export_records) @@ -171,7 +181,7 @@ def _translate_to_prometheus( ): label_values = [] label_keys = [] - for key, value in metric.attributes.items(): + for key, value in metric.point.attributes.items(): label_keys.append(self._sanitize(key)) label_values.append(self._check_value(value)) diff --git a/exporter/opentelemetry-exporter-prometheus/tests/test_prometheus_exporter.py b/exporter/opentelemetry-exporter-prometheus/tests/test_prometheus_exporter.py index fd963b139a0..51075aad3aa 100644 --- a/exporter/opentelemetry-exporter-prometheus/tests/test_prometheus_exporter.py +++ b/exporter/opentelemetry-exporter-prometheus/tests/test_prometheus_exporter.py @@ -60,6 +60,7 @@ def test_histogram_to_prometheus(self): record = _generate_metric( "test@name", Histogram( + attributes={"histo": 1}, aggregation_temporality=AggregationTemporality.CUMULATIVE, bucket_counts=[1, 3, 2], explicit_bounds=[123.0, 456.0], @@ -69,7 +70,6 @@ def test_histogram_to_prometheus(self): sum=579.0, time_unix_nano=1641946016139533244, ), - attributes={"histo": 1}, ) collector = _CustomCollector("testprefix") diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/_view_instrument_match.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/_view_instrument_match.py index fef40bdae49..5be572d2be6 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/_view_instrument_match.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/_view_instrument_match.py @@ -112,13 +112,13 @@ def consume_measurement(self, measurement: Measurement) -> None: ): aggregation = ( self._view._aggregation._create_aggregation( - self._instrument + self._instrument, attributes ) ) else: aggregation = self._instrument_class_aggregation[ self._instrument.__class__ - ]._create_aggregation(self._instrument) + ]._create_aggregation(self._instrument, attributes) self._attributes_aggregation[attributes] = aggregation self._attributes_aggregation[attributes].aggregate(measurement) @@ -151,7 +151,6 @@ def collect( if current_point is not None: yield Metric( - attributes=dict(attributes), description=self._description, instrumentation_scope=( self._instrument.instrumentation_scope diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/aggregation.py index 92aee89143c..7788f6c9c5d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/aggregation.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/aggregation.py @@ -33,18 +33,22 @@ ) from opentelemetry.sdk._metrics.measurement import Measurement from opentelemetry.sdk._metrics.point import AggregationTemporality, Gauge -from opentelemetry.sdk._metrics.point import Histogram as HistogramPoint -from opentelemetry.sdk._metrics.point import PointT, Sum +from opentelemetry.sdk._metrics.point import Histogram as HistogramDatum +from opentelemetry.sdk._metrics.point import ( + DataT, Sum, HistogramDataPoint, NumberDataPoint +) from opentelemetry.util._time import _time_ns +from opentelemetry.util.types import Attributes -_PointVarT = TypeVar("_PointVarT", bound=PointT) +_PointVarT = TypeVar("_PointVarT", bound=DataT) _logger = getLogger(__name__) class _Aggregation(ABC, Generic[_PointVarT]): - def __init__(self): + def __init__(self, attributes: Attributes): self._lock = Lock() + self._attributes = attributes @abstractmethod def aggregate(self, measurement: Measurement) -> None: @@ -69,7 +73,9 @@ class Aggregation(ABC): """ @abstractmethod - def _create_aggregation(self, instrument: Instrument) -> _Aggregation: + def _create_aggregation( + self, instrument: Instrument, attributes: Attributes + ) -> _Aggregation: """Creates an aggregation""" @@ -92,7 +98,9 @@ class DefaultAggregation(Aggregation): ============================================= ==================================== """ - def _create_aggregation(self, instrument: Instrument) -> _Aggregation: + def _create_aggregation( + self, instrument: Instrument, attributes: Attributes + ) -> _Aggregation: # pylint: disable=too-many-return-statements if isinstance(instrument, Counter): @@ -130,10 +138,11 @@ def _create_aggregation(self, instrument: Instrument) -> _Aggregation: class _SumAggregation(_Aggregation[Sum]): def __init__( self, + attributes: Attributes, instrument_is_monotonic: bool, instrument_temporality: AggregationTemporality, ): - super().__init__() + super().__init__(attributes) self._start_time_unix_nano = _time_ns() self._instrument_temporality = instrument_temporality @@ -165,33 +174,25 @@ def collect(self) -> Optional[Sum]: self._value = 0 self._start_time_unix_nano = now + 1 + else: - return Sum( - aggregation_temporality=AggregationTemporality.DELTA, - is_monotonic=self._instrument_is_monotonic, - start_time_unix_nano=start_time_unix_nano, - time_unix_nano=now, - value=value, - ) - - with self._lock: - if self._value is None: - return None - value = self._value - self._value = None + with self._lock: + if self._value is None: + return None + value = self._value + self._value = None - return Sum( - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=self._instrument_is_monotonic, - start_time_unix_nano=self._start_time_unix_nano, + return NumberDataPoint( + attributes=self._attributes, + start_time_unix_nano=start_time_unix_nano, time_unix_nano=now, value=value, ) class _LastValueAggregation(_Aggregation[Gauge]): - def __init__(self): - super().__init__() + def __init__(self, attributes: Attributes): + super().__init__(attributes) self._value = None def aggregate(self, measurement: Measurement): @@ -209,14 +210,16 @@ def collect(self) -> Optional[Gauge]: self._value = None return Gauge( + attributes=self._attributes, time_unix_nano=_time_ns(), value=value, ) -class _ExplicitBucketHistogramAggregation(_Aggregation[HistogramPoint]): +class _ExplicitBucketHistogramAggregation(_Aggregation[HistogramDatum]): def __init__( self, + attributes: Attributes, boundaries: Sequence[float] = ( 0.0, 5.0, @@ -231,7 +234,7 @@ def __init__( ), record_min_max: bool = True, ): - super().__init__() + super().__init__(attributes) self._boundaries = tuple(boundaries) self._bucket_counts = self._get_empty_bucket_counts() self._min = inf @@ -255,18 +258,18 @@ def aggregate(self, measurement: Measurement) -> None: self._bucket_counts[bisect_left(self._boundaries, value)] += 1 - def collect(self) -> HistogramPoint: + def collect(self) -> HistogramDatum: """ Atomically return a point for the current value of the metric. """ now = _time_ns() with self._lock: - value = self._bucket_counts + bucket_counts = self._bucket_counts start_time_unix_nano = self._start_time_unix_nano - histogram_sum = self._sum - histogram_max = self._max - histogram_min = self._min + sum_ = self._sum + max_ = self._max + min_ = self._min self._bucket_counts = self._get_empty_bucket_counts() self._start_time_unix_nano = now + 1 @@ -274,15 +277,21 @@ def collect(self) -> HistogramPoint: self._min = inf self._max = -inf - return HistogramPoint( + return HistogramDatum( aggregation_temporality=AggregationTemporality.DELTA, - bucket_counts=tuple(value), - explicit_bounds=self._boundaries, - max=histogram_max, - min=histogram_min, - start_time_unix_nano=start_time_unix_nano, - sum=histogram_sum, - time_unix_nano=now, + data_points=[ + HistogramDataPoint( + attributes=self._attributes, + start_time_unix_nano=start_time_unix_nano, + time_unix_nano=now, + count=sum(bucket_counts), + sum=sum_, + bucket_counts=tuple(bucket_counts), + explicit_bounds=self._boundaries, + max=max_, + min=min_, + ) + ] ) @@ -356,6 +365,7 @@ def _convert_aggregation_temporality( ) return Sum( + attributes=current_point.attributes, start_time_unix_nano=output_start_time_unix_nano, time_unix_nano=current_point.time_unix_nano, value=value, @@ -363,7 +373,7 @@ def _convert_aggregation_temporality( is_monotonic=is_monotonic, ) - if current_point_type is HistogramPoint: + if current_point_type is HistogramDatum: if previous_point is None: return replace( current_point, aggregation_temporality=aggregation_temporality @@ -403,7 +413,8 @@ def _convert_aggregation_temporality( ) ] - return HistogramPoint( + return HistogramDatum( + attributes=current_point.attributes, aggregation_temporality=aggregation_temporality, bucket_counts=bucket_counts, explicit_bounds=current_point.explicit_bounds, @@ -432,6 +443,7 @@ class ExplicitBucketHistogramAggregation(Aggregation): def __init__( self, + attributes: Attributes, boundaries: Sequence[float] = ( 0.0, 5.0, @@ -449,8 +461,11 @@ def __init__( self._boundaries = boundaries self._record_min_max = record_min_max - def _create_aggregation(self, instrument: Instrument) -> _Aggregation: + def _create_aggregation( + self, instrument: Instrument, attributes: Attributes + ) -> _Aggregation: return _ExplicitBucketHistogramAggregation( + attributes, boundaries=self._boundaries, record_min_max=self._record_min_max, ) @@ -462,7 +477,9 @@ class SumAggregation(Aggregation): - The arithmetic sum of Measurement values. """ - def _create_aggregation(self, instrument: Instrument) -> _Aggregation: + def _create_aggregation( + self, instrument: Instrument, attributes: Attributes + ) -> _Aggregation: temporality = AggregationTemporality.UNSPECIFIED if isinstance(instrument, Synchronous): @@ -471,6 +488,7 @@ def _create_aggregation(self, instrument: Instrument) -> _Aggregation: temporality = AggregationTemporality.CUMULATIVE return _SumAggregation( + attributes, isinstance(instrument, (Counter, ObservableCounter)), temporality, ) @@ -484,12 +502,16 @@ class LastValueAggregation(Aggregation): - The timestamp of the last Measurement. """ - def _create_aggregation(self, instrument: Instrument) -> _Aggregation: - return _LastValueAggregation() + def _create_aggregation( + self, instrument: Instrument, attributes: Attributes + ) -> _Aggregation: + return _LastValueAggregation(attributes) class DropAggregation(Aggregation): """Using this aggregation will make all measurements be ignored.""" - def _create_aggregation(self, instrument: Instrument) -> _Aggregation: - return _DropAggregation() + def _create_aggregation( + self, instrument: Instrument, attributes: Attributes + ) -> _Aggregation: + return _DropAggregation(attributes) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py index c8dcb43327e..0dcb81de7a3 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py @@ -19,7 +19,7 @@ from os import environ, linesep from sys import stdout from threading import Event, RLock, Thread -from typing import IO, Callable, Dict, Iterable, List, Optional, Sequence +from typing import IO, Callable, Dict, Iterable, List, Optional from opentelemetry.context import ( _SUPPRESS_INSTRUMENTATION_KEY, @@ -28,8 +28,14 @@ set_value, ) from opentelemetry.sdk._metrics._internal.aggregation import Aggregation +from opentelemetry.sdk._metrics._internal.metric_reader import MappingMetricT +from opentelemetry.sdk._metrics._internal.metric_reader import ( + Metric as MetricReaderMetric, +) from opentelemetry.sdk._metrics.metric_reader import MetricReader from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util.instrumentation import InstrumentationScope from opentelemetry.util._once import Once from opentelemetry.util._time import _time_ns @@ -55,7 +61,7 @@ class MetricExporter(ABC): @abstractmethod def export( self, - metrics: Sequence[Metric], + metrics: MappingMetricT, timeout_millis: float = 10_000, **kwargs, ) -> MetricExportResult: @@ -95,7 +101,7 @@ def __init__( def export( self, - metrics: Sequence[Metric], + metrics: MappingMetricT, timeout_millis: float = 10_000, **kwargs, ) -> MetricExportResult: @@ -124,24 +130,26 @@ def __init__( preferred_aggregation=preferred_aggregation, ) self._lock = RLock() - self._metrics: List[Metric] = [] + self._metrics: Dict[ + Resource, Dict[InstrumentationScope, Iterable[MetricReaderMetric]] + ] = None def get_metrics(self) -> List[Metric]: """Reads and returns current metrics from the SDK""" with self._lock: self.collect() metrics = self._metrics - self._metrics = [] + self._metrics = None return metrics def _receive_metrics( self, - metrics: Iterable[Metric], + metrics: MappingMetricT, timeout_millis: float = 10_000, **kwargs, ) -> None: with self._lock: - self._metrics = list(metrics) + self._metrics = metrics def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: pass @@ -211,7 +219,7 @@ def _ticker(self) -> None: def _receive_metrics( self, - metrics: Iterable[Metric], + metrics: MappingMetricT, timeout_millis: float = 10_000, **kwargs, ) -> None: diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py index d949a340478..738e62539eb 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py @@ -15,7 +15,7 @@ from abc import ABC, abstractmethod from logging import getLogger from os import environ -from typing import Callable, Dict, Iterable +from typing import Callable, Dict, Iterable, Mapping from typing_extensions import final @@ -31,14 +31,22 @@ ObservableUpDownCounter, UpDownCounter, ) -from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric +from opentelemetry.sdk._metrics.point import AggregationTemporality +from opentelemetry.sdk._metrics.point import Metric from opentelemetry.sdk.environment_variables import ( _OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, ) +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util.instrumentation import InstrumentationScope _logger = getLogger(__name__) +MappingMetricT = Mapping[ + Resource, Mapping[InstrumentationScope, Iterable[Metric]] +] + + class MetricReader(ABC): """ Base class for all metric readers @@ -147,8 +155,47 @@ def collect(self, timeout_millis: float = 10_000) -> None: "Cannot call collect on a MetricReader until it is registered on a MeterProvider" ) return + + collected_metrics = self._collect( + self, self._instrument_class_temporality + ) + + resource_instrumentation_scope_metrics = {} + + for collected_metric in collected_metrics: + if collected_metric.resource not in ( + resource_instrumentation_scope_metrics + ): + resource_instrumentation_scope_metrics[ + collected_metric.resource + ] = {} + + instrumentation_scope_metrics = ( + resource_instrumentation_scope_metrics[ + collected_metric.resource + ] + ) + + if collected_metric.instrumentation_scope not in ( + instrumentation_scope_metrics + ): + instrumentation_scope_metrics[ + collected_metric.instrumentation_scope + ] = [] + + instrumentation_scope_metrics[ + collected_metric.instrumentation_scope + ].append( + Metric( + description=collected_metric.description, + name=collected_metric.name, + unit=collected_metric.unit, + point=collected_metric.point, + ) + ) + self._receive_metrics( - self._collect(self, self._instrument_class_temporality), + resource_instrumentation_scope_metrics, timeout_millis=timeout_millis, ) @@ -165,7 +212,7 @@ def _set_collect_callback( @abstractmethod def _receive_metrics( self, - metrics: Iterable[Metric], + metrics: MappingMetricT, timeout_millis: float = 10_000, **kwargs, ) -> None: diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/point.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/point.py index 6d55858a635..a84e04dcbc6 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/point.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/point.py @@ -17,8 +17,6 @@ from enum import IntEnum from typing import Sequence, Union -from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.util.instrumentation import InstrumentationScope from opentelemetry.util.types import Attributes @@ -34,16 +32,24 @@ class AggregationTemporality(IntEnum): CUMULATIVE = 2 +@dataclass(frozen=True) +class NumberDataPoint: + """Single data point in a timeseries that describes the time-varying scalar + value of a metric. + """ + attributes: Attributes + start_time_unix_nano: int + time_unix_nano: int + value: Union[int, float] + + @dataclass(frozen=True) class Sum: """Represents the type of a scalar metric that is calculated as a sum of all reported measurements over a time interval.""" - aggregation_temporality: AggregationTemporality is_monotonic: bool - start_time_unix_nano: int - time_unix_nano: int - value: Union[int, float] + data_points: Sequence[NumberDataPoint] @dataclass(frozen=True) @@ -51,27 +57,34 @@ class Gauge: """Represents the type of a scalar metric that always exports the current value for every data point. It should be used for an unknown aggregation.""" + data_points: Sequence[NumberDataPoint] + +@dataclass(frozen=True) +class HistogramDataPoint: + """Single data point in a timeseries that describes the time-varying scalar + value of a metric. + """ + attributes: Attributes + start_time_unix_nano: int time_unix_nano: int - value: Union[int, float] + count: int + sum: Union[int, float] + bucket_counts: Sequence[int] + explicit_bounds: Sequence[float] + min: float + max: float @dataclass(frozen=True) class Histogram: """Represents the type of a metric that is calculated by aggregating as a histogram of all reported measurements over a time interval.""" - aggregation_temporality: AggregationTemporality - bucket_counts: Sequence[int] - explicit_bounds: Sequence[float] - max: int - min: int - start_time_unix_nano: int - sum: Union[int, float] - time_unix_nano: int + data_points: Sequence[HistogramDataPoint] -PointT = Union[Sum, Gauge, Histogram] +DataT = Union[Sum, Gauge, Histogram] @dataclass(frozen=True) @@ -81,21 +94,14 @@ class Metric: Concrete metric types contain all the information as in the OTLP proto definitions (https://github.com/open-telemetry/opentelemetry-proto/blob/b43e9b18b76abf3ee040164b55b9c355217151f3/opentelemetry/proto/metrics/v1/metrics.proto#L37) but are flattened as much as possible. """ - - # common fields to all metric kinds - attributes: Attributes description: str - instrumentation_scope: InstrumentationScope name: str - resource: Resource unit: str - point: PointT - """Contains non-common fields for the given metric""" + data: DataT def to_json(self) -> str: return json.dumps( { - "attributes": self.attributes if self.attributes else "", "description": self.description if self.description else "", "instrumentation_scope": repr(self.instrumentation_scope) if self.instrumentation_scope diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/sdk_configuration.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/sdk_configuration.py index b48d7f22e7a..4fb47564c7c 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/sdk_configuration.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/sdk_configuration.py @@ -1,7 +1,7 @@ from dataclasses import dataclass from typing import TYPE_CHECKING, Sequence -from opentelemetry.sdk._metrics.metric_reader import MetricReader +from opentelemetry.sdk._metrics._internal.metric_reader import MetricReader from opentelemetry.sdk.resources import Resource if TYPE_CHECKING: diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py index 06dc1e77766..c894139a66d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py @@ -17,6 +17,7 @@ from opentelemetry.sdk._metrics._internal.export import ( # noqa: F401 ConsoleMetricExporter, InMemoryMetricReader, + MappingMetricT, MetricExporter, MetricExportResult, PeriodicExportingMetricReader, diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py index d184e9fbef0..796ae59a475 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py @@ -15,6 +15,7 @@ # pylint: disable=unused-import from opentelemetry.sdk._metrics._internal.metric_reader import ( # noqa: F401 + Metric, MetricReader, ) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/point.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/point.py index 190b85533ea..9356f66bbf5 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/point.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/point.py @@ -22,14 +22,16 @@ Gauge, Histogram, Metric, - PointT, + DataT, Sum, + NumberDataPoint, + HistogramDataPoint ) __all__ = [] for key, value in globals().copy().items(): if not key.startswith("_"): - if _version_info.minor == 6 and key == "PointT": + if _version_info.minor == 6 and key == "DataT": continue value.__module__ = __name__ __all__.append(key) diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py b/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py index cb2ab8b54c6..79a01b2659e 100644 --- a/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py @@ -33,7 +33,7 @@ def test_disable_default_views(self): counter.add(10, {"label": "value2"}) counter.add(10, {"label": "value3"}) - self.assertEqual(reader.get_metrics(), []) + self.assertEqual(reader.get_metrics(), {}) def test_disable_default_views_add_custom(self): reader = InMemoryMetricReader() @@ -54,4 +54,9 @@ def test_disable_default_views_add_custom(self): metrics = reader.get_metrics() self.assertEqual(len(metrics), 1) + + resource = [*metrics][0] + instrumentation_scope = [*metrics[resource]][0] + metrics = metrics[resource][instrumentation_scope] + self.assertEqual(metrics[0].name, "testhist") diff --git a/opentelemetry-sdk/tests/metrics/test_aggregation.py b/opentelemetry-sdk/tests/metrics/test_aggregation.py index 9ef7c469ff9..53e3b08bfdb 100644 --- a/opentelemetry-sdk/tests/metrics/test_aggregation.py +++ b/opentelemetry-sdk/tests/metrics/test_aggregation.py @@ -114,13 +114,13 @@ def test_collect_delta(self): ) synchronous_sum_aggregation.aggregate(measurement(1)) - first_sum = synchronous_sum_aggregation.collect() + first_sum = synchronous_sum_aggregation.collect({}) self.assertEqual(first_sum.value, 1) self.assertTrue(first_sum.is_monotonic) synchronous_sum_aggregation.aggregate(measurement(1)) - second_sum = synchronous_sum_aggregation.collect() + second_sum = synchronous_sum_aggregation.collect({}) self.assertEqual(second_sum.value, 1) self.assertTrue(second_sum.is_monotonic) @@ -139,14 +139,14 @@ def test_collect_cumulative(self): ) sum_aggregation.aggregate(measurement(1)) - first_sum = sum_aggregation.collect() + first_sum = sum_aggregation.collect({}) self.assertEqual(first_sum.value, 1) self.assertTrue(first_sum.is_monotonic) # should have been reset after first collect sum_aggregation.aggregate(measurement(1)) - second_sum = sum_aggregation.collect() + second_sum = sum_aggregation.collect({}) self.assertEqual(second_sum.value, 1) self.assertTrue(second_sum.is_monotonic) @@ -156,7 +156,7 @@ def test_collect_cumulative(self): ) # if no point seen for a whole interval, should return None - third_sum = sum_aggregation.collect() + third_sum = sum_aggregation.collect({}) self.assertIsNone(third_sum) @@ -185,10 +185,10 @@ def test_collect(self): last_value_aggregation = _LastValueAggregation() - self.assertIsNone(last_value_aggregation.collect()) + self.assertIsNone(last_value_aggregation.collect({})) last_value_aggregation.aggregate(measurement(1)) - first_gauge = last_value_aggregation.collect() + first_gauge = last_value_aggregation.collect({}) self.assertIsInstance(first_gauge, Gauge) self.assertEqual(first_gauge.value, 1) @@ -198,7 +198,7 @@ def test_collect(self): # CI fails the last assertion without this sleep(0.1) - second_gauge = last_value_aggregation.collect() + second_gauge = last_value_aggregation.collect({}) self.assertEqual(second_gauge.value, 1) @@ -207,7 +207,7 @@ def test_collect(self): ) # if no observation seen for the interval, it should return None - third_gauge = last_value_aggregation.collect() + third_gauge = last_value_aggregation.collect({}) self.assertIsNone(third_gauge) @@ -249,7 +249,7 @@ def test_aggregate(self): explicit_bucket_histogram_aggregation._bucket_counts[3], 1 ) - histo = explicit_bucket_histogram_aggregation.collect() + histo = explicit_bucket_histogram_aggregation.collect({}) self.assertEqual(histo.sum, 14) def test_min_max(self): @@ -294,7 +294,7 @@ def test_collect(self): ) explicit_bucket_histogram_aggregation.aggregate(measurement(1)) - first_histogram = explicit_bucket_histogram_aggregation.collect() + first_histogram = explicit_bucket_histogram_aggregation.collect({}) self.assertEqual(first_histogram.bucket_counts, (0, 1, 0, 0)) self.assertEqual(first_histogram.sum, 1) @@ -303,7 +303,7 @@ def test_collect(self): sleep(0.1) explicit_bucket_histogram_aggregation.aggregate(measurement(1)) - second_histogram = explicit_bucket_histogram_aggregation.collect() + second_histogram = explicit_bucket_histogram_aggregation.collect({}) self.assertEqual(second_histogram.bucket_counts, (0, 1, 0, 0)) self.assertEqual(second_histogram.sum, 1) @@ -324,6 +324,7 @@ def test_previous_point_non_cumulative(self): _convert_aggregation_temporality( Sum( + attributes={}, start_time_unix_nano=0, time_unix_nano=0, value=0, @@ -331,6 +332,7 @@ def test_previous_point_non_cumulative(self): is_monotonic=False, ), Sum( + attributes={}, start_time_unix_nano=0, time_unix_nano=0, value=0, @@ -343,6 +345,7 @@ def test_previous_point_non_cumulative(self): def test_mismatched_point_types(self): current_point = Sum( + attributes={}, start_time_unix_nano=0, time_unix_nano=0, value=0, @@ -353,7 +356,7 @@ def test_mismatched_point_types(self): with self.assertLogs(level=WARNING): self.assertIs( _convert_aggregation_temporality( - Gauge(time_unix_nano=0, value=0), + Gauge(attributes={}, time_unix_nano=0, value=0), current_point, AggregationTemporality.DELTA, ), @@ -364,7 +367,7 @@ def test_mismatched_point_types(self): with self.assertLogs(level=WARNING): self.assertIs( _convert_aggregation_temporality( - Gauge(time_unix_nano=0, value=0), + Gauge(attributes={}, time_unix_nano=0, value=0), None, AggregationTemporality.DELTA, ), @@ -374,6 +377,7 @@ def test_mismatched_point_types(self): def test_current_point_sum_previous_point_none(self): current_point = Sum( + attributes={}, start_time_unix_nano=0, time_unix_nano=0, value=0, @@ -396,6 +400,7 @@ def test_current_point_sum_current_point_same_aggregation_temporality( ): current_point = Sum( + attributes={}, start_time_unix_nano=0, time_unix_nano=0, value=0, @@ -406,6 +411,7 @@ def test_current_point_sum_current_point_same_aggregation_temporality( self.assertEqual( _convert_aggregation_temporality( Sum( + attributes={}, start_time_unix_nano=0, time_unix_nano=0, value=0, @@ -419,6 +425,7 @@ def test_current_point_sum_current_point_same_aggregation_temporality( ) current_point = Sum( + attributes={}, start_time_unix_nano=0, time_unix_nano=0, value=0, @@ -429,6 +436,7 @@ def test_current_point_sum_current_point_same_aggregation_temporality( self.assertEqual( _convert_aggregation_temporality( Sum( + attributes={}, start_time_unix_nano=0, time_unix_nano=0, value=0, @@ -446,6 +454,7 @@ def test_current_point_sum_aggregation_temporality_delta(self): self.assertEqual( _convert_aggregation_temporality( Sum( + attributes={}, start_time_unix_nano=1, time_unix_nano=2, value=3, @@ -453,6 +462,7 @@ def test_current_point_sum_aggregation_temporality_delta(self): is_monotonic=False, ), Sum( + attributes={}, start_time_unix_nano=4, time_unix_nano=5, value=6, @@ -462,6 +472,7 @@ def test_current_point_sum_aggregation_temporality_delta(self): AggregationTemporality.DELTA, ), Sum( + attributes={}, start_time_unix_nano=2, time_unix_nano=5, value=3, @@ -473,6 +484,7 @@ def test_current_point_sum_aggregation_temporality_delta(self): self.assertEqual( _convert_aggregation_temporality( Sum( + attributes={}, start_time_unix_nano=1, time_unix_nano=2, value=3, @@ -480,6 +492,7 @@ def test_current_point_sum_aggregation_temporality_delta(self): is_monotonic=True, ), Sum( + attributes={}, start_time_unix_nano=4, time_unix_nano=5, value=6, @@ -489,6 +502,7 @@ def test_current_point_sum_aggregation_temporality_delta(self): AggregationTemporality.DELTA, ), Sum( + attributes={}, start_time_unix_nano=2, time_unix_nano=5, value=3, @@ -500,6 +514,7 @@ def test_current_point_sum_aggregation_temporality_delta(self): self.assertEqual( _convert_aggregation_temporality( Sum( + attributes={}, start_time_unix_nano=1, time_unix_nano=2, value=3, @@ -507,6 +522,7 @@ def test_current_point_sum_aggregation_temporality_delta(self): is_monotonic=True, ), Sum( + attributes={}, start_time_unix_nano=4, time_unix_nano=5, value=6, @@ -516,6 +532,7 @@ def test_current_point_sum_aggregation_temporality_delta(self): AggregationTemporality.DELTA, ), Sum( + attributes={}, start_time_unix_nano=2, time_unix_nano=5, value=3, @@ -529,6 +546,7 @@ def test_current_point_sum_aggregation_temporality_cumulative(self): self.assertEqual( _convert_aggregation_temporality( Sum( + attributes={}, start_time_unix_nano=1, time_unix_nano=2, value=3, @@ -536,6 +554,7 @@ def test_current_point_sum_aggregation_temporality_cumulative(self): is_monotonic=False, ), Sum( + attributes={}, start_time_unix_nano=4, time_unix_nano=5, value=6, @@ -545,6 +564,7 @@ def test_current_point_sum_aggregation_temporality_cumulative(self): AggregationTemporality.CUMULATIVE, ), Sum( + attributes={}, start_time_unix_nano=1, time_unix_nano=5, value=9, @@ -556,6 +576,7 @@ def test_current_point_sum_aggregation_temporality_cumulative(self): self.assertEqual( _convert_aggregation_temporality( Sum( + attributes={}, start_time_unix_nano=1, time_unix_nano=2, value=3, @@ -563,6 +584,7 @@ def test_current_point_sum_aggregation_temporality_cumulative(self): is_monotonic=True, ), Sum( + attributes={}, start_time_unix_nano=4, time_unix_nano=5, value=6, @@ -572,6 +594,7 @@ def test_current_point_sum_aggregation_temporality_cumulative(self): AggregationTemporality.CUMULATIVE, ), Sum( + attributes={}, start_time_unix_nano=1, time_unix_nano=5, value=9, @@ -583,6 +606,7 @@ def test_current_point_sum_aggregation_temporality_cumulative(self): self.assertEqual( _convert_aggregation_temporality( Sum( + attributes={}, start_time_unix_nano=1, time_unix_nano=2, value=3, @@ -590,6 +614,7 @@ def test_current_point_sum_aggregation_temporality_cumulative(self): is_monotonic=True, ), Sum( + attributes={}, start_time_unix_nano=4, time_unix_nano=5, value=6, @@ -599,6 +624,7 @@ def test_current_point_sum_aggregation_temporality_cumulative(self): AggregationTemporality.CUMULATIVE, ), Sum( + attributes={}, start_time_unix_nano=1, time_unix_nano=5, value=9, @@ -609,10 +635,10 @@ def test_current_point_sum_aggregation_temporality_cumulative(self): def test_current_point_gauge(self): - current_point = Gauge(time_unix_nano=1, value=0) + current_point = Gauge(attributes={}, time_unix_nano=1, value=0) self.assertEqual( _convert_aggregation_temporality( - Gauge(time_unix_nano=0, value=0), + Gauge(attributes={}, time_unix_nano=0, value=0), current_point, AggregationTemporality.CUMULATIVE, ), @@ -624,6 +650,7 @@ class TestHistogramConvertAggregationTemporality(TestCase): def test_previous_point_none(self): current_point = HistogramPoint( + attributes={}, aggregation_temporality=AggregationTemporality.DELTA, bucket_counts=[0, 2, 1, 2, 0], explicit_bounds=[0, 5, 10, 25], @@ -674,6 +701,7 @@ def test_previous_point_non_cumulative(self): def test_same_aggregation_temporality_cumulative(self): current_point = HistogramPoint( + attributes={}, aggregation_temporality=AggregationTemporality.CUMULATIVE, bucket_counts=[0, 3, 4, 2, 0], explicit_bounds=[0, 5, 10, 25], @@ -686,6 +714,7 @@ def test_same_aggregation_temporality_cumulative(self): self.assertEqual( _convert_aggregation_temporality( HistogramPoint( + attributes={}, aggregation_temporality=AggregationTemporality.CUMULATIVE, bucket_counts=[0, 2, 1, 2, 0], explicit_bounds=[0, 5, 10, 25], @@ -703,6 +732,7 @@ def test_same_aggregation_temporality_cumulative(self): def test_same_aggregation_temporality_delta(self): current_point = HistogramPoint( + attributes={}, aggregation_temporality=AggregationTemporality.DELTA, bucket_counts=[0, 1, 3, 0, 0], explicit_bounds=[0, 5, 10, 25], @@ -716,6 +746,7 @@ def test_same_aggregation_temporality_delta(self): self.assertEqual( _convert_aggregation_temporality( HistogramPoint( + attributes={}, aggregation_temporality=AggregationTemporality.CUMULATIVE, bucket_counts=[0, 3, 4, 2, 0], explicit_bounds=[0, 5, 10, 25], @@ -733,6 +764,7 @@ def test_same_aggregation_temporality_delta(self): def test_aggregation_temporality_to_cumulative(self): current_point = HistogramPoint( + attributes={}, aggregation_temporality=AggregationTemporality.DELTA, bucket_counts=[0, 1, 3, 0, 0], explicit_bounds=[0, 5, 10, 25], @@ -746,6 +778,7 @@ def test_aggregation_temporality_to_cumulative(self): self.assertEqual( _convert_aggregation_temporality( HistogramPoint( + attributes={}, aggregation_temporality=AggregationTemporality.CUMULATIVE, bucket_counts=[0, 2, 1, 2, 0], explicit_bounds=[0, 5, 10, 25], @@ -759,6 +792,7 @@ def test_aggregation_temporality_to_cumulative(self): AggregationTemporality.CUMULATIVE, ), HistogramPoint( + attributes={}, aggregation_temporality=AggregationTemporality.CUMULATIVE, bucket_counts=[0, 3, 4, 2, 0], explicit_bounds=[0, 5, 10, 25], @@ -772,6 +806,7 @@ def test_aggregation_temporality_to_cumulative(self): def test_aggregation_temporality_to_delta(self): current_point = HistogramPoint( + attributes={}, aggregation_temporality=AggregationTemporality.CUMULATIVE, bucket_counts=[0, 3, 4, 2, 0], explicit_bounds=[0, 5, 10, 25], @@ -785,6 +820,7 @@ def test_aggregation_temporality_to_delta(self): self.assertEqual( _convert_aggregation_temporality( HistogramPoint( + attributes={}, aggregation_temporality=AggregationTemporality.CUMULATIVE, bucket_counts=[0, 2, 1, 2, 0], explicit_bounds=[0, 5, 10, 25], @@ -798,6 +834,7 @@ def test_aggregation_temporality_to_delta(self): AggregationTemporality.DELTA, ), HistogramPoint( + attributes={}, aggregation_temporality=AggregationTemporality.DELTA, bucket_counts=[0, 1, 3, 0, 0], explicit_bounds=[0, 5, 10, 25], diff --git a/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py index 8ed026e2456..30b3c851922 100644 --- a/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py @@ -32,18 +32,22 @@ def test_no_metrics(self): mock_collect_callback = Mock(return_value=[]) reader = InMemoryMetricReader() reader._set_collect_callback(mock_collect_callback) - self.assertEqual(reader.get_metrics(), []) + self.assertEqual(reader.get_metrics(), {}) mock_collect_callback.assert_called_once() - def test_converts_metrics_to_list(self): + def test_store_metrics(self): + + resource = Resource.create() + instrumentation_scope = InstrumentationScope("testmetrics") + metric = Metric( - attributes={"myattr": "baz"}, description="", - instrumentation_scope=InstrumentationScope("testmetrics"), + instrumentation_scope=instrumentation_scope, name="foo", - resource=Resource.create(), + resource=resource, unit="", point=Sum( + attributes={"myattr": "baz"}, start_time_unix_nano=1647626444152947792, time_unix_nano=1647626444153163239, value=72.3309814450449, @@ -57,9 +61,19 @@ def test_converts_metrics_to_list(self): returned_metrics = reader.get_metrics() mock_collect_callback.assert_called_once() - self.assertIsInstance(returned_metrics, list) - self.assertEqual(len(returned_metrics), 1) - self.assertIs(returned_metrics[0], metric) + self.assertIsInstance(returned_metrics, dict) + + metric_reader_metric = returned_metrics[resource][ + instrumentation_scope + ][0] + + self.assertEqual( + metric_reader_metric.point.attributes, metric.point.attributes + ) + self.assertEqual(metric_reader_metric.description, metric.description) + self.assertEqual(metric_reader_metric.name, metric.name) + self.assertEqual(metric_reader_metric.unit, metric.unit) + self.assertEqual(metric_reader_metric.point, metric.point) def test_shutdown(self): # shutdown should always be successful @@ -78,4 +92,8 @@ def test_integration(self): metrics = reader.get_metrics() # should be 3 metrics, one from the observable gauge and one for each labelset from the counter - self.assertEqual(len(metrics), 3) + + resource = [*metrics][0] + instrumentation_scope = [*metrics[resource]][0] + + self.assertEqual(len(metrics[resource][instrumentation_scope]), 3) diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index 3b571638016..993254ee47c 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -246,7 +246,13 @@ def test_measurement_collect_callback( DummyMetricReader(), ] sync_consumer_instance = mock_sync_measurement_consumer() - sync_consumer_instance.collect = MockFunc() + + class ListMockFunc(MockFunc): + def __init__(self): + super().__init__() + self.mock = [] + + sync_consumer_instance.collect = ListMockFunc() MeterProvider(metric_readers=metric_readers) for reader in metric_readers: @@ -506,6 +512,14 @@ def test_duplicate_instrument_aggregate_data(self): metrics = exporter.metrics[0] + resource = [*metrics][0] + instrumentation_scope_0 = [*metrics[resource]][0] + instrumentation_scope_1 = [*metrics[resource]][1] + metrics_0 = metrics[resource][instrumentation_scope_0] + metrics_1 = metrics[resource][instrumentation_scope_1] + + metrics = [metrics_0[0], metrics_1[0]] + self.assertEqual(len(metrics), 2) metric_0 = metrics[0] diff --git a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py index d2ad8cdccc3..7a1622edda5 100644 --- a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py @@ -23,8 +23,9 @@ MetricExportResult, PeriodicExportingMetricReader, ) -from opentelemetry.sdk._metrics.point import Gauge, Metric, Sum -from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk._metrics.point import ( + Gauge, Metric, Sum, NumberDataPoint +) from opentelemetry.test.concurrency_test import ConcurrencyTestBase from opentelemetry.util._time import _time_ns @@ -52,29 +53,34 @@ def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: metrics_list = [ Metric( name="sum_name", - attributes={}, description="", - instrumentation_scope=None, - resource=Resource.create(), unit="", - point=Sum( - start_time_unix_nano=_time_ns(), - time_unix_nano=_time_ns(), - value=2, + data=Sum( + data_points=[ + NumberDataPoint( + attributes={}, + start_time_unix_nano=_time_ns(), + time_unix_nano=_time_ns(), + value=2, + ) + ], aggregation_temporality=1, is_monotonic=True, ), ), Metric( name="gauge_name", - attributes={}, description="", - instrumentation_scope=None, - resource=Resource.create(), unit="", - point=Gauge( - time_unix_nano=_time_ns(), - value=2, + data=Gauge( + data_points=[ + NumberDataPoint( + attributes={}, + start_time_unix_nano=_time_ns(), + time_unix_nano=_time_ns(), + value=2, + ) + ], ), ), ] @@ -98,12 +104,13 @@ def _create_periodic_reader( def _collect(reader, temp): time.sleep(collect_wait) pmr._receive_metrics(metrics) + return [] pmr._set_collect_callback(_collect) return pmr def test_ticker_called(self): - collect_mock = Mock() + collect_mock = Mock(**{"return_value": []}) pmr = PeriodicExportingMetricReader(Mock(), export_interval_millis=1) pmr._set_collect_callback(collect_mock) time.sleep(0.1) diff --git a/opentelemetry-sdk/tests/metrics/test_point.py b/opentelemetry-sdk/tests/metrics/test_point.py index 1748fb7ba4a..2bcecbf727c 100644 --- a/opentelemetry-sdk/tests/metrics/test_point.py +++ b/opentelemetry-sdk/tests/metrics/test_point.py @@ -21,7 +21,6 @@ def _create_metric(value): return Metric( - attributes={"attr-key": "test-val"}, description="test-description", instrumentation_scope=InstrumentationScope( name="name", version="version" @@ -38,6 +37,7 @@ def test_sum(self): self.maxDiff = None point = _create_metric( Sum( + attributes={"attr-key": "test-val"}, aggregation_temporality=2, is_monotonic=True, start_time_unix_nano=10, @@ -46,20 +46,28 @@ def test_sum(self): ) ) self.assertEqual( - '{"attributes": {"attr-key": "test-val"}, "description": "test-description", "instrumentation_scope": "InstrumentationScope(name, version, None)", "name": "test-name", "resource": "BoundedAttributes({\'resource-key\': \'resource-val\'}, maxlen=None)", "unit": "test-unit", "point": {"aggregation_temporality": 2, "is_monotonic": true, "start_time_unix_nano": 10, "time_unix_nano": 20, "value": 9}}', + '{"description": "test-description", "instrumentation_scope": "InstrumentationScope(name, version, None)", "name": "test-name", "resource": "BoundedAttributes({\'resource-key\': \'resource-val\'}, maxlen=None)", "unit": "test-unit", "point": {"attributes": {"attr-key": "test-val"}, "aggregation_temporality": 2, "is_monotonic": true, "start_time_unix_nano": 10, "time_unix_nano": 20, "value": 9}}', point.to_json(), ) def test_gauge(self): - point = _create_metric(Gauge(time_unix_nano=40, value=20)) + point = _create_metric( + Gauge( + attributes={"attr-key": "test-val"}, + time_unix_nano=40, + value=20, + ) + ) + self.maxDiff = None self.assertEqual( - '{"attributes": {"attr-key": "test-val"}, "description": "test-description", "instrumentation_scope": "InstrumentationScope(name, version, None)", "name": "test-name", "resource": "BoundedAttributes({\'resource-key\': \'resource-val\'}, maxlen=None)", "unit": "test-unit", "point": {"time_unix_nano": 40, "value": 20}}', + '{"description": "test-description", "instrumentation_scope": "InstrumentationScope(name, version, None)", "name": "test-name", "resource": "BoundedAttributes({\'resource-key\': \'resource-val\'}, maxlen=None)", "unit": "test-unit", "point": {"attributes": {"attr-key": "test-val"}, "time_unix_nano": 40, "value": 20}}', point.to_json(), ) def test_histogram(self): point = _create_metric( Histogram( + attributes={"attr-key": "test-val"}, aggregation_temporality=1, bucket_counts=[0, 0, 1, 0], explicit_bounds=[0.1, 0.5, 0.9, 1], @@ -72,6 +80,6 @@ def test_histogram(self): ) self.maxDiff = None self.assertEqual( - '{"attributes": {"attr-key": "test-val"}, "description": "test-description", "instrumentation_scope": "InstrumentationScope(name, version, None)", "name": "test-name", "resource": "BoundedAttributes({\'resource-key\': \'resource-val\'}, maxlen=None)", "unit": "test-unit", "point": {"aggregation_temporality": 1, "bucket_counts": [0, 0, 1, 0], "explicit_bounds": [0.1, 0.5, 0.9, 1], "max": 0.8, "min": 0.8, "start_time_unix_nano": 50, "sum": 0.8, "time_unix_nano": 60}}', + '{"description": "test-description", "instrumentation_scope": "InstrumentationScope(name, version, None)", "name": "test-name", "resource": "BoundedAttributes({\'resource-key\': \'resource-val\'}, maxlen=None)", "unit": "test-unit", "point": {"attributes": {"attr-key": "test-val"}, "aggregation_temporality": 1, "bucket_counts": [0, 0, 1, 0], "explicit_bounds": [0.1, 0.5, 0.9, 1], "max": 0.8, "min": 0.8, "start_time_unix_nano": 50, "sum": 0.8, "time_unix_nano": 60}}', point.to_json(), ) diff --git a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py index 7e6f17644ac..3d7667f524e 100644 --- a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py +++ b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py @@ -216,7 +216,6 @@ def test_collect(self): ) ), Metric( - attributes={"c": "d"}, description="description", instrumentation_scope=self.mock_instrumentation_scope, name="name", diff --git a/tests/opentelemetry-test-utils/src/opentelemetry/test/metrictestutil.py b/tests/opentelemetry-test-utils/src/opentelemetry/test/metrictestutil.py index 97b411ac4b6..04407111904 100644 --- a/tests/opentelemetry-test-utils/src/opentelemetry/test/metrictestutil.py +++ b/tests/opentelemetry-test-utils/src/opentelemetry/test/metrictestutil.py @@ -16,6 +16,9 @@ from collections import OrderedDict from opentelemetry.attributes import BoundedAttributes +from opentelemetry.sdk._metrics.metric_reader import ( + Metric as MetricReaderMetric, +) from opentelemetry.sdk._metrics.point import ( AggregationTemporality, Gauge, @@ -26,21 +29,39 @@ from opentelemetry.sdk.util.instrumentation import InstrumentationScope -def _generate_metric( - name, point, attributes=None, description=None, unit=None -) -> Metric: +def _generate_metric_reader_metric(metric: Metric): + + return { + metric.resource: { + metric.instrumentation_scope: [ + MetricReaderMetric( + description=metric.description, + name=metric.name, + unit=metric.unit, + point=metric.point, + ) + ] + } + } + + +def _generate_attributes(attributes=None): if not attributes: - attributes = BoundedAttributes(attributes={"a": 1, "b": True}) + return BoundedAttributes(attributes={"a": 1, "b": True}) + return attributes + + +def _generate_metric(name, point, description=None, unit=None) -> Metric: if not description: description = "foo" if not unit: unit = "s" + return Metric( resource=SDKResource(OrderedDict([("a", 1), ("b", False)])), instrumentation_scope=InstrumentationScope( "first_name", "first_version" ), - attributes=attributes, description=description, name=name, unit=unit, @@ -59,8 +80,8 @@ def _generate_sum( start_time_unix_nano=1641946015139533244, time_unix_nano=1641946016139533244, value=val, + attributes=_generate_attributes(attributes), ), - attributes=attributes, description=description, unit=unit, ) @@ -74,8 +95,8 @@ def _generate_gauge( Gauge( time_unix_nano=1641946016139533244, value=val, + attributes=_generate_attributes(attributes), ), - attributes=attributes, description=description, unit=unit, ) @@ -87,7 +108,6 @@ def _generate_unsupported_metric( return _generate_metric( name, None, - attributes=attributes, description=description, unit=unit, )