Skip to content

Commit

Permalink
Detect and adapt to backoff package version
Browse files Browse the repository at this point in the history
Since backoff==2.0.0, the API of the expo function has changed. The
"porcelain" methods of the backoff library (the decorators
backoff.on_exception and backoff.on_predicate) send a "None" value into
the generator and discard the first yielded value. This is for
compatibility with the more general wait generator API inside the
backoff package.

This commit allows the OTLP exporters to automatically detect the
behavior of the installed backoff package and adapt accordingly.
  • Loading branch information
nickstenning committed Oct 29, 2022
1 parent 35ba257 commit 66a92f4
Show file tree
Hide file tree
Showing 12 changed files with 141 additions and 15 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2976](https://github.com/open-telemetry/opentelemetry-python/pull/2976))
- [exporter/opentelemetry-exporter-otlp-proto-http] Add OTLPMetricExporter
([#2891](https://github.com/open-telemetry/opentelemetry-python/pull/2891))
- Fix a bug with exporter retries for with newer versions of the backoff library
([#2980](https://github.com/open-telemetry/opentelemetry-python/pull/2980))

## [1.13.0-0.34b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.13.0) - 2022-09-26

Expand Down Expand Up @@ -90,7 +92,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2726](https://github.com/open-telemetry/opentelemetry-python/pull/2726))
- fix: frozenset object has no attribute items
([#2727](https://github.com/open-telemetry/opentelemetry-python/pull/2727))
- fix: create suppress HTTP instrumentation key in opentelemetry context
- fix: create suppress HTTP instrumentation key in opentelemetry context
([#2729](https://github.com/open-telemetry/opentelemetry-python/pull/2729))
- Support logs SDK auto instrumentation enable/disable with env
([#2728](https://github.com/open-telemetry/opentelemetry-python/pull/2728))
Expand Down
1 change: 1 addition & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ protobuf~=3.18.1
markupsafe==2.0.1
bleach==4.1.0 # This dependency was updated to a breaking version.
codespell==2.1.0
responses==0.22.0
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from urllib.parse import urlparse
from opentelemetry.sdk.trace import ReadableSpan

from backoff import expo
import backoff
from google.rpc.error_details_pb2 import RetryInfo
from grpc import (
ChannelCredentials,
Expand Down Expand Up @@ -183,6 +183,21 @@ def _get_credentials(creds, environ_key):
return ssl_channel_credentials()


# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _expo(*args, **kwargs):
if _is_backoff_v2:
gen = backoff.expo(*args, **kwargs)
gen.send(None)
return gen
else:
return backoff.expo(*args, **kwargs)


# pylint: disable=no-member
class OTLPExporterMixin(
ABC, Generic[SDKDataT, ExportServiceRequestT, ExportResultT]
Expand Down Expand Up @@ -296,7 +311,7 @@ def _export(
# expo returns a generator that yields delay values which grow
# exponentially. Once delay is greater than max_value, the yielded
# value will remain constant.
for delay in expo(max_value=max_value):
for delay in _expo(max_value=max_value):

if delay == max_value:
return self._result.FAILURE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure):
)
mock_method.reset_mock()

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):

Expand All @@ -265,7 +265,7 @@ def test_unavailable(self, mock_sleep, mock_expo):
)
mock_sleep.assert_called_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable_delay(self, mock_sleep, mock_expo):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure):
)
mock_method.reset_mock()

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):

Expand All @@ -464,7 +464,7 @@ def test_unavailable(self, mock_sleep, mock_expo):
)
mock_sleep.assert_called_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable_delay(self, mock_sleep, mock_expo):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def test_environ_to_compression(self):
with self.assertRaises(InvalidCompressionValueException):
environ_to_compression("test_invalid")

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
def test_export_warning(self, mock_expo):

mock_expo.configure_mock(**{"return_value": [0]})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,23 @@ def test_otlp_headers(self, mock_ssl_channel, mock_secure):
# pylint: disable=protected-access
self.assertIsNone(exporter._headers, None)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.backoff")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_handles_backoff_v2_api(self, mock_sleep, mock_backoff):
# In backoff ~= 2.0.0 the first value yielded from expo is None.
def generate_delays(*args, **kwargs):
yield None
yield 1

mock_backoff.expo.configure_mock(**{"side_effect": generate_delays})

add_TraceServiceServicer_to_server(
TraceServiceServicerUNAVAILABLE(), self.server
)
self.exporter.export([self.span])
mock_sleep.assert_called_once_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):

Expand All @@ -450,7 +466,7 @@ def test_unavailable(self, mock_sleep, mock_expo):
)
mock_sleep.assert_called_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable_delay(self, mock_sleep, mock_expo):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
from typing import Dict, Optional, Sequence
from time import sleep

import backoff
import requests
from backoff import expo

from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_CERTIFICATE,
Expand Down Expand Up @@ -53,6 +53,20 @@
DEFAULT_LOGS_EXPORT_PATH = "v1/logs"
DEFAULT_TIMEOUT = 10 # in seconds

# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _expo(*args, **kwargs):
if _is_backoff_v2:
gen = backoff.expo(*args, **kwargs)
gen.send(None)
return gen
else:
return backoff.expo(*args, **kwargs)


class OTLPLogExporter(LogExporter):

Expand Down Expand Up @@ -122,7 +136,7 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult:

serialized_data = _ProtobufEncoder.serialize(batch)

for delay in expo(max_value=self._MAX_RETRY_TIMEOUT):
for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT):

if delay == self._MAX_RETRY_TIMEOUT:
return LogExportResult.FAILURE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
from typing import Dict, Optional
from time import sleep

import backoff
import requests
from backoff import expo

from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE,
Expand Down Expand Up @@ -54,6 +54,20 @@
DEFAULT_TRACES_EXPORT_PATH = "v1/traces"
DEFAULT_TIMEOUT = 10 # in seconds

# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _expo(*args, **kwargs):
if _is_backoff_v2:
gen = backoff.expo(*args, **kwargs)
gen.send(None)
return gen
else:
return backoff.expo(*args, **kwargs)


class OTLPSpanExporter(SpanExporter):

Expand Down Expand Up @@ -133,7 +147,7 @@ def export(self, spans) -> SpanExportResult:

serialized_data = _ProtobufEncoder.serialize(spans)

for delay in expo(max_value=self._MAX_RETRY_TIMEOUT):
for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT):

if delay == self._MAX_RETRY_TIMEOUT:
return SpanExportResult.FAILURE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from unittest.mock import MagicMock, patch

import requests
import responses

from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.exporter.otlp.proto.http._log_exporter import (
Expand Down Expand Up @@ -159,6 +160,31 @@ def test_serialize(self):
expected_encoding.SerializeToString(),
)

@responses.activate
@patch("opentelemetry.exporter.otlp.proto.http._log_exporter.backoff")
@patch("opentelemetry.exporter.otlp.proto.http._log_exporter.sleep")
def test_handles_backoff_v2_api(self, mock_sleep, mock_backoff):
# In backoff ~= 2.0.0 the first value yielded from expo is None.
def generate_delays(*args, **kwargs):
yield None
yield 1

mock_backoff.expo.configure_mock(**{"side_effect": generate_delays})

# return a retryable error
responses.add(
responses.POST,
"http://logs.example.com/export",
json={"error": "something exploded"},
status=500,
)

exporter = OTLPLogExporter(endpoint="http://logs.example.com/export")
logs = self._get_sdk_log_data()

exporter.export(logs)
mock_sleep.assert_called_once_with(1)

@staticmethod
def _get_sdk_log_data() -> List[LogData]:
log1 = LogData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
# limitations under the License.

import unittest
from unittest.mock import patch
from collections import OrderedDict
from unittest.mock import Mock, patch

import requests
import responses

from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
Expand All @@ -37,6 +39,7 @@
OTEL_EXPORTER_OTLP_TRACES_HEADERS,
OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
)
from opentelemetry.sdk.trace import _Span

OS_ENV_ENDPOINT = "os.env.base"
OS_ENV_CERTIFICATE = "os/env/base.crt"
Expand Down Expand Up @@ -188,3 +191,37 @@ def test_headers_parse_from_env(self):
cm.records[0].message,
"Header doesn't match the format: missingValue.",
)

