From fe1d1f62e7c10d5d316051eebc369b4422ac7dbf Mon Sep 17 00:00:00 2001 From: Aaron Abbott Date: Fri, 29 Apr 2022 21:33:19 +0000 Subject: [PATCH 1/8] Add timeouts to metric SDK --- .../proto/grpc/_metric_exporter/__init__.py | 8 +- .../exporter/otlp/proto/grpc/exporter.py | 12 ++- .../exporter/prometheus/__init__.py | 8 +- .../sdk/_metrics/_internal/__init__.py | 18 ++-- .../sdk/_metrics/_internal/export/__init__.py | 46 +++++++--- .../sdk/_metrics/_internal/metric_reader.py | 14 ++- .../tests/metrics/test_backward_compat.py | 89 +++++++++++++++++++ .../tests/metrics/test_metric_reader.py | 2 +- .../tests/metrics/test_metrics.py | 11 ++- .../test_periodic_exporting_metric_reader.py | 11 ++- 10 files changed, 183 insertions(+), 36 deletions(-) create mode 100644 opentelemetry-sdk/tests/metrics/test_backward_compat.py diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py index c40ff1ddd21..f3e88951653 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py @@ -169,9 +169,13 @@ def _translate_data( ) def export( - self, metrics: Sequence[Metric], *args, **kwargs + self, + metrics: Sequence[Metric], + timeout_millis: float = 10_000, + **kwargs, ) -> MetricExportResult: + # TODO(): OTLPExporterMixin should pass timeout to gRPC return self._export(metrics) - def shutdown(self, *args, **kwargs): + def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None: pass diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index b965061c5cb..255a1f0775d 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -277,7 +277,15 @@ def _translate_attributes(self, attributes) -> TypingSequence[KeyValue]: logger.exception(error) return output - def _export(self, data: TypingSequence[SDKDataT]) -> ExportResultT: + def _export( + self, + data: TypingSequence[SDKDataT], + timeout_millis: Optional[float] = None, + ) -> ExportResultT: + if timeout_millis is not None: + timeout_seconds = timeout_millis / 10**3 + else: + timeout_seconds = self._timeout max_value = 64 # expo returns a generator that yields delay values which grow @@ -292,7 +300,7 @@ def _export(self, data: TypingSequence[SDKDataT]) -> ExportResultT: self._client.Export( request=self._translate_data(data), metadata=self._headers, - timeout=self._timeout, + timeout=timeout_seconds, ) return self._result.SUCCESS diff --git a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py index 1b81f062c02..708aa3b680b 100644 --- a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py +++ b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py @@ -111,15 +111,17 @@ def __init__(self, prefix: str = "") -> None: self._collector._callback = self.collect def _receive_metrics( - self, metrics: Iterable[Metric], *args, **kwargs + self, + metrics: Iterable[Metric], + timeout_millis: float = 10_000, + **kwargs, ) -> None: if metrics is None: return self._collector.add_metrics_data(metrics) - def shutdown(self, *args, **kwargs) -> bool: + def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None: REGISTRY.unregister(self._collector) - return True class _CustomCollector: diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/__init__.py index 39256b73271..d6649d416f2 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/__init__.py @@ -48,6 +48,7 @@ from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.util.instrumentation import InstrumentationScope from opentelemetry.util._once import Once +from opentelemetry.util._time import time_ns _logger = getLogger(__name__) @@ -369,16 +370,16 @@ def __init__( self._shutdown_once = Once() self._shutdown = False - def force_flush(self) -> bool: + def force_flush(self, timeout_millis: float = 10_000) -> bool: # FIXME implement a timeout for metric_reader in self._sdk_config.metric_readers: - metric_reader.collect() + metric_reader.collect(timeout_millis=timeout_millis) return True - def shutdown(self): - # FIXME implement a timeout + def shutdown(self, timeout_millis: float = 10_000): + deadline_ns = time_ns() + timeout_millis * 10**6 def _shutdown(): self._shutdown = True @@ -392,8 +393,15 @@ def _shutdown(): metric_reader_error = {} for metric_reader in self._sdk_config.metric_readers: + current_ts = time_ns() try: - metric_reader.shutdown() + if current_ts >= deadline_ns: + raise TimeoutError( + "Didn't get to execute, deadline already exceeded" + ) + metric_reader.shutdown( + timeout_millis=(deadline_ns - current_ts) / 10**6 + ) # pylint: disable=broad-except except Exception as error: diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py index 959006e2347..4159fe6b899 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py @@ -17,6 +17,7 @@ from abc import ABC, abstractmethod from enum import Enum from os import environ, linesep +from socket import timeout from sys import stdout from threading import Event, RLock, Thread from typing import IO, Callable, Dict, Iterable, List, Optional, Sequence @@ -31,6 +32,7 @@ from opentelemetry.sdk._metrics.metric_reader import MetricReader from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric from opentelemetry.util._once import Once +from opentelemetry.util._time import time_ns _logger = logging.getLogger(__name__) @@ -53,8 +55,11 @@ class MetricExporter(ABC): @abstractmethod def export( - self, metrics: Sequence[Metric], *args, **kwargs - ) -> "MetricExportResult": + self, + metrics: Sequence[Metric], + timeout_millis: float = 10_000, + **kwargs, + ) -> MetricExportResult: """Exports a batch of telemetry data. Args: @@ -65,7 +70,7 @@ def export( """ @abstractmethod - def shutdown(self, *args, **kwargs) -> None: + def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None: """Shuts down the exporter. Called when the SDK is shut down. @@ -90,14 +95,17 @@ def __init__( self.formatter = formatter def export( - self, metrics: Sequence[Metric], *args, **kwargs + self, + metrics: Sequence[Metric], + timeout_millis: float = 10_000, + **kwargs, ) -> MetricExportResult: for metric in metrics: self.out.write(self.formatter(metric)) self.out.flush() return MetricExportResult.SUCCESS - def shutdown(self, *args, **kwargs) -> None: + def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None: pass @@ -127,11 +135,16 @@ def get_metrics(self) -> List[Metric]: self._metrics = [] return metrics - def _receive_metrics(self, metrics: Iterable[Metric], *args, **kwargs): + def _receive_metrics( + self, + metrics: Iterable[Metric], + timeout_millis: float = 10_000, + **kwargs, + ) -> None: with self._lock: self._metrics = list(metrics) - def shutdown(self, *args, **kwargs): + def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None: pass @@ -193,23 +206,28 @@ def _at_fork_reinit(self): def _ticker(self) -> None: interval_secs = self._export_interval_millis / 1e3 while not self._shutdown_event.wait(interval_secs): - self.collect() + self.collect(timeout_millis=self._export_timeout_millis) # one last collection below before shutting down completely - self.collect() + self.collect(timeout_millis=self._export_interval_millis) def _receive_metrics( - self, metrics: Iterable[Metric], *args, **kwargs + self, + metrics: Iterable[Metric], + timeout_millis: float = 10_000, + **kwargs, ) -> None: if metrics is None: return token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) try: - self._exporter.export(metrics) + self._exporter.export(metrics, timeout_millis=timeout_millis) except Exception as e: # pylint: disable=broad-except,invalid-name _logger.exception("Exception while exporting metrics %s", str(e)) detach(token) - def shutdown(self, *args, **kwargs): + def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None: + deadline_ns = time_ns() + timeout_millis * 10**6 + def _shutdown(): self._shutdown = True @@ -219,5 +237,5 @@ def _shutdown(): return self._shutdown_event.set() - self._daemon_thread.join() - self._exporter.shutdown() + self._daemon_thread.join(timeout=(deadline_ns - time_ns()) / 10**9) + self._exporter.shutdown(timeout=(deadline_ns - time_ns()) / 10**6) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py index afdc7083162..c8964f77e3e 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py @@ -138,7 +138,7 @@ def __init__( self._instrument_class_aggregation.update(preferred_aggregation or {}) @final - def collect(self) -> None: + def collect(self, timeout_millis: float = 10_000) -> None: """Collects the metrics from the internal SDK state and invokes the `_receive_metrics` with the collection. """ @@ -148,7 +148,8 @@ def collect(self) -> None: ) return self._receive_metrics( - self._collect(self, self._instrument_class_temporality) + self._collect(self, self._instrument_class_temporality), + timeout_millis=timeout_millis, ) @final @@ -162,11 +163,16 @@ def _set_collect_callback( self._collect = func @abstractmethod - def _receive_metrics(self, metrics: Iterable[Metric], *args, **kwargs): + def _receive_metrics( + self, + metrics: Iterable[Metric], + timeout_millis: float = 10_000, + **kwargs, + ) -> None: """Called by `MetricReader.collect` when it receives a batch of metrics""" @abstractmethod - def shutdown(self, *args, **kwargs): + def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None: """Shuts down the MetricReader. This method provides a way for the MetricReader to do any cleanup required. A metric reader can only be shutdown once, any subsequent calls are ignored and return diff --git a/opentelemetry-sdk/tests/metrics/test_backward_compat.py b/opentelemetry-sdk/tests/metrics/test_backward_compat.py new file mode 100644 index 00000000000..febc778c68d --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/test_backward_compat.py @@ -0,0 +1,89 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +The purpose of this test is to test for backward compatibility with any user-implementable +interfaces as they were originally defined. For example, changes to the MetricExporter ABC must +be made in such a way that existing implementations (outside of this repo) continue to work +when *called* by the SDK. + +This does not apply to classes which are not intended to be overriden by the user e.g. Meter +and PeriodicExportingMetricReader concrete class. Those may freely be modified in a +backward-compatible way for *callers*. + +Ideally, we could use mypy for this as well, but SDK is not type checked atm. +""" + +from typing import Iterable, Sequence +from unittest import TestCase + +from opentelemetry.sdk._metrics import MeterProvider +from opentelemetry.sdk._metrics.export import ( + MetricExporter, + MetricExportResult, + PeriodicExportingMetricReader, +) +from opentelemetry.sdk._metrics.metric_reader import MetricReader +from opentelemetry.sdk._metrics.point import Metric + + +# Do not change these classes until after major version 1 +class OrigMetricExporter(MetricExporter): + def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None: + """Shuts down the exporter. + + Called when the SDK is shut down. + """ + + def export( + self, + metrics: Sequence[Metric], + timeout_millis: float = 10_000, + **kwargs, + ) -> MetricExportResult: + pass + + def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None: + pass + + +class OrigMetricReader(MetricReader): + def _receive_metrics( + self, + metrics: Iterable[Metric], + timeout_millis: float = 10_000, + **kwargs, + ) -> None: + pass + + def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None: + self.collect() + + +class TestBackwardCompat(TestCase): + def test_metric_exporter(self): + exporter = OrigMetricExporter() + meter_provider = MeterProvider( + metric_readers=[PeriodicExportingMetricReader(exporter)] + ) + # produce some data + meter_provider.get_meter("foo").create_counter("mycounter").add(12) + meter_provider.shutdown() + + def test_metric_reader(self): + reader = OrigMetricReader() + meter_provider = MeterProvider(metric_readers=[reader]) + # produce some data + meter_provider.get_meter("foo").create_counter("mycounter").add(12) + meter_provider.shutdown() diff --git a/opentelemetry-sdk/tests/metrics/test_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_metric_reader.py index 48e69ea8006..78b25e732e3 100644 --- a/opentelemetry-sdk/tests/metrics/test_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_metric_reader.py @@ -51,7 +51,7 @@ def __init__( def _receive_metrics(self, metrics): pass - def shutdown(self): + def shutdown(self, *args, **kwargs): return True diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index ca256319354..7164a2bda0c 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -49,7 +49,7 @@ def __init__(self): def _receive_metrics(self, metrics): pass - def shutdown(self): + def shutdown(self, *args, **kwargs): return True @@ -433,12 +433,17 @@ def __init__(self): self.metrics = {} self._counter = 0 - def export(self, metrics: Sequence[Metric]) -> MetricExportResult: + def export( + self, + metrics: Sequence[Metric], + timeout_millis: float = 10_000, + **kwargs, + ) -> MetricExportResult: self.metrics[self._counter] = metrics self._counter += 1 return MetricExportResult.SUCCESS - def shutdown(self) -> None: + def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None: pass diff --git a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py index ff67e848afe..0e7ffbd6c3d 100644 --- a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py @@ -13,12 +13,14 @@ # limitations under the License. import time +from typing import Sequence from unittest.mock import Mock from flaky import flaky from opentelemetry.sdk._metrics.export import ( MetricExporter, + MetricExportResult, PeriodicExportingMetricReader, ) from opentelemetry.sdk._metrics.point import Gauge, Metric, Sum @@ -33,12 +35,17 @@ def __init__(self, wait=0): self.metrics = [] self._shutdown = False - def export(self, metrics): + def export( + self, + metrics: Sequence[Metric], + timeout_millis: float = 10_000, + **kwargs, + ) -> MetricExportResult: time.sleep(self.wait) self.metrics.extend(metrics) return True - def shutdown(self): + def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None: self._shutdown = True From a02aba8cb6ee7d59ffef56c3a3922b9a519fb6d8 Mon Sep 17 00:00:00 2001 From: Aaron Abbott Date: Fri, 6 May 2022 03:21:51 +0000 Subject: [PATCH 2/8] comments --- .../otlp/proto/grpc/_metric_exporter/__init__.py | 4 ++-- .../exporter/otlp/proto/grpc/exporter.py | 12 ++---------- .../opentelemetry/exporter/prometheus/__init__.py | 2 +- .../sdk/_metrics/_internal/__init__.py | 12 ++++++++---- .../sdk/_metrics/_internal/export/__init__.py | 9 ++++----- .../sdk/_metrics/_internal/metric_reader.py | 2 +- .../tests/metrics/test_backward_compat.py | 10 ++-------- .../tests/metrics/test_metric_reader.py | 15 ++++++++++----- opentelemetry-sdk/tests/metrics/test_metrics.py | 13 ++++++++++--- .../test_periodic_exporting_metric_reader.py | 2 +- 10 files changed, 41 insertions(+), 40 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py index f3e88951653..693eb261dba 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py @@ -174,8 +174,8 @@ def export( timeout_millis: float = 10_000, **kwargs, ) -> MetricExportResult: - # TODO(): OTLPExporterMixin should pass timeout to gRPC + # TODO(#2663): OTLPExporterMixin should pass timeout to gRPC return self._export(metrics) - def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None: + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: pass diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index 255a1f0775d..b965061c5cb 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -277,15 +277,7 @@ def _translate_attributes(self, attributes) -> TypingSequence[KeyValue]: logger.exception(error) return output - def _export( - self, - data: TypingSequence[SDKDataT], - timeout_millis: Optional[float] = None, - ) -> ExportResultT: - if timeout_millis is not None: - timeout_seconds = timeout_millis / 10**3 - else: - timeout_seconds = self._timeout + def _export(self, data: TypingSequence[SDKDataT]) -> ExportResultT: max_value = 64 # expo returns a generator that yields delay values which grow @@ -300,7 +292,7 @@ def _export( self._client.Export( request=self._translate_data(data), metadata=self._headers, - timeout=timeout_seconds, + timeout=self._timeout, ) return self._result.SUCCESS diff --git a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py index 708aa3b680b..448a04bf293 100644 --- a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py +++ b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py @@ -120,7 +120,7 @@ def _receive_metrics( return self._collector.add_metrics_data(metrics) - def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None: + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: REGISTRY.unregister(self._collector) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/__init__.py index d6649d416f2..3d1e6a86e6f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/__init__.py @@ -371,14 +371,18 @@ def __init__( self._shutdown = False def force_flush(self, timeout_millis: float = 10_000) -> bool: - - # FIXME implement a timeout + deadline_ns = time_ns() + timeout_millis * 10**6 for metric_reader in self._sdk_config.metric_readers: - metric_reader.collect(timeout_millis=timeout_millis) + current_ts = time_ns() + if current_ts >= deadline_ns: + raise Exception("Timed out while flushing metric readers") + metric_reader.collect( + timeout_millis=(deadline_ns - current_ts) / 10**6 + ) return True - def shutdown(self, timeout_millis: float = 10_000): + def shutdown(self, timeout_millis: float = 30_000): deadline_ns = time_ns() + timeout_millis * 10**6 def _shutdown(): diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py index 4159fe6b899..c1855790be4 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py @@ -17,7 +17,6 @@ from abc import ABC, abstractmethod from enum import Enum from os import environ, linesep -from socket import timeout from sys import stdout from threading import Event, RLock, Thread from typing import IO, Callable, Dict, Iterable, List, Optional, Sequence @@ -70,7 +69,7 @@ def export( """ @abstractmethod - def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None: + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: """Shuts down the exporter. Called when the SDK is shut down. @@ -105,7 +104,7 @@ def export( self.out.flush() return MetricExportResult.SUCCESS - def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None: + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: pass @@ -144,7 +143,7 @@ def _receive_metrics( with self._lock: self._metrics = list(metrics) - def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None: + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: pass @@ -225,7 +224,7 @@ def _receive_metrics( _logger.exception("Exception while exporting metrics %s", str(e)) detach(token) - def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None: + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: deadline_ns = time_ns() + timeout_millis * 10**6 def _shutdown(): diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py index c8964f77e3e..d949a340478 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py @@ -172,7 +172,7 @@ def _receive_metrics( """Called by `MetricReader.collect` when it receives a batch of metrics""" @abstractmethod - def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None: + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: """Shuts down the MetricReader. This method provides a way for the MetricReader to do any cleanup required. A metric reader can only be shutdown once, any subsequent calls are ignored and return diff --git a/opentelemetry-sdk/tests/metrics/test_backward_compat.py b/opentelemetry-sdk/tests/metrics/test_backward_compat.py index febc778c68d..daa315abd08 100644 --- a/opentelemetry-sdk/tests/metrics/test_backward_compat.py +++ b/opentelemetry-sdk/tests/metrics/test_backward_compat.py @@ -40,12 +40,6 @@ # Do not change these classes until after major version 1 class OrigMetricExporter(MetricExporter): - def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None: - """Shuts down the exporter. - - Called when the SDK is shut down. - """ - def export( self, metrics: Sequence[Metric], @@ -54,7 +48,7 @@ def export( ) -> MetricExportResult: pass - def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None: + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: pass @@ -67,7 +61,7 @@ def _receive_metrics( ) -> None: pass - def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None: + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: self.collect() diff --git a/opentelemetry-sdk/tests/metrics/test_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_metric_reader.py index 78b25e732e3..1b95953cf80 100644 --- a/opentelemetry-sdk/tests/metrics/test_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_metric_reader.py @@ -13,12 +13,12 @@ # limitations under the License. from os import environ -from typing import Dict +from typing import Dict, Iterable from unittest import TestCase from unittest.mock import patch -from opentelemetry.sdk._metrics._internal.aggregation import Aggregation from opentelemetry.sdk._metrics.aggregation import ( + Aggregation, DefaultAggregation, LastValueAggregation, ) @@ -31,7 +31,7 @@ UpDownCounter, ) from opentelemetry.sdk._metrics.metric_reader import MetricReader -from opentelemetry.sdk._metrics.point import AggregationTemporality +from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric from opentelemetry.sdk.environment_variables import ( _OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, ) @@ -48,10 +48,15 @@ def __init__( preferred_aggregation=preferred_aggregation, ) - def _receive_metrics(self, metrics): + def _receive_metrics( + self, + metrics: Iterable[Metric], + timeout_millis: float = 10_000, + **kwargs, + ) -> None: pass - def shutdown(self, *args, **kwargs): + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: return True diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index 7164a2bda0c..3f527ff7320 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -19,6 +19,8 @@ from unittest import TestCase from unittest.mock import MagicMock, Mock, patch +from pyparsing import Iterable + from opentelemetry._metrics import NoOpMeter from opentelemetry.sdk._metrics import Meter, MeterProvider from opentelemetry.sdk._metrics.aggregation import SumAggregation @@ -46,10 +48,15 @@ class DummyMetricReader(MetricReader): def __init__(self): super().__init__() - def _receive_metrics(self, metrics): + def _receive_metrics( + self, + metrics: Iterable[Metric], + timeout_millis: float = 10_000, + **kwargs, + ) -> None: pass - def shutdown(self, *args, **kwargs): + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: return True @@ -443,7 +450,7 @@ def export( self._counter += 1 return MetricExportResult.SUCCESS - def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None: + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: pass diff --git a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py index 0e7ffbd6c3d..d2ad8cdccc3 100644 --- a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py @@ -45,7 +45,7 @@ def export( self.metrics.extend(metrics) return True - def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None: + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: self._shutdown = True From bb84fadb150b74549b45d61ee99baa44d32a08f2 Mon Sep 17 00:00:00 2001 From: Aaron Abbott Date: Fri, 6 May 2022 03:30:22 +0000 Subject: [PATCH 3/8] don't use TimeoutError as it is intended for OS related timeouts --- .../src/opentelemetry/sdk/_metrics/_internal/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/__init__.py index 3d1e6a86e6f..6113fff1795 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/__init__.py @@ -400,7 +400,7 @@ def _shutdown(): current_ts = time_ns() try: if current_ts >= deadline_ns: - raise TimeoutError( + raise Exception( "Didn't get to execute, deadline already exceeded" ) metric_reader.shutdown( From b4f48221502ea3b0d0ec0d048587e29ecaa36f60 Mon Sep 17 00:00:00 2001 From: Aaron Abbott Date: Fri, 6 May 2022 03:36:24 +0000 Subject: [PATCH 4/8] changelog and typo --- CHANGELOG.md | 4 ++++ opentelemetry-sdk/tests/metrics/test_metrics.py | 4 +--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c98bb873ce5..f3ffc3fcb0e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.11.1-0.30b1...HEAD) +- Add timeouts to metric SDK + ([#2653](https://github.com/open-telemetry/opentelemetry-python/pull/2653)) +- Add variadic arguments to metric exporter/reader interfaces + ([#2654](https://github.com/open-telemetry/opentelemetry-python/pull/2654)) - Add variadic arguments to metric exporter/reader interfaces ([#2654](https://github.com/open-telemetry/opentelemetry-python/pull/2654)) - Move Metrics API behind internal package diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index 3f527ff7320..0b3cf36ce65 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -15,12 +15,10 @@ from logging import WARNING from time import sleep -from typing import Sequence +from typing import Sequence, Iterable from unittest import TestCase from unittest.mock import MagicMock, Mock, patch -from pyparsing import Iterable - from opentelemetry._metrics import NoOpMeter from opentelemetry.sdk._metrics import Meter, MeterProvider from opentelemetry.sdk._metrics.aggregation import SumAggregation From 10f9acf3b5c9984c6714c0c101d6760d661f0737 Mon Sep 17 00:00:00 2001 From: Aaron Abbott Date: Fri, 6 May 2022 03:36:52 +0000 Subject: [PATCH 5/8] isort --- opentelemetry-sdk/tests/metrics/test_metrics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index 0b3cf36ce65..3b571638016 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -15,7 +15,7 @@ from logging import WARNING from time import sleep -from typing import Sequence, Iterable +from typing import Iterable, Sequence from unittest import TestCase from unittest.mock import MagicMock, Mock, patch From aa6e4a15c53d8562705bde5d41fabfbdb0b27439 Mon Sep 17 00:00:00 2001 From: Aaron Abbott Date: Fri, 6 May 2022 03:42:56 +0000 Subject: [PATCH 6/8] fix _time_ns import --- .../opentelemetry/sdk/_metrics/_internal/__init__.py | 10 +++++----- .../sdk/_metrics/_internal/export/__init__.py | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/__init__.py index 6113fff1795..eebcb0b13cf 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/__init__.py @@ -48,7 +48,7 @@ from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.util.instrumentation import InstrumentationScope from opentelemetry.util._once import Once -from opentelemetry.util._time import time_ns +from opentelemetry.util._time import _time_ns _logger = getLogger(__name__) @@ -371,10 +371,10 @@ def __init__( self._shutdown = False def force_flush(self, timeout_millis: float = 10_000) -> bool: - deadline_ns = time_ns() + timeout_millis * 10**6 + deadline_ns = _time_ns() + timeout_millis * 10**6 for metric_reader in self._sdk_config.metric_readers: - current_ts = time_ns() + current_ts = _time_ns() if current_ts >= deadline_ns: raise Exception("Timed out while flushing metric readers") metric_reader.collect( @@ -383,7 +383,7 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool: return True def shutdown(self, timeout_millis: float = 30_000): - deadline_ns = time_ns() + timeout_millis * 10**6 + deadline_ns = _time_ns() + timeout_millis * 10**6 def _shutdown(): self._shutdown = True @@ -397,7 +397,7 @@ def _shutdown(): metric_reader_error = {} for metric_reader in self._sdk_config.metric_readers: - current_ts = time_ns() + current_ts = _time_ns() try: if current_ts >= deadline_ns: raise Exception( diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py index c1855790be4..c8dcb43327e 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py @@ -31,7 +31,7 @@ from opentelemetry.sdk._metrics.metric_reader import MetricReader from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric from opentelemetry.util._once import Once -from opentelemetry.util._time import time_ns +from opentelemetry.util._time import _time_ns _logger = logging.getLogger(__name__) @@ -225,7 +225,7 @@ def _receive_metrics( detach(token) def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: - deadline_ns = time_ns() + timeout_millis * 10**6 + deadline_ns = _time_ns() + timeout_millis * 10**6 def _shutdown(): self._shutdown = True @@ -236,5 +236,5 @@ def _shutdown(): return self._shutdown_event.set() - self._daemon_thread.join(timeout=(deadline_ns - time_ns()) / 10**9) - self._exporter.shutdown(timeout=(deadline_ns - time_ns()) / 10**6) + self._daemon_thread.join(timeout=(deadline_ns - _time_ns()) / 10**9) + self._exporter.shutdown(timeout=(deadline_ns - _time_ns()) / 10**6) From f5339d0bc264a01da541dde63eed49958062d30d Mon Sep 17 00:00:00 2001 From: Aaron Abbott Date: Fri, 6 May 2022 11:22:36 -0400 Subject: [PATCH 7/8] Update CHANGELOG.md Co-authored-by: Srikanth Chekuri --- CHANGELOG.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f3ffc3fcb0e..1614019b288 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,8 +11,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#2653](https://github.com/open-telemetry/opentelemetry-python/pull/2653)) - Add variadic arguments to metric exporter/reader interfaces ([#2654](https://github.com/open-telemetry/opentelemetry-python/pull/2654)) -- Add variadic arguments to metric exporter/reader interfaces - ([#2654](https://github.com/open-telemetry/opentelemetry-python/pull/2654)) - Move Metrics API behind internal package ([#2651](https://github.com/open-telemetry/opentelemetry-python/pull/2651)) From 49084fca362f5f5419a519ca533c76fa42dd8ece Mon Sep 17 00:00:00 2001 From: Aaron Abbott Date: Fri, 6 May 2022 15:24:09 +0000 Subject: [PATCH 8/8] use self.fail in tests --- .../tests/metrics/test_backward_compat.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/tests/metrics/test_backward_compat.py b/opentelemetry-sdk/tests/metrics/test_backward_compat.py index daa315abd08..4b3b504d65d 100644 --- a/opentelemetry-sdk/tests/metrics/test_backward_compat.py +++ b/opentelemetry-sdk/tests/metrics/test_backward_compat.py @@ -73,11 +73,17 @@ def test_metric_exporter(self): ) # produce some data meter_provider.get_meter("foo").create_counter("mycounter").add(12) - meter_provider.shutdown() + try: + meter_provider.shutdown() + except Exception: + self.fail() def test_metric_reader(self): reader = OrigMetricReader() meter_provider = MeterProvider(metric_readers=[reader]) # produce some data meter_provider.get_meter("foo").create_counter("mycounter").add(12) - meter_provider.shutdown() + try: + meter_provider.shutdown() + except Exception: + self.fail()