From 9f451a1fe0ac5cb2fb13d72b0436e0b521a4fecb Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 9 Apr 2021 15:21:30 +0200 Subject: [PATCH] chore: fix streaming pull close unit test flakiness (#361) * chore: fix streaming pull close test flakiness * Store shutdown thread on the manager instance --- .../_protocol/streaming_pull_manager.py | 5 +++-- .../subscriber/test_streaming_pull_manager.py | 21 +++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index ac940de26..e244e871d 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -148,6 +148,7 @@ def __init__( self._closing = threading.Lock() self._closed = False self._close_callbacks = [] + self._regular_shutdown_thread = None # Created on intentional shutdown. # Generate a random client id tied to this object. All streaming pull # connections (initial and re-connects) will then use the same client @@ -539,13 +540,13 @@ def close(self, reason=None): an "intentional" shutdown. This is passed to the callbacks specified via :meth:`add_close_callback`. """ - thread = threading.Thread( + self._regular_shutdown_thread = threading.Thread( name=_REGULAR_SHUTDOWN_THREAD_NAME, daemon=True, target=self._shutdown, kwargs={"reason": reason}, ) - thread.start() + self._regular_shutdown_thread.start() def _shutdown(self, reason=None): """Run the actual shutdown sequence (stop the stream and all helper threads). diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 9930e8f14..25ab4f0ae 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -550,6 +550,19 @@ def make_running_manager(**kwargs): ) +def await_manager_shutdown(manager, timeout=None): + # NOTE: This method should be called after manager.close(), i.e. after the shutdown + # thread has been created and started. + shutdown_thread = manager._regular_shutdown_thread + + if shutdown_thread is None: # pragma: NO COVER + raise Exception("Shutdown thread does not exist on the manager instance.") + + shutdown_thread.join(timeout=timeout) + if shutdown_thread.is_alive(): # pragma: NO COVER + pytest.fail("Shutdown not completed in time.") + + def test_close(): ( manager, @@ -561,6 +574,7 @@ def test_close(): ) = make_running_manager() manager.close() + await_manager_shutdown(manager, timeout=3) consumer.stop.assert_called_once() leaser.stop.assert_called_once() @@ -583,6 +597,7 @@ def test_close_inactive_consumer(): consumer.is_active = False manager.close() + await_manager_shutdown(manager, timeout=3) consumer.stop.assert_not_called() leaser.stop.assert_called_once() @@ -596,6 +611,7 @@ def test_close_idempotent(): manager.close() manager.close() + await_manager_shutdown(manager, timeout=3) assert scheduler.shutdown.call_count == 1 @@ -640,6 +656,7 @@ def test_close_no_dispatcher_error(): dispatcher.start() manager.close() + await_manager_shutdown(manager, timeout=3) error_callback.assert_not_called() @@ -651,6 +668,7 @@ def test_close_callbacks(): manager.add_close_callback(callback) manager.close(reason="meep") + await_manager_shutdown(manager, timeout=3) callback.assert_called_once_with(manager, "meep") @@ -660,6 +678,7 @@ def test_close_blocking_scheduler_shutdown(): scheduler = manager._scheduler manager.close() + await_manager_shutdown(manager, timeout=3) scheduler.shutdown.assert_called_once_with(await_msg_callbacks=True) @@ -669,6 +688,7 @@ def test_close_nonblocking_scheduler_shutdown(): scheduler = manager._scheduler manager.close() + await_manager_shutdown(manager, timeout=3) scheduler.shutdown.assert_called_once_with(await_msg_callbacks=False) @@ -690,6 +710,7 @@ def fake_nack(self): manager._messages_on_hold._messages_on_hold.append(messages[2]) manager.close() + await_manager_shutdown(manager, timeout=3) assert sorted(nacked_messages) == [b"msg1", b"msg2", b"msg3"]