@responses.activate
@patch("opentelemetry.exporter.otlp.proto.http.trace_exporter.backoff")
@patch("opentelemetry.exporter.otlp.proto.http.trace_exporter.sleep")
def test_handles_backoff_v2_api(self, mock_sleep, mock_backoff):
# In backoff ~= 2.0.0 the first value yielded from expo is None.
def generate_delays(*args, **kwargs):
yield None
yield 1

mock_backoff.expo.configure_mock(**{"side_effect": generate_delays})

# return a retryable error
responses.add(
responses.POST,
"http://traces.example.com/export",
json={"error": "something exploded"},
status=500,
)

exporter = OTLPSpanExporter(endpoint="http://traces.example.com/export")
span = _Span(
"abc",
context=Mock(
**{
"trace_state": OrderedDict([("a", "b"), ("c", "d")]),
"span_id": 10217189687419569865,
"trace_id": 67545097771067222548457157018666467027,
}
)
)

exporter.export([span])
mock_sleep.assert_called_once_with(1)
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ deps =
opentelemetry: pytest
opentelemetry: pytest-benchmark
opentelemetry: flaky
opentelemetry-exporter-otlp-proto-http: responses
coverage: pytest
coverage: pytest-cov
mypy,mypyinstalled: mypy
Expand Down

0 comments on commit 66a92f4

Please sign in to comment.