From 25022f1d748b463330cb304ac13b377f987fec9f Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Tue, 14 Jul 2020 15:33:14 -0700 Subject: [PATCH 1/5] samples: add samples for publish with ordering keys --- samples/snippets/README.rst | 68 +++++--------------- samples/snippets/publisher.py | 98 ++++++++++++++++++++++++++++- samples/snippets/publisher_test.py | 14 +++++ samples/snippets/subscriber.py | 32 ++++++++++ samples/snippets/subscriber_test.py | 11 ++++ 5 files changed, 167 insertions(+), 56 deletions(-) diff --git a/samples/snippets/README.rst b/samples/snippets/README.rst index 40b2e21fc..8a42d0cdc 100644 --- a/samples/snippets/README.rst +++ b/samples/snippets/README.rst @@ -110,11 +110,11 @@ To run this sample: .. code-block:: bash - $ python publisher.py + $ python publisher.py --help usage: publisher.py [-h] project_id - {list,create,delete,publish,publish-with-custom-attributes,publish-with-error-handler,publish-with-batch-settings,publish-with-retry-settings} + {list,create,delete,publish,publish-with-custom-attributes,publish-with-error-handler,publish-with-batch-settings,publish-with-retry-settings,publish-with-ordering-keys,resume-publish-with-ordering-keys} ... This application demonstrates how to perform basic operations on topics @@ -125,7 +125,7 @@ To run this sample: positional arguments: project_id Your Google Cloud project ID - {list,create,delete,publish,publish-with-custom-attributes,publish-with-error-handler,publish-with-batch-settings,publish-with-retry-settings} + {list,create,delete,publish,publish-with-custom-attributes,publish-with-error-handler,publish-with-batch-settings,publish-with-retry-settings,publish-with-ordering-keys,resume-publish-with-ordering-keys} list Lists all Pub/Sub topics in the given project. create Create a new Pub/Sub topic. delete Deletes an existing Pub/Sub topic. @@ -141,6 +141,11 @@ To run this sample: batch settings. publish-with-retry-settings Publishes messages with custom retry settings. + publish-with-ordering-keys + Publishes messages with ordering keys. + resume-publish-with-ordering-keys + Resume publishing messages with ordering keys when + unrecoverable errors occur. optional arguments: -h, --help show this help message and exit @@ -160,11 +165,11 @@ To run this sample: .. code-block:: bash - $ python subscriber.py + $ python subscriber.py --help usage: subscriber.py [-h] project_id - {list-in-topic,list-in-project,create,create-with-dead-letter-policy,create-push,delete,update-push,update-dead-letter-policy,remove-dead-letter-policy,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen-for-errors,receive-messages-with-delivery-attempts} + {list-in-topic,list-in-project,create,create-with-dead-letter-policy,create-push,create-with-ordering,delete,update-push,update-dead-letter-policy,remove-dead-letter-policy,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen-for-errors,receive-messages-with-delivery-attempts} ... This application demonstrates how to perform basic operations on @@ -175,13 +180,15 @@ To run this sample: positional arguments: project_id Your Google Cloud project ID - {list-in-topic,list-in-project,create,create-with-dead-letter-policy,create-push,delete,update-push,update-dead-letter-policy,remove-dead-letter-policy,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen-for-errors,receive-messages-with-delivery-attempts} + {list-in-topic,list-in-project,create,create-with-dead-letter-policy,create-push,create-with-ordering,delete,update-push,update-dead-letter-policy,remove-dead-letter-policy,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen-for-errors,receive-messages-with-delivery-attempts} list-in-topic Lists all subscriptions for a given topic. list-in-project Lists all subscriptions in the current project. create Create a new pull subscription on the given topic. create-with-dead-letter-policy Create a subscription with dead letter policy. create-push Create a new push subscription on the given topic. + create-with-ordering + Create a subscription with dead letter policy. delete Deletes an existing Pub/Sub topic. update-push Updates an existing Pub/Sub subscription's push endpoint URL. Note that certain properties of a @@ -209,53 +216,6 @@ To run this sample: -Identity and Access Management -+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ - -.. image:: https://gstatic.com/cloudssh/images/open-btn.png - :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com//googleapis/python-pubsub&page=editor&open_in_editor=samples/snippets/iam.py,samples/snippets/README.rst - - - - -To run this sample: - -.. code-block:: bash - - $ python iam.py - - usage: iam.py [-h] - project - {get-topic-policy,get-subscription-policy,set-topic-policy,set-subscription-policy,check-topic-permissions,check-subscription-permissions} - ... - - This application demonstrates how to perform basic operations on IAM - policies with the Cloud Pub/Sub API. - - For more information, see the README.md under /pubsub and the documentation - at https://cloud.google.com/pubsub/docs. - - positional arguments: - project Your Google Cloud project ID - {get-topic-policy,get-subscription-policy,set-topic-policy,set-subscription-policy,check-topic-permissions,check-subscription-permissions} - get-topic-policy Prints the IAM policy for the given topic. - get-subscription-policy - Prints the IAM policy for the given subscription. - set-topic-policy Sets the IAM policy for a topic. - set-subscription-policy - Sets the IAM policy for a topic. - check-topic-permissions - Checks to which permissions are available on the given - topic. - check-subscription-permissions - Checks to which permissions are available on the given - subscription. - - optional arguments: - -h, --help show this help message and exit - - - The client library @@ -273,4 +233,4 @@ to `browse the source`_ and `report issues`_. https://github.com/GoogleCloudPlatform/google-cloud-python/issues -.. _Google Cloud SDK: https://cloud.google.com/sdk/ \ No newline at end of file +.. _Google Cloud SDK: https://cloud.google.com/sdk/ diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py index 477b31b9c..d46c0b7d9 100644 --- a/samples/snippets/publisher.py +++ b/samples/snippets/publisher.py @@ -273,6 +273,82 @@ def publish_messages_with_retry_settings(project_id, topic_id): # [END pubsub_publisher_retry_settings] +def publish_with_ordering_keys(project_id, topic_id): + """Publishes messages with ordering keys.""" + # [START pubsub_publish_with_ordering_keys] + from google.cloud import pubsub_v1 + + # TODO(developer): Choose an existing topic. + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + publisher_options = pubsub_v1.types.PublisherOptions( + enable_message_ordering=True + ) + publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options) + # The `topic_path` method creates a fully qualified identifier + # in the form `projects/{project_id}/topics/{topic_id}` + topic_path = publisher.topic_path(project_id, topic_id) + + for message in [ + ("message1", "key1"), + ("message2", "key2"), + ("message3", "key1"), + ("message4", "key2"), + ]: + # Data must be a bytestring + data = message[0].encode("utf-8") + ordering_key = message[1] + # When you publish a message, the client returns a future. + future = publisher.publish( + topic_path, data=data, ordering_key=ordering_key + ) + print(future.result()) + + print("Published messages with ordering keys.") + # [END pubsub_publish_with_ordering_keys] + + +def resume_publish_with_ordering_keys(project_id, topic_id): + """Resume publishing messages with ordering keys when unrecoverable errors occur.""" + # [START pubsub_resume_publish_with_ordering_keys] + from google.cloud import pubsub_v1 + + # TODO(developer): Choose an existing topic. + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + publisher_options = pubsub_v1.types.PublisherOptions( + enable_message_ordering=True + ) + publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options) + # The `topic_path` method creates a fully qualified identifier + # in the form `projects/{project_id}/topics/{topic_id}` + topic_path = publisher.topic_path(project_id, topic_id) + + for message in [ + ("message1", "key1"), + ("message2", "key2"), + ("message3", "key1"), + ("message4", "key2"), + ]: + # Data must be a bytestring + data = message[0].encode("utf-8") + ordering_key = message[1] + # When you publish a message, the client returns a future. + future = publisher.publish( + topic_path, data=data, ordering_key=ordering_key + ) + try: + print(future.result()) + except RuntimeError: + # Resume publish on an ordering key that has had unrecoverable errors. + publisher.resume_publish(topic_path, ordering_key) + + print("Published messages with ordering keys.") + # [END pubsub_resume_publish_with_ordering_keys] + + if __name__ == "__main__": parser = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, @@ -288,7 +364,9 @@ def publish_messages_with_retry_settings(project_id, topic_id): delete_parser = subparsers.add_parser("delete", help=delete_topic.__doc__) delete_parser.add_argument("topic_id") - publish_parser = subparsers.add_parser("publish", help=publish_messages.__doc__) + publish_parser = subparsers.add_parser( + "publish", help=publish_messages.__doc__ + ) publish_parser.add_argument("topic_id") publish_with_custom_attributes_parser = subparsers.add_parser( @@ -298,7 +376,8 @@ def publish_messages_with_retry_settings(project_id, topic_id): publish_with_custom_attributes_parser.add_argument("topic_id") publish_with_error_handler_parser = subparsers.add_parser( - "publish-with-error-handler", help=publish_messages_with_error_handler.__doc__, + "publish-with-error-handler", + help=publish_messages_with_error_handler.__doc__, ) publish_with_error_handler_parser.add_argument("topic_id") @@ -314,6 +393,17 @@ def publish_messages_with_retry_settings(project_id, topic_id): ) publish_with_retry_settings_parser.add_argument("topic_id") + publish_with_ordering_keys_parser = subparsers.add_parser( + "publish-with-ordering-keys", help=publish_with_ordering_keys.__doc__, + ) + publish_with_ordering_keys_parser.add_argument("topic_id") + + resume_publish_with_ordering_keys_parser = subparsers.add_parser( + "resume-publish-with-ordering-keys", + help=resume_publish_with_ordering_keys.__doc__, + ) + resume_publish_with_ordering_keys_parser.add_argument("topic_id") + args = parser.parse_args() if args.command == "list": @@ -332,3 +422,7 @@ def publish_messages_with_retry_settings(project_id, topic_id): publish_messages_with_batch_settings(args.project_id, args.topic_id) elif args.command == "publish-with-retry-settings": publish_messages_with_retry_settings(args.project_id, args.topic_id) + elif args.command == "publish-with-ordering-keys": + publish_with_ordering_keys(args.project_id, args.topic_id) + elif args.command == "resume-publish-with-ordering-keys": + resume_publish_with_ordering_keys(args.project_id, args.topic_id) diff --git a/samples/snippets/publisher_test.py b/samples/snippets/publisher_test.py index b5c2ea1ea..5b2b2f294 100644 --- a/samples/snippets/publisher_test.py +++ b/samples/snippets/publisher_test.py @@ -144,3 +144,17 @@ def test_publish_with_error_handler(topic_publish, capsys): out, _ = capsys.readouterr() assert "Published" in out + + +def test_publish_with_ordering_keys(topic_publish, capsys): + publisher.publish_messages_with_ordering_keys(PROJECT, TOPIC_PUBLISH) + + out, _ = capsys.readouterr() + assert "Published messages with ordering keys." in out + + +def test_resume_publish_with_error_handler(topic_publish, capsys): + publisher.resume_publish_with_ordering_keys(PROJECT, TOPIC_PUBLISH) + + out, _ = capsys.readouterr() + assert "Published messages with ordering keys." in out diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index f079e7d42..7dd0bf09b 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -160,6 +160,28 @@ def create_push_subscription(project_id, topic_id, subscription_id, endpoint): # [END pubsub_create_push_subscription] +def create_subscription_with_ordering(project_id, topic_id, subscription_id): + """Create a subscription with dead letter policy.""" + # [START pubsub_enable_subscription_ordering] + from google.cloud import pubsub_v1 + + # TODO(developer): Choose an existing topic. + # project_id = "your-project-id" + # topic_id = "your-topic-id" + # subscription_id = "your-subscription-id" + + subscriber = pubsub_v1.SubscriberClient() + topic_path = subscriber.topic_path(project_id, topic_id) + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + with subscriber: + subscription = subscriber.create_subscription( + subscription_path, topic_path, enable_message_ordering=True + ) + print("Created subscription with ordering: {}".format(subscription)) + # [END pubsub_enable_subscription_ordering] + + def delete_subscription(project_id, subscription_id): """Deletes an existing Pub/Sub topic.""" # [START pubsub_delete_subscription] @@ -654,6 +676,12 @@ def callback(message): create_push_parser.add_argument("subscription_id") create_push_parser.add_argument("endpoint") + create_subscription_with_ordering_parser = subparsers.add_parser( + "create-with-ordering", help=create_subscription_with_ordering.__doc__ + ) + create_subscription_with_ordering_parser.add_argument("topic_id") + create_subscription_with_ordering_parser.add_argument("subscription_id") + delete_parser = subparsers.add_parser("delete", help=delete_subscription.__doc__) delete_parser.add_argument("subscription_id") @@ -746,6 +774,10 @@ def callback(message): create_push_subscription( args.project_id, args.topic_id, args.subscription_id, args.endpoint, ) + elif args.command == "create-with-ordering": + create_subscription_with_ordering( + args.project_id, args.topic_id, args.subscription_id + ) elif args.command == "delete": delete_subscription(args.project_id, args.subscription_id) elif args.command == "update-push": diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index a7f7c139c..62018e9a9 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -29,6 +29,7 @@ SUBSCRIPTION_ASYNC = "subscription-test-subscription-async-" + UUID SUBSCRIPTION_SYNC = "subscription-test-subscription-sync-" + UUID SUBSCRIPTION_DLQ = "subscription-test-subscription-dlq-" + UUID +SUBSCRIPTION_ORDERING = "subscription-test-subscription-ordering-" + UUID ENDPOINT = "https://{}.appspot.com/push".format(PROJECT) NEW_ENDPOINT = "https://{}.appspot.com/push2".format(PROJECT) @@ -209,6 +210,16 @@ def eventually_consistent_test(): eventually_consistent_test() +def test_create_subscription_with_ordering(subscriber_client, capsys): + subscriber.create_subscription_with_ordering(PROJECT, TOPIC, SUBSCRIPTION_ORDERING) + out, _ = capsys.readouterr() + assert "Created subscription with ordering" in out + assert "enable_message_ordering: true" in out + + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ORDERING) + subscriber_client.delete_subscription(subscription_path) + + def test_update(subscriber_client, subscription_admin, capsys): subscriber.update_push_subscription( PROJECT, TOPIC, SUBSCRIPTION_ADMIN, NEW_ENDPOINT From c3d23679e4f69b981087f3b61ac726c8a3fc516a Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Tue, 14 Jul 2020 15:38:21 -0700 Subject: [PATCH 2/5] update readme --- samples/snippets/README.rst | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/samples/snippets/README.rst b/samples/snippets/README.rst index 8a42d0cdc..2c67c2c11 100644 --- a/samples/snippets/README.rst +++ b/samples/snippets/README.rst @@ -215,7 +215,44 @@ To run this sample: -h, --help show this help message and exit +Identity and Access Management ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + +.. image:: https://gstatic.com/cloudssh/images/open-btn.png + :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com//googleapis/python-pubsub&page=editor&open_in_editor=samples/snippets/iam.py,samples/snippets/README.rst + + + + +To run this sample: +.. code-block:: bash + $ python iam.py + usage: iam.py [-h] + project + {get-topic-policy,get-subscription-policy,set-topic-policy,set-subscription-policy,check-topic-permissions,check-subscription-permissions} + ... + This application demonstrates how to perform basic operations on IAM + policies with the Cloud Pub/Sub API. + For more information, see the README.md under /pubsub and the documentation + at https://cloud.google.com/pubsub/docs. + positional arguments: + project Your Google Cloud project ID + {get-topic-policy,get-subscription-policy,set-topic-policy,set-subscription-policy,check-topic-permissions,check-subscription-permissions} + get-topic-policy Prints the IAM policy for the given topic. + get-subscription-policy + Prints the IAM policy for the given subscription. + set-topic-policy Sets the IAM policy for a topic. + set-subscription-policy + Sets the IAM policy for a topic. + check-topic-permissions + Checks to which permissions are available on the given + topic. + check-subscription-permissions + Checks to which permissions are available on the given + subscription. + optional arguments: + -h, --help show this help message and exit The client library From 8b6b65014d83b329bfa339fcbe8bdd9ddad325cd Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Mon, 27 Jul 2020 10:32:53 -0700 Subject: [PATCH 3/5] add regional endpoint --- samples/snippets/publisher.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py index d46c0b7d9..9e7294dd8 100644 --- a/samples/snippets/publisher.py +++ b/samples/snippets/publisher.py @@ -285,7 +285,13 @@ def publish_with_ordering_keys(project_id, topic_id): publisher_options = pubsub_v1.types.PublisherOptions( enable_message_ordering=True ) - publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options) + # Sending messages to the same region ensures they are received in order + # even when multiple publishers are used. + client_options = {"api_endpoint": " us-east1-pubsub.googleapis.com:443"} + publisher = pubsub_v1.PublisherClient( + publisher_options=publisher_options, + client_options=client_options + ) # The `topic_path` method creates a fully qualified identifier # in the form `projects/{project_id}/topics/{topic_id}` topic_path = publisher.topic_path(project_id, topic_id) @@ -321,7 +327,13 @@ def resume_publish_with_ordering_keys(project_id, topic_id): publisher_options = pubsub_v1.types.PublisherOptions( enable_message_ordering=True ) - publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options) + # Sending messages to the same region ensures they are received in order + # even when multiple publishers are used. + client_options = {"api_endpoint": " us-east1-pubsub.googleapis.com:443"} + publisher = pubsub_v1.PublisherClient( + publisher_options=publisher_options, + client_options=client_options + ) # The `topic_path` method creates a fully qualified identifier # in the form `projects/{project_id}/topics/{topic_id}` topic_path = publisher.topic_path(project_id, topic_id) From 0aacc25813e74d778f823d55f032cad9892bf7f2 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Fri, 7 Aug 2020 10:14:34 -0700 Subject: [PATCH 4/5] remove extra white space --- samples/snippets/publisher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py index 9e7294dd8..399d37679 100644 --- a/samples/snippets/publisher.py +++ b/samples/snippets/publisher.py @@ -287,7 +287,7 @@ def publish_with_ordering_keys(project_id, topic_id): ) # Sending messages to the same region ensures they are received in order # even when multiple publishers are used. - client_options = {"api_endpoint": " us-east1-pubsub.googleapis.com:443"} + client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"} publisher = pubsub_v1.PublisherClient( publisher_options=publisher_options, client_options=client_options @@ -329,7 +329,7 @@ def resume_publish_with_ordering_keys(project_id, topic_id): ) # Sending messages to the same region ensures they are received in order # even when multiple publishers are used. - client_options = {"api_endpoint": " us-east1-pubsub.googleapis.com:443"} + client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"} publisher = pubsub_v1.PublisherClient( publisher_options=publisher_options, client_options=client_options From 08e6974574101d39914442d14ba600d8ff7eadc8 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Fri, 7 Aug 2020 10:28:59 -0700 Subject: [PATCH 5/5] fix function signature in test --- samples/snippets/publisher_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/snippets/publisher_test.py b/samples/snippets/publisher_test.py index 5b2b2f294..95fda846a 100644 --- a/samples/snippets/publisher_test.py +++ b/samples/snippets/publisher_test.py @@ -147,7 +147,7 @@ def test_publish_with_error_handler(topic_publish, capsys): def test_publish_with_ordering_keys(topic_publish, capsys): - publisher.publish_messages_with_ordering_keys(PROJECT, TOPIC_PUBLISH) + publisher.publish_with_ordering_keys(PROJECT, TOPIC_PUBLISH) out, _ = capsys.readouterr() assert "Published messages with ordering keys." in out