diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/policy/base.py b/pubsub/google/cloud/pubsub_v1/subscriber/policy/base.py index b3a1af540064..3caff7043e2a 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/policy/base.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/policy/base.py @@ -21,6 +21,7 @@ import random import time +from google.api_core import exceptions import six from google.cloud.pubsub_v1 import types @@ -65,6 +66,10 @@ class BasePolicy(object): """ _managed_ack_ids = None + _RETRYABLE_STREAM_ERRORS = ( + exceptions.DeadlineExceeded, + exceptions.ServiceUnavailable, + ) def __init__(self, client, subscription, flow_control=types.FlowControl(), histogram_data=None): diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py index a4cd9136e0e9..b30dd8107814 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py @@ -153,10 +153,9 @@ def on_exception(self, exception): This will cause the stream to exit loudly. """ - # If this is DEADLINE_EXCEEDED, then we want to retry. - # That entails just returning None. - deadline_exceeded = grpc.StatusCode.DEADLINE_EXCEEDED - if getattr(exception, 'code', lambda: None)() == deadline_exceeded: + # If this is in the list of idempotent exceptions, then we want to + # retry. That entails just returning None. + if isinstance(exception, self._RETRYABLE_STREAM_ERRORS): return # Set any other exception on the future. diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_base.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_base.py index 3ebf1e6e6d18..5dd082ec2de7 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_base.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_base.py @@ -14,11 +14,15 @@ import time +from google.api_core import exceptions +from google.auth import credentials +import grpc import mock -from google.auth import credentials from google.cloud.pubsub_v1 import subscriber from google.cloud.pubsub_v1 import types +from google.cloud.pubsub_v1.gapic import subscriber_client_config +from google.cloud.pubsub_v1.subscriber.policy import base from google.cloud.pubsub_v1.subscriber.policy import thread @@ -28,6 +32,23 @@ def create_policy(flow_control=types.FlowControl()): return thread.Policy(client, 'sub_name_d', flow_control=flow_control) +def test_idempotent_retry_codes(): + # Make sure the config matches our hard-coded tuple of exceptions. + interfaces = subscriber_client_config.config['interfaces'] + retry_codes = interfaces['google.pubsub.v1.Subscriber']['retry_codes'] + idempotent = retry_codes['idempotent'] + + status_codes = tuple( + getattr(grpc.StatusCode, name, None) + for name in idempotent + ) + expected = tuple( + exceptions.exception_class_for_grpc_status(status_code) + for status_code in status_codes + ) + assert base.BasePolicy._RETRYABLE_STREAM_ERRORS == expected + + def test_ack_deadline(): policy = create_policy() assert policy.ack_deadline == 10 diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py index 106e735a7110..e31315900c97 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py @@ -17,8 +17,8 @@ from concurrent import futures import threading +from google.api_core import exceptions from google.auth import credentials -import grpc import mock import pytest from six.moves import queue @@ -90,8 +90,19 @@ def test_on_callback_request(): def test_on_exception_deadline_exceeded(): policy = create_policy() - exc = mock.Mock(spec=('code',)) - exc.code.return_value = grpc.StatusCode.DEADLINE_EXCEEDED + + details = 'Bad thing happened. Time out, go sit in the corner.' + exc = exceptions.DeadlineExceeded(details) + + assert policy.on_exception(exc) is None + + +def test_on_exception_unavailable(): + policy = create_policy() + + details = 'UNAVAILABLE. Service taking nap.' + exc = exceptions.ServiceUnavailable(details) + assert policy.on_exception(exc) is None