Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PubSub: Policy.on_exception actually used to make consumer go inactive. #4472

Merged
merged 4 commits into from
Nov 29, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,9 @@ def _blocking_consume(self):
self.stop_consuming()
except Exception as exc:
try:
self._policy.on_exception(exc)
self.active = self._policy.on_exception(exc)

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

except:
self.active = False

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

raise

def start_consuming(self):
"""Start consuming the stream."""
Expand Down
13 changes: 8 additions & 5 deletions pubsub/google/cloud/pubsub_v1/subscriber/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,19 @@ def running(self):

.. note::

A ``False`` value here does not necessarily mean that the
A :data:`False` value here does not necessarily mean that the
subscription is closed; it merely means that _this_ future is
not the future applicable to it.

Since futures have a single result (or exception) and there is
not a concept of resetting them, a closing re-opening of a
not a concept of resetting them, a closing / re-opening of a
subscription will therefore return a new future.

Returns:
bool: ``True`` if this subscription is opened with this future,
``False`` otherwise.
bool: :data:`True` if this subscription is opened with this
future, :data:`False` otherwise.

This comment was marked as spam.

This comment was marked as spam.

"""
return self._policy.future is self
if self._policy.future is not self:
return False

return super(Future, self).running()
7 changes: 5 additions & 2 deletions pubsub/google/cloud/pubsub_v1/subscriber/policy/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,11 @@ def on_exception(self, exception):
"""Called when a gRPC exception occurs.

If this method does nothing, then the stream is re-started. If this
raises an exception, it will stop the consumer thread.
This is executed on the response consumer helper thread.
raises an exception, it will stop the consumer thread. This is
executed on the response consumer helper thread.

Implementations should return :data:`True` if they want the consumer
thread to remain active, otherwise they should return :data:`False`.

Args:
exception (Exception): The exception raised by the RPC.
Expand Down
15 changes: 12 additions & 3 deletions pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,17 +173,26 @@ def on_callback_request(self, callback_request):
getattr(self, action)(**kwargs)

def on_exception(self, exception):
"""Bubble the exception.
"""Handle the exception.

This will cause the stream to exit loudly.
If the exception is one of the retryable exceptions, this will signal
to the consumer thread that it should remain active.

This will cause the stream to exit when it returns :data:`False`.

Returns:
bool: Indicates if the caller should remain active or shut down.
Will be :data:`True` if the ``exception`` is "acceptable", i.e.
in a list of retryable / idempotent exceptions.
"""
# If this is in the list of idempotent exceptions, then we want to
# retry. That entails just returning None.
if isinstance(exception, self._RETRYABLE_STREAM_ERRORS):
return
return True

# Set any other exception on the future.
self._future.set_exception(exception)
return False

def on_response(self, response):
"""Process all received Pub/Sub messages.
Expand Down
59 changes: 49 additions & 10 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import types as base_types

from google.auth import credentials
import mock
import pytest
Expand Down Expand Up @@ -87,18 +89,55 @@ def test_blocking_consume_keyboard_interrupt():
on_res.assert_called_once_with(consumer._policy, mock.sentinel.A)


@mock.patch.object(thread.Policy, 'call_rpc', autospec=True)
@mock.patch.object(thread.Policy, 'on_response', autospec=True)
@mock.patch.object(thread.Policy, 'on_exception', autospec=True)
def test_blocking_consume_exception_reraise(on_exc, on_res, call_rpc):
consumer = create_consumer()
class OnException(object):

def __init__(self, exiting_event, success=True):
self.exiting_event = exiting_event
self.success = success

def __call__(self, exception):
self.exiting_event.set()
if self.success:
return False
else:
raise RuntimeError('Failed to handle exception.')


def test_blocking_consume_exception_normal_handling():
policy = mock.Mock(spec=('call_rpc', 'on_response', 'on_exception'))
policy.call_rpc.return_value = (mock.sentinel.A, mock.sentinel.B)
exc = TypeError('Bad things!')
policy.on_response.side_effect = exc

consumer = _consumer.Consumer(policy=policy)
policy.on_exception.side_effect = OnException(consumer._exiting)

This comment was marked as spam.


# Establish that we get responses until we are sent the exiting event.
call_rpc.return_value = (mock.sentinel.A, mock.sentinel.B)
on_res.side_effect = TypeError('Bad things!')
on_exc.side_effect = on_res.side_effect
with pytest.raises(TypeError):
consumer._blocking_consume()
consumer._blocking_consume()

# Check mocks.
policy.call_rpc.assert_called_once()
policy.on_response.assert_called_once_with(mock.sentinel.A)
policy.on_exception.assert_called_once_with(exc)


def test_blocking_consume_exception_handling_fails():
policy = mock.Mock(spec=('call_rpc', 'on_response', 'on_exception'))
policy.call_rpc.return_value = (mock.sentinel.A, mock.sentinel.B)
exc = NameError('It fails and it propagates.')
policy.on_response.side_effect = exc

consumer = _consumer.Consumer(policy=policy)
policy.on_exception.side_effect = OnException(
consumer._exiting, success=False)

# Establish that we get responses until we are sent the exiting event.
consumer._blocking_consume()

# Check mocks.
policy.call_rpc.assert_called_once()
policy.on_response.assert_called_once_with(mock.sentinel.A)
policy.on_exception.assert_called_once_with(exc)


def test_start_consuming():
Expand Down
6 changes: 3 additions & 3 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def test_on_exception_deadline_exceeded():
details = 'Bad thing happened. Time out, go sit in the corner.'
exc = exceptions.DeadlineExceeded(details)

assert policy.on_exception(exc) is None
assert policy.on_exception(exc) is True


def test_on_exception_unavailable():
Expand All @@ -103,14 +103,14 @@ def test_on_exception_unavailable():
details = 'UNAVAILABLE. Service taking nap.'
exc = exceptions.ServiceUnavailable(details)

assert policy.on_exception(exc) is None
assert policy.on_exception(exc) is True


def test_on_exception_other():
policy = create_policy()
policy._future = Future(policy=policy)
exc = TypeError('wahhhhhh')
assert policy.on_exception(exc) is None
assert policy.on_exception(exc) is False
with pytest.raises(TypeError):
policy.future.result()

Expand Down