diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index f909c8eec..22d3b73d5 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -20,7 +20,7 @@ import logging import threading import typing -from typing import Any, Dict, Callable, Iterable, List, Optional, Tuple, Union +from typing import Any, Dict, Callable, Iterable, List, Optional, Tuple import uuid import grpc # type: ignore @@ -74,6 +74,15 @@ a subscription. We do this to reduce premature ack expiration. """ +_DEFAULT_STREAM_ACK_DEADLINE: float = 60 +"""The default stream ack deadline in seconds.""" + +_MAX_STREAM_ACK_DEADLINE: float = 600 +"""The maximum stream ack deadline in seconds.""" + +_MIN_STREAM_ACK_DEADLINE: float = 10 +"""The minimum stream ack deadline in seconds.""" + _EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS = { code_pb2.DEADLINE_EXCEEDED, code_pb2.RESOURCE_EXHAUSTED, @@ -270,7 +279,36 @@ def __init__( self._await_callbacks_on_shutdown = await_callbacks_on_shutdown self._ack_histogram = histogram.Histogram() self._last_histogram_size = 0 - self._ack_deadline: Union[int, float] = histogram.MIN_ACK_DEADLINE + + # If max_duration_per_lease_extension is the default + # we set the stream_ack_deadline to the default of 60 + if self._flow_control.max_duration_per_lease_extension == 0: + self._stream_ack_deadline = _DEFAULT_STREAM_ACK_DEADLINE + # We will not be able to extend more than the default minimum + elif ( + self._flow_control.max_duration_per_lease_extension + < _MIN_STREAM_ACK_DEADLINE + ): + self._stream_ack_deadline = _MIN_STREAM_ACK_DEADLINE + # Will not be able to extend past the max + elif ( + self._flow_control.max_duration_per_lease_extension + > _MAX_STREAM_ACK_DEADLINE + ): + self._stream_ack_deadline = _MAX_STREAM_ACK_DEADLINE + else: + self._stream_ack_deadline = ( + self._flow_control.max_duration_per_lease_extension + ) + + self._ack_deadline = max( + min( + self._flow_control.min_duration_per_lease_extension, + histogram.MAX_ACK_DEADLINE, + ), + histogram.MIN_ACK_DEADLINE, + ) + self._rpc: Optional[bidi.ResumableBidiRpc] = None self._callback: Optional[functools.partial] = None self._closing = threading.Lock() @@ -741,10 +779,10 @@ def heartbeat(self) -> bool: if send_new_ack_deadline: request = gapic_types.StreamingPullRequest( - stream_ack_deadline_seconds=self.ack_deadline + stream_ack_deadline_seconds=self._stream_ack_deadline ) _LOGGER.debug( - "Sending new ack_deadline of %d seconds.", self.ack_deadline + "Sending new ack_deadline of %d seconds.", self._stream_ack_deadline ) else: request = gapic_types.StreamingPullRequest() @@ -796,7 +834,7 @@ def open( _LOGGER.debug( "Creating a stream, default ACK deadline set to {} seconds.".format( - stream_ack_deadline_seconds + self._stream_ack_deadline ) ) @@ -928,6 +966,8 @@ def _get_initial_request( suitable for any other purpose). """ # Put the request together. + # We need to set streaming ack deadline, but it's not useful since we'll modack to send receipt + # anyway. Set to some big-ish value in case we modack late. request = gapic_types.StreamingPullRequest( stream_ack_deadline_seconds=stream_ack_deadline_seconds, modify_deadline_ack_ids=[], diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index a8cbfbcda..f44235335 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -107,16 +107,61 @@ def test_constructor_and_default_state(): assert manager._client_id is not None -def test_constructor_with_options(): +def test_constructor_with_default_options(): + flow_control_ = types.FlowControl() manager = streaming_pull_manager.StreamingPullManager( mock.sentinel.client, mock.sentinel.subscription, - flow_control=mock.sentinel.flow_control, + flow_control=flow_control_, scheduler=mock.sentinel.scheduler, ) - assert manager.flow_control == mock.sentinel.flow_control + assert manager.flow_control == flow_control_ assert manager._scheduler == mock.sentinel.scheduler + assert manager._ack_deadline == 10 + assert manager._stream_ack_deadline == 60 + + +def test_constructor_with_min_and_max_duration_per_lease_extension_(): + flow_control_ = types.FlowControl( + min_duration_per_lease_extension=15, max_duration_per_lease_extension=20 + ) + manager = streaming_pull_manager.StreamingPullManager( + mock.sentinel.client, + mock.sentinel.subscription, + flow_control=flow_control_, + scheduler=mock.sentinel.scheduler, + ) + assert manager._ack_deadline == 15 + assert manager._stream_ack_deadline == 20 + + +def test_constructor_with_min_duration_per_lease_extension_too_low(): + flow_control_ = types.FlowControl( + min_duration_per_lease_extension=9, max_duration_per_lease_extension=9 + ) + manager = streaming_pull_manager.StreamingPullManager( + mock.sentinel.client, + mock.sentinel.subscription, + flow_control=flow_control_, + scheduler=mock.sentinel.scheduler, + ) + assert manager._ack_deadline == 10 + assert manager._stream_ack_deadline == 10 + + +def test_constructor_with_max_duration_per_lease_extension_too_high(): + flow_control_ = types.FlowControl( + max_duration_per_lease_extension=601, min_duration_per_lease_extension=601 + ) + manager = streaming_pull_manager.StreamingPullManager( + mock.sentinel.client, + mock.sentinel.subscription, + flow_control=flow_control_, + scheduler=mock.sentinel.scheduler, + ) + assert manager._ack_deadline == 600 + assert manager._stream_ack_deadline == 600 def make_manager(**kwargs): @@ -164,9 +209,13 @@ def test__obtain_ack_deadline_no_custom_flow_control_setting(): manager._flow_control = types.FlowControl( min_duration_per_lease_extension=0, max_duration_per_lease_extension=0 ) + assert manager._stream_ack_deadline == 60 + assert manager._ack_deadline == 10 + assert manager._obtain_ack_deadline(maybe_update=False) == 10 deadline = manager._obtain_ack_deadline(maybe_update=True) assert deadline == histogram.MIN_ACK_DEADLINE + assert manager._stream_ack_deadline == 60 # When we get some historical data, the deadline is adjusted. manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 2) @@ -186,11 +235,14 @@ def test__obtain_ack_deadline_with_max_duration_per_lease_extension(): manager._flow_control = types.FlowControl( max_duration_per_lease_extension=histogram.MIN_ACK_DEADLINE + 1 ) + assert manager._ack_deadline == 10 + manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 3) # make p99 value large # The deadline configured in flow control should prevail. deadline = manager._obtain_ack_deadline(maybe_update=True) assert deadline == histogram.MIN_ACK_DEADLINE + 1 + assert manager._stream_ack_deadline == 60 def test__obtain_ack_deadline_with_min_duration_per_lease_extension(): @@ -292,12 +344,12 @@ def test__obtain_ack_deadline_no_value_update(): def test_client_id(): manager1 = make_manager() - request1 = manager1._get_initial_request(stream_ack_deadline_seconds=10) + request1 = manager1._get_initial_request(stream_ack_deadline_seconds=60) client_id_1 = request1.client_id assert client_id_1 manager2 = make_manager() - request2 = manager2._get_initial_request(stream_ack_deadline_seconds=10) + request2 = manager2._get_initial_request(stream_ack_deadline_seconds=60) client_id_2 = request2.client_id assert client_id_2 @@ -308,7 +360,7 @@ def test_streaming_flow_control(): manager = make_manager( flow_control=types.FlowControl(max_messages=10, max_bytes=1000) ) - request = manager._get_initial_request(stream_ack_deadline_seconds=10) + request = manager._get_initial_request(stream_ack_deadline_seconds=60) assert request.max_outstanding_messages == 10 assert request.max_outstanding_bytes == 1000 @@ -318,7 +370,7 @@ def test_streaming_flow_control_use_legacy_flow_control(): flow_control=types.FlowControl(max_messages=10, max_bytes=1000), use_legacy_flow_control=True, ) - request = manager._get_initial_request(stream_ack_deadline_seconds=10) + request = manager._get_initial_request(stream_ack_deadline_seconds=60) assert request.max_outstanding_messages == 0 assert request.max_outstanding_bytes == 0 @@ -1046,12 +1098,12 @@ def test_heartbeat_stream_ack_deadline_seconds(caplog): result = manager.heartbeat() manager._rpc.send.assert_called_once_with( - gapic_types.StreamingPullRequest(stream_ack_deadline_seconds=10) + gapic_types.StreamingPullRequest(stream_ack_deadline_seconds=60) ) assert result # Set to false after a send is initiated. assert not manager._send_new_ack_deadline - assert "Sending new ack_deadline of 10 seconds." in caplog.text + assert "Sending new ack_deadline of 60 seconds." in caplog.text @mock.patch("google.api_core.bidi.ResumableBidiRpc", autospec=True)