From 1dd2f5beaf46a4f420be820eb1380e49105068eb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Mauricio=20V=C3=A1squez?=
Date: Fri, 7 Feb 2020 17:30:29 -0500
Subject: [PATCH] sdk: fix force_flush in batch span processor (#397)

#389 implemented force_flush() for the span processors. For
BatchExportSpanProcessor it was implemented by exposing the already
existing _flush() method. This created a race condition: _flush() was
intended to be called only from the worker thread, because it uses the
export() method, which is not thread safe.

As a result, some tests were failing randomly because export() was being
executed from two different threads: the worker thread and the user thread
calling force_flush().

This commit fixes it by implementing a more sophisticated flush mechanism.
When a flush is requested, a special token span is inserted into the span
queue, a flag indicating that a flush operation is in progress is set, and
the worker thread is woken up. The calling thread then waits on a condition
variable until the worker thread signals that the token has been processed.
The worker thread has new logic to avoid sleeping (waiting on its condition
variable) while a flush operation is in progress, and it notifies the
caller (through a second condition variable) once the token has been
processed.
---
 .../src/opentelemetry/sdk/trace/__init__.py   | 13 +++-
 .../sdk/trace/export/__init__.py              | 62 ++++++++++++++++---
 .../tests/trace/export/test_export.py         | 33 ++++++++--
 3 files changed, 91 insertions(+), 17 deletions(-)

diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
index 23f1aaf79b..e429467061 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
@@ -70,9 +70,16 @@ def shutdown(self) -> None:
         """Called when a :class:`opentelemetry.sdk.trace.Tracer` is shutdown.
         """
 
-    def force_flush(self) -> None:
-        """Export all ended spans to the configured Exporter that have not
-        yet been exported.
+    def force_flush(self, timeout_millis: int = 30000) -> bool:
+        """Export all ended spans to the configured Exporter that have not yet
+        been exported.
+
+        Args:
+            timeout_millis: The maximum amount of time to wait for spans to be
+                exported.
+
+        Returns:
+            False if the timeout is exceeded, True otherwise.
         """
 
 
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
index 5db2c1e957..3e2cc02c33 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
@@ -19,6 +19,7 @@
 from enum import Enum
 
 from opentelemetry.context import Context
+from opentelemetry.trace import DefaultSpan
 from opentelemetry.util import time_ns
 
 from .. import Span, SpanProcessor
@@ -83,8 +84,9 @@ def on_end(self, span: Span) -> None:
     def shutdown(self) -> None:
         self.span_exporter.shutdown()
 
-    def force_flush(self) -> None:
-        pass
+    def force_flush(self, timeout_millis: int = 30000) -> bool:
+        # pylint: disable=unused-argument
+        return True
 
 
 class BatchExportSpanProcessor(SpanProcessor):
@@ -94,6 +96,8 @@ class BatchExportSpanProcessor(SpanProcessor):
     batches ended spans and pushes them to the configured `SpanExporter`.
""" + _FLUSH_TOKEN_SPAN = DefaultSpan(context=None) + def __init__( self, span_exporter: SpanExporter, @@ -123,6 +127,9 @@ def __init__( ) # type: typing.Deque[Span] self.worker_thread = threading.Thread(target=self.worker, daemon=True) self.condition = threading.Condition(threading.Lock()) + self.flush_condition = threading.Condition(threading.Lock()) + # flag to indicate that there is a flush operation on progress + self._flushing = False self.schedule_delay_millis = schedule_delay_millis self.max_export_batch_size = max_export_batch_size self.max_queue_size = max_queue_size @@ -156,7 +163,10 @@ def on_end(self, span: Span) -> None: def worker(self): timeout = self.schedule_delay_millis / 1e3 while not self.done: - if len(self.queue) < self.max_export_batch_size: + if ( + len(self.queue) < self.max_export_batch_size + and not self._flushing + ): with self.condition: self.condition.wait(timeout) if not self.queue: @@ -174,17 +184,21 @@ def worker(self): timeout = self.schedule_delay_millis / 1e3 - duration # be sure that all spans are sent - self.force_flush() + self._drain_queue() def export(self) -> None: """Exports at most max_export_batch_size spans.""" idx = 0 - + notify_flush = False # currently only a single thread acts as consumer, so queue.pop() will # not raise an exception while idx < self.max_export_batch_size and self.queue: - self.spans_list[idx] = self.queue.pop() - idx += 1 + span = self.queue.pop() + if span is self._FLUSH_TOKEN_SPAN: + notify_flush = True + else: + self.spans_list[idx] = span + idx += 1 with Context.use(suppress_instrumentation=True): try: # Ignore type b/c the Optional[None]+slicing is too "clever" @@ -196,15 +210,45 @@ def export(self) -> None: except Exception: logger.exception("Exception while exporting Span batch.") + if notify_flush: + with self.flush_condition: + self.flush_condition.notify() + # clean up list for index in range(idx): self.spans_list[index] = None - def force_flush(self): - # export all elements until queue is empty + def _drain_queue(self): + """"Export all elements until queue is empty. + + Can only be called from the worker thread context because it invokes + `export` that is not thread safe. 
+ """ while self.queue: self.export() + def force_flush(self, timeout_millis: int = 30000) -> bool: + if self.done: + logger.warning("Already shutdown, ignoring call to force_flush().") + return True + + self._flushing = True + self.queue.appendleft(self._FLUSH_TOKEN_SPAN) + + # wake up worker thread + with self.condition: + self.condition.notify_all() + + # wait for token to be processed + with self.flush_condition: + ret = self.flush_condition.wait(timeout_millis / 1e3) + + self._flushing = False + + if not ret: + logger.warning("Timeout was exceeded in force_flush().") + return ret + def shutdown(self) -> None: # signal the worker thread to finish and then wait for it self.done = True diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index 43299ebe6a..ea513a4858 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -14,6 +14,7 @@ import time import unittest +from logging import WARNING from unittest import mock from opentelemetry import trace as trace_api @@ -24,10 +25,16 @@ class MySpanExporter(export.SpanExporter): """Very simple span exporter used for testing.""" - def __init__(self, destination, max_export_batch_size=None): + def __init__( + self, + destination, + max_export_batch_size=None, + export_timeout_millis=0.0, + ): self.destination = destination self.max_export_batch_size = max_export_batch_size self.is_shutdown = False + self.export_timeout = export_timeout_millis / 1e3 def export(self, spans: trace.Span) -> export.SpanExportResult: if ( @@ -35,6 +42,7 @@ def export(self, spans: trace.Span) -> export.SpanExportResult: and len(spans) > self.max_export_batch_size ): raise ValueError("Batch is too big") + time.sleep(self.export_timeout) self.destination.extend(span.name for span in spans) return export.SpanExportResult.SUCCESS @@ -127,18 +135,33 @@ def test_flush(self): for name in span_names0: _create_start_and_end_span(name, span_processor) - span_processor.force_flush() + self.assertTrue(span_processor.force_flush()) self.assertListEqual(span_names0, spans_names_list) # create some more spans to check that span processor still works for name in span_names1: _create_start_and_end_span(name, span_processor) - span_processor.force_flush() + self.assertTrue(span_processor.force_flush()) self.assertListEqual(span_names0 + span_names1, spans_names_list) span_processor.shutdown() + def test_flush_timeout(self): + spans_names_list = [] + + my_exporter = MySpanExporter( + destination=spans_names_list, export_timeout_millis=500 + ) + span_processor = export.BatchExportSpanProcessor(my_exporter) + + _create_start_and_end_span("foo", span_processor) + + # check that the timeout is not meet + with self.assertLogs(level=WARNING): + self.assertFalse(span_processor.force_flush(100)) + span_processor.shutdown() + def test_batch_span_processor_lossless(self): """Test that no spans are lost when sending max_queue_size spans""" spans_names_list = [] @@ -153,7 +176,7 @@ def test_batch_span_processor_lossless(self): for _ in range(512): _create_start_and_end_span("foo", span_processor) - span_processor.force_flush() + self.assertTrue(span_processor.force_flush()) self.assertEqual(len(spans_names_list), 512) span_processor.shutdown() @@ -177,7 +200,7 @@ def test_batch_span_processor_many_spans(self): time.sleep(0.05) # give some time for the exporter to upload spans - span_processor.force_flush() + self.assertTrue(span_processor.force_flush()) 
         self.assertEqual(len(spans_names_list), 1024)
 
         span_processor.shutdown()
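
Below is a minimal usage sketch of the new force_flush() semantics, assuming
the names touched by this patch (BatchExportSpanProcessor, SpanExporter,
SpanExportResult, force_flush(), shutdown()); the ListExporter class is a
hypothetical stand-in written only for this illustration and is not part of
the patch.

    from opentelemetry.sdk.trace import export

    class ListExporter(export.SpanExporter):
        """Toy exporter for this sketch: records exported span names."""

        def __init__(self):
            self.names = []

        def export(self, spans):
            self.names.extend(span.name for span in spans)
            return export.SpanExportResult.SUCCESS

        def shutdown(self):
            pass

    span_processor = export.BatchExportSpanProcessor(ListExporter())

    # Spans ended elsewhere reach the processor through on_end() and are
    # queued for the worker thread.

    # force_flush() now blocks until the flush token has been processed by
    # the worker thread; it returns False if the timeout is exceeded.
    flushed = span_processor.force_flush(timeout_millis=5000)
    assert flushed

    span_processor.shutdown()

Because force_flush() now returns a boolean instead of None, callers (like
the updated tests above) can assert on the result to detect a flush that
timed out.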