diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 5891e85a39d4fa..9dc7f6299953b4 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -319,34 +319,36 @@ def test_shutdown_immediate_all_methods_in_one_thread(self): def _write_msg_thread(self, q, n, results, i_when_exec_shutdown, event_shutdown, - event_start): - put_atleast = i_when_exec_shutdown//2 - for i in range(1, n+1): + barrier_start): + # All `write_msg_threads` + # put several items into the queue. + for i in range(0, i_when_exec_shutdown//2): + q.put((i, 'LOYD')) + # Wait for the barrier to be complete. + barrier_start.wait() + + for i in range(i, n): try: q.put((i, "YDLO")) except self.queue.ShutDown: results.append(False) break - # Be sure that all write_threads - # put few items into the queue. - if i == put_atleast: - event_start.wait() - - # Triggers shutdown of queue. + # Trigger queue shutdown. if i == i_when_exec_shutdown: + # Only once thread do it. if not event_shutdown.is_set(): event_shutdown.set() results.append(True) q.join() - def _read_msg_thread(self, q, results, event_start): - nbr = 0 + def _read_msg_thread(self, q, results, barrier_start): + # Wait for the barrier to be complete. + barrier_start.wait() while True: try: q.get(False) q.task_done() - nbr += 1 except self.queue.ShutDown: results.append(True) break @@ -354,47 +356,58 @@ def _read_msg_thread(self, q, results, event_start): pass q.join() - def _shutdown_thread(self, q, event_end, immediate): + def _shutdown_thread(self, q, results, event_end, immediate): event_end.wait() q.shutdown(immediate) + results.append(q.qsize() == 0) q.join() - def _join_thread(self, q, event_start): - event_start.wait() + def _join_thread(self, q, barrier_start): + # Wait for the barrier to be complete. + barrier_start.wait() q.join() def _shutdown_all_methods_in_many_threads(self, immediate): + # Run a 'multi-producers/consumers queue' use case, + # with enough items into the queue. + # When shutdown, all running threads will be concerned. q = self.type2test() ps = [] - ev_start = threading.Event() - ev_exec_shutdown = threading.Event() res_puts = [] res_gets = [] + res_shutdown = [] write_threads = 4 - read_threads = 16 - nb_msgs = 1024*4 + read_threads = 6 + join_threads = 2 + nb_msgs = 1024*64 nb_msgs_w = nb_msgs // write_threads when_exec_shutdown = nb_msgs_w // 2 + # Use of a `threading.Barrier`` to ensure that all `_write_msg_threads` + # put their part of items into the queue. And trigger the start of + # other threads as `_read_msg_thread`and `_join_thread`. + barrier_start = threading.Barrier(write_threads+read_threads+join_threads) + ev_exec_shutdown = threading.Event() lprocs = ( (self._write_msg_thread, write_threads, (q, nb_msgs_w, res_puts, when_exec_shutdown, ev_exec_shutdown, - ev_start)), - (self._read_msg_thread, read_threads, (q, res_gets, - ev_start)), - (self._join_thread, 2, (q, ev_start)), - (self._shutdown_thread, 1, (q, ev_exec_shutdown, immediate)), + barrier_start)), + (self._read_msg_thread, read_threads, (q, res_gets, barrier_start)), + (self._join_thread, join_threads, (q, barrier_start)), + (self._shutdown_thread, 1, (q, res_shutdown, ev_exec_shutdown, immediate)), ) - # start all thredas + # start all threads. for func, n, args in lprocs: for i in range(n): ps.append(threading.Thread(target=func, args=args)) ps[-1].start() - ev_start.set() for thread in ps: thread.join() self.assertEqual(res_puts.count(True), 1) self.assertLessEqual(res_gets.count(True), read_threads) + if immediate: + self.assertListEqual(res_shutdown, [True]) + self.assertTrue(q.empty()) def test_shutdown_all_methods_in_many_threads(self): return self._shutdown_all_methods_in_many_threads(False)