Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.x: request rebatch operator #3971

Merged
merged 3 commits into from
Jun 1, 2016
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6691,6 +6691,30 @@ public final <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Ob
return OperatorPublish.create(this, selector);
}

/**
* Requests {@code n} initially from the upstream and then 75% of {@code n} subsequently
* after 75% of {@code n} values have been emitted to the downstream.
*
* <p>This operator allows preventing the downstream to trigger unbounded mode via {@code request(Long.MAX_VALUE)}
* or compensate for the per-item overhead of small and frequent requests.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator expects backpressure from upstream and honors backpressure from downstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code publish} does not operate by default on a particular {@link Scheduler}.</dd>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

operator name incorrect here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

* </dl>
*
* @param n the initial request amount, further request will happen after 75% of this value
* @return the Observable that rebatches request amounts from downstream
Copy link
Collaborator

@DavidMGross DavidMGross May 31, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

*/
public final Observable<T> rebatchRequests(int n) {
if (n <= 0) {
throw new IllegalArgumentException("n > 0 required but it was " + n);
}
return lift(OperatorObserveOn.<T>rebatch(n));
}

/**
* Returns an Observable that applies a specified accumulator function to the first item emitted by a source
* Observable, then feeds the result of that function along with the second item emitted by the source
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/rx/internal/operators/OperatorObserveOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import rx.internal.util.atomic.SpscAtomicArrayQueue;
import rx.internal.util.unsafe.*;
import rx.plugins.RxJavaPlugins;
import rx.schedulers.Schedulers;

/**
* Delivers events on the specified {@code Scheduler} asynchronously via an unbounded buffer.
Expand Down Expand Up @@ -75,6 +76,17 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
return parent;
}
}

public static <T> Operator<T, T> rebatch(final int n) {
return new Operator<T, T>() {
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(Schedulers.immediate(), child, false, n);
parent.init();
return parent;
}
};
}

/** Observe through individual queue per observer. */
private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
Expand Down
33 changes: 33 additions & 0 deletions src/test/java/rx/internal/operators/OperatorObserveOnTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -933,4 +933,37 @@ public void bufferSizesWork() {
ts.assertNoErrors();
}
}

@Test
public void synchronousRebatching() {
final List<Long> requests = new ArrayList<Long>();

TestSubscriber<Integer> ts = new TestSubscriber<Integer>();

Observable.range(1, 50)
.doOnRequest(new Action1<Long>() {
@Override
public void call(Long r) {
requests.add(r);
}
})
.rebatchRequests(20)
.subscribe(ts);

ts.assertValueCount(50);
ts.assertNoErrors();
ts.assertCompleted();

assertEquals(Arrays.asList(20L, 15L, 15L, 15L), requests);
}

@Test
public void rebatchRequestsArgumentCheck() {
try {
Observable.never().rebatchRequests(-99);
fail("Didn't throw IAE");
} catch (IllegalArgumentException ex) {
assertEquals("n > 0 required but it was -99", ex.getMessage());
}
}
}