Add Support for NonDurable subscriptions #468

Closed
frankjkelly opened this issue Feb 16, 2021 · 13 comments · Fixed by #992

Comments

@frankjkelly

Is your feature request related to a problem? Please describe.
The Java API supports Non-Durable Subscriptions via SubscriptionMode
https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java#L243-L256

but it appears that the Go client does not

type ConsumerOptions struct {

This is useful in our use case, where we have lots of topics with short-lived subscriptions that last minutes to hours; once the data is read, the topic is no longer needed.
To ensure that data is compacted in the bookies, we currently have to set the subscription expiration time at the namespace level.

Describe the solution you'd like
Please add support for Subscription Mode

Describe alternatives you've considered
Continue to use subscription expiration time

Additional context
Add any other context or screenshots about the feature request here.

@codelipenghui
Contributor

@frankjkelly The Go client supports the Reader API, which is based on a non-durable subscription. Does that work for you?

@frankjkelly
Author

frankjkelly commented Feb 18, 2021

Thanks. I don't think that's what we're seeing with the 0.3.0 Go client library and Pulsar 2.6.1.

In the image below, taken from our Pulsar Grafana dashboard, hopefully you can see that our consumers (on the cogito-dialog\wav namespace) are short-lived, but the count of subscriptions on that namespace lasts much longer. That count only goes down at the 1-hour mark, since we have set a subscription expiration time on that namespace (largely so we can allow the ledger to roll over and disk to be reclaimed).

[Image: Pulsar Grafana dashboard screenshot]

Here is how we have configured our consumer:

	consumer, err := p.client.Subscribe(pulsar.ConsumerOptions{
		Topic:                       topic,
		SubscriptionName:            subscriptionName,
		SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
		Type:                        pulsar.Exclusive,
	})

@frankjkelly
Author

@codelipenghui @wolfstudy I'm wondering if I am misunderstanding something with the Go client. Perhaps my ConsumerOptions are wrong (see above), or there is a bug in the broker-side counts of open subscriptions, but it does appear that the Go client uses durable subscriptions? Thanks in advance!

@flowchartsman
Contributor

To add some context, @frankjkelly and I spoke in the Pulsar Slack, and he mentioned that he misunderstood @codelipenghui's recommendation and that a Reader might work for him. Whether there's still a need for a non-durable subscription with a plain consumer, I can't say. If there's effectively no difference between this and a reader, I'd probably say the issue should be closed.

@frankjkelly
Author

I think it would be nice to have equivalence with the Java API which supports NonDurable subscriptions on both the Reader and Consumer interfaces

https://github.com/apache/pulsar/blob/6704f12104219611164aa2bb5bbdfc929613f1bf/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionMode.java#L35

@flowchartsman
Contributor

Feature parity alone is not a compelling reason for me, since it duplicates functionality. Re-reading your use-case, it seems that you might even benefit more from a traditional consumer with aggressive subscription/topic removal policies. This way, at least, you know that if your consumer has a hiccough and needs to restart it won't have to start over, but that the data will still be deleted in a timely manner once the subscription is caught up.

@apapia

apapia commented Jan 12, 2022

@flowchartsman I'm picking this up from @frankjkelly and trying to implement the Reader interface. However, I'm unsure how to implement it for our use case. Essentially, we just have a worker that wants to read a topic from the beginning to the "end", where the end is indicated by a message with an end-of-stream property. If this worker dies, the "job" will just be picked up by another worker, which will start reading from the beginning of the stream. We also need to handle reading the stream while a publisher is writing to it in real time, and we may connect to the topic to read before the publisher does.

What I'm unclear on is how to use the reader.HasNext() and reader.Next(ctx) interface. Under what conditions will HasNext() return false? Presumably any time there are no unread messages? If HasNext() returns false, what should the reader routine do? Sleep for some amount of time? We want to read with as little latency as possible, so we'd like to avoid unnecessary sleeps.

With the Consumer interface, we were able to simply call consumer.Receive(ctx) and it would block until the next message was available. Is there a way to achieve that while using the Reader for a non-durable subscription?

Thanks!

@flowchartsman
Contributor

flowchartsman commented Jan 12, 2022

HasNext() gives you the ability to bail when you're caught up. If you just loop on reader.Next(ctx), you should get the behavior you desire: the client should block until it gets new messages and then process them. You can then choose to bail on whatever property you want. That said, reading until HasNext() is false will also get you to the "end", though it sounds like what you're looking for isn't the last message on the topic but a "last" message of your own designation. In that case your ending condition is up to you: you can inspect the messages, bail after a certain amount of time waiting (with a ctx timeout), or whatever you like.
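
The loop described above can be sketched roughly as follows. This is a minimal illustration, not the client library's code: `message`, `nextReader`, and `chanReader` are hypothetical stand-ins so that the example runs without a broker, and the `end-of-stream` property name and the idle timeout values are assumptions taken from the use case in this thread. With the real client, the same loop shape would call the reader's own `Next(ctx)` and inspect the message properties.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// message is a hypothetical, trimmed-down stand-in for the client's message type.
type message struct {
	payload    string
	properties map[string]string
}

// nextReader is a hypothetical interface covering only a blocking Next call;
// it is not the client library's own Reader type.
type nextReader interface {
	Next(ctx context.Context) (message, error)
}

// chanReader is an in-memory fake that blocks until a message arrives or ctx ends.
type chanReader struct{ ch chan message }

func (r chanReader) Next(ctx context.Context) (message, error) {
	select {
	case m := <-r.ch:
		return m, nil
	case <-ctx.Done():
		return message{}, ctx.Err()
	}
}

// readUntilEnd loops on Next without sleeping. Each call waits at most idle
// for a message. It returns the payloads read before the first message that
// carries the endKey property, or the error that stopped the loop.
func readUntilEnd(ctx context.Context, r nextReader, endKey string, idle time.Duration) ([]string, error) {
	var out []string
	for {
		callCtx, cancel := context.WithTimeout(ctx, idle)
		m, err := r.Next(callCtx)
		cancel()
		if err != nil {
			return out, err
		}
		if _, done := m.properties[endKey]; done {
			return out, nil
		}
		out = append(out, m.payload)
	}
}

// demoComplete simulates a publisher that writes two messages and an end marker.
func demoComplete() ([]string, error) {
	r := chanReader{ch: make(chan message)}
	go func() {
		r.ch <- message{payload: "a"}
		r.ch <- message{payload: "b"}
		r.ch <- message{properties: map[string]string{"end-of-stream": "true"}}
	}()
	return readUntilEnd(context.Background(), r, "end-of-stream", time.Second)
}

// demoStalled simulates a publisher that goes quiet after one message.
func demoStalled() ([]string, error) {
	r := chanReader{ch: make(chan message, 1)}
	r.ch <- message{payload: "a"}
	return readUntilEnd(context.Background(), r, "end-of-stream", 50*time.Millisecond)
}

func main() {
	got, err := demoComplete()
	fmt.Println(got, err)
	got, err = demoStalled()
	fmt.Println(got, errors.Is(err, context.DeadlineExceeded))
}
```

The per-call timeout is one way to express "bail after a certain amount of time waiting"; a single deadline on the outer context would bound the whole read instead.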

@apapia

apapia commented Jan 12, 2022

Excellent, thanks for the explainer @flowchartsman!

@flowchartsman
Contributor

@apapia no problem. If the issue is now resolved, please feel free to close the issue, thanks!

@yangou

yangou commented Oct 4, 2022

@flowchartsman Hi, with the Reader interface, we will always have to manage the discovery of the topic partitions ourselves, and regex consumers won't work. On top of that, the backlog won't show up in the monitoring stats.

That's why, imho, a NonDurable subscription is still a meaningful feature.
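
To illustrate the partition bookkeeping mentioned above, here is a rough sketch of what a caller ends up writing when each partition needs its own reader: one goroutine per partition, merged into a single stream. Everything here is an assumption for illustration. `partitionReader`, `sliceReader`, `fanIn`, and `partitionNames` are hypothetical names, the topic is made up, and the fake reader stops when drained where a real reader would block for new messages. The `-partition-N` suffix follows Pulsar's naming for partitions of a partitioned topic; in practice the partition count would come from the broker rather than a constant.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"sync"
)

// errDrained is returned by the fake reader once it has no payloads left.
var errDrained = errors.New("partition drained")

// partitionReader is a hypothetical stand-in for one per-partition reader.
type partitionReader interface {
	Next(ctx context.Context) (string, error)
}

// sliceReader is an in-memory fake that returns its payloads in order.
type sliceReader struct {
	payloads []string
	pos      int
}

func (r *sliceReader) Next(ctx context.Context) (string, error) {
	if err := ctx.Err(); err != nil {
		return "", err
	}
	if r.pos >= len(r.payloads) {
		return "", errDrained
	}
	p := r.payloads[r.pos]
	r.pos++
	return p, nil
}

// partitionNames builds the per-partition topic names for a partitioned topic.
func partitionNames(topic string, n int) []string {
	names := make([]string, 0, n)
	for i := 0; i < n; i++ {
		names = append(names, fmt.Sprintf("%s-partition-%d", topic, i))
	}
	return names
}

// fanIn starts one goroutine per reader and merges their payloads into a
// single channel, which is closed once every reader has stopped.
func fanIn(ctx context.Context, readers []partitionReader) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	for _, r := range readers {
		wg.Add(1)
		go func(r partitionReader) {
			defer wg.Done()
			for {
				p, err := r.Next(ctx)
				if err != nil {
					return // drained, cancelled, or failed: stop this partition
				}
				select {
				case out <- p:
				case <-ctx.Done():
					return
				}
			}
		}(r)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// demoFanIn reads two fake messages from each of three partitions and
// returns them sorted, since arrival order across partitions is not fixed.
func demoFanIn() []string {
	names := partitionNames("persistent://public/default/events", 3)
	readers := make([]partitionReader, 0, len(names))
	for _, name := range names {
		readers = append(readers, &sliceReader{payloads: []string{name + "/m0", name + "/m1"}})
	}
	var merged []string
	for p := range fanIn(context.Background(), readers) {
		merged = append(merged, p)
	}
	sort.Strings(merged)
	return merged
}

func main() {
	for _, p := range demoFanIn() {
		fmt.Println(p)
	}
}
```

A consumer on a partitioned topic hides this fan-in from the caller, which is part of the argument for exposing a non-durable mode on the consumer itself.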

@dinghram
Contributor

At my company, we use Pulsar in an enterprise application that has requirements that my team cannot control. We cannot use "aggressive subscription/topic removal policies", as there are many other requirements that control those policies. We also cannot easily use the Reader model, as it is very hard to scale a Reader if you want to replicate a Consumer's "shared" subscription model, in which multiple consumers on the subscription share the load of messages. We cannot just re-code in Java; we are using Golang, which is why we need the feature in the Golang client.

In our use case, Pulsar is processing a very high throughput of data. It is important to us that if something goes wrong with our component and it is no longer able to listen/ack on its subscription, the subscription does not persist and cause a backlog of messages that could bring the enterprise-level application down. Making our subscriptions non-durable seems like the perfect solution, but we can't use it because the Golang Pulsar client doesn't support it.

@dinghram
Contributor

dinghram commented Jun 1, 2023

@frankjkelly @yangou @apapia @codelipenghui Take a look at the attached PR. It exposes the Reader's non-durable attribute to Consumer. This works well in all my tests, and my company is now using this implementation in an enterprise application. I do not know how to get the PR some attention from reviewers.
