From 2b01cc40039b7935f34714850c4d6e34441fb643 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Tue, 14 Jul 2020 10:04:46 +0200 Subject: [PATCH] Adjust method calls in samples to new client --- samples/snippets/iam_test.py | 16 +++-- samples/snippets/noxfile.py | 26 ++++---- samples/snippets/publisher.py | 26 ++++---- samples/snippets/publisher_test.py | 16 ++--- samples/snippets/quickstart/pub.py | 2 +- samples/snippets/quickstart/pub_test.py | 4 +- samples/snippets/quickstart/sub_test.py | 10 ++-- samples/snippets/subscriber.py | 80 ++++++++++++++++++------- samples/snippets/subscriber_test.py | 74 ++++++++++++++--------- 9 files changed, 153 insertions(+), 101 deletions(-) diff --git a/samples/snippets/iam_test.py b/samples/snippets/iam_test.py index d196953f6..077b9f3eb 100644 --- a/samples/snippets/iam_test.py +++ b/samples/snippets/iam_test.py @@ -36,15 +36,15 @@ def topic(publisher_client): topic_path = publisher_client.topic_path(PROJECT, TOPIC) try: - publisher_client.delete_topic(topic_path) + publisher_client.delete_topic(request={"topic": topic_path}) except Exception: pass - publisher_client.create_topic(topic_path) + publisher_client.create_topic(request={"name": topic_path}) yield topic_path - publisher_client.delete_topic(topic_path) + publisher_client.delete_topic(request={"topic": topic_path}) @pytest.fixture(scope="module") @@ -59,15 +59,19 @@ def subscription(subscriber_client, topic): subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION) try: - subscriber_client.delete_subscription(subscription_path) + subscriber_client.delete_subscription( + request={"subscription": subscription_path} + ) except Exception: pass - subscriber_client.create_subscription(subscription_path, topic=topic) + subscriber_client.create_subscription( + request={"name": subscription_path, "topic": topic} + ) yield subscription_path - subscriber_client.delete_subscription(subscription_path) + subscriber_client.delete_subscription(request={"subscription": subscription_path}) def test_get_topic_policy(topic, capsys): diff --git a/samples/snippets/noxfile.py b/samples/snippets/noxfile.py index ba55d7ce5..5660f08be 100644 --- a/samples/snippets/noxfile.py +++ b/samples/snippets/noxfile.py @@ -37,24 +37,22 @@ TEST_CONFIG = { # You can opt out from the test for specific Python versions. - 'ignored_versions': ["2.7"], - + "ignored_versions": ["2.7"], # An envvar key for determining the project id to use. Change it # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a # build specific Cloud project. You can also use your own string # to use your own Cloud project. - 'gcloud_project_env': 'GOOGLE_CLOUD_PROJECT', + "gcloud_project_env": "GOOGLE_CLOUD_PROJECT", # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT', - # A dictionary you want to inject into your test. Don't put any # secrets here. These values will override predefined values. - 'envs': {}, + "envs": {}, } try: # Ensure we can import noxfile_config in the project's directory. - sys.path.append('.') + sys.path.append(".") from noxfile_config import TEST_CONFIG_OVERRIDE except ImportError as e: print("No user noxfile_config found: detail: {}".format(e)) @@ -69,12 +67,12 @@ def get_pytest_env_vars(): ret = {} # Override the GCLOUD_PROJECT and the alias. - env_key = TEST_CONFIG['gcloud_project_env'] + env_key = TEST_CONFIG["gcloud_project_env"] # This should error out if not set. - ret['GOOGLE_CLOUD_PROJECT'] = os.environ[env_key] + ret["GOOGLE_CLOUD_PROJECT"] = os.environ[env_key] # Apply user supplied envs. - ret.update(TEST_CONFIG['envs']) + ret.update(TEST_CONFIG["envs"]) return ret @@ -83,7 +81,7 @@ def get_pytest_env_vars(): ALL_VERSIONS = ["2.7", "3.6", "3.7", "3.8"] # Any default versions that should be ignored. -IGNORED_VERSIONS = TEST_CONFIG['ignored_versions'] +IGNORED_VERSIONS = TEST_CONFIG["ignored_versions"] TESTED_VERSIONS = sorted([v for v in ALL_VERSIONS if v not in IGNORED_VERSIONS]) @@ -138,7 +136,7 @@ def lint(session): args = FLAKE8_COMMON_ARGS + [ "--application-import-names", ",".join(local_names), - "." + ".", ] session.run("flake8", *args) @@ -182,9 +180,9 @@ def py(session): if session.python in TESTED_VERSIONS: _session_tests(session) else: - session.skip("SKIPPED: {} tests are disabled for this sample.".format( - session.python - )) + session.skip( + "SKIPPED: {} tests are disabled for this sample.".format(session.python) + ) # diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py index 477b31b9c..ed1e88c72 100644 --- a/samples/snippets/publisher.py +++ b/samples/snippets/publisher.py @@ -33,9 +33,9 @@ def list_topics(project_id): # project_id = "your-project-id" publisher = pubsub_v1.PublisherClient() - project_path = publisher.project_path(project_id) + project_path = f"projects/{project_id}" - for topic in publisher.list_topics(project_path): + for topic in publisher.list_topics(request={"project": project_path}): print(topic) # [END pubsub_list_topics] @@ -53,7 +53,7 @@ def create_topic(project_id, topic_id): publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(project_id, topic_id) - topic = publisher.create_topic(topic_path) + topic = publisher.create_topic(request={"name": topic_path}) print("Topic created: {}".format(topic)) # [END pubsub_quickstart_create_topic] @@ -72,7 +72,7 @@ def delete_topic(project_id, topic_id): publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(project_id, topic_id) - publisher.delete_topic(topic_path) + publisher.delete_topic(request={"topic": topic_path}) print("Topic deleted: {}".format(topic_path)) # [END pubsub_delete_topic] @@ -94,11 +94,11 @@ def publish_messages(project_id, topic_id): topic_path = publisher.topic_path(project_id, topic_id) for n in range(1, 10): - data = u"Message number {}".format(n) + data = "Message number {}".format(n) # Data must be a bytestring data = data.encode("utf-8") # When you publish a message, the client returns a future. - future = publisher.publish(topic_path, data=data) + future = publisher.publish(topic_path, data) print(future.result()) print("Published messages.") @@ -120,7 +120,7 @@ def publish_messages_with_custom_attributes(project_id, topic_id): topic_path = publisher.topic_path(project_id, topic_id) for n in range(1, 10): - data = u"Message number {}".format(n) + data = "Message number {}".format(n) # Data must be a bytestring data = data.encode("utf-8") # Add two attributes, origin and username, to the message @@ -163,9 +163,7 @@ def callback(f): data = str(i) futures.update({data: None}) # When you publish a message, the client returns a future. - future = publisher.publish( - topic_path, data=data.encode("utf-8") # data must be a bytestring. - ) + future = publisher.publish(topic_path, data.encode("utf-8")) futures[data] = future # Publish failures shall be handled in the callback function. future.add_done_callback(get_callback(future, data)) @@ -203,10 +201,10 @@ def callback(future): print(message_id) for n in range(1, 10): - data = u"Message number {}".format(n) + data = "Message number {}".format(n) # Data must be a bytestring data = data.encode("utf-8") - future = publisher.publish(topic_path, data=data) + future = publisher.publish(topic_path, data) # Non-blocking. Allow the publisher client to batch multiple messages. future.add_done_callback(callback) @@ -263,10 +261,10 @@ def publish_messages_with_retry_settings(project_id, topic_id): topic_path = publisher.topic_path(project_id, topic_id) for n in range(1, 10): - data = u"Message number {}".format(n) + data = "Message number {}".format(n) # Data must be a bytestring data = data.encode("utf-8") - future = publisher.publish(topic_path, data=data) + future = publisher.publish(request={"topic": topic_path, "messages": data}) print(future.result()) print("Published messages with retry settings.") diff --git a/samples/snippets/publisher_test.py b/samples/snippets/publisher_test.py index b5c2ea1ea..2b170f9e0 100644 --- a/samples/snippets/publisher_test.py +++ b/samples/snippets/publisher_test.py @@ -39,9 +39,9 @@ def topic_admin(client): topic_path = client.topic_path(PROJECT, TOPIC_ADMIN) try: - topic = client.get_topic(topic_path) + topic = client.get_topic(request={"topic": topic_path}) except: # noqa - topic = client.create_topic(topic_path) + topic = client.create_topic(request={"name": topic_path}) yield topic.name # Teardown of `topic_admin` is handled in `test_delete()`. @@ -52,13 +52,13 @@ def topic_publish(client): topic_path = client.topic_path(PROJECT, TOPIC_PUBLISH) try: - topic = client.get_topic(topic_path) + topic = client.get_topic(request={"topic": topic_path}) except: # noqa - topic = client.create_topic(topic_path) + topic = client.create_topic(request={"name": topic_path}) yield topic.name - client.delete_topic(topic.name) + client.delete_topic(request={"topic": topic.name}) def _make_sleep_patch(): @@ -87,7 +87,7 @@ def eventually_consistent_test(): def test_create(client): topic_path = client.topic_path(PROJECT, TOPIC_ADMIN) try: - client.delete_topic(topic_path) + client.delete_topic(request={"topic": topic_path}) except Exception: pass @@ -95,7 +95,7 @@ def test_create(client): @backoff.on_exception(backoff.expo, AssertionError, max_time=60) def eventually_consistent_test(): - assert client.get_topic(topic_path) + assert client.get_topic(request={"topic": topic_path}) eventually_consistent_test() @@ -106,7 +106,7 @@ def test_delete(client, topic_admin): @backoff.on_exception(backoff.expo, AssertionError, max_time=60) def eventually_consistent_test(): with pytest.raises(Exception): - client.get_topic(client.topic_path(PROJECT, TOPIC_ADMIN)) + client.get_topic(request={"topic": client.topic_path(PROJECT, TOPIC_ADMIN)}) eventually_consistent_test() diff --git a/samples/snippets/quickstart/pub.py b/samples/snippets/quickstart/pub.py index 16432c0c3..8585711f3 100644 --- a/samples/snippets/quickstart/pub.py +++ b/samples/snippets/quickstart/pub.py @@ -63,7 +63,7 @@ def pub(project_id, topic_id): ref = dict({"num_messages": 0}) # When you publish a message, the client returns a future. - api_future = client.publish(topic_path, data=data) + api_future = client.publish(topic_path, data) api_future.add_done_callback(get_callback(api_future, data, ref)) # Keep the main thread from exiting while the message future diff --git a/samples/snippets/quickstart/pub_test.py b/samples/snippets/quickstart/pub_test.py index 6f5cc06c4..0be087bd2 100644 --- a/samples/snippets/quickstart/pub_test.py +++ b/samples/snippets/quickstart/pub_test.py @@ -39,13 +39,13 @@ def topic(publisher_client): topic_path = publisher_client.topic_path(PROJECT, TOPIC) try: - publisher_client.create_topic(topic_path) + publisher_client.create_topic(request={"name": topic_path}) except AlreadyExists: pass yield TOPIC - publisher_client.delete_topic(topic_path) + publisher_client.delete_topic(request={"topic": topic_path}) def test_pub(publisher_client, topic, capsys): diff --git a/samples/snippets/quickstart/sub_test.py b/samples/snippets/quickstart/sub_test.py index 38047422a..089705af6 100644 --- a/samples/snippets/quickstart/sub_test.py +++ b/samples/snippets/quickstart/sub_test.py @@ -38,12 +38,12 @@ def topic_path(): topic_path = publisher_client.topic_path(PROJECT, TOPIC) try: - topic = publisher_client.create_topic(topic_path) + topic = publisher_client.create_topic(request={"name": topic_path}) yield topic.name except AlreadyExists: yield topic_path - publisher_client.delete_topic(topic_path) + publisher_client.delete_topic(request={"topic": topic_path}) @pytest.fixture(scope="module") @@ -52,18 +52,18 @@ def subscription_path(topic_path): try: subscription = subscriber_client.create_subscription( - subscription_path, topic_path + request={"name": subscription_path, "topic": topic_path} ) yield subscription.name except AlreadyExists: yield subscription_path - subscriber_client.delete_subscription(subscription_path) + subscriber_client.delete_subscription(request={"subscription": subscription_path}) subscriber_client.close() def _publish_messages(topic_path): - publish_future = publisher_client.publish(topic_path, data=b"Hello World!") + publish_future = publisher_client.publish(topic_path, b"Hello World!") publish_future.result() diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index aeffb80d7..ad4d4127d 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -36,7 +36,8 @@ def list_subscriptions_in_topic(project_id, topic_id): publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(project_id, topic_id) - for subscription in publisher.list_topic_subscriptions(topic_path): + response = publisher.list_topic_subscriptions(request={"topic": topic_path}) + for subscription in response.subscriptions: print(subscription) # [END pubsub_list_topic_subscriptions] @@ -50,12 +51,14 @@ def list_subscriptions_in_project(project_id): # project_id = "your-project-id" subscriber = pubsub_v1.SubscriberClient() - project_path = subscriber.project_path(project_id) + project_path = f"projects/{project_id}" # Wrap the subscriber in a 'with' block to automatically call close() to # close the underlying gRPC channel when done. with subscriber: - for subscription in subscriber.list_subscriptions(project_path): + for subscription in subscriber.list_subscriptions( + request={"project": project_path} + ): print(subscription.name) # [END pubsub_list_subscriptions] @@ -70,14 +73,17 @@ def create_subscription(project_id, topic_id, subscription_id): # topic_id = "your-topic-id" # subscription_id = "your-subscription-id" + publisher = pubsub_v1.PublisherClient() subscriber = pubsub_v1.SubscriberClient() - topic_path = subscriber.topic_path(project_id, topic_id) + topic_path = publisher.topic_path(project_id, topic_id) subscription_path = subscriber.subscription_path(project_id, subscription_id) # Wrap the subscriber in a 'with' block to automatically call close() to # close the underlying gRPC channel when done. with subscriber: - subscription = subscriber.create_subscription(subscription_path, topic_path) + subscription = subscriber.create_subscription( + request={"name": subscription_path, "topic": topic_path} + ) print("Subscription created: {}".format(subscription)) # [END pubsub_create_pull_subscription] @@ -103,10 +109,12 @@ def create_subscription_with_dead_letter_topic( # with dead letter policy will forward dead letter messages to. # dead_letter_topic_id = "your-dead-letter-topic-id" + publisher = pubsub_v1.PublisherClient() subscriber = pubsub_v1.SubscriberClient() - topic_path = subscriber.topic_path(project_id, topic_id) + + topic_path = publisher.topic_path(project_id, topic_id) subscription_path = subscriber.subscription_path(project_id, subscription_id) - dead_letter_topic_path = subscriber.topic_path(project_id, dead_letter_topic_id) + dead_letter_topic_path = publisher.topic_path(project_id, dead_letter_topic_id) dead_letter_policy = DeadLetterPolicy( dead_letter_topic=dead_letter_topic_path, max_delivery_attempts=10 @@ -142,8 +150,9 @@ def create_push_subscription(project_id, topic_id, subscription_id, endpoint): # subscription_id = "your-subscription-id" # endpoint = "https://my-test-project.appspot.com/push" + publisher = pubsub_v1.PublisherClient() subscriber = pubsub_v1.SubscriberClient() - topic_path = subscriber.topic_path(project_id, topic_id) + topic_path = publisher.topic_path(project_id, topic_id) subscription_path = subscriber.subscription_path(project_id, subscription_id) push_config = pubsub_v1.types.PushConfig(push_endpoint=endpoint) @@ -152,7 +161,11 @@ def create_push_subscription(project_id, topic_id, subscription_id, endpoint): # close the underlying gRPC channel when done. with subscriber: subscription = subscriber.create_subscription( - subscription_path, topic_path, push_config + request={ + "name": subscription_path, + "topic": topic_path, + "push_config": push_config, + } ) print("Push subscription created: {}".format(subscription)) @@ -175,7 +188,7 @@ def delete_subscription(project_id, subscription_id): # Wrap the subscriber in a 'with' block to automatically call close() to # close the underlying gRPC channel when done. with subscriber: - subscriber.delete_subscription(subscription_path) + subscriber.delete_subscription(request={"subscription": subscription_path}) print("Subscription deleted: {}".format(subscription_path)) # [END pubsub_delete_subscription] @@ -210,7 +223,9 @@ def update_push_subscription(project_id, topic_id, subscription_id, endpoint): # Wrap the subscriber in a 'with' block to automatically call close() to # close the underlying gRPC channel when done. with subscriber: - result = subscriber.update_subscription(subscription, update_mask) + result = subscriber.update_subscription( + request={"subscription": subscription, "update_mask": update_mask} + ) print("Subscription updated: {}".format(subscription_path)) print("New endpoint for subscription is: {}".format(result.push_config)) @@ -236,12 +251,16 @@ def update_subscription_with_dead_letter_policy( # with dead letter policy will forward dead letter messages to. # dead_letter_topic_id = "your-dead-letter-topic-id" + publisher = pubsub_v1.PublisherClient() subscriber = pubsub_v1.SubscriberClient() - topic_path = subscriber.topic_path(project_id, topic_id) + + topic_path = publisher.topic_path(project_id, topic_id) subscription_path = subscriber.subscription_path(project_id, subscription_id) - dead_letter_topic_path = subscriber.topic_path(project_id, dead_letter_topic_id) + dead_letter_topic_path = publisher.topic_path(project_id, dead_letter_topic_id) - subscription_before_update = subscriber.get_subscription(subscription_path) + subscription_before_update = subscriber.get_subscription( + request={"subscription": subscription_path} + ) print("Before the update: {}".format(subscription_before_update)) # Indicates which fields in the provided subscription to update. @@ -261,7 +280,7 @@ def update_subscription_with_dead_letter_policy( with subscriber: subscription_after_update = subscriber.update_subscription( - subscription, update_mask + request={"subscription": subscription, "update_mask": update_mask} ) print("After the update: {}".format(subscription_after_update)) @@ -283,11 +302,14 @@ def remove_dead_letter_policy(project_id, topic_id, subscription_id): # TODO(developer): This is an existing subscription with a dead letter policy. # subscription_id = "your-subscription-id" + publisher = pubsub_v1.PublisherClient() subscriber = pubsub_v1.SubscriberClient() - topic_path = subscriber.topic_path(project_id, topic_id) + topic_path = publisher.topic_path(project_id, topic_id) subscription_path = subscriber.subscription_path(project_id, subscription_id) - subscription_before_update = subscriber.get_subscription(subscription_path) + subscription_before_update = subscriber.get_subscription( + request={"subscription": subscription_path} + ) print("Before removing the policy: {}".format(subscription_before_update)) # Indicates which fields in the provided subscription to update. @@ -306,7 +328,7 @@ def remove_dead_letter_policy(project_id, topic_id, subscription_id): with subscriber: subscription_after_update = subscriber.update_subscription( - subscription, update_mask + request={"subscription": subscription, "update_mask": update_mask} ) print("After removing the policy: {}".format(subscription_after_update)) @@ -445,7 +467,9 @@ def synchronous_pull(project_id, subscription_id): # close the underlying gRPC channel when done. with subscriber: # The subscriber pulls a specific number of messages. - response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES) + response = subscriber.pull( + request={"subscription": subscription_path, "max_messages": NUM_MESSAGES} + ) ack_ids = [] for received_message in response.received_messages: @@ -453,7 +477,9 @@ def synchronous_pull(project_id, subscription_id): ack_ids.append(received_message.ack_id) # Acknowledges the received messages so they will not be sent again. - subscriber.acknowledge(subscription_path, ack_ids) + subscriber.acknowledge( + request={"subscription": subscription_path, "ack_ids": ack_ids} + ) print( "Received and acknowledged {} messages. Done.".format( @@ -485,7 +511,9 @@ def synchronous_pull_with_lease_management(project_id, subscription_id): SLEEP_TIME = 10 # The subscriber pulls a specific number of messages. - response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES) + response = subscriber.pull( + request={"subscription": subscription_path, "max_messages": NUM_MESSAGES} + ) multiprocessing.log_to_stderr() logger = multiprocessing.get_logger() @@ -517,7 +545,11 @@ def worker(msg): if process.is_alive(): # `ack_deadline_seconds` must be between 10 to 600. subscriber.modify_ack_deadline( - subscription_path, [ack_id], ack_deadline_seconds=ACK_DEADLINE, + request={ + "subscription": subscription_path, + "ack_ids": [ack_id], + "ack_deadline_seconds": ACK_DEADLINE, + } ) logger.info( "{}: Reset ack deadline for {} for {}s".format( @@ -527,7 +559,9 @@ def worker(msg): # If the processs is finished, acknowledges using `ack_id`. else: - subscriber.acknowledge(subscription_path, [ack_id]) + subscriber.acknowledge( + request={"subscription": subscription_path, "ack_ids": [ack_id]} + ) logger.info( "{}: Acknowledged {}".format( time.strftime("%X", time.gmtime()), msg_data diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index a7f7c139c..ee563b701 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -43,13 +43,13 @@ def topic(publisher_client): topic_path = publisher_client.topic_path(PROJECT, TOPIC) try: - topic = publisher_client.get_topic(topic_path) + topic = publisher_client.get_topic(request={"topic": topic_path}) except: # noqa - topic = publisher_client.create_topic(topic_path) + topic = publisher_client.create_topic(request={"name": topic_path}) yield topic.name - publisher_client.delete_topic(topic.name) + publisher_client.delete_topic(request={"topic": topic.name}) @pytest.fixture(scope="module") @@ -57,13 +57,13 @@ def dead_letter_topic(publisher_client): topic_path = publisher_client.topic_path(PROJECT, DEAD_LETTER_TOPIC) try: - dead_letter_topic = publisher_client.get_topic(topic_path) + dead_letter_topic = publisher_client.get_topic(request={"topic": topic_path}) except: # noqa - dead_letter_topic = publisher_client.create_topic(topic_path) + dead_letter_topic = publisher_client.create_topic(request={"name": topic_path}) yield dead_letter_topic.name - publisher_client.delete_topic(dead_letter_topic.name) + publisher_client.delete_topic(request={"topic": dead_letter_topic.name}) @pytest.fixture(scope="module") @@ -78,10 +78,12 @@ def subscription_admin(subscriber_client, topic): subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ADMIN) try: - subscription = subscriber_client.get_subscription(subscription_path) + subscription = subscriber_client.get_subscription( + request={"subscription": subscription_path} + ) except: # noqa subscription = subscriber_client.create_subscription( - subscription_path, topic=topic + request={"name": subscription_path, "topic": topic} ) yield subscription.name @@ -92,15 +94,17 @@ def subscription_sync(subscriber_client, topic): subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_SYNC) try: - subscription = subscriber_client.get_subscription(subscription_path) + subscription = subscriber_client.get_subscription( + request={"subscription": subscription_path} + ) except: # noqa subscription = subscriber_client.create_subscription( - subscription_path, topic=topic + request={"name": subscription_path, "topic": topic} ) yield subscription.name - subscriber_client.delete_subscription(subscription.name) + subscriber_client.delete_subscription(request={"subscription": subscription.name}) @pytest.fixture(scope="module") @@ -108,15 +112,17 @@ def subscription_async(subscriber_client, topic): subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ASYNC) try: - subscription = subscriber_client.get_subscription(subscription_path) + subscription = subscriber_client.get_subscription( + request={"subscription": subscription_path} + ) except: # noqa subscription = subscriber_client.create_subscription( - subscription_path, topic=topic + request={"name": subscription_path, "topic": topic} ) yield subscription.name - subscriber_client.delete_subscription(subscription.name) + subscriber_client.delete_subscription(request={"subscription": subscription.name}) @pytest.fixture(scope="module") @@ -124,15 +130,17 @@ def subscription_dlq(subscriber_client, topic): subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_DLQ) try: - subscription = subscriber_client.get_subscription(subscription_path) + subscription = subscriber_client.get_subscription( + request={"subscription": subscription_path} + ) except: # noqa subscription = subscriber_client.create_subscription( - subscription_path, topic=topic + request={"name": subscription_path, "topic": topic} ) yield subscription.name - subscriber_client.delete_subscription(subscription.name) + subscriber_client.delete_subscription(request={"subscription": subscription.name}) def test_list_in_topic(subscription_admin, capsys): @@ -159,7 +167,9 @@ def test_create(subscriber_client): subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ADMIN) try: - subscriber_client.delete_subscription(subscription_path) + subscriber_client.delete_subscription( + request={"subscription": subscription_path} + ) except Exception: pass @@ -167,7 +177,9 @@ def test_create(subscriber_client): @backoff.on_exception(backoff.expo, AssertionError, max_time=60) def eventually_consistent_test(): - assert subscriber_client.get_subscription(subscription_path) + assert subscriber_client.get_subscription( + request={"subscription": subscription_path} + ) eventually_consistent_test() @@ -179,7 +191,9 @@ def test_create_subscription_with_dead_letter_policy( dead_letter_topic_path = publisher_client.topic_path(PROJECT, DEAD_LETTER_TOPIC) try: - subscriber_client.delete_subscription(subscription_path) + subscriber_client.delete_subscription( + request={"subscription": subscription_path} + ) except Exception: pass @@ -196,7 +210,9 @@ def test_create_subscription_with_dead_letter_policy( def test_create_push(subscriber_client): subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ADMIN) try: - subscriber_client.delete_subscription(subscription_path) + subscriber_client.delete_subscription( + request={"subscription": subscription_path} + ) except Exception: pass @@ -204,7 +220,9 @@ def test_create_push(subscriber_client): @backoff.on_exception(backoff.expo, AssertionError, max_time=60) def eventually_consistent_test(): - assert subscriber_client.get_subscription(subscription_path) + assert subscriber_client.get_subscription( + request={"subscription": subscription_path} + ) eventually_consistent_test() @@ -235,17 +253,17 @@ def test_delete(subscriber_client, subscription_admin): @backoff.on_exception(backoff.expo, AssertionError, max_time=60) def eventually_consistent_test(): with pytest.raises(Exception): - subscriber_client.get_subscription(subscription_admin) + subscriber_client.get_subscription( + request={"subscription": subscription_admin} + ) eventually_consistent_test() -def _publish_messages(publisher_client, topic): +def _publish_messages(publisher_client, topic, **attrs): for n in range(5): data = u"message {}".format(n).encode("utf-8") - publish_future = publisher_client.publish( - topic, data=data, origin="python-sample" - ) + publish_future = publisher_client.publish(topic, data, **attrs) publish_future.result() @@ -264,7 +282,7 @@ def test_receive_with_custom_attributes( publisher_client, topic, subscription_async, capsys ): - _publish_messages(publisher_client, topic) + _publish_messages(publisher_client, topic, origin="python-sample") subscriber.receive_messages_with_custom_attributes(PROJECT, SUBSCRIPTION_ASYNC, 5)