Skip to content

Commit

Permalink
PubSub: Add system tests for PubSub clients (#8277)
Browse files Browse the repository at this point in the history
* Add system tests for listing topics, subscriptions

* Add system test for listing topic's subscriptions

* Add system test for using PubSub snapshots

* Add PubSub system tests for managing IAM policy

* Add test for creating non-default subscriptions

* Remove flaky PubSub snapshots system test

The PubSub backend does not give any guarantees about when a message
will be re-delivered after seeking back to a snapshot, it will only
be delivered "eventually". That causes flakiness in the snapshots
test.

Since the test cannot wait for an indefinite amount of time, this
commit removes it in order to not randomly break the CI builds.
  • Loading branch information
plamut authored Jun 21, 2019
1 parent 09e36cd commit ac31bcf
Showing 1 changed file with 171 additions and 0 deletions.
171 changes: 171 additions & 0 deletions pubsub/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import datetime
import itertools
import operator as op
import threading
import time

Expand Down Expand Up @@ -183,6 +184,176 @@ def test_subscribe_to_messages_async_callbacks(
future.cancel()


def test_creating_subscriptions_with_non_default_settings(
publisher, subscriber, project, topic_path, subscription_path, cleanup
):
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, topic_path))
cleanup.append((subscriber.delete_subscription, subscription_path))

# create a topic and a subscription, customize the latter's policy
publisher.create_topic(topic_path)

msg_retention_duration = {"seconds": 911}
expiration_policy = {"ttl": {"seconds": 90210}}
new_subscription = subscriber.create_subscription(
subscription_path,
topic_path,
ack_deadline_seconds=30,
retain_acked_messages=True,
message_retention_duration=msg_retention_duration,
expiration_policy=expiration_policy,
)

# fetch the subscription and check its settings
project_path = subscriber.project_path(project)
subscriptions = subscriber.list_subscriptions(project_path)

subscriptions = [sub for sub in subscriptions if sub.topic == topic_path]
assert len(subscriptions) == 1
subscription = subscriptions[0]

assert subscription == new_subscription
assert subscription.ack_deadline_seconds == 30
assert subscription.retain_acked_messages
assert subscription.message_retention_duration.seconds == 911
assert subscription.expiration_policy.ttl.seconds == 90210


def test_listing_project_topics(publisher, project, cleanup):
topic_paths = [
publisher.topic_path(project, "topic-{}".format(i) + unique_resource_id("."))
for i in range(1, 4)
]
for topic in topic_paths:
cleanup.append((publisher.delete_topic, topic))
publisher.create_topic(topic)

project_path = publisher.project_path(project)
project_topics = publisher.list_topics(project_path)
project_topics = set(t.name for t in project_topics)

# there might be other topics in the project, thus do a "is subset" check
assert set(topic_paths) <= project_topics


def test_listing_project_subscriptions(publisher, subscriber, project, cleanup):
# create topics
topic_paths = [
publisher.topic_path(project, "topic-1" + unique_resource_id(".")),
publisher.topic_path(project, "topic-2" + unique_resource_id(".")),
]
for topic in topic_paths:
cleanup.append((publisher.delete_topic, topic))
publisher.create_topic(topic)

# create subscriptions
subscription_paths = [
subscriber.subscription_path(
project, "sub-{}".format(i) + unique_resource_id(".")
)
for i in range(1, 4)
]
for i, subscription in enumerate(subscription_paths):
topic = topic_paths[i % 2]
cleanup.append((subscriber.delete_subscription, subscription))
subscriber.create_subscription(subscription, topic)

# retrieve subscriptions and check that the list matches the expected
project_path = subscriber.project_path(project)
subscriptions = subscriber.list_subscriptions(project_path)
subscriptions = set(s.name for s in subscriptions)

# there might be other subscriptions in the project, thus do a "is subset" check
assert set(subscription_paths) <= subscriptions


def test_listing_topic_subscriptions(publisher, subscriber, project, cleanup):
# create topics
topic_paths = [
publisher.topic_path(project, "topic-1" + unique_resource_id(".")),
publisher.topic_path(project, "topic-2" + unique_resource_id(".")),
]
for topic in topic_paths:
cleanup.append((publisher.delete_topic, topic))
publisher.create_topic(topic)

# create subscriptions
subscription_paths = [
subscriber.subscription_path(
project, "sub-{}".format(i) + unique_resource_id(".")
)
for i in range(1, 4)
]
for i, subscription in enumerate(subscription_paths):
topic = topic_paths[i % 2]
cleanup.append((subscriber.delete_subscription, subscription))
subscriber.create_subscription(subscription, topic)

# retrieve subscriptions and check that the list matches the expected
subscriptions = publisher.list_topic_subscriptions(topic_paths[0])
subscriptions = set(subscriptions)

assert subscriptions == {subscription_paths[0], subscription_paths[2]}


def test_managing_topic_iam_policy(publisher, topic_path, cleanup):
cleanup.append((publisher.delete_topic, topic_path))

# create a topic and customize its policy
publisher.create_topic(topic_path)
topic_policy = publisher.get_iam_policy(topic_path)

topic_policy.bindings.add(role="roles/pubsub.editor", members=["domain:google.com"])
topic_policy.bindings.add(
role="roles/pubsub.viewer", members=["group:[email protected]"]
)
new_policy = publisher.set_iam_policy(topic_path, topic_policy)

# fetch the topic policy again and check its values
topic_policy = publisher.get_iam_policy(topic_path)
assert topic_policy.bindings == new_policy.bindings
assert len(topic_policy.bindings) == 2

bindings = sorted(topic_policy.bindings, key=op.attrgetter("role"))
assert bindings[0].role == "roles/pubsub.editor"
assert bindings[0].members == ["domain:google.com"]

assert bindings[1].role == "roles/pubsub.viewer"
assert bindings[1].members == ["group:[email protected]"]


def test_managing_subscription_iam_policy(
publisher, subscriber, topic_path, subscription_path, cleanup
):
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, topic_path))
cleanup.append((subscriber.delete_subscription, subscription_path))

# create a topic and a subscription, customize the latter's policy
publisher.create_topic(topic_path)
subscriber.create_subscription(subscription_path, topic_path)
sub_policy = subscriber.get_iam_policy(subscription_path)

sub_policy.bindings.add(role="roles/pubsub.editor", members=["domain:google.com"])
sub_policy.bindings.add(
role="roles/pubsub.viewer", members=["group:[email protected]"]
)
new_policy = subscriber.set_iam_policy(subscription_path, sub_policy)

# fetch the subscription policy again and check its values
sub_policy = subscriber.get_iam_policy(subscription_path)
assert sub_policy.bindings == new_policy.bindings
assert len(sub_policy.bindings) == 2

bindings = sorted(sub_policy.bindings, key=op.attrgetter("role"))
assert bindings[0].role == "roles/pubsub.editor"
assert bindings[0].members == ["domain:google.com"]

assert bindings[1].role == "roles/pubsub.viewer"
assert bindings[1].members == ["group:[email protected]"]


class TestStreamingPull(object):
def test_streaming_pull_callback_error_propagation(
self, publisher, topic_path, subscriber, subscription_path, cleanup
Expand Down

0 comments on commit ac31bcf

Please sign in to comment.