diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index 00154f8e6..6ab5165d1 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -183,13 +183,13 @@ def ack(self, items: Sequence[requests.AckRequest]) -> None: total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE)) for _ in range(total_chunks): - future_reqs_dict = { + ack_reqs_dict = { req.ack_id: req for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE) } requests_completed, requests_to_retry = self._manager.send_unary_ack( ack_ids=list(itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)), - future_reqs_dict=future_reqs_dict, + ack_reqs_dict=ack_reqs_dict, ) # Remove the completed messages from lease management. @@ -228,10 +228,10 @@ def _retry_acks(self, requests_to_retry): ) time.sleep(time_to_wait) - future_reqs_dict = {req.ack_id: req for req in requests_to_retry} + ack_reqs_dict = {req.ack_id: req for req in requests_to_retry} requests_completed, requests_to_retry = self._manager.send_unary_ack( ack_ids=[req.ack_id for req in requests_to_retry], - future_reqs_dict=future_reqs_dict, + ack_reqs_dict=ack_reqs_dict, ) assert ( len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE @@ -280,7 +280,7 @@ def modify_ack_deadline(self, items: Sequence[requests.ModAckRequest]) -> None: total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE)) for _ in range(total_chunks): - future_reqs_dict = { + ack_reqs_dict = { req.ack_id: req for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE) } @@ -292,7 +292,7 @@ def modify_ack_deadline(self, items: Sequence[requests.ModAckRequest]) -> None: modify_deadline_seconds=list( itertools.islice(deadline_seconds_gen, _ACK_IDS_BATCH_SIZE) ), - future_reqs_dict=future_reqs_dict, + ack_reqs_dict=ack_reqs_dict, ) assert ( len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE @@ -320,11 +320,11 @@ def _retry_modacks(self, requests_to_retry): ) time.sleep(time_to_wait) - future_reqs_dict = {req.ack_id: req for req in requests_to_retry} + ack_reqs_dict = {req.ack_id: req for req in requests_to_retry} requests_completed, requests_to_retry = self._manager.send_unary_modack( modify_deadline_ack_ids=[req.ack_id for req in requests_to_retry], modify_deadline_seconds=[req.seconds for req in requests_to_retry], - future_reqs_dict=future_reqs_dict, + ack_reqs_dict=ack_reqs_dict, ) def nack(self, items: Sequence[requests.NackRequest]) -> None: diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index b92696af8..5971e7988 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -149,33 +149,33 @@ def _get_ack_errors( return None -def _process_futures( +def _process_requests( error_status: Optional["status_pb2.Status"], - future_reqs_dict: "containers.ScalarMap", + ack_reqs_dict: "containers.ScalarMap", errors_dict: Optional["containers.ScalarMap"], ): """Process futures by referring to errors_dict. The errors returned by the server in `errors_dict` are used to complete - the request futures in `future_reqs_dict` (with a success or exception) or + the request futures in `ack_reqs_dict` (with a success or exception) or to return requests for further retries. """ requests_completed = [] requests_to_retry = [] - for ack_id in future_reqs_dict: + for ack_id in ack_reqs_dict: if errors_dict and ack_id in errors_dict: exactly_once_error = errors_dict[ack_id] if exactly_once_error.startswith("TRANSIENT_"): - requests_to_retry.append(future_reqs_dict[ack_id]) + requests_to_retry.append(ack_reqs_dict[ack_id]) else: if exactly_once_error == "PERMANENT_FAILURE_INVALID_ACK_ID": exc = AcknowledgeError(AcknowledgeStatus.INVALID_ACK_ID, info=None) else: exc = AcknowledgeError(AcknowledgeStatus.OTHER, exactly_once_error) - future = future_reqs_dict[ack_id].future + future = ack_reqs_dict[ack_id].future future.set_exception(exc) - requests_completed.append(future_reqs_dict[ack_id]) + requests_completed.append(ack_reqs_dict[ack_id]) elif error_status: # Only permanent errors are expected here b/c retriable errors are # retried at the lower, GRPC level. @@ -185,16 +185,16 @@ def _process_futures( exc = AcknowledgeError(AcknowledgeStatus.FAILED_PRECONDITION, info=None) else: exc = AcknowledgeError(AcknowledgeStatus.OTHER, str(error_status)) - future = future_reqs_dict[ack_id].future + future = ack_reqs_dict[ack_id].future future.set_exception(exc) - requests_completed.append(future_reqs_dict[ack_id]) - elif future_reqs_dict[ack_id].future: - future = future_reqs_dict[ack_id].future + requests_completed.append(ack_reqs_dict[ack_id]) + elif ack_reqs_dict[ack_id].future: + future = ack_reqs_dict[ack_id].future # success future.set_result(AcknowledgeStatus.SUCCESS) - requests_completed.append(future_reqs_dict[ack_id]) + requests_completed.append(ack_reqs_dict[ack_id]) else: - requests_completed.append(future_reqs_dict[ack_id]) + requests_completed.append(ack_reqs_dict[ack_id]) return requests_completed, requests_to_retry @@ -556,7 +556,7 @@ def _schedule_message_on_hold( self._scheduler.schedule(self._callback, msg) def send_unary_ack( - self, ack_ids, future_reqs_dict + self, ack_ids, ack_reqs_dict ) -> Tuple[List[requests.AckRequest], List[requests.AckRequest]]: """Send a request using a separate unary request instead of over the stream. @@ -581,7 +581,7 @@ def send_unary_ack( status = status_pb2.Status() status.code = code_pb2.DEADLINE_EXCEEDED # Makes sure to complete futures so they don't block forever. - _process_futures(status, future_reqs_dict, None) + _process_requests(status, ack_reqs_dict, None) _LOGGER.debug( "RetryError while sending unary RPC. Waiting on a transient " "error resolution for too long, will now trigger shutdown.", @@ -592,13 +592,13 @@ def send_unary_ack( self._on_rpc_done(exc) raise - requests_completed, requests_to_retry = _process_futures( - error_status, future_reqs_dict, ack_errors_dict + requests_completed, requests_to_retry = _process_requests( + error_status, ack_reqs_dict, ack_errors_dict ) return requests_completed, requests_to_retry def send_unary_modack( - self, modify_deadline_ack_ids, modify_deadline_seconds, future_reqs_dict + self, modify_deadline_ack_ids, modify_deadline_seconds, ack_reqs_dict ) -> Tuple[List[requests.ModAckRequest], List[requests.ModAckRequest]]: """Send a request using a separate unary request instead of over the stream. @@ -635,7 +635,7 @@ def send_unary_modack( status = status_pb2.Status() status.code = code_pb2.DEADLINE_EXCEEDED # Makes sure to complete futures so they don't block forever. - _process_futures(status, future_reqs_dict, None) + _process_requests(status, ack_reqs_dict, None) _LOGGER.debug( "RetryError while sending unary RPC. Waiting on a transient " "error resolution for too long, will now trigger shutdown.", @@ -646,8 +646,8 @@ def send_unary_modack( self._on_rpc_done(exc) raise - requests_completed, requests_to_retry = _process_futures( - error_status, future_reqs_dict, modack_errors_dict + requests_completed, requests_to_retry = _process_requests( + error_status, ack_reqs_dict, modack_errors_dict ) return requests_completed, requests_to_retry diff --git a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py index aaedcb6ff..bbc6170e2 100644 --- a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py +++ b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py @@ -105,7 +105,7 @@ def test_ack(): dispatcher_.ack(items) manager.send_unary_ack.assert_called_once_with( - ack_ids=["ack_id_string"], future_reqs_dict={"ack_id_string": items[0]} + ack_ids=["ack_id_string"], ack_reqs_dict={"ack_id_string": items[0]} ) manager.leaser.remove.assert_called_once_with(items) @@ -132,7 +132,7 @@ def test_ack_no_time(): dispatcher_.ack(items) manager.send_unary_ack.assert_called_once_with( - ack_ids=["ack_id_string"], future_reqs_dict={"ack_id_string": items[0]} + ack_ids=["ack_id_string"], ack_reqs_dict={"ack_id_string": items[0]} ) manager.ack_histogram.add.assert_not_called() @@ -226,13 +226,13 @@ def test_retry_acks(): manager.send_unary_ack.assert_has_calls( [ mock.call( - ack_ids=["ack_id_string"], future_reqs_dict={"ack_id_string": items[0]} + ack_ids=["ack_id_string"], ack_reqs_dict={"ack_id_string": items[0]} ), mock.call( - ack_ids=["ack_id_string"], future_reqs_dict={"ack_id_string": items[0]} + ack_ids=["ack_id_string"], ack_reqs_dict={"ack_id_string": items[0]} ), mock.call( - ack_ids=["ack_id_string"], future_reqs_dict={"ack_id_string": items[0]} + ack_ids=["ack_id_string"], ack_reqs_dict={"ack_id_string": items[0]} ), ] ) @@ -277,17 +277,17 @@ def test_retry_modacks(): mock.call( modify_deadline_ack_ids=["ack_id_string"], modify_deadline_seconds=[20], - future_reqs_dict={"ack_id_string": items[0]}, + ack_reqs_dict={"ack_id_string": items[0]}, ), mock.call( modify_deadline_ack_ids=["ack_id_string"], modify_deadline_seconds=[20], - future_reqs_dict={"ack_id_string": items[0]}, + ack_reqs_dict={"ack_id_string": items[0]}, ), mock.call( modify_deadline_ack_ids=["ack_id_string"], modify_deadline_seconds=[20], - future_reqs_dict={"ack_id_string": items[0]}, + ack_reqs_dict={"ack_id_string": items[0]}, ), ] ) @@ -359,7 +359,7 @@ def test_nack(): manager.send_unary_modack.assert_called_once_with( modify_deadline_ack_ids=["ack_id_string"], modify_deadline_seconds=[0], - future_reqs_dict={ + ack_reqs_dict={ "ack_id_string": requests.ModAckRequest( ack_id="ack_id_string", seconds=0, future=None ) @@ -380,7 +380,7 @@ def test_modify_ack_deadline(): manager.send_unary_modack.assert_called_once_with( modify_deadline_ack_ids=["ack_id_string"], modify_deadline_seconds=[60], - future_reqs_dict={"ack_id_string": items[0]}, + ack_reqs_dict={"ack_id_string": items[0]}, ) diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 6451be035..3d9904ca1 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -506,7 +506,7 @@ def test__maybe_release_messages_negative_on_hold_bytes_warning(caplog): def test_send_unary_ack(): manager = make_manager() - manager.send_unary_ack(ack_ids=["ack_id1", "ack_id2"], future_reqs_dict={}) + manager.send_unary_ack(ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict={}) manager._client.acknowledge.assert_called_once_with( subscription=manager._subscription, ack_ids=["ack_id1", "ack_id2"] @@ -519,7 +519,7 @@ def test_send_unary_modack(): manager.send_unary_modack( modify_deadline_ack_ids=["ack_id3", "ack_id4", "ack_id5"], modify_deadline_seconds=[10, 20, 20], - future_reqs_dict={}, + ack_reqs_dict={}, ) manager._client.modify_ack_deadline.assert_has_calls( @@ -547,7 +547,7 @@ def test_send_unary_ack_api_call_error(caplog): error = exceptions.GoogleAPICallError("The front fell off") manager._client.acknowledge.side_effect = error - manager.send_unary_ack(ack_ids=["ack_id1", "ack_id2"], future_reqs_dict={}) + manager.send_unary_ack(ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict={}) assert "The front fell off" in caplog.text @@ -563,7 +563,7 @@ def test_send_unary_modack_api_call_error(caplog): manager.send_unary_modack( modify_deadline_ack_ids=["ack_id_string"], modify_deadline_seconds=[0], - future_reqs_dict={}, + ack_reqs_dict={}, ) assert "The front fell off" in caplog.text @@ -580,14 +580,14 @@ def test_send_unary_ack_retry_error(caplog): manager._client.acknowledge.side_effect = error future = futures.Future() - future_reqs_dict = { + ack_reqs_dict = { "ackid1": requests.AckRequest( ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=future ) } with pytest.raises(exceptions.RetryError): manager.send_unary_ack( - ack_ids=["ack_id1", "ack_id2"], future_reqs_dict=future_reqs_dict + ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict=ack_reqs_dict ) assert "RetryError while sending unary RPC" in caplog.text @@ -609,14 +609,14 @@ def test_send_unary_modack_retry_error(caplog): manager._client.modify_ack_deadline.side_effect = error future = futures.Future() - future_reqs_dict = { + ack_reqs_dict = { "ackid1": requests.ModAckRequest(ack_id="ackid1", seconds=60, future=future) } with pytest.raises(exceptions.RetryError): manager.send_unary_modack( modify_deadline_ack_ids=["ackid1"], modify_deadline_seconds=[0], - future_reqs_dict=future_reqs_dict, + ack_reqs_dict=ack_reqs_dict, ) assert "RetryError while sending unary RPC" in caplog.text @@ -1606,71 +1606,71 @@ def test_get_ack_errors_happy_case(from_call): assert ack_errors["ack_1"] == "error1" -def test_process_futures_no_requests(): +def test_process_requests_no_requests(): # no requests so no items in results lists - future_reqs_dict = {} + ack_reqs_dict = {} errors_dict = {} - requests_completed, requests_to_retry = streaming_pull_manager._process_futures( - None, future_reqs_dict, errors_dict + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + None, ack_reqs_dict, errors_dict ) assert not requests_completed assert not requests_to_retry -def test_process_futures_error_dict_is_none(): +def test_process_requests_error_dict_is_none(): # it's valid to pass in `None` for `errors_dict` - future_reqs_dict = {} + ack_reqs_dict = {} errors_dict = None - requests_completed, requests_to_retry = streaming_pull_manager._process_futures( - None, future_reqs_dict, errors_dict + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + None, ack_reqs_dict, errors_dict ) assert not requests_completed assert not requests_to_retry -def test_process_futures_no_errors_has_no_future(): +def test_process_requests_no_errors_has_no_future(): # no errors so request should be completed, even with no future - future_reqs_dict = { + ack_reqs_dict = { "ackid1": requests.AckRequest( ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=None ) } errors_dict = {} - requests_completed, requests_to_retry = streaming_pull_manager._process_futures( - None, future_reqs_dict, errors_dict + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + None, ack_reqs_dict, errors_dict ) assert requests_completed[0].ack_id == "ackid1" assert not requests_to_retry -def test_process_futures_no_errors(): +def test_process_requests_no_errors(): # no errors so request and its future should be completed future = futures.Future() - future_reqs_dict = { + ack_reqs_dict = { "ackid1": requests.AckRequest( ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=future ) } errors_dict = {} - requests_completed, requests_to_retry = streaming_pull_manager._process_futures( - None, future_reqs_dict, errors_dict + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + None, ack_reqs_dict, errors_dict ) assert requests_completed[0].ack_id == "ackid1" assert future.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS assert not requests_to_retry -def test_process_futures_permanent_error_raises_exception(): +def test_process_requests_permanent_error_raises_exception(): # a permanent error raises an exception future = futures.Future() - future_reqs_dict = { + ack_reqs_dict = { "ackid1": requests.AckRequest( ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=future ) } errors_dict = {"ackid1": "PERMANENT_FAILURE_INVALID_ACK_ID"} - requests_completed, requests_to_retry = streaming_pull_manager._process_futures( - None, future_reqs_dict, errors_dict + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + None, ack_reqs_dict, errors_dict ) assert requests_completed[0].ack_id == "ackid1" with pytest.raises(subscriber_exceptions.AcknowledgeError) as exc_info: @@ -1682,34 +1682,34 @@ def test_process_futures_permanent_error_raises_exception(): assert not requests_to_retry -def test_process_futures_transient_error_returns_request(): +def test_process_requests_transient_error_returns_request(): # a transient error returns the request in `requests_to_retry` future = futures.Future() - future_reqs_dict = { + ack_reqs_dict = { "ackid1": requests.AckRequest( ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=future ) } errors_dict = {"ackid1": "TRANSIENT_FAILURE_INVALID_ACK_ID"} - requests_completed, requests_to_retry = streaming_pull_manager._process_futures( - None, future_reqs_dict, errors_dict + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + None, ack_reqs_dict, errors_dict ) assert not requests_completed assert requests_to_retry[0].ack_id == "ackid1" assert not future.done() -def test_process_futures_unknown_error_raises_exception(): +def test_process_requests_unknown_error_raises_exception(): # an unknown error raises an exception future = futures.Future() - future_reqs_dict = { + ack_reqs_dict = { "ackid1": requests.AckRequest( ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=future ) } errors_dict = {"ackid1": "unknown_error"} - requests_completed, requests_to_retry = streaming_pull_manager._process_futures( - None, future_reqs_dict, errors_dict + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + None, ack_reqs_dict, errors_dict ) assert requests_completed[0].ack_id == "ackid1" with pytest.raises(subscriber_exceptions.AcknowledgeError) as exc_info: @@ -1719,18 +1719,18 @@ def test_process_futures_unknown_error_raises_exception(): assert not requests_to_retry -def test_process_futures_permission_denied_error_status_raises_exception(): +def test_process_requests_permission_denied_error_status_raises_exception(): # a permission-denied error status raises an exception future = futures.Future() - future_reqs_dict = { + ack_reqs_dict = { "ackid1": requests.AckRequest( ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=future ) } st = status_pb2.Status() st.code = code_pb2.Code.PERMISSION_DENIED - requests_completed, requests_to_retry = streaming_pull_manager._process_futures( - st, future_reqs_dict, None + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + st, ack_reqs_dict, None ) assert requests_completed[0].ack_id == "ackid1" with pytest.raises(subscriber_exceptions.AcknowledgeError) as exc_info: @@ -1743,18 +1743,18 @@ def test_process_futures_permission_denied_error_status_raises_exception(): assert not requests_to_retry -def test_process_futures_failed_precondition_error_status_raises_exception(): +def test_process_requests_failed_precondition_error_status_raises_exception(): # a failed-precondition error status raises an exception future = futures.Future() - future_reqs_dict = { + ack_reqs_dict = { "ackid1": requests.AckRequest( ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=future ) } st = status_pb2.Status() st.code = code_pb2.Code.FAILED_PRECONDITION - requests_completed, requests_to_retry = streaming_pull_manager._process_futures( - st, future_reqs_dict, None + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + st, ack_reqs_dict, None ) assert requests_completed[0].ack_id == "ackid1" with pytest.raises(subscriber_exceptions.AcknowledgeError) as exc_info: @@ -1767,18 +1767,18 @@ def test_process_futures_failed_precondition_error_status_raises_exception(): assert not requests_to_retry -def test_process_futures_other_error_status_raises_exception(): +def test_process_requests_other_error_status_raises_exception(): # an unrecognized error status raises an exception future = futures.Future() - future_reqs_dict = { + ack_reqs_dict = { "ackid1": requests.AckRequest( ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=future ) } st = status_pb2.Status() st.code = code_pb2.Code.OUT_OF_RANGE - requests_completed, requests_to_retry = streaming_pull_manager._process_futures( - st, future_reqs_dict, None + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + st, ack_reqs_dict, None ) assert requests_completed[0].ack_id == "ackid1" with pytest.raises(subscriber_exceptions.AcknowledgeError) as exc_info: @@ -1787,12 +1787,12 @@ def test_process_futures_other_error_status_raises_exception(): assert not requests_to_retry -def test_process_futures_mixed_success_and_failure_acks(): +def test_process_requests_mixed_success_and_failure_acks(): # mixed success and failure (acks) future1 = futures.Future() future2 = futures.Future() future3 = futures.Future() - future_reqs_dict = { + ack_reqs_dict = { "ackid1": requests.AckRequest( ack_id="ackid1", byte_size=0, @@ -1819,8 +1819,8 @@ def test_process_futures_mixed_success_and_failure_acks(): "ackid1": "PERMANENT_FAILURE_INVALID_ACK_ID", "ackid2": "TRANSIENT_FAILURE_INVALID_ACK_ID", } - requests_completed, requests_to_retry = streaming_pull_manager._process_futures( - None, future_reqs_dict, errors_dict + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + None, ack_reqs_dict, errors_dict ) # message with ack_id 'ackid1' fails with an exception assert requests_completed[0].ack_id == "ackid1" @@ -1838,12 +1838,12 @@ def test_process_futures_mixed_success_and_failure_acks(): assert future3.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS -def test_process_futures_mixed_success_and_failure_modacks(): +def test_process_requests_mixed_success_and_failure_modacks(): # mixed success and failure (modacks) future1 = futures.Future() future2 = futures.Future() future3 = futures.Future() - future_reqs_dict = { + ack_reqs_dict = { "ackid1": requests.ModAckRequest(ack_id="ackid1", seconds=60, future=future1), "ackid2": requests.ModAckRequest(ack_id="ackid2", seconds=60, future=future2), "ackid3": requests.ModAckRequest(ack_id="ackid3", seconds=60, future=future3), @@ -1852,8 +1852,8 @@ def test_process_futures_mixed_success_and_failure_modacks(): "ackid1": "PERMANENT_FAILURE_INVALID_ACK_ID", "ackid2": "TRANSIENT_FAILURE_INVALID_ACK_ID", } - requests_completed, requests_to_retry = streaming_pull_manager._process_futures( - None, future_reqs_dict, errors_dict + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + None, ack_reqs_dict, errors_dict ) # message with ack_id 'ackid1' fails with an exception assert requests_completed[0].ack_id == "ackid1"