From 905f49bd608e8d01d39b718962de6b9c85bb7617 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Wed, 29 Nov 2017 09:58:52 -0800 Subject: [PATCH] PubSub: `Policy.on_exception` actually used to make consumer go inactive. (#4472) --- .../cloud/pubsub_v1/subscriber/_consumer.py | 6 +- .../cloud/pubsub_v1/subscriber/futures.py | 13 ++-- .../cloud/pubsub_v1/subscriber/policy/base.py | 7 +- .../pubsub_v1/subscriber/policy/thread.py | 15 ++++- .../pubsub_v1/subscriber/test_consumer.py | 65 ++++++++++++++++--- .../subscriber/test_policy_thread.py | 6 +- 6 files changed, 84 insertions(+), 28 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py b/pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py index c3c690a92c91..5907a1c7e1f1 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py @@ -243,11 +243,7 @@ def _blocking_consume(self): except KeyboardInterrupt: self.stop_consuming() except Exception as exc: - try: - self._policy.on_exception(exc) - except: - self.active = False - raise + self.active = self._policy.on_exception(exc) def start_consuming(self): """Start consuming the stream.""" diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/futures.py b/pubsub/google/cloud/pubsub_v1/subscriber/futures.py index 15a932f2478f..fa1f457a2602 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/futures.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/futures.py @@ -39,16 +39,19 @@ def running(self): .. note:: - A ``False`` value here does not necessarily mean that the + A :data:`False` value here does not necessarily mean that the subscription is closed; it merely means that _this_ future is not the future applicable to it. Since futures have a single result (or exception) and there is - not a concept of resetting them, a closing re-opening of a + not a concept of resetting them, a closing / re-opening of a subscription will therefore return a new future. Returns: - bool: ``True`` if this subscription is opened with this future, - ``False`` otherwise. + bool: :data:`True` if this subscription is opened with this + future, :data:`False` otherwise. """ - return self._policy.future is self + if self._policy.future is not self: + return False + + return super(Future, self).running() diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/policy/base.py b/pubsub/google/cloud/pubsub_v1/subscriber/policy/base.py index c0b4afde2ec0..181f671fcc5b 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/policy/base.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/policy/base.py @@ -376,8 +376,11 @@ def on_exception(self, exception): """Called when a gRPC exception occurs. If this method does nothing, then the stream is re-started. If this - raises an exception, it will stop the consumer thread. - This is executed on the response consumer helper thread. + raises an exception, it will stop the consumer thread. This is + executed on the response consumer helper thread. + + Implementations should return :data:`True` if they want the consumer + thread to remain active, otherwise they should return :data:`False`. Args: exception (Exception): The exception raised by the RPC. diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py index 6c678dda3cd4..70cd227f68a8 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py @@ -176,17 +176,26 @@ def on_callback_request(self, callback_request): getattr(self, action)(**kwargs) def on_exception(self, exception): - """Bubble the exception. + """Handle the exception. - This will cause the stream to exit loudly. + If the exception is one of the retryable exceptions, this will signal + to the consumer thread that it should remain active. + + This will cause the stream to exit when it returns :data:`False`. + + Returns: + bool: Indicates if the caller should remain active or shut down. + Will be :data:`True` if the ``exception`` is "acceptable", i.e. + in a list of retryable / idempotent exceptions. """ # If this is in the list of idempotent exceptions, then we want to # retry. That entails just returning None. if isinstance(exception, self._RETRYABLE_STREAM_ERRORS): - return + return True # Set any other exception on the future. self._future.set_exception(exception) + return False def on_response(self, response): """Process all received Pub/Sub messages. diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py index 52038a891d94..3eb820d6c418 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import types as base_types + from google.auth import credentials import mock import pytest @@ -87,18 +89,61 @@ def test_blocking_consume_keyboard_interrupt(): on_res.assert_called_once_with(consumer._policy, mock.sentinel.A) -@mock.patch.object(thread.Policy, 'call_rpc', autospec=True) -@mock.patch.object(thread.Policy, 'on_response', autospec=True) -@mock.patch.object(thread.Policy, 'on_exception', autospec=True) -def test_blocking_consume_exception_reraise(on_exc, on_res, call_rpc): - consumer = create_consumer() +class OnException(object): + + def __init__(self, exiting_event, acceptable=None): + self.exiting_event = exiting_event + self.acceptable = acceptable + + def __call__(self, exception): + if exception is self.acceptable: + return True + else: + self.exiting_event.set() + return False + + +def test_blocking_consume_on_exception(): + policy = mock.Mock(spec=('call_rpc', 'on_response', 'on_exception')) + policy.call_rpc.return_value = (mock.sentinel.A, mock.sentinel.B) + exc = TypeError('Bad things!') + policy.on_response.side_effect = exc + + consumer = _consumer.Consumer(policy=policy) + policy.on_exception.side_effect = OnException(consumer._exiting) + + # Establish that we get responses until we are sent the exiting event. + consumer._blocking_consume() + + # Check mocks. + policy.call_rpc.assert_called_once() + policy.on_response.assert_called_once_with(mock.sentinel.A) + policy.on_exception.assert_called_once_with(exc) + + +def test_blocking_consume_two_exceptions(): + policy = mock.Mock(spec=('call_rpc', 'on_response', 'on_exception')) + policy.call_rpc.side_effect = ( + (mock.sentinel.A,), + (mock.sentinel.B,), + ) + exc1 = NameError('Oh noes.') + exc2 = ValueError('Something grumble.') + policy.on_response.side_effect = (exc1, exc2) + + consumer = _consumer.Consumer(policy=policy) + policy.on_exception.side_effect = OnException( + consumer._exiting, acceptable=exc1) # Establish that we get responses until we are sent the exiting event. - call_rpc.return_value = (mock.sentinel.A, mock.sentinel.B) - on_res.side_effect = TypeError('Bad things!') - on_exc.side_effect = on_res.side_effect - with pytest.raises(TypeError): - consumer._blocking_consume() + consumer._blocking_consume() + + # Check mocks. + assert policy.call_rpc.call_count == 2 + policy.on_response.assert_has_calls( + [mock.call(mock.sentinel.A), mock.call(mock.sentinel.B)]) + policy.on_exception.assert_has_calls( + [mock.call(exc1), mock.call(exc2)]) def test_start_consuming(): diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py index 8e9d41138fc7..fef7df01dea0 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py @@ -94,7 +94,7 @@ def test_on_exception_deadline_exceeded(): details = 'Bad thing happened. Time out, go sit in the corner.' exc = exceptions.DeadlineExceeded(details) - assert policy.on_exception(exc) is None + assert policy.on_exception(exc) is True def test_on_exception_unavailable(): @@ -103,14 +103,14 @@ def test_on_exception_unavailable(): details = 'UNAVAILABLE. Service taking nap.' exc = exceptions.ServiceUnavailable(details) - assert policy.on_exception(exc) is None + assert policy.on_exception(exc) is True def test_on_exception_other(): policy = create_policy() policy._future = Future(policy=policy) exc = TypeError('wahhhhhh') - assert policy.on_exception(exc) is None + assert policy.on_exception(exc) is False with pytest.raises(TypeError): policy.future.result()