Skip to content

Commit

Permalink
PubSub: Policy.on_exception actually used to make consumer go inact…
Browse files Browse the repository at this point in the history
…ive. (#4472)
  • Loading branch information
dhermes authored Nov 29, 2017
1 parent 480d80f commit 905f49b
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 28 deletions.
6 changes: 1 addition & 5 deletions pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,7 @@ def _blocking_consume(self):
except KeyboardInterrupt:
self.stop_consuming()
except Exception as exc:
try:
self._policy.on_exception(exc)
except:
self.active = False
raise
self.active = self._policy.on_exception(exc)

def start_consuming(self):
"""Start consuming the stream."""
Expand Down
13 changes: 8 additions & 5 deletions pubsub/google/cloud/pubsub_v1/subscriber/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,19 @@ def running(self):
.. note::
A ``False`` value here does not necessarily mean that the
A :data:`False` value here does not necessarily mean that the
subscription is closed; it merely means that _this_ future is
not the future applicable to it.
Since futures have a single result (or exception) and there is
not a concept of resetting them, a closing re-opening of a
not a concept of resetting them, a closing / re-opening of a
subscription will therefore return a new future.
Returns:
bool: ``True`` if this subscription is opened with this future,
``False`` otherwise.
bool: :data:`True` if this subscription is opened with this
future, :data:`False` otherwise.
"""
return self._policy.future is self
if self._policy.future is not self:
return False

return super(Future, self).running()
7 changes: 5 additions & 2 deletions pubsub/google/cloud/pubsub_v1/subscriber/policy/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,11 @@ def on_exception(self, exception):
"""Called when a gRPC exception occurs.
If this method does nothing, then the stream is re-started. If this
raises an exception, it will stop the consumer thread.
This is executed on the response consumer helper thread.
raises an exception, it will stop the consumer thread. This is
executed on the response consumer helper thread.
Implementations should return :data:`True` if they want the consumer
thread to remain active, otherwise they should return :data:`False`.
Args:
exception (Exception): The exception raised by the RPC.
Expand Down
15 changes: 12 additions & 3 deletions pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,17 +176,26 @@ def on_callback_request(self, callback_request):
getattr(self, action)(**kwargs)

def on_exception(self, exception):
"""Bubble the exception.
"""Handle the exception.
This will cause the stream to exit loudly.
If the exception is one of the retryable exceptions, this will signal
to the consumer thread that it should remain active.
This will cause the stream to exit when it returns :data:`False`.
Returns:
bool: Indicates if the caller should remain active or shut down.
Will be :data:`True` if the ``exception`` is "acceptable", i.e.
in a list of retryable / idempotent exceptions.
"""
# If this is in the list of idempotent exceptions, then we want to
# retry. That entails just returning None.
if isinstance(exception, self._RETRYABLE_STREAM_ERRORS):
return
return True

# Set any other exception on the future.
self._future.set_exception(exception)
return False

def on_response(self, response):
"""Process all received Pub/Sub messages.
Expand Down
65 changes: 55 additions & 10 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import types as base_types

from google.auth import credentials
import mock
import pytest
Expand Down Expand Up @@ -87,18 +89,61 @@ def test_blocking_consume_keyboard_interrupt():
on_res.assert_called_once_with(consumer._policy, mock.sentinel.A)


@mock.patch.object(thread.Policy, 'call_rpc', autospec=True)
@mock.patch.object(thread.Policy, 'on_response', autospec=True)
@mock.patch.object(thread.Policy, 'on_exception', autospec=True)
def test_blocking_consume_exception_reraise(on_exc, on_res, call_rpc):
consumer = create_consumer()
class OnException(object):

def __init__(self, exiting_event, acceptable=None):
self.exiting_event = exiting_event
self.acceptable = acceptable

def __call__(self, exception):
if exception is self.acceptable:
return True
else:
self.exiting_event.set()
return False


def test_blocking_consume_on_exception():
policy = mock.Mock(spec=('call_rpc', 'on_response', 'on_exception'))
policy.call_rpc.return_value = (mock.sentinel.A, mock.sentinel.B)
exc = TypeError('Bad things!')
policy.on_response.side_effect = exc

consumer = _consumer.Consumer(policy=policy)
policy.on_exception.side_effect = OnException(consumer._exiting)

# Establish that we get responses until we are sent the exiting event.
consumer._blocking_consume()

# Check mocks.
policy.call_rpc.assert_called_once()
policy.on_response.assert_called_once_with(mock.sentinel.A)
policy.on_exception.assert_called_once_with(exc)


def test_blocking_consume_two_exceptions():
policy = mock.Mock(spec=('call_rpc', 'on_response', 'on_exception'))
policy.call_rpc.side_effect = (
(mock.sentinel.A,),
(mock.sentinel.B,),
)
exc1 = NameError('Oh noes.')
exc2 = ValueError('Something grumble.')
policy.on_response.side_effect = (exc1, exc2)

consumer = _consumer.Consumer(policy=policy)
policy.on_exception.side_effect = OnException(
consumer._exiting, acceptable=exc1)

# Establish that we get responses until we are sent the exiting event.
call_rpc.return_value = (mock.sentinel.A, mock.sentinel.B)
on_res.side_effect = TypeError('Bad things!')
on_exc.side_effect = on_res.side_effect
with pytest.raises(TypeError):
consumer._blocking_consume()
consumer._blocking_consume()

# Check mocks.
assert policy.call_rpc.call_count == 2
policy.on_response.assert_has_calls(
[mock.call(mock.sentinel.A), mock.call(mock.sentinel.B)])
policy.on_exception.assert_has_calls(
[mock.call(exc1), mock.call(exc2)])


def test_start_consuming():
Expand Down
6 changes: 3 additions & 3 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def test_on_exception_deadline_exceeded():
details = 'Bad thing happened. Time out, go sit in the corner.'
exc = exceptions.DeadlineExceeded(details)

assert policy.on_exception(exc) is None
assert policy.on_exception(exc) is True


def test_on_exception_unavailable():
Expand All @@ -103,14 +103,14 @@ def test_on_exception_unavailable():
details = 'UNAVAILABLE. Service taking nap.'
exc = exceptions.ServiceUnavailable(details)

assert policy.on_exception(exc) is None
assert policy.on_exception(exc) is True


def test_on_exception_other():
policy = create_policy()
policy._future = Future(policy=policy)
exc = TypeError('wahhhhhh')
assert policy.on_exception(exc) is None
assert policy.on_exception(exc) is False
with pytest.raises(TypeError):
policy.future.result()

Expand Down

0 comments on commit 905f49b

Please sign in to comment.