diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py index e504e0a8bfb7..22019c33b8d6 100644 --- a/pubsub/tests/system.py +++ b/pubsub/tests/system.py @@ -16,6 +16,7 @@ import datetime import itertools +import operator as op import threading import time @@ -302,6 +303,63 @@ def test_using_snapshots( assert msg.message.data == b"Message 2" +def test_managing_topic_iam_policy(publisher, topic_path, cleanup): + cleanup.append((publisher.delete_topic, topic_path)) + + # create a topic and customize its policy + publisher.create_topic(topic_path) + topic_policy = publisher.get_iam_policy(topic_path) + + topic_policy.bindings.add(role="roles/pubsub.editor", members=["domain:google.com"]) + topic_policy.bindings.add( + role="roles/pubsub.viewer", members=["group:cloud-logs@google.com"] + ) + new_policy = publisher.set_iam_policy(topic_path, topic_policy) + + # fetch the topic policy again and check its values + topic_policy = publisher.get_iam_policy(topic_path) + assert topic_policy.bindings == new_policy.bindings + assert len(topic_policy.bindings) == 2 + + bindings = sorted(topic_policy.bindings, key=op.attrgetter("role")) + assert bindings[0].role == "roles/pubsub.editor" + assert bindings[0].members == ["domain:google.com"] + + assert bindings[1].role == "roles/pubsub.viewer" + assert bindings[1].members == ["group:cloud-logs@google.com"] + + +def test_managing_subscription_iam_policy( + publisher, subscriber, topic_path, subscription_path, cleanup +): + # Make sure the topic and subscription get deleted. + cleanup.append((publisher.delete_topic, topic_path)) + cleanup.append((subscriber.delete_subscription, subscription_path)) + + # create a topic and a subscription, customize the latter's policy + publisher.create_topic(topic_path) + subscriber.create_subscription(subscription_path, topic_path) + sub_policy = subscriber.get_iam_policy(subscription_path) + + sub_policy.bindings.add(role="roles/pubsub.editor", members=["domain:google.com"]) + sub_policy.bindings.add( + role="roles/pubsub.viewer", members=["group:cloud-logs@google.com"] + ) + new_policy = subscriber.set_iam_policy(subscription_path, sub_policy) + + # fetch the subscription policy again and check its values + sub_policy = subscriber.get_iam_policy(subscription_path) + assert sub_policy.bindings == new_policy.bindings + assert len(sub_policy.bindings) == 2 + + bindings = sorted(sub_policy.bindings, key=op.attrgetter("role")) + assert bindings[0].role == "roles/pubsub.editor" + assert bindings[0].members == ["domain:google.com"] + + assert bindings[1].role == "roles/pubsub.viewer" + assert bindings[1].members == ["group:cloud-logs@google.com"] + + class TestStreamingPull(object): def test_streaming_pull_callback_error_propagation( self, publisher, topic_path, subscriber, subscription_path, cleanup