From 7d573e1e899579e786ff54a97568017c3f1c68b6 Mon Sep 17 00:00:00 2001 From: Alex Boten Date: Mon, 5 Oct 2020 22:18:49 -0700 Subject: [PATCH 1/5] Data points in exporter shouldnt use bound instruments --- .../otlp/metrics_exporter/__init__.py | 124 ++++++++++-------- .../tests/test_otlp_metric_exporter.py | 21 +-- .../system_metrics/__init__.py | 6 +- .../sdk/metrics/export/aggregate.py | 13 +- .../sdk/metrics/export/controller.py | 44 +++++++ 5 files changed, 141 insertions(+), 67 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py index 18ce772ea41..3a7ad586c6f 100644 --- a/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py @@ -58,48 +58,54 @@ MetricsExporter, MetricsExportResult, ) +from opentelemetry.sdk.metrics.export.aggregate import ( + HistogramAggregator, + LastValueAggregator, + MinMaxSumCountAggregator, + SumAggregator, + ValueObserverAggregator, +) logger = logging.getLogger(__name__) DataPointT = TypeVar("DataPointT", IntDataPoint, DoubleDataPoint) def _get_data_points( - sdk_metric: MetricRecord, data_point_class: Type[DataPointT] + sdk_metric_record: MetricRecord, data_point_class: Type[DataPointT] ) -> List[DataPointT]: - data_points = [] - - for ( - label, - bound_counter, - ) in sdk_metric.instrument.bound_instruments.items(): - - string_key_values = [] - - for label_key, label_value in label: - string_key_values.append( - StringKeyValue(key=label_key, value=label_value) - ) - - for view_data in bound_counter.view_datas: - - if view_data.labels == label: - - data_points.append( - data_point_class( - labels=string_key_values, - value=view_data.aggregator.current, - start_time_unix_nano=( - view_data.aggregator.last_checkpoint_timestamp - ), - time_unix_nano=( - view_data.aggregator.last_update_timestamp - ), - ) - ) - break - - return data_points + if isinstance(sdk_metric_record.aggregator, SumAggregator): + value = sdk_metric_record.aggregator.checkpoint + + elif isinstance(sdk_metric_record.aggregator, MinMaxSumCountAggregator): + # FIXME: How are values to be interpreted from this aggregator? + raise Exception("MinMaxSumCount aggregator data not supported") + + elif isinstance(sdk_metric_record.aggregator, HistogramAggregator): + # FIXME: How are values to be interpreted from this aggregator? + raise Exception("Histogram aggregator data not supported") + + elif isinstance(sdk_metric_record.aggregator, LastValueAggregator): + value = sdk_metric_record.aggregator.checkpoint + + elif isinstance(sdk_metric_record.aggregator, ValueObserverAggregator): + value = sdk_metric_record.aggregator.checkpoint.last + + return [ + data_point_class( + labels=[ + StringKeyValue(key=str(label_key), value=str(label_value)) + for label_key, label_value in sdk_metric_record.labels + ], + value=value, + start_time_unix_nano=( + sdk_metric_record.aggregator.initial_checkpoint_timestamp + ), + time_unix_nano=( + sdk_metric_record.aggregator.last_update_timestamp + ), + ) + ] class OTLPMetricsExporter( @@ -179,13 +185,13 @@ def _translate_data( # SumObserver Sum(aggregation_temporality=cumulative;is_monotonic=true) # UpDownSumObserver Sum(aggregation_temporality=cumulative;is_monotonic=false) # ValueObserver Gauge() - for sdk_metric in data: + for sdk_metric_record in data: - if sdk_metric.resource not in ( + if sdk_metric_record.resource not in ( sdk_resource_instrumentation_library_metrics.keys() ): sdk_resource_instrumentation_library_metrics[ - sdk_metric.resource + sdk_metric_record.resource ] = InstrumentationLibraryMetrics() type_class = { @@ -204,15 +210,17 @@ def _translate_data( }, } - value_type = sdk_metric.instrument.value_type + value_type = sdk_metric_record.instrument.value_type sum_class = type_class[value_type]["sum"]["class"] gauge_class = type_class[value_type]["gauge"]["class"] data_point_class = type_class[value_type]["data_point_class"] - if isinstance(sdk_metric.instrument, Counter): + if isinstance(sdk_metric_record.instrument, Counter): otlp_metric_data = sum_class( - data_points=_get_data_points(sdk_metric, data_point_class), + data_points=_get_data_points( + sdk_metric_record, data_point_class + ), aggregation_temporality=( AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA ), @@ -220,9 +228,11 @@ def _translate_data( ) argument = type_class[value_type]["sum"]["argument"] - elif isinstance(sdk_metric.instrument, UpDownCounter): + elif isinstance(sdk_metric_record.instrument, UpDownCounter): otlp_metric_data = sum_class( - data_points=_get_data_points(sdk_metric, data_point_class), + data_points=_get_data_points( + sdk_metric_record, data_point_class + ), aggregation_temporality=( AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA ), @@ -230,13 +240,15 @@ def _translate_data( ) argument = type_class[value_type]["sum"]["argument"] - elif isinstance(sdk_metric.instrument, (ValueRecorder)): + elif isinstance(sdk_metric_record.instrument, (ValueRecorder)): logger.warning("Skipping exporting of ValueRecorder metric") continue - elif isinstance(sdk_metric.instrument, SumObserver): + elif isinstance(sdk_metric_record.instrument, SumObserver): otlp_metric_data = sum_class( - data_points=_get_data_points(sdk_metric, data_point_class), + data_points=_get_data_points( + sdk_metric_record, data_point_class + ), aggregation_temporality=( AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE ), @@ -244,9 +256,11 @@ def _translate_data( ) argument = type_class[value_type]["sum"]["argument"] - elif isinstance(sdk_metric.instrument, UpDownSumObserver): + elif isinstance(sdk_metric_record.instrument, UpDownSumObserver): otlp_metric_data = sum_class( - data_points=_get_data_points(sdk_metric, data_point_class), + data_points=_get_data_points( + sdk_metric_record, data_point_class + ), aggregation_temporality=( AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE ), @@ -254,20 +268,24 @@ def _translate_data( ) argument = type_class[value_type]["sum"]["argument"] - elif isinstance(sdk_metric.instrument, (ValueObserver)): + elif isinstance(sdk_metric_record.instrument, (ValueObserver)): otlp_metric_data = gauge_class( - data_points=_get_data_points(sdk_metric, data_point_class) + data_points=_get_data_points( + sdk_metric_record, data_point_class + ) ) argument = type_class[value_type]["gauge"]["argument"] sdk_resource_instrumentation_library_metrics[ - sdk_metric.resource + sdk_metric_record.resource ].metrics.append( OTLPMetric( **{ - "name": sdk_metric.instrument.name, - "description": sdk_metric.instrument.description, - "unit": sdk_metric.instrument.unit, + "name": sdk_metric_record.instrument.name, + "description": ( + sdk_metric_record.instrument.description + ), + "unit": sdk_metric_record.instrument.unit, argument: otlp_metric_data, } ) diff --git a/exporter/opentelemetry-exporter-otlp/tests/test_otlp_metric_exporter.py b/exporter/opentelemetry-exporter-otlp/tests/test_otlp_metric_exporter.py index 530f9a430ae..3034fcdf651 100644 --- a/exporter/opentelemetry-exporter-otlp/tests/test_otlp_metric_exporter.py +++ b/exporter/opentelemetry-exporter-otlp/tests/test_otlp_metric_exporter.py @@ -52,14 +52,14 @@ def setUp(self): self.counter_metric_record = MetricRecord( Counter( - "a", - "b", "c", + "d", + "e", int, MeterProvider(resource=resource,).get_meter(__name__), - ("d",), + ("f",), ), - OrderedDict([("e", "f")]), + [("g", "h")], SumAggregator(), resource, ) @@ -97,7 +97,9 @@ def test_translate_metrics(self, mock_time_ns): mock_time_ns.configure_mock(**{"return_value": 1}) - self.counter_metric_record.instrument.add(1, OrderedDict([("a", "b")])) + self.counter_metric_record.aggregator.checkpoint = 1 + self.counter_metric_record.aggregator.initial_checkpoint_timestamp = 1 + self.counter_metric_record.aggregator.last_update_timestamp = 1 expected = ExportMetricsServiceRequest( resource_metrics=[ @@ -114,19 +116,20 @@ def test_translate_metrics(self, mock_time_ns): InstrumentationLibraryMetrics( metrics=[ OTLPMetric( - name="a", - description="b", - unit="c", + name="c", + description="d", + unit="e", int_sum=IntSum( data_points=[ IntDataPoint( labels=[ StringKeyValue( - key="a", value="b" + key="g", value="h" ) ], value=1, time_unix_nano=1, + start_time_unix_nano=1, ) ], aggregation_temporality=( diff --git a/instrumentation/opentelemetry-instrumentation-system-metrics/src/opentelemetry/instrumentation/system_metrics/__init__.py b/instrumentation/opentelemetry-instrumentation-system-metrics/src/opentelemetry/instrumentation/system_metrics/__init__.py index 2b453dbd7d0..d901bc09455 100644 --- a/instrumentation/opentelemetry-instrumentation-system-metrics/src/opentelemetry/instrumentation/system_metrics/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-system-metrics/src/opentelemetry/instrumentation/system_metrics/__init__.py @@ -88,11 +88,15 @@ def __init__( self, exporter: MetricsExporter, interval: int = 30, + meter=None, labels: typing.Optional[typing.Dict[str, str]] = None, config: typing.Optional[typing.Dict[str, typing.List[str]]] = None, ): self._labels = {} if labels is None else labels - self.meter = metrics.get_meter(__name__) + if meter is None: + self.meter = metrics.get_meter(__name__) + else: + self.meter = meter self.controller = PushController( meter=self.meter, exporter=exporter, interval=interval ) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index 7d3ad52df0e..84ab518a47f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -33,7 +33,8 @@ class Aggregator(abc.ABC): def __init__(self, config=None): self._lock = threading.Lock() self.last_update_timestamp = 0 - self.last_checkpoint_timestamp = 0 + self.initial_checkpoint_timestamp = 0 + self.checkpointed = True if config is not None: self.config = config else: @@ -42,12 +43,15 @@ def __init__(self, config=None): @abc.abstractmethod def update(self, value): """Updates the current with the new value.""" + if self.checkpointed: + self.initial_checkpoint_timestamp = time_ns() + self.checkpointed = False self.last_update_timestamp = time_ns() @abc.abstractmethod def take_checkpoint(self): """Stores a snapshot of the current value.""" - self.last_checkpoint_timestamp = time_ns() + self.checkpointed = True @abc.abstractmethod def merge(self, other): @@ -55,8 +59,9 @@ def merge(self, other): self.last_update_timestamp = max( self.last_update_timestamp, other.last_update_timestamp ) - self.last_checkpoint_timestamp = max( - self.last_checkpoint_timestamp, other.last_checkpoint_timestamp + self.initial_checkpoint_timestamp = max( + self.initial_checkpoint_timestamp, + other.initial_checkpoint_timestamp, ) def _verify_type(self, other): diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/controller.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/controller.py index e095ebbb72c..b020b66b2e1 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/controller.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/controller.py @@ -13,6 +13,7 @@ # limitations under the License. import threading +from time import sleep from opentelemetry.context import attach, detach, set_value from opentelemetry.metrics import Meter @@ -61,3 +62,46 @@ def tick(self): detach(token) # Perform post-exporting logic self.meter.processor.finished_collection() + + +class DebugController: + """A debug controller, used to replace Push controller when debugging + + Push controller uses a thread which makes it hard to use the IPython + debugger. This controller does not use a thread, but relies on the user + manually calling its ``run`` method to start the controller. + + Args: + meter: The meter used to collect metrics. + exporter: The exporter used to export metrics. + interval: The collect/export interval in seconds. + """ + + daemon = True + + def __init__( + self, meter: Meter, exporter: MetricsExporter, interval: float + ): + super().__init__() + self.meter = meter + self.exporter = exporter + self.interval = interval + + def run(self): + while True: + self.tick() + sleep(self.interval) + + def shutdown(self): + # Run one more collection pass to flush metrics batched in the meter + self.tick() + + def tick(self): + # Collect all of the meter's metrics to be exported + self.meter.collect() + # Export the collected metrics + token = attach(set_value("suppress_instrumentation", True)) + self.exporter.export(self.meter.processor.checkpoint_set()) + detach(token) + # Perform post-exporting logic + self.meter.processor.finished_collection() From f7235b6e7d595e3438937a4767cbd32072af51ae Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Fri, 16 Oct 2020 13:55:00 -0600 Subject: [PATCH 2/5] Update changelogs --- exporter/opentelemetry-exporter-otlp/CHANGELOG.md | 2 ++ .../CHANGELOG.md | 3 +++ .../instrumentation/system_metrics/__init__.py | 6 +----- opentelemetry-sdk/CHANGELOG.md | 2 ++ 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp/CHANGELOG.md b/exporter/opentelemetry-exporter-otlp/CHANGELOG.md index 0e9fbba8a83..b1deb74e27f 100644 --- a/exporter/opentelemetry-exporter-otlp/CHANGELOG.md +++ b/exporter/opentelemetry-exporter-otlp/CHANGELOG.md @@ -4,6 +4,8 @@ - Add Env variables in OTLP exporter ([#1101](https://github.com/open-telemetry/opentelemetry-python/pull/1101)) +- Do not use bound instruments in OTLP exporter + ([#1237](https://github.com/open-telemetry/opentelemetry-python/pull/1237)) ## Version 0.14b0 diff --git a/instrumentation/opentelemetry-instrumentation-system-metrics/CHANGELOG.md b/instrumentation/opentelemetry-instrumentation-system-metrics/CHANGELOG.md index 5f6ff0530c7..f23e794b1db 100644 --- a/instrumentation/opentelemetry-instrumentation-system-metrics/CHANGELOG.md +++ b/instrumentation/opentelemetry-instrumentation-system-metrics/CHANGELOG.md @@ -2,6 +2,9 @@ ## Unreleased +- Fix issue when specific metrics are not available in certain OS + ([#1237](https://github.com/open-telemetry/opentelemetry-python/pull/1237)) + ## Version 0.14b0 Released 2020-10-13 diff --git a/instrumentation/opentelemetry-instrumentation-system-metrics/src/opentelemetry/instrumentation/system_metrics/__init__.py b/instrumentation/opentelemetry-instrumentation-system-metrics/src/opentelemetry/instrumentation/system_metrics/__init__.py index d901bc09455..2b453dbd7d0 100644 --- a/instrumentation/opentelemetry-instrumentation-system-metrics/src/opentelemetry/instrumentation/system_metrics/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-system-metrics/src/opentelemetry/instrumentation/system_metrics/__init__.py @@ -88,15 +88,11 @@ def __init__( self, exporter: MetricsExporter, interval: int = 30, - meter=None, labels: typing.Optional[typing.Dict[str, str]] = None, config: typing.Optional[typing.Dict[str, typing.List[str]]] = None, ): self._labels = {} if labels is None else labels - if meter is None: - self.meter = metrics.get_meter(__name__) - else: - self.meter = meter + self.meter = metrics.get_meter(__name__) self.controller = PushController( meter=self.meter, exporter=exporter, interval=interval ) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 34a622bb995..6e1925f11d2 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -14,6 +14,8 @@ ([#1282](https://github.com/open-telemetry/opentelemetry-python/pull/1282)) - Span.is_recording() returns false after span has ended ([#1289](https://github.com/open-telemetry/opentelemetry-python/pull/1289)) +- Add debug Controller and set initial checkpoint timestamp in aggregators + ([#1237](https://github.com/open-telemetry/opentelemetry-python/pull/1237)) ## Version 0.14b0 From 0cd8e58d993866473136e187b0b4de65d544f535 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Fri, 16 Oct 2020 14:02:48 -0600 Subject: [PATCH 3/5] Remove wrong Changelog message --- .../opentelemetry-instrumentation-system-metrics/CHANGELOG.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-system-metrics/CHANGELOG.md b/instrumentation/opentelemetry-instrumentation-system-metrics/CHANGELOG.md index f23e794b1db..5f6ff0530c7 100644 --- a/instrumentation/opentelemetry-instrumentation-system-metrics/CHANGELOG.md +++ b/instrumentation/opentelemetry-instrumentation-system-metrics/CHANGELOG.md @@ -2,9 +2,6 @@ ## Unreleased -- Fix issue when specific metrics are not available in certain OS - ([#1237](https://github.com/open-telemetry/opentelemetry-python/pull/1237)) - ## Version 0.14b0 Released 2020-10-13 From 2acbc301c87d33308fa1b874b10af66f4ae6869e Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Fri, 16 Oct 2020 14:25:05 -0600 Subject: [PATCH 4/5] Move debug controller --- .../sdk/metrics/export/controller.py | 44 ------------- .../util/src/opentelemetry/test/controller.py | 62 +++++++++++++++++++ 2 files changed, 62 insertions(+), 44 deletions(-) create mode 100644 tests/util/src/opentelemetry/test/controller.py diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/controller.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/controller.py index b020b66b2e1..e095ebbb72c 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/controller.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/controller.py @@ -13,7 +13,6 @@ # limitations under the License. import threading -from time import sleep from opentelemetry.context import attach, detach, set_value from opentelemetry.metrics import Meter @@ -62,46 +61,3 @@ def tick(self): detach(token) # Perform post-exporting logic self.meter.processor.finished_collection() - - -class DebugController: - """A debug controller, used to replace Push controller when debugging - - Push controller uses a thread which makes it hard to use the IPython - debugger. This controller does not use a thread, but relies on the user - manually calling its ``run`` method to start the controller. - - Args: - meter: The meter used to collect metrics. - exporter: The exporter used to export metrics. - interval: The collect/export interval in seconds. - """ - - daemon = True - - def __init__( - self, meter: Meter, exporter: MetricsExporter, interval: float - ): - super().__init__() - self.meter = meter - self.exporter = exporter - self.interval = interval - - def run(self): - while True: - self.tick() - sleep(self.interval) - - def shutdown(self): - # Run one more collection pass to flush metrics batched in the meter - self.tick() - - def tick(self): - # Collect all of the meter's metrics to be exported - self.meter.collect() - # Export the collected metrics - token = attach(set_value("suppress_instrumentation", True)) - self.exporter.export(self.meter.processor.checkpoint_set()) - detach(token) - # Perform post-exporting logic - self.meter.processor.finished_collection() diff --git a/tests/util/src/opentelemetry/test/controller.py b/tests/util/src/opentelemetry/test/controller.py new file mode 100644 index 00000000000..754d7bf9792 --- /dev/null +++ b/tests/util/src/opentelemetry/test/controller.py @@ -0,0 +1,62 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from time import sleep + +from opentelemetry.context import attach, detach, set_value +from opentelemetry.metrics import Meter +from opentelemetry.sdk.metrics.export import MetricsExporter + + +class DebugController: + """A debug controller, used to replace Push controller when debugging + + Push controller uses a thread which makes it hard to use the IPython + debugger. This controller does not use a thread, but relies on the user + manually calling its ``run`` method to start the controller. + + Args: + meter: The meter used to collect metrics. + exporter: The exporter used to export metrics. + interval: The collect/export interval in seconds. + """ + + daemon = True + + def __init__( + self, meter: Meter, exporter: MetricsExporter, interval: float + ): + super().__init__() + self.meter = meter + self.exporter = exporter + self.interval = interval + + def run(self): + while True: + self.tick() + sleep(self.interval) + + def shutdown(self): + # Run one more collection pass to flush metrics batched in the meter + self.tick() + + def tick(self): + # Collect all of the meter's metrics to be exported + self.meter.collect() + # Export the collected metrics + token = attach(set_value("suppress_instrumentation", True)) + self.exporter.export(self.meter.processor.checkpoint_set()) + detach(token) + # Perform post-exporting logic + self.meter.processor.finished_collection() From ce40a209d3c5a85f171622d8a4577293086ecb66 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Fri, 16 Oct 2020 15:19:45 -0600 Subject: [PATCH 5/5] Update changelog --- opentelemetry-sdk/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 6e1925f11d2..2c1c3d971d0 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -14,7 +14,7 @@ ([#1282](https://github.com/open-telemetry/opentelemetry-python/pull/1282)) - Span.is_recording() returns false after span has ended ([#1289](https://github.com/open-telemetry/opentelemetry-python/pull/1289)) -- Add debug Controller and set initial checkpoint timestamp in aggregators +- Set initial checkpoint timestamp in aggregators ([#1237](https://github.com/open-telemetry/opentelemetry-python/pull/1237)) ## Version 0.14b0