Skip to content

Commit

Permalink
Fix collection of exponential histogram (#3798)
Browse files Browse the repository at this point in the history
* Fix collection of exponential histogram

* changelog

* fix lint

* moar lint

* fix sum/count/min/max

* fix scale when downscaling happens due to low/high of current/previous not fitting into the current scale

---------

Co-authored-by: Srikanth Chekuri <[email protected]>
Co-authored-by: Diego Hurtado <[email protected]>
  • Loading branch information
3 people authored Apr 5, 2024
1 parent 6182bb0 commit da30869
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 63 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3633](https://github.com/open-telemetry/opentelemetry-python/pull/3633))
- Fix python 3.12 deprecation warning
([#3751](https://github.com/open-telemetry/opentelemetry-python/pull/3751))
- Fix exponential histograms
([#3798](https://github.com/open-telemetry/opentelemetry-python/pull/3798))
- bump mypy to 0.982
([#3776](https://github.com/open-telemetry/opentelemetry-python/pull/3776))
- Fix otlp exporter to export log_record.observed_timestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
from opentelemetry.sdk.metrics._internal.exponential_histogram.buckets import (
Buckets,
)
from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping import (
Mapping,
)
from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping.exponent_mapping import (
ExponentMapping,
)
Expand Down Expand Up @@ -519,6 +522,12 @@ def collect(
return None


def _new_exponential_mapping(scale: int) -> Mapping:
if scale <= 0:
return ExponentMapping(scale)
return LogarithmMapping(scale)


# pylint: disable=protected-access
class _ExponentialBucketHistogramAggregation(_Aggregation[HistogramPoint]):
# _min_max_size and _max_max_size are the smallest and largest values
Expand Down Expand Up @@ -592,13 +601,15 @@ def __init__(
"larger than the recommended value of 20",
self._max_scale,
)
self._mapping = LogarithmMapping(self._max_scale)
self._mapping = _new_exponential_mapping(self._max_scale)

self._instrument_aggregation_temporality = AggregationTemporality.DELTA
self._start_time_unix_nano = start_time_unix_nano

self._previous_scale = None
self._previous_start_time_unix_nano = None
self._previous_zero_count = None
self._previous_count = None
self._previous_sum = None
self._previous_max = None
self._previous_min = None
Expand Down Expand Up @@ -678,11 +689,14 @@ def aggregate(self, measurement: Measurement) -> None:
# 4. Rescale the mapping if needed.
if is_rescaling_needed:

scale_change = self._get_scale_change(low, high)
self._downscale(
self._get_scale_change(low, high),
scale_change,
self._positive,
self._negative,
)
new_scale = self._mapping.scale - scale_change
self._mapping = _new_exponential_mapping(new_scale)

index = self._mapping.map_to_index(value)

Expand Down Expand Up @@ -756,28 +770,6 @@ def collect(
self._min = inf
self._max = -inf

current_point = ExponentialHistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=current_start_time_unix_nano,
time_unix_nano=collection_start_nano,
count=current_count,
sum=current_sum,
scale=current_scale,
zero_count=current_zero_count,
positive=BucketsPoint(
offset=current_positive.offset,
bucket_counts=current_positive.counts,
),
negative=BucketsPoint(
offset=current_negative.offset,
bucket_counts=current_negative.counts,
),
# FIXME: Find the right value for flags
flags=0,
min=current_min,
max=current_max,
)

if self._previous_scale is None or (
self._instrument_aggregation_temporality
is collection_aggregation_temporality
Expand All @@ -789,18 +781,48 @@ def collect(
self._previous_max = current_max
self._previous_min = current_min
self._previous_sum = current_sum
self._previous_count = current_count
self._previous_zero_count = current_zero_count
self._previous_positive = current_positive
self._previous_negative = current_negative

current_point = ExponentialHistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=current_start_time_unix_nano,
time_unix_nano=collection_start_nano,
count=current_count,
sum=current_sum,
scale=current_scale,
zero_count=current_zero_count,
positive=BucketsPoint(
offset=current_positive.offset,
bucket_counts=current_positive.get_offset_counts(),
),
negative=BucketsPoint(
offset=current_negative.offset,
bucket_counts=current_negative.get_offset_counts(),
),
# FIXME: Find the right value for flags
flags=0,
min=current_min,
max=current_max,
)

return current_point

min_scale = min(self._previous_scale, current_scale)

low_positive, high_positive = self._get_low_high_previous_current(
self._previous_positive, current_positive, min_scale
self._previous_positive,
current_positive,
current_scale,
min_scale,
)
low_negative, high_negative = self._get_low_high_previous_current(
self._previous_negative, current_negative, min_scale
self._previous_negative,
current_negative,
current_scale,
min_scale,
)

min_scale = min(
Expand All @@ -814,10 +836,11 @@ def collect(
# but the histogram) has a count larger than zero, if not, scale
# (the histogram scale) would be zero. See exponential.go 191
self._downscale(
self._mapping.scale - min_scale,
self._previous_scale - min_scale,
self._previous_positive,
self._previous_negative,
)
self._previous_scale = min_scale

if (
collection_aggregation_temporality
Expand All @@ -826,6 +849,8 @@ def collect(

start_time_unix_nano = self._previous_start_time_unix_nano
sum_ = current_sum + self._previous_sum
zero_count = current_zero_count + self._previous_zero_count
count = current_count + self._previous_count
# Only update min/max on delta -> cumulative
max_ = max(current_max, self._previous_max)
min_ = min(current_min, self._previous_min)
Expand All @@ -844,10 +869,16 @@ def collect(
min_scale,
collection_aggregation_temporality,
)
current_scale = min_scale

current_positive = self._previous_positive
current_negative = self._previous_negative

else:
start_time_unix_nano = self._previous_start_time_unix_nano
sum_ = current_sum - self._previous_sum
zero_count = current_zero_count
count = current_count
max_ = current_max
min_ = current_min

Expand All @@ -870,17 +901,17 @@ def collect(
attributes=self._attributes,
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=collection_start_nano,
count=current_count,
count=count,
sum=sum_,
scale=current_scale,
zero_count=current_zero_count,
zero_count=zero_count,
positive=BucketsPoint(
offset=current_positive.offset,
bucket_counts=current_positive.counts,
bucket_counts=current_positive.get_offset_counts(),
),
negative=BucketsPoint(
offset=current_negative.offset,
bucket_counts=current_negative.counts,
bucket_counts=current_negative.get_offset_counts(),
),
# FIXME: Find the right value for flags
flags=0,
Expand All @@ -892,19 +923,27 @@ def collect(
self._previous_positive = current_positive
self._previous_negative = current_negative
self._previous_start_time_unix_nano = current_start_time_unix_nano
self._previous_sum = current_sum
self._previous_sum = sum_
self._previous_count = count
self._previous_max = max_
self._previous_min = min_
self._previous_zero_count = zero_count

return current_point

def _get_low_high_previous_current(
self, previous_point_buckets, current_point_buckets, min_scale
self,
previous_point_buckets,
current_point_buckets,
current_scale,
min_scale,
):

(previous_point_low, previous_point_high) = self._get_low_high(
previous_point_buckets, min_scale
previous_point_buckets, self._previous_scale, min_scale
)
(current_point_low, current_point_high) = self._get_low_high(
current_point_buckets, min_scale
current_point_buckets, current_scale, min_scale
)

if current_point_low > current_point_high:
Expand All @@ -921,11 +960,12 @@ def _get_low_high_previous_current(

return low, high

def _get_low_high(self, buckets, min_scale):
@staticmethod
def _get_low_high(buckets, scale, min_scale):
if buckets.counts == [0]:
return 0, -1

shift = self._mapping._scale - min_scale
shift = scale - min_scale

return buckets.index_start >> shift, buckets.index_end >> shift

Expand All @@ -941,30 +981,22 @@ def _get_scale_change(self, low, high):

return change

def _downscale(self, change: int, positive, negative):
@staticmethod
def _downscale(change: int, positive, negative):

if change == 0:
return

if change < 0:
raise Exception("Invalid change of scale")

new_scale = self._mapping.scale - change

positive.downscale(change)
negative.downscale(change)

if new_scale <= 0:
mapping = ExponentMapping(new_scale)
else:
mapping = LogarithmMapping(new_scale)

self._mapping = mapping

def _merge(
self,
previous_buckets,
current_buckets,
previous_buckets: Buckets,
current_buckets: Buckets,
current_scale,
min_scale,
aggregation_temporality,
Expand All @@ -983,9 +1015,11 @@ def _merge(
# would not happen because self._previous_point is only assigned to
# an ExponentialHistogramDataPoint object if self._count != 0.

index = (
current_buckets.offset + current_bucket_index
) >> current_change
current_index = current_buckets.index_base + current_bucket_index
if current_index > current_buckets.index_end:
current_index -= len(current_buckets.counts)

index = current_index >> current_change

if index < previous_buckets.index_start:
span = previous_buckets.index_end - index
Expand All @@ -999,7 +1033,7 @@ def _merge(
previous_buckets.index_start = index

if index > previous_buckets.index_end:
span = index - previous_buckets.index_end
span = index - previous_buckets.index_start

if span >= self._max_size:
raise Exception("Incorrect merge scale")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ def index_base(self, value: int) -> None:
def counts(self):
return self._counts

def get_offset_counts(self):
bias = self.__index_base - self.__index_start
return self._counts[-bias:] + self._counts[:-bias]

def grow(self, needed: int, max_size: int) -> None:

size = len(self._counts)
Expand Down Expand Up @@ -129,7 +133,6 @@ def downscale(self, amount: int) -> None:
bias = self.__index_base - self.__index_start

if bias != 0:

self.__index_base = self.__index_start

# [0, 1, 2, 3, 4] Original backing array
Expand Down
Loading

0 comments on commit da30869

Please sign in to comment.