Skip to content

Commit

Permalink
Factor out duplicate backoff code (#3396)
Browse files Browse the repository at this point in the history
Co-authored-by: Diego Hurtado <[email protected]>
  • Loading branch information
pmcollins and ocelotl authored Aug 30, 2023
1 parent 0fa0ce5 commit 9121382
Show file tree
Hide file tree
Showing 13 changed files with 69 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ classifiers = [
]
dependencies = [
"opentelemetry-proto == 1.20.0.dev",
"backoff >= 1.10.0, < 2.0.0; python_version<'3.7'",
"backoff >= 1.10.0, < 3.0.0; python_version>='3.7'",
]

[project.urls]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from collections.abc import Sequence
from typing import Any, Mapping, Optional, List, Callable, TypeVar, Dict

import backoff

from opentelemetry.sdk.util.instrumentation import InstrumentationScope
from opentelemetry.proto.common.v1.common_pb2 import (
InstrumentationScope as PB2InstrumentationScope,
Expand Down Expand Up @@ -130,3 +132,16 @@ def _get_resource_data(
)
)
return resource_data


# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _create_exp_backoff_generator(*args, **kwargs):
gen = backoff.expo(*args, **kwargs)
if _is_backoff_v2:
gen.send(None)
return gen
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@

from opentelemetry.exporter.otlp.proto.common._internal import (
_get_resource_data,
_create_exp_backoff_generator,
)
import backoff
from google.rpc.error_details_pb2 import RetryInfo
from grpc import (
ChannelCredentials,
Expand Down Expand Up @@ -137,19 +137,6 @@ def _get_credentials(creds, environ_key):
return ssl_channel_credentials()


# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _expo(*args, **kwargs):
gen = backoff.expo(*args, **kwargs)
if _is_backoff_v2:
gen.send(None)
return gen


# pylint: disable=no-member
class OTLPExporterMixin(
ABC, Generic[SDKDataT, ExportServiceRequestT, ExportResultT]
Expand Down Expand Up @@ -266,7 +253,7 @@ def _export(
# expo returns a generator that yields delay values which grow
# exponentially. Once delay is greater than max_value, the yielded
# value will remain constant.
for delay in _expo(max_value=max_value):
for delay in _create_exp_backoff_generator(max_value=max_value):
if delay == max_value or self._shutdown:
return self._result.FAILURE

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,9 @@ def test_otlp_headers_from_env(self):
(("user-agent", "OTel-OTLP-Exporter-Python/" + __version__),),
)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator"
)
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):

Expand All @@ -306,7 +308,9 @@ def test_unavailable(self, mock_sleep, mock_expo):
)
mock_sleep.assert_called_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator"
)
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable_delay(self, mock_sleep, mock_expo):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ def test_environ_to_compression(self):
with self.assertRaises(InvalidCompressionValueException):
environ_to_compression("test_invalid")

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator"
)
def test_export_warning(self, mock_expo):
mock_expo.configure_mock(**{"return_value": [0]})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,9 @@ def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure):
mock_method.reset_mock()

# pylint: disable=no-self-use
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator"
)
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel")
@patch.dict("os.environ", {OTEL_EXPORTER_OTLP_COMPRESSION: "gzip"})
def test_otlp_exporter_otlp_compression_envvar(
Expand Down Expand Up @@ -405,7 +407,9 @@ def test_otlp_exporter_otlp_compression_unspecified(
"localhost:4317", compression=Compression.NoCompression
)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator"
)
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):

