diff --git a/tests/system.py b/tests/system.py index 1694c5022..f07bf25ae 100644 --- a/tests/system.py +++ b/tests/system.py @@ -32,18 +32,19 @@ from google.cloud.pubsub_v1 import exceptions from google.cloud.pubsub_v1 import futures from google.cloud.pubsub_v1 import types +from google.pubsub_v1 import types as gapic_types from test_utils.system import unique_resource_id -@pytest.fixture(scope=u"module") +@pytest.fixture(scope="module") def project(): _, default_project = google.auth.default() yield default_project -@pytest.fixture(scope=u"module") +@pytest.fixture(scope="module") def publisher(): yield pubsub_v1.PublisherClient() @@ -71,15 +72,15 @@ def cleanup(): yield registry # Perform all clean up. - for to_call, argument in registry: - to_call(argument) + for to_call, args, kwargs in registry: + to_call(*args, **kwargs) def test_publish_messages(publisher, topic_path, cleanup): # Make sure the topic gets deleted. - cleanup.append((publisher.delete_topic, topic_path)) + cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) - publisher.create_topic(topic_path) + publisher.create_topic(name=topic_path) futures = [ publisher.publish( @@ -95,7 +96,7 @@ def test_publish_messages(publisher, topic_path, cleanup): def test_publish_large_messages(publisher, topic_path, cleanup): # Make sure the topic gets deleted. - cleanup.append((publisher.delete_topic, topic_path)) + cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) # Each message should be smaller than 10**7 bytes (the server side limit for # PublishRequest), but all messages combined in a PublishRequest should @@ -110,7 +111,7 @@ def test_publish_large_messages(publisher, topic_path, cleanup): max_latency=2.0, # so that autocommit happens after publishing all messages max_messages=100, ) - publisher.create_topic(topic_path) + publisher.create_topic(name=topic_path) futures = [publisher.publish(topic_path, msg_data, num=str(i)) for i in range(5)] @@ -126,15 +127,17 @@ def test_subscribe_to_messages( publisher, topic_path, subscriber, subscription_path, cleanup ): # Make sure the topic and subscription get deleted. - cleanup.append((publisher.delete_topic, topic_path)) - cleanup.append((subscriber.delete_subscription, subscription_path)) + cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) + cleanup.append( + (subscriber.delete_subscription, (), {"subscription": subscription_path}) + ) # Create a topic. - publisher.create_topic(topic_path) + publisher.create_topic(name=topic_path) # Subscribe to the topic. This must happen before the messages # are published. - subscriber.create_subscription(subscription_path, topic_path) + subscriber.create_subscription(name=subscription_path, topic=topic_path) # Publish some messages. futures = [ @@ -169,15 +172,17 @@ def test_subscribe_to_messages_async_callbacks( publisher, topic_path, subscriber, subscription_path, cleanup ): # Make sure the topic and subscription get deleted. - cleanup.append((publisher.delete_topic, topic_path)) - cleanup.append((subscriber.delete_subscription, subscription_path)) + cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) + cleanup.append( + (subscriber.delete_subscription, (), {"subscription": subscription_path}) + ) # Create a topic. - publisher.create_topic(topic_path) + publisher.create_topic(name=topic_path) # Subscribe to the topic. This must happen before the messages # are published. - subscriber.create_subscription(subscription_path, topic_path) + subscriber.create_subscription(name=subscription_path, topic=topic_path) # Publish some messages. futures = [ @@ -219,17 +224,19 @@ def test_creating_subscriptions_with_non_default_settings( publisher, subscriber, project, topic_path, subscription_path, cleanup ): # Make sure the topic and subscription get deleted. - cleanup.append((publisher.delete_topic, topic_path)) - cleanup.append((subscriber.delete_subscription, subscription_path)) + cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) + cleanup.append( + (subscriber.delete_subscription, (), {"subscription": subscription_path}) + ) # create a topic and a subscription, customize the latter's policy - publisher.create_topic(topic_path) + publisher.create_topic(name=topic_path) msg_retention_duration = {"seconds": 911} expiration_policy = {"ttl": {"seconds": 90210}} new_subscription = subscriber.create_subscription( - subscription_path, - topic_path, + name=subscription_path, + topic=topic_path, ack_deadline_seconds=30, retain_acked_messages=True, message_retention_duration=msg_retention_duration, @@ -238,7 +245,7 @@ def test_creating_subscriptions_with_non_default_settings( # fetch the subscription and check its settings project_path = subscriber.project_path(project) - subscriptions = subscriber.list_subscriptions(project_path) + subscriptions = subscriber.list_subscriptions(project=project_path) subscriptions = [sub for sub in subscriptions if sub.topic == topic_path] assert len(subscriptions) == 1 @@ -257,11 +264,11 @@ def test_listing_project_topics(publisher, project, cleanup): for i in range(1, 4) ] for topic in topic_paths: - cleanup.append((publisher.delete_topic, topic)) - publisher.create_topic(topic) + cleanup.append((publisher.delete_topic, (), {"topic": topic})) + publisher.create_topic(name=topic) - project_path = publisher.project_path(project) - project_topics = publisher.list_topics(project_path) + project_path = f"projects/{project}" + project_topics = publisher.list_topics(project=project_path) project_topics = set(t.name for t in project_topics) # there might be other topics in the project, thus do a "is subset" check @@ -275,8 +282,8 @@ def test_listing_project_subscriptions(publisher, subscriber, project, cleanup): publisher.topic_path(project, "topic-2" + unique_resource_id(".")), ] for topic in topic_paths: - cleanup.append((publisher.delete_topic, topic)) - publisher.create_topic(topic) + cleanup.append((publisher.delete_topic, (), {"topic": topic})) + publisher.create_topic(name=topic) # create subscriptions subscription_paths = [ @@ -287,12 +294,14 @@ def test_listing_project_subscriptions(publisher, subscriber, project, cleanup): ] for i, subscription in enumerate(subscription_paths): topic = topic_paths[i % 2] - cleanup.append((subscriber.delete_subscription, subscription)) - subscriber.create_subscription(subscription, topic) + cleanup.append( + (subscriber.delete_subscription, (), {"subscription": subscription}) + ) + subscriber.create_subscription(name=subscription, topic=topic) # retrieve subscriptions and check that the list matches the expected - project_path = subscriber.project_path(project) - subscriptions = subscriber.list_subscriptions(project_path) + project_path = f"projects/{project}" + subscriptions = subscriber.list_subscriptions(project=project_path) subscriptions = set(s.name for s in subscriptions) # there might be other subscriptions in the project, thus do a "is subset" check @@ -306,8 +315,8 @@ def test_listing_topic_subscriptions(publisher, subscriber, project, cleanup): publisher.topic_path(project, "topic-2" + unique_resource_id(".")), ] for topic in topic_paths: - cleanup.append((publisher.delete_topic, topic)) - publisher.create_topic(topic) + cleanup.append((publisher.delete_topic, (), {"topic": topic})) + publisher.create_topic(name=topic) # create subscriptions subscription_paths = [ @@ -318,21 +327,23 @@ def test_listing_topic_subscriptions(publisher, subscriber, project, cleanup): ] for i, subscription in enumerate(subscription_paths): topic = topic_paths[i % 2] - cleanup.append((subscriber.delete_subscription, subscription)) - subscriber.create_subscription(subscription, topic) + cleanup.append( + (subscriber.delete_subscription, (), {"subscription": subscription}) + ) + subscriber.create_subscription(name=subscription, topic=topic) # retrieve subscriptions and check that the list matches the expected - subscriptions = publisher.list_topic_subscriptions(topic_paths[0]) - subscriptions = set(subscriptions) + response = publisher.list_topic_subscriptions(topic=topic_paths[0]) + subscriptions = set(response.subscriptions) assert subscriptions == {subscription_paths[0], subscription_paths[2]} def test_managing_topic_iam_policy(publisher, topic_path, cleanup): - cleanup.append((publisher.delete_topic, topic_path)) + cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) # create a topic and customize its policy - publisher.create_topic(topic_path) + publisher.create_topic(name=topic_path) topic_policy = publisher.get_iam_policy(topic_path) topic_policy.bindings.add(role="roles/pubsub.editor", members=["domain:google.com"]) @@ -358,12 +369,14 @@ def test_managing_subscription_iam_policy( publisher, subscriber, topic_path, subscription_path, cleanup ): # Make sure the topic and subscription get deleted. - cleanup.append((publisher.delete_topic, topic_path)) - cleanup.append((subscriber.delete_subscription, subscription_path)) + cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) + cleanup.append( + (subscriber.delete_subscription, (), {"subscription": subscription_path}) + ) # create a topic and a subscription, customize the latter's policy - publisher.create_topic(topic_path) - subscriber.create_subscription(subscription_path, topic_path) + publisher.create_topic(name=topic_path) + subscriber.create_subscription(name=subscription_path, topic=topic_path) sub_policy = subscriber.get_iam_policy(subscription_path) sub_policy.bindings.add(role="roles/pubsub.editor", members=["domain:google.com"]) @@ -397,17 +410,15 @@ def test_subscriber_not_leaking_open_sockets( # subscriber releases the sockets, too. subscriber = pubsub_v1.SubscriberClient() subscriber_2 = pubsub_v1.SubscriberClient() - cleanup.append((subscriber_2.delete_subscription, subscription_path)) - - def one_arg_close(subscriber): # the cleanup helper expects exactly one argument - subscriber.close() - - cleanup.append((one_arg_close, subscriber_2)) - cleanup.append((publisher.delete_topic, topic_path)) + cleanup.append( + (subscriber_2.delete_subscription, (), {"subscription": subscription_path}) + ) + cleanup.append((subscriber_2.close, (), {})) + cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) # Create topic before starting to track connection count (any sockets opened # by the publisher client are not counted by this test). - publisher.create_topic(topic_path) + publisher.create_topic(name=topic_path) current_process = psutil.Process() conn_count_start = len(current_process.connections()) @@ -419,14 +430,14 @@ def one_arg_close(subscriber): # the cleanup helper expects exactly one argumen # Publish a few messages, wait for the publish to succeed. publish_futures = [ - publisher.publish(topic_path, u"message {}".format(i).encode()) + publisher.publish(topic_path, "message {}".format(i).encode()) for i in range(1, 4) ] for future in publish_futures: future.result() # Synchronously pull messages. - response = subscriber.pull(subscription_path, max_messages=3) + response = subscriber.pull(subscription=subscription_path, max_messages=3) assert len(response.received_messages) == 3 conn_count_end = len(current_process.connections()) @@ -437,15 +448,17 @@ def test_synchronous_pull_no_deadline_error_if_no_messages( publisher, topic_path, subscriber, subscription_path, cleanup ): # Make sure the topic and subscription get deleted. - cleanup.append((publisher.delete_topic, topic_path)) - cleanup.append((subscriber.delete_subscription, subscription_path)) + cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) + cleanup.append( + (subscriber.delete_subscription, (), {"subscription": subscription_path}) + ) # Create a topic and subscribe to it. - publisher.create_topic(topic_path) - subscriber.create_subscription(subscription_path, topic_path) + publisher.create_topic(name=topic_path) + subscriber.create_subscription(name=subscription_path, topic=topic_path) try: - response = subscriber.pull(subscription_path, max_messages=2) + response = subscriber.pull(subscription=subscription_path, max_messages=2) except core_exceptions.DeadlineExceeded: pytest.fail( "Unexpected DeadlineExceeded error on synchronous pull when no " @@ -460,12 +473,14 @@ def test_streaming_pull_callback_error_propagation( self, publisher, topic_path, subscriber, subscription_path, cleanup ): # Make sure the topic and subscription get deleted. - cleanup.append((publisher.delete_topic, topic_path)) - cleanup.append((subscriber.delete_subscription, subscription_path)) + cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) + cleanup.append( + (subscriber.delete_subscription, (), {"subscription": subscription_path}) + ) # create a topic and subscribe to it - publisher.create_topic(topic_path) - subscriber.create_subscription(subscription_path, topic_path) + publisher.create_topic(name=topic_path) + subscriber.create_subscription(name=subscription_path, topic=topic_path) # publish a messages and wait until published future = publisher.publish(topic_path, b"hello!") @@ -486,17 +501,19 @@ def test_streaming_pull_ack_deadline( self, publisher, subscriber, project, topic_path, subscription_path, cleanup ): # Make sure the topic and subscription get deleted. - cleanup.append((publisher.delete_topic, topic_path)) - cleanup.append((subscriber.delete_subscription, subscription_path)) + cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) + cleanup.append( + (subscriber.delete_subscription, (), {"subscription": subscription_path}) + ) # Create a topic and a subscription, then subscribe to the topic. This # must happen before the messages are published. - publisher.create_topic(topic_path) + publisher.create_topic(name=topic_path) # Subscribe to the topic. This must happen before the messages # are published. subscriber.create_subscription( - subscription_path, topic_path, ack_deadline_seconds=45 + name=subscription_path, topic=topic_path, ack_deadline_seconds=45 ) # publish some messages and wait for completion @@ -535,12 +552,14 @@ def test_streaming_pull_max_messages( self, publisher, topic_path, subscriber, subscription_path, cleanup ): # Make sure the topic and subscription get deleted. - cleanup.append((publisher.delete_topic, topic_path)) - cleanup.append((subscriber.delete_subscription, subscription_path)) + cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) + cleanup.append( + (subscriber.delete_subscription, (), {"subscription": subscription_path}) + ) # create a topic and subscribe to it - publisher.create_topic(topic_path) - subscriber.create_subscription(subscription_path, topic_path) + publisher.create_topic(name=topic_path) + subscriber.create_subscription(name=subscription_path, topic=topic_path) batch_sizes = (7, 4, 8, 2, 10, 1, 3, 8, 6, 1) # total: 50 _publish_messages(publisher, topic_path, batch_sizes=batch_sizes) @@ -596,12 +615,14 @@ def test_streaming_pull_subscriber_permissions_sufficient( ): # Make sure the topic and subscription get deleted. - cleanup.append((publisher.delete_topic, topic_path)) - cleanup.append((subscriber.delete_subscription, subscription_path)) + cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) + cleanup.append( + (subscriber.delete_subscription, (), {"subscription": subscription_path}) + ) # create a topic and subscribe to it - publisher.create_topic(topic_path) - subscriber.create_subscription(subscription_path, topic_path) + publisher.create_topic(name=topic_path) + subscriber.create_subscription(name=subscription_path, topic=topic_path) # A service account granting only the pubsub.subscriber role must be used. filename = os.path.join( @@ -631,12 +652,14 @@ def test_publisher_role_can_publish_messages( ): # Make sure the topic and subscription get deleted. - cleanup.append((publisher.delete_topic, topic_path)) - cleanup.append((subscriber.delete_subscription, subscription_path)) + cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) + cleanup.append( + (subscriber.delete_subscription, (), {"subscription": subscription_path}) + ) # Create a topic and subscribe to it. - publisher.create_topic(topic_path) - subscriber.create_subscription(subscription_path, topic_path) + publisher.create_topic(name=topic_path) + subscriber.create_subscription(name=subscription_path, topic=topic_path) # Create a publisher client with only the publisher role only. filename = os.path.join( @@ -646,7 +669,7 @@ def test_publisher_role_can_publish_messages( _publish_messages(publisher_only_client, topic_path, batch_sizes=[2]) - response = subscriber.pull(subscription_path, max_messages=2) + response = subscriber.pull(subscription=subscription_path, max_messages=2) assert len(response.received_messages) == 2 @pytest.mark.skip( @@ -659,14 +682,16 @@ def test_snapshot_seek_subscriber_permissions_sufficient( snapshot_path = "projects/{}/snapshots/{}".format(project, snapshot_name) # Make sure the topic and subscription get deleted. - cleanup.append((publisher.delete_topic, topic_path)) - cleanup.append((subscriber.delete_subscription, subscription_path)) - cleanup.append((subscriber.delete_snapshot, snapshot_path)) + cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) + cleanup.append( + (subscriber.delete_subscription, (), {"subscription": subscription_path}) + ) + cleanup.append((subscriber.delete_snapshot, (), {"snapshot": snapshot_path})) # Create a topic and subscribe to it. - publisher.create_topic(topic_path) + publisher.create_topic(name=topic_path) subscriber.create_subscription( - subscription_path, topic_path, retain_acked_messages=True + name=subscription_path, topic=topic_path, retain_acked_messages=True ) # A service account granting only the pubsub.subscriber role must be used. @@ -677,20 +702,23 @@ def test_snapshot_seek_subscriber_permissions_sufficient( # Publish two messages and create a snapshot inbetween. _publish_messages(publisher, topic_path, batch_sizes=[1]) - response = subscriber.pull(subscription_path, max_messages=10) + response = subscriber.pull(subscription=subscription_path, max_messages=10) assert len(response.received_messages) == 1 - subscriber.create_snapshot(snapshot_path, subscription_path) + subscriber.create_snapshot(name=snapshot_path, subscription=subscription_path) _publish_messages(publisher, topic_path, batch_sizes=[1]) - response = subscriber.pull(subscription_path, max_messages=10) + response = subscriber.pull(subscription=subscription_path, max_messages=10) assert len(response.received_messages) == 1 # A subscriber-only client should be allowed to seek to a snapshot. - subscriber_only_client.seek(subscription_path, snapshot=snapshot_path) + seek_request = gapic_types.SeekRequest( + subscription=subscription_path, snapshot=snapshot_path + ) + subscriber_only_client.seek(seek_request) # We should receive one message again, since we sought back to a snapshot. - response = subscriber.pull(subscription_path, max_messages=10) + response = subscriber.pull(subscription=subscription_path, max_messages=10) assert len(response.received_messages) == 1 def test_viewer_role_can_list_resources( @@ -699,9 +727,9 @@ def test_viewer_role_can_list_resources( project_path = "projects/" + project # Make sure the created topic gets deleted. - cleanup.append((publisher.delete_topic, topic_path)) + cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) - publisher.create_topic(topic_path) + publisher.create_topic(name=topic_path) # A service account granting only the pubsub.viewer role must be used. filename = os.path.join( @@ -712,10 +740,15 @@ def test_viewer_role_can_list_resources( # The following operations should not raise permission denied errors. # NOTE: At least one topic exists. - topic = next(iter(viewer_only_publisher.list_topics(project_path))) - next(iter(viewer_only_publisher.list_topic_subscriptions(topic.name)), None) - next(iter(viewer_only_subscriber.list_subscriptions(project_path)), None) - next(iter(viewer_only_subscriber.list_snapshots(project_path)), None) + topic = next(iter(viewer_only_publisher.list_topics(project=project_path))) + + response = viewer_only_publisher.list_topic_subscriptions(topic=topic.name) + next(iter(response.subscriptions), None) + + next( + iter(viewer_only_subscriber.list_subscriptions(project=project_path)), None + ) + next(iter(viewer_only_subscriber.list_snapshots(project=project_path)), None) def test_editor_role_can_create_resources( self, project, publisher, topic_path, subscriber, subscription_path, cleanup @@ -724,9 +757,11 @@ def test_editor_role_can_create_resources( snapshot_path = "projects/{}/snapshots/{}".format(project, snapshot_name) # Make sure the created resources get deleted. - cleanup.append((subscriber.delete_snapshot, snapshot_path)) - cleanup.append((subscriber.delete_subscription, subscription_path)) - cleanup.append((publisher.delete_topic, topic_path)) + cleanup.append((subscriber.delete_snapshot, (), {"snapshot": snapshot_path})) + cleanup.append( + (subscriber.delete_subscription, (), {"subscription": subscription_path}) + ) + cleanup.append((publisher.delete_topic, (), {"topic": topic_path})) # A service account granting only the pubsub.editor role must be used. filename = os.path.join( @@ -736,9 +771,11 @@ def test_editor_role_can_create_resources( editor_publisher = type(publisher).from_service_account_file(filename) # The following operations should not raise permission denied errors. - editor_publisher.create_topic(topic_path) - editor_subscriber.create_subscription(subscription_path, topic_path) - editor_subscriber.create_snapshot(snapshot_path, subscription_path) + editor_publisher.create_topic(name=topic_path) + editor_subscriber.create_subscription(name=subscription_path, topic=topic_path) + editor_subscriber.create_snapshot( + name=snapshot_path, subscription=subscription_path + ) def _publish_messages(publisher, topic_path, batch_sizes): @@ -760,7 +797,7 @@ def _publish_messages(publisher, topic_path, batch_sizes): def _make_messages(count): messages = [ - u"message {}/{}".format(i, count).encode("utf-8") for i in range(1, count + 1) + "message {}/{}".format(i, count).encode("utf-8") for i in range(1, count + 1) ] return messages