Skip to content

Commit

Permalink
ObserveOn with Buffer Size
Browse files Browse the repository at this point in the history
  • Loading branch information
benjchristensen committed Feb 9, 2014
1 parent e657d22 commit d5e5df4
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 53 deletions.
20 changes: 18 additions & 2 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5139,8 +5139,7 @@ public final <R> ConnectableObservable<R> multicast(Subject<? super T, ? extends
}

/**
* Modify the source Observable so that it asynchronously notifies {@link Observer}s on the
* specified {@link Scheduler}.
* Move notifications to the specified {@link Scheduler} one `onNext` at a time.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/observeOn.png">
*
Expand All @@ -5154,6 +5153,23 @@ public final Observable<T> observeOn(Scheduler scheduler) {
return lift(new OperatorObserveOn<T>(scheduler));
}

/**
* Move notifications to the specified {@link Scheduler} asynchronously with a buffer of the given size.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/observeOn.png">
*
* @param scheduler
* the {@link Scheduler} to notify {@link Observer}s on
* @param bufferSize
* that will be rounded up to the next power of 2
* @return the source Observable modified so that its {@link Observer}s are notified on the
* specified {@link Scheduler}
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-observeon">RxJava Wiki: observeOn()</a>
*/
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
return lift(new OperatorObserveOn<T>(scheduler, bufferSize));
}

/**
* Filters the items emitted by an Observable, only emitting those of the specified type.
* <p>
Expand Down
132 changes: 95 additions & 37 deletions rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,35 @@
public class OperatorObserveOn<T> implements Operator<T, T> {

private final Scheduler scheduler;
private final int bufferSize;

public OperatorObserveOn(Scheduler scheduler) {
/**
*
* @param scheduler
* @param bufferSize
* that will be rounded up to the next power of 2
*/
public OperatorObserveOn(Scheduler scheduler, int bufferSize) {
this.scheduler = scheduler;
this.bufferSize = roundToNextPowerOfTwoIfNecessary(bufferSize);
}

public OperatorObserveOn(Scheduler scheduler) {
this(scheduler, 1);
}

private static int roundToNextPowerOfTwoIfNecessary(int num) {
if ((num & -num) == num) {
return num;
} else {
int result = 1;
while (num != 0)
{
num >>= 1;
result <<= 1;
}
return result;
}
}

