Skip to content

Commit

Permalink
Making Policy.on_callback_request() less open-ended.
Browse files Browse the repository at this point in the history
  • Loading branch information
dhermes committed Dec 4, 2017
1 parent 3c0f225 commit 0d44af6
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 16 deletions.
9 changes: 6 additions & 3 deletions pubsub/google/cloud/pubsub_v1/subscriber/_helper_threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,10 @@ class QueueCallbackWorker(object):
concurrency boundary implemented by ``executor``. Items will
be popped off (with a blocking ``get()``) until :attr:`STOP`
is encountered.
callback (Callable): A callback that can process items pulled off
of the queue.
callback (Callable[[str, Dict], Any]): A callback that can process
items pulled off of the queue. Items are assumed to be a pair
of a method name to be invoked and a dictionary of keyword
arguments for that method.
"""

def __init__(self, queue, callback):
Expand All @@ -154,6 +156,7 @@ def __call__(self):
# Run the callback. If any exceptions occur, log them and
# continue.
try:
self._callback(item)
action, kwargs = item
self._callback(action, kwargs)
except Exception as exc:
_LOGGER.error('%s: %s', exc.__class__.__name__, exc)
33 changes: 28 additions & 5 deletions pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def __init__(self, client, subscription, flow_control=types.FlowControl(),
_LOGGER.debug('Creating callback requests thread (not starting).')
self._callback_requests = _helper_threads.QueueCallbackWorker(
self._request_queue,
self.on_callback_request,
self.dispatch_callback,
)

def close(self):
Expand Down Expand Up @@ -180,10 +180,33 @@ def open(self, callback):
# Return the future.
return self._future

def on_callback_request(self, callback_request):
"""Map the callback request to the appropriate gRPC request."""
action, kwargs = callback_request[0], callback_request[1]
getattr(self, action)(**kwargs)
def dispatch_callback(self, action, kwargs):
"""Map the callback request to the appropriate gRPC request.
Args:
action (str): The method to be invoked.
kwargs (Dict[str, Any]): The keyword arguments for the method
specified by ``action``.
Raises:
ValueError: If ``action`` isn't one of the expected actions
"ack", "drop", "lease", "modify_ack_deadline" or "nack".
"""
if action == 'ack':
self.ack(**kwargs)
elif action == 'drop':
self.drop(**kwargs)
elif action == 'lease':
self.lease(**kwargs)
elif action == 'modify_ack_deadline':
self.modify_ack_deadline(**kwargs)
elif action == 'nack':
self.nack(**kwargs)
else:
raise ValueError(
'Unexpected action', action,
'Must be one of "ack", "drop", "lease", '
'"modify_ack_deadline" or "nack".')

def on_exception(self, exception):
"""Handle the exception.
Expand Down
10 changes: 6 additions & 4 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_helper_threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,13 @@ def test_queue_callback_worker():
# Set up an appropriate mock for the queue, and call the queue callback
# thread.
with mock.patch.object(queue.Queue, 'get') as get:
get.side_effect = (mock.sentinel.A, _helper_threads.STOP)
item1 = ('action', mock.sentinel.A)
get.side_effect = (item1, _helper_threads.STOP)
qct()

# Assert that we got the expected calls.
assert get.call_count == 2
callback.assert_called_once_with(mock.sentinel.A)
callback.assert_called_once_with('action', mock.sentinel.A)


def test_queue_callback_worker_exception():
Expand All @@ -141,9 +142,10 @@ def test_queue_callback_worker_exception():
# Set up an appropriate mock for the queue, and call the queue callback
# thread.
with mock.patch.object(queue.Queue, 'get') as get:
get.side_effect = (mock.sentinel.A, _helper_threads.STOP)
item1 = ('action', mock.sentinel.A)
get.side_effect = (item1, _helper_threads.STOP)
qct()

# Assert that we got the expected calls.
assert get.call_count == 2
callback.assert_called_once_with(mock.sentinel.A)
callback.assert_called_once_with('action', mock.sentinel.A)
27 changes: 23 additions & 4 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,30 @@ def test_open(thread_start, htr_start):
thread_start.assert_called()


def test_on_callback_request():
def test_dispatch_callback_valid_actions():
policy = create_policy()
with mock.patch.object(policy, 'call_rpc') as call_rpc:
policy.on_callback_request(('call_rpc', {'something': 42}))
call_rpc.assert_called_once_with(something=42)
kwargs = {'foo': 10, 'bar': 13.37}
actions = (
'ack',
'drop',
'lease',
'modify_ack_deadline',
'nack',
)
for action in actions:
with mock.patch.object(policy, action) as mocked:
policy.dispatch_callback(action, kwargs)
mocked.assert_called_once_with(**kwargs)


def test_dispatch_callback_invalid_action():
policy = create_policy()
with pytest.raises(ValueError) as exc_info:
policy.dispatch_callback('gecko', {})

assert len(exc_info.value.args) == 3
assert exc_info.value.args[0] == 'Unexpected action'
assert exc_info.value.args[1] == 'gecko'


def test_on_exception_deadline_exceeded():
Expand Down

0 comments on commit 0d44af6

Please sign in to comment.