From 5bacc131a871d15415daa237407698e3b3a1c07a Mon Sep 17 00:00:00 2001 From: Mario Jonke Date: Tue, 1 Sep 2020 18:18:53 +0200 Subject: [PATCH 1/3] Improve BatchExportSpanProcessor * it was possible for force flush calls to miss the flush finished notifications by the worker thread. in case a flush token got added in the main thread and the worker thread processed and notified the flush condition before the main thread called wait on the flush condition, the wakeup is missed and the main thread has to wait the full flush timeout * calls to force flush were not really thread safe since the state of whether a flush operation is in progress was indicated by a single boolean flag which gets reset when the first force flush call finishes. * instead of having a boolean flag to indicate a flush request use an Event. When a call to force flush is made it is looked up if a flush request event is currently pending or a new one is created. The worker thread will check if a flush request event exists, unset it and use a local reference for signaling once the export operation finished. Force flush calls will wait in the meantime on the flush request event until they are signaled by the worker thread. This also makes calls to force flush thread safe since multiple threads can/might wait on one event. 
--- .../sdk/trace/export/__init__.py | 145 +++++++++++++----- .../tests/trace/export/test_export.py | 55 ++++++- 2 files changed, 157 insertions(+), 43 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 9fe55ed7fd4..5ca3e249d86 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -20,8 +20,7 @@ import typing from enum import Enum -from opentelemetry.context import attach, detach, get_current, set_value -from opentelemetry.trace import DefaultSpan +from opentelemetry.context import attach, detach, set_value from opentelemetry.util import time_ns from .. import Span, SpanProcessor @@ -91,6 +90,16 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: return True +class _FlushRequest: + """Represents a request for the BatchExportSpanProcessor to flush spans.""" + + __slots__ = ["event", "num_spans"] + + def __init__(self): + self.event = threading.Event() + self.num_spans = 0 + + class BatchExportSpanProcessor(SpanProcessor): """Batch span processor implementation. @@ -98,8 +107,6 @@ class BatchExportSpanProcessor(SpanProcessor): batches ended spans and pushes them to the configured `SpanExporter`. 
""" - _FLUSH_TOKEN_SPAN = DefaultSpan(context=None) - def __init__( self, span_exporter: SpanExporter, @@ -129,9 +136,7 @@ def __init__( ) # type: typing.Deque[Span] self.worker_thread = threading.Thread(target=self.worker, daemon=True) self.condition = threading.Condition(threading.Lock()) - self.flush_condition = threading.Condition(threading.Lock()) - # flag to indicate that there is a flush operation on progress - self._flushing = False + self._flush_request = None # type: typing.Optional[_FlushRequest] self.schedule_delay_millis = schedule_delay_millis self.max_export_batch_size = max_export_batch_size self.max_queue_size = max_queue_size @@ -164,60 +169,128 @@ def on_end(self, span: Span) -> None: def worker(self): timeout = self.schedule_delay_millis / 1e3 + flush_request = None # type: typing.Optional[_FlushRequest] while not self.done: - if ( - len(self.queue) < self.max_export_batch_size - and not self._flushing - ): - with self.condition: + with self.condition: + if self.done: + # done flag may have changed, avoid waiting + break + flush_request = self._get_and_unset_flush_request() + if ( + len(self.queue) < self.max_export_batch_size + and flush_request is None + ): + self.condition.wait(timeout) + flush_request = self._get_and_unset_flush_request() if not self.queue: # spurious notification, let's wait again + self._notify_flush_request_finished(flush_request) + flush_request = None continue if self.done: # missing spans will be sent when calling flush break - # substract the duration of this export call to the next timeout + # subtract the duration of this export call to the next timeout start = time_ns() - self.export() + self.export(flush_request) end = time_ns() duration = (end - start) / 1e9 timeout = self.schedule_delay_millis / 1e3 - duration + self._notify_flush_request_finished(flush_request) + flush_request = None + + # there might have been a new flush request while export was running + # and before the done flag switched to true + with 
self.condition: + shutdown_flush_request = self._get_and_unset_flush_request() + # be sure that all spans are sent self._drain_queue() + self._notify_flush_request_finished(flush_request) + self._notify_flush_request_finished(shutdown_flush_request) + + def _get_and_unset_flush_request(self,) -> typing.Optional[_FlushRequest]: + """Returns the current flush request and makes it invisible to the + worker thread for subsequent calls. + """ + flush_request = self._flush_request + self._flush_request = None + if flush_request is not None: + flush_request.num_spans = len(self.queue) + return flush_request + + @staticmethod + def _notify_flush_request_finished( + flush_request: typing.Optional[_FlushRequest], + ): + """Notifies the flush initiator(s) waiting on the given request/event + that the flush operation was finished. + """ + if flush_request is not None: + flush_request.event.set() + + def _get_or_create_flush_request(self) -> _FlushRequest: + """Either returns the current active flush event or creates a new one. - def export(self) -> None: - """Exports at most max_export_batch_size spans.""" + The flush event will be visible and read by the worker thread before an + export operation starts. Callers of a flush operation may wait on the + returned event to be notified when the flush/export operation was + finished. + + This method is not thread-safe, i.e. callers need to take care about + synchronization/locking. + """ + if self._flush_request is None: + self._flush_request = _FlushRequest() + return self._flush_request + + def export(self, flush_request: typing.Optional[_FlushRequest]): + """Exports spans considering the given flush_request. + + In case of a given flush_requests spans are exported in batches until + the number of exported spans reached or exceeded the number of spans in + the flush request. + In no flush_request was given at most max_export_batch_size spans are + exported. 
+ """ + if not flush_request: + self.export_batch() + return + + num_spans = flush_request.num_spans + while self.queue: + num_exported = self.export_batch() + num_spans -= num_exported + + if num_spans <= 0: + break + + def export_batch(self) -> int: + """Exports at most max_export_batch_size spans and returns the number of + exported spans. + """ idx = 0 - notify_flush = False # currently only a single thread acts as consumer, so queue.pop() will # not raise an exception while idx < self.max_export_batch_size and self.queue: - span = self.queue.pop() - if span is self._FLUSH_TOKEN_SPAN: - notify_flush = True - else: - self.spans_list[idx] = span - idx += 1 + self.spans_list[idx] = self.queue.pop() + idx += 1 token = attach(set_value("suppress_instrumentation", True)) try: # Ignore type b/c the Optional[None]+slicing is too "clever" # for mypy self.span_exporter.export(self.spans_list[:idx]) # type: ignore - # pylint: disable=broad-except - except Exception: + except Exception: # pylint: disable=broad-except logger.exception("Exception while exporting Span batch.") detach(token) - if notify_flush: - with self.flush_condition: - self.flush_condition.notify() - # clean up list for index in range(idx): self.spans_list[index] = None + return idx def _drain_queue(self): """"Export all elements until queue is empty. @@ -226,26 +299,20 @@ def _drain_queue(self): `export` that is not thread safe. 
""" while self.queue: - self.export() + self.export_batch() def force_flush(self, timeout_millis: int = 30000) -> bool: if self.done: logger.warning("Already shutdown, ignoring call to force_flush().") return True - self._flushing = True - self.queue.appendleft(self._FLUSH_TOKEN_SPAN) - - # wake up worker thread with self.condition: + flush_request = self._get_or_create_flush_request() + # signal the worker thread to flush and wait for it to finish self.condition.notify_all() # wait for token to be processed - with self.flush_condition: - ret = self.flush_condition.wait(timeout_millis / 1e3) - - self._flushing = False - + ret = flush_request.event.wait(timeout_millis / 1e3) if not ret: logger.warning("Timeout was exceeded in force_flush().") return ret diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index 43b7893951f..34e1d14d23c 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -13,8 +13,10 @@ # limitations under the License. 
import os +import threading import time import unittest +from concurrent.futures import ThreadPoolExecutor from logging import WARNING from unittest import mock @@ -31,11 +33,13 @@ def __init__( destination, max_export_batch_size=None, export_timeout_millis=0.0, + export_event: threading.Event = None, ): self.destination = destination self.max_export_batch_size = max_export_batch_size self.is_shutdown = False self.export_timeout = export_timeout_millis / 1e3 + self.export_event = export_event def export(self, spans: trace.Span) -> export.SpanExportResult: if ( @@ -45,6 +49,8 @@ def export(self, spans: trace.Span) -> export.SpanExportResult: raise ValueError("Batch is too big") time.sleep(self.export_timeout) self.destination.extend(span.name for span in spans) + if self.export_event: + self.export_event.set() return export.SpanExportResult.SUCCESS def shutdown(self): @@ -148,6 +154,42 @@ def test_flush(self): span_processor.shutdown() + def test_flush_empty(self): + spans_names_list = [] + + my_exporter = MySpanExporter(destination=spans_names_list) + span_processor = export.BatchExportSpanProcessor(my_exporter) + + self.assertTrue(span_processor.force_flush()) + + def test_flush_from_multiple_threads(self): + num_threads = 50 + num_spans = 10 + + span_list = [] + + my_exporter = MySpanExporter(destination=span_list) + span_processor = export.BatchExportSpanProcessor( + my_exporter, max_queue_size=512, max_export_batch_size=128 + ) + + def create_spans_and_flush(tno: int): + for span_idx in range(num_spans): + _create_start_and_end_span( + "Span {}-{}".format(tno, span_idx), span_processor + ) + self.assertTrue(span_processor.force_flush()) + + with ThreadPoolExecutor(max_workers=num_threads) as executor: + future_list = [] + for thread_no in range(num_threads): + future = executor.submit(create_spans_and_flush, thread_no) + future_list.append(future) + + executor.shutdown() + + self.assertEqual(num_threads * num_spans, len(span_list)) + def 
test_flush_timeout(self): spans_names_list = [] @@ -209,17 +251,22 @@ def test_batch_span_processor_scheduled_delay(self): """Test that spans are exported each schedule_delay_millis""" spans_names_list = [] - my_exporter = MySpanExporter(destination=spans_names_list) + export_event = threading.Event() + my_exporter = MySpanExporter( + destination=spans_names_list, export_event=export_event + ) span_processor = export.BatchExportSpanProcessor( - my_exporter, schedule_delay_millis=50 + my_exporter, schedule_delay_millis=50, ) # create single span + start_time = time.time() _create_start_and_end_span("foo", span_processor) - time.sleep(0.05 + 0.02) - # span should be already exported + self.assertTrue(export_event.wait(2)) + export_time = time.time() self.assertEqual(len(spans_names_list), 1) + self.assertGreaterEqual((export_time - start_time) * 1e3, 50) span_processor.shutdown() From 8f714593ad1a212d55ac808d76e5ce2e38c2f1f9 Mon Sep 17 00:00:00 2001 From: Mario Jonke Date: Tue, 1 Sep 2020 19:11:44 +0200 Subject: [PATCH 2/3] changelog --- opentelemetry-sdk/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index acd1ce1d7d1..6d8502fa052 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -8,6 +8,8 @@ ([#1034](https://github.com/open-telemetry/opentelemetry-python/pull/1034)) - Remove lazy Event and Link API from Span interface ([#1045](https://github.com/open-telemetry/opentelemetry-python/pull/1045)) +- Improve BatchExportSpanProcessor + ([#1062](https://github.com/open-telemetry/opentelemetry-python/pull/1062)) ## Version 0.12b0 From fc15079e193ef4d683c1b3d185876c769f972947 Mon Sep 17 00:00:00 2001 From: Mario Jonke Date: Fri, 4 Sep 2020 08:01:31 +0200 Subject: [PATCH 3/3] Internalize export methods in BatchExportSpanProcessor --- .../src/opentelemetry/sdk/trace/export/__init__.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git 
a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 5ca3e249d86..7c1e51f3ec5 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -194,7 +194,7 @@ def worker(self): # subtract the duration of this export call to the next timeout start = time_ns() - self.export(flush_request) + self._export(flush_request) end = time_ns() duration = (end - start) / 1e9 timeout = self.schedule_delay_millis / 1e3 - duration @@ -247,7 +247,7 @@ def _get_or_create_flush_request(self) -> _FlushRequest: self._flush_request = _FlushRequest() return self._flush_request - def export(self, flush_request: typing.Optional[_FlushRequest]): + def _export(self, flush_request: typing.Optional[_FlushRequest]): """Exports spans considering the given flush_request. In case of a given flush_requests spans are exported in batches until @@ -257,18 +257,18 @@ def export(self, flush_request: typing.Optional[_FlushRequest]): exported. """ if not flush_request: - self.export_batch() + self._export_batch() return num_spans = flush_request.num_spans while self.queue: - num_exported = self.export_batch() + num_exported = self._export_batch() num_spans -= num_exported if num_spans <= 0: break - def export_batch(self) -> int: + def _export_batch(self) -> int: """Exports at most max_export_batch_size spans and returns the number of exported spans. """ @@ -299,7 +299,7 @@ def _drain_queue(self): `export` that is not thread safe. """ while self.queue: - self.export_batch() + self._export_batch() def force_flush(self, timeout_millis: int = 30000) -> bool: if self.done: