From b577a438007c8e0ca458210e218514b429e3fb74 Mon Sep 17 00:00:00 2001 From: Duprat Date: Mon, 18 Mar 2024 17:15:29 +0100 Subject: [PATCH] gh-115258: Fix hanging tests for threading queue shutdown (#115940) This reinstates `test_shutdown_immediate_all_methods_in_many_threads` and improves `test_shutdown_all_methods_in_many_threads`. --- Lib/test/test_queue.py | 132 ++++++++++++++++++++++------------------- 1 file changed, 71 insertions(+), 61 deletions(-) diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index ad31ba1af03b6f6..c4d101101323933 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -317,97 +317,107 @@ def test_shutdown_all_methods_in_one_thread(self): def test_shutdown_immediate_all_methods_in_one_thread(self): return self._shutdown_all_methods_in_one_thread(True) - def _write_msg_thread(self, q, n, results, delay, - i_when_exec_shutdown, - event_start, event_end): - event_start.wait() - for i in range(1, n+1): + def _write_msg_thread(self, q, n, results, + i_when_exec_shutdown, event_shutdown, + barrier_start): + # All `write_msg_threads` + # put several items into the queue. + for i in range(0, i_when_exec_shutdown//2): + q.put((i, 'LOYD')) + # Wait for the barrier to be complete. + barrier_start.wait() + + for i in range(i_when_exec_shutdown//2, n): try: q.put((i, "YDLO")) - results.append(True) except self.queue.ShutDown: results.append(False) - # triggers shutdown of queue - if i == i_when_exec_shutdown: - event_end.set() - time.sleep(delay) - # end of all puts - q.join() + break - def _read_msg_thread(self, q, nb, results, delay, event_start): - event_start.wait() - block = True - while nb: - time.sleep(delay) + # Trigger queue shutdown. + if i == i_when_exec_shutdown: + # Only one thread should call shutdown(). + if not event_shutdown.is_set(): + event_shutdown.set() + results.append(True) + + def _read_msg_thread(self, q, results, barrier_start): + # Get at least one item. + q.get(True) + q.task_done() + # Wait for the barrier to be complete. + barrier_start.wait() + while True: try: - # Get at least one message - q.get(block) - block = False + q.get(False) q.task_done() - results.append(True) - nb -= 1 except self.queue.ShutDown: - results.append(False) - nb -= 1 + results.append(True) + break except self.queue.Empty: pass - q.join() - def _shutdown_thread(self, q, event_end, immediate): + def _shutdown_thread(self, q, results, event_end, immediate): event_end.wait() q.shutdown(immediate) - q.join() + results.append(q.qsize() == 0) - def _join_thread(self, q, delay, event_start): - event_start.wait() - time.sleep(delay) + def _join_thread(self, q, barrier_start): + # Wait for the barrier to be complete. + barrier_start.wait() q.join() def _shutdown_all_methods_in_many_threads(self, immediate): + # Run a 'multi-producers/consumers queue' use case, + # with enough items into the queue. + # When shutdown, all running threads will be joined. q = self.type2test() ps = [] - ev_start = threading.Event() - ev_exec_shutdown = threading.Event() res_puts = [] res_gets = [] - delay = 1e-4 - read_process = 4 - nb_msgs = read_process * 16 - nb_msgs_r = nb_msgs // read_process - when_exec_shutdown = nb_msgs // 2 - lprocs = ( - (self._write_msg_thread, 1, (q, nb_msgs, res_puts, delay, - when_exec_shutdown, - ev_start, ev_exec_shutdown)), - (self._read_msg_thread, read_process, (q, nb_msgs_r, - res_gets, delay*2, - ev_start)), - (self._join_thread, 2, (q, delay*2, ev_start)), - (self._shutdown_thread, 1, (q, ev_exec_shutdown, immediate)), - ) - # start all threds + res_shutdown = [] + write_threads = 4 + read_threads = 6 + join_threads = 2 + nb_msgs = 1024*64 + nb_msgs_w = nb_msgs // write_threads + when_exec_shutdown = nb_msgs_w // 2 + # Use of a Barrier to ensure that + # - all write threads put all their items into the queue, + # - all read thread get at least one item from the queue, + # and keep on running until shutdown. + # The join thread is started only when shutdown is immediate. + nparties = write_threads + read_threads + if immediate: + nparties += join_threads + barrier_start = threading.Barrier(nparties) + ev_exec_shutdown = threading.Event() + lprocs = [ + (self._write_msg_thread, write_threads, (q, nb_msgs_w, res_puts, + when_exec_shutdown, ev_exec_shutdown, + barrier_start)), + (self._read_msg_thread, read_threads, (q, res_gets, barrier_start)), + (self._shutdown_thread, 1, (q, res_shutdown, ev_exec_shutdown, immediate)), + ] + if immediate: + lprocs.append((self._join_thread, join_threads, (q, barrier_start))) + # start all threads. for func, n, args in lprocs: for i in range(n): ps.append(threading.Thread(target=func, args=args)) ps[-1].start() - # set event in order to run q.shutdown() - ev_start.set() - - if not immediate: - assert(len(res_gets) == len(res_puts)) - assert(res_gets.count(True) == res_puts.count(True)) - else: - assert(len(res_gets) <= len(res_puts)) - assert(res_gets.count(True) <= res_puts.count(True)) - - for thread in ps[1:]: + for thread in ps: thread.join() - @unittest.skip("test times out (gh-115258)") + self.assertTrue(True in res_puts) + self.assertEqual(res_gets.count(True), read_threads) + if immediate: + self.assertListEqual(res_shutdown, [True]) + self.assertTrue(q.empty()) + def test_shutdown_all_methods_in_many_threads(self): return self._shutdown_all_methods_in_many_threads(False) - @unittest.skip("test times out (gh-115258)") def test_shutdown_immediate_all_methods_in_many_threads(self): return self._shutdown_all_methods_in_many_threads(True)