Skip to content

Commit

Permalink
docs(samples): Add code sample for optimistic subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
mukund-ananthu committed Jun 4, 2024
1 parent cfd7926 commit 8a46aa9
Showing 1 changed file with 65 additions and 0 deletions.
65 changes: 65 additions & 0 deletions samples/snippets/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,64 @@ def remove_dead_letter_policy(
# [END pubsub_dead_letter_remove]
return subscription_after_update

def optimistic_subscribe(
project_id: str, topic_id: str, subscription_id: str, timeout: Optional[float] = None
) -> None:
"""Optimistically subscribe to messages instead of making calls to verify existence
of a subscription first and then subscribing to messages from it. This avoids admin
operation calls to verify the existence of a subscription and reduces the probability
of running out of quota for admin operations."""
# [START pubsub_optimistic_subscribe]
from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
# topic_id = "your-topic-id"

# Create a subscriber client
subscriber = pubsub_v1.SubscriberClient()

# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Define callback to be called when a message is received
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
# Ack message after processing it
message.ack()


# Wrap subscriber in a 'with' block to automatically call close() when done
with subscriber:
try:
# Optimistically subscribe to messages on the subscription
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
except NotFound:
# If the subscription does not exist, then create it.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription = subscriber.create_subscription(
request={"name": subscription_path, "topic": topic_path}
)

# Subscribe on the created subscription
try:
streaming_pull_future = subscriber.subscribe(subscription.name, callback=callback)
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
# [END pubsub_optimistic_subscribe]


def receive_messages(
project_id: str, subscription_id: str, timeout: Optional[float] = None
Expand Down Expand Up @@ -1156,6 +1214,13 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
)
remove_dead_letter_policy_parser.add_argument("topic_id")
remove_dead_letter_policy_parser.add_argument("subscription_id")

optimistic_subscribe_parser = subparsers.add_parser(
"optimistic-subscribe", help=optimistic_subscribe.__doc__
)
optimistic_subscribe_parser.add_argument("topic_id")
optimistic_subscribe_parser.add_argument("subscription_id")
optimistic_subscribe_parser.add_argument("timeout")

receive_parser = subparsers.add_parser("receive", help=receive_messages.__doc__)
receive_parser.add_argument("subscription_id")
Expand Down

0 comments on commit 8a46aa9

Please sign in to comment.