Skip to content

Commit

Permalink
docs(samples): Add code sample for optimistic subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
mukund-ananthu committed Jun 5, 2024
1 parent cfd7926 commit d16bf30
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 0 deletions.
73 changes: 73 additions & 0 deletions samples/snippets/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,70 @@ def create_subscription(project_id: str, topic_id: str, subscription_id: str) ->
# [END pubsub_create_pull_subscription]


def optimistic_subscribe(
project_id: str, topic_id: str, subscription_id: str, timeout: Optional[float] = None
) -> None:
"""Optimistically subscribe to messages instead of making calls to verify existence
of a subscription first and then subscribing to messages from it. This avoids admin
operation calls to verify the existence of a subscription and reduces the probability
of running out of quota for admin operations."""
# [START pubsub_optimistic_subscribe]
from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
# topic_id = "your-topic-id"

# Create a subscriber client.
subscriber = pubsub_v1.SubscriberClient()

# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Define callback to be called when a message is received.
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
# Ack message after processing it.
message.ack()

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# Optimistically subscribe to messages on the subscription.
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
print("Subscription already exists. Timed out.")
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
except NotFound:
print(f"Subscription {subscription_path} not found, creating it.")

# If the subscription does not exist, then create it.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription = subscriber.create_subscription(
request={"name": subscription_path, "topic": topic_path}
)

if subscription:
print(f"Subscription {subscription.name} created")

# Subscribe on the created subscription.
try:
streaming_pull_future = subscriber.subscribe(subscription.name, callback=callback)
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
# [END pubsub_optimistic_subscribe]


def create_subscription_with_dead_letter_topic(
project_id: str,
topic_id: str,
Expand Down Expand Up @@ -1157,6 +1221,13 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
remove_dead_letter_policy_parser.add_argument("topic_id")
remove_dead_letter_policy_parser.add_argument("subscription_id")

optimistic_subscribe_parser = subparsers.add_parser(
"optimistic-subscribe", help=optimistic_subscribe.__doc__
)
optimistic_subscribe_parser.add_argument("topic_id")
optimistic_subscribe_parser.add_argument("subscription_id")
optimistic_subscribe_parser.add_argument("timeout")

receive_parser = subparsers.add_parser("receive", help=receive_messages.__doc__)
receive_parser.add_argument("subscription_id")
receive_parser.add_argument("timeout", default=None, type=float, nargs="?")
Expand Down Expand Up @@ -1299,6 +1370,8 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
)
elif args.command == "remove-dead-letter-policy":
remove_dead_letter_policy(args.project_id, args.topic_id, args.subscription_id)
elif args.command == "optimistic-subscribe":
optimistic_subscribe(args.project_id, args.topic_id, args.subscription_id, args.timeout)
elif args.command == "receive":
receive_messages(args.project_id, args.subscription_id, args.timeout)
elif args.command == "receive-custom-attributes":
Expand Down
48 changes: 48 additions & 0 deletions samples/snippets/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,54 @@ def test_create_subscription(
subscriber_client.delete_subscription(request={"subscription": subscription_path})


def test_optimistic_subscribe(
subscriber_client: pubsub_v1.SubscriberClient,
topic: str,
publisher_client: pubsub_v1.PublisherClient,
capsys: CaptureFixture[str]
) -> None:
subscription_id = (
f"subscription_for_optimistic_subscribe-{PY_VERSION}-{UUID}"
)
subscription_path = subscriber_client.subscription_path(
PROJECT_ID, subscription_id
)
# Ensure there is no pre-existing subscription.
# So that we can test the case where optimistic subscribe fails.
try:
subscriber_client.delete_subscription(
request={"subscription": subscription_path}
)
except NotFound:
pass

# Invoke optimistic_subscribe when the subscription is not present.
# This tests scenario where optimistic subscribe fails.
subscriber.optimistic_subscribe(PROJECT_ID, TOPIC, subscription_id, 5)
out, _ = capsys.readouterr()
# Verify optimistic subscription failed.
assert f"Subscription {subscription_path} not found, creating it." in out
# Verify that subscription created due to optimistic subscribe failure.
assert f"Subscription {subscription_path} created" in out
# Verify that subscription didn't already exist.
assert "Subscription already exists. Timed out." not in out

# Invoke optimistic_subscribe when the subscription is present.
# This tests scenario where optimistic subscribe succeeds.
subscriber.optimistic_subscribe(PROJECT_ID, TOPIC, subscription_id, 5)

out, _ = capsys.readouterr()
# Verify optimistic subscription succeeded.
assert f"Subscription {subscription_path} not found, creating it." not in out
# Verify that subscription was not created due to optimistic subscribe failure.
assert f"Subscription {subscription_path} created" not in out
# Verify that subscription already existed.
assert "Subscription already exists. Timed out." in out

# Clean up resources created during test.
subscriber_client.delete_subscription(request={"subscription": subscription_path})


def test_create_subscription_with_dead_letter_policy(
subscriber_client: pubsub_v1.SubscriberClient,
dead_letter_topic: str,
Expand Down

0 comments on commit d16bf30

Please sign in to comment.