Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sdk: Implement force_flush for span processors #389

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ def shutdown(self) -> None:
"""Called when a :class:`opentelemetry.sdk.trace.Tracer` is shutdown.
"""

def force_flush(self) -> None:
"""Export all ended spans to the configured Exporter that have not
yet been exported.
"""


class MultiSpanProcessor(SpanProcessor):
"""Implementation of :class:`SpanProcessor` that forwards all received
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ def on_end(self, span: Span) -> None:
def shutdown(self) -> None:
self.span_exporter.shutdown()

def force_flush(self) -> None:
pass


class BatchExportSpanProcessor(SpanProcessor):
"""Batch span processor implementation.
Expand Down Expand Up @@ -171,7 +174,7 @@ def worker(self):
timeout = self.schedule_delay_millis / 1e3 - duration

# be sure that all spans are sent
self._flush()
self.force_flush()

def export(self) -> None:
"""Exports at most max_export_batch_size spans."""
Expand All @@ -197,7 +200,7 @@ def export(self) -> None:
for index in range(idx):
self.spans_list[index] = None

def _flush(self):
def force_flush(self):
# export all elements until queue is empty
while self.queue:
self.export()
Expand Down
36 changes: 32 additions & 4 deletions opentelemetry-sdk/tests/trace/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def _create_start_and_end_span(name, span_processor):


class TestBatchExportSpanProcessor(unittest.TestCase):
def test_batch_span_processor(self):
def test_shutdown(self):
spans_names_list = []

my_exporter = MySpanExporter(destination=spans_names_list)
Expand All @@ -109,9 +109,35 @@ def test_batch_span_processor(self):
_create_start_and_end_span(name, span_processor)

span_processor.shutdown()
self.assertTrue(my_exporter.is_shutdown)

# check that spans are exported without an explicitly call to
# force_flush()
self.assertListEqual(span_names, spans_names_list)

self.assertTrue(my_exporter.is_shutdown)
def test_flush(self):
spans_names_list = []

my_exporter = MySpanExporter(destination=spans_names_list)
span_processor = export.BatchExportSpanProcessor(my_exporter)

span_names0 = ["xxx", "bar", "foo"]
span_names1 = ["yyy", "baz", "fox"]

for name in span_names0:
_create_start_and_end_span(name, span_processor)

span_processor.force_flush()
self.assertListEqual(span_names0, spans_names_list)

# create some more spans to check that span processor still works
for name in span_names1:
_create_start_and_end_span(name, span_processor)

span_processor.force_flush()
self.assertListEqual(span_names0 + span_names1, spans_names_list)

span_processor.shutdown()

def test_batch_span_processor_lossless(self):
"""Test that no spans are lost when sending max_queue_size spans"""
Expand All @@ -127,8 +153,9 @@ def test_batch_span_processor_lossless(self):
for _ in range(512):
_create_start_and_end_span("foo", span_processor)

span_processor.shutdown()
span_processor.force_flush()
self.assertEqual(len(spans_names_list), 512)
span_processor.shutdown()

def test_batch_span_processor_many_spans(self):
"""Test that no spans are lost when sending many spans"""
Expand All @@ -150,8 +177,9 @@ def test_batch_span_processor_many_spans(self):

time.sleep(0.05) # give some time for the exporter to upload spans

span_processor.shutdown()
span_processor.force_flush()
self.assertEqual(len(spans_names_list), 1024)
span_processor.shutdown()

def test_batch_span_processor_scheduled_delay(self):
"""Test that spans are exported each schedule_delay_millis"""
Expand Down