From 941e3553611847baf93f59acc475ffe283150ef9 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Wed, 7 Feb 2024 18:07:00 -0600 Subject: [PATCH 1/2] Fix explicit bucket histogram aggregation (#3429) * Fix explicit bucket histogram aggregation Fixes #3407 * Revert "Fix explicit bucket histogram aggregation" This reverts commit f1c6683cbf48ad9acdfce8be24b1dbafaa7bc918. * Fix ExplicitBucketHistogramAggregation Fixes #3407 * Fix default instrument temporality * Fix last test case * Add more test cases * Test min, max and sum * Fix lint * Add CHANGELOG * Fix lint * Skip test if running in Windows --- CHANGELOG.md | 6 +- .../sdk/metrics/_internal/aggregation.py | 187 ++++++++------ ...t_explicit_bucket_histogram_aggregation.py | 244 ++++++++++++++++++ .../integration_test/test_histogram_export.py | 14 +- .../tests/metrics/test_aggregation.py | 32 ++- 5 files changed, 386 insertions(+), 97 deletions(-) create mode 100644 opentelemetry-sdk/tests/metrics/integration_test/test_explicit_bucket_histogram_aggregation.py diff --git a/CHANGELOG.md b/CHANGELOG.md index ba26b90d079..6cbeab987c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Fix explicit bucket histogram aggregation + ([#3429](https://github.com/open-telemetry/opentelemetry-python/pull/3429)) +- Add `code.lineno`, `code.function` and `code.filepath` to all logs + ([#3645](https://github.com/open-telemetry/opentelemetry-python/pull/3645)) - Add Synchronous Gauge instrument ([#3462](https://github.com/open-telemetry/opentelemetry-python/pull/3462)) - Drop support for 3.7 @@ -19,8 +23,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3623](https://github.com/open-telemetry/opentelemetry-python/pull/3623)) - Improve Resource Detector timeout messaging ([#3645](https://github.com/open-telemetry/opentelemetry-python/pull/3645)) -- Add `code.lineno`, `code.function` and 
`code.filepath` to all logs - ([#3645](https://github.com/open-telemetry/opentelemetry-python/pull/3645)) ## Version 1.22.0/0.43b0 (2023-12-15) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py index 3adf3327f63..3ec37473f60 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py @@ -377,6 +377,7 @@ class _ExplicitBucketHistogramAggregation(_Aggregation[HistogramPoint]): def __init__( self, attributes: Attributes, + instrument_aggregation_temporality: AggregationTemporality, start_time_unix_nano: int, boundaries: Sequence[float] = ( 0.0, @@ -398,33 +399,43 @@ def __init__( record_min_max: bool = True, ): super().__init__(attributes) + self._boundaries = tuple(boundaries) - self._bucket_counts = self._get_empty_bucket_counts() + self._record_min_max = record_min_max self._min = inf self._max = -inf self._sum = 0 - self._record_min_max = record_min_max + self._start_time_unix_nano = start_time_unix_nano - # It is assumed that the "natural" aggregation temporality for a - # Histogram instrument is DELTA, like the "natural" aggregation - # temporality for a Counter is DELTA and the "natural" aggregation - # temporality for an ObservableCounter is CUMULATIVE. 
- self._instrument_aggregation_temporality = AggregationTemporality.DELTA + self._instrument_aggregation_temporality = ( + instrument_aggregation_temporality + ) + + self._current_value = None + + self._previous_collection_start_nano = self._start_time_unix_nano + self._previous_cumulative_value = self._get_empty_bucket_counts() + self._previous_min = inf + self._previous_max = -inf + self._previous_sum = 0 def _get_empty_bucket_counts(self) -> List[int]: return [0] * (len(self._boundaries) + 1) def aggregate(self, measurement: Measurement) -> None: + with self._lock: + if self._current_value is None: + self._current_value = self._get_empty_bucket_counts() - value = measurement.value + value = measurement.value - if self._record_min_max: - self._min = min(self._min, value) - self._max = max(self._max, value) + self._sum += value - self._sum += value + if self._record_min_max: + self._min = min(self._min, value) + self._max = max(self._max, value) - self._bucket_counts[bisect_left(self._boundaries, value)] += 1 + self._current_value[bisect_left(self._boundaries, value)] += 1 def collect( self, @@ -434,84 +445,78 @@ def collect( """ Atomically return a point for the current value of the metric. 
""" - with self._lock: - if not any(self._bucket_counts): - return None - bucket_counts = self._bucket_counts - start_time_unix_nano = self._start_time_unix_nano + with self._lock: + current_value = self._current_value sum_ = self._sum - max_ = self._max min_ = self._min + max_ = self._max - self._bucket_counts = self._get_empty_bucket_counts() - self._start_time_unix_nano = collection_start_nano + self._current_value = None self._sum = 0 self._min = inf self._max = -inf - current_point = HistogramDataPoint( - attributes=self._attributes, - start_time_unix_nano=start_time_unix_nano, - time_unix_nano=collection_start_nano, - count=sum(bucket_counts), - sum=sum_, - bucket_counts=tuple(bucket_counts), - explicit_bounds=self._boundaries, - min=min_, - max=max_, - ) + if ( + self._instrument_aggregation_temporality + is AggregationTemporality.DELTA + ): + # This happens when the corresponding instrument for this + # aggregation is synchronous. + if ( + collection_aggregation_temporality + is AggregationTemporality.DELTA + ): - if self._previous_point is None or ( - self._instrument_aggregation_temporality - is collection_aggregation_temporality - ): - self._previous_point = current_point - return current_point + if current_value is None: + return None - max_ = current_point.max - min_ = current_point.min + previous_collection_start_nano = ( + self._previous_collection_start_nano + ) + self._previous_collection_start_nano = ( + collection_start_nano + ) - if ( - collection_aggregation_temporality - is AggregationTemporality.CUMULATIVE - ): - start_time_unix_nano = self._previous_point.start_time_unix_nano - sum_ = current_point.sum + self._previous_point.sum - # Only update min/max on delta -> cumulative - max_ = max(current_point.max, self._previous_point.max) - min_ = min(current_point.min, self._previous_point.min) - bucket_counts = [ - curr_count + prev_count - for curr_count, prev_count in zip( - current_point.bucket_counts, - self._previous_point.bucket_counts, - ) 
- ] - else: - start_time_unix_nano = self._previous_point.time_unix_nano - sum_ = current_point.sum - self._previous_point.sum - bucket_counts = [ - curr_count - prev_count - for curr_count, prev_count in zip( - current_point.bucket_counts, - self._previous_point.bucket_counts, + return HistogramDataPoint( + attributes=self._attributes, + start_time_unix_nano=previous_collection_start_nano, + time_unix_nano=collection_start_nano, + count=sum(current_value), + sum=sum_, + bucket_counts=tuple(current_value), + explicit_bounds=self._boundaries, + min=min_, + max=max_, + ) + + if current_value is None: + current_value = self._get_empty_bucket_counts() + + self._previous_cumulative_value = [ + current_value_element + previous_cumulative_value_element + for ( + current_value_element, + previous_cumulative_value_element, + ) in zip(current_value, self._previous_cumulative_value) + ] + self._previous_min = min(min_, self._previous_min) + self._previous_max = max(max_, self._previous_max) + self._previous_sum = sum_ + self._previous_sum + + return HistogramDataPoint( + attributes=self._attributes, + start_time_unix_nano=self._start_time_unix_nano, + time_unix_nano=collection_start_nano, + count=sum(self._previous_cumulative_value), + sum=self._previous_sum, + bucket_counts=tuple(self._previous_cumulative_value), + explicit_bounds=self._boundaries, + min=self._previous_min, + max=self._previous_max, ) - ] - current_point = HistogramDataPoint( - attributes=self._attributes, - start_time_unix_nano=start_time_unix_nano, - time_unix_nano=current_point.time_unix_nano, - count=sum(bucket_counts), - sum=sum_, - bucket_counts=tuple(bucket_counts), - explicit_bounds=current_point.explicit_bounds, - min=min_, - max=max_, - ) - self._previous_point = current_point - return current_point + return None # pylint: disable=protected-access @@ -1100,7 +1105,11 @@ def _create_aggregation( if isinstance(instrument, Histogram): return _ExplicitBucketHistogramAggregation( - attributes, 
start_time_unix_nano + attributes, + instrument_aggregation_temporality=( + AggregationTemporality.DELTA + ), + start_time_unix_nano=start_time_unix_nano, ) if isinstance(instrument, ObservableGauge): @@ -1179,8 +1188,18 @@ def _create_aggregation( attributes: Attributes, start_time_unix_nano: int, ) -> _Aggregation: + + instrument_aggregation_temporality = AggregationTemporality.UNSPECIFIED + if isinstance(instrument, Synchronous): + instrument_aggregation_temporality = AggregationTemporality.DELTA + elif isinstance(instrument, Asynchronous): + instrument_aggregation_temporality = ( + AggregationTemporality.CUMULATIVE + ) + return _ExplicitBucketHistogramAggregation( attributes, + instrument_aggregation_temporality, start_time_unix_nano, self._boundaries, self._record_min_max, @@ -1200,16 +1219,18 @@ def _create_aggregation( start_time_unix_nano: int, ) -> _Aggregation: - temporality = AggregationTemporality.UNSPECIFIED + instrument_aggregation_temporality = AggregationTemporality.UNSPECIFIED if isinstance(instrument, Synchronous): - temporality = AggregationTemporality.DELTA + instrument_aggregation_temporality = AggregationTemporality.DELTA elif isinstance(instrument, Asynchronous): - temporality = AggregationTemporality.CUMULATIVE + instrument_aggregation_temporality = ( + AggregationTemporality.CUMULATIVE + ) return _SumAggregation( attributes, isinstance(instrument, (Counter, ObservableCounter)), - temporality, + instrument_aggregation_temporality, start_time_unix_nano, ) diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_explicit_bucket_histogram_aggregation.py b/opentelemetry-sdk/tests/metrics/integration_test/test_explicit_bucket_histogram_aggregation.py new file mode 100644 index 00000000000..6db35fd4c0a --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_explicit_bucket_histogram_aggregation.py @@ -0,0 +1,244 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the 
"License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from platform import system +from unittest import TestCase + +from pytest import mark + +from opentelemetry.sdk.metrics import Histogram, MeterProvider +from opentelemetry.sdk.metrics.export import ( + AggregationTemporality, + InMemoryMetricReader, +) +from opentelemetry.sdk.metrics.view import ExplicitBucketHistogramAggregation + + +class TestExplicitBucketHistogramAggregation(TestCase): + + test_values = [1, 6, 11, 26, 51, 76, 101, 251, 501, 751] + + @mark.skipif( + system() == "Windows", + reason=( + "Tests fail because Windows time_ns resolution is too low so " + "two different time measurements may end up having the exact same" + " value." 
+ ), + ) + def test_synchronous_delta_temporality(self): + + aggregation = ExplicitBucketHistogramAggregation() + + reader = InMemoryMetricReader( + preferred_aggregation={Histogram: aggregation}, + preferred_temporality={Histogram: AggregationTemporality.DELTA}, + ) + + provider = MeterProvider(metric_readers=[reader]) + meter = provider.get_meter("name", "version") + + histogram = meter.create_histogram("histogram") + + results = [] + + for _ in range(10): + + results.append(reader.get_metrics_data()) + + for metrics_data in results: + self.assertIsNone(metrics_data) + + results = [] + + for test_value in self.test_values: + histogram.record(test_value) + results.append(reader.get_metrics_data()) + + metric_data = ( + results[0] + .resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + ) + + previous_time_unix_nano = metric_data.time_unix_nano + + self.assertEqual( + metric_data.bucket_counts, + (0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0), + ) + + self.assertLess( + metric_data.start_time_unix_nano, + previous_time_unix_nano, + ) + self.assertEqual(metric_data.min, self.test_values[0]) + self.assertEqual(metric_data.max, self.test_values[0]) + self.assertEqual(metric_data.sum, self.test_values[0]) + + for index, metrics_data in enumerate(results[1:]): + metric_data = ( + metrics_data.resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + ) + + self.assertEqual( + previous_time_unix_nano, metric_data.start_time_unix_nano + ) + previous_time_unix_nano = metric_data.time_unix_nano + self.assertEqual( + metric_data.bucket_counts, + tuple( + [ + 1 if internal_index == index + 2 else 0 + for internal_index in range(16) + ] + ), + ) + self.assertLess( + metric_data.start_time_unix_nano, metric_data.time_unix_nano + ) + self.assertEqual(metric_data.min, self.test_values[index + 1]) + self.assertEqual(metric_data.max, self.test_values[index + 1]) + self.assertEqual(metric_data.sum, self.test_values[index + 1]) + + 
results = [] + + for _ in range(10): + + results.append(reader.get_metrics_data()) + + provider.shutdown() + + for metrics_data in results: + self.assertIsNone(metrics_data) + + @mark.skipif( + system() != "Linux", + reason=( + "Tests fail because Windows time_ns resolution is too low so " + "two different time measurements may end up having the exact same" + " value." + ), + ) + def test_synchronous_cumulative_temporality(self): + + aggregation = ExplicitBucketHistogramAggregation() + + reader = InMemoryMetricReader( + preferred_aggregation={Histogram: aggregation}, + preferred_temporality={ + Histogram: AggregationTemporality.CUMULATIVE + }, + ) + + provider = MeterProvider(metric_readers=[reader]) + meter = provider.get_meter("name", "version") + + histogram = meter.create_histogram("histogram") + + results = [] + + for _ in range(10): + + results.append(reader.get_metrics_data()) + + for metrics_data in results: + self.assertIsNone(metrics_data) + + results = [] + + for test_value in self.test_values: + + histogram.record(test_value) + results.append(reader.get_metrics_data()) + + start_time_unix_nano = ( + results[0] + .resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + .start_time_unix_nano + ) + + for index, metrics_data in enumerate(results): + + metric_data = ( + metrics_data.resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + ) + + self.assertEqual( + start_time_unix_nano, metric_data.start_time_unix_nano + ) + self.assertEqual( + metric_data.bucket_counts, + tuple( + [ + 0 + if internal_index < 1 or internal_index > index + 1 + else 1 + for internal_index in range(16) + ] + ), + ) + self.assertEqual(metric_data.min, self.test_values[0]) + self.assertEqual(metric_data.max, self.test_values[index]) + self.assertEqual( + metric_data.sum, sum(self.test_values[: index + 1]) + ) + + results = [] + + for _ in range(10): + + results.append(reader.get_metrics_data()) + + provider.shutdown() + + 
start_time_unix_nano = ( + results[0] + .resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + .start_time_unix_nano + ) + + for metrics_data in results: + + metric_data = ( + metrics_data.resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + ) + + self.assertEqual( + start_time_unix_nano, metric_data.start_time_unix_nano + ) + self.assertEqual( + metric_data.bucket_counts, + (0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0), + ) + self.assertEqual(metric_data.min, self.test_values[0]) + self.assertEqual(metric_data.max, self.test_values[-1]) + self.assertEqual(metric_data.sum, sum(self.test_values)) diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_histogram_export.py b/opentelemetry-sdk/tests/metrics/integration_test/test_histogram_export.py index 81d419819a4..eaf590219ba 100644 --- a/opentelemetry-sdk/tests/metrics/integration_test/test_histogram_export.py +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_histogram_export.py @@ -65,10 +65,8 @@ def test_histogram_counter_collection(self): metric_data = in_memory_metric_reader.get_metrics_data() - # FIXME ExplicitBucketHistogramAggregation is resetting counts to zero - # even if aggregation temporality is cumulative. 
self.assertEqual( - len(metric_data.resource_metrics[0].scope_metrics[0].metrics), 1 + len(metric_data.resource_metrics[0].scope_metrics[0].metrics), 2 ) self.assertEqual( ( @@ -76,6 +74,16 @@ def test_histogram_counter_collection(self): .scope_metrics[0] .metrics[0] .data.data_points[0] + .bucket_counts + ), + (0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0), + ) + self.assertEqual( + ( + metric_data.resource_metrics[0] + .scope_metrics[0] + .metrics[1] + .data.data_points[0] .value ), 1, diff --git a/opentelemetry-sdk/tests/metrics/test_aggregation.py b/opentelemetry-sdk/tests/metrics/test_aggregation.py index 50561022fb1..37656b0a34c 100644 --- a/opentelemetry-sdk/tests/metrics/test_aggregation.py +++ b/opentelemetry-sdk/tests/metrics/test_aggregation.py @@ -270,7 +270,10 @@ def test_aggregate(self): explicit_bucket_histogram_aggregation = ( _ExplicitBucketHistogramAggregation( - Mock(), 0, boundaries=[0, 2, 4] + Mock(), + AggregationTemporality.DELTA, + 0, + boundaries=[0, 2, 4], ) ) @@ -284,22 +287,22 @@ def test_aggregate(self): # The first bucket keeps count of values between (-inf, 0] (-1 and 0) self.assertEqual( - explicit_bucket_histogram_aggregation._bucket_counts[0], 2 + explicit_bucket_histogram_aggregation._current_value[0], 2 ) # The second bucket keeps count of values between (0, 2] (1 and 2) self.assertEqual( - explicit_bucket_histogram_aggregation._bucket_counts[1], 2 + explicit_bucket_histogram_aggregation._current_value[1], 2 ) # The third bucket keeps count of values between (2, 4] (3 and 4) self.assertEqual( - explicit_bucket_histogram_aggregation._bucket_counts[2], 2 + explicit_bucket_histogram_aggregation._current_value[2], 2 ) # The fourth bucket keeps count of values between (4, inf) (3 and 4) self.assertEqual( - explicit_bucket_histogram_aggregation._bucket_counts[3], 1 + explicit_bucket_histogram_aggregation._current_value[3], 1 ) histo = explicit_bucket_histogram_aggregation.collect( @@ -314,7 +317,9 @@ def test_min_max(self): """ 
explicit_bucket_histogram_aggregation = ( - _ExplicitBucketHistogramAggregation(Mock(), 0) + _ExplicitBucketHistogramAggregation( + Mock(), AggregationTemporality.CUMULATIVE, 0 + ) ) explicit_bucket_histogram_aggregation.aggregate(measurement(-1)) @@ -328,7 +333,10 @@ def test_min_max(self): explicit_bucket_histogram_aggregation = ( _ExplicitBucketHistogramAggregation( - Mock(), 0, record_min_max=False + Mock(), + AggregationTemporality.CUMULATIVE, + 0, + record_min_max=False, ) ) @@ -348,7 +356,10 @@ def test_collect(self): explicit_bucket_histogram_aggregation = ( _ExplicitBucketHistogramAggregation( - Mock(), 0, boundaries=[0, 1, 2] + Mock(), + AggregationTemporality.DELTA, + 0, + boundaries=[0, 1, 2], ) ) @@ -368,6 +379,7 @@ def test_collect(self): explicit_bucket_histogram_aggregation.aggregate(measurement(1)) # 2 is used here directly to simulate the instant the second # collection process starts. + second_histogram = explicit_bucket_histogram_aggregation.collect( AggregationTemporality.CUMULATIVE, 2 ) @@ -381,7 +393,9 @@ def test_collect(self): def test_boundaries(self): self.assertEqual( - _ExplicitBucketHistogramAggregation(Mock(), 0)._boundaries, + _ExplicitBucketHistogramAggregation( + Mock(), AggregationTemporality.CUMULATIVE, 0 + )._boundaries, ( 0.0, 5.0, From b9ee532eeadbfb57e68c87e0e215a8b9d428c890 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Wed, 7 Feb 2024 16:20:13 -0800 Subject: [PATCH 2/2] [chore]: avoid unnecessary temporary variables (#3672) Signed-off-by: Bogdan Drutu --- .../opentelemetry/sdk/_logs/_internal/__init__.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py index cbfde8d6ebd..50d918ea8c3 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py @@ -462,22 +462,16 @@ 
def _get_attributes(record: logging.LogRecord) -> Attributes: attributes[SpanAttributes.CODE_LINENO] = record.lineno if record.exc_info: - exc_type = "" - message = "" - stack_trace = "" exctype, value, tb = record.exc_info if exctype is not None: - exc_type = exctype.__name__ + attributes[SpanAttributes.EXCEPTION_TYPE] = exctype.__name__ if value is not None and value.args: - message = value.args[0] + attributes[SpanAttributes.EXCEPTION_MESSAGE] = value.args[0] if tb is not None: # https://github.com/open-telemetry/opentelemetry-specification/blob/9fa7c656b26647b27e485a6af7e38dc716eba98a/specification/trace/semantic_conventions/exceptions.md#stacktrace-representation - stack_trace = "".join( + attributes[SpanAttributes.EXCEPTION_STACKTRACE] = "".join( traceback.format_exception(*record.exc_info) ) - attributes[SpanAttributes.EXCEPTION_TYPE] = exc_type - attributes[SpanAttributes.EXCEPTION_MESSAGE] = message - attributes[SpanAttributes.EXCEPTION_STACKTRACE] = stack_trace return attributes def _translate(self, record: logging.LogRecord) -> LogRecord: