From ee78c957f1bb9f16ae7b88888ec4539e8d2f7478 Mon Sep 17 00:00:00 2001 From: Nick Stenning Date: Wed, 19 Oct 2022 00:10:27 +0200 Subject: [PATCH 1/2] Detect and adapt to backoff package version Since backoff==2.0.0, the API of the expo function has changed. The "porcelain" methods of the backoff library (the decorators backoff.on_exception and backoff.on_predicate) send a "None" value into the generator and discard the first yielded value. This is for compatibility with the more general wait generator API inside the backoff package. This commit allows the OTLP exporters to automatically detect the behavior of the installed backoff package and adapt accordingly. --- CHANGELOG.md | 4 +- .../exporter/otlp/proto/grpc/exporter.py | 17 +++++++- .../tests/logs/test_otlp_logs_exporter.py | 4 +- .../metrics/test_otlp_metrics_exporter.py | 4 +- .../tests/test_otlp_exporter_mixin.py | 2 +- .../tests/test_otlp_trace_exporter.py | 20 ++++++++- .../pyproject.toml | 4 +- .../otlp/proto/http/_log_exporter/__init__.py | 16 ++++++- .../proto/http/trace_exporter/__init__.py | 16 ++++++- .../tests/test_proto_log_exporter.py | 26 ++++++++++++ .../tests/test_proto_span_exporter.py | 42 ++++++++++++++++++- tox.ini | 2 +- 12 files changed, 140 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2358664bdf4..bfbbe3d2ca4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#2976](https://github.com/open-telemetry/opentelemetry-python/pull/2976)) - [exporter/opentelemetry-exporter-otlp-proto-http] Add OTLPMetricExporter ([#2891](https://github.com/open-telemetry/opentelemetry-python/pull/2891)) +- Fix a bug with exporter retries for with newer versions of the backoff library + ([#2980](https://github.com/open-telemetry/opentelemetry-python/pull/2980)) ## [1.13.0-0.34b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.13.0) - 2022-09-26 @@ -90,7 +92,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#2726](https://github.com/open-telemetry/opentelemetry-python/pull/2726)) - fix: frozenset object has no attribute items ([#2727](https://github.com/open-telemetry/opentelemetry-python/pull/2727)) -- fix: create suppress HTTP instrumentation key in opentelemetry context +- fix: create suppress HTTP instrumentation key in opentelemetry context ([#2729](https://github.com/open-telemetry/opentelemetry-python/pull/2729)) - Support logs SDK auto instrumentation enable/disable with env ([#2728](https://github.com/open-telemetry/opentelemetry-python/pull/2728)) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index 4405bcad68b..aef66b79dec 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -25,7 +25,7 @@ from urllib.parse import urlparse from opentelemetry.sdk.trace import ReadableSpan -from backoff import expo +import backoff from google.rpc.error_details_pb2 import RetryInfo from grpc import ( ChannelCredentials, @@ -183,6 +183,19 @@ def _get_credentials(creds, environ_key): return ssl_channel_credentials() +# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff +# wait generator API requires a first .send(None) before reading the backoff +# values from the generator. +_is_backoff_v2 = next(backoff.expo()) is None + + +def _expo(*args, **kwargs): + gen = backoff.expo(*args, **kwargs) + if _is_backoff_v2: + gen.send(None) + return gen + + # pylint: disable=no-member class OTLPExporterMixin( ABC, Generic[SDKDataT, ExportServiceRequestT, ExportResultT] @@ -296,7 +309,7 @@ def _export( # expo returns a generator that yields delay values which grow # exponentially. Once delay is greater than max_value, the yielded # value will remain constant. - for delay in expo(max_value=max_value): + for delay in _expo(max_value=max_value): if delay == max_value: return self._result.FAILURE diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py index a9c63eaa0a0..07616090837 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py @@ -251,7 +251,7 @@ def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure): ) mock_method.reset_mock() - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo") + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo") @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") def test_unavailable(self, mock_sleep, mock_expo): @@ -265,7 +265,7 @@ def test_unavailable(self, mock_sleep, mock_expo): ) mock_sleep.assert_called_with(1) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo") + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo") @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") def test_unavailable_delay(self, mock_sleep, mock_expo): diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/metrics/test_otlp_metrics_exporter.py index 262e26ed637..0e3e26b7476 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/metrics/test_otlp_metrics_exporter.py @@ -449,7 +449,7 @@ def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure): ) mock_method.reset_mock() - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo") + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo") @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") def test_unavailable(self, mock_sleep, mock_expo): @@ -464,7 +464,7 @@ def test_unavailable(self, mock_sleep, mock_expo): ) mock_sleep.assert_called_with(1) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo") + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo") @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") def test_unavailable_delay(self, mock_sleep, mock_expo): diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py index 3f44ef228ee..81a874af705 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py @@ -56,7 +56,7 @@ def test_environ_to_compression(self): with self.assertRaises(InvalidCompressionValueException): environ_to_compression("test_invalid") - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo") + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo") def test_export_warning(self, mock_expo): mock_expo.configure_mock(**{"return_value": [0]}) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py index a5cb4e699a6..cfb286edee0 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py @@ -436,7 +436,23 @@ def test_otlp_headers(self, mock_ssl_channel, mock_secure): # pylint: disable=protected-access self.assertIsNone(exporter._headers, None) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo") + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.backoff") + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") + def test_handles_backoff_v2_api(self, mock_sleep, mock_backoff): + # In backoff ~= 2.0.0 the first value yielded from expo is None. + def generate_delays(*args, **kwargs): + yield None + yield 1 + + mock_backoff.expo.configure_mock(**{"side_effect": generate_delays}) + + add_TraceServiceServicer_to_server( + TraceServiceServicerUNAVAILABLE(), self.server + ) + self.exporter.export([self.span]) + mock_sleep.assert_called_once_with(1) + + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo") @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") def test_unavailable(self, mock_sleep, mock_expo): @@ -450,7 +466,7 @@ def test_unavailable(self, mock_sleep, mock_expo): ) mock_sleep.assert_called_with(1) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo") + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo") @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") def test_unavailable_delay(self, mock_sleep, mock_expo): diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/pyproject.toml b/exporter/opentelemetry-exporter-otlp-proto-http/pyproject.toml index edfc307c174..a89e7fa1884 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/pyproject.toml +++ b/exporter/opentelemetry-exporter-otlp-proto-http/pyproject.toml @@ -34,7 +34,9 @@ dependencies = [ ] [project.optional-dependencies] -test = [] +test = [ + "responses == 0.22.0", +] [project.entry-points.opentelemetry_traces_exporter] otlp_proto_http = "opentelemetry.exporter.otlp.proto.http.trace_exporter:OTLPSpanExporter" diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py index a74f849f406..580e084f4f9 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -20,8 +20,8 @@ from typing import Dict, Optional, Sequence from time import sleep +import backoff import requests -from backoff import expo from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_CERTIFICATE, @@ -53,6 +53,18 @@ DEFAULT_LOGS_EXPORT_PATH = "v1/logs" DEFAULT_TIMEOUT = 10 # in seconds +# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff +# wait generator API requires a first .send(None) before reading the backoff +# values from the generator. +_is_backoff_v2 = next(backoff.expo()) is None + + +def _expo(*args, **kwargs): + gen = backoff.expo(*args, **kwargs) + if _is_backoff_v2: + gen.send(None) + return gen + class OTLPLogExporter(LogExporter): @@ -122,7 +134,7 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult: serialized_data = _ProtobufEncoder.serialize(batch) - for delay in expo(max_value=self._MAX_RETRY_TIMEOUT): + for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT): if delay == self._MAX_RETRY_TIMEOUT: return LogExportResult.FAILURE diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py index a65bc44320f..9dc3b65014d 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py @@ -20,8 +20,8 @@ from typing import Dict, Optional from time import sleep +import backoff import requests -from backoff import expo from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE, @@ -54,6 +54,18 @@ DEFAULT_TRACES_EXPORT_PATH = "v1/traces" DEFAULT_TIMEOUT = 10 # in seconds +# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff +# wait generator API requires a first .send(None) before reading the backoff +# values from the generator. +_is_backoff_v2 = next(backoff.expo()) is None + + +def _expo(*args, **kwargs): + gen = backoff.expo(*args, **kwargs) + if _is_backoff_v2: + gen.send(None) + return gen + class OTLPSpanExporter(SpanExporter): @@ -133,7 +145,7 @@ def export(self, spans) -> SpanExportResult: serialized_data = _ProtobufEncoder.serialize(spans) - for delay in expo(max_value=self._MAX_RETRY_TIMEOUT): + for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT): if delay == self._MAX_RETRY_TIMEOUT: return SpanExportResult.FAILURE diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py index 2063820b5d2..d94ddfb8f7c 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py @@ -19,6 +19,7 @@ from unittest.mock import MagicMock, patch import requests +import responses from opentelemetry.exporter.otlp.proto.http import Compression from opentelemetry.exporter.otlp.proto.http._log_exporter import ( @@ -159,6 +160,31 @@ def test_serialize(self): expected_encoding.SerializeToString(), ) + @responses.activate + @patch("opentelemetry.exporter.otlp.proto.http._log_exporter.backoff") + @patch("opentelemetry.exporter.otlp.proto.http._log_exporter.sleep") + def test_handles_backoff_v2_api(self, mock_sleep, mock_backoff): + # In backoff ~= 2.0.0 the first value yielded from expo is None. + def generate_delays(*args, **kwargs): + yield None + yield 1 + + mock_backoff.expo.configure_mock(**{"side_effect": generate_delays}) + + # return a retryable error + responses.add( + responses.POST, + "http://logs.example.com/export", + json={"error": "something exploded"}, + status=500, + ) + + exporter = OTLPLogExporter(endpoint="http://logs.example.com/export") + logs = self._get_sdk_log_data() + + exporter.export(logs) + mock_sleep.assert_called_once_with(1) + @staticmethod def _get_sdk_log_data() -> List[LogData]: log1 = LogData( diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py index 73e54e86c0b..c7ff9a15dde 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py @@ -13,9 +13,11 @@ # limitations under the License. import unittest -from unittest.mock import patch +from collections import OrderedDict +from unittest.mock import Mock, patch import requests +import responses from opentelemetry.exporter.otlp.proto.http import Compression from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( @@ -37,6 +39,7 @@ OTEL_EXPORTER_OTLP_TRACES_HEADERS, OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, ) +from opentelemetry.sdk.trace import _Span OS_ENV_ENDPOINT = "os.env.base" OS_ENV_CERTIFICATE = "os/env/base.crt" @@ -188,3 +191,40 @@ def test_headers_parse_from_env(self): cm.records[0].message, "Header doesn't match the format: missingValue.", ) + + # pylint: disable=no-self-use + @responses.activate + @patch("opentelemetry.exporter.otlp.proto.http.trace_exporter.backoff") + @patch("opentelemetry.exporter.otlp.proto.http.trace_exporter.sleep") + def test_handles_backoff_v2_api(self, mock_sleep, mock_backoff): + # In backoff ~= 2.0.0 the first value yielded from expo is None. + def generate_delays(*args, **kwargs): + yield None + yield 1 + + mock_backoff.expo.configure_mock(**{"side_effect": generate_delays}) + + # return a retryable error + responses.add( + responses.POST, + "http://traces.example.com/export", + json={"error": "something exploded"}, + status=500, + ) + + exporter = OTLPSpanExporter( + endpoint="http://traces.example.com/export" + ) + span = _Span( + "abc", + context=Mock( + **{ + "trace_state": OrderedDict([("a", "b"), ("c", "d")]), + "span_id": 10217189687419569865, + "trace_id": 67545097771067222548457157018666467027, + } + ), + ) + + exporter.export([span]) + mock_sleep.assert_called_once_with(1) diff --git a/tox.ini b/tox.ini index 1a24186fbd7..2248ab1e427 100644 --- a/tox.ini +++ b/tox.ini @@ -137,7 +137,7 @@ commands_pre = exporter-otlp-proto-grpc: pip install {toxinidir}/exporter/opentelemetry-exporter-otlp-proto-grpc exporter-otlp-proto-http: pip install {toxinidir}/opentelemetry-proto - exporter-otlp-proto-http: pip install {toxinidir}/exporter/opentelemetry-exporter-otlp-proto-http + exporter-otlp-proto-http: pip install {toxinidir}/exporter/opentelemetry-exporter-otlp-proto-http[test] exporter-jaeger-combined: pip install {toxinidir}/exporter/opentelemetry-exporter-jaeger-proto-grpc {toxinidir}/exporter/opentelemetry-exporter-jaeger-thrift {toxinidir}/exporter/opentelemetry-exporter-jaeger exporter-jaeger-proto-grpc: pip install {toxinidir}/exporter/opentelemetry-exporter-jaeger-proto-grpc From 64b6dba1bbf681a31386a2b274ba663f1bacb07f Mon Sep 17 00:00:00 2001 From: Nick Stenning Date: Sun, 30 Oct 2022 10:22:24 +0100 Subject: [PATCH 2/2] Detect and backoff version in metrics exporter Since backoff==2.0.0, the API of the expo function has changed. The "porcelain" methods of the backoff library (the decorators backoff.on_exception and backoff.on_predicate) send a "None" value into the generator and discard the first yielded value. This is for compatibility with the more general wait generator API inside the backoff package. This commit allows the HTTP OTLP metrics exporter to automatically detect the behavior of the installed backoff package and adapt accordingly. --- .../proto/http/metric_exporter/__init__.py | 16 +++++++++-- .../metrics/test_otlp_metrics_exporter.py | 28 +++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index 1423c31238f..c83ca00e06c 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -66,8 +66,8 @@ from opentelemetry.sdk.resources import Resource as SDKResource from opentelemetry.util.re import parse_headers +import backoff import requests -from backoff import expo _logger = logging.getLogger(__name__) @@ -77,6 +77,18 @@ DEFAULT_METRICS_EXPORT_PATH = "v1/metrics" DEFAULT_TIMEOUT = 10 # in seconds +# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff +# wait generator API requires a first .send(None) before reading the backoff +# values from the generator. +_is_backoff_v2 = next(backoff.expo()) is None + + +def _expo(*args, **kwargs): + gen = backoff.expo(*args, **kwargs) + if _is_backoff_v2: + gen.send(None) + return gen + class OTLPMetricExporter(MetricExporter): @@ -319,7 +331,7 @@ def export( **kwargs, ) -> MetricExportResult: serialized_data = self._translate_data(metrics_data) - for delay in expo(max_value=self._MAX_RETRY_TIMEOUT): + for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT): if delay == self._MAX_RETRY_TIMEOUT: return MetricExportResult.FAILURE diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index e4665a47fc3..74ba53b3c59 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -16,6 +16,7 @@ from unittest.mock import patch import requests +import responses from opentelemetry.exporter.otlp.proto.http import Compression from opentelemetry.exporter.otlp.proto.http.metric_exporter import ( @@ -269,3 +270,30 @@ def test_serialization(self, mock_post): verify=exporter._certificate_file, timeout=exporter._timeout, ) + + @responses.activate + @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.backoff") + @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.sleep") + def test_handles_backoff_v2_api(self, mock_sleep, mock_backoff): + # In backoff ~= 2.0.0 the first value yielded from expo is None. + def generate_delays(*args, **kwargs): + yield None + yield 1 + + mock_backoff.expo.configure_mock(**{"side_effect": generate_delays}) + + # return a retryable error + responses.add( + responses.POST, + "http://metrics.example.com/export", + json={"error": "something exploded"}, + status=500, + ) + + exporter = OTLPMetricExporter( + endpoint="http://metrics.example.com/export" + ) + metrics_data = self.metrics["sum_int"] + + exporter.export(metrics_data) + mock_sleep.assert_called_once_with(1)