diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index cad9d2d0f7..bbbe0d0c45 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -6691,6 +6691,32 @@ public final Observable publish(Func1, ? extends Ob return OperatorPublish.create(this, selector); } + /** + * Requests {@code n} initially from the upstream and then 75% of {@code n} subsequently + * after 75% of {@code n} values have been emitted to the downstream. + * + *

This operator allows preventing the downstream to trigger unbounded mode via {@code request(Long.MAX_VALUE)} + * or compensate for the per-item overhead of small and frequent requests. + * + *

+ *
Backpressure:
+ *
The operator expects backpressure from upstream and honors backpressure from downstream.
+ *
Scheduler:
+ *
{@code rebatchRequests} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param n the initial request amount, further request will happen after 75% of this value + * @return the Observable that rebatches request amounts from downstream + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) + */ + @Experimental + public final Observable rebatchRequests(int n) { + if (n <= 0) { + throw new IllegalArgumentException("n > 0 required but it was " + n); + } + return lift(OperatorObserveOn.rebatch(n)); + } + /** * Returns an Observable that applies a specified accumulator function to the first item emitted by a source * Observable, then feeds the result of that function along with the second item emitted by the source diff --git a/src/main/java/rx/internal/operators/OperatorObserveOn.java b/src/main/java/rx/internal/operators/OperatorObserveOn.java index f09a424020..10ad74e4e2 100644 --- a/src/main/java/rx/internal/operators/OperatorObserveOn.java +++ b/src/main/java/rx/internal/operators/OperatorObserveOn.java @@ -27,6 +27,7 @@ import rx.internal.util.atomic.SpscAtomicArrayQueue; import rx.internal.util.unsafe.*; import rx.plugins.RxJavaPlugins; +import rx.schedulers.Schedulers; /** * Delivers events on the specified {@code Scheduler} asynchronously via an unbounded buffer. @@ -75,6 +76,17 @@ public Subscriber call(Subscriber child) { return parent; } } + + public static Operator rebatch(final int n) { + return new Operator() { + @Override + public Subscriber call(Subscriber child) { + ObserveOnSubscriber parent = new ObserveOnSubscriber(Schedulers.immediate(), child, false, n); + parent.init(); + return parent; + } + }; + } /** Observe through individual queue per observer. */ private static final class ObserveOnSubscriber extends Subscriber implements Action0 { diff --git a/src/test/java/rx/internal/operators/OperatorObserveOnTest.java b/src/test/java/rx/internal/operators/OperatorObserveOnTest.java index 6e066c19ad..ef5ab6d332 100644 --- a/src/test/java/rx/internal/operators/OperatorObserveOnTest.java +++ b/src/test/java/rx/internal/operators/OperatorObserveOnTest.java @@ -933,4 +933,37 @@ public void bufferSizesWork() { ts.assertNoErrors(); } } + + @Test + public void synchronousRebatching() { + final List requests = new ArrayList(); + + TestSubscriber ts = new TestSubscriber(); + + Observable.range(1, 50) + .doOnRequest(new Action1() { + @Override + public void call(Long r) { + requests.add(r); + } + }) + .rebatchRequests(20) + .subscribe(ts); + + ts.assertValueCount(50); + ts.assertNoErrors(); + ts.assertCompleted(); + + assertEquals(Arrays.asList(20L, 15L, 15L, 15L), requests); + } + + @Test + public void rebatchRequestsArgumentCheck() { + try { + Observable.never().rebatchRequests(-99); + fail("Didn't throw IAE"); + } catch (IllegalArgumentException ex) { + assertEquals("n > 0 required but it was -99", ex.getMessage()); + } + } }