-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
1.x: OnBackpressureBuffer: DROP_LATEST and DROP_OLDEST #3487
Conversation
@@ -20,6 +20,7 @@ | |||
import rx.functions.*; | |||
import rx.internal.operators.*; | |||
import rx.internal.producers.SingleProducer; | |||
import rx.internal.operators.OperatorOnBackpressureBuffer.OnOverflow; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This enumeration should be part of a more public part of the library instead of the internal part.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense.
Any preference rather than in Observable.java? This seems to be the first case of an enum that's part of the public API (other than Notification.Kind, which is in Notification itself).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could be moved under rx
and I'd rename it to BackpressureOverflowStrategy
Otherwise, it looks okay. 👍 |
Thanks for the review @akarnokd |
@@ -0,0 +1,5 @@ | |||
package rx; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add the copyright header.
Rebased - @akarnokd let me know if there are any fixes pending. |
Excellent. 👍. I don't think there are conflicting PRs right now. |
*/ | ||
@Experimental | ||
public final Observable<T> onBackpressureBuffer(long capacity, Action0 onOverflow, BackpressureOverflowStrategy overflowStrategy) { | ||
return lift(new OperatorOnBackpressureBuffer<T>(capacity, onOverflow, overflowStrategy)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Knowing that some RxJava users like to pass null
in places where RxJava developers do not expect that I'd add null
checks for onOverflow
and overflowStrategy
or at least mention it in javadoc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Great 👍, just a few nits. |
Thanks @artem-zinnatullin & @akarnokd. Addressed Artem's comments. |
👍 |
@abersnaze can you take a look and verify that it solves the issue raised in #3233? |
Rebased on 1.x |
The only concern that @abersnaze and I have had with this is that this uses enums to determine the strategy of operator functionality. This pattern hasn't been used much in the public API as of yet. @akarnokd @zsxwing do you guys have any opinion? We could just convert the strategies to instance methods (for consistency) or is this an emerging case that should be encouraged? |
I wouldn't turn a the whole set of |
@akarnokd @stealthcode I don't love using enums in this way, but I agree with @akarnokd that this can lead to excessive API expansion. But happy to move to methods if you guys prefer. Ditto for moving the enums package from rx to rx.strategies or some similar alternative. |
Agree with @akarnokd I prefer to add a new interface |
I agree with @zsxwing on the use of an empty interface and impls to compare |
Sounds good, I'll update the patch soon. Any suggestion on the package for the strategy? |
Rebased |
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) | ||
*/ | ||
@Experimental | ||
public final Observable<T> onBackpressureBuffer(long capacity, Action0 onOverflow, BackpressureOverflowStrategy overflowStrategy) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This points to an internal interface from the public API, plus the allowed values are also internal classes and the BufferSubscriber is a private class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this still a problem? I'm not sure that I understand this comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BackpressureOverflowStrategy
is still in the internals package, it should be moved up to the rx
package.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, BackpressureOverflowStrategy is now in rx package.
@stealthcode I've been busy these days, just returned to this. I had the same doubt re. BufferSubscribe reference, @akarnokd mind to clarify where it happens? I've fixed the visibility of OverflowStrategy members (new commit on the way) |
@@ -71,6 +175,7 @@ public OperatorOnBackpressureBuffer(long capacity, Action0 onOverflow) { | |||
|
|||
return parent; | |||
} | |||
|
|||
private static final class BufferSubscriber<T> extends Subscriber<T> implements BackpressureDrainManager.BackpressureQueueCallback { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BufferSubscriber
is private here but the public BackpressureOverflowStrategy
exposes it, which makes it not extendable outside this package, which is not better than having a BackpressureOverflowStrategy
enum in the first place. Plus, leaving BufferSubscriber
private generates accessibility bridges.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. I replaced with the parent interface implemented by BufferSubscriber, which is public.
Thanks for the addition. |
@stevegury thanks for the review. PR fixed. |
* A convenience class to provide singleton instances of available {@code BackpressureOverflowStrategy}. | ||
*/ | ||
@SuppressWarnings("unused") | ||
public static class OverflowStrategy { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once BackpressureOverflowStrategy
has been moved to rx
package, the values of this class can become its constants, but all of them should have the type BackpressureOverflowStrategy
. The type after the assignment can stay the same as now and can point to the current internal location.
If I'm right the only missing thing is the header in |
Wait, it has still the problem of exposing internal classes and types into the public API: boolean triggerOn(rx.internal.util.BackpressureDrainManager.BackpressureQueueCallback buffer) throws MissingBackpressureException; In addition, for a proper instance of BackpressureOverflowStrategy, you have to reach into rx.internal.operators.OperatorOnBackpressureBuffer.ON_OVERFLOW_DEFAULT for example. |
Thanks @akarnokd @stevegury I'm submitting a patch that eliminates the internal API leak. While doing it I'm starting to realize that without exposing the buffer or callback the motivation for a strategy vs a plain enum practically dissapears. The point of using a strategy was to give users ability to implement their own strategies, this led me to expose the internal buffer (which is bad, obviously). However, without it, we're basically doing the same as an enum as the real overflow behaviour is not really in the Strategy. It actually feels redundant with the onOverflow callback. To make the strategy worth the extra code we'd need to make BufferSubscriber implement some public API interface defined by BackpressureOverflow through which we allow the user to actually interact with the buffer without leaking the internal API. Something like:
And then:
This would become similar to my current patch (https://github.com/srvaroa/RxJava/commit/b09061048f34f539bbc022e846e7361ae6e38606) and enable meaningful strategies without leaking APIs. That said, I think that would be overengineering this patch. Options I see: a) revert to enums (problem: we're introducing an enum-based public API). I'm leaning for (a), but let me know what you prefer. |
/** | ||
* Drop oldest items from the buffer making room for newer ones. | ||
*/ | ||
public static class DropOldest implements BackpressureOverflow.Strategy { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's have these package private at least; users should use the ON_OVERFLOW_*
constants.
The current API looks good enough to me, only some small visibility and annotation issues. |
Introduce a new interface BackpressureOverflow.Strategy that allows implementing different handlers for an overflow situation. This patch adds three implementations: - ON_OVERFLOW_ERROR remains the default as the existing implementation. - ON_OVERFLOW_DROP_LATEST will drop newly produced items after the buffer fills up. - ON_OVERFLOW_DROP_OLDEST will drop the oldest elements in the buffer, making room for newer ones. The default strategy remains ON_OVERFLOW_ERROR. In all cases, a drop will result in a notification to the producer by invoking the onOverflow callback. None of the two new behaviours (ON_OVERFLOW_DROP_*) will unsubscribe from the source nor onError. Fixes: #3233
Thanks for the reviews, I updated the patch. |
👍 |
Thanks @srvaroa for taking the time of writing a good PR! |
1.x: OnBackpressureBuffer: DROP_LATEST and DROP_OLDEST
👍 thanks! |
Introduce a new interface BackpressureOverflowStrategy that allows implementing different handlers for an overflow situation. This patch adds three implementations, reachable via OverflowStrategy:
The behavior for each is the following:
newer ones.
In all cases, a drop will result in a notification to the producer by invoking the onOverflow callback.
None of the two new behaviours (DROP_*) will unsubscribe from the source nor onError.