From 12257df8396dad924a22ba723ec8905ce866f612 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Thu, 6 Feb 2020 08:00:39 -0500 Subject: [PATCH] raise an exception if timeout is exceeded. --- .../src/opentelemetry/sdk/trace/__init__.py | 7 +++---- .../src/opentelemetry/sdk/trace/export/__init__.py | 11 ++++++----- opentelemetry-sdk/tests/trace/export/test_export.py | 11 ++++++----- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py index 7690e31996..90063c891e 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py @@ -70,15 +70,14 @@ def shutdown(self) -> None: """Called when a :class:`opentelemetry.sdk.trace.Tracer` is shutdown. """ - def force_flush(self, timeout_millis: int = 30000) -> bool: + def force_flush(self, timeout_millis: int = 30000): """Export all ended spans to the configured Exporter that have not yet been exported. + An exception is raised if the timeout is exceeeded. + Args: timeout_millis: The maximum amount of time to wait for spans to be exported. - - Returns: - False if the timeout is exceeded, True otherwise. """ diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index c3d1e57428..0fe44acd7d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -84,9 +84,9 @@ def on_end(self, span: Span) -> None: def shutdown(self) -> None: self.span_exporter.shutdown() - def force_flush(self, timeout_millis: int = 30000) -> bool: + def force_flush(self, timeout_millis: int = 30000): # pylint: disable=unused-argument - return True + pass class BatchExportSpanProcessor(SpanProcessor): @@ -227,10 +227,10 @@ def _drain_queue(self): while self.queue: self.export() - def force_flush(self, timeout_millis: int = 30000) -> bool: + def force_flush(self, timeout_millis: int = 30000): if self.done: logger.warning("Already shutdown, ignoring call to force_flush().") - return True + return self._flushing = True self.queue.appendleft(self._FLUSH_TOKEN_SPAN) @@ -244,7 +244,8 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: ret = self.flush_condition.wait(timeout_millis / 1e3) self._flushing = False - return ret + if not ret: + raise RuntimeError("timeout exceeded on force_flush()") def shutdown(self) -> None: # signal the worker thread to finish and then wait for it diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index 2d2c142a9f..b90220016c 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -134,14 +134,14 @@ def test_flush(self): for name in span_names0: _create_start_and_end_span(name, span_processor) - self.assertTrue(span_processor.force_flush()) + span_processor.force_flush() self.assertListEqual(span_names0, spans_names_list) # create some more spans to check that span processor still works for name in span_names1: _create_start_and_end_span(name, span_processor) - self.assertTrue(span_processor.force_flush()) + span_processor.force_flush() self.assertListEqual(span_names0 + span_names1, spans_names_list) span_processor.shutdown() @@ -157,7 +157,8 @@ def test_flush_timeout(self): _create_start_and_end_span("foo", span_processor) # check that the timeout is not meet - self.assertFalse(span_processor.force_flush(100)) + with self.assertRaises(RuntimeError): + span_processor.force_flush(100) span_processor.shutdown() def test_batch_span_processor_lossless(self): @@ -174,7 +175,7 @@ def test_batch_span_processor_lossless(self): for _ in range(512): _create_start_and_end_span("foo", span_processor) - self.assertTrue(span_processor.force_flush()) + span_processor.force_flush() self.assertEqual(len(spans_names_list), 512) span_processor.shutdown() @@ -198,7 +199,7 @@ def test_batch_span_processor_many_spans(self): time.sleep(0.05) # give some time for the exporter to upload spans - self.assertTrue(span_processor.force_flush()) + span_processor.force_flush() self.assertEqual(len(spans_names_list), 1024) span_processor.shutdown()