On partitioned topics with the Python client, receiver_queue_size=0 results in an InvalidConfiguration error, but is documented to work #43

Open
zbentley opened this issue May 21, 2022 · 16 comments
@zbentley
Contributor

zbentley commented May 21, 2022

Describe the bug
Passing receiver_queue_size=0 to the Python client's subscribe method raises an InvalidConfiguration exception.

However, these docs indicate that a receiver queue size of 0 is supported.

To Reproduce

  1. With a connected Python client, call subscribe on any partitioned topic with the receiver_queue_size kwarg set to 0.
  2. Observe that an InvalidConfiguration error is raised.

Expected behavior
A value of 0 should either be supported and documented (is 0 equivalent to 1?), or these docs should be updated to state that the Python client (and possibly others) does not support values less than 1.

Environment
Same as #190

@codelipenghui
Contributor

@zbentley This looks like a feature gap: the Java client supports zero-queue consumers, but it looks like the Python client does not. We will add zero-queue consumer support to the Python client in 2.11.0.

@BewareMyPower
Contributor

I think zero-queue-size consumers have been supported since a very early version. I also fixed a related bug last year; see apache/pulsar#10506.


It's better to show your Python client version and paste your code.

@BewareMyPower
Contributor

is 0 equivalent to 1?

No.

@codelipenghui
Contributor

@BewareMyPower Do we have a test for the Python client's zero-queue consumer? If so, please share it in this issue.

@zbentley
Contributor Author

zbentley commented May 22, 2022

@BewareMyPower

It's better to show your Python client version and paste your code.

See #190; it lists the Python, client, and other versions.

The code I'm using is:

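from pulsar import Client
import os

# THETOPIC is a partitioned topic in this reproduction.
TOPIC = 'THETOPIC'
SUBSCRIPTION = 'SUBSCRIPTION'

def main():
    client = Client(service_url='pulsar://localhost:6650')
    try:
        # Raises InvalidConfiguration when TOPIC is a partitioned topic.
        client.subscribe(
            topic=TOPIC,
            subscription_name=SUBSCRIPTION,
            receiver_queue_size=0,
            consumer_name=f'testconsumer-{os.getpid()}'
        )
    finally:
        # Release the client's connections even when subscribe fails.
        client.close()

if __name__ == '__main__':
    main()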

@zbentley
Contributor Author

Could you clarify the difference between a setting of 0 and a setting of 1?

@BewareMyPower
Contributor

BewareMyPower commented May 22, 2022

@codelipenghui See https://github.com/apache/pulsar/blob/defeec0e84a63ea865f3a2790bc61b66a02254c5/pulsar-client-cpp/python/pulsar_test.py#L289-L306

This test was introduced in apache/pulsar#5706, which has been included since Pulsar 2.5.0. It fixes apache/pulsar#5634 (Python consumer does not accept receiver_queue_size=0).

@BewareMyPower
Contributor

BewareMyPower commented May 22, 2022

@zbentley What a receiver queue size of 0 and a size of 1 have in common is that each FLOW request carries 1 permit.

However, with a receiver queue size of 0, the consumer sends a FLOW request to the broker only when receive is called, which means the consumer never prefetches messages.

With a receiver queue size of 1, the consumer sends a FLOW request to the broker immediately after it is created successfully, which means it prefetches 1 message and caches it internally. This prefetching can affect the broker's dispatching logic.
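To make the difference concrete, here is a toy Python model. ToyBroker and ToyConsumer are hypothetical classes, not the Pulsar client; they only record the permits carried by each FLOW request. Permit replenishment is simplified: the toy sends one FLOW per consumed message, whereas real clients batch permits.

```python
# Toy model only: ToyBroker and ToyConsumer are hypothetical and just
# record the permits sent in FLOW requests; they are not the Pulsar client.
from collections import deque


class ToyBroker:
    """Pushes one backlog message to a consumer per available permit."""

    def __init__(self, backlog):
        self.backlog = deque(backlog)
        self.flow_log = []  # permits carried by each FLOW request, in order

    def flow(self, consumer, permits):
        self.flow_log.append(permits)
        for _ in range(permits):
            if self.backlog:
                consumer.queue.append(self.backlog.popleft())


class ToyConsumer:
    def __init__(self, broker, receiver_queue_size):
        self.broker = broker
        self.size = receiver_queue_size
        self.queue = deque()  # locally cached (prefetched) messages
        if self.size > 0:
            # Queue size > 0: send FLOW right after creation, so messages
            # are prefetched before anyone calls receive().
            broker.flow(self, self.size)

    def receive(self):
        if self.size == 0:
            # Queue size 0: ask for exactly one message, and only now.
            self.broker.flow(self, 1)
            # A real consumer would block here; the toy assumes a backlog.
            return self.queue.popleft()
        msg = self.queue.popleft()
        # Simplified: return the consumed permit at once (real clients batch).
        self.broker.flow(self, 1)
        return msg


if __name__ == "__main__":
    zero = ToyBroker(["m1", "m2"])
    ToyConsumer(zero, 0)
    print(zero.flow_log)  # [] : nothing requested before receive()

    one = ToyBroker(["m1", "m2"])
    ToyConsumer(one, 1)
    print(one.flow_log)  # [1] : one message already prefetched
```

The difference shows up in flow_log: the size-0 consumer has requested nothing until receive() is called, while the size-1 consumer already holds m1 in its local queue, which is the prefetch that can influence the broker's dispatcher.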

@zbentley
Contributor Author

@BewareMyPower Thanks. Could that be added to the client documentation for the receiver-queue-related flags?

@zbentley
Contributor Author

zbentley commented May 22, 2022

Update (and updated issue title): this only happens with partitioned topics. While I'm not quite sure what receiver queue size does on partitioned topics, I'm pretty sure it shouldn't start throwing errors for configurations that are valid for non-partitioned topics (unless receiver queue size is not honored at all for partitioned topics, in which case I'd expect it to always be an error to specify it).

@zbentley zbentley changed the title With the Python client, receiver_queue_size=0 results in an InvalidConfiguration error, but is documented to work On partitioned topics with the Python client, receiver_queue_size=0 results in an InvalidConfiguration error, but is documented to work May 22, 2022
@BewareMyPower
Contributor

It looks like a zero-queue consumer cannot be used on a partitioned topic. I tested the following Java code as well:

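        import org.apache.pulsar.client.admin.PulsarAdmin;
        import org.apache.pulsar.client.admin.PulsarAdminException;
        import org.apache.pulsar.client.api.PulsarClient;

        public class ZeroQueueRepro {
            public static void main(String[] args) throws Exception {
                var topic = "my-topic";
                var admin = PulsarAdmin.builder().serviceHttpUrl("http://localhost:8080").build();
                try {
                    // Create a 1-partition topic; ignore the error if it already exists.
                    admin.topics().createPartitionedTopic(topic, 1);
                } catch (PulsarAdminException ignored) {
                }
                admin.close();
                var client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
                // Throws: a partitioned topic is consumed through a multi-topics consumer.
                var consumer = client.newConsumer()
                        .topic(topic)
                        .subscriptionName("sub-xxx")
                        .receiverQueueSize(0)
                        .subscribe();
            }
        }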

The error message is friendlier and clearer:

IllegalArgumentException: Receiver queue size needs to be greater than 0 for Topics Consumer

@BewareMyPower
Contributor

I think this is a common problem for the clients in all languages. This check was introduced in apache/pulsar#1103.

    MultiTopicsConsumerImpl(/* ... */) {
        super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()), executorProvider, subscribeFuture,
                schema, interceptors);

        checkArgument(conf.getReceiverQueueSize() > 0,
            "Receiver queue size needs to be greater than 0 for Topics Consumer");

IMO, it's a bug. It looks like the multi-topics consumer requires a receiver queue size of at least 2 (from the Math.max(2, conf.getReceiverQueueSize()) call). My guess is that in early versions of Pulsar a partitioned topic had to have at least 2 partitions, hence this check.

But the zero-queue consumer should be treated as an exception. We should fix this config in both the Java and C++ clients. @codelipenghui
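For readers following along from the Python side, the combined effect of the two quoted Java lines can be sketched as a small Python function. multi_topics_queue_size is a hypothetical name; this is an illustrative transliteration, not code from any Pulsar client.

```python
# Hypothetical transliteration of the MultiTopicsConsumerImpl checks quoted
# above; not part of any Pulsar client API.
def multi_topics_queue_size(configured):
    """Return the internal queue size a multi-topics consumer would use."""
    if configured <= 0:
        # Mirrors checkArgument(conf.getReceiverQueueSize() > 0, ...)
        raise ValueError(
            "Receiver queue size needs to be greater than 0 for Topics Consumer"
        )
    # Mirrors Math.max(2, conf.getReceiverQueueSize())
    return max(2, configured)
```

Under this logic a configured size of 1 is silently raised to 2 internally, and 0 is rejected outright, which is the error reported in this issue.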

@codelipenghui
Contributor

@zbentley @BewareMyPower

It’s not enabled because we cannot know which partition the next message will be coming from. Any suggestion on how to achieve that?

from apache/pulsar#7280 (comment)
This is the main challenge to supporting zero queue consumers for a partitioned topic.

One option here is to create a function that merges messages from multiple topics/partitions into a non-partitioned topic; the zero-queue consumer then consumes only from the merged topic.
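The merge step could be as small as a Pulsar Functions "native" Python function: a plain function that receives one input value and whose return value is published to the function's output topic. A sketch, assuming the function is deployed with the partitioned topic as its input and a new non-partitioned topic as its output; the deployment configuration is omitted, and the function and parameter names are illustrative.

```python
# Sketch of a Pulsar Functions native Python function (no SDK import).
# Assumed deployment: input = the partitioned topic (all partitions),
# output = one non-partitioned topic read by the zero-queue consumer.
def process(payload):
    # Forward each message payload unchanged; the Functions runtime
    # publishes the return value to the configured output topic.
    return payload
```

A zero-queue consumer would then subscribe to the merged non-partitioned topic, where the question of which partition to pull from no longer arises. The trade-off is an extra hop, and the function itself presumably consumes with an ordinary receiver queue, so prefetching still happens, just inside the function rather than in the application's consumer.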

@BewareMyPower
Contributor

Got it. For a zero-queue consumer, there is no way to determine which internal consumer (on a specific partition) should be chosen to call receive.

@zbentley
Contributor Author

While I don't know much about this area of the code, that sounds fine to me.

Other solutions that may make sense:

  • Document that a zero queue size with a partitioned topic would potentially buffer up to N messages (where N is the number of partitions on the topic).
  • Sequentially loop over the internal receive calls for each partition, returning when a message is found on any. The interactions with the timeout and missed deliveries might be undesirable here though.
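The second suggestion could look roughly like the following toy sketch. ToyPartitionConsumer stands in for a zero-queue consumer on one partition and round_robin_receive is a hypothetical helper; neither exists in the Pulsar clients. Each partition gets a short timeout in turn until an overall deadline expires. The toy deliberately ignores the hard case mentioned above: in a real client, the permit sent to a partition that timed out could still deliver a message later, which would then sit buffered in that partition's consumer.

```python
# Toy sketch only: ToyPartitionConsumer and round_robin_receive are
# hypothetical and make no Pulsar calls.
import time
from collections import deque


class ToyPartitionConsumer:
    """Stand-in for a zero-queue consumer attached to one partition."""

    def __init__(self, messages):
        self.backlog = deque(messages)

    def receive(self, timeout_s):
        # Return a message if one is available, else None after the timeout.
        if self.backlog:
            return self.backlog.popleft()
        time.sleep(timeout_s)  # simulate waiting on the broker
        return None


def round_robin_receive(consumers, start, per_partition_timeout_s, deadline_s):
    """Poll partitions in turn; return (index, msg), or None at the deadline."""
    deadline = time.monotonic() + deadline_s
    i = start
    while time.monotonic() < deadline:
        msg = consumers[i].receive(per_partition_timeout_s)
        if msg is not None:
            return i, msg
        i = (i + 1) % len(consumers)  # move on to the next partition
    return None
```

Passing (i + 1) % len(consumers) as the next start spreads reads across partitions instead of always polling partition 0 first. The per-partition timeout trades latency against how long an idle partition can stall the loop.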

@github-actions

The issue had no activity for 30 days, mark with Stale label.

@tisonkun tisonkun transferred this issue from apache/pulsar Nov 11, 2022