Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pub/Sub: quickstart sub.py update #2475

Merged
merged 10 commits into from
Oct 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pubsub/cloud-client/iam_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2019 Google Inc. All Rights Reserved.
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion pubsub/cloud-client/publisher.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python

# Copyright 2019 Google LLC. All Rights Reserved.
# Copyright 2016 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
12 changes: 5 additions & 7 deletions pubsub/cloud-client/publisher_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2019 Google Inc. All Rights Reserved.
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,13 +36,11 @@ def topic(client):
topic_path = client.topic_path(PROJECT, TOPIC)

try:
client.delete_topic(topic_path)
except Exception:
pass

client.create_topic(topic_path)
response = client.get_topic(topic_path)
except: # noqa
response = client.create_topic(topic_path)

yield topic_path
yield response.name


def _make_sleep_patch():
Expand Down
2 changes: 1 addition & 1 deletion pubsub/cloud-client/quickstart.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python

# Copyright 2019 Google Inc. All Rights Reserved.
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
19 changes: 10 additions & 9 deletions pubsub/cloud-client/quickstart/sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

# [START pubsub_quickstart_sub_all]
import argparse
import time
# [START pubsub_quickstart_sub_deps]
from google.cloud import pubsub_v1
# [END pubsub_quickstart_sub_deps]
Expand All @@ -34,20 +33,22 @@ def sub(project_id, subscription_name):
project_id, subscription_name)

def callback(message):
print('Received message {} of message ID {}'.format(
print('Received message {} of message ID {}\n'.format(
message, message.message_id))
# Acknowledge the message. Unack'ed messages will be redelivered.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(optional nit)

# Unacknowledged messages will be redelivered when a client subscribes.

I previously intuited that this client would just keep getting the same message if the message was not ack'd which is not the case. Only if this client reconnects will the message get redelivered to this client, which can happen if the loop exits because of an exception, etc...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comment Gus. That's something that I have never thought much of. I think Pub/Sub load-balances if there are multiple subscribers, but only if the load is large (like >10Mb). I remember trying with just three messages and they would keep getting delivered to the same subscriber client.

message.ack()
print('Acknowledged message of message ID {}\n'.format(
message.message_id))
print('Acknowledged message {}\n'.format(message.message_id))

client.subscribe(subscription_path, callback=callback)
streaming_pull_future = client.subscribe(
subscription_path, callback=callback)
print('Listening for messages on {}..\n'.format(subscription_path))

# Keep the main thread from exiting so the subscriber can
# process messages in the background.
while True:
time.sleep(60)
# Calling result() on StreamingPullFuture keeps the main thread from
# exiting while messages get processed in the callbacks.
try:
streaming_pull_future.result()
except: # noqa
streaming_pull_future.cancel()


if __name__ == '__main__':
Expand Down
91 changes: 42 additions & 49 deletions pubsub/cloud-client/quickstart/sub_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import mock
import os
import pytest
import time

from google.api_core.exceptions import AlreadyExists
from google.cloud import pubsub_v1
Expand All @@ -29,84 +27,79 @@
TOPIC = 'quickstart-sub-test-topic'
SUBSCRIPTION = 'quickstart-sub-test-topic-sub'


@pytest.fixture(scope='module')
def publisher_client():
yield pubsub_v1.PublisherClient()
publisher_client = pubsub_v1.PublisherClient()
subscriber_client = pubsub_v1.SubscriberClient()


@pytest.fixture(scope='module')
def topic_path(publisher_client):
def topic_path():
topic_path = publisher_client.topic_path(PROJECT, TOPIC)

try:
publisher_client.create_topic(topic_path)
topic = publisher_client.create_topic(topic_path)
return topic.name
except AlreadyExists:
pass

yield topic_path


@pytest.fixture(scope='module')
def subscriber_client():
yield pubsub_v1.SubscriberClient()
return topic_path


@pytest.fixture(scope='module')
def subscription(subscriber_client, topic_path):
def subscription_path(topic_path):
hongalex marked this conversation as resolved.
Show resolved Hide resolved
subscription_path = subscriber_client.subscription_path(
PROJECT, SUBSCRIPTION)

try:
subscriber_client.create_subscription(subscription_path, topic_path)
subscription = subscriber_client.create_subscription(
subscription_path, topic_path)
return subscription.name
except AlreadyExists:
pass

yield SUBSCRIPTION
return subscription_path


@pytest.fixture
def to_delete(publisher_client, subscriber_client):
doomed = []
yield doomed
for client, item in doomed:
def _to_delete(resource_paths):
for item in resource_paths:
if 'topics' in item:
publisher_client.delete_topic(item)
if 'subscriptions' in item:
subscriber_client.delete_subscription(item)


def _make_sleep_patch():
real_sleep = time.sleep
def _publish_messages(topic_path):
publish_future = publisher_client.publish(topic_path, data=b'Hello World!')
publish_future.result()


def new_sleep(period):
if period == 60:
real_sleep(10)
raise RuntimeError('sigil')
else:
real_sleep(period)
def _sub_timeout(project_id, subscription_name):
# This is an exactly copy of `sub.py` except
# StreamingPullFuture.result() will time out after 10s.
client = pubsub_v1.SubscriberClient()
subscription_path = client.subscription_path(
project_id, subscription_name)

return mock.patch('time.sleep', new=new_sleep)
def callback(message):
print('Received message {} of message ID {}\n'.format(
message, message.message_id))
message.ack()
print('Acknowledged message {}\n'.format(message.message_id))

streaming_pull_future = client.subscribe(
subscription_path, callback=callback)
print('Listening for messages on {}..\n'.format(subscription_path))

try:
streaming_pull_future.result(timeout=10)
except: # noqa
streaming_pull_future.cancel()

def test_sub(publisher_client,
topic_path,
subscriber_client,
subscription,
to_delete,
capsys):

publisher_client.publish(topic_path, data=b'Hello, World!')
def test_sub(monkeypatch, topic_path, subscription_path, capsys):
monkeypatch.setattr(sub, 'sub', _sub_timeout)

to_delete.append((publisher_client, topic_path))
_publish_messages(topic_path)

with _make_sleep_patch():
with pytest.raises(RuntimeError, match='sigil'):
sub.sub(PROJECT, subscription)
sub.sub(PROJECT, SUBSCRIPTION)

to_delete.append((subscriber_client,
'projects/{}/subscriptions/{}'.format(PROJECT,
SUBSCRIPTION)))
# Clean up resources.
_to_delete([topic_path, subscription_path])

out, _ = capsys.readouterr()
assert "Received message" in out
Expand Down
2 changes: 1 addition & 1 deletion pubsub/cloud-client/quickstart_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python

# Copyright 2019 Google Inc. All Rights Reserved.
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion pubsub/cloud-client/subscriber.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python

# Copyright 2019 Google Inc. All Rights Reserved.
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
Loading