diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index d7e7e41967cc21..6742a07753c921 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -310,6 +310,18 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED): done.update(waiter.finished_futures) return DoneAndNotDoneFutures(done, fs - done) + +def _result_or_cancel(fut, timeout=None): + try: + try: + return fut.result(timeout) + finally: + fut.cancel() + finally: + # Break a reference cycle with the exception in self._exception + del fut + + class Future(object): """Represents the result of an asynchronous computation.""" @@ -604,9 +616,9 @@ def result_iterator(): while fs: # Careful not to keep a reference to the popped future if timeout is None: - yield fs.pop().result() + yield _result_or_cancel(fs.pop()) else: - yield fs.pop().result(end_time - time.monotonic()) + yield _result_or_cancel(fs.pop(), end_time - time.monotonic()) finally: for future in fs: future.cancel() diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index f0473495fdc655..02c2fa06fad4eb 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -931,6 +931,33 @@ def submit(pool): with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers: workers.submit(tuple) + def test_executor_map_current_future_cancel(self): + stop_event = threading.Event() + log = [] + + def log_n_wait(ident): + log.append(f"{ident=} started") + try: + stop_event.wait() + finally: + log.append(f"{ident=} stopped") + + with self.executor_type(max_workers=1) as pool: + # submit work to saturate the pool + fut = pool.submit(log_n_wait, ident="first") + try: + with contextlib.closing( + pool.map(log_n_wait, ["second", "third"], timeout=0) + ) as gen: + with self.assertRaises(TimeoutError): + next(gen) + finally: + stop_event.set() + fut.result() + # ident='second' is cancelled as a result of raising a TimeoutError + # ident='third' is cancelled because it remained in the collection of futures + self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"]) + class ProcessPoolExecutorTest(ExecutorTest): diff --git a/Misc/NEWS.d/next/Library/2022-07-23-10-42-05.gh-issue-95166.xw6p3C.rst b/Misc/NEWS.d/next/Library/2022-07-23-10-42-05.gh-issue-95166.xw6p3C.rst new file mode 100644 index 00000000000000..34b017078436d2 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2022-07-23-10-42-05.gh-issue-95166.xw6p3C.rst @@ -0,0 +1 @@ +Fix :meth:`concurrent.futures.Executor.map` to cancel the currently waiting on future on an error - e.g. TimeoutError or KeyboardInterrupt.