Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Making it impossible to call Policy.open() on an already opened policy. #4606

Merged
merged 3 commits into from
Dec 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 39 additions & 4 deletions pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,26 @@ def _get_executor(executor):
return executor

def close(self):
"""Close the existing connection."""
"""Close the existing connection.

.. warning::

This method is not thread-safe. For example, if this method is
called while another thread is executing :meth:`open`, then the
policy could end up in an undefined state. The **same** policy
instance is not intended to be used by multiple workers (though
each policy instance **does** have a thread-safe private queue).

Returns:
~google.api_core.future.Future: The future that **was** attached
to the subscription.

Raises:
ValueError: If the policy has not been opened yet.
"""
if self._future is None:
raise ValueError('This policy has not been opened yet.')

# Stop consuming messages.
self._request_queue.put(_helper_threads.STOP)
self._dispatch_thread.join() # Wait until stopped.
Expand All @@ -159,9 +178,11 @@ def close(self):

# The subscription is closing cleanly; resolve the future if it is not
# resolved already.
if self._future is not None and not self._future.done():
if not self._future.done():
self._future.set_result(None)
future = self._future
self._future = None
return future

def _start_dispatch(self):
"""Start a thread to dispatch requests queued up by callbacks.
Expand Down Expand Up @@ -213,6 +234,14 @@ def _start_lease_worker(self):
def open(self, callback):
"""Open a streaming pull connection and begin receiving messages.

.. warning::

This method is not thread-safe. For example, if this method is
called while another thread is executing :meth:`close`, then the
policy could end up in an undefined state. The **same** policy
instance is not intended to be used by multiple workers (though
each policy instance **does** have a thread-safe private queue).

For each message received, the ``callback`` function is fired with
a :class:`~.pubsub_v1.subscriber.message.Message` as its only
argument.
Expand All @@ -222,9 +251,15 @@ def open(self, callback):

Returns:
~google.api_core.future.Future: A future that provides
an interface to block on the subscription if desired, and
handle errors.
an interface to block on the subscription if desired, and
handle errors.

Raises:
ValueError: If the policy has already been opened.
"""
if self._future is not None:
raise ValueError('This policy has already been opened.')

# Create the Future that this method will return.
# This future is the main thread's interface to handle exceptions,
# block on the subscription, etc.
Expand Down
36 changes: 32 additions & 4 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,35 @@ def test_close():
policy = create_policy()
policy._dispatch_thread = dispatch_thread
policy._leases_thread = leases_thread
future = mock.Mock(spec=('done',))
future.done.return_value = True
policy._future = future

consumer = policy._consumer
with mock.patch.object(consumer, 'stop_consuming') as stop_consuming:
policy.close()
closed_fut = policy.close()
stop_consuming.assert_called_once_with()

assert policy._dispatch_thread is None
dispatch_thread.join.assert_called_once_with()
assert policy._leases_thread is None
leases_thread.join.assert_called_once_with()
assert closed_fut is future
assert policy._future is None
future.done.assert_called_once_with()


def test_close_without_future():
policy = create_policy()
assert policy._future is None

with pytest.raises(ValueError) as exc_info:
policy.close()

assert exc_info.value.args == ('This policy has not been opened yet.',)

def test_close_with_future():

def test_close_with_unfinished_future():
dispatch_thread = mock.Mock(spec=threading.Thread)
leases_thread = mock.Mock(spec=threading.Thread)

Expand All @@ -77,14 +94,15 @@ def test_close_with_future():
consumer = policy._consumer
with mock.patch.object(consumer, 'stop_consuming') as stop_consuming:
future = policy.future
policy.close()
closed_fut = policy.close()
stop_consuming.assert_called_once_with()

assert policy._dispatch_thread is None
dispatch_thread.join.assert_called_once_with()
assert policy._leases_thread is None
leases_thread.join.assert_called_once_with()
assert policy.future != future
assert policy._future is None
assert closed_fut is future
assert future.result() is None


Expand All @@ -111,6 +129,16 @@ def test_open():
threads[2].start.assert_called_once_with()


def test_open_already_open():
policy = create_policy()
policy._future = mock.sentinel.future

with pytest.raises(ValueError) as exc_info:
policy.open(None)

assert exc_info.value.args == ('This policy has already been opened.',)


def test_dispatch_callback_valid_actions():
policy = create_policy()
kwargs = {'foo': 10, 'bar': 13.37}
Expand Down