From 6188472b0839519fa44411d8b6fdb4e9d3aa9d84 Mon Sep 17 00:00:00 2001 From: Duprat Date: Sun, 25 Feb 2024 14:58:13 +0100 Subject: [PATCH 1/9] Fix infinite loop in '_read_msg_thread' of 'test_shutdown_[immediate_]all_methods_in_many_threads' unittests --- Lib/test/test_queue.py | 85 +++++++++++++++++++----------------------- 1 file changed, 38 insertions(+), 47 deletions(-) diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 92d670ca6f8f5b..5891e85a39d4fa 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -317,38 +317,39 @@ def test_shutdown_all_methods_in_one_thread(self): def test_shutdown_immediate_all_methods_in_one_thread(self): return self._shutdown_all_methods_in_one_thread(True) - def _write_msg_thread(self, q, n, results, delay, - i_when_exec_shutdown, - event_start, event_end): - event_start.wait() + def _write_msg_thread(self, q, n, results, + i_when_exec_shutdown, event_shutdown, + event_start): + put_atleast = i_when_exec_shutdown//2 for i in range(1, n+1): try: q.put((i, "YDLO")) - results.append(True) except self.queue.ShutDown: results.append(False) - # triggers shutdown of queue + break + + # Be sure that all write_threads + # put few items into the queue. + if i == put_atleast: + event_start.wait() + + # Triggers shutdown of queue. if i == i_when_exec_shutdown: - event_end.set() - time.sleep(delay) - # end of all puts + if not event_shutdown.is_set(): + event_shutdown.set() + results.append(True) q.join() - def _read_msg_thread(self, q, nb, results, delay, event_start): - event_start.wait() - block = True - while nb: - time.sleep(delay) + def _read_msg_thread(self, q, results, event_start): + nbr = 0 + while True: try: - # Get at least one message - q.get(block) - block = False + q.get(False) q.task_done() - results.append(True) - nb -= 1 + nbr += 1 except self.queue.ShutDown: - results.append(False) - nb -= 1 + results.append(True) + break except self.queue.Empty: pass q.join() @@ -358,9 +359,8 @@ def _shutdown_thread(self, q, event_end, immediate): q.shutdown(immediate) q.join() - def _join_thread(self, q, delay, event_start): + def _join_thread(self, q, event_start): event_start.wait() - time.sleep(delay) q.join() def _shutdown_all_methods_in_many_threads(self, immediate): @@ -370,44 +370,35 @@ def _shutdown_all_methods_in_many_threads(self, immediate): ev_exec_shutdown = threading.Event() res_puts = [] res_gets = [] - delay = 1e-4 - read_process = 4 - nb_msgs = read_process * 16 - nb_msgs_r = nb_msgs // read_process - when_exec_shutdown = nb_msgs // 2 + write_threads = 4 + read_threads = 16 + nb_msgs = 1024*4 + nb_msgs_w = nb_msgs // write_threads + when_exec_shutdown = nb_msgs_w // 2 lprocs = ( - (self._write_msg_thread, 1, (q, nb_msgs, res_puts, delay, - when_exec_shutdown, - ev_start, ev_exec_shutdown)), - (self._read_msg_thread, read_process, (q, nb_msgs_r, - res_gets, delay*2, + (self._write_msg_thread, write_threads, (q, nb_msgs_w, res_puts, + when_exec_shutdown, ev_exec_shutdown, + ev_start)), + (self._read_msg_thread, read_threads, (q, res_gets, ev_start)), - (self._join_thread, 2, (q, delay*2, ev_start)), + (self._join_thread, 2, (q, ev_start)), (self._shutdown_thread, 1, (q, ev_exec_shutdown, immediate)), ) - # start all threds + # start all thredas for func, n, args in lprocs: for i in range(n): ps.append(threading.Thread(target=func, args=args)) ps[-1].start() - # set event in order to run q.shutdown() ev_start.set() - - if not immediate: - assert(len(res_gets) == len(res_puts)) - assert(res_gets.count(True) == res_puts.count(True)) - else: - assert(len(res_gets) <= len(res_puts)) - assert(res_gets.count(True) <= res_puts.count(True)) - - for thread in ps[1:]: + for thread in ps: thread.join() - @unittest.skip("test times out (gh-115258)") + self.assertEqual(res_puts.count(True), 1) + self.assertLessEqual(res_gets.count(True), read_threads) + def test_shutdown_all_methods_in_many_threads(self): return self._shutdown_all_methods_in_many_threads(False) - @unittest.skip("test times out (gh-115258)") def test_shutdown_immediate_all_methods_in_many_threads(self): return self._shutdown_all_methods_in_many_threads(True) From deb0b85bcf231541fa76edbca667abd82dd63781 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Mon, 26 Feb 2024 15:05:01 +0000 Subject: [PATCH 2/9] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20blu?= =?UTF-8?q?rb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst diff --git a/Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst b/Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst new file mode 100644 index 00000000000000..72b30015226ca9 --- /dev/null +++ b/Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst @@ -0,0 +1 @@ +Fix hang tests of `test_shutdown_[immediate_]all_methods_in_many_thread` unittests of 'test_queue.py'. This unit test is dedicated to the new feature "Shutdown a queue" to the threading queue. From 63a545fb9023724519b3f38bcc3bafaa4b61e1ed Mon Sep 17 00:00:00 2001 From: Duprat Date: Mon, 26 Feb 2024 17:03:28 +0100 Subject: [PATCH 3/9] Update 2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst --- .../next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst b/Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst index 72b30015226ca9..0ec4df3814d85e 100644 --- a/Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst +++ b/Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst @@ -1 +1,2 @@ -Fix hang tests of `test_shutdown_[immediate_]all_methods_in_many_thread` unittests of 'test_queue.py'. This unit test is dedicated to the new feature "Shutdown a queue" to the threading queue. +Fix hang the test from ``test_queue.GenericQueueTest.test_shutdown_[immediate_]all_methods_in_many_thread`` unittests. +This unit test is dedicated to the new ``Shutdown`` feature in the threading queue. From 5a09f7df4330af8cd4ecc8f9fb05d9e9bac153c0 Mon Sep 17 00:00:00 2001 From: Duprat Date: Tue, 27 Feb 2024 13:12:26 +0100 Subject: [PATCH 4/9] Update 2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst --- .../next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst b/Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst index 0ec4df3814d85e..5ac85b2bcb2a62 100644 --- a/Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst +++ b/Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst @@ -1,2 +1 @@ -Fix hang the test from ``test_queue.GenericQueueTest.test_shutdown_[immediate_]all_methods_in_many_thread`` unittests. -This unit test is dedicated to the new ``Shutdown`` feature in the threading queue. +Fix a hang test in ``test_queue.GenericQueueTest.test_shutdown_[immediate_]all_methods_in_many_threads`` unittests. This unit test is dedicated to the new **shutdown** feature in the threading queue. From b9ee958975931b5e496e58ab03a0799c8b922c1a Mon Sep 17 00:00:00 2001 From: Duprat Date: Mon, 11 Mar 2024 18:13:42 +0100 Subject: [PATCH 5/9] Add comments to the `_write_msg_thread` and `_shutdown_all_methods_in_many_threads` methods, with a code refactoring. Add a `results` list to the `_shutdown_thread` method. Add tests. Fix nit. --- Lib/test/test_queue.py | 65 +++++++++++++++++++++++++----------------- 1 file changed, 39 insertions(+), 26 deletions(-) diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 5891e85a39d4fa..9dc7f6299953b4 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -319,34 +319,36 @@ def test_shutdown_immediate_all_methods_in_one_thread(self): def _write_msg_thread(self, q, n, results, i_when_exec_shutdown, event_shutdown, - event_start): - put_atleast = i_when_exec_shutdown//2 - for i in range(1, n+1): + barrier_start): + # All `write_msg_threads` + # put several items into the queue. + for i in range(0, i_when_exec_shutdown//2): + q.put((i, 'LOYD')) + # Wait for the barrier to be complete. + barrier_start.wait() + + for i in range(i, n): try: q.put((i, "YDLO")) except self.queue.ShutDown: results.append(False) break - # Be sure that all write_threads - # put few items into the queue. - if i == put_atleast: - event_start.wait() - - # Triggers shutdown of queue. + # Trigger queue shutdown. if i == i_when_exec_shutdown: + # Only once thread do it. if not event_shutdown.is_set(): event_shutdown.set() results.append(True) q.join() - def _read_msg_thread(self, q, results, event_start): - nbr = 0 + def _read_msg_thread(self, q, results, barrier_start): + # Wait for the barrier to be complete. + barrier_start.wait() while True: try: q.get(False) q.task_done() - nbr += 1 except self.queue.ShutDown: results.append(True) break @@ -354,47 +356,58 @@ def _read_msg_thread(self, q, results, event_start): pass q.join() - def _shutdown_thread(self, q, event_end, immediate): + def _shutdown_thread(self, q, results, event_end, immediate): event_end.wait() q.shutdown(immediate) + results.append(q.qsize() == 0) q.join() - def _join_thread(self, q, event_start): - event_start.wait() + def _join_thread(self, q, barrier_start): + # Wait for the barrier to be complete. + barrier_start.wait() q.join() def _shutdown_all_methods_in_many_threads(self, immediate): + # Run a 'multi-producers/consumers queue' use case, + # with enough items into the queue. + # When shutdown, all running threads will be concerned. q = self.type2test() ps = [] - ev_start = threading.Event() - ev_exec_shutdown = threading.Event() res_puts = [] res_gets = [] + res_shutdown = [] write_threads = 4 - read_threads = 16 - nb_msgs = 1024*4 + read_threads = 6 + join_threads = 2 + nb_msgs = 1024*64 nb_msgs_w = nb_msgs // write_threads when_exec_shutdown = nb_msgs_w // 2 + # Use of a `threading.Barrier`` to ensure that all `_write_msg_threads` + # put their part of items into the queue. And trigger the start of + # other threads as `_read_msg_thread`and `_join_thread`. + barrier_start = threading.Barrier(write_threads+read_threads+join_threads) + ev_exec_shutdown = threading.Event() lprocs = ( (self._write_msg_thread, write_threads, (q, nb_msgs_w, res_puts, when_exec_shutdown, ev_exec_shutdown, - ev_start)), - (self._read_msg_thread, read_threads, (q, res_gets, - ev_start)), - (self._join_thread, 2, (q, ev_start)), - (self._shutdown_thread, 1, (q, ev_exec_shutdown, immediate)), + barrier_start)), + (self._read_msg_thread, read_threads, (q, res_gets, barrier_start)), + (self._join_thread, join_threads, (q, barrier_start)), + (self._shutdown_thread, 1, (q, res_shutdown, ev_exec_shutdown, immediate)), ) - # start all thredas + # start all threads. for func, n, args in lprocs: for i in range(n): ps.append(threading.Thread(target=func, args=args)) ps[-1].start() - ev_start.set() for thread in ps: thread.join() self.assertEqual(res_puts.count(True), 1) self.assertLessEqual(res_gets.count(True), read_threads) + if immediate: + self.assertListEqual(res_shutdown, [True]) + self.assertTrue(q.empty()) def test_shutdown_all_methods_in_many_threads(self): return self._shutdown_all_methods_in_many_threads(False) From e0be6b5ffbc1bbdf1aded908a389d3a7e0e0699a Mon Sep 17 00:00:00 2001 From: Duprat Date: Wed, 13 Mar 2024 16:28:55 +0100 Subject: [PATCH 6/9] Fix hangs by removing `q.join()` instructions from threads. Start `join_thread` only when shutdown is immediate. Update tests. --- Lib/test/test_queue.py | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 9dc7f6299953b4..c2959984a7c73f 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -340,9 +340,11 @@ def _write_msg_thread(self, q, n, results, if not event_shutdown.is_set(): event_shutdown.set() results.append(True) - q.join() def _read_msg_thread(self, q, results, barrier_start): + # Get at least one item. + q.get(True) + q.task_done() # Wait for the barrier to be complete. barrier_start.wait() while True: @@ -354,13 +356,11 @@ def _read_msg_thread(self, q, results, barrier_start): break except self.queue.Empty: pass - q.join() def _shutdown_thread(self, q, results, event_end, immediate): event_end.wait() q.shutdown(immediate) results.append(q.qsize() == 0) - q.join() def _join_thread(self, q, barrier_start): # Wait for the barrier to be complete. @@ -382,19 +382,25 @@ def _shutdown_all_methods_in_many_threads(self, immediate): nb_msgs = 1024*64 nb_msgs_w = nb_msgs // write_threads when_exec_shutdown = nb_msgs_w // 2 - # Use of a `threading.Barrier`` to ensure that all `_write_msg_threads` - # put their part of items into the queue. And trigger the start of - # other threads as `_read_msg_thread`and `_join_thread`. - barrier_start = threading.Barrier(write_threads+read_threads+join_threads) + # Use of a `threading.Barrier`` to ensure that + # all `_write_msg_threads`put their part of items into the queue + # all `_read_msg_thread` get at least one itme from the queue, + # and keep on running until shutdown. + # The `_join_thread` is started only when shutdown is emmediate. + nparties = write_threads + read_threads + if immediate: + nparties += join_threads + barrier_start = threading.Barrier(nparties) ev_exec_shutdown = threading.Event() - lprocs = ( + lprocs = [ (self._write_msg_thread, write_threads, (q, nb_msgs_w, res_puts, when_exec_shutdown, ev_exec_shutdown, barrier_start)), (self._read_msg_thread, read_threads, (q, res_gets, barrier_start)), - (self._join_thread, join_threads, (q, barrier_start)), (self._shutdown_thread, 1, (q, res_shutdown, ev_exec_shutdown, immediate)), - ) + ] + if immediate: + lprocs.append((self._join_thread, join_threads, (q, barrier_start))) # start all threads. for func, n, args in lprocs: for i in range(n): @@ -403,7 +409,7 @@ def _shutdown_all_methods_in_many_threads(self, immediate): for thread in ps: thread.join() - self.assertEqual(res_puts.count(True), 1) + self.assertTrue(True in res_puts) self.assertLessEqual(res_gets.count(True), read_threads) if immediate: self.assertListEqual(res_shutdown, [True]) From e4e880be83abd16108f593a4e7ebef41ddd1c6c6 Mon Sep 17 00:00:00 2001 From: Duprat Date: Fri, 15 Mar 2024 15:02:04 +0100 Subject: [PATCH 7/9] Update comments. Fix start value of range. Change `self.assertLessEqual` to `self.assertEqual`. --- Lib/test/test_queue.py | 18 +++++++++--------- ...4-02-26-15-04-57.gh-issue-115258.p__Nfv.rst | 3 ++- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index c2959984a7c73f..9b22f2f4a3a848 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -327,7 +327,7 @@ def _write_msg_thread(self, q, n, results, # Wait for the barrier to be complete. barrier_start.wait() - for i in range(i, n): + for i in range(i_when_exec_shutdown//2, n): try: q.put((i, "YDLO")) except self.queue.ShutDown: @@ -336,7 +336,7 @@ def _write_msg_thread(self, q, n, results, # Trigger queue shutdown. if i == i_when_exec_shutdown: - # Only once thread do it. + # Only one thread should call shutdown(). if not event_shutdown.is_set(): event_shutdown.set() results.append(True) @@ -370,7 +370,7 @@ def _join_thread(self, q, barrier_start): def _shutdown_all_methods_in_many_threads(self, immediate): # Run a 'multi-producers/consumers queue' use case, # with enough items into the queue. - # When shutdown, all running threads will be concerned. + # When shutdown, all running threads will be joined. q = self.type2test() ps = [] res_puts = [] @@ -382,11 +382,11 @@ def _shutdown_all_methods_in_many_threads(self, immediate): nb_msgs = 1024*64 nb_msgs_w = nb_msgs // write_threads when_exec_shutdown = nb_msgs_w // 2 - # Use of a `threading.Barrier`` to ensure that - # all `_write_msg_threads`put their part of items into the queue - # all `_read_msg_thread` get at least one itme from the queue, - # and keep on running until shutdown. - # The `_join_thread` is started only when shutdown is emmediate. + # Use of a Barrier to ensure that + # - all write threads put all their items into the queue, + # - all read thread get at least one item from the queue, + # and keep on running until shutdown. + # The join thread is started only when shutdown is immediate. nparties = write_threads + read_threads if immediate: nparties += join_threads @@ -410,7 +410,7 @@ def _shutdown_all_methods_in_many_threads(self, immediate): thread.join() self.assertTrue(True in res_puts) - self.assertLessEqual(res_gets.count(True), read_threads) + self.assertEqual(res_gets.count(True), read_threads) if immediate: self.assertListEqual(res_shutdown, [True]) self.assertTrue(q.empty()) diff --git a/Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst b/Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst index 5ac85b2bcb2a62..22187cd2b0d660 100644 --- a/Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst +++ b/Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst @@ -1 +1,2 @@ -Fix a hang test in ``test_queue.GenericQueueTest.test_shutdown_[immediate_]all_methods_in_many_threads`` unittests. This unit test is dedicated to the new **shutdown** feature in the threading queue. +Fix a hang in ``test_queue.GenericQueueTest.test_shutdown_[immediate_]all_methods_in_many_threads``. +This unit test is dedicated to the new ``shutdown`` feature in the threading queue. \ No newline at end of file From 349d08a4e14954a3580994e719194385e31aba16 Mon Sep 17 00:00:00 2001 From: Duprat Date: Fri, 15 Mar 2024 15:42:15 +0100 Subject: [PATCH 8/9] Add newline at end of file. --- .../next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst b/Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst index 22187cd2b0d660..76994f436c1941 100644 --- a/Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst +++ b/Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst @@ -1,2 +1,2 @@ Fix a hang in ``test_queue.GenericQueueTest.test_shutdown_[immediate_]all_methods_in_many_threads``. -This unit test is dedicated to the new ``shutdown`` feature in the threading queue. \ No newline at end of file +This unit test is dedicated to the new ``shutdown`` feature in the threading queue. From 7c1935df226518535d31e9dfb2b98b94b32875b9 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Mon, 18 Mar 2024 08:48:41 -0700 Subject: [PATCH 9/9] Remove unnecessary news entry --- .../next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst | 2 -- 1 file changed, 2 deletions(-) delete mode 100644 Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst diff --git a/Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst b/Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst deleted file mode 100644 index 76994f436c1941..00000000000000 --- a/Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst +++ /dev/null @@ -1,2 +0,0 @@ -Fix a hang in ``test_queue.GenericQueueTest.test_shutdown_[immediate_]all_methods_in_many_threads``. -This unit test is dedicated to the new ``shutdown`` feature in the threading queue.