Issue 352: Change SubscriberAsReactiveSubscriber to properly apply back-pressure #361
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
The
SubscriberAsReactiveSubscriber
conversion does not work properly for async Subscribers, because in version 2.2.4 it uses an unbounded buffer and it keeps requesting from the source without listening to theFuture[Ack]
given by the downstream subscriber. The communication in this case is OK, since a buffer is being used, but the source is not paused while the target is busy.This PR fixes the implementation to apply back-pressure using when communicating with the Reactive Streams Publisher, in accordance with the response given by the downstream Monix Subscriber.
Other changes:
TrampolinedExecutionContext.immediate
reference, used as an optimizationObservable.fromReactivePublisher
gets an overload that can specify therequestCount
on each batchbufferSize
param in that call chain gets renamed torequestCount
, because the actual buffer in the implementation remainsUnbounded
(yet its size will be controlled byrequestCount
due to back-pressure)/cc @jchapuis