From 40cfbc437822dcf56868777eedb01bdb7b136f6f Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Wed, 13 Jul 2022 15:47:42 -0400 Subject: [PATCH 1/6] modify samples --- .../subscriber/_protocol/streaming_pull_manager.py | 8 ++++++-- samples/snippets/quickstart/pub.py | 13 +++++++------ samples/snippets/quickstart/sub.py | 5 ++++- samples/snippets/subscriber.py | 2 +- 4 files changed, 18 insertions(+), 10 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index f7e44cb7f..4d7466196 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -932,10 +932,13 @@ def _get_initial_request( if self._leaser is not None: # Explicitly copy the list, as it could be modified by another # thread. - lease_ids = list(self._leaser.ack_ids) + if len(self._leaser.ack_ids) < 100000: + _LOGGER.info("sending", len(lease_ids), "lease ids") + lease_ids = list(self._leaser.ack_ids) + else: + lease_ids = list(self._leaser.ack_ids[:100000]) else: lease_ids = [] - # Put the request together. request = gapic_types.StreamingPullRequest( modify_deadline_ack_ids=list(lease_ids), @@ -950,6 +953,7 @@ def _get_initial_request( 0 if self._use_legacy_flow_control else self._flow_control.max_bytes ), ) + # Return the initial request. return request diff --git a/samples/snippets/quickstart/pub.py b/samples/snippets/quickstart/pub.py index 7215abd86..8d49bf7dd 100644 --- a/samples/snippets/quickstart/pub.py +++ b/samples/snippets/quickstart/pub.py @@ -15,6 +15,7 @@ # limitations under the License. import argparse +from time import sleep from google.cloud import pubsub_v1 @@ -27,13 +28,13 @@ def pub(project_id: str, topic_id: str) -> None: topic_path = client.topic_path(project_id, topic_id) # Data sent to Cloud Pub/Sub must be a bytestring. - data = b"Hello, World!" - - # When you publish a message, the client returns a future. - api_future = client.publish(topic_path, data) - message_id = api_future.result() - print(f"Published {data.decode()} to {topic_path}: {message_id}") + data = b"Hello, World!" + while True: + sleep(1) + # When you publish a message, the client returns a future. + api_future = client.publish(topic_path, data) + message_id = api_future.result() if __name__ == "__main__": diff --git a/samples/snippets/quickstart/sub.py b/samples/snippets/quickstart/sub.py index 0900f652d..c5d49fdaa 100644 --- a/samples/snippets/quickstart/sub.py +++ b/samples/snippets/quickstart/sub.py @@ -15,9 +15,11 @@ # limitations under the License. import argparse +from time import sleep from typing import Optional from google.cloud import pubsub_v1 +from google.cloud.pubsub_v1.types import FlowControl def sub(project_id: str, subscription_id: str, timeout: Optional[float] = None) -> None: @@ -31,11 +33,12 @@ def sub(project_id: str, subscription_id: str, timeout: Optional[float] = None) def callback(message: pubsub_v1.subscriber.message.Message) -> None: print(f"Received {message}.") # Acknowledge the message. Unack'ed messages will be redelivered. + sleep(500) message.ack() print(f"Acknowledged {message.message_id}.") streaming_pull_future = subscriber_client.subscribe( - subscription_path, callback=callback + subscription_path, callback=callback, flow_control=FlowControl(max_messages=700000) ) print(f"Listening for messages on {subscription_path}..\n") diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index fb2c98f33..607cab79f 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -582,7 +582,7 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: message.ack() # Limit the subscriber to only have ten outstanding messages at a time. - flow_control = pubsub_v1.types.FlowControl(max_messages=10) + flow_control = pubsub_v1.types.FlowControl(max_messages=700000) streaming_pull_future = subscriber.subscribe( subscription_path, callback=callback, flow_control=flow_control From a40ee92b41682d2eba94b11be2bae9c9284e8cad Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 14 Jul 2022 10:42:32 -0400 Subject: [PATCH 2/6] remove bidi modacks from initial request --- .../subscriber/_protocol/streaming_pull_manager.py | 14 -------------- .../subscriber/test_streaming_pull_manager.py | 4 ---- 2 files changed, 18 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 4d7466196..c8c80f7f7 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -927,22 +927,8 @@ def _get_initial_request( A request suitable for being the first request on the stream (and not suitable for any other purpose). """ - # Any ack IDs that are under lease management need to have their - # deadline extended immediately. - if self._leaser is not None: - # Explicitly copy the list, as it could be modified by another - # thread. - if len(self._leaser.ack_ids) < 100000: - _LOGGER.info("sending", len(lease_ids), "lease ids") - lease_ids = list(self._leaser.ack_ids) - else: - lease_ids = list(self._leaser.ack_ids[:100000]) - else: - lease_ids = [] # Put the request together. request = gapic_types.StreamingPullRequest( - modify_deadline_ack_ids=list(lease_ids), - modify_deadline_seconds=[self.ack_deadline] * len(lease_ids), stream_ack_deadline_seconds=stream_ack_deadline_seconds, subscription=self._subscription, client_id=self._client_id, diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index ab21a1597..58956a6b8 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -1318,8 +1318,6 @@ def test__get_initial_request(): assert isinstance(initial_request, gapic_types.StreamingPullRequest) assert initial_request.subscription == "subscription-name" assert initial_request.stream_ack_deadline_seconds == 123 - assert initial_request.modify_deadline_ack_ids == ["1", "2"] - assert initial_request.modify_deadline_seconds == [10, 10] def test__get_initial_request_wo_leaser(): @@ -1331,8 +1329,6 @@ def test__get_initial_request_wo_leaser(): assert isinstance(initial_request, gapic_types.StreamingPullRequest) assert initial_request.subscription == "subscription-name" assert initial_request.stream_ack_deadline_seconds == 123 - assert initial_request.modify_deadline_ack_ids == [] - assert initial_request.modify_deadline_seconds == [] def test__on_response_delivery_attempt(): From b031306c65239d72aeb5bc7dccdf3c249194d8e1 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 14 Jul 2022 15:18:15 -0400 Subject: [PATCH 3/6] add empty lists --- .../pubsub_v1/subscriber/_protocol/streaming_pull_manager.py | 2 ++ .../unit/pubsub_v1/subscriber/test_streaming_pull_manager.py | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index c8c80f7f7..e24f833d0 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -930,6 +930,8 @@ def _get_initial_request( # Put the request together. request = gapic_types.StreamingPullRequest( stream_ack_deadline_seconds=stream_ack_deadline_seconds, + modify_deadline_ack_ids=[], + modify_deadline_seconds=[], subscription=self._subscription, client_id=self._client_id, max_outstanding_messages=( diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 58956a6b8..a8cbfbcda 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -1318,6 +1318,8 @@ def test__get_initial_request(): assert isinstance(initial_request, gapic_types.StreamingPullRequest) assert initial_request.subscription == "subscription-name" assert initial_request.stream_ack_deadline_seconds == 123 + assert initial_request.modify_deadline_ack_ids == [] + assert initial_request.modify_deadline_seconds == [] def test__get_initial_request_wo_leaser(): @@ -1329,6 +1331,8 @@ def test__get_initial_request_wo_leaser(): assert isinstance(initial_request, gapic_types.StreamingPullRequest) assert initial_request.subscription == "subscription-name" assert initial_request.stream_ack_deadline_seconds == 123 + assert initial_request.modify_deadline_ack_ids == [] + assert initial_request.modify_deadline_seconds == [] def test__on_response_delivery_attempt(): From 158770428e5c4b72ef0a9aca03727e9952af1feb Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 14 Jul 2022 15:19:31 -0400 Subject: [PATCH 4/6] revert quickstart tests --- samples/snippets/quickstart/pub.py | 13 ++++++------- samples/snippets/quickstart/sub.py | 5 +---- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/samples/snippets/quickstart/pub.py b/samples/snippets/quickstart/pub.py index 8d49bf7dd..7215abd86 100644 --- a/samples/snippets/quickstart/pub.py +++ b/samples/snippets/quickstart/pub.py @@ -15,7 +15,6 @@ # limitations under the License. import argparse -from time import sleep from google.cloud import pubsub_v1 @@ -28,13 +27,13 @@ def pub(project_id: str, topic_id: str) -> None: topic_path = client.topic_path(project_id, topic_id) # Data sent to Cloud Pub/Sub must be a bytestring. - data = b"Hello, World!" - while True: - sleep(1) - # When you publish a message, the client returns a future. - api_future = client.publish(topic_path, data) - message_id = api_future.result() + + # When you publish a message, the client returns a future. + api_future = client.publish(topic_path, data) + message_id = api_future.result() + + print(f"Published {data.decode()} to {topic_path}: {message_id}") if __name__ == "__main__": diff --git a/samples/snippets/quickstart/sub.py b/samples/snippets/quickstart/sub.py index c5d49fdaa..0900f652d 100644 --- a/samples/snippets/quickstart/sub.py +++ b/samples/snippets/quickstart/sub.py @@ -15,11 +15,9 @@ # limitations under the License. import argparse -from time import sleep from typing import Optional from google.cloud import pubsub_v1 -from google.cloud.pubsub_v1.types import FlowControl def sub(project_id: str, subscription_id: str, timeout: Optional[float] = None) -> None: @@ -33,12 +31,11 @@ def sub(project_id: str, subscription_id: str, timeout: Optional[float] = None) def callback(message: pubsub_v1.subscriber.message.Message) -> None: print(f"Received {message}.") # Acknowledge the message. Unack'ed messages will be redelivered. - sleep(500) message.ack() print(f"Acknowledged {message.message_id}.") streaming_pull_future = subscriber_client.subscribe( - subscription_path, callback=callback, flow_control=FlowControl(max_messages=700000) + subscription_path, callback=callback ) print(f"Listening for messages on {subscription_path}..\n") From e0d5fd6a0860cc7c7edb8900d8b39a564259ce68 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 14 Jul 2022 15:21:44 -0400 Subject: [PATCH 5/6] revert samples changes --- samples/snippets/subscriber.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index 607cab79f..fb2c98f33 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -582,7 +582,7 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: message.ack() # Limit the subscriber to only have ten outstanding messages at a time. - flow_control = pubsub_v1.types.FlowControl(max_messages=700000) + flow_control = pubsub_v1.types.FlowControl(max_messages=10) streaming_pull_future = subscriber.subscribe( subscription_path, callback=callback, flow_control=flow_control From 92d57fe50487235e540773cb2941ca0490303099 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 14 Jul 2022 15:33:30 -0400 Subject: [PATCH 6/6] lint --- .../pubsub_v1/subscriber/_protocol/streaming_pull_manager.py | 1 - 1 file changed, 1 deletion(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index e24f833d0..f909c8eec 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -941,7 +941,6 @@ def _get_initial_request( 0 if self._use_legacy_flow_control else self._flow_control.max_bytes ), ) - # Return the initial request. return request