Skip to content

Commit

Permalink
pythongh-94440: Fix issue of ProcessPoolExecutor shutdown hanging
Browse files Browse the repository at this point in the history
  • Loading branch information
yonatanp committed Jun 30, 2022
1 parent 62bb7a3 commit 799d25c
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 0 deletions.
5 changes: 5 additions & 0 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,11 @@ def run(self):
if self.is_shutting_down():
self.flag_executor_shutting_down()

# If only canceled futures remain in pending_work_items, we
# should purge them now to avoid waiting forever in our
# subsequent call to wait_result_broken_or_wakeup.
self.add_call_item_to_queue()

# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if not self.pending_work_items:
Expand Down
28 changes: 28 additions & 0 deletions Lib/test/test_concurrent_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from logging.handlers import QueueHandler
import os
import queue
import signal
import sys
import threading
import time
Expand Down Expand Up @@ -397,6 +398,33 @@ def test_hang_gh83386(self):
self.assertFalse(err)
self.assertEqual(out.strip(), b"apple")

def test_hang_gh94440(self):
"""shutdown(wait=True) doesn't hang when a future was submitted and
quickly canceled right before shutdown.
See https://github.com/python/cpython/issues/94440.
"""
if not hasattr(signal, 'alarm'):
raise unittest.SkipTest(
"Tested platform does not support the alarm signal")

def timeout(_signum, _frame):
raise RuntimeError("timed out waiting for shutdown")

kwargs = {}
if getattr(self, 'ctx', None):
kwargs['mp_context'] = self.get_context()
executor = self.executor_type(max_workers=1, **kwargs)
executor.submit(int).result()
old_handler = signal.signal(signal.SIGALRM, timeout)
try:
signal.alarm(5)
executor.submit(int).cancel()
executor.shutdown(wait=True)
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)


class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
def test_threads_terminate(self):
Expand Down

0 comments on commit 799d25c

Please sign in to comment.