Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix collection of exponential histogram #3798

Merged
merged 11 commits into from
Apr 5, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3633](https://github.com/open-telemetry/opentelemetry-python/pull/3633))
- Fix python 3.12 deprecation warning
([#3751](https://github.com/open-telemetry/opentelemetry-python/pull/3751))
- Fix exponential histograms
([#3798](https://github.com/open-telemetry/opentelemetry-python/pull/3798))
- bump mypy to 0.982
([#3776](https://github.com/open-telemetry/opentelemetry-python/pull/3776))
- Fix otlp exporter to export log_record.observed_timestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
from opentelemetry.sdk.metrics._internal.exponential_histogram.buckets import (
Buckets,
)
from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping import (
Mapping,
)
from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping.exponent_mapping import (
ExponentMapping,
)
Expand Down Expand Up @@ -519,6 +522,12 @@ def collect(
return None


def _new_exponential_mapping(scale: int) -> Mapping:
    """Create the mapping implementation appropriate for *scale*.

    Non-positive scales are handled by ``ExponentMapping``; positive
    scales require the finer-grained ``LogarithmMapping``.
    """
    mapping_class = LogarithmMapping if scale > 0 else ExponentMapping
    return mapping_class(scale)


# pylint: disable=protected-access
class _ExponentialBucketHistogramAggregation(_Aggregation[HistogramPoint]):
# _min_max_size and _max_max_size are the smallest and largest values
Expand Down Expand Up @@ -592,13 +601,15 @@ def __init__(
"larger than the recommended value of 20",
self._max_scale,
)
self._mapping = LogarithmMapping(self._max_scale)
self._mapping = _new_exponential_mapping(self._max_scale)
ocelotl marked this conversation as resolved.
Show resolved Hide resolved

self._instrument_aggregation_temporality = AggregationTemporality.DELTA
self._start_time_unix_nano = start_time_unix_nano

self._previous_scale = None
self._previous_start_time_unix_nano = None
self._previous_zero_count = None
self._previous_count = None
self._previous_sum = None
self._previous_max = None
self._previous_min = None
Expand Down Expand Up @@ -678,11 +689,14 @@ def aggregate(self, measurement: Measurement) -> None:
# 4. Rescale the mapping if needed.
if is_rescaling_needed:

scale_change = self._get_scale_change(low, high)
self._downscale(
self._get_scale_change(low, high),
scale_change,
self._positive,
self._negative,
)
new_scale = self._mapping.scale - scale_change
self._mapping = _new_exponential_mapping(new_scale)
ocelotl marked this conversation as resolved.
Show resolved Hide resolved

index = self._mapping.map_to_index(value)

Expand Down Expand Up @@ -756,28 +770,6 @@ def collect(
self._min = inf
self._max = -inf

current_point = ExponentialHistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=current_start_time_unix_nano,
time_unix_nano=collection_start_nano,
count=current_count,
sum=current_sum,
scale=current_scale,
zero_count=current_zero_count,
positive=BucketsPoint(
offset=current_positive.offset,
bucket_counts=current_positive.counts,
),
negative=BucketsPoint(
offset=current_negative.offset,
bucket_counts=current_negative.counts,
),
# FIXME: Find the right value for flags
flags=0,
min=current_min,
max=current_max,
)

if self._previous_scale is None or (
self._instrument_aggregation_temporality
is collection_aggregation_temporality
Expand All @@ -789,18 +781,48 @@ def collect(
self._previous_max = current_max
self._previous_min = current_min
self._previous_sum = current_sum
self._previous_count = current_count
self._previous_zero_count = current_zero_count
self._previous_positive = current_positive
self._previous_negative = current_negative

current_point = ExponentialHistogramDataPoint(
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
attributes=self._attributes,
start_time_unix_nano=current_start_time_unix_nano,
time_unix_nano=collection_start_nano,
count=current_count,
sum=current_sum,
scale=current_scale,
zero_count=current_zero_count,
positive=BucketsPoint(
offset=current_positive.offset,
bucket_counts=current_positive.get_offset_counts(),
),
negative=BucketsPoint(
offset=current_negative.offset,
bucket_counts=current_negative.get_offset_counts(),
),
# FIXME: Find the right value for flags
flags=0,
min=current_min,
max=current_max,
)

return current_point

min_scale = min(self._previous_scale, current_scale)

low_positive, high_positive = self._get_low_high_previous_current(
self._previous_positive, current_positive, min_scale
self._previous_positive,
current_positive,
current_scale,
min_scale,
)
low_negative, high_negative = self._get_low_high_previous_current(
self._previous_negative, current_negative, min_scale
self._previous_negative,
current_negative,
current_scale,
min_scale,
)

min_scale = min(
Expand All @@ -814,10 +836,11 @@ def collect(
# but the histogram) has a count larger than zero, if not, scale
# (the histogram scale) would be zero. See exponential.go 191
self._downscale(
self._mapping.scale - min_scale,
self._previous_scale - min_scale,
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
self._previous_positive,
self._previous_negative,
)
self._previous_scale = min_scale

if (
collection_aggregation_temporality
Expand All @@ -826,6 +849,8 @@ def collect(

start_time_unix_nano = self._previous_start_time_unix_nano
sum_ = current_sum + self._previous_sum
zero_count = current_zero_count + self._previous_zero_count
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
count = current_count + self._previous_count
# Only update min/max on delta -> cumulative
max_ = max(current_max, self._previous_max)
min_ = min(current_min, self._previous_min)
Expand All @@ -844,10 +869,16 @@ def collect(
min_scale,
collection_aggregation_temporality,
)
current_scale = min_scale

current_positive = self._previous_positive
current_negative = self._previous_negative

else:
start_time_unix_nano = self._previous_start_time_unix_nano
sum_ = current_sum - self._previous_sum
zero_count = current_zero_count
count = current_count
max_ = current_max
min_ = current_min

Expand All @@ -870,17 +901,17 @@ def collect(
attributes=self._attributes,
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=collection_start_nano,
count=current_count,
count=count,
sum=sum_,
scale=current_scale,
zero_count=current_zero_count,
zero_count=zero_count,
positive=BucketsPoint(
offset=current_positive.offset,
bucket_counts=current_positive.counts,
bucket_counts=current_positive.get_offset_counts(),
),
negative=BucketsPoint(
offset=current_negative.offset,
bucket_counts=current_negative.counts,
bucket_counts=current_negative.get_offset_counts(),
),
# FIXME: Find the right value for flags
flags=0,
Expand All @@ -892,19 +923,27 @@ def collect(
self._previous_positive = current_positive
self._previous_negative = current_negative
self._previous_start_time_unix_nano = current_start_time_unix_nano
self._previous_sum = current_sum
self._previous_sum = sum_
self._previous_count = count
self._previous_max = max_
self._previous_min = min_
self._previous_zero_count = zero_count

return current_point

def _get_low_high_previous_current(
self, previous_point_buckets, current_point_buckets, min_scale
self,
previous_point_buckets,
current_point_buckets,
current_scale,
min_scale,
):

(previous_point_low, previous_point_high) = self._get_low_high(
previous_point_buckets, min_scale
previous_point_buckets, self._previous_scale, min_scale
)
(current_point_low, current_point_high) = self._get_low_high(
current_point_buckets, min_scale
current_point_buckets, current_scale, min_scale
)
ocelotl marked this conversation as resolved.
Show resolved Hide resolved

if current_point_low > current_point_high:
Expand All @@ -921,11 +960,12 @@ def _get_low_high_previous_current(

return low, high

def _get_low_high(self, buckets, min_scale):
@staticmethod
def _get_low_high(buckets, scale, min_scale):
    """Return the (low, high) index range of *buckets* rescaled to *min_scale*.

    Args:
        buckets: The bucket set whose index range is queried
            (presumably a ``Buckets`` instance — it exposes ``counts``,
            ``index_start`` and ``index_end``).
        scale: The scale the bucket indexes are currently expressed in.
        min_scale: The target (smaller or equal) scale.

    Returns:
        A ``(low, high)`` tuple; an empty bucket set yields ``(0, -1)``
        (low > high) so callers can detect the empty range.
    """
    # An empty bucket set is represented by counts == [0].
    if buckets.counts == [0]:
        return 0, -1

    # Each scale reduction halves the resolution, i.e. shifts the
    # bucket indexes right by one bit.  (The stale diff line
    # referencing self._mapping._scale was removed: this is a
    # staticmethod, so any use of `self` would raise NameError.)
    shift = scale - min_scale

    return buckets.index_start >> shift, buckets.index_end >> shift

Expand All @@ -941,30 +981,22 @@ def _get_scale_change(self, low, high):

return change

def _downscale(self, change: int, positive, negative):
@staticmethod
def _downscale(change: int, positive, negative):
    """Downscale both bucket sets by *change* scale levels.

    Args:
        change: Number of scale levels to shrink by; each level halves
            the bucket resolution.  Must be non-negative.
        positive: Bucket set for positive measurements (a ``Buckets``).
        negative: Bucket set for negative measurements (a ``Buckets``).

    Raises:
        Exception: If *change* is negative.

    Note: the caller is responsible for replacing the mapping to match
    the new scale; this staticmethod only rescales the bucket arrays.
    (The stale diff line computing ``new_scale`` from ``self`` was
    removed: `self` is not available in a staticmethod.)
    """
    if change == 0:
        return

    if change < 0:
        raise Exception("Invalid change of scale")

    positive.downscale(change)
    negative.downscale(change)

if new_scale <= 0:
mapping = ExponentMapping(new_scale)
else:
mapping = LogarithmMapping(new_scale)

self._mapping = mapping
ocelotl marked this conversation as resolved.
Show resolved Hide resolved

def _merge(
self,
previous_buckets,
current_buckets,
previous_buckets: Buckets,
current_buckets: Buckets,
current_scale,
min_scale,
aggregation_temporality,
Expand All @@ -983,9 +1015,11 @@ def _merge(
# would not happen because self._previous_point is only assigned to
# an ExponentialHistogramDataPoint object if self._count != 0.

index = (
current_buckets.offset + current_bucket_index
) >> current_change
current_index = current_buckets.index_base + current_bucket_index
if current_index > current_buckets.index_end:
current_index -= len(current_buckets.counts)
ocelotl marked this conversation as resolved.
Show resolved Hide resolved

index = current_index >> current_change

if index < previous_buckets.index_start:
span = previous_buckets.index_end - index
Expand All @@ -999,7 +1033,7 @@ def _merge(
previous_buckets.index_start = index

if index > previous_buckets.index_end:
span = index - previous_buckets.index_end
span = index - previous_buckets.index_start
ocelotl marked this conversation as resolved.
Show resolved Hide resolved

if span >= self._max_size:
raise Exception("Incorrect merge scale")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ def index_base(self, value: int) -> None:
def counts(self):
return self._counts

def get_offset_counts(self):
    """Return a new list of the counts rotated into logical order.

    The backing array is circular: the logical first bucket lives at
    ``__index_base`` while the array physically begins at
    ``__index_start``.  Rotating by their difference yields the counts
    ordered from the first logical bucket upward.
    """
    rotation = self.__index_base - self.__index_start
    if rotation == 0:
        # No rotation needed; still return a fresh copy, matching the
        # slice-concatenation result.
        return self._counts[:]
    return self._counts[-rotation:] + self._counts[:-rotation]
ocelotl marked this conversation as resolved.
Show resolved Hide resolved

def grow(self, needed: int, max_size: int) -> None:

size = len(self._counts)
Expand Down Expand Up @@ -129,7 +133,6 @@ def downscale(self, amount: int) -> None:
bias = self.__index_base - self.__index_start

if bias != 0:

self.__index_base = self.__index_start

# [0, 1, 2, 3, 4] Original backing array
Expand Down
Loading
Loading