Skip to content

Commit

Permalink
Making thread.Policy.on_exception more robust.
Browse files Browse the repository at this point in the history
- Adding special handling for API core exceptions
- Retrying on both types of idempotent error
- Also doing a "drive-by" hygiene fix changing a global from
  `logger` to `_LOGGER`

Towards #4234.
  • Loading branch information
dhermes committed Nov 27, 2017
1 parent 7a9e4f8 commit ae0b073
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 13 deletions.
33 changes: 22 additions & 11 deletions pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,24 @@
import grpc
from six.moves import queue as queue_mod

from google.api_core import exceptions
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.subscriber import _helper_threads
from google.cloud.pubsub_v1.subscriber.futures import Future
from google.cloud.pubsub_v1.subscriber.policy import base
from google.cloud.pubsub_v1.subscriber.message import Message


logger = logging.getLogger(__name__)
_LOGGER = logging.getLogger(__name__)
_IDEMPOTENT_RETRY_CODES = (
grpc.StatusCode.DEADLINE_EXCEEDED,
grpc.StatusCode.UNAVAILABLE,
)


def _callback_completed(future):
"""Simple callback that just logs a `Future`'s result."""
logger.debug('Result: %s', future.result())
_LOGGER.debug('Result: %s', future.result())


class Policy(base.BasePolicy):
Expand Down Expand Up @@ -80,7 +85,7 @@ def __init__(self, client, subscription, flow_control=types.FlowControl(),
)

# Also maintain a request queue and an executor.
logger.debug('Creating callback requests thread (not starting).')
_LOGGER.debug('Creating callback requests thread (not starting).')
if executor is None:
executor = futures.ThreadPoolExecutor(max_workers=10)
self._executor = executor
Expand Down Expand Up @@ -122,7 +127,7 @@ def open(self, callback):
self._future = Future(policy=self)

# Start the thread to pass the requests.
logger.debug('Starting callback requests worker.')
_LOGGER.debug('Starting callback requests worker.')
self._callback = callback
self._consumer.helper_threads.start(
'callback requests worker',
Expand All @@ -135,7 +140,7 @@ def open(self, callback):

# Spawn a helper thread that maintains all of the leases for
# this policy.
logger.debug('Spawning lease maintenance worker.')
_LOGGER.debug('Spawning lease maintenance worker.')
self._leaser = threading.Thread(target=self.maintain_leases)
self._leaser.daemon = True
self._leaser.start()
Expand All @@ -153,10 +158,16 @@ def on_exception(self, exception):
This will cause the stream to exit loudly.
"""
# If this is DEADLINE_EXCEEDED, then we want to retry.
# That entails just returning None.
deadline_exceeded = grpc.StatusCode.DEADLINE_EXCEEDED
if getattr(exception, 'code', lambda: None)() == deadline_exceeded:
if isinstance(exception, exceptions.GoogleAPICallError):
code = exception.grpc_status_code
else:
code = getattr(exception, 'code', None)
if callable(code):
code = code()

# If this is in the list of idempotent exceptions DEADLINE_EXCEEDED,
# then we want to retry. That entails just returning None.
if code in _IDEMPOTENT_RETRY_CODES:
return

# Set any other exception on the future.
Expand All @@ -168,8 +179,8 @@ def on_response(self, response):
For each message, schedule a callback with the executor.
"""
for msg in response.received_messages:
logger.debug('New message received from Pub/Sub: %r', msg)
logger.debug(self._callback)
_LOGGER.debug('New message received from Pub/Sub: %r', msg)
_LOGGER.debug(self._callback)
message = Message(msg.message, msg.ack_id, self._request_queue)
future = self._executor.submit(self._callback, message)
future.add_done_callback(_callback_completed)
23 changes: 21 additions & 2 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
from concurrent import futures
import threading

from google.api_core import exceptions
from google.auth import credentials
import grpc
import grpc._channel
import mock
import pytest
from six.moves import queue
Expand Down Expand Up @@ -89,9 +91,26 @@ def test_on_callback_request():


def test_on_exception_deadline_exceeded():
# Also traverses the raw gRPC exception path.
policy = create_policy()
exc = mock.Mock(spec=('code',))
exc.code.return_value = grpc.StatusCode.DEADLINE_EXCEEDED

trailing = None
status_code = grpc.StatusCode.DEADLINE_EXCEEDED
details = 'Bad thing happened. Time out, go sit in the corner.'
exc_state = grpc._channel._RPCState(
(), None, trailing, status_code, details)
exc = grpc._channel._Rendezvous(exc_state, None, None, None)

assert policy.on_exception(exc) is None


def test_on_exception_unavailable():
# Also traverses the ``api_core`` exception path.
policy = create_policy()

details = 'UNAVAILABLE. Service taking nap.'
exc = exceptions.ServiceUnavailable(details)

assert policy.on_exception(exc) is None


Expand Down

0 comments on commit ae0b073

Please sign in to comment.