Skip to content

Commit

Permalink
Making it impossible to call Policy.open() on an already opened pol…
Browse files Browse the repository at this point in the history
…icy.

Similar with `Policy.close()`.

Fixes googleapis#4488.
  • Loading branch information
dhermes committed Dec 16, 2017
1 parent 578ff7f commit d1c6667
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 3 deletions.
33 changes: 31 additions & 2 deletions pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,22 @@ def _get_executor(executor):
return executor

def close(self):
"""Close the existing connection."""
"""Close the existing connection.
.. warning::
This method is not thread-safe. For example, if this method is
called while another thread is executing :meth:`open`, then the
policy could end up in an undefined state. The **same** policy
instance is not intended to be used by multiple workers (though
each policy instance **does** have a threadsafe private queue).
Raises:
ValueError: If the policy has not been opened yet.
"""
if self._future is None:
raise ValueError('This policy has not been opened yet.')

# Stop consuming messages.
self._request_queue.put(_helper_threads.STOP)
self._dispatch_thread.join() # Wait until stopped.
Expand All @@ -159,7 +174,7 @@ def close(self):

# The subscription is closing cleanly; resolve the future if it is not
# resolved already.
if self._future is not None and not self._future.done():
if not self._future.done():
self._future.set_result(None)
self._future = None

Expand Down Expand Up @@ -213,6 +228,14 @@ def _start_lease_worker(self):
def open(self, callback):
"""Open a streaming pull connection and begin receiving messages.
.. warning::
This method is not thread-safe. For example, if this method is
called while another thread is executing :meth:`close`, then the
policy could end up in an undefined state. The **same** policy
instance is not intended to be used by multiple workers (though
each policy instance **does** have a threadsafe private queue).
For each message received, the ``callback`` function is fired with
a :class:`~.pubsub_v1.subscriber.message.Message` as its only
argument.
Expand All @@ -224,7 +247,13 @@ def open(self, callback):
~google.api_core.future.Future: A future that provides
an interface to block on the subscription if desired, and
handle errors.
Raises:
ValueError: If the policy has already been opened.
"""
if self._future is not None:
raise ValueError('This policy has already been opened.')

# Create the Future that this method will return.
# This future is the main thread's interface to handle exceptions,
# block on the subscription, etc.
Expand Down
28 changes: 27 additions & 1 deletion pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ def test_close():
policy = create_policy()
policy._dispatch_thread = dispatch_thread
policy._leases_thread = leases_thread
future = mock.Mock(spec=('done',))
future.done.return_value = True
policy._future = future

consumer = policy._consumer
with mock.patch.object(consumer, 'stop_consuming') as stop_consuming:
policy.close()
Expand All @@ -64,9 +68,21 @@ def test_close():
dispatch_thread.join.assert_called_once_with()
assert policy._leases_thread is None
leases_thread.join.assert_called_once_with()
assert policy._future is None
future.done.assert_called_once_with()


def test_close_without_future():
policy = create_policy()
assert policy._future is None

with pytest.raises(ValueError) as exc_info:
policy.close()

assert exc_info.value.args == ('This policy has not been opened yet.',)

def test_close_with_future():

def test_close_with_unfinished_future():
dispatch_thread = mock.Mock(spec=threading.Thread)
leases_thread = mock.Mock(spec=threading.Thread)

Expand Down Expand Up @@ -111,6 +127,16 @@ def test_open():
threads[2].start.assert_called_once_with()


def test_open_already_open():
policy = create_policy()
policy._future = mock.sentinel.future

with pytest.raises(ValueError) as exc_info:
policy.open(None)

assert exc_info.value.args == ('This policy has already been opened.',)


def test_dispatch_callback_valid_actions():
policy = create_policy()
kwargs = {'foo': 10, 'bar': 13.37}
Expand Down

0 comments on commit d1c6667

Please sign in to comment.