Skip to content

Commit

Permalink
Extract retry mechanic from OTLPExporterMixin
Browse files Browse the repository at this point in the history
This is the first change in a chain of commits to rework the retry
mechanic. It is based on the work of open-telemetry#3764 and basically trying to land
the changes proposed by this monolithic commit step by step.

The plan is roughly to proceed in these steps:

* Extract retry mechanic from GRPC exporters
* Consolidate HTTP with GRPC exporter retry implementation
* Pipe timeout through RetryingExporter
* Make exporter lock protect the whole export instead of just a single iteration
* Make timeout float instead of int
* Add back-off with jitter

It's pretty likely that the plan will change along the way.
  • Loading branch information
LarsMichelsen committed Aug 26, 2024
1 parent 19e0a09 commit 6395d42
Show file tree
Hide file tree
Showing 9 changed files with 204 additions and 155 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Copyright The OpenTelemetry Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import threading
from logging import getLogger
from time import sleep
from typing import Callable, Generic, Type, TypeVar, Optional

from ._internal import _create_exp_backoff_generator

ExportResultT = TypeVar("ExportResultT", covariant=True)
ExportPayloadT = TypeVar("ExportPayloadT", covariant=True)

logger = getLogger(__name__)


class RetryableExportError(Exception):
def __init__(self, retry_delay_sec: Optional[int]):
super().__init__()
self.retry_delay_sec = retry_delay_sec


class RetryingExporter(Generic[ExportResultT]):
def __init__(
self,
export_function: Callable[[ExportPayloadT], ExportResultT],
result: Type[ExportResultT],
):
self._export_function = export_function
self._result = result

self._shutdown = False
self._export_lock = threading.Lock()

def shutdown(self, timeout_millis: float = 30_000) -> None:
# wait for the last export if any
self._export_lock.acquire( # pylint: disable=consider-using-with
timeout=timeout_millis / 1e3
)
self._shutdown = True
self._export_lock.release()

def export_with_retry(self, payload: ExportPayloadT) -> ExportResultT:
# After the call to shutdown, subsequent calls to Export are
# not allowed and should return a Failure result.
if self._shutdown:
logger.warning("Exporter already shutdown, ignoring batch")
return self._result.FAILURE

max_value = 64
# expo returns a generator that yields delay values which grow
# exponentially. Once delay is greater than max_value, the yielded
# value will remain constant.
for delay in _create_exp_backoff_generator(max_value=max_value):
if delay == max_value or self._shutdown:
return self._result.FAILURE

with self._export_lock:
try:
return self._export_function(payload)
except RetryableExportError as exc:
delay_sec = (
exc.retry_delay_sec
if exc.retry_delay_sec is not None
else delay
)
logger.warning("Retrying in %ss", delay_sec)
sleep(delay_sec)

