Skip to content

Commit

Permalink
Detect and adapt to backoff package version
Browse files Browse the repository at this point in the history
Since backoff==2.0.0, the API of the expo function has changed. The
"porcelain" methods of the backoff library (the decorators
backoff.on_exception and backoff.on_predicate) send a "None" value into
the generator and discard the first yielded value. This is for
compatibility with the more general wait generator API inside the
backoff package.

This commit allows the OTLP exporters to automatically detect the
behavior of the installed backoff package and adapt accordingly.
  • Loading branch information
nickstenning committed Oct 20, 2022
1 parent 321f90f commit 353fbcb
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 13 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2959](https://github.com/open-telemetry/opentelemetry-python/pull/2959))
- Add http-metric instrument names to semantic conventions
([#2976](https://github.com/open-telemetry/opentelemetry-python/pull/2976))
- Fix a bug with exporter retries for with newer versions of the backoff library
([#2980](https://github.com/open-telemetry/opentelemetry-python/pull/2980))

## [1.13.0-0.34b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.13.0) - 2022-09-26

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from urllib.parse import urlparse
from opentelemetry.sdk.trace import ReadableSpan

from backoff import expo
import backoff
from google.rpc.error_details_pb2 import RetryInfo
from grpc import (
ChannelCredentials,
Expand Down Expand Up @@ -183,6 +183,20 @@ def _get_credentials(creds, environ_key):
return ssl_channel_credentials()


# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
if next(backoff.expo()) is None:

def _expo(*args, **kwargs):
gen = backoff.expo(*args, **kwargs)
gen.send(None)
return gen

else:
_expo = backoff.expo


# pylint: disable=no-member
class OTLPExporterMixin(
ABC, Generic[SDKDataT, ExportServiceRequestT, ExportResultT]
Expand Down Expand Up @@ -296,7 +310,7 @@ def _export(
# expo returns a generator that yields delay values which grow
# exponentially. Once delay is greater than max_value, the yielded
# value will remain constant.
for delay in expo(max_value=max_value):
for delay in _expo(max_value=max_value):

if delay == max_value:
return self._result.FAILURE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure):
)
mock_method.reset_mock()

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):

Expand All @@ -265,7 +265,7 @@ def test_unavailable(self, mock_sleep, mock_expo):
)
mock_sleep.assert_called_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable_delay(self, mock_sleep, mock_expo):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure):
)
mock_method.reset_mock()

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):

Expand All @@ -464,7 +464,7 @@ def test_unavailable(self, mock_sleep, mock_expo):
)
mock_sleep.assert_called_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable_delay(self, mock_sleep, mock_expo):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def test_environ_to_compression(self):
with self.assertRaises(InvalidCompressionValueException):
environ_to_compression("test_invalid")

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
def test_export_warning(self, mock_expo):

mock_expo.configure_mock(**{"return_value": [0]})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ def test_otlp_headers(self, mock_ssl_channel, mock_secure):
# pylint: disable=protected-access
self.assertIsNone(exporter._headers, None)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):

Expand All @@ -450,7 +450,7 @@ def test_unavailable(self, mock_sleep, mock_expo):
)
mock_sleep.assert_called_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable_delay(self, mock_sleep, mock_expo):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
from typing import Dict, Optional, Sequence
from time import sleep

import backoff
import requests
from backoff import expo

from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_CERTIFICATE,
Expand Down Expand Up @@ -53,6 +53,19 @@
DEFAULT_LOGS_EXPORT_PATH = "v1/logs"
DEFAULT_TIMEOUT = 10 # in seconds

# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
if next(backoff.expo()) is None:

def _expo(*args, **kwargs):
gen = backoff.expo(*args, **kwargs)
gen.send(None)
return gen

else:
_expo = backoff.expo


class OTLPLogExporter(LogExporter):

Expand Down Expand Up @@ -122,7 +135,7 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult:

serialized_data = _ProtobufEncoder.serialize(batch)

for delay in expo(max_value=self._MAX_RETRY_TIMEOUT):
for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT):

if delay == self._MAX_RETRY_TIMEOUT:
return LogExportResult.FAILURE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
from typing import Dict, Optional
from time import sleep

import backoff
import requests
from backoff import expo

from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE,
Expand Down Expand Up @@ -54,6 +54,19 @@
DEFAULT_TRACES_EXPORT_PATH = "v1/traces"
DEFAULT_TIMEOUT = 10 # in seconds

# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
if next(backoff.expo()) is None:

def _expo(*args, **kwargs):
gen = backoff.expo(*args, **kwargs)
gen.send(None)
return gen

else:
_expo = backoff.expo


class OTLPSpanExporter(SpanExporter):

Expand Down Expand Up @@ -133,7 +146,7 @@ def export(self, spans) -> SpanExportResult:

serialized_data = _ProtobufEncoder.serialize(spans)

for delay in expo(max_value=self._MAX_RETRY_TIMEOUT):
for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT):

if delay == self._MAX_RETRY_TIMEOUT:
return SpanExportResult.FAILURE
Expand Down

0 comments on commit 353fbcb

Please sign in to comment.