-
Notifications
You must be signed in to change notification settings - Fork 55
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update consume backpressure handling #508
Conversation
The capacity now has a limit set to maxMsgs to prevent pulling excessive messages from the server. It also has been bounded to a maximum of 1024 to avoid large object heap allocations.
// resulting in a large number of pending messages (which can be a problem if the | ||
// application is slow to process messages). The capacity is bounded to a maximum of 1024 | ||
// to avoid LOH allocations. | ||
_userMsgs = Channel.CreateBounded<NatsJSMsg<TMsg>>(maxMsgs > 1024 ? 1024 : (int)maxMsgs); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there ever a case where max msgs is 0(or negative), such as if they only specify max bytes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. should we set a minimum?
edit set minimum to 64
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should just make sure it’s at least 1? If <=0 default it to 1024, because that probably means they are setting max bytes and not max msgs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changing the implementation to use 'Delivered()' function. this is set to a fixed 1024 since the size of the channel will not affect the pull requests anymore.
What happens if they use MaxBytes instead? Won't the channel be 1024 again and it will pull through all of the messages until it is full again? Should we introduce an intermediate channel that consumes the full pull, then calls for the next pull after it is half way consumed? |
(revisiting this because of #608) So the idea is to properly handle the backpressure i.e. do not issue pull requests unless application code has received the message. Before this PR that check was done after the message was buffered through the channel and we were pulling 1000 messages (size of the bounded channel) regardless of application code actually picking up the message. Issuing a |
I need to fix the tests... edit: fixed |
we don't have to worry about this any more since calculation is done after message is yielded through the |
moved to #626 |
Moved pull request calculations to when the messages are yielded to calculate if we need to pull more messages accurately to prevent pulling excessive messages from the server. It also has been bounded to a maximum of 1024 to avoid large object heap allocations.