Skip to content

Commit

Permalink
sdk: fix force_flush in batch span processor (open-telemetry#397)
Browse files Browse the repository at this point in the history
open-telemetry#389 implemented force_flush() for the span processor. For BatchSpanProcessor
it was implemented by exposing an already existing _flush() method. This created
a race condition, because the _flush() method was intended to be called only
from the context of the worker thread, since it uses the export() method,
which is not thread safe.

The result after that PR is that some tests were failing randomly because
export() was being executed in two different threads, the worker thread and the
user thread calling force_flush().

This commit fixes it by implementing a more sophisticated flush mechanism.
When a flush is requested, a special span token is inserted into the spans queue,
a flag indicating that a flush operation is in progress is set, and the worker thread
is woken up. After that, a condition variable is monitored, waiting for the worker
thread to indicate that the token has been processed.

The worker thread has a new logic to avoid sleeping (waiting on the condition
variable) when there is a flush operation going on, it also notifies the caller
(using another condition variable) when the token has been processed.
  • Loading branch information
mauriciovasquezbernal authored and toumorokoshi committed Feb 17, 2020
1 parent 57dfd86 commit 1dd2f5b
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 17 deletions.
13 changes: 10 additions & 3 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,16 @@ def shutdown(self) -> None:
"""Called when a :class:`opentelemetry.sdk.trace.Tracer` is shutdown.
"""

def force_flush(self) -> None:
"""Export all ended spans to the configured Exporter that have not
yet been exported.
def force_flush(self, timeout_millis: int = 30000) -> bool:
    """Export all ended spans to the configured Exporter that have not yet
    been exported.

    Args:
        timeout_millis: The maximum amount of time to wait for spans to be
            exported.

    Returns:
        False if the timeout is exceeded, True otherwise.
    """


Expand Down
62 changes: 53 additions & 9 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from enum import Enum

from opentelemetry.context import Context
from opentelemetry.trace import DefaultSpan
from opentelemetry.util import time_ns

from .. import Span, SpanProcessor
Expand Down Expand Up @@ -83,8 +84,9 @@ def on_end(self, span: Span) -> None:
def shutdown(self) -> None:
self.span_exporter.shutdown()

def force_flush(self) -> None:
pass
def force_flush(self, timeout_millis: int = 30000) -> bool:
    """Report a successful flush immediately.

    This processor has nothing buffered to flush, so the call succeeds
    without waiting.

    Args:
        timeout_millis: Unused; kept for interface compatibility.

    Returns:
        Always True.
    """
    # pylint: disable=unused-argument
    return True


class BatchExportSpanProcessor(SpanProcessor):
Expand All @@ -94,6 +96,8 @@ class BatchExportSpanProcessor(SpanProcessor):
batches ended spans and pushes them to the configured `SpanExporter`.
"""

_FLUSH_TOKEN_SPAN = DefaultSpan(context=None)

def __init__(
self,
span_exporter: SpanExporter,
Expand Down Expand Up @@ -123,6 +127,9 @@ def __init__(
) # type: typing.Deque[Span]
self.worker_thread = threading.Thread(target=self.worker, daemon=True)
self.condition = threading.Condition(threading.Lock())
self.flush_condition = threading.Condition(threading.Lock())
# flag to indicate that there is a flush operation in progress
self._flushing = False
self.schedule_delay_millis = schedule_delay_millis
self.max_export_batch_size = max_export_batch_size
self.max_queue_size = max_queue_size
Expand Down Expand Up @@ -156,7 +163,10 @@ def on_end(self, span: Span) -> None:
def worker(self):
timeout = self.schedule_delay_millis / 1e3
while not self.done:
if len(self.queue) < self.max_export_batch_size:
if (
len(self.queue) < self.max_export_batch_size
and not self._flushing
):
with self.condition:
self.condition.wait(timeout)
if not self.queue:
Expand All @@ -174,17 +184,21 @@ def worker(self):
timeout = self.schedule_delay_millis / 1e3 - duration

# be sure that all spans are sent
self.force_flush()
self._drain_queue()

def export(self) -> None:
"""Exports at most max_export_batch_size spans."""
idx = 0

notify_flush = False
# currently only a single thread acts as consumer, so queue.pop() will
# not raise an exception
while idx < self.max_export_batch_size and self.queue:
self.spans_list[idx] = self.queue.pop()
idx += 1
span = self.queue.pop()
if span is self._FLUSH_TOKEN_SPAN:
notify_flush = True
else:
self.spans_list[idx] = span
idx += 1
with Context.use(suppress_instrumentation=True):
try:
# Ignore type b/c the Optional[None]+slicing is too "clever"
Expand All @@ -196,15 +210,45 @@ def export(self) -> None:
except Exception:
logger.exception("Exception while exporting Span batch.")

if notify_flush:
with self.flush_condition:
self.flush_condition.notify()

# clean up list
for index in range(idx):
self.spans_list[index] = None

def force_flush(self):
# export all elements until queue is empty
def _drain_queue(self):
    """Export all elements until the queue is empty.

    Can only be called from the worker thread context, because it
    invokes `export`, which is not thread safe.
    """
    # export() pops up to max_export_batch_size spans per call, so loop
    # until nothing is left.
    while self.queue:
        self.export()

def force_flush(self, timeout_millis: int = 30000) -> bool:
    """Flush all queued spans via the worker thread.

    A sentinel "flush token" span is pushed at the consumer end of the
    queue, so the worker pops it only after every span enqueued before
    this call has been exported. The caller then blocks on
    ``flush_condition`` until the worker signals that the token was
    processed, or the timeout expires.

    Args:
        timeout_millis: The maximum amount of time to wait for the
            flush to complete.

    Returns:
        False if the timeout is exceeded, True otherwise.
    """
    if self.done:
        logger.warning("Already shutdown, ignoring call to force_flush().")
        return True

    # Keep the worker from sleeping again until the token is consumed.
    self._flushing = True
    # appendleft: the worker pops from the right, so the token is seen
    # only after all spans that are currently in the queue.
    self.queue.appendleft(self._FLUSH_TOKEN_SPAN)

    # wake up worker thread
    with self.condition:
        self.condition.notify_all()

    # wait for token to be processed
    # NOTE(review): if the worker pops the token and notifies before we
    # acquire flush_condition below, that notification is missed and we
    # wait out the full timeout — confirm whether this window matters.
    with self.flush_condition:
        ret = self.flush_condition.wait(timeout_millis / 1e3)

    self._flushing = False

    if not ret:
        logger.warning("Timeout was exceeded in force_flush().")
    return ret

def shutdown(self) -> None:
# signal the worker thread to finish and then wait for it
self.done = True
Expand Down
33 changes: 28 additions & 5 deletions opentelemetry-sdk/tests/trace/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import time
import unittest
from logging import WARNING
from unittest import mock

from opentelemetry import trace as trace_api
Expand All @@ -24,17 +25,24 @@
class MySpanExporter(export.SpanExporter):
"""Very simple span exporter used for testing."""

def __init__(self, destination, max_export_batch_size=None):
def __init__(
self,
destination,
max_export_batch_size=None,
export_timeout_millis=0.0,
):
self.destination = destination
self.max_export_batch_size = max_export_batch_size
self.is_shutdown = False
self.export_timeout = export_timeout_millis / 1e3

def export(self, spans: trace.Span) -> export.SpanExportResult:
if (
self.max_export_batch_size is not None
and len(spans) > self.max_export_batch_size
):
raise ValueError("Batch is too big")
time.sleep(self.export_timeout)
self.destination.extend(span.name for span in spans)
return export.SpanExportResult.SUCCESS

Expand Down Expand Up @@ -127,18 +135,33 @@ def test_flush(self):
for name in span_names0:
_create_start_and_end_span(name, span_processor)

span_processor.force_flush()
self.assertTrue(span_processor.force_flush())
self.assertListEqual(span_names0, spans_names_list)

# create some more spans to check that span processor still works
for name in span_names1:
_create_start_and_end_span(name, span_processor)

span_processor.force_flush()
self.assertTrue(span_processor.force_flush())
self.assertListEqual(span_names0 + span_names1, spans_names_list)

span_processor.shutdown()

def test_flush_timeout(self):
    """force_flush must return False and log a warning when the
    exporter is slower than the requested timeout."""
    spans_names_list = []

    # Each export() call sleeps for 500 ms — far longer than the
    # 100 ms timeout passed to force_flush below.
    my_exporter = MySpanExporter(
        destination=spans_names_list, export_timeout_millis=500
    )
    span_processor = export.BatchExportSpanProcessor(my_exporter)

    _create_start_and_end_span("foo", span_processor)

    # check that the timeout is exceeded and reported as a failure
    with self.assertLogs(level=WARNING):
        self.assertFalse(span_processor.force_flush(100))
    span_processor.shutdown()

def test_batch_span_processor_lossless(self):
"""Test that no spans are lost when sending max_queue_size spans"""
spans_names_list = []
Expand All @@ -153,7 +176,7 @@ def test_batch_span_processor_lossless(self):
for _ in range(512):
_create_start_and_end_span("foo", span_processor)

span_processor.force_flush()
self.assertTrue(span_processor.force_flush())
self.assertEqual(len(spans_names_list), 512)
span_processor.shutdown()

Expand All @@ -177,7 +200,7 @@ def test_batch_span_processor_many_spans(self):

time.sleep(0.05) # give some time for the exporter to upload spans

span_processor.force_flush()
self.assertTrue(span_processor.force_flush())
self.assertEqual(len(spans_names_list), 1024)
span_processor.shutdown()

Expand Down

0 comments on commit 1dd2f5b

Please sign in to comment.