Skip to content

Commit

Permalink
🦉 Updates from OwlBot post-processor
Browse files Browse the repository at this point in the history
  • Loading branch information
gcf-owl-bot[bot] committed Jun 4, 2024
1 parent 8a46aa9 commit 0bd7cd1
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions samples/snippets/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,10 +581,14 @@ def remove_dead_letter_policy(
# [END pubsub_dead_letter_remove]
return subscription_after_update


def optimistic_subscribe(
project_id: str, topic_id: str, subscription_id: str, timeout: Optional[float] = None
project_id: str,
topic_id: str,
subscription_id: str,
timeout: Optional[float] = None,
) -> None:
"""Optimistically subscribe to messages instead of making calls to verify existence
"""Optimistically subscribe to messages instead of making calls to verify existence
of a subscription first and then subscribing to messages from it. This avoids admin
operation calls to verify the existence of a subscription and reduces the probability
of running out of quota for admin operations."""
Expand Down Expand Up @@ -612,12 +616,13 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
# Ack message after processing it
message.ack()


# Wrap subscriber in a 'with' block to automatically call close() when done
with subscriber:
try:
# Optimistically subscribe to messages on the subscription
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
streaming_pull_future = subscriber.subscribe(
subscription_path, callback=callback
)
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel() # Trigger the shutdown.
Expand All @@ -629,10 +634,12 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
subscription = subscriber.create_subscription(
request={"name": subscription_path, "topic": topic_path}
)

# Subscribe on the created subscription
try:
streaming_pull_future = subscriber.subscribe(subscription.name, callback=callback)
streaming_pull_future = subscriber.subscribe(
subscription.name, callback=callback
)
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel() # Trigger the shutdown.
Expand Down Expand Up @@ -1214,7 +1221,7 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
)
remove_dead_letter_policy_parser.add_argument("topic_id")
remove_dead_letter_policy_parser.add_argument("subscription_id")

optimistic_subscribe_parser = subparsers.add_parser(
"optimistic-subscribe", help=optimistic_subscribe.__doc__
)
Expand Down

0 comments on commit 0bd7cd1

Please sign in to comment.