is flowControl.maxMessages option honored in 0.14.x to limit calls to handler #2638
Comments
This happens because the PubSub API doesn't allow us to limit how many results it sends us, so we use those numbers as an indicator to stop making requests to the API once they are hit or exceeded. Unfortunately, in this case it's easy to exceed them. @lukesneeringer should we only emit the specified number of messages listed in |
/cc @jganetsk |
Thanks @callmehiphop, this seems like a behavior change from the 0.13 client, where I could specifically do 1 thing at a time if I wanted. For my specific use case I can only do one thing at a time due to the intensity of the jobs I'm doing. |
We should not do this. The point of flow control is to prevent the process from dying because it ran out of memory. Storing messages in memory defeats that purpose, and it's not very useful to the user to have a message in memory but not to emit it.
It sounds like the user wants flow control to be on, not off. But it's supposed to be on by default, so is there a bug in flow control? |
Maybe? @jganetsk I'm a little lost on how we are supposed to process payloads with message sizes that exceed flow control limits. As it is today all the |
We have a lower bound on flow control limits, iirc, can you take a look at what those are? I think it's ok to overflow by 1 message (which means we can only overflow by up to 10MB, since that's the max message size in Pub/Sub), but beyond that we probably should nack any additional overflowing messages. |
I don't think there are bounds on any of the flow control limits. We have
But I do not believe that there were any bounds specified in the design doc. |
I might have been thinking about limits on batching settings. Well regardless, my point about overflowing 1 message still holds and nacking otherwise still holds. |
@jganetsk sounds good, thanks for the input! |
@jganetsk thanks for taking the time to comment on this issue. I'm not sure I completely follow what you and @callmehiphop were discussing, so maybe it's going to be resolved. I wanted to clarify what I want to happen: I want maxMessages on and set to emit 1 message at a time. It doesn't appear to honor that. Thank you again. |
@evilpacket I just cut a new release ( |
Hello @callmehiphop, I'm encountering the same issue as @evilpacket with the version
const options = { flowControl: { maxMessages: 1 }, ackDeadlineSeconds: 20 };
const [subscription] = await this.pubsub.createSubscription('any-topic', 'my-sub', options);
subscription.on('message', (message) => {
  console.log('MSG ID', new Date(), message.id);
  // Simulate a slow job: ack after 18 seconds, inside the 20-second ack deadline.
  setTimeout(() => {
    console.log('acking', new Date(), message.id);
    message.ack();
  }, 18000);
});
The library seems to respect the
Any idea how we can get this "flow control" working as expected? Thank you! |
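For anyone who needs strictly one job at a time regardless of how many messages the subscription emits, a client-side workaround is to chain handler calls so they run serially. This is only a sketch: `serialize` is a hypothetical helper, not part of the library, and it buffers waiting messages in memory, so they may still be redelivered if their ack deadline lapses while they wait.

```javascript
// Hypothetical helper (not a library API): wraps an async handler so that
// calls run strictly one after another, in arrival order.
function serialize(handler) {
  let tail = Promise.resolve();
  return (message) => {
    // Chain this call onto the previous one. Errors are logged and swallowed
    // so that one failed job does not stall the jobs queued behind it.
    tail = tail
      .then(() => handler(message))
      .catch((err) => {
        console.error('handler failed', err);
      });
    return tail;
  };
}
```

It would be wired in as `subscription.on('message', serialize(async (message) => { /* do the job */ message.ack(); }))`, assuming the handler returns a promise that settles when the job is done.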
@thomas-hilaire the latest release has allowed for a single message to be overflowed, meaning at any given time you could have |
Maybe we can overflow by 1 message on maxBytes, but I don't think we should overflow ever on maxMessages. |
@jganetsk ah, I may have misunderstood. |
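To make the rule being discussed concrete, here is a minimal sketch of an admission check that never exceeds maxMessages, lets a single message cross maxBytes, and nacks everything else. The function names, the `state` object, and the `{ id, length }` message shape are assumptions for illustration, not the library's actual implementation.

```javascript
// Hypothetical admission check; names are illustrative only.
function admit(state, limits, messageBytes) {
  // Never go over the message-count limit.
  if (state.messages >= limits.maxMessages) return false;
  // Admit while we are still under maxBytes, even if this message pushes the
  // total over it. That permits an overflow of at most one message.
  if (state.bytes >= limits.maxBytes) return false;
  return true;
}

// Splits an incoming batch into messages to emit and messages to nack.
function dispatch(batch, limits, state = { messages: 0, bytes: 0 }) {
  const emitted = [];
  const nacked = [];
  for (const msg of batch) {
    if (admit(state, limits, msg.length)) {
      state.messages += 1;
      state.bytes += msg.length;
      emitted.push(msg);
    } else {
      nacked.push(msg);
    }
  }
  return { emitted, nacked };
}
```

With `maxMessages: 1`, a batch of two messages yields one emitted and one nacked, which is the behavior requested at the top of this issue.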
Hello,
@callmehiphop by reading this, I understood that you removed the overflow on If you plan to Thanks for your help! |
@thomas-hilaire There are no ordering guarantees with pub/sub. If you nack a message, it will be redelivered sooner or later. Probably sooner. And for example, if you have multiple processes pulling in parallel (each with maxMessages = 1), the message might be redelivered to another process as soon as that process finishes with whatever one message it was processing. But maxMessages = 1 is not necessarily a common pattern with users of Cloud Pub/Sub, who often demand high throughput, and therefore need to process many messages simultaneously. Are you sure maxMessages = 1 is necessary for your use case? |
Thanks for your explanations, the The |
@callmehiphop Can you remind me why nacks are necessary to solve this problem, for Note that in our code review, I was very particular that we deal with |
I think because the API sends more than what the user wants: #2638 (comment) |
@callmehiphop Actually, I might again be confusing flow control and batching. We need to be pretty careful about all these off-by-one bugs in both batching and flow control. |
Hello, thanks for your update, but unfortunately I still encounter the issue with version 0.14.5 of the lib. I ran the same simple test as in my previous comment (#2638 (comment)) and I continue to get more messages than I also noticed a more serious issue: the same message can be delivered many times! Note that in my tests, messages are acked after waiting 18 seconds, and I don't encounter these issues when the wait time is around 10ms (the ack deadline is 60 seconds).
Thank you! |
@thomas-hilaire the redelivery of acked messages is something that the API team is aware of and I believe a fix is coming soon. I'll update this issue (and likely put out a new release) when the change goes live. |
@callmehiphop was the redelivery of acked messages fixed? Cheers in advance! |
@rhodgkins it should be as of |
I still see redelivery of acked messages with the release @callmehiphop should we open a new issue about that, or reopen this one? Else the |
@thomas-hilaire I'm getting the same issue with redelivery, I raised a separate issue if that helps - #2756! |
@thomas-hilaire @rhodgkins I've moved your issue to googleapis/nodejs-pubsub#2, so we can track it there. |
Environment details
Steps to reproduce
Is it possible to get or limit the inflight messages using the new 0.14.x pubsub library?
I'm calling something like the following
this.subscription = this.topic.subscription(this.options.name, this.options.subscription);
Even if I get a topic.subscription with the following options, it appears to call my handler with as many messages as are in the queue.