diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py index d7267a79c9e4..39f161a3b93e 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py @@ -147,7 +147,26 @@ def _get_executor(executor): return executor def close(self): - """Close the existing connection.""" + """Close the existing connection. + + .. warning:: + + This method is not thread-safe. For example, if this method is + called while another thread is executing :meth:`open`, then the + policy could end up in an undefined state. The **same** policy + instance is not intended to be used by multiple workers (though + each policy instance **does** have a thread-safe private queue). + + Returns: + ~google.api_core.future.Future: The future that **was** attached + to the subscription. + + Raises: + ValueError: If the policy has not been opened yet. + """ + if self._future is None: + raise ValueError('This policy has not been opened yet.') + # Stop consuming messages. self._request_queue.put(_helper_threads.STOP) self._dispatch_thread.join() # Wait until stopped. @@ -159,9 +178,11 @@ def close(self): # The subscription is closing cleanly; resolve the future if it is not # resolved already. - if self._future is not None and not self._future.done(): + if not self._future.done(): self._future.set_result(None) + future = self._future self._future = None + return future def _start_dispatch(self): """Start a thread to dispatch requests queued up by callbacks. @@ -213,6 +234,14 @@ def _start_lease_worker(self): def open(self, callback): """Open a streaming pull connection and begin receiving messages. + .. warning:: + + This method is not thread-safe. For example, if this method is + called while another thread is executing :meth:`close`, then the + policy could end up in an undefined state. The **same** policy + instance is not intended to be used by multiple workers (though + each policy instance **does** have a thread-safe private queue). + For each message received, the ``callback`` function is fired with a :class:`~.pubsub_v1.subscriber.message.Message` as its only argument. @@ -222,9 +251,15 @@ def open(self, callback): Returns: ~google.api_core.future.Future: A future that provides - an interface to block on the subscription if desired, and - handle errors. + an interface to block on the subscription if desired, and + handle errors. + + Raises: + ValueError: If the policy has already been opened. """ + if self._future is not None: + raise ValueError('This policy has already been opened.') + # Create the Future that this method will return. # This future is the main thread's interface to handle exceptions, # block on the subscription, etc. diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py index ebb274905be6..a1b254fb2eee 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py @@ -55,18 +55,35 @@ def test_close(): policy = create_policy() policy._dispatch_thread = dispatch_thread policy._leases_thread = leases_thread + future = mock.Mock(spec=('done',)) + future.done.return_value = True + policy._future = future + consumer = policy._consumer with mock.patch.object(consumer, 'stop_consuming') as stop_consuming: - policy.close() + closed_fut = policy.close() stop_consuming.assert_called_once_with() assert policy._dispatch_thread is None dispatch_thread.join.assert_called_once_with() assert policy._leases_thread is None leases_thread.join.assert_called_once_with() + assert closed_fut is future + assert policy._future is None + future.done.assert_called_once_with() + + +def test_close_without_future(): + policy = create_policy() + assert policy._future is None + + with pytest.raises(ValueError) as exc_info: + policy.close() + assert exc_info.value.args == ('This policy has not been opened yet.',) -def test_close_with_future(): + +def test_close_with_unfinished_future(): dispatch_thread = mock.Mock(spec=threading.Thread) leases_thread = mock.Mock(spec=threading.Thread) @@ -77,14 +94,15 @@ def test_close_with_future(): consumer = policy._consumer with mock.patch.object(consumer, 'stop_consuming') as stop_consuming: future = policy.future - policy.close() + closed_fut = policy.close() stop_consuming.assert_called_once_with() assert policy._dispatch_thread is None dispatch_thread.join.assert_called_once_with() assert policy._leases_thread is None leases_thread.join.assert_called_once_with() - assert policy.future != future + assert policy._future is None + assert closed_fut is future assert future.result() is None @@ -111,6 +129,16 @@ def test_open(): threads[2].start.assert_called_once_with() +def test_open_already_open(): + policy = create_policy() + policy._future = mock.sentinel.future + + with pytest.raises(ValueError) as exc_info: + policy.open(None) + + assert exc_info.value.args == ('This policy has already been opened.',) + + def test_dispatch_callback_valid_actions(): policy = create_policy() kwargs = {'foo': 10, 'bar': 13.37}