From 1b36f4544587fe5d69f364d3019e40d1d1d52250 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri <srikanth.chekuri92@gmail.com> Date: Thu, 28 Oct 2021 00:23:11 +0530 Subject: [PATCH 01/20] Experiment with deamon start delay --- .../src/opentelemetry/sdk/trace/export/__init__.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 4f0cc817c9f..006900c70bf 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -196,7 +196,8 @@ def __init__( self.spans_list = [ None ] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]] - self.worker_thread.start() + # self.worker_thread.start() + self._received_any_span = False def on_start( self, span: Span, parent_context: typing.Optional[Context] = None @@ -209,6 +210,10 @@ def on_end(self, span: ReadableSpan) -> None: return if not span.context.trace_flags.sampled: return + if not self._received_any_span: + self._received_any_span = True + self.worker_thread.start() + if len(self.queue) == self.max_queue_size: if not self._spans_dropped: logger.warning("Queue is full, likely spans will be dropped.") From 2d6b57424a37d2ccdeaecde823d8659ac8bd85e0 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri <srikanth.chekuri92@gmail.com> Date: Fri, 29 Oct 2021 14:10:16 +0530 Subject: [PATCH 02/20] Fix tests --- .../src/opentelemetry/sdk/trace/export/__init__.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 006900c70bf..3ba7b937029 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -196,8 +196,7 @@ def __init__( self.spans_list = [ None ] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]] - # self.worker_thread.start() - self._received_any_span = False + self._daemon_started = False def on_start( self, span: Span, parent_context: typing.Optional[Context] = None @@ -210,8 +209,8 @@ def on_end(self, span: ReadableSpan) -> None: return if not span.context.trace_flags.sampled: return - if not self._received_any_span: - self._received_any_span = True + if not self._daemon_started: + self._daemon_started = True self.worker_thread.start() if len(self.queue) == self.max_queue_size: @@ -364,6 +363,9 @@ def _drain_queue(self): def force_flush(self, timeout_millis: int = None) -> bool: + if not self._daemon_started: # nothing to flush + return True + if timeout_millis is None: timeout_millis = self.export_timeout_millis @@ -387,7 +389,8 @@ def shutdown(self) -> None: self.done = True with self.condition: self.condition.notify_all() - self.worker_thread.join() + if self._daemon_started: + self.worker_thread.join() self.span_exporter.shutdown() From f21a65cc5235de3d5728e25e40b933b47f5fd96f Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri <srikanth.chekuri92@gmail.com> Date: Fri, 29 Oct 2021 14:32:27 +0530 Subject: [PATCH 03/20] Fix lint --- .../src/opentelemetry/sdk/trace/export/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 3ba7b937029..9ad497be38b 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -363,7 +363,7 @@ def _drain_queue(self): def force_flush(self, timeout_millis: int = None) -> bool: - if not self._daemon_started: # nothing to flush + if not self._daemon_started: # nothing to flush return True if timeout_millis is None: From ff89fc09527e2b2c2013c905fb27997315bb2acf Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri <srikanth.chekuri92@gmail.com> Date: Fri, 29 Oct 2021 21:43:17 +0530 Subject: [PATCH 04/20] Add _at_fork_reinit --- .../src/opentelemetry/sdk/trace/export/__init__.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 4f0cc817c9f..f6ea61d55bf 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -18,7 +18,7 @@ import threading import typing from enum import Enum -from os import environ, linesep +from os import environ, linesep, register_at_fork from typing import Optional from opentelemetry.context import ( @@ -197,6 +197,7 @@ def __init__( None ] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]] self.worker_thread.start() + register_at_fork(after_in_child=self._at_fork_reinit) def on_start( self, span: Span, parent_context: typing.Optional[Context] = None @@ -220,6 +221,9 @@ def on_end(self, span: ReadableSpan) -> None: with self.condition: self.condition.notify() + def _at_fork_reinit(self): + self.condition._at_fork_reinit() + def worker(self): timeout = self.schedule_delay_millis / 1e3 flush_request = None # type: typing.Optional[_FlushRequest] From a003beaf29c41762ecab422cc80ee19f75689226 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri <srikanth.chekuri92@gmail.com> Date: Sat, 30 Oct 2021 07:52:34 +0530 Subject: [PATCH 05/20] Reinit worker thread --- .../src/opentelemetry/sdk/trace/export/__init__.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index f6ea61d55bf..c277bd3176d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -223,6 +223,10 @@ def on_end(self, span: ReadableSpan) -> None: def _at_fork_reinit(self): self.condition._at_fork_reinit() + self.worker_thread = threading.Thread( + name="OtelBatchSpanProcessor", target=self.worker, daemon=True + ) + self.worker_thread.start() def worker(self): timeout = self.schedule_delay_millis / 1e3 From b07758fb9491c2965a54cf7ef6c6a0eac017c44e Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri <srikanth.chekuri92@gmail.com> Date: Sun, 31 Oct 2021 17:27:47 +0530 Subject: [PATCH 06/20] Update _at_fork_reinit impl --- .../sdk/trace/export/__init__.py | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index c277bd3176d..63ae961469b 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -17,8 +17,9 @@ import sys import threading import typing +import os from enum import Enum -from os import environ, linesep, register_at_fork +from os import environ, linesep from typing import Optional from opentelemetry.context import ( @@ -197,7 +198,9 @@ def __init__( None ] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]] self.worker_thread.start() - register_at_fork(after_in_child=self._at_fork_reinit) + # Only available in *nix since py37. + if hasattr(os, "register_at_fork"): + os.register_at_fork(after_in_child=self._at_fork_reinit) def on_start( self, span: Span, parent_context: typing.Optional[Context] = None @@ -222,12 +225,23 @@ def on_end(self, span: ReadableSpan) -> None: self.condition.notify() def _at_fork_reinit(self): - self.condition._at_fork_reinit() + # worker_thread is local to a process, only the thread that issued fork continues + # to exist. A new worker thread must be started in child process. self.worker_thread = threading.Thread( name="OtelBatchSpanProcessor", target=self.worker, daemon=True ) self.worker_thread.start() + # could be in an inconsistent state after fork, reinitialise by calling `_at_fork_reinit` + # (creates a new lock internally https://github.com/python/cpython/blob/main/Python/thread_pthread.h#L727) + # if exists, otherwise create a new one. + if hasattr(self.condition, "_at_fork_reinit"): + self.condition._at_fork_reinit() + else: + self.condition = threading.Condition(threading.Lock()) + + self.queue.clear() + def worker(self): timeout = self.schedule_delay_millis / 1e3 flush_request = None # type: typing.Optional[_FlushRequest] From 89a7da5a67e70f117564d91d74bdbbb40d0b142b Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri <srikanth.chekuri92@gmail.com> Date: Sun, 31 Oct 2021 17:58:08 +0530 Subject: [PATCH 07/20] Change order --- .../src/opentelemetry/sdk/trace/export/__init__.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 63ae961469b..43b25b97a2a 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -225,13 +225,6 @@ def on_end(self, span: ReadableSpan) -> None: self.condition.notify() def _at_fork_reinit(self): - # worker_thread is local to a process, only the thread that issued fork continues - # to exist. A new worker thread must be started in child process. - self.worker_thread = threading.Thread( - name="OtelBatchSpanProcessor", target=self.worker, daemon=True - ) - self.worker_thread.start() - # could be in an inconsistent state after fork, reinitialise by calling `_at_fork_reinit` # (creates a new lock internally https://github.com/python/cpython/blob/main/Python/thread_pthread.h#L727) # if exists, otherwise create a new one. @@ -240,6 +233,13 @@ def _at_fork_reinit(self): else: self.condition = threading.Condition(threading.Lock()) + # worker_thread is local to a process, only the thread that issued fork continues + # to exist. A new worker thread must be started in child process. + self.worker_thread = threading.Thread( + name="OtelBatchSpanProcessor", target=self.worker, daemon=True + ) + self.worker_thread.start() + self.queue.clear() def worker(self): From 92456de6d16b18472b25386b9e908e4557571762 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri <srikanth.chekuri92@gmail.com> Date: Sun, 31 Oct 2021 18:12:50 +0530 Subject: [PATCH 08/20] Reorder --- .../sdk/trace/export/__init__.py | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 43b25b97a2a..374d12130e4 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -39,6 +39,7 @@ from opentelemetry.util._time import _time_ns logger = logging.getLogger(__name__) +_lock = threading.Lock() class SpanExportResult(Enum): @@ -225,22 +226,24 @@ def on_end(self, span: ReadableSpan) -> None: self.condition.notify() def _at_fork_reinit(self): - # could be in an inconsistent state after fork, reinitialise by calling `_at_fork_reinit` - # (creates a new lock internally https://github.com/python/cpython/blob/main/Python/thread_pthread.h#L727) - # if exists, otherwise create a new one. - if hasattr(self.condition, "_at_fork_reinit"): - self.condition._at_fork_reinit() - else: - self.condition = threading.Condition(threading.Lock()) - - # worker_thread is local to a process, only the thread that issued fork continues - # to exist. A new worker thread must be started in child process. - self.worker_thread = threading.Thread( - name="OtelBatchSpanProcessor", target=self.worker, daemon=True - ) - self.worker_thread.start() + with _lock: + # could be in an inconsistent state after fork, reinitialise by calling `_at_fork_reinit` + # (creates a new lock internally https://github.com/python/cpython/blob/main/Python/thread_pthread.h#L727) + # if exists, otherwise create a new one. + if hasattr(self.condition, "_at_fork_reinit"): + self.condition._at_fork_reinit() + else: + self.condition = threading.Condition(threading.Lock()) + + self.queue.clear() + + # worker_thread is local to a process, only the thread that issued fork continues + # to exist. A new worker thread must be started in child process. + self.worker_thread = threading.Thread( + name="OtelBatchSpanProcessor", target=self.worker, daemon=True + ) + self.worker_thread.start() - self.queue.clear() def worker(self): timeout = self.schedule_delay_millis / 1e3 From a3ec380fd03d7908e2f21a365aa6cde0ed2a5fd2 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri <srikanth.chekuri92@gmail.com> Date: Sun, 31 Oct 2021 18:33:16 +0530 Subject: [PATCH 09/20] Update impl --- .../sdk/trace/export/__init__.py | 34 +++++++++---------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 374d12130e4..e7cd1b10d83 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -39,7 +39,6 @@ from opentelemetry.util._time import _time_ns logger = logging.getLogger(__name__) -_lock = threading.Lock() class SpanExportResult(Enum): @@ -226,23 +225,22 @@ def on_end(self, span: ReadableSpan) -> None: self.condition.notify() def _at_fork_reinit(self): - with _lock: - # could be in an inconsistent state after fork, reinitialise by calling `_at_fork_reinit` - # (creates a new lock internally https://github.com/python/cpython/blob/main/Python/thread_pthread.h#L727) - # if exists, otherwise create a new one. - if hasattr(self.condition, "_at_fork_reinit"): - self.condition._at_fork_reinit() - else: - self.condition = threading.Condition(threading.Lock()) - - self.queue.clear() - - # worker_thread is local to a process, only the thread that issued fork continues - # to exist. A new worker thread must be started in child process. - self.worker_thread = threading.Thread( - name="OtelBatchSpanProcessor", target=self.worker, daemon=True - ) - self.worker_thread.start() + # could be in an inconsistent state after fork, reinitialise by calling `_at_fork_reinit` + # (creates a new lock internally https://github.com/python/cpython/blob/main/Python/thread_pthread.h#L727) + # if exists, otherwise create a new one. + if hasattr(self.condition, "_at_fork_reinit"): + self.condition._at_fork_reinit() + else: + self.condition = threading.Condition(threading.Lock()) + + self.queue.clear() + + # worker_thread is local to a process, only the thread that issued fork continues + # to exist. A new worker thread must be started in child process. + self.worker_thread = threading.Thread( + name="OtelBatchSpanProcessor", target=self.worker, daemon=True + ) + self.worker_thread.start() def worker(self): From 91e8ebb7def55bbec0370a247526fc6786ef51c6 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri <srikanth.chekuri92@gmail.com> Date: Sun, 31 Oct 2021 18:33:42 +0530 Subject: [PATCH 10/20] black --- opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index e7cd1b10d83..42c4ecba654 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -242,7 +242,6 @@ def _at_fork_reinit(self): ) self.worker_thread.start() - def worker(self): timeout = self.schedule_delay_millis / 1e3 flush_request = None # type: typing.Optional[_FlushRequest] From abdc65d8a08d66ac26d2aede0e89a803cee286e3 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri <srikanth.chekuri92@gmail.com> Date: Sun, 31 Oct 2021 19:39:45 +0530 Subject: [PATCH 11/20] Undo prev change --- .../src/opentelemetry/sdk/trace/export/__init__.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 9ad497be38b..4f0cc817c9f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -196,7 +196,7 @@ def __init__( self.spans_list = [ None ] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]] - self._daemon_started = False + self.worker_thread.start() def on_start( self, span: Span, parent_context: typing.Optional[Context] = None @@ -209,10 +209,6 @@ def on_end(self, span: ReadableSpan) -> None: return if not span.context.trace_flags.sampled: return - if not self._daemon_started: - self._daemon_started = True - self.worker_thread.start() - if len(self.queue) == self.max_queue_size: if not self._spans_dropped: logger.warning("Queue is full, likely spans will be dropped.") @@ -363,9 +359,6 @@ def _drain_queue(self): def force_flush(self, timeout_millis: int = None) -> bool: - if not self._daemon_started: # nothing to flush - return True - if timeout_millis is None: timeout_millis = self.export_timeout_millis @@ -389,8 +382,7 @@ def shutdown(self) -> None: self.done = True with self.condition: self.condition.notify_all() - if self._daemon_started: - self.worker_thread.join() + self.worker_thread.join() self.span_exporter.shutdown() From 759b0a88b0ece84132b0f095fb69af333fdf4c71 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri <srikanth.chekuri92@gmail.com> Date: Tue, 2 Nov 2021 00:22:08 +0530 Subject: [PATCH 12/20] Add test bsp fork --- .../tests/trace/export/test_export.py | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index 2e4672af268..4b03dbc9b64 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -30,6 +30,9 @@ OTEL_BSP_SCHEDULE_DELAY, ) from opentelemetry.sdk.trace import export +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) class MySpanExporter(export.SpanExporter): @@ -356,6 +359,44 @@ def test_batch_span_processor_not_sampled(self): self.assertEqual(len(spans_names_list), 0) span_processor.shutdown() + def _check_fork_trace(self, exporter, expected): + time.sleep(0.5) # give some time for the exporter to upload spans + spans = exporter.get_finished_spans() + for span in spans: + self.assertIn(span.name, expected) + + def test_batch_span_processor_fork(self): + tracer_provider = trace.TracerProvider() + tracer = tracer_provider.get_tracer(__name__) + + exporter = InMemorySpanExporter() + span_processor = export.BatchSpanProcessor( + exporter, + max_queue_size=256, + max_export_batch_size=64, + schedule_delay_millis=100, + ) + tracer_provider.add_span_processor(span_processor) + with tracer.start_as_current_span("foo"): + pass + time.sleep(0.5) # give some time for the exporter to upload spans + + self.assertTrue(span_processor.force_flush()) + self.assertEqual(len(exporter.get_finished_spans()), 1) + exporter.clear() + pid = os.fork() + if pid: + with tracer.start_as_current_span("parent"): + pass + self._check_fork_trace(exporter, ["parent"]) + else: + with tracer.start_as_current_span("child"): + with tracer.start_as_current_span("inner"): + pass + self._check_fork_trace(exporter, ["child", "inner"]) + + span_processor.shutdown() + def test_batch_span_processor_scheduled_delay(self): """Test that spans are exported each schedule_delay_millis""" spans_names_list = [] From 96244542acf5210a147bffe04545d89aa52c09ad Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri <srikanth.chekuri92@gmail.com> Date: Tue, 2 Nov 2021 00:57:37 +0530 Subject: [PATCH 13/20] Fix lint --- .../src/opentelemetry/sdk/trace/export/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 42c4ecba654..bec243e27b2 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -14,10 +14,10 @@ import collections import logging +import os import sys import threading import typing -import os from enum import Enum from os import environ, linesep from typing import Optional From 3909996940dc9219906610e6125026979e8b566c Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri <srikanth.chekuri92@gmail.com> Date: Tue, 2 Nov 2021 00:57:49 +0530 Subject: [PATCH 14/20] Conditionally run test --- opentelemetry-sdk/tests/trace/export/test_export.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index 4b03dbc9b64..02601c9f778 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -13,6 +13,7 @@ # limitations under the License. import os +import sys import threading import time import unittest @@ -365,6 +366,10 @@ def _check_fork_trace(self, exporter, expected): for span in spans: self.assertIn(span.name, expected) + @unittest.skipUnless( + hasattr(os, "fork") and sys.version_info >= (3, 7), + "needs *nix and minor version 7 or later", + ) def test_batch_span_processor_fork(self): tracer_provider = trace.TracerProvider() tracer = tracer_provider.get_tracer(__name__) From 08f7db55df577b1bc900e829272bb67755212f6e Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri <srikanth.chekuri92@gmail.com> Date: Tue, 2 Nov 2021 09:47:29 +0530 Subject: [PATCH 15/20] more testing --- .../sdk/trace/export/__init__.py | 4 +++- .../tests/trace/export/test_export.py | 20 +++++++++++++++++-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index bec243e27b2..0305653ef01 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -200,7 +200,9 @@ def __init__( self.worker_thread.start() # Only available in *nix since py37. if hasattr(os, "register_at_fork"): - os.register_at_fork(after_in_child=self._at_fork_reinit) + os.register_at_fork( + after_in_child=self._at_fork_reinit + ) # pylint: disable=broad-except def on_start( self, span: Span, parent_context: typing.Optional[Context] = None diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index 02601c9f778..75b6fb85bd7 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -34,6 +34,7 @@ from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( InMemorySpanExporter, ) +from opentelemetry.test.concurrency_test import ConcurrencyTestBase class MySpanExporter(export.SpanExporter): @@ -161,7 +162,7 @@ def _create_start_and_end_span(name, span_processor): span.end() -class TestBatchSpanProcessor(unittest.TestCase): +class TestBatchSpanProcessor(ConcurrencyTestBase): @mock.patch.dict( "os.environ", { @@ -379,7 +380,7 @@ def test_batch_span_processor_fork(self): exporter, max_queue_size=256, max_export_batch_size=64, - schedule_delay_millis=100, + schedule_delay_millis=10, ) tracer_provider.add_span_processor(span_processor) with tracer.start_as_current_span("foo"): @@ -395,6 +396,21 @@ def test_batch_span_processor_fork(self): pass self._check_fork_trace(exporter, ["parent"]) else: + exporter.clear() + + def _target(): + with tracer.start_as_current_span(f"span") as s: + s.set_attribute("i", "1") + with tracer.start_as_current_span("temp"): + pass + + self.run_with_many_threads(_target, 100) + + time.sleep(0.5) + + spans = exporter.get_finished_spans() + self.assertEqual(len(spans), 200) + exporter.clear() with tracer.start_as_current_span("child"): with tracer.start_as_current_span("inner"): pass From 0e4d3cec62800e2224773fcf5961507596925a1b Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri <srikanth.chekuri92@gmail.com> Date: Tue, 2 Nov 2021 19:35:32 +0530 Subject: [PATCH 16/20] Fix lint --- opentelemetry-sdk/tests/trace/export/test_export.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index 75b6fb85bd7..9678a310e02 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -399,7 +399,7 @@ def test_batch_span_processor_fork(self): exporter.clear() def _target(): - with tracer.start_as_current_span(f"span") as s: + with tracer.start_as_current_span("span") as s: s.set_attribute("i", "1") with tracer.start_as_current_span("temp"): pass From c473e3421b0d8d6c23edcdb99b0af5446cee3f83 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri <srikanth.chekuri92@gmail.com> Date: Thu, 4 Nov 2021 10:33:13 +0530 Subject: [PATCH 17/20] Review suggestions --- .../sdk/trace/export/__init__.py | 9 +------ .../tests/trace/export/test_export.py | 25 +++++++++---------- 2 files changed, 13 insertions(+), 21 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 0305653ef01..e89bb4e264f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -227,14 +227,7 @@ def on_end(self, span: ReadableSpan) -> None: self.condition.notify() def _at_fork_reinit(self): - # could be in an inconsistent state after fork, reinitialise by calling `_at_fork_reinit` - # (creates a new lock internally https://github.com/python/cpython/blob/main/Python/thread_pthread.h#L727) - # if exists, otherwise create a new one. - if hasattr(self.condition, "_at_fork_reinit"): - self.condition._at_fork_reinit() - else: - self.condition = threading.Condition(threading.Lock()) - + self.condition = threading.Condition(threading.Lock()) self.queue.clear() # worker_thread is local to a process, only the thread that issued fork continues diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index 9678a310e02..a207f4b6361 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import multiprocessing import os import sys import threading @@ -390,14 +391,10 @@ def test_batch_span_processor_fork(self): self.assertTrue(span_processor.force_flush()) self.assertEqual(len(exporter.get_finished_spans()), 1) exporter.clear() - pid = os.fork() - if pid: - with tracer.start_as_current_span("parent"): - pass - self._check_fork_trace(exporter, ["parent"]) - else: - exporter.clear() + multiprocessing.set_start_method("fork") + + def child(conn): def _target(): with tracer.start_as_current_span("span") as s: s.set_attribute("i", "1") @@ -409,12 +406,14 @@ def _target(): time.sleep(0.5) spans = exporter.get_finished_spans() - self.assertEqual(len(spans), 200) - exporter.clear() - with tracer.start_as_current_span("child"): - with tracer.start_as_current_span("inner"): - pass - self._check_fork_trace(exporter, ["child", "inner"]) + conn.send(len(spans) == 200) + conn.close() + + parent_conn, child_conn = multiprocessing.Pipe() + p = multiprocessing.Process(target=child, args=(child_conn,)) + p.start() + self.assertTrue(parent_conn.recv()) + p.join() span_processor.shutdown() From 1a5063c415a05aa0c5d5dda7768e16c23db463f3 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri <srikanth.chekuri92@gmail.com> Date: Thu, 4 Nov 2021 22:55:43 +0530 Subject: [PATCH 18/20] Update log processor too --- .../sdk/_logs/export/__init__.py | 12 +++++ .../sdk/trace/export/__init__.py | 2 +- opentelemetry-sdk/tests/logs/test_export.py | 53 ++++++++++++++++++- .../tests/trace/export/test_export.py | 3 +- 4 files changed, 66 insertions(+), 4 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/__init__.py index f65c967534b..c705c2b2497 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/__init__.py @@ -16,6 +16,7 @@ import collections import enum import logging +import os import sys import threading from os import linesep @@ -154,6 +155,17 @@ def __init__( None ] * self._max_export_batch_size # type: List[Optional[LogData]] self._worker_thread.start() + # Only available in *nix since py37. + if hasattr(os, "register_at_fork"): + os.register_at_fork( + after_in_child=self._at_fork_reinit + ) # pylint: disable=protected-access + + def _at_fork_reinit(self): + self._condition = threading.Condition(threading.Lock()) + self._queue.clear() + self._worker_thread = threading.Thread(target=self.worker, daemon=True) + self._worker_thread.start() def worker(self): timeout = self._schedule_delay_millis / 1e3 diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index e89bb4e264f..d40bb4968c0 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -202,7 +202,7 @@ def __init__( if hasattr(os, "register_at_fork"): os.register_at_fork( after_in_child=self._at_fork_reinit - ) # pylint: disable=broad-except + ) # pylint: disable=protected-access def on_start( self, span: Span, parent_context: typing.Optional[Context] = None diff --git a/opentelemetry-sdk/tests/logs/test_export.py b/opentelemetry-sdk/tests/logs/test_export.py index 964b44f7694..45b83358f93 100644 --- a/opentelemetry-sdk/tests/logs/test_export.py +++ b/opentelemetry-sdk/tests/logs/test_export.py @@ -14,7 +14,9 @@ # pylint: disable=protected-access import logging +import multiprocessing import os +import sys import time import unittest from concurrent.futures import ThreadPoolExecutor @@ -38,6 +40,7 @@ from opentelemetry.sdk._logs.severity import SeverityNumber from opentelemetry.sdk.resources import Resource as SDKResource from opentelemetry.sdk.util.instrumentation import InstrumentationInfo +from opentelemetry.test.concurrency_test import ConcurrencyTestBase from opentelemetry.trace import TraceFlags from opentelemetry.trace.span import INVALID_SPAN_CONTEXT @@ -158,7 +161,7 @@ def test_simple_log_processor_shutdown(self): self.assertEqual(len(finished_logs), 0) -class TestBatchLogProcessor(unittest.TestCase): +class TestBatchLogProcessor(ConcurrencyTestBase): def test_emit_call_log_record(self): exporter = InMemoryLogExporter() log_processor = Mock(wraps=BatchLogProcessor(exporter)) @@ -269,6 +272,54 @@ def bulk_log_and_flush(num_logs): finished_logs = exporter.get_finished_logs() self.assertEqual(len(finished_logs), 2415) + @unittest.skipUnless( + hasattr(os, "fork") and sys.version_info >= (3, 7), + "needs *nix and minor version 7 or later", + ) + def test_batch_log_processor_fork(self): + # pylint: disable=invalid-name + exporter = InMemoryLogExporter() + log_processor = BatchLogProcessor( + exporter, + max_export_batch_size=64, + schedule_delay_millis=10, + ) + provider = LogEmitterProvider() + provider.add_log_processor(log_processor) + + emitter = provider.get_log_emitter(__name__) + logger = logging.getLogger("test-fork") + logger.addHandler(OTLPHandler(log_emitter=emitter)) + + logger.critical("yolo") + time.sleep(0.5) # give some time for the exporter to upload + + self.assertTrue(log_processor.force_flush()) + self.assertEqual(len(exporter.get_finished_logs()), 1) + exporter.clear() + + multiprocessing.set_start_method("fork") + + def child(conn): + def _target(): + logger.critical("Critical message child") + + self.run_with_many_threads(_target, 100) + + time.sleep(0.5) + + logs = exporter.get_finished_logs() + conn.send(len(logs) == 100) + conn.close() + + parent_conn, child_conn = multiprocessing.Pipe() + p = multiprocessing.Process(target=child, args=(child_conn,)) + p.start() + self.assertTrue(parent_conn.recv()) + p.join() + + log_processor.shutdown() + class TestConsoleExporter(unittest.TestCase): def test_export(self): # pylint: disable=no-self-use diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index a207f4b6361..00ccfe44d38 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -373,6 +373,7 @@ def _check_fork_trace(self, exporter, expected): "needs *nix and minor version 7 or later", ) def test_batch_span_processor_fork(self): + # pylint: disable=invalid-name tracer_provider = trace.TracerProvider() tracer = tracer_provider.get_tracer(__name__) @@ -392,8 +393,6 @@ def test_batch_span_processor_fork(self): self.assertEqual(len(exporter.get_finished_spans()), 1) exporter.clear() - multiprocessing.set_start_method("fork") - def child(conn): def _target(): with tracer.start_as_current_span("span") as s: From f599c169a82b3ca59dd6915d04423a91b7828519 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri <srikanth.chekuri92@gmail.com> Date: Thu, 4 Nov 2021 23:44:22 +0530 Subject: [PATCH 19/20] Add CHANGELOG entry --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f4c305ae74..f8c6a90ddcb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#2153](https://github.com/open-telemetry/opentelemetry-python/pull/2153)) - Add metrics API ([#1887](https://github.com/open-telemetry/opentelemetry-python/pull/1887)) +- Make batch processor fork aware and reinit when needed +- ([#2242](https://github.com/open-telemetry/opentelemetry-python/pull/2242)) ## [1.6.2-0.25b2](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.6.2-0.25b2) - 2021-10-19 From 02c7bb9c56d2288de4a9b110133e606f5d632101 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri <srikanth.chekuri92@gmail.com> Date: Fri, 5 Nov 2021 23:58:54 +0530 Subject: [PATCH 20/20] Fix changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f8c6a90ddcb..55dd541b19d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add metrics API ([#1887](https://github.com/open-telemetry/opentelemetry-python/pull/1887)) - Make batch processor fork aware and reinit when needed -- ([#2242](https://github.com/open-telemetry/opentelemetry-python/pull/2242)) + ([#2242](https://github.com/open-telemetry/opentelemetry-python/pull/2242)) ## [1.6.2-0.25b2](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.6.2-0.25b2) - 2021-10-19