Skip to content

Commit

Permalink
PubSub: Making thread.Policy.on_exception more robust. (#4444)
Browse files Browse the repository at this point in the history
- Adding special handling for API core exceptions
- Retrying on both types of idempotent error

Towards #4234.
  • Loading branch information
dhermes authored Nov 27, 2017
1 parent 44998fa commit 83865c2
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 8 deletions.
5 changes: 5 additions & 0 deletions pubsub/google/cloud/pubsub_v1/subscriber/policy/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import random
import time

from google.api_core import exceptions
import six

from google.cloud.pubsub_v1 import types
Expand Down Expand Up @@ -65,6 +66,10 @@ class BasePolicy(object):
"""

_managed_ack_ids = None
_RETRYABLE_STREAM_ERRORS = (
exceptions.DeadlineExceeded,
exceptions.ServiceUnavailable,
)

def __init__(self, client, subscription,
flow_control=types.FlowControl(), histogram_data=None):
Expand Down
7 changes: 3 additions & 4 deletions pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,9 @@ def on_exception(self, exception):
This will cause the stream to exit loudly.
"""
# If this is DEADLINE_EXCEEDED, then we want to retry.
# That entails just returning None.
deadline_exceeded = grpc.StatusCode.DEADLINE_EXCEEDED
if getattr(exception, 'code', lambda: None)() == deadline_exceeded:
# If this is in the list of idempotent exceptions, then we want to
# retry. That entails just returning None.
if isinstance(exception, self._RETRYABLE_STREAM_ERRORS):
return

# Set any other exception on the future.
Expand Down
23 changes: 22 additions & 1 deletion pubsub/tests/unit/pubsub_v1/subscriber/test_policy_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@

import time

from google.api_core import exceptions
from google.auth import credentials
import grpc
import mock

from google.auth import credentials
from google.cloud.pubsub_v1 import subscriber
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.gapic import subscriber_client_config
from google.cloud.pubsub_v1.subscriber.policy import base
from google.cloud.pubsub_v1.subscriber.policy import thread


Expand All @@ -28,6 +32,23 @@ def create_policy(flow_control=types.FlowControl()):
return thread.Policy(client, 'sub_name_d', flow_control=flow_control)


def test_idempotent_retry_codes():
# Make sure the config matches our hard-coded tuple of exceptions.
interfaces = subscriber_client_config.config['interfaces']
retry_codes = interfaces['google.pubsub.v1.Subscriber']['retry_codes']
idempotent = retry_codes['idempotent']

status_codes = tuple(
getattr(grpc.StatusCode, name, None)
for name in idempotent
)
expected = tuple(
exceptions.exception_class_for_grpc_status(status_code)
for status_code in status_codes
)
assert base.BasePolicy._RETRYABLE_STREAM_ERRORS == expected


def test_ack_deadline():
policy = create_policy()
assert policy.ack_deadline == 10
Expand Down
17 changes: 14 additions & 3 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
from concurrent import futures
import threading

from google.api_core import exceptions
from google.auth import credentials
import grpc
import mock
import pytest
from six.moves import queue
Expand Down Expand Up @@ -90,8 +90,19 @@ def test_on_callback_request():

def test_on_exception_deadline_exceeded():
policy = create_policy()
exc = mock.Mock(spec=('code',))
exc.code.return_value = grpc.StatusCode.DEADLINE_EXCEEDED

details = 'Bad thing happened. Time out, go sit in the corner.'
exc = exceptions.DeadlineExceeded(details)

assert policy.on_exception(exc) is None


def test_on_exception_unavailable():
policy = create_policy()

details = 'UNAVAILABLE. Service taking nap.'
exc = exceptions.ServiceUnavailable(details)

assert policy.on_exception(exc) is None


Expand Down

0 comments on commit 83865c2

Please sign in to comment.