Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: convert all RPC error types to exceptions #170

Merged
merged 3 commits into from
Sep 16, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,22 @@
"""The load threshold below which to resume the incoming message stream."""


def _maybe_wrap_exception(exception):
"""Wraps a gRPC exception class, if needed."""
if isinstance(exception, grpc.RpcError):
return exceptions.from_grpc_error(exception)
return exception
def _wrap_as_exception(maybe_exception):
"""Wrap an object as a Python exception, if needed.

Args:
maybe_exception (Any): The object to wrap, usually a gRPC exception class.

Returns:
The argument itself if an instance of ``BaseException``, otherwise
the argument represented as an instance of ``Exception`` (sub)class.
"""
if isinstance(maybe_exception, grpc.RpcError):
return exceptions.from_grpc_error(maybe_exception)
elif isinstance(maybe_exception, BaseException):
return maybe_exception

return Exception(maybe_exception)


def _wrap_callback_errors(callback, on_callback_error, message):
Expand Down Expand Up @@ -651,7 +662,7 @@ def _should_recover(self, exception):
Will be :data:`True` if the ``exception`` is "acceptable", i.e.
in a list of retryable / idempotent exceptions.
"""
exception = _maybe_wrap_exception(exception)
exception = _wrap_as_exception(exception)
# If this is in the list of idempotent exceptions, then we want to
# recover.
if isinstance(exception, _RETRYABLE_STREAM_ERRORS):
Expand All @@ -673,7 +684,7 @@ def _should_terminate(self, exception):
Will be :data:`True` if the ``exception`` is "acceptable", i.e.
in a list of terminating exceptions.
"""
exception = _maybe_wrap_exception(exception)
exception = _wrap_as_exception(exception)
if isinstance(exception, _TERMINATING_STREAM_ERRORS):
_LOGGER.info("Observed terminating stream error %s", exception)
return True
Expand All @@ -692,9 +703,9 @@ def _on_rpc_done(self, future):
background consumer and preventing it from being ``joined()``.
"""
_LOGGER.info("RPC termination has signaled streaming pull manager shutdown.")
future = _maybe_wrap_exception(future)
error = _wrap_as_exception(future)
thread = threading.Thread(
name=_RPC_ERROR_THREAD_NAME, target=self.close, kwargs={"reason": future}
name=_RPC_ERROR_THREAD_NAME, target=self.close, kwargs={"reason": error}
)
thread.daemon = True
thread.start()
12 changes: 9 additions & 3 deletions tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@
mock.create_autospec(grpc.RpcError, instance=True),
exceptions.GoogleAPICallError,
),
({"error": "RPC terminated"}, Exception),
("something broke", Exception),
],
)
def test__maybe_wrap_exception(exception, expected_cls):
def test__wrap_as_exception(exception, expected_cls):
assert isinstance(
streaming_pull_manager._maybe_wrap_exception(exception), expected_cls
streaming_pull_manager._wrap_as_exception(exception), expected_cls
)


Expand Down Expand Up @@ -956,8 +958,12 @@ def test__on_rpc_done(thread):
manager._on_rpc_done(mock.sentinel.error)

thread.assert_called_once_with(
name=mock.ANY, target=manager.close, kwargs={"reason": mock.sentinel.error}
name=mock.ANY, target=manager.close, kwargs={"reason": mock.ANY}
)
_, kwargs = thread.call_args
reason = kwargs["kwargs"]["reason"]
assert isinstance(reason, Exception)
assert reason.args == (mock.sentinel.error,) # Exception wraps the original error


def test_activate_ordering_keys():
Expand Down