Skip to content

Commit

Permalink
gh-95166: cancel map waited on future on timeout (GH-95169) (GH-95364)
Browse files Browse the repository at this point in the history
Co-authored-by: Kumar Aditya <[email protected]>
(cherry picked from commit e16d4ed)

Co-authored-by: Thomas Grainger <[email protected]>
  • Loading branch information
miss-islington and graingert authored Jul 28, 2022
1 parent 1230792 commit 763801a
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 2 deletions.
16 changes: 14 additions & 2 deletions Lib/concurrent/futures/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,18 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
done.update(waiter.finished_futures)
return DoneAndNotDoneFutures(done, fs - done)


def _result_or_cancel(fut, timeout=None):
try:
try:
return fut.result(timeout)
finally:
fut.cancel()
finally:
# Break a reference cycle with the exception in self._exception
del fut


class Future(object):
"""Represents the result of an asynchronous computation."""

Expand Down Expand Up @@ -604,9 +616,9 @@ def result_iterator():
while fs:
# Careful not to keep a reference to the popped future
if timeout is None:
yield fs.pop().result()
yield _result_or_cancel(fs.pop())
else:
yield fs.pop().result(end_time - time.monotonic())
yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
finally:
for future in fs:
future.cancel()
Expand Down
27 changes: 27 additions & 0 deletions Lib/test/test_concurrent_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,33 @@ def submit(pool):
with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
workers.submit(tuple)

def test_executor_map_current_future_cancel(self):
stop_event = threading.Event()
log = []

def log_n_wait(ident):
log.append(f"{ident=} started")
try:
stop_event.wait()
finally:
log.append(f"{ident=} stopped")

with self.executor_type(max_workers=1) as pool:
# submit work to saturate the pool
fut = pool.submit(log_n_wait, ident="first")
try:
with contextlib.closing(
pool.map(log_n_wait, ["second", "third"], timeout=0)
) as gen:
with self.assertRaises(TimeoutError):
next(gen)
finally:
stop_event.set()
fut.result()
# ident='second' is cancelled as a result of raising a TimeoutError
# ident='third' is cancelled because it remained in the collection of futures
self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"])


class ProcessPoolExecutorTest(ExecutorTest):

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix :meth:`concurrent.futures.Executor.map` to cancel the currently waiting on future on an error - e.g. TimeoutError or KeyboardInterrupt.

0 comments on commit 763801a

Please sign in to comment.