Skip to content

Commit

Permalink
SDK: shut down span processors automatically (open-telemetry#280)
Browse files Browse the repository at this point in the history
The BatchExportSpanProcessor is an asynchronous span processor that uses a
worker thread to call the different exporters. Before this commit applications
had to shut down the span processor explicitely to guarantee that all the spans
were summited to the exporters, this was not very intuitive for the users.

This commit removes that limitation by implementing the tracer's __del__ method
and an atexit hook. According to __del__'s documentation [1] it is possible
that sometimes it's not called, for that reason the atexit hook is also used to
guarantee that the processor is shut down in all the cases.

[1] https://docs.python.org/3/reference/datamodel.html#object.__del__
  • Loading branch information
mauriciovasquezbernal authored and c24t committed Nov 27, 2019
1 parent 5f311e0 commit a42b063
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 12 deletions.
2 changes: 0 additions & 2 deletions examples/basic_tracer/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,3 @@
with tracer.start_as_current_span("bar"):
with tracer.start_as_current_span("baz"):
print(Context)

span_processor.shutdown()
1 change: 0 additions & 1 deletion examples/http/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,3 @@ def hello():

if __name__ == "__main__":
app.run(debug=True)
span_processor.shutdown()
1 change: 0 additions & 1 deletion examples/http/tracer_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,3 @@
# Spans and propagating context as appropriate.
http_requests.enable(tracer)
response = requests.get(url="http://127.0.0.1:5000/")
span_processor.shutdown()
4 changes: 0 additions & 4 deletions ext/opentelemetry-ext-jaeger/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ gRPC is still not supported by this implementation.
with tracer.start_as_current_span('foo'):
print('Hello world!')
# shutdown the span processor
# TODO: this has to be improved so user doesn't need to call it manually
span_processor.shutdown()
The `examples <./examples>`_ folder contains more elaborated examples.

References
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,3 @@
time.sleep(0.2)

time.sleep(0.1)

# shutdown the span processor
# TODO: this has to be improved so user doesn't need to call it manually
span_processor.shutdown()
14 changes: 14 additions & 0 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.


import atexit
import logging
import random
import threading
Expand Down Expand Up @@ -296,19 +297,25 @@ class Tracer(trace_api.Tracer):
Args:
name: The name of the tracer.
shutdown_on_exit: Register an atexit hook to shut down the tracer when
the application exits.
"""

def __init__(
self,
name: str = "",
sampler: sampling.Sampler = trace_api.sampling.ALWAYS_ON,
shutdown_on_exit: bool = True,
) -> None:
slot_name = "current_span"
if name:
slot_name = "{}.current_span".format(name)
self._current_span_slot = Context.register_slot(slot_name)
self._active_span_processor = MultiSpanProcessor()
self.sampler = sampler
self._atexit_handler = None
if shutdown_on_exit:
self._atexit_handler = atexit.register(self.shutdown)

def get_current_span(self):
"""See `opentelemetry.trace.Tracer.get_current_span`."""
Expand Down Expand Up @@ -426,5 +433,12 @@ def add_span_processor(self, span_processor: SpanProcessor) -> None:
# thread safe
self._active_span_processor.add_span_processor(span_processor)

def shutdown(self):
"""Shut down the span processors added to the tracer."""
self._active_span_processor.shutdown()
if self._atexit_handler is not None:
atexit.unregister(self._atexit_handler)
self._atexit_handler = None


tracer = Tracer()
9 changes: 9 additions & 0 deletions opentelemetry-sdk/tests/trace/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class MySpanExporter(export.SpanExporter):
def __init__(self, destination, max_export_batch_size=None):
self.destination = destination
self.max_export_batch_size = max_export_batch_size
self.is_shutdown = False

def export(self, spans: trace.Span) -> export.SpanExportResult:
if (
Expand All @@ -37,6 +38,9 @@ def export(self, spans: trace.Span) -> export.SpanExportResult:
self.destination.extend(span.name for span in spans)
return export.SpanExportResult.SUCCESS

def shutdown(self):
self.is_shutdown = True


class TestSimpleExportSpanProcessor(unittest.TestCase):
def test_simple_span_processor(self):
Expand All @@ -55,6 +59,9 @@ def test_simple_span_processor(self):

self.assertListEqual(["xxx", "bar", "foo"], spans_names_list)

span_processor.shutdown()
self.assertTrue(my_exporter.is_shutdown)

def test_simple_span_processor_no_context(self):
"""Check that we process spans that are never made active.
Expand Down Expand Up @@ -102,6 +109,8 @@ def test_batch_span_processor(self):
span_processor.shutdown()
self.assertListEqual(span_names, spans_names_list)

self.assertTrue(my_exporter.is_shutdown)

def test_batch_span_processor_lossless(self):
"""Test that no spans are lost when sending max_queue_size spans"""
spans_names_list = []
Expand Down
73 changes: 73 additions & 0 deletions opentelemetry-sdk/tests/trace/test_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import shutil
import subprocess
import unittest
from unittest import mock

Expand All @@ -26,6 +28,77 @@ def test_extends_api(self):
tracer = trace.Tracer()
self.assertIsInstance(tracer, trace_api.Tracer)

def test_shutdown(self):
tracer = trace.Tracer()

mock_processor1 = mock.Mock(spec=trace.SpanProcessor)
tracer.add_span_processor(mock_processor1)

mock_processor2 = mock.Mock(spec=trace.SpanProcessor)
tracer.add_span_processor(mock_processor2)

tracer.shutdown()

self.assertEqual(mock_processor1.shutdown.call_count, 1)
self.assertEqual(mock_processor2.shutdown.call_count, 1)

shutdown_python_code = """
import atexit
from unittest import mock
from opentelemetry.sdk import trace
mock_processor = mock.Mock(spec=trace.SpanProcessor)
def print_shutdown_count():
print(mock_processor.shutdown.call_count)
# atexit hooks are called in inverse order they are added, so do this before
# creating the tracer
atexit.register(print_shutdown_count)
tracer = trace.Tracer({tracer_parameters})
tracer.add_span_processor(mock_processor)
{tracer_shutdown}
"""

def run_general_code(shutdown_on_exit, explicit_shutdown):
tracer_parameters = ""
tracer_shutdown = ""

if not shutdown_on_exit:
tracer_parameters = "shutdown_on_exit=False"

if explicit_shutdown:
tracer_shutdown = "tracer.shutdown()"

return subprocess.check_output(
[
# use shutil to avoid calling python outside the
# virtualenv on windows.
shutil.which("python"),
"-c",
shutdown_python_code.format(
tracer_parameters=tracer_parameters,
tracer_shutdown=tracer_shutdown,
),
]
)

# test default shutdown_on_exit (True)
out = run_general_code(True, False)
self.assertTrue(out.startswith(b"1"))

# test that shutdown is called only once even if Tracer.shutdown is
# called explicitely
out = run_general_code(True, True)
self.assertTrue(out.startswith(b"1"))

# test shutdown_on_exit=False
out = run_general_code(False, False)
self.assertTrue(out.startswith(b"0"))


class TestTracerSampling(unittest.TestCase):
def test_default_sampler(self):
Expand Down

0 comments on commit a42b063

Please sign in to comment.