Skip to content

Commit

Permalink
[grpc exporter] Handle backoff 1.0 and 2.0
Browse files Browse the repository at this point in the history
Fixes #2829.
  • Loading branch information
Yamakaky committed Sep 1, 2022
1 parent 0db9d19 commit 3ebf0c6
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 70 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2870](https://github.com/open-telemetry/opentelemetry-python/pull/2870))
- Fix: Remove `LogEmitter.flush()` to align with OTel Log spec
([#2863](https://github.com/open-telemetry/opentelemetry-python/pull/2863))
- Fix: Handle both the 1.x and 2.x release lines of the `backoff` dependency
([#2915](https://github.com/open-telemetry/opentelemetry-python/pull/2915))

## [1.12.0-0.33b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.12.0) - 2022-08-08

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ classifiers = [
"Programming Language :: Python :: 3.10",
]
dependencies = [
"backoff >= 1.10.0, < 2.0.0; python_version<'3.7'",
"backoff >= 1.10.0, < 3.0.0; python_version>='3.7'",
"backoff >= 1.10.0, < 3.0.0",
"googleapis-common-protos ~= 1.52",
"grpcio >= 1.0.0, < 2.0.0",
"opentelemetry-api ~= 1.3",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
from typing import TypeVar
from urllib.parse import urlparse
from opentelemetry.sdk.trace import ReadableSpan
import sys

from backoff import expo
import backoff
from google.rpc.error_details_pb2 import RetryInfo
from grpc import (
ChannelCredentials,
Expand Down Expand Up @@ -282,81 +283,58 @@ def _translate_attributes(self, attributes) -> TypingSequence[KeyValue]:
def _export(
self, data: Union[TypingSequence[ReadableSpan], MetricsData]
) -> ExportResultT:
try:
self._export_backoff(data)
except RpcError:
return self._result.FAILURE

# FIXME remove this check if the export type for traces
# gets updated to a class that represents the proto
# TracesData and use the code below instead.
# logger.warning(
# "Transient error %s encountered while exporting %s, retrying in %ss.",
# error.code(),
# data.__class__.__name__,
# delay,
# )
max_value = 64
# expo returns a generator that yields delay values which grow
# exponentially. Once delay is greater than max_value, the yielded
# value will remain constant.
for delay in expo(max_value=max_value):

if delay == max_value:
return self._result.FAILURE

try:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=self._timeout,
)
@backoff.on_exception( backoff.expo, RpcError, max_time=120, on_backoff=self._on_backoff)
def _export_backoff(
self, data: Union[TypingSequence[ReadableSpan], MetricsData]
) -> ExportResultT:
try:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=self._timeout,
)

return self._result.SUCCESS
return self._result.SUCCESS

except RpcError as error:
except RpcError as error:

if error.code() in [
StatusCode.CANCELLED,
StatusCode.DEADLINE_EXCEEDED,
StatusCode.RESOURCE_EXHAUSTED,
StatusCode.ABORTED,
StatusCode.OUT_OF_RANGE,
StatusCode.UNAVAILABLE,
StatusCode.DATA_LOSS,
]:
if error.code() in [
StatusCode.CANCELLED,
StatusCode.DEADLINE_EXCEEDED,
StatusCode.RESOURCE_EXHAUSTED,
StatusCode.ABORTED,
StatusCode.OUT_OF_RANGE,
StatusCode.UNAVAILABLE,
StatusCode.DATA_LOSS,
]:

retry_info_bin = dict(error.trailing_metadata()).get(
"google.rpc.retryinfo-bin"
)
if retry_info_bin is not None:
retry_info = RetryInfo()
retry_info.ParseFromString(retry_info_bin)
delay = (
retry_info.retry_delay.seconds
+ retry_info.retry_delay.nanos / 1.0e9
)

logger.warning(
(
"Transient error %s encountered while exporting "
"%s, retrying in %ss."
),
error.code(),
self._exporting,
delay,
)
sleep(delay)
continue
else:
logger.error(
"Failed to export %s, error code: %s",
self._exporting,
error.code(),
retry_info_bin = dict(error.trailing_metadata()).get(
"google.rpc.retryinfo-bin"
)
if retry_info_bin is not None:
retry_info = RetryInfo()
retry_info.ParseFromString(retry_info_bin)
delay = (
retry_info.retry_delay.seconds
+ retry_info.retry_delay.nanos / 1.0e9
)
raise
else:
logger.error(
"Failed to export %s, error code: %s",
self._exporting,
error.code(),
)

if error.code() == StatusCode.OK:
return self._result.SUCCESS

return self._result.FAILURE
if error.code() == StatusCode.OK:
return self._result.SUCCESS

return self._result.FAILURE
return self._result.FAILURE

def shutdown(self) -> None:
pass
Expand All @@ -369,3 +347,16 @@ def _exporting(self) -> str:
warning messages.
"""
pass

def _on_backoff(self, details):
    """Log a warning when ``backoff`` schedules another export attempt.

    Registered as the ``on_backoff`` event handler of the retrying
    export call, so it runs while the exception that triggered the
    retry is still being handled.

    Args:
        details: The mapping ``backoff`` passes to its event handlers.
            Only ``details["wait"]`` (the delay before the next attempt)
            is read here.
    """
    # sys.exc_info() returns a (type, value, traceback) tuple; the middle
    # element is the exception instance currently being handled.
    # NOTE(review): the exception is presumably read from sys.exc_info()
    # instead of from ``details`` so the handler behaves the same on
    # backoff 1.x and 2.x -- confirm against the backoff changelog.
    _, error, _ = sys.exc_info()
    # Narrowing only: the handler is registered for RpcError retries.
    assert isinstance(error, RpcError)
    logger.warning(
        (
            "Transient error %s encountered while exporting "
            "%s, retrying in %ss."
        ),
        error.code(),
        self._exporting,
        details["wait"],
    )

0 comments on commit 3ebf0c6

Please sign in to comment.