
Make batch processor fork aware and reinit when needed #2242

Merged
merged 27 commits into open-telemetry:main
Nov 5, 2021

Conversation

srikanthccv
Member

@srikanthccv srikanthccv commented Oct 27, 2021

Description

Since Python 3.7, os.register_at_fork is available and can be used to make our batch processor fork-safe.
celery/gunicorn already don't support Windows, since os.fork is not available there. ASK: where to document this? Some stats about minor version usage from pypistats.org: https://pypistats.org/packages/opentelemetry-sdk.

I have created a repo with working examples using this change, without library worker hooks, here: https://github.com/lonewolf3739/potential-potato
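
As a rough illustration of the hook this change relies on (a sketch, not the PR's code; the class and method names here are made up), os.register_at_fork lets an object restart its background thread in the forked child:

```python
import os
import threading
import time


class ForkAwareWorker:
    """Toy example: restart a daemon worker thread in a forked child process."""

    def __init__(self):
        self._start_worker()
        # Available on Unix since Python 3.7; the callback runs in the child after fork().
        os.register_at_fork(after_in_child=self._at_fork_reinit)

    def _start_worker(self):
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _at_fork_reinit(self):
        # Threads do not survive fork(); only the forking thread exists in the child,
        # so the worker has to be re-created there.
        self._start_worker()

    def _run(self):
        while True:
            time.sleep(1)  # placeholder for batching/export work
```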

@srikanthccv srikanthccv marked this pull request as ready for review October 29, 2021 08:59
@srikanthccv srikanthccv requested a review from a team October 29, 2021 08:59
@srikanthccv srikanthccv changed the title from "Experiment with deamon start delay" to "Delay daemon thread start in BatchSpanProcessor" Oct 29, 2021
@ocelotl
Contributor

ocelotl commented Oct 29, 2021

Can a test case be added? I understand that testing threads can be difficult, asking just in case ✌️

@owais
Contributor

owais commented Oct 29, 2021

If any sampled span is received by processor before forking happens they still run into same incompatible problems with fork and threads.

Could you elaborate on this a bit?

@srikanthccv
Member Author

If any sampled span is received by processor before forking happens they still run into same incompatible problems with fork and threads.

Could you elaborate on this a bit?

Imagine uwsgi itself (or some intermediary) is participating in the tracing pipeline and emits real traces before creating the worker processes. The span processor then starts the daemon thread to process them, and this instance of the batch processor acquires the locks. It is very likely that the worker processes are created in a state where a lock is held by the parent process, and a deadlock occurs when they try to acquire it. Does that make it clear?
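
A minimal sketch of that failure mode (illustrative only, not code from this PR; Unix-only since it calls os.fork):

```python
import os
import threading
import time

lock = threading.Lock()

def hold_lock():
    with lock:
        time.sleep(60)  # the parent's daemon thread holds the lock across fork()

threading.Thread(target=hold_lock, daemon=True).start()
time.sleep(0.1)  # give the thread time to acquire the lock

if os.fork() == 0:
    # The child inherits the locked lock but not the thread that would release it,
    # so this acquire blocks forever.
    lock.acquire()
    print("never reached")
```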

@srikanthccv
Member Author

Python does a reinit to solve a similar problem in the standard library (threading, for example, re-initializes its locks after fork), but that machinery is mostly internal. Taking inspiration from there, this diff makes it work every time. Thoughts?

diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
index 4f0cc817c..c277bd317 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
@@ -18,7 +18,7 @@ import sys
 import threading
 import typing
 from enum import Enum
-from os import environ, linesep
+from os import environ, linesep, register_at_fork
 from typing import Optional

 from opentelemetry.context import (
@@ -197,6 +197,7 @@ class BatchSpanProcessor(SpanProcessor):
             None
         ] * self.max_export_batch_size  # type: typing.List[typing.Optional[Span]]
         self.worker_thread.start()
+        register_at_fork(after_in_child=self._at_fork_reinit)

     def on_start(
         self, span: Span, parent_context: typing.Optional[Context] = None
@@ -220,6 +221,13 @@ class BatchSpanProcessor(SpanProcessor):
             with self.condition:
                 self.condition.notify()

+    def _at_fork_reinit(self):
+        self.condition._at_fork_reinit()
+        self.worker_thread = threading.Thread(
+            name="OtelBatchSpanProcessor", target=self.worker, daemon=True
+        )
+        self.worker_thread.start()
+
     def worker(self):
         timeout = self.schedule_delay_millis / 1e3
         flush_request = None  # type: typing.Optional[_FlushRequest]

@owais
Contributor

owais commented Oct 30, 2021

The reinit solution sounds much better if it works as expected.

@owais
Contributor

owais commented Oct 30, 2021

Imagine uwsgi itself (or some intermediary) is participating in the tracing pipeline and emits real traces before creating the worker processes. The span processor then starts the daemon thread to process them, and this instance of the batch processor acquires the locks. It is very likely that the worker processes are created in a state where a lock is held by the parent process, and a deadlock occurs when they try to acquire it. Does that make it clear?

Yes but then this feels more like a band-aid than a solution. I think we should experiment with the reinit idea you shared above.

@srikanthccv srikanthccv marked this pull request as draft October 30, 2021 15:35
@owais
Contributor

owais commented Nov 1, 2021

Looks like this was introduced only in Python 3.7 and only on Unix. I think we should still do this but document the limitations.

https://docs.python.org/3/library/os.html?highlight=at_fork#os.register_at_fork
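
For documentation purposes, the guard could look roughly like this (a sketch; install_fork_hook and _at_fork_reinit are illustrative names, not the merged code):

```python
import os

def install_fork_hook(processor):
    # os.register_at_fork exists only on Unix and only since Python 3.7;
    # on Windows os.fork is unavailable anyway, so the hook is simply skipped.
    if hasattr(os, "register_at_fork"):
        os.register_at_fork(after_in_child=processor._at_fork_reinit)
```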

@owais
Contributor

owais commented Nov 1, 2021

BTW, what are the implications when users upgrade to the next version with existing solutions in place (gunicorn, celery signals)? Will we end up initializing two pipelines? Would there be any conflicts?

@srikanthccv srikanthccv changed the title Make batch span processor fork aware and reinit when needed Make batch processor fork aware and reinit when needed Nov 4, 2021
@srikanthccv
Member Author

Updated the log processor and added a test for that as well.
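
A fork-safety test along these lines (a sketch using the SDK's in-memory exporter, not necessarily the exact test added here) forks a child process and asserts it can still export:

```python
import multiprocessing

from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

exporter = InMemorySpanExporter()
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(exporter))
tracer = provider.get_tracer(__name__)


def child(conn):
    # In the forked child the at-fork hook should have restarted the worker thread,
    # so spans created here can still be flushed and exported.
    with tracer.start_as_current_span("child-span"):
        pass
    provider.force_flush()
    conn.send(len(exporter.get_finished_spans()))
    conn.close()


if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    proc = multiprocessing.get_context("fork").Process(target=child, args=(child_conn,))
    proc.start()
    proc.join()
    assert parent_conn.recv() >= 1  # the child process exported its span
```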

@srikanthccv srikanthccv added the "Approve Public API check" label Nov 4, 2021
@owais owais merged commit 29e4bab into open-telemetry:main Nov 5, 2021
owais added a commit to owais/opentelemetry-python-contrib that referenced this pull request Nov 10, 2021
owais added a commit to owais/opentelemetry-python-contrib that referenced this pull request Nov 10, 2021
owais added a commit to open-telemetry/opentelemetry-python-contrib that referenced this pull request Nov 10, 2021
@phillipuniverse

Since Python 3.7, os.register_at_fork is available and can be used to make our batch processor fork-safe.

@lonewolf3739 does this mean that this PR (and general fork safety support) will only work with Python 3.7+?

@Shackelford-Arden

BTW, what are the implications when users upgrade to the next version with existing solutions in place (gunicorn, celery signals)? Will we end up initializing two pipelines? Would there be any conflicts?

You mentioned Celery here, and as a user of Celery who is indeed using signals (specifically the worker_process_init.connect signal), I'd be curious whether you could provide some context on how this code change might affect that?
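
For reference, the signal-based setup being described usually looks something like this sketch (the console exporter is just a placeholder):

```python
from celery.signals import worker_process_init

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter


@worker_process_init.connect(weak=False)
def init_tracing(*args, **kwargs):
    # Each worker process builds its own pipeline after the fork, so no thread or
    # lock state is inherited from the parent process.
    provider = TracerProvider()
    provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
    trace.set_tracer_provider(provider)
```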
