Skip to content

Commit

Permalink
Fix histogram aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
ocelotl committed Sep 8, 2023
1 parent f078af1 commit 0e21c70
Show file tree
Hide file tree
Showing 4 changed files with 319 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -252,33 +252,36 @@ def __init__(
record_min_max: bool = True,
):
super().__init__(attributes)

self._start_time_unix_nano = start_time_unix_nano
self._boundaries = tuple(boundaries)
self._bucket_counts = self._get_empty_bucket_counts()
self._record_min_max = record_min_max

self._current_value = None
self._min = inf
self._max = -inf
self._sum = 0
self._record_min_max = record_min_max
self._start_time_unix_nano = start_time_unix_nano
# It is assumed that the "natural" aggregation temporality for a
# Histogram instrument is DELTA, like the "natural" aggregation
# temporality for a Counter is DELTA and the "natural" aggregation
# temporality for an ObservableCounter is CUMULATIVE.
self._instrument_temporality = AggregationTemporality.DELTA

self._previous_collection_start_nano = self._start_time_unix_nano

def _get_empty_bucket_counts(self) -> List[int]:
return [0] * (len(self._boundaries) + 1)

# Fold one measurement into the histogram state: a per-bucket counter, the
# running sum and, when self._record_min_max is truthy, the running min/max.
#
# NOTE(review): this span looks like a rendered diff whose removed and added
# lines are interleaved without +/- markers and with indentation stripped.
# That would explain the repeated statements below (value is read twice,
# min/max is updated twice, the sum is added twice, two different counter
# attributes are incremented). Confirm against the repository before
# treating this text as runnable code.
def aggregate(self, measurement: Measurement) -> None:

value = measurement.value
# State is updated under self._lock; which of the following statements sit
# inside the `with` body cannot be determined here because indentation is
# missing -- TODO confirm.
with self._lock:

# Create the bucket counters on demand when none exist yet.
if self._current_value is None:
self._current_value = self._get_empty_bucket_counts()

# min/max tracking is optional and controlled by _record_min_max.
if self._record_min_max:
self._min = min(self._min, value)
self._max = max(self._max, value)
# Second read of the same measurement value (see NOTE(review) above).
value = measurement.value

self._sum += value
# Same min/max update as above (see NOTE(review) above).
if self._record_min_max:
self._min = min(self._min, value)
self._max = max(self._max, value)

# bisect_left returns the index of the first boundary that is >= value, so a
# value equal to a boundary is counted in the bucket ending at that boundary,
# and a value above every boundary lands in the extra last bucket.
# Presumably only one of the two counter attributes below belongs to each
# side of the diff -- TODO confirm.
self._bucket_counts[bisect_left(self._boundaries, value)] += 1
self._current_value[bisect_left(self._boundaries, value)] += 1
# Read literally, this adds value to the sum a second time (see
# NOTE(review) above).
self._sum = self._sum + value

def collect(
self,
Expand All @@ -289,79 +292,49 @@ def collect(
Atomically return a point for the current value of the metric.
"""
with self._lock:
if not any(self._bucket_counts):
return None

bucket_counts = self._bucket_counts
start_time_unix_nano = self._start_time_unix_nano
sum_ = self._sum
max_ = self._max
min_ = self._min
if aggregation_temporality is AggregationTemporality.DELTA:

self._bucket_counts = self._get_empty_bucket_counts()
self._start_time_unix_nano = collection_start_nano
self._sum = 0
self._min = inf
self._max = -inf
current_value = self._current_value
min_ = self._min
max_ = self._max
sum_ = self._sum
previous_collection_start_nano = (
self._previous_collection_start_nano
)

current_point = HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=collection_start_nano,
count=sum(bucket_counts),
sum=sum_,
bucket_counts=tuple(bucket_counts),
explicit_bounds=self._boundaries,
min=min_,
max=max_,
)
self._current_value = None
self._min = inf
self._max = -inf
self._sum = 0
self._previous_collection_start_nano = collection_start_nano

if self._previous_point is None or (
self._instrument_temporality is aggregation_temporality
):
self._previous_point = current_point
return current_point
if current_value is None:
return None

max_ = current_point.max
min_ = current_point.min

if aggregation_temporality is AggregationTemporality.CUMULATIVE:
start_time_unix_nano = self._previous_point.start_time_unix_nano
sum_ = current_point.sum + self._previous_point.sum
# Only update min/max on delta -> cumulative
max_ = max(current_point.max, self._previous_point.max)
min_ = min(current_point.min, self._previous_point.min)
bucket_counts = [
curr_count + prev_count
for curr_count, prev_count in zip(
current_point.bucket_counts,
self._previous_point.bucket_counts,
)
]
else:
start_time_unix_nano = self._previous_point.time_unix_nano
sum_ = current_point.sum - self._previous_point.sum
bucket_counts = [
curr_count - prev_count
for curr_count, prev_count in zip(
current_point.bucket_counts,
self._previous_point.bucket_counts,
return HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=previous_collection_start_nano,
time_unix_nano=collection_start_nano,
count=sum(current_value),
sum=sum_,
bucket_counts=tuple(current_value),
explicit_bounds=self._boundaries,
min=min_,
max=max_,
)
]

current_point = HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=current_point.time_unix_nano,
count=sum(bucket_counts),
sum=sum_,
bucket_counts=tuple(bucket_counts),
explicit_bounds=current_point.explicit_bounds,
min=min_,
max=max_,
)
self._previous_point = current_point
return current_point
return HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=self._start_time_unix_nano,
time_unix_nano=collection_start_nano,
count=sum(self._current_value),
sum=self._sum,
bucket_counts=tuple(self._current_value),
explicit_bounds=self._boundaries,
min=self._min,
max=self._max,
)


# pylint: disable=protected-access
Expand Down
Loading

0 comments on commit 0e21c70

Please sign in to comment.