Skip to content

Commit

Permalink
Fix otlp exporter error handling misusing backoff.expo
Browse files Browse the repository at this point in the history
  • Loading branch information
isra17 committed Oct 23, 2022
1 parent c0e8f40 commit e4b63ef
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,23 @@
from collections.abc import Sequence
from os import environ
from time import sleep
from typing import Any, Callable, Dict, Generic, List, Optional, Tuple, Union
from typing import (
Any,
Callable,
Dict,
Generic,
List,
Optional,
Tuple,
Union,
Iterator,
)
from typing import Sequence as TypingSequence
from typing import TypeVar
from urllib.parse import urlparse
from opentelemetry.sdk.trace import ReadableSpan

from backoff import expo
import backoff
from google.rpc.error_details_pb2 import RetryInfo
from grpc import (
ChannelCredentials,
Expand Down Expand Up @@ -68,6 +78,12 @@
}


def _expo(*, max_value: int) -> Iterator[int]:
gen = backoff.expo()
gen.send(None)
return gen


class InvalidCompressionValueException(Exception):
def __init__(self, environ_key: str, environ_value: str):
super().__init__(
Expand Down Expand Up @@ -293,11 +309,10 @@ def _export(
# delay,
# )
max_value = 64
# expo returns a generator that yields delay values which grow
# _expo returns a generator that yields delay values which grow
# exponentially. Once delay is greater than max_value, the yielded
# value will remain constant.
for delay in expo(max_value=max_value):

for delay in _expo(max_value=max_value):
if delay == max_value:
return self._result.FAILURE

Expand All @@ -311,7 +326,6 @@ def _export(
return self._result.SUCCESS

except RpcError as error:

if error.code() in [
StatusCode.CANCELLED,
StatusCode.DEADLINE_EXCEEDED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,19 +436,19 @@ def test_otlp_headers(self, mock_ssl_channel, mock_secure):
# pylint: disable=protected-access
self.assertIsNone(exporter._headers, None)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):

mock_expo.configure_mock(**{"return_value": [1]})
def test_unavailable(self, mock_sleep):

add_TraceServiceServicer_to_server(
TraceServiceServicerUNAVAILABLE(), self.server
)
self.assertEqual(
self.exporter.export([self.span]), SpanExportResult.FAILURE
)
mock_sleep.assert_called_with(1)
self.assertEqual(
mock_sleep.call_args_list,
[((1,),), ((2,),), ((4,),), ((8,),), ((16,),), ((32,),)],
)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
Expand Down

0 comments on commit e4b63ef

Please sign in to comment.