From 0d44af6f9cf696d1bce78bd6593c97cd23df720c Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Fri, 1 Dec 2017 11:02:46 -0800 Subject: [PATCH] Making Policy.on_callback_request() less open-ended. --- .../pubsub_v1/subscriber/_helper_threads.py | 9 +++-- .../pubsub_v1/subscriber/policy/thread.py | 33 ++++++++++++++++--- .../subscriber/test_helper_threads.py | 10 +++--- .../subscriber/test_policy_thread.py | 27 ++++++++++++--- 4 files changed, 63 insertions(+), 16 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_helper_threads.py b/pubsub/google/cloud/pubsub_v1/subscriber/_helper_threads.py index 01b1d88c69bf..b0f166e1a3fa 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_helper_threads.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_helper_threads.py @@ -136,8 +136,10 @@ class QueueCallbackWorker(object): concurrency boundary implemented by ``executor``. Items will be popped off (with a blocking ``get()``) until :attr:`STOP` is encountered. - callback (Callable): A callback that can process items pulled off - of the queue. + callback (Callable[[str, Dict], Any]): A callback that can process + items pulled off of the queue. Items are assumed to be a pair + of a method name to be invoked and a dictionary of keyword + arguments for that method. """ def __init__(self, queue, callback): @@ -154,6 +156,7 @@ def __call__(self): # Run the callback. If any exceptions occur, log them and # continue. try: - self._callback(item) + action, kwargs = item + self._callback(action, kwargs) except Exception as exc: _LOGGER.error('%s: %s', exc.__class__.__name__, exc) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py index 6dec828a3f1c..5fffcd09f068 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py @@ -119,7 +119,7 @@ def __init__(self, client, subscription, flow_control=types.FlowControl(), _LOGGER.debug('Creating callback requests thread (not starting).') self._callback_requests = _helper_threads.QueueCallbackWorker( self._request_queue, - self.on_callback_request, + self.dispatch_callback, ) def close(self): @@ -180,10 +180,33 @@ def open(self, callback): # Return the future. return self._future - def on_callback_request(self, callback_request): - """Map the callback request to the appropriate gRPC request.""" - action, kwargs = callback_request[0], callback_request[1] - getattr(self, action)(**kwargs) + def dispatch_callback(self, action, kwargs): + """Map the callback request to the appropriate gRPC request. + + Args: + action (str): The method to be invoked. + kwargs (Dict[str, Any]): The keyword arguments for the method + specified by ``action``. + + Raises: + ValueError: If ``action`` isn't one of the expected actions + "ack", "drop", "lease", "modify_ack_deadline" or "nack". + """ + if action == 'ack': + self.ack(**kwargs) + elif action == 'drop': + self.drop(**kwargs) + elif action == 'lease': + self.lease(**kwargs) + elif action == 'modify_ack_deadline': + self.modify_ack_deadline(**kwargs) + elif action == 'nack': + self.nack(**kwargs) + else: + raise ValueError( + 'Unexpected action', action, + 'Must be one of "ack", "drop", "lease", ' + '"modify_ack_deadline" or "nack".') def on_exception(self, exception): """Handle the exception. diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_helper_threads.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_helper_threads.py index 585a0baf21ec..ec889b7fc2fd 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_helper_threads.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_helper_threads.py @@ -125,12 +125,13 @@ def test_queue_callback_worker(): # Set up an appropriate mock for the queue, and call the queue callback # thread. with mock.patch.object(queue.Queue, 'get') as get: - get.side_effect = (mock.sentinel.A, _helper_threads.STOP) + item1 = ('action', mock.sentinel.A) + get.side_effect = (item1, _helper_threads.STOP) qct() # Assert that we got the expected calls. assert get.call_count == 2 - callback.assert_called_once_with(mock.sentinel.A) + callback.assert_called_once_with('action', mock.sentinel.A) def test_queue_callback_worker_exception(): @@ -141,9 +142,10 @@ def test_queue_callback_worker_exception(): # Set up an appropriate mock for the queue, and call the queue callback # thread. with mock.patch.object(queue.Queue, 'get') as get: - get.side_effect = (mock.sentinel.A, _helper_threads.STOP) + item1 = ('action', mock.sentinel.A) + get.side_effect = (item1, _helper_threads.STOP) qct() # Assert that we got the expected calls. assert get.call_count == 2 - callback.assert_called_once_with(mock.sentinel.A) + callback.assert_called_once_with('action', mock.sentinel.A) diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py index fef7df01dea0..f73bfea21e12 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py @@ -81,11 +81,30 @@ def test_open(thread_start, htr_start): thread_start.assert_called() -def test_on_callback_request(): +def test_dispatch_callback_valid_actions(): policy = create_policy() - with mock.patch.object(policy, 'call_rpc') as call_rpc: - policy.on_callback_request(('call_rpc', {'something': 42})) - call_rpc.assert_called_once_with(something=42) + kwargs = {'foo': 10, 'bar': 13.37} + actions = ( + 'ack', + 'drop', + 'lease', + 'modify_ack_deadline', + 'nack', + ) + for action in actions: + with mock.patch.object(policy, action) as mocked: + policy.dispatch_callback(action, kwargs) + mocked.assert_called_once_with(**kwargs) + + +def test_dispatch_callback_invalid_action(): + policy = create_policy() + with pytest.raises(ValueError) as exc_info: + policy.dispatch_callback('gecko', {}) + + assert len(exc_info.value.args) == 3 + assert exc_info.value.args[0] == 'Unexpected action' + assert exc_info.value.args[1] == 'gecko' def test_on_exception_deadline_exceeded():