Expand All @@ -420,7 +424,9 @@ def test_unavailable(self, mock_sleep, mock_expo):
)
mock_sleep.assert_called_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator"
)
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable_delay(self, mock_sleep, mock_expo):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
from opentelemetry.attributes import BoundedAttributes
from opentelemetry.exporter.otlp.proto.common._internal import (
_encode_key_value,
_is_backoff_v2,
)
from opentelemetry.exporter.otlp.proto.grpc.exporter import _is_backoff_v2
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter,
)
Expand Down Expand Up @@ -460,7 +460,7 @@ def test_otlp_headers(self, mock_ssl_channel, mock_secure):
(("user-agent", "OTel-OTLP-Exporter-Python/" + __version__),),
)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.backoff")
@patch("opentelemetry.exporter.otlp.proto.common._internal.backoff")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_handles_backoff_v2_api(self, mock_sleep, mock_backoff):
# In backoff ~= 2.0.0 the first value yielded from expo is None.
Expand All @@ -477,7 +477,9 @@ def generate_delays(*args, **kwargs):
self.exporter.export([self.span])
mock_sleep.assert_called_once_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator"
)
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):

Expand All @@ -486,12 +488,13 @@ def test_unavailable(self, mock_sleep, mock_expo):
add_TraceServiceServicer_to_server(
TraceServiceServicerUNAVAILABLE(), self.server
)
self.assertEqual(
self.exporter.export([self.span]), SpanExportResult.FAILURE
)
result = self.exporter.export([self.span])
self.assertEqual(result, SpanExportResult.FAILURE)
mock_sleep.assert_called_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator"
)
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable_delay(self, mock_sleep, mock_expo):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
from typing import Dict, Optional, Sequence
from time import sleep

import backoff
import requests

from opentelemetry.exporter.otlp.proto.common._internal import (
_create_exp_backoff_generator,
)
from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_CERTIFICATE,
Expand Down Expand Up @@ -56,18 +58,6 @@
DEFAULT_LOGS_EXPORT_PATH = "v1/logs"
DEFAULT_TIMEOUT = 10 # in seconds

# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _expo(*args, **kwargs):
gen = backoff.expo(*args, **kwargs)
if _is_backoff_v2:
gen.send(None)
return gen


class OTLPLogExporter(LogExporter):

Expand Down Expand Up @@ -147,7 +137,9 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult:

serialized_data = encode_logs(batch).SerializeToString()

for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT):
for delay in _create_exp_backoff_generator(
max_value=self._MAX_RETRY_TIMEOUT
):

if delay == self._MAX_RETRY_TIMEOUT:
return LogExportResult.FAILURE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from opentelemetry.exporter.otlp.proto.common._internal import (
_get_resource_data,
_create_exp_backoff_generator,
)
from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import (
OTLPMetricExporterMixin,
Expand Down Expand Up @@ -73,7 +74,6 @@
from opentelemetry.sdk.resources import Resource as SDKResource
from opentelemetry.util.re import parse_env_headers

import backoff
import requests
from opentelemetry.proto.resource.v1.resource_pb2 import (
Resource as PB2Resource,
Expand All @@ -87,18 +87,6 @@
DEFAULT_METRICS_EXPORT_PATH = "v1/metrics"
DEFAULT_TIMEOUT = 10 # in seconds

# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _expo(*args, **kwargs):
gen = backoff.expo(*args, **kwargs)
if _is_backoff_v2:
gen.send(None)
return gen


class OTLPMetricExporter(MetricExporter, OTLPMetricExporterMixin):

Expand Down Expand Up @@ -181,7 +169,9 @@ def export(
**kwargs,
) -> MetricExportResult:
serialized_data = encode_metrics(metrics_data)
for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT):
for delay in _create_exp_backoff_generator(
max_value=self._MAX_RETRY_TIMEOUT
):

if delay == self._MAX_RETRY_TIMEOUT:
return MetricExportResult.FAILURE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
from typing import Dict, Optional
from time import sleep

import backoff
import requests

from opentelemetry.exporter.otlp.proto.common._internal import (
_create_exp_backoff_generator,
)
from opentelemetry.exporter.otlp.proto.common.trace_encoder import (
encode_spans,
)
Expand Down Expand Up @@ -54,18 +56,6 @@
DEFAULT_TRACES_EXPORT_PATH = "v1/traces"
DEFAULT_TIMEOUT = 10 # in seconds

# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _expo(*args, **kwargs):
gen = backoff.expo(*args, **kwargs)
if _is_backoff_v2:
gen.send(None)
return gen


class OTLPSpanExporter(SpanExporter):

Expand Down Expand Up @@ -145,7 +135,9 @@ def export(self, spans) -> SpanExportResult:

serialized_data = encode_spans(spans).SerializeToString()

for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT):
for delay in _create_exp_backoff_generator(
max_value=self._MAX_RETRY_TIMEOUT
):

