Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PubSub: Making thread.Policy.on_exception more robust. #4444

Merged
merged 3 commits into from
Nov 27, 2017

Conversation

dhermes
Copy link
Contributor

@dhermes dhermes commented Nov 23, 2017

NOTE: I am NOT comfortable with this as a fix, it seems WAAAAAY too bolted on.

  • I'm surprised the retry strategy doesn't come into play here
  • There should a less hard-coded way to map to the idempotent retry codes
  • The method should have a straightforward way to determine the gRPC status code from the exception
  • RuntimeError: set_exception can only be called once occurs if I don't filter out the UNAVAILABLE (e.g.). This seems like a bug in a different code path than is covered here.

  • Adding special handling for API core exceptions
  • Retrying on both types of idempotent error
  • Also doing a "drive-by" hygiene fix changing a global from logger to _LOGGER

Towards #4234.

@dhermes dhermes added the api: pubsub Issues related to the Pub/Sub API. label Nov 23, 2017
@googlebot googlebot added the cla: yes This human has signed the Contributor License Agreement. label Nov 23, 2017
@chemelnucfin chemelnucfin changed the title Making thread.Policy.on_exception more robust. PubSub: Making thread.Policy.on_exception more robust. Nov 23, 2017
code = exception.grpc_status_code
else:
code = getattr(exception, 'code', None)
if callable(code):

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

details = 'Bad thing happened. Time out, go sit in the corner.'
exc_state = grpc._channel._RPCState(
(), None, trailing, status_code, details)
exc = grpc._channel._Rendezvous(exc_state, None, None, None)

This comment was marked as spam.

@theacodes
Copy link
Contributor

I'm surprised the retry strategy doesn't come into play here

It's because this happens during streaming. api_core can't know the semantics for restarting the stream. In this case, it's simple, in others (such as spanner) it's not.

if getattr(exception, 'code', lambda: None)() == deadline_exceeded:
if isinstance(exception, exceptions.GoogleAPICallError):
code = exception.grpc_status_code
else:

This comment was marked as spam.


# If this is in the list of idempotent exceptions DEADLINE_EXCEEDED,
# then we want to retry. That entails just returning None.
if code in _IDEMPOTENT_RETRY_CODES:

This comment was marked as spam.

- Adding special handling for API core exceptions
- Retrying on both types of idempotent error
- Also doing a "drive-by" hygiene fix changing a global from
  `logger` to `_LOGGER`

Towards googleapis#4234.
Also removing the `logger` -> `_LOGGER` change to make
the PR easier to review.
@dhermes
Copy link
Contributor Author

dhermes commented Nov 27, 2017

So I have updated and have a better understanding thanks to chats with @jonparrott and @lukesneeringer.

  1. We only expect exceptions from api_core here. This is even true for iterators.
  2. I added BasePolicy._IDEMPOTENT_RETRIES and just added a unit test to sanity check that it matches the config. (I think this is preferred to an import time penalty for every user ever.)
  3. Using isinstance(exception, _IDEMPOTENT_RETRIES), all of the "branch to check for Rendezvous? is code callable? huh?" issues are gone.

I'm still not sure why set_exception gets called twice (crossref #4380). I'm going to try to reproduce this by removing exceptions.ServiceUnavailable from _IDEMPOTENT_RETRIES and then running the code in my gist.

I would like to continue that exploration but still merge this PR. SGTY @jonparrott and @lukesneeringer?

@@ -65,6 +66,10 @@ class BasePolicy(object):
"""

_managed_ack_ids = None
_IDEMPOTENT_RETRIES = (

This comment was marked as spam.

if getattr(exception, 'code', lambda: None)() == deadline_exceeded:
# If this is in the list of idempotent exceptions, then we want to
# retry. That entails just returning None.
if isinstance(exception, self._IDEMPOTENT_RETRIES):

This comment was marked as spam.

This comment was marked as spam.

@theacodes
Copy link
Contributor

I would like to continue that exploration but still merge this PR.

its-the-90s-go-for-it-girl-vhs-yospos-1425428235z

@dhermes dhermes merged commit 83865c2 into googleapis:master Nov 27, 2017
@dhermes dhermes deleted the towards-4234 branch November 27, 2017 22:10
@dhermes
Copy link
Contributor Author

dhermes commented Nov 27, 2017

Well I was able to reproduce

Traceback (most recent call last):
  File "${HOME}/.pyenv/versions/3.6.3/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "${HOME}/.pyenv/versions/3.6.3/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File ".../google/cloud/pubsub_v1/subscriber/_consumer.py", line 249, in _blocking_consume
    self._policy.on_exception(exc)
  File ".../google/cloud/pubsub_v1/subscriber/policy/thread.py", line 162, in on_exception
    self._future.set_exception(exception)
  File ".../google/cloud/pubsub_v1/futures.py", line 159, in set_exception
    raise RuntimeError('set_exception can only be called once.')
RuntimeError: set_exception can only be called once.

on my very first run with the hack

from google.cloud.pubsub_v1.subscriber.policy import base
# Monkey-patch to intentionally miss errors.
base.BasePolicy._RETRYABLE_STREAM_ERRORS = ()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: pubsub Issues related to the Pub/Sub API. cla: yes This human has signed the Contributor License Agreement.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants