diff --git a/CHANGELOG.md b/CHANGELOG.md index 44938228ca3..439a9eb9640 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3956](https://github.com/open-telemetry/opentelemetry-python/pull/3956)) - When encountering an error encoding metric attributes in the OTLP exporter, log the key that had an error. ([#3838](https://github.com/open-telemetry/opentelemetry-python/pull/3838)) +- Fix `ExponentialHistogramAggregation` + ([#3978](https://github.com/open-telemetry/opentelemetry-python/pull/3978)) - Log a warning when a `LogRecord` in `sdk/log` has dropped attributes due to reaching limits ([#3946](https://github.com/open-telemetry/opentelemetry-python/pull/3946)) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py index 3a09cdcfea1..62ac967bbec 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py @@ -126,17 +126,17 @@ def __init__( ) self._instrument_is_monotonic = instrument_is_monotonic - self._current_value = None + self._value = None self._previous_collection_start_nano = self._start_time_unix_nano - self._previous_cumulative_value = 0 + self._previous_value = 0 def aggregate(self, measurement: Measurement) -> None: with self._lock: - if self._current_value is None: - self._current_value = 0 + if self._value is None: + self._value = 0 - self._current_value = self._current_value + measurement.value + self._value = self._value + measurement.value def collect( self, @@ -204,15 +204,15 @@ def collect( When the collection aggregation temporality does not match the instrument aggregation temporality, then a conversion is made. For this purpose, this aggregation keeps a private attribute, - self._previous_cumulative. + self._previous_value. When the instrument is synchronous: - self._previous_cumulative_value is the sum of every previously + self._previous_value is the sum of every previously collected (delta) value. In this case, the returned (cumulative) value will be: - self._previous_cumulative_value + current_value + self._previous_value + value synchronous_instrument.add(2) collect(CUMULATIVE) -> 2 @@ -225,10 +225,10 @@ def collect( time -> - self._previous_cumulative_value + self._previous_value |-------------| - current_value (delta) + value (delta) |----| returned value (cumulative) @@ -236,11 +236,11 @@ def collect( When the instrument is asynchronous: - self._previous_cumulative_value is the value of the previously + self._previous_value is the value of the previously collected (cumulative) value. In this case, the returned (delta) value will be: - current_value - self._previous_cumulative_value + value - self._previous_value callback() -> 1352 collect(DELTA) -> 1352 @@ -253,10 +253,10 @@ def collect( time -> - self._previous_cumulative_value + self._previous_value |-------------| - current_value (cumulative) + value (cumulative) |------------------| returned value (delta) @@ -264,8 +264,8 @@ def collect( """ with self._lock: - current_value = self._current_value - self._current_value = None + value = self._value + self._value = None if ( self._instrument_aggregation_temporality @@ -285,34 +285,32 @@ def collect( collection_start_nano ) - if current_value is None: + if value is None: return None return NumberDataPoint( attributes=self._attributes, start_time_unix_nano=previous_collection_start_nano, time_unix_nano=collection_start_nano, - value=current_value, + value=value, ) - if current_value is None: - current_value = 0 + if value is None: + value = 0 - self._previous_cumulative_value = ( - current_value + self._previous_cumulative_value - ) + self._previous_value = value + self._previous_value return NumberDataPoint( attributes=self._attributes, start_time_unix_nano=self._start_time_unix_nano, time_unix_nano=collection_start_nano, - value=self._previous_cumulative_value, + value=self._previous_value, ) # This happens when the corresponding instrument for this # aggregation is asynchronous. - if current_value is None: + if value is None: # This happens when the corresponding instrument callback # does not produce measurements. return None @@ -321,9 +319,9 @@ def collect( collection_aggregation_temporality is AggregationTemporality.DELTA ): - result_value = current_value - self._previous_cumulative_value + result_value = value - self._previous_value - self._previous_cumulative_value = current_value + self._previous_value = value previous_collection_start_nano = ( self._previous_collection_start_nano @@ -341,7 +339,7 @@ def collect( attributes=self._attributes, start_time_unix_nano=self._start_time_unix_nano, time_unix_nano=collection_start_nano, - value=current_value, + value=value, ) @@ -403,42 +401,43 @@ def __init__( ): super().__init__(attributes) + self._instrument_aggregation_temporality = ( + instrument_aggregation_temporality + ) + self._start_time_unix_nano = start_time_unix_nano self._boundaries = tuple(boundaries) self._record_min_max = record_min_max + + self._value = None self._min = inf self._max = -inf self._sum = 0 - self._start_time_unix_nano = start_time_unix_nano - self._instrument_aggregation_temporality = ( - instrument_aggregation_temporality - ) - - self._current_value = None - - self._previous_collection_start_nano = self._start_time_unix_nano - self._previous_cumulative_value = self._get_empty_bucket_counts() + self._previous_value = None self._previous_min = inf self._previous_max = -inf self._previous_sum = 0 + self._previous_collection_start_nano = self._start_time_unix_nano + def _get_empty_bucket_counts(self) -> List[int]: return [0] * (len(self._boundaries) + 1) def aggregate(self, measurement: Measurement) -> None: + with self._lock: - if self._current_value is None: - self._current_value = self._get_empty_bucket_counts() + if self._value is None: + self._value = self._get_empty_bucket_counts() - value = measurement.value + measurement_value = measurement.value - self._sum += value + self._sum += measurement_value if self._record_min_max: - self._min = min(self._min, value) - self._max = max(self._max, value) + self._min = min(self._min, measurement_value) + self._max = max(self._max, measurement_value) - self._current_value[bisect_left(self._boundaries, value)] += 1 + self._value[bisect_left(self._boundaries, measurement_value)] += 1 def collect( self, @@ -450,12 +449,12 @@ def collect( """ with self._lock: - current_value = self._current_value + value = self._value sum_ = self._sum min_ = self._min max_ = self._max - self._current_value = None + self._value = None self._sum = 0 self._min = inf self._max = -inf @@ -478,30 +477,33 @@ def collect( collection_start_nano ) - if current_value is None: + if value is None: return None return HistogramDataPoint( attributes=self._attributes, start_time_unix_nano=previous_collection_start_nano, time_unix_nano=collection_start_nano, - count=sum(current_value), + count=sum(value), sum=sum_, - bucket_counts=tuple(current_value), + bucket_counts=tuple(value), explicit_bounds=self._boundaries, min=min_, max=max_, ) - if current_value is None: - current_value = self._get_empty_bucket_counts() + if value is None: + value = self._get_empty_bucket_counts() + + if self._previous_value is None: + self._previous_value = self._get_empty_bucket_counts() - self._previous_cumulative_value = [ - current_value_element + previous_cumulative_value_element + self._previous_value = [ + value_element + previous_value_element for ( - current_value_element, - previous_cumulative_value_element, - ) in zip(current_value, self._previous_cumulative_value) + value_element, + previous_value_element, + ) in zip(value, self._previous_value) ] self._previous_min = min(min_, self._previous_min) self._previous_max = max(max_, self._previous_max) @@ -511,9 +513,9 @@ def collect( attributes=self._attributes, start_time_unix_nano=self._start_time_unix_nano, time_unix_nano=collection_start_nano, - count=sum(self._previous_cumulative_value), + count=sum(self._previous_value), sum=self._previous_sum, - bucket_counts=tuple(self._previous_cumulative_value), + bucket_counts=tuple(self._previous_value), explicit_bounds=self._boundaries, min=self._previous_min, max=self._previous_max, @@ -522,12 +524,6 @@ def collect( return None -def _new_exponential_mapping(scale: int) -> Mapping: - if scale <= 0: - return ExponentMapping(scale) - return LogarithmMapping(scale) - - # pylint: disable=protected-access class _ExponentialBucketHistogramAggregation(_Aggregation[HistogramPoint]): # _min_max_size and _max_max_size are the smallest and largest values @@ -544,6 +540,7 @@ class _ExponentialBucketHistogramAggregation(_Aggregation[HistogramPoint]): def __init__( self, attributes: Attributes, + instrument_aggregation_temporality: AggregationTemporality, start_time_unix_nano: int, # This is the default maximum number of buckets per positive or # negative number range. The value 160 is specified by OpenTelemetry. @@ -552,9 +549,16 @@ def __init__( max_size: int = 160, max_scale: int = 20, ): - super().__init__(attributes) # max_size is the maximum capacity of the positive and negative # buckets. + # _sum is the sum of all the values aggregated by this aggregator. + # _count is the count of all calls to aggregate. + # _zero_count is the count of all the calls to aggregate when the value + # to be aggregated is exactly 0. + # _min is the smallest value aggregated by this aggregator. + # _max is the smallest value aggregated by this aggregator. + # _positive holds the positive values. + # _negative holds the negative values by their absolute value. if max_size < self._min_max_size: raise ValueError( f"Buckets max size {max_size} is smaller than " @@ -566,167 +570,159 @@ def __init__( f"Buckets max size {max_size} is larger than " "maximum max size {self._max_max_size}" ) + if max_scale > 20: + _logger.warning( + "max_scale is set to %s which is " + "larger than the recommended value of 20", + max_scale, + ) + + # This aggregation is analogous to _ExplicitBucketHistogramAggregation, + # the only difference is that with every call to aggregate, the size + # and amount of buckets can change (in + # _ExplicitBucketHistogramAggregation both size and amount of buckets + # remain constant once it is instantiated). + + super().__init__(attributes) + self._instrument_aggregation_temporality = ( + instrument_aggregation_temporality + ) + self._start_time_unix_nano = start_time_unix_nano self._max_size = max_size self._max_scale = max_scale - # _sum is the sum of all the values aggregated by this aggregator. + self._value_positive = None + self._value_negative = None + self._min = inf + self._max = -inf self._sum = 0 - - # _count is the count of all calls to aggregate. self._count = 0 - - # _zero_count is the count of all the calls to aggregate when the value - # to be aggregated is exactly 0. self._zero_count = 0 + self._scale = None - # _min is the smallest value aggregated by this aggregator. - self._min = inf - - # _max is the smallest value aggregated by this aggregator. - self._max = -inf - - # _positive holds the positive values. - self._positive = Buckets() - - # _negative holds the negative values by their absolute value. - self._negative = Buckets() - - # _mapping corresponds to the current scale, is shared by both the - # positive and negative buckets. - - if self._max_scale > 20: - _logger.warning( - "max_scale is set to %s which is " - "larger than the recommended value of 20", - self._max_scale, - ) - self._mapping = _new_exponential_mapping(self._max_scale) + self._previous_value_positive = None + self._previous_value_negative = None + self._previous_min = inf + self._previous_max = -inf + self._previous_sum = 0 + self._previous_count = 0 + self._previous_zero_count = 0 + self._previous_scale = None - self._instrument_aggregation_temporality = AggregationTemporality.DELTA - self._start_time_unix_nano = start_time_unix_nano + self._previous_collection_start_nano = self._start_time_unix_nano - self._previous_scale = None - self._previous_start_time_unix_nano = None - self._previous_zero_count = None - self._previous_count = None - self._previous_sum = None - self._previous_max = None - self._previous_min = None - self._previous_positive = None - self._previous_negative = None + self._mapping = self._new_mapping(self._max_scale) def aggregate(self, measurement: Measurement) -> None: # pylint: disable=too-many-branches,too-many-statements, too-many-locals with self._lock: + if self._value_positive is None: + self._value_positive = Buckets() + if self._value_negative is None: + self._value_negative = Buckets() - value = measurement.value + measurement_value = measurement.value - # 0. Set the following attributes: - # _min - # _max - # _count - # _zero_count - # _sum - if value < self._min: - self._min = value + self._sum += measurement_value - if value > self._max: - self._max = value + self._min = min(self._min, measurement_value) + self._max = max(self._max, measurement_value) self._count += 1 - if value == 0: + if measurement_value == 0: self._zero_count += 1 - # No need to do anything else if value is zero, just increment the - # zero count. - return - self._sum += value + if self._count == self._zero_count: + self._scale = 0 - # 1. Use the positive buckets for positive values and the negative - # buckets for negative values. - if value > 0: - buckets = self._positive + return - else: - # Both exponential and logarithm mappings use only positive values - # so the absolute value is used here. - value = -value - buckets = self._negative + if measurement_value > 0: + value = self._value_positive - # 2. Compute the index for the value at the current scale. - index = self._mapping.map_to_index(value) + else: + measurement_value = -measurement_value + value = self._value_negative - # IncrementIndexBy starts here + # The following code finds out if it is necessary to change the + # buckets to hold the incoming measurement_value, changes them if + # necessary. This process does not exist in + # _ExplicitBucketHistogram aggregation because the buckets there + # are constant in size and amount. + index = self._mapping.map_to_index(measurement_value) - # 3. Determine if a change of scale is needed. is_rescaling_needed = False low, high = 0, 0 - if len(buckets) == 0: - buckets.index_start = index - buckets.index_end = index - buckets.index_base = index + if len(value) == 0: + value.index_start = index + value.index_end = index + value.index_base = index elif ( - index < buckets.index_start - and (buckets.index_end - index) >= self._max_size + index < value.index_start + and (value.index_end - index) >= self._max_size ): is_rescaling_needed = True low = index - high = buckets.index_end + high = value.index_end elif ( - index > buckets.index_end - and (index - buckets.index_start) >= self._max_size + index > value.index_end + and (index - value.index_start) >= self._max_size ): is_rescaling_needed = True - low = buckets.index_start + low = value.index_start high = index - # 4. Rescale the mapping if needed. if is_rescaling_needed: scale_change = self._get_scale_change(low, high) self._downscale( scale_change, - self._positive, - self._negative, + self._value_positive, + self._value_negative, + ) + self._mapping = self._new_mapping( + self._mapping.scale - scale_change ) - new_scale = self._mapping.scale - scale_change - self._mapping = _new_exponential_mapping(new_scale) - index = self._mapping.map_to_index(value) + index = self._mapping.map_to_index(measurement_value) + + self._scale = self._mapping.scale - # 5. If the index is outside - # [buckets.index_start, buckets.index_end] readjust the buckets - # boundaries or add more buckets. - if index < buckets.index_start: - span = buckets.index_end - index + if index < value.index_start: + span = value.index_end - index - if span >= len(buckets.counts): - buckets.grow(span + 1, self._max_size) + if span >= len(value.counts): + value.grow(span + 1, self._max_size) - buckets.index_start = index + value.index_start = index - elif index > buckets.index_end: - span = index - buckets.index_start + elif index > value.index_end: + span = index - value.index_start - if span >= len(buckets.counts): - buckets.grow(span + 1, self._max_size) + if span >= len(value.counts): + value.grow(span + 1, self._max_size) - buckets.index_end = index + value.index_end = index - # 6. Compute the index of the bucket to be incremented. - bucket_index = index - buckets.index_base + bucket_index = index - value.index_base if bucket_index < 0: - bucket_index += len(buckets.counts) + bucket_index += len(value.counts) - # 7. Increment the bucket. - buckets.increment_bucket(bucket_index) + # Now the buckets have been changed if needed and bucket_index will + # be used to increment the counter of the bucket that needs to be + # incremented. + + # This is analogous to + # self._value[bisect_left(self._boundaries, measurement_value)] += 1 + # in _ExplicitBucketHistogramAggregation.aggregate + value.increment_bucket(bucket_index) def collect( self, @@ -736,200 +732,238 @@ def collect( """ Atomically return a point for the current value of the metric. """ - # pylint: disable=too-many-statements, too-many-locals + # pylint: disable=too-many-statements, too-many-locals with self._lock: - if self._count == 0: - return None - - current_negative = self._negative - current_positive = self._positive - current_zero_count = self._zero_count - current_count = self._count - current_start_time_unix_nano = self._start_time_unix_nano - current_sum = self._sum - current_max = self._max - if current_max == -inf: - current_max = None - current_min = self._min - if current_min == inf: - current_min = None - - if self._count == self._zero_count: - current_scale = 0 - - else: - current_scale = self._mapping.scale + value_positive = self._value_positive + value_negative = self._value_negative + sum_ = self._sum + min_ = self._min + max_ = self._max + count = self._count + zero_count = self._zero_count + scale = self._scale - self._negative = Buckets() - self._positive = Buckets() - self._start_time_unix_nano = collection_start_nano + self._value_positive = None + self._value_negative = None self._sum = 0 - self._count = 0 - self._zero_count = 0 self._min = inf self._max = -inf + self._count = 0 + self._zero_count = 0 + self._scale = None - if self._previous_scale is None or ( + if ( self._instrument_aggregation_temporality - is collection_aggregation_temporality + is AggregationTemporality.DELTA ): - self._previous_scale = current_scale - self._previous_start_time_unix_nano = ( - current_start_time_unix_nano - ) - self._previous_max = current_max - self._previous_min = current_min - self._previous_sum = current_sum - self._previous_count = current_count - self._previous_zero_count = current_zero_count - self._previous_positive = current_positive - self._previous_negative = current_negative - - current_point = ExponentialHistogramDataPoint( - attributes=self._attributes, - start_time_unix_nano=current_start_time_unix_nano, - time_unix_nano=collection_start_nano, - count=current_count, - sum=current_sum, - scale=current_scale, - zero_count=current_zero_count, - positive=BucketsPoint( - offset=current_positive.offset, - bucket_counts=current_positive.get_offset_counts(), - ), - negative=BucketsPoint( - offset=current_negative.offset, - bucket_counts=current_negative.get_offset_counts(), - ), - # FIXME: Find the right value for flags - flags=0, - min=current_min, - max=current_max, - ) - - return current_point + # This happens when the corresponding instrument for this + # aggregation is synchronous. + if ( + collection_aggregation_temporality + is AggregationTemporality.DELTA + ): - min_scale = min(self._previous_scale, current_scale) + previous_collection_start_nano = ( + self._previous_collection_start_nano + ) + self._previous_collection_start_nano = ( + collection_start_nano + ) - low_positive, high_positive = self._get_low_high_previous_current( - self._previous_positive, - current_positive, - current_scale, - min_scale, - ) - low_negative, high_negative = self._get_low_high_previous_current( - self._previous_negative, - current_negative, - current_scale, - min_scale, - ) + if value_positive is None and value_negative is None: + return None - min_scale = min( - min_scale - - self._get_scale_change(low_positive, high_positive), - min_scale - - self._get_scale_change(low_negative, high_negative), - ) + return ExponentialHistogramDataPoint( + attributes=self._attributes, + start_time_unix_nano=previous_collection_start_nano, + time_unix_nano=collection_start_nano, + count=count, + sum=sum_, + scale=scale, + zero_count=zero_count, + positive=BucketsPoint( + offset=value_positive.offset, + bucket_counts=(value_positive.get_offset_counts()), + ), + negative=BucketsPoint( + offset=value_negative.offset, + bucket_counts=(value_negative.get_offset_counts()), + ), + # FIXME: Find the right value for flags + flags=0, + min=min_, + max=max_, + ) - # FIXME Go implementation checks if the histogram (not the mapping - # but the histogram) has a count larger than zero, if not, scale - # (the histogram scale) would be zero. See exponential.go 191 - self._downscale( - self._previous_scale - min_scale, - self._previous_positive, - self._previous_negative, - ) - self._previous_scale = min_scale + # Here collection_temporality is CUMULATIVE. + # instrument_temporality is always DELTA for the time being. + # Here we need to handle the case where: + # collect is called after at least one other call to collect + # (there is data in previous buckets, a call to merge is needed + # to handle possible differences in bucket sizes). + # collect is called without another call previous call to + # collect was made (there is no previous buckets, previous, + # empty buckets that are the same scale of the current buckets + # need to be made so that they can be cumulatively aggregated + # to the current buckets). - if ( - collection_aggregation_temporality - is AggregationTemporality.CUMULATIVE - ): + if ( + value_positive is None + and self._previous_value_positive is None + ): + # This happens if collect is called for the first time + # and aggregate has not yet been called. + value_positive = Buckets() + self._previous_value_positive = value_positive.copy_empty() + if ( + value_negative is None + and self._previous_value_negative is None + ): + value_negative = Buckets() + self._previous_value_negative = value_negative.copy_empty() + if scale is None and self._previous_scale is None: + scale = self._mapping.scale + self._previous_scale = scale - start_time_unix_nano = self._previous_start_time_unix_nano - sum_ = current_sum + self._previous_sum - zero_count = current_zero_count + self._previous_zero_count - count = current_count + self._previous_count - # Only update min/max on delta -> cumulative - max_ = max(current_max, self._previous_max) - min_ = min(current_min, self._previous_min) + if ( + value_positive is not None + and self._previous_value_positive is None + ): + # This happens when collect is called the very first time + # and aggregate has been called before. + + # We need previous buckets to add them to the current ones. + # When collect is called for the first time, there are no + # previous buckets, so we need to create empty buckets to + # add them to the current ones. The addition of empty + # buckets to the current ones will result in the current + # ones unchanged. + + # The way the previous buckets are generated here is + # different from the explicit bucket histogram where + # the size and amount of the buckets does not change once + # they are instantiated. Here, the size and amount of the + # buckets can change with every call to aggregate. In order + # to get empty buckets that can be added to the current + # ones resulting in the current ones unchanged we need to + # generate empty buckets that have the same size and amount + # as the current ones, this is what copy_empty does. + self._previous_value_positive = value_positive.copy_empty() + if ( + value_negative is not None + and self._previous_value_negative is None + ): + self._previous_value_negative = value_negative.copy_empty() + if scale is not None and self._previous_scale is None: + self._previous_scale = scale - self._merge( - self._previous_positive, - current_positive, - current_scale, - min_scale, - collection_aggregation_temporality, + if ( + value_positive is None + and self._previous_value_positive is not None + ): + value_positive = self._previous_value_positive.copy_empty() + if ( + value_negative is None + and self._previous_value_negative is not None + ): + value_negative = self._previous_value_negative.copy_empty() + if scale is None and self._previous_scale is not None: + scale = self._previous_scale + + min_scale = min(self._previous_scale, scale) + + low_positive, high_positive = ( + self._get_low_high_previous_current( + self._previous_value_positive, + value_positive, + scale, + min_scale, + ) ) - self._merge( - self._previous_negative, - current_negative, - current_scale, - min_scale, - collection_aggregation_temporality, + low_negative, high_negative = ( + self._get_low_high_previous_current( + self._previous_value_negative, + value_negative, + scale, + min_scale, + ) ) - current_scale = min_scale - current_positive = self._previous_positive - current_negative = self._previous_negative + min_scale = min( + min_scale + - self._get_scale_change(low_positive, high_positive), + min_scale + - self._get_scale_change(low_negative, high_negative), + ) - else: - start_time_unix_nano = self._previous_start_time_unix_nano - sum_ = current_sum - self._previous_sum - zero_count = current_zero_count - count = current_count - max_ = current_max - min_ = current_min + self._downscale( + self._previous_scale - min_scale, + self._previous_value_positive, + self._previous_value_negative, + ) + # self._merge adds the values from value to + # self._previous_value, this is analogous to + # self._previous_value = [ + # value_element + previous_value_element + # for ( + # value_element, + # previous_value_element, + # ) in zip(value, self._previous_value) + # ] + # in _ExplicitBucketHistogramAggregation.collect. self._merge( - self._previous_positive, - current_positive, - current_scale, + self._previous_value_positive, + value_positive, + scale, min_scale, collection_aggregation_temporality, ) self._merge( - self._previous_negative, - current_negative, - current_scale, + self._previous_value_negative, + value_negative, + scale, min_scale, collection_aggregation_temporality, ) - current_point = ExponentialHistogramDataPoint( - attributes=self._attributes, - start_time_unix_nano=start_time_unix_nano, - time_unix_nano=collection_start_nano, - count=count, - sum=sum_, - scale=current_scale, - zero_count=zero_count, - positive=BucketsPoint( - offset=current_positive.offset, - bucket_counts=current_positive.get_offset_counts(), - ), - negative=BucketsPoint( - offset=current_negative.offset, - bucket_counts=current_negative.get_offset_counts(), - ), - # FIXME: Find the right value for flags - flags=0, - min=min_, - max=max_, - ) + self._previous_min = min(min_, self._previous_min) + self._previous_max = max(max_, self._previous_max) + self._previous_sum = sum_ + self._previous_sum + self._previous_count = count + self._previous_count + self._previous_zero_count = ( + zero_count + self._previous_zero_count + ) + self._previous_scale = min_scale - self._previous_scale = current_scale - self._previous_positive = current_positive - self._previous_negative = current_negative - self._previous_start_time_unix_nano = current_start_time_unix_nano - self._previous_sum = sum_ - self._previous_count = count - self._previous_max = max_ - self._previous_min = min_ - self._previous_zero_count = zero_count + return ExponentialHistogramDataPoint( + attributes=self._attributes, + start_time_unix_nano=self._start_time_unix_nano, + time_unix_nano=collection_start_nano, + count=self._previous_count, + sum=self._previous_sum, + scale=self._previous_scale, + zero_count=self._previous_zero_count, + positive=BucketsPoint( + offset=self._previous_value_positive.offset, + bucket_counts=( + self._previous_value_positive.get_offset_counts() + ), + ), + negative=BucketsPoint( + offset=self._previous_value_negative.offset, + bucket_counts=( + self._previous_value_negative.get_offset_counts() + ), + ), + # FIXME: Find the right value for flags + flags=0, + min=self._previous_min, + max=self._previous_max, + ) - return current_point + return None def _get_low_high_previous_current( self, @@ -969,6 +1003,12 @@ def _get_low_high(buckets, scale, min_scale): return buckets.index_start >> shift, buckets.index_end >> shift + @staticmethod + def _new_mapping(scale: int) -> Mapping: + if scale <= 0: + return ExponentMapping(scale) + return LogarithmMapping(scale) + def _get_scale_change(self, low, high): change = 0 @@ -1174,8 +1214,18 @@ def _create_aggregation( attributes: Attributes, start_time_unix_nano: int, ) -> _Aggregation: + + instrument_aggregation_temporality = AggregationTemporality.UNSPECIFIED + if isinstance(instrument, Synchronous): + instrument_aggregation_temporality = AggregationTemporality.DELTA + elif isinstance(instrument, Asynchronous): + instrument_aggregation_temporality = ( + AggregationTemporality.CUMULATIVE + ) + return _ExponentialBucketHistogramAggregation( attributes, + instrument_aggregation_temporality, start_time_unix_nano, max_size=self._max_size, max_scale=self._max_scale, diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/buckets.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/buckets.py index 4dbe8f385e8..8877985c234 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/buckets.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exponential_histogram/buckets.py @@ -177,3 +177,17 @@ def downscale(self, amount: int) -> None: def increment_bucket(self, bucket_index: int, increment: int = 1) -> None: self._counts[bucket_index] += increment + + def copy_empty(self) -> "Buckets": + copy = Buckets() + + # pylint: disable=no-member + # pylint: disable=protected-access + # pylint: disable=attribute-defined-outside-init + # pylint: disable=invalid-name + copy._Buckets__index_base = self._Buckets__index_base + copy._Buckets__index_start = self._Buckets__index_start + copy._Buckets__index_end = self._Buckets__index_end + copy._counts = [0 for _ in self._counts] + + return copy diff --git a/opentelemetry-sdk/tests/metrics/exponential_histogram/test_exponential_bucket_histogram_aggregation.py b/opentelemetry-sdk/tests/metrics/exponential_histogram/test_exponential_bucket_histogram_aggregation.py index e243e09643d..85c28070c15 100644 --- a/opentelemetry-sdk/tests/metrics/exponential_histogram/test_exponential_bucket_histogram_aggregation.py +++ b/opentelemetry-sdk/tests/metrics/exponential_histogram/test_exponential_bucket_histogram_aggregation.py @@ -15,11 +15,12 @@ # pylint: disable=protected-access,too-many-lines,invalid-name # pylint: disable=consider-using-enumerate,no-self-use,too-many-public-methods -import random as insecure_random +from inspect import currentframe from itertools import permutations from logging import WARNING from math import ldexp -from sys import float_info +from random import Random, randrange +from sys import float_info, maxsize from types import MethodType from unittest.mock import Mock, patch @@ -73,8 +74,8 @@ def swap( ): for attribute in [ - "_positive", - "_negative", + "_value_positive", + "_value_negative", "_sum", "_count", "_zero_count", @@ -137,14 +138,18 @@ def require_equal(self, a, b): self.assertEqual(a._mapping.scale, b._mapping.scale) - self.assertEqual(len(a._positive), len(b._positive)) - self.assertEqual(len(a._negative), len(b._negative)) + self.assertEqual(len(a._value_positive), len(b._value_positive)) + self.assertEqual(len(a._value_negative), len(b._value_negative)) - for index in range(len(a._positive)): - self.assertEqual(a._positive[index], b._positive[index]) + for index in range(len(a._value_positive)): + self.assertEqual( + a._value_positive[index], b._value_positive[index] + ) - for index in range(len(a._negative)): - self.assertEqual(a._negative[index], b._negative[index]) + for index in range(len(a._value_negative)): + self.assertEqual( + a._value_negative[index], b._value_negative[index] + ) def test_alternating_growth_0(self): """ @@ -161,7 +166,9 @@ def test_alternating_growth_0(self): # agg is an instance of github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/histogram/structure.Histogram[float64] exponential_histogram_aggregation = ( - _ExponentialBucketHistogramAggregation(Mock(), Mock(), max_size=4) + _ExponentialBucketHistogramAggregation( + Mock(), AggregationTemporality.DELTA, Mock(), max_size=4 + ) ) exponential_histogram_aggregation.aggregate(Measurement(2, Mock())) @@ -169,11 +176,12 @@ def test_alternating_growth_0(self): exponential_histogram_aggregation.aggregate(Measurement(1, Mock())) self.assertEqual( - exponential_histogram_aggregation._positive.offset, -1 + exponential_histogram_aggregation._value_positive.offset, -1 ) self.assertEqual(exponential_histogram_aggregation._mapping.scale, 0) self.assertEqual( - get_counts(exponential_histogram_aggregation._positive), [1, 1, 1] + get_counts(exponential_histogram_aggregation._value_positive), + [1, 1, 1], ) def test_alternating_growth_1(self): @@ -185,7 +193,9 @@ def test_alternating_growth_1(self): """ exponential_histogram_aggregation = ( - _ExponentialBucketHistogramAggregation(Mock(), Mock(), max_size=4) + _ExponentialBucketHistogramAggregation( + Mock(), AggregationTemporality.DELTA, Mock(), max_size=4 + ) ) exponential_histogram_aggregation.aggregate(Measurement(2, Mock())) @@ -196,11 +206,12 @@ def test_alternating_growth_1(self): exponential_histogram_aggregation.aggregate(Measurement(0.5, Mock())) self.assertEqual( - exponential_histogram_aggregation._positive.offset, -1 + exponential_histogram_aggregation._value_positive.offset, -1 ) self.assertEqual(exponential_histogram_aggregation._mapping.scale, -1) self.assertEqual( - get_counts(exponential_histogram_aggregation._positive), [2, 3, 1] + get_counts(exponential_histogram_aggregation._value_positive), + [2, 3, 1], ) def test_permutations(self): @@ -246,7 +257,10 @@ def test_permutations(self): exponential_histogram_aggregation = ( _ExponentialBucketHistogramAggregation( - Mock(), Mock(), max_size=2 + Mock(), + AggregationTemporality.DELTA, + Mock(), + max_size=2, ) ) @@ -261,19 +275,19 @@ def test_permutations(self): expected["scale"], ) self.assertEqual( - exponential_histogram_aggregation._positive.offset, + exponential_histogram_aggregation._value_positive.offset, expected["offset"], ) self.assertEqual( - len(exponential_histogram_aggregation._positive), + len(exponential_histogram_aggregation._value_positive), expected["len"], ) self.assertEqual( - exponential_histogram_aggregation._positive[0], + exponential_histogram_aggregation._value_positive[0], expected["at_0"], ) self.assertEqual( - exponential_histogram_aggregation._positive[1], + exponential_histogram_aggregation._value_positive[1], expected["at_1"], ) @@ -292,7 +306,10 @@ def ascending_sequence_test( exponential_histogram_aggregation = ( _ExponentialBucketHistogramAggregation( - Mock(), Mock(), max_size=max_size + Mock(), + AggregationTemporality.DELTA, + Mock(), + max_size=max_size, ) ) @@ -317,7 +334,8 @@ def ascending_sequence_test( init_scale, exponential_histogram_aggregation._mapping._scale ) self.assertEqual( - offset, exponential_histogram_aggregation._positive.offset + offset, + exponential_histogram_aggregation._value_positive.offset, ) exponential_histogram_aggregation.aggregate( @@ -326,7 +344,7 @@ def ascending_sequence_test( sum_ += max_val self.assertNotEqual( - 0, exponential_histogram_aggregation._positive[0] + 0, exponential_histogram_aggregation._value_positive[0] ) # The maximum-index filled bucket is at or @@ -337,12 +355,15 @@ def ascending_sequence_test( total_count = 0 for index in range( - len(exponential_histogram_aggregation._positive) + len(exponential_histogram_aggregation._value_positive) ): - total_count += exponential_histogram_aggregation._positive[ - index - ] - if exponential_histogram_aggregation._positive[index] != 0: + total_count += ( + exponential_histogram_aggregation._value_positive[index] + ) + if ( + exponential_histogram_aggregation._value_positive[index] + != 0 + ): max_fill = index # FIXME the corresponding Go code is @@ -369,15 +390,15 @@ def ascending_sequence_test( index = mapping.map_to_index(min_val) self.assertEqual( - index, exponential_histogram_aggregation._positive.offset + index, exponential_histogram_aggregation._value_positive.offset ) index = mapping.map_to_index(max_val) self.assertEqual( index, - exponential_histogram_aggregation._positive.offset - + len(exponential_histogram_aggregation._positive) + exponential_histogram_aggregation._value_positive.offset + + len(exponential_histogram_aggregation._value_positive) - 1, ) @@ -394,7 +415,7 @@ def mock_increment(self, bucket_index: int) -> None: exponential_histogram_aggregation = ( _ExponentialBucketHistogramAggregation( - Mock(), Mock(), max_size=256 + Mock(), AggregationTemporality.DELTA, Mock(), max_size=256 ) ) @@ -405,15 +426,16 @@ def mock_increment(self, bucket_index: int) -> None: self.assertEqual(0, exponential_histogram_aggregation._sum) expect = 0 + exponential_histogram_aggregation._value_positive = Buckets() + for value in range(2, 257): expect += value * increment with patch.object( - exponential_histogram_aggregation._positive, + exponential_histogram_aggregation._value_positive, "increment_bucket", - # new=positive_mock MethodType( mock_increment, - exponential_histogram_aggregation._positive, + exponential_histogram_aggregation._value_positive, ), ): exponential_histogram_aggregation.aggregate( @@ -434,16 +456,16 @@ def mock_increment(self, bucket_index: int) -> None: self.assertEqual( 256 - ((1 << scale) - 1), - len(exponential_histogram_aggregation._positive), + len(exponential_histogram_aggregation._value_positive), ) self.assertEqual( (1 << scale) - 1, - exponential_histogram_aggregation._positive.offset, + exponential_histogram_aggregation._value_positive.offset, ) for index in range(0, 256): self.assertLessEqual( - exponential_histogram_aggregation._positive[index], + exponential_histogram_aggregation._value_positive[index], 6 * increment, ) @@ -451,12 +473,12 @@ def test_move_into(self): exponential_histogram_aggregation_0 = ( _ExponentialBucketHistogramAggregation( - Mock(), Mock(), max_size=256 + Mock(), AggregationTemporality.DELTA, Mock(), max_size=256 ) ) exponential_histogram_aggregation_1 = ( _ExponentialBucketHistogramAggregation( - Mock(), Mock(), max_size=256 + Mock(), AggregationTemporality.DELTA, Mock(), max_size=256 ) ) @@ -489,36 +511,38 @@ def test_move_into(self): self.assertEqual( 256 - ((1 << scale) - 1), - len(exponential_histogram_aggregation_1._positive), + len(exponential_histogram_aggregation_1._value_positive), ) self.assertEqual( (1 << scale) - 1, - exponential_histogram_aggregation_1._positive.offset, + exponential_histogram_aggregation_1._value_positive.offset, ) for index in range(0, 256): self.assertLessEqual( - exponential_histogram_aggregation_1._positive[index], 6 + exponential_histogram_aggregation_1._value_positive[index], 6 ) def test_very_large_numbers(self): exponential_histogram_aggregation = ( - _ExponentialBucketHistogramAggregation(Mock(), Mock(), max_size=2) + _ExponentialBucketHistogramAggregation( + Mock(), AggregationTemporality.DELTA, Mock(), max_size=2 + ) ) def expect_balanced(count: int): self.assertEqual( - 2, len(exponential_histogram_aggregation._positive) + 2, len(exponential_histogram_aggregation._value_positive) ) self.assertEqual( - -1, exponential_histogram_aggregation._positive.offset + -1, exponential_histogram_aggregation._value_positive.offset ) self.assertEqual( - count, exponential_histogram_aggregation._positive[0] + count, exponential_histogram_aggregation._value_positive[0] ) self.assertEqual( - count, exponential_histogram_aggregation._positive[1] + count, exponential_histogram_aggregation._value_positive[1] ) exponential_histogram_aggregation.aggregate( @@ -580,7 +604,9 @@ def expect_balanced(count: int): def test_full_range(self): exponential_histogram_aggregation = ( - _ExponentialBucketHistogramAggregation(Mock(), Mock(), max_size=2) + _ExponentialBucketHistogramAggregation( + Mock(), AggregationTemporality.DELTA, Mock(), max_size=2 + ) ) exponential_histogram_aggregation.aggregate( @@ -602,18 +628,24 @@ def test_full_range(self): self.assertEqual( _ExponentialBucketHistogramAggregation._min_max_size, - len(exponential_histogram_aggregation._positive), + len(exponential_histogram_aggregation._value_positive), ) self.assertEqual( - -1, exponential_histogram_aggregation._positive.offset + -1, exponential_histogram_aggregation._value_positive.offset + ) + self.assertLessEqual( + exponential_histogram_aggregation._value_positive[0], 2 + ) + self.assertLessEqual( + exponential_histogram_aggregation._value_positive[1], 1 ) - self.assertLessEqual(exponential_histogram_aggregation._positive[0], 2) - self.assertLessEqual(exponential_histogram_aggregation._positive[1], 1) def test_aggregator_min_max(self): exponential_histogram_aggregation = ( - _ExponentialBucketHistogramAggregation(Mock(), Mock()) + _ExponentialBucketHistogramAggregation( + Mock(), AggregationTemporality.DELTA, Mock() + ) ) for value in [1, 3, 5, 7, 9]: @@ -625,7 +657,9 @@ def test_aggregator_min_max(self): self.assertEqual(9, exponential_histogram_aggregation._max) exponential_histogram_aggregation = ( - _ExponentialBucketHistogramAggregation(Mock(), Mock()) + _ExponentialBucketHistogramAggregation( + Mock(), AggregationTemporality.DELTA, Mock() + ) ) for value in [-1, -3, -5, -7, -9]: @@ -639,21 +673,27 @@ def test_aggregator_min_max(self): def test_aggregator_copy_swap(self): exponential_histogram_aggregation_0 = ( - _ExponentialBucketHistogramAggregation(Mock(), Mock()) + _ExponentialBucketHistogramAggregation( + Mock(), AggregationTemporality.DELTA, Mock() + ) ) for value in [1, 3, 5, 7, 9, -1, -3, -5]: exponential_histogram_aggregation_0.aggregate( Measurement(value, Mock()) ) exponential_histogram_aggregation_1 = ( - _ExponentialBucketHistogramAggregation(Mock(), Mock()) + _ExponentialBucketHistogramAggregation( + Mock(), AggregationTemporality.DELTA, Mock() + ) ) for value in [5, 4, 3, 2]: exponential_histogram_aggregation_1.aggregate( Measurement(value, Mock()) ) exponential_histogram_aggregation_2 = ( - _ExponentialBucketHistogramAggregation(Mock(), Mock()) + _ExponentialBucketHistogramAggregation( + Mock(), AggregationTemporality.DELTA, Mock() + ) ) swap( @@ -662,8 +702,8 @@ def test_aggregator_copy_swap(self): ) # pylint: disable=unnecessary-dunder-call - exponential_histogram_aggregation_2._positive.__init__() - exponential_histogram_aggregation_2._negative.__init__() + exponential_histogram_aggregation_2._value_positive.__init__() + exponential_histogram_aggregation_2._value_negative.__init__() exponential_histogram_aggregation_2._sum = 0 exponential_histogram_aggregation_2._count = 0 exponential_histogram_aggregation_2._zero_count = 0 @@ -674,8 +714,8 @@ def test_aggregator_copy_swap(self): ) for attribute in [ - "_positive", - "_negative", + "_value_positive", + "_value_negative", "_sum", "_count", "_zero_count", @@ -697,7 +737,9 @@ def test_aggregator_copy_swap(self): def test_zero_count_by_increment(self): exponential_histogram_aggregation_0 = ( - _ExponentialBucketHistogramAggregation(Mock(), Mock()) + _ExponentialBucketHistogramAggregation( + Mock(), AggregationTemporality.DELTA, Mock() + ) ) increment = 10 @@ -707,10 +749,11 @@ def test_zero_count_by_increment(self): Measurement(0, Mock()) ) exponential_histogram_aggregation_1 = ( - _ExponentialBucketHistogramAggregation(Mock(), Mock()) + _ExponentialBucketHistogramAggregation( + Mock(), AggregationTemporality.DELTA, Mock() + ) ) - # positive_mock = Mock(wraps=exponential_histogram_aggregation_1._positive) def mock_increment(self, bucket_index: int) -> None: """ Increments a bucket @@ -718,12 +761,14 @@ def mock_increment(self, bucket_index: int) -> None: self._counts[bucket_index] += increment + exponential_histogram_aggregation_1._value_positive = Buckets() + with patch.object( - exponential_histogram_aggregation_1._positive, + exponential_histogram_aggregation_1._value_positive, "increment_bucket", - # new=positive_mock MethodType( - mock_increment, exponential_histogram_aggregation_1._positive + mock_increment, + exponential_histogram_aggregation_1._value_positive, ), ): exponential_histogram_aggregation_1.aggregate( @@ -740,7 +785,9 @@ def mock_increment(self, bucket_index: int) -> None: def test_one_count_by_increment(self): exponential_histogram_aggregation_0 = ( - _ExponentialBucketHistogramAggregation(Mock(), Mock()) + _ExponentialBucketHistogramAggregation( + Mock(), AggregationTemporality.DELTA, Mock() + ) ) increment = 10 @@ -750,10 +797,11 @@ def test_one_count_by_increment(self): Measurement(1, Mock()) ) exponential_histogram_aggregation_1 = ( - _ExponentialBucketHistogramAggregation(Mock(), Mock()) + _ExponentialBucketHistogramAggregation( + Mock(), AggregationTemporality.DELTA, Mock() + ) ) - # positive_mock = Mock(wraps=exponential_histogram_aggregation_1._positive) def mock_increment(self, bucket_index: int) -> None: """ Increments a bucket @@ -761,12 +809,14 @@ def mock_increment(self, bucket_index: int) -> None: self._counts[bucket_index] += increment + exponential_histogram_aggregation_1._value_positive = Buckets() + with patch.object( - exponential_histogram_aggregation_1._positive, + exponential_histogram_aggregation_1._value_positive, "increment_bucket", - # new=positive_mock MethodType( - mock_increment, exponential_histogram_aggregation_1._positive + mock_increment, + exponential_histogram_aggregation_1._value_positive, ), ): exponential_histogram_aggregation_1.aggregate( @@ -820,6 +870,7 @@ def test_min_max_size(self): exponential_histogram_aggregation = ( _ExponentialBucketHistogramAggregation( Mock(), + AggregationTemporality.DELTA, Mock(), max_size=_ExponentialBucketHistogramAggregation._min_max_size, ) @@ -833,7 +884,7 @@ def test_min_max_size(self): # This means the smallest max_scale is enough for the full range of the # normal floating point values. self.assertEqual( - len(exponential_histogram_aggregation._positive._counts), + len(exponential_histogram_aggregation._value_positive._counts), exponential_histogram_aggregation._min_max_size, ) @@ -844,6 +895,7 @@ def test_aggregate_collect(self): exponential_histogram_aggregation = ( _ExponentialBucketHistogramAggregation( Mock(), + AggregationTemporality.DELTA, Mock(), ) ) @@ -865,6 +917,7 @@ def test_collect_results_cumulative(self) -> None: exponential_histogram_aggregation = ( _ExponentialBucketHistogramAggregation( Mock(), + AggregationTemporality.DELTA, Mock(), ) ) @@ -949,9 +1002,13 @@ def test_collect_results_cumulative(self) -> None: self.assertEqual(collection_1.max, 8) def test_cumulative_aggregation_with_random_data(self) -> None: - histogram = _ExponentialBucketHistogramAggregation(Mock(), Mock()) + histogram = _ExponentialBucketHistogramAggregation( + Mock(), + AggregationTemporality.DELTA, + Mock(), + ) - def collect_and_validate() -> None: + def collect_and_validate(values, histogram) -> None: result: ExponentialHistogramDataPoint = histogram.collect( AggregationTemporality.CUMULATIVE, 0 ) @@ -981,20 +1038,31 @@ def collect_and_validate() -> None: assert result.zero_count == len([v for v in values if v == 0]) assert scale >= 3 - random = insecure_random.Random("opentelemetry2") + seed = randrange(maxsize) + # This test case is executed with random values every time. In order to + # run this test case with the same values used in a previous execution, + # check the value printed by that previous execution of this test case + # and use the same value for the seed variable in the line below. + # seed = 4539544373807492135 + + random_generator = Random(seed) + print(f"seed for {currentframe().f_code.co_name} is {seed}") + values = [] for i in range(2000): - value = random.randint(0, 1000) + value = random_generator.randint(0, 1000) values.append(value) histogram.aggregate(Measurement(value, Mock())) if i % 20 == 0: - collect_and_validate() + collect_and_validate(values, histogram) - collect_and_validate() + collect_and_validate(values, histogram) def test_merge_collect_cumulative(self): exponential_histogram_aggregation = ( - _ExponentialBucketHistogramAggregation(Mock(), Mock(), max_size=4) + _ExponentialBucketHistogramAggregation( + Mock(), AggregationTemporality.DELTA, Mock(), max_size=4 + ) ) for value in [2, 4, 8, 16]: @@ -1003,16 +1071,21 @@ def test_merge_collect_cumulative(self): ) self.assertEqual(exponential_histogram_aggregation._mapping.scale, 0) - self.assertEqual(exponential_histogram_aggregation._positive.offset, 0) self.assertEqual( - exponential_histogram_aggregation._positive.counts, [1, 1, 1, 1] + exponential_histogram_aggregation._value_positive.offset, 0 + ) + self.assertEqual( + exponential_histogram_aggregation._value_positive.counts, + [1, 1, 1, 1], ) - result = exponential_histogram_aggregation.collect( + result_0 = exponential_histogram_aggregation.collect( AggregationTemporality.CUMULATIVE, 0, ) + self.assertEqual(result_0.scale, 0) + for value in [1, 2, 4, 8]: exponential_histogram_aggregation.aggregate( Measurement(1 / value, Mock()) @@ -1020,10 +1093,11 @@ def test_merge_collect_cumulative(self): self.assertEqual(exponential_histogram_aggregation._mapping.scale, 0) self.assertEqual( - exponential_histogram_aggregation._positive.offset, -4 + exponential_histogram_aggregation._value_positive.offset, -4 ) self.assertEqual( - exponential_histogram_aggregation._positive.counts, [1, 1, 1, 1] + exponential_histogram_aggregation._value_positive.counts, + [1, 1, 1, 1], ) result_1 = exponential_histogram_aggregation.collect( @@ -1031,12 +1105,13 @@ def test_merge_collect_cumulative(self): 0, ) - self.assertEqual(result.scale, 0) self.assertEqual(result_1.scale, -1) def test_merge_collect_delta(self): exponential_histogram_aggregation = ( - _ExponentialBucketHistogramAggregation(Mock(), Mock(), max_size=4) + _ExponentialBucketHistogramAggregation( + Mock(), AggregationTemporality.DELTA, Mock(), max_size=4 + ) ) for value in [2, 4, 8, 16]: @@ -1045,9 +1120,12 @@ def test_merge_collect_delta(self): ) self.assertEqual(exponential_histogram_aggregation._mapping.scale, 0) - self.assertEqual(exponential_histogram_aggregation._positive.offset, 0) self.assertEqual( - exponential_histogram_aggregation._positive.counts, [1, 1, 1, 1] + exponential_histogram_aggregation._value_positive.offset, 0 + ) + self.assertEqual( + exponential_histogram_aggregation._value_positive.counts, + [1, 1, 1, 1], ) result = exponential_histogram_aggregation.collect( @@ -1062,10 +1140,11 @@ def test_merge_collect_delta(self): self.assertEqual(exponential_histogram_aggregation._mapping.scale, 0) self.assertEqual( - exponential_histogram_aggregation._positive.offset, -4 + exponential_histogram_aggregation._value_positive.offset, -4 ) self.assertEqual( - exponential_histogram_aggregation._positive.counts, [1, 1, 1, 1] + exponential_histogram_aggregation._value_positive.counts, + [1, 1, 1, 1], ) result_1 = exponential_histogram_aggregation.collect( diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_exponential_bucket_histogram.py b/opentelemetry-sdk/tests/metrics/integration_test/test_exponential_bucket_histogram.py new file mode 100644 index 00000000000..6574bf43357 --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_exponential_bucket_histogram.py @@ -0,0 +1,357 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from platform import system +from time import sleep +from unittest import TestCase + +from pytest import mark + +from opentelemetry.sdk.metrics import Histogram, MeterProvider +from opentelemetry.sdk.metrics.export import ( + AggregationTemporality, + InMemoryMetricReader, +) +from opentelemetry.sdk.metrics.view import ( + ExponentialBucketHistogramAggregation, +) + + +class TestExponentialBucketHistogramAggregation(TestCase): + + test_values = [2, 4, 1, 1, 8, 0.5, 0.1, 0.045] + + @mark.skipif( + system() == "Windows", + reason=( + "Tests fail because Windows time_ns resolution is too low so " + "two different time measurements may end up having the exact same" + "value." + ), + ) + def test_synchronous_delta_temporality(self): + """ + This test case instantiates an exponential histogram aggregation and + then uses it to record measurements and get metrics. The order in which + these actions are taken are relevant to the testing that happens here. + For this reason, the aggregation is only instantiated once, since the + reinstantiation of the aggregation would defeat the purpose of this + test case. + """ + + aggregation = ExponentialBucketHistogramAggregation() + + reader = InMemoryMetricReader( + preferred_aggregation={Histogram: aggregation}, + preferred_temporality={Histogram: AggregationTemporality.DELTA}, + ) + + provider = MeterProvider(metric_readers=[reader]) + meter = provider.get_meter("name", "version") + + histogram = meter.create_histogram("histogram") + + # The test scenario here is calling collect without calling aggregate + # ever before. + results = [] + + for _ in range(10): + + results.append(reader.get_metrics_data()) + + for metrics_data in results: + self.assertIsNone(metrics_data) + + # The test scenario here is calling aggregate then collect repeatedly. + results = [] + + for test_value in self.test_values: + histogram.record(test_value) + results.append(reader.get_metrics_data()) + + metric_data = ( + results[0] + .resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + ) + + previous_time_unix_nano = metric_data.time_unix_nano + + self.assertEqual(metric_data.positive.bucket_counts, [1]) + self.assertEqual(metric_data.negative.bucket_counts, [0]) + + self.assertLess( + metric_data.start_time_unix_nano, + previous_time_unix_nano, + ) + self.assertEqual(metric_data.min, self.test_values[0]) + self.assertEqual(metric_data.max, self.test_values[0]) + self.assertEqual(metric_data.sum, self.test_values[0]) + + for index, metrics_data in enumerate(results[1:]): + metric_data = ( + metrics_data.resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + ) + + self.assertEqual( + previous_time_unix_nano, metric_data.start_time_unix_nano + ) + previous_time_unix_nano = metric_data.time_unix_nano + self.assertEqual(metric_data.positive.bucket_counts, [1]) + self.assertEqual(metric_data.negative.bucket_counts, [0]) + self.assertLess( + metric_data.start_time_unix_nano, metric_data.time_unix_nano + ) + self.assertEqual(metric_data.min, self.test_values[index + 1]) + self.assertEqual(metric_data.max, self.test_values[index + 1]) + # Using assertAlmostEqual here because in 3.12 resolution can cause + # these checks to fail. + self.assertAlmostEqual( + metric_data.sum, self.test_values[index + 1] + ) + + # The test scenario here is calling collect without calling aggregate + # immediately before, but having aggregate being called before at some + # moment. + results = [] + + for _ in range(10): + + results.append(reader.get_metrics_data()) + + provider.shutdown() + + for metrics_data in results: + self.assertIsNone(metrics_data) + + # The test scenario here is calling aggregate and collect, waiting for + # a certain amount of time, calling collect, then calling aggregate and + # collect again. + results = [] + + histogram.record(1) + results.append(reader.get_metrics_data()) + + sleep(0.1) + results.append(reader.get_metrics_data()) + + histogram.record(2) + results.append(reader.get_metrics_data()) + + metric_data_0 = ( + results[0] + .resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + ) + metric_data_2 = ( + results[2] + .resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + ) + + self.assertIsNone(results[1]) + + self.assertGreater( + metric_data_2.start_time_unix_nano, metric_data_0.time_unix_nano + ) + + provider.shutdown() + + @mark.skipif( + system() == "Windows", + reason=( + "Tests fail because Windows time_ns resolution is too low so " + "two different time measurements may end up having the exact same" + "value." + ), + ) + def test_synchronous_cumulative_temporality(self): + + aggregation = ExponentialBucketHistogramAggregation() + + reader = InMemoryMetricReader( + preferred_aggregation={Histogram: aggregation}, + preferred_temporality={ + Histogram: AggregationTemporality.CUMULATIVE + }, + ) + + provider = MeterProvider(metric_readers=[reader]) + meter = provider.get_meter("name", "version") + + histogram = meter.create_histogram("histogram") + + results = [] + + for _ in range(10): + results.append(reader.get_metrics_data()) + + for metrics_data in results: + self.assertIsNone(metrics_data) + + results = [] + + for test_value in self.test_values: + histogram.record(test_value) + results.append(reader.get_metrics_data()) + + metric_data = ( + results[0] + .resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + ) + + start_time_unix_nano = metric_data.start_time_unix_nano + + self.assertLess( + metric_data.start_time_unix_nano, + metric_data.time_unix_nano, + ) + self.assertEqual(metric_data.min, self.test_values[0]) + self.assertEqual(metric_data.max, self.test_values[0]) + self.assertEqual(metric_data.sum, self.test_values[0]) + + previous_time_unix_nano = metric_data.time_unix_nano + + for index, metrics_data in enumerate(results[1:]): + metric_data = ( + metrics_data.resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + ) + + self.assertEqual( + start_time_unix_nano, metric_data.start_time_unix_nano + ) + self.assertLess( + metric_data.start_time_unix_nano, + metric_data.time_unix_nano, + ) + self.assertEqual( + metric_data.min, min(self.test_values[: index + 2]) + ) + self.assertEqual( + metric_data.max, max(self.test_values[: index + 2]) + ) + self.assertAlmostEqual( + metric_data.sum, sum(self.test_values[: index + 2]) + ) + + self.assertGreater( + metric_data.time_unix_nano, previous_time_unix_nano + ) + + previous_time_unix_nano = metric_data.time_unix_nano + + self.assertEqual( + metric_data.positive.bucket_counts, + [ + 1, + *[0] * 17, + 1, + *[0] * 36, + 1, + *[0] * 15, + 2, + *[0] * 15, + 1, + *[0] * 15, + 1, + *[0] * 15, + 1, + *[0] * 40, + ], + ) + self.assertEqual(metric_data.negative.bucket_counts, [0]) + + results = [] + + for _ in range(10): + results.append(reader.get_metrics_data()) + + provider.shutdown() + + metric_data = ( + results[0] + .resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + ) + + start_time_unix_nano = metric_data.start_time_unix_nano + + self.assertLess( + metric_data.start_time_unix_nano, + metric_data.time_unix_nano, + ) + self.assertEqual(metric_data.min, min(self.test_values)) + self.assertEqual(metric_data.max, max(self.test_values)) + self.assertAlmostEqual(metric_data.sum, sum(self.test_values)) + + previous_metric_data = metric_data + + for index, metrics_data in enumerate(results[1:]): + metric_data = ( + metrics_data.resource_metrics[0] + .scope_metrics[0] + .metrics[0] + .data.data_points[0] + ) + + self.assertEqual( + previous_metric_data.start_time_unix_nano, + metric_data.start_time_unix_nano, + ) + self.assertEqual(previous_metric_data.min, metric_data.min) + self.assertEqual(previous_metric_data.max, metric_data.max) + self.assertAlmostEqual(previous_metric_data.sum, metric_data.sum) + + self.assertEqual( + metric_data.positive.bucket_counts, + [ + 1, + *[0] * 17, + 1, + *[0] * 36, + 1, + *[0] * 15, + 2, + *[0] * 15, + 1, + *[0] * 15, + 1, + *[0] * 15, + 1, + *[0] * 40, + ], + ) + self.assertEqual(metric_data.negative.bucket_counts, [0]) + + self.assertLess( + previous_metric_data.time_unix_nano, + metric_data.time_unix_nano, + ) diff --git a/opentelemetry-sdk/tests/metrics/test_aggregation.py b/opentelemetry-sdk/tests/metrics/test_aggregation.py index 9d61da72a04..7ea463ec8a8 100644 --- a/opentelemetry-sdk/tests/metrics/test_aggregation.py +++ b/opentelemetry-sdk/tests/metrics/test_aggregation.py @@ -68,7 +68,7 @@ def test_aggregate_delta(self): synchronous_sum_aggregation.aggregate(measurement(2)) synchronous_sum_aggregation.aggregate(measurement(3)) - self.assertEqual(synchronous_sum_aggregation._current_value, 6) + self.assertEqual(synchronous_sum_aggregation._value, 6) synchronous_sum_aggregation = _SumAggregation( Mock(), True, AggregationTemporality.DELTA, 0 @@ -78,7 +78,7 @@ def test_aggregate_delta(self): synchronous_sum_aggregation.aggregate(measurement(-2)) synchronous_sum_aggregation.aggregate(measurement(3)) - self.assertEqual(synchronous_sum_aggregation._current_value, 2) + self.assertEqual(synchronous_sum_aggregation._value, 2) def test_aggregate_cumulative(self): """ @@ -93,7 +93,7 @@ def test_aggregate_cumulative(self): synchronous_sum_aggregation.aggregate(measurement(2)) synchronous_sum_aggregation.aggregate(measurement(3)) - self.assertEqual(synchronous_sum_aggregation._current_value, 6) + self.assertEqual(synchronous_sum_aggregation._value, 6) synchronous_sum_aggregation = _SumAggregation( Mock(), True, AggregationTemporality.CUMULATIVE, 0 @@ -103,7 +103,7 @@ def test_aggregate_cumulative(self): synchronous_sum_aggregation.aggregate(measurement(-2)) synchronous_sum_aggregation.aggregate(measurement(3)) - self.assertEqual(synchronous_sum_aggregation._current_value, 2) + self.assertEqual(synchronous_sum_aggregation._value, 2) def test_collect_delta(self): """ @@ -292,24 +292,16 @@ def test_aggregate(self): explicit_bucket_histogram_aggregation.aggregate(measurement(5)) # The first bucket keeps count of values between (-inf, 0] (-1 and 0) - self.assertEqual( - explicit_bucket_histogram_aggregation._current_value[0], 2 - ) + self.assertEqual(explicit_bucket_histogram_aggregation._value[0], 2) # The second bucket keeps count of values between (0, 2] (1 and 2) - self.assertEqual( - explicit_bucket_histogram_aggregation._current_value[1], 2 - ) + self.assertEqual(explicit_bucket_histogram_aggregation._value[1], 2) # The third bucket keeps count of values between (2, 4] (3 and 4) - self.assertEqual( - explicit_bucket_histogram_aggregation._current_value[2], 2 - ) + self.assertEqual(explicit_bucket_histogram_aggregation._value[2], 2) # The fourth bucket keeps count of values between (4, inf) (3 and 4) - self.assertEqual( - explicit_bucket_histogram_aggregation._current_value[3], 1 - ) + self.assertEqual(explicit_bucket_histogram_aggregation._value[3], 1) histo = explicit_bucket_histogram_aggregation.collect( AggregationTemporality.CUMULATIVE, 1