if delay == self._MAX_RETRY_TIMEOUT:
return SpanExportResult.FAILURE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from requests.models import Response
from responses import POST, activate, add

from opentelemetry.exporter.otlp.proto.common._internal import _is_backoff_v2
from opentelemetry.exporter.otlp.proto.common.metrics_encoder import (
encode_metrics,
)
Expand All @@ -31,7 +32,6 @@
DEFAULT_METRICS_EXPORT_PATH,
DEFAULT_TIMEOUT,
OTLPMetricExporter,
_is_backoff_v2,
)
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_CERTIFICATE,
Expand Down Expand Up @@ -298,7 +298,7 @@ def test_serialization(self, mock_post):
)

@activate
@patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.backoff")
@patch("opentelemetry.exporter.otlp.proto.common._internal.backoff")
@patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.sleep")
def test_handles_backoff_v2_api(self, mock_sleep, mock_backoff):
# In backoff ~= 2.0.0 the first value yielded from expo is None.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
import responses

from opentelemetry._logs import SeverityNumber
from opentelemetry.exporter.otlp.proto.common._internal import _is_backoff_v2
from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.exporter.otlp.proto.http._log_exporter import (
DEFAULT_COMPRESSION,
DEFAULT_ENDPOINT,
DEFAULT_LOGS_EXPORT_PATH,
DEFAULT_TIMEOUT,
OTLPLogExporter,
_is_backoff_v2,
)
from opentelemetry.exporter.otlp.proto.http.version import __version__
from opentelemetry.sdk._logs import LogData
Expand Down Expand Up @@ -168,7 +168,7 @@ def test_exporter_env(self):
self.assertIsInstance(exporter._session, requests.Session)

@responses.activate
@patch("opentelemetry.exporter.otlp.proto.http._log_exporter.backoff")
@patch("opentelemetry.exporter.otlp.proto.common._internal.backoff")
@patch("opentelemetry.exporter.otlp.proto.http._log_exporter.sleep")
def test_handles_backoff_v2_api(self, mock_sleep, mock_backoff):
# In backoff ~= 2.0.0 the first value yielded from expo is None.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
import requests
import responses

from opentelemetry.exporter.otlp.proto.common._internal import _is_backoff_v2
from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
DEFAULT_COMPRESSION,
DEFAULT_ENDPOINT,
DEFAULT_TIMEOUT,
DEFAULT_TRACES_EXPORT_PATH,
OTLPSpanExporter,
_is_backoff_v2,
)
from opentelemetry.exporter.otlp.proto.http.version import __version__
from opentelemetry.sdk.environment_variables import (
Expand Down Expand Up @@ -204,7 +204,7 @@ def test_headers_parse_from_env(self):

# pylint: disable=no-self-use
@responses.activate
@patch("opentelemetry.exporter.otlp.proto.http.trace_exporter.backoff")
@patch("opentelemetry.exporter.otlp.proto.common._internal.backoff")
@patch("opentelemetry.exporter.otlp.proto.http.trace_exporter.sleep")
def test_handles_backoff_v2_api(self, mock_sleep, mock_backoff):
# In backoff ~= 2.0.0 the first value yielded from expo is None.
Expand Down

0 comments on commit 9121382

Please sign in to comment.