diff --git a/CHANGELOG.md b/CHANGELOG.md index 121e85e7034..27b02b1d7aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3633](https://github.com/open-telemetry/opentelemetry-python/pull/3633)) - Fix python 3.12 deprecation warning ([#3751](https://github.com/open-telemetry/opentelemetry-python/pull/3751)) +- Fix exponential histograms + ([#3798](https://github.com/open-telemetry/opentelemetry-python/pull/3798)) - bump mypy to 0.982 ([#3776](https://github.com/open-telemetry/opentelemetry-python/pull/3776)) - Fix otlp exporter to export log_record.observed_timestamp diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py index 3ec37473f60..3f93c91fa24 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py @@ -37,6 +37,9 @@ from opentelemetry.sdk.metrics._internal.exponential_histogram.buckets import ( Buckets, ) +from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping import ( + Mapping, +) from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping.exponent_mapping import ( ExponentMapping, ) @@ -519,6 +522,12 @@ def collect( return None +def _new_exponential_mapping(scale: int) -> Mapping: + if scale <= 0: + return ExponentMapping(scale) + return LogarithmMapping(scale) + + # pylint: disable=protected-access class _ExponentialBucketHistogramAggregation(_Aggregation[HistogramPoint]): # _min_max_size and _max_max_size are the smallest and largest values @@ -592,13 +601,15 @@ def __init__( "larger than the recommended value of 20", self._max_scale, ) - self._mapping = LogarithmMapping(self._max_scale) + self._mapping = _new_exponential_mapping(self._max_scale) self._instrument_aggregation_temporality = AggregationTemporality.DELTA self._start_time_unix_nano = start_time_unix_nano self._previous_scale = None self._previous_start_time_unix_nano = None + self._previous_zero_count = None + self._previous_count = None self._previous_sum = None self._previous_max = None self._previous_min = None @@ -678,11 +689,14 @@ def aggregate(self, measurement: Measurement) -> None: # 4. Rescale the mapping if needed. if is_rescaling_needed: + scale_change = self._get_scale_change(low, high) self._downscale( - self._get_scale_change(low, high), + scale_change, self._positive, self._negative, ) + new_scale = self._mapping.scale - scale_change + self._mapping = _new_exponential_mapping(new_scale) index = self._mapping.map_to_index(value) @@ -756,28 +770,6 @@ def collect( self._min = inf self._max = -inf - current_point = ExponentialHistogramDataPoint( - attributes=self._attributes, - start_time_unix_nano=current_start_time_unix_nano, - time_unix_nano=collection_start_nano, - count=current_count, - sum=current_sum, - scale=current_scale, - zero_count=current_zero_count, - positive=BucketsPoint( - offset=current_positive.offset, - bucket_counts=current_positive.counts, - ), - negative=BucketsPoint( - offset=current_negative.offset, - bucket_counts=current_negative.counts, - ), - # FIXME: Find the right value for flags - flags=0, - min=current_min, - max=current_max, - ) - if self._previous_scale is None or ( self._instrument_aggregation_temporality is collection_aggregation_temporality @@ -789,18 +781,48 @@ def collect( self._previous_max = current_max self._previous_min = current_min self._previous_sum = current_sum + self._previous_count = current_count + self._previous_zero_count = current_zero_count self._previous_positive = current_positive self._previous_negative = current_negative + current_point = ExponentialHistogramDataPoint( + attributes=self._attributes, + start_time_unix_nano=current_start_time_unix_nano, + time_unix_nano=collection_start_nano, + count=current_count, + sum=current_sum, + scale=current_scale, + zero_count=current_zero_count, + positive=BucketsPoint( + offset=current_positive.offset, + bucket_counts=current_positive.get_offset_counts(), + ), + negative=BucketsPoint( + offset=current_negative.offset, + bucket_counts=current_negative.get_offset_counts(), + ), + # FIXME: Find the right value for flags + flags=0, + min=current_min, + max=current_max, + ) + return current_point min_scale = min(self._previous_scale, current_scale) low_positive, high_positive = self._get_low_high_previous_current( - self._previous_positive, current_positive, min_scale + self._previous_positive, + current_positive, + current_scale, + min_scale, ) low_negative, high_negative = self._get_low_high_previous_current( - self._previous_negative, current_negative, min_scale + self._previous_negative, + current_negative, + current_scale, + min_scale, ) min_scale = min( @@ -814,10 +836,11 @@ def collect( # but the histogram) has a count larger than zero, if not, scale # (the histogram scale) would be zero. See exponential.go 191 self._downscale( - self._mapping.scale - min_scale, + self._previous_scale - min_scale, self._previous_positive, self._previous_negative, ) + self._previous_scale = min_scale if ( collection_aggregation_temporality @@ -826,6 +849,8 @@ def collect( start_time_unix_nano = self._previous_start_time_unix_nano sum_ = current_sum + self._previous_sum + zero_count = current_zero_count + self._previous_zero_count + count = current_count + self._previous_count # Only update min/max on delta -> cumulative max_ = max(current_max, self._previous_max) min_ = min(current_min, self._previous_min) @@ -844,10 +869,16 @@ def collect( min_scale, collection_aggregation_temporality, ) + current_scale = min_scale + + current_positive = self._previous_positive + current_negative = self._previous_negative else: start_time_unix_nano = self._previous_start_time_unix_nano sum_ = current_sum - self._previous_sum + zero_count = current_zero_count + count = current_count max_ = current_max min_ = current_min @@ -870,17 +901,17 @@ def collect( attributes=self._attributes, start_time_unix_nano=start_time_unix_nano, time_unix_nano=collection_start_nano, - count=current_count, + count=count, sum=sum_, scale=current_scale, - zero_count=current_zero_count, + zero_count=zero_count, positive=BucketsPoint( offset=current_positive.offset, - bucket_counts=current_positive.counts, + bucket_counts=current_positive.get_offset_counts(), ), negative=BucketsPoint( offset=current_negative.offset, - bucket_counts=current_negative.counts, + bucket_counts=current_negative.get_offset_counts(), ), # FIXME: Find the right value for flags flags=0, @@ -892,19 +923,27 @@ def collect( self._previous_positive = current_positive self._previous_negative = current_negative self._previous_start_time_unix_nano = current_start_time_unix_nano - self._previous_sum = current_sum + self._previous_sum = sum_ + self._previous_count = count + self._previous_max = max_ + self._previous_min = min_ + self._previous_zero_count = zero_count return current_point def _get_low_high_previous_current( - self, previous_point_buckets, current_point_buckets, min_scale + self, + previous_point_buckets, + current_point_buckets, + current_scale, + min_scale, ): (previous_point_low, previous_point_high) = self._get_low_high( - previous_point_buckets, min_scale + previous_point_buckets, self._previous_scale, min_scale ) (current_point_low, current_point_high) = self._get_low_high( - current_point_buckets, min_scale + current_point_buckets, current_scale, min_scale ) if current_point_low > current_point_high: @@ -921,11 +960,12 @@ def _get_low_high_previous_current( return low, high - def _get_low_high(self, buckets, min_scale): + @staticmethod + def _get_low_high(buckets, scale, min_scale): if buckets.counts == [0]: return 0, -1 - shift = self._mapping._scale - min_scale + shift = scale - min_scale return buckets.index_start >> shift, buckets.index_end >> shift @@ -941,7 +981,8 @@ def _get_scale_change(self, low, high): return change - def _downscale(self, change: int, positive, negative): + @staticmethod + def _downscale(change: int, positive, negative): if change == 0: return @@ -949,22 +990,13 @@ def _downscale(self, change: int, positive, negative): if change < 0: raise Exception("Invalid change of scale") - new_scale = self._mapping.scale - change - positive.downscale(change) negative.downscale(change) - if new_scale <= 0: - mapping = ExponentMapping(new_scale) - else: - mapping = LogarithmMapping(new_scale) - - self._mapping = mapping - def _merge( self, - previous_buckets, - current_buckets, + previous_buckets: Buckets, + current_buckets: Buckets, current_scale, min_scale, aggregation_temporality, @@ -983,9 +1015,11 @@ def _merge( # would not happen because self._previous_point is only assigned to # an ExponentialHistogramDataPoint object if self._count != 0. - index = ( - current_buckets.offset + current_bucket_index - ) >> current_change + current_index = current_buckets.index_base + current_bucket_index + if current_index > current_buckets.index_end: + current_index -= len(current_buckets.counts) + + index = current_index >> current_change if index < previous_buckets.index_start: span = previous_buckets.index_end - index @@ -999,7 +1033,7 @@ def _merge( previous_buckets.index_start = index if index > previous_buckets.index_end: - span = index - previous_buckets.index_end + span = index - previous_buckets.index_start if span >= self._max_size: raise Exception("Incorrect merge scale") diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/buckets.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/buckets.py index 5c6b04bd39b..4dbe8f385e8 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/buckets.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/buckets.py @@ -73,6 +73,10 @@ def index_base(self, value: int) -> None: def counts(self): return self._counts + def get_offset_counts(self): + bias = self.__index_base - self.__index_start + return self._counts[-bias:] + self._counts[:-bias] + def grow(self, needed: int, max_size: int) -> None: size = len(self._counts) @@ -129,7 +133,6 @@ def downscale(self, amount: int) -> None: bias = self.__index_base - self.__index_start if bias != 0: - self.__index_base = self.__index_start # [0, 1, 2, 3, 4] Original backing array diff --git a/opentelemetry-sdk/tests/metrics/exponential_histogram/test_exponential_bucket_histogram_aggregation.py b/opentelemetry-sdk/tests/metrics/exponential_histogram/test_exponential_bucket_histogram_aggregation.py index 311f00a0b00..bae0aca20bf 100644 --- a/opentelemetry-sdk/tests/metrics/exponential_histogram/test_exponential_bucket_histogram_aggregation.py +++ b/opentelemetry-sdk/tests/metrics/exponential_histogram/test_exponential_bucket_histogram_aggregation.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import random as insecure_random from itertools import permutations from logging import WARNING from math import ldexp @@ -37,6 +38,9 @@ LogarithmMapping, ) from opentelemetry.sdk.metrics._internal.measurement import Measurement +from opentelemetry.sdk.metrics._internal.point import ( + ExponentialHistogramDataPoint, +) from opentelemetry.sdk.metrics.view import ( ExponentialBucketHistogramAggregation, ) @@ -853,13 +857,14 @@ def test_aggregate_collect(self): AggregationTemporality.CUMULATIVE, 0 ) - def test_collect_results_cumulative(self): + def test_collect_results_cumulative(self) -> None: exponential_histogram_aggregation = ( _ExponentialBucketHistogramAggregation( Mock(), Mock(), ) ) + self.maxDiff = None self.assertEqual(exponential_histogram_aggregation._mapping._scale, 20) @@ -884,7 +889,7 @@ def test_collect_results_cumulative(self): self.assertEqual(collection_0.zero_count, 0) self.assertEqual( collection_0.positive.bucket_counts, - [1, *[0] * 63, 1, *[0] * 31, 1, *[0] * 63], + [1, *[0] * 63, 1, *[0] * 63, 1, *[0] * 31], ) self.assertEqual(collection_0.flags, 0) self.assertEqual(collection_0.min, 1) @@ -911,7 +916,7 @@ def test_collect_results_cumulative(self): previous_count = count count_counts.append([previous_count, 1]) - self.assertEqual(collection_1.count, 5) + self.assertEqual(collection_1.count, 8) self.assertEqual(collection_1.sum, 16.645) self.assertEqual(collection_1.scale, 4) self.assertEqual(collection_1.zero_count, 0) @@ -920,21 +925,68 @@ def test_collect_results_cumulative(self): collection_1.positive.bucket_counts, [ 1, - *[0] * 15, + *[0] * 17, 1, - *[0] * 47, + *[0] * 36, 1, - *[0] * 40, + *[0] * 15, + 2, + *[0] * 15, 1, - *[0] * 17, + *[0] * 15, 1, - *[0] * 36, + *[0] * 15, + 1, + *[0] * 40, ], ) self.assertEqual(collection_1.flags, 0) self.assertEqual(collection_1.min, 0.045) self.assertEqual(collection_1.max, 8) + def test_cumulative_aggregation_with_random_data(self) -> None: + histogram = _ExponentialBucketHistogramAggregation(Mock(), Mock()) + + def collect_and_validate() -> None: + result: ExponentialHistogramDataPoint = histogram.collect( + AggregationTemporality.CUMULATIVE, 0 + ) + buckets = result.positive.bucket_counts + scale = result.scale + index_start = result.positive.offset + + for i in range(len(buckets)): + index = index_start + i + count = buckets[i] + lower_bound = 2 ** (index / (2**scale)) + upper_bound = 2 ** ((index + 1) / (2**scale)) + matches = 0 + for value in values: + if value > lower_bound and value <= upper_bound: + matches += 1 + assert ( + matches == count + ), f"index: {index}, count: {count}, scale: {scale}, lower_bound: {lower_bound}, upper_bound: {upper_bound}, matches: {matches}" + + assert sum(buckets) + result.zero_count == len(values) + assert result.sum == sum(values) + assert result.count == len(values) + assert result.min == min(values) + assert result.max == max(values) + assert result.zero_count == len([v for v in values if v == 0]) + assert scale >= 3 + + random = insecure_random.Random("opentelemetry2") + values = [] + for i in range(2000): + value = random.randint(0, 1000) + values.append(value) + histogram.aggregate(Measurement(value, Mock())) + if i % 20 == 0: + collect_and_validate() + + collect_and_validate() + def test_merge_collect_cumulative(self): exponential_histogram_aggregation = ( _ExponentialBucketHistogramAggregation(Mock(), Mock(), max_size=4) @@ -974,7 +1026,8 @@ def test_merge_collect_cumulative(self): 0, ) - self.assertEqual(result.scale, result_1.scale) + self.assertEqual(result.scale, 0) + self.assertEqual(result_1.scale, -1) def test_merge_collect_delta(self): exponential_histogram_aggregation = (