-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merge FlowableEmitter.BackpressureMode into BackpressureStrategy #4729
Conversation
Current coverage is 89.91% (diff: 82.22%)@@ 2.x #4729 diff @@
==========================================
Files 571 571
Lines 37241 37276 +35
Methods 0 0
Messages 0 0
Branches 5671 5678 +7
==========================================
+ Hits 33494 33518 +24
- Misses 2253 2263 +10
- Partials 1494 1495 +1
|
@@ -9735,6 +9735,12 @@ public final Completable ignoreElements() { | |||
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureLatest<T>(this)); | |||
} | |||
|
|||
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the default behavior of most Flowable sources. Unnecessary.
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN) | ||
@SchedulerSupport(SchedulerSupport.NONE) | ||
/*package*/ Flowable<T> onBackpressureError() { | ||
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureError<T>(this)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be inlined in Observable.toFlowable()
.
@@ -61,10 +61,13 @@ public void subscribeActual(Subscriber<? super T> t) { | |||
emitter = new LatestAsyncEmitter<T>(t); | |||
break; | |||
} | |||
default: { | |||
case BUFFER: { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why changing this?
TestSubscriber<Integer> ts = Observable.range(1, 5) | ||
.toFlowable(BackpressureStrategy.ERROR) | ||
.test(1) | ||
.assertError(MissingBackpressureException.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not assertFailure(MissingBackpressureException.class, 1)
to make sure 1
got through.
return o; | ||
case ERROR: | ||
return o.onBackpressureError(); | ||
case BUFFER: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please leave these as default:
because otherwise the default case can't be coverage-tested.
I have implemented the changes from the code comments. |
No need in this case. Looks good to me. |
This is the implementation of #4727