Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-94440: Fix issue of ProcessPoolExecutor shutdown hanging #94468

Merged
merged 8 commits into from
Mar 16, 2023
Merged
5 changes: 5 additions & 0 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,11 @@ def run(self):
if self.is_shutting_down():
self.flag_executor_shutting_down()

# When only canceled futures remain in pending_work_items, our
# next call to wait_result_broken_or_wakeup would hang forever.
# This makes sure we have some running futures or none at all.
self.add_call_item_to_queue()

# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if not self.pending_work_items:
Expand Down
28 changes: 28 additions & 0 deletions Lib/test/test_concurrent_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from logging.handlers import QueueHandler
import os
import queue
import signal
import sys
import threading
import time
Expand Down Expand Up @@ -397,6 +398,33 @@ def test_hang_gh83386(self):
self.assertFalse(err)
self.assertEqual(out.strip(), b"apple")

def test_hang_gh94440(self):
"""shutdown(wait=True) doesn't hang when a future was submitted and
quickly canceled right before shutdown.

See https://github.com/python/cpython/issues/94440.
"""
if not hasattr(signal, 'alarm'):
raise unittest.SkipTest(
"Tested platform does not support the alarm signal")

def timeout(_signum, _frame):
raise RuntimeError("timed out waiting for shutdown")

kwargs = {}
if getattr(self, 'ctx', None):
kwargs['mp_context'] = self.get_context()
executor = self.executor_type(max_workers=1, **kwargs)
executor.submit(int).result()
old_handler = signal.signal(signal.SIGALRM, timeout)
try:
signal.alarm(5)
executor.submit(int).cancel()
executor.shutdown(wait=True)
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)


class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
def test_threads_terminate(self):
Expand Down
1 change: 1 addition & 0 deletions Misc/ACKS
Original file line number Diff line number Diff line change
Expand Up @@ -1367,6 +1367,7 @@ Thomas Perl
Mathieu Perreault
Mark Perrego
Trevor Perrin
Yonatan Perry
Gabriel de Perthuis
Tim Peters
Benjamin Peterson
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fixed ``concurrent.futures.process._ExecutorManagerThread`` to flush canceled
gpshead marked this conversation as resolved.
Show resolved Hide resolved
futures before attempting to shutdown, so that it doesn't hang if all that
remained in its pending work items are canceled futures.