diff --git a/exporter/opentelemetry-exporter-otlp/CHANGELOG.md b/exporter/opentelemetry-exporter-otlp/CHANGELOG.md index 8070b1b1187..9c807d3225b 100644 --- a/exporter/opentelemetry-exporter-otlp/CHANGELOG.md +++ b/exporter/opentelemetry-exporter-otlp/CHANGELOG.md @@ -2,6 +2,8 @@ ## Unreleased +- Add timestamps to OTLP exporter + ([#1199](https://github.com/open-telemetry/opentelemetry-python/pull/1199)) - Update OpenTelemetry protos to v0.5.0 ([#1143](https://github.com/open-telemetry/opentelemetry-python/pull/1143)) diff --git a/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py index 08a47c601e2..4cd4523ae92 100644 --- a/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py @@ -85,6 +85,12 @@ def _get_data_points( data_point_class( labels=string_key_values, value=view_data.aggregator.current, + start_time_unix_nano=( + view_data.aggregator.last_checkpoint_timestamp + ), + time_unix_nano=( + view_data.aggregator.last_update_timestamp + ), ) ) break diff --git a/exporter/opentelemetry-exporter-otlp/tests/test_otlp_metric_exporter.py b/exporter/opentelemetry-exporter-otlp/tests/test_otlp_metric_exporter.py index 1218fbbb330..0536ed790d7 100644 --- a/exporter/opentelemetry-exporter-otlp/tests/test_otlp_metric_exporter.py +++ b/exporter/opentelemetry-exporter-otlp/tests/test_otlp_metric_exporter.py @@ -14,6 +14,7 @@ from collections import OrderedDict from unittest import TestCase +from unittest.mock import patch from opentelemetry.exporter.otlp.metrics_exporter import OTLPMetricsExporter from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( @@ -60,9 +61,12 @@ def setUp(self): SumAggregator(), ) - def test_translate_metrics(self): + @patch("opentelemetry.sdk.metrics.export.aggregate.time_ns") + def test_translate_metrics(self, mock_time_ns): # pylint: disable=no-member + mock_time_ns.configure_mock(**{"return_value": 1}) + self.counter_metric_record.instrument.add(1, OrderedDict([("a", "b")])) expected = ExportMetricsServiceRequest( @@ -92,6 +96,7 @@ def test_translate_metrics(self): ) ], value=1, + time_unix_nano=1, ) ], aggregation_temporality=( diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 388296fbe3e..ba6ef939539 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -2,6 +2,8 @@ ## Unreleased +- Add timestamps to aggregators + ([#1199](https://github.com/open-telemetry/opentelemetry-python/pull/1199)) - Add Global Error Handler ([#1080](https://github.com/open-telemetry/opentelemetry-python/pull/1080)) - Update sampling result names diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index 121f39a98b6..ecb510e8506 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -16,6 +16,7 @@ import logging import threading from collections import OrderedDict, namedtuple +from math import inf from opentelemetry.util import time_ns @@ -30,9 +31,10 @@ class Aggregator(abc.ABC): """ def __init__(self, config=None): - self.current = None - self.checkpoint = None - if config: + self._lock = threading.Lock() + self.last_update_timestamp = 0 + self.last_checkpoint_timestamp = 0 + if config is not None: self.config = config else: self.config = {} @@ -40,14 +42,32 @@ def __init__(self, config=None): @abc.abstractmethod def update(self, value): """Updates the current with the new value.""" + self.last_update_timestamp = time_ns() @abc.abstractmethod def take_checkpoint(self): """Stores a snapshot of the current value.""" + self.last_checkpoint_timestamp = time_ns() @abc.abstractmethod def merge(self, other): """Combines two aggregator values.""" + self.last_update_timestamp = max( + self.last_update_timestamp, other.last_update_timestamp + ) + self.last_checkpoint_timestamp = min( + self.last_checkpoint_timestamp, other.last_checkpoint_timestamp + ) + + def _verify_type(self, other): + if isinstance(other, self.__class__): + return True + logger.warning( + "Error in merging %s with %s.", + self.__class__.__name__, + other.__class__.__name__, + ) + return False class SumAggregator(Aggregator): @@ -57,81 +77,62 @@ def __init__(self, config=None): super().__init__(config=config) self.current = 0 self.checkpoint = 0 - self._lock = threading.Lock() - self.last_update_timestamp = None def update(self, value): with self._lock: self.current += value - self.last_update_timestamp = time_ns() + super().update(value) def take_checkpoint(self): with self._lock: self.checkpoint = self.current self.current = 0 + super().take_checkpoint() def merge(self, other): - if verify_type(self, other): + if self._verify_type(other): with self._lock: self.checkpoint += other.checkpoint - self.last_update_timestamp = get_latest_timestamp( - self.last_update_timestamp, other.last_update_timestamp - ) + super().merge(other) class MinMaxSumCountAggregator(Aggregator): """Aggregator for ValueRecorder metrics that keeps min, max, sum, count.""" _TYPE = namedtuple("minmaxsumcount", "min max sum count") - _EMPTY = _TYPE(None, None, None, 0) - - @classmethod - def _merge_checkpoint(cls, val1, val2): - if val1 is cls._EMPTY: - return val2 - if val2 is cls._EMPTY: - return val1 - return cls._TYPE( - min(val1.min, val2.min), - max(val1.max, val2.max), - val1.sum + val2.sum, - val1.count + val2.count, - ) + _EMPTY = _TYPE(inf, -inf, 0, 0) def __init__(self, config=None): super().__init__(config=config) self.current = self._EMPTY self.checkpoint = self._EMPTY - self._lock = threading.Lock() - self.last_update_timestamp = None def update(self, value): with self._lock: - if self.current is self._EMPTY: - self.current = self._TYPE(value, value, value, 1) - else: - self.current = self._TYPE( - min(self.current.min, value), - max(self.current.max, value), - self.current.sum + value, - self.current.count + 1, - ) - self.last_update_timestamp = time_ns() + self.current = self._TYPE( + min(self.current.min, value), + max(self.current.max, value), + self.current.sum + value, + self.current.count + 1, + ) + super().update(value) def take_checkpoint(self): with self._lock: self.checkpoint = self.current self.current = self._EMPTY + super().take_checkpoint() def merge(self, other): - if verify_type(self, other): + if self._verify_type(other): with self._lock: - self.checkpoint = self._merge_checkpoint( - self.checkpoint, other.checkpoint - ) - self.last_update_timestamp = get_latest_timestamp( - self.last_update_timestamp, other.last_update_timestamp + self.checkpoint = self._TYPE( + min(self.checkpoint.min, other.checkpoint.min), + max(self.checkpoint.max, other.checkpoint.max), + self.checkpoint.sum + other.checkpoint.sum, + self.checkpoint.count + other.checkpoint.count, ) + super().merge(other) class HistogramAggregator(Aggregator): @@ -139,75 +140,53 @@ class HistogramAggregator(Aggregator): def __init__(self, config=None): super().__init__(config=config) - self._lock = threading.Lock() - self.last_update_timestamp = None - boundaries = self.config.get("bounds") - if boundaries and self._validate_boundaries(boundaries): - self._boundaries = boundaries - else: - # no buckets except < 0 and > - self._boundaries = (0,) - - self.current = OrderedDict([(bb, 0) for bb in self._boundaries]) - self.checkpoint = OrderedDict([(bb, 0) for bb in self._boundaries]) - - self.current[">"] = 0 - self.checkpoint[">"] = 0 - - # pylint: disable=R0201 - def _validate_boundaries(self, boundaries): - if not boundaries: - logger.warning("Bounds is empty. Using default.") - return False - if not all( - boundaries[ii] < boundaries[ii + 1] - for ii in range(len(boundaries) - 1) - ): - logger.warning( - "Bounds must be sorted in increasing order. Using default." - ) - return False - return True - - @classmethod - def _merge_checkpoint(cls, val1, val2): - if val1.keys() == val2.keys(): - for ii, bb in val2.items(): - val1[ii] += bb - else: - logger.warning("Cannot merge histograms with different buckets.") - return val1 + # no buckets except < 0 and > + bounds = (0,) + config_bounds = self.config.get("bounds") + if config_bounds is not None: + if all( + config_bounds[i] < config_bounds[i + 1] + for i in range(len(config_bounds) - 1) + ): + bounds = config_bounds + else: + logger.warning( + "Bounds must be all different and sorted in increasing" + " order. Using default." + ) + + self.current = OrderedDict([(bb, 0) for bb in bounds]) + self.current[inf] = 0 + self.checkpoint = OrderedDict([(bb, 0) for bb in bounds]) + self.checkpoint[inf] = 0 def update(self, value): with self._lock: - if self.current is None: - self.current = [0 for ii in range(len(self._boundaries) + 1)] - # greater than max value - if value >= self._boundaries[len(self._boundaries) - 1]: - self.current[">"] += 1 - else: - for bb in self._boundaries: - # find first bucket that value is less than - if value < bb: - self.current[bb] += 1 - break - self.last_update_timestamp = time_ns() + for bb in self.current.keys(): + # find first bucket that value is less than + if value < bb: + self.current[bb] += 1 + break + super().update(value) def take_checkpoint(self): with self._lock: - self.checkpoint = self.current - self.current = OrderedDict([(bb, 0) for bb in self._boundaries]) - self.current[">"] = 0 + self.checkpoint = self.current.copy() + for bb in self.current.keys(): + self.current[bb] = 0 + super().take_checkpoint() def merge(self, other): - if verify_type(self, other): + if self._verify_type(other): with self._lock: - self.checkpoint = self._merge_checkpoint( - self.checkpoint, other.checkpoint - ) - self.last_update_timestamp = get_latest_timestamp( - self.last_update_timestamp, other.last_update_timestamp - ) + if self.checkpoint.keys() == other.checkpoint.keys(): + for ii, bb in other.checkpoint.items(): + self.checkpoint[ii] += bb + super().merge(other) + else: + logger.warning( + "Cannot merge histograms with different buckets." + ) class LastValueAggregator(Aggregator): @@ -215,27 +194,29 @@ class LastValueAggregator(Aggregator): def __init__(self, config=None): super().__init__(config=config) - self._lock = threading.Lock() - self.last_update_timestamp = None + self.current = None + self.checkpoint = None def update(self, value): with self._lock: self.current = value - self.last_update_timestamp = time_ns() + super().update(value) def take_checkpoint(self): with self._lock: self.checkpoint = self.current self.current = None + super().take_checkpoint() def merge(self, other): last = self.checkpoint - self.last_update_timestamp = get_latest_timestamp( + self.last_update_timestamp = max( self.last_update_timestamp, other.last_update_timestamp ) if self.last_update_timestamp == other.last_update_timestamp: last = other.checkpoint self.checkpoint = last + super().merge(other) class ValueObserverAggregator(Aggregator): @@ -248,46 +229,22 @@ def __init__(self, config=None): self.mmsc = MinMaxSumCountAggregator() self.current = None self.checkpoint = self._TYPE(None, None, None, 0, None) - self.last_update_timestamp = None def update(self, value): self.mmsc.update(value) self.current = value - self.last_update_timestamp = time_ns() + super().update(value) def take_checkpoint(self): self.mmsc.take_checkpoint() self.checkpoint = self._TYPE(*(self.mmsc.checkpoint + (self.current,))) + super().take_checkpoint() def merge(self, other): - if verify_type(self, other): + if self._verify_type(other): self.mmsc.merge(other.mmsc) last = self.checkpoint.last - self.last_update_timestamp = get_latest_timestamp( - self.last_update_timestamp, other.last_update_timestamp - ) if self.last_update_timestamp == other.last_update_timestamp: last = other.checkpoint.last self.checkpoint = self._TYPE(*(self.mmsc.checkpoint + (last,))) - - -def get_latest_timestamp(time_stamp, other_timestamp): - if time_stamp is None: - return other_timestamp - if other_timestamp is not None: - if time_stamp < other_timestamp: - return other_timestamp - return time_stamp - - -# pylint: disable=R1705 -def verify_type(this, other): - if isinstance(other, this.__class__): - return True - else: - logger.warning( - "Error in merging %s with %s.", - this.__class__.__name__, - other.__class__.__name__, - ) - return False + super().merge(other) diff --git a/opentelemetry-sdk/tests/metrics/export/test_export.py b/opentelemetry-sdk/tests/metrics/export/test_export.py index efa6bcd24e1..0515b97703c 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_export.py +++ b/opentelemetry-sdk/tests/metrics/export/test_export.py @@ -15,6 +15,7 @@ import concurrent.futures import random import unittest +from math import inf from unittest import mock from opentelemetry.context import get_value @@ -310,36 +311,58 @@ def test_merge(self): mmsc1.merge(mmsc2) - self.assertEqual( - mmsc1.checkpoint, - MinMaxSumCountAggregator._merge_checkpoint( - checkpoint1, checkpoint2 - ), - ) + mmsc1_checkpoint = mmsc1.checkpoint + mmsc1.checkpoint = checkpoint1 + mmsc2.checkpoint = checkpoint2 + + mmsc1.merge(mmsc2) + + self.assertEqual(mmsc1_checkpoint, mmsc1.checkpoint) + self.assertEqual(mmsc1.last_update_timestamp, 123) def test_merge_checkpoint(self): - func = MinMaxSumCountAggregator._merge_checkpoint - _type = MinMaxSumCountAggregator._TYPE + type_ = MinMaxSumCountAggregator._TYPE empty = MinMaxSumCountAggregator._EMPTY - ret = func(empty, empty) - self.assertEqual(ret, empty) + mmsc0 = MinMaxSumCountAggregator() + mmsc1 = MinMaxSumCountAggregator() + + mmsc0.checkpoint = empty + mmsc1.checkpoint = empty + + mmsc0.merge(mmsc1) + self.assertEqual(mmsc0.checkpoint, mmsc1.checkpoint) - ret = func(empty, _type(0, 0, 0, 0)) - self.assertEqual(ret, _type(0, 0, 0, 0)) + mmsc0.checkpoint = empty + mmsc1.checkpoint = type_(0, 0, 0, 0) - ret = func(_type(0, 0, 0, 0), empty) - self.assertEqual(ret, _type(0, 0, 0, 0)) + mmsc0.merge(mmsc1) + self.assertEqual(mmsc0.checkpoint, mmsc1.checkpoint) - ret = func(_type(0, 0, 0, 0), _type(0, 0, 0, 0)) - self.assertEqual(ret, _type(0, 0, 0, 0)) + mmsc0.checkpoint = type_(0, 0, 0, 0) + mmsc1.checkpoint = empty - ret = func(_type(44, 23, 55, 86), empty) - self.assertEqual(ret, _type(44, 23, 55, 86)) + mmsc1.merge(mmsc0) + self.assertEqual(mmsc1.checkpoint, mmsc0.checkpoint) - ret = func(_type(3, 150, 101, 3), _type(1, 33, 44, 2)) - self.assertEqual(ret, _type(1, 150, 101 + 44, 2 + 3)) + mmsc0.checkpoint = type_(0, 0, 0, 0) + mmsc1.checkpoint = type_(0, 0, 0, 0) + + mmsc0.merge(mmsc1) + self.assertEqual(mmsc1.checkpoint, mmsc0.checkpoint) + + mmsc0.checkpoint = type_(44, 23, 55, 86) + mmsc1.checkpoint = empty + + mmsc0.merge(mmsc1) + self.assertEqual(mmsc0.checkpoint, type_(44, 23, 55, 86)) + + mmsc0.checkpoint = type_(3, 150, 101, 3) + mmsc1.checkpoint = type_(1, 33, 44, 2) + + mmsc0.merge(mmsc1) + self.assertEqual(mmsc0.checkpoint, type_(1, 150, 101 + 44, 2 + 3)) def test_merge_with_empty(self): mmsc1 = MinMaxSumCountAggregator() @@ -353,40 +376,39 @@ def test_merge_with_empty(self): self.assertEqual(mmsc1.checkpoint, checkpoint1) def test_concurrent_update(self): - mmsc = MinMaxSumCountAggregator() + mmsc0 = MinMaxSumCountAggregator() + mmsc1 = MinMaxSumCountAggregator() + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex: - fut1 = ex.submit(self.call_update, mmsc) - fut2 = ex.submit(self.call_update, mmsc) + mmsc0.checkpoint = ex.submit(self.call_update, mmsc0).result() + mmsc1.checkpoint = ex.submit(self.call_update, mmsc0).result() + + mmsc0.merge(mmsc1) - ret1 = fut1.result() - ret2 = fut2.result() + mmsc0_checkpoint = mmsc0.checkpoint - update_total = MinMaxSumCountAggregator._merge_checkpoint( - ret1, ret2 - ) - mmsc.take_checkpoint() + mmsc0.take_checkpoint() - self.assertEqual(update_total, mmsc.checkpoint) + self.assertEqual(mmsc0_checkpoint, mmsc0.checkpoint) + self.assertIsNot(mmsc0_checkpoint, mmsc0.checkpoint) def test_concurrent_update_and_checkpoint(self): - mmsc = MinMaxSumCountAggregator() - checkpoint_total = MinMaxSumCountAggregator._TYPE(2 ** 32, 0, 0, 0) + mmsc0 = MinMaxSumCountAggregator() + mmsc1 = MinMaxSumCountAggregator() + mmsc1.checkpoint = MinMaxSumCountAggregator._TYPE(2 ** 32, 0, 0, 0) with concurrent.futures.ThreadPoolExecutor(max_workers=1) as ex: - fut = ex.submit(self.call_update, mmsc) + fut = ex.submit(self.call_update, mmsc0) while not fut.done(): - mmsc.take_checkpoint() - checkpoint_total = MinMaxSumCountAggregator._merge_checkpoint( - checkpoint_total, mmsc.checkpoint - ) + mmsc0.take_checkpoint() + mmsc0.merge(mmsc1) + mmsc1.checkpoint = mmsc0.checkpoint - mmsc.take_checkpoint() - checkpoint_total = MinMaxSumCountAggregator._merge_checkpoint( - checkpoint_total, mmsc.checkpoint - ) + mmsc0.take_checkpoint() + mmsc0.merge(mmsc1) - self.assertEqual(checkpoint_total, fut.result()) + self.assertEqual(mmsc0.checkpoint, fut.result()) class TestValueObserverAggregator(unittest.TestCase): @@ -395,7 +417,7 @@ def test_update(self, time_mock): time_mock.return_value = 123 observer = ValueObserverAggregator() # test current values without any update - self.assertEqual(observer.mmsc.current, (None, None, None, 0)) + self.assertEqual(observer.mmsc.current, (inf, -inf, 0, 0)) self.assertIsNone(observer.current) # call update with some values @@ -416,7 +438,7 @@ def test_checkpoint(self): # take checkpoint wihtout any update observer.take_checkpoint() - self.assertEqual(observer.checkpoint, (None, None, None, 0, None)) + self.assertEqual(observer.checkpoint, (inf, -inf, 0, 0, None)) # call update with some values values = (3, 50, 3, 97) @@ -523,7 +545,7 @@ def test_merge_last_updated_none(self): observer1.mmsc.checkpoint = mmsc_checkpoint1 observer2.mmsc.checkpoint = mmsc_checkpoint2 - observer1.last_update_timestamp = None + observer1.last_update_timestamp = 0 observer2.last_update_timestamp = 100 observer1.checkpoint = checkpoint1 @@ -629,7 +651,7 @@ def test_merge_last_updated_none(self): observer1.checkpoint = 23 observer2.checkpoint = 47 - observer1.last_update_timestamp = None + observer1.last_update_timestamp = 0 observer2.last_update_timestamp = 100 observer1.merge(observer2) diff --git a/opentelemetry-sdk/tests/metrics/test_view.py b/opentelemetry-sdk/tests/metrics/test_view.py index 0de6b227317..b4a24d4ea35 100644 --- a/opentelemetry-sdk/tests/metrics/test_view.py +++ b/opentelemetry-sdk/tests/metrics/test_view.py @@ -13,6 +13,7 @@ # limitations under the License. import unittest +from math import inf from unittest import mock from opentelemetry.sdk import metrics @@ -223,7 +224,7 @@ def test_histogram_stateful(self): checkpoint = metrics_list[0].aggregator.checkpoint self.assertEqual( tuple(checkpoint.items()), - ((20, 1), (40, 1), (60, 0), (80, 0), (100, 0), (">", 1)), + ((20, 1), (40, 1), (60, 0), (80, 0), (100, 0), (inf, 1)), ) exporter.clear() @@ -238,7 +239,7 @@ def test_histogram_stateful(self): checkpoint = metrics_list[0].aggregator.checkpoint self.assertEqual( tuple(checkpoint.items()), - ((20, 2), (40, 2), (60, 0), (80, 0), (100, 0), (">", 2)), + ((20, 2), (40, 2), (60, 0), (80, 0), (100, 0), (inf, 2)), ) def test_histogram_stateless(self): @@ -278,7 +279,7 @@ def test_histogram_stateless(self): checkpoint = metrics_list[0].aggregator.checkpoint self.assertEqual( tuple(checkpoint.items()), - ((20, 1), (40, 1), (60, 0), (80, 0), (100, 0), (">", 1)), + ((20, 1), (40, 1), (60, 0), (80, 0), (100, 0), (inf, 1)), ) exporter.clear() @@ -293,7 +294,7 @@ def test_histogram_stateless(self): checkpoint = metrics_list[0].aggregator.checkpoint self.assertEqual( tuple(checkpoint.items()), - ((20, 1), (40, 1), (60, 0), (80, 0), (100, 0), (">", 1)), + ((20, 1), (40, 1), (60, 0), (80, 0), (100, 0), (inf, 1)), )