Skip to content

Commit

Permalink
Fix SumAggregation for delta temporality
Browse files Browse the repository at this point in the history
Fixes #3268
  • Loading branch information
ocelotl committed Jul 24, 2023
1 parent 8378db9 commit 2f953dd
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 18 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Fix `SumAggregation` for delta temporality
([#3390](https://github.com/open-telemetry/opentelemetry-python/pull/3390))

## Version 1.19.0/0.40b0 (2023-07-13)

- Drop `setuptools` runtime requirement.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,17 @@ def __init__(
self._instrument_is_monotonic = instrument_is_monotonic

if self._instrument_temporality is AggregationTemporality.DELTA:
self._value = 0
self._current_value = 0
else:
self._value = None
self._current_value = None

self._previous_value = None

def aggregate(self, measurement: Measurement) -> None:
with self._lock:
if self._value is None:
self._value = 0
self._value = self._value + measurement.value
if self._current_value is None:
self._current_value = 0
self._current_value = self._current_value + measurement.value

def collect(
self,
Expand All @@ -140,29 +142,30 @@ def collect(
Atomically return a point for the current value of the metric and
reset the aggregation value.
"""

if self._instrument_temporality is AggregationTemporality.DELTA:

with self._lock:
value = self._value
current_value = self._current_value
start_time_unix_nano = self._start_time_unix_nano

self._value = 0
self._current_value = 0
self._start_time_unix_nano = collection_start_nano

else:

with self._lock:
if self._value is None:
if self._current_value is None:
return None
value = self._value
self._value = None
current_value = self._current_value
self._current_value = None
start_time_unix_nano = self._start_time_unix_nano

current_point = NumberDataPoint(
attributes=self._attributes,
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=collection_start_nano,
value=value,
value=current_value,
)

if self._previous_point is None or (
Expand All @@ -171,16 +174,18 @@ def collect(
# Output DELTA for a synchronous instrument
# Output CUMULATIVE for an asynchronous instrument
self._previous_point = current_point
self._previous_value = current_value
return current_point

if aggregation_temporality is AggregationTemporality.DELTA:
# Output temporality DELTA for an asynchronous instrument
value = current_point.value - self._previous_point.value
current_value = current_point.value - self._previous_value
self._previous_value = current_point.value
output_start_time_unix_nano = self._previous_point.time_unix_nano

else:
# Output CUMULATIVE for a synchronous instrument
value = current_point.value + self._previous_point.value
current_value = current_point.value + self._previous_point.value
output_start_time_unix_nano = (
self._previous_point.start_time_unix_nano
)
Expand All @@ -189,10 +194,11 @@ def collect(
attributes=self._attributes,
start_time_unix_nano=output_start_time_unix_nano,
time_unix_nano=current_point.time_unix_nano,
value=value,
value=current_value,
)

self._previous_point = current_point

return current_point


Expand Down
136 changes: 136 additions & 0 deletions opentelemetry-sdk/tests/metrics/integration_test/test_delta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from io import StringIO
from itertools import count
from json import loads
from time import sleep
from unittest import TestCase

from opentelemetry.metrics import (
Observation,
get_meter_provider,
set_meter_provider,
)
from opentelemetry.sdk.metrics import MeterProvider, ObservableCounter
from opentelemetry.sdk.metrics.export import (
AggregationTemporality,
ConsoleMetricExporter,
PeriodicExportingMetricReader,
)
from opentelemetry.sdk.metrics.view import SumAggregation
from opentelemetry.test.globals_test import reset_metrics_globals

network_bytes_generator = count(start=8, step=8)

counter = 0


def observable_counter_callback(callback_options):

global counter

counter += 1

yield Observation(next(network_bytes_generator))


class TestDelta(TestCase):
def test_observable_counter_delta(self):
def setUp(self):
reset_metrics_globals()

def tearDown(self):
reset_metrics_globals()

output = StringIO()

aggregation = SumAggregation()

exporter = ConsoleMetricExporter(
out=output,
preferred_aggregation={ObservableCounter: aggregation},
preferred_temporality={
ObservableCounter: AggregationTemporality.DELTA
},
)

reader = PeriodicExportingMetricReader(
exporter, export_interval_millis=100
)

provider = MeterProvider(metric_readers=[reader])
set_meter_provider(provider)

meter = get_meter_provider().get_meter(
"preferred-aggregation", "0.1.2"
)

meter.create_observable_counter(
"observable_counter", [observable_counter_callback]
)

sleep(1)

provider.shutdown()

output.seek(0)

joined_output = "".join(output.readlines())

stack = []

sections = []

previous_index = 0

for current_index, character in enumerate(joined_output):

if character == "{":

if not stack:
previous_index = current_index

stack.append(character)

elif character == "}":
stack.pop()

if not stack:

sections.append(
joined_output[
previous_index : current_index + 1
].strip()
)

joined_output = f"[{','.join(sections)}]"

result = loads(joined_output)

previous_time_unix_nano = result[0]["resource_metrics"][0][
"scope_metrics"
][0]["metrics"][0]["data"]["data_points"][0]["time_unix_nano"]

for element in result[1:]:

metric_data = element["resource_metrics"][0]["scope_metrics"][0][
"metrics"
][0]["data"]["data_points"][0]

self.assertEqual(
previous_time_unix_nano, metric_data["start_time_unix_nano"]
)
previous_time_unix_nano = metric_data["time_unix_nano"]
self.assertEqual(metric_data["value"], 8)
8 changes: 4 additions & 4 deletions opentelemetry-sdk/tests/metrics/test_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def test_aggregate_delta(self):
synchronous_sum_aggregation.aggregate(measurement(2))
synchronous_sum_aggregation.aggregate(measurement(3))

self.assertEqual(synchronous_sum_aggregation._value, 6)
self.assertEqual(synchronous_sum_aggregation._current_value, 6)

synchronous_sum_aggregation = _SumAggregation(
Mock(), True, AggregationTemporality.DELTA, 0
Expand All @@ -75,7 +75,7 @@ def test_aggregate_delta(self):
synchronous_sum_aggregation.aggregate(measurement(-2))
synchronous_sum_aggregation.aggregate(measurement(3))

self.assertEqual(synchronous_sum_aggregation._value, 2)
self.assertEqual(synchronous_sum_aggregation._current_value, 2)

def test_aggregate_cumulative(self):
"""
Expand All @@ -90,7 +90,7 @@ def test_aggregate_cumulative(self):
synchronous_sum_aggregation.aggregate(measurement(2))
synchronous_sum_aggregation.aggregate(measurement(3))

self.assertEqual(synchronous_sum_aggregation._value, 6)
self.assertEqual(synchronous_sum_aggregation._current_value, 6)

synchronous_sum_aggregation = _SumAggregation(
Mock(), True, AggregationTemporality.CUMULATIVE, 0
Expand All @@ -100,7 +100,7 @@ def test_aggregate_cumulative(self):
synchronous_sum_aggregation.aggregate(measurement(-2))
synchronous_sum_aggregation.aggregate(measurement(3))

self.assertEqual(synchronous_sum_aggregation._value, 2)
self.assertEqual(synchronous_sum_aggregation._current_value, 2)

def test_collect_delta(self):
"""
Expand Down

0 comments on commit 2f953dd

Please sign in to comment.