Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.6] bpo-22087: Fix Policy.get_event_loop() to detect fork (GH-7208) #7218

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Lib/asyncio/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,16 +583,29 @@ class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):

class _Local(threading.local):
_loop = None
_pid = None
_set_called = False

def __init__(self):
super().__init__()
self._pid = os.getpid()

def __init__(self):
self._local = self._Local()

def _check_pid(self):
if self._local._pid != os.getpid():
# If we detect we're in a child process forked by multiprocessing,
# we reset self._local so that we'll get a new event loop.
self._local = self._Local()

def get_event_loop(self):
"""Get the event loop.

This may be None or an instance of EventLoop.
"""
self._check_pid()

if (self._local._loop is None and
not self._local._set_called and
isinstance(threading.current_thread(), threading._MainThread)):
Expand All @@ -604,6 +617,7 @@ def get_event_loop(self):

def set_event_loop(self, loop):
"""Set the event loop."""
self._check_pid()
self._local._set_called = True
assert loop is None or isinstance(loop, AbstractEventLoop)
self._local._loop = loop
Expand Down
33 changes: 32 additions & 1 deletion Lib/test/test_asyncio/test_unix_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import collections
import errno
import io
import multiprocessing
import os
import pathlib
import signal
Expand All @@ -12,7 +13,6 @@
import tempfile
import threading
import unittest
import warnings
from unittest import mock

if sys.platform == 'win32':
Expand Down Expand Up @@ -1556,6 +1556,37 @@ def create_watcher(self):
return asyncio.FastChildWatcher()


class ForkedProcessTests(unittest.TestCase):
def setUp(self):
self.parent_loop = asyncio.SelectorEventLoop()
asyncio.set_event_loop(self.parent_loop)
self.ctx = multiprocessing.get_context("fork")

def tearDown(self):
self.parent_loop.close()

def _check_loops_not_equal(self, old_loop):
loop = asyncio.get_event_loop()
if loop is old_loop:
raise RuntimeError("Child process inherited parent's event loop")

try:
val = loop.run_until_complete(asyncio.sleep(0.05, result=42))
if val != 42:
raise RuntimeError("new event loop does not work")
finally:
loop.close()

sys.exit(loop is old_loop)

def test_new_loop_in_child(self):
p = self.ctx.Process(target=self._check_loops_not_equal,
args=(self.parent_loop,))
p.start()
p.join()
self.assertEqual(p.exitcode, 0)


class PolicyTests(unittest.TestCase):

def create_policy(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fix Policy.get_event_loop() to detect fork and return a new loop.

Original patch by Dan O'Reilly.