return self._result.FAILURE
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def _translate_data(
return encode_logs(data)

def export(self, batch: Sequence[LogData]) -> LogExportResult:
return self._export(batch)
return self._exporter.export_with_retry(batch)

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,15 @@

"""OTLP Exporter"""

import threading
from abc import ABC, abstractmethod
from collections.abc import Sequence # noqa: F401
from logging import getLogger
from os import environ
from time import sleep
from typing import ( # noqa: F401
Any,
Callable,
Dict,
Generic,
List,
Optional,
Tuple,
Union,
)
from typing import Any, Callable, Dict, Generic, List, Optional # noqa: F401
from typing import Sequence as TypingSequence
from typing import TypeVar
from typing import Tuple, TypeVar, Union # noqa: F401
from urllib.parse import urlparse

from deprecated import deprecated

from opentelemetry.exporter.otlp.proto.common._internal import (
_get_resource_data,
_create_exp_backoff_generator,
)
from google.rpc.error_details_pb2 import RetryInfo
from grpc import (
ChannelCredentials,
Expand All @@ -51,9 +34,14 @@
ssl_channel_credentials,
)

from opentelemetry.exporter.otlp.proto.grpc import (
_OTLP_GRPC_HEADERS,
from opentelemetry.exporter.otlp.proto.common._internal import (
_get_resource_data,
)
from opentelemetry.exporter.otlp.proto.common.exporter import (
RetryableExportError,
RetryingExporter,
)
from opentelemetry.exporter.otlp.proto.grpc import _OTLP_GRPC_HEADERS
from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401
AnyValue,
ArrayValue,
Expand Down Expand Up @@ -260,8 +248,8 @@ def __init__(
)
)

self._export_lock = threading.Lock()
self._shutdown = False
self._exporter = RetryingExporter(self._export, self._result)

@abstractmethod
def _translate_data(
Expand All @@ -272,98 +260,72 @@ def _translate_data(
def _export(
self, data: Union[TypingSequence[ReadableSpan], MetricsData]
) -> ExportResultT:
# After the call to shutdown, subsequent calls to Export are
# not allowed and should return a Failure result.
if self._shutdown:
logger.warning("Exporter already shutdown, ignoring batch")
return self._result.FAILURE
try:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=self._timeout,
)

# FIXME remove this check if the export type for traces
# gets updated to a class that represents the proto
# TracesData and use the code below instead.
# logger.warning(
# "Transient error %s encountered while exporting %s, retrying in %ss.",
# error.code(),
# data.__class__.__name__,
# delay,
# )
max_value = 64
# expo returns a generator that yields delay values which grow
# exponentially. Once delay is greater than max_value, the yielded
# value will remain constant.
for delay in _create_exp_backoff_generator(max_value=max_value):
if delay == max_value or self._shutdown:
return self._result.FAILURE

with self._export_lock:
try:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=self._timeout,
return self._result.SUCCESS

except RpcError as error:
if error.code() in [
StatusCode.CANCELLED,
StatusCode.DEADLINE_EXCEEDED,
StatusCode.RESOURCE_EXHAUSTED,
StatusCode.ABORTED,
StatusCode.OUT_OF_RANGE,
StatusCode.UNAVAILABLE,
StatusCode.DATA_LOSS,
]:
retry_info_bin = dict(error.trailing_metadata()).get(
"google.rpc.retryinfo-bin"
)
if retry_info_bin is not None:
retry_info = RetryInfo()
retry_info.ParseFromString(retry_info_bin)
delay = (
retry_info.retry_delay.seconds
+ retry_info.retry_delay.nanos / 1.0e9
)
else:
delay = None

logger.warning(
(
"Transient error %s encountered while exporting "
"%s to %s"
),
error.code(),
self._exporting,
self._endpoint,
)
raise RetryableExportError(delay)

logger.error(
"Failed to export %s to %s, error code: %s",
self._exporting,
self._endpoint,
error.code(),
exc_info=error.code() == StatusCode.UNKNOWN,
)

return self._result.SUCCESS

except RpcError as error:

if error.code() in [
StatusCode.CANCELLED,
StatusCode.DEADLINE_EXCEEDED,
StatusCode.RESOURCE_EXHAUSTED,
StatusCode.ABORTED,
StatusCode.OUT_OF_RANGE,
StatusCode.UNAVAILABLE,
StatusCode.DATA_LOSS,
]:

retry_info_bin = dict(error.trailing_metadata()).get(
"google.rpc.retryinfo-bin"
)
if retry_info_bin is not None:
retry_info = RetryInfo()
retry_info.ParseFromString(retry_info_bin)
delay = (
retry_info.retry_delay.seconds
+ retry_info.retry_delay.nanos / 1.0e9
)

logger.warning(
(
"Transient error %s encountered while exporting "
"%s to %s, retrying in %ss."
),
error.code(),
self._exporting,
self._endpoint,
delay,
)
sleep(delay)
continue
else:
logger.error(
"Failed to export %s to %s, error code: %s",
self._exporting,
self._endpoint,
error.code(),
exc_info=error.code() == StatusCode.UNKNOWN,
)

if error.code() == StatusCode.OK:
return self._result.SUCCESS

return self._result.FAILURE

return self._result.FAILURE
if error.code() == StatusCode.OK:
return self._result.SUCCESS

return self._result.FAILURE

@abstractmethod
def export(self, data) -> ExportResultT:
pass

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
if self._shutdown:
logger.warning("Exporter already shutdown, ignoring call")
return
# wait for the last export if any
self._export_lock.acquire(timeout=timeout_millis / 1e3)
self._exporter.shutdown(timeout_millis=timeout_millis)
self._shutdown = True
self._export_lock.release()

@property
@abstractmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,14 @@ def export(
) -> MetricExportResult:
# TODO(#2663): OTLPExporterMixin should pass timeout to gRPC
if self._max_export_batch_size is None:
return self._export(data=metrics_data)
return self._exporter.export_with_retry(metrics_data)

export_result = MetricExportResult.SUCCESS

for split_metrics_data in self._split_metrics_data(metrics_data):
split_export_result = self._export(data=split_metrics_data)
split_export_result = self._exporter.export_with_retry(
split_metrics_data
)

if split_export_result is MetricExportResult.FAILURE:
export_result = MetricExportResult.FAILURE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,17 @@

import logging
from os import environ
from typing import Dict, Optional, Sequence, Tuple, Union
from typing import Sequence as TypingSequence

from typing import Dict, Optional
from typing import Sequence
from typing import Tuple, Union

from grpc import ChannelCredentials, Compression

from opentelemetry.exporter.otlp.proto.common.trace_encoder import (
encode_spans,
)
from opentelemetry.exporter.otlp.proto.grpc.exporter import (
from opentelemetry.exporter.otlp.proto.common.trace_encoder import encode_spans
from opentelemetry.exporter.otlp.proto.grpc.exporter import ( # noqa: F401
OTLPExporterMixin,
_get_credentials,
environ_to_compression,
)
from opentelemetry.exporter.otlp.proto.grpc.exporter import ( # noqa: F401
get_resource_data,
)
from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import (
Expand All @@ -41,12 +37,14 @@
from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401
InstrumentationScope,
)
from opentelemetry.proto.trace.v1.trace_pb2 import ( # noqa: F401
ScopeSpans,
from opentelemetry.proto.trace.v1.trace_pb2 import Status # noqa: F401
from opentelemetry.proto.trace.v1.trace_pb2 import ( # noqa: F40
ResourceSpans,
ScopeSpans,
)
from opentelemetry.proto.trace.v1.trace_pb2 import ( # noqa: F401
Span as CollectorSpan,
)
from opentelemetry.proto.trace.v1.trace_pb2 import Status # noqa: F401
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_TRACES_CLIENT_CERTIFICATE,
OTEL_EXPORTER_OTLP_TRACES_CLIENT_KEY,
Expand Down Expand Up @@ -91,12 +89,11 @@ def __init__(
insecure: Optional[bool] = None,
credentials: Optional[ChannelCredentials] = None,
headers: Optional[
Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
Union[Sequence[Tuple[str, str]], Dict[str, str], str]
] = None,
timeout: Optional[int] = None,
compression: Optional[Compression] = None,
):

if insecure is None:
insecure = environ.get(OTEL_EXPORTER_OTLP_TRACES_INSECURE)
if insecure is not None:
Expand Down Expand Up @@ -143,7 +140,7 @@ def _translate_data(
return encode_spans(data)

def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
return self._export(spans)
return self._exporter.export_with_retry(spans)

def shutdown(self) -> None:
OTLPExporterMixin.shutdown(self)
Expand Down
Loading

0 comments on commit 6395d42

Please sign in to comment.