diff --git a/CHANGELOG.md b/CHANGELOG.md index a4af2ef82ec..ee3a8df122d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#2870](https://github.com/open-telemetry/opentelemetry-python/pull/2870)) - Fix: Remove `LogEmitter.flush()` to align with OTel Log spec ([#2863](https://github.com/open-telemetry/opentelemetry-python/pull/2863)) +- Fix: Handle `backoff` dependency version 1.0 and 2.0 + ([#2915](https://github.com/open-telemetry/opentelemetry-python/pull/2915)) ## [1.12.0-0.33b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.12.0) - 2022-08-08 diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/pyproject.toml b/exporter/opentelemetry-exporter-otlp-proto-grpc/pyproject.toml index 2e05816e8d6..0aab086cdca 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/pyproject.toml +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/pyproject.toml @@ -24,8 +24,7 @@ classifiers = [ "Programming Language :: Python :: 3.10", ] dependencies = [ - "backoff >= 1.10.0, < 2.0.0; python_version<'3.7'", - "backoff >= 1.10.0, < 3.0.0; python_version>='3.7'", + "backoff >= 1.10.0, < 3.0.0", "googleapis-common-protos ~= 1.52", "grpcio >= 1.0.0, < 2.0.0", "opentelemetry-api ~= 1.3", diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index 4405bcad68b..6a1322becb1 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -24,8 +24,9 @@ from typing import TypeVar from urllib.parse import urlparse from opentelemetry.sdk.trace import ReadableSpan +import sys -from backoff import expo +import backoff from google.rpc.error_details_pb2 import RetryInfo from grpc import ( ChannelCredentials, @@ -282,81 +283,58 @@ def _translate_attributes(self, attributes) -> TypingSequence[KeyValue]: def _export( self, data: Union[TypingSequence[ReadableSpan], MetricsData] ) -> ExportResultT: + try: + self._export_backoff(data) + except RpcError: + return self._result.FAILURE - # FIXME remove this check if the export type for traces - # gets updated to a class that represents the proto - # TracesData and use the code below instead. - # logger.warning( - # "Transient error %s encountered while exporting %s, retrying in %ss.", - # error.code(), - # data.__class__.__name__, - # delay, - # ) - max_value = 64 - # expo returns a generator that yields delay values which grow - # exponentially. Once delay is greater than max_value, the yielded - # value will remain constant. - for delay in expo(max_value=max_value): - - if delay == max_value: - return self._result.FAILURE - - try: - self._client.Export( - request=self._translate_data(data), - metadata=self._headers, - timeout=self._timeout, - ) + @backoff.on_exception( backoff.expo, RpcError, max_time=120, on_backoff=self._on_backoff) + def _export_backoff( + self, data: Union[TypingSequence[ReadableSpan], MetricsData] + ) -> ExportResultT: + try: + self._client.Export( + request=self._translate_data(data), + metadata=self._headers, + timeout=self._timeout, + ) - return self._result.SUCCESS + return self._result.SUCCESS - except RpcError as error: + except RpcError as error: - if error.code() in [ - StatusCode.CANCELLED, - StatusCode.DEADLINE_EXCEEDED, - StatusCode.RESOURCE_EXHAUSTED, - StatusCode.ABORTED, - StatusCode.OUT_OF_RANGE, - StatusCode.UNAVAILABLE, - StatusCode.DATA_LOSS, - ]: + if error.code() in [ + StatusCode.CANCELLED, + StatusCode.DEADLINE_EXCEEDED, + StatusCode.RESOURCE_EXHAUSTED, + StatusCode.ABORTED, + StatusCode.OUT_OF_RANGE, + StatusCode.UNAVAILABLE, + StatusCode.DATA_LOSS, + ]: - retry_info_bin = dict(error.trailing_metadata()).get( - "google.rpc.retryinfo-bin" - ) - if retry_info_bin is not None: - retry_info = RetryInfo() - retry_info.ParseFromString(retry_info_bin) - delay = ( - retry_info.retry_delay.seconds - + retry_info.retry_delay.nanos / 1.0e9 - ) - - logger.warning( - ( - "Transient error %s encountered while exporting " - "%s, retrying in %ss." - ), - error.code(), - self._exporting, - delay, - ) - sleep(delay) - continue - else: - logger.error( - "Failed to export %s, error code: %s", - self._exporting, - error.code(), + retry_info_bin = dict(error.trailing_metadata()).get( + "google.rpc.retryinfo-bin" + ) + if retry_info_bin is not None: + retry_info = RetryInfo() + retry_info.ParseFromString(retry_info_bin) + delay = ( + retry_info.retry_delay.seconds + + retry_info.retry_delay.nanos / 1.0e9 ) + raise + else: + logger.error( + "Failed to export %s, error code: %s", + self._exporting, + error.code(), + ) - if error.code() == StatusCode.OK: - return self._result.SUCCESS - - return self._result.FAILURE + if error.code() == StatusCode.OK: + return self._result.SUCCESS - return self._result.FAILURE + return self._result.FAILURE def shutdown(self) -> None: pass @@ -369,3 +347,16 @@ def _exporting(self) -> str: warning messages. """ pass + + def _on_backoff(self, details): + _, error, _ = sys.exc_info().code() + assert isinstance(error, RpcError) + logger.warning( + ( + "Transient error %s encountered while exporting " + "%s, retrying in %ss." + ), + error.code(), + self._exporting, + details["wait"], + )