From b547859ee8757f64fe7d465f06631d6ae3f56116 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Fri, 15 Dec 2017 16:00:29 -0800 Subject: [PATCH] Making it impossible to call `Policy.open()` on an already opened policy. Similar with `Policy.close()`. Fixes #4488. --- .../pubsub_v1/subscriber/policy/thread.py | 33 +++++++++++++++++-- .../subscriber/test_policy_thread.py | 28 +++++++++++++++- 2 files changed, 58 insertions(+), 3 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py index d7267a79c9e4..660e8c1df2ff 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py @@ -147,7 +147,22 @@ def _get_executor(executor): return executor def close(self): - """Close the existing connection.""" + """Close the existing connection. + + .. warning:: + + This method is not thread-safe. For example, if this method is + called while another thread is executing :meth:`open`, then the + policy could end up in an undefined state. The **same** policy + instance is not intended to be used by multiple workers (though + each policy instance **does** have a threadsafe private queue). + + Raises: + ValueError: If the policy has not been opened yet. + """ + if self._future is None: + raise ValueError('This policy has not been opened yet.') + # Stop consuming messages. self._request_queue.put(_helper_threads.STOP) self._dispatch_thread.join() # Wait until stopped. @@ -159,7 +174,7 @@ def close(self): # The subscription is closing cleanly; resolve the future if it is not # resolved already. - if self._future is not None and not self._future.done(): + if not self._future.done(): self._future.set_result(None) self._future = None @@ -213,6 +228,14 @@ def _start_lease_worker(self): def open(self, callback): """Open a streaming pull connection and begin receiving messages. + .. warning:: + + This method is not thread-safe. For example, if this method is + called while another thread is executing :meth:`close`, then the + policy could end up in an undefined state. The **same** policy + instance is not intended to be used by multiple workers (though + each policy instance **does** have a threadsafe private queue). + For each message received, the ``callback`` function is fired with a :class:`~.pubsub_v1.subscriber.message.Message` as its only argument. @@ -224,7 +247,13 @@ def open(self, callback): ~google.api_core.future.Future: A future that provides an interface to block on the subscription if desired, and handle errors. + + Raises: + ValueError: If the policy has already been opened. """ + if self._future is not None: + raise ValueError('This policy has already been opened.') + # Create the Future that this method will return. # This future is the main thread's interface to handle exceptions, # block on the subscription, etc. diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py index ebb274905be6..8398cd61218b 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py @@ -55,6 +55,10 @@ def test_close(): policy = create_policy() policy._dispatch_thread = dispatch_thread policy._leases_thread = leases_thread + future = mock.Mock(spec=('done',)) + future.done.return_value = True + policy._future = future + consumer = policy._consumer with mock.patch.object(consumer, 'stop_consuming') as stop_consuming: policy.close() @@ -64,9 +68,21 @@ def test_close(): dispatch_thread.join.assert_called_once_with() assert policy._leases_thread is None leases_thread.join.assert_called_once_with() + assert policy._future is None + future.done.assert_called_once_with() + + +def test_close_without_future(): + policy = create_policy() + assert policy._future is None + + with pytest.raises(ValueError) as exc_info: + policy.close() + assert exc_info.value.args == ('This policy has not been opened yet.',) -def test_close_with_future(): + +def test_close_with_unfinished_future(): dispatch_thread = mock.Mock(spec=threading.Thread) leases_thread = mock.Mock(spec=threading.Thread) @@ -111,6 +127,16 @@ def test_open(): threads[2].start.assert_called_once_with() +def test_open_already_open(): + policy = create_policy() + policy._future = mock.sentinel.future + + with pytest.raises(ValueError) as exc_info: + policy.open(None) + + assert exc_info.value.args == ('This policy has already been opened.',) + + def test_dispatch_callback_valid_actions(): policy = create_policy() kwargs = {'foo': 10, 'bar': 13.37}