RequireStarted sets the _started flag to True on __enter__, but never clears it on __exit__. #468

Open
kangelov opened this issue Dec 16, 2023 · 0 comments
Labels
api: pubsublite Issues related to the googleapis/python-pubsublite API.

Environment details

  • OS type and version: MacOS Sonoma 14.2
  • Python version: 3.11
  • pip version: 23.3.1
  • google-cloud-pubsublite version: 1.9.0

Steps to reproduce

  1. Write a method that enters the publisher client as a context manager (async with) and publishes a message to a topic inside that block.
  2. Call that method.
  3. Call that method again. It fails with an error saying we are already in a context manager: "enter called twice."
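The steps above can be modelled without any Google Cloud dependency. This is a minimal sketch of the behavior described in the title, not the library's actual code: `StartOnceGuard`, `AlreadyStartedError` and `use_guard` are hypothetical names, and the plain exception stands in for the FailedPrecondition error.

```python
class AlreadyStartedError(RuntimeError):
    """Hypothetical stand-in for the FailedPrecondition error in this report."""


class StartOnceGuard:
    """Hypothetical model of the reported behavior, not the library's code."""

    def __init__(self):
        self._started = False

    def __enter__(self):
        if self._started:
            raise AlreadyStartedError("enter called twice.")
        self._started = True
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # The flag is deliberately left set, as the issue title describes.
        return False


def use_guard(guard):
    """Mirror the reporter's method: enter the same guard on every call."""
    with guard:
        return "published"


guard = StartOnceGuard()
first = use_guard(guard)  # first call succeeds
try:
    use_guard(guard)  # second call re-enters the same object
    second_error = None
except AlreadyStartedError as exc:
    second_error = str(exc)
```

The first call completes normally; the second fails even though the first `with` block has already exited, which matches step 3.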

Code example

import os
from datetime import datetime

from google.cloud.pubsub_v1.types import BatchSettings
from google.cloud.pubsublite.types import CloudRegion, CloudZone, TopicPath, MessageMetadata
from google.cloud.pubsublite.cloudpubsub import AsyncPublisherClient

class AbstractPublisher:

    def __init__(self, configuration):
        self._configuration = configuration

        location = CloudRegion(self._configuration.region)
        if not self._configuration.regional:
            location = CloudZone(location, self._configuration.zone_id)

        # This topic obviously needs to exist on GCloud already: it needs to be defined beforehand.
        self.topic_path = TopicPath(os.getenv("GOOGLE_CLOUD_PROJECT"), location, self._configuration.topic_id)

        self._publisher = AsyncPublisherClient(
            per_partition_batching_settings=BatchSettings(
                # These are used for message flow control: messages being published within max_latency sec of each other
                # are batched and sent in with a single request, up to max_messages messages and max_bytes bytes.
                # This is to help limit rapid calls to the backend to once every max_latency in most circumstances.
                max_bytes=self._configuration.max_bytes_num,
                max_latency=self._configuration.max_latency_sec,
                max_messages=self._configuration.max_messages_num
            ),
            enable_idempotence=self._configuration.enable_idempotence
        )

    async def publish_bytes(self, message: bytes, key: str, **kwargs):
        async with self._publisher as publisher:
            message_id = await publisher.publish(self.topic_path, message,
                                                 publish_timestamp=str(datetime.utcnow()),
                                                 message_key=key,
                                                 **kwargs)
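One possible workaround, until the behavior is clarified, is to enter the client once and reuse it for every publish instead of entering it per call. The sketch below shows that pattern with `contextlib.AsyncExitStack`. `LongLivedPublisher` and `FakeClient` are hypothetical names, and `FakeClient` only imitates the "enter once" restriction from this report; whether the real `AsyncPublisherClient` is meant to be held open this way is an assumption.

```python
import asyncio
from contextlib import AsyncExitStack


class LongLivedPublisher:
    """Enters a client once and reuses it for every publish call."""

    def __init__(self, client):
        self._client = client
        self._stack = AsyncExitStack()
        self._entered = None

    async def start(self):
        # Enter the client's async context exactly once.
        self._entered = await self._stack.enter_async_context(self._client)

    async def publish(self, *args, **kwargs):
        if self._entered is None:
            raise RuntimeError("start() must be awaited before publish()")
        return await self._entered.publish(*args, **kwargs)

    async def close(self):
        # Exit the client's context once, at shutdown.
        await self._stack.aclose()
        self._entered = None


class FakeClient:
    """Hypothetical stand-in that rejects a second __aenter__."""

    def __init__(self):
        self.enter_count = 0
        self.sent = []

    async def __aenter__(self):
        if self.enter_count:
            raise RuntimeError("enter called twice.")
        self.enter_count += 1
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        return False

    async def publish(self, topic, data):
        self.sent.append((topic, data))
        return len(self.sent)


async def demo():
    client = FakeClient()
    publisher = LongLivedPublisher(client)
    await publisher.start()
    ids = [
        await publisher.publish("topic", b"a"),
        await publisher.publish("topic", b"b"),
    ]
    await publisher.close()
    return client.enter_count, ids


enter_count, ids = asyncio.run(demo())
```

The trade-off is that the caller now owns the client's lifetime and has to call `close()` at shutdown.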

Stack trace

A FailedPrecondition exception is raised at RequireStarted:32 in google.cloud.pubsublite.internal.require_started.py.
This happens on every call to publish_bytes after the first.

Please take a look at the RequireStarted class and verify whether the _started behavior is intended. Thanks!
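For illustration, here is a sketch of what clearing the flag on exit could look like. `ReenterableGuard` is a hypothetical class, not a patch against the library, and it assumes that re-entering after a clean exit is safe. That may not hold if the underlying resources cannot be restarted, which is for the maintainers to decide.

```python
class ReenterableGuard:
    """Hypothetical guard that allows sequential, but not nested, entry."""

    def __init__(self):
        self._started = False

    def __enter__(self):
        if self._started:
            raise RuntimeError("enter called twice.")
        self._started = True
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if not self._started:
            raise RuntimeError("exit called without enter.")
        # Clear the flag so that a later `with` block can start again.
        self._started = False
        return False


guard = ReenterableGuard()

# Sequential use now works.
entries = 0
for _ in range(2):
    with guard:
        entries += 1

# Nested use is still rejected.
try:
    with guard:
        with guard:
            pass
    nested_error = None
except RuntimeError as exc:
    nested_error = str(exc)
```

With this variant the guard still catches the overlapping case, but two back-to-back `with` blocks on the same object succeed.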

Kamen
@product-auto-label product-auto-label bot added the api: pubsublite Issues related to the googleapis/python-pubsublite API. label Dec 16, 2023