Skip to content
This repository has been archived by the owner on Oct 19, 2023. It is now read-only.

Commit

Permalink
raise an exception if timeout is exceeded.
Browse files Browse the repository at this point in the history
  • Loading branch information
mauriciovasquezbernal committed Feb 6, 2020
1 parent 66f9814 commit 12257df
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 14 deletions.
7 changes: 3 additions & 4 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,14 @@ def shutdown(self) -> None:
"""Called when a :class:`opentelemetry.sdk.trace.Tracer` is shutdown.
"""

def force_flush(self, timeout_millis: int = 30000) -> bool:
def force_flush(self, timeout_millis: int = 30000):
"""Export all ended spans to the configured Exporter that have not
yet been exported.
An exception is raised if the timeout is exceeeded.
Args:
timeout_millis: The maximum amount of time to wait for spans to be exported.
Returns:
False if the timeout is exceeded, True otherwise.
"""


Expand Down
11 changes: 6 additions & 5 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ def on_end(self, span: Span) -> None:
def shutdown(self) -> None:
self.span_exporter.shutdown()

def force_flush(self, timeout_millis: int = 30000) -> bool:
def force_flush(self, timeout_millis: int = 30000):
# pylint: disable=unused-argument
return True
pass


class BatchExportSpanProcessor(SpanProcessor):
Expand Down Expand Up @@ -227,10 +227,10 @@ def _drain_queue(self):
while self.queue:
self.export()

def force_flush(self, timeout_millis: int = 30000) -> bool:
def force_flush(self, timeout_millis: int = 30000):
if self.done:
logger.warning("Already shutdown, ignoring call to force_flush().")
return True
return

self._flushing = True
self.queue.appendleft(self._FLUSH_TOKEN_SPAN)
Expand All @@ -244,7 +244,8 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
ret = self.flush_condition.wait(timeout_millis / 1e3)

self._flushing = False
return ret
if not ret:
raise RuntimeError("timeout exceeded on force_flush()")

def shutdown(self) -> None:
# signal the worker thread to finish and then wait for it
Expand Down
11 changes: 6 additions & 5 deletions opentelemetry-sdk/tests/trace/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,14 @@ def test_flush(self):
for name in span_names0:
_create_start_and_end_span(name, span_processor)

self.assertTrue(span_processor.force_flush())
span_processor.force_flush()
self.assertListEqual(span_names0, spans_names_list)

# create some more spans to check that span processor still works
for name in span_names1:
_create_start_and_end_span(name, span_processor)

self.assertTrue(span_processor.force_flush())
span_processor.force_flush()
self.assertListEqual(span_names0 + span_names1, spans_names_list)

span_processor.shutdown()
Expand All @@ -157,7 +157,8 @@ def test_flush_timeout(self):
_create_start_and_end_span("foo", span_processor)

# check that the timeout is not meet
self.assertFalse(span_processor.force_flush(100))
with self.assertRaises(RuntimeError):
span_processor.force_flush(100)
span_processor.shutdown()

def test_batch_span_processor_lossless(self):
Expand All @@ -174,7 +175,7 @@ def test_batch_span_processor_lossless(self):
for _ in range(512):
_create_start_and_end_span("foo", span_processor)

self.assertTrue(span_processor.force_flush())
span_processor.force_flush()
self.assertEqual(len(spans_names_list), 512)
span_processor.shutdown()

Expand All @@ -198,7 +199,7 @@ def test_batch_span_processor_many_spans(self):

time.sleep(0.05) # give some time for the exporter to upload spans

self.assertTrue(span_processor.force_flush())
span_processor.force_flush()
self.assertEqual(len(spans_names_list), 1024)
span_processor.shutdown()

Expand Down

0 comments on commit 12257df

Please sign in to comment.