Skip to content

Commit

Permalink
subprocess's Popen.wait() is now thread safe so that multiple threads
Browse files Browse the repository at this point in the history
may be calling wait() or poll() on a Popen instance at the same time
without losing the Popen.returncode value.  Fixes issue #21291.
  • Loading branch information
gpshead committed Apr 23, 2014
1 parent 9e59967 commit d65ba51
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 10 deletions.
50 changes: 40 additions & 10 deletions Lib/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,10 @@ class STARTUPINFO:
import _posixsubprocess
import select
import selectors
try:
import threading
except ImportError:
import dummy_threading as threading

# When select or poll has indicated that the file is writable,
# we can write up to _PIPE_BUF bytes without risk of blocking.
Expand Down Expand Up @@ -748,6 +752,12 @@ def __init__(self, args, bufsize=-1, executable=None,
pass_fds=()):
"""Create new Popen instance."""
_cleanup()
# Held while anything is calling waitpid before returncode has been
# updated to prevent clobbering returncode if wait() or poll() are
# called from multiple threads at once. After acquiring the lock,
# code must re-check self.returncode to see if another thread just
# finished a waitpid() call.
self._waitpid_lock = threading.Lock()

self._input = None
self._communication_started = False
Expand Down Expand Up @@ -1450,6 +1460,7 @@ def _execute_child(self, args, executable, preexec_fn, close_fds,
def _handle_exitstatus(self, sts, _WIFSIGNALED=os.WIFSIGNALED,
_WTERMSIG=os.WTERMSIG, _WIFEXITED=os.WIFEXITED,
_WEXITSTATUS=os.WEXITSTATUS):
"""All callers to this function MUST hold self._waitpid_lock."""
# This method is called (indirectly) by __del__, so it cannot
# refer to anything outside of its local scope.
if _WIFSIGNALED(sts):
Expand All @@ -1471,7 +1482,13 @@ def _internal_poll(self, _deadstate=None, _waitpid=os.waitpid,
"""
if self.returncode is None:
if not self._waitpid_lock.acquire(False):
# Something else is busy calling waitpid. Don't allow two
# at once. We know nothing yet.
return None
try:
if self.returncode is not None:
return self.returncode # Another thread waited.
pid, sts = _waitpid(self.pid, _WNOHANG)
if pid == self.pid:
self._handle_exitstatus(sts)
Expand All @@ -1485,10 +1502,13 @@ def _internal_poll(self, _deadstate=None, _waitpid=os.waitpid,
# can't get the status.
# http://bugs.python.org/issue15756
self.returncode = 0
finally:
self._waitpid_lock.release()
return self.returncode


def _try_wait(self, wait_flags):
"""All callers to this function MUST hold self._waitpid_lock."""
try:
(pid, sts) = _eintr_retry_call(os.waitpid, self.pid, wait_flags)
except OSError as e:
Expand Down Expand Up @@ -1521,23 +1541,33 @@ def wait(self, timeout=None, endtime=None):
# cribbed from Lib/threading.py in Thread.wait() at r71065.
delay = 0.0005 # 500 us -> initial delay of 1 ms
while True:
(pid, sts) = self._try_wait(os.WNOHANG)
assert pid == self.pid or pid == 0
if pid == self.pid:
self._handle_exitstatus(sts)
break
if self._waitpid_lock.acquire(False):
try:
if self.returncode is not None:
break # Another thread waited.
(pid, sts) = self._try_wait(os.WNOHANG)
assert pid == self.pid or pid == 0
if pid == self.pid:
self._handle_exitstatus(sts)
break
finally:
self._waitpid_lock.release()
remaining = self._remaining_time(endtime)
if remaining <= 0:
raise TimeoutExpired(self.args, timeout)
delay = min(delay * 2, remaining, .05)
time.sleep(delay)
else:
while self.returncode is None:
(pid, sts) = self._try_wait(0)
# Check the pid and loop as waitpid has been known to return
# 0 even without WNOHANG in odd situations. issue14396.
if pid == self.pid:
self._handle_exitstatus(sts)
with self._waitpid_lock:
if self.returncode is not None:
break # Another thread waited.
(pid, sts) = self._try_wait(0)
# Check the pid and loop as waitpid has been known to
# return 0 even without WNOHANG in odd situations.
# http://bugs.python.org/issue14396.
if pid == self.pid:
self._handle_exitstatus(sts)
return self.returncode


Expand Down
48 changes: 48 additions & 0 deletions Lib/test/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,54 @@ def open_fds():
if exc is not None:
raise exc

@unittest.skipIf(threading is None, "threading required")
def test_threadsafe_wait(self):
"""Issue21291: Popen.wait() needs to be threadsafe for returncode."""
proc = subprocess.Popen([sys.executable, '-c',
'import time; time.sleep(12)'])
self.assertEqual(proc.returncode, None)
results = []

def kill_proc_timer_thread():
results.append(('thread-start-poll-result', proc.poll()))
# terminate it from the thread and wait for the result.
proc.kill()
proc.wait()
results.append(('thread-after-kill-and-wait', proc.returncode))
# this wait should be a no-op given the above.
proc.wait()
results.append(('thread-after-second-wait', proc.returncode))

# This is a timing sensitive test, the failure mode is
# triggered when both the main thread and this thread are in
# the wait() call at once. The delay here is to allow the
# main thread to most likely be blocked in its wait() call.
t = threading.Timer(0.2, kill_proc_timer_thread)
t.start()

# Wait for the process to finish; the thread should kill it
# long before it finishes on its own. Supplying a timeout
# triggers a different code path for better coverage.
proc.wait(timeout=20)
# Should be -9 because of the proc.kill() from the thread.
self.assertEqual(proc.returncode, -9,
msg="unexpected result in wait from main thread")

# This should be a no-op with no change in returncode.
proc.wait()
self.assertEqual(proc.returncode, -9,
msg="unexpected result in second main wait.")

t.join()
# Ensure that all of the thread results are as expected.
# When a race condition occurs in wait(), the returncode could
# be set by the wrong thread that doesn't actually have it
# leading to an incorrect value.
self.assertEqual([('thread-start-poll-result', None),
('thread-after-kill-and-wait', -9),
('thread-after-second-wait', -9)],
results)

def test_issue8780(self):
# Ensure that stdout is inherited from the parent
# if stdout=PIPE is not used
Expand Down
4 changes: 4 additions & 0 deletions Misc/NEWS
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ Core and Builtins
Library
-------

- Issue #21291: subprocess's Popen.wait() is now thread safe so that
multiple threads may be calling wait() or poll() on a Popen instance
at the same time without losing the Popen.returncode value.

- Issue #21127: Path objects can now be instantiated from str subclass
instances (such as numpy.str_).

Expand Down

0 comments on commit d65ba51

Please sign in to comment.