diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/exporter.py index 84adf126cc..526fe3c703 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/exporter.py @@ -139,6 +139,7 @@ def export_with_retry( # pylint: disable=too-many-return-statements ): remaining_time_sec = deadline_sec - time() if remaining_time_sec < 1e-09: + logger.warning("FAILURE TIMED OUT") return self._result.FAILURE # Timed out if self._shutdown.is_set(): @@ -160,11 +161,13 @@ def export_with_retry( # pylint: disable=too-many-return-statements if delay_sec > time_remaining_sec: # We should not exceed the requested timeout + logger.warning("FAILURE WOOT") return self._result.FAILURE logger.warning("Retrying in %0.2fs", delay_sec) self._shutdown.wait(delay_sec) + logger.warning("FAILURE END") return self._result.FAILURE diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_retryable_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_retryable_exporter.py index 65e29bedfd..f0ef8bca76 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_retryable_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_retryable_exporter.py @@ -16,7 +16,7 @@ import time import unittest from itertools import repeat -from logging import WARNING +from logging import WARNING, getLogger from typing import Type from unittest.mock import ANY, Mock, patch @@ -30,6 +30,7 @@ result_type: Type = Mock() +logger = getLogger(__name__) class TestRetryableExporter(unittest.TestCase): def test_export_no_retry(self): @@ -194,7 +195,7 @@ def test_shutdown(self): def test_shutdown_wait_last_export(self, mock_backoff): """Test that shutdown waits for ongoing export to complete.""" - timeout_sec = 0.05 + timeout_sec = 10.0 class ExportFunc: is_exporting = threading.Event() @@ -207,6 +208,7 @@ class ExportFunc: mock_export_func = Mock(side_effect=side_effect) def __call__(self, *args, **kwargs): + logger.warning("ExportFunc.__call__") self.is_exporting.set() self.ready_to_continue.wait() return self.mock_export_func(*args, **kwargs) @@ -219,10 +221,13 @@ def __call__(self, *args, **kwargs): class ExportWrap: def __init__(self) -> None: + logger.warning("ExportWrap.__init__") self.result = None def __call__(self, *args, **kwargs): + logger.warning("ExportWrap.__call__") self.result = exporter.export_with_retry("payload") + logger.warning(f"result: {self.result!r}") return self.result export_wrapped = ExportWrap() @@ -230,43 +235,51 @@ def __call__(self, *args, **kwargs): export_thread = threading.Thread( name="export_thread", target=export_wrapped ) - with self.assertLogs(level=WARNING): - try: - # Simulate shutdown occurring during retry process - # Intended execution flow - # - # main thread: - # - start export_thread - # - wait for is_exporting - # export_thread: - # - call export_func - # - set is_exporting - # - wait for ready_to_continue - # main thread: - # - start shutdown thread - # - sleep to yield to shutdown thread - # shutdown_thread: - # - block at acquiring lock held by export_thread - # - shutdown is now pending timeout/export completion - # main thread: - # - set ready_to_continue - # - join all threads - export_thread.start() - export_func.is_exporting.wait() - start_time = time.time() - shutdown_thread = threading.Thread( - name="shutdown_thread", target=exporter.shutdown - ) - shutdown_thread.start() - export_func.ready_to_continue.set() - finally: - export_thread.join() - shutdown_thread.join() + #with self.assertLogs(level=WARNING): + try: + # Simulate shutdown occurring during retry process + # Intended execution flow + # + # main thread: + # - start export_thread + # - wait for is_exporting + # export_thread: + # - call export_func + # - set is_exporting + # - wait for ready_to_continue + # main thread: + # - start shutdown thread + # - sleep to yield to shutdown thread + # shutdown_thread: + # - block at acquiring lock held by export_thread + # - shutdown is now pending timeout/export completion + # main thread: + # - set ready_to_continue + # - join all threads + logger.warning("main: export_thread.start") + export_thread.start() + logger.warning("main: export_func.is_exporting.wait") + export_func.is_exporting.wait() + start_time = time.time() + shutdown_thread = threading.Thread( + name="shutdown_thread", target=exporter.shutdown + ) + logger.warning("main: shutdown_thread.start") + shutdown_thread.start() + logger.warning("main: export_func.ready_to_continue.set") + export_func.ready_to_continue.set() + finally: + logger.warning("main: export_thread.join") + export_thread.join() + logger.warning("main: shutdown_thread.join") + shutdown_thread.join() duration = time.time() - start_time + logger.warning(f"main: duration {duration}") self.assertLessEqual(duration, timeout_sec) # pylint: disable=protected-access self.assertTrue(exporter._shutdown) + logger.warning(f"main: {export_wrapped.result!r}") self.assertIs(export_wrapped.result, result_type.SUCCESS) def test_shutdown_timeout_cancels_export_retries(self):