Skip to content

Commit

Permalink
Add optional timeout argument to export_with_retry
Browse files Browse the repository at this point in the history
This change allows the user to call retrying exports with individiual
timeouts. This argument will be needed in the next step, where the GRPC
metric exporters batch processing is changed to respect the timeout for
the whole export.
  • Loading branch information
LarsMichelsen committed Sep 6, 2024
1 parent 7863b2a commit a966a92
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ def shutdown(self, timeout_millis: float = 30_000) -> None:
self._shutdown.set()

def export_with_retry( # pylint: disable=too-many-return-statements
self, payload: ExportPayloadT
self,
payload: ExportPayloadT,
timeout_sec: Optional[float] = None,
) -> ExportResultT:
"""Exports payload with handling of retryable errors
Expand All @@ -91,6 +93,9 @@ def export_with_retry( # pylint: disable=too-many-return-statements
raised error, a retry attempt will not occur before that delay. If a retry after that delay
is not possible, will immediately abort without retrying.
In case no timeout_sec is not given, the timeout defaults to the timeout given during
initialization.
Will reattempt the export until timeout has passed, at which point the export will be
abandoned and a failure will be returned. A pending shutdown timing out will also cause
retries to time out.
Expand All @@ -107,9 +112,17 @@ def export_with_retry( # pylint: disable=too-many-return-statements
logger.warning("Exporter already shutdown, ignoring batch")
return self._result.FAILURE

timeout_sec = self._timeout_sec
timeout_sec = (
timeout_sec if timeout_sec is not None else self._timeout_sec
)
deadline_sec = time() + timeout_sec

# If negative timeout passed (from e.g. external batch deadline - see GRPC metric exporter)
# fail immediately
if timeout_sec <= 0:
logger.warning("Export deadline passed, ignoring data")
return self._result.FAILURE

with acquire_timeout(self._export_lock, timeout_sec) as is_locked:
if not is_locked:
logger.warning(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ def test_export_retry(self, mock_backoff):
self.assertEqual(export_func.call_count, len(side_effect))
self.assertIs(result, result_type.FAILURE)

def test_export_uses_arg_timout_when_given(self) -> None:
export_func = Mock(side_effect=RetryableExportError(None))
exporter = RetryingExporter(export_func, result_type, timeout_sec=2)
with self.assertLogs(level=WARNING):
start = time.time()
exporter.export_with_retry("payload", 0.1)
duration = time.time() - start
self.assertAlmostEqual(duration, 0.1, places=1)

@patch(
"opentelemetry.exporter.otlp.proto.common.exporter._create_exp_backoff_generator",
return_value=repeat(0.25),
Expand Down

0 comments on commit a966a92

Please sign in to comment.