Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fix: detect and adapt to backoff package version #2980

Merged
merged 3 commits into from
Oct 31, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Detect and adapt to backoff package version
Since backoff==2.0.0, the API of the expo function has changed. The
"porcelain" methods of the backoff library (the decorators
backoff.on_exception and backoff.on_predicate) send a "None" value into
the generator and discard the first yielded value. This is for
compatibility with the more general wait generator API inside the
backoff package.

This commit allows the OTLP exporters to automatically detect the
behavior of the installed backoff package and adapt accordingly.
nickstenning committed Oct 30, 2022

Verified

This commit was signed with the committer’s verified signature.
nickstenning Nick Stenning
commit ee78c957f1bb9f16ae7b88888ec4539e8d2f7478
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2976](https://github.com/open-telemetry/opentelemetry-python/pull/2976))
- [exporter/opentelemetry-exporter-otlp-proto-http] Add OTLPMetricExporter
([#2891](https://github.com/open-telemetry/opentelemetry-python/pull/2891))
- Fix a bug with exporter retries for with newer versions of the backoff library
([#2980](https://github.com/open-telemetry/opentelemetry-python/pull/2980))

## [1.13.0-0.34b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.13.0) - 2022-09-26

@@ -90,7 +92,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2726](https://github.com/open-telemetry/opentelemetry-python/pull/2726))
- fix: frozenset object has no attribute items
([#2727](https://github.com/open-telemetry/opentelemetry-python/pull/2727))
- fix: create suppress HTTP instrumentation key in opentelemetry context
- fix: create suppress HTTP instrumentation key in opentelemetry context
([#2729](https://github.com/open-telemetry/opentelemetry-python/pull/2729))
- Support logs SDK auto instrumentation enable/disable with env
([#2728](https://github.com/open-telemetry/opentelemetry-python/pull/2728))
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@
from urllib.parse import urlparse
from opentelemetry.sdk.trace import ReadableSpan

from backoff import expo
import backoff
from google.rpc.error_details_pb2 import RetryInfo
from grpc import (
ChannelCredentials,
@@ -183,6 +183,19 @@ def _get_credentials(creds, environ_key):
return ssl_channel_credentials()


# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _expo(*args, **kwargs):
gen = backoff.expo(*args, **kwargs)
if _is_backoff_v2:
gen.send(None)
return gen


# pylint: disable=no-member
class OTLPExporterMixin(
ABC, Generic[SDKDataT, ExportServiceRequestT, ExportResultT]
@@ -296,7 +309,7 @@ def _export(
# expo returns a generator that yields delay values which grow
# exponentially. Once delay is greater than max_value, the yielded
# value will remain constant.
for delay in expo(max_value=max_value):
for delay in _expo(max_value=max_value):

if delay == max_value:
return self._result.FAILURE
Original file line number Diff line number Diff line change
@@ -251,7 +251,7 @@ def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure):
)
mock_method.reset_mock()

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):

@@ -265,7 +265,7 @@ def test_unavailable(self, mock_sleep, mock_expo):
)
mock_sleep.assert_called_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable_delay(self, mock_sleep, mock_expo):

Original file line number Diff line number Diff line change
@@ -449,7 +449,7 @@ def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure):
)
mock_method.reset_mock()

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):

@@ -464,7 +464,7 @@ def test_unavailable(self, mock_sleep, mock_expo):
)
mock_sleep.assert_called_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable_delay(self, mock_sleep, mock_expo):

Original file line number Diff line number Diff line change
@@ -56,7 +56,7 @@ def test_environ_to_compression(self):
with self.assertRaises(InvalidCompressionValueException):
environ_to_compression("test_invalid")

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
def test_export_warning(self, mock_expo):

mock_expo.configure_mock(**{"return_value": [0]})
Original file line number Diff line number Diff line change
@@ -436,7 +436,23 @@ def test_otlp_headers(self, mock_ssl_channel, mock_secure):
# pylint: disable=protected-access
self.assertIsNone(exporter._headers, None)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.backoff")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_handles_backoff_v2_api(self, mock_sleep, mock_backoff):
# In backoff ~= 2.0.0 the first value yielded from expo is None.
def generate_delays(*args, **kwargs):
yield None
yield 1

mock_backoff.expo.configure_mock(**{"side_effect": generate_delays})

add_TraceServiceServicer_to_server(
TraceServiceServicerUNAVAILABLE(), self.server
)
self.exporter.export([self.span])
mock_sleep.assert_called_once_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):

@@ -450,7 +466,7 @@ def test_unavailable(self, mock_sleep, mock_expo):
)
mock_sleep.assert_called_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable_delay(self, mock_sleep, mock_expo):

Original file line number Diff line number Diff line change
@@ -34,7 +34,9 @@ dependencies = [
]

[project.optional-dependencies]
test = []
test = [
"responses == 0.22.0",
]

[project.entry-points.opentelemetry_traces_exporter]
otlp_proto_http = "opentelemetry.exporter.otlp.proto.http.trace_exporter:OTLPSpanExporter"
Original file line number Diff line number Diff line change
@@ -20,8 +20,8 @@
from typing import Dict, Optional, Sequence
from time import sleep

import backoff
import requests
from backoff import expo

from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_CERTIFICATE,
@@ -53,6 +53,18 @@
DEFAULT_LOGS_EXPORT_PATH = "v1/logs"
DEFAULT_TIMEOUT = 10 # in seconds

# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _expo(*args, **kwargs):
gen = backoff.expo(*args, **kwargs)
if _is_backoff_v2:
gen.send(None)
return gen


class OTLPLogExporter(LogExporter):

@@ -122,7 +134,7 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult:

serialized_data = _ProtobufEncoder.serialize(batch)

for delay in expo(max_value=self._MAX_RETRY_TIMEOUT):
for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT):

if delay == self._MAX_RETRY_TIMEOUT:
return LogExportResult.FAILURE
Original file line number Diff line number Diff line change
@@ -20,8 +20,8 @@
from typing import Dict, Optional
from time import sleep

import backoff
import requests
from backoff import expo

from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE,
@@ -54,6 +54,18 @@
DEFAULT_TRACES_EXPORT_PATH = "v1/traces"
DEFAULT_TIMEOUT = 10 # in seconds

# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _expo(*args, **kwargs):
gen = backoff.expo(*args, **kwargs)
if _is_backoff_v2:
gen.send(None)
return gen


class OTLPSpanExporter(SpanExporter):

@@ -133,7 +145,7 @@ def export(self, spans) -> SpanExportResult:

serialized_data = _ProtobufEncoder.serialize(spans)

for delay in expo(max_value=self._MAX_RETRY_TIMEOUT):
for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT):

if delay == self._MAX_RETRY_TIMEOUT:
return SpanExportResult.FAILURE
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@
from unittest.mock import MagicMock, patch

import requests
import responses

from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.exporter.otlp.proto.http._log_exporter import (
@@ -159,6 +160,31 @@ def test_serialize(self):
expected_encoding.SerializeToString(),
)

@responses.activate
@patch("opentelemetry.exporter.otlp.proto.http._log_exporter.backoff")
@patch("opentelemetry.exporter.otlp.proto.http._log_exporter.sleep")
def test_handles_backoff_v2_api(self, mock_sleep, mock_backoff):
# In backoff ~= 2.0.0 the first value yielded from expo is None.
def generate_delays(*args, **kwargs):
yield None
yield 1

mock_backoff.expo.configure_mock(**{"side_effect": generate_delays})

# return a retryable error
responses.add(
responses.POST,
"http://logs.example.com/export",
json={"error": "something exploded"},
status=500,
)

exporter = OTLPLogExporter(endpoint="http://logs.example.com/export")
logs = self._get_sdk_log_data()

exporter.export(logs)
mock_sleep.assert_called_once_with(1)

@staticmethod
def _get_sdk_log_data() -> List[LogData]:
log1 = LogData(
Original file line number Diff line number Diff line change
@@ -13,9 +13,11 @@
# limitations under the License.

import unittest
from unittest.mock import patch
from collections import OrderedDict
from unittest.mock import Mock, patch

import requests
import responses

from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
@@ -37,6 +39,7 @@
OTEL_EXPORTER_OTLP_TRACES_HEADERS,
OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
)
from opentelemetry.sdk.trace import _Span

OS_ENV_ENDPOINT = "os.env.base"
OS_ENV_CERTIFICATE = "os/env/base.crt"
@@ -188,3 +191,40 @@ def test_headers_parse_from_env(self):
cm.records[0].message,
"Header doesn't match the format: missingValue.",
)

# pylint: disable=no-self-use
@responses.activate
@patch("opentelemetry.exporter.otlp.proto.http.trace_exporter.backoff")
@patch("opentelemetry.exporter.otlp.proto.http.trace_exporter.sleep")
def test_handles_backoff_v2_api(self, mock_sleep, mock_backoff):
# In backoff ~= 2.0.0 the first value yielded from expo is None.
def generate_delays(*args, **kwargs):
yield None
yield 1

mock_backoff.expo.configure_mock(**{"side_effect": generate_delays})

# return a retryable error
responses.add(
responses.POST,
"http://traces.example.com/export",
json={"error": "something exploded"},
status=500,
)

exporter = OTLPSpanExporter(
endpoint="http://traces.example.com/export"
)
span = _Span(
"abc",
context=Mock(
**{
"trace_state": OrderedDict([("a", "b"), ("c", "d")]),
"span_id": 10217189687419569865,
"trace_id": 67545097771067222548457157018666467027,
}
),
)

exporter.export([span])
mock_sleep.assert_called_once_with(1)
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
@@ -137,7 +137,7 @@ commands_pre =
exporter-otlp-proto-grpc: pip install {toxinidir}/exporter/opentelemetry-exporter-otlp-proto-grpc

exporter-otlp-proto-http: pip install {toxinidir}/opentelemetry-proto
exporter-otlp-proto-http: pip install {toxinidir}/exporter/opentelemetry-exporter-otlp-proto-http
exporter-otlp-proto-http: pip install {toxinidir}/exporter/opentelemetry-exporter-otlp-proto-http[test]

exporter-jaeger-combined: pip install {toxinidir}/exporter/opentelemetry-exporter-jaeger-proto-grpc {toxinidir}/exporter/opentelemetry-exporter-jaeger-thrift {toxinidir}/exporter/opentelemetry-exporter-jaeger
exporter-jaeger-proto-grpc: pip install {toxinidir}/exporter/opentelemetry-exporter-jaeger-proto-grpc