From 71388c0bcd7946d5446878574db4ab3958ec237c Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Wed, 10 Aug 2022 20:16:03 -0400 Subject: [PATCH 1/3] fix: set default streaming_ack_deadline to 60 and make configurable --- .../_protocol/streaming_pull_manager.py | 46 ++++++++++-- .../subscriber/test_streaming_pull_manager.py | 71 ++++++++++++++++--- 2 files changed, 104 insertions(+), 13 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index f909c8eec..614276b4e 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -73,6 +73,13 @@ """The minimum ack_deadline, in seconds, for when exactly_once is enabled for a subscription. We do this to reduce premature ack expiration. """ +_DEFAULT_STREAM_ACK_DEADLINE = 60 +"""The default stream ack deadline, in seconds.""" + +_MAX_STREAM_ACK_DEADLINE = 600 +"""""" +_MIN_STREAM_ACK_DEADLINE = 10 +"""""" _EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS = { code_pb2.DEADLINE_EXCEEDED, @@ -270,7 +277,36 @@ def __init__( self._await_callbacks_on_shutdown = await_callbacks_on_shutdown self._ack_histogram = histogram.Histogram() self._last_histogram_size = 0 - self._ack_deadline: Union[int, float] = histogram.MIN_ACK_DEADLINE + + # If max_duration_per_lease_extension is the default + # we set the stream_ack_deadline to the default of 60 + if self._flow_control.max_duration_per_lease_extension == 0: + self._stream_ack_deadline = _DEFAULT_STREAM_ACK_DEADLINE + # We will not be able to extend more than the default minimum + elif ( + self._flow_control.max_duration_per_lease_extension + < _MIN_STREAM_ACK_DEADLINE + ): + self._stream_ack_deadline = _MIN_STREAM_ACK_DEADLINE + # Will not be able to extend past the max + elif ( + self._flow_control.max_duration_per_lease_extension + > _MAX_STREAM_ACK_DEADLINE + ): + self._stream_ack_deadline = _MAX_STREAM_ACK_DEADLINE + else: + self._stream_ack_deadline = ( + self._flow_control.max_duration_per_lease_extension + ) + + self._ack_deadline = max( + min( + self._flow_control.min_duration_per_lease_extension, + histogram.MAX_ACK_DEADLINE, + ), + histogram.MIN_ACK_DEADLINE, + ) + self._rpc: Optional[bidi.ResumableBidiRpc] = None self._callback: Optional[functools.partial] = None self._closing = threading.Lock() @@ -741,10 +777,10 @@ def heartbeat(self) -> bool: if send_new_ack_deadline: request = gapic_types.StreamingPullRequest( - stream_ack_deadline_seconds=self.ack_deadline + stream_ack_deadline_seconds=self._stream_ack_deadline ) _LOGGER.debug( - "Sending new ack_deadline of %d seconds.", self.ack_deadline + "Sending new ack_deadline of %d seconds.", self._stream_ack_deadline ) else: request = gapic_types.StreamingPullRequest() @@ -796,7 +832,7 @@ def open( _LOGGER.debug( "Creating a stream, default ACK deadline set to {} seconds.".format( - stream_ack_deadline_seconds + self._stream_ack_deadline ) ) @@ -928,6 +964,8 @@ def _get_initial_request( suitable for any other purpose). """ # Put the request together. + # We need to set streaming ack deadline, but it's not useful since we'll modack to send receipt + # anyway. Set to some big-ish value in case we modack late. request = gapic_types.StreamingPullRequest( stream_ack_deadline_seconds=stream_ack_deadline_seconds, modify_deadline_ack_ids=[], diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index a8cbfbcda..14caf993a 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from asyncio.streams import FlowControlMixin import functools import logging import threading @@ -107,16 +108,61 @@ def test_constructor_and_default_state(): assert manager._client_id is not None -def test_constructor_with_options(): +def test_constructor_with_default_options(): + flow_control_ = types.FlowControl() manager = streaming_pull_manager.StreamingPullManager( mock.sentinel.client, mock.sentinel.subscription, - flow_control=mock.sentinel.flow_control, + flow_control=flow_control_, scheduler=mock.sentinel.scheduler, ) - assert manager.flow_control == mock.sentinel.flow_control + assert manager.flow_control == flow_control_ assert manager._scheduler == mock.sentinel.scheduler + assert manager._ack_deadline == 10 + assert manager._stream_ack_deadline == 60 + + +def test_constructor_with_min_and_max_duration_per_lease_extension_(): + flow_control_ = types.FlowControl( + min_duration_per_lease_extension=15, max_duration_per_lease_extension=20 + ) + manager = streaming_pull_manager.StreamingPullManager( + mock.sentinel.client, + mock.sentinel.subscription, + flow_control=flow_control_, + scheduler=mock.sentinel.scheduler, + ) + assert manager._ack_deadline == 15 + assert manager._stream_ack_deadline == 20 + + +def test_constructor_with_min_duration_per_lease_extension_too_low(): + flow_control_ = types.FlowControl( + min_duration_per_lease_extension=9, max_duration_per_lease_extension=9 + ) + manager = streaming_pull_manager.StreamingPullManager( + mock.sentinel.client, + mock.sentinel.subscription, + flow_control=flow_control_, + scheduler=mock.sentinel.scheduler, + ) + assert manager._ack_deadline == 10 + assert manager._stream_ack_deadline == 10 + + +def test_constructor_with_max_duration_per_lease_extension_too_high(): + flow_control_ = types.FlowControl( + max_duration_per_lease_extension=601, min_duration_per_lease_extension=601 + ) + manager = streaming_pull_manager.StreamingPullManager( + mock.sentinel.client, + mock.sentinel.subscription, + flow_control=flow_control_, + scheduler=mock.sentinel.scheduler, + ) + assert manager._ack_deadline == 600 + assert manager._stream_ack_deadline == 600 def make_manager(**kwargs): @@ -164,9 +210,13 @@ def test__obtain_ack_deadline_no_custom_flow_control_setting(): manager._flow_control = types.FlowControl( min_duration_per_lease_extension=0, max_duration_per_lease_extension=0 ) + assert manager._stream_ack_deadline == 60 + assert manager._ack_deadline == 10 + assert manager._obtain_ack_deadline(maybe_update=False) == 10 deadline = manager._obtain_ack_deadline(maybe_update=True) assert deadline == histogram.MIN_ACK_DEADLINE + assert manager._stream_ack_deadline == 60 # When we get some historical data, the deadline is adjusted. manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 2) @@ -186,11 +236,14 @@ def test__obtain_ack_deadline_with_max_duration_per_lease_extension(): manager._flow_control = types.FlowControl( max_duration_per_lease_extension=histogram.MIN_ACK_DEADLINE + 1 ) + assert manager._ack_deadline == 10 + manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 3) # make p99 value large # The deadline configured in flow control should prevail. deadline = manager._obtain_ack_deadline(maybe_update=True) assert deadline == histogram.MIN_ACK_DEADLINE + 1 + assert manager._stream_ack_deadline == 60 def test__obtain_ack_deadline_with_min_duration_per_lease_extension(): @@ -292,12 +345,12 @@ def test__obtain_ack_deadline_no_value_update(): def test_client_id(): manager1 = make_manager() - request1 = manager1._get_initial_request(stream_ack_deadline_seconds=10) + request1 = manager1._get_initial_request(stream_ack_deadline_seconds=60) client_id_1 = request1.client_id assert client_id_1 manager2 = make_manager() - request2 = manager2._get_initial_request(stream_ack_deadline_seconds=10) + request2 = manager2._get_initial_request(stream_ack_deadline_seconds=60) client_id_2 = request2.client_id assert client_id_2 @@ -308,7 +361,7 @@ def test_streaming_flow_control(): manager = make_manager( flow_control=types.FlowControl(max_messages=10, max_bytes=1000) ) - request = manager._get_initial_request(stream_ack_deadline_seconds=10) + request = manager._get_initial_request(stream_ack_deadline_seconds=60) assert request.max_outstanding_messages == 10 assert request.max_outstanding_bytes == 1000 @@ -318,7 +371,7 @@ def test_streaming_flow_control_use_legacy_flow_control(): flow_control=types.FlowControl(max_messages=10, max_bytes=1000), use_legacy_flow_control=True, ) - request = manager._get_initial_request(stream_ack_deadline_seconds=10) + request = manager._get_initial_request(stream_ack_deadline_seconds=60) assert request.max_outstanding_messages == 0 assert request.max_outstanding_bytes == 0 @@ -1046,12 +1099,12 @@ def test_heartbeat_stream_ack_deadline_seconds(caplog): result = manager.heartbeat() manager._rpc.send.assert_called_once_with( - gapic_types.StreamingPullRequest(stream_ack_deadline_seconds=10) + gapic_types.StreamingPullRequest(stream_ack_deadline_seconds=60) ) assert result # Set to false after a send is initiated. assert not manager._send_new_ack_deadline - assert "Sending new ack_deadline of 10 seconds." in caplog.text + assert "Sending new ack_deadline of 60 seconds." in caplog.text @mock.patch("google.api_core.bidi.ResumableBidiRpc", autospec=True) From 5b04cf6589c7a3b9eab631cdaf237ae18acf484a Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Wed, 10 Aug 2022 20:24:54 -0400 Subject: [PATCH 2/3] fix lint --- .../pubsub_v1/subscriber/_protocol/streaming_pull_manager.py | 2 +- tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 614276b4e..d1455e09d 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -20,7 +20,7 @@ import logging import threading import typing -from typing import Any, Dict, Callable, Iterable, List, Optional, Tuple, Union +from typing import Any, Dict, Callable, Iterable, List, Optional, Tuple import uuid import grpc # type: ignore diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 14caf993a..f44235335 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from asyncio.streams import FlowControlMixin import functools import logging import threading From d9f77b4ee70862c1cd60dcb8a8494adc3b655c5c Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 11 Aug 2022 10:29:39 -0400 Subject: [PATCH 3/3] mypy fixes --- .../subscriber/_protocol/streaming_pull_manager.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index d1455e09d..22d3b73d5 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -73,13 +73,15 @@ """The minimum ack_deadline, in seconds, for when exactly_once is enabled for a subscription. We do this to reduce premature ack expiration. """ -_DEFAULT_STREAM_ACK_DEADLINE = 60 -"""The default stream ack deadline, in seconds.""" -_MAX_STREAM_ACK_DEADLINE = 600 -"""""" -_MIN_STREAM_ACK_DEADLINE = 10 -"""""" +_DEFAULT_STREAM_ACK_DEADLINE: float = 60 +"""The default stream ack deadline in seconds.""" + +_MAX_STREAM_ACK_DEADLINE: float = 600 +"""The maximum stream ack deadline in seconds.""" + +_MIN_STREAM_ACK_DEADLINE: float = 10 +"""The minimum stream ack deadline in seconds.""" _EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS = { code_pb2.DEADLINE_EXCEEDED,