diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index ab0c8aafa..225f5de54 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -581,6 +581,64 @@ def remove_dead_letter_policy( # [END pubsub_dead_letter_remove] return subscription_after_update +def optimistic_subscribe( + project_id: str, topic_id: str, subscription_id: str, timeout: Optional[float] = None +) -> None: + """Optimistically subscribe to messages instead of making calls to verify existence + of a subscription first and then subscribing to messages from it. This avoids admin + operation calls to verify the existence of a subscription and reduces the probability + of running out of quota for admin operations.""" + # [START pubsub_optimistic_subscribe] + from google.api_core.exceptions import NotFound + from google.cloud import pubsub_v1 + from concurrent.futures import TimeoutError + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + # Number of seconds the subscriber should listen for messages + # timeout = 5.0 + # topic_id = "your-topic-id" + + # Create a subscriber client + subscriber = pubsub_v1.SubscriberClient() + + # The `subscription_path` method creates a fully qualified identifier + # in the form `projects/{project_id}/subscriptions/{subscription_id}` + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + # Define callback to be called when a message is received + def callback(message: pubsub_v1.subscriber.message.Message) -> None: + # Ack message after processing it + message.ack() + + + # Wrap subscriber in a 'with' block to automatically call close() when done + with subscriber: + try: + # Optimistically subscribe to messages on the subscription + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) + streaming_pull_future.result(timeout=timeout) + except TimeoutError: + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. + except NotFound: + # If the subscription does not exist, then create it. + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_id) + subscription = subscriber.create_subscription( + request={"name": subscription_path, "topic": topic_path} + ) + + # Subscribe on the created subscription + try: + streaming_pull_future = subscriber.subscribe(subscription.name, callback=callback) + streaming_pull_future.result(timeout=timeout) + except TimeoutError: + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. + # [END pubsub_optimistic_subscribe] + def receive_messages( project_id: str, subscription_id: str, timeout: Optional[float] = None @@ -1156,6 +1214,13 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: ) remove_dead_letter_policy_parser.add_argument("topic_id") remove_dead_letter_policy_parser.add_argument("subscription_id") + + optimistic_subscribe_parser = subparsers.add_parser( + "optimistic-subscribe", help=optimistic_subscribe.__doc__ + ) + optimistic_subscribe_parser.add_argument("topic_id") + optimistic_subscribe_parser.add_argument("subscription_id") + optimistic_subscribe_parser.add_argument("timeout") receive_parser = subparsers.add_parser("receive", help=receive_messages.__doc__) receive_parser.add_argument("subscription_id")