From b431b87527f1136ac1393c9d3a82b56b6d92cb0b Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Sat, 10 Feb 2024 15:10:48 -0800 Subject: [PATCH] Revert "gh-96471: Add shutdown() method to queue.Queue (#104750)" Reason for the revert: https://github.com/python/cpython/issues/115258 test_queue timed out on Windows free-threading build. The PR author had mentioned this in https://github.com/python/cpython/pull/104750#issuecomment-1935449280 but I forgot about it and merged anyway. This reverts commit b2d9d134dcb5633deebebf2b0118cd4f7ca598a2. --- Doc/library/queue.rst | 38 -- Doc/whatsnew/3.13.rst | 7 - Lib/queue.py | 50 --- Lib/test/test_queue.py | 378 ------------------ ...3-05-06-04-57-10.gh-issue-96471.C9wAU7.rst | 1 - 5 files changed, 474 deletions(-) delete mode 100644 Misc/NEWS.d/next/Library/2023-05-06-04-57-10.gh-issue-96471.C9wAU7.rst diff --git a/Doc/library/queue.rst b/Doc/library/queue.rst index 1421fc2e552f0e..b2b787c5a8260c 100644 --- a/Doc/library/queue.rst +++ b/Doc/library/queue.rst @@ -93,14 +93,6 @@ The :mod:`queue` module defines the following classes and exceptions: on a :class:`Queue` object which is full. -.. exception:: ShutDown - - Exception raised when :meth:`~Queue.put` or :meth:`~Queue.get` is called on - a :class:`Queue` object which has been shut down. - - .. versionadded:: 3.13 - - .. _queueobjects: Queue Objects @@ -143,8 +135,6 @@ provide the public methods described below. immediately available, else raise the :exc:`Full` exception (*timeout* is ignored in that case). - Raises :exc:`ShutDown` if the queue has been shut down. - .. method:: Queue.put_nowait(item) @@ -165,9 +155,6 @@ provide the public methods described below. an uninterruptible wait on an underlying lock. This means that no exceptions can occur, and in particular a SIGINT will not trigger a :exc:`KeyboardInterrupt`. - Raises :exc:`ShutDown` if the queue has been shut down and is empty, or if - the queue has been shut down immediately. - .. method:: Queue.get_nowait() @@ -190,8 +177,6 @@ fully processed by daemon consumer threads. Raises a :exc:`ValueError` if called more times than there were items placed in the queue. - Raises :exc:`ShutDown` if the queue has been shut down immediately. - .. method:: Queue.join() @@ -202,8 +187,6 @@ fully processed by daemon consumer threads. indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, :meth:`join` unblocks. - Raises :exc:`ShutDown` if the queue has been shut down immediately. - Example of how to wait for enqueued tasks to be completed:: @@ -231,27 +214,6 @@ Example of how to wait for enqueued tasks to be completed:: print('All work completed') -Terminating queues -^^^^^^^^^^^^^^^^^^ - -:class:`Queue` objects can be made to prevent further interaction by shutting -them down. - -.. method:: Queue.shutdown(immediate=False) - - Shut down the queue, making :meth:`~Queue.get` and :meth:`~Queue.put` raise - :exc:`ShutDown`. - - By default, :meth:`~Queue.get` on a shut down queue will only raise once the - queue is empty. Set *immediate* to true to make :meth:`~Queue.get` raise - immediately instead. - - All blocked callers of :meth:`~Queue.put` will be unblocked. If *immediate* - is true, also unblock callers of :meth:`~Queue.get` and :meth:`~Queue.join`. - - .. versionadded:: 3.13 - - SimpleQueue Objects ------------------- diff --git a/Doc/whatsnew/3.13.rst b/Doc/whatsnew/3.13.rst index aee37737a9990a..7b391a0d258fe3 100644 --- a/Doc/whatsnew/3.13.rst +++ b/Doc/whatsnew/3.13.rst @@ -406,13 +406,6 @@ pdb command line option or :envvar:`PYTHONSAFEPATH` environment variable). (Contributed by Tian Gao and Christian Walther in :gh:`111762`.) -queue ------ - -* Add :meth:`queue.Queue.shutdown` (along with :exc:`queue.ShutDown`) for queue - termination. - (Contributed by Laurie Opperman and Yves Duprat in :gh:`104750`.) - re -- * Rename :exc:`!re.error` to :exc:`re.PatternError` for improved clarity. diff --git a/Lib/queue.py b/Lib/queue.py index 467ff4fcecb134..55f50088460f9e 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -25,10 +25,6 @@ class Full(Exception): pass -class ShutDown(Exception): - '''Raised when put/get with shut-down queue.''' - - class Queue: '''Create a queue object with a given maximum size. @@ -58,9 +54,6 @@ def __init__(self, maxsize=0): self.all_tasks_done = threading.Condition(self.mutex) self.unfinished_tasks = 0 - # Queue shutdown state - self.is_shutdown = False - def task_done(self): '''Indicate that a formerly enqueued task is complete. @@ -74,8 +67,6 @@ def task_done(self): Raises a ValueError if called more times than there were items placed in the queue. - - Raises ShutDown if the queue has been shut down immediately. ''' with self.all_tasks_done: unfinished = self.unfinished_tasks - 1 @@ -93,8 +84,6 @@ def join(self): to indicate the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks. - - Raises ShutDown if the queue has been shut down immediately. ''' with self.all_tasks_done: while self.unfinished_tasks: @@ -140,12 +129,8 @@ def put(self, item, block=True, timeout=None): Otherwise ('block' is false), put an item on the queue if a free slot is immediately available, else raise the Full exception ('timeout' is ignored in that case). - - Raises ShutDown if the queue has been shut down. ''' with self.not_full: - if self.is_shutdown: - raise ShutDown if self.maxsize > 0: if not block: if self._qsize() >= self.maxsize: @@ -153,8 +138,6 @@ def put(self, item, block=True, timeout=None): elif timeout is None: while self._qsize() >= self.maxsize: self.not_full.wait() - if self.is_shutdown: - raise ShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: @@ -164,8 +147,6 @@ def put(self, item, block=True, timeout=None): if remaining <= 0.0: raise Full self.not_full.wait(remaining) - if self.is_shutdown: - raise ShutDown self._put(item) self.unfinished_tasks += 1 self.not_empty.notify() @@ -180,21 +161,14 @@ def get(self, block=True, timeout=None): Otherwise ('block' is false), return an item if one is immediately available, else raise the Empty exception ('timeout' is ignored in that case). - - Raises ShutDown if the queue has been shut down and is empty, - or if the queue has been shut down immediately. ''' with self.not_empty: - if self.is_shutdown and not self._qsize(): - raise ShutDown if not block: if not self._qsize(): raise Empty elif timeout is None: while not self._qsize(): self.not_empty.wait() - if self.is_shutdown and not self._qsize(): - raise ShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: @@ -204,8 +178,6 @@ def get(self, block=True, timeout=None): if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) - if self.is_shutdown and not self._qsize(): - raise ShutDown item = self._get() self.not_full.notify() return item @@ -226,28 +198,6 @@ def get_nowait(self): ''' return self.get(block=False) - def shutdown(self, immediate=False): - '''Shut-down the queue, making queue gets and puts raise. - - By default, gets will only raise once the queue is empty. Set - 'immediate' to True to make gets raise immediately instead. - - All blocked callers of put() will be unblocked, and also get() - and join() if 'immediate'. The ShutDown exception is raised. - ''' - with self.mutex: - self.is_shutdown = True - if immediate: - n_items = self._qsize() - while self._qsize(): - self._get() - if self.unfinished_tasks > 0: - self.unfinished_tasks -= 1 - self.not_empty.notify_all() - # release all blocked threads in `join()` - self.all_tasks_done.notify_all() - self.not_full.notify_all() - # Override these methods to implement other queue organizations # (e.g. stack or priority queue). # These will only be called with appropriate locks held diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index e3d4d566cdda48..33113a72e6b6a9 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -241,384 +241,6 @@ def test_shrinking_queue(self): with self.assertRaises(self.queue.Full): q.put_nowait(4) - def test_shutdown_empty(self): - q = self.type2test() - q.shutdown() - with self.assertRaises(self.queue.ShutDown): - q.put("data") - with self.assertRaises(self.queue.ShutDown): - q.get() - - def test_shutdown_nonempty(self): - q = self.type2test() - q.put("data") - q.shutdown() - q.get() - with self.assertRaises(self.queue.ShutDown): - q.get() - - def test_shutdown_immediate(self): - q = self.type2test() - q.put("data") - q.shutdown(immediate=True) - with self.assertRaises(self.queue.ShutDown): - q.get() - - def test_shutdown_allowed_transitions(self): - # allowed transitions would be from alive via shutdown to immediate - q = self.type2test() - self.assertFalse(q.is_shutdown) - - q.shutdown() - self.assertTrue(q.is_shutdown) - - q.shutdown(immediate=True) - self.assertTrue(q.is_shutdown) - - q.shutdown(immediate=False) - - def _shutdown_all_methods_in_one_thread(self, immediate): - q = self.type2test(2) - q.put("L") - q.put_nowait("O") - q.shutdown(immediate) - - with self.assertRaises(self.queue.ShutDown): - q.put("E") - with self.assertRaises(self.queue.ShutDown): - q.put_nowait("W") - if immediate: - with self.assertRaises(self.queue.ShutDown): - q.get() - with self.assertRaises(self.queue.ShutDown): - q.get_nowait() - with self.assertRaises(ValueError): - q.task_done() - q.join() - else: - self.assertIn(q.get(), "LO") - q.task_done() - self.assertIn(q.get(), "LO") - q.task_done() - q.join() - # on shutdown(immediate=False) - # when queue is empty, should raise ShutDown Exception - with self.assertRaises(self.queue.ShutDown): - q.get() # p.get(True) - with self.assertRaises(self.queue.ShutDown): - q.get_nowait() # p.get(False) - with self.assertRaises(self.queue.ShutDown): - q.get(True, 1.0) - - def test_shutdown_all_methods_in_one_thread(self): - return self._shutdown_all_methods_in_one_thread(False) - - def test_shutdown_immediate_all_methods_in_one_thread(self): - return self._shutdown_all_methods_in_one_thread(True) - - def _write_msg_thread(self, q, n, results, delay, - i_when_exec_shutdown, - event_start, event_end): - event_start.wait() - for i in range(1, n+1): - try: - q.put((i, "YDLO")) - results.append(True) - except self.queue.ShutDown: - results.append(False) - # triggers shutdown of queue - if i == i_when_exec_shutdown: - event_end.set() - time.sleep(delay) - # end of all puts - q.join() - - def _read_msg_thread(self, q, nb, results, delay, event_start): - event_start.wait() - block = True - while nb: - time.sleep(delay) - try: - # Get at least one message - q.get(block) - block = False - q.task_done() - results.append(True) - nb -= 1 - except self.queue.ShutDown: - results.append(False) - nb -= 1 - except self.queue.Empty: - pass - q.join() - - def _shutdown_thread(self, q, event_end, immediate): - event_end.wait() - q.shutdown(immediate) - q.join() - - def _join_thread(self, q, delay, event_start): - event_start.wait() - time.sleep(delay) - q.join() - - def _shutdown_all_methods_in_many_threads(self, immediate): - q = self.type2test() - ps = [] - ev_start = threading.Event() - ev_exec_shutdown = threading.Event() - res_puts = [] - res_gets = [] - delay = 1e-4 - read_process = 4 - nb_msgs = read_process * 16 - nb_msgs_r = nb_msgs // read_process - when_exec_shutdown = nb_msgs // 2 - lprocs = ( - (self._write_msg_thread, 1, (q, nb_msgs, res_puts, delay, - when_exec_shutdown, - ev_start, ev_exec_shutdown)), - (self._read_msg_thread, read_process, (q, nb_msgs_r, - res_gets, delay*2, - ev_start)), - (self._join_thread, 2, (q, delay*2, ev_start)), - (self._shutdown_thread, 1, (q, ev_exec_shutdown, immediate)), - ) - # start all threds - for func, n, args in lprocs: - for i in range(n): - ps.append(threading.Thread(target=func, args=args)) - ps[-1].start() - # set event in order to run q.shutdown() - ev_start.set() - - if not immediate: - assert(len(res_gets) == len(res_puts)) - assert(res_gets.count(True) == res_puts.count(True)) - else: - assert(len(res_gets) <= len(res_puts)) - assert(res_gets.count(True) <= res_puts.count(True)) - - for thread in ps[1:]: - thread.join() - - def test_shutdown_all_methods_in_many_threads(self): - return self._shutdown_all_methods_in_many_threads(False) - - def test_shutdown_immediate_all_methods_in_many_threads(self): - return self._shutdown_all_methods_in_many_threads(True) - - def _get(self, q, go, results, shutdown=False): - go.wait() - try: - msg = q.get() - results.append(not shutdown) - return not shutdown - except self.queue.ShutDown: - results.append(shutdown) - return shutdown - - def _get_shutdown(self, q, go, results): - return self._get(q, go, results, True) - - def _get_task_done(self, q, go, results): - go.wait() - try: - msg = q.get() - q.task_done() - results.append(True) - return msg - except self.queue.ShutDown: - results.append(False) - return False - - def _put(self, q, msg, go, results, shutdown=False): - go.wait() - try: - q.put(msg) - results.append(not shutdown) - return not shutdown - except self.queue.ShutDown: - results.append(shutdown) - return shutdown - - def _put_shutdown(self, q, msg, go, results): - return self._put(q, msg, go, results, True) - - def _join(self, q, results, shutdown=False): - try: - q.join() - results.append(not shutdown) - return not shutdown - except self.queue.ShutDown: - results.append(shutdown) - return shutdown - - def _join_shutdown(self, q, results): - return self._join(q, results, True) - - def _shutdown_get(self, immediate): - q = self.type2test(2) - results = [] - go = threading.Event() - q.put("Y") - q.put("D") - # queue full - - if immediate: - thrds = ( - (self._get_shutdown, (q, go, results)), - (self._get_shutdown, (q, go, results)), - ) - else: - thrds = ( - # on shutdown(immediate=False) - # one of these threads shoud raise Shutdown - (self._get, (q, go, results)), - (self._get, (q, go, results)), - (self._get, (q, go, results)), - ) - threads = [] - for func, params in thrds: - threads.append(threading.Thread(target=func, args=params)) - threads[-1].start() - q.shutdown(immediate) - go.set() - for t in threads: - t.join() - if immediate: - self.assertListEqual(results, [True, True]) - else: - self.assertListEqual(sorted(results), [False] + [True]*(len(thrds)-1)) - - def test_shutdown_get(self): - return self._shutdown_get(False) - - def test_shutdown_immediate_get(self): - return self._shutdown_get(True) - - def _shutdown_put(self, immediate): - q = self.type2test(2) - results = [] - go = threading.Event() - q.put("Y") - q.put("D") - # queue fulled - - thrds = ( - (self._put_shutdown, (q, "E", go, results)), - (self._put_shutdown, (q, "W", go, results)), - ) - threads = [] - for func, params in thrds: - threads.append(threading.Thread(target=func, args=params)) - threads[-1].start() - q.shutdown() - go.set() - for t in threads: - t.join() - - self.assertEqual(results, [True]*len(thrds)) - - def test_shutdown_put(self): - return self._shutdown_put(False) - - def test_shutdown_immediate_put(self): - return self._shutdown_put(True) - - def _shutdown_join(self, immediate): - q = self.type2test() - results = [] - q.put("Y") - go = threading.Event() - nb = q.qsize() - - thrds = ( - (self._join, (q, results)), - (self._join, (q, results)), - ) - threads = [] - for func, params in thrds: - threads.append(threading.Thread(target=func, args=params)) - threads[-1].start() - if not immediate: - res = [] - for i in range(nb): - threads.append(threading.Thread(target=self._get_task_done, args=(q, go, res))) - threads[-1].start() - q.shutdown(immediate) - go.set() - for t in threads: - t.join() - - self.assertEqual(results, [True]*len(thrds)) - - def test_shutdown_immediate_join(self): - return self._shutdown_join(True) - - def test_shutdown_join(self): - return self._shutdown_join(False) - - def _shutdown_put_join(self, immediate): - q = self.type2test(2) - results = [] - go = threading.Event() - q.put("Y") - nb = q.qsize() - # queue not fulled - - thrds = ( - (self._put_shutdown, (q, "E", go, results)), - (self._join, (q, results)), - ) - threads = [] - for func, params in thrds: - threads.append(threading.Thread(target=func, args=params)) - threads[-1].start() - self.assertEqual(q.unfinished_tasks, nb) - for i in range(nb): - t = threading.Thread(target=q.task_done) - t.start() - threads.append(t) - q.shutdown(immediate) - go.set() - for t in threads: - t.join() - - self.assertEqual(results, [True]*len(thrds)) - - def test_shutdown_immediate_put_join(self): - return self._shutdown_put_join(True) - - def test_shutdown_put_join(self): - return self._shutdown_put_join(False) - - def test_shutdown_get_task_done_join(self): - q = self.type2test(2) - results = [] - go = threading.Event() - q.put("Y") - q.put("D") - self.assertEqual(q.unfinished_tasks, q.qsize()) - - thrds = ( - (self._get_task_done, (q, go, results)), - (self._get_task_done, (q, go, results)), - (self._join, (q, results)), - (self._join, (q, results)), - ) - threads = [] - for func, params in thrds: - threads.append(threading.Thread(target=func, args=params)) - threads[-1].start() - go.set() - q.shutdown(False) - for t in threads: - t.join() - - self.assertEqual(results, [True]*len(thrds)) - - class QueueTest(BaseQueueTestMixin): def setUp(self): diff --git a/Misc/NEWS.d/next/Library/2023-05-06-04-57-10.gh-issue-96471.C9wAU7.rst b/Misc/NEWS.d/next/Library/2023-05-06-04-57-10.gh-issue-96471.C9wAU7.rst deleted file mode 100644 index 0bace8d8bd425c..00000000000000 --- a/Misc/NEWS.d/next/Library/2023-05-06-04-57-10.gh-issue-96471.C9wAU7.rst +++ /dev/null @@ -1 +0,0 @@ -Add :py:class:`queue.Queue` termination with :py:meth:`~queue.Queue.shutdown`.