@Override
Expand Down Expand Up @@ -87,7 +113,7 @@ private class ObserveOnSubscriber extends Subscriber<T> {
final Subscriber<? super T> observer;
private volatile Scheduler.Inner recursiveScheduler;

private final InterruptibleBlockingQueue queue = new InterruptibleBlockingQueue();
private final InterruptibleBlockingQueue queue = new InterruptibleBlockingQueue(bufferSize);
final AtomicLong counter = new AtomicLong(0);

public ObserveOnSubscriber(Subscriber<? super T> observer) {
Expand All @@ -101,9 +127,9 @@ public void onNext(final T t) {
// we want to block for natural back-pressure
// so that the producer waits for each value to be consumed
if (t == null) {
queue.put(NULL_SENTINEL);
queue.addBlocking(NULL_SENTINEL);
} else {
queue.put(t);
queue.addBlocking(t);
}
schedule();
} catch (InterruptedException e) {
Expand All @@ -118,7 +144,7 @@ public void onCompleted() {
try {
// we want to block for natural back-pressure
// so that the producer waits for each value to be consumed
queue.put(COMPLETE_SENTINEL);
queue.addBlocking(COMPLETE_SENTINEL);
schedule();
} catch (InterruptedException e) {
onError(e);
Expand All @@ -130,7 +156,7 @@ public void onError(final Throwable e) {
try {
// we want to block for natural back-pressure
// so that the producer waits for each value to be consumed
queue.put(new ErrorSentinel(e));
queue.addBlocking(new ErrorSentinel(e));
schedule();
} catch (InterruptedException e2) {
// call directly if we can't schedule
Expand Down Expand Up @@ -195,57 +221,89 @@ private void pollQueue() {

}

/**
* Same behavior as ArrayBlockingQueue<Object>(1) except that we can interrupt/unsubscribe it.
*/
private class InterruptibleBlockingQueue {

private final Semaphore semaphore = new Semaphore(1);
private volatile Object item;
private final Semaphore semaphore;
private volatile boolean interrupted = false;

public Object poll() {
if (interrupted) {
return null;
}
if (item == null) {
return null;
}
try {
return item;
} finally {
item = null;
semaphore.release();
}
private final Object[] buffer;

private AtomicLong tail = new AtomicLong();
private AtomicLong head = new AtomicLong();
private final int capacity;
private final int mask;

public InterruptibleBlockingQueue(final int size) {
this.semaphore = new Semaphore(size);
this.capacity = size;
this.mask = size - 1;
buffer = new Object[size];
}

/**
* Add an Object, blocking if an item is already in the queue.
*
* @param o
* @throws InterruptedException
* Used to unsubscribe and interrupt the producer if blocked in put()
*/
public void put(Object o) throws InterruptedException {
public void interrupt() {
interrupted = true;
semaphore.release();
}

public void addBlocking(final Object e) throws InterruptedException {
if (interrupted) {
throw new InterruptedException("Interrupted by Unsubscribe");
}
semaphore.acquire();
if (interrupted) {
throw new InterruptedException("Interrupted by Unsubscribe");
}
if (o == null) {
if (e == null) {
throw new IllegalArgumentException("Can not put null");
}
item = o;

if (offer(e)) {
return;
} else {
throw new IllegalStateException("Queue is full");
}
}

/**
* Used to unsubscribe and interrupt the producer if blocked in put()
*/
public void interrupt() {
interrupted = true;
semaphore.release();
private boolean offer(final Object e) {
final long _t = tail.get();
if (_t - head.get() == capacity) {
// queue is full
return false;
}
int index = (int) (_t & mask);
buffer[index] = e;
// move the tail forward
tail.lazySet(_t + 1);

return true;
}

public Object poll() {
if (interrupted) {
return null;
}
final long _h = head.get();
if (tail.get() == _h) {
// nothing available
return null;
}
int index = (int) (_h & mask);

// fetch the item
Object v = buffer[index];
// allow GC to happen
buffer[index] = null;
// increment and signal we're done
head.lazySet(_h + 1);
if (v != null) {
semaphore.release();
}
return v;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

public class OperatorObserveOnPerformance extends AbstractPerformanceTester {

private static long reps = 1000000;
private static long reps = 10000;

OperatorObserveOnPerformance() {
super(reps);
Expand All @@ -34,10 +34,43 @@ public void call() {
/**
* Observable.from(1L).observeOn()
*
* --- version 0.17.1 => with queue size == 1
*
* Run: 10 - 115,033 ops/sec
* Run: 11 - 118,155 ops/sec
* Run: 12 - 120,526 ops/sec
* Run: 13 - 115,035 ops/sec
* Run: 14 - 116,102 ops/sec
*
* --- version 0.17.1 => with queue size == 16
*
* Run: 10 - 850,412 ops/sec
* Run: 11 - 711,642 ops/sec
* Run: 12 - 788,332 ops/sec
* Run: 13 - 1,064,056 ops/sec
* Run: 14 - 656,857 ops/sec
*
* --- version 0.17.1 => with queue size == 1000000
*
* Run: 10 - 5,162,622 ops/sec
* Run: 11 - 5,271,481 ops/sec
* Run: 12 - 4,442,470 ops/sec
* Run: 13 - 5,149,330 ops/sec
* Run: 14 - 5,146,680 ops/sec
*
* --- version 0.16.1
*
* Run: 10 - 27,098,802 ops/sec
* Run: 11 - 24,204,284 ops/sec
* Run: 12 - 27,208,663 ops/sec
* Run: 13 - 26,879,552 ops/sec
* Run: 14 - 26,658,846 ops/sec
*
*
*/
public long timeObserveOn() {

Observable<Integer> s = Observable.range(1, (int) reps).observeOn(Schedulers.newThread());
Observable<Integer> s = Observable.range(1, (int) reps).observeOn(Schedulers.newThread(), 16);
IntegerSumObserver o = new IntegerSumObserver();
s.subscribe(o);
return o.sum;
Expand Down
36 changes: 24 additions & 12 deletions rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -348,30 +348,35 @@ public void onNext(Integer t) {

@Test
public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeNewThread() throws InterruptedException {
testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.newThread());
testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.newThread(), 1);
}

@Test
public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeNewThreadAndBuffer8() throws InterruptedException {
testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.newThread(), 8);
}

@Test
public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeIO() throws InterruptedException {
testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.io());
testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.io(), 1);
}

@Test
public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeTrampoline() throws InterruptedException {
testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.trampoline());
testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.trampoline(), 1);
}

@Test
public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeTestScheduler() throws InterruptedException {
testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.test());
testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.test(), 1);
}

@Test
public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeComputation() throws InterruptedException {
testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.computation());
testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.computation(), 1);
}

private final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Scheduler scheduler) throws InterruptedException {
private final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Scheduler scheduler, int bufferSize) throws InterruptedException {
final AtomicInteger countEmitted = new AtomicInteger();
final AtomicInteger countTaken = new AtomicInteger();
int value = Observable.create(new OnSubscribeFunc<Integer>() {
Expand All @@ -385,7 +390,7 @@ public Subscription onSubscribe(final Observer<? super Integer> o) {
public void run() {
int i = 1;
while (!s.isUnsubscribed() && i <= 100) {
System.out.println("onNext from fast producer [" + Thread.currentThread() + "]: " + i);
// System.out.println("onNext from fast producer [" + Thread.currentThread() + "]: " + i);
o.onNext(i++);
}
o.onCompleted();
Expand All @@ -405,13 +410,13 @@ public void call(Integer i) {

@Override
public void call() {
System.out.println("-------- Done Emitting from Source ---------");
// System.out.println("-------- Done Emitting from Source ---------");
}
}).observeOn(scheduler).doOnNext(new Action1<Integer>() {
}).observeOn(scheduler, bufferSize).doOnNext(new Action1<Integer>() {

@Override
public void call(Integer i) {
System.out.println(">> onNext to slowConsumer [" + Thread.currentThread() + "] pre-take: " + i);
// System.out.println(">> onNext to slowConsumer [" + Thread.currentThread() + "] pre-take: " + i);
//force it to be slower than the producer
try {
Thread.sleep(10);
Expand All @@ -420,15 +425,22 @@ public void call(Integer i) {
}
countTaken.incrementAndGet();
}
}).take(10).toBlockingObservable().last();
}).take(10).doOnNext(new Action1<Integer>() {

@Override
public void call(Integer t) {
System.out.println("*********** value: " + t);
}

}).toBlockingObservable().last();

if (scheduler instanceof TrampolineScheduler || scheduler instanceof ImmediateScheduler || scheduler instanceof TestScheduler) {
// since there is no concurrency it will block and only emit as many as it can process
assertEquals(10, countEmitted.get());
} else {
// the others with concurrency should not emit all 100 ... but 10 + 2 in the pipeline
// NOTE: The +2 could change if the implementation of the queue logic changes. See Javadoc at top of class.
assertEquals(12, countEmitted.get());
assertEquals(11, countEmitted.get(), bufferSize); // can be up to 11 + bufferSize
}
// number received after take (but take will filter any extra)
assertEquals(10, value);
Expand Down

0 comments on commit d5e5df4

Please sign in to comment.