diff --git a/exporter/opentelemetry-exporter-otlp/CHANGELOG.md b/exporter/opentelemetry-exporter-otlp/CHANGELOG.md
index 979d7c19092..e467d8f78e9 100644
--- a/exporter/opentelemetry-exporter-otlp/CHANGELOG.md
+++ b/exporter/opentelemetry-exporter-otlp/CHANGELOG.md
@@ -2,6 +2,8 @@
 
 ## Unreleased
 
+- Change temporality for Counter and UpDownCounter
+  ([#1384](https://github.com/open-telemetry/opentelemetry-python/pull/1384))
 - Add Gzip compression for exporter
   ([#1141](https://github.com/open-telemetry/opentelemetry-python/pull/1141))
 - OTLP exporter: Handle error case when no credentials supplied
diff --git a/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py
index c65b2a89df2..c90dd47db27 100644
--- a/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py
+++ b/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py
@@ -15,8 +15,7 @@
 """OTLP Metrics Exporter"""
 
 import logging
-import os
-from typing import List, Optional, Sequence, Type, TypeVar, Union
+from typing import List, Optional, Sequence, Type, TypeVar
 
 from grpc import ChannelCredentials
 
@@ -71,7 +70,9 @@
 
 
 def _get_data_points(
-    export_record: ExportRecord, data_point_class: Type[DataPointT]
+    export_record: ExportRecord,
+    data_point_class: Type[DataPointT],
+    aggregation_temporality: int,
 ) -> List[DataPointT]:
 
     if isinstance(export_record.aggregator, SumAggregator):
@@ -91,6 +92,15 @@
     elif isinstance(export_record.aggregator, ValueObserverAggregator):
         value = export_record.aggregator.checkpoint.last
 
+    if aggregation_temporality == (
+        AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
+    ):
+        start_time_unix_nano = export_record.aggregator.first_timestamp
+    else:
+        start_time_unix_nano = (
+            export_record.aggregator.initial_checkpoint_timestamp
+        )
+
     return [
         data_point_class(
             labels=[
@@ -98,9 +108,7 @@
                 for label_key, label_value in export_record.labels
             ],
             value=value,
-            start_time_unix_nano=(
-                export_record.aggregator.initial_checkpoint_timestamp
-            ),
+            start_time_unix_nano=start_time_unix_nano,
             time_unix_nano=(export_record.aggregator.last_update_timestamp),
         )
     ]
@@ -215,25 +223,35 @@ def _translate_data(
             data_point_class = type_class[value_type]["data_point_class"]
 
             if isinstance(export_record.instrument, Counter):
+
+                aggregation_temporality = (
+                    AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
+                )
+
                 otlp_metric_data = sum_class(
                     data_points=_get_data_points(
-                        export_record, data_point_class
-                    ),
-                    aggregation_temporality=(
-                        AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA
+                        export_record,
+                        data_point_class,
+                        aggregation_temporality,
                     ),
+                    aggregation_temporality=aggregation_temporality,
                     is_monotonic=True,
                 )
                 argument = type_class[value_type]["sum"]["argument"]
 
             elif isinstance(export_record.instrument, UpDownCounter):
+
+                aggregation_temporality = (
+                    AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
+                )
+
                 otlp_metric_data = sum_class(
                     data_points=_get_data_points(
-                        export_record, data_point_class
-                    ),
-                    aggregation_temporality=(
-                        AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA
+                        export_record,
+                        data_point_class,
+                        aggregation_temporality,
                     ),
+                    aggregation_temporality=aggregation_temporality,
                     is_monotonic=False,
                 )
                 argument = type_class[value_type]["sum"]["argument"]
@@ -243,25 +261,35 @@ def _translate_data(
                 continue
 
             elif isinstance(export_record.instrument, SumObserver):
+
+                aggregation_temporality = (
+                    AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
+                )
+
                 otlp_metric_data = sum_class(
                     data_points=_get_data_points(
-                        export_record, data_point_class
-                    ),
-                    aggregation_temporality=(
-                        AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
+                        export_record,
+                        data_point_class,
+                        aggregation_temporality,
                     ),
+                    aggregation_temporality=aggregation_temporality,
                     is_monotonic=True,
                 )
                 argument = type_class[value_type]["sum"]["argument"]
 
             elif isinstance(export_record.instrument, UpDownSumObserver):
+
+                aggregation_temporality = (
+                    AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
+                )
+
                 otlp_metric_data = sum_class(
                     data_points=_get_data_points(
-                        export_record, data_point_class
-                    ),
-                    aggregation_temporality=(
-                        AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
+                        export_record,
+                        data_point_class,
+                        aggregation_temporality,
                     ),
+                    aggregation_temporality=aggregation_temporality,
                     is_monotonic=False,
                 )
                 argument = type_class[value_type]["sum"]["argument"]
@@ -269,7 +297,9 @@ def _translate_data(
             elif isinstance(export_record.instrument, (ValueObserver)):
                 otlp_metric_data = gauge_class(
                     data_points=_get_data_points(
-                        export_record, data_point_class
+                        export_record,
+                        data_point_class,
+                        AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA,
                     )
                 )
                 argument = type_class[value_type]["gauge"]["argument"]
diff --git a/exporter/opentelemetry-exporter-otlp/tests/test_otlp_metric_exporter.py b/exporter/opentelemetry-exporter-otlp/tests/test_otlp_metric_exporter.py
index 299406559c2..4f121b38fda 100644
--- a/exporter/opentelemetry-exporter-otlp/tests/test_otlp_metric_exporter.py
+++ b/exporter/opentelemetry-exporter-otlp/tests/test_otlp_metric_exporter.py
@@ -46,7 +46,9 @@
 
 
 class TestOTLPMetricExporter(TestCase):
-    def setUp(self):
+    @patch("opentelemetry.sdk.metrics.export.aggregate.time_ns")
+    def setUp(self, mock_time_ns):  # pylint: disable=arguments-differ
+        mock_time_ns.configure_mock(**{"return_value": 1})
         self.exporter = OTLPMetricsExporter(insecure=True)
         resource = SDKResource(OrderedDict([("a", 1), ("b", False)]))
 
@@ -95,12 +97,9 @@ def test_no_credentials_error(self):
         with self.assertRaises(ValueError):
             OTLPMetricsExporter()
 
-    @patch("opentelemetry.sdk.metrics.export.aggregate.time_ns")
-    def test_translate_metrics(self, mock_time_ns):
+    def test_translate_metrics(self):
         # pylint: disable=no-member
 
-        mock_time_ns.configure_mock(**{"return_value": 1})
-
         self.counter_export_record.aggregator.checkpoint = 1
         self.counter_export_record.aggregator.initial_checkpoint_timestamp = 1
         self.counter_export_record.aggregator.last_update_timestamp = 1
@@ -137,7 +136,7 @@ def test_translate_metrics(self, mock_time_ns):
                                         )
                                     ],
                                     aggregation_temporality=(
-                                        AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA
+                                        AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
                                     ),
                                     is_monotonic=True,
                                 ),
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py
index 84ab518a47f..3781b9146bd 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py
@@ -34,6 +34,7 @@ def __init__(self, config=None):
         self._lock = threading.Lock()
         self.last_update_timestamp = 0
         self.initial_checkpoint_timestamp = 0
+        self.first_timestamp = time_ns()
         self.checkpointed = True
         if config is not None:
             self.config = config
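For reference, a minimal standalone sketch (not part of the diff above) of the start-time rule that `_get_data_points` now applies: cumulative sums anchor `start_time_unix_nano` at the aggregator's `first_timestamp` (set once when the aggregator is created, per the `aggregate.py` change), while delta points keep using `initial_checkpoint_timestamp` (reset at every collection interval). The `Temporality` enum, `FakeAggregator` class, and `select_start_time` helper below are illustrative assumptions, not part of the OpenTelemetry API.

```python
from dataclasses import dataclass
from enum import Enum


class Temporality(Enum):
    # Stand-ins for the AggregationTemporality.AGGREGATION_TEMPORALITY_* values.
    DELTA = 1
    CUMULATIVE = 2


@dataclass
class FakeAggregator:
    # first_timestamp is set once at construction; initial_checkpoint_timestamp
    # is reset at every collection interval.
    first_timestamp: int
    initial_checkpoint_timestamp: int


def select_start_time(aggregator: FakeAggregator, temporality: Temporality) -> int:
    """Mirror the start_time_unix_nano choice made in _get_data_points."""
    if temporality is Temporality.CUMULATIVE:
        # A cumulative sum covers the instrument's whole lifetime.
        return aggregator.first_timestamp
    # A delta (or gauge) point only covers the last collection interval.
    return aggregator.initial_checkpoint_timestamp


if __name__ == "__main__":
    agg = FakeAggregator(first_timestamp=1_000, initial_checkpoint_timestamp=5_000)
    assert select_start_time(agg, Temporality.CUMULATIVE) == 1_000
    assert select_start_time(agg, Temporality.DELTA) == 5_000
```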