From b547859ee8757f64fe7d465f06631d6ae3f56116 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Fri, 15 Dec 2017 16:00:29 -0800 Subject: [PATCH 1/3] Making it impossible to call `Policy.open()` on an already opened policy. Similar with `Policy.close()`. Fixes #4488. --- .../pubsub_v1/subscriber/policy/thread.py | 33 +++++++++++++++++-- .../subscriber/test_policy_thread.py | 28 +++++++++++++++- 2 files changed, 58 insertions(+), 3 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py index d7267a79c9e4..660e8c1df2ff 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py @@ -147,7 +147,22 @@ def _get_executor(executor): return executor def close(self): - """Close the existing connection.""" + """Close the existing connection. + + .. warning:: + + This method is not thread-safe. For example, if this method is + called while another thread is executing :meth:`open`, then the + policy could end up in an undefined state. The **same** policy + instance is not intended to be used by multiple workers (though + each policy instance **does** have a threadsafe private queue). + + Raises: + ValueError: If the policy has not been opened yet. + """ + if self._future is None: + raise ValueError('This policy has not been opened yet.') + # Stop consuming messages. self._request_queue.put(_helper_threads.STOP) self._dispatch_thread.join() # Wait until stopped. @@ -159,7 +174,7 @@ def close(self): # The subscription is closing cleanly; resolve the future if it is not # resolved already. - if self._future is not None and not self._future.done(): + if not self._future.done(): self._future.set_result(None) self._future = None @@ -213,6 +228,14 @@ def _start_lease_worker(self): def open(self, callback): """Open a streaming pull connection and begin receiving messages. + .. warning:: + + This method is not thread-safe. For example, if this method is + called while another thread is executing :meth:`close`, then the + policy could end up in an undefined state. The **same** policy + instance is not intended to be used by multiple workers (though + each policy instance **does** have a threadsafe private queue). + For each message received, the ``callback`` function is fired with a :class:`~.pubsub_v1.subscriber.message.Message` as its only argument. @@ -224,7 +247,13 @@ def open(self, callback): ~google.api_core.future.Future: A future that provides an interface to block on the subscription if desired, and handle errors. + + Raises: + ValueError: If the policy has already been opened. """ + if self._future is not None: + raise ValueError('This policy has already been opened.') + # Create the Future that this method will return. # This future is the main thread's interface to handle exceptions, # block on the subscription, etc. diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py index ebb274905be6..8398cd61218b 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py @@ -55,6 +55,10 @@ def test_close(): policy = create_policy() policy._dispatch_thread = dispatch_thread policy._leases_thread = leases_thread + future = mock.Mock(spec=('done',)) + future.done.return_value = True + policy._future = future + consumer = policy._consumer with mock.patch.object(consumer, 'stop_consuming') as stop_consuming: policy.close() @@ -64,9 +68,21 @@ def test_close(): dispatch_thread.join.assert_called_once_with() assert policy._leases_thread is None leases_thread.join.assert_called_once_with() + assert policy._future is None + future.done.assert_called_once_with() + + +def test_close_without_future(): + policy = create_policy() + assert policy._future is None + + with pytest.raises(ValueError) as exc_info: + policy.close() + assert exc_info.value.args == ('This policy has not been opened yet.',) -def test_close_with_future(): + +def test_close_with_unfinished_future(): dispatch_thread = mock.Mock(spec=threading.Thread) leases_thread = mock.Mock(spec=threading.Thread) @@ -111,6 +127,16 @@ def test_open(): threads[2].start.assert_called_once_with() +def test_open_already_open(): + policy = create_policy() + policy._future = mock.sentinel.future + + with pytest.raises(ValueError) as exc_info: + policy.open(None) + + assert exc_info.value.args == ('This policy has already been opened.',) + + def test_dispatch_callback_valid_actions(): policy = create_policy() kwargs = {'foo': 10, 'bar': 13.37} From 341f0eec97f019c867c031f6f5ccab120cabe981 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Fri, 15 Dec 2017 16:02:39 -0800 Subject: [PATCH 2/3] Adding hyphen to `thread-safe`. --- pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py index 660e8c1df2ff..f8d633abff39 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py @@ -155,7 +155,7 @@ def close(self): called while another thread is executing :meth:`open`, then the policy could end up in an undefined state. The **same** policy instance is not intended to be used by multiple workers (though - each policy instance **does** have a threadsafe private queue). + each policy instance **does** have a thread-safe private queue). Raises: ValueError: If the policy has not been opened yet. @@ -234,7 +234,7 @@ def open(self, callback): called while another thread is executing :meth:`close`, then the policy could end up in an undefined state. The **same** policy instance is not intended to be used by multiple workers (though - each policy instance **does** have a threadsafe private queue). + each policy instance **does** have a thread-safe private queue). For each message received, the ``callback`` function is fired with a :class:`~.pubsub_v1.subscriber.message.Message` as its only From 75b248ef0e0d44a72c704e5a36b41453b30b3d0c Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Mon, 18 Dec 2017 09:05:08 -0800 Subject: [PATCH 3/3] Returning the future from `Policy.close()`. --- .../google/cloud/pubsub_v1/subscriber/policy/thread.py | 10 ++++++++-- .../unit/pubsub_v1/subscriber/test_policy_thread.py | 8 +++++--- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py index f8d633abff39..39f161a3b93e 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py @@ -157,6 +157,10 @@ def close(self): instance is not intended to be used by multiple workers (though each policy instance **does** have a thread-safe private queue). + Returns: + ~google.api_core.future.Future: The future that **was** attached + to the subscription. + Raises: ValueError: If the policy has not been opened yet. """ @@ -176,7 +180,9 @@ def close(self): # resolved already. if not self._future.done(): self._future.set_result(None) + future = self._future self._future = None + return future def _start_dispatch(self): """Start a thread to dispatch requests queued up by callbacks. @@ -245,8 +251,8 @@ def open(self, callback): Returns: ~google.api_core.future.Future: A future that provides - an interface to block on the subscription if desired, and - handle errors. + an interface to block on the subscription if desired, and + handle errors. Raises: ValueError: If the policy has already been opened. diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py index 8398cd61218b..a1b254fb2eee 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py @@ -61,13 +61,14 @@ def test_close(): consumer = policy._consumer with mock.patch.object(consumer, 'stop_consuming') as stop_consuming: - policy.close() + closed_fut = policy.close() stop_consuming.assert_called_once_with() assert policy._dispatch_thread is None dispatch_thread.join.assert_called_once_with() assert policy._leases_thread is None leases_thread.join.assert_called_once_with() + assert closed_fut is future assert policy._future is None future.done.assert_called_once_with() @@ -93,14 +94,15 @@ def test_close_with_unfinished_future(): consumer = policy._consumer with mock.patch.object(consumer, 'stop_consuming') as stop_consuming: future = policy.future - policy.close() + closed_fut = policy.close() stop_consuming.assert_called_once_with() assert policy._dispatch_thread is None dispatch_thread.join.assert_called_once_with() assert policy._leases_thread is None leases_thread.join.assert_called_once_with() - assert policy.future != future + assert policy._future is None + assert closed_fut is future assert future